Skip to content

Conversation

@tfarago
Copy link
Contributor

@tfarago tfarago commented Oct 10, 2025

Addresses #565.

A full-blown working example

import logging
import numpy as np
import os
import zmq
import concert
concert.require("0.30.1")

from concert.devices.cameras.dummy import Camera
from concert.helpers import CommData
from concert.quantities import q
from concert.networking.base import get_tango_device
from concert.session.utils import ddoc, dstate, pdoc, code_of
from concert.experiments.base import Acquisition, Experiment, remote
from concert.experiments.addons import tango as tango_addons
from concert.storage import RemoteDirectoryWalker
from concert.ext.viewers import PyQtGraphViewer


concert.config.PROGRESS_BAR = False
LOG = logging.getLogger(__name__)
WIDTH = HEIGHT = 256
NUM_RADIOS = 30


viewer = await PyQtGraphViewer(show_refresh_rate=True)
slice_viewer = await PyQtGraphViewer(show_refresh_rate=True, title="Slices")
camera_1 = await Camera(background=np.zeros((HEIGHT, WIDTH)))
camera_2 = await Camera(background=np.ones((HEIGHT, WIDTH)) * 1000)  # Distinguish this camera from the other, mean will be higher
await camera_1.set_frame_rate(10 / q.s)
await camera_2.set_frame_rate(10 / q.s)

walker_device = get_tango_device(f'{os.uname()[1]}:1238/concert/tango/walker#dbase=no', timeout=1000 * q.s)
walker = await RemoteDirectoryWalker(
    device=walker_device,
    root='/mnt/fast3/remote-tests',
    bytes_per_file=2 ** 40,
)


class TwoCameraExperiment(Experiment):
   async def __ainit__(self, camera_1, camera_2):
      # camera_1 and camera_2 are two different remote cameras running on different machines
      dark_acq = await Acquisition("darks", self._take_darks, producer=camera_1)
      flat_acq = await Acquisition("flats", self._take_flats, producer=camera_1)
      radio_acq = await Acquisition("radios", self._take_radios, producer=camera_2)
      await super().__ainit__([dark_acq, flat_acq, radio_acq], walker=walker)

   @remote
   async def _take_darks(self):
      await self.get_acquisition('darks').producer.start_recording()
      await self.get_acquisition('darks').producer.grab_send(10)
      await self.get_acquisition('darks').producer.stop_recording()

   @remote
   async def _take_flats(self):
      await self.get_acquisition('flats').producer.start_recording()
      await self.get_acquisition('flats').producer.grab_send(20)
      await self.get_acquisition('flats').producer.stop_recording()

   @remote
   async def _take_radios(self):
      await self.get_acquisition('radios').producer.start_recording()
      await self.get_acquisition('radios').producer.grab_send(NUM_RADIOS)
      await self.get_acquisition('radios').producer.stop_recording()

ex = await TwoCameraExperiment(camera_1, camera_2)

SERVERS = {
        "writer": {
            'darks': CommData("localhost", port=8992, socket_type=zmq.PUSH),
            'flats': CommData("localhost", port=8992, socket_type=zmq.PUSH),
            'radios': CommData("localhost", port=9992, socket_type=zmq.PUSH), # use different port for camera_2
        },
        "reco": {
            'darks': CommData("localhost", port=8993, socket_type=zmq.PUSH),
            'flats': CommData("localhost", port=8993, socket_type=zmq.PUSH),
            'radios': CommData("localhost", port=9993, socket_type=zmq.PUSH), # use different port for camera_2
        },
        "live": {
            'darks': CommData("localhost", port=8995, socket_type=zmq.PUB, sndhwm=1),
            'flats': CommData("localhost", port=8995, socket_type=zmq.PUB, sndhwm=1),
            'radios': CommData("localhost", port=9995, socket_type=zmq.PUB, sndhwm=1), # use different port for camera_2
        },
        "bench": {
            'darks': CommData("localhost", port=8997, socket_type=zmq.PUSH),
            'flats': CommData("localhost", port=8997, socket_type=zmq.PUSH),
            'radios': CommData("localhost", port=9997, socket_type=zmq.PUSH), # use different port for camera_2
        }
}

# Live View
if "live" in SERVERS:
    live = await tango_addons.LiveView(viewer, SERVERS["live"], ex)

# Writer
writer = await tango_addons.ImageWriter(ex, SERVERS["writer"])

if "bench" in SERVERS:
    bench_device = get_tango_device(f'{os.uname()[1]}:1240/concert/tango/benchmarker#dbase=no', timeout=1000 * q.s)
    bench = await tango_addons.Benchmarker(ex, bench_device, SERVERS["bench"])

