2 * Copyright 2020 Huawei Technologies Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
23 "github.com/apache/servicecomb-service-center/pkg/log"
24 "github.com/apache/servicecomb-service-center/pkg/util"
25 apt "github.com/apache/servicecomb-service-center/server/core"
26 "github.com/apache/servicecomb-service-center/server/core/backend"
27 "github.com/apache/servicecomb-service-center/server/core/proto"
28 "github.com/apache/servicecomb-service-center/server/notify"
29 "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
30 "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
31 "github.com/apache/servicecomb-service-center/server/service/cache"
32 "github.com/apache/servicecomb-service-center/server/service/metrics"
33 svcutil "github.com/apache/servicecomb-service-center/server/service/util"
34 "golang.org/x/net/context"
36 "mepserver/mp1/models"
// indexStart and indexEnd are byte offsets counted back from the END of a
// registry key. getCosumerIds keeps key[len-indexStart : len-indexEnd], i.e. a
// 16-byte window that stops 17 bytes before the end of the key.
// NOTE(review): the key layout that makes this window a consumer ID is not
// visible in this file — confirm against the code that writes these keys.
39 const indexStart, indexEnd = 33, 17
// InstanceEtsiEventHandler is the receiver for the instance event callbacks in
// this file: Type reports backend.INSTANCE and OnEvent forwards instance
// events to subscribers through PublishInstanceEvent.
41 type InstanceEtsiEventHandler struct {
// Type reports the discovery type this handler declares: backend.INSTANCE.
44 func (h *InstanceEtsiEventHandler) Type() discovery.Type {
45 return backend.INSTANCE
// OnEvent handles one discovery event for a service instance. In the visible
// statements it adjusts the per-domain instance metric, looks up the provider
// service, and publishes the event to the consumer IDs returned by
// getCosumerIds.
// NOTE(review): this listing does not show every line of the method (the
// declaration of `action`, the switch header and the guards around the log
// calls are absent), so the comments below describe only what is visible.
48 func (h *InstanceEtsiEventHandler) OnEvent(evt discovery.KvEvent) {
// The event value is expected to hold a *proto.MicroServiceInstance.
50 instance, ok := evt.KV.Value.(*proto.MicroServiceInstance)
// NOTE(review): instance.Endpoints is read further down; confirm this path
// returns when the cast fails, otherwise a nil instance is dereferenced.
52 log.Warn("cast to instance failed")
// Break the event key into provider service ID, provider instance ID and the
// combined domain/project string.
54 providerId, providerInstanceId, domainProject := apt.GetInfoFromInstKV(evt.KV.Key)
// NOTE(review): strings.Index returns -1 when "/" is absent, and
// domainProject[:idx] would then panic — confirm the key always contains "/".
55 idx := strings.Index(domainProject, "/")
56 domainName := domainProject[:idx]
59 metrics.ReportInstances(domainName, 1)
// A created instance adds one to the domain's instance metric.
61 case proto.EVT_CREATE:
62 metrics.ReportInstances(domainName, 1)
// A deleted instance subtracts one and, outside the default domain/project,
// calls svcutil.RemandInstanceQuota with a context carrying domain and project.
63 case proto.EVT_DELETE:
64 metrics.ReportInstances(domainName, -1)
65 if !apt.IsDefaultDomainProject(domainProject) {
// The project name is everything after the first "/".
66 projectName := domainProject[idx+1:]
67 svcutil.RemandInstanceQuota(util.SetDomainProject(context.Background(), domainName, projectName))
// A closed notify service is reported with a warning.
71 if notify.NotifyCenter().Closed() {
72 log.Warnf("caught [%s] instance [%s/%s] event, endpoints %v, but notify service is closed",
73 action, providerId, providerInstanceId, instance.Endpoints)
// Look up the provider service using a fresh background context that has the
// CTX_CACHEONLY and CTX_GLOBAL values set to "1".
77 ctx := context.WithValue(context.WithValue(context.Background(),
78 svcutil.CTX_CACHEONLY, "1"),
79 svcutil.CTX_GLOBAL, "1")
80 ms, err := svcutil.GetService(ctx, domainProject, providerId)
// NOTE(review): ms fields are read by the Infof call below; confirm the
// failure path returns before that point.
82 log.Errorf(err, "caught [%s] instance [%s/%s] event, endpoints %v, get cached provider's file failed",
83 action, providerId, providerInstanceId, instance.Endpoints)
87 log.Infof("caught [%s] service[%s][%s/%s/%s/%s] isntance[%s] event, endpoints %v",
88 action, providerId, ms.Environment, ms.AppId, ms.ServiceName, ms.Version, providerInstanceId, instance.Endpoints)
// Collect the consumer IDs and publish the event to all of them, keyed by the
// provider's MicroServiceKey.
90 consumerIds := getCosumerIds()
92 log.Infof("there are %d consuemrIDs, %s", len(consumerIds), consumerIds)
93 PublishInstanceEvent(evt, domainProject, proto.MicroServiceToKey(domainProject, ms), consumerIds)
// getCosumerIds reads the registry entries stored under the
// "/cse-sr/inst/files" prefix and builds a list of distinct consumer IDs cut
// out of the entry keys.
// NOTE(review): this listing does not show every line of the function (the
// error guards, the `key`/`value` declarations and the final return are
// absent); the comments below describe only the visible statements.
96 func getCosumerIds() []string {
97 var consumerIds []string
// One Get on the prefix fetches every key below it.
98 opts := []registry.PluginOp{
99 registry.OpGet(registry.WithStrKey("/cse-sr/inst/files"), registry.WithPrefix()),
101 resp, err := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil)
103 log.Errorf(err, "get subscription from etcd failed")
107 for _, kvs := range resp.Kvs {
109 keystring := string(key)
// The value is decoded as models.ServiceInfo; in the visible lines the decoded
// struct is not used afterwards and a decode failure is only logged.
112 var mp1Req models.ServiceInfo
113 err = json.Unmarshal(value, &mp1Req)
115 log.Errorf(err, "parse serviceInfo failed")
// Keep the window of the key between indexStart and indexEnd bytes from its end.
// NOTE(review): a key shorter than indexStart bytes makes the lower bound
// negative and the slice expression panics — confirm key lengths are guaranteed.
117 length := len(keystring)
118 keystring = keystring[length-indexStart : length-indexEnd]
// StringContains (defined elsewhere) returning -1 is treated as "not present",
// so each ID is appended at most once.
119 if StringContains(consumerIds, keystring) == -1 {
120 consumerIds = append(consumerIds, keystring)
// NewInstanceEtsiEventHandler returns a pointer to a new, zero-valued
// InstanceEtsiEventHandler.
126 func NewInstanceEtsiEventHandler() *InstanceEtsiEventHandler {
127 return &InstanceEtsiEventHandler{}
// PublishInstanceEvent wraps the instance event in a
// proto.WatchInstanceResponse and publishes one notify job per subscriber.
//
// evt is the discovery event, domainProject is passed through to each job,
// serviceKey identifies the cache.FindInstances entry to drop, and subscribers
// lists the consumer IDs to notify.
// NOTE(review): this listing does not show every line of the function (guards
// and possibly response fields are absent); comments cover visible lines only.
130 func PublishInstanceEvent(evt discovery.KvEvent, domainProject string, serviceKey *proto.MicroServiceKey, subscribers []string) {
// Deferred: the FindInstances cache entry for serviceKey is removed whenever
// this function returns.
131 defer cache.FindInstances.Remove(serviceKey)
132 if len(subscribers) == 0 {
133 log.Warn("the subscribers size is 0")
// The same response object is shared by every job created below.
137 response := &proto.WatchInstanceResponse{
138 Response: proto.CreateResponse(proto.Response_SUCCESS, "Watch instance successfully."),
139 Action: string(evt.Type),
// NOTE(review): single-value type assertion — it panics if the event value is
// not a *proto.MicroServiceInstance; confirm callers guarantee the type.
141 Instance: evt.KV.Value.(*proto.MicroServiceInstance),
// Each subscriber gets its own job carrying the event's revision and creation
// time; a publish failure is logged.
143 for _, consumerId := range subscribers {
144 job := notify.NewInstanceEventWithTime(consumerId, domainProject, evt.Revision, evt.CreateAt, response)
145 err := notify.NotifyCenter().Publish(job)
147 log.Errorf(err, "publish failed")