diff --git a/dbc/mongo/mgo_connection.go b/dbc/mongo/mgo_connection.go index 8f94f1c..f12e5fd 100644 --- a/dbc/mongo/mgo_connection.go +++ b/dbc/mongo/mgo_connection.go @@ -93,11 +93,21 @@ func (c *Connection) Connect() error { if ci == nil { return errorlib.Error(packageName, modConnection, "Connect", "ConnectionInfo is not initialized") } + if ci.UserName != "" { info.Username = ci.UserName info.Password = ci.Password info.Source = "admin" } + + if ci.Settings != nil { + info.Mechanism = ci.Settings.GetString("authenticationMechanism") + + if val := ci.Settings.GetString("authenticationDatabase"); val != "" { + info.Source = val + } + } + info.Addrs = []string{ci.Host} info.Database = ci.Database @@ -115,6 +125,8 @@ func (c *Connection) Connect() error { info.Timeout = time.Duration(timeout) * time.Second } + // toolkit.Printfn("----- %#v", *info) + //sess, e := mgo.Dial(info.Addrs[0]) sess, e := mgo.DialWithInfo(info) if e != nil { diff --git a/deployment_20180416.zip_aa b/deployment_20180416.zip_aa deleted file mode 100644 index 4792990..0000000 Binary files a/deployment_20180416.zip_aa and /dev/null differ diff --git a/deployment_20180416.zip_ab b/deployment_20180416.zip_ab deleted file mode 100644 index 91b8c22..0000000 Binary files a/deployment_20180416.zip_ab and /dev/null differ diff --git a/deployment_20180416.zip_ac b/deployment_20180416.zip_ac deleted file mode 100644 index a320e3d..0000000 Binary files a/deployment_20180416.zip_ac and /dev/null differ diff --git a/pooling.go b/pooling.go new file mode 100644 index 0000000..e60c9bb --- /dev/null +++ b/pooling.go @@ -0,0 +1,289 @@ +package dbox + +import ( + "context" + "sync" + "time" + + "github.com/eaciit/toolkit" +) + +// DbPooling is database pooling system in dbflex +type DbPooling struct { + sync.RWMutex + size int + items []*PoolItem + fnNew func() (IConnection, error) + + // Timeout max time required to obtain new connection + Timeout time.Duration + + // AutoRelease defines max time for a connection to be auto released after it is being idle. 0 = no autorelease (default) + AutoRelease time.Duration + + // AutoClose defines max time for a connection to be autoclosed after it is being idle. 0 = no auto close (default) + AutoClose time.Duration +} + +// PoolItem is Item in the pool +type PoolItem struct { + sync.RWMutex + conn IConnection + used bool + closed bool + + lastUsed time.Time + + AutoRelease time.Duration + AutoClose time.Duration +} + +// NewDbPooling create new pooling with given size +func NewDbPooling(size int, fnNew func() (IConnection, error)) *DbPooling { + dbp := new(DbPooling) + dbp.size = size + dbp.fnNew = fnNew + dbp.Timeout = time.Second * 2 + return dbp +} + +// Get new connection. If all connection is being used and number of connection is less than +// pool capacity, new connection will be spin off. If capabity has been max out. It will waiting for +// any connection to be released before timeout reach +func (p *DbPooling) Get() (*PoolItem, error) { + ctx, cancel := context.WithTimeout(context.Background(), p.Timeout) + defer cancel() + + cpi := make(chan *PoolItem) + cerr := make(chan error) + + //--- remove closed pi + bufferItems := []*PoolItem{} + for _, pi := range p.items { + if !pi.isClosed() { + bufferItems = append(bufferItems, pi) + } + } + p.items = bufferItems + + go func(ctx context.Context) { + + // check if there is an idle connection from pool. if it is, then use it. + for _, pi := range p.items { + if pi.IsFree() { + pi.Use() + cpi <- pi + return + } + } + + // no idle connections are found from the pool. + // then perform another check. + // if the total created connection is still lower than pool max conn size, create new one. + p.RLock() + shouldCreateNewPoolItem := len(p.items) < p.size + p.RUnlock() + + if shouldCreateNewPoolItem { + + // creating new connection, end the routine if an error occurs + pi, err := p.newItem() + if err != nil { + cerr <- err + return + } + + // add the newly created connection into pool + p.Lock() + p.items = append(p.items, pi) + p.Unlock() + + // use newly created connection, then end the routine + pi.retrieveDbPoolingInfo(p) + pi.Use() + cpi <- pi + return + } + + // block underneath will only be executed if the two criteria below are met: + // 1. no idle connection is found from the pool + // 2. cannot create new connection, because total created conns met allowed max conns + + // what will happen next, we'll wait until `p.Timeout`. + // - if one connection is found idle and not closed before exceeding timeout, then use that one + // - if timeout is exceeded, then return an error + for done := false; !done; { + select { + case <-time.After(10 * time.Millisecond): + for _, pi := range p.items { + if pi.IsFree() && !pi.isClosed() { + pi.retrieveDbPoolingInfo(p) + pi.Use() + cpi <- pi + } + } + + case <-ctx.Done(): + done = true + } + } + }(ctx) + + select { + case pi := <-cpi: + //toolkit.Printfn("Connection is used. Size: %d Count: %d", p.Size(), p.Count()) + return pi, nil + + case err := <-cerr: + return nil, toolkit.Errorf("unable to create new pool item. %s", err.Error()) + + case <-ctx.Done(): + return nil, toolkit.Errorf("Pool size (%d) has been reached", p.size) + } +} + +// Count number of connection within connection pooling +func (p *DbPooling) Count() int { + return len(p.items) +} + +// FreeCount number of item has been released +func (p *DbPooling) FreeCount() int { + i := 0 + for _, pi := range p.items { + if pi.IsFree() && !pi.closed { + i++ + } + } + return i +} + +// ClosedCount number of item has been closed +func (p *DbPooling) ClosedCount() int { + i := 0 + for _, pi := range p.items { + if pi.closed { + i++ + } + } + return i +} + +// Size number of connection can be hold within the connection pooling +func (p *DbPooling) Size() int { + return p.size +} + +// Close all connection within connection pooling +func (p *DbPooling) Close() { + p.Lock() + for _, pi := range p.items { + pi.conn.Close() + } + + p.items = []*PoolItem{} + p.Unlock() +} + +func (p *DbPooling) newItem() (*PoolItem, error) { + conn, err := p.fnNew() + if err != nil { + return nil, toolkit.Errorf("unable to open connection for DB pool. %s", err.Error()) + } + + pi := &PoolItem{conn: conn, used: false} + pi.retrieveDbPoolingInfo(p) + + //-- auto release + go func() { + for { + if pi == nil { + return + } + + if pi.AutoRelease == 0 { + time.Sleep(100 * time.Millisecond) + } else { + select { + case <-time.After(100 * time.Millisecond): + diff := time.Now().Sub(pi.lastUsed) + if diff > pi.AutoRelease && !pi.IsFree() { + pi.Release() + } + } + } + } + }() + + //-- auto close + go func() { + for { + if pi == nil { + return + } + + if pi.AutoClose == 0 { + time.Sleep(100 * time.Millisecond) + } else { + select { + case <-time.After(100 * time.Millisecond): + diff := time.Now().Sub(pi.lastUsed) + if diff > pi.AutoClose && pi.IsFree() { + pi.conn.Close() + pi.Lock() + pi.closed = true + pi.Unlock() + return + } + } + } + } + }() + + return pi, nil +} + +func (pi *PoolItem) retrieveDbPoolingInfo(p *DbPooling) { + pi.AutoClose = p.AutoClose + pi.AutoRelease = p.AutoRelease +} + +func (pi *PoolItem) isClosed() bool { + ret := false + pi.RLock() + ret = pi.closed + pi.RUnlock() + + return ret +} + +// Release PoolItem +func (pi *PoolItem) Release() { + pi.Lock() + pi.used = false + pi.lastUsed = time.Now() + pi.Unlock() +} + +// IsFree check and return true if PoolItem is free +func (pi *PoolItem) IsFree() bool { + free := false + pi.RLock() + free = !pi.used + pi.RUnlock() + + return free +} + +// Use mark that this PoolItem is used +func (pi *PoolItem) Use() { + pi.Lock() + pi.used = true + pi.lastUsed = time.Now() + pi.Unlock() +} + +// Connection return PoolItem connection +func (pi *PoolItem) Connection() IConnection { + return pi.conn +} diff --git a/pooling.md b/pooling.md new file mode 100644 index 0000000..9ea9c03 --- /dev/null +++ b/pooling.md @@ -0,0 +1,114 @@ +# Pooling + +Use `dbox.NewDbPooling()` to create a db connection pool. Fill the 1st parameter with a number represent the max allowed connection, and the 2nd parameter with closure that return `(dbox.IConnection, error)` data. + +The database connection initialization will happen inside the closure. + +```go +func DBConnection() (*dbox.DbPooling, error) { + connectionPool := dbox.NewDbPooling(5, func() (dbox.IConnection, error) { + connInfo := &dbox.ConnectionInfo{ + Host: "localhost:27123", + Database: "test", + UserName: "", + Password: "", + } + + conn, err := dbox.NewConnection("mongo", connInfo) + if err != nil { + return nil, err + } + + err = conn.Connect() + if err != nil { + return nil, err + } + + return conn, nil + }) + connectionPool.Timeout = 30 * time.Second + + return connectionPool, nil +} + +func main() { + pool, err := DBConnection() + defer pool.Close() + if err != nil { + log.Fatal(err.Error()) + return + } + + // ... +} +``` + +On code above, max allowed connection is set to `5`, meaning that only 5 concurrent connection are allowed (remember, **CONCURRENT**, meaning the processes are happening in near-same time!). + +If another one connection needed, meanwhile the current active connection are met the maximum number allowed, then it'll go into queue. If, before the `connectionPool.Timeout` there is one free connection, then it'll be immediately used. + +Use the `.Get()` method from pool object to get one what-so-called pool item (contains the db connection object). Better to immediately defer `.Release()` the pool item after being used. + +```go +poolItem, err := pool.Get() +defer poolItem.Release() +if err != nil { + log.Fatal(err.Error()) + return +} + +// ... +``` + +If some database process is required inside a loop, don't use defer release, instead release it explicitly after the process done. Because defer will make the statement executed at the end of block function. And loop is not a block of function. + +```go +for _, each := range data { + poolItem, err := pool.Get() + + if err != nil { + log.Error(err.Error()) + poolItem.Release() + continue + } + + // ... + poolItem.Release() +} +``` + +Or, just wrap it with anonymous function. + +```go +for _, each := range data { + func () { + poolItem, err := pool.Get() + defer poolItem.Release() + + if err != nil { + log.Error(err.Error()) + return + } + + // ... + }() +} +``` + +If the process inside the loop will be executed concurrently, don't forget to pass the loop data. No need to pass the `pool` object since it's already a pointer. + +```go +for _, each := range data { + go func (each Data) { + poolItem, err := pool.Get() + defer poolItem.Release() + + if err != nil { + log.Error(err.Error()) + return + } + + // ... + }(each) +} +``` diff --git a/pooling_test.go b/pooling_test.go new file mode 100644 index 0000000..3366644 --- /dev/null +++ b/pooling_test.go @@ -0,0 +1,175 @@ +package dbox + +import ( + "testing" + "time" + + "github.com/eaciit/toolkit" + + . "github.com/smartystreets/goconvey/convey" +) + +type fakeConnection struct { + ConnectionBase +} + +func (c *fakeConnection) Connect() error { + return nil +} + +func (c *fakeConnection) Close() { +} + +func TestDbPooling(t *testing.T) { + Convey("DB Pooling", t, func() { + p := NewDbPooling(3, func() (IConnection, error) { + conn := new(fakeConnection) + conn.Connect() + return conn, nil + }) + + Convey("Initiate connections", func() { + pi1, err1 := p.Get() + pi2, err2 := p.Get() + + Convey("Connection are generated", func() { + So(pi1, ShouldNotBeNil) + So(pi2, ShouldNotBeNil) + }) + + Convey("Error are nil", func() { + So(err1, ShouldBeNil) + So(err2, ShouldBeNil) + }) + + Convey("Count should be 2", func() { + So(p.Count(), ShouldEqual, 2) + }) + + Convey("Make more connection", func() { + p.Get() + Convey("Count should be 3", func() { + So(p.Count(), ShouldEqual, 3) + }) + + Convey("Make 4th connection, should be rejected", func() { + _, err := p.Get() + So(err, ShouldNotBeNil) + toolkit.Printfn("\nErr: %s", err.Error()) + + Convey("Count should be still 3", func() { + So(p.Count(), ShouldEqual, 3) + }) + }) + + Convey("Release a connection", func() { + pi1.Release() + + Convey("Err should be nil", func() { + _, err := p.Get() + So(err, ShouldBeNil) + + Convey("Count remains 3", func() { + So(p.Count(), ShouldEqual, 3) + + p.Close() + }) + }) + }) + }) + }) + }) +} + +func TestPoolingQue(t *testing.T) { + Convey("Test Que", t, func() { + p := NewDbPooling(3, func() (IConnection, error) { + conn := new(fakeConnection) + conn.Connect() + return conn, nil + }) + + Convey("Prepare 3 connections", func() { + p1, err1 := p.Get() + _, err2 := p.Get() + _, err3 := p.Get() + + So(err1, ShouldBeNil) + So(err2, ShouldBeNil) + So(err3, ShouldBeNil) + + Convey("Get 4th connection and it should wait for 1s", func() { + go func() { + time.Sleep(1 * time.Second) + p1.Release() + }() + + _, err2 := p.Get() + So(err2, ShouldBeNil) + + p.Close() + }) + }) + }) +} + +func TestAutorelease(t *testing.T) { + Convey("Test Autorelease", t, func() { + p := NewDbPooling(3, func() (IConnection, error) { + conn := new(fakeConnection) + conn.Connect() + return conn, nil + }) + p.AutoRelease = 1 * time.Second + + Convey("Prepare 3 connections and hold them for 1.2s", func() { + _, err1 := p.Get() + _, err2 := p.Get() + _, err3 := p.Get() + + So(err1, ShouldBeNil) + So(err2, ShouldBeNil) + So(err3, ShouldBeNil) + + time.Sleep(1200 * time.Millisecond) + + Convey("All of 3 connection should be release already", func() { + So(p.FreeCount(), ShouldEqual, 3) + }) + }) + }) +} + +func TestAutoClose(t *testing.T) { + Convey("Test Auto Close", t, func() { + p := NewDbPooling(3, func() (IConnection, error) { + conn := new(fakeConnection) + conn.Connect() + return conn, nil + }) + + Convey("Prepare 3 connections and hold them for 1.2s while set autoclose of first one to 1s", func() { + p1, err1 := p.Get() + p1.AutoClose = 1 * time.Second + + _, err2 := p.Get() + _, err3 := p.Get() + + So(err1, ShouldBeNil) + So(err2, ShouldBeNil) + So(err3, ShouldBeNil) + + go func() { + time.Sleep(1 * time.Second) + p1.Release() + }() + + time.Sleep(2200 * time.Millisecond) + + Convey("1 conn should be closed. 2 are not yet", func() { + So(p.ClosedCount(), ShouldEqual, 1) + So(p.FreeCount(), ShouldEqual, 0) + }) + }) + }) +} diff --git a/readme.md b/readme.md index 2f2c783..53d7253 100644 --- a/readme.md +++ b/readme.md @@ -184,3 +184,31 @@ Delete is as simple as select. panic("Query Failed") } ``` + +## Additional info for MongoDB driver + +For MongoDB driver, it is possible to pass optional authentication options through `Settings`. Those two are: `authenticationDatabase` and `authenticationMechanism`. + +The default value for `authenticationDatabase` is `admin`. + +Please refer to https://docs.mongodb.com/manual/reference/program/mongo/#authentication-options for more informations. + +### Example + +```go +connInfo := &db.ConnectionInfo{ + Host: "localhost:27123", + Database: "dbname", + UserName: "", + Password: "", + Settings: tk.M{}. + Set("authenticationDatabase", "adminuser"). + Set("authenticationMechanism", "SCRAM-SHA-256"), +} + +conn, err := db.NewConnection("mongo", connInfo) +if err != nil { + log.Fatal(err.Error()) + os.Exit(0) +} +```