diff --git a/ADAPTERS.md b/ADAPTERS.md index 5e688ec..a928ccb 100644 --- a/ADAPTERS.md +++ b/ADAPTERS.md @@ -294,6 +294,90 @@ This adapter uses Redis's built-in pubsub feature. composer require predis/predis ``` +## Segment + +| Adapter | Publish | Consume | Library | +|---------|---------|---------|---------| +| `Nimbly\Syndicate\Adapter\Segment` | Y | N | `segmentio/analytics-php` | + +Only "track", "identify", and "group" calls are supported at this time. The Message's topic should contain `track`, `identify`, or `group`. + +**ALL** published messages *must* contain *either* an `anonymousId` or a `userId` message attribute. + +```php +$publisher->publish( + new Message( + topic: "track", + payload: \json_encode($item), + attributes: [ + "event" => "ItemAdded", + "anonymousId"=> $session->id, + ] + ) +); +``` + +For details on specific payload options, see https://segment.com/docs/connections/spec. + +### Install + +```bash +composer require segmentio/analytics-php +``` + +### Message attributes + + +#### Track + +Track calls *must* provide the `event` name in a message attribute. The message payload contains traits about the specific event. + +```php +$publisher->publish( + new Message( + topic: "track", + payload: \json_encode($order), + attributes: [ + "event" => "OrderPlaced", + "userId" => $user->id, + ] + ) +); +``` + +#### Identify + +The message payload contains traits about the user. + +```php +$publisher->publish( + new Message( + topic: "identify", + payload: \json_encode($user), + attributes: [ + "userId" => $user->id, + ] + ) +); +``` + +#### Group + +Group calls *must* contain a `groupId` in a message attribute. The message payload contains traits about the group. + +```php +$publisher->publish( + new Message( + topic: "group", + payload: \json_encode($group), + attributes: [ + "groupId" => $group->id, + "userId" => $user->id, + ] + ) +); +``` + ## SNS | Adapter | Publish | Consume | Library | diff --git a/README.md b/README.md index e688407..df2ffae 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,7 @@ Syndicate is a powerful framework able to both publish and consume messages - id | [RabbitMQ](/ADAPTERS.md#rabbitmq) | Y | Y | `php-amqplib/php-amqplib` | | [Redis](/ADAPTERS.md#redis-queue) | Y | Y | `predis/predis` | | [RedisPubsub](/ADAPTERS.md#redis-pubsub) | Y | Y* | `predis/predis` | +| [Segment](/ADAPTERS.md#segment) | Y | N | `segmentio/analytics-php` | | [SNS](/ADAPTERS.md#sns) | Y | N | `aws/aws-sdk-php` | | [SQS](/ADAPTERS.md#sqs) | Y | Y | `aws/aws-sdk-php` | | [Webhook](/ADAPTERS.md#webhook) | Y | N | Any `psr/http-client` implementation | diff --git a/composer.json b/composer.json index 9cce9f7..f497ce0 100644 --- a/composer.json +++ b/composer.json @@ -36,7 +36,8 @@ "iron-io/iron_mq": "^4.0", "php-mqtt/client": "^2.2", "php-amqplib/php-amqplib": "^3.7", - "aws/aws-sdk-php": "^3.336" + "aws/aws-sdk-php": "^3.336", + "segmentio/analytics-php": "^3.8" }, "suggest": { "ext-pcntl": "Enables graceful shutdown of consumers.", diff --git a/src/Adapter/Segment.php b/src/Adapter/Segment.php new file mode 100644 index 0000000..206c0ed --- /dev/null +++ b/src/Adapter/Segment.php @@ -0,0 +1,157 @@ +getTopic() ){ + "track" => $this->client->track( + $this->buildTrackRequest($message) + ), + + "identify" => $this->client->identify( + $this->buildIdentifyRequest($message) + ), + + "group" => $this->client->group( + $this->buildGroupRequest($message) + ), + + default => throw new PublishException( + message: \sprintf( + "Unknown or unsupported Segment call %s.", + $message->getTopic() + ) + ) + }; + + if( $result === false ){ + throw new PublishException( + message: "Failed to publish message." + ); + } + + if( $this->autoflush ){ + $this->client->flush(); + } + + return null; + } + + /** + * Build the base/common request elements for all Segment actions. + * + * @param Message $message + * @return array + */ + protected function buildCommonRequest(Message $message): array + { + $request = \array_filter([ + "anonymousId" => $message->getAttributes()["anonymousId"] ?? null, + "userId" => $message->getAttributes()["userId"] ?? null, + "integrations" => $message->getAttributes()["integrations"] ?? [], + "timestamp" => $message->getAttributes()["timestamp"] ?? null, + "context" => $message->getAttributes()["context"] ?? null, + ]); + + if( !isset($request["anonymousId"]) && !isset($request["userId"]) ){ + throw new PublishException( + message: "Segment requires an anonymous ID or a user ID. Please add either an \"anonymousId\" or \"userId\" to the message attributes." + ); + } + + return $request; + } + + /** + * Build the request needed to make a track call. + * + * @param Message $message + * @return array + */ + protected function buildTrackRequest(Message $message): array + { + $request = \array_merge( + $this->buildCommonRequest($message), + [ + "event" => $message->getAttributes()["event"] ?? null, + "properties" => \json_decode($message->getPayload(), true), + ] + ); + + if( !isset($request["event"]) ){ + throw new PublishException( + message: "Segment track call requires an event name. Please add an \"event\" attribute to the message." + ); + } + + return $request; + } + + /** + * Build the request needed to make an Identify call. + * + * @param Message $message + * @return array + */ + protected function buildIdentifyRequest(Message $message): array + { + $request = \array_merge( + $this->buildCommonRequest($message), + [ + "traits" => \json_decode($message->getPayload(), true), + ] + ); + + return $request; + } + + /** + * Build the request needed to make a Group call. + * + * @param Message $message + * @return array + */ + protected function buildGroupRequest(Message $message): array + { + if( !isset($message->getAttributes()["groupId"]) ){ + throw new PublishException( + message: "Segment group call requires a groupId. Please add a \"groupId\" attribute to the message." + ); + } + + $request = \array_merge( + $this->buildCommonRequest($message), + [ + "groupId" => $message->getAttributes()["groupId"], + "traits" => \json_decode($message->getPayload(), true), + ] + ); + + return $request; + } +} \ No newline at end of file diff --git a/tests/Adapter/SegmentTest.php b/tests/Adapter/SegmentTest.php new file mode 100644 index 0000000..172e5c1 --- /dev/null +++ b/tests/Adapter/SegmentTest.php @@ -0,0 +1,191 @@ +shouldReceive("track") + ->andReturn(false); + + $publisher = new Segment($mock); + + $this->expectException(PublishException::class); + $publisher->publish( + new Message("track", "Ok", ["event" => "Foo", "userId" => "abc123"]) + ); + } + + public function test_publish_auto_flush_disabled(): void + { + $mock = Mockery::mock(Client::class); + $mock->shouldReceive("track") + ->andReturn(true); + + $publisher = new Segment($mock, false); + + $publisher->publish( + new Message("track", "Ok", ["event" => "Foo", "userId" => "abc123"]) + ); + + $mock->shouldHaveReceived("track"); + $mock->shouldNotHaveReceived("flush"); + } + + public function test_unsupported_topic_throws_publish_exception(): void + { + $mock = Mockery::mock(Client::class); + $publisher = new Segment($mock); + + $this->expectException(PublishException::class); + $publisher->publish( + new Message("unsupported", "Ok", ["userId" => "abc123"]) + ); + } + + public function test_track_requires_event(): void + { + $mock = Mockery::mock(Client::class); + $publisher = new Segment($mock); + + $this->expectException(PublishException::class); + $publisher->publish( + new Message("track", "Ok", ["userId" => "abc123"]) + ); + } + + public function test_track(): void + { + $mock = Mockery::mock(Client::class); + $mock->shouldReceive("track") + ->andReturn(true); + $mock->shouldReceive("flush"); + + $publisher = new Segment($mock); + $publisher->publish( + new Message( + "track", + \json_encode(["status" => "Ok"]), + [ + "event" => "fruits", + "userId" => "abc123" + ] + ) + ); + + $mock->shouldHaveReceived( + "track", + [ + [ + "userId" => "abc123", + "event" => "fruits", + "properties" => ["status" => "Ok"] + ] + ] + ); + $mock->shouldHaveReceived("flush"); + } + + public function test_track_userid_or_anonymousid_required(): void + { + $mock = Mockery::mock(Client::class); + $publisher = new Segment($mock); + + $this->expectException(PublishException::class); + $publisher->publish( + new Message("track", "Ok") + ); + } + + public function test_identify(): void + { + $mock = Mockery::mock(Client::class); + $mock->shouldReceive("identify") + ->andReturn(true); + $mock->shouldReceive("flush"); + + $publisher = new Segment($mock); + $publisher->publish( + new Message( + "identify", + \json_encode(["status" => "Ok"]), + [ + "userId" => "abc123" + ] + ) + ); + + $mock->shouldHaveReceived( + "identify", + [ + [ + "userId" => "abc123", + "traits" => ["status" => "Ok"] + ] + ] + ); + $mock->shouldHaveReceived("flush"); + } + + public function test_group_requires_group_id(): void + { + $mock = Mockery::mock(Client::class); + $publisher = new Segment($mock); + + $this->expectException(PublishException::class); + $publisher->publish( + new Message("group", \json_encode(["name" => "grp"])) + ); + } + + public function test_group(): void + { + $mock = Mockery::mock(Client::class); + $mock->shouldReceive("group") + ->andReturn(true); + $mock->shouldReceive("flush"); + + $publisher = new Segment($mock); + + $publisher->publish( + new Message( + topic: "group", + payload: \json_encode(["name" => "grp"]), + attributes: [ + "groupId" => "2b44c921-6711-475e-8ae6-2188e59e5888", + "userId" => "db92915a-334c-4051-9218-88eb7b049252" + ] + ) + ); + + $mock->shouldHaveReceived( + "group", + [ + [ + "userId" => "db92915a-334c-4051-9218-88eb7b049252", + "groupId" => "2b44c921-6711-475e-8ae6-2188e59e5888", + "traits" => [ + "name" => "grp" + ] + ] + ] + ); + + $mock->shouldHaveReceived("flush"); + } +} \ No newline at end of file