GopherFlow

Small, pragmatic workflow engine for Go with a built-in web console. Define workflows in Go, persist their execution, and observe/operate them via the web UI.

Highlights

  • Define workflows in Go using a state-machine approach
  • Each state function is idempotent and can be safely retried
  • Persistent storage (Postgres, SQLite, and MySQL supported) with action history
  • Concurrent execution with executor registration, heartbeats, and stuck-workflow repair
  • Web console (dashboard, search, definitions with diagrams, executors, details)
  • Mermaid-like flow visualization generated from workflow definitions
  • Container-friendly, single-binary deployment
  • Parent / Child Workflows
    • GopherFlow supports parent workflows spawning child workflows, allowing for parallel execution and coordination.
    • Spawn Children: A parent workflow can create multiple child workflow requests.
    • Wait & Wake: The parent can wait for children to complete. Children can explicitly wake their parent when they reach a certain state or finish.
    • Parallel Execution: Child workflows run independently and in parallel.

Quick start

Prerequisites:

  • Go 1.24+ (or Docker if you prefer containers)

Demo Application

This starts the demo application with a SQLite database. Two workflows are included:

  • DemoWorkflow - runs some fictitious steps and sets a few state variables

  • GetIpWorkflow - fetches the current public IP address from ifconfig.io and stores it in a state variable

      docker run -p 8080:8080 \
      -e GFLOW_DATABASE_TYPE=SQLLITE \
      -e GFLOW_DATABASE_SQLLITE_FILE_NAME=/data/gflow.db \
      -v "$(pwd):/data" \
      --security-opt seccomp=unconfined \
      juliangpurse/gopherflow:1.6.0
    

Note: the --security-opt seccomp=unconfined flag is required because SQLite runs inside the container.

Access the web console at http://localhost:8080/

Username: admin
Password: admin

Web Console

(Screenshots of the web console: dashboard, search, and workflow details.)

REST API

GopherFlow provides a REST API for programmatic interaction with workflows. A Postman collection is available in the postman directory to help you get started:

  • Collection File: in the postman/ directory
  • API Key Authentication: All endpoints require an X-API-Key header. The key is shown in the Users tab of the web UI.

Available Endpoints:

  1. Get Workflow Definitions - GET /api/definitions
  2. Create Workflow - POST /api/workflows
  3. Get Workflow Details - GET /api/workflows/{id}
  4. Get Workflow by External ID - GET /api/workflowByExternalId/{externalId}
  5. Search Workflows - POST /api/workflows/search
  6. Create and Wait - POST /api/createAndWait - Create a workflow and wait for it to reach specific states
  7. Update State and Wait - POST /api/workflows/{externalId}/stateAndWait - Update a workflow's state and wait for it to reach specific states

To use the Postman collection:

  1. Import the collection into Postman
  2. Configure your environment variables (if needed)
  3. Use the pre-configured requests to interact with your GopherFlow instance

Performance

  • Tested to a few thousand simple workflows per minute with the number of concurrent workers increased; see the system settings ENGINE_CHECK_DB_INTERVAL, ENGINE_BATCH_SIZE, and ENGINE_EXECUTOR_SIZE
  • Note that there are no official benchmark numbers in the repo. Why:
    • At a certain point, if you need raw throughput, you don't need a workflow engine and will hand-tool the code.
    • If you are chasing performance at that level, the convenience of a framework like GopherFlow is not worth it.
    • If you have complicated Directed Acyclic Graphs (DAGs), you will most likely need a workflow engine; in that case your executions per minute are limited more by external factors such as the APIs you call, database performance, and so on.
    • A workflow engine gives workflows a single place to live and makes trivial things like persistence, retries, and observability easier.
  • These are mostly the rants of the developer :) take them with a grain of salt.

Building your own Workflow and running it

Refer to the example application: https://github.com/RealZimboGuy/gopherflow-examples

Specific details

go get github.com/RealZimboGuy/gopherflow@v1.6.0

Define a struct that embeds the base workflow:

type GetIpWorkflow struct {
    core.BaseWorkflow
}

The Workflow interface must be fully implemented:

type Workflow interface {
    StateTransitions() map[string][]string // map of state name -> list of next state names
    InitialState() string                  // where to start
    Description() string
    Setup(wf *domain.Workflow)
    GetWorkflowData() *domain.Workflow
    GetStateVariables() map[string]string
    GetAllStates() []models.WorkflowState
    GetRetryConfig() models.RetryConfig
}

Here is the full example for GetIpWorkflow:

import (
    "context"
    "io"
    "log/slog"
    "net/http"
    "time"

    "github.com/RealZimboGuy/gopherflow/pkg/gopherflow/core"
    domain "github.com/RealZimboGuy/gopherflow/pkg/gopherflow/domain"
    models "github.com/RealZimboGuy/gopherflow/pkg/gopherflow/models"
)