# Online Reco
if "reco" in SERVERS:
    reco_device = get_tango_device(f'{os.uname()[1]}:1237/concert/tango/reco#dbase=no', timeout=1000 * q.s)
    reco = await tango_addons.OnlineReconstruction(reco_device, ex,
                                                   SERVERS["reco"],
                                                   do_normalization=True,
                                                   average_normalization=True,
                                                   slice_directory="online-slices",
                                                   viewer=slice_viewer)
    await reco.set_region([0., 1., 1.])
    await reco.set_center_position_x([WIDTH / 2] * q.px)
    await reco.set_fix_nan_and_inf(True)
    await reco_device.write_attribute('center_position_z', [(await camera_1.get_roi_height()).magnitude / 2 + 0.5])
    await reco_device.write_attribute('number', NUM_RADIOS)
    await reco_device.write_attribute('overall_angle', np.pi)
    await reco_device.setup_walker(
        [
            f'{os.uname()[1]}:2238/concert/tango/walker#dbase=no',
            '/home/tomas/data/remote-tests',
            "tcp", "localhost", "8996"
        ]
    )

Tango setup

concert tango walker --port 1238  # Writer
concert tango walker --port 2238  # Online reco writer
concert tango reco --port 1237
concert tango benchmarker --port 1240

After the experiment run, darks and flats have mean 100 and radios 1000, which shows that radios use the second camera.

@codecov
Copy link

codecov bot commented Oct 10, 2025

Codecov Report

❌ Patch coverage is 26.08696% with 119 lines in your changes missing coverage. Please review.
✅ Project coverage is 83.19%. Comparing base (43dcd7b) to head (895c93d).
⚠️ Report is 2 commits behind head on master.

Files with missing lines Patch % Lines
concert/experiments/addons/tango.py 0.00% 99 Missing ⚠️
concert/experiments/addons/local.py 70.83% 14 Missing ⚠️
concert/experiments/base.py 54.54% 5 Missing ⚠️
concert/experiments/addons/base.py 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master     #566      +/-   ##
==========================================
- Coverage   83.52%   83.19%   -0.33%     
==========================================
  Files         133      133              
  Lines       10718    10728      +10     
==========================================
- Hits         8952     8925      -27     
- Misses       1766     1803      +37     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@tfarago
Copy link
Contributor Author

tfarago commented Oct 18, 2025

Look at https://github.com/ufo-kit/concert/blob/different-producers/concert/experiments/base.py#L169-L171. Producer should be started last to make sure the consumers are ready when producers starts. @MarcusZuber you can this already now.

@MarcusZuber
Copy link
Member

What I don't like so far is that in our current implementation the Acquisition reauires a producer which needs to have the interace of a remote camera.

@MarcusZuber
Copy link
Member

Note (mostly) to myself: I think adding and removing acquistions in runtime screws it up.

@MarcusZuber
Copy link
Member

This is my current diff to make it working:

diff --git a/concert/experiments/addons/tango.py b/concert/experiments/addons/tango.py
index fe89689e..f490b29e 100644
--- a/concert/experiments/addons/tango.py
+++ b/concert/experiments/addons/tango.py
@@ -201,7 +201,7 @@ class ImageWriter(TangoMixin, base.ImageWriter):
 class LiveView(base.LiveView):
 
     async def __ainit__(self, viewer, endpoints, experiment, acquisitions=None):
-        self.endpoints = endpoints
+        self._endpoints = endpoints
         await base.LiveView.__ainit__(self,
                                       viewer,
                                       experiment=experiment,
@@ -216,7 +216,7 @@ class LiveView(base.LiveView):
         use_same_endpoint = type(self._endpoints) is CommData
 
         for acq in acquisitions:
-            consumers[acq] = LiveViewConsumer(self._viewer, self._endpoints if use_same_endpoint else  self.endpoints[acq.name], consume)
+            consumers[acq] = LiveViewConsumer(self._viewer, self._endpoints if use_same_endpoint else  self._endpoints[acq.name], consume)
 
         return consumers
 
diff --git a/concert/experiments/base.py b/concert/experiments/base.py
index 2f2fbdb7..ef1b39e4 100644
--- a/concert/experiments/base.py
+++ b/concert/experiments/base.py
@@ -84,8 +84,8 @@ class Acquisition(RunnableParameterizable):
         self.name = name
         self.producer = producer
         if producer_corofunc.remote:
-            if producer is None:
-                raise ValueError("producer must be specified for remote acquisitions")
+            #if producer is None:
+            #    raise ValueError("producer must be specified for remote acquisitions")
             self._connect = self._connect_remote
             self.remote = True
         else:

I will try to push this soon, but wanted to put it here one to net forget it over the holidays.
In one point I mexed up _endpoints and endpoints and I removed the requirement for a producer, since I wanted to just trigger a zmq_send. The second I will reconsider when I find some time.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants