From 4b22a778043a0e71b5fe0f7f8693c3878d2bf7ca Mon Sep 17 00:00:00 2001 From: Jonas Vautherin Date: Wed, 4 Feb 2026 23:21:46 +0100 Subject: [PATCH 1/2] Do not share the infinite streams anymore, now supported in MAVSDK-C++ --- sdk/templates/stream.j2 | 102 ++++++++++++++++++---------------------- 1 file changed, 45 insertions(+), 57 deletions(-) diff --git a/sdk/templates/stream.j2 b/sdk/templates/stream.j2 index 6f76dcd..f1b7e3f 100644 --- a/sdk/templates/stream.j2 +++ b/sdk/templates/stream.j2 @@ -1,64 +1,52 @@ {% if not is_finite %} // Infinite stream -private final FlowableProcessor<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name.upper_camel_case }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name.upper_camel_case }}{% endif %}> {{ name.lower_camel_case }}Processor = PublishProcessor.create(); -private final Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name.upper_camel_case }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name.upper_camel_case }}{% endif %}> {{ name.lower_camel_case }} = {{ name.lower_camel_case }}Processor.onBackpressureBuffer().share();; -private volatile boolean is{{ name.upper_camel_case }}Initialized = false; - -private void process{{ name.upper_camel_case }}({% for param in params %}@NonNull {% if param.type_info.is_primitive %}{{ param.type_info.name }}{% else %}{{ param.type_info.name.upper_camel_case }}{% endif %} {{ param.name.lower_camel_case }}{{ ", " if not loop.last }}{% endfor %}) { - {{ plugin_name.upper_camel_case }}Proto.Subscribe{{ name.upper_camel_case }}Request request = {{ plugin_name.upper_camel_case }}Proto.Subscribe{{ name.upper_camel_case }}Request.newBuilder() - {%- for param in params %} - {%- if param.type_info.is_primitive %} - .set{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }}) - {%- elif param.type_info.is_repeated %} - .addAll{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }}.stream().map(elem -> elem.rpc{{ param.type_info.inner_name.upper_camel_case }}())::iterator) - {%- else %} - .set{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }}.rpc{{ param.type_info.name.upper_camel_case }}()) - {%- endif %} - {%- endfor %} - .build(); - - stub.subscribe{{ name.upper_camel_case }}(request, new StreamObserver<{{ plugin_name.upper_camel_case }}Proto.{{ name.upper_camel_case }}Response>() { - - @Override - public void onNext({{ plugin_name.upper_camel_case }}Proto.{{ name.upper_camel_case }}Response value) { - {%- if return_type.is_repeated %} - {%- if return_type.is_primitive %} - {{ name.lower_camel_case }}Processor.onNext(value.get{{ return_name.upper_camel_case }}List()); - {%- else %} - {{ name.lower_camel_case }}Processor.onNext(value.get{{ return_name.upper_camel_case }}List().stream().map({{ return_type.inner_name.upper_camel_case }}::translateFromRpc).collect(Collectors.toList())); - {%- endif %} - {%- else %} - {%- if return_type.is_primitive %} - {{ name.lower_camel_case }}Processor.onNext(value.get{{ return_name.upper_camel_case }}()); - {%- else %} - {{ name.lower_camel_case }}Processor.onNext({{ return_type.name.upper_camel_case }}.translateFromRpc(value.get{{ return_name.upper_camel_case }}())); - {%- endif %} - {%- endif %} - } - - @Override - public void onError(Throwable t) { - {{ name.lower_camel_case }}Processor.onError(t); - } - - @Override - public void onCompleted() { - {{ name.lower_camel_case }}Processor.onComplete(); - } - }); -} - @CheckReturnValue public Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name.upper_camel_case }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name.upper_camel_case }}{% endif %}> get{{ name.upper_camel_case }}({% for param in params %}@NonNull {% if param.type_info.is_primitive %}{{ param.type_info.name }}{% else %}{{ param.type_info.name.upper_camel_case }}{% endif %} {{ param.name.lower_camel_case }}{{ ", " if not loop.last }}{% endfor %}) { - if (!is{{ name.upper_camel_case }}Initialized) { - synchronized (this) { - if (!is{{ name.upper_camel_case }}Initialized) { - MavsdkEventQueue.executor().execute(() -> process{{ name.upper_camel_case }}({% for param in params %}{{ param.name.lower_camel_case }}{{ ", " if not loop.last }}{% endfor %})); - is{{ name.upper_camel_case }}Initialized = true; - } - } - } - return {{ name.lower_camel_case }}; + return Flowable.create(emitter -> { + MavsdkEventQueue.executor().execute(() -> { + {{ plugin_name.upper_camel_case }}Proto.Subscribe{{ name.upper_camel_case }}Request request = {{ plugin_name.upper_camel_case }}Proto.Subscribe{{ name.upper_camel_case }}Request.newBuilder() + {%- for param in params %} + {%- if param.type_info.is_primitive %} + .set{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }}) + {%- elif param.type_info.is_repeated %} + .addAll{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }}.stream().map(elem -> elem.rpc{{ param.type_info.inner_name.upper_camel_case }}())::iterator) + {%- else %} + .set{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }}.rpc{{ param.type_info.name.upper_camel_case }}()) + {%- endif %} + {%- endfor %} + .build(); + + stub.subscribe{{ name.upper_camel_case }}(request, new StreamObserver<{{ plugin_name.upper_camel_case }}Proto.{{ name.upper_camel_case }}Response>() { + + @Override + public void onNext({{ plugin_name.upper_camel_case }}Proto.{{ name.upper_camel_case }}Response value) { + {%- if return_type.is_repeated %} + {%- if return_type.is_primitive %} + emitter.onNext(value.get{{ return_name.upper_camel_case }}List()); + {%- else %} + emitter.onNext(value.get{{ return_name.upper_camel_case }}List().stream().map({{ return_type.inner_name.upper_camel_case }}::translateFromRpc).collect(Collectors.toList())); + {%- endif %} + {%- else %} + {%- if return_type.is_primitive %} + emitter.onNext(value.get{{ return_name.upper_camel_case }}()); + {%- else %} + emitter.onNext({{ return_type.name.upper_camel_case }}.translateFromRpc(value.get{{ return_name.upper_camel_case }}())); + {%- endif %} + {%- endif %} + } + + @Override + public void onError(Throwable t) { + emitter.onError(t); + } + + @Override + public void onCompleted() { + emitter.onComplete(); + } + }); + }); + }, BackpressureStrategy.LATEST); } {% else %} From 97a3e02d8b8333a4ec4affc71de08ba64e69485d Mon Sep 17 00:00:00 2001 From: Jonas Vautherin Date: Wed, 4 Feb 2026 23:31:07 +0100 Subject: [PATCH 2/2] ci: improve robustness of proto fetching --- .github/workflows/main.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/main.yml b/.github/workflows/main.yml index b4b6a90..96ef953 100644 --- a/.github/workflows/main.yml +++ b/.github/workflows/main.yml @@ -67,7 +67,7 @@ jobs: # Update our local proto submodule to match cd $GITHUB_WORKSPACE/sdk/proto - git fetch origin + git fetch origin $proto_commit_hash echo "About to checkout $proto_commit_hash" git checkout $proto_commit_hash