diff --git a/backend/mysql/mysql.go b/backend/mysql/mysql.go index afa3bec8..b0dd63f4 100644 --- a/backend/mysql/mysql.go +++ b/backend/mysql/mysql.go @@ -52,10 +52,42 @@ func NewMysqlBackend(host string, port int, user, password, database string, opt } b := &mysqlBackend{ - dsn: dsn, - db: db, - workerName: getWorkerName(options), - options: options, + dsn: dsn, + db: db, + workerName: getWorkerName(options), + options: options, + ownsConnection: true, + } + + if options.ApplyMigrations { + if err := b.Migrate(); err != nil { + panic(err) + } + } + + return b +} + +// NewMysqlBackendWithDB creates a new MySQL backend using an existing database connection. +// When using this constructor, the backend will not close the database connection when Close() is called. +// Migrations are disabled by default; to enable them, use WithApplyMigrations(true) along with +// WithMigrationDSN to provide a DSN that supports multi-statement queries. +func NewMysqlBackendWithDB(db *sql.DB, opts ...option) *mysqlBackend { + options := &options{ + Options: backend.ApplyOptions(), + ApplyMigrations: false, + } + + for _, opt := range opts { + opt(options) + } + + b := &mysqlBackend{ + dsn: "", + db: db, + workerName: getWorkerName(options), + options: options, + ownsConnection: false, } if options.ApplyMigrations { @@ -68,10 +100,11 @@ func NewMysqlBackend(host string, port int, user, password, database string, opt } type mysqlBackend struct { - dsn string - db *sql.DB - workerName string - options *options + dsn string + db *sql.DB + workerName string + options *options + ownsConnection bool } func (mb *mysqlBackend) FeatureSupported(feature backend.Feature) bool { @@ -79,12 +112,24 @@ func (mb *mysqlBackend) FeatureSupported(feature backend.Feature) bool { } func (mb *mysqlBackend) Close() error { + if !mb.ownsConnection { + return nil + } return mb.db.Close() } // Migrate applies any pending database migrations. func (mb *mysqlBackend) Migrate() error { - schemaDsn := mb.dsn + "&multiStatements=true" + // Determine which DSN to use for migrations + var schemaDsn string + if mb.options.MigrationDSN != "" { + schemaDsn = mb.options.MigrationDSN + } else if mb.dsn != "" { + schemaDsn = mb.dsn + "&multiStatements=true" + } else { + return errors.New("cannot apply migrations: no DSN available; use WithMigrationDSN option or apply migrations externally") + } + db, err := sql.Open("mysql", schemaDsn) if err != nil { return fmt.Errorf("opening schema database: %w", err) diff --git a/backend/mysql/mysql_test.go b/backend/mysql/mysql_test.go index fa435fbb..919c1c2d 100644 --- a/backend/mysql/mysql_test.go +++ b/backend/mysql/mysql_test.go @@ -216,3 +216,126 @@ func Test_MysqlBackend_WorkerName(t *testing.T) { } }) } + +func Test_MysqlBackendWithDB(t *testing.T) { + if testing.Short() { + t.Skip() + } + + t.Run("UsesProvidedConnection", func(t *testing.T) { + // Create database for test + adminDB, err := sql.Open("mysql", fmt.Sprintf("%s:%s@/?parseTime=true&interpolateParams=true", testUser, testPassword)) + if err != nil { + t.Fatal(err) + } + + dbName := "test_withdb_" + strings.ReplaceAll(uuid.NewString(), "-", "") + if _, err := adminDB.Exec("CREATE DATABASE " + dbName); err != nil { + t.Fatal(err) + } + defer func() { + adminDB.Exec("DROP DATABASE IF EXISTS " + dbName) + adminDB.Close() + }() + + // Create our own connection to the test database + dsn := fmt.Sprintf("%s:%s@tcp(localhost:3306)/%s?parseTime=true&interpolateParams=true", testUser, testPassword, dbName) + db, err := sql.Open("mysql", dsn) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + // Create backend with existing connection and migration DSN + migrationDSN := dsn + "&multiStatements=true" + backend := NewMysqlBackendWithDB(db, + WithApplyMigrations(true), + WithMigrationDSN(migrationDSN), + ) + + // Verify the backend uses our connection + if backend.db != db { + t.Error("Backend should use provided db connection") + } + if backend.ownsConnection { + t.Error("Backend should not own the connection") + } + + // Close backend - should NOT close our connection + if err := backend.Close(); err != nil { + t.Fatal(err) + } + + // Verify our connection is still usable + if err := db.Ping(); err != nil { + t.Errorf("Connection should still be open after backend.Close(): %v", err) + } + }) + + t.Run("MigrationsDisabledByDefault", func(t *testing.T) { + // Create database for test + adminDB, err := sql.Open("mysql", fmt.Sprintf("%s:%s@/?parseTime=true&interpolateParams=true", testUser, testPassword)) + if err != nil { + t.Fatal(err) + } + + dbName := "test_withdb2_" + strings.ReplaceAll(uuid.NewString(), "-", "") + if _, err := adminDB.Exec("CREATE DATABASE " + dbName); err != nil { + t.Fatal(err) + } + defer func() { + adminDB.Exec("DROP DATABASE IF EXISTS " + dbName) + adminDB.Close() + }() + + dsn := fmt.Sprintf("%s:%s@tcp(localhost:3306)/%s?parseTime=true&interpolateParams=true", testUser, testPassword, dbName) + db, err := sql.Open("mysql", dsn) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + // Create backend without enabling migrations + backend := NewMysqlBackendWithDB(db) + defer backend.Close() + + // Tables should not exist since migrations weren't applied + _, err = db.Exec("SELECT 1 FROM instances LIMIT 1") + if err == nil { + t.Error("Expected error because table should not exist") + } + }) + + t.Run("MigrationFailsWithoutDSN", func(t *testing.T) { + // Create database for test + adminDB, err := sql.Open("mysql", fmt.Sprintf("%s:%s@/?parseTime=true&interpolateParams=true", testUser, testPassword)) + if err != nil { + t.Fatal(err) + } + + dbName := "test_withdb3_" + strings.ReplaceAll(uuid.NewString(), "-", "") + if _, err := adminDB.Exec("CREATE DATABASE " + dbName); err != nil { + t.Fatal(err) + } + defer func() { + adminDB.Exec("DROP DATABASE IF EXISTS " + dbName) + adminDB.Close() + }() + + dsn := fmt.Sprintf("%s:%s@tcp(localhost:3306)/%s?parseTime=true&interpolateParams=true", testUser, testPassword, dbName) + db, err := sql.Open("mysql", dsn) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + // Create backend without migration DSN - should panic when trying to migrate + defer func() { + if r := recover(); r == nil { + t.Error("Expected panic when ApplyMigrations=true without MigrationDSN") + } + }() + + NewMysqlBackendWithDB(db, WithApplyMigrations(true)) + }) +} diff --git a/backend/mysql/options.go b/backend/mysql/options.go index a244903c..3169880d 100644 --- a/backend/mysql/options.go +++ b/backend/mysql/options.go @@ -13,6 +13,11 @@ type options struct { // ApplyMigrations automatically applies database migrations on startup. ApplyMigrations bool + + // MigrationDSN is an optional DSN to use for running migrations. This is useful when + // using NewMysqlBackendWithDB where no DSN is available. The DSN must support + // multi-statement queries (e.g., include &multiStatements=true). + MigrationDSN string } type option func(*options) @@ -30,6 +35,15 @@ func WithMySQLOptions(f func(db *sql.DB)) option { } } +// WithMigrationDSN sets the DSN to use for running migrations. This is required when +// using NewMysqlBackendWithDB with ApplyMigrations enabled. The DSN should support +// multi-statement queries. +func WithMigrationDSN(dsn string) option { + return func(o *options) { + o.MigrationDSN = dsn + } +} + // WithBackendOptions allows to pass generic backend options. func WithBackendOptions(opts ...backend.BackendOption) option { return func(o *options) { diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index c628aca6..bc3e4a84 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -51,10 +51,42 @@ func NewPostgresBackend(host string, port int, user, password, database string, } b := &postgresBackend{ - dsn: dsn, - db: db, - workerName: getWorkerName(options), - options: options, + dsn: dsn, + db: db, + workerName: getWorkerName(options), + options: options, + ownsConnection: true, + } + + if options.ApplyMigrations { + if err := b.Migrate(); err != nil { + panic(err) + } + } + + return b +} + +// NewPostgresBackendWithDB creates a new Postgres backend using an existing database connection. +// When using this constructor, the backend will not close the database connection when Close() is called. +// Migrations can still be applied using WithApplyMigrations(true) as Postgres does not require +// special connection settings for migrations. +func NewPostgresBackendWithDB(db *sql.DB, opts ...option) *postgresBackend { + options := &options{ + Options: backend.ApplyOptions(), + ApplyMigrations: false, + } + + for _, opt := range opts { + opt(options) + } + + b := &postgresBackend{ + dsn: "", + db: db, + workerName: getWorkerName(options), + options: options, + ownsConnection: false, } if options.ApplyMigrations { @@ -67,10 +99,11 @@ func NewPostgresBackend(host string, port int, user, password, database string, } type postgresBackend struct { - dsn string - db *sql.DB - workerName string - options *options + dsn string + db *sql.DB + workerName string + options *options + ownsConnection bool } func (pb *postgresBackend) FeatureSupported(feature backend.Feature) bool { @@ -78,15 +111,27 @@ func (pb *postgresBackend) FeatureSupported(feature backend.Feature) bool { } func (pb *postgresBackend) Close() error { + if !pb.ownsConnection { + return nil + } return pb.db.Close() } // Migrate applies any pending database migrations. func (pb *postgresBackend) Migrate() error { - schemaDsn := pb.dsn - db, err := sql.Open("pgx", schemaDsn) - if err != nil { - return fmt.Errorf("opening schema database: %w", err) + var db *sql.DB + var needsClose bool + + if pb.dsn != "" { + var err error + db, err = sql.Open("pgx", pb.dsn) + if err != nil { + return fmt.Errorf("opening schema database: %w", err) + } + needsClose = true + } else { + db = pb.db + needsClose = false } dbi, err := postgres.WithInstance(db, &postgres.Config{}) @@ -110,8 +155,10 @@ func (pb *postgresBackend) Migrate() error { } } - if err := db.Close(); err != nil { - return fmt.Errorf("closing schema database: %w", err) + if needsClose { + if err := db.Close(); err != nil { + return fmt.Errorf("closing schema database: %w", err) + } } return nil diff --git a/backend/postgres/postgres_test.go b/backend/postgres/postgres_test.go index 7dcf9bea..03166c24 100644 --- a/backend/postgres/postgres_test.go +++ b/backend/postgres/postgres_test.go @@ -210,3 +210,120 @@ func Test_PostgresBackend_WorkerName(t *testing.T) { } }) } + +func Test_PostgresBackendWithDB(t *testing.T) { + if testing.Short() { + t.Skip() + } + + t.Run("UsesProvidedConnection", func(t *testing.T) { + // Create database for test + adminDB, err := sql.Open("pgx", fmt.Sprintf("host=localhost port=5432 user=%s password=%s dbname=postgres sslmode=disable", testUser, testPassword)) + if err != nil { + t.Fatal(err) + } + + dbName := "test_withdb_" + strings.ReplaceAll(uuid.NewString(), "-", "") + if _, err := adminDB.Exec("CREATE DATABASE " + dbName); err != nil { + t.Fatal(err) + } + defer func() { + adminDB.Exec("DROP DATABASE IF EXISTS " + dbName + " WITH (FORCE)") + adminDB.Close() + }() + + // Create our own connection to the test database + db, err := sql.Open("pgx", fmt.Sprintf("host=localhost port=5432 user=%s password=%s dbname=%s sslmode=disable", testUser, testPassword, dbName)) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + // Create backend with existing connection and apply migrations + backend := NewPostgresBackendWithDB(db, WithApplyMigrations(true)) + + // Verify the backend uses our connection + if backend.db != db { + t.Error("Backend should use provided db connection") + } + if backend.ownsConnection { + t.Error("Backend should not own the connection") + } + + // Close backend - should NOT close our connection + if err := backend.Close(); err != nil { + t.Fatal(err) + } + + // Verify our connection is still usable + if err := db.Ping(); err != nil { + t.Errorf("Connection should still be open after backend.Close(): %v", err) + } + }) + + t.Run("MigrationsDisabledByDefault", func(t *testing.T) { + // Create database for test + adminDB, err := sql.Open("pgx", fmt.Sprintf("host=localhost port=5432 user=%s password=%s dbname=postgres sslmode=disable", testUser, testPassword)) + if err != nil { + t.Fatal(err) + } + + dbName := "test_withdb2_" + strings.ReplaceAll(uuid.NewString(), "-", "") + if _, err := adminDB.Exec("CREATE DATABASE " + dbName); err != nil { + t.Fatal(err) + } + defer func() { + adminDB.Exec("DROP DATABASE IF EXISTS " + dbName + " WITH (FORCE)") + adminDB.Close() + }() + + db, err := sql.Open("pgx", fmt.Sprintf("host=localhost port=5432 user=%s password=%s dbname=%s sslmode=disable", testUser, testPassword, dbName)) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + // Create backend without enabling migrations + backend := NewPostgresBackendWithDB(db) + defer backend.Close() + + // Tables should not exist since migrations weren't applied + _, err = db.Exec("SELECT 1 FROM instances LIMIT 1") + if err == nil { + t.Error("Expected error because table should not exist") + } + }) + + t.Run("MigrationsCanBeEnabled", func(t *testing.T) { + // Create database for test + adminDB, err := sql.Open("pgx", fmt.Sprintf("host=localhost port=5432 user=%s password=%s dbname=postgres sslmode=disable", testUser, testPassword)) + if err != nil { + t.Fatal(err) + } + + dbName := "test_withdb3_" + strings.ReplaceAll(uuid.NewString(), "-", "") + if _, err := adminDB.Exec("CREATE DATABASE " + dbName); err != nil { + t.Fatal(err) + } + defer func() { + adminDB.Exec("DROP DATABASE IF EXISTS " + dbName + " WITH (FORCE)") + adminDB.Close() + }() + + db, err := sql.Open("pgx", fmt.Sprintf("host=localhost port=5432 user=%s password=%s dbname=%s sslmode=disable", testUser, testPassword, dbName)) + if err != nil { + t.Fatal(err) + } + defer db.Close() + + // Create backend with migrations enabled + backend := NewPostgresBackendWithDB(db, WithApplyMigrations(true)) + defer backend.Close() + + // Tables should exist + _, err = db.Exec("SELECT 1 FROM instances LIMIT 1") + if err != nil { + t.Errorf("Table should exist after migrations: %v", err) + } + }) +} diff --git a/backend/sqlite/sqlite.go b/backend/sqlite/sqlite.go index 271789b7..a444d6cb 100644 --- a/backend/sqlite/sqlite.go +++ b/backend/sqlite/sqlite.go @@ -85,9 +85,10 @@ func newSqliteBackend(dsn string, opts ...option) *sqliteBackend { db.SetMaxOpenConns(1) b := &sqliteBackend{ - db: db, - workerName: getWorkerName(options), - options: options, + db: db, + workerName: getWorkerName(options), + options: options, + ownsConnection: true, } // Apply migrations @@ -100,10 +101,43 @@ func newSqliteBackend(dsn string, opts ...option) *sqliteBackend { return b } +// NewSqliteBackendWithDB creates a new SQLite backend using an existing database connection. +// When using this constructor, the backend will not close the database connection when Close() is called. +// Migrations can still be applied using WithApplyMigrations(true) as SQLite does not require +// special connection settings for migrations. +// Note: The caller is responsible for configuring the connection appropriately (e.g., WAL mode, +// busy timeout, max open connections). +func NewSqliteBackendWithDB(db *sql.DB, opts ...option) *sqliteBackend { + options := &options{ + Options: backend.ApplyOptions(), + ApplyMigrations: false, + } + + for _, opt := range opts { + opt(options) + } + + b := &sqliteBackend{ + db: db, + workerName: getWorkerName(options), + options: options, + ownsConnection: false, + } + + if options.ApplyMigrations { + if err := b.Migrate(); err != nil { + panic(err) + } + } + + return b +} + type sqliteBackend struct { - db *sql.DB - workerName string - options *options + db *sql.DB + workerName string + options *options + ownsConnection bool memConn *sql.Conn } @@ -121,6 +155,10 @@ func (sb *sqliteBackend) Close() error { } } + if !sb.ownsConnection { + return nil + } + return sb.db.Close() } diff --git a/backend/sqlite/sqlite_test.go b/backend/sqlite/sqlite_test.go index 83b3a1c2..d6b20c13 100644 --- a/backend/sqlite/sqlite_test.go +++ b/backend/sqlite/sqlite_test.go @@ -1,6 +1,7 @@ package sqlite import ( + "database/sql" "testing" "github.com/cschleiden/go-workflows/backend" @@ -73,3 +74,69 @@ func Test_SqliteBackend_WorkerName(t *testing.T) { require.Equal(t, customWorkerName, backend.workerName) }) } + +func Test_SqliteBackendWithDB(t *testing.T) { + t.Run("UsesProvidedConnection", func(t *testing.T) { + // Create our own database connection + db, err := sql.Open("sqlite", "file:testdb?mode=memory&cache=shared") + require.NoError(t, err) + defer db.Close() + + // Configure connection as recommended + _, err = db.Exec("PRAGMA journal_mode=WAL;") + require.NoError(t, err) + _, err = db.Exec("PRAGMA busy_timeout = 5000;") + require.NoError(t, err) + db.SetMaxOpenConns(1) + + // Create backend with existing connection and apply migrations + backend := NewSqliteBackendWithDB(db, WithApplyMigrations(true)) + + // Verify the backend works + require.NotNil(t, backend) + require.Equal(t, db, backend.db) + require.False(t, backend.ownsConnection) + + // Close backend - should NOT close our connection + err = backend.Close() + require.NoError(t, err) + + // Verify our connection is still usable + err = db.Ping() + require.NoError(t, err) + }) + + t.Run("MigrationsDisabledByDefault", func(t *testing.T) { + db, err := sql.Open("sqlite", "file:testdb2?mode=memory&cache=shared") + require.NoError(t, err) + defer db.Close() + + db.SetMaxOpenConns(1) + + // Create backend without enabling migrations + backend := NewSqliteBackendWithDB(db) + defer backend.Close() + + // Tables should not exist since migrations weren't applied + _, err = db.Exec("SELECT 1 FROM instances LIMIT 1") + require.Error(t, err) // Table doesn't exist + }) + + t.Run("MigrationsCanBeEnabled", func(t *testing.T) { + db, err := sql.Open("sqlite", "file:testdb3?mode=memory&cache=shared") + require.NoError(t, err) + defer db.Close() + + _, err = db.Exec("PRAGMA journal_mode=WAL;") + require.NoError(t, err) + db.SetMaxOpenConns(1) + + // Create backend with migrations enabled + backend := NewSqliteBackendWithDB(db, WithApplyMigrations(true)) + defer backend.Close() + + // Tables should exist + _, err = db.Exec("SELECT 1 FROM instances LIMIT 1") + require.NoError(t, err) + }) +} diff --git a/docs/source/includes/_backends.md b/docs/source/includes/_backends.md index 3f341a9f..a7edb567 100644 --- a/docs/source/includes/_backends.md +++ b/docs/source/includes/_backends.md @@ -21,9 +21,30 @@ func NewSqliteBackend(path string, opts ...option) Create a new SQLite backend instance with `NewSqliteBackend`. +### Using an Existing Connection + +```go +func NewSqliteBackendWithDB(db *sql.DB, opts ...option) +``` + +If you already have a `*sql.DB` connection, you can use `NewSqliteBackendWithDB` to create a backend that uses your existing connection. When using this constructor: + +- The backend will **not** close the database connection when `Close()` is called +- Migrations are **disabled by default** - use `WithApplyMigrations(true)` to enable them +- You are responsible for configuring the connection appropriately (e.g., WAL mode, busy timeout, max open connections) + +```go +db, _ := sql.Open("sqlite", "file:mydb.sqlite?_txlock=immediate") +db.Exec("PRAGMA journal_mode=WAL;") +db.Exec("PRAGMA busy_timeout = 5000;") +db.SetMaxOpenConns(1) + +backend := sqlite.NewSqliteBackendWithDB(db, sqlite.WithApplyMigrations(true)) +``` + ### Options -- `WithApplyMigrations(applyMigrations bool)` - Set whether migrations should be applied on startup. Defaults to `true` +- `WithApplyMigrations(applyMigrations bool)` - Set whether migrations should be applied on startup. Defaults to `true` for `NewSqliteBackend`, `false` for `NewSqliteBackendWithDB` - `WithBackendOptions(opts ...backend.BackendOption)` - Apply generic backend options ### Schema @@ -44,10 +65,35 @@ func NewMysqlBackend(host string, port int, user, password, database string, opt Create a new MySQL backend instance with `NewMysqlBackend`. +### Using an Existing Connection + +```go +func NewMysqlBackendWithDB(db *sql.DB, opts ...option) +``` + +If you already have a `*sql.DB` connection, you can use `NewMysqlBackendWithDB` to create a backend that uses your existing connection. When using this constructor: + +- The backend will **not** close the database connection when `Close()` is called +- Migrations are **disabled by default** +- To enable migrations, you must provide a DSN with `WithMigrationDSN()` that supports multi-statement queries + +```go +db, _ := sql.Open("mysql", "user:pass@tcp(localhost:3306)/mydb?parseTime=true") + +// To enable migrations, provide a DSN with multiStatements=true +migrationDSN := "user:pass@tcp(localhost:3306)/mydb?parseTime=true&multiStatements=true" + +backend := mysql.NewMysqlBackendWithDB(db, + mysql.WithApplyMigrations(true), + mysql.WithMigrationDSN(migrationDSN), +) +``` + ### Options - `WithMySQLOptions(f func(db *sql.DB))` - Apply custom options to the MySQL database connection -- `WithApplyMigrations(applyMigrations bool)` - Set whether migrations should be applied on startup. Defaults to `true` +- `WithApplyMigrations(applyMigrations bool)` - Set whether migrations should be applied on startup. Defaults to `true` for `NewMysqlBackend`, `false` for `NewMysqlBackendWithDB` +- `WithMigrationDSN(dsn string)` - Set the DSN to use for migrations. Required when using `NewMysqlBackendWithDB` with `ApplyMigrations` enabled. The DSN must support multi-statement queries. - `WithBackendOptions(opts ...backend.BackendOption)` - Apply generic backend options @@ -69,10 +115,27 @@ func NewPostgresBackend(host string, port int, user, password, database string, Create a new PostgreSQL backend instance with `NewPostgresBackend`. +### Using an Existing Connection + +```go +func NewPostgresBackendWithDB(db *sql.DB, opts ...option) +``` + +If you already have a `*sql.DB` connection, you can use `NewPostgresBackendWithDB` to create a backend that uses your existing connection. When using this constructor: + +- The backend will **not** close the database connection when `Close()` is called +- Migrations are **disabled by default** - use `WithApplyMigrations(true)` to enable them + +```go +db, _ := sql.Open("pgx", "host=localhost port=5432 user=myuser password=mypass dbname=mydb sslmode=disable") + +backend := postgres.NewPostgresBackendWithDB(db, postgres.WithApplyMigrations(true)) +``` + ### Options - `WithPostgresOptions(f func(db *sql.DB))` - Apply custom options to the PostgreSQL database connection -- `WithApplyMigrations(applyMigrations bool)` - Set whether migrations should be applied on startup. Defaults to `true` +- `WithApplyMigrations(applyMigrations bool)` - Set whether migrations should be applied on startup. Defaults to `true` for `NewPostgresBackend`, `false` for `NewPostgresBackendWithDB` - `WithBackendOptions(opts ...backend.BackendOption)` - Apply generic backend options