Intial commit for MEP server 23/3423/1
authorSwarup Nayak <swarup.nayak1@huawei.com>
Tue, 12 May 2020 15:48:14 +0000 (21:18 +0530)
committerSwarup Nayak <swarup.nayak1@huawei.com>
Tue, 12 May 2020 15:48:14 +0000 (21:18 +0530)
Change-Id: I93dbbbc77f158e2897db98469097c6918b8b336b

16 files changed:
mep/mepserver/mp1/controller.go [new file with mode: 0644]
mep/mepserver/mp1/instance_event_handler.go [new file with mode: 0644]
mep/mepserver/mp1/mepspace.go [new file with mode: 0644]
mep/mepserver/mp1/mp1event.go [new file with mode: 0644]
mep/mepserver/mp1/plan_del_one_subscribe.go [new file with mode: 0644]
mep/mepserver/mp1/plan_delete_svc.go [new file with mode: 0644]
mep/mepserver/mp1/plan_discover_svc.go [new file with mode: 0644]
mep/mepserver/mp1/plan_get_one_subscribe.go [new file with mode: 0644]
mep/mepserver/mp1/plan_get_one_svc.go [new file with mode: 0644]
mep/mepserver/mp1/plan_get_subsribes.go [new file with mode: 0644]
mep/mepserver/mp1/plan_register_svc.go [new file with mode: 0644]
mep/mepserver/mp1/plan_send_httprsp_created.go [new file with mode: 0644]
mep/mepserver/mp1/plan_subscribe_app.go [new file with mode: 0644]
mep/mepserver/mp1/plan_update_svc.go [new file with mode: 0644]
mep/mepserver/mp1/publisher.go [new file with mode: 0644]
mep/mepserver/mp1/websocket.go [new file with mode: 0644]

