Busline is an agnostic asynchronous pub/sub library.
You can choose between a Publisher/Subscriber pair or a Client, i.e. a set of publishers and subscribers.
flowchart LR
P@{ shape: rect, label: "Local Publisher" }
S@{ shape: rect, label: "Local Subscriber" }
Q@{ shape: das, label: "Local EventBus" }
P-->Q
Q-->S
flowchart LR
P@{ shape: rect, label: "MQTT Publisher" }
S@{ shape: rect, label: "MQTT Subscriber" }
Q@{ shape: das, label: "MQTT Broker" }
P-->Q
Q-->S
Client allows you to use a heterogeneous combination of pubs/subs (e.g., local + MQTT).
flowchart LR
subgraph C1["Client"]
PL@{ shape: rect, label: "Local Publisher" }
SM@{ shape: rect, label: "MQTT Subscriber" }
end
subgraph C2["Client"]
PM@{ shape: rect, label: "MQTT Publisher" }
SL@{ shape: rect, label: "Local Subscriber" }
end
QL@{ shape: das, label: "Local EventBus" }
PL --> QL
QL --> SL
QM@{ shape: das, label: "MQTT Broker" }
PM --> QM
QM --> SM
Thanks to Busline, you can choose your favorite programming pattern between callback and iterator (or both!).
Warning
All method and variable names listed in this document use the snake_case convention, but each language-specific implementation follows that language's convention (e.g. snake case for Python and camel case for Java).
Busline aims to provide a unified way to share event-based messages across multiple, different channels. It supports both programming styles (event handlers and iterators), so programmers are free to choose the one they prefer.
In other words, Busline is the foundational layer between low-level communication protocols (e.g., MQTT) and large high-level applications (e.g., Orbitalis).
Publisher and Subscriber are the two roles of a messaging pattern commonly used in software architecture to enable decoupled communication between components.
In a pub/sub system, publishers send messages without knowing who will receive them, while subscribers listen for specific message types or topics, allowing for scalable and flexible event-driven designs.
An EventBus is a specific implementation of this pattern that acts as a central hub where components can post and subscribe to events. This abstraction helps reduce direct dependencies between modules, making the system easier to maintain, extend, and test.
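The decoupling described above can be illustrated with a minimal in-memory sketch. `TinyEventBus` and its methods are hypothetical names invented for this illustration; they are not Busline's API and only show the general shape of the pattern.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict, List, Tuple

Handler = Callable[[str, Any], Awaitable[None]]


class TinyEventBus:
    """Hypothetical in-memory hub: routes each message to its topic's handlers."""

    def __init__(self) -> None:
        self._handlers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._handlers[topic].append(handler)

    async def publish(self, topic: str, message: Any) -> None:
        # The publisher only names a topic; it never sees who is listening.
        for handler in list(self._handlers.get(topic, [])):
            await handler(topic, message)


async def demo() -> List[Tuple[str, Any]]:
    bus = TinyEventBus()
    received: List[Tuple[str, Any]] = []

    async def on_order(topic: str, message: Any) -> None:
        received.append((topic, message))

    bus.subscribe("orders", on_order)
    await bus.publish("orders", {"id": 1})
    await bus.publish("payments", {"id": 2})  # no subscriber on this topic
    return received


received = asyncio.run(demo())
print(received)
```

Only the message published on the subscribed topic reaches the handler; the publisher code is identical in both calls.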
flowchart LR
P@{ shape: rect, label: "Publisher" }
S@{ shape: rect, label: "Subscriber" }
Q@{ shape: das, label: "EventBus" }
P-->Q
Q-->S
Busline provides an agnostic abstraction of this pattern. In particular, it provides a common and user-friendly interface to implement a Subscriber and a Publisher which can be used together with an EventBus.
Thanks to this abstraction, a plethora of different Publishers and Subscribers can be used together, even if they are bound to different EventBuses.
Note that, because Publisher and Subscriber are logically separate, you can use just one of them. For example, if you have an edge sensor which produces data, you only need a Publisher.
flowchart LR
P["MQTT Publisher"] --> Q["MQTT Broker"]
S["Your Sensor"] --> P
P@{ shape: rect}
Q@{ shape: das}
S@{ shape: dbl-circ}
- Choose your context: Local, MQTT or both
- Choose what you need: Publisher, Subscriber or both (i.e., Client)
- Choose your events handling style: handler or iterator
The following example shows a Client, initialized with a Local pub/sub pair, sharing messages with a standalone Local Subscriber:
flowchart LR
subgraph C["Client"]
CP["Local Publisher"]
CS["Local Subscriber"]
end
CP -- "0, 1, 2, ..." --> E["Local EventBus"]
E --> S["Local Subscriber"]
CP@{ shape: rect}
CS@{ shape: rect}
E@{ shape: das}
S@{ shape: rect}
In this section you will see how to use Busline. If you want to know more, or if you need a custom implementation, please see the Advance Guide.
Busline has two distinct concepts related to events:
- Message: the actual information that you publish
- Event: the inbound envelope of a message, providing useful information (such as who published it and when)
Message is the class that holds the data to be published by publishers.
To be publishable, a message must provide serialize and deserialize methods.
Fortunately, Busline provides a set of out-of-the-box mixins, so you can avoid writing custom serde implementations:
- Messages based on Avro
- Messages based on JSON
- StringMessage, Int64Message, Int32Message, Float32Message, Float64Message to wrap primitive data
The primitive wrappers already support Avro and JSON serialization.
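As a rough illustration of what a serialize/deserialize pair can look like, here is a minimal JSON-based sketch. `OrderMessage`, its fields, and the exact method signatures are assumptions made for this example; Busline's own Message base class and mixins are not reproduced here.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class OrderMessage:
    """Hypothetical message type; not a Busline class."""

    order_id: int
    customer: str

    def serialize(self) -> bytes:
        # Encode the fields as UTF-8 JSON; sort_keys keeps the output stable.
        return json.dumps(asdict(self), sort_keys=True).encode("utf-8")

    @classmethod
    def deserialize(cls, raw: bytes) -> "OrderMessage":
        # Rebuild the message from the bytes produced by serialize().
        return cls(**json.loads(raw.decode("utf-8")))


original = OrderMessage(order_id=42, customer="alice")
wire = original.serialize()
restored = OrderMessage.deserialize(wire)
print(wire, restored == original)
```

The round trip (serialize, then deserialize) should return an equal object; that property is what makes a message safe to send through any transport.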
Event is the envelope for messages. It is what you will receive from subscribers.
Events can also be sent without a payload, for example when you only want to notify.
Generally, you do not need to care about its creation, because it is performed by the subscriber logic.
An event provides the following information:
- identifier: unique event identifier
- publisher_identifier: identifier of the publisher
- payload: message data
- timestamp: event generation datetime
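The four fields listed above can be pictured as a small immutable record. `EventSketch` is a hypothetical stand-in written for this illustration: the field names come from the list, while the types, the UUID identifier, and the UTC timestamp are assumptions.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional


@dataclass(frozen=True)
class EventSketch:
    """Hypothetical envelope; field names follow the list above, types are assumed."""

    publisher_identifier: str
    payload: Optional[Any] = None  # None models a notification-only event
    identifier: str = field(default_factory=lambda: str(uuid.uuid4()))
    timestamp: datetime = field(default_factory=lambda: datetime.now(timezone.utc))


with_payload = EventSketch(publisher_identifier="sensor-1", payload=21.5)
ping = EventSketch(publisher_identifier="sensor-1")  # no payload: notify only
print(with_payload.identifier != ping.identifier, ping.payload)
```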
Publisher is the abstract class which can be implemented to create publishers.
The main method is publish, used to publish a message to a single topic.
To publish a message to several topics at once, use multi_publish.
The publish method takes two parameters: topic and message.
topic is a string and represents the topic to which the message must be published.
message can be None if you want to send an event without a payload; otherwise you can provide:
- An implementation of Message
- A str, which is automatically wrapped into StringMessage
- An int, which is automatically wrapped into Int64Message
- A float, which is automatically wrapped into Float64Message
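The automatic wrapping rules above can be sketched as a small dispatch function. The three classes below are local stand-ins that merely reuse the names StringMessage, Int64Message and Float64Message; `wrap_payload` is a hypothetical helper, and rejecting bool is a choice made for this sketch (the text above does not say how booleans are treated).

```python
from dataclasses import dataclass
from typing import Any, Optional


# Local stand-ins that only borrow the wrapper names; not the library classes.
@dataclass(frozen=True)
class StringMessage:
    value: str


@dataclass(frozen=True)
class Int64Message:
    value: int


@dataclass(frozen=True)
class Float64Message:
    value: float


def wrap_payload(message: Any) -> Optional[Any]:
    """Hypothetical helper mirroring the wrapping rules listed above."""
    if message is None:
        return None  # payload-empty event
    if isinstance(message, bool):
        # bool is a subclass of int in Python; this sketch rejects it explicitly.
        raise TypeError("bool payloads are not handled in this sketch")
    if isinstance(message, str):
        return StringMessage(message)
    if isinstance(message, int):
        return Int64Message(message)
    if isinstance(message, float):
        return Float64Message(message)
    return message  # assumed to already be a Message implementation


print(wrap_payload("hi"), wrap_payload(7), wrap_payload(1.5), wrap_payload(None))
```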
In addition, all publishers also provide the following methods:
- connect(): connect to the eventbus
- disconnect(): disconnect from the eventbus
If you want to implement your own publisher, you only need to implement _internal_publish,
which must contain the logic to send messages.
Two additional hooks are called around _internal_publish whenever the publish method is invoked:
- _on_publishing: called before publish
- _on_published: called after publish
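The relationship between publish, _internal_publish and the two hooks is a classic template-method arrangement. The sketch below reproduces that call order with a hypothetical `PublisherSketch` base class; the parameter lists of the hooks and the sequential behaviour of multi_publish are assumptions, not Busline's actual signatures.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, List, Optional


class PublisherSketch(ABC):
    """Hypothetical stand-in for an abstract publisher; not Busline's class."""

    async def publish(self, topic: str, message: Optional[Any] = None) -> None:
        await self._on_publishing(topic, message)     # hook: before sending
        await self._internal_publish(topic, message)  # transport-specific part
        await self._on_published(topic, message)      # hook: after sending

    async def multi_publish(self, topics: List[str], message: Optional[Any] = None) -> None:
        # Sequential for simplicity; a real implementation may act differently.
        for topic in topics:
            await self.publish(topic, message)

    @abstractmethod
    async def _internal_publish(self, topic: str, message: Optional[Any]) -> None:
        """Sending logic goes here."""

    async def _on_publishing(self, topic: str, message: Optional[Any]) -> None:
        """Optional hook, no-op by default."""

    async def _on_published(self, topic: str, message: Optional[Any]) -> None:
        """Optional hook, no-op by default."""


class RecordingPublisher(PublisherSketch):
    """Concrete publisher that records the call order instead of sending."""

    def __init__(self) -> None:
        self.log: List[str] = []

    async def _internal_publish(self, topic: str, message: Optional[Any]) -> None:
        self.log.append(f"send {topic}")

    async def _on_publishing(self, topic: str, message: Optional[Any]) -> None:
        self.log.append(f"before {topic}")

    async def _on_published(self, topic: str, message: Optional[Any]) -> None:
        self.log.append(f"after {topic}")


publisher = RecordingPublisher()
asyncio.run(publisher.multi_publish(["a", "b"], "hello"))
print(publisher.log)
```

A concrete publisher overrides only the sending step; the ordering of hooks stays in the base class.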
Subscriber is the abstract class which provides the base implementation for a subscriber.
It has some similarities with Publisher.
You can subscribe to a topic with the subscribe method and unsubscribe with the unsubscribe method.
If no topic is specified when unsubscribing, the subscriber unsubscribes from all topics.
multi_subscribe and multi_unsubscribe are also provided.
If you want to manually notify a subscriber on a given topic (e.g., "my_topic"), use the notify method. We advise against this in regular use.
In addition, all subscribers also provide the following methods:
- connect(): connect to the eventbus
- disconnect(): disconnect from the eventbus
There are two main ways to consume messages:
- Handler: set a handler (callback) which will process the messages of a specific topic
- Iterator: retrieve inbound_events and inbound_unhandled_events using loops
You can also use both!
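To make the two consumption styles concrete, here is a self-contained sketch in which topics with a callback are handled immediately, while everything else is queued and drained with `async for`. `DualStyleSubscriber`, `on`, `unhandled` and `close` are hypothetical names; how they map onto inbound_events and inbound_unhandled_events is an assumption.

```python
import asyncio
from typing import Any, AsyncIterator, Awaitable, Callable, Dict, List, Tuple

Handler = Callable[[str, Any], Awaitable[None]]
_STOP = object()  # sentinel that ends the iterator


class DualStyleSubscriber:
    """Hypothetical subscriber supporting callbacks and iteration together."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}
        self._unhandled: asyncio.Queue = asyncio.Queue()

    def on(self, topic: str, handler: Handler) -> None:
        self._handlers[topic] = handler

    async def notify(self, topic: str, payload: Any) -> None:
        handler = self._handlers.get(topic)
        if handler is not None:
            await handler(topic, payload)  # handler style
        else:
            await self._unhandled.put((topic, payload))  # left for the iterator

    async def close(self) -> None:
        await self._unhandled.put(_STOP)

    async def unhandled(self) -> AsyncIterator[Tuple[str, Any]]:
        while True:
            item = await self._unhandled.get()
            if item is _STOP:
                return
            yield item


async def demo() -> Tuple[List[Any], List[Tuple[str, Any]]]:
    sub = DualStyleSubscriber()  # created inside the running event loop
    handled: List[Any] = []

    async def on_temperature(topic: str, payload: Any) -> None:
        handled.append(payload)

    sub.on("temperature", on_temperature)
    await sub.notify("temperature", 21.5)
    await sub.notify("humidity", 40)
    await sub.close()
    iterated = [item async for item in sub.unhandled()]  # iterator style
    return handled, iterated


handled, iterated = asyncio.run(demo())
print(handled, iterated)
```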
The main methods to implement if you want to create a custom subscriber are _internal_subscribe and _internal_unsubscribe,
which must contain the subscription and unsubscription logic.
Remember the notify method: it already performs every operation a subscriber needs, so you only have to collect the remote message (from the eventbus)
and call it.
Other hooks:
- _on_subscribing: called before subscribe
- _on_subscribed: called after subscribe
- _on_unsubscribing: called before unsubscribe
- _on_unsubscribed: called after unsubscribe
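A custom subscriber along these lines could be organised as follows. This is a sketch under stated assumptions: `SubscriberSketch`, `FakeBroker` and `FakeBrokerSubscriber` are hypothetical, the subscribe/unsubscribe hooks are omitted for brevity, and the method signatures are guesses rather than Busline's real ones. It shows the transport registering the subscriber in _internal_subscribe and handing inbound messages to notify.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional, Set, Tuple


class SubscriberSketch(ABC):
    """Hypothetical base class; only the transport-specific parts are abstract."""

    def __init__(self) -> None:
        self.topics: Set[str] = set()
        self.received: List[Tuple[str, Any]] = []

    async def subscribe(self, topic: str) -> None:
        await self._internal_subscribe(topic)
        self.topics.add(topic)

    async def unsubscribe(self, topic: Optional[str] = None) -> None:
        # No topic given: unsubscribe from every current topic.
        targets = [topic] if topic is not None else sorted(self.topics)
        for target in targets:
            await self._internal_unsubscribe(target)
            self.topics.discard(target)

    async def notify(self, topic: str, payload: Any) -> None:
        # Common processing lives here; transports only need to call it.
        self.received.append((topic, payload))

    @abstractmethod
    async def _internal_subscribe(self, topic: str) -> None: ...

    @abstractmethod
    async def _internal_unsubscribe(self, topic: str) -> None: ...


class FakeBroker:
    """Hypothetical stand-in for a remote eventbus."""

    def __init__(self) -> None:
        self.routes: Dict[str, List[SubscriberSketch]] = {}

    async def deliver(self, topic: str, payload: Any) -> None:
        for sub in list(self.routes.get(topic, [])):
            await sub.notify(topic, payload)


class FakeBrokerSubscriber(SubscriberSketch):
    def __init__(self, broker: FakeBroker) -> None:
        super().__init__()
        self._broker = broker

    async def _internal_subscribe(self, topic: str) -> None:
        self._broker.routes.setdefault(topic, []).append(self)

    async def _internal_unsubscribe(self, topic: str) -> None:
        subs = self._broker.routes.get(topic, [])
        if self in subs:
            subs.remove(self)


async def demo() -> Tuple[List[Tuple[str, Any]], Set[str]]:
    broker = FakeBroker()
    sub = FakeBrokerSubscriber(broker)
    await sub.subscribe("a")
    await sub.subscribe("b")
    await broker.deliver("a", 1)
    await sub.unsubscribe()       # no topic: drops both subscriptions
    await broker.deliver("b", 2)  # no longer routed to the subscriber
    return sub.received, sub.topics


received, topics = asyncio.run(demo())
print(received, topics)
```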
flowchart LR
P@{ shape: rect, label: "Local Publisher" }
S@{ shape: rect, label: "Local Subscriber" }
Q@{ shape: das, label: "Local EventBus" }
P-->Q
Q-->S
PubSubClient is a class which wraps a list of publishers and subscribers in order to provide both methods in an all-in-one object.
PubSubClient allows you to use different kinds of publishers and subscribers!
Therefore, you can publish a message to several eventbuses at the same time.
To simplify its creation, PubSubClientBuilder is provided.
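The idea of one object fanning out to several publishers and subscribers can be sketched as below. `ClientSketch` and the recording stubs are hypothetical; the real PubSubClient and PubSubClientBuilder interfaces are not shown in this document, so nothing here should be read as their API.

```python
import asyncio
from typing import Any, Iterable, List, Tuple


class RecordingPublisher:
    """Stub publisher that records what it was asked to send."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.sent: List[Tuple[str, Any]] = []

    async def publish(self, topic: str, message: Any) -> None:
        self.sent.append((topic, message))


class RecordingSubscriber:
    """Stub subscriber that records the topics it subscribed to."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.topics: List[str] = []

    async def subscribe(self, topic: str) -> None:
        self.topics.append(topic)


class ClientSketch:
    """Hypothetical all-in-one object forwarding calls to every wrapped member."""

    def __init__(
        self,
        publishers: Iterable[RecordingPublisher],
        subscribers: Iterable[RecordingSubscriber],
    ) -> None:
        self.publishers = list(publishers)
        self.subscribers = list(subscribers)

    async def publish(self, topic: str, message: Any) -> None:
        # One call reaches every wrapped publisher, whatever its transport.
        await asyncio.gather(*(p.publish(topic, message) for p in self.publishers))

    async def subscribe(self, topic: str) -> None:
        await asyncio.gather(*(s.subscribe(topic) for s in self.subscribers))


async def demo():
    local_pub = RecordingPublisher("local")
    mqtt_pub = RecordingPublisher("mqtt")
    mqtt_sub = RecordingSubscriber("mqtt")
    client = ClientSketch([local_pub, mqtt_pub], [mqtt_sub])
    await client.subscribe("orders")
    await client.publish("orders", "order-1")
    return local_pub.sent, mqtt_pub.sent, mqtt_sub.topics


local_sent, mqtt_sent, subscribed = asyncio.run(demo())
print(local_sent, mqtt_sent, subscribed)
```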
For example, if you want to create a client able to share messages both on a LocalEventBus and through an MQTT broker:
flowchart LR
subgraph C1["PubSubClient"]
PL@{ shape: rect, label: "LocalPublisher" }
SM@{ shape: rect, label: "MqttSubscriber" }
end
subgraph C2["PubSubClient"]
PM@{ shape: rect, label: "MqttPublisher" }
SL@{ shape: rect, label: "LocalSubscriber" }
end
QL@{ shape: das, label: "LocalEventBus" }
PL --> QL
QL --> SL
QM@{ shape: das, label: "MQTT Broker" }
PM --> QM
QM --> SM
In this example we create an E-Commerce Backend application (MQTT Publisher) which publishes a new message over MQTT whenever a new order arrives. Three different services then process the new events:
- ConfirmationEmailSender: sends the order confirmation email to the user
- SupplierNotifier: notifies the product supplier to fulfil the new order
- DataProcessor: cleans order data to train an AI model
We assume that the AI model is trained locally; for this reason, data is sent to it through a local eventbus.
flowchart LR
subgraph BE["Backend"]
BEP["MqttPublisher"]
end
subgraph DP["DataProcessor"]
DPP["LocalPublisher"]
DPS["MqttSubscriber"]
end
subgraph AIM["AIModel"]
AIMS["LocalSubscriber"]
end
subgraph CES["ConfirmationEmailSender"]
CESS["MqttSubscriber"]
end
subgraph SN["SupplierNotifier"]
SNS["MqttSubscriber"]
end
BEP -- Order --> MB["MQTT Broker"]
MB -- NewOrderMessage --> DPS & CESS & SNS
DPP --> LE["LocalEventBus"]
LE --> AIMS
MB@{ shape: das}
LE@{ shape: das}
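The topology in the diagram can be simulated end to end with two in-memory buses, one standing in for the MQTT broker and one for the local eventbus. Everything in this sketch is hypothetical: `InMemoryBus`, the topic names "orders" and "training", the order fields, and the notion that "cleaning" means dropping the email address. It only illustrates how the DataProcessor bridges the two buses.

```python
import asyncio
from collections import defaultdict
from typing import Any, Awaitable, Callable, Dict, List

Handler = Callable[[Any], Awaitable[None]]


class InMemoryBus:
    """Hypothetical stand-in used for both the MQTT broker and the local bus."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._handlers: Dict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        self._handlers[topic].append(handler)

    async def publish(self, topic: str, message: Any) -> None:
        for handler in list(self._handlers.get(topic, [])):
            await handler(message)


async def demo():
    mqtt = InMemoryBus("mqtt-stand-in")
    local = InMemoryBus("local")
    emails: List[str] = []
    supplier_calls: List[str] = []
    training_rows: List[dict] = []

    async def send_confirmation_email(order: dict) -> None:
        emails.append(order["email"])  # ConfirmationEmailSender

    async def notify_supplier(order: dict) -> None:
        supplier_calls.append(order["sku"])  # SupplierNotifier

    async def clean_and_forward(order: dict) -> None:
        # DataProcessor: keep only non-personal fields, then republish locally.
        cleaned = {"sku": order["sku"], "quantity": order["quantity"]}
        await local.publish("training", cleaned)

    async def train(row: dict) -> None:
        training_rows.append(row)  # AIModel

    mqtt.subscribe("orders", send_confirmation_email)
    mqtt.subscribe("orders", notify_supplier)
    mqtt.subscribe("orders", clean_and_forward)
    local.subscribe("training", train)

    # Backend: a new order arrives and is published once.
    await mqtt.publish("orders", {"email": "a@example.com", "sku": "X1", "quantity": 2})
    return emails, supplier_calls, training_rows


emails, supplier_calls, training_rows = asyncio.run(demo())
print(emails, supplier_calls, training_rows)
```

The backend publishes once, and each service reacts independently; only the DataProcessor touches both buses.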
In order to coordinate the work, please open an issue or a pull request.
Thank you for your contributions!
Architectural designer and developer of code base. Documentation referent.
University of Modena and Reggio Emilia.
Co-Designer of main components. Documentation reviewer.
Associate Professor at University of Modena and Reggio Emilia.