From 31f4ab44e567f62a9d60a9471c1e98a14de964be Mon Sep 17 00:00:00 2001 From: rayman520 Date: Mon, 12 Nov 2018 14:42:44 +0100 Subject: [PATCH 1/8] git pull dev --- base/cron.go | 60 +++++++ base/worker.go | 459 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 519 insertions(+) create mode 100644 base/cron.go create mode 100644 base/worker.go diff --git a/base/cron.go b/base/cron.go new file mode 100644 index 0000000..9fdc4dc --- /dev/null +++ b/base/cron.go @@ -0,0 +1,60 @@ +package base + +import ( + "fmt" + + "github.com/hexya-erp/hexya/hexya/models" + + "time" + + "github.com/hexya-erp/hexya/pool/h" +) + +var schedulerUpdateChan chan bool + +func init() { + h.Cron().DeclareModel() + h.Cron().AddFields(map[string]models.FieldDefinition{ + "Name": models.CharField{ + String: "Cron", + Unique: true}, + "TargetWorker": models.CharField{ + String: "Worker", + Default: models.DefaultValue("Main")}, + "TargetModel": models.CharField{ + String: "Model", + Required: true}, + "TargetMethod": models.CharField{ + String: "Method", + Required: true}, + "ModelMethodStr": models.CharField{ + String: "Method", + Compute: h.Cron().Methods().ComputeModelMethodStr()}, + }) + + h.Cron().Methods().ComputeModelMethodStr().DeclareMethod( + ``, + func(rs h.CronSet) h.CronData { + return h.CronData{ModelMethodStr: fmt.Sprintf("%s - %s", rs.TargetModel(), rs.TargetMethod())} + }) + + h.Cron().Methods().StartScheduler().DeclareMethod( + ``, + func(rs h.CronSet) { + schedulerUpdateChan = make(chan bool) + go rs.SchedulerLoop(15 * time.Minute) + }) + + h.Cron().Methods().SchedulerLoop().DeclareMethod( + ``, + func(rs h.CronSet, next time.Duration) { + for { + select { + case <-time.After(next): + fmt.Println("time trigger") + case <-schedulerUpdateChan: + fmt.Println("update") + } + } + }) +} diff --git a/base/worker.go b/base/worker.go new file mode 100644 index 0000000..765c2f4 --- /dev/null +++ b/base/worker.go @@ -0,0 +1,459 @@ +package base + +import ( + "fmt" + "time" + + "errors" + "reflect" + + "encoding/json" + + "strconv" + + "strings" + + "github.com/hexya-erp/hexya-base/base/workerMechanics" + "github.com/hexya-erp/hexya/hexya/models" + "github.com/hexya-erp/hexya/hexya/models/security" + "github.com/hexya-erp/hexya/hexya/models/types" + "github.com/hexya-erp/hexya/hexya/models/types/dates" + "github.com/hexya-erp/hexya/pool/h" + "github.com/hexya-erp/hexya/pool/q" + + "github.com/google/uuid" +) + +func init() { + h.Worker().DeclareModel() + h.Worker().AddFields(map[string]models.FieldDefinition{ + "Name": models.CharField{ + String: "Worker", + Unique: true}, + "MaxThreads": models.IntegerField{ + String: "Threads"}, + "JobsInQueue": models.IntegerField{ + String: "Jobs in Queue", + Compute: h.Worker().Methods().GetJobsInQueueCount()}, + "PauseTime": models.IntegerField{}, + "IsRunning": models.BooleanField{}, + }) + + h.Worker().Methods().DummyFunc().DeclareMethod( + ``, + func(rs h.WorkerSet, str string) { + fmt.Println("lel ", str) + time.Sleep(2 * time.Second) + }) + + h.Worker().Methods().DummyFuncPanic().DeclareMethod( + ``, + func(rs h.WorkerSet) { + panic("PAAAAAAANIC") + }) + + h.Worker().Methods().DummyFuncSingleReturn().DeclareMethod( + ``, + func(rs h.WorkerSet) string { + return "hoy" + }) + + h.Worker().Methods().DummyFuncMult().DeclareMethod( + ``, + func(rs h.WorkerSet, a, b float64) (float64, float64, float64) { + return a, b, float64(a * b) + }) + + h.Worker().Methods().GetJobsInQueueCount().DeclareMethod( + `returns the ammount of jobs currently in worker queue`, + func(rs h.WorkerSet) *h.WorkerData { + QSize := h.WorkerJob().Search(rs.Env(), q.WorkerJob().ParentWorkerName().Equals(rs.Name())).Len() + if QSize == 0 { + for i := 0; i < 20; i++ { + switch { + case i%3 == 0: + rs.Enqueue(h.Worker().Methods().DummyFuncSingleReturn()) + case i%5 == 0: + rs.Enqueue(h.Worker().Methods().DummyFuncPanic()) + case i%7 == 0: + rs.WithParams(i, i+2).Enqueue(h.Worker().Methods().DummyFuncMult()) + default: + rs.WithWorker("Main").WithParams(strconv.Itoa(i)).Enqueue(h.Worker().Methods().DummyFunc()) + } + } + } + return &h.WorkerData{JobsInQueue: int64(QSize)} + }) + + h.Worker().Methods().CreateAndRegisterNewWorker().DeclareMethod( + ``, + func(rs h.WorkerSet, wo workerMechanics.Worker) *workerMechanics.Worker { + w := rs.CreateNewWorker(wo) + rs.RegisterWorker(w) + return w + }) + + h.Worker().Methods().CreateRegisterStartNewWorker().DeclareMethod( + ``, + func(rs h.WorkerSet, wo workerMechanics.Worker) *workerMechanics.Worker { + w := rs.CreateAndRegisterNewWorker(wo) + rs.StartWorker(w) + return w + }) + + h.Worker().Methods().CreateNewWorker().DeclareMethod( + ``, + func(rs h.WorkerSet, w workerMechanics.Worker) *workerMechanics.Worker { + var out *workerMechanics.Worker + models.ExecuteInNewEnvironment(rs.Env().Uid(), func(env models.Environment) { + out = workerMechanics.CreateNewWorker(w) + }) + return out + }) + + h.Worker().Methods().RegisterWorker().DeclareMethod( + ``, + func(rs h.WorkerSet, w *workerMechanics.Worker) { + models.ExecuteInNewEnvironment(rs.Env().Uid(), func(env models.Environment) { + err := registerWorker(w, rs) + if err != nil { + log.Error(fmt.Sprintf("%v", err)) + } + }) + }) + + h.Worker().Methods().StartWorker().DeclareMethod( + ``, + func(rs h.WorkerSet, w *workerMechanics.Worker) { + models.ExecuteInNewEnvironment(rs.Env().Uid(), func(env models.Environment) { + err := w.StartWorker(rs.Env()) + if err != nil { + log.Error(fmt.Sprintf("%v", err)) + } else { + go rs.WorkerLoop(w) + for i := 0; i < w.MaxThreads; i++ { + w.Threadschan <- true + } + } + }) + }) + + h.Worker().Methods().GetWorker().DeclareMethod( + ``, + func(rs h.WorkerSet, str string) *workerMechanics.Worker { + return workerMechanics.Worker{}.Get(str) + }) + + h.Worker().Methods().LoadWorkers().DeclareMethod( + ``, + func(rs h.WorkerSet) { + set := h.Worker().Search(rs.Env(), q.Worker().ID().Greater(-1)) + for _, s := range set.All() { + neww := workerMechanics.CreateNewWorker(workerMechanics.Worker{ + Name: s.Name, + PauseTime: time.Duration(s.PauseTime) * time.Second, + MaxThreads: int(s.MaxThreads), + }) + workerMechanics.EndRegistration(neww) + go rs.WorkerLoop(neww) + for i := 0; i < neww.MaxThreads; i++ { + neww.Threadschan <- true + } + } + if rs.GetWorker("Main") == nil { + rs.CreateRegisterStartNewWorker(workerMechanics.Worker{ + Name: "Main", + PauseTime: 1 * time.Second, + MaxThreads: 1, + }) + rs.WithWorker("Main").WithParams("hey").Enqueue(h.Worker().Methods().DummyFunc()) + + } + }) + + h.Worker().Methods().WorkerLoop().DeclareMethod( + ``, + func(rs h.WorkerSet, w *workerMechanics.Worker) bool { + for { + select { + case <-w.Threadschan: + var hadJob bool + models.ExecuteInNewEnvironment(security.SuperUserID, func(env2 models.Environment) { + set := h.WorkerJob().Search(env2, q.WorkerJob().ParentWorkerName().Equals(w.Name)) + hadJob = set.Len() > 0 + if hadJob { + res := set.Sorted(func(rs1, rs2 h.WorkerJobSet) bool { + if rs1.CreateDate().LowerEqual(rs2.CreateDate()) { + return true + } + return false + }).All()[0] + set.Browse([]int64{res.ID}).Unlink() + h.Worker().NewSet(env2).Execute(w, res) + } + }) + if !hadJob { + w.Threadschan <- true + time.Sleep(w.PauseTime) + } + default: + time.Sleep(w.PauseTime) + } + } + }) + + h.Worker().Methods().Execute().DeclareMethod( + ``, + func(rs h.WorkerSet, w *workerMechanics.Worker, res *h.WorkerJobData) { + models.ExecuteInNewEnvironment(security.SuperUserID, func(env3 models.Environment) { + historyEntry := h.WorkerJobHistory().Search(env3, q.WorkerJobHistory().TaskUUID().Equals(res.TaskUUID)) + historyEntry.SetStatus("running") + historyEntry.SetStartDate(dates.Now()) + }) + go models.ExecuteInNewEnvironment(security.SuperUserID, func(env2 models.Environment) { + historyEntry := h.WorkerJobHistory().Search(env2, q.WorkerJobHistory().TaskUUID().Equals(res.TaskUUID)) + if _, ok := models.Registry.Get(res.ModelName); !ok { + historyEntry.SetStatus("fail") + historyEntry.SetMethodOutput(fmt.Sprintf("error: no Model known as '%s'", res.ModelName)) + w.Threadschan <- true + return + } + rc := env2.Pool(res.ModelName) + method, ok := rc.Model().Methods().Get(res.Method) + if !ok { + historyEntry.SetStatus("fail") + historyEntry.SetMethodOutput(fmt.Sprintf("error: no method known as '%s' in model '%s'", res.Method, res.ModelName)) + w.Threadschan <- true + return + } + json.Unmarshal([]byte(res.Method), &method) + var params interface{} + json.Unmarshal([]byte(res.ParamsJson), ¶ms) + var out []interface{} + err := models.ExecuteInNewEnvironment(security.SuperUserID, func(env3 models.Environment) { + if params == nil { + out = method.CallMulti(env3.Pool(res.ModelName)) + } else { + out = method.CallMulti(env3.Pool(res.ModelName), interfaceSlice(params)...) + } + }) + historyEntry.SetReturnDate(dates.Now()) + if err != nil { + historyEntry.SetStatus("fail") + split := strings.Split(err.Error(), "\n----------------------------------\n") + historyEntry.SetMethodOutput(fmt.Sprintf("error: %s", split[0])) + historyEntry.SetExcInfo(split[1]) + } else { + historyEntry.SetStatus("done") + outStr := "" + for _, o := range out { + outStr += fmt.Sprintf("%v\n", o) + } + historyEntry.SetMethodOutput(outStr) + } + w.Threadschan <- true + }) + }) + + h.WorkerJob().DeclareModel() + h.WorkerJob().AddFields(map[string]models.FieldDefinition{ + "Name": models.CharField{ + String: "name", + }, + "Method": models.CharField{}, + "ModelName": models.CharField{}, + "ParamsJson": models.CharField{}, + "ParentWorkerName": models.CharField{}, + "TaskUUID": models.CharField{}, + }) + + h.WorkerJobHistory().DeclareModel() + h.WorkerJobHistory().AddFields(map[string]models.FieldDefinition{ + "Name": models.CharField{ + String: "Method Name", + Default: models.DefaultValue("Custom Job"), + Required: true, + }, + "WorkerName": models.CharField{ + String: "Worker", + Default: models.DefaultValue("Main"), + }, + "ModelName": models.CharField{ + String: "Model", + Required: true, + }, + "MethodName": models.CharField{ + String: "Method", + Required: true, + }, + "ParamsJson": models.TextField{ + String: "Parameters", + Help: "The parameters given to the method. With JSON formating.", + }, + "MethodOutput": models.TextField{ + String: "Return Value", + ReadOnly: true, + }, + "ExcInfo": models.TextField{ + String: "Exception Information", + ReadOnly: true, + }, + "Status": models.SelectionField{ + String: "Job Status", + Selection: types.Selection{ + "pending": "Pending", + "cancel": "Canceled", + "running": "Running", + "abort": "Aborted", + "done": "Done", + "fail": "Failed", + }, + ReadOnly: true, + }, + "QueuedDate": models.DateTimeField{ + String: "Queued at", + ReadOnly: true, + }, + "StartDate": models.DateTimeField{ + String: "Started at", + ReadOnly: true, + }, + "ReturnDate": models.DateTimeField{ + String: "finished at", + ReadOnly: true, + }, + "TaskUUID": models.CharField{ + ReadOnly: true, + }, + }) + h.WorkerJobHistory().Fields().CreateDate().SetReadOnly(true) + + h.WorkerJobHistory().Methods().ButtonDone().DeclareMethod( + ``, + func(rs h.WorkerJobHistorySet) { + switch rs.Status() { + case "": + panic("Please finish creating the job before trying to mark it as done") + case "done": + panic("The Job is already marked as done") + case "pending": + panic("You can't mark a pending job as done. please cancel it first") + default: + //confirmation box + rs.SetStatus("done") + } + }) + + h.WorkerJobHistory().Methods().Requeue().DeclareMethod( + ``, + func(rs h.WorkerJobHistorySet) { + if rs.Status() == "pending" { + panic("You can't requeue a job that is already queued") + } else if rs.Status() == "" { + panic("Please finish creating the job before trying to requeue it") + } + h.WorkerJob().Create(rs.Env(), &h.WorkerJobData{ + Name: rs.Name(), + Method: rs.MethodName(), + ModelName: rs.ModelName(), + ParamsJson: rs.ParamsJson(), + ParentWorkerName: rs.WorkerName(), + TaskUUID: rs.TaskUUID(), + }) + rs.SetStatus("pending") + rs.SetQueuedDate(dates.Now()) + rs.SetExcInfo("") + }) + + h.WorkerJobHistory().Methods().Create().Extend( + ``, + func(set h.WorkerJobHistorySet, data *h.WorkerJobHistoryData, namer ...models.FieldNamer) h.WorkerJobHistorySet { + if data.TaskUUID == "" { + data.TaskUUID = uuid.New().String() + } + if data.WorkerName == "" { + data.WorkerName = "Main" + } + h.WorkerJob().Create(set.Env(), &h.WorkerJobData{ + Name: data.Name, + Method: data.MethodName, + ModelName: data.ModelName, + ParamsJson: data.ParamsJson, + ParentWorkerName: data.WorkerName, + TaskUUID: data.TaskUUID, + }) + data.Status = "pending" + data.QueuedDate = dates.Now() + return set.Super().Create(data, namer...) + }) + + h.JobArgs().DeclareModel() + h.JobArgs().AddFields(map[string]models.FieldDefinition{ + "WorkerName": models.CharField{}, + "ModelName": models.CharField{}, + "Methoder": models.CharField{}, + "Params": models.CharField{}, + }) + + h.JobArgs().Methods().WithParams().DeclareMethod( + ``, + func(rs h.JobArgsSet, params ...interface{}) h.JobArgsSet { + paramsJson, _ := json.Marshal(params) + rs.SetParams(string(paramsJson)) + return rs + }) + + h.JobArgs().Methods().WithWorker().DeclareMethod( + ``, + func(rs h.JobArgsSet, workerName string) h.JobArgsSet { + rs.SetWorkerName(workerName) + return rs + }) + + h.JobArgs().Methods().Enqueue().DeclareMethod( + ``, + func(rs h.JobArgsSet, method models.Methoder) { + h.WorkerJobHistory().Create(rs.Env(), &h.WorkerJobHistoryData{ + Name: method.Underlying().Name(), + Status: "pending", + WorkerName: rs.WorkerName(), + ModelName: rs.ModelName(), + MethodName: method.Underlying().Name(), + ParamsJson: rs.Params(), + QueuedDate: dates.Now(), + }) + }) +} + +func registerWorker(w *workerMechanics.Worker, rs h.WorkerSet) error { + if w.Registered() == false { + if rs.Search(q.Worker().Name().Equals(w.Name)).Len() > 0 { + return errors.New(fmt.Sprintf(`Could not register worker "%s". another worker with this name is already registered`, w.Name)) + } + if w.PauseTime.Seconds() < 1 { + w.PauseTime = 1 * time.Second + } + rs.Create(&h.WorkerData{ + Name: w.Name, + MaxThreads: int64(w.MaxThreads), + PauseTime: int64(w.PauseTime / time.Second), + }) + workerMechanics.EndRegistration(w) + } + return nil +} + +func interfaceSlice(slice interface{}) []interface{} { + s := reflect.ValueOf(slice) + if s.Kind() != reflect.Slice { + panic("interfaceSlice() given a non-slice type") + } + + ret := make([]interface{}, s.Len()) + + for i := 0; i < s.Len(); i++ { + ret[i] = s.Index(i).Interface() + } + + return ret +} From 02ebececd605e2e27cd12386be4ee73d132e634e Mon Sep 17 00:00:00 2001 From: rayman520 Date: Mon, 12 Nov 2018 14:57:26 +0100 Subject: [PATCH 2/8] workers first commit --- base/workerMechanics/mechanics.go | 61 +++++++++++++++++++++++++++++++ base/workerMechanics/typedefs.go | 30 +++++++++++++++ 2 files changed, 91 insertions(+) create mode 100644 base/workerMechanics/mechanics.go create mode 100644 base/workerMechanics/typedefs.go diff --git a/base/workerMechanics/mechanics.go b/base/workerMechanics/mechanics.go new file mode 100644 index 0000000..4977a84 --- /dev/null +++ b/base/workerMechanics/mechanics.go @@ -0,0 +1,61 @@ +package workerMechanics + +import ( + "errors" + "fmt" + "time" + + "github.com/hexya-erp/hexya/hexya/models" + "github.com/hexya-erp/hexya/hexya/tools/strutils" +) + +func CreateNewWorker(w Worker) *Worker { + out := &Worker{ + Name: strutils.MakeUnique(w.Name, workers.names), + PauseTime: 1000 * time.Millisecond, + MaxThreads: 0, + created: true, + } + if w.PauseTime != 0 { + out.PauseTime = w.PauseTime + } + if w.MaxThreads != 0 { + out.MaxThreads = w.MaxThreads + } + out.Threadschan = make(chan bool, out.MaxThreads) + return out +} + +func (w *Worker) Registered() bool { + return w.registered +} + +func EndRegistration(w *Worker) { + w.registered = true + if workers.workers == nil { + workers.workers = make(map[string]*Worker) + } + workers.workers[w.Name] = w + workers.names = append(workers.names, w.Name) +} + +func (w *Worker) StartWorker(env models.Environment) error { + switch { + case !w.created: + return errors.New(fmt.Sprintf("worker %s attempted to start without being created.", w.Name)) + case !w.registered: + return errors.New(fmt.Sprintf("worker %s attempted to start without being registered.", w.Name)) + case w.running: + return errors.New(fmt.Sprintf("worker %s is already running", w.Name)) + } + return nil +} + +func (w Worker) Get(str string) *Worker { + for k, v := range workers.workers { + if k == str { + return v + } + } + return nil +} diff --git a/base/workerMechanics/typedefs.go b/base/workerMechanics/typedefs.go new file mode 100644 index 0000000..4abd88c --- /dev/null +++ b/base/workerMechanics/typedefs.go @@ -0,0 +1,30 @@ +package workerMechanics + +import ( + "time" + + "github.com/hexya-erp/hexya/hexya/models" +) + +type JobPreArgs struct { + WorkerName string + RecCol *models.RecordCollection + Params []interface{} +} + +var workers workerList + +type Worker struct { + Name string + PauseTime time.Duration + MaxThreads int + Threadschan chan bool + created bool + registered bool + running bool +} + +type workerList struct { + workers map[string]*Worker + names []string +} From 25456d22f02c350a8d7144d03ad1cd9dc682baf7 Mon Sep 17 00:00:00 2001 From: rayman520 Date: Tue, 20 Nov 2018 15:03:53 +0100 Subject: [PATCH 3/8] Crons & Workers near done --- base/000hexya.go | 3 + base/cron.go | 378 +++++++++++++++++++++++++++++- base/model_mixin.go | 29 +++ base/resources/cron.xml | 352 ++++++++++++++++++++++++++++ base/resources/menus.xml | 1 + base/resources/worker.xml | 100 ++++++++ base/worker.go | 192 +++++---------- base/workerMechanics/mechanics.go | 61 ----- base/workerMechanics/typedefs.go | 30 --- 9 files changed, 914 insertions(+), 232 deletions(-) create mode 100644 base/resources/cron.xml create mode 100644 base/resources/worker.xml delete mode 100644 base/workerMechanics/mechanics.go delete mode 100644 base/workerMechanics/typedefs.go diff --git a/base/000hexya.go b/base/000hexya.go index f3266ed..60a1da1 100644 --- a/base/000hexya.go +++ b/base/000hexya.go @@ -35,7 +35,10 @@ func init() { Name: MODULE_NAME, PostInit: func() { err := models.ExecuteInNewEnvironment(security.SuperUserID, func(env models.Environment) { + threadsChanMap = make(map[string]chan bool) h.Group().NewSet(env).ReloadGroups() + h.Worker().NewSet(env).LoadWorkers() + h.Cron().NewSet(env).StartScheduler() }) if err != nil { log.Panic("Error while initializing", "error", err) diff --git a/base/cron.go b/base/cron.go index 9fdc4dc..f767e17 100644 --- a/base/cron.go +++ b/base/cron.go @@ -7,7 +7,19 @@ import ( "time" + "encoding/json" + + "strings" + + "strconv" + + "sort" + + "github.com/hexya-erp/hexya/hexya/models/security" + "github.com/hexya-erp/hexya/hexya/models/types" + "github.com/hexya-erp/hexya/hexya/models/types/dates" "github.com/hexya-erp/hexya/pool/h" + "github.com/hexya-erp/hexya/pool/q" ) var schedulerUpdateChan chan bool @@ -16,45 +28,387 @@ func init() { h.Cron().DeclareModel() h.Cron().AddFields(map[string]models.FieldDefinition{ "Name": models.CharField{ - String: "Cron", - Unique: true}, - "TargetWorker": models.CharField{ - String: "Worker", - Default: models.DefaultValue("Main")}, + String: "Cron", + Unique: true, + Required: true, + Default: models.DefaultValue("Scheduled Func"), + Constraint: h.Cron().Methods().ConstraintCronCreation()}, + "ModelMethodStr": models.CharField{ + String: "Method", + Compute: h.Cron().Methods().ComputeModelMethodStr()}, + "Status": models.BooleanField{ + Default: models.DefaultValue(true)}, + "ExecuteCount": models.IntegerField{ + ReadOnly: true}, + "ExecuteNET": models.CharField{ + Compute: h.Cron().Methods().ComputeExecuteNET(), + ReadOnly: true}, + /* ---------------------------------------------------------------------------------------------------------- */ + "TargetWorker": models.Many2OneField{ + Required: true, + RelationModel: h.Worker()}, "TargetModel": models.CharField{ String: "Model", Required: true}, "TargetMethod": models.CharField{ String: "Method", Required: true}, - "ModelMethodStr": models.CharField{ - String: "Method", - Compute: h.Cron().Methods().ComputeModelMethodStr()}, + "TargetParams": models.TextField{ + String: "Parameters", + Default: models.DefaultValue("[]")}, + /* ---------------------------------------------------------------------------------------------------------- */ + "TimeAtSelector": models.SelectionField{ + String: "At", + Selection: types.Selection{ + "time": "time", + "date": "date"}, + Default: models.DefaultValue("date")}, + "TimeAtDate": models.DateTimeField{ + Default: models.DefaultValue(dates.Now())}, + "Catchup": models.BooleanField{}, + /* ---------------------------------------------------------------------------------------------------------- */ + "RepeatBool": models.BooleanField{}, + "RepeatLapseAmmount": models.IntegerField{ + Default: models.DefaultValue(2)}, + "RepeatLapseSelection": models.SelectionField{ + Selection: types.Selection{ + `month`: `Months`, + `day`: `Days`, + `hour`: `Hours`, + `minute`: `Minutes`, + `second`: `Seconds`, + }, + Default: models.DefaultValue("minute"), + Required: true}, + "RepeatAmmountBool": models.BooleanField{}, + "RepeatAmmount": models.IntegerField{ + Default: models.DefaultValue(5)}, + /* ---------------------------------------------------------------------------------------------------------- */ + "MaskBool": models.BooleanField{}, + "Mask": models.Many2OneField{ + RelationModel: h.CronTimeMask(), + }, }) + h.Cron().Methods().ConstraintCronCreation().DeclareMethod( + ``, + func(rs h.CronSet) { + var out string + if h.Worker().NewSet(rs.Env()).GetWorker(rs.TargetWorker().Name()) == nil { + out += fmt.Sprintf("No Worker found with name '%s'\n", rs.TargetWorker()) + } + model, ok := models.Registry.Get(rs.TargetModel()) + if !ok { + out += fmt.Sprintf("No Model found with name '%s'\n", rs.TargetModel()) + } else if _, ok := model.Methods().Get(rs.TargetMethod()); !ok { + out += fmt.Sprintf("No Method found in '%s' as '%s'\n", rs.TargetModel(), rs.TargetMethod()) + } + var js interface{} + if err := json.Unmarshal([]byte(rs.TargetParams()), &js); err != nil { + out += fmt.Sprintf("Parameters could not be Unmarshalled: %v\n", err) + } + if rs.RepeatBool() { + if rs.RepeatLapseAmmount() < 0 { + out += fmt.Sprintln("Lapse ammount can't be negative") + } + if rs.RepeatAmmount() < 0 { + out += fmt.Sprintln("Repeat ammount can't be negative") + } + } + if out != "" { + panic(out) + } + }) + h.Cron().Methods().ComputeModelMethodStr().DeclareMethod( ``, func(rs h.CronSet) h.CronData { + rs.TimeAtSelector() return h.CronData{ModelMethodStr: fmt.Sprintf("%s - %s", rs.TargetModel(), rs.TargetMethod())} }) + h.Cron().Methods().ComputeExecuteNET().DeclareMethod( + ``, + func(rs h.CronSet) h.CronData { + etaDateTime := rs.TimeAtDate().UTC().Sub(dates.Now().UTC()) + if etaDateTime.Seconds() == 0 { + return h.CronData{ExecuteNET: "ASAP"} + } + days := etaDateTime / (24 * time.Hour) + etaDateTime = etaDateTime % (24 * time.Hour) + hours := etaDateTime / time.Hour + etaDateTime = etaDateTime % time.Hour + minutes := etaDateTime / time.Minute + etaDateTime = etaDateTime % time.Minute + seconds := etaDateTime / time.Second + o := []rune(fmt.Sprintf("%dd %dh %dm %ds", days, hours, minutes, seconds)) + var out []rune + var switc bool + for _, c := range o { + if c >= '1' && c <= '9' { + switc = true + } + if switc { + out = append(out, c) + } else { + out = append(out, ' ') + } + } + outStr := strings.TrimSpace(string(out)) + if outStr == "" { + outStr = "Now" + } + return h.CronData{ExecuteNET: string(outStr)} + }) + + h.Cron().Methods().ButtonResume().DeclareMethod( + ``, + func(rs h.CronSet) { + rs.SetStatus(true) + go func() { + time.Sleep(50 * time.Millisecond) + schedulerUpdateChan <- true + }() + }) + + h.Cron().Methods().ButtonSuspend().DeclareMethod( + ``, + func(rs h.CronSet) { + rs.SetStatus(false) + go func() { + time.Sleep(50 * time.Millisecond) + schedulerUpdateChan <- true + }() + }) + + h.Cron().Methods().ButtonRun().DeclareMethod( + ``, + func(rs h.CronSet) { + var param interface{} + json.Unmarshal([]byte(rs.TargetParams()), ¶m) + params := interfaceSlice(param) + h.Worker().NewSet(rs.Env()).WithWorker(rs.TargetWorker().Name()).WithParams(params...).Enqueue(models.Registry.MustGet(rs.TargetModel()).Methods().MustGet(rs.TargetMethod())) + }) + + h.Cron().Methods().ButtonRefresh().DeclareMethod( + ``, + func(rs h.CronSet) { + go func() { + time.Sleep(50 * time.Millisecond) + schedulerUpdateChan <- true + }() + }) + + h.Cron().Methods().Create().Extend( + ``, + func(rs h.CronSet, data *h.CronData, namer ...models.FieldNamer) h.CronSet { + out := rs.Super().Create(data, namer...) + go func() { + time.Sleep(50 * time.Millisecond) + schedulerUpdateChan <- true + }() + return out + }) + h.Cron().Methods().StartScheduler().DeclareMethod( ``, func(rs h.CronSet) { - schedulerUpdateChan = make(chan bool) - go rs.SchedulerLoop(15 * time.Minute) + go rs.SchedulerLoop(15 * time.Second) }) h.Cron().Methods().SchedulerLoop().DeclareMethod( ``, func(rs h.CronSet, next time.Duration) { + def := next + schedulerUpdateChan = make(chan bool) for { select { case <-time.After(next): - fmt.Println("time trigger") + next = h.Cron().NewSet(rs.Env()).Sync() case <-schedulerUpdateChan: - fmt.Println("update") + next = h.Cron().NewSet(rs.Env()).Sync() + case <-time.After(def): + next = h.Cron().NewSet(rs.Env()).Sync() + } + } + }) + + h.Cron().Methods().CheckTimeMask().DeclareMethod( + ``, + func(rs h.CronSet, data h.CronData) bool { + if data.MaskBool { + if data.Mask.MonthBool() { + curMonth := string([]byte(data.TimeAtDate.Month().String()))[:3] + if !data.Mask.Get(curMonth).(bool) { + return false + } + } + if data.Mask.WeekDayBool() { + curWD := string([]byte(data.TimeAtDate.Weekday().String()))[:3] + if !data.Mask.Get(curWD).(bool) { + return false + } + } + if data.Mask.DayBool() { + str := "," + data.Mask.DayStr() + "," + if !strings.Contains(str, ","+strconv.Itoa(data.TimeAtDate.Day())+",") { + return false + } + } + if data.Mask.HourBool() { + str := "," + data.Mask.HourStr() + "," + if !strings.Contains(str, ","+strconv.Itoa(data.TimeAtDate.Hour())+",") { + return false + } + } + if data.Mask.MinuteBool() { + str := "," + data.Mask.MinuteStr() + "," + if !strings.Contains(str, ","+strconv.Itoa(data.TimeAtDate.Minute())+",") { + return false + } + } + } + return true + }) + + h.Cron().Methods().Sync().DeclareMethod( + ``, + func(rs h.CronSet) time.Duration { + out := float64(15 * 60) + models.ExecuteInNewEnvironment(security.SuperUserID, func(env models.Environment) { + for _, rec := range h.Cron().Search(env, q.Cron().Status().Equals(true)).Records() { + data := rec.First() + funcExecuted := false + for (data.TimeAtDate.LowerEqual(dates.Now().UTC().Add(time.Second)) || !rs.CheckTimeMask(data)) && data.Status { + if !funcExecuted && rs.CheckTimeMask(data) { + var param interface{} + json.Unmarshal([]byte(data.TargetParams), ¶m) + params := interfaceSlice(param) + h.Worker().NewSet(env).WithWorker(data.TargetWorker.Name()).WithParams(params...).Enqueue(models.Registry.MustGet(data.TargetModel).Methods().MustGet(data.TargetMethod)) + if !data.Catchup { + funcExecuted = true + } + data.ExecuteCount += 1 + if data.RepeatAmmountBool { + data.RepeatAmmount -= 1 + if data.RepeatAmmount == 0 { + data.Status = false + } + } + } + switch data.RepeatLapseSelection { + case `month`: + data.TimeAtDate = data.TimeAtDate.AddDate(0, int(data.RepeatLapseAmmount), 0) + case `day`: + data.TimeAtDate = data.TimeAtDate.AddDate(0, 0, int(data.RepeatLapseAmmount)) + case `hour`: + data.TimeAtDate = data.TimeAtDate.Add(time.Duration(data.RepeatLapseAmmount) * time.Hour) + case `minute`: + data.TimeAtDate = data.TimeAtDate.Add(time.Duration(data.RepeatLapseAmmount) * time.Minute) + case `second`: + data.TimeAtDate = data.TimeAtDate.Add(time.Duration(data.RepeatLapseAmmount) * time.Second) + } + } + lapse := data.TimeAtDate.Sub(dates.Now()).Seconds() + if lapse < out { + out = lapse + } + rec.Write(&h.CronData{ + TimeAtDate: data.TimeAtDate, + ExecuteCount: data.ExecuteCount, + RepeatAmmount: data.RepeatAmmount, + Status: data.Status}, + h.Cron().Status()) + } + }) + return time.Duration(out) * time.Second + }) + + h.CronTimeMask().DeclareModel() + + h.CronTimeMask().AddFields(map[string]models.FieldDefinition{ + "Name": models.CharField{ + Required: true, + Unique: true}, + "MonthBool": models.BooleanField{}, + "Jan": models.BooleanField{}, + "Feb": models.BooleanField{}, + "Mar": models.BooleanField{}, + "Apr": models.BooleanField{}, + "May": models.BooleanField{}, + "Jun": models.BooleanField{}, + "Jul": models.BooleanField{}, + "Aug": models.BooleanField{}, + "Sep": models.BooleanField{}, + "Oct": models.BooleanField{}, + "Nov": models.BooleanField{}, + "Dec": models.BooleanField{}, + "WeekDayBool": models.BooleanField{}, + "Mon": models.BooleanField{}, + "Tue": models.BooleanField{}, + "Wed": models.BooleanField{}, + "Thu": models.BooleanField{}, + "Fri": models.BooleanField{}, + "Sat": models.BooleanField{}, + "Sun": models.BooleanField{}, + "DayBool": models.BooleanField{}, + "DayStr": models.CharField{}, + "HourBool": models.BooleanField{}, + "HourStr": models.CharField{}, + "MinuteBool": models.BooleanField{}, + "MinuteStr": models.CharField{}, + }) + + h.CronTimeMask().Methods().Create().Extend( + ``, + func(rs h.CronTimeMaskSet, data *h.CronTimeMaskData, namer ...models.FieldNamer) h.CronTimeMaskSet { + data.DayStr = rs.CompileNbStr(data.DayStr, 1, 31) + data.HourStr = rs.CompileNbStr(data.HourStr, 0, 23) + data.MinuteStr = rs.CompileNbStr(data.MinuteStr, 0, 59) + out := rs.Super().Create(data, namer...) + return out + }) + + h.CronTimeMask().Methods().CompileNbStr().DeclareMethod( + ``, + func(rs h.CronTimeMaskSet, str string, min, max int) string { + spl := strings.Split(str, ",") + intSl := []int{} + for _, s := range spl { + s = strings.TrimSpace(s) + if strings.ContainsRune(s, '-') { + sp := strings.Split(s, "-") + int1, err1 := strconv.Atoi(sp[0]) + int2, err2 := strconv.Atoi(sp[1]) + if err1 != nil || err2 != nil { + continue + } + if int2 < int1 { + int2 = int2 + max + 1 + } + for ; int1 <= int2; int1++ { + if int1 > min { + intSl = append(intSl, int1%(max+1)) + } + } + } else { + i, err := strconv.Atoi(s) + if err == nil && i > min { + intSl = append(intSl, i%(max+1)) + } } } + sort.Ints(intSl) + var out []byte + for i, n := range intSl { + if n >= min { + if i != 0 { + out = append(out, ',') + } + out = append(out, []byte(strconv.Itoa(n))...) + } + } + return string(out) }) + } diff --git a/base/model_mixin.go b/base/model_mixin.go index aee150c..8c32905 100644 --- a/base/model_mixin.go +++ b/base/model_mixin.go @@ -4,6 +4,8 @@ package base import ( + "encoding/json" + "github.com/hexya-erp/hexya/hexya/models" "github.com/hexya-erp/hexya/pool/h" "github.com/hexya-erp/hexya/pool/q" @@ -51,4 +53,31 @@ func init() { } return rs.Search(activeCond) }) + + h.CommonMixin().Methods().WithWorker().DeclareMethod( + `WithWorker`, + func(rs h.CommonMixinSet, workerName string) h.JobArgsSet { + out := h.JobArgs().NewSet(rs.Env()).Create(&h.JobArgsData{}) + out.SetModelName(rs.Collection().ModelName()) + out.SetWorkerName(workerName) + return out + }) + + h.CommonMixin().Methods().WithParams().DeclareMethod( + `WithParams`, + func(rs h.CommonMixinSet, params ...interface{}) h.JobArgsSet { + out := h.JobArgs().NewSet(rs.Env()).Create(&h.JobArgsData{}) + out.SetModelName(rs.Collection().ModelName()) + json, _ := json.Marshal(params) + out.SetParams(string(json)) + return out + }) + + h.CommonMixin().Methods().Enqueue().DeclareMethod( + `EnqueueJob`, + func(rs h.CommonMixinSet, method models.Methoder) { + out := h.JobArgs().NewSet(rs.Env()).Create(&h.JobArgsData{}) + out.SetModelName(rs.Collection().ModelName()) + out.Enqueue(method) + }) } diff --git a/base/resources/cron.xml b/base/resources/cron.xml new file mode 100644 index 0000000..b69fd1d --- /dev/null +++ b/base/resources/cron.xml @@ -0,0 +1,352 @@ + + + + + + + + + +