diff --git a/mep/mepserver/mp1/controller.go b/mep/mepserver/mp1/controller.go
new file mode 100644 (file)
index 0000000..95308ab
--- /dev/null
@@ -0,0 +1,239 @@
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+       "context"
+       "net/http"
+
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/pkg/rest"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/apache/servicecomb-service-center/server/core/proto"
+       "github.com/apache/servicecomb-service-center/server/notify"
+       v4 "github.com/apache/servicecomb-service-center/server/rest/controller/v4"
+       svcutil "github.com/apache/servicecomb-service-center/server/service/util"
+
+       "mepserver/mp1/arch/workspace"
+       "mepserver/mp1/models"
+)
+
+const (
+       basePath         = "/mep/mec_service_mgmt/v1"
+       servicesPath     = basePath + "/services"
+       appServicesPath  = basePath + "/applications/:appInstanceId" + "/services"
+       appSubscribePath = basePath + "/applications/:appInstanceId/subscriptions"
+)
+
+const (
+       SerErrFailBase         workspace.ErrCode = workspace.TaskFail
+       SerErrServiceNotFound                    = 2
+       SerInstanceNotFound                      = 3
+       ParseInfoErr                             = 4
+       SubscriptionNotFound                     = 5
+       OperateDataWithEtcdErr                   = 6
+       SerErrServiceDelFailed                   = 7
+       SerErrServiceUpdFailed                   = 8
+)
+
+type APIHookFunc func() models.EndPointInfo
+
+type APIGwHook struct {
+       APIHook APIHookFunc
+}
+
+var apihook APIGwHook
+
+func SetAPIHook(hook APIGwHook) {
+       apihook = hook
+}
+
+func GetApiHook() APIGwHook {
+       return apihook
+}
+
+func init() {
+       initRouter()
+}
+
+func initRouter() {
+       rest.
+               RegisterServant(&Mp1Service{})
+}
+
+type Mp1Service struct {
+       v4.MicroServiceService
+}
+
+func (m *Mp1Service) URLPatterns() []rest.Route {
+       return []rest.Route{
+               // appSubscriptions
+               {Method: rest.HTTP_METHOD_POST, Path: appSubscribePath, Func: doAppSubscribe},
+               {Method: rest.HTTP_METHOD_GET, Path: appSubscribePath, Func: getAppSubscribes},
+               {Method: rest.HTTP_METHOD_GET, Path: appSubscribePath + "/:subscriptionId", Func: getOneAppSubscribe},
+               {Method: rest.HTTP_METHOD_DELETE, Path: appSubscribePath + "/:subscriptionId", Func: delOneAppSubscribe},
+               // appServices
+               {Method: rest.HTTP_METHOD_POST, Path: appServicesPath, Func: serviceRegister},
+               {Method: rest.HTTP_METHOD_GET, Path: appServicesPath, Func: serviceDiscover},
+               {Method: rest.HTTP_METHOD_PUT, Path: appServicesPath + "/:serviceId", Func: serviceUpdate},
+               {Method: rest.HTTP_METHOD_GET, Path: appServicesPath + "/:serviceId", Func: getOneService},
+               {Method: rest.HTTP_METHOD_DELETE, Path: appServicesPath + "/:serviceId", Func: serviceDelete},
+               // services
+               {Method: rest.HTTP_METHOD_GET, Path: servicesPath, Func: serviceDiscover},
+               {Method: rest.HTTP_METHOD_GET, Path: servicesPath + "/:serviceId", Func: getOneService},
+       }
+}
+
+func doAppSubscribe(w http.ResponseWriter, r *http.Request) {
+
+       workPlan := NewWorkSpace(w, r)
+       workPlan.Try(
+               (&DecodeRestReq{}).WithBody(&models.SerAvailabilityNotificationSubscription{}),
+               &SubscribeIst{})
+       workPlan.Finally(&SendHttpRspCreated{})
+
+       workspace.WkRun(workPlan)
+}
+
+func getAppSubscribes(w http.ResponseWriter, r *http.Request) {
+
+       workPlan := NewWorkSpace(w, r)
+       workPlan.Try(
+               &DecodeRestReq{},
+               &GetSubscribes{})
+       workPlan.Finally(&SendHttpRsp{})
+
+       workspace.WkRun(workPlan)
+}
+
+func getOneAppSubscribe(w http.ResponseWriter, r *http.Request) {
+
+       workPlan := NewWorkSpace(w, r)
+       workPlan.Try(
+               &DecodeRestReq{},
+               &GetOneSubscribe{})
+       workPlan.Finally(&SendHttpRsp{})
+
+       workspace.WkRun(workPlan)
+}
+
+func delOneAppSubscribe(w http.ResponseWriter, r *http.Request) {
+
+       workPlan := NewWorkSpace(w, r)
+       workPlan.Try(
+               &DecodeRestReq{},
+               &DelOneSubscribe{})
+       workPlan.Finally(&SendHttpRsp{})
+
+       workspace.WkRun(workPlan)
+}
+
+func serviceRegister(w http.ResponseWriter, r *http.Request) {
+       log.Info("Register service start...")
+
+       workPlan := NewWorkSpace(w, r)
+       workPlan.Try(
+               (&DecodeRestReq{}).WithBody(&models.ServiceInfo{}),
+               &RegisterServiceId{},
+               &RegisterServiceInst{})
+       workPlan.Finally(&SendHttpRspCreated{})
+
+       workspace.WkRun(workPlan)
+}
+
+func serviceDiscover(w http.ResponseWriter, r *http.Request) {
+       log.Info("Discover service service start...")
+
+       workPlan := NewWorkSpace(w, r)
+       workPlan.Try(
+               &DiscoverDecode{},
+               &DiscoverService{},
+               &ToStrDiscover{},
+               &RspHook{})
+       workPlan.Finally(&SendHttpRsp{})
+
+       workspace.WkRun(workPlan)
+}
+
+func serviceUpdate(w http.ResponseWriter, r *http.Request) {
+       log.Info("Update a service start...")
+
+       workPlan := NewWorkSpace(w, r)
+       workPlan.Try(
+               (&DecodeRestReq{}).WithBody(&models.ServiceInfo{}),
+               &UpdateInstance{})
+       workPlan.Finally(&SendHttpRsp{})
+
+       workspace.WkRun(workPlan)
+}
+
+func getOneService(w http.ResponseWriter, r *http.Request) {
+       log.Info("Register service start...")
+
+       workPlan := NewWorkSpace(w, r)
+       workPlan.Try(
+               &GetOneDecode{},
+               &GetOneInstance{})
+       workPlan.Finally(&SendHttpRsp{})
+
+       workspace.WkRun(workPlan)
+
+}
+
+func serviceDelete(w http.ResponseWriter, r *http.Request) {
+       log.Info("Delete a service start...")
+
+       workPlan := NewWorkSpace(w, r)
+       workPlan.Try(
+               &DecodeRestReq{},
+               &DeleteService{})
+       workPlan.Finally(&SendHttpRsp{})
+
+       workspace.WkRun(workPlan)
+}
+
+func WebsocketListAndWatch(ctx context.Context, req *proto.WatchInstanceRequest, consumerSvcId string) {
+       if req == nil || len(req.SelfServiceId) == 0 {
+               log.Warn("request fomat invalid!")
+               return
+       }
+       domainProject := util.ParseDomainProject(ctx)
+       if !svcutil.ServiceExist(ctx, domainProject, req.SelfServiceId) {
+               log.Warn("service does not exist!")
+               return
+       }
+       DoWebsocketListAndWatch(ctx, req.SelfServiceId, consumerSvcId, func() ([]*proto.WatchInstanceResponse, int64) {
+               return svcutil.QueryAllProvidersInstances(ctx, req.SelfServiceId)
+       })
+}
+
+func DoWebsocketListAndWatch(ctx context.Context, serviceId string, consumerSvcId string, f func() ([]*proto.WatchInstanceResponse, int64)) {
+       domainProject := util.ParseDomainProject(ctx)
+       socket := &Websocket{
+               ctx:       ctx,
+               watcher:   notify.NewInstanceEventListWatcher(serviceId, domainProject, f),
+               serviceID: consumerSvcId,
+       }
+       ProcessSocket(socket)
+}
+
+func ProcessSocket(socket *Websocket) {
+       if err := socket.Init(); err != nil {
+               return
+       }
+       socket.HandleWatchWebSocketControlMessage()
+}
diff --git a/mep/mepserver/mp1/instance_event_handler.go b/mep/mepserver/mp1/instance_event_handler.go
new file mode 100644 (file)
index 0000000..34ab04c
--- /dev/null
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+       "encoding/json"
+       "strings"
+
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       apt "github.com/apache/servicecomb-service-center/server/core"
+       "github.com/apache/servicecomb-service-center/server/core/backend"
+       "github.com/apache/servicecomb-service-center/server/core/proto"
+       "github.com/apache/servicecomb-service-center/server/notify"
+       "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
+       "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
+       "github.com/apache/servicecomb-service-center/server/service/cache"
+       "github.com/apache/servicecomb-service-center/server/service/metrics"
+       svcutil "github.com/apache/servicecomb-service-center/server/service/util"
+       "golang.org/x/net/context"
+
+       "mepserver/mp1/models"
+)
+
+const indexStart, indexEnd = 33, 17
+
+type InstanceEtsiEventHandler struct {
+}
+
+func (h *InstanceEtsiEventHandler) Type() discovery.Type {
+       return backend.INSTANCE
+}
+
+func (h *InstanceEtsiEventHandler) OnEvent(evt discovery.KvEvent) {
+       action := evt.Type
+       instance, ok := evt.KV.Value.(*proto.MicroServiceInstance)
+       if !ok {
+               log.Warn("cast to instance failed")
+       }
+       providerId, providerInstanceId, domainProject := apt.GetInfoFromInstKV(evt.KV.Key)
+       idx := strings.Index(domainProject, "/")
+       domainName := domainProject[:idx]
+       switch action {
+       case proto.EVT_INIT:
+               metrics.ReportInstances(domainName, 1)
+               return
+       case proto.EVT_CREATE:
+               metrics.ReportInstances(domainName, 1)
+       case proto.EVT_DELETE:
+               metrics.ReportInstances(domainName, -1)
+               if !apt.IsDefaultDomainProject(domainProject) {
+                       projectName := domainProject[idx+1:]
+                       svcutil.RemandInstanceQuota(util.SetDomainProject(context.Background(), domainName, projectName))
+               }
+       }
+
+       if notify.NotifyCenter().Closed() {
+               log.Warnf("caught [%s] instance [%s/%s] event, endpoints %v, but notify service is closed",
+                       action, providerId, providerInstanceId, instance.Endpoints)
+               return
+       }
+
+       ctx := context.WithValue(context.WithValue(context.Background(),
+               svcutil.CTX_CACHEONLY, "1"),
+               svcutil.CTX_GLOBAL, "1")
+       ms, err := svcutil.GetService(ctx, domainProject, providerId)
+       if ms == nil {
+               log.Errorf(err, "caught [%s] instance [%s/%s] event, endpoints %v, get cached provider's file failed",
+                       action, providerId, providerInstanceId, instance.Endpoints)
+               return
+       }
+
+       log.Infof("caught [%s] service[%s][%s/%s/%s/%s] isntance[%s] event, endpoints %v",
+               action, providerId, ms.Environment, ms.AppId, ms.ServiceName, ms.Version, providerInstanceId, instance.Endpoints)
+
+       consumerIds := getCosumerIds()
+
+       log.Infof("there are %d consuemrIDs, %s", len(consumerIds), consumerIds)
+       PublishInstanceEvent(evt, domainProject, proto.MicroServiceToKey(domainProject, ms), consumerIds)
+}
+
+func getCosumerIds() []string {
+       var consumerIds []string
+       opts := []registry.PluginOp{
+               registry.OpGet(registry.WithStrKey("/cse-sr/inst/files"), registry.WithPrefix()),
+       }
+       resp, err := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil)
+       if err != nil {
+               log.Errorf(err, "get subscription from etcd failed")
+               return consumerIds
+       }
+
+       for _, kvs := range resp.Kvs {
+               key := kvs.Key
+               keystring := string(key)
+               value := kvs.Value
+
+               var mp1Req models.ServiceInfo
+               err = json.Unmarshal(value, &mp1Req)
+               if err != nil {
+                       log.Errorf(err, "parse serviceInfo failed")
+               }
+               length := len(keystring)
+               keystring = keystring[length-indexStart : length-indexEnd]
+               if StringContains(consumerIds, keystring) == -1 {
+                       consumerIds = append(consumerIds, keystring)
+               }
+       }
+       return consumerIds
+}
+
+func NewInstanceEtsiEventHandler() *InstanceEtsiEventHandler {
+       return &InstanceEtsiEventHandler{}
+}
+
+func PublishInstanceEvent(evt discovery.KvEvent, domainProject string, serviceKey *proto.MicroServiceKey, subscribers []string) {
+       defer cache.FindInstances.Remove(serviceKey)
+       if len(subscribers) == 0 {
+               log.Warn("the subscribers size is 0")
+               return
+       }
+
+       response := &proto.WatchInstanceResponse{
+               Response: proto.CreateResponse(proto.Response_SUCCESS, "Watch instance successfully."),
+               Action:   string(evt.Type),
+               Key:      serviceKey,
+               Instance: evt.KV.Value.(*proto.MicroServiceInstance),
+       }
+       for _, consumerId := range subscribers {
+               job := notify.NewInstanceEventWithTime(consumerId, domainProject, evt.Revision, evt.CreateAt, response)
+               err := notify.NotifyCenter().Publish(job)
+               if err != nil {
+                       log.Errorf(err, "publish failed")
+               }
+       }
+}
diff --git a/mep/mepserver/mp1/mepspace.go b/mep/mepserver/mp1/mepspace.go
new file mode 100644 (file)
index 0000000..2d8a1d2
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package mp1
+
+import (
+       "net/http"
+       "net/url"
+
+       "github.com/apache/servicecomb-service-center/server/core/proto"
+       "golang.org/x/net/context"
+
+       "mepserver/mp1/arch/workspace"
+)
+
+type MepSpace struct {
+       workspace.SpaceBase
+       R *http.Request       `json:"r"`
+       W http.ResponseWriter `json:"w"`
+
+       Ctx           context.Context `json:"ctx"`
+       ServiceId     string          `json:"serviceId"`
+       RestBody      interface{}     `json:"restBody"`
+       AppInstanceId string          `json:"appInstanceId"`
+       InstanceId    string          `json:"instanceId"`
+       SubscribeId   string          `json:"subscribeId"`
+       QueryParam    url.Values      `json:"queryParam"`
+
+       CoreRequest interface{}     `json:"coreRequest"`
+       CoreRsp     interface{}     `json:"coreRsp"`
+       HttPErrInf  *proto.Response `json:"httpErrInf"`
+       HttPRsp     interface{}     `json:"httpRsp"`
+}
+
+func NewWorkSpace(w http.ResponseWriter, r *http.Request) *MepSpace {
+       var plan = MepSpace{
+               W: w,
+               R: r,
+       }
+
+       plan.Init()
+       return &plan
+}
diff --git a/mep/mepserver/mp1/mp1event.go b/mep/mepserver/mp1/mp1event.go
new file mode 100644 (file)
index 0000000..a6c08d5
--- /dev/null
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       mgr "github.com/apache/servicecomb-service-center/server/plugin"
+       "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
+       "github.com/apache/servicecomb-service-center/server/plugin/pkg/quota"
+       "github.com/apache/servicecomb-service-center/server/plugin/pkg/quota/buildin"
+)
+
+func init() {
+       mgr.RegisterPlugin(mgr.Plugin{PName: mgr.QUOTA, Name: "buildin", New: New})
+       discovery.AddEventHandler(NewInstanceEtsiEventHandler())
+}
+
+func New() mgr.PluginInstance {
+       buildin.InitConfigs()
+       log.Infof("quota init, service: %d, instance: %d, schema: %d/service, tag: %d/service, rule: %d/service",
+               quota.DefaultServiceQuota, quota.DefaultInstanceQuota, quota.DefaultSchemaQuota, quota.DefaultTagQuota, quota.DefaultRuleQuota)
+       return &buildin.BuildInQuota{}
+}
diff --git a/mep/mepserver/mp1/plan_del_one_subscribe.go b/mep/mepserver/mp1/plan_del_one_subscribe.go
new file mode 100644 (file)
index 0000000..efea6f2
--- /dev/null
@@ -0,0 +1,75 @@
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+       "context"
+       "net/http"
+
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/server/core/backend"
+       "github.com/apache/servicecomb-service-center/server/core/proto"
+       "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
+
+       "mepserver/mp1/arch/workspace"
+)
+
+type DelOneSubscribe struct {
+       workspace.TaskBase
+       R             *http.Request       `json:"r,in"`
+       HttpErrInf    *proto.Response     `json:"httpErrInf,out"`
+       Ctx           context.Context     `json:"ctx,in"`
+       W             http.ResponseWriter `json:"w,in"`
+       AppInstanceId string              `json:"appInstanceId,in"`
+       SubscribeId   string              `json:"subscribeId,in"`
+       HttpRsp       interface{}         `json:"httpRsp,out"`
+}
+
+func (t *DelOneSubscribe) OnRequest(data string) workspace.TaskCode {
+
+       appInstanceId := t.AppInstanceId
+       subscribeId := t.SubscribeId
+
+       opts := []registry.PluginOp{
+               registry.OpGet(registry.WithStrKey("/cse-sr/etsi/subscribe/" + appInstanceId + "/" + subscribeId)),
+       }
+       resp, errGet := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil)
+       if errGet != nil {
+               log.Errorf(errGet, "get subscription from etcd failed")
+               t.SetFirstErrorCode(OperateDataWithEtcdErr, "get subscription from etch failed")
+               return workspace.TaskFinish
+       }
+
+       if len(resp.Kvs) == 0 {
+               log.Errorf(errGet, "subscription not exist")
+               t.SetFirstErrorCode(SubscriptionNotFound, "subscription not exist")
+               return workspace.TaskFinish
+       }
+
+       opts = []registry.PluginOp{
+               registry.OpDel(registry.WithStrKey("/cse-sr/etsi/subscribe/" + appInstanceId + "/" + subscribeId)),
+       }
+       _, err := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil)
+       if err != nil {
+               log.Errorf(err, "delete subscription from etcd failed")
+               t.SetFirstErrorCode(OperateDataWithEtcdErr, "delete subscription from etch failed")
+               return workspace.TaskFinish
+       }
+
+       t.HttpRsp = ""
+       return workspace.TaskFinish
+}
diff --git a/mep/mepserver/mp1/plan_delete_svc.go b/mep/mepserver/mp1/plan_delete_svc.go
new file mode 100644 (file)
index 0000000..f0f1ece
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+       "context"
+
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/server/core"
+       "github.com/apache/servicecomb-service-center/server/core/proto"
+
+       "mepserver/mp1/arch/workspace"
+)
+
+type DeleteService struct {
+       HttpErrInf *proto.Response `json:"httpErrInf,out"`
+       workspace.TaskBase
+       Ctx       context.Context `json:"ctx,in"`
+       ServiceId string          `json:"serviceId,in"`
+}
+
+func (t *DeleteService) OnRequest(data string) workspace.TaskCode {
+       if t.ServiceId == "" {
+               t.SetFirstErrorCode(SerErrServiceDelFailed, "param is empty")
+               return workspace.TaskFinish
+       }
+       serviceID := t.ServiceId[:len(t.ServiceId)/2]
+       instanceID := t.ServiceId[len(t.ServiceId)/2:]
+       req := &proto.UnregisterInstanceRequest{
+               ServiceId:  serviceID,
+               InstanceId: instanceID,
+       }
+       resp, err := core.InstanceAPI.Unregister(t.Ctx, req)
+       if err != nil {
+               log.Errorf(err, "Service delete failed!")
+       }
+       t.HttpErrInf = resp.Response
+
+       return workspace.TaskFinish
+}
diff --git a/mep/mepserver/mp1/plan_discover_svc.go b/mep/mepserver/mp1/plan_discover_svc.go
new file mode 100644 (file)
index 0000000..6ba4a69
--- /dev/null
@@ -0,0 +1,212 @@
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+       "context"
+       "net/http"
+       "net/url"
+
+       "github.com/apache/servicecomb-service-center/server/core"
+       "github.com/apache/servicecomb-service-center/server/core/proto"
+
+       "mepserver/mp1/arch/workspace"
+       "mepserver/mp1/models"
+       meputil "mepserver/mp1/util"
+)
+
+type DiscoverDecode struct {
+       workspace.TaskBase
+       R           *http.Request   `json:"r,in"`
+       Ctx         context.Context `json:"ctx,out"`
+       QueryParam  url.Values      `json:"queryParam,out"`
+       CoreRequest interface{}     `json:"coreRequest,out"`
+}
+
+func (t *DiscoverDecode) OnRequest(data string) workspace.TaskCode {
+       t.Ctx, t.CoreRequest, t.QueryParam = meputil.GetFindParam(t.R)
+       return workspace.TaskFinish
+}
+
+type DiscoverService struct {
+       workspace.TaskBase
+       Ctx         context.Context `json:"ctx,in"`
+       QueryParam  url.Values      `json:"queryParam,in"`
+       CoreRequest interface{}     `json:"coreRequest,in"`
+       CoreRsp     interface{}     `json:"coreRsp,out"`
+}
+
+func (t *DiscoverService) checkInstanceId(req *proto.FindInstancesRequest) bool {
+       instanceId := req.AppId
+       if instanceId != "default" {
+               instances := t.CoreRsp.(*proto.FindInstancesResponse).Instances
+               for _, val := range instances {
+                       if val.ServiceId+val.InstanceId == instanceId {
+                               return true
+                       }
+               }
+               return false
+       }
+       return true
+}
+
+func (t *DiscoverService) OnRequest(data string) workspace.TaskCode {
+       req, ok := t.CoreRequest.(*proto.FindInstancesRequest)
+       if !ok {
+               t.SetFirstErrorCode(SerErrServiceNotFound, "cast to request fail")
+               return workspace.TaskFinish
+       }
+       if req.ServiceName == "" {
+               var errFindByKey error
+               t.CoreRsp, errFindByKey = meputil.FindInstanceByKey(t.QueryParam)
+               if errFindByKey != nil || t.CoreRsp == nil {
+                       t.SetFirstErrorCode(SerErrServiceNotFound, errFindByKey.Error())
+                       return workspace.TaskFinish
+               }
+               if !t.checkInstanceId(req) {
+                       t.SetFirstErrorCode(SerErrServiceNotFound, "instance id not found")
+               }
+               return workspace.TaskFinish
+       }
+
+       findInstance, err := core.InstanceAPI.Find(t.Ctx, req)
+       if err != nil {
+               t.SetFirstErrorCode(SerErrServiceNotFound, err.Error())
+               return workspace.TaskFinish
+       }
+       if findInstance == nil || len(findInstance.Instances) == 0 {
+               t.SetFirstErrorCode(SerErrServiceNotFound, "service not found")
+               return workspace.TaskFinish
+       }
+
+       t.CoreRsp = findInstance
+       return workspace.TaskFinish
+}
+
+type ToStrDiscover struct {
+       HttpErrInf *proto.Response `json:"httpErrInf,out"`
+       workspace.TaskBase
+       CoreRsp interface{} `json:"coreRsp,in"`
+       HttpRsp interface{} `json:"httpRsp,out"`
+}
+
+func (t *ToStrDiscover) OnRequest(data string) workspace.TaskCode {
+       t.HttpErrInf, t.HttpRsp = mp1CvtSrvDiscover(t.CoreRsp.(*proto.FindInstancesResponse))
+       return workspace.TaskFinish
+}
+
+type RspHook struct {
+       R *http.Request `json:"r,in"`
+       workspace.TaskBase
+       Ctx     context.Context `json:"ctx,in"`
+       HttpRsp interface{}     `json:"httpRsp,in"`
+       HookRsp interface{}     `json:"hookRsp,out"`
+}
+
+func (t *RspHook) OnRequest(data string) workspace.TaskCode {
+       t.HookRsp = instanceHook(t.Ctx, t.R, t.HttpRsp)
+       return workspace.TaskFinish
+}
+
+func instanceHook(ctx context.Context, r *http.Request, rspData interface{}) interface{} {
+       rspBody, ok := rspData.([]*models.ServiceInfo)
+       if !ok {
+               return rspData
+       }
+
+       if len(rspBody) == 0 {
+               return rspBody
+       }
+       consumerName := r.Header.Get("X-ConsumerName")
+       if consumerName == "APIGW" {
+               return rspBody
+       }
+
+       for _, v := range rspBody {
+               if apihook.APIHook != nil {
+                       info := apihook.APIHook()
+                       if len(info.Addresses) == 0 && len(info.Uris) == 0 {
+                               return rspBody
+                       }
+                       v.TransportInfo.Endpoint = info
+               }
+       }
+       return rspBody
+}
+
+type SendHttpRsp struct {
+       HttpErrInf *proto.Response `json:"httpErrInf,in"`
+       workspace.TaskBase
+       W       http.ResponseWriter `json:"w,in"`
+       HttpRsp interface{}         `json:"httpRsp,in"`
+}
+
+func (t *SendHttpRsp) OnRequest(data string) workspace.TaskCode {
+       errInfo := t.GetSerErrInfo()
+       if errInfo.ErrCode >= int(workspace.TaskFail) {
+               statusCode, httpBody := t.cvtHttpErrInfo(errInfo)
+               meputil.HttpErrResponse(t.W, statusCode, httpBody)
+
+               return workspace.TaskFinish
+       }
+       meputil.WriteResponse(t.W, t.HttpErrInf, t.HttpRsp)
+       return workspace.TaskFinish
+}
+
+func (t *SendHttpRsp) cvtHttpErrInfo(errInfo *workspace.SerErrInfo) (int, interface{}) {
+       statusCode := http.StatusBadRequest
+       var httpBody interface{}
+       switch workspace.ErrCode(errInfo.ErrCode) {
+       case SerErrServiceNotFound:
+               {
+                       //status should return bad request
+                       body := &models.ProblemDetails{
+                               Title:  "Can not found resource",
+                               Status: uint32(errInfo.ErrCode),
+                               Detail: errInfo.Message,
+                       }
+                       httpBody = body
+               }
+       case SerInstanceNotFound:
+               {
+                       statusCode = http.StatusNotFound
+                       body := &models.ProblemDetails{
+                               Title:  "Can not found resource",
+                               Status: uint32(errInfo.ErrCode),
+                               Detail: errInfo.Message,
+                       }
+                       httpBody = body
+               }
+       }
+
+       return statusCode, httpBody
+}
+
+func mp1CvtSrvDiscover(findInsResp *proto.FindInstancesResponse) (*proto.Response, []*models.ServiceInfo) {
+       resp := findInsResp.Response
+       if resp != nil && resp.GetCode() != proto.Response_SUCCESS {
+               return resp, nil
+       }
+       serviceInfos := make([]*models.ServiceInfo, 0, len(findInsResp.Instances))
+       for _, ins := range findInsResp.Instances {
+               serviceInfo := &models.ServiceInfo{}
+               serviceInfo.FromServiceInstance(ins)
+               serviceInfos = append(serviceInfos, serviceInfo)
+       }
+       return resp, serviceInfos
+
+}
diff --git a/mep/mepserver/mp1/plan_get_one_subscribe.go b/mep/mepserver/mp1/plan_get_one_subscribe.go
new file mode 100644 (file)
index 0000000..01e5d22
--- /dev/null
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+       "context"
+       "encoding/json"
+       "net/http"
+
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/server/core/backend"
+       "github.com/apache/servicecomb-service-center/server/core/proto"
+       "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
+
+       "mepserver/mp1/arch/workspace"
+       "mepserver/mp1/models"
+)
+
+type GetOneSubscribe struct {
+       workspace.TaskBase
+       R             *http.Request       `json:"r,in"`
+       HttpErrInf    *proto.Response     `json:"httpErrInf,out"`
+       Ctx           context.Context     `json:"ctx,in"`
+       W             http.ResponseWriter `json:"w,in"`
+       AppInstanceId string              `json:"appInstanceId,in"`
+       SubscribeId   string              `json:"subscribeId,in"`
+       HttpRsp       interface{}         `json:"httpRsp,out"`
+}
+
+func (t *GetOneSubscribe) OnRequest(data string) workspace.TaskCode {
+
+       appInstanceId := t.AppInstanceId
+       subscribeId := t.SubscribeId
+
+       opts := []registry.PluginOp{
+               registry.OpGet(registry.WithStrKey("/cse-sr/etsi/subscribe/" + appInstanceId + "/" + subscribeId)),
+       }
+       resp, err := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil)
+       if err != nil {
+               log.Errorf(err, "get subscription from etcd failed")
+               t.SetFirstErrorCode(OperateDataWithEtcdErr, "get subscription from etch failed")
+               return workspace.TaskFinish
+       }
+
+       if len(resp.Kvs) == 0 {
+               log.Errorf(err, "subscription not exist")
+               t.SetFirstErrorCode(SubscriptionNotFound, "subscription not exist")
+               return workspace.TaskFinish
+       }
+       sub := &models.SerAvailabilityNotificationSubscription{}
+       jsonErr := json.Unmarshal([]byte(string(resp.Kvs[0].Value)), sub)
+       if jsonErr != nil {
+               log.Warn("subscribe can not be parsed to SerAvailabilityNotificationSubscription")
+               return workspace.TaskFinish
+       }
+       t.HttpRsp = sub
+       return workspace.TaskFinish
+}
diff --git a/mep/mepserver/mp1/plan_get_one_svc.go b/mep/mepserver/mp1/plan_get_one_svc.go
new file mode 100644 (file)
index 0000000..d2a7309
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+       "context"
+       "net/http"
+
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/apache/servicecomb-service-center/server/core"
+       "github.com/apache/servicecomb-service-center/server/core/proto"
+
+       "mepserver/mp1/arch/workspace"
+       "mepserver/mp1/models"
+       meputil "mepserver/mp1/util"
+)
+
+type GetOneDecode struct {
+       workspace.TaskBase
+       R           *http.Request   `json:"r,in"`
+       Ctx         context.Context `json:"ctx,out"`
+       CoreRequest interface{}     `json:"coreRequest,out"`
+}
+
+func (t *GetOneDecode) OnRequest(data string) workspace.TaskCode {
+       t.Ctx, t.CoreRequest = t.getFindParam(t.R)
+       return workspace.TaskFinish
+
+}
+
+func (t *GetOneDecode) getFindParam(r *http.Request) (context.Context, *proto.GetOneInstanceRequest) {
+       query, ids := meputil.GetHTTPTags(r)
+       mp1SrvId := query.Get(":serviceId")
+       serviceId := mp1SrvId[:len(mp1SrvId)/2]
+       instanceId := mp1SrvId[len(mp1SrvId)/2:]
+       req := &proto.GetOneInstanceRequest{
+               ConsumerServiceId:  r.Header.Get("X-ConsumerId"),
+               ProviderServiceId:  serviceId,
+               ProviderInstanceId: instanceId,
+               Tags:               ids,
+       }
+
+       ctx := util.SetTargetDomainProject(r.Context(), r.Header.Get("X-Domain-Name"), query.Get(":project"))
+       return ctx, req
+}
+
+type GetOneInstance struct {
+       workspace.TaskBase
+       HttpErrInf  *proto.Response `json:"httpErrInf,out"`
+       Ctx         context.Context `json:"ctx,in"`
+       CoreRequest interface{}     `json:"coreRequest,in"`
+       HttpRsp     interface{}     `json:"httpRsp,out"`
+}
+
+func (t *GetOneInstance) OnRequest(data string) workspace.TaskCode {
+       resp, _ := core.InstanceAPI.GetOneInstance(t.Ctx, t.CoreRequest.(*proto.GetOneInstanceRequest))
+       t.HttpErrInf = resp.Response
+       resp.Response = nil
+       mp1Rsp := &models.ServiceInfo{}
+       if resp.Instance != nil {
+               mp1Rsp.FromServiceInstance(resp.Instance)
+       } else {
+               t.SetFirstErrorCode(SerInstanceNotFound, "service instance id not found")
+               return workspace.TaskFinish
+       }
+       t.HttpRsp = mp1Rsp
+
+       return workspace.TaskFinish
+}
diff --git a/mep/mepserver/mp1/plan_get_subsribes.go b/mep/mepserver/mp1/plan_get_subsribes.go
new file mode 100644 (file)
index 0000000..cf34770
--- /dev/null
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+       "context"
+       "encoding/json"
+       "net/http"
+
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/server/core/backend"
+       "github.com/apache/servicecomb-service-center/server/core/proto"
+       "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
+
+       "mepserver/mp1/arch/workspace"
+       "mepserver/mp1/models"
+)
+
+type GetSubscribes struct {
+       workspace.TaskBase
+       R             *http.Request       `json:"r,in"`
+       HttpErrInf    *proto.Response     `json:"httpErrInf,out"`
+       Ctx           context.Context     `json:"ctx,in"`
+       W             http.ResponseWriter `json:"w,in"`
+       AppInstanceId string              `json:"appInstanceId,in"`
+       HttpRsp       interface{}         `json:"httpRsp,out"`
+}
+
+func (t *GetSubscribes) OnRequest(data string) workspace.TaskCode {
+
+       appInstanceId := t.AppInstanceId
+
+       opts := []registry.PluginOp{
+               registry.OpGet(registry.WithStrKey("/cse-sr/etsi/subscribe/"+appInstanceId), registry.WithPrefix()),
+       }
+       resp, err := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil)
+       if err != nil {
+               log.Errorf(err, "get subscription from etcd failed")
+               t.SetFirstErrorCode(OperateDataWithEtcdErr, "get subscription from etcd failed")
+               return workspace.TaskFinish
+       }
+
+       var values []string
+       for _, value := range resp.Kvs {
+               values = append(values, string(value.Value))
+       }
+       if len(values) == 0 {
+               log.Errorf(err, "get subscription failed, subscription not exist")
+               t.SetFirstErrorCode(SubscriptionNotFound, "get subscription failed, subscription not exist")
+               return workspace.TaskFinish
+       }
+       subs := make([]*models.SerAvailabilityNotificationSubscription, 0, len(values))
+       for _, val := range values {
+               sub := &models.SerAvailabilityNotificationSubscription{}
+               err := json.Unmarshal([]byte(val), sub)
+               if err != nil {
+                       log.Warn("subscribe can not be parsed to SerAvailabilityNotificationSubscription")
+                       continue
+               }
+               subs = append(subs, sub)
+       }
+       t.HttpRsp = subs
+       return workspace.TaskFinish
+}
diff --git a/mep/mepserver/mp1/plan_register_svc.go b/mep/mepserver/mp1/plan_register_svc.go
new file mode 100644 (file)
index 0000000..2a65de8
--- /dev/null
@@ -0,0 +1,193 @@
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "io/ioutil"
+       "net/http"
+
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/apache/servicecomb-service-center/server/core"
+       "github.com/apache/servicecomb-service-center/server/core/proto"
+       svcerr "github.com/apache/servicecomb-service-center/server/error"
+
+       "mepserver/mp1/arch/workspace"
+       "mepserver/mp1/models"
+       meputil "mepserver/mp1/util"
+)
+
+type DecodeRestReq struct {
+       workspace.TaskBase
+       R             *http.Request   `json:"r,in"`
+       Ctx           context.Context `json:"ctx,out"`
+       AppInstanceId string          `json:"appInstanceId,out"`
+       SubscribeId   string          `json:"subscribeId,out"`
+       ServiceId     string          `json:"serviceId,out"`
+       RestBody      interface{}     `json:"restBody,out"`
+}
+
+func (t *DecodeRestReq) OnRequest(data string) workspace.TaskCode {
+       t.GetParam(t.R)
+       err := t.ParseBody(t.R)
+       if err != nil {
+               log.Error("parse rest body failed", err)
+       }
+       return workspace.TaskFinish
+}
+
+func (t *DecodeRestReq) ParseBody(r *http.Request) error {
+       if t.RestBody == nil {
+               return nil
+       }
+       msg, err := ioutil.ReadAll(r.Body)
+       if err != nil {
+               log.Error("read body failed", err)
+               t.SetFirstErrorCode(SerErrFailBase, err.Error())
+               return err
+       }
+
+       newMsg, err := t.checkParam(msg)
+       if err != nil {
+               log.Error("check Param failed", err)
+               t.SetFirstErrorCode(SerErrFailBase, err.Error())
+               return err
+       }
+
+       err = json.Unmarshal(newMsg, t.RestBody)
+       if err != nil {
+               log.Errorf(err, "invalid json: %s", util.BytesToStringWithNoCopy(newMsg))
+               t.SetFirstErrorCode(SerErrFailBase, err.Error())
+               return err
+       }
+       return nil
+
+}
+
+func (t *DecodeRestReq) checkParam(msg []byte) ([]byte, error) {
+
+       var temp map[string]interface{}
+       err := json.Unmarshal(msg, &temp)
+       if err != nil {
+               log.Errorf(err, "invalid json to map: %s", util.BytesToStringWithNoCopy(msg))
+               t.SetFirstErrorCode(SerErrFailBase, err.Error())
+               return nil, err
+       }
+
+       meputil.SetMapValue(temp, "consumedLocalOnly", true)
+       meputil.SetMapValue(temp, "isLocal", true)
+       meputil.SetMapValue(temp, "scopeOfLocality", "MEC_HOST")
+
+       msg, err = json.Marshal(&temp)
+       if err != nil {
+               log.Errorf(err, "invalid map to json")
+               t.SetFirstErrorCode(SerErrFailBase, err.Error())
+               return nil, err
+       }
+
+       return msg, nil
+}
+
+func (t *DecodeRestReq) WithBody(body interface{}) *DecodeRestReq {
+       t.RestBody = body
+       return t
+}
+
+func (t *DecodeRestReq) GetParam(r *http.Request) {
+       query, _ := meputil.GetHTTPTags(r)
+       t.AppInstanceId = query.Get(":appInstanceId")
+       t.SubscribeId = query.Get(":subscriptionId")
+       t.ServiceId = query.Get(":serviceId")
+       t.Ctx = util.SetTargetDomainProject(r.Context(), r.Header.Get("X-Domain-Name"), query.Get(":project"))
+}
+
+type RegisterServiceId struct {
+       HttpErrInf *proto.Response `json:"httpErrInf,out"`
+       workspace.TaskBase
+       Ctx       context.Context `json:"ctx,in"`
+       ServiceId string          `json:"serviceId,out"`
+       RestBody  interface{}     `json:"restBody,in"`
+}
+
+func (t *RegisterServiceId) OnRequest(data string) workspace.TaskCode {
+
+       serviceInfo, ok := t.RestBody.(*models.ServiceInfo)
+       if !ok {
+               t.SetFirstErrorCode(1, "restbody failed")
+               return workspace.TaskFinish
+       }
+       req := &proto.CreateServiceRequest{}
+       serviceInfo.ToServiceRequest(req)
+       resp, err := core.ServiceAPI.Create(t.Ctx, req)
+       if err != nil {
+               log.Errorf(err, "Service Center ServiceAPI.Create fail: %s!", err.Error())
+               t.SetFirstErrorCode(1, err.Error())
+               return workspace.TaskFinish
+       }
+
+       if resp.ServiceId == "" {
+               t.HttpErrInf = resp.Response
+               log.Warn("Service id empty.")
+       }
+       t.ServiceId = resp.ServiceId
+       return workspace.TaskFinish
+}
+
+type RegisterServiceInst struct {
+       HttpErrInf *proto.Response `json:"httpErrInf,out"`
+       workspace.TaskBase
+       W             http.ResponseWriter `json:"w,in"`
+       Ctx           context.Context     `json:"ctx,in"`
+       AppInstanceId string              `json:"appInstanceId,in"`
+       ServiceId     string              `json:"serviceId,in"`
+       InstanceId    string              `json:"instanceId,out"`
+       RestBody      interface{}         `json:"restBody,in"`
+       HttpRsp       interface{}         `json:"httpRsp,out"`
+}
+
+func (t *RegisterServiceInst) OnRequest(data string) workspace.TaskCode {
+       serviceInfo, ok := t.RestBody.(*models.ServiceInfo)
+       if !ok {
+               t.SetFirstErrorCode(1, "restbody failed")
+               return workspace.TaskFinish
+       }
+       req := &proto.RegisterInstanceRequest{}
+       serviceInfo.ToRegisterInstance(req)
+       req.Instance.ServiceId = t.ServiceId
+       req.Instance.Properties["appInstanceId"] = t.AppInstanceId
+       resp, err := core.InstanceAPI.Register(t.Ctx, req)
+       if err != nil {
+               log.Errorf(err, "RegisterInstance fail: %s", t.ServiceId)
+               t.HttpErrInf = &proto.Response{}
+               t.HttpErrInf.Code = svcerr.ErrForbidden
+               t.HttpErrInf.Message = err.Error()
+               return workspace.TaskFinish
+       }
+       t.InstanceId = resp.InstanceId
+
+       //build response serviceComb use serviceId + InstanceId to mark a service instance
+       mp1SerId := t.ServiceId + t.InstanceId
+       serviceInfo.SerInstanceId = mp1SerId
+       t.HttpRsp = serviceInfo
+
+       location := fmt.Sprintf("/mep/mp1/v1/services/%s", mp1SerId)
+       t.W.Header().Set("Location", location)
+       return workspace.TaskFinish
+}
diff --git a/mep/mepserver/mp1/plan_send_httprsp_created.go b/mep/mepserver/mp1/plan_send_httprsp_created.go
new file mode 100644 (file)
index 0000000..dce8a4e
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+       "net/http"
+
+       "github.com/apache/servicecomb-service-center/server/core/proto"
+
+       "mepserver/mp1/arch/workspace"
+       "mepserver/mp1/util"
+)
+
+type SendHttpRspCreated struct {
+       HttpErrInf *proto.Response `json:"httpErrInf,in"`
+       workspace.TaskBase
+       W       http.ResponseWriter `json:"w,in"`
+       HttpRsp interface{}         `json:"httpRsp,in"`
+}
+
+func (t *SendHttpRspCreated) OnRequest(data string) workspace.TaskCode {
+       util.WriteHTTPResponse(t.W, t.HttpErrInf, t.HttpRsp)
+       return workspace.TaskFinish
+}
diff --git a/mep/mepserver/mp1/plan_subscribe_app.go b/mep/mepserver/mp1/plan_subscribe_app.go
new file mode 100644 (file)
index 0000000..c93ab27
--- /dev/null
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "net/http"
+
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/server/core/backend"
+       "github.com/apache/servicecomb-service-center/server/core/proto"
+       "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
+       "github.com/satori/go.uuid"
+
+       "mepserver/mp1/arch/workspace"
+       "mepserver/mp1/models"
+)
+
+type SubscribeIst struct {
+       workspace.TaskBase
+       R             *http.Request       `json:"r,in"`
+       HttpErrInf    *proto.Response     `json:"httpErrInf,out"`
+       Ctx           context.Context     `json:"ctx,in"`
+       W             http.ResponseWriter `json:"w,in"`
+       RestBody      interface{}         `json:"restBody,in"`
+       AppInstanceId string              `json:"appInstanceId,in"`
+       SubscribeId   string              `json:"subscribeId,in"`
+       HttpRsp       interface{}         `json:"httpRsp,out"`
+}
+
+func (t *SubscribeIst) OnRequest(data string) workspace.TaskCode {
+
+       mp1SubscribeInfo, ok := t.RestBody.(*models.SerAvailabilityNotificationSubscription)
+       if !ok {
+               t.SetFirstErrorCode(SerErrFailBase, "restBody failed")
+               return workspace.TaskFinish
+       }
+
+       appInstanceId := t.AppInstanceId
+       subscribeId := uuid.NewV4().String()
+       t.SubscribeId = subscribeId
+       subscribeJSON, err := json.Marshal(mp1SubscribeInfo)
+       if err != nil {
+               log.Errorf(err, "can not Marshal subscribe info")
+               t.SetFirstErrorCode(ParseInfoErr, "can not marshal subscribe info")
+               return workspace.TaskFinish
+       }
+       opts := []registry.PluginOp{
+               registry.OpPut(registry.WithStrKey("/cse-sr/etsi/subscribe/"+appInstanceId+"/"+subscribeId),
+                       registry.WithValue(subscribeJSON)),
+       }
+       _, resultErr := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil)
+       if resultErr != nil {
+               log.Errorf(err, "subscription to etcd failed!")
+               t.SetFirstErrorCode(OperateDataWithEtcdErr, "put subscription to etcd failed")
+               return workspace.TaskFinish
+       }
+
+       req := &proto.WatchInstanceRequest{SelfServiceId: appInstanceId[:len(appInstanceId)/2]}
+       t.R.Method = "WATCHLIST"
+       WebsocketListAndWatch(t.Ctx, req, appInstanceId)
+       t.buildResponse(mp1SubscribeInfo)
+
+       return workspace.TaskFinish
+}
+
+func (t *SubscribeIst) buildResponse(sub *models.SerAvailabilityNotificationSubscription) {
+       appInstanceID := t.AppInstanceId
+       subscribeID := t.SubscribeId
+
+       t.HttpRsp = sub
+       location := fmt.Sprintf("/mec_service_mgmt/v1/applications/%s/subscriptions/%s/", appInstanceID, subscribeID)
+       t.W.Header().Set("Location", location)
+}
diff --git a/mep/mepserver/mp1/plan_update_svc.go b/mep/mepserver/mp1/plan_update_svc.go
new file mode 100644 (file)
index 0000000..4e19194
--- /dev/null
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+       "context"
+
+       "github.com/apache/servicecomb-service-center/pkg/util"
+       "github.com/apache/servicecomb-service-center/server/core/proto"
+       svcutil "github.com/apache/servicecomb-service-center/server/service/util"
+
+       "mepserver/mp1/arch/workspace"
+       "mepserver/mp1/models"
+       meputil "mepserver/mp1/util"
+)
+
+type UpdateInstance struct {
+       workspace.TaskBase
+       HttpErrInf *proto.Response `json:"httpErrInf,out"`
+       Ctx        context.Context `json:"ctx,in"`
+       ServiceId  string          `json:"serviceId,in"`
+       RestBody   interface{}     `json:"restBody,in"`
+       HttpRsp    interface{}     `json:"httpRsp,out"`
+}
+
+func (t *UpdateInstance) OnRequest(data string) workspace.TaskCode {
+       if t.ServiceId == "" {
+               t.SetFirstErrorCode(SerErrFailBase, "param is empty")
+               return workspace.TaskFinish
+       }
+       mp1Ser, ok := t.RestBody.(*models.ServiceInfo)
+       if !ok {
+               t.SetFirstErrorCode(SerErrFailBase, "body invalid")
+               return workspace.TaskFinish
+       }
+
+       instance, err := meputil.GetServiceInstance(t.Ctx, t.ServiceId)
+       if err != nil {
+               t.SetFirstErrorCode(SerInstanceNotFound, "find service failed")
+               return workspace.TaskFinish
+       }
+
+       copyInstanceRef := *instance
+       req := proto.RegisterInstanceRequest{
+               Instance: &copyInstanceRef,
+       }
+       mp1Ser.ToRegisterInstance(&req)
+
+       domainProject := util.ParseDomainProject(t.Ctx)
+       centerErr := svcutil.UpdateInstance(t.Ctx, domainProject, &copyInstanceRef)
+       if centerErr != nil {
+               t.SetFirstErrorCode(SerErrServiceUpdFailed, "update service failed")
+               return workspace.TaskFinish
+       }
+
+       err = meputil.Heartbeat(t.Ctx, mp1Ser.SerInstanceId)
+       if err != nil {
+               t.SetFirstErrorCode(SerErrServiceUpdFailed, "heartbeat failed")
+               return workspace.TaskFinish
+       }
+       mp1Ser.SerInstanceId = instance.ServiceId + instance.InstanceId
+       t.HttpRsp = mp1Ser
+       t.HttpErrInf = proto.CreateResponse(proto.Response_SUCCESS, "Update service instance success.")
+       return workspace.TaskFinish
+}
diff --git a/mep/mepserver/mp1/publisher.go b/mep/mepserver/mp1/publisher.go
new file mode 100644 (file)
index 0000000..bc2c09a
--- /dev/null
@@ -0,0 +1,105 @@
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+       "sync"
+       "time"
+
+       "github.com/apache/servicecomb-service-center/pkg/gopool"
+       "golang.org/x/net/context"
+)
+
+type Publisher struct {
+       wss       []*Websocket
+       goroutine *gopool.Pool
+       lock      sync.Mutex
+}
+
+func (p *Publisher) Run() {
+       gopool.Go(publisher.loop)
+}
+
+func (p *Publisher) loop(ctx context.Context) {
+       defer p.Stop()
+       ticker := time.NewTicker(500 * time.Millisecond)
+       for {
+               select {
+               case <-ctx.Done():
+                       return
+               case <-ticker.C:
+                       var removes []int
+                       for i, ws := range p.wss {
+                               payload := ws.Pick()
+                               if payload == nil {
+                                       continue
+                               }
+                               _, ok := payload.(error)
+                               if ok {
+                                       removes = append(removes, i)
+                               }
+                               p.dispatch(ws, payload)
+                       }
+                       if len(removes) == 0 {
+                               continue
+                       }
+                       p.lock.Lock()
+                       var (
+                               news []*Websocket
+                               s    int
+                       )
+                       for _, e := range removes {
+                               news = append(news, p.wss[s:e]...)
+                               s = e + 1
+                       }
+                       if s < len(p.wss) {
+                               news = append(news, p.wss[s:]...)
+                       }
+                       p.wss = news
+                       p.lock.Unlock()
+               }
+       }
+}
+
+func (p *Publisher) Stop() {
+       p.goroutine.Close(true)
+}
+
+func (p *Publisher) dispatch(ws *Websocket, payload interface{}) {
+       p.goroutine.Do(func(ctx context.Context) {
+               ws.HandleWatchWebSocketJob(payload)
+       })
+}
+
+func (p *Publisher) Accept(ws *Websocket) {
+       p.lock.Lock()
+       p.wss = append(p.wss, ws)
+       p.lock.Unlock()
+}
+
+var publisher *Publisher
+
+func init() {
+       publisher = NewPublisher()
+       publisher.Run()
+}
+
+func NewPublisher() *Publisher {
+       return &Publisher{
+               goroutine: gopool.New(context.Background()),
+       }
+}
diff --git a/mep/mepserver/mp1/websocket.go b/mep/mepserver/mp1/websocket.go
new file mode 100644 (file)
index 0000000..f921fde
--- /dev/null
@@ -0,0 +1,255 @@
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+       "context"
+       "encoding/json"
+       "fmt"
+       "io"
+       "net/http"
+       "strings"
+       "time"
+
+       "github.com/apache/servicecomb-service-center/pkg/log"
+       "github.com/apache/servicecomb-service-center/server/core/backend"
+       "github.com/apache/servicecomb-service-center/server/core/proto"
+       "github.com/apache/servicecomb-service-center/server/notify"
+       "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
+
+       "mepserver/mp1/models"
+)
+
+type Websocket struct {
+       watcher         *notify.InstanceEventListWatcher
+       ticker          *time.Ticker
+       ctx             context.Context
+       needPingWatcher bool
+       free            chan struct{}
+       closed          chan struct{}
+       serviceID       string
+}
+
+func (ws *Websocket) Init() error {
+       ws.ticker = time.NewTicker(notify.HeartbeatInterval)
+       ws.needPingWatcher = true
+       ws.free = make(chan struct{}, 1)
+       ws.closed = make(chan struct{})
+       ws.SetReady()
+       if err := notify.NotifyCenter().AddSubscriber(ws.watcher); err != nil {
+               return err
+       }
+       publisher.Accept(ws)
+       return nil
+}
+
+func (ws *Websocket) ReadTimeout() time.Duration {
+       return notify.ReadTimeout
+}
+
+func (ws *Websocket) SendTimeout() time.Duration {
+       return notify.SendTimeout
+}
+
+func (ws *Websocket) HandleWatchWebSocketControlMessage() {
+
+}
+
+func (ws *Websocket) HandleWatchWebSocketJob(payload interface{}) {
+       defer ws.SetReady()
+       var (
+               job *notify.InstanceEvent
+       )
+       switch v := payload.(type) {
+       case error:
+               err := payload.(error)
+               log.Errorf(err, "watcher catch an error, subject: %s, group: %s", ws.watcher.Subject(), ws.watcher.Group())
+       case time.Time:
+               return
+       case *notify.InstanceEvent:
+               serviceID := ws.serviceID
+               job = payload.(*notify.InstanceEvent)
+               resp := job.Response
+               SendMsgToApp(resp, serviceID)
+       default:
+               log.Errorf(nil, "watcher unknown input type %T, subject: %s, group: %s", v, ws.watcher.Subject(), ws.watcher.Group())
+               return
+       }
+
+       select {
+       case _, ok := <-ws.closed:
+               if !ok {
+                       log.Warn("websocket channel closed")
+               }
+               return
+       default:
+       }
+}
+
+func (ws *Websocket) SetReady() {
+       select {
+       case ws.free <- struct{}{}:
+       default:
+       }
+
+}
+
+func (ws *Websocket) Pick() interface{} {
+       select {
+       case _, ok := <-ws.Ready():
+               if !ok {
+                       log.Warn("websocket ready channel closed")
+               }
+               if ws.watcher.Err() != nil {
+                       return ws.watcher.Err()
+               }
+
+               select {
+               case t, ok := <-ws.ticker.C:
+                       if !ok {
+                               log.Warn("websocket ticker C channel closed")
+                       }
+                       return t
+               case j, ok := <-ws.watcher.Job:
+                       if !ok {
+                               log.Warn("websocket watcher job channel closed")
+                       }
+                       if j == nil {
+                               err := fmt.Errorf("server shutdown")
+                               log.Error("server shutdown", err)
+                       }
+                       return j
+               default:
+                       ws.SetReady()
+               }
+       default:
+       }
+       return nil
+}
+
+func (ws *Websocket) Ready() chan struct{} {
+       return ws.free
+}
+
+func (ws *Websocket) Stop() {
+       close(ws.closed)
+}
+
+func getCallBackUris(serviceID string, instanceID string, serName string) []string {
+       var callbackUris []string
+       opts := []registry.PluginOp{
+               registry.OpGet(registry.WithStrKey("/cse-sr/etsi/subscribe/"+serviceID), registry.WithPrefix()),
+       }
+       resp, err := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil)
+       if err != nil {
+               log.Errorf(err, "get subcription from etcd failed!")
+               return callbackUris
+       }
+       for _, v := range resp.Kvs {
+               var notifyInfo models.SerAvailabilityNotificationSubscription
+               if v.Value == nil {
+                       log.Warn("the value is nil in etcd")
+                       continue
+               }
+               err = json.Unmarshal(v.Value, &notifyInfo)
+               if err != nil {
+                       log.Warn("notify json can not be parsed to notifyInfo")
+                       continue
+               }
+               callbackURI := notifyInfo.CallbackReference
+               filter := notifyInfo.FilteringCriteria
+
+               if (len(filter.SerInstanceIds) != 0 && StringContains(filter.SerInstanceIds, instanceID) != -1) ||
+                       (len(filter.SerNames) != 0 && StringContains(filter.SerNames, serviceID) != -1) {
+                       callbackUris = append(callbackUris, callbackURI)
+               }
+       }
+       log.Infof("send to consumerIds: %s", callbackUris)
+       return callbackUris
+}
+
+func StringContains(arr []string, val string) (index int) {
+       index = -1
+       for i := 0; i < len(arr); i++ {
+               if arr[i] == val {
+                       index = i
+                       return
+               }
+       }
+       return
+}
+
+func SendMsgToApp(data *proto.WatchInstanceResponse, serviceID string) {
+       // transfer data to instanceInfo, and get instaceid, serviceName
+       instanceID := data.Instance.ServiceId + data.Instance.InstanceId
+       serName := data.Instance.Properties["serName"]
+       action := data.Action
+       instanceInfo := data.Instance
+       instanceStr, err := json.Marshal(instanceInfo)
+       if err != nil {
+               log.Errorf(err, "parse instanceInfo failed!")
+               return
+       }
+
+       callbackUris := getCallBackUris(serviceID, instanceID, serName)
+       body := strings.NewReader(string(instanceStr))
+       doSend(action, body, callbackUris)
+}
+
+func doSend(action string, body io.Reader, callbackUris []string) {
+       for _, callbackURI := range callbackUris {
+               log.Debugf("action: %s with callbackURI:%s", action, callbackURI)
+               if !strings.HasPrefix(callbackURI, "http") {
+                       callbackURI = "http://" + callbackURI
+               }
+               client := http.Client{}
+
+               if action == "CREATE" {
+                       contentType := "application/x-www-form-urlencoded"
+                       _, err := http.Post(callbackURI, contentType, body)
+                       if err != nil {
+                               log.Warn("the consumer handle post action failed!")
+                       }
+               } else if action == "DELETE" {
+                       req, err := http.NewRequest("delete", callbackURI, body)
+                       if err != nil {
+                               _, err := client.Do(req)
+                               if err != nil {
+                                       log.Warn("the consumer handle delete action failed!")
+                               }
+
+                       } else {
+                               log.Errorf(err, "crate request failed!")
+                       }
+               } else if action == "UPDATE" {
+                       req, err := http.NewRequest("put", callbackURI, body)
+                       if err != nil {
+                               _, err := client.Do(req)
+                               if err != nil {
+                                       log.Warn("the consumer handle update action failed!")
+                               }
+
+                       } else {
+                               log.Errorf(err, "crate request failed!")
+                       }
+               }
+       }
+}
+
+func DoWebSocketListAndWatchV2(ctx context.Context, id string, id2 string, f func() ([]*proto.WatchInstanceResponse, int64)) {
+       //TBD
+}