0670ebe18c07712e7bf5c55fb8e50e8c6173df60
[ealt-edge.git] / mep / mepserver / mp1 / arch / workspace / runner.go
1 /*
2  * Copyright 2020 Huawei Technologies Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package workspace
18
19 import (
20         "sync"
21
22         "mepserver/mp1/arch/bus"
23 )
24
25 type GoPolicy int
26
27 const (
28         _ GoPolicy = iota
29         GoParallel
30         GoBackground
31         GoSerial
32 )
33
34 func WkRun(plan SpaceIf) ErrCode {
35         curPlan := plan.getPlan()
36         for {
37                 if curPlan.CurGrpIdx >= len(curPlan.PlanGrp) {
38                         break
39                 }
40                 curSub := &curPlan.PlanGrp[curPlan.CurGrpIdx]
41                 retCode, stepIdx := GrpRun(curSub, plan, &curPlan.WtPlan)
42                 if retCode <= TaskOK {
43                         curPlan.CurGrpIdx++
44                         continue
45                 }
46                 RecordErrInfo(curPlan, stepIdx)
47                 GotoErrorProc(curPlan)
48         }
49         // wait backgroud job finish
50         curPlan.WtPlan.Wait()
51         return TaskOK
52
53 }
54
55 func TaskRunner(wkSpace interface{}, stepIf TaskBaseIf) int {
56         for {
57                 bus.LoadObjByInd(stepIf, wkSpace, "in")
58                 retCode := stepIf.OnRequest("")
59                 if retCode <= TaskFinish {
60                         bus.LoadObjByInd(stepIf, wkSpace, "out")
61                         break
62                 }
63         }
64         return 0
65 }
66
67 func StepPolicy(wg *sync.WaitGroup, curSub *SubGrp, plan SpaceIf, wtPlan *sync.WaitGroup, stepIf TaskBaseIf) ErrCode {
68         taskRet := TaskOK
69         switch curSub.Policy {
70         case GoBackground:
71                 wtPlan.Add(1)
72                 go func() {
73                         defer wtPlan.Done()
74                         TaskRunner(plan, stepIf)
75                 }()
76
77         case GoParallel:
78                 wg.Add(1)
79                 go func() {
80                         defer wg.Done()
81                         TaskRunner(plan, stepIf)
82                 }()
83         default:
84                 TaskRunner(plan, stepIf)
85                 taskRet, _ = stepIf.GetErrCode()
86         }
87
88         return taskRet
89 }
90
91 func GrpOneStep(wg *sync.WaitGroup, curSub *SubGrp, plan SpaceIf, wtPlan *sync.WaitGroup) bool {
92         if curSub.CurStepIdx >= len(curSub.StepObjs) {
93                 return false
94         }
95         curStep := curSub.StepObjs[curSub.CurStepIdx]
96         if curStep == nil {
97                 curSub.CurStepIdx++
98                 return true
99         }
100         stepIf, ok := curStep.(TaskBaseIf)
101         if !ok {
102                 return false
103         }
104         taskRet := StepPolicy(wg, curSub, plan, wtPlan, stepIf)
105         curSub.CurStepIdx++
106
107         return taskRet <= TaskOK
108 }
109
110 func GrpGetRetCode(curSub *SubGrp) (ErrCode, int) {
111         for idx, curStep := range curSub.StepObjs {
112                 stepIf, ok := curStep.(TaskBaseIf)
113                 if !ok {
114                         continue
115                 }
116                 errCode, _ := stepIf.GetErrCode()
117                 if errCode > TaskOK {
118                         return errCode, idx
119                 }
120         }
121
122         return TaskOK, -1
123 }
124
125 func GrpRun(curSub *SubGrp, plan SpaceIf, wtPlan *sync.WaitGroup) (ErrCode, int) {
126         var wg sync.WaitGroup
127         for {
128                 nextStep := GrpOneStep(&wg, curSub, plan, wtPlan)
129                 if !nextStep {
130                         break
131                 }
132         }
133         if curSub.Policy == GoParallel {
134                 wg.Wait()
135         }
136         return GrpGetRetCode(curSub)
137 }