Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions ADAPTERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,90 @@ This adapter uses Redis's built-in pubsub feature.
composer require predis/predis
```

## Segment

| Adapter | Publish | Consume | Library |
|---------|---------|---------|---------|
| `Nimbly\Syndicate\Adapter\Segment` | Y | N | `segmentio/analytics-php` |

Only "track", "identify", and "group" calls are supported at this time. The Message's topic should contain `track`, `identify`, or `group`.

**ALL** published messages *must* contain *either* an `anonymousId` or a `userId` message attribute.

```php
$publisher->publish(
new Message(
topic: "track",
payload: \json_encode($item),
attributes: [
"event" => "ItemAdded",
"anonymousId"=> $session->id,
]
)
);
```

For details on specific payload options, see https://segment.com/docs/connections/spec.

### Install

```bash
composer require segmentio/analytics-php
```

### Message attributes


#### Track

Track calls *must* provide the `event` name in a message attribute. The message payload contains traits about the specific event.

```php
$publisher->publish(
new Message(
topic: "track",
payload: \json_encode($order),
attributes: [
"event" => "OrderPlaced",
"userId" => $user->id,
]
)
);
```

#### Identify

The message payload contains traits about the user.

```php
$publisher->publish(
new Message(
topic: "identify",
payload: \json_encode($user),
attributes: [
"userId" => $user->id,
]
)
);
```

#### Group

Group calls *must* contain a `groupId` in a message attribute. The message payload contains traits about the group.

```php
$publisher->publish(
new Message(
topic: "group",
payload: \json_encode($group),
attributes: [
"groupId" => $group->id,
"userId" => $user->id,
]
)
);
```

## SNS

| Adapter | Publish | Consume | Library |
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ Syndicate is a powerful framework able to both publish and consume messages - id
| [RabbitMQ](/ADAPTERS.md#rabbitmq) | Y | Y | `php-amqplib/php-amqplib` |
| [Redis](/ADAPTERS.md#redis-queue) | Y | Y | `predis/predis` |
| [RedisPubsub](/ADAPTERS.md#redis-pubsub) | Y | Y* | `predis/predis` |
| [Segment](/ADAPTERS.md#segment) | Y | N | `segmentio/analytics-php` |
| [SNS](/ADAPTERS.md#sns) | Y | N | `aws/aws-sdk-php` |
| [SQS](/ADAPTERS.md#sqs) | Y | Y | `aws/aws-sdk-php` |
| [Webhook](/ADAPTERS.md#webhook) | Y | N | Any `psr/http-client` implementation |
Expand Down
3 changes: 2 additions & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@
"iron-io/iron_mq": "^4.0",
"php-mqtt/client": "^2.2",
"php-amqplib/php-amqplib": "^3.7",
"aws/aws-sdk-php": "^3.336"
"aws/aws-sdk-php": "^3.336",
"segmentio/analytics-php": "^3.8"
},
"suggest": {
"ext-pcntl": "Enables graceful shutdown of consumers.",
Expand Down
157 changes: 157 additions & 0 deletions src/Adapter/Segment.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
<?php

namespace Nimbly\Syndicate\Adapter;

use Segment\Client;
use Nimbly\Syndicate\Adapter\PublisherInterface;
use Nimbly\Syndicate\Exception\PublishException;
use Nimbly\Syndicate\Message;

/**
* Segment.io adapter supporting "track", "identify", and "group" calls.
*
* @see https://segment.com/docs/connections/spec
*/
class Segment implements PublisherInterface
{
public function __construct(
protected Client $client,
protected bool $autoflush = true,
)
{
}

/**
* @inheritDoc
*
* NOTE: The Message topic name is the Segment call to make: `track`, `identify`, and `group`.
*/
public function publish(Message $message, array $options = []): ?string
{
$result = match( $message->getTopic() ){
"track" => $this->client->track(
$this->buildTrackRequest($message)
),

"identify" => $this->client->identify(
$this->buildIdentifyRequest($message)
),

"group" => $this->client->group(
$this->buildGroupRequest($message)
),

default => throw new PublishException(
message: \sprintf(
"Unknown or unsupported Segment call %s.",
$message->getTopic()
)
)
};

if( $result === false ){
throw new PublishException(
message: "Failed to publish message."
);
}

if( $this->autoflush ){
$this->client->flush();
}

return null;
}

/**
* Build the base/common request elements for all Segment actions.
*
* @param Message $message
* @return array
*/
protected function buildCommonRequest(Message $message): array
{
$request = \array_filter([
"anonymousId" => $message->getAttributes()["anonymousId"] ?? null,
"userId" => $message->getAttributes()["userId"] ?? null,
"integrations" => $message->getAttributes()["integrations"] ?? [],
"timestamp" => $message->getAttributes()["timestamp"] ?? null,
"context" => $message->getAttributes()["context"] ?? null,
]);

if( !isset($request["anonymousId"]) && !isset($request["userId"]) ){
throw new PublishException(
message: "Segment requires an anonymous ID or a user ID. Please add either an \"anonymousId\" or \"userId\" to the message attributes."
);
}

return $request;
}

/**
* Build the request needed to make a track call.
*
* @param Message $message
* @return array
*/
protected function buildTrackRequest(Message $message): array
{
$request = \array_merge(
$this->buildCommonRequest($message),
[
"event" => $message->getAttributes()["event"] ?? null,
"properties" => \json_decode($message->getPayload(), true),
]
);

if( !isset($request["event"]) ){
throw new PublishException(
message: "Segment track call requires an event name. Please add an \"event\" attribute to the message."
);
}

return $request;
}

/**
* Build the request needed to make an Identify call.
*
* @param Message $message
* @return array
*/
protected function buildIdentifyRequest(Message $message): array
{
$request = \array_merge(
$this->buildCommonRequest($message),
[
"traits" => \json_decode($message->getPayload(), true),
]
);

return $request;
}

/**
* Build the request needed to make a Group call.
*
* @param Message $message
* @return array
*/
protected function buildGroupRequest(Message $message): array
{
if( !isset($message->getAttributes()["groupId"]) ){
throw new PublishException(
message: "Segment group call requires a groupId. Please add a \"groupId\" attribute to the message."
);
}

$request = \array_merge(
$this->buildCommonRequest($message),
[
"groupId" => $message->getAttributes()["groupId"],
"traits" => \json_decode($message->getPayload(), true),
]
);

return $request;
}
}
Loading