From: khemendra kumar Date: Thu, 14 May 2020 10:51:28 +0000 (+0530) Subject: added server workspace files X-Git-Url: https://gerrit.akraino.org/r/gitweb?a=commitdiff_plain;h=ee1dadbea3758d8312147eff711c4702b7bc42d5;p=ealt-edge.git added server workspace files Change-Id: Ia556fa15aff28f984c8cbe1207fc15a5e0d8ffc6 --- diff --git a/mep/mepserver/mp1/arch/workspace/runner.go b/mep/mepserver/mp1/arch/workspace/runner.go new file mode 100644 index 0000000..0670ebe --- /dev/null +++ b/mep/mepserver/mp1/arch/workspace/runner.go @@ -0,0 +1,137 @@ +/* + * Copyright 2020 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package workspace + +import ( + "sync" + + "mepserver/mp1/arch/bus" +) + +type GoPolicy int + +const ( + _ GoPolicy = iota + GoParallel + GoBackground + GoSerial +) + +func WkRun(plan SpaceIf) ErrCode { + curPlan := plan.getPlan() + for { + if curPlan.CurGrpIdx >= len(curPlan.PlanGrp) { + break + } + curSub := &curPlan.PlanGrp[curPlan.CurGrpIdx] + retCode, stepIdx := GrpRun(curSub, plan, &curPlan.WtPlan) + if retCode <= TaskOK { + curPlan.CurGrpIdx++ + continue + } + RecordErrInfo(curPlan, stepIdx) + GotoErrorProc(curPlan) + } + // wait backgroud job finish + curPlan.WtPlan.Wait() + return TaskOK + +} + +func TaskRunner(wkSpace interface{}, stepIf TaskBaseIf) int { + for { + bus.LoadObjByInd(stepIf, wkSpace, "in") + retCode := stepIf.OnRequest("") + if retCode <= TaskFinish { + bus.LoadObjByInd(stepIf, wkSpace, "out") + break + } + } + return 0 +} + +func StepPolicy(wg *sync.WaitGroup, curSub *SubGrp, plan SpaceIf, wtPlan *sync.WaitGroup, stepIf TaskBaseIf) ErrCode { + taskRet := TaskOK + switch curSub.Policy { + case GoBackground: + wtPlan.Add(1) + go func() { + defer wtPlan.Done() + TaskRunner(plan, stepIf) + }() + + case GoParallel: + wg.Add(1) + go func() { + defer wg.Done() + TaskRunner(plan, stepIf) + }() + default: + TaskRunner(plan, stepIf) + taskRet, _ = stepIf.GetErrCode() + } + + return taskRet +} + +func GrpOneStep(wg *sync.WaitGroup, curSub *SubGrp, plan SpaceIf, wtPlan *sync.WaitGroup) bool { + if curSub.CurStepIdx >= len(curSub.StepObjs) { + return false + } + curStep := curSub.StepObjs[curSub.CurStepIdx] + if curStep == nil { + curSub.CurStepIdx++ + return true + } + stepIf, ok := curStep.(TaskBaseIf) + if !ok { + return false + } + taskRet := StepPolicy(wg, curSub, plan, wtPlan, stepIf) + curSub.CurStepIdx++ + + return taskRet <= TaskOK +} + +func GrpGetRetCode(curSub *SubGrp) (ErrCode, int) { + for idx, curStep := range curSub.StepObjs { + stepIf, ok := curStep.(TaskBaseIf) + if !ok { + continue + } + errCode, _ := stepIf.GetErrCode() + if errCode > TaskOK { + return errCode, idx + } + } + + return TaskOK, -1 +} + +func GrpRun(curSub *SubGrp, plan SpaceIf, wtPlan *sync.WaitGroup) (ErrCode, int) { + var wg sync.WaitGroup + for { + nextStep := GrpOneStep(&wg, curSub, plan, wtPlan) + if !nextStep { + break + } + } + if curSub.Policy == GoParallel { + wg.Wait() + } + return GrpGetRetCode(curSub) +} diff --git a/mep/mepserver/mp1/arch/workspace/task_base.go b/mep/mepserver/mp1/arch/workspace/task_base.go new file mode 100644 index 0000000..7176bd3 --- /dev/null +++ b/mep/mepserver/mp1/arch/workspace/task_base.go @@ -0,0 +1,93 @@ +/* + * Copyright 2020 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package workspace + +type TaskBaseIf interface { + OnRequest(data string) TaskCode + Parse(params string) + OnFork(wkSpace interface{}, param interface{}) int + GetErrCode() (ErrCode, string) + OnStop() int + WithName(name string) + SetSerErrInfo(serErr *SerErrInfo) +} + +type TaskBase struct { + serErr *SerErrInfo + errMsg string + Name string + Param []string + resultCode ErrCode +} + +func (t *TaskBase) WithName(name string) { + t.Name = name +} + +func (t *TaskBase) Parse(params string) { + t.Param = append(t.Param, params) +} + +func (t *TaskBase) OnFork(wkSpace interface{}, param interface{}) int { + return 0 +} + +func (t *TaskBase) OnStop() int { + return 0 +} + +type TaskCode int + +const ( + TaskFinish TaskCode = iota + TaskWaitMore + TaskError +) + +func (t *TaskBase) OnRequest(wkSpace interface{}) TaskCode { + return TaskFinish +} + +func (t *TaskBase) SetFirstErrorCode(code ErrCode, msg string) { + if t.resultCode > TaskOK { + return + } + t.resultCode = code + t.errMsg = msg +} + +func (t *TaskBase) GetErrCode() (ErrCode, string) { + return t.resultCode, t.errMsg +} + +func (t *TaskBase) SetSerErrInfo(serErr *SerErrInfo) { + t.serErr = serErr +} + +func (t *TaskBase) GetSerErrInfo() *SerErrInfo { + return t.serErr +} + +type StepMap map[string]interface{} + +var StepData StepMap = StepMap{} + +func NameMap(name string, obj interface{}, listenOn string) bool { + StepData[name] = obj + return true +} diff --git a/mep/mepserver/mp1/arch/workspace/workspace_base.go b/mep/mepserver/mp1/arch/workspace/workspace_base.go new file mode 100644 index 0000000..e15002c --- /dev/null +++ b/mep/mepserver/mp1/arch/workspace/workspace_base.go @@ -0,0 +1,203 @@ +/* + * Copyright 2020 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package workspace + +import ( + "sync" +) + +type PlanIf interface { + SetErrorStep(stepName string) + UsingSpace(spaceName string) + RunBackground(task ...interface{}) + RunParallel(task ...interface{}) + RunSerial(task ...interface{}) +} + +type SpaceIf interface { + PlanIf + getPlan() *PlanBase +} + +type ErrCode int + +const ( + TaskContinue ErrCode = -1 + TaskOK ErrCode = iota + TaskFail +) + +const FINALLY string = "finally" + + +type SubGrp struct { + Policy GoPolicy + CurStepIdx int + StepNames []string + StepObjs []interface{} +} + +func (s *SubGrp) Install(task []interface{}) bool { + for _, stepObj := range task { + if stepObj == nil { + return false + } + s.StepObjs = append(s.StepObjs, stepObj) + } + return true +} + +type SerErrInfo struct { + ErrCode int + Message string + GrpIdx int + TaskIdx int +} + +type PlanBase struct { + SerError *SerErrInfo + PlanName string + SpaceName string + ErrStep string + PlanGrp []SubGrp + CurGrpIdx int + WtPlan sync.WaitGroup +} + +func (b *PlanBase) SetErrorStep(stepName string) { + b.ErrStep = stepName +} + +func (b *PlanBase) UsingSpace(spaceName string) { + b.SpaceName = spaceName +} + +func (b *PlanBase) RunBackground(task ...interface{}) { + b.LoadTask(GoBackground, task) +} + +func (b *PlanBase) RunParallel(task ...interface{}) { + b.LoadTask(GoParallel, task) +} + +func (b *PlanBase) RunSerial(task ...interface{}) { + b.LoadTask(GoSerial, task) +} + +func (b *PlanBase) RunSerialName(task interface{}, name string) { + var subGrp SubGrp + subGrp.Policy = GoSerial + subGrp.Install([]interface{}{task}) + b.LoadData([]interface{}{task}) + subGrp.StepNames = []string{name} + b.PlanGrp = append(b.PlanGrp, subGrp) +} + +func (b *PlanBase) Try(task ...interface{}) { + b.SetErrorStep(FINALLY) + b.LoadTask(GoSerial, task) +} + +func (b *PlanBase) Finally(task interface{}) { + var subGrp SubGrp + subGrp.Policy = GoSerial + subGrp.Install([]interface{}{task}) + b.LoadData([]interface{}{task}) + subGrp.StepNames = []string{FINALLY} + b.PlanGrp = append(b.PlanGrp, subGrp) +} + +func (b *PlanBase) LoadTask(policy GoPolicy, task []interface{}) { + var subGrp SubGrp + subGrp.Policy = policy + subGrp.Install(task) + b.LoadData(task) + b.PlanGrp = append(b.PlanGrp, subGrp) +} + +func (b *PlanBase) LoadData(task []interface{}) bool { + for _, stepObj := range task { + if stepObj == nil { + return false + } + stepIf, ok := stepObj.(TaskBaseIf) + if !ok { + continue + } + stepIf.SetSerErrInfo(b.SerError) + } + return true +} + +type SpaceBase struct { + PlanBase +} + +func (s *SpaceBase) Init() { + s.SerError = &SerErrInfo{} +} + +func (s *SpaceBase) getPlan() *PlanBase { + return &s.PlanBase +} + +func GotoErrorStep(curPlan *PlanBase, grpNum int) bool { + for idx, stepName := range curPlan.PlanGrp[grpNum].StepNames { + if stepName == curPlan.ErrStep { + if curPlan.CurGrpIdx > grpNum { + return true + } + curPlan.CurGrpIdx = grpNum + if curPlan.PlanGrp[grpNum].CurStepIdx < idx { + curPlan.PlanGrp[grpNum].CurStepIdx = idx + } + return true + } + } + return false +} + +func RecordErrInfo(curPlan *PlanBase, stepIdx int) { + if curPlan.CurGrpIdx >= len(curPlan.PlanGrp) { + return + } + curGrp := curPlan.PlanGrp[curPlan.CurGrpIdx] + if stepIdx < 0 || stepIdx >= len(curGrp.StepObjs) { + return + } + + curPlan.SerError.GrpIdx = curPlan.CurGrpIdx + curPlan.SerError.TaskIdx = stepIdx + curStep := curGrp.StepObjs[stepIdx] + stepIf, ok := curStep.(TaskBaseIf) + if !ok { + return + } + errCode, msg := stepIf.GetErrCode() + curPlan.SerError.ErrCode = int(errCode) + curPlan.SerError.Message = msg +} + +func GotoErrorProc(curPlan *PlanBase) { + curPlan.CurGrpIdx++ + for grpNum := 0; grpNum < len(curPlan.PlanGrp); grpNum++ { + done := GotoErrorStep(curPlan, grpNum) + if done { + break + } + } +}