// State names used by this workflow
var StateStart string = "Start"
var StateGetIpData string = "StateGetIpData"

const VAR_IP = "ip"

type GetIpWorkflow struct {
    core.BaseWorkflow
}

func (m *GetIpWorkflow) Setup(wf *domain.Workflow) {
    m.BaseWorkflow.Setup(wf)
}

func (m *GetIpWorkflow) GetWorkflowData() *domain.Workflow {
    return m.WorkflowState
}

func (m *GetIpWorkflow) GetStateVariables() map[string]string {
    return m.StateVariables
}

func (m *GetIpWorkflow) InitialState() string {
    return StateStart
}

func (m *GetIpWorkflow) Description() string {
    return "This is a Demo Workflow showing how it can be used"
}

func (m *GetIpWorkflow) GetRetryConfig() models.RetryConfig {
    return models.RetryConfig{
        MaxRetryCount:    10,
        RetryIntervalMin: time.Second * 10,
        RetryIntervalMax: time.Minute * 60,
    }
}

func (m *GetIpWorkflow) StateTransitions() map[string][]string {
    return map[string][]string{
        StateStart:     {StateGetIpData}, // Start -> StateGetIpData
        StateGetIpData: {StateFinish},    // StateGetIpData -> finish
    }
}

func (m *GetIpWorkflow) GetAllStates() []models.WorkflowState {
    return []models.WorkflowState{
        {Name: StateStart, StateType: models.StateStart},
        {Name: StateGetIpData, StateType: models.StateNormal},
        {Name: StateFinish, StateType: models.StateEnd},
    }
}

// Each state method returns the next state
func (m *GetIpWorkflow) Start(ctx context.Context) (*models.NextState, error) {
    // Use the context-aware slog call: the context carries worker IDs and
    // other fields that the logger writes out
    slog.InfoContext(ctx, "Starting workflow")

    return &models.NextState{
        Name:      StateGetIpData,
        ActionLog: "using ifconfig.io to return the public IP address",
    }, nil
}

func (m *GetIpWorkflow) StateGetIpData(ctx context.Context) (*models.NextState, error) {
    resp, err := http.Get("http://ifconfig.io")
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    ipBytes, err := io.ReadAll(resp.Body)
    if err != nil {
        return nil, err
    }
    m.StateVariables[VAR_IP] = string(ipBytes)

    return &models.NextState{
        Name: StateFinish,
    }, nil
}

Main function

    import (
        "context"
        "log/slog"

        "github.com/RealZimboGuy/gopherflow/internal/workflows"
        "github.com/RealZimboGuy/gopherflow/pkg/gopherflow"
        "github.com/RealZimboGuy/gopherflow/pkg/gopherflow/core"
    )

    func main() {
        // You may do your own logger setup here, or use this default one with slog
        ctx := context.Background()

        gopherflow.SetupLogger(slog.LevelInfo)

        workflowRegistry := map[string]func() core.Workflow{
            "DemoWorkflow": func() core.Workflow {
                return &workflows.DemoWorkflow{}
            },
            "GetIpWorkflow": func() core.Workflow {
                // You can inject dependencies here
                return &workflows.GetIpWorkflow{
                    // HTTPClient: httpClient,
                    // MyService: myService,
                }
            },
        }

        // Uses the default ServeMux
        app := gopherflow.Setup(workflowRegistry)

        if err := app.Run(ctx); err != nil {
            slog.Error("Engine exited with error", "error", err)
        }
    }

Example: Spawning Children

In your parent workflow state transition:

func (w *MyParentWorkflow) SpawnChildren(ctx context.Context) (*models.NextState, error) {
    // Create child workflow requests
    childRequests := []models.ChildWorkflowRequest{
        gopherflow.CreateChildWorkflowRequest(
            "MyChildWorkflow",
            fmt.Sprintf("child-%d", w.WorkflowState.ID),
            "ChildInit",
            map[string]string{"input": "value"},
        ),
    }

    return &models.NextState{
        Name:           "WaitForChildren",
        ChildWorkflows: childRequests,
    }, nil
}

Example: Waiting for Children

func (w *MyParentWorkflow) WaitForChildren(ctx context.Context) (*models.NextState, error) {
    children, err := w.GetChildWorkflows(ctx)
    if err != nil {
        return nil, err
    }

    allComplete := true
    for _, child := range children {
        if child.Status != models.WorkflowStatusFinished {
            allComplete = false
            break
        }
    }

    if !allComplete {
        // Wait and check again later
        return &models.NextState{
            Name:                "WaitForChildren",
            NextExecutionOffset: "1 minute",
        }, nil
    }

    return &models.NextState{Name: "Finish"}, nil
}
