Each cache can be:
- **In-Memory Cache** (`type: "in-memory"`): Simple in-memory key-value storage.
  Use it only when sharing the stream state across different service instances
  is not necessary; whenever possible, prefer a shareable stream state.
- **Farm Data Cache** (`type: "farmdata"`): Cache backed by the Farm Data Service.
  - `url`: Farm Data Service endpoint URL
  - `head`: Head collection for aggregation
  - `http2`: set to `true` to enable HTTP/2 prior knowledge when connecting to an HTTP/2-enabled endpoint (default: `false`)
- **HTTP REST Cache** (`type: "http-rest"`): Cache backed by a generic user-defined HTTP REST endpoint (see the sketch below).
  - `url`: Base endpoint URL
  - `http2`: set to `true` to enable HTTP/2 prior knowledge when connecting to an HTTP/2-enabled endpoint (default: `false`)
  - `get`: configuration of the cache `get` requests
    - `path`: path on which the request is performed (dynamic parameters can be used by wrapping them in `{}` brackets)
    - `headers`: optional headers to include in the request, as an array of objects with `key` and `value` properties
    - `query`: optional query parameters to include in the request, as an array of objects with `key` and `value` properties
    - `method`: HTTP method to use for the request
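
As a hedged illustration of the fields above, expressed as a typed object literal (the example endpoint, the `{orderId}` path parameter, and the surrounding configuration context are assumptions, not the documented config schema):

```typescript
// Hypothetical shape composing the documented http-rest cache fields;
// the endpoint and config placement are assumptions for illustration.
const httpRestCacheConfig = {
  type: "http-rest",
  url: "https://orders-service.example.com", // assumed example endpoint
  http2: false,
  get: {
    path: "/orders/{orderId}", // `{orderId}` is a dynamic parameter
    method: "GET",
    headers: [{ key: "Accept", value: "application/json" }],
    query: [{ key: "expand", value: "items" }],
  },
};
```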

For more details on how to interact with the caches, please read the
[dedicated section](/products/fast_data/fast_data_engine_v2/stream_processor/30_Usage.md#cache-access-) in the Usage page.
The table below reports the available cache types with their supported operations:
| Cache Type | Get | Set | Update | Delete |
|------------|:---:|:---:|:------:|:------:|
| `mongodb` | ✔️ | ✔️ | ✔️ | ✔️ |
| `farmdata` | ✔️ | ❌ | ❌ | ❌ |
| `http-rest`| ✔️ | ❌ | ❌ | ❌ |

It is up to each cache implementation to provide its underlying logic. When
no logic is defined for one of the methods, calling it returns an `Unimplemented`
error, as in the sketch below.
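
As a hedged sketch (the ambient `caches` handle and its name are assumptions for illustration), attempting an unsupported operation on a read-only cache surfaces this error:

```typescript
// Hypothetical ambient cache handle, for illustration only.
declare const caches: {
  myFarmData: { set(key: string, value: unknown): Promise<void> };
};

try {
  await caches.myFarmData.set("some-key", { large: "payload" });
} catch (err) {
  // farmdata and http-rest caches implement only `get`,
  // so `set` rejects with an `Unimplemented` error.
  console.error("set is not implemented for this cache:", err);
}
```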

##### FarmData Cache

This cache is meant to be used in conjunction with the fast data `Farm Data` service. When
output messages from the aggregation are too large to fit in a single Kafka message, they
can be stored in the `Farm Data` cache: `Farm Data` sends a message with empty `after` and
`before` fields, and the Kafka key can then be used by the `farmdata` cache in the
`Stream Processor` to retrieve the full content via the HTTP REST API, as in the sketch below.
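
A minimal sketch of that flow, assuming a hypothetical processor function shape and a `caches.farmdata` handle (neither is the documented API):

```typescript
// Hypothetical ambient cache handle, for illustration only.
declare const caches: { farmdata: { get(key: string): Promise<unknown> } };

export default async function process(message: {
  key: string;
  value: { before?: unknown; after?: unknown };
}) {
  const { before, after } = message.value;
  if (!before && !after) {
    // Farm Data stored the oversized payload elsewhere:
    // retrieve the full content by Kafka key.
    return await caches.farmdata.get(message.key);
  }
  return message.value;
}
```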

##### HTTP-REST Cache

This cache performs a generic HTTP REST call to a user-defined endpoint to retrieve data.
The key argument of the cache `get` operation must be an object with the following shape:

```typescript
interface HttpRestCacheGetKey {
  params?: {
    query?: Array<{ key: string; value: string }>;
    path?: { [key: string]: string }; // path parameters
  };
  headers?: Array<{ key: string; value: string }>;
}
```
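
A hedged usage sketch, assuming a configured `get.path` of `/orders/{orderId}` and a `caches.httpRest` handle (both hypothetical names, not the documented API):

```typescript
// Hypothetical ambient cache handle, for illustration only.
declare const caches: {
  httpRest: { get(key: HttpRestCacheGetKey): Promise<unknown> };
};

// Resolves to GET /orders/1234?expand=items with the given header.
const key: HttpRestCacheGetKey = {
  params: {
    path: { orderId: "1234" }, // fills the `{orderId}` placeholder
    query: [{ key: "expand", value: "items" }],
  },
  headers: [{ key: "Accept", value: "application/json" }],
};

const order = await caches.httpRest.get(key);
```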

#### Dead Letter Queue (DLQ) Error Handling

The Stream Processor provides robust error handling capabilities through Dead Letter Queue (DLQ) configuration. When processing errors occur (such as thrown exceptions, timeouts, memory exhaustion, or built-in function failures like `JSON.parse()` or `new URL()`), messages can be automatically sent to a DLQ topic instead of causing the entire processing engine to fail.
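
For illustration (the processor function shape is an assumption, not the documented API), both failure paths in the sketch below are examples of errors that, with a DLQ configured, route the offending message to the DLQ topic instead of stopping the engine:

```typescript
// Hypothetical processor sketch: each line below can throw at runtime.
export default function process(message: { key: string; value: string }) {
  const payload = JSON.parse(message.value); // throws on malformed JSON
  const target = new URL(payload.callbackUrl); // throws on an invalid URL
  return { key: message.key, value: { ...payload, host: target.host } };
}
```
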
Only the following objects are available within its `globalThis` context:
> `BigInt64Array`, `BigUint64Array`, `Float16Array`, `Float32Array`, `Float64Array`,
> `DataView`, `Atomics`, `Promise`, `BigInt`, `WeakRef`, `FinalizationRegistry`,
> `performance`, `console`, `CacheError`

## Event Routing Pattern

When a single Kafka topic contains events that need to be routed to different Single Views
based on their content or type, the recommended pattern is to deploy **multiple Stream Processor
instances**, each subscribing to the same source topic with distinct consumer groups.

Each Stream Processor filters and processes only the messages relevant to its specific Single View,
discarding all others using the filtering capabilities described in the [Filtering section](#filtering-).

```mermaid
graph LR
RawEvents[Raw Events Topic]

SP1[Stream Processor 1<br/>Customer Events]
SP2[Stream Processor 2<br/>Order Events]
SP3[Stream Processor 3<br/>Product Events]

SV1[Single View<br/>Customers]
SV2[Single View<br/>Orders]
SV3[Single View<br/>Products]

RawEvents -->|subscribe| SP1
RawEvents -->|subscribe| SP2
RawEvents -->|subscribe| SP3

SP1 -->|filtered events| SV1
SP2 -->|filtered events| SV2
SP3 -->|filtered events| SV3

style RawEvents fill:#e1f5ff
style SP1 fill:#fff4e1
style SP2 fill:#fff4e1
style SP3 fill:#fff4e1
style SV1 fill:#e8f5e9
style SV2 fill:#e8f5e9
style SV3 fill:#e8f5e9
```


When implementing the event routing pattern, consider the following recommendations:

- **Use Distinct Consumer Groups**: Ensure each Stream Processor uses a unique consumer
  group ID so that all instances receive every message from the source topic;
- **Efficient Filtering**: Place filtering logic at the beginning of your processing
  function to minimize unnecessary computation on irrelevant events, as in the sketch below.
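
As a hedged illustration (the processor function shape, the `type` field, and the null-discard convention are assumptions, not the documented API), a routing filter for "Stream Processor 2 - Order Events" might look like:

```typescript
// Hypothetical event-routing filter: irrelevant events are discarded
// up front, before any further work is done on them.
export default function process(message: {
  key: string;
  value: { type?: string };
}) {
  // Efficient filtering: bail out immediately on non-order events.
  if (message.value.type !== "order") {
    return null; // assumption: returning null discards the message
  }
  // ...order-specific mapping/aggregation logic would go here...
  return message;
}
```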