From 8a5ebdb362858ce4305c9d079a83124fb5c67ba7 Mon Sep 17 00:00:00 2001 From: Pierre Tomasina Date: Fri, 18 Nov 2016 10:22:25 +0100 Subject: [PATCH 1/4] Add DataTypes Decision VO --- .../Decision/CancelTimerDecision.php | 46 +++++++++++++++++++ src/DataTypes/Decision/DecisionInterface.php | 45 ++++++++++++++++++ src/DataTypes/Decision/DecisionTrait.php | 23 ++++++++++ 3 files changed, 114 insertions(+) create mode 100644 src/DataTypes/Decision/CancelTimerDecision.php create mode 100644 src/DataTypes/Decision/DecisionInterface.php create mode 100644 src/DataTypes/Decision/DecisionTrait.php diff --git a/src/DataTypes/Decision/CancelTimerDecision.php b/src/DataTypes/Decision/CancelTimerDecision.php new file mode 100644 index 0000000..4c6f126 --- /dev/null +++ b/src/DataTypes/Decision/CancelTimerDecision.php @@ -0,0 +1,46 @@ +timerId = $timerId; + } + + public function toRespondDecision() : array + { + return [ + 'decisionType' => static::DECISION_TYPE, + static::ATTRIBUTE_KEY => [ + 'timerId' => $this->timerId, + ], + ]; + } + + public function getDecisionType() : string + { + return static::DECISION_TYPE; + } + + public function getAttributeKey() : string + { + return static::ATTRIBUTE_KEY; + } +} diff --git a/src/DataTypes/Decision/DecisionInterface.php b/src/DataTypes/Decision/DecisionInterface.php new file mode 100644 index 0000000..28fb5b7 --- /dev/null +++ b/src/DataTypes/Decision/DecisionInterface.php @@ -0,0 +1,45 @@ +$decisionTasksList[] = $decision; + } +} From 8ae23ef70527157079dbc31a050178d70b7c919b Mon Sep 17 00:00:00 2001 From: Pierre Tomasina Date: Mon, 21 Nov 2016 01:20:27 +0100 Subject: [PATCH 2/4] Refactorise Workflow/Decider architecture --- Makefile | 1 + demo/BakingPasta/BakingPastaDecider.php | 34 ++++ demo/BakingPasta/BakingPastaWorkflow.php | 53 +++++- demo/Sauce/SauceDecider.php | 24 +++ demo/Sauce/SauceWorkflow.php | 34 +++- demo/Spaghetti/CompileActivity.php | 18 +- demo/Spaghetti/EatActivity.php | 4 +- demo/Spaghetti/SpaghettiDecider.php | 151 +++++++++++++++++ demo/Spaghetti/SpaghettiWorkflow.php | 51 +++++- src/ActivityInterface.php | 8 - .../Decision/CancelTimerDecision.php | 4 +- .../CompleteWorkflowExecutionDecision.php | 55 +++++++ src/DataTypes/Decision/DecisionException.php | 7 + src/DataTypes/Decision/DecisionTrait.php | 77 ++++++++- .../Decision/ScheduleActivityTaskDecision.php | 136 +++++++++++++++ .../StartChildWorkflowExecutionDecision.php | 155 ++++++++++++++++++ src/DataTypes/Event.php | 61 +++++++ src/DeciderInterface.php | 13 ++ src/Entity/Activity.php | 25 ++- src/Entity/ActivityInterface.php | 7 + src/Entity/HydratorInterface.php | 13 ++ src/Entity/Workflow.php | 48 +++++- src/Entity/WorkflowInterface.php | 27 +++ src/Service.php | 79 +++++++-- src/ServiceException.php | 7 + src/WorkflowInterface.php | 19 --- tests/decider.php | 54 ++++++ tests/scratch.php | 65 ++++++++ tests/worker.php | 0 29 files changed, 1148 insertions(+), 82 deletions(-) create mode 100644 demo/BakingPasta/BakingPastaDecider.php create mode 100644 demo/Sauce/SauceDecider.php create mode 100644 demo/Spaghetti/SpaghettiDecider.php delete mode 100644 src/ActivityInterface.php create mode 100644 src/DataTypes/Decision/CompleteWorkflowExecutionDecision.php create mode 100644 src/DataTypes/Decision/DecisionException.php create mode 100644 src/DataTypes/Decision/ScheduleActivityTaskDecision.php create mode 100644 src/DataTypes/Decision/StartChildWorkflowExecutionDecision.php create mode 100644 src/DataTypes/Event.php create mode 100644 src/DeciderInterface.php create mode 100644 src/Entity/ActivityInterface.php create mode 100644 src/Entity/HydratorInterface.php create mode 100644 src/Entity/WorkflowInterface.php create mode 100644 src/ServiceException.php delete mode 100644 src/WorkflowInterface.php create mode 100644 tests/decider.php create mode 100644 tests/scratch.php create mode 100644 tests/worker.php diff --git a/Makefile b/Makefile index 6e2dc7b..3a769a2 100644 --- a/Makefile +++ b/Makefile @@ -20,6 +20,7 @@ composer-install: phpcs: @rm -rf phpcs-reports @mkdir phpcs-reports + @touch phpcs-reports/.empty @docker exec ${CID} /bin/sh -c 'vendor/bin/phpcs -p --colors --report-full=./phpcs-reports/phpcs-report-full.txt --report-gitblame=./phpcs-reports/phpcs-report-gitblame.txt --report-info=./phpcs-reports/phpcs-report-info.txt --standard=phpcs.xml .' aws-config: diff --git a/demo/BakingPasta/BakingPastaDecider.php b/demo/BakingPasta/BakingPastaDecider.php new file mode 100644 index 0000000..780a5e4 --- /dev/null +++ b/demo/BakingPasta/BakingPastaDecider.php @@ -0,0 +1,34 @@ +events = $events; + } + + /** + * Process event for schedule decisions + */ + public function process() + { + $this->addDecisionTask(new CompleteWorkflowExecutionDecision()); + } +} diff --git a/demo/BakingPasta/BakingPastaWorkflow.php b/demo/BakingPasta/BakingPastaWorkflow.php index 5d3e668..604dd22 100644 --- a/demo/BakingPasta/BakingPastaWorkflow.php +++ b/demo/BakingPasta/BakingPastaWorkflow.php @@ -2,7 +2,6 @@ namespace Continuous\Demo\Swf\BakingPasta; -use Aws\Result; use Continuous\Swf\Entity\Workflow; /** @@ -11,26 +10,64 @@ */ class BakingPastaWorkflow extends Workflow { - public function process(Result $result) + const NAME = 'bakingpasta'; + const VERSION = '0.1.0'; + + protected $pasta; + protected $weight; + + public function getName() : string { - // TODO: Implement process() method. + return static::NAME; } - public function setResult() + public function getVersion() : string { - // TODO: Implement setResult() method. + return static::VERSION; } - public function extract() :array + /** + * Type of pasta + * + * @param $pasta + * @return $this + * @throws \Exception + */ + public function setPasta($pasta) + { + if ('spaghetti' !== $pasta) { + throw new \Exception('Only spaghetti pasta is supported'); + } + + $this->pasta = $pasta; + return $this; + } + + /** + * Gram of pasta + * + * @param int $weight + * @return $this + */ + public function setWeight(int $weight) { - // TODO: Implement extract() method. + $this->weight = $weight; + return $this; + } + public function extract() :array + { return [ + 'parent' => $this->parent, + 'pasta' => $this->pasta, + 'weight' => $this->weight, ]; } public function hydrate(array $data) { - // TODO: Implement hydrate() method. + $this->parent = $data['parent']; + $this->pasta = $data['pasta']; + $this->weight = $data['weight']; } } diff --git a/demo/Sauce/SauceDecider.php b/demo/Sauce/SauceDecider.php new file mode 100644 index 0000000..6497f33 --- /dev/null +++ b/demo/Sauce/SauceDecider.php @@ -0,0 +1,24 @@ +events = $events; + } + + public function process() + { + $this->addDecisionTask(new CompleteWorkflowExecutionDecision()); + } +} diff --git a/demo/Sauce/SauceWorkflow.php b/demo/Sauce/SauceWorkflow.php index 5530a1f..333bb18 100644 --- a/demo/Sauce/SauceWorkflow.php +++ b/demo/Sauce/SauceWorkflow.php @@ -2,7 +2,6 @@ namespace Continuous\Demo\Swf\Sauce; -use Aws\Result; use Continuous\Swf\Entity\Workflow; /** @@ -11,26 +10,45 @@ */ class SauceWorkflow extends Workflow { - public function process(Result $result) + const NAME = 'sauce'; + const VERSION = '0.1.0'; + + /** + * @var bool + */ + protected $onions = false; + + public function getName() : string { - // TODO: Implement process() method. + return static::NAME; } - public function setResult() + public function getVersion() : string { - // TODO: Implement setResult() method. + return static::VERSION; } - public function extract() :array + /** + * @param bool $wantOnions + * @return $this + */ + public function setOnions(bool $wantOnions) { - // TODO: Implement extract() method. + $this->onions = $wantOnions; + return $this; + } + public function extract() :array + { return [ + 'parent' => $this->parent, + 'onions' => $this->onions, ]; } public function hydrate(array $data) { - // TODO: Implement hydrate() method. + $this->parent = $data['parent']; + $this->onions = $data['onions']; } } diff --git a/demo/Spaghetti/CompileActivity.php b/demo/Spaghetti/CompileActivity.php index 3a40045..eb43dc7 100644 --- a/demo/Spaghetti/CompileActivity.php +++ b/demo/Spaghetti/CompileActivity.php @@ -1,11 +1,25 @@ events = $events; + eval(\Psy\sh()); + } + + public function process() + { + //TODO if compile activity finish, start activity eat. + + /*$this->filter( + $this->events, + [ Event::ACTIVITY_TASK_COMPLETED ], + function($event) { + $name = $event['childWorkflowExecutionCompletedEventAttributes']['workflowType']['name']; + return in_array($name, ['sauce', 'bakingpasta']); + } + );*/ + + if (false) { + $this->finish(); + return; + } + + //TODO if compile activity finish, start activity eat. + + if (false) { + $this->eat(); + return; + } + + $names = $this->filter( + $this->events, + [ Event::CHILD_WORKFLOW_EXECUTION_COMPLETED ], + function ($event) { + $name = $event['childWorkflowExecutionCompletedEventAttributes']['workflowType']['name']; + return in_array($name, ['sauce', 'bakingpasta']); + } + ); + + eval(\Psy\sh()); + + if (2 === count($names)) { + $this->compile(); + return; + } + + $names = $this->filter( + $this->events, + [ Event::CHILD_WORKFLOW_EXECUTION_STARTED ] + ); + + if (0 === count($names)) { + $this->startChildren(); + return; + } + } + + public function startChildren() + { + $bakingPastaWorkflow = new BakingPastaWorkflow(); + $sauceWorkflow = new SauceWorkflow(); + + $bakingPastaWorkflow + ->setParent($this) + ->setId(Uuid::uuid4()) + ->setPasta('spaghetti') + ->setWeight(250) + ; + + $sauceWorkflow + ->setParent($this) + ->setId(Uuid::uuid4()) + ->setOnions(true) + ; + + $startChildPasta = new StartChildWorkflowExecutionDecision( + $bakingPastaWorkflow, + ['name' => 'default'] + ); + + $startChildSauce = new StartChildWorkflowExecutionDecision( + $sauceWorkflow, + ['name' => 'default'] + ); + + $this + ->addDecisionTask($startChildPasta) + ->addDecisionTask($startChildSauce) + ; + } + + public function compile() + { + $compileActivity = new CompileActivity(); + $compileActivity + ->setId(Uuid::uuid4()) + ; + + $compileTask = new ScheduleActivityTaskDecision( + $compileActivity->getId(), + ['name' => $compileActivity->getName(), 'version' => $compileActivity->getVersion()] + ); + + $this->addDecisionTask($compileTask); + } + + public function eat() + { + $eatActivity = new EatActivity(); + $eatActivity + ->setId(Uuid::uuid4()) + ; + + $eatTask = new ScheduleActivityTaskDecision( + $eatActivity->getId(), + ['name' => $eatActivity->getName(), 'version' => $eatActivity->getVersion()] + ); + + $this->addDecisionTask($eatTask); + } + + public function finish() + { + $this->addDecisionTask(new CompleteWorkflowExecutionDecision()); + } +} diff --git a/demo/Spaghetti/SpaghettiWorkflow.php b/demo/Spaghetti/SpaghettiWorkflow.php index 0dd36a8..5acd133 100644 --- a/demo/Spaghetti/SpaghettiWorkflow.php +++ b/demo/Spaghetti/SpaghettiWorkflow.php @@ -2,7 +2,6 @@ namespace Continuous\Demo\Swf\Spaghetti; -use Aws\Result; use Continuous\Swf\Entity\Workflow; /** @@ -11,26 +10,62 @@ */ class SpaghettiWorkflow extends Workflow { - public function process(Result $result) + const NAME = 'spaghetti'; + const VERSION = '0.1.0'; + + /** + * @var string + */ + protected $kwisto; + + /** + * @var string + */ + protected $client; + + public function getName() : string { - // TODO: Implement process() method. + return static::NAME; } - public function setResult() + public function getVersion() : string { - // TODO: Implement setResult() method. + return static::VERSION; } - public function extract() :array + /** + * @param string $kwisto + * @return $this + */ + public function setKwisto(string $kwisto) + { + $this->kwisto = $kwisto; + return $this; + } + + /** + * @param string $client + * @return $this + */ + public function setClient(string $client) { - // TODO: Implement extract() method. + $this->client = $client; + return $this; + } + public function extract() :array + { return [ + 'parent' => $this->parent, + 'kwisto' => $this->kwisto, + 'client' => $this->client, ]; } public function hydrate(array $data) { - // TODO: Implement hydrate() method. + $this->parent = $data['parent']; + $this->kwisto = $data['kwisto']; + $this->client = $data['client']; } } diff --git a/src/ActivityInterface.php b/src/ActivityInterface.php deleted file mode 100644 index 1ae46dd..0000000 --- a/src/ActivityInterface.php +++ /dev/null @@ -1,8 +0,0 @@ - strlen($result) )) { + throw new DecisionException('result must be string greater than 0 and less than 32768 characters'); + } + + $this->result = $result; + } + + public function toRespondDecision() : array + { + $response = [ + 'decisionType' => static::DECISION_TYPE, + ]; + + if (null !== $this->result) { + $response[static::ATTRIBUTE_KEY] = [ + 'result' => $this->result, + ]; + } + + return $response; + } + + public function getDecisionType() : string + { + return static::DECISION_TYPE; + } + + public function getAttributeKey() : string + { + return static::ATTRIBUTE_KEY; + } +} diff --git a/src/DataTypes/Decision/DecisionException.php b/src/DataTypes/Decision/DecisionException.php new file mode 100644 index 0000000..eaf3b59 --- /dev/null +++ b/src/DataTypes/Decision/DecisionException.php @@ -0,0 +1,7 @@ +taskToken = $taskToken; + return $this; + } + + /** + * @return string + */ + public function getTaskToken() : string + { + return $this->taskToken; + } + + /** + * @param DecisionInterface $decision + */ + public function addDecisionTask(DecisionInterface $decision) + { + $this->decisionTasksList[] = $decision; + return $this; + } + + /** + * @return DecisionInterface[] + */ + public function getDecisionTasksList() + { + return $this->decisionTasksList; + } + + /** + * Return true if we have decisionsTask to respond + * + * @return bool + */ + public function hasDecisions() + { + return 0 < count($this->decisionTasksList); + } + + /** + * @param array $events + * @param array $eventType + * @param $callable */ - protected function addDecisionTask(DecisionInterface $decision) + public function filter(array $events, array $eventType, $callable = null) { - $this->$decisionTasksList[] = $decision; + $result = []; + + foreach ($events as $event) { + if (false === in_array($event['eventType'], $eventType)) { + continue; + } + + if (null === $callable) { + $result[] = $event; + continue; + } + + if (true === call_user_func($callable, $event)) { + $result[] = $event; + } + } + + return $result; } } diff --git a/src/DataTypes/Decision/ScheduleActivityTaskDecision.php b/src/DataTypes/Decision/ScheduleActivityTaskDecision.php new file mode 100644 index 0000000..4b37726 --- /dev/null +++ b/src/DataTypes/Decision/ScheduleActivityTaskDecision.php @@ -0,0 +1,136 @@ +activityId = $activityId; + + if (2 !== count($activityType) || empty($activityType['name']) || empty($activityType['version'])) { + new DecisionException('activityType argument must be match to AWS-SWF activity type structure.'); + } + + $this->activityType = $activityType; + + if ('' !== $control) { + $this->control = $control; + } + + if (-1 !== $heartbeatTimeout && 'NONE' !== $heartbeatTimeout && 0 > (int)$heartbeatTimeout) { + new DecisionException('heartbeatTimeout must be greater than 0 or string NONE.'); + } + + $this->heartbeatTimeout = $heartbeatTimeout; + + if (-1 !== $scheduleToCloseTimeout && 'NONE' !== $scheduleToCloseTimeout && 0 > (int)$scheduleToCloseTimeout) { + new DecisionException('scheduleToCloseTimeout must be greater than 0 or string NONE.'); + } + + $this->scheduleToCloseTimeout = $scheduleToCloseTimeout; + + if (-1 !== $scheduleToStartTimeout && 'NONE' !== $scheduleToStartTimeout && 0 > (int)$scheduleToStartTimeout) { + new DecisionException('scheduleToStartTimeout must be greater than 0 or string NONE.'); + } + + $this->scheduleToStartTimeout = $scheduleToStartTimeout; + + if (-1 !== $startToCloseTimeout && 'NONE' !== $startToCloseTimeout && 0 > (int)$startToCloseTimeout) { + new DecisionException('startToCloseTimeout must be greater than 0 or string NONE.'); + } + + $this->startToCloseTimeout = $startToCloseTimeout; + + if (null !== $taskList && 1 !== count($taskList) || empty($taskList['name'])) { + new DecisionException('Tasklist argument must be match to AWS-SWF taskList structure.'); + } + + $this->taskList = $taskList; + + if (-2147483648 > $taskPriority || 2147483647 < $taskPriority) { + new DecisionException('taskPriority must be an integer between -2147483648 and 2147483647'); + } + + $this->taskPriority = $taskPriority; + } + + public function toRespondDecision() : array + { + $attributes = [ + 'activityId' => $this->activityId, + 'activityType' => $this->activityType, + ]; + + if (null !== $this->control) { + $attributes['control'] = $this->control; + } + + if (-1 !== $this->heartbeatTimeout) { + $attributes['heartbeatTimeout'] = $this->heartbeatTimeout; + } + + if (-1 !== $this->scheduleToCloseTimeout) { + $attributes['scheduleToCloseTimeout'] = $this->scheduleToCloseTimeout; + } + + if (-1 !== $this->scheduleToStartTimeout) { + $attributes['scheduleToStartTimeout'] = $this->scheduleToStartTimeout; + } + + if (-1 !== $this->startToCloseTimeout) { + $attributes['startToCloseTimeout'] = $this->startToCloseTimeout; + } + + if (null !== $this->taskList) { + $attributes['taskList'] = $this->taskList; + } + + if (0 !== $this->taskPriority) { + $attributes['taskPriority'] = $this->taskPriority; + } + + return [ + 'decisionType' => static::DECISION_TYPE, + static::ATTRIBUTE_KEY => $attributes, + ]; + } + + public function getDecisionType() : string + { + return static::DECISION_TYPE; + } + + public function getAttributeKey() : string + { + return static::ATTRIBUTE_KEY; + } +} diff --git a/src/DataTypes/Decision/StartChildWorkflowExecutionDecision.php b/src/DataTypes/Decision/StartChildWorkflowExecutionDecision.php new file mode 100644 index 0000000..5552fb3 --- /dev/null +++ b/src/DataTypes/Decision/StartChildWorkflowExecutionDecision.php @@ -0,0 +1,155 @@ +workflow = $workflow; + + if (1 !== count($taskList) || empty($taskList['name'])) { + new DecisionException('Tasklist argument must be match to AWS-SWF taskList structure.'); + } + + $this->taskList = $taskList; + + if ('' !== $childPolicy + && false == in_array($childPolicy, ['TERMINATE', 'REQUEST_CANCEL', 'ABANDON']) + ) { + new DecisionException('childPolicy must be one of TERMINATE, REQUEST_CANCEL, ABANDON.'); + } + + $this->childPolicy = $childPolicy; + + if ('' !== $control) { + $this->control = $control; + } + + if (-1 !== $executionStartToCloseTimeout && 0 > $executionStartToCloseTimeout) { + new DecisionException('executionStartToCloseTimeout must be greater than 0.'); + } + + $this->executionStartToCloseTimeout = $executionStartToCloseTimeout; + + if ('' !== $lambdaRoleArn) { + $this->lambdaRoleArn = $lambdaRoleArn; + } + + if (5 < count($tagList)) { + new DecisionException('You can specify only maximum 5 tags on tagList argument'); + } + + $this->tagList = $tagList; + + if (-2147483648 > $taskPriority || 2147483647 < $taskPriority) { + new DecisionException('taskPriority must be an integer between -2147483648 and 2147483647'); + } + + $this->taskPriority = $taskPriority; + + if (-1 !== $taskStartToCloseTimeout + && 'NONE' !== $taskStartToCloseTimeout + && 0 > (int)$taskStartToCloseTimeout + ) { + new DecisionException('taskStartToCloseTimeout must be greater than 0 or string NONE.'); + } + + $this->taskStartToCloseTimeout = $taskStartToCloseTimeout; + } + + public function toRespondDecision() : array + { + $id = $this->workflow->getId(); + + if (false === Uuid::isValid($id)) { + throw new \Exception('Workflow must have ID defined. ', get_class($this->workflow)); + } + + $attributes = [ + 'taskList' => $this->taskList, + 'workflowId' => $id, + 'workflowType' => [ + 'name' => $this->workflow->getName(), + 'version' => $this->workflow->getVersion(), + ], + 'input' => json_encode($this->workflow->extract()) + ]; + + if ('' !== $this->childPolicy) { + $attributes['childPolicy'] = $this->childPolicy; + } + + if (null !== $this->control) { + $attributes['control'] = $this->control; + } + + if (-1 !== $this->executionStartToCloseTimeout) { + $attributes['executionStartToCloseTimeout'] = $this->executionStartToCloseTimeout; + } + + if (null !== $this->lambdaRoleArn) { + $attributes['lambdaRole'] = $this->lambdaRoleArn; + } + + if (!empty($this->tagList)) { + $attributes['tagList'] = $this->tagList; + } + + if (0 !== $this->taskPriority) { + $attributes['taskPriority'] = $this->taskPriority; + } + + if (-1 !== $this->taskStartToCloseTimeout) { + $attributes['taskStartToCloseTimeout'] = $this->taskStartToCloseTimeout; + } + + return [ + 'decisionType' => static::DECISION_TYPE, + static::ATTRIBUTE_KEY => $attributes, + ]; + } + + public function getDecisionType() : string + { + return static::DECISION_TYPE; + } + + public function getAttributeKey() : string + { + return static::ATTRIBUTE_KEY; + } +} diff --git a/src/DataTypes/Event.php b/src/DataTypes/Event.php new file mode 100644 index 0000000..a9591e4 --- /dev/null +++ b/src/DataTypes/Event.php @@ -0,0 +1,61 @@ +id = $id; + return $this; + } + + /** + * @return string + */ + public function getId() : string + { + return $this->id; + } + abstract public function extract() : array; abstract public function hydrate(array $data); } diff --git a/src/Entity/ActivityInterface.php b/src/Entity/ActivityInterface.php new file mode 100644 index 0000000..ec738d5 --- /dev/null +++ b/src/Entity/ActivityInterface.php @@ -0,0 +1,7 @@ +id = $id; + return $this; + } + + /** + * @return string + */ + public function getId() : string + { + return $this->id; + } + + /** + * @param Workflow $workflow + * @return $this + */ + public function setParent(Workflow $workflow) + { + $this->parent = get_class($workflow); + return $this; + } + + /** + * @return string + */ + public function getParent() : string + { + return $this->parent; + } + abstract public function extract() : array; abstract public function hydrate(array $data); } diff --git a/src/Entity/WorkflowInterface.php b/src/Entity/WorkflowInterface.php new file mode 100644 index 0000000..4626895 --- /dev/null +++ b/src/Entity/WorkflowInterface.php @@ -0,0 +1,27 @@ +swfClient->pollForDecisionTask([ 'domain' => $this->config->domain, @@ -70,22 +69,33 @@ public function pollWorkflow(string $taskList = 'default') : WorkflowInterface 'name' => $taskList, ], 'identify' => $this->config->identity, - 'maximumPageSize' => 50, - 'reverseOrder' => true, + 'reverseOrder' => false, ]); - //TODO try catch result not Aws\Result... + //TODO poll with nextPageToken if max number of events is reach - $workflowType = $result['workflowType']; - $workflow = $this->getWorkflowEntity($workflowType['name'], $workflowType['version']); + if (empty($result['events'])) { + var_dump($result); + throw new ServiceException('No events detect on decision request.'); + } + + foreach ($result['events'] as $event) { + if ('WorkflowExecutionStarted' !== $event['eventType']) { + continue; + } + + $workflowType = $event['workflowExecutionStartedEventAttributes']['workflowType']; + $decider = $this->getDeciderEntity($workflowType['name']); + $decider->setId($result['workflowExecution']['workflowId']); + $decider->setTaskToken($result['taskToken']); + $decider->hydrate(json_decode($event['workflowExecutionStartedEventAttributes']['input'], true)); - if (isset($result['input'])) { - $workflow->hydrate(json_decode($result['input'], true)); + break; } - $workflow->process($result); + $decider->setEvents(array_reverse($result['events'])); - return $workflow; + return $decider; } /** @@ -137,6 +147,22 @@ public function pollActivityGenerator(string $taskList = 'default') : \Generator } } + /** + * @param string $deciderName + * @param string $version + * @return DeciderInterface + */ + protected function getDeciderEntity(string $deciderName) : DeciderInterface + { + $className = ClassFinder::findClass($this->config->namespace, $deciderName, 'Decider'); + + if (null === $className) { + throw new ServiceException('Decider entity not found for ' . $deciderName); + } + + return new $className(); + } + /** * @param string $name * @param $version @@ -146,6 +172,10 @@ protected function getWorkflowEntity(string $workflowName, string $version) : Wo { $className = ClassFinder::findClass($this->config->namespace, $workflowName, 'Workflow'); + if (null === $className) { + throw new ServiceException('Workflow entity not found for ' . $workflowName); + } + return new $className(); } @@ -157,14 +187,33 @@ protected function getActivityEntity(string $activityName) : Activity { $className = ClassFinder::findClass($this->config->namespace, $activityName, 'Activity'); + if (null === $className) { + throw new ServiceException('Activity entity not found for ' . $activityName); + } + return new $className(); } /** + * Send RespondDecisionTask to SWF API. * + * @param DeciderInterface $decider */ - public function startWorkflow(Workflow $workflowEntity) + public function respondDecisionTaskCompleted(DeciderInterface $decider) { - //$workflowEntity->extract(); + $decisions = []; + + foreach ($decider->getDecisionTasksList() as $decision) { + $decisions[] = $decision->toRespondDecision(); + } + + if (empty($decisions)) { + return; + } + + $result = $this->swfClient->respondDecisionTaskCompleted([ + 'taskToken' => $decider->getTaskToken(), + 'decisions' => $decisions, + ]); } } diff --git a/src/ServiceException.php b/src/ServiceException.php new file mode 100644 index 0000000..f34c593 --- /dev/null +++ b/src/ServiceException.php @@ -0,0 +1,7 @@ +sdkAws = new \Aws\Sdk($awsConfig); + $this->swfClient = $this->sdkAws->createClient('Swf'); + + $allConfig = \Continuous\Swf\Helper\Config::getAll(); + + $this->service = new \Continuous\Swf\Service(new \Continuous\Swf\ServiceConfig( + $allConfig->swf->domain, + $allConfig->identity, + $this->swfClient, + 'Continuous\\Demo\\Swf' + )); + } + + public function decisions() + { + $workflow = $this->service->pollWorkflow(); + eval(\Psy\sh()); + } + + public function spaghettiDecider() + { + + } + + public function bakingPastaDecider() + { + } + + public function sauceDecider() + { + } +} + +$scratch = new Decider(); +$scratch->decisions(); + diff --git a/tests/scratch.php b/tests/scratch.php new file mode 100644 index 0000000..014c981 --- /dev/null +++ b/tests/scratch.php @@ -0,0 +1,65 @@ +sdkAws = new \Aws\Sdk($awsConfig); + $this->swfClient = $this->sdkAws->createClient('Swf'); + + $allConfig = \Continuous\Swf\Helper\Config::getAll(); + + $this->service = new \Continuous\Swf\Service(new \Continuous\Swf\ServiceConfig( + $allConfig->swf->domain, + $allConfig->identity, + $this->swfClient, + 'Continuous\\Demo\\Swf' + )); + } + + public function run() + { + $workflow = new \Continuous\Demo\Swf\Spaghetti\SpaghettiWorkflow(); + + $workflow + ->setId(\Ramsey\Uuid\Uuid::uuid4()) + ->setKwisto('Ducasse') + ->setClient('John') + ->setOnions(true) + ; + + $input = json_encode($workflow->extract()); + + $domain = $this->swfClient->startWorkflowExecution([ + 'domain' => $this->domainName, + 'taskList' => [ + 'name' => 'default' + ], + 'workflowId' => $workflow->getId(), + 'workflowType' => [ + 'name' => $workflow::NAME, + 'version' => $workflow::VERSION, + ], + 'input' => $input, + ]); + } +} + +$scratch = new Scratch(); +$scratch->run(); + diff --git a/tests/worker.php b/tests/worker.php new file mode 100644 index 0000000..e69de29 From 9b7f7c438f23a3f7a82ed509a1cb8d2f47e73077 Mon Sep 17 00:00:00 2001 From: Pierre Tomasina Date: Mon, 21 Nov 2016 02:17:27 +0100 Subject: [PATCH 3/4] Activity archi --- demo/Spaghetti/CompileActivity.php | 12 +++---- demo/Spaghetti/EatActivity.php | 20 ++++++++--- demo/Spaghetti/SpaghettiDecider.php | 11 ++++-- src/Entity/Activity.php | 52 +++++++++++++++++++++++++++++ src/Entity/ActivityInterface.php | 17 ++++++++++ src/Service.php | 47 ++++++++++++++++++++++++-- 6 files changed, 143 insertions(+), 16 deletions(-) diff --git a/demo/Spaghetti/CompileActivity.php b/demo/Spaghetti/CompileActivity.php index eb43dc7..7e6c718 100644 --- a/demo/Spaghetti/CompileActivity.php +++ b/demo/Spaghetti/CompileActivity.php @@ -19,19 +19,17 @@ public function getVersion() : string return static::VERSION; } - - public function setResult() + public function extract() : array { - // TODO: Implement setResult() method. + return []; } - public function extract() : array + public function hydrate(array $data) { - // TODO: Implement extract() method. } - public function hydrate(array $data) + public function process() { - // TODO: Implement hydrate() method. + $this->completed(); } } diff --git a/demo/Spaghetti/EatActivity.php b/demo/Spaghetti/EatActivity.php index c376337..7376361 100644 --- a/demo/Spaghetti/EatActivity.php +++ b/demo/Spaghetti/EatActivity.php @@ -6,18 +6,30 @@ class EatActivity extends Activity { - public function setResult() + const NAME = 'spaghetti.eat'; + const VERSION = '0.1.0'; + + public function getName() : string + { + return static::NAME; + } + + public function getVersion() : string { - // TODO: Implement setResult() method. + return static::VERSION; } public function extract() : array { - // TODO: Implement extract() method. + return []; } public function hydrate(array $data) { - // TODO: Implement hydrate() method. + } + + public function process() + { + $this->completed(); } } diff --git a/demo/Spaghetti/SpaghettiDecider.php b/demo/Spaghetti/SpaghettiDecider.php index d0c40ee..fae642d 100644 --- a/demo/Spaghetti/SpaghettiDecider.php +++ b/demo/Spaghetti/SpaghettiDecider.php @@ -53,6 +53,15 @@ function($event) { return; } + $events = $this->filter( + $this->events, + [ Event::ACTIVITY_TASK_SCHEDULED ] + ); + + if (0 < count($events)) { + return; + } + $names = $this->filter( $this->events, [ Event::CHILD_WORKFLOW_EXECUTION_COMPLETED ], @@ -62,8 +71,6 @@ function ($event) { } ); - eval(\Psy\sh()); - if (2 === count($names)) { $this->compile(); return; diff --git a/src/Entity/Activity.php b/src/Entity/Activity.php index 2b9cacd..e385586 100644 --- a/src/Entity/Activity.php +++ b/src/Entity/Activity.php @@ -18,11 +18,25 @@ */ abstract class Activity implements ActivityInterface { + const CANCELED = 1; + const COMPLETED = 2; + const FAILED = 3; + /** * @var string uuid4 */ protected $id; + /** + * @var string token + */ + protected $taskToken; + + /** + * @var string CANCELED | COMPLETED | FAILED + */ + protected $status; + /** * @var string uuid4 */ @@ -32,6 +46,16 @@ public function setId(string $id) return $this; } + /** + * @param string $taskToken + * @return $this + */ + public function setTaskToken(string $taskToken) + { + $this->taskToken = $taskToken; + return $this; + } + /** * @return string */ @@ -40,6 +64,34 @@ public function getId() : string return $this->id; } + /** + * @return string + */ + public function getTaskToken() : string + { + return $this->taskToken; + } + + public function getStatus() : string + { + return $this->status; + } + + public function canceled() + { + $this->status = static::CANCELED; + } + + public function completed() + { + $this->status = static::COMPLETED; + } + + public function failed() + { + $this->status = static::FAILED; + } + abstract public function extract() : array; abstract public function hydrate(array $data); } diff --git a/src/Entity/ActivityInterface.php b/src/Entity/ActivityInterface.php index ec738d5..1fceccd 100644 --- a/src/Entity/ActivityInterface.php +++ b/src/Entity/ActivityInterface.php @@ -4,4 +4,21 @@ interface ActivityInterface extends HydratorInterface { + public function setId(string $id); + + public function setTaskToken(string $taskToken); + + public function getId() : string; + + public function getTaskToken() : string; + + public function getStatus() : string; + + public function canceled(); + + public function completed(); + + public function failed(); + + public function process(); } diff --git a/src/Service.php b/src/Service.php index 968417a..b47f0a3 100644 --- a/src/Service.php +++ b/src/Service.php @@ -14,6 +14,7 @@ use Aws\Swf\SwfClient; use Continuous\Swf\DataTypes\Decision\DecisionTrait; use Continuous\Swf\Entity\Activity; +use Continuous\Swf\Entity\ActivityInterface; use Continuous\Swf\Entity\Workflow; use Continuous\Swf\Helper\ClassFinder; @@ -58,10 +59,11 @@ public function __construct(ServiceConfig $serviceConfig) } /** + * Poll a workflow decider * * @param string $taskList */ - public function pollWorkflow(string $taskList = 'default') + public function pollWorkflow(string $taskList = 'default') : DeciderInterface { $result = $this->swfClient->pollForDecisionTask([ 'domain' => $this->config->domain, @@ -117,7 +119,7 @@ public function pollWorkflowGenerator(string $taskList = 'default') : \Generator * @param string $taskList * @return ActivityInterface */ - public function pollActivity(string $taskList = 'default') : ActivityInterface + public function pollActivity(string $taskList = 'default') : Activity { $result = $this->swfClient->pollForActivityTask([ 'domain' => $this->config->domain, @@ -126,9 +128,15 @@ public function pollActivity(string $taskList = 'default') : ActivityInterface ] ]); + //TODO if nothing to treat... + + eval(\Psy\sh()); + $activityName = $result['activityType']['name']; $activity = $this->getActivityEntity($activityName); + $activity->setId($result['activityId']); + $activity->setTaskToken($result['taskToken']); $activity->hydrate($result['input'], $activity); return $activity; @@ -211,9 +219,42 @@ public function respondDecisionTaskCompleted(DeciderInterface $decider) return; } - $result = $this->swfClient->respondDecisionTaskCompleted([ + $this->swfClient->respondDecisionTaskCompleted([ 'taskToken' => $decider->getTaskToken(), 'decisions' => $decisions, ]); } + + public function respondActivityTaskCompleted(ActivityInterface $activity, string $detail = '', string $reason = '') + { + $status = $activity->getStatus(); + + if (Activity::COMPLETED === $status) { + $this->swfClient->respondActivityTaskCompleted([ + 'taskToken' => $activity->getTaskToken(), + 'result' => json_encode($activity->extract()), + ]); + + return; + } + + if (Activity::CANCELED === $status) { + $this->swfClient->respondActivityTaskCanceled([ + 'taskToken' => $activity->getTaskToken(), + 'detail' => $detail, + ]); + + return; + } + + if (Activity::FAILED === $status) { + $this->swfClient->respondActivityTaskFailed([ + 'taskToken' => $activity->getTaskToken(), + 'detail' => $detail, + 'reason' => $reason, + ]); + + return; + } + } } From 53311bfe9734cde3bf19b1371934be06e6e5d0ca Mon Sep 17 00:00:00 2001 From: Pierre Tomasina Date: Mon, 21 Nov 2016 02:18:03 +0100 Subject: [PATCH 4/4] add scratch cli test --- tests/decider.php | 21 ++++++-------------- tests/scratch.php | 5 ++--- tests/worker.php | 50 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 58 insertions(+), 18 deletions(-) diff --git a/tests/decider.php b/tests/decider.php index 733cb53..d2c6554 100644 --- a/tests/decider.php +++ b/tests/decider.php @@ -4,7 +4,6 @@ class Decider { - /** * @var \Continuous\Swf\Service */ @@ -31,24 +30,16 @@ public function __construct() public function decisions() { - $workflow = $this->service->pollWorkflow(); - eval(\Psy\sh()); - } - - public function spaghettiDecider() - { - - } + foreach ($this->service->pollWorkflowGenerator() as $decider) { - public function bakingPastaDecider() - { - } + $decider->process(); - public function sauceDecider() - { + if (true === $decider->hasDecisions()) { + $this->service->respondDecisionTaskCompleted($decider); + } + } } } $scratch = new Decider(); $scratch->decisions(); - diff --git a/tests/scratch.php b/tests/scratch.php index 014c981..e501551 100644 --- a/tests/scratch.php +++ b/tests/scratch.php @@ -40,7 +40,6 @@ public function run() ->setId(\Ramsey\Uuid\Uuid::uuid4()) ->setKwisto('Ducasse') ->setClient('John') - ->setOnions(true) ; $input = json_encode($workflow->extract()); @@ -52,8 +51,8 @@ public function run() ], 'workflowId' => $workflow->getId(), 'workflowType' => [ - 'name' => $workflow::NAME, - 'version' => $workflow::VERSION, + 'name' => $workflow->getName(), + 'version' => $workflow->getVersion(), ], 'input' => $input, ]); diff --git a/tests/worker.php b/tests/worker.php index e69de29..68ed5d4 100644 --- a/tests/worker.php +++ b/tests/worker.php @@ -0,0 +1,50 @@ +sdkAws = new \Aws\Sdk($awsConfig); + $this->swfClient = $this->sdkAws->createClient('Swf'); + + $allConfig = \Continuous\Swf\Helper\Config::getAll(); + + $this->service = new \Continuous\Swf\Service(new \Continuous\Swf\ServiceConfig( + $allConfig->swf->domain, + $allConfig->identity, + $this->swfClient, + 'Continuous\\Demo\\Swf' + )); + } + + public function work() + { + foreach ($this->service->pollActivityGenerator() as $activity) { + + try { + $activity->process(); + } catch (\Exception $e) { + $activity->failed(); + $this->service->respondActivityTaskCompleted($activity, $e->getTraceAsString(), $e->getMessage()); + + continue; + } + + $this->service->respondActivityTaskCompleted($activity); + } + } +} + +$scratch = new Worker(); +$scratch->work();