--- /dev/null
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+ "context"
+ "net/http"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/rest"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/core/proto"
+ "github.com/apache/servicecomb-service-center/server/notify"
+ v4 "github.com/apache/servicecomb-service-center/server/rest/controller/v4"
+ svcutil "github.com/apache/servicecomb-service-center/server/service/util"
+
+ "mepserver/mp1/arch/workspace"
+ "mepserver/mp1/models"
+)
+
+const (
+ basePath = "/mep/mec_service_mgmt/v1"
+ servicesPath = basePath + "/services"
+ appServicesPath = basePath + "/applications/:appInstanceId" + "/services"
+ appSubscribePath = basePath + "/applications/:appInstanceId/subscriptions"
+)
+
+const (
+ SerErrFailBase workspace.ErrCode = workspace.TaskFail
+ SerErrServiceNotFound = 2
+ SerInstanceNotFound = 3
+ ParseInfoErr = 4
+ SubscriptionNotFound = 5
+ OperateDataWithEtcdErr = 6
+ SerErrServiceDelFailed = 7
+ SerErrServiceUpdFailed = 8
+)
+
+type APIHookFunc func() models.EndPointInfo
+
+type APIGwHook struct {
+ APIHook APIHookFunc
+}
+
+var apihook APIGwHook
+
+func SetAPIHook(hook APIGwHook) {
+ apihook = hook
+}
+
+func GetApiHook() APIGwHook {
+ return apihook
+}
+
+func init() {
+ initRouter()
+}
+
+func initRouter() {
+ rest.
+ RegisterServant(&Mp1Service{})
+}
+
+type Mp1Service struct {
+ v4.MicroServiceService
+}
+
+func (m *Mp1Service) URLPatterns() []rest.Route {
+ return []rest.Route{
+ // appSubscriptions
+ {Method: rest.HTTP_METHOD_POST, Path: appSubscribePath, Func: doAppSubscribe},
+ {Method: rest.HTTP_METHOD_GET, Path: appSubscribePath, Func: getAppSubscribes},
+ {Method: rest.HTTP_METHOD_GET, Path: appSubscribePath + "/:subscriptionId", Func: getOneAppSubscribe},
+ {Method: rest.HTTP_METHOD_DELETE, Path: appSubscribePath + "/:subscriptionId", Func: delOneAppSubscribe},
+ // appServices
+ {Method: rest.HTTP_METHOD_POST, Path: appServicesPath, Func: serviceRegister},
+ {Method: rest.HTTP_METHOD_GET, Path: appServicesPath, Func: serviceDiscover},
+ {Method: rest.HTTP_METHOD_PUT, Path: appServicesPath + "/:serviceId", Func: serviceUpdate},
+ {Method: rest.HTTP_METHOD_GET, Path: appServicesPath + "/:serviceId", Func: getOneService},
+ {Method: rest.HTTP_METHOD_DELETE, Path: appServicesPath + "/:serviceId", Func: serviceDelete},
+ // services
+ {Method: rest.HTTP_METHOD_GET, Path: servicesPath, Func: serviceDiscover},
+ {Method: rest.HTTP_METHOD_GET, Path: servicesPath + "/:serviceId", Func: getOneService},
+ }
+}
+
+func doAppSubscribe(w http.ResponseWriter, r *http.Request) {
+
+ workPlan := NewWorkSpace(w, r)
+ workPlan.Try(
+ (&DecodeRestReq{}).WithBody(&models.SerAvailabilityNotificationSubscription{}),
+ &SubscribeIst{})
+ workPlan.Finally(&SendHttpRspCreated{})
+
+ workspace.WkRun(workPlan)
+}
+
+func getAppSubscribes(w http.ResponseWriter, r *http.Request) {
+
+ workPlan := NewWorkSpace(w, r)
+ workPlan.Try(
+ &DecodeRestReq{},
+ &GetSubscribes{})
+ workPlan.Finally(&SendHttpRsp{})
+
+ workspace.WkRun(workPlan)
+}
+
+func getOneAppSubscribe(w http.ResponseWriter, r *http.Request) {
+
+ workPlan := NewWorkSpace(w, r)
+ workPlan.Try(
+ &DecodeRestReq{},
+ &GetOneSubscribe{})
+ workPlan.Finally(&SendHttpRsp{})
+
+ workspace.WkRun(workPlan)
+}
+
+func delOneAppSubscribe(w http.ResponseWriter, r *http.Request) {
+
+ workPlan := NewWorkSpace(w, r)
+ workPlan.Try(
+ &DecodeRestReq{},
+ &DelOneSubscribe{})
+ workPlan.Finally(&SendHttpRsp{})
+
+ workspace.WkRun(workPlan)
+}
+
+func serviceRegister(w http.ResponseWriter, r *http.Request) {
+ log.Info("Register service start...")
+
+ workPlan := NewWorkSpace(w, r)
+ workPlan.Try(
+ (&DecodeRestReq{}).WithBody(&models.ServiceInfo{}),
+ &RegisterServiceId{},
+ &RegisterServiceInst{})
+ workPlan.Finally(&SendHttpRspCreated{})
+
+ workspace.WkRun(workPlan)
+}
+
+func serviceDiscover(w http.ResponseWriter, r *http.Request) {
+ log.Info("Discover service service start...")
+
+ workPlan := NewWorkSpace(w, r)
+ workPlan.Try(
+ &DiscoverDecode{},
+ &DiscoverService{},
+ &ToStrDiscover{},
+ &RspHook{})
+ workPlan.Finally(&SendHttpRsp{})
+
+ workspace.WkRun(workPlan)
+}
+
+func serviceUpdate(w http.ResponseWriter, r *http.Request) {
+ log.Info("Update a service start...")
+
+ workPlan := NewWorkSpace(w, r)
+ workPlan.Try(
+ (&DecodeRestReq{}).WithBody(&models.ServiceInfo{}),
+ &UpdateInstance{})
+ workPlan.Finally(&SendHttpRsp{})
+
+ workspace.WkRun(workPlan)
+}
+
+func getOneService(w http.ResponseWriter, r *http.Request) {
+ log.Info("Register service start...")
+
+ workPlan := NewWorkSpace(w, r)
+ workPlan.Try(
+ &GetOneDecode{},
+ &GetOneInstance{})
+ workPlan.Finally(&SendHttpRsp{})
+
+ workspace.WkRun(workPlan)
+
+}
+
+func serviceDelete(w http.ResponseWriter, r *http.Request) {
+ log.Info("Delete a service start...")
+
+ workPlan := NewWorkSpace(w, r)
+ workPlan.Try(
+ &DecodeRestReq{},
+ &DeleteService{})
+ workPlan.Finally(&SendHttpRsp{})
+
+ workspace.WkRun(workPlan)
+}
+
+func WebsocketListAndWatch(ctx context.Context, req *proto.WatchInstanceRequest, consumerSvcId string) {
+ if req == nil || len(req.SelfServiceId) == 0 {
+ log.Warn("request fomat invalid!")
+ return
+ }
+ domainProject := util.ParseDomainProject(ctx)
+ if !svcutil.ServiceExist(ctx, domainProject, req.SelfServiceId) {
+ log.Warn("service does not exist!")
+ return
+ }
+ DoWebsocketListAndWatch(ctx, req.SelfServiceId, consumerSvcId, func() ([]*proto.WatchInstanceResponse, int64) {
+ return svcutil.QueryAllProvidersInstances(ctx, req.SelfServiceId)
+ })
+}
+
+func DoWebsocketListAndWatch(ctx context.Context, serviceId string, consumerSvcId string, f func() ([]*proto.WatchInstanceResponse, int64)) {
+ domainProject := util.ParseDomainProject(ctx)
+ socket := &Websocket{
+ ctx: ctx,
+ watcher: notify.NewInstanceEventListWatcher(serviceId, domainProject, f),
+ serviceID: consumerSvcId,
+ }
+ ProcessSocket(socket)
+}
+
+func ProcessSocket(socket *Websocket) {
+ if err := socket.Init(); err != nil {
+ return
+ }
+ socket.HandleWatchWebSocketControlMessage()
+}
--- /dev/null
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+ "encoding/json"
+ "strings"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ apt "github.com/apache/servicecomb-service-center/server/core"
+ "github.com/apache/servicecomb-service-center/server/core/backend"
+ "github.com/apache/servicecomb-service-center/server/core/proto"
+ "github.com/apache/servicecomb-service-center/server/notify"
+ "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
+ "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
+ "github.com/apache/servicecomb-service-center/server/service/cache"
+ "github.com/apache/servicecomb-service-center/server/service/metrics"
+ svcutil "github.com/apache/servicecomb-service-center/server/service/util"
+ "golang.org/x/net/context"
+
+ "mepserver/mp1/models"
+)
+
+const indexStart, indexEnd = 33, 17
+
+type InstanceEtsiEventHandler struct {
+}
+
+func (h *InstanceEtsiEventHandler) Type() discovery.Type {
+ return backend.INSTANCE
+}
+
+func (h *InstanceEtsiEventHandler) OnEvent(evt discovery.KvEvent) {
+ action := evt.Type
+ instance, ok := evt.KV.Value.(*proto.MicroServiceInstance)
+ if !ok {
+ log.Warn("cast to instance failed")
+ }
+ providerId, providerInstanceId, domainProject := apt.GetInfoFromInstKV(evt.KV.Key)
+ idx := strings.Index(domainProject, "/")
+ domainName := domainProject[:idx]
+ switch action {
+ case proto.EVT_INIT:
+ metrics.ReportInstances(domainName, 1)
+ return
+ case proto.EVT_CREATE:
+ metrics.ReportInstances(domainName, 1)
+ case proto.EVT_DELETE:
+ metrics.ReportInstances(domainName, -1)
+ if !apt.IsDefaultDomainProject(domainProject) {
+ projectName := domainProject[idx+1:]
+ svcutil.RemandInstanceQuota(util.SetDomainProject(context.Background(), domainName, projectName))
+ }
+ }
+
+ if notify.NotifyCenter().Closed() {
+ log.Warnf("caught [%s] instance [%s/%s] event, endpoints %v, but notify service is closed",
+ action, providerId, providerInstanceId, instance.Endpoints)
+ return
+ }
+
+ ctx := context.WithValue(context.WithValue(context.Background(),
+ svcutil.CTX_CACHEONLY, "1"),
+ svcutil.CTX_GLOBAL, "1")
+ ms, err := svcutil.GetService(ctx, domainProject, providerId)
+ if ms == nil {
+ log.Errorf(err, "caught [%s] instance [%s/%s] event, endpoints %v, get cached provider's file failed",
+ action, providerId, providerInstanceId, instance.Endpoints)
+ return
+ }
+
+ log.Infof("caught [%s] service[%s][%s/%s/%s/%s] isntance[%s] event, endpoints %v",
+ action, providerId, ms.Environment, ms.AppId, ms.ServiceName, ms.Version, providerInstanceId, instance.Endpoints)
+
+ consumerIds := getCosumerIds()
+
+ log.Infof("there are %d consuemrIDs, %s", len(consumerIds), consumerIds)
+ PublishInstanceEvent(evt, domainProject, proto.MicroServiceToKey(domainProject, ms), consumerIds)
+}
+
+func getCosumerIds() []string {
+ var consumerIds []string
+ opts := []registry.PluginOp{
+ registry.OpGet(registry.WithStrKey("/cse-sr/inst/files"), registry.WithPrefix()),
+ }
+ resp, err := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil)
+ if err != nil {
+ log.Errorf(err, "get subscription from etcd failed")
+ return consumerIds
+ }
+
+ for _, kvs := range resp.Kvs {
+ key := kvs.Key
+ keystring := string(key)
+ value := kvs.Value
+
+ var mp1Req models.ServiceInfo
+ err = json.Unmarshal(value, &mp1Req)
+ if err != nil {
+ log.Errorf(err, "parse serviceInfo failed")
+ }
+ length := len(keystring)
+ keystring = keystring[length-indexStart : length-indexEnd]
+ if StringContains(consumerIds, keystring) == -1 {
+ consumerIds = append(consumerIds, keystring)
+ }
+ }
+ return consumerIds
+}
+
+func NewInstanceEtsiEventHandler() *InstanceEtsiEventHandler {
+ return &InstanceEtsiEventHandler{}
+}
+
+func PublishInstanceEvent(evt discovery.KvEvent, domainProject string, serviceKey *proto.MicroServiceKey, subscribers []string) {
+ defer cache.FindInstances.Remove(serviceKey)
+ if len(subscribers) == 0 {
+ log.Warn("the subscribers size is 0")
+ return
+ }
+
+ response := &proto.WatchInstanceResponse{
+ Response: proto.CreateResponse(proto.Response_SUCCESS, "Watch instance successfully."),
+ Action: string(evt.Type),
+ Key: serviceKey,
+ Instance: evt.KV.Value.(*proto.MicroServiceInstance),
+ }
+ for _, consumerId := range subscribers {
+ job := notify.NewInstanceEventWithTime(consumerId, domainProject, evt.Revision, evt.CreateAt, response)
+ err := notify.NotifyCenter().Publish(job)
+ if err != nil {
+ log.Errorf(err, "publish failed")
+ }
+ }
+}
--- /dev/null
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package mp1
+
+import (
+ "net/http"
+ "net/url"
+
+ "github.com/apache/servicecomb-service-center/server/core/proto"
+ "golang.org/x/net/context"
+
+ "mepserver/mp1/arch/workspace"
+)
+
+type MepSpace struct {
+ workspace.SpaceBase
+ R *http.Request `json:"r"`
+ W http.ResponseWriter `json:"w"`
+
+ Ctx context.Context `json:"ctx"`
+ ServiceId string `json:"serviceId"`
+ RestBody interface{} `json:"restBody"`
+ AppInstanceId string `json:"appInstanceId"`
+ InstanceId string `json:"instanceId"`
+ SubscribeId string `json:"subscribeId"`
+ QueryParam url.Values `json:"queryParam"`
+
+ CoreRequest interface{} `json:"coreRequest"`
+ CoreRsp interface{} `json:"coreRsp"`
+ HttPErrInf *proto.Response `json:"httpErrInf"`
+ HttPRsp interface{} `json:"httpRsp"`
+}
+
+func NewWorkSpace(w http.ResponseWriter, r *http.Request) *MepSpace {
+ var plan = MepSpace{
+ W: w,
+ R: r,
+ }
+
+ plan.Init()
+ return &plan
+}
--- /dev/null
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ mgr "github.com/apache/servicecomb-service-center/server/plugin"
+ "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
+ "github.com/apache/servicecomb-service-center/server/plugin/pkg/quota"
+ "github.com/apache/servicecomb-service-center/server/plugin/pkg/quota/buildin"
+)
+
+func init() {
+ mgr.RegisterPlugin(mgr.Plugin{PName: mgr.QUOTA, Name: "buildin", New: New})
+ discovery.AddEventHandler(NewInstanceEtsiEventHandler())
+}
+
+func New() mgr.PluginInstance {
+ buildin.InitConfigs()
+ log.Infof("quota init, service: %d, instance: %d, schema: %d/service, tag: %d/service, rule: %d/service",
+ quota.DefaultServiceQuota, quota.DefaultInstanceQuota, quota.DefaultSchemaQuota, quota.DefaultTagQuota, quota.DefaultRuleQuota)
+ return &buildin.BuildInQuota{}
+}
--- /dev/null
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+ "context"
+ "net/http"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/server/core/backend"
+ "github.com/apache/servicecomb-service-center/server/core/proto"
+ "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
+
+ "mepserver/mp1/arch/workspace"
+)
+
+type DelOneSubscribe struct {
+ workspace.TaskBase
+ R *http.Request `json:"r,in"`
+ HttpErrInf *proto.Response `json:"httpErrInf,out"`
+ Ctx context.Context `json:"ctx,in"`
+ W http.ResponseWriter `json:"w,in"`
+ AppInstanceId string `json:"appInstanceId,in"`
+ SubscribeId string `json:"subscribeId,in"`
+ HttpRsp interface{} `json:"httpRsp,out"`
+}
+
+func (t *DelOneSubscribe) OnRequest(data string) workspace.TaskCode {
+
+ appInstanceId := t.AppInstanceId
+ subscribeId := t.SubscribeId
+
+ opts := []registry.PluginOp{
+ registry.OpGet(registry.WithStrKey("/cse-sr/etsi/subscribe/" + appInstanceId + "/" + subscribeId)),
+ }
+ resp, errGet := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil)
+ if errGet != nil {
+ log.Errorf(errGet, "get subscription from etcd failed")
+ t.SetFirstErrorCode(OperateDataWithEtcdErr, "get subscription from etch failed")
+ return workspace.TaskFinish
+ }
+
+ if len(resp.Kvs) == 0 {
+ log.Errorf(errGet, "subscription not exist")
+ t.SetFirstErrorCode(SubscriptionNotFound, "subscription not exist")
+ return workspace.TaskFinish
+ }
+
+ opts = []registry.PluginOp{
+ registry.OpDel(registry.WithStrKey("/cse-sr/etsi/subscribe/" + appInstanceId + "/" + subscribeId)),
+ }
+ _, err := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil)
+ if err != nil {
+ log.Errorf(err, "delete subscription from etcd failed")
+ t.SetFirstErrorCode(OperateDataWithEtcdErr, "delete subscription from etch failed")
+ return workspace.TaskFinish
+ }
+
+ t.HttpRsp = ""
+ return workspace.TaskFinish
+}
--- /dev/null
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+ "context"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/server/core"
+ "github.com/apache/servicecomb-service-center/server/core/proto"
+
+ "mepserver/mp1/arch/workspace"
+)
+
+type DeleteService struct {
+ HttpErrInf *proto.Response `json:"httpErrInf,out"`
+ workspace.TaskBase
+ Ctx context.Context `json:"ctx,in"`
+ ServiceId string `json:"serviceId,in"`
+}
+
+func (t *DeleteService) OnRequest(data string) workspace.TaskCode {
+ if t.ServiceId == "" {
+ t.SetFirstErrorCode(SerErrServiceDelFailed, "param is empty")
+ return workspace.TaskFinish
+ }
+ serviceID := t.ServiceId[:len(t.ServiceId)/2]
+ instanceID := t.ServiceId[len(t.ServiceId)/2:]
+ req := &proto.UnregisterInstanceRequest{
+ ServiceId: serviceID,
+ InstanceId: instanceID,
+ }
+ resp, err := core.InstanceAPI.Unregister(t.Ctx, req)
+ if err != nil {
+ log.Errorf(err, "Service delete failed!")
+ }
+ t.HttpErrInf = resp.Response
+
+ return workspace.TaskFinish
+}
--- /dev/null
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+ "context"
+ "net/http"
+ "net/url"
+
+ "github.com/apache/servicecomb-service-center/server/core"
+ "github.com/apache/servicecomb-service-center/server/core/proto"
+
+ "mepserver/mp1/arch/workspace"
+ "mepserver/mp1/models"
+ meputil "mepserver/mp1/util"
+)
+
+type DiscoverDecode struct {
+ workspace.TaskBase
+ R *http.Request `json:"r,in"`
+ Ctx context.Context `json:"ctx,out"`
+ QueryParam url.Values `json:"queryParam,out"`
+ CoreRequest interface{} `json:"coreRequest,out"`
+}
+
+func (t *DiscoverDecode) OnRequest(data string) workspace.TaskCode {
+ t.Ctx, t.CoreRequest, t.QueryParam = meputil.GetFindParam(t.R)
+ return workspace.TaskFinish
+}
+
+type DiscoverService struct {
+ workspace.TaskBase
+ Ctx context.Context `json:"ctx,in"`
+ QueryParam url.Values `json:"queryParam,in"`
+ CoreRequest interface{} `json:"coreRequest,in"`
+ CoreRsp interface{} `json:"coreRsp,out"`
+}
+
+func (t *DiscoverService) checkInstanceId(req *proto.FindInstancesRequest) bool {
+ instanceId := req.AppId
+ if instanceId != "default" {
+ instances := t.CoreRsp.(*proto.FindInstancesResponse).Instances
+ for _, val := range instances {
+ if val.ServiceId+val.InstanceId == instanceId {
+ return true
+ }
+ }
+ return false
+ }
+ return true
+}
+
+func (t *DiscoverService) OnRequest(data string) workspace.TaskCode {
+ req, ok := t.CoreRequest.(*proto.FindInstancesRequest)
+ if !ok {
+ t.SetFirstErrorCode(SerErrServiceNotFound, "cast to request fail")
+ return workspace.TaskFinish
+ }
+ if req.ServiceName == "" {
+ var errFindByKey error
+ t.CoreRsp, errFindByKey = meputil.FindInstanceByKey(t.QueryParam)
+ if errFindByKey != nil || t.CoreRsp == nil {
+ t.SetFirstErrorCode(SerErrServiceNotFound, errFindByKey.Error())
+ return workspace.TaskFinish
+ }
+ if !t.checkInstanceId(req) {
+ t.SetFirstErrorCode(SerErrServiceNotFound, "instance id not found")
+ }
+ return workspace.TaskFinish
+ }
+
+ findInstance, err := core.InstanceAPI.Find(t.Ctx, req)
+ if err != nil {
+ t.SetFirstErrorCode(SerErrServiceNotFound, err.Error())
+ return workspace.TaskFinish
+ }
+ if findInstance == nil || len(findInstance.Instances) == 0 {
+ t.SetFirstErrorCode(SerErrServiceNotFound, "service not found")
+ return workspace.TaskFinish
+ }
+
+ t.CoreRsp = findInstance
+ return workspace.TaskFinish
+}
+
+type ToStrDiscover struct {
+ HttpErrInf *proto.Response `json:"httpErrInf,out"`
+ workspace.TaskBase
+ CoreRsp interface{} `json:"coreRsp,in"`
+ HttpRsp interface{} `json:"httpRsp,out"`
+}
+
+func (t *ToStrDiscover) OnRequest(data string) workspace.TaskCode {
+ t.HttpErrInf, t.HttpRsp = mp1CvtSrvDiscover(t.CoreRsp.(*proto.FindInstancesResponse))
+ return workspace.TaskFinish
+}
+
+type RspHook struct {
+ R *http.Request `json:"r,in"`
+ workspace.TaskBase
+ Ctx context.Context `json:"ctx,in"`
+ HttpRsp interface{} `json:"httpRsp,in"`
+ HookRsp interface{} `json:"hookRsp,out"`
+}
+
+func (t *RspHook) OnRequest(data string) workspace.TaskCode {
+ t.HookRsp = instanceHook(t.Ctx, t.R, t.HttpRsp)
+ return workspace.TaskFinish
+}
+
+func instanceHook(ctx context.Context, r *http.Request, rspData interface{}) interface{} {
+ rspBody, ok := rspData.([]*models.ServiceInfo)
+ if !ok {
+ return rspData
+ }
+
+ if len(rspBody) == 0 {
+ return rspBody
+ }
+ consumerName := r.Header.Get("X-ConsumerName")
+ if consumerName == "APIGW" {
+ return rspBody
+ }
+
+ for _, v := range rspBody {
+ if apihook.APIHook != nil {
+ info := apihook.APIHook()
+ if len(info.Addresses) == 0 && len(info.Uris) == 0 {
+ return rspBody
+ }
+ v.TransportInfo.Endpoint = info
+ }
+ }
+ return rspBody
+}
+
+type SendHttpRsp struct {
+ HttpErrInf *proto.Response `json:"httpErrInf,in"`
+ workspace.TaskBase
+ W http.ResponseWriter `json:"w,in"`
+ HttpRsp interface{} `json:"httpRsp,in"`
+}
+
+func (t *SendHttpRsp) OnRequest(data string) workspace.TaskCode {
+ errInfo := t.GetSerErrInfo()
+ if errInfo.ErrCode >= int(workspace.TaskFail) {
+ statusCode, httpBody := t.cvtHttpErrInfo(errInfo)
+ meputil.HttpErrResponse(t.W, statusCode, httpBody)
+
+ return workspace.TaskFinish
+ }
+ meputil.WriteResponse(t.W, t.HttpErrInf, t.HttpRsp)
+ return workspace.TaskFinish
+}
+
+func (t *SendHttpRsp) cvtHttpErrInfo(errInfo *workspace.SerErrInfo) (int, interface{}) {
+ statusCode := http.StatusBadRequest
+ var httpBody interface{}
+ switch workspace.ErrCode(errInfo.ErrCode) {
+ case SerErrServiceNotFound:
+ {
+ //status should return bad request
+ body := &models.ProblemDetails{
+ Title: "Can not found resource",
+ Status: uint32(errInfo.ErrCode),
+ Detail: errInfo.Message,
+ }
+ httpBody = body
+ }
+ case SerInstanceNotFound:
+ {
+ statusCode = http.StatusNotFound
+ body := &models.ProblemDetails{
+ Title: "Can not found resource",
+ Status: uint32(errInfo.ErrCode),
+ Detail: errInfo.Message,
+ }
+ httpBody = body
+ }
+ }
+
+ return statusCode, httpBody
+}
+
+func mp1CvtSrvDiscover(findInsResp *proto.FindInstancesResponse) (*proto.Response, []*models.ServiceInfo) {
+ resp := findInsResp.Response
+ if resp != nil && resp.GetCode() != proto.Response_SUCCESS {
+ return resp, nil
+ }
+ serviceInfos := make([]*models.ServiceInfo, 0, len(findInsResp.Instances))
+ for _, ins := range findInsResp.Instances {
+ serviceInfo := &models.ServiceInfo{}
+ serviceInfo.FromServiceInstance(ins)
+ serviceInfos = append(serviceInfos, serviceInfo)
+ }
+ return resp, serviceInfos
+
+}
--- /dev/null
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+ "context"
+ "encoding/json"
+ "net/http"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/server/core/backend"
+ "github.com/apache/servicecomb-service-center/server/core/proto"
+ "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
+
+ "mepserver/mp1/arch/workspace"
+ "mepserver/mp1/models"
+)
+
+type GetOneSubscribe struct {
+ workspace.TaskBase
+ R *http.Request `json:"r,in"`
+ HttpErrInf *proto.Response `json:"httpErrInf,out"`
+ Ctx context.Context `json:"ctx,in"`
+ W http.ResponseWriter `json:"w,in"`
+ AppInstanceId string `json:"appInstanceId,in"`
+ SubscribeId string `json:"subscribeId,in"`
+ HttpRsp interface{} `json:"httpRsp,out"`
+}
+
+func (t *GetOneSubscribe) OnRequest(data string) workspace.TaskCode {
+
+ appInstanceId := t.AppInstanceId
+ subscribeId := t.SubscribeId
+
+ opts := []registry.PluginOp{
+ registry.OpGet(registry.WithStrKey("/cse-sr/etsi/subscribe/" + appInstanceId + "/" + subscribeId)),
+ }
+ resp, err := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil)
+ if err != nil {
+ log.Errorf(err, "get subscription from etcd failed")
+ t.SetFirstErrorCode(OperateDataWithEtcdErr, "get subscription from etch failed")
+ return workspace.TaskFinish
+ }
+
+ if len(resp.Kvs) == 0 {
+ log.Errorf(err, "subscription not exist")
+ t.SetFirstErrorCode(SubscriptionNotFound, "subscription not exist")
+ return workspace.TaskFinish
+ }
+ sub := &models.SerAvailabilityNotificationSubscription{}
+ jsonErr := json.Unmarshal([]byte(string(resp.Kvs[0].Value)), sub)
+ if jsonErr != nil {
+ log.Warn("subscribe can not be parsed to SerAvailabilityNotificationSubscription")
+ return workspace.TaskFinish
+ }
+ t.HttpRsp = sub
+ return workspace.TaskFinish
+}
--- /dev/null
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+ "context"
+ "net/http"
+
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/core"
+ "github.com/apache/servicecomb-service-center/server/core/proto"
+
+ "mepserver/mp1/arch/workspace"
+ "mepserver/mp1/models"
+ meputil "mepserver/mp1/util"
+)
+
+type GetOneDecode struct {
+ workspace.TaskBase
+ R *http.Request `json:"r,in"`
+ Ctx context.Context `json:"ctx,out"`
+ CoreRequest interface{} `json:"coreRequest,out"`
+}
+
+func (t *GetOneDecode) OnRequest(data string) workspace.TaskCode {
+ t.Ctx, t.CoreRequest = t.getFindParam(t.R)
+ return workspace.TaskFinish
+
+}
+
+func (t *GetOneDecode) getFindParam(r *http.Request) (context.Context, *proto.GetOneInstanceRequest) {
+ query, ids := meputil.GetHTTPTags(r)
+ mp1SrvId := query.Get(":serviceId")
+ serviceId := mp1SrvId[:len(mp1SrvId)/2]
+ instanceId := mp1SrvId[len(mp1SrvId)/2:]
+ req := &proto.GetOneInstanceRequest{
+ ConsumerServiceId: r.Header.Get("X-ConsumerId"),
+ ProviderServiceId: serviceId,
+ ProviderInstanceId: instanceId,
+ Tags: ids,
+ }
+
+ ctx := util.SetTargetDomainProject(r.Context(), r.Header.Get("X-Domain-Name"), query.Get(":project"))
+ return ctx, req
+}
+
+type GetOneInstance struct {
+ workspace.TaskBase
+ HttpErrInf *proto.Response `json:"httpErrInf,out"`
+ Ctx context.Context `json:"ctx,in"`
+ CoreRequest interface{} `json:"coreRequest,in"`
+ HttpRsp interface{} `json:"httpRsp,out"`
+}
+
+func (t *GetOneInstance) OnRequest(data string) workspace.TaskCode {
+ resp, _ := core.InstanceAPI.GetOneInstance(t.Ctx, t.CoreRequest.(*proto.GetOneInstanceRequest))
+ t.HttpErrInf = resp.Response
+ resp.Response = nil
+ mp1Rsp := &models.ServiceInfo{}
+ if resp.Instance != nil {
+ mp1Rsp.FromServiceInstance(resp.Instance)
+ } else {
+ t.SetFirstErrorCode(SerInstanceNotFound, "service instance id not found")
+ return workspace.TaskFinish
+ }
+ t.HttpRsp = mp1Rsp
+
+ return workspace.TaskFinish
+}
--- /dev/null
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+ "context"
+ "encoding/json"
+ "net/http"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/server/core/backend"
+ "github.com/apache/servicecomb-service-center/server/core/proto"
+ "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
+
+ "mepserver/mp1/arch/workspace"
+ "mepserver/mp1/models"
+)
+
+type GetSubscribes struct {
+ workspace.TaskBase
+ R *http.Request `json:"r,in"`
+ HttpErrInf *proto.Response `json:"httpErrInf,out"`
+ Ctx context.Context `json:"ctx,in"`
+ W http.ResponseWriter `json:"w,in"`
+ AppInstanceId string `json:"appInstanceId,in"`
+ HttpRsp interface{} `json:"httpRsp,out"`
+}
+
+func (t *GetSubscribes) OnRequest(data string) workspace.TaskCode {
+
+ appInstanceId := t.AppInstanceId
+
+ opts := []registry.PluginOp{
+ registry.OpGet(registry.WithStrKey("/cse-sr/etsi/subscribe/"+appInstanceId), registry.WithPrefix()),
+ }
+ resp, err := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil)
+ if err != nil {
+ log.Errorf(err, "get subscription from etcd failed")
+ t.SetFirstErrorCode(OperateDataWithEtcdErr, "get subscription from etcd failed")
+ return workspace.TaskFinish
+ }
+
+ var values []string
+ for _, value := range resp.Kvs {
+ values = append(values, string(value.Value))
+ }
+ if len(values) == 0 {
+ log.Errorf(err, "get subscription failed, subscription not exist")
+ t.SetFirstErrorCode(SubscriptionNotFound, "get subscription failed, subscription not exist")
+ return workspace.TaskFinish
+ }
+ subs := make([]*models.SerAvailabilityNotificationSubscription, 0, len(values))
+ for _, val := range values {
+ sub := &models.SerAvailabilityNotificationSubscription{}
+ err := json.Unmarshal([]byte(val), sub)
+ if err != nil {
+ log.Warn("subscribe can not be parsed to SerAvailabilityNotificationSubscription")
+ continue
+ }
+ subs = append(subs, sub)
+ }
+ t.HttpRsp = subs
+ return workspace.TaskFinish
+}
--- /dev/null
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "net/http"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/core"
+ "github.com/apache/servicecomb-service-center/server/core/proto"
+ svcerr "github.com/apache/servicecomb-service-center/server/error"
+
+ "mepserver/mp1/arch/workspace"
+ "mepserver/mp1/models"
+ meputil "mepserver/mp1/util"
+)
+
+type DecodeRestReq struct {
+ workspace.TaskBase
+ R *http.Request `json:"r,in"`
+ Ctx context.Context `json:"ctx,out"`
+ AppInstanceId string `json:"appInstanceId,out"`
+ SubscribeId string `json:"subscribeId,out"`
+ ServiceId string `json:"serviceId,out"`
+ RestBody interface{} `json:"restBody,out"`
+}
+
+func (t *DecodeRestReq) OnRequest(data string) workspace.TaskCode {
+ t.GetParam(t.R)
+ err := t.ParseBody(t.R)
+ if err != nil {
+ log.Error("parse rest body failed", err)
+ }
+ return workspace.TaskFinish
+}
+
+func (t *DecodeRestReq) ParseBody(r *http.Request) error {
+ if t.RestBody == nil {
+ return nil
+ }
+ msg, err := ioutil.ReadAll(r.Body)
+ if err != nil {
+ log.Error("read body failed", err)
+ t.SetFirstErrorCode(SerErrFailBase, err.Error())
+ return err
+ }
+
+ newMsg, err := t.checkParam(msg)
+ if err != nil {
+ log.Error("check Param failed", err)
+ t.SetFirstErrorCode(SerErrFailBase, err.Error())
+ return err
+ }
+
+ err = json.Unmarshal(newMsg, t.RestBody)
+ if err != nil {
+ log.Errorf(err, "invalid json: %s", util.BytesToStringWithNoCopy(newMsg))
+ t.SetFirstErrorCode(SerErrFailBase, err.Error())
+ return err
+ }
+ return nil
+
+}
+
+func (t *DecodeRestReq) checkParam(msg []byte) ([]byte, error) {
+
+ var temp map[string]interface{}
+ err := json.Unmarshal(msg, &temp)
+ if err != nil {
+ log.Errorf(err, "invalid json to map: %s", util.BytesToStringWithNoCopy(msg))
+ t.SetFirstErrorCode(SerErrFailBase, err.Error())
+ return nil, err
+ }
+
+ meputil.SetMapValue(temp, "consumedLocalOnly", true)
+ meputil.SetMapValue(temp, "isLocal", true)
+ meputil.SetMapValue(temp, "scopeOfLocality", "MEC_HOST")
+
+ msg, err = json.Marshal(&temp)
+ if err != nil {
+ log.Errorf(err, "invalid map to json")
+ t.SetFirstErrorCode(SerErrFailBase, err.Error())
+ return nil, err
+ }
+
+ return msg, nil
+}
+
+func (t *DecodeRestReq) WithBody(body interface{}) *DecodeRestReq {
+ t.RestBody = body
+ return t
+}
+
+func (t *DecodeRestReq) GetParam(r *http.Request) {
+ query, _ := meputil.GetHTTPTags(r)
+ t.AppInstanceId = query.Get(":appInstanceId")
+ t.SubscribeId = query.Get(":subscriptionId")
+ t.ServiceId = query.Get(":serviceId")
+ t.Ctx = util.SetTargetDomainProject(r.Context(), r.Header.Get("X-Domain-Name"), query.Get(":project"))
+}
+
+type RegisterServiceId struct {
+ HttpErrInf *proto.Response `json:"httpErrInf,out"`
+ workspace.TaskBase
+ Ctx context.Context `json:"ctx,in"`
+ ServiceId string `json:"serviceId,out"`
+ RestBody interface{} `json:"restBody,in"`
+}
+
+func (t *RegisterServiceId) OnRequest(data string) workspace.TaskCode {
+
+ serviceInfo, ok := t.RestBody.(*models.ServiceInfo)
+ if !ok {
+ t.SetFirstErrorCode(1, "restbody failed")
+ return workspace.TaskFinish
+ }
+ req := &proto.CreateServiceRequest{}
+ serviceInfo.ToServiceRequest(req)
+ resp, err := core.ServiceAPI.Create(t.Ctx, req)
+ if err != nil {
+ log.Errorf(err, "Service Center ServiceAPI.Create fail: %s!", err.Error())
+ t.SetFirstErrorCode(1, err.Error())
+ return workspace.TaskFinish
+ }
+
+ if resp.ServiceId == "" {
+ t.HttpErrInf = resp.Response
+ log.Warn("Service id empty.")
+ }
+ t.ServiceId = resp.ServiceId
+ return workspace.TaskFinish
+}
+
+type RegisterServiceInst struct {
+ HttpErrInf *proto.Response `json:"httpErrInf,out"`
+ workspace.TaskBase
+ W http.ResponseWriter `json:"w,in"`
+ Ctx context.Context `json:"ctx,in"`
+ AppInstanceId string `json:"appInstanceId,in"`
+ ServiceId string `json:"serviceId,in"`
+ InstanceId string `json:"instanceId,out"`
+ RestBody interface{} `json:"restBody,in"`
+ HttpRsp interface{} `json:"httpRsp,out"`
+}
+
+func (t *RegisterServiceInst) OnRequest(data string) workspace.TaskCode {
+ serviceInfo, ok := t.RestBody.(*models.ServiceInfo)
+ if !ok {
+ t.SetFirstErrorCode(1, "restbody failed")
+ return workspace.TaskFinish
+ }
+ req := &proto.RegisterInstanceRequest{}
+ serviceInfo.ToRegisterInstance(req)
+ req.Instance.ServiceId = t.ServiceId
+ req.Instance.Properties["appInstanceId"] = t.AppInstanceId
+ resp, err := core.InstanceAPI.Register(t.Ctx, req)
+ if err != nil {
+ log.Errorf(err, "RegisterInstance fail: %s", t.ServiceId)
+ t.HttpErrInf = &proto.Response{}
+ t.HttpErrInf.Code = svcerr.ErrForbidden
+ t.HttpErrInf.Message = err.Error()
+ return workspace.TaskFinish
+ }
+ t.InstanceId = resp.InstanceId
+
+ //build response serviceComb use serviceId + InstanceId to mark a service instance
+ mp1SerId := t.ServiceId + t.InstanceId
+ serviceInfo.SerInstanceId = mp1SerId
+ t.HttpRsp = serviceInfo
+
+ location := fmt.Sprintf("/mep/mp1/v1/services/%s", mp1SerId)
+ t.W.Header().Set("Location", location)
+ return workspace.TaskFinish
+}
--- /dev/null
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+ "net/http"
+
+ "github.com/apache/servicecomb-service-center/server/core/proto"
+
+ "mepserver/mp1/arch/workspace"
+ "mepserver/mp1/util"
+)
+
+type SendHttpRspCreated struct {
+ HttpErrInf *proto.Response `json:"httpErrInf,in"`
+ workspace.TaskBase
+ W http.ResponseWriter `json:"w,in"`
+ HttpRsp interface{} `json:"httpRsp,in"`
+}
+
+func (t *SendHttpRspCreated) OnRequest(data string) workspace.TaskCode {
+ util.WriteHTTPResponse(t.W, t.HttpErrInf, t.HttpRsp)
+ return workspace.TaskFinish
+}
--- /dev/null
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "net/http"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/server/core/backend"
+ "github.com/apache/servicecomb-service-center/server/core/proto"
+ "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
+ "github.com/satori/go.uuid"
+
+ "mepserver/mp1/arch/workspace"
+ "mepserver/mp1/models"
+)
+
+type SubscribeIst struct {
+ workspace.TaskBase
+ R *http.Request `json:"r,in"`
+ HttpErrInf *proto.Response `json:"httpErrInf,out"`
+ Ctx context.Context `json:"ctx,in"`
+ W http.ResponseWriter `json:"w,in"`
+ RestBody interface{} `json:"restBody,in"`
+ AppInstanceId string `json:"appInstanceId,in"`
+ SubscribeId string `json:"subscribeId,in"`
+ HttpRsp interface{} `json:"httpRsp,out"`
+}
+
+func (t *SubscribeIst) OnRequest(data string) workspace.TaskCode {
+
+ mp1SubscribeInfo, ok := t.RestBody.(*models.SerAvailabilityNotificationSubscription)
+ if !ok {
+ t.SetFirstErrorCode(SerErrFailBase, "restBody failed")
+ return workspace.TaskFinish
+ }
+
+ appInstanceId := t.AppInstanceId
+ subscribeId := uuid.NewV4().String()
+ t.SubscribeId = subscribeId
+ subscribeJSON, err := json.Marshal(mp1SubscribeInfo)
+ if err != nil {
+ log.Errorf(err, "can not Marshal subscribe info")
+ t.SetFirstErrorCode(ParseInfoErr, "can not marshal subscribe info")
+ return workspace.TaskFinish
+ }
+ opts := []registry.PluginOp{
+ registry.OpPut(registry.WithStrKey("/cse-sr/etsi/subscribe/"+appInstanceId+"/"+subscribeId),
+ registry.WithValue(subscribeJSON)),
+ }
+ _, resultErr := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil)
+ if resultErr != nil {
+ log.Errorf(err, "subscription to etcd failed!")
+ t.SetFirstErrorCode(OperateDataWithEtcdErr, "put subscription to etcd failed")
+ return workspace.TaskFinish
+ }
+
+ req := &proto.WatchInstanceRequest{SelfServiceId: appInstanceId[:len(appInstanceId)/2]}
+ t.R.Method = "WATCHLIST"
+ WebsocketListAndWatch(t.Ctx, req, appInstanceId)
+ t.buildResponse(mp1SubscribeInfo)
+
+ return workspace.TaskFinish
+}
+
+func (t *SubscribeIst) buildResponse(sub *models.SerAvailabilityNotificationSubscription) {
+ appInstanceID := t.AppInstanceId
+ subscribeID := t.SubscribeId
+
+ t.HttpRsp = sub
+ location := fmt.Sprintf("/mec_service_mgmt/v1/applications/%s/subscriptions/%s/", appInstanceID, subscribeID)
+ t.W.Header().Set("Location", location)
+}
--- /dev/null
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+ "context"
+
+ "github.com/apache/servicecomb-service-center/pkg/util"
+ "github.com/apache/servicecomb-service-center/server/core/proto"
+ svcutil "github.com/apache/servicecomb-service-center/server/service/util"
+
+ "mepserver/mp1/arch/workspace"
+ "mepserver/mp1/models"
+ meputil "mepserver/mp1/util"
+)
+
+type UpdateInstance struct {
+ workspace.TaskBase
+ HttpErrInf *proto.Response `json:"httpErrInf,out"`
+ Ctx context.Context `json:"ctx,in"`
+ ServiceId string `json:"serviceId,in"`
+ RestBody interface{} `json:"restBody,in"`
+ HttpRsp interface{} `json:"httpRsp,out"`
+}
+
+func (t *UpdateInstance) OnRequest(data string) workspace.TaskCode {
+ if t.ServiceId == "" {
+ t.SetFirstErrorCode(SerErrFailBase, "param is empty")
+ return workspace.TaskFinish
+ }
+ mp1Ser, ok := t.RestBody.(*models.ServiceInfo)
+ if !ok {
+ t.SetFirstErrorCode(SerErrFailBase, "body invalid")
+ return workspace.TaskFinish
+ }
+
+ instance, err := meputil.GetServiceInstance(t.Ctx, t.ServiceId)
+ if err != nil {
+ t.SetFirstErrorCode(SerInstanceNotFound, "find service failed")
+ return workspace.TaskFinish
+ }
+
+ copyInstanceRef := *instance
+ req := proto.RegisterInstanceRequest{
+ Instance: ©InstanceRef,
+ }
+ mp1Ser.ToRegisterInstance(&req)
+
+ domainProject := util.ParseDomainProject(t.Ctx)
+ centerErr := svcutil.UpdateInstance(t.Ctx, domainProject, ©InstanceRef)
+ if centerErr != nil {
+ t.SetFirstErrorCode(SerErrServiceUpdFailed, "update service failed")
+ return workspace.TaskFinish
+ }
+
+ err = meputil.Heartbeat(t.Ctx, mp1Ser.SerInstanceId)
+ if err != nil {
+ t.SetFirstErrorCode(SerErrServiceUpdFailed, "heartbeat failed")
+ return workspace.TaskFinish
+ }
+ mp1Ser.SerInstanceId = instance.ServiceId + instance.InstanceId
+ t.HttpRsp = mp1Ser
+ t.HttpErrInf = proto.CreateResponse(proto.Response_SUCCESS, "Update service instance success.")
+ return workspace.TaskFinish
+}
--- /dev/null
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+ "sync"
+ "time"
+
+ "github.com/apache/servicecomb-service-center/pkg/gopool"
+ "golang.org/x/net/context"
+)
+
+type Publisher struct {
+ wss []*Websocket
+ goroutine *gopool.Pool
+ lock sync.Mutex
+}
+
+func (p *Publisher) Run() {
+ gopool.Go(publisher.loop)
+}
+
+func (p *Publisher) loop(ctx context.Context) {
+ defer p.Stop()
+ ticker := time.NewTicker(500 * time.Millisecond)
+ for {
+ select {
+ case <-ctx.Done():
+ return
+ case <-ticker.C:
+ var removes []int
+ for i, ws := range p.wss {
+ payload := ws.Pick()
+ if payload == nil {
+ continue
+ }
+ _, ok := payload.(error)
+ if ok {
+ removes = append(removes, i)
+ }
+ p.dispatch(ws, payload)
+ }
+ if len(removes) == 0 {
+ continue
+ }
+ p.lock.Lock()
+ var (
+ news []*Websocket
+ s int
+ )
+ for _, e := range removes {
+ news = append(news, p.wss[s:e]...)
+ s = e + 1
+ }
+ if s < len(p.wss) {
+ news = append(news, p.wss[s:]...)
+ }
+ p.wss = news
+ p.lock.Unlock()
+ }
+ }
+}
+
+func (p *Publisher) Stop() {
+ p.goroutine.Close(true)
+}
+
+func (p *Publisher) dispatch(ws *Websocket, payload interface{}) {
+ p.goroutine.Do(func(ctx context.Context) {
+ ws.HandleWatchWebSocketJob(payload)
+ })
+}
+
+func (p *Publisher) Accept(ws *Websocket) {
+ p.lock.Lock()
+ p.wss = append(p.wss, ws)
+ p.lock.Unlock()
+}
+
+var publisher *Publisher
+
+func init() {
+ publisher = NewPublisher()
+ publisher.Run()
+}
+
+func NewPublisher() *Publisher {
+ return &Publisher{
+ goroutine: gopool.New(context.Background()),
+ }
+}
--- /dev/null
+/*
+ * Copyright 2020 Huawei Technologies Co., Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mp1
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "strings"
+ "time"
+
+ "github.com/apache/servicecomb-service-center/pkg/log"
+ "github.com/apache/servicecomb-service-center/server/core/backend"
+ "github.com/apache/servicecomb-service-center/server/core/proto"
+ "github.com/apache/servicecomb-service-center/server/notify"
+ "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
+
+ "mepserver/mp1/models"
+)
+
+type Websocket struct {
+ watcher *notify.InstanceEventListWatcher
+ ticker *time.Ticker
+ ctx context.Context
+ needPingWatcher bool
+ free chan struct{}
+ closed chan struct{}
+ serviceID string
+}
+
+func (ws *Websocket) Init() error {
+ ws.ticker = time.NewTicker(notify.HeartbeatInterval)
+ ws.needPingWatcher = true
+ ws.free = make(chan struct{}, 1)
+ ws.closed = make(chan struct{})
+ ws.SetReady()
+ if err := notify.NotifyCenter().AddSubscriber(ws.watcher); err != nil {
+ return err
+ }
+ publisher.Accept(ws)
+ return nil
+}
+
+func (ws *Websocket) ReadTimeout() time.Duration {
+ return notify.ReadTimeout
+}
+
+func (ws *Websocket) SendTimeout() time.Duration {
+ return notify.SendTimeout
+}
+
+func (ws *Websocket) HandleWatchWebSocketControlMessage() {
+
+}
+
+func (ws *Websocket) HandleWatchWebSocketJob(payload interface{}) {
+ defer ws.SetReady()
+ var (
+ job *notify.InstanceEvent
+ )
+ switch v := payload.(type) {
+ case error:
+ err := payload.(error)
+ log.Errorf(err, "watcher catch an error, subject: %s, group: %s", ws.watcher.Subject(), ws.watcher.Group())
+ case time.Time:
+ return
+ case *notify.InstanceEvent:
+ serviceID := ws.serviceID
+ job = payload.(*notify.InstanceEvent)
+ resp := job.Response
+ SendMsgToApp(resp, serviceID)
+ default:
+ log.Errorf(nil, "watcher unknown input type %T, subject: %s, group: %s", v, ws.watcher.Subject(), ws.watcher.Group())
+ return
+ }
+
+ select {
+ case _, ok := <-ws.closed:
+ if !ok {
+ log.Warn("websocket channel closed")
+ }
+ return
+ default:
+ }
+}
+
+func (ws *Websocket) SetReady() {
+ select {
+ case ws.free <- struct{}{}:
+ default:
+ }
+
+}
+
+func (ws *Websocket) Pick() interface{} {
+ select {
+ case _, ok := <-ws.Ready():
+ if !ok {
+ log.Warn("websocket ready channel closed")
+ }
+ if ws.watcher.Err() != nil {
+ return ws.watcher.Err()
+ }
+
+ select {
+ case t, ok := <-ws.ticker.C:
+ if !ok {
+ log.Warn("websocket ticker C channel closed")
+ }
+ return t
+ case j, ok := <-ws.watcher.Job:
+ if !ok {
+ log.Warn("websocket watcher job channel closed")
+ }
+ if j == nil {
+ err := fmt.Errorf("server shutdown")
+ log.Error("server shutdown", err)
+ }
+ return j
+ default:
+ ws.SetReady()
+ }
+ default:
+ }
+ return nil
+}
+
+func (ws *Websocket) Ready() chan struct{} {
+ return ws.free
+}
+
+func (ws *Websocket) Stop() {
+ close(ws.closed)
+}
+
+func getCallBackUris(serviceID string, instanceID string, serName string) []string {
+ var callbackUris []string
+ opts := []registry.PluginOp{
+ registry.OpGet(registry.WithStrKey("/cse-sr/etsi/subscribe/"+serviceID), registry.WithPrefix()),
+ }
+ resp, err := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil)
+ if err != nil {
+ log.Errorf(err, "get subcription from etcd failed!")
+ return callbackUris
+ }
+ for _, v := range resp.Kvs {
+ var notifyInfo models.SerAvailabilityNotificationSubscription
+ if v.Value == nil {
+ log.Warn("the value is nil in etcd")
+ continue
+ }
+ err = json.Unmarshal(v.Value, ¬ifyInfo)
+ if err != nil {
+ log.Warn("notify json can not be parsed to notifyInfo")
+ continue
+ }
+ callbackURI := notifyInfo.CallbackReference
+ filter := notifyInfo.FilteringCriteria
+
+ if (len(filter.SerInstanceIds) != 0 && StringContains(filter.SerInstanceIds, instanceID) != -1) ||
+ (len(filter.SerNames) != 0 && StringContains(filter.SerNames, serviceID) != -1) {
+ callbackUris = append(callbackUris, callbackURI)
+ }
+ }
+ log.Infof("send to consumerIds: %s", callbackUris)
+ return callbackUris
+}
+
+func StringContains(arr []string, val string) (index int) {
+ index = -1
+ for i := 0; i < len(arr); i++ {
+ if arr[i] == val {
+ index = i
+ return
+ }
+ }
+ return
+}
+
+func SendMsgToApp(data *proto.WatchInstanceResponse, serviceID string) {
+ // transfer data to instanceInfo, and get instaceid, serviceName
+ instanceID := data.Instance.ServiceId + data.Instance.InstanceId
+ serName := data.Instance.Properties["serName"]
+ action := data.Action
+ instanceInfo := data.Instance
+ instanceStr, err := json.Marshal(instanceInfo)
+ if err != nil {
+ log.Errorf(err, "parse instanceInfo failed!")
+ return
+ }
+
+ callbackUris := getCallBackUris(serviceID, instanceID, serName)
+ body := strings.NewReader(string(instanceStr))
+ doSend(action, body, callbackUris)
+}
+
+func doSend(action string, body io.Reader, callbackUris []string) {
+ for _, callbackURI := range callbackUris {
+ log.Debugf("action: %s with callbackURI:%s", action, callbackURI)
+ if !strings.HasPrefix(callbackURI, "http") {
+ callbackURI = "http://" + callbackURI
+ }
+ client := http.Client{}
+
+ if action == "CREATE" {
+ contentType := "application/x-www-form-urlencoded"
+ _, err := http.Post(callbackURI, contentType, body)
+ if err != nil {
+ log.Warn("the consumer handle post action failed!")
+ }
+ } else if action == "DELETE" {
+ req, err := http.NewRequest("delete", callbackURI, body)
+ if err != nil {
+ _, err := client.Do(req)
+ if err != nil {
+ log.Warn("the consumer handle delete action failed!")
+ }
+
+ } else {
+ log.Errorf(err, "crate request failed!")
+ }
+ } else if action == "UPDATE" {
+ req, err := http.NewRequest("put", callbackURI, body)
+ if err != nil {
+ _, err := client.Do(req)
+ if err != nil {
+ log.Warn("the consumer handle update action failed!")
+ }
+
+ } else {
+ log.Errorf(err, "crate request failed!")
+ }
+ }
+ }
+}
+
+func DoWebSocketListAndWatchV2(ctx context.Context, id string, id2 string, f func() ([]*proto.WatchInstanceResponse, int64)) {
+ //TBD
+}