From: Swarup Nayak Date: Tue, 12 May 2020 15:48:14 +0000 (+0530) Subject: Intial commit for MEP server X-Git-Url: https://gerrit.akraino.org/r/gitweb?a=commitdiff_plain;h=f60c4a6d3d28aa32d7638a89ec2c3384b4466cb6;p=ealt-edge.git Intial commit for MEP server Change-Id: I93dbbbc77f158e2897db98469097c6918b8b336b --- diff --git a/mep/mepserver/mp1/controller.go b/mep/mepserver/mp1/controller.go new file mode 100644 index 0000000..95308ab --- /dev/null +++ b/mep/mepserver/mp1/controller.go @@ -0,0 +1,239 @@ +/* + * Copyright 2020 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mp1 + +import ( + "context" + "net/http" + + "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/pkg/rest" + "github.com/apache/servicecomb-service-center/pkg/util" + "github.com/apache/servicecomb-service-center/server/core/proto" + "github.com/apache/servicecomb-service-center/server/notify" + v4 "github.com/apache/servicecomb-service-center/server/rest/controller/v4" + svcutil "github.com/apache/servicecomb-service-center/server/service/util" + + "mepserver/mp1/arch/workspace" + "mepserver/mp1/models" +) + +const ( + basePath = "/mep/mec_service_mgmt/v1" + servicesPath = basePath + "/services" + appServicesPath = basePath + "/applications/:appInstanceId" + "/services" + appSubscribePath = basePath + "/applications/:appInstanceId/subscriptions" +) + +const ( + SerErrFailBase workspace.ErrCode = workspace.TaskFail + SerErrServiceNotFound = 2 + SerInstanceNotFound = 3 + ParseInfoErr = 4 + SubscriptionNotFound = 5 + OperateDataWithEtcdErr = 6 + SerErrServiceDelFailed = 7 + SerErrServiceUpdFailed = 8 +) + +type APIHookFunc func() models.EndPointInfo + +type APIGwHook struct { + APIHook APIHookFunc +} + +var apihook APIGwHook + +func SetAPIHook(hook APIGwHook) { + apihook = hook +} + +func GetApiHook() APIGwHook { + return apihook +} + +func init() { + initRouter() +} + +func initRouter() { + rest. + RegisterServant(&Mp1Service{}) +} + +type Mp1Service struct { + v4.MicroServiceService +} + +func (m *Mp1Service) URLPatterns() []rest.Route { + return []rest.Route{ + // appSubscriptions + {Method: rest.HTTP_METHOD_POST, Path: appSubscribePath, Func: doAppSubscribe}, + {Method: rest.HTTP_METHOD_GET, Path: appSubscribePath, Func: getAppSubscribes}, + {Method: rest.HTTP_METHOD_GET, Path: appSubscribePath + "/:subscriptionId", Func: getOneAppSubscribe}, + {Method: rest.HTTP_METHOD_DELETE, Path: appSubscribePath + "/:subscriptionId", Func: delOneAppSubscribe}, + // appServices + {Method: rest.HTTP_METHOD_POST, Path: appServicesPath, Func: serviceRegister}, + {Method: rest.HTTP_METHOD_GET, Path: appServicesPath, Func: serviceDiscover}, + {Method: rest.HTTP_METHOD_PUT, Path: appServicesPath + "/:serviceId", Func: serviceUpdate}, + {Method: rest.HTTP_METHOD_GET, Path: appServicesPath + "/:serviceId", Func: getOneService}, + {Method: rest.HTTP_METHOD_DELETE, Path: appServicesPath + "/:serviceId", Func: serviceDelete}, + // services + {Method: rest.HTTP_METHOD_GET, Path: servicesPath, Func: serviceDiscover}, + {Method: rest.HTTP_METHOD_GET, Path: servicesPath + "/:serviceId", Func: getOneService}, + } +} + +func doAppSubscribe(w http.ResponseWriter, r *http.Request) { + + workPlan := NewWorkSpace(w, r) + workPlan.Try( + (&DecodeRestReq{}).WithBody(&models.SerAvailabilityNotificationSubscription{}), + &SubscribeIst{}) + workPlan.Finally(&SendHttpRspCreated{}) + + workspace.WkRun(workPlan) +} + +func getAppSubscribes(w http.ResponseWriter, r *http.Request) { + + workPlan := NewWorkSpace(w, r) + workPlan.Try( + &DecodeRestReq{}, + &GetSubscribes{}) + workPlan.Finally(&SendHttpRsp{}) + + workspace.WkRun(workPlan) +} + +func getOneAppSubscribe(w http.ResponseWriter, r *http.Request) { + + workPlan := NewWorkSpace(w, r) + workPlan.Try( + &DecodeRestReq{}, + &GetOneSubscribe{}) + workPlan.Finally(&SendHttpRsp{}) + + workspace.WkRun(workPlan) +} + +func delOneAppSubscribe(w http.ResponseWriter, r *http.Request) { + + workPlan := NewWorkSpace(w, r) + workPlan.Try( + &DecodeRestReq{}, + &DelOneSubscribe{}) + workPlan.Finally(&SendHttpRsp{}) + + workspace.WkRun(workPlan) +} + +func serviceRegister(w http.ResponseWriter, r *http.Request) { + log.Info("Register service start...") + + workPlan := NewWorkSpace(w, r) + workPlan.Try( + (&DecodeRestReq{}).WithBody(&models.ServiceInfo{}), + &RegisterServiceId{}, + &RegisterServiceInst{}) + workPlan.Finally(&SendHttpRspCreated{}) + + workspace.WkRun(workPlan) +} + +func serviceDiscover(w http.ResponseWriter, r *http.Request) { + log.Info("Discover service service start...") + + workPlan := NewWorkSpace(w, r) + workPlan.Try( + &DiscoverDecode{}, + &DiscoverService{}, + &ToStrDiscover{}, + &RspHook{}) + workPlan.Finally(&SendHttpRsp{}) + + workspace.WkRun(workPlan) +} + +func serviceUpdate(w http.ResponseWriter, r *http.Request) { + log.Info("Update a service start...") + + workPlan := NewWorkSpace(w, r) + workPlan.Try( + (&DecodeRestReq{}).WithBody(&models.ServiceInfo{}), + &UpdateInstance{}) + workPlan.Finally(&SendHttpRsp{}) + + workspace.WkRun(workPlan) +} + +func getOneService(w http.ResponseWriter, r *http.Request) { + log.Info("Register service start...") + + workPlan := NewWorkSpace(w, r) + workPlan.Try( + &GetOneDecode{}, + &GetOneInstance{}) + workPlan.Finally(&SendHttpRsp{}) + + workspace.WkRun(workPlan) + +} + +func serviceDelete(w http.ResponseWriter, r *http.Request) { + log.Info("Delete a service start...") + + workPlan := NewWorkSpace(w, r) + workPlan.Try( + &DecodeRestReq{}, + &DeleteService{}) + workPlan.Finally(&SendHttpRsp{}) + + workspace.WkRun(workPlan) +} + +func WebsocketListAndWatch(ctx context.Context, req *proto.WatchInstanceRequest, consumerSvcId string) { + if req == nil || len(req.SelfServiceId) == 0 { + log.Warn("request fomat invalid!") + return + } + domainProject := util.ParseDomainProject(ctx) + if !svcutil.ServiceExist(ctx, domainProject, req.SelfServiceId) { + log.Warn("service does not exist!") + return + } + DoWebsocketListAndWatch(ctx, req.SelfServiceId, consumerSvcId, func() ([]*proto.WatchInstanceResponse, int64) { + return svcutil.QueryAllProvidersInstances(ctx, req.SelfServiceId) + }) +} + +func DoWebsocketListAndWatch(ctx context.Context, serviceId string, consumerSvcId string, f func() ([]*proto.WatchInstanceResponse, int64)) { + domainProject := util.ParseDomainProject(ctx) + socket := &Websocket{ + ctx: ctx, + watcher: notify.NewInstanceEventListWatcher(serviceId, domainProject, f), + serviceID: consumerSvcId, + } + ProcessSocket(socket) +} + +func ProcessSocket(socket *Websocket) { + if err := socket.Init(); err != nil { + return + } + socket.HandleWatchWebSocketControlMessage() +} diff --git a/mep/mepserver/mp1/instance_event_handler.go b/mep/mepserver/mp1/instance_event_handler.go new file mode 100644 index 0000000..34ab04c --- /dev/null +++ b/mep/mepserver/mp1/instance_event_handler.go @@ -0,0 +1,150 @@ +/* + * Copyright 2020 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mp1 + +import ( + "encoding/json" + "strings" + + "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/pkg/util" + apt "github.com/apache/servicecomb-service-center/server/core" + "github.com/apache/servicecomb-service-center/server/core/backend" + "github.com/apache/servicecomb-service-center/server/core/proto" + "github.com/apache/servicecomb-service-center/server/notify" + "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery" + "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry" + "github.com/apache/servicecomb-service-center/server/service/cache" + "github.com/apache/servicecomb-service-center/server/service/metrics" + svcutil "github.com/apache/servicecomb-service-center/server/service/util" + "golang.org/x/net/context" + + "mepserver/mp1/models" +) + +const indexStart, indexEnd = 33, 17 + +type InstanceEtsiEventHandler struct { +} + +func (h *InstanceEtsiEventHandler) Type() discovery.Type { + return backend.INSTANCE +} + +func (h *InstanceEtsiEventHandler) OnEvent(evt discovery.KvEvent) { + action := evt.Type + instance, ok := evt.KV.Value.(*proto.MicroServiceInstance) + if !ok { + log.Warn("cast to instance failed") + } + providerId, providerInstanceId, domainProject := apt.GetInfoFromInstKV(evt.KV.Key) + idx := strings.Index(domainProject, "/") + domainName := domainProject[:idx] + switch action { + case proto.EVT_INIT: + metrics.ReportInstances(domainName, 1) + return + case proto.EVT_CREATE: + metrics.ReportInstances(domainName, 1) + case proto.EVT_DELETE: + metrics.ReportInstances(domainName, -1) + if !apt.IsDefaultDomainProject(domainProject) { + projectName := domainProject[idx+1:] + svcutil.RemandInstanceQuota(util.SetDomainProject(context.Background(), domainName, projectName)) + } + } + + if notify.NotifyCenter().Closed() { + log.Warnf("caught [%s] instance [%s/%s] event, endpoints %v, but notify service is closed", + action, providerId, providerInstanceId, instance.Endpoints) + return + } + + ctx := context.WithValue(context.WithValue(context.Background(), + svcutil.CTX_CACHEONLY, "1"), + svcutil.CTX_GLOBAL, "1") + ms, err := svcutil.GetService(ctx, domainProject, providerId) + if ms == nil { + log.Errorf(err, "caught [%s] instance [%s/%s] event, endpoints %v, get cached provider's file failed", + action, providerId, providerInstanceId, instance.Endpoints) + return + } + + log.Infof("caught [%s] service[%s][%s/%s/%s/%s] isntance[%s] event, endpoints %v", + action, providerId, ms.Environment, ms.AppId, ms.ServiceName, ms.Version, providerInstanceId, instance.Endpoints) + + consumerIds := getCosumerIds() + + log.Infof("there are %d consuemrIDs, %s", len(consumerIds), consumerIds) + PublishInstanceEvent(evt, domainProject, proto.MicroServiceToKey(domainProject, ms), consumerIds) +} + +func getCosumerIds() []string { + var consumerIds []string + opts := []registry.PluginOp{ + registry.OpGet(registry.WithStrKey("/cse-sr/inst/files"), registry.WithPrefix()), + } + resp, err := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil) + if err != nil { + log.Errorf(err, "get subscription from etcd failed") + return consumerIds + } + + for _, kvs := range resp.Kvs { + key := kvs.Key + keystring := string(key) + value := kvs.Value + + var mp1Req models.ServiceInfo + err = json.Unmarshal(value, &mp1Req) + if err != nil { + log.Errorf(err, "parse serviceInfo failed") + } + length := len(keystring) + keystring = keystring[length-indexStart : length-indexEnd] + if StringContains(consumerIds, keystring) == -1 { + consumerIds = append(consumerIds, keystring) + } + } + return consumerIds +} + +func NewInstanceEtsiEventHandler() *InstanceEtsiEventHandler { + return &InstanceEtsiEventHandler{} +} + +func PublishInstanceEvent(evt discovery.KvEvent, domainProject string, serviceKey *proto.MicroServiceKey, subscribers []string) { + defer cache.FindInstances.Remove(serviceKey) + if len(subscribers) == 0 { + log.Warn("the subscribers size is 0") + return + } + + response := &proto.WatchInstanceResponse{ + Response: proto.CreateResponse(proto.Response_SUCCESS, "Watch instance successfully."), + Action: string(evt.Type), + Key: serviceKey, + Instance: evt.KV.Value.(*proto.MicroServiceInstance), + } + for _, consumerId := range subscribers { + job := notify.NewInstanceEventWithTime(consumerId, domainProject, evt.Revision, evt.CreateAt, response) + err := notify.NotifyCenter().Publish(job) + if err != nil { + log.Errorf(err, "publish failed") + } + } +} diff --git a/mep/mepserver/mp1/mepspace.go b/mep/mepserver/mp1/mepspace.go new file mode 100644 index 0000000..2d8a1d2 --- /dev/null +++ b/mep/mepserver/mp1/mepspace.go @@ -0,0 +1,57 @@ +/* + * Copyright 2020 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package mp1 + +import ( + "net/http" + "net/url" + + "github.com/apache/servicecomb-service-center/server/core/proto" + "golang.org/x/net/context" + + "mepserver/mp1/arch/workspace" +) + +type MepSpace struct { + workspace.SpaceBase + R *http.Request `json:"r"` + W http.ResponseWriter `json:"w"` + + Ctx context.Context `json:"ctx"` + ServiceId string `json:"serviceId"` + RestBody interface{} `json:"restBody"` + AppInstanceId string `json:"appInstanceId"` + InstanceId string `json:"instanceId"` + SubscribeId string `json:"subscribeId"` + QueryParam url.Values `json:"queryParam"` + + CoreRequest interface{} `json:"coreRequest"` + CoreRsp interface{} `json:"coreRsp"` + HttPErrInf *proto.Response `json:"httpErrInf"` + HttPRsp interface{} `json:"httpRsp"` +} + +func NewWorkSpace(w http.ResponseWriter, r *http.Request) *MepSpace { + var plan = MepSpace{ + W: w, + R: r, + } + + plan.Init() + return &plan +} diff --git a/mep/mepserver/mp1/mp1event.go b/mep/mepserver/mp1/mp1event.go new file mode 100644 index 0000000..a6c08d5 --- /dev/null +++ b/mep/mepserver/mp1/mp1event.go @@ -0,0 +1,37 @@ +/* + * Copyright 2020 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mp1 + +import ( + "github.com/apache/servicecomb-service-center/pkg/log" + mgr "github.com/apache/servicecomb-service-center/server/plugin" + "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery" + "github.com/apache/servicecomb-service-center/server/plugin/pkg/quota" + "github.com/apache/servicecomb-service-center/server/plugin/pkg/quota/buildin" +) + +func init() { + mgr.RegisterPlugin(mgr.Plugin{PName: mgr.QUOTA, Name: "buildin", New: New}) + discovery.AddEventHandler(NewInstanceEtsiEventHandler()) +} + +func New() mgr.PluginInstance { + buildin.InitConfigs() + log.Infof("quota init, service: %d, instance: %d, schema: %d/service, tag: %d/service, rule: %d/service", + quota.DefaultServiceQuota, quota.DefaultInstanceQuota, quota.DefaultSchemaQuota, quota.DefaultTagQuota, quota.DefaultRuleQuota) + return &buildin.BuildInQuota{} +} diff --git a/mep/mepserver/mp1/plan_del_one_subscribe.go b/mep/mepserver/mp1/plan_del_one_subscribe.go new file mode 100644 index 0000000..efea6f2 --- /dev/null +++ b/mep/mepserver/mp1/plan_del_one_subscribe.go @@ -0,0 +1,75 @@ +/* + * Copyright 2020 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mp1 + +import ( + "context" + "net/http" + + "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/server/core/backend" + "github.com/apache/servicecomb-service-center/server/core/proto" + "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry" + + "mepserver/mp1/arch/workspace" +) + +type DelOneSubscribe struct { + workspace.TaskBase + R *http.Request `json:"r,in"` + HttpErrInf *proto.Response `json:"httpErrInf,out"` + Ctx context.Context `json:"ctx,in"` + W http.ResponseWriter `json:"w,in"` + AppInstanceId string `json:"appInstanceId,in"` + SubscribeId string `json:"subscribeId,in"` + HttpRsp interface{} `json:"httpRsp,out"` +} + +func (t *DelOneSubscribe) OnRequest(data string) workspace.TaskCode { + + appInstanceId := t.AppInstanceId + subscribeId := t.SubscribeId + + opts := []registry.PluginOp{ + registry.OpGet(registry.WithStrKey("/cse-sr/etsi/subscribe/" + appInstanceId + "/" + subscribeId)), + } + resp, errGet := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil) + if errGet != nil { + log.Errorf(errGet, "get subscription from etcd failed") + t.SetFirstErrorCode(OperateDataWithEtcdErr, "get subscription from etch failed") + return workspace.TaskFinish + } + + if len(resp.Kvs) == 0 { + log.Errorf(errGet, "subscription not exist") + t.SetFirstErrorCode(SubscriptionNotFound, "subscription not exist") + return workspace.TaskFinish + } + + opts = []registry.PluginOp{ + registry.OpDel(registry.WithStrKey("/cse-sr/etsi/subscribe/" + appInstanceId + "/" + subscribeId)), + } + _, err := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil) + if err != nil { + log.Errorf(err, "delete subscription from etcd failed") + t.SetFirstErrorCode(OperateDataWithEtcdErr, "delete subscription from etch failed") + return workspace.TaskFinish + } + + t.HttpRsp = "" + return workspace.TaskFinish +} diff --git a/mep/mepserver/mp1/plan_delete_svc.go b/mep/mepserver/mp1/plan_delete_svc.go new file mode 100644 index 0000000..f0f1ece --- /dev/null +++ b/mep/mepserver/mp1/plan_delete_svc.go @@ -0,0 +1,54 @@ +/* + * Copyright 2020 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mp1 + +import ( + "context" + + "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/server/core" + "github.com/apache/servicecomb-service-center/server/core/proto" + + "mepserver/mp1/arch/workspace" +) + +type DeleteService struct { + HttpErrInf *proto.Response `json:"httpErrInf,out"` + workspace.TaskBase + Ctx context.Context `json:"ctx,in"` + ServiceId string `json:"serviceId,in"` +} + +func (t *DeleteService) OnRequest(data string) workspace.TaskCode { + if t.ServiceId == "" { + t.SetFirstErrorCode(SerErrServiceDelFailed, "param is empty") + return workspace.TaskFinish + } + serviceID := t.ServiceId[:len(t.ServiceId)/2] + instanceID := t.ServiceId[len(t.ServiceId)/2:] + req := &proto.UnregisterInstanceRequest{ + ServiceId: serviceID, + InstanceId: instanceID, + } + resp, err := core.InstanceAPI.Unregister(t.Ctx, req) + if err != nil { + log.Errorf(err, "Service delete failed!") + } + t.HttpErrInf = resp.Response + + return workspace.TaskFinish +} diff --git a/mep/mepserver/mp1/plan_discover_svc.go b/mep/mepserver/mp1/plan_discover_svc.go new file mode 100644 index 0000000..6ba4a69 --- /dev/null +++ b/mep/mepserver/mp1/plan_discover_svc.go @@ -0,0 +1,212 @@ +/* + * Copyright 2020 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mp1 + +import ( + "context" + "net/http" + "net/url" + + "github.com/apache/servicecomb-service-center/server/core" + "github.com/apache/servicecomb-service-center/server/core/proto" + + "mepserver/mp1/arch/workspace" + "mepserver/mp1/models" + meputil "mepserver/mp1/util" +) + +type DiscoverDecode struct { + workspace.TaskBase + R *http.Request `json:"r,in"` + Ctx context.Context `json:"ctx,out"` + QueryParam url.Values `json:"queryParam,out"` + CoreRequest interface{} `json:"coreRequest,out"` +} + +func (t *DiscoverDecode) OnRequest(data string) workspace.TaskCode { + t.Ctx, t.CoreRequest, t.QueryParam = meputil.GetFindParam(t.R) + return workspace.TaskFinish +} + +type DiscoverService struct { + workspace.TaskBase + Ctx context.Context `json:"ctx,in"` + QueryParam url.Values `json:"queryParam,in"` + CoreRequest interface{} `json:"coreRequest,in"` + CoreRsp interface{} `json:"coreRsp,out"` +} + +func (t *DiscoverService) checkInstanceId(req *proto.FindInstancesRequest) bool { + instanceId := req.AppId + if instanceId != "default" { + instances := t.CoreRsp.(*proto.FindInstancesResponse).Instances + for _, val := range instances { + if val.ServiceId+val.InstanceId == instanceId { + return true + } + } + return false + } + return true +} + +func (t *DiscoverService) OnRequest(data string) workspace.TaskCode { + req, ok := t.CoreRequest.(*proto.FindInstancesRequest) + if !ok { + t.SetFirstErrorCode(SerErrServiceNotFound, "cast to request fail") + return workspace.TaskFinish + } + if req.ServiceName == "" { + var errFindByKey error + t.CoreRsp, errFindByKey = meputil.FindInstanceByKey(t.QueryParam) + if errFindByKey != nil || t.CoreRsp == nil { + t.SetFirstErrorCode(SerErrServiceNotFound, errFindByKey.Error()) + return workspace.TaskFinish + } + if !t.checkInstanceId(req) { + t.SetFirstErrorCode(SerErrServiceNotFound, "instance id not found") + } + return workspace.TaskFinish + } + + findInstance, err := core.InstanceAPI.Find(t.Ctx, req) + if err != nil { + t.SetFirstErrorCode(SerErrServiceNotFound, err.Error()) + return workspace.TaskFinish + } + if findInstance == nil || len(findInstance.Instances) == 0 { + t.SetFirstErrorCode(SerErrServiceNotFound, "service not found") + return workspace.TaskFinish + } + + t.CoreRsp = findInstance + return workspace.TaskFinish +} + +type ToStrDiscover struct { + HttpErrInf *proto.Response `json:"httpErrInf,out"` + workspace.TaskBase + CoreRsp interface{} `json:"coreRsp,in"` + HttpRsp interface{} `json:"httpRsp,out"` +} + +func (t *ToStrDiscover) OnRequest(data string) workspace.TaskCode { + t.HttpErrInf, t.HttpRsp = mp1CvtSrvDiscover(t.CoreRsp.(*proto.FindInstancesResponse)) + return workspace.TaskFinish +} + +type RspHook struct { + R *http.Request `json:"r,in"` + workspace.TaskBase + Ctx context.Context `json:"ctx,in"` + HttpRsp interface{} `json:"httpRsp,in"` + HookRsp interface{} `json:"hookRsp,out"` +} + +func (t *RspHook) OnRequest(data string) workspace.TaskCode { + t.HookRsp = instanceHook(t.Ctx, t.R, t.HttpRsp) + return workspace.TaskFinish +} + +func instanceHook(ctx context.Context, r *http.Request, rspData interface{}) interface{} { + rspBody, ok := rspData.([]*models.ServiceInfo) + if !ok { + return rspData + } + + if len(rspBody) == 0 { + return rspBody + } + consumerName := r.Header.Get("X-ConsumerName") + if consumerName == "APIGW" { + return rspBody + } + + for _, v := range rspBody { + if apihook.APIHook != nil { + info := apihook.APIHook() + if len(info.Addresses) == 0 && len(info.Uris) == 0 { + return rspBody + } + v.TransportInfo.Endpoint = info + } + } + return rspBody +} + +type SendHttpRsp struct { + HttpErrInf *proto.Response `json:"httpErrInf,in"` + workspace.TaskBase + W http.ResponseWriter `json:"w,in"` + HttpRsp interface{} `json:"httpRsp,in"` +} + +func (t *SendHttpRsp) OnRequest(data string) workspace.TaskCode { + errInfo := t.GetSerErrInfo() + if errInfo.ErrCode >= int(workspace.TaskFail) { + statusCode, httpBody := t.cvtHttpErrInfo(errInfo) + meputil.HttpErrResponse(t.W, statusCode, httpBody) + + return workspace.TaskFinish + } + meputil.WriteResponse(t.W, t.HttpErrInf, t.HttpRsp) + return workspace.TaskFinish +} + +func (t *SendHttpRsp) cvtHttpErrInfo(errInfo *workspace.SerErrInfo) (int, interface{}) { + statusCode := http.StatusBadRequest + var httpBody interface{} + switch workspace.ErrCode(errInfo.ErrCode) { + case SerErrServiceNotFound: + { + //status should return bad request + body := &models.ProblemDetails{ + Title: "Can not found resource", + Status: uint32(errInfo.ErrCode), + Detail: errInfo.Message, + } + httpBody = body + } + case SerInstanceNotFound: + { + statusCode = http.StatusNotFound + body := &models.ProblemDetails{ + Title: "Can not found resource", + Status: uint32(errInfo.ErrCode), + Detail: errInfo.Message, + } + httpBody = body + } + } + + return statusCode, httpBody +} + +func mp1CvtSrvDiscover(findInsResp *proto.FindInstancesResponse) (*proto.Response, []*models.ServiceInfo) { + resp := findInsResp.Response + if resp != nil && resp.GetCode() != proto.Response_SUCCESS { + return resp, nil + } + serviceInfos := make([]*models.ServiceInfo, 0, len(findInsResp.Instances)) + for _, ins := range findInsResp.Instances { + serviceInfo := &models.ServiceInfo{} + serviceInfo.FromServiceInstance(ins) + serviceInfos = append(serviceInfos, serviceInfo) + } + return resp, serviceInfos + +} diff --git a/mep/mepserver/mp1/plan_get_one_subscribe.go b/mep/mepserver/mp1/plan_get_one_subscribe.go new file mode 100644 index 0000000..01e5d22 --- /dev/null +++ b/mep/mepserver/mp1/plan_get_one_subscribe.go @@ -0,0 +1,72 @@ +/* + * Copyright 2020 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mp1 + +import ( + "context" + "encoding/json" + "net/http" + + "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/server/core/backend" + "github.com/apache/servicecomb-service-center/server/core/proto" + "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry" + + "mepserver/mp1/arch/workspace" + "mepserver/mp1/models" +) + +type GetOneSubscribe struct { + workspace.TaskBase + R *http.Request `json:"r,in"` + HttpErrInf *proto.Response `json:"httpErrInf,out"` + Ctx context.Context `json:"ctx,in"` + W http.ResponseWriter `json:"w,in"` + AppInstanceId string `json:"appInstanceId,in"` + SubscribeId string `json:"subscribeId,in"` + HttpRsp interface{} `json:"httpRsp,out"` +} + +func (t *GetOneSubscribe) OnRequest(data string) workspace.TaskCode { + + appInstanceId := t.AppInstanceId + subscribeId := t.SubscribeId + + opts := []registry.PluginOp{ + registry.OpGet(registry.WithStrKey("/cse-sr/etsi/subscribe/" + appInstanceId + "/" + subscribeId)), + } + resp, err := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil) + if err != nil { + log.Errorf(err, "get subscription from etcd failed") + t.SetFirstErrorCode(OperateDataWithEtcdErr, "get subscription from etch failed") + return workspace.TaskFinish + } + + if len(resp.Kvs) == 0 { + log.Errorf(err, "subscription not exist") + t.SetFirstErrorCode(SubscriptionNotFound, "subscription not exist") + return workspace.TaskFinish + } + sub := &models.SerAvailabilityNotificationSubscription{} + jsonErr := json.Unmarshal([]byte(string(resp.Kvs[0].Value)), sub) + if jsonErr != nil { + log.Warn("subscribe can not be parsed to SerAvailabilityNotificationSubscription") + return workspace.TaskFinish + } + t.HttpRsp = sub + return workspace.TaskFinish +} diff --git a/mep/mepserver/mp1/plan_get_one_svc.go b/mep/mepserver/mp1/plan_get_one_svc.go new file mode 100644 index 0000000..d2a7309 --- /dev/null +++ b/mep/mepserver/mp1/plan_get_one_svc.go @@ -0,0 +1,83 @@ +/* + * Copyright 2020 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mp1 + +import ( + "context" + "net/http" + + "github.com/apache/servicecomb-service-center/pkg/util" + "github.com/apache/servicecomb-service-center/server/core" + "github.com/apache/servicecomb-service-center/server/core/proto" + + "mepserver/mp1/arch/workspace" + "mepserver/mp1/models" + meputil "mepserver/mp1/util" +) + +type GetOneDecode struct { + workspace.TaskBase + R *http.Request `json:"r,in"` + Ctx context.Context `json:"ctx,out"` + CoreRequest interface{} `json:"coreRequest,out"` +} + +func (t *GetOneDecode) OnRequest(data string) workspace.TaskCode { + t.Ctx, t.CoreRequest = t.getFindParam(t.R) + return workspace.TaskFinish + +} + +func (t *GetOneDecode) getFindParam(r *http.Request) (context.Context, *proto.GetOneInstanceRequest) { + query, ids := meputil.GetHTTPTags(r) + mp1SrvId := query.Get(":serviceId") + serviceId := mp1SrvId[:len(mp1SrvId)/2] + instanceId := mp1SrvId[len(mp1SrvId)/2:] + req := &proto.GetOneInstanceRequest{ + ConsumerServiceId: r.Header.Get("X-ConsumerId"), + ProviderServiceId: serviceId, + ProviderInstanceId: instanceId, + Tags: ids, + } + + ctx := util.SetTargetDomainProject(r.Context(), r.Header.Get("X-Domain-Name"), query.Get(":project")) + return ctx, req +} + +type GetOneInstance struct { + workspace.TaskBase + HttpErrInf *proto.Response `json:"httpErrInf,out"` + Ctx context.Context `json:"ctx,in"` + CoreRequest interface{} `json:"coreRequest,in"` + HttpRsp interface{} `json:"httpRsp,out"` +} + +func (t *GetOneInstance) OnRequest(data string) workspace.TaskCode { + resp, _ := core.InstanceAPI.GetOneInstance(t.Ctx, t.CoreRequest.(*proto.GetOneInstanceRequest)) + t.HttpErrInf = resp.Response + resp.Response = nil + mp1Rsp := &models.ServiceInfo{} + if resp.Instance != nil { + mp1Rsp.FromServiceInstance(resp.Instance) + } else { + t.SetFirstErrorCode(SerInstanceNotFound, "service instance id not found") + return workspace.TaskFinish + } + t.HttpRsp = mp1Rsp + + return workspace.TaskFinish +} diff --git a/mep/mepserver/mp1/plan_get_subsribes.go b/mep/mepserver/mp1/plan_get_subsribes.go new file mode 100644 index 0000000..cf34770 --- /dev/null +++ b/mep/mepserver/mp1/plan_get_subsribes.go @@ -0,0 +1,78 @@ +/* + * Copyright 2020 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mp1 + +import ( + "context" + "encoding/json" + "net/http" + + "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/server/core/backend" + "github.com/apache/servicecomb-service-center/server/core/proto" + "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry" + + "mepserver/mp1/arch/workspace" + "mepserver/mp1/models" +) + +type GetSubscribes struct { + workspace.TaskBase + R *http.Request `json:"r,in"` + HttpErrInf *proto.Response `json:"httpErrInf,out"` + Ctx context.Context `json:"ctx,in"` + W http.ResponseWriter `json:"w,in"` + AppInstanceId string `json:"appInstanceId,in"` + HttpRsp interface{} `json:"httpRsp,out"` +} + +func (t *GetSubscribes) OnRequest(data string) workspace.TaskCode { + + appInstanceId := t.AppInstanceId + + opts := []registry.PluginOp{ + registry.OpGet(registry.WithStrKey("/cse-sr/etsi/subscribe/"+appInstanceId), registry.WithPrefix()), + } + resp, err := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil) + if err != nil { + log.Errorf(err, "get subscription from etcd failed") + t.SetFirstErrorCode(OperateDataWithEtcdErr, "get subscription from etcd failed") + return workspace.TaskFinish + } + + var values []string + for _, value := range resp.Kvs { + values = append(values, string(value.Value)) + } + if len(values) == 0 { + log.Errorf(err, "get subscription failed, subscription not exist") + t.SetFirstErrorCode(SubscriptionNotFound, "get subscription failed, subscription not exist") + return workspace.TaskFinish + } + subs := make([]*models.SerAvailabilityNotificationSubscription, 0, len(values)) + for _, val := range values { + sub := &models.SerAvailabilityNotificationSubscription{} + err := json.Unmarshal([]byte(val), sub) + if err != nil { + log.Warn("subscribe can not be parsed to SerAvailabilityNotificationSubscription") + continue + } + subs = append(subs, sub) + } + t.HttpRsp = subs + return workspace.TaskFinish +} diff --git a/mep/mepserver/mp1/plan_register_svc.go b/mep/mepserver/mp1/plan_register_svc.go new file mode 100644 index 0000000..2a65de8 --- /dev/null +++ b/mep/mepserver/mp1/plan_register_svc.go @@ -0,0 +1,193 @@ +/* + * Copyright 2020 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mp1 + +import ( + "context" + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + + "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/pkg/util" + "github.com/apache/servicecomb-service-center/server/core" + "github.com/apache/servicecomb-service-center/server/core/proto" + svcerr "github.com/apache/servicecomb-service-center/server/error" + + "mepserver/mp1/arch/workspace" + "mepserver/mp1/models" + meputil "mepserver/mp1/util" +) + +type DecodeRestReq struct { + workspace.TaskBase + R *http.Request `json:"r,in"` + Ctx context.Context `json:"ctx,out"` + AppInstanceId string `json:"appInstanceId,out"` + SubscribeId string `json:"subscribeId,out"` + ServiceId string `json:"serviceId,out"` + RestBody interface{} `json:"restBody,out"` +} + +func (t *DecodeRestReq) OnRequest(data string) workspace.TaskCode { + t.GetParam(t.R) + err := t.ParseBody(t.R) + if err != nil { + log.Error("parse rest body failed", err) + } + return workspace.TaskFinish +} + +func (t *DecodeRestReq) ParseBody(r *http.Request) error { + if t.RestBody == nil { + return nil + } + msg, err := ioutil.ReadAll(r.Body) + if err != nil { + log.Error("read body failed", err) + t.SetFirstErrorCode(SerErrFailBase, err.Error()) + return err + } + + newMsg, err := t.checkParam(msg) + if err != nil { + log.Error("check Param failed", err) + t.SetFirstErrorCode(SerErrFailBase, err.Error()) + return err + } + + err = json.Unmarshal(newMsg, t.RestBody) + if err != nil { + log.Errorf(err, "invalid json: %s", util.BytesToStringWithNoCopy(newMsg)) + t.SetFirstErrorCode(SerErrFailBase, err.Error()) + return err + } + return nil + +} + +func (t *DecodeRestReq) checkParam(msg []byte) ([]byte, error) { + + var temp map[string]interface{} + err := json.Unmarshal(msg, &temp) + if err != nil { + log.Errorf(err, "invalid json to map: %s", util.BytesToStringWithNoCopy(msg)) + t.SetFirstErrorCode(SerErrFailBase, err.Error()) + return nil, err + } + + meputil.SetMapValue(temp, "consumedLocalOnly", true) + meputil.SetMapValue(temp, "isLocal", true) + meputil.SetMapValue(temp, "scopeOfLocality", "MEC_HOST") + + msg, err = json.Marshal(&temp) + if err != nil { + log.Errorf(err, "invalid map to json") + t.SetFirstErrorCode(SerErrFailBase, err.Error()) + return nil, err + } + + return msg, nil +} + +func (t *DecodeRestReq) WithBody(body interface{}) *DecodeRestReq { + t.RestBody = body + return t +} + +func (t *DecodeRestReq) GetParam(r *http.Request) { + query, _ := meputil.GetHTTPTags(r) + t.AppInstanceId = query.Get(":appInstanceId") + t.SubscribeId = query.Get(":subscriptionId") + t.ServiceId = query.Get(":serviceId") + t.Ctx = util.SetTargetDomainProject(r.Context(), r.Header.Get("X-Domain-Name"), query.Get(":project")) +} + +type RegisterServiceId struct { + HttpErrInf *proto.Response `json:"httpErrInf,out"` + workspace.TaskBase + Ctx context.Context `json:"ctx,in"` + ServiceId string `json:"serviceId,out"` + RestBody interface{} `json:"restBody,in"` +} + +func (t *RegisterServiceId) OnRequest(data string) workspace.TaskCode { + + serviceInfo, ok := t.RestBody.(*models.ServiceInfo) + if !ok { + t.SetFirstErrorCode(1, "restbody failed") + return workspace.TaskFinish + } + req := &proto.CreateServiceRequest{} + serviceInfo.ToServiceRequest(req) + resp, err := core.ServiceAPI.Create(t.Ctx, req) + if err != nil { + log.Errorf(err, "Service Center ServiceAPI.Create fail: %s!", err.Error()) + t.SetFirstErrorCode(1, err.Error()) + return workspace.TaskFinish + } + + if resp.ServiceId == "" { + t.HttpErrInf = resp.Response + log.Warn("Service id empty.") + } + t.ServiceId = resp.ServiceId + return workspace.TaskFinish +} + +type RegisterServiceInst struct { + HttpErrInf *proto.Response `json:"httpErrInf,out"` + workspace.TaskBase + W http.ResponseWriter `json:"w,in"` + Ctx context.Context `json:"ctx,in"` + AppInstanceId string `json:"appInstanceId,in"` + ServiceId string `json:"serviceId,in"` + InstanceId string `json:"instanceId,out"` + RestBody interface{} `json:"restBody,in"` + HttpRsp interface{} `json:"httpRsp,out"` +} + +func (t *RegisterServiceInst) OnRequest(data string) workspace.TaskCode { + serviceInfo, ok := t.RestBody.(*models.ServiceInfo) + if !ok { + t.SetFirstErrorCode(1, "restbody failed") + return workspace.TaskFinish + } + req := &proto.RegisterInstanceRequest{} + serviceInfo.ToRegisterInstance(req) + req.Instance.ServiceId = t.ServiceId + req.Instance.Properties["appInstanceId"] = t.AppInstanceId + resp, err := core.InstanceAPI.Register(t.Ctx, req) + if err != nil { + log.Errorf(err, "RegisterInstance fail: %s", t.ServiceId) + t.HttpErrInf = &proto.Response{} + t.HttpErrInf.Code = svcerr.ErrForbidden + t.HttpErrInf.Message = err.Error() + return workspace.TaskFinish + } + t.InstanceId = resp.InstanceId + + //build response serviceComb use serviceId + InstanceId to mark a service instance + mp1SerId := t.ServiceId + t.InstanceId + serviceInfo.SerInstanceId = mp1SerId + t.HttpRsp = serviceInfo + + location := fmt.Sprintf("/mep/mp1/v1/services/%s", mp1SerId) + t.W.Header().Set("Location", location) + return workspace.TaskFinish +} diff --git a/mep/mepserver/mp1/plan_send_httprsp_created.go b/mep/mepserver/mp1/plan_send_httprsp_created.go new file mode 100644 index 0000000..dce8a4e --- /dev/null +++ b/mep/mepserver/mp1/plan_send_httprsp_created.go @@ -0,0 +1,38 @@ +/* + * Copyright 2020 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mp1 + +import ( + "net/http" + + "github.com/apache/servicecomb-service-center/server/core/proto" + + "mepserver/mp1/arch/workspace" + "mepserver/mp1/util" +) + +type SendHttpRspCreated struct { + HttpErrInf *proto.Response `json:"httpErrInf,in"` + workspace.TaskBase + W http.ResponseWriter `json:"w,in"` + HttpRsp interface{} `json:"httpRsp,in"` +} + +func (t *SendHttpRspCreated) OnRequest(data string) workspace.TaskCode { + util.WriteHTTPResponse(t.W, t.HttpErrInf, t.HttpRsp) + return workspace.TaskFinish +} diff --git a/mep/mepserver/mp1/plan_subscribe_app.go b/mep/mepserver/mp1/plan_subscribe_app.go new file mode 100644 index 0000000..c93ab27 --- /dev/null +++ b/mep/mepserver/mp1/plan_subscribe_app.go @@ -0,0 +1,90 @@ +/* + * Copyright 2020 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mp1 + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + + "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/server/core/backend" + "github.com/apache/servicecomb-service-center/server/core/proto" + "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry" + "github.com/satori/go.uuid" + + "mepserver/mp1/arch/workspace" + "mepserver/mp1/models" +) + +type SubscribeIst struct { + workspace.TaskBase + R *http.Request `json:"r,in"` + HttpErrInf *proto.Response `json:"httpErrInf,out"` + Ctx context.Context `json:"ctx,in"` + W http.ResponseWriter `json:"w,in"` + RestBody interface{} `json:"restBody,in"` + AppInstanceId string `json:"appInstanceId,in"` + SubscribeId string `json:"subscribeId,in"` + HttpRsp interface{} `json:"httpRsp,out"` +} + +func (t *SubscribeIst) OnRequest(data string) workspace.TaskCode { + + mp1SubscribeInfo, ok := t.RestBody.(*models.SerAvailabilityNotificationSubscription) + if !ok { + t.SetFirstErrorCode(SerErrFailBase, "restBody failed") + return workspace.TaskFinish + } + + appInstanceId := t.AppInstanceId + subscribeId := uuid.NewV4().String() + t.SubscribeId = subscribeId + subscribeJSON, err := json.Marshal(mp1SubscribeInfo) + if err != nil { + log.Errorf(err, "can not Marshal subscribe info") + t.SetFirstErrorCode(ParseInfoErr, "can not marshal subscribe info") + return workspace.TaskFinish + } + opts := []registry.PluginOp{ + registry.OpPut(registry.WithStrKey("/cse-sr/etsi/subscribe/"+appInstanceId+"/"+subscribeId), + registry.WithValue(subscribeJSON)), + } + _, resultErr := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil) + if resultErr != nil { + log.Errorf(err, "subscription to etcd failed!") + t.SetFirstErrorCode(OperateDataWithEtcdErr, "put subscription to etcd failed") + return workspace.TaskFinish + } + + req := &proto.WatchInstanceRequest{SelfServiceId: appInstanceId[:len(appInstanceId)/2]} + t.R.Method = "WATCHLIST" + WebsocketListAndWatch(t.Ctx, req, appInstanceId) + t.buildResponse(mp1SubscribeInfo) + + return workspace.TaskFinish +} + +func (t *SubscribeIst) buildResponse(sub *models.SerAvailabilityNotificationSubscription) { + appInstanceID := t.AppInstanceId + subscribeID := t.SubscribeId + + t.HttpRsp = sub + location := fmt.Sprintf("/mec_service_mgmt/v1/applications/%s/subscriptions/%s/", appInstanceID, subscribeID) + t.W.Header().Set("Location", location) +} diff --git a/mep/mepserver/mp1/plan_update_svc.go b/mep/mepserver/mp1/plan_update_svc.go new file mode 100644 index 0000000..4e19194 --- /dev/null +++ b/mep/mepserver/mp1/plan_update_svc.go @@ -0,0 +1,79 @@ +/* + * Copyright 2020 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mp1 + +import ( + "context" + + "github.com/apache/servicecomb-service-center/pkg/util" + "github.com/apache/servicecomb-service-center/server/core/proto" + svcutil "github.com/apache/servicecomb-service-center/server/service/util" + + "mepserver/mp1/arch/workspace" + "mepserver/mp1/models" + meputil "mepserver/mp1/util" +) + +type UpdateInstance struct { + workspace.TaskBase + HttpErrInf *proto.Response `json:"httpErrInf,out"` + Ctx context.Context `json:"ctx,in"` + ServiceId string `json:"serviceId,in"` + RestBody interface{} `json:"restBody,in"` + HttpRsp interface{} `json:"httpRsp,out"` +} + +func (t *UpdateInstance) OnRequest(data string) workspace.TaskCode { + if t.ServiceId == "" { + t.SetFirstErrorCode(SerErrFailBase, "param is empty") + return workspace.TaskFinish + } + mp1Ser, ok := t.RestBody.(*models.ServiceInfo) + if !ok { + t.SetFirstErrorCode(SerErrFailBase, "body invalid") + return workspace.TaskFinish + } + + instance, err := meputil.GetServiceInstance(t.Ctx, t.ServiceId) + if err != nil { + t.SetFirstErrorCode(SerInstanceNotFound, "find service failed") + return workspace.TaskFinish + } + + copyInstanceRef := *instance + req := proto.RegisterInstanceRequest{ + Instance: ©InstanceRef, + } + mp1Ser.ToRegisterInstance(&req) + + domainProject := util.ParseDomainProject(t.Ctx) + centerErr := svcutil.UpdateInstance(t.Ctx, domainProject, ©InstanceRef) + if centerErr != nil { + t.SetFirstErrorCode(SerErrServiceUpdFailed, "update service failed") + return workspace.TaskFinish + } + + err = meputil.Heartbeat(t.Ctx, mp1Ser.SerInstanceId) + if err != nil { + t.SetFirstErrorCode(SerErrServiceUpdFailed, "heartbeat failed") + return workspace.TaskFinish + } + mp1Ser.SerInstanceId = instance.ServiceId + instance.InstanceId + t.HttpRsp = mp1Ser + t.HttpErrInf = proto.CreateResponse(proto.Response_SUCCESS, "Update service instance success.") + return workspace.TaskFinish +} diff --git a/mep/mepserver/mp1/publisher.go b/mep/mepserver/mp1/publisher.go new file mode 100644 index 0000000..bc2c09a --- /dev/null +++ b/mep/mepserver/mp1/publisher.go @@ -0,0 +1,105 @@ +/* + * Copyright 2020 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mp1 + +import ( + "sync" + "time" + + "github.com/apache/servicecomb-service-center/pkg/gopool" + "golang.org/x/net/context" +) + +type Publisher struct { + wss []*Websocket + goroutine *gopool.Pool + lock sync.Mutex +} + +func (p *Publisher) Run() { + gopool.Go(publisher.loop) +} + +func (p *Publisher) loop(ctx context.Context) { + defer p.Stop() + ticker := time.NewTicker(500 * time.Millisecond) + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + var removes []int + for i, ws := range p.wss { + payload := ws.Pick() + if payload == nil { + continue + } + _, ok := payload.(error) + if ok { + removes = append(removes, i) + } + p.dispatch(ws, payload) + } + if len(removes) == 0 { + continue + } + p.lock.Lock() + var ( + news []*Websocket + s int + ) + for _, e := range removes { + news = append(news, p.wss[s:e]...) + s = e + 1 + } + if s < len(p.wss) { + news = append(news, p.wss[s:]...) + } + p.wss = news + p.lock.Unlock() + } + } +} + +func (p *Publisher) Stop() { + p.goroutine.Close(true) +} + +func (p *Publisher) dispatch(ws *Websocket, payload interface{}) { + p.goroutine.Do(func(ctx context.Context) { + ws.HandleWatchWebSocketJob(payload) + }) +} + +func (p *Publisher) Accept(ws *Websocket) { + p.lock.Lock() + p.wss = append(p.wss, ws) + p.lock.Unlock() +} + +var publisher *Publisher + +func init() { + publisher = NewPublisher() + publisher.Run() +} + +func NewPublisher() *Publisher { + return &Publisher{ + goroutine: gopool.New(context.Background()), + } +} diff --git a/mep/mepserver/mp1/websocket.go b/mep/mepserver/mp1/websocket.go new file mode 100644 index 0000000..f921fde --- /dev/null +++ b/mep/mepserver/mp1/websocket.go @@ -0,0 +1,255 @@ +/* + * Copyright 2020 Huawei Technologies Co., Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package mp1 + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "strings" + "time" + + "github.com/apache/servicecomb-service-center/pkg/log" + "github.com/apache/servicecomb-service-center/server/core/backend" + "github.com/apache/servicecomb-service-center/server/core/proto" + "github.com/apache/servicecomb-service-center/server/notify" + "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry" + + "mepserver/mp1/models" +) + +type Websocket struct { + watcher *notify.InstanceEventListWatcher + ticker *time.Ticker + ctx context.Context + needPingWatcher bool + free chan struct{} + closed chan struct{} + serviceID string +} + +func (ws *Websocket) Init() error { + ws.ticker = time.NewTicker(notify.HeartbeatInterval) + ws.needPingWatcher = true + ws.free = make(chan struct{}, 1) + ws.closed = make(chan struct{}) + ws.SetReady() + if err := notify.NotifyCenter().AddSubscriber(ws.watcher); err != nil { + return err + } + publisher.Accept(ws) + return nil +} + +func (ws *Websocket) ReadTimeout() time.Duration { + return notify.ReadTimeout +} + +func (ws *Websocket) SendTimeout() time.Duration { + return notify.SendTimeout +} + +func (ws *Websocket) HandleWatchWebSocketControlMessage() { + +} + +func (ws *Websocket) HandleWatchWebSocketJob(payload interface{}) { + defer ws.SetReady() + var ( + job *notify.InstanceEvent + ) + switch v := payload.(type) { + case error: + err := payload.(error) + log.Errorf(err, "watcher catch an error, subject: %s, group: %s", ws.watcher.Subject(), ws.watcher.Group()) + case time.Time: + return + case *notify.InstanceEvent: + serviceID := ws.serviceID + job = payload.(*notify.InstanceEvent) + resp := job.Response + SendMsgToApp(resp, serviceID) + default: + log.Errorf(nil, "watcher unknown input type %T, subject: %s, group: %s", v, ws.watcher.Subject(), ws.watcher.Group()) + return + } + + select { + case _, ok := <-ws.closed: + if !ok { + log.Warn("websocket channel closed") + } + return + default: + } +} + +func (ws *Websocket) SetReady() { + select { + case ws.free <- struct{}{}: + default: + } + +} + +func (ws *Websocket) Pick() interface{} { + select { + case _, ok := <-ws.Ready(): + if !ok { + log.Warn("websocket ready channel closed") + } + if ws.watcher.Err() != nil { + return ws.watcher.Err() + } + + select { + case t, ok := <-ws.ticker.C: + if !ok { + log.Warn("websocket ticker C channel closed") + } + return t + case j, ok := <-ws.watcher.Job: + if !ok { + log.Warn("websocket watcher job channel closed") + } + if j == nil { + err := fmt.Errorf("server shutdown") + log.Error("server shutdown", err) + } + return j + default: + ws.SetReady() + } + default: + } + return nil +} + +func (ws *Websocket) Ready() chan struct{} { + return ws.free +} + +func (ws *Websocket) Stop() { + close(ws.closed) +} + +func getCallBackUris(serviceID string, instanceID string, serName string) []string { + var callbackUris []string + opts := []registry.PluginOp{ + registry.OpGet(registry.WithStrKey("/cse-sr/etsi/subscribe/"+serviceID), registry.WithPrefix()), + } + resp, err := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil) + if err != nil { + log.Errorf(err, "get subcription from etcd failed!") + return callbackUris + } + for _, v := range resp.Kvs { + var notifyInfo models.SerAvailabilityNotificationSubscription + if v.Value == nil { + log.Warn("the value is nil in etcd") + continue + } + err = json.Unmarshal(v.Value, ¬ifyInfo) + if err != nil { + log.Warn("notify json can not be parsed to notifyInfo") + continue + } + callbackURI := notifyInfo.CallbackReference + filter := notifyInfo.FilteringCriteria + + if (len(filter.SerInstanceIds) != 0 && StringContains(filter.SerInstanceIds, instanceID) != -1) || + (len(filter.SerNames) != 0 && StringContains(filter.SerNames, serviceID) != -1) { + callbackUris = append(callbackUris, callbackURI) + } + } + log.Infof("send to consumerIds: %s", callbackUris) + return callbackUris +} + +func StringContains(arr []string, val string) (index int) { + index = -1 + for i := 0; i < len(arr); i++ { + if arr[i] == val { + index = i + return + } + } + return +} + +func SendMsgToApp(data *proto.WatchInstanceResponse, serviceID string) { + // transfer data to instanceInfo, and get instaceid, serviceName + instanceID := data.Instance.ServiceId + data.Instance.InstanceId + serName := data.Instance.Properties["serName"] + action := data.Action + instanceInfo := data.Instance + instanceStr, err := json.Marshal(instanceInfo) + if err != nil { + log.Errorf(err, "parse instanceInfo failed!") + return + } + + callbackUris := getCallBackUris(serviceID, instanceID, serName) + body := strings.NewReader(string(instanceStr)) + doSend(action, body, callbackUris) +} + +func doSend(action string, body io.Reader, callbackUris []string) { + for _, callbackURI := range callbackUris { + log.Debugf("action: %s with callbackURI:%s", action, callbackURI) + if !strings.HasPrefix(callbackURI, "http") { + callbackURI = "http://" + callbackURI + } + client := http.Client{} + + if action == "CREATE" { + contentType := "application/x-www-form-urlencoded" + _, err := http.Post(callbackURI, contentType, body) + if err != nil { + log.Warn("the consumer handle post action failed!") + } + } else if action == "DELETE" { + req, err := http.NewRequest("delete", callbackURI, body) + if err != nil { + _, err := client.Do(req) + if err != nil { + log.Warn("the consumer handle delete action failed!") + } + + } else { + log.Errorf(err, "crate request failed!") + } + } else if action == "UPDATE" { + req, err := http.NewRequest("put", callbackURI, body) + if err != nil { + _, err := client.Do(req) + if err != nil { + log.Warn("the consumer handle update action failed!") + } + + } else { + log.Errorf(err, "crate request failed!") + } + } + } +} + +func DoWebSocketListAndWatchV2(ctx context.Context, id string, id2 string, f func() ([]*proto.WatchInstanceResponse, int64)) { + //TBD +}