diff --git a/Makefile b/Makefile index 6e2dc7b..3a769a2 100644 --- a/Makefile +++ b/Makefile @@ -20,6 +20,7 @@ composer-install: phpcs: @rm -rf phpcs-reports @mkdir phpcs-reports + @touch phpcs-reports/.empty @docker exec ${CID} /bin/sh -c 'vendor/bin/phpcs -p --colors --report-full=./phpcs-reports/phpcs-report-full.txt --report-gitblame=./phpcs-reports/phpcs-report-gitblame.txt --report-info=./phpcs-reports/phpcs-report-info.txt --standard=phpcs.xml .' aws-config: diff --git a/demo/BakingPasta/BakingPastaDecider.php b/demo/BakingPasta/BakingPastaDecider.php new file mode 100644 index 0000000..780a5e4 --- /dev/null +++ b/demo/BakingPasta/BakingPastaDecider.php @@ -0,0 +1,34 @@ +events = $events; + } + + /** + * Process event for schedule decisions + */ + public function process() + { + $this->addDecisionTask(new CompleteWorkflowExecutionDecision()); + } +} diff --git a/demo/BakingPasta/BakingPastaWorkflow.php b/demo/BakingPasta/BakingPastaWorkflow.php index 5d3e668..604dd22 100644 --- a/demo/BakingPasta/BakingPastaWorkflow.php +++ b/demo/BakingPasta/BakingPastaWorkflow.php @@ -2,7 +2,6 @@ namespace Continuous\Demo\Swf\BakingPasta; -use Aws\Result; use Continuous\Swf\Entity\Workflow; /** @@ -11,26 +10,64 @@ */ class BakingPastaWorkflow extends Workflow { - public function process(Result $result) + const NAME = 'bakingpasta'; + const VERSION = '0.1.0'; + + protected $pasta; + protected $weight; + + public function getName() : string { - // TODO: Implement process() method. + return static::NAME; } - public function setResult() + public function getVersion() : string { - // TODO: Implement setResult() method. + return static::VERSION; } - public function extract() :array + /** + * Type of pasta + * + * @param $pasta + * @return $this + * @throws \Exception + */ + public function setPasta($pasta) + { + if ('spaghetti' !== $pasta) { + throw new \Exception('Only spaghetti pasta is supported'); + } + + $this->pasta = $pasta; + return $this; + } + + /** + * Gram of pasta + * + * @param int $weight + * @return $this + */ + public function setWeight(int $weight) { - // TODO: Implement extract() method. + $this->weight = $weight; + return $this; + } + public function extract() :array + { return [ + 'parent' => $this->parent, + 'pasta' => $this->pasta, + 'weight' => $this->weight, ]; } public function hydrate(array $data) { - // TODO: Implement hydrate() method. + $this->parent = $data['parent']; + $this->pasta = $data['pasta']; + $this->weight = $data['weight']; } } diff --git a/demo/Sauce/SauceDecider.php b/demo/Sauce/SauceDecider.php new file mode 100644 index 0000000..6497f33 --- /dev/null +++ b/demo/Sauce/SauceDecider.php @@ -0,0 +1,24 @@ +events = $events; + } + + public function process() + { + $this->addDecisionTask(new CompleteWorkflowExecutionDecision()); + } +} diff --git a/demo/Sauce/SauceWorkflow.php b/demo/Sauce/SauceWorkflow.php index 5530a1f..333bb18 100644 --- a/demo/Sauce/SauceWorkflow.php +++ b/demo/Sauce/SauceWorkflow.php @@ -2,7 +2,6 @@ namespace Continuous\Demo\Swf\Sauce; -use Aws\Result; use Continuous\Swf\Entity\Workflow; /** @@ -11,26 +10,45 @@ */ class SauceWorkflow extends Workflow { - public function process(Result $result) + const NAME = 'sauce'; + const VERSION = '0.1.0'; + + /** + * @var bool + */ + protected $onions = false; + + public function getName() : string { - // TODO: Implement process() method. + return static::NAME; } - public function setResult() + public function getVersion() : string { - // TODO: Implement setResult() method. + return static::VERSION; } - public function extract() :array + /** + * @param bool $wantOnions + * @return $this + */ + public function setOnions(bool $wantOnions) { - // TODO: Implement extract() method. + $this->onions = $wantOnions; + return $this; + } + public function extract() :array + { return [ + 'parent' => $this->parent, + 'onions' => $this->onions, ]; } public function hydrate(array $data) { - // TODO: Implement hydrate() method. + $this->parent = $data['parent']; + $this->onions = $data['onions']; } } diff --git a/demo/Spaghetti/CompileActivity.php b/demo/Spaghetti/CompileActivity.php index 3a40045..7e6c718 100644 --- a/demo/Spaghetti/CompileActivity.php +++ b/demo/Spaghetti/CompileActivity.php @@ -1,23 +1,35 @@ completed(); } } diff --git a/demo/Spaghetti/EatActivity.php b/demo/Spaghetti/EatActivity.php index 3a40045..7376361 100644 --- a/demo/Spaghetti/EatActivity.php +++ b/demo/Spaghetti/EatActivity.php @@ -1,23 +1,35 @@ completed(); } } diff --git a/demo/Spaghetti/SpaghettiDecider.php b/demo/Spaghetti/SpaghettiDecider.php new file mode 100644 index 0000000..fae642d --- /dev/null +++ b/demo/Spaghetti/SpaghettiDecider.php @@ -0,0 +1,158 @@ +events = $events; + eval(\Psy\sh()); + } + + public function process() + { + //TODO if compile activity finish, start activity eat. + + /*$this->filter( + $this->events, + [ Event::ACTIVITY_TASK_COMPLETED ], + function($event) { + $name = $event['childWorkflowExecutionCompletedEventAttributes']['workflowType']['name']; + return in_array($name, ['sauce', 'bakingpasta']); + } + );*/ + + if (false) { + $this->finish(); + return; + } + + //TODO if compile activity finish, start activity eat. + + if (false) { + $this->eat(); + return; + } + + $events = $this->filter( + $this->events, + [ Event::ACTIVITY_TASK_SCHEDULED ] + ); + + if (0 < count($events)) { + return; + } + + $names = $this->filter( + $this->events, + [ Event::CHILD_WORKFLOW_EXECUTION_COMPLETED ], + function ($event) { + $name = $event['childWorkflowExecutionCompletedEventAttributes']['workflowType']['name']; + return in_array($name, ['sauce', 'bakingpasta']); + } + ); + + if (2 === count($names)) { + $this->compile(); + return; + } + + $names = $this->filter( + $this->events, + [ Event::CHILD_WORKFLOW_EXECUTION_STARTED ] + ); + + if (0 === count($names)) { + $this->startChildren(); + return; + } + } + + public function startChildren() + { + $bakingPastaWorkflow = new BakingPastaWorkflow(); + $sauceWorkflow = new SauceWorkflow(); + + $bakingPastaWorkflow + ->setParent($this) + ->setId(Uuid::uuid4()) + ->setPasta('spaghetti') + ->setWeight(250) + ; + + $sauceWorkflow + ->setParent($this) + ->setId(Uuid::uuid4()) + ->setOnions(true) + ; + + $startChildPasta = new StartChildWorkflowExecutionDecision( + $bakingPastaWorkflow, + ['name' => 'default'] + ); + + $startChildSauce = new StartChildWorkflowExecutionDecision( + $sauceWorkflow, + ['name' => 'default'] + ); + + $this + ->addDecisionTask($startChildPasta) + ->addDecisionTask($startChildSauce) + ; + } + + public function compile() + { + $compileActivity = new CompileActivity(); + $compileActivity + ->setId(Uuid::uuid4()) + ; + + $compileTask = new ScheduleActivityTaskDecision( + $compileActivity->getId(), + ['name' => $compileActivity->getName(), 'version' => $compileActivity->getVersion()] + ); + + $this->addDecisionTask($compileTask); + } + + public function eat() + { + $eatActivity = new EatActivity(); + $eatActivity + ->setId(Uuid::uuid4()) + ; + + $eatTask = new ScheduleActivityTaskDecision( + $eatActivity->getId(), + ['name' => $eatActivity->getName(), 'version' => $eatActivity->getVersion()] + ); + + $this->addDecisionTask($eatTask); + } + + public function finish() + { + $this->addDecisionTask(new CompleteWorkflowExecutionDecision()); + } +} diff --git a/demo/Spaghetti/SpaghettiWorkflow.php b/demo/Spaghetti/SpaghettiWorkflow.php index 0dd36a8..5acd133 100644 --- a/demo/Spaghetti/SpaghettiWorkflow.php +++ b/demo/Spaghetti/SpaghettiWorkflow.php @@ -2,7 +2,6 @@ namespace Continuous\Demo\Swf\Spaghetti; -use Aws\Result; use Continuous\Swf\Entity\Workflow; /** @@ -11,26 +10,62 @@ */ class SpaghettiWorkflow extends Workflow { - public function process(Result $result) + const NAME = 'spaghetti'; + const VERSION = '0.1.0'; + + /** + * @var string + */ + protected $kwisto; + + /** + * @var string + */ + protected $client; + + public function getName() : string { - // TODO: Implement process() method. + return static::NAME; } - public function setResult() + public function getVersion() : string { - // TODO: Implement setResult() method. + return static::VERSION; } - public function extract() :array + /** + * @param string $kwisto + * @return $this + */ + public function setKwisto(string $kwisto) + { + $this->kwisto = $kwisto; + return $this; + } + + /** + * @param string $client + * @return $this + */ + public function setClient(string $client) { - // TODO: Implement extract() method. + $this->client = $client; + return $this; + } + public function extract() :array + { return [ + 'parent' => $this->parent, + 'kwisto' => $this->kwisto, + 'client' => $this->client, ]; } public function hydrate(array $data) { - // TODO: Implement hydrate() method. + $this->parent = $data['parent']; + $this->kwisto = $data['kwisto']; + $this->client = $data['client']; } } diff --git a/src/ActivityInterface.php b/src/ActivityInterface.php deleted file mode 100644 index 1ae46dd..0000000 --- a/src/ActivityInterface.php +++ /dev/null @@ -1,8 +0,0 @@ -timerId = $timerId; + } + + public function toRespondDecision() : array + { + return [ + 'decisionType' => static::DECISION_TYPE, + static::ATTRIBUTE_KEY => [ + 'timerId' => $this->timerId, + ], + ]; + } + + public function getDecisionType() : string + { + return static::DECISION_TYPE; + } + + public function getAttributeKey() : string + { + return static::ATTRIBUTE_KEY; + } +} diff --git a/src/DataTypes/Decision/CompleteWorkflowExecutionDecision.php b/src/DataTypes/Decision/CompleteWorkflowExecutionDecision.php new file mode 100644 index 0000000..ac97cf8 --- /dev/null +++ b/src/DataTypes/Decision/CompleteWorkflowExecutionDecision.php @@ -0,0 +1,55 @@ + strlen($result) )) { + throw new DecisionException('result must be string greater than 0 and less than 32768 characters'); + } + + $this->result = $result; + } + + public function toRespondDecision() : array + { + $response = [ + 'decisionType' => static::DECISION_TYPE, + ]; + + if (null !== $this->result) { + $response[static::ATTRIBUTE_KEY] = [ + 'result' => $this->result, + ]; + } + + return $response; + } + + public function getDecisionType() : string + { + return static::DECISION_TYPE; + } + + public function getAttributeKey() : string + { + return static::ATTRIBUTE_KEY; + } +} diff --git a/src/DataTypes/Decision/DecisionException.php b/src/DataTypes/Decision/DecisionException.php new file mode 100644 index 0000000..eaf3b59 --- /dev/null +++ b/src/DataTypes/Decision/DecisionException.php @@ -0,0 +1,7 @@ +taskToken = $taskToken; + return $this; + } + + /** + * @return string + */ + public function getTaskToken() : string + { + return $this->taskToken; + } + + /** + * @param DecisionInterface $decision + */ + public function addDecisionTask(DecisionInterface $decision) + { + $this->decisionTasksList[] = $decision; + return $this; + } + + /** + * @return DecisionInterface[] + */ + public function getDecisionTasksList() + { + return $this->decisionTasksList; + } + + /** + * Return true if we have decisionsTask to respond + * + * @return bool + */ + public function hasDecisions() + { + return 0 < count($this->decisionTasksList); + } + + /** + * @param array $events + * @param array $eventType + * @param $callable + */ + public function filter(array $events, array $eventType, $callable = null) + { + $result = []; + + foreach ($events as $event) { + if (false === in_array($event['eventType'], $eventType)) { + continue; + } + + if (null === $callable) { + $result[] = $event; + continue; + } + + if (true === call_user_func($callable, $event)) { + $result[] = $event; + } + } + + return $result; + } +} diff --git a/src/DataTypes/Decision/ScheduleActivityTaskDecision.php b/src/DataTypes/Decision/ScheduleActivityTaskDecision.php new file mode 100644 index 0000000..4b37726 --- /dev/null +++ b/src/DataTypes/Decision/ScheduleActivityTaskDecision.php @@ -0,0 +1,136 @@ +activityId = $activityId; + + if (2 !== count($activityType) || empty($activityType['name']) || empty($activityType['version'])) { + new DecisionException('activityType argument must be match to AWS-SWF activity type structure.'); + } + + $this->activityType = $activityType; + + if ('' !== $control) { + $this->control = $control; + } + + if (-1 !== $heartbeatTimeout && 'NONE' !== $heartbeatTimeout && 0 > (int)$heartbeatTimeout) { + new DecisionException('heartbeatTimeout must be greater than 0 or string NONE.'); + } + + $this->heartbeatTimeout = $heartbeatTimeout; + + if (-1 !== $scheduleToCloseTimeout && 'NONE' !== $scheduleToCloseTimeout && 0 > (int)$scheduleToCloseTimeout) { + new DecisionException('scheduleToCloseTimeout must be greater than 0 or string NONE.'); + } + + $this->scheduleToCloseTimeout = $scheduleToCloseTimeout; + + if (-1 !== $scheduleToStartTimeout && 'NONE' !== $scheduleToStartTimeout && 0 > (int)$scheduleToStartTimeout) { + new DecisionException('scheduleToStartTimeout must be greater than 0 or string NONE.'); + } + + $this->scheduleToStartTimeout = $scheduleToStartTimeout; + + if (-1 !== $startToCloseTimeout && 'NONE' !== $startToCloseTimeout && 0 > (int)$startToCloseTimeout) { + new DecisionException('startToCloseTimeout must be greater than 0 or string NONE.'); + } + + $this->startToCloseTimeout = $startToCloseTimeout; + + if (null !== $taskList && 1 !== count($taskList) || empty($taskList['name'])) { + new DecisionException('Tasklist argument must be match to AWS-SWF taskList structure.'); + } + + $this->taskList = $taskList; + + if (-2147483648 > $taskPriority || 2147483647 < $taskPriority) { + new DecisionException('taskPriority must be an integer between -2147483648 and 2147483647'); + } + + $this->taskPriority = $taskPriority; + } + + public function toRespondDecision() : array + { + $attributes = [ + 'activityId' => $this->activityId, + 'activityType' => $this->activityType, + ]; + + if (null !== $this->control) { + $attributes['control'] = $this->control; + } + + if (-1 !== $this->heartbeatTimeout) { + $attributes['heartbeatTimeout'] = $this->heartbeatTimeout; + } + + if (-1 !== $this->scheduleToCloseTimeout) { + $attributes['scheduleToCloseTimeout'] = $this->scheduleToCloseTimeout; + } + + if (-1 !== $this->scheduleToStartTimeout) { + $attributes['scheduleToStartTimeout'] = $this->scheduleToStartTimeout; + } + + if (-1 !== $this->startToCloseTimeout) { + $attributes['startToCloseTimeout'] = $this->startToCloseTimeout; + } + + if (null !== $this->taskList) { + $attributes['taskList'] = $this->taskList; + } + + if (0 !== $this->taskPriority) { + $attributes['taskPriority'] = $this->taskPriority; + } + + return [ + 'decisionType' => static::DECISION_TYPE, + static::ATTRIBUTE_KEY => $attributes, + ]; + } + + public function getDecisionType() : string + { + return static::DECISION_TYPE; + } + + public function getAttributeKey() : string + { + return static::ATTRIBUTE_KEY; + } +} diff --git a/src/DataTypes/Decision/StartChildWorkflowExecutionDecision.php b/src/DataTypes/Decision/StartChildWorkflowExecutionDecision.php new file mode 100644 index 0000000..5552fb3 --- /dev/null +++ b/src/DataTypes/Decision/StartChildWorkflowExecutionDecision.php @@ -0,0 +1,155 @@ +workflow = $workflow; + + if (1 !== count($taskList) || empty($taskList['name'])) { + new DecisionException('Tasklist argument must be match to AWS-SWF taskList structure.'); + } + + $this->taskList = $taskList; + + if ('' !== $childPolicy + && false == in_array($childPolicy, ['TERMINATE', 'REQUEST_CANCEL', 'ABANDON']) + ) { + new DecisionException('childPolicy must be one of TERMINATE, REQUEST_CANCEL, ABANDON.'); + } + + $this->childPolicy = $childPolicy; + + if ('' !== $control) { + $this->control = $control; + } + + if (-1 !== $executionStartToCloseTimeout && 0 > $executionStartToCloseTimeout) { + new DecisionException('executionStartToCloseTimeout must be greater than 0.'); + } + + $this->executionStartToCloseTimeout = $executionStartToCloseTimeout; + + if ('' !== $lambdaRoleArn) { + $this->lambdaRoleArn = $lambdaRoleArn; + } + + if (5 < count($tagList)) { + new DecisionException('You can specify only maximum 5 tags on tagList argument'); + } + + $this->tagList = $tagList; + + if (-2147483648 > $taskPriority || 2147483647 < $taskPriority) { + new DecisionException('taskPriority must be an integer between -2147483648 and 2147483647'); + } + + $this->taskPriority = $taskPriority; + + if (-1 !== $taskStartToCloseTimeout + && 'NONE' !== $taskStartToCloseTimeout + && 0 > (int)$taskStartToCloseTimeout + ) { + new DecisionException('taskStartToCloseTimeout must be greater than 0 or string NONE.'); + } + + $this->taskStartToCloseTimeout = $taskStartToCloseTimeout; + } + + public function toRespondDecision() : array + { + $id = $this->workflow->getId(); + + if (false === Uuid::isValid($id)) { + throw new \Exception('Workflow must have ID defined. ', get_class($this->workflow)); + } + + $attributes = [ + 'taskList' => $this->taskList, + 'workflowId' => $id, + 'workflowType' => [ + 'name' => $this->workflow->getName(), + 'version' => $this->workflow->getVersion(), + ], + 'input' => json_encode($this->workflow->extract()) + ]; + + if ('' !== $this->childPolicy) { + $attributes['childPolicy'] = $this->childPolicy; + } + + if (null !== $this->control) { + $attributes['control'] = $this->control; + } + + if (-1 !== $this->executionStartToCloseTimeout) { + $attributes['executionStartToCloseTimeout'] = $this->executionStartToCloseTimeout; + } + + if (null !== $this->lambdaRoleArn) { + $attributes['lambdaRole'] = $this->lambdaRoleArn; + } + + if (!empty($this->tagList)) { + $attributes['tagList'] = $this->tagList; + } + + if (0 !== $this->taskPriority) { + $attributes['taskPriority'] = $this->taskPriority; + } + + if (-1 !== $this->taskStartToCloseTimeout) { + $attributes['taskStartToCloseTimeout'] = $this->taskStartToCloseTimeout; + } + + return [ + 'decisionType' => static::DECISION_TYPE, + static::ATTRIBUTE_KEY => $attributes, + ]; + } + + public function getDecisionType() : string + { + return static::DECISION_TYPE; + } + + public function getAttributeKey() : string + { + return static::ATTRIBUTE_KEY; + } +} diff --git a/src/DataTypes/Event.php b/src/DataTypes/Event.php new file mode 100644 index 0000000..a9591e4 --- /dev/null +++ b/src/DataTypes/Event.php @@ -0,0 +1,61 @@ +id = $id; + return $this; + } + + /** + * @param string $taskToken + * @return $this + */ + public function setTaskToken(string $taskToken) + { + $this->taskToken = $taskToken; + return $this; + } + + /** + * @return string + */ + public function getId() : string + { + return $this->id; + } + + /** + * @return string + */ + public function getTaskToken() : string + { + return $this->taskToken; + } + + public function getStatus() : string + { + return $this->status; + } + + public function canceled() + { + $this->status = static::CANCELED; + } + + public function completed() + { + $this->status = static::COMPLETED; + } + + public function failed() + { + $this->status = static::FAILED; + } + abstract public function extract() : array; abstract public function hydrate(array $data); } diff --git a/src/Entity/ActivityInterface.php b/src/Entity/ActivityInterface.php new file mode 100644 index 0000000..1fceccd --- /dev/null +++ b/src/Entity/ActivityInterface.php @@ -0,0 +1,24 @@ +id = $id; + return $this; + } + + /** + * @return string + */ + public function getId() : string + { + return $this->id; + } + + /** + * @param Workflow $workflow + * @return $this + */ + public function setParent(Workflow $workflow) + { + $this->parent = get_class($workflow); + return $this; + } + + /** + * @return string + */ + public function getParent() : string + { + return $this->parent; + } + abstract public function extract() : array; abstract public function hydrate(array $data); } diff --git a/src/Entity/WorkflowInterface.php b/src/Entity/WorkflowInterface.php new file mode 100644 index 0000000..4626895 --- /dev/null +++ b/src/Entity/WorkflowInterface.php @@ -0,0 +1,27 @@ +swfClient->pollForDecisionTask([ 'domain' => $this->config->domain, @@ -70,22 +71,33 @@ public function pollWorkflow(string $taskList = 'default') : WorkflowInterface 'name' => $taskList, ], 'identify' => $this->config->identity, - 'maximumPageSize' => 50, - 'reverseOrder' => true, + 'reverseOrder' => false, ]); - //TODO try catch result not Aws\Result... + //TODO poll with nextPageToken if max number of events is reach - $workflowType = $result['workflowType']; - $workflow = $this->getWorkflowEntity($workflowType['name'], $workflowType['version']); + if (empty($result['events'])) { + var_dump($result); + throw new ServiceException('No events detect on decision request.'); + } + + foreach ($result['events'] as $event) { + if ('WorkflowExecutionStarted' !== $event['eventType']) { + continue; + } + + $workflowType = $event['workflowExecutionStartedEventAttributes']['workflowType']; + $decider = $this->getDeciderEntity($workflowType['name']); + $decider->setId($result['workflowExecution']['workflowId']); + $decider->setTaskToken($result['taskToken']); + $decider->hydrate(json_decode($event['workflowExecutionStartedEventAttributes']['input'], true)); - if (isset($result['input'])) { - $workflow->hydrate(json_decode($result['input'], true)); + break; } - $workflow->process($result); + $decider->setEvents(array_reverse($result['events'])); - return $workflow; + return $decider; } /** @@ -107,7 +119,7 @@ public function pollWorkflowGenerator(string $taskList = 'default') : \Generator * @param string $taskList * @return ActivityInterface */ - public function pollActivity(string $taskList = 'default') : ActivityInterface + public function pollActivity(string $taskList = 'default') : Activity { $result = $this->swfClient->pollForActivityTask([ 'domain' => $this->config->domain, @@ -116,9 +128,15 @@ public function pollActivity(string $taskList = 'default') : ActivityInterface ] ]); + //TODO if nothing to treat... + + eval(\Psy\sh()); + $activityName = $result['activityType']['name']; $activity = $this->getActivityEntity($activityName); + $activity->setId($result['activityId']); + $activity->setTaskToken($result['taskToken']); $activity->hydrate($result['input'], $activity); return $activity; @@ -137,6 +155,22 @@ public function pollActivityGenerator(string $taskList = 'default') : \Generator } } + /** + * @param string $deciderName + * @param string $version + * @return DeciderInterface + */ + protected function getDeciderEntity(string $deciderName) : DeciderInterface + { + $className = ClassFinder::findClass($this->config->namespace, $deciderName, 'Decider'); + + if (null === $className) { + throw new ServiceException('Decider entity not found for ' . $deciderName); + } + + return new $className(); + } + /** * @param string $name * @param $version @@ -146,6 +180,10 @@ protected function getWorkflowEntity(string $workflowName, string $version) : Wo { $className = ClassFinder::findClass($this->config->namespace, $workflowName, 'Workflow'); + if (null === $className) { + throw new ServiceException('Workflow entity not found for ' . $workflowName); + } + return new $className(); } @@ -157,14 +195,66 @@ protected function getActivityEntity(string $activityName) : Activity { $className = ClassFinder::findClass($this->config->namespace, $activityName, 'Activity'); + if (null === $className) { + throw new ServiceException('Activity entity not found for ' . $activityName); + } + return new $className(); } /** + * Send RespondDecisionTask to SWF API. * + * @param DeciderInterface $decider */ - public function startWorkflow(Workflow $workflowEntity) + public function respondDecisionTaskCompleted(DeciderInterface $decider) { - //$workflowEntity->extract(); + $decisions = []; + + foreach ($decider->getDecisionTasksList() as $decision) { + $decisions[] = $decision->toRespondDecision(); + } + + if (empty($decisions)) { + return; + } + + $this->swfClient->respondDecisionTaskCompleted([ + 'taskToken' => $decider->getTaskToken(), + 'decisions' => $decisions, + ]); + } + + public function respondActivityTaskCompleted(ActivityInterface $activity, string $detail = '', string $reason = '') + { + $status = $activity->getStatus(); + + if (Activity::COMPLETED === $status) { + $this->swfClient->respondActivityTaskCompleted([ + 'taskToken' => $activity->getTaskToken(), + 'result' => json_encode($activity->extract()), + ]); + + return; + } + + if (Activity::CANCELED === $status) { + $this->swfClient->respondActivityTaskCanceled([ + 'taskToken' => $activity->getTaskToken(), + 'detail' => $detail, + ]); + + return; + } + + if (Activity::FAILED === $status) { + $this->swfClient->respondActivityTaskFailed([ + 'taskToken' => $activity->getTaskToken(), + 'detail' => $detail, + 'reason' => $reason, + ]); + + return; + } } } diff --git a/src/ServiceException.php b/src/ServiceException.php new file mode 100644 index 0000000..f34c593 --- /dev/null +++ b/src/ServiceException.php @@ -0,0 +1,7 @@ +sdkAws = new \Aws\Sdk($awsConfig); + $this->swfClient = $this->sdkAws->createClient('Swf'); + + $allConfig = \Continuous\Swf\Helper\Config::getAll(); + + $this->service = new \Continuous\Swf\Service(new \Continuous\Swf\ServiceConfig( + $allConfig->swf->domain, + $allConfig->identity, + $this->swfClient, + 'Continuous\\Demo\\Swf' + )); + } + + public function decisions() + { + foreach ($this->service->pollWorkflowGenerator() as $decider) { + + $decider->process(); + + if (true === $decider->hasDecisions()) { + $this->service->respondDecisionTaskCompleted($decider); + } + } + } +} + +$scratch = new Decider(); +$scratch->decisions(); diff --git a/tests/scratch.php b/tests/scratch.php new file mode 100644 index 0000000..e501551 --- /dev/null +++ b/tests/scratch.php @@ -0,0 +1,64 @@ +sdkAws = new \Aws\Sdk($awsConfig); + $this->swfClient = $this->sdkAws->createClient('Swf'); + + $allConfig = \Continuous\Swf\Helper\Config::getAll(); + + $this->service = new \Continuous\Swf\Service(new \Continuous\Swf\ServiceConfig( + $allConfig->swf->domain, + $allConfig->identity, + $this->swfClient, + 'Continuous\\Demo\\Swf' + )); + } + + public function run() + { + $workflow = new \Continuous\Demo\Swf\Spaghetti\SpaghettiWorkflow(); + + $workflow + ->setId(\Ramsey\Uuid\Uuid::uuid4()) + ->setKwisto('Ducasse') + ->setClient('John') + ; + + $input = json_encode($workflow->extract()); + + $domain = $this->swfClient->startWorkflowExecution([ + 'domain' => $this->domainName, + 'taskList' => [ + 'name' => 'default' + ], + 'workflowId' => $workflow->getId(), + 'workflowType' => [ + 'name' => $workflow->getName(), + 'version' => $workflow->getVersion(), + ], + 'input' => $input, + ]); + } +} + +$scratch = new Scratch(); +$scratch->run(); + diff --git a/tests/worker.php b/tests/worker.php new file mode 100644 index 0000000..68ed5d4 --- /dev/null +++ b/tests/worker.php @@ -0,0 +1,50 @@ +sdkAws = new \Aws\Sdk($awsConfig); + $this->swfClient = $this->sdkAws->createClient('Swf'); + + $allConfig = \Continuous\Swf\Helper\Config::getAll(); + + $this->service = new \Continuous\Swf\Service(new \Continuous\Swf\ServiceConfig( + $allConfig->swf->domain, + $allConfig->identity, + $this->swfClient, + 'Continuous\\Demo\\Swf' + )); + } + + public function work() + { + foreach ($this->service->pollActivityGenerator() as $activity) { + + try { + $activity->process(); + } catch (\Exception $e) { + $activity->failed(); + $this->service->respondActivityTaskCompleted($activity, $e->getTraceAsString(), $e->getMessage()); + + continue; + } + + $this->service->respondActivityTaskCompleted($activity); + } + } +} + +$scratch = new Worker(); +$scratch->work();