Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions src/audio_ring_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,18 @@ class SendspinAudioRingBuffer {
/// @param entry Pointer previously returned by receive_chunk(). May be nullptr (no-op).
void return_chunk(AudioRingBufferEntry* entry);

/// @brief Returns the number of audio chunks waiting to be received.
/// @return Count of chunks written but not yet received by the consumer.
size_t chunks_waiting() const {
return this->ring_buffer_.items_waiting();
}

/// @brief Returns true if no audio chunks are waiting to be received.
/// @return true if the consumer has received every chunk written so far.
bool is_empty() const {
return this->ring_buffer_.is_empty();
}

/// @brief Drains all items from the ring buffer.
/// @note Only safe to call when the consumer task is stopped.
void reset();
Expand Down
37 changes: 36 additions & 1 deletion src/platform/spsc_ring_buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,23 @@ class SpscRingBuffer {
return this->handle_ != nullptr;
}

/// @brief Returns the number of committed items waiting to be received
/// @return Count of items written and committed but not yet received by the consumer.
size_t items_waiting() const {
if (this->handle_ == nullptr) {
return 0;
}
UBaseType_t items = 0;
vRingbufferGetInfo(this->handle_, nullptr, nullptr, nullptr, nullptr, &items);
return static_cast<size_t>(items);
}

/// @brief Returns true if no committed items are waiting to be received
/// @return true if the ring buffer has no items pending for the consumer.
bool is_empty() const {
return this->items_waiting() == 0;
}

/// @brief Two-phase write: acquire contiguous space
/// @param size Number of bytes to acquire.
/// @param timeout_ms Milliseconds to wait if space is unavailable (UINT32_MAX = wait forever).
Expand Down Expand Up @@ -192,6 +209,7 @@ class SpscRingBuffer {
this->write_offset_ = 0;
this->read_offset_ = 0;
this->free_bytes_ = size;
this->items_waiting_ = 0;
this->created_ = true;
return true;
}
Expand All @@ -202,6 +220,20 @@ class SpscRingBuffer {
return this->created_;
}

/// @brief Returns the number of committed items waiting to be received
/// @return Count of items written and committed but not yet received by the consumer.
size_t items_waiting() const {
std::lock_guard<std::mutex> lock(this->mtx_);
return this->items_waiting_;
}

/// @brief Returns true if no committed items are waiting to be received
/// @return true if the ring buffer has no items pending for the consumer.
bool is_empty() const {
std::lock_guard<std::mutex> lock(this->mtx_);
return this->items_waiting_ == 0;
}

/// @brief Two-phase write: acquire contiguous space
/// @param size Number of bytes to acquire.
/// @param timeout_ms Milliseconds to wait if space is unavailable (UINT32_MAX = wait forever).
Expand Down Expand Up @@ -254,6 +286,7 @@ class SpscRingBuffer {
auto* header =
reinterpret_cast<ItemHeader*>(static_cast<uint8_t*>(ptr) - sizeof(ItemHeader));
header->flags = FLAG_WRITTEN;
++this->items_waiting_;
this->cv_read_.notify_all();
return true;
}
Expand Down Expand Up @@ -390,6 +423,7 @@ class SpscRingBuffer {
}
if (header->flags == FLAG_WRITTEN) {
*item_size = header->size;
--this->items_waiting_;
return this->storage_ + this->read_offset_ + sizeof(ItemHeader);
}
// ACQUIRED but not yet committed -- wait
Expand All @@ -401,13 +435,14 @@ class SpscRingBuffer {
// Struct fields
std::condition_variable cv_read_;
std::condition_variable cv_write_;
std::mutex mtx_;
mutable std::mutex mtx_;

// Pointer fields
uint8_t* storage_{nullptr};

// size_t fields
size_t free_bytes_{0};
size_t items_waiting_{0};
size_t read_offset_{0};
size_t storage_size_{0};
size_t write_offset_{0};
Expand Down
Loading