Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ jobs:

# Update our local proto submodule to match
cd $GITHUB_WORKSPACE/sdk/proto
git fetch origin
git fetch origin $proto_commit_hash
echo "About to checkout $proto_commit_hash"
git checkout $proto_commit_hash

Expand Down
102 changes: 45 additions & 57 deletions sdk/templates/stream.j2
Original file line number Diff line number Diff line change
@@ -1,64 +1,52 @@
{% if not is_finite %}
// Infinite stream
private final FlowableProcessor<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name.upper_camel_case }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name.upper_camel_case }}{% endif %}> {{ name.lower_camel_case }}Processor = PublishProcessor.create();
private final Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name.upper_camel_case }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name.upper_camel_case }}{% endif %}> {{ name.lower_camel_case }} = {{ name.lower_camel_case }}Processor.onBackpressureBuffer().share();;
private volatile boolean is{{ name.upper_camel_case }}Initialized = false;

private void process{{ name.upper_camel_case }}({% for param in params %}@NonNull {% if param.type_info.is_primitive %}{{ param.type_info.name }}{% else %}{{ param.type_info.name.upper_camel_case }}{% endif %} {{ param.name.lower_camel_case }}{{ ", " if not loop.last }}{% endfor %}) {
{{ plugin_name.upper_camel_case }}Proto.Subscribe{{ name.upper_camel_case }}Request request = {{ plugin_name.upper_camel_case }}Proto.Subscribe{{ name.upper_camel_case }}Request.newBuilder()
{%- for param in params %}
{%- if param.type_info.is_primitive %}
.set{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }})
{%- elif param.type_info.is_repeated %}
.addAll{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }}.stream().map(elem -> elem.rpc{{ param.type_info.inner_name.upper_camel_case }}())::iterator)
{%- else %}
.set{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }}.rpc{{ param.type_info.name.upper_camel_case }}())
{%- endif %}
{%- endfor %}
.build();

stub.subscribe{{ name.upper_camel_case }}(request, new StreamObserver<{{ plugin_name.upper_camel_case }}Proto.{{ name.upper_camel_case }}Response>() {

@Override
public void onNext({{ plugin_name.upper_camel_case }}Proto.{{ name.upper_camel_case }}Response value) {
{%- if return_type.is_repeated %}
{%- if return_type.is_primitive %}
{{ name.lower_camel_case }}Processor.onNext(value.get{{ return_name.upper_camel_case }}List());
{%- else %}
{{ name.lower_camel_case }}Processor.onNext(value.get{{ return_name.upper_camel_case }}List().stream().map({{ return_type.inner_name.upper_camel_case }}::translateFromRpc).collect(Collectors.toList()));
{%- endif %}
{%- else %}
{%- if return_type.is_primitive %}
{{ name.lower_camel_case }}Processor.onNext(value.get{{ return_name.upper_camel_case }}());
{%- else %}
{{ name.lower_camel_case }}Processor.onNext({{ return_type.name.upper_camel_case }}.translateFromRpc(value.get{{ return_name.upper_camel_case }}()));
{%- endif %}
{%- endif %}
}

@Override
public void onError(Throwable t) {
{{ name.lower_camel_case }}Processor.onError(t);
}

@Override
public void onCompleted() {
{{ name.lower_camel_case }}Processor.onComplete();
}
});
}

@CheckReturnValue
public Flowable<{% if return_type.is_primitive %}{{ return_type.name }}{% elif return_type.is_repeated %}List<{{ plugin_name.upper_camel_case }}.{{ return_type.inner_name.upper_camel_case }}>{% else %}{{ plugin_name.upper_camel_case }}.{{ return_type.name.upper_camel_case }}{% endif %}> get{{ name.upper_camel_case }}({% for param in params %}@NonNull {% if param.type_info.is_primitive %}{{ param.type_info.name }}{% else %}{{ param.type_info.name.upper_camel_case }}{% endif %} {{ param.name.lower_camel_case }}{{ ", " if not loop.last }}{% endfor %}) {
if (!is{{ name.upper_camel_case }}Initialized) {
synchronized (this) {
if (!is{{ name.upper_camel_case }}Initialized) {
MavsdkEventQueue.executor().execute(() -> process{{ name.upper_camel_case }}({% for param in params %}{{ param.name.lower_camel_case }}{{ ", " if not loop.last }}{% endfor %}));
is{{ name.upper_camel_case }}Initialized = true;
}
}
}
return {{ name.lower_camel_case }};
return Flowable.create(emitter -> {
MavsdkEventQueue.executor().execute(() -> {
{{ plugin_name.upper_camel_case }}Proto.Subscribe{{ name.upper_camel_case }}Request request = {{ plugin_name.upper_camel_case }}Proto.Subscribe{{ name.upper_camel_case }}Request.newBuilder()
{%- for param in params %}
{%- if param.type_info.is_primitive %}
.set{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }})
{%- elif param.type_info.is_repeated %}
.addAll{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }}.stream().map(elem -> elem.rpc{{ param.type_info.inner_name.upper_camel_case }}())::iterator)
{%- else %}
.set{{ param.name.upper_camel_case }}({{ param.name.lower_camel_case }}.rpc{{ param.type_info.name.upper_camel_case }}())
{%- endif %}
{%- endfor %}
.build();

stub.subscribe{{ name.upper_camel_case }}(request, new StreamObserver<{{ plugin_name.upper_camel_case }}Proto.{{ name.upper_camel_case }}Response>() {

@Override
public void onNext({{ plugin_name.upper_camel_case }}Proto.{{ name.upper_camel_case }}Response value) {
{%- if return_type.is_repeated %}
{%- if return_type.is_primitive %}
emitter.onNext(value.get{{ return_name.upper_camel_case }}List());
{%- else %}
emitter.onNext(value.get{{ return_name.upper_camel_case }}List().stream().map({{ return_type.inner_name.upper_camel_case }}::translateFromRpc).collect(Collectors.toList()));
{%- endif %}
{%- else %}
{%- if return_type.is_primitive %}
emitter.onNext(value.get{{ return_name.upper_camel_case }}());
{%- else %}
emitter.onNext({{ return_type.name.upper_camel_case }}.translateFromRpc(value.get{{ return_name.upper_camel_case }}()));
{%- endif %}
{%- endif %}
}

@Override
public void onError(Throwable t) {
emitter.onError(t);
}

@Override
public void onCompleted() {
emitter.onComplete();
}
});
});
}, BackpressureStrategy.LATEST);
}

{% else %}
Expand Down