--- /dev/null
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package workspace
+
+import (
+ "sync"
+
+ "mepserver/mp1/arch/bus"
+)
+
+type GoPolicy int
+
+const (
+ _ GoPolicy = iota
+ GoParallel
+ GoBackground
+ GoSerial
+)
+
+func WkRun(plan SpaceIf) ErrCode {
+ curPlan := plan.getPlan()
+ for {
+ if curPlan.CurGrpIdx >= len(curPlan.PlanGrp) {
+ break
+ }
+ curSub := &curPlan.PlanGrp[curPlan.CurGrpIdx]
+ retCode, stepIdx := GrpRun(curSub, plan, &curPlan.WtPlan)
+ if retCode <= TaskOK {
+ curPlan.CurGrpIdx++
+ continue
+ }
+ RecordErrInfo(curPlan, stepIdx)
+ GotoErrorProc(curPlan)
+ }
+ // wait backgroud job finish
+ curPlan.WtPlan.Wait()
+ return TaskOK
+
+}
+
+func TaskRunner(wkSpace interface{}, stepIf TaskBaseIf) int {
+ for {
+ bus.LoadObjByInd(stepIf, wkSpace, "in")
+ retCode := stepIf.OnRequest("")
+ if retCode <= TaskFinish {
+ bus.LoadObjByInd(stepIf, wkSpace, "out")
+ break
+ }
+ }
+ return 0
+}
+
+func StepPolicy(wg *sync.WaitGroup, curSub *SubGrp, plan SpaceIf, wtPlan *sync.WaitGroup, stepIf TaskBaseIf) ErrCode {
+ taskRet := TaskOK
+ switch curSub.Policy {
+ case GoBackground:
+ wtPlan.Add(1)
+ go func() {
+ defer wtPlan.Done()
+ TaskRunner(plan, stepIf)
+ }()
+
+ case GoParallel:
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ TaskRunner(plan, stepIf)
+ }()
+ default:
+ TaskRunner(plan, stepIf)
+ taskRet, _ = stepIf.GetErrCode()
+ }
+
+ return taskRet
+}
+
+func GrpOneStep(wg *sync.WaitGroup, curSub *SubGrp, plan SpaceIf, wtPlan *sync.WaitGroup) bool {
+ if curSub.CurStepIdx >= len(curSub.StepObjs) {
+ return false
+ }
+ curStep := curSub.StepObjs[curSub.CurStepIdx]
+ if curStep == nil {
+ curSub.CurStepIdx++
+ return true
+ }
+ stepIf, ok := curStep.(TaskBaseIf)
+ if !ok {
+ return false
+ }
+ taskRet := StepPolicy(wg, curSub, plan, wtPlan, stepIf)
+ curSub.CurStepIdx++
+
+ return taskRet <= TaskOK
+}
+
+func GrpGetRetCode(curSub *SubGrp) (ErrCode, int) {
+ for idx, curStep := range curSub.StepObjs {
+ stepIf, ok := curStep.(TaskBaseIf)
+ if !ok {
+ continue
+ }
+ errCode, _ := stepIf.GetErrCode()
+ if errCode > TaskOK {
+ return errCode, idx
+ }
+ }
+
+ return TaskOK, -1
+}
+
+func GrpRun(curSub *SubGrp, plan SpaceIf, wtPlan *sync.WaitGroup) (ErrCode, int) {
+ var wg sync.WaitGroup
+ for {
+ nextStep := GrpOneStep(&wg, curSub, plan, wtPlan)
+ if !nextStep {
+ break
+ }
+ }
+ if curSub.Policy == GoParallel {
+ wg.Wait()
+ }
+ return GrpGetRetCode(curSub)
+}
--- /dev/null
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package workspace
+
+type TaskBaseIf interface {
+ OnRequest(data string) TaskCode
+ Parse(params string)
+ OnFork(wkSpace interface{}, param interface{}) int
+ GetErrCode() (ErrCode, string)
+ OnStop() int
+ WithName(name string)
+ SetSerErrInfo(serErr *SerErrInfo)
+}
+
+type TaskBase struct {
+ serErr *SerErrInfo
+ errMsg string
+ Name string
+ Param []string
+ resultCode ErrCode
+}
+
+func (t *TaskBase) WithName(name string) {
+ t.Name = name
+}
+
+func (t *TaskBase) Parse(params string) {
+ t.Param = append(t.Param, params)
+}
+
+func (t *TaskBase) OnFork(wkSpace interface{}, param interface{}) int {
+ return 0
+}
+
+func (t *TaskBase) OnStop() int {
+ return 0
+}
+
+type TaskCode int
+
+const (
+ TaskFinish TaskCode = iota
+ TaskWaitMore
+ TaskError
+)
+
+func (t *TaskBase) OnRequest(wkSpace interface{}) TaskCode {
+ return TaskFinish
+}
+
+func (t *TaskBase) SetFirstErrorCode(code ErrCode, msg string) {
+ if t.resultCode > TaskOK {
+ return
+ }
+ t.resultCode = code
+ t.errMsg = msg
+}
+
+func (t *TaskBase) GetErrCode() (ErrCode, string) {
+ return t.resultCode, t.errMsg
+}
+
+func (t *TaskBase) SetSerErrInfo(serErr *SerErrInfo) {
+ t.serErr = serErr
+}
+
+func (t *TaskBase) GetSerErrInfo() *SerErrInfo {
+ return t.serErr
+}
+
+type StepMap map[string]interface{}
+
+var StepData StepMap = StepMap{}
+
+func NameMap(name string, obj interface{}, listenOn string) bool {
+ StepData[name] = obj
+ return true
+}
--- /dev/null
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package workspace
+
+import (
+ "sync"
+)
+
+type PlanIf interface {
+ SetErrorStep(stepName string)
+ UsingSpace(spaceName string)
+ RunBackground(task ...interface{})
+ RunParallel(task ...interface{})
+ RunSerial(task ...interface{})
+}
+
+type SpaceIf interface {
+ PlanIf
+ getPlan() *PlanBase
+}
+
+type ErrCode int
+
+const (
+ TaskContinue ErrCode = -1
+ TaskOK ErrCode = iota
+ TaskFail
+)
+
+const FINALLY string = "finally"
+
+
+type SubGrp struct {
+ Policy GoPolicy
+ CurStepIdx int
+ StepNames []string
+ StepObjs []interface{}
+}
+
+func (s *SubGrp) Install(task []interface{}) bool {
+ for _, stepObj := range task {
+ if stepObj == nil {
+ return false
+ }
+ s.StepObjs = append(s.StepObjs, stepObj)
+ }
+ return true
+}
+
+type SerErrInfo struct {
+ ErrCode int
+ Message string
+ GrpIdx int
+ TaskIdx int
+}
+
+type PlanBase struct {
+ SerError *SerErrInfo
+ PlanName string
+ SpaceName string
+ ErrStep string
+ PlanGrp []SubGrp
+ CurGrpIdx int
+ WtPlan sync.WaitGroup
+}
+
+func (b *PlanBase) SetErrorStep(stepName string) {
+ b.ErrStep = stepName
+}
+
+func (b *PlanBase) UsingSpace(spaceName string) {
+ b.SpaceName = spaceName
+}
+
+func (b *PlanBase) RunBackground(task ...interface{}) {
+ b.LoadTask(GoBackground, task)
+}
+
+func (b *PlanBase) RunParallel(task ...interface{}) {
+ b.LoadTask(GoParallel, task)
+}
+
+func (b *PlanBase) RunSerial(task ...interface{}) {
+ b.LoadTask(GoSerial, task)
+}
+
+func (b *PlanBase) RunSerialName(task interface{}, name string) {
+ var subGrp SubGrp
+ subGrp.Policy = GoSerial
+ subGrp.Install([]interface{}{task})
+ b.LoadData([]interface{}{task})
+ subGrp.StepNames = []string{name}
+ b.PlanGrp = append(b.PlanGrp, subGrp)
+}
+
+func (b *PlanBase) Try(task ...interface{}) {
+ b.SetErrorStep(FINALLY)
+ b.LoadTask(GoSerial, task)
+}
+
+func (b *PlanBase) Finally(task interface{}) {
+ var subGrp SubGrp
+ subGrp.Policy = GoSerial
+ subGrp.Install([]interface{}{task})
+ b.LoadData([]interface{}{task})
+ subGrp.StepNames = []string{FINALLY}
+ b.PlanGrp = append(b.PlanGrp, subGrp)
+}
+
+func (b *PlanBase) LoadTask(policy GoPolicy, task []interface{}) {
+ var subGrp SubGrp
+ subGrp.Policy = policy
+ subGrp.Install(task)
+ b.LoadData(task)
+ b.PlanGrp = append(b.PlanGrp, subGrp)
+}
+
+func (b *PlanBase) LoadData(task []interface{}) bool {
+ for _, stepObj := range task {
+ if stepObj == nil {
+ return false
+ }
+ stepIf, ok := stepObj.(TaskBaseIf)
+ if !ok {
+ continue
+ }
+ stepIf.SetSerErrInfo(b.SerError)
+ }
+ return true
+}
+
+type SpaceBase struct {
+ PlanBase
+}
+
+func (s *SpaceBase) Init() {
+ s.SerError = &SerErrInfo{}
+}
+
+func (s *SpaceBase) getPlan() *PlanBase {
+ return &s.PlanBase
+}
+
+func GotoErrorStep(curPlan *PlanBase, grpNum int) bool {
+ for idx, stepName := range curPlan.PlanGrp[grpNum].StepNames {
+ if stepName == curPlan.ErrStep {
+ if curPlan.CurGrpIdx > grpNum {
+ return true
+ }
+ curPlan.CurGrpIdx = grpNum
+ if curPlan.PlanGrp[grpNum].CurStepIdx < idx {
+ curPlan.PlanGrp[grpNum].CurStepIdx = idx
+ }
+ return true
+ }
+ }
+ return false
+}
+
+func RecordErrInfo(curPlan *PlanBase, stepIdx int) {
+ if curPlan.CurGrpIdx >= len(curPlan.PlanGrp) {
+ return
+ }
+ curGrp := curPlan.PlanGrp[curPlan.CurGrpIdx]
+ if stepIdx < 0 || stepIdx >= len(curGrp.StepObjs) {
+ return
+ }
+
+ curPlan.SerError.GrpIdx = curPlan.CurGrpIdx
+ curPlan.SerError.TaskIdx = stepIdx
+ curStep := curGrp.StepObjs[stepIdx]
+ stepIf, ok := curStep.(TaskBaseIf)
+ if !ok {
+ return
+ }
+ errCode, msg := stepIf.GetErrCode()
+ curPlan.SerError.ErrCode = int(errCode)
+ curPlan.SerError.Message = msg
+}
+
+func GotoErrorProc(curPlan *PlanBase) {
+ curPlan.CurGrpIdx++
+ for grpNum := 0; grpNum < len(curPlan.PlanGrp); grpNum++ {
+ done := GotoErrorStep(curPlan, grpNum)
+ if done {
+ break
+ }
+ }
+}