added server workspace files 47/3447/1
authorkhemendra kumar <khemendra.kumar@huawei.com>
Thu, 14 May 2020 10:51:28 +0000 (16:21 +0530)
committerkhemendra kumar <khemendra.kumar@huawei.com>
Thu, 14 May 2020 10:51:28 +0000 (16:21 +0530)
Change-Id: Ia556fa15aff28f984c8cbe1207fc15a5e0d8ffc6

mep/mepserver/mp1/arch/workspace/runner.go [new file with mode: 0644]
mep/mepserver/mp1/arch/workspace/task_base.go [new file with mode: 0644]
mep/mepserver/mp1/arch/workspace/workspace_base.go [new file with mode: 0644]

diff --git a/mep/mepserver/mp1/arch/workspace/runner.go b/mep/mepserver/mp1/arch/workspace/runner.go
new file mode 100644 (file)
index 0000000..0670ebe
--- /dev/null
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package workspace
+
+import (
+       "sync"
+
+       "mepserver/mp1/arch/bus"
+)
+
+type GoPolicy int
+
+const (
+       _ GoPolicy = iota
+       GoParallel
+       GoBackground
+       GoSerial
+)
+
+func WkRun(plan SpaceIf) ErrCode {
+       curPlan := plan.getPlan()
+       for {
+               if curPlan.CurGrpIdx >= len(curPlan.PlanGrp) {
+                       break
+               }
+               curSub := &curPlan.PlanGrp[curPlan.CurGrpIdx]
+               retCode, stepIdx := GrpRun(curSub, plan, &curPlan.WtPlan)
+               if retCode <= TaskOK {
+                       curPlan.CurGrpIdx++
+                       continue
+               }
+               RecordErrInfo(curPlan, stepIdx)
+               GotoErrorProc(curPlan)
+       }
+       // wait backgroud job finish
+       curPlan.WtPlan.Wait()
+       return TaskOK
+
+}
+
+func TaskRunner(wkSpace interface{}, stepIf TaskBaseIf) int {
+       for {
+               bus.LoadObjByInd(stepIf, wkSpace, "in")
+               retCode := stepIf.OnRequest("")
+               if retCode <= TaskFinish {
+                       bus.LoadObjByInd(stepIf, wkSpace, "out")
+                       break
+               }
+       }
+       return 0
+}
+
+func StepPolicy(wg *sync.WaitGroup, curSub *SubGrp, plan SpaceIf, wtPlan *sync.WaitGroup, stepIf TaskBaseIf) ErrCode {
+       taskRet := TaskOK
+       switch curSub.Policy {
+       case GoBackground:
+               wtPlan.Add(1)
+               go func() {
+                       defer wtPlan.Done()
+                       TaskRunner(plan, stepIf)
+               }()
+
+       case GoParallel:
+               wg.Add(1)
+               go func() {
+                       defer wg.Done()
+                       TaskRunner(plan, stepIf)
+               }()
+       default:
+               TaskRunner(plan, stepIf)
+               taskRet, _ = stepIf.GetErrCode()
+       }
+
+       return taskRet
+}
+
+func GrpOneStep(wg *sync.WaitGroup, curSub *SubGrp, plan SpaceIf, wtPlan *sync.WaitGroup) bool {
+       if curSub.CurStepIdx >= len(curSub.StepObjs) {
+               return false
+       }
+       curStep := curSub.StepObjs[curSub.CurStepIdx]
+       if curStep == nil {
+               curSub.CurStepIdx++
+               return true
+       }
+       stepIf, ok := curStep.(TaskBaseIf)
+       if !ok {
+               return false
+       }
+       taskRet := StepPolicy(wg, curSub, plan, wtPlan, stepIf)
+       curSub.CurStepIdx++
+
+       return taskRet <= TaskOK
+}
+
+func GrpGetRetCode(curSub *SubGrp) (ErrCode, int) {
+       for idx, curStep := range curSub.StepObjs {
+               stepIf, ok := curStep.(TaskBaseIf)
+               if !ok {
+                       continue
+               }
+               errCode, _ := stepIf.GetErrCode()
+               if errCode > TaskOK {
+                       return errCode, idx
+               }
+       }
+
+       return TaskOK, -1
+}
+
+func GrpRun(curSub *SubGrp, plan SpaceIf, wtPlan *sync.WaitGroup) (ErrCode, int) {
+       var wg sync.WaitGroup
+       for {
+               nextStep := GrpOneStep(&wg, curSub, plan, wtPlan)
+               if !nextStep {
+                       break
+               }
+       }
+       if curSub.Policy == GoParallel {
+               wg.Wait()
+       }
+       return GrpGetRetCode(curSub)
+}
diff --git a/mep/mepserver/mp1/arch/workspace/task_base.go b/mep/mepserver/mp1/arch/workspace/task_base.go
new file mode 100644 (file)
index 0000000..7176bd3
--- /dev/null
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package workspace
+
+type TaskBaseIf interface {
+       OnRequest(data string) TaskCode
+       Parse(params string)
+       OnFork(wkSpace interface{}, param interface{}) int
+       GetErrCode() (ErrCode, string)
+       OnStop() int
+       WithName(name string)
+       SetSerErrInfo(serErr *SerErrInfo)
+}
+
+type TaskBase struct {
+       serErr     *SerErrInfo
+       errMsg     string
+       Name       string
+       Param      []string
+       resultCode ErrCode
+}
+
+func (t *TaskBase) WithName(name string) {
+       t.Name = name
+}
+
+func (t *TaskBase) Parse(params string) {
+       t.Param = append(t.Param, params)
+}
+
+func (t *TaskBase) OnFork(wkSpace interface{}, param interface{}) int {
+       return 0
+}
+
+func (t *TaskBase) OnStop() int {
+       return 0
+}
+
+type TaskCode int
+
+const (
+       TaskFinish TaskCode = iota
+       TaskWaitMore
+       TaskError
+)
+
+func (t *TaskBase) OnRequest(wkSpace interface{}) TaskCode {
+       return TaskFinish
+}
+
+func (t *TaskBase) SetFirstErrorCode(code ErrCode, msg string) {
+       if t.resultCode > TaskOK {
+               return
+       }
+       t.resultCode = code
+       t.errMsg = msg
+}
+
+func (t *TaskBase) GetErrCode() (ErrCode, string) {
+       return t.resultCode, t.errMsg
+}
+
+func (t *TaskBase) SetSerErrInfo(serErr *SerErrInfo) {
+       t.serErr = serErr
+}
+
+func (t *TaskBase) GetSerErrInfo() *SerErrInfo {
+       return t.serErr
+}
+
+type StepMap map[string]interface{}
+
+var StepData StepMap = StepMap{}
+
+func NameMap(name string, obj interface{}, listenOn string) bool {
+       StepData[name] = obj
+       return true
+}
diff --git a/mep/mepserver/mp1/arch/workspace/workspace_base.go b/mep/mepserver/mp1/arch/workspace/workspace_base.go
new file mode 100644 (file)
index 0000000..e15002c
--- /dev/null
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package workspace
+
+import (
+       "sync"
+)
+
+type PlanIf interface {
+       SetErrorStep(stepName string)
+       UsingSpace(spaceName string)
+       RunBackground(task ...interface{})
+       RunParallel(task ...interface{})
+       RunSerial(task ...interface{})
+}
+
+type SpaceIf interface {
+       PlanIf
+       getPlan() *PlanBase
+}
+
+type ErrCode int
+
+const (
+       TaskContinue ErrCode = -1
+       TaskOK       ErrCode = iota
+       TaskFail
+)
+
+const FINALLY string = "finally"
+
+
+type SubGrp struct {
+       Policy     GoPolicy
+       CurStepIdx int
+       StepNames  []string
+       StepObjs   []interface{}
+}
+
+func (s *SubGrp) Install(task []interface{}) bool {
+       for _, stepObj := range task {
+               if stepObj == nil {
+                       return false
+               }
+               s.StepObjs = append(s.StepObjs, stepObj)
+       }
+       return true
+}
+
+type SerErrInfo struct {
+       ErrCode int
+       Message string
+       GrpIdx  int
+       TaskIdx int
+}
+
+type PlanBase struct {
+       SerError  *SerErrInfo
+       PlanName  string
+       SpaceName string
+       ErrStep   string
+       PlanGrp   []SubGrp
+       CurGrpIdx int
+       WtPlan    sync.WaitGroup
+}
+
+func (b *PlanBase) SetErrorStep(stepName string) {
+       b.ErrStep = stepName
+}
+
+func (b *PlanBase) UsingSpace(spaceName string) {
+       b.SpaceName = spaceName
+}
+
+func (b *PlanBase) RunBackground(task ...interface{}) {
+       b.LoadTask(GoBackground, task)
+}
+
+func (b *PlanBase) RunParallel(task ...interface{}) {
+       b.LoadTask(GoParallel, task)
+}
+
+func (b *PlanBase) RunSerial(task ...interface{}) {
+       b.LoadTask(GoSerial, task)
+}
+
+func (b *PlanBase) RunSerialName(task interface{}, name string) {
+       var subGrp SubGrp
+       subGrp.Policy = GoSerial
+       subGrp.Install([]interface{}{task})
+       b.LoadData([]interface{}{task})
+       subGrp.StepNames = []string{name}
+       b.PlanGrp = append(b.PlanGrp, subGrp)
+}
+
+func (b *PlanBase) Try(task ...interface{}) {
+       b.SetErrorStep(FINALLY)
+       b.LoadTask(GoSerial, task)
+}
+
+func (b *PlanBase) Finally(task interface{}) {
+       var subGrp SubGrp
+       subGrp.Policy = GoSerial
+       subGrp.Install([]interface{}{task})
+       b.LoadData([]interface{}{task})
+       subGrp.StepNames = []string{FINALLY}
+       b.PlanGrp = append(b.PlanGrp, subGrp)
+}
+
+func (b *PlanBase) LoadTask(policy GoPolicy, task []interface{}) {
+       var subGrp SubGrp
+       subGrp.Policy = policy
+       subGrp.Install(task)
+       b.LoadData(task)
+       b.PlanGrp = append(b.PlanGrp, subGrp)
+}
+
+func (b *PlanBase) LoadData(task []interface{}) bool {
+       for _, stepObj := range task {
+               if stepObj == nil {
+                       return false
+               }
+               stepIf, ok := stepObj.(TaskBaseIf)
+               if !ok {
+                       continue
+               }
+               stepIf.SetSerErrInfo(b.SerError)
+       }
+       return true
+}
+
+type SpaceBase struct {
+       PlanBase
+}
+
+func (s *SpaceBase) Init() {
+       s.SerError = &SerErrInfo{}
+}
+
+func (s *SpaceBase) getPlan() *PlanBase {
+       return &s.PlanBase
+}
+
+func GotoErrorStep(curPlan *PlanBase, grpNum int) bool {
+       for idx, stepName := range curPlan.PlanGrp[grpNum].StepNames {
+               if stepName == curPlan.ErrStep {
+                       if curPlan.CurGrpIdx > grpNum {
+                               return true
+                       }
+                       curPlan.CurGrpIdx = grpNum
+                       if curPlan.PlanGrp[grpNum].CurStepIdx < idx {
+                               curPlan.PlanGrp[grpNum].CurStepIdx = idx
+                       }
+                       return true
+               }
+       }
+       return false
+}
+
+func RecordErrInfo(curPlan *PlanBase, stepIdx int) {
+       if curPlan.CurGrpIdx >= len(curPlan.PlanGrp) {
+               return
+       }
+       curGrp := curPlan.PlanGrp[curPlan.CurGrpIdx]
+       if stepIdx < 0 || stepIdx >= len(curGrp.StepObjs) {
+               return
+       }
+
+       curPlan.SerError.GrpIdx = curPlan.CurGrpIdx
+       curPlan.SerError.TaskIdx = stepIdx
+       curStep := curGrp.StepObjs[stepIdx]
+       stepIf, ok := curStep.(TaskBaseIf)
+       if !ok {
+               return
+       }
+       errCode, msg := stepIf.GetErrCode()
+       curPlan.SerError.ErrCode = int(errCode)
+       curPlan.SerError.Message = msg
+}
+
+func GotoErrorProc(curPlan *PlanBase) {
+       curPlan.CurGrpIdx++
+       for grpNum := 0; grpNum < len(curPlan.PlanGrp); grpNum++ {
+               done := GotoErrorStep(curPlan, grpNum)
+               if done {
+                       break
+               }
+       }
+}