From 2833f77e2ddc33bcf57562ccb5b430af82071425 Mon Sep 17 00:00:00 2001 From: Rock Huang Date: Tue, 21 Feb 2023 09:00:03 +0000 Subject: [PATCH 1/2] Move trans stream receiver callbacks out of `mu_` This prevents deadlock against wire writer issues. --- .../binder/wire_format/wire_reader_impl.cc | 90 ++++++++++++------- .../binder/wire_format/wire_reader_impl.h | 10 ++- 2 files changed, 64 insertions(+), 36 deletions(-) diff --git a/src/core/ext/transport/binder/wire_format/wire_reader_impl.cc b/src/core/ext/transport/binder/wire_format/wire_reader_impl.cc index 407fee796d4d8..027e37310292b 100644 --- a/src/core/ext/transport/binder/wire_format/wire_reader_impl.cc +++ b/src/core/ext/transport/binder/wire_format/wire_reader_impl.cc @@ -248,40 +248,24 @@ absl::Status WireReaderImpl::ProcessStreamingTransaction( transaction_code_t code, ReadableParcel* parcel) { bool need_to_send_ack = false; int64_t num_bytes = 0; + // Indicates which callbacks should be cancelled. It will be initialized as + // the flags the in-coming transaction carries, and when a particular + // callback is completed, the corresponding bit in cancellation_flag will be + // set to 0 so that we won't cancel it afterward. + int cancellation_flags = 0; + // The queue saves the actions needed to be done "WITHOUT" `mu_`. + // It prevents deadlock against wire writer issues. + std::queue> deferred_func_queue; absl::Status tx_process_result; + { grpc_core::MutexLock lock(&mu_); if (!connected_) { return absl::InvalidArgumentError("Transports not connected yet"); } - // Indicate which callbacks should be cancelled. It will be initialized as - // the flags the in-coming transaction carries, and when a particular - // callback is completed, the corresponding bit in cancellation_flag will be - // set to 0 so that we won't cancel it afterward. - int cancellation_flags = 0; - tx_process_result = - ProcessStreamingTransactionImpl(code, parcel, &cancellation_flags); - if (!tx_process_result.ok()) { - gpr_log(GPR_ERROR, "Failed to process streaming transaction: %s", - tx_process_result.ToString().c_str()); - // Something went wrong when receiving transaction. Cancel failed - // requests. - if (cancellation_flags & kFlagPrefix) { - gpr_log(GPR_INFO, "cancelling initial metadata"); - transport_stream_receiver_->NotifyRecvInitialMetadata( - code, tx_process_result); - } - if (cancellation_flags & kFlagMessageData) { - gpr_log(GPR_INFO, "cancelling message data"); - transport_stream_receiver_->NotifyRecvMessage(code, tx_process_result); - } - if (cancellation_flags & kFlagSuffix) { - gpr_log(GPR_INFO, "cancelling trailing metadata"); - transport_stream_receiver_->NotifyRecvTrailingMetadata( - code, tx_process_result, 0); - } - } + tx_process_result = ProcessStreamingTransactionImpl( + code, parcel, &cancellation_flags, deferred_func_queue); if ((num_incoming_bytes_ - num_acknowledged_bytes_) >= kFlowControlAckBytes) { need_to_send_ack = true; @@ -289,6 +273,33 @@ absl::Status WireReaderImpl::ProcessStreamingTransaction( num_acknowledged_bytes_ = num_incoming_bytes_; } } + // Executes all actions in the queue. + while (!deferred_func_queue.empty()) { + const auto& func = deferred_func_queue.front(); + func(); + deferred_func_queue.pop(); + } + + if (!tx_process_result.ok()) { + gpr_log(GPR_ERROR, "Failed to process streaming transaction: %s", + tx_process_result.ToString().c_str()); + // Something went wrong when receiving transaction. Cancel failed requests. + if (cancellation_flags & kFlagPrefix) { + gpr_log(GPR_INFO, "cancelling initial metadata"); + transport_stream_receiver_->NotifyRecvInitialMetadata(code, + tx_process_result); + } + if (cancellation_flags & kFlagMessageData) { + gpr_log(GPR_INFO, "cancelling message data"); + transport_stream_receiver_->NotifyRecvMessage(code, tx_process_result); + } + if (cancellation_flags & kFlagSuffix) { + gpr_log(GPR_INFO, "cancelling trailing metadata"); + transport_stream_receiver_->NotifyRecvTrailingMetadata( + code, tx_process_result, 0); + } + } + if (need_to_send_ack) { if (!wire_writer_ready_notification_.WaitForNotificationWithTimeout( absl::Seconds(5))) { @@ -310,7 +321,8 @@ absl::Status WireReaderImpl::ProcessStreamingTransaction( } absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( - transaction_code_t code, ReadableParcel* parcel, int* cancellation_flags) { + transaction_code_t code, ReadableParcel* parcel, int* cancellation_flags, + std::queue>& deferred_func_queue) { GPR_ASSERT(cancellation_flags); num_incoming_bytes_ += parcel->GetDataSize(); gpr_log(GPR_INFO, "Total incoming bytes: %" PRId64, num_incoming_bytes_); @@ -380,8 +392,14 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( "binder.authority"); } } - transport_stream_receiver_->NotifyRecvInitialMetadata( - code, *initial_metadata_or_error); + deferred_func_queue.emplace( + [this, code, + initial_metadata_or_error = std::move(initial_metadata_or_error)]() { + // this->transport_stream_receiver_->NotifyRecvInitialMetadata(code, + // *initial_metadata_or_error); + this->transport_stream_receiver_->NotifyRecvInitialMetadata( + code, std::move(initial_metadata_or_error)); + }); *cancellation_flags &= ~kFlagPrefix; } if (flags & kFlagMessageData) { @@ -396,7 +414,9 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( if ((flags & kFlagMessageDataIsPartial) == 0) { std::string s = std::move(message_buffer_[code]); message_buffer_.erase(code); - transport_stream_receiver_->NotifyRecvMessage(code, std::move(s)); + deferred_func_queue.emplace([this, code, s = std::move(s)]() { + this->transport_stream_receiver_->NotifyRecvMessage(code, std::move(s)); + }); } *cancellation_flags &= ~kFlagMessageData; } @@ -416,8 +436,12 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( } trailing_metadata = *trailing_metadata_or_error; } - transport_stream_receiver_->NotifyRecvTrailingMetadata( - code, std::move(trailing_metadata), status); + deferred_func_queue.emplace( + [this, code, trailing_metadata = std::move(trailing_metadata), + status]() { + this->transport_stream_receiver_->NotifyRecvTrailingMetadata( + code, std::move(trailing_metadata), status); + }); *cancellation_flags &= ~kFlagSuffix; } return absl::OkStatus(); diff --git a/src/core/ext/transport/binder/wire_format/wire_reader_impl.h b/src/core/ext/transport/binder/wire_format/wire_reader_impl.h index e0ed31ef1bf03..163853c6babfb 100644 --- a/src/core/ext/transport/binder/wire_format/wire_reader_impl.h +++ b/src/core/ext/transport/binder/wire_format/wire_reader_impl.h @@ -17,7 +17,9 @@ #include +#include #include +#include #include #include "absl/container/flat_hash_map.h" @@ -99,9 +101,11 @@ class WireReaderImpl : public WireReader { private: absl::Status ProcessStreamingTransaction(transaction_code_t code, ReadableParcel* parcel); - absl::Status ProcessStreamingTransactionImpl(transaction_code_t code, - ReadableParcel* parcel, - int* cancellation_flags) + absl::Status ProcessStreamingTransactionImpl( + transaction_code_t code, ReadableParcel* parcel, int* cancellation_flags, + // The queue saves the actions needed to be done "WITHOUT" `mu_`. + // It prevents deadlock against wire writer issues. + std::queue>& deferred_func_queue) ABSL_EXCLUSIVE_LOCKS_REQUIRED(mu_); std::shared_ptr transport_stream_receiver_; From 5cd8e4e576bdb65136aa23287c2172c00bc2d22b Mon Sep 17 00:00:00 2001 From: rock1246 Date: Tue, 21 Feb 2023 10:04:14 +0000 Subject: [PATCH 2/2] Automated change: Fix sanity tests --- src/core/ext/transport/binder/wire_format/wire_reader_impl.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/core/ext/transport/binder/wire_format/wire_reader_impl.cc b/src/core/ext/transport/binder/wire_format/wire_reader_impl.cc index 027e37310292b..574653642c71e 100644 --- a/src/core/ext/transport/binder/wire_format/wire_reader_impl.cc +++ b/src/core/ext/transport/binder/wire_format/wire_reader_impl.cc @@ -398,7 +398,7 @@ absl::Status WireReaderImpl::ProcessStreamingTransactionImpl( // this->transport_stream_receiver_->NotifyRecvInitialMetadata(code, // *initial_metadata_or_error); this->transport_stream_receiver_->NotifyRecvInitialMetadata( - code, std::move(initial_metadata_or_error)); + code, initial_metadata_or_error); }); *cancellation_flags &= ~kFlagPrefix; }