From 237a243795aa04860dfe0cf8d48d018c71585979 Mon Sep 17 00:00:00 2001 From: shijing xian Date: Mon, 19 Jan 2026 19:15:51 +0800 Subject: [PATCH 1/4] use the request_async_id in cpp --- src/ffi_client.cpp | 552 +++++++++++++++++++++++++++++++-------------- src/ffi_client.h | 19 +- 2 files changed, 404 insertions(+), 167 deletions(-) diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index 811cebb..4ad2cf6 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -15,6 +15,7 @@ */ #include +#include #include "build.h" #include "e2ee.pb.h" @@ -37,6 +38,14 @@ std::string bytesToString(const std::vector &b) { return std::string(reinterpret_cast(b.data()), b.size()); } +// Helper to log errors and conditionally throw in debug mode +inline void logAndThrowInDebug(const std::string &error_msg) { + std::cerr << "LiveKit SDK Error: " << error_msg << std::endl; +#ifndef NDEBUG + throw std::runtime_error(error_msg); +#endif +} + } // namespace FfiClient::~FfiClient() { @@ -150,11 +159,36 @@ void LivekitFfiCallback(const uint8_t *buf, size_t len) { FfiClient::instance().PushEvent(event); } +FfiClient::AsyncId FfiClient::generateAsyncId() { + return nextAsyncId_.fetch_add(1, std::memory_order_relaxed); +} + +bool FfiClient::cancelPendingByAsyncId(AsyncId async_id) { + std::unique_ptr to_cancel; + { + std::lock_guard guard(lock_); + for (auto it = pending_.begin(); it != pending_.end(); ++it) { + if ((*it)->async_id == async_id) { + to_cancel = std::move(*it); + pending_.erase(it); + break; + } + } + } + + if (to_cancel) { + to_cancel->cancel(); + return true; + } + return false; +} + template std::future FfiClient::registerAsync( - std::function match, + AsyncId async_id, std::function match, std::function &)> handler) { auto pending = std::make_unique>(); + pending->async_id = async_id; auto fut = pending->promise.get_future(); pending->match = std::move(match); pending->handler = std::move(handler); @@ -170,10 +204,35 @@ std::future FfiClient::connectAsync(const std::string &url, const std::string &token, const RoomOptions &options) { + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, + // match lambda: is this the connect event with our async_id? + [async_id](const proto::FfiEvent &event) { + return event.has_connect() && event.connect().async_id() == async_id; + }, + // handler lambda: fill the promise with RoomInfo or an exception + [](const proto::FfiEvent &event, + std::promise &pr) { + const auto &connectCb = event.connect(); + if (!connectCb.error().empty()) { + pr.set_exception( + std::make_exception_ptr(std::runtime_error(connectCb.error()))); + return; + } + + pr.set_value(connectCb); + }); + + // Build and send the request proto::FfiRequest req; auto *connect = req.mutable_connect(); connect->set_url(url); connect->set_token(token); + connect->set_request_async_id(async_id); auto *opts = connect->mutable_options(); opts->set_auto_subscribe(options.auto_subscribe); opts->set_dynacast(options.dynacast); @@ -245,50 +304,31 @@ FfiClient::connectAsync(const std::string &url, const std::string &token, } } } - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_connect()) { - throw std::runtime_error("FfiResponse missing connect"); - } - const AsyncId async_id = resp.connect().async_id(); - - // Now we register an async op that completes with RoomInfo - return registerAsync( - // match lambda: is this the connect event with our async_id? - [async_id](const proto::FfiEvent &event) { - return event.has_connect() && event.connect().async_id() == async_id; - }, - // handler lambda: fill the promise with RoomInfo or an exception - [](const proto::FfiEvent &event, - std::promise &pr) { - const auto &connectCb = event.connect(); - if (!connectCb.error().empty()) { - pr.set_exception( - std::make_exception_ptr(std::runtime_error(connectCb.error()))); - return; - } + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_connect()) { + logAndThrowInDebug("FfiResponse missing connect"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } - pr.set_value(connectCb); - }); + return fut; } // Track APIs Implementation std::future> FfiClient::getTrackStatsAsync(uintptr_t track_handle) { - proto::FfiRequest req; - auto *get_stats_req = req.mutable_get_stats(); - get_stats_req->set_track_handle(track_handle); - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_get_stats()) { - throw std::runtime_error("FfiResponse missing get_stats"); - } - - const AsyncId async_id = resp.get_stats().async_id(); + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); - // Register pending op: - // - match: event.has_get_stats() && ids equal - // - handler: convert proto stats to C++ wrapper + fulfill promise - return registerAsync>( + // Register the async handler BEFORE sending the request + auto fut = registerAsync>( + async_id, // match [async_id](const proto::FfiEvent &event) { return event.has_get_stats() && @@ -312,6 +352,26 @@ FfiClient::getTrackStatsAsync(uintptr_t track_handle) { } pr.set_value(std::move(stats_vec)); }); + + // Build and send the request + proto::FfiRequest req; + auto *get_stats_req = req.mutable_get_stats(); + get_stats_req->set_track_handle(track_handle); + get_stats_req->set_request_async_id(async_id); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_get_stats()) { + logAndThrowInDebug("FfiResponse missing get_stats"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } // Participant APIs Implementation @@ -319,19 +379,12 @@ std::future FfiClient::publishTrackAsync(std::uint64_t local_participant_handle, std::uint64_t track_handle, const TrackPublishOptions &options) { - proto::FfiRequest req; - auto *msg = req.mutable_publish_track(); - msg->set_local_participant_handle(local_participant_handle); - msg->set_track_handle(track_handle); - auto optionProto = toProto(options); - msg->mutable_options()->CopyFrom(optionProto); + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_publish_track()) { - throw std::runtime_error("FfiResponse missing publish_track"); - } - const AsyncId async_id = resp.publish_track().async_id(); - return registerAsync( + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, // Match: is this our PublishTrackCallback? [async_id](const proto::FfiEvent &event) { return event.has_publish_track() && @@ -358,23 +411,41 @@ FfiClient::publishTrackAsync(std::uint64_t local_participant_handle, proto::OwnedTrackPublication pub = cb.publication(); pr.set_value(std::move(pub)); }); + + // Build and send the request + proto::FfiRequest req; + auto *msg = req.mutable_publish_track(); + msg->set_local_participant_handle(local_participant_handle); + msg->set_track_handle(track_handle); + msg->set_request_async_id(async_id); + auto optionProto = toProto(options); + msg->mutable_options()->CopyFrom(optionProto); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_publish_track()) { + logAndThrowInDebug("FfiResponse missing publish_track"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } std::future FfiClient::unpublishTrackAsync(std::uint64_t local_participant_handle, const std::string &track_sid, bool stop_on_unpublish) { - proto::FfiRequest req; - auto *msg = req.mutable_unpublish_track(); - msg->set_local_participant_handle(local_participant_handle); - msg->set_track_sid(track_sid); - msg->set_stop_on_unpublish(stop_on_unpublish); - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_unpublish_track()) { - throw std::runtime_error("FfiResponse missing unpublish_track"); - } - const AsyncId async_id = resp.unpublish_track().async_id(); - return registerAsync( + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, [async_id](const proto::FfiEvent &event) { return event.has_unpublish_track() && event.unpublish_track().async_id() == async_id; @@ -388,6 +459,28 @@ FfiClient::unpublishTrackAsync(std::uint64_t local_participant_handle, } pr.set_value(); }); + + // Build and send the request + proto::FfiRequest req; + auto *msg = req.mutable_unpublish_track(); + msg->set_local_participant_handle(local_participant_handle); + msg->set_track_sid(track_sid); + msg->set_stop_on_unpublish(stop_on_unpublish); + msg->set_request_async_id(async_id); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_unpublish_track()) { + logAndThrowInDebug("FfiResponse missing unpublish_track"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } std::future FfiClient::publishDataAsync( @@ -395,23 +488,12 @@ std::future FfiClient::publishDataAsync( std::uint64_t data_len, bool reliable, const std::vector &destination_identities, const std::string &topic) { - proto::FfiRequest req; - auto *msg = req.mutable_publish_data(); - msg->set_local_participant_handle(local_participant_handle); - msg->set_data_ptr(reinterpret_cast(data_ptr)); - msg->set_data_len(data_len); - msg->set_reliable(reliable); - msg->set_topic(topic); - for (const auto &id : destination_identities) { - msg->add_destination_identities(id); - } + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_publish_data()) { - throw std::runtime_error("FfiResponse missing publish_data"); - } - const AsyncId async_id = resp.publish_data().async_id(); - return registerAsync( + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, [async_id](const proto::FfiEvent &event) { return event.has_publish_data() && event.publish_data().async_id() == async_id; @@ -425,26 +507,45 @@ std::future FfiClient::publishDataAsync( } pr.set_value(); }); -} -std::future FfiClient::publishSipDtmfAsync( - std::uint64_t local_participant_handle, std::uint32_t code, - const std::string &digit, - const std::vector &destination_identities) { + // Build and send the request proto::FfiRequest req; - auto *msg = req.mutable_publish_sip_dtmf(); + auto *msg = req.mutable_publish_data(); msg->set_local_participant_handle(local_participant_handle); - msg->set_code(code); - msg->set_digit(digit); + msg->set_data_ptr(reinterpret_cast(data_ptr)); + msg->set_data_len(data_len); + msg->set_reliable(reliable); + msg->set_topic(topic); + msg->set_request_async_id(async_id); for (const auto &id : destination_identities) { msg->add_destination_identities(id); } - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_publish_sip_dtmf()) { - throw std::runtime_error("FfiResponse missing publish_sip_dtmf"); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_publish_data()) { + logAndThrowInDebug("FfiResponse missing publish_data"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; } - const AsyncId async_id = resp.publish_sip_dtmf().async_id(); - return registerAsync( + + return fut; +} + +std::future FfiClient::publishSipDtmfAsync( + std::uint64_t local_participant_handle, std::uint32_t code, + const std::string &digit, + const std::vector &destination_identities) { + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, [async_id](const proto::FfiEvent &event) { return event.has_publish_sip_dtmf() && event.publish_sip_dtmf().async_id() == async_id; @@ -458,21 +559,42 @@ std::future FfiClient::publishSipDtmfAsync( } pr.set_value(); }); + + // Build and send the request + proto::FfiRequest req; + auto *msg = req.mutable_publish_sip_dtmf(); + msg->set_local_participant_handle(local_participant_handle); + msg->set_code(code); + msg->set_digit(digit); + msg->set_request_async_id(async_id); + for (const auto &id : destination_identities) { + msg->add_destination_identities(id); + } + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_publish_sip_dtmf()) { + logAndThrowInDebug("FfiResponse missing publish_sip_dtmf"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } std::future FfiClient::setLocalMetadataAsync(std::uint64_t local_participant_handle, const std::string &metadata) { - proto::FfiRequest req; - auto *msg = req.mutable_set_local_metadata(); - msg->set_local_participant_handle(local_participant_handle); - msg->set_metadata(metadata); - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_set_local_metadata()) { - throw std::runtime_error("FfiResponse missing set_local_metadata"); - } - const AsyncId async_id = resp.set_local_metadata().async_id(); - return registerAsync( + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, [async_id](const proto::FfiEvent &event) { return event.has_set_local_metadata() && event.set_local_metadata().async_id() == async_id; @@ -486,24 +608,38 @@ FfiClient::setLocalMetadataAsync(std::uint64_t local_participant_handle, } pr.set_value(); }); + + // Build and send the request + proto::FfiRequest req; + auto *msg = req.mutable_set_local_metadata(); + msg->set_local_participant_handle(local_participant_handle); + msg->set_metadata(metadata); + msg->set_request_async_id(async_id); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_set_local_metadata()) { + logAndThrowInDebug("FfiResponse missing set_local_metadata"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } std::future FfiClient::captureAudioFrameAsync(std::uint64_t source_handle, const proto::AudioFrameBufferInfo &buffer) { - proto::FfiRequest req; - auto *msg = req.mutable_capture_audio_frame(); - msg->set_source_handle(source_handle); - msg->mutable_buffer()->CopyFrom(buffer); + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_capture_audio_frame()) { - throw std::runtime_error("FfiResponse missing capture_audio_frame"); - } - - const AsyncId async_id = resp.capture_audio_frame().async_id(); - - return registerAsync( + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, // match predicate [async_id](const proto::FfiEvent &event) { return event.has_capture_audio_frame() && @@ -519,6 +655,27 @@ FfiClient::captureAudioFrameAsync(std::uint64_t source_handle, } pr.set_value(); }); + + // Build and send the request + proto::FfiRequest req; + auto *msg = req.mutable_capture_audio_frame(); + msg->set_source_handle(source_handle); + msg->set_request_async_id(async_id); + msg->mutable_buffer()->CopyFrom(buffer); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_capture_audio_frame()) { + logAndThrowInDebug("FfiResponse missing capture_audio_frame"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } std::future @@ -527,21 +684,12 @@ FfiClient::performRpcAsync(std::uint64_t local_participant_handle, const std::string &method, const std::string &payload, std::optional response_timeout_ms) { - proto::FfiRequest req; - auto *msg = req.mutable_perform_rpc(); - msg->set_local_participant_handle(local_participant_handle); - msg->set_destination_identity(destination_identity); - msg->set_method(method); - msg->set_payload(payload); - if (response_timeout_ms.has_value()) { - msg->set_response_timeout_ms(*response_timeout_ms); - } - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_perform_rpc()) { - throw std::runtime_error("FfiResponse missing perform_rpc"); - } - const AsyncId async_id = resp.perform_rpc().async_id(); - return registerAsync( + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, // match predicate [async_id](const proto::FfiEvent &event) { return event.has_perform_rpc() && @@ -558,6 +706,32 @@ FfiClient::performRpcAsync(std::uint64_t local_participant_handle, } pr.set_value(cb.payload()); }); + + // Build and send the request + proto::FfiRequest req; + auto *msg = req.mutable_perform_rpc(); + msg->set_local_participant_handle(local_participant_handle); + msg->set_destination_identity(destination_identity); + msg->set_method(method); + msg->set_payload(payload); + msg->set_request_async_id(async_id); + if (response_timeout_ms.has_value()) { + msg->set_response_timeout_ms(*response_timeout_ms); + } + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_perform_rpc()) { + logAndThrowInDebug("FfiResponse missing perform_rpc"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } std::future FfiClient::sendStreamHeaderAsync( @@ -565,22 +739,12 @@ std::future FfiClient::sendStreamHeaderAsync( const proto::DataStream::Header &header, const std::vector &destination_identities, const std::string &sender_identity) { - proto::FfiRequest req; - auto *msg = req.mutable_send_stream_header(); - msg->set_local_participant_handle(local_participant_handle); - *msg->mutable_header() = header; - msg->set_sender_identity(sender_identity); - for (const auto &id : destination_identities) { - msg->add_destination_identities(id); - } + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_send_stream_header()) { - throw std::runtime_error("FfiResponse missing send_stream_header"); - } - const AsyncId async_id = resp.send_stream_header().async_id(); - - return registerAsync( + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, [async_id](const proto::FfiEvent &e) { return e.has_send_stream_header() && e.send_stream_header().async_id() == async_id; @@ -594,28 +758,44 @@ std::future FfiClient::sendStreamHeaderAsync( } pr.set_value(); }); -} -std::future FfiClient::sendStreamChunkAsync( - std::uint64_t local_participant_handle, - const proto::DataStream::Chunk &chunk, - const std::vector &destination_identities, - const std::string &sender_identity) { + // Build and send the request proto::FfiRequest req; - auto *msg = req.mutable_send_stream_chunk(); + auto *msg = req.mutable_send_stream_header(); msg->set_local_participant_handle(local_participant_handle); - *msg->mutable_chunk() = chunk; + *msg->mutable_header() = header; msg->set_sender_identity(sender_identity); + msg->set_request_async_id(async_id); for (const auto &id : destination_identities) { msg->add_destination_identities(id); } - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_send_stream_chunk()) { - throw std::runtime_error("FfiResponse missing send_stream_chunk"); + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_send_stream_header()) { + logAndThrowInDebug("FfiResponse missing send_stream_header"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; } - const AsyncId async_id = resp.send_stream_chunk().async_id(); - return registerAsync( + + return fut; +} + +std::future FfiClient::sendStreamChunkAsync( + std::uint64_t local_participant_handle, + const proto::DataStream::Chunk &chunk, + const std::vector &destination_identities, + const std::string &sender_identity) { + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, [async_id](const proto::FfiEvent &e) { return e.has_send_stream_chunk() && e.send_stream_chunk().async_id() == async_id; @@ -629,25 +809,43 @@ std::future FfiClient::sendStreamChunkAsync( } pr.set_value(); }); -} -std::future -FfiClient::sendStreamTrailerAsync(std::uint64_t local_participant_handle, - const proto::DataStream::Trailer &trailer, - const std::string &sender_identity) { + // Build and send the request proto::FfiRequest req; - auto *msg = req.mutable_send_stream_trailer(); + auto *msg = req.mutable_send_stream_chunk(); msg->set_local_participant_handle(local_participant_handle); - *msg->mutable_trailer() = trailer; + *msg->mutable_chunk() = chunk; msg->set_sender_identity(sender_identity); + msg->set_request_async_id(async_id); + for (const auto &id : destination_identities) { + msg->add_destination_identities(id); + } - proto::FfiResponse resp = sendRequest(req); - if (!resp.has_send_stream_trailer()) { - throw std::runtime_error("FfiResponse missing send_stream_trailer"); + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_send_stream_chunk()) { + logAndThrowInDebug("FfiResponse missing send_stream_chunk"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; } - const AsyncId async_id = resp.send_stream_trailer().async_id(); - return registerAsync( + return fut; +} + +std::future +FfiClient::sendStreamTrailerAsync(std::uint64_t local_participant_handle, + const proto::DataStream::Trailer &trailer, + const std::string &sender_identity) { + // Generate client-side async_id first + const AsyncId async_id = generateAsyncId(); + + // Register the async handler BEFORE sending the request + auto fut = registerAsync( + async_id, [async_id](const proto::FfiEvent &e) { return e.has_send_stream_trailer() && e.send_stream_trailer().async_id() == async_id; @@ -661,6 +859,28 @@ FfiClient::sendStreamTrailerAsync(std::uint64_t local_participant_handle, } pr.set_value(); }); + + // Build and send the request + proto::FfiRequest req; + auto *msg = req.mutable_send_stream_trailer(); + msg->set_local_participant_handle(local_participant_handle); + *msg->mutable_trailer() = trailer; + msg->set_sender_identity(sender_identity); + msg->set_request_async_id(async_id); + + try { + proto::FfiResponse resp = sendRequest(req); + if (!resp.has_send_stream_trailer()) { + logAndThrowInDebug("FfiResponse missing send_stream_trailer"); + cancelPendingByAsyncId(async_id); + return fut; + } + } catch (...) { + cancelPendingByAsyncId(async_id); + throw; + } + + return fut; } } // namespace livekit diff --git a/src/ffi_client.h b/src/ffi_client.h index 36cf72c..3fa9818 100644 --- a/src/ffi_client.h +++ b/src/ffi_client.h @@ -17,11 +17,13 @@ #ifndef LIVEKIT_FFI_CLIENT_H #define LIVEKIT_FFI_CLIENT_H +#include #include #include #include #include #include +#include #include #include "livekit/stats.h" @@ -147,9 +149,11 @@ class FfiClient { // Base class for type-erased pending ops struct PendingBase { + AsyncId async_id = 0; // Client-generated async ID for cancellation virtual ~PendingBase() = default; virtual bool matches(const proto::FfiEvent &event) const = 0; virtual void complete(const proto::FfiEvent &event) = 0; + virtual void cancel() = 0; // Cancel the pending operation }; template struct Pending : PendingBase { std::promise promise; @@ -163,17 +167,30 @@ class FfiClient { void complete(const proto::FfiEvent &event) override { handler(event, promise); } + + void cancel() override { + promise.set_exception(std::make_exception_ptr( + std::runtime_error("Async operation cancelled"))); + } }; template std::future registerAsync( - std::function match, + AsyncId async_id, std::function match, std::function &)> handler); + // Generate a unique client-side async ID for request correlation + AsyncId generateAsyncId(); + + // Cancel a pending async operation by its async_id. Returns true if found and + // removed. + bool cancelPendingByAsyncId(AsyncId async_id); + std::unordered_map listeners_; ListenerId nextListenerId = 1; mutable std::mutex lock_; mutable std::vector> pending_; + std::atomic nextAsyncId_{1}; void PushEvent(const proto::FfiEvent &event) const; friend void LivekitFfiCallback(const uint8_t *buf, size_t len); From a4448213b8751e65a9afc694e0b0116ee37304e8 Mon Sep 17 00:00:00 2001 From: shijing xian Date: Mon, 19 Jan 2026 19:40:06 +0800 Subject: [PATCH 2/4] changed the code back to always throw when ffi returns invalid result --- src/ffi_client.cpp | 54 ++++++++++++---------------------------------- 1 file changed, 14 insertions(+), 40 deletions(-) diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index 4ad2cf6..ec6a022 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -38,12 +38,10 @@ std::string bytesToString(const std::vector &b) { return std::string(reinterpret_cast(b.data()), b.size()); } -// Helper to log errors and conditionally throw in debug mode -inline void logAndThrowInDebug(const std::string &error_msg) { +// Helper to log errors and throw +inline void logAndThrow(const std::string &error_msg) { std::cerr << "LiveKit SDK Error: " << error_msg << std::endl; -#ifndef NDEBUG throw std::runtime_error(error_msg); -#endif } } // namespace @@ -308,9 +306,7 @@ FfiClient::connectAsync(const std::string &url, const std::string &token, try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_connect()) { - logAndThrowInDebug("FfiResponse missing connect"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing connect"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -362,9 +358,7 @@ FfiClient::getTrackStatsAsync(uintptr_t track_handle) { try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_get_stats()) { - logAndThrowInDebug("FfiResponse missing get_stats"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing get_stats"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -424,9 +418,7 @@ FfiClient::publishTrackAsync(std::uint64_t local_participant_handle, try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_publish_track()) { - logAndThrowInDebug("FfiResponse missing publish_track"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing publish_track"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -471,9 +463,7 @@ FfiClient::unpublishTrackAsync(std::uint64_t local_participant_handle, try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_unpublish_track()) { - logAndThrowInDebug("FfiResponse missing unpublish_track"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing unpublish_track"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -524,9 +514,7 @@ std::future FfiClient::publishDataAsync( try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_publish_data()) { - logAndThrowInDebug("FfiResponse missing publish_data"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing publish_data"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -574,9 +562,7 @@ std::future FfiClient::publishSipDtmfAsync( try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_publish_sip_dtmf()) { - logAndThrowInDebug("FfiResponse missing publish_sip_dtmf"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing publish_sip_dtmf"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -619,9 +605,7 @@ FfiClient::setLocalMetadataAsync(std::uint64_t local_participant_handle, try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_set_local_metadata()) { - logAndThrowInDebug("FfiResponse missing set_local_metadata"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing set_local_metadata"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -666,9 +650,7 @@ FfiClient::captureAudioFrameAsync(std::uint64_t source_handle, try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_capture_audio_frame()) { - logAndThrowInDebug("FfiResponse missing capture_audio_frame"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing capture_audio_frame"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -722,9 +704,7 @@ FfiClient::performRpcAsync(std::uint64_t local_participant_handle, try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_perform_rpc()) { - logAndThrowInDebug("FfiResponse missing perform_rpc"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing perform_rpc"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -773,9 +753,7 @@ std::future FfiClient::sendStreamHeaderAsync( try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_send_stream_header()) { - logAndThrowInDebug("FfiResponse missing send_stream_header"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing send_stream_header"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -824,9 +802,7 @@ std::future FfiClient::sendStreamChunkAsync( try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_send_stream_chunk()) { - logAndThrowInDebug("FfiResponse missing send_stream_chunk"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing send_stream_chunk"); } } catch (...) { cancelPendingByAsyncId(async_id); @@ -871,9 +847,7 @@ FfiClient::sendStreamTrailerAsync(std::uint64_t local_participant_handle, try { proto::FfiResponse resp = sendRequest(req); if (!resp.has_send_stream_trailer()) { - logAndThrowInDebug("FfiResponse missing send_stream_trailer"); - cancelPendingByAsyncId(async_id); - return fut; + logAndThrow("FfiResponse missing send_stream_trailer"); } } catch (...) { cancelPendingByAsyncId(async_id); From 5f1bf882b08d2444e2525a4f156bfb77f33ea1b5 Mon Sep 17 00:00:00 2001 From: shijing xian Date: Wed, 28 Jan 2026 23:04:04 -0800 Subject: [PATCH 3/4] fix the timeout issue and a few bugs related to the SimpleRoom example --- client-sdk-rust | 2 +- examples/simple_room/main.cpp | 4 +- examples/simple_room/sdl_media_manager.cpp | 1 + examples/simple_room/sdl_video_renderer.cpp | 12 ++ examples/simple_room/sdl_video_renderer.h | 1 + include/livekit/audio_source.h | 63 +++++++-- src/ffi_client.cpp | 143 ++++++++++++++++---- src/ffi_client.h | 15 +- 8 files changed, 192 insertions(+), 49 deletions(-) diff --git a/client-sdk-rust b/client-sdk-rust index 3a3f42d..6e47737 160000 --- a/client-sdk-rust +++ b/client-sdk-rust @@ -1 +1 @@ -Subproject commit 3a3f42d7403a648c40920d60f3cf6f1e4b808aea +Subproject commit 6e477379611bf9bd728be5418d7c3309b642ba9c diff --git a/examples/simple_room/main.cpp b/examples/simple_room/main.cpp index 71b0aff..ef29ac4 100644 --- a/examples/simple_room/main.cpp +++ b/examples/simple_room/main.cpp @@ -319,7 +319,7 @@ int main(int argc, char *argv[]) { << " Creation time (ms): " << info.creation_time << "\n"; // Setup Audio Source / Track - auto audioSource = std::make_shared(44100, 1, 10); + auto audioSource = std::make_shared(44100, 1, 0); auto audioTrack = LocalAudioTrack::createLocalAudioTrack("micTrack", audioSource); @@ -385,6 +385,8 @@ int main(int argc, char *argv[]) { // Shutdown the audio / video capture threads. media.stopMic(); media.stopCamera(); + media.stopSpeaker(); + media.shutdownRenderer(); // Drain any queued tasks that might still try to update the renderer / // speaker diff --git a/examples/simple_room/sdl_media_manager.cpp b/examples/simple_room/sdl_media_manager.cpp index 6cd9d22..ac6dad7 100644 --- a/examples/simple_room/sdl_media_manager.cpp +++ b/examples/simple_room/sdl_media_manager.cpp @@ -31,6 +31,7 @@ SDLMediaManager::~SDLMediaManager() { stopMic(); stopCamera(); stopSpeaker(); + shutdownRenderer(); } bool SDLMediaManager::ensureSDLInit(Uint32 flags) { diff --git a/examples/simple_room/sdl_video_renderer.cpp b/examples/simple_room/sdl_video_renderer.cpp index 1008f38..7ba3a78 100644 --- a/examples/simple_room/sdl_video_renderer.cpp +++ b/examples/simple_room/sdl_video_renderer.cpp @@ -22,6 +22,8 @@ using namespace livekit; +constexpr int kMaxFPS = 60; + SDLVideoRenderer::SDLVideoRenderer() = default; SDLVideoRenderer::~SDLVideoRenderer() { shutdown(); } @@ -95,6 +97,16 @@ void SDLVideoRenderer::render() { return; } + // Throttle rendering to kMaxFPS + const auto now = std::chrono::steady_clock::now(); + if (last_render_time_.time_since_epoch().count() != 0) { + const auto min_interval = std::chrono::microseconds(1'000'000 / kMaxFPS); + if (now - last_render_time_ < min_interval) { + return; + } + } + last_render_time_ = now; + // 3) Read a frame from VideoStream (blocking until one is available) livekit::VideoFrameEvent vfe; bool gotFrame = stream_->read(vfe); diff --git a/examples/simple_room/sdl_video_renderer.h b/examples/simple_room/sdl_video_renderer.h index 6e666ea..fb0d41e 100644 --- a/examples/simple_room/sdl_video_renderer.h +++ b/examples/simple_room/sdl_video_renderer.h @@ -49,4 +49,5 @@ class SDLVideoRenderer { std::shared_ptr stream_; int width_ = 0; int height_ = 0; + std::chrono::steady_clock::time_point last_render_time_{}; }; diff --git a/include/livekit/audio_source.h b/include/livekit/audio_source.h index 92401de..4fbc58c 100644 --- a/include/livekit/audio_source.h +++ b/include/livekit/audio_source.h @@ -41,8 +41,34 @@ class AudioSource { * @param sample_rate Sample rate in Hz. * @param num_channels Number of channels. * @param queue_size_ms Max buffer duration for the internal queue in ms. + * + * Buffering behavior: + * ------------------- + * - queue_size_ms == 0 (recommended for real-time capture): + * Disables internal buffering entirely. Audio frames are forwarded + * directly to WebRTC sinks and consumed synchronously. + * + * This mode is optimized for real-time audio capture driven by hardware + * media callbacks (e.g. microphone capture). The caller is expected to + * provide fixed-size real-time frames (typically 10 ms per call). + * + * Because the native side consumes frames immediately, this mode + * minimizes latency and jitter and is the best choice for live capture + * scenarios. + * + * - queue_size_ms > 0 (buffered / blocking mode): + * Enables an internal queue that buffers audio up to the specified + * duration. Frames are accumulated and flushed asynchronously once the buffer + * reaches its threshold. + * + * This mode is intended for non-real-time producers (e.g. TTS engines, + * file-based audio, or agents generating audio faster or slower than + * real-time). The buffering layer smooths timing and allows the audio to + * be streamed out in real time even if the producer is bursty. + * + * queue_size_ms must be a multiple of 10. */ - AudioSource(int sample_rate, int num_channels, int queue_size_ms = 1000); + AudioSource(int sample_rate, int num_channels, int queue_size_ms = 0); virtual ~AudioSource() = default; AudioSource(const AudioSource &) = delete; @@ -86,19 +112,32 @@ class AudioSource { * callback arrives (recommended for production unless the caller needs * explicit timeout control). * - * Notes: - * - This is a blocking call. - * - timeout_ms == 0 (infinite wait) is the safest mode because it - * guarantees the callback completes before the function returns, which in - * turn guarantees that the audio buffer lifetime is fully protected. The - * caller does not need to manage or extend the frame lifetime manually. + * Blocking semantics: + * The blocking behavior of this call depends on the buffering mode selected + * at construction time: + * + * - queue_size_ms == 0 (real-time capture mode): + * Frames are consumed synchronously by the native layer. The FFI callback + * is invoked immediately as part of the capture call, so this function + * returns quickly. + * + * This mode relies on the caller being paced by a real-time media + * callback (e.g. audio hardware interrupt / capture thread). It provides the + * lowest possible latency and is ideal for live microphone capture. + * + * - queue_size_ms > 0 (buffered / non-real-time mode): + * Frames are queued internally and flushed asynchronously. This function + * will block until the buffered audio corresponding to this frame has + * been consumed by the native side and the FFI callback fires. * - * - May throw std::runtime_error if: - * • the FFI reports an error + * This mode is best suited for non-real-time audio producers (such as TTS + * engines or agents) that generate audio independently of real-time + * pacing, while still streaming audio out in real time. * - * - The underlying FFI request *must* eventually produce a callback for - * each frame. If the FFI layer is misbehaving or the event loop is stalled, - * a timeout may occur in bounded-wait mode. + * Safety notes: + * May throw std::runtime_error if: + * - the FFI reports an error + * - a timeout occurs in bounded-wait mode */ void captureFrame(const AudioFrame &frame, int timeout_ms = 20); diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index ec6a022..4f2cbe7 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -44,6 +44,93 @@ inline void logAndThrow(const std::string &error_msg) { throw std::runtime_error(error_msg); } +std::optional ExtractAsyncId(const proto::FfiEvent &event) { + using E = proto::FfiEvent; + switch (event.message_case()) { + case E::kConnect: + return event.connect().async_id(); + case E::kDisconnect: + return event.disconnect().async_id(); + case E::kDispose: + return event.dispose().async_id(); + case E::kPublishTrack: + return event.publish_track().async_id(); + case E::kUnpublishTrack: + return event.unpublish_track().async_id(); + case E::kPublishData: + return event.publish_data().async_id(); + case E::kPublishTranscription: + return event.publish_transcription().async_id(); + case E::kCaptureAudioFrame: + return event.capture_audio_frame().async_id(); + case E::kSetLocalMetadata: + return event.set_local_metadata().async_id(); + case E::kSetLocalName: + return event.set_local_name().async_id(); + case E::kSetLocalAttributes: + return event.set_local_attributes().async_id(); + case E::kGetStats: + return event.get_stats().async_id(); + case E::kGetSessionStats: + return event.get_session_stats().async_id(); + case E::kPublishSipDtmf: + return event.publish_sip_dtmf().async_id(); + case E::kChatMessage: + return event.chat_message().async_id(); + case E::kPerformRpc: + return event.perform_rpc().async_id(); + + // low-level data stream callbacks + case E::kSendStreamHeader: + return event.send_stream_header().async_id(); + case E::kSendStreamChunk: + return event.send_stream_chunk().async_id(); + case E::kSendStreamTrailer: + return event.send_stream_trailer().async_id(); + + // high-level + case E::kByteStreamReaderReadAll: + return event.byte_stream_reader_read_all().async_id(); + case E::kByteStreamReaderWriteToFile: + return event.byte_stream_reader_write_to_file().async_id(); + case E::kByteStreamOpen: + return event.byte_stream_open().async_id(); + case E::kByteStreamWriterWrite: + return event.byte_stream_writer_write().async_id(); + case E::kByteStreamWriterClose: + return event.byte_stream_writer_close().async_id(); + case E::kSendFile: + return event.send_file().async_id(); + + case E::kTextStreamReaderReadAll: + return event.text_stream_reader_read_all().async_id(); + case E::kTextStreamOpen: + return event.text_stream_open().async_id(); + case E::kTextStreamWriterWrite: + return event.text_stream_writer_write().async_id(); + case E::kTextStreamWriterClose: + return event.text_stream_writer_close().async_id(); + case E::kSendText: + return event.send_text().async_id(); + case E::kSendBytes: + return event.send_bytes().async_id(); + + // NOT async completion: + case E::kRoomEvent: + case E::kTrackEvent: + case E::kVideoStreamEvent: + case E::kAudioStreamEvent: + case E::kByteStreamReaderEvent: + case E::kTextStreamReaderEvent: + case E::kRpcMethodInvocation: + case E::kLogs: + case E::kPanic: + case E::MESSAGE_NOT_SET: + default: + return std::nullopt; + } +} + } // namespace FfiClient::~FfiClient() { @@ -77,7 +164,7 @@ bool FfiClient::isInitialized() const noexcept { FfiClient::ListenerId FfiClient::AddListener(const FfiClient::Listener &listener) { std::lock_guard guard(lock_); - FfiClient::ListenerId id = nextListenerId++; + FfiClient::ListenerId id = next_listener_id++; listeners_[id] = listener; return id; } @@ -117,34 +204,33 @@ FfiClient::sendRequest(const proto::FfiRequest &request) const { } void FfiClient::PushEvent(const proto::FfiEvent &event) const { - std::vector> to_complete; + std::unique_ptr to_complete; + std::vector listeners_copy; { std::lock_guard guard(lock_); - for (auto it = pending_.begin(); it != pending_.end();) { - if ((*it)->matches(event)) { - to_complete.push_back(std::move(*it)); - it = pending_.erase(it); - } else { - ++it; + + // Complete pending future if this event is a callback with async_id + if (auto async_id = ExtractAsyncId(event)) { + auto it = pending_by_id_.find(*async_id); + if (it != pending_by_id_.end() && it->second && + it->second->matches(event)) { + to_complete = std::move(it->second); + pending_by_id_.erase(it); } } - } - - // Run handlers outside lock - for (auto &p : to_complete) { - p->complete(event); - } - // Notify listeners. Note, we copy the listeners here to avoid calling into - // the listeners under the lock, which could potentially cause deadlock. - std::vector listeners_copy; - { - std::lock_guard guard(lock_); + // Snapshot listeners listeners_copy.reserve(listeners_.size()); - for (auto &[_, listener] : listeners_) { - listeners_copy.push_back(listener); + for (const auto &kv : listeners_) { + listeners_copy.push_back(kv.second); } } + // Run handler outside lock + if (to_complete) { + to_complete->complete(event); + } + + // Notify listeners outside lock for (auto &listener : listeners_copy) { listener(event); } @@ -158,22 +244,19 @@ void LivekitFfiCallback(const uint8_t *buf, size_t len) { } FfiClient::AsyncId FfiClient::generateAsyncId() { - return nextAsyncId_.fetch_add(1, std::memory_order_relaxed); + return next_async_id_.fetch_add(1, std::memory_order_relaxed); } bool FfiClient::cancelPendingByAsyncId(AsyncId async_id) { std::unique_ptr to_cancel; { std::lock_guard guard(lock_); - for (auto it = pending_.begin(); it != pending_.end(); ++it) { - if ((*it)->async_id == async_id) { - to_cancel = std::move(*it); - pending_.erase(it); - break; - } + auto it = pending_by_id_.find(async_id); + if (it != pending_by_id_.end()) { + to_cancel = std::move(it->second); + pending_by_id_.erase(it); } } - if (to_cancel) { to_cancel->cancel(); return true; @@ -192,7 +275,7 @@ std::future FfiClient::registerAsync( pending->handler = std::move(handler); { std::lock_guard guard(lock_); - pending_.push_back(std::move(pending)); + pending_by_id_.emplace(async_id, std::move(pending)); } return fut; } diff --git a/src/ffi_client.h b/src/ffi_client.h index 3fa9818..667100e 100644 --- a/src/ffi_client.h +++ b/src/ffi_client.h @@ -169,8 +169,12 @@ class FfiClient { } void cancel() override { - promise.set_exception(std::make_exception_ptr( - std::runtime_error("Async operation cancelled"))); + try { + promise.set_exception(std::make_exception_ptr( + std::runtime_error("Async operation cancelled"))); + } catch (const std::future_error &) { + // already satisfied + } } }; @@ -187,10 +191,11 @@ class FfiClient { bool cancelPendingByAsyncId(AsyncId async_id); std::unordered_map listeners_; - ListenerId nextListenerId = 1; + std::atomic next_listener_id{1}; mutable std::mutex lock_; - mutable std::vector> pending_; - std::atomic nextAsyncId_{1}; + mutable std::unordered_map> + pending_by_id_; + std::atomic next_async_id_{1}; void PushEvent(const proto::FfiEvent &event) const; friend void LivekitFfiCallback(const uint8_t *buf, size_t len); From 9ef2b21bbe75c15d187d2f6e2fde589e92b071a1 Mon Sep 17 00:00:00 2001 From: shijing xian Date: Thu, 29 Jan 2026 11:28:17 -0800 Subject: [PATCH 4/4] update to the latest rust sdk commit with the audio thread fix --- client-sdk-rust | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client-sdk-rust b/client-sdk-rust index 6e47737..d7c19ce 160000 --- a/client-sdk-rust +++ b/client-sdk-rust @@ -1 +1 @@ -Subproject commit 6e477379611bf9bd728be5418d7c3309b642ba9c +Subproject commit d7c19cedc6938231cb8849e76e584f109452a45c