Merge "mecm-mepm uninstall playbook added"
[ealt-edge.git] / mep / mepserver / mp1 / instance_event_handler.go
1 /*
2  * Copyright 2020 Huawei Technologies Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package mp1
18
19 import (
20         "encoding/json"
21         "strings"
22
23         "github.com/apache/servicecomb-service-center/pkg/log"
24         "github.com/apache/servicecomb-service-center/pkg/util"
25         apt "github.com/apache/servicecomb-service-center/server/core"
26         "github.com/apache/servicecomb-service-center/server/core/backend"
27         "github.com/apache/servicecomb-service-center/server/core/proto"
28         "github.com/apache/servicecomb-service-center/server/notify"
29         "github.com/apache/servicecomb-service-center/server/plugin/pkg/discovery"
30         "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
31         "github.com/apache/servicecomb-service-center/server/service/cache"
32         "github.com/apache/servicecomb-service-center/server/service/metrics"
33         svcutil "github.com/apache/servicecomb-service-center/server/service/util"
34         "golang.org/x/net/context"
35
36         "mepserver/mp1/models"
37 )
38
// Offsets, counted back from the end of a registry key, that delimit the
// segment getCosumerIds extracts: key[len(key)-indexStart : len(key)-indexEnd].
// With the values below that is a 16-byte segment ending 17 bytes before the
// end of the key.
// NOTE(review): presumably this segment is the service/consumer ID that
// precedes a trailing "/<16-byte instance id>" — confirm against the key layout.
const (
	indexStart = 33
	indexEnd   = 17
)
40
// InstanceEtsiEventHandler is a stateless handler for instance key/value
// events; its behavior lives entirely in its methods.
type InstanceEtsiEventHandler struct{}
43
44 func (h *InstanceEtsiEventHandler) Type() discovery.Type {
45         return backend.INSTANCE
46 }
47
48 func (h *InstanceEtsiEventHandler) OnEvent(evt discovery.KvEvent) {
49         action := evt.Type
50         instance, ok := evt.KV.Value.(*proto.MicroServiceInstance)
51         if !ok {
52                 log.Warn("cast to instance failed")
53         }
54         providerId, providerInstanceId, domainProject := apt.GetInfoFromInstKV(evt.KV.Key)
55         idx := strings.Index(domainProject, "/")
56         domainName := domainProject[:idx]
57         switch action {
58         case proto.EVT_INIT:
59                 metrics.ReportInstances(domainName, 1)
60                 return
61         case proto.EVT_CREATE:
62                 metrics.ReportInstances(domainName, 1)
63         case proto.EVT_DELETE:
64                 metrics.ReportInstances(domainName, -1)
65                 if !apt.IsDefaultDomainProject(domainProject) {
66                         projectName := domainProject[idx+1:]
67                         svcutil.RemandInstanceQuota(util.SetDomainProject(context.Background(), domainName, projectName))
68                 }
69         }
70
71         if notify.NotifyCenter().Closed() {
72                 log.Warnf("caught [%s] instance [%s/%s] event, endpoints %v, but notify service is closed",
73                         action, providerId, providerInstanceId, instance.Endpoints)
74                 return
75         }
76
77         ctx := context.WithValue(context.WithValue(context.Background(),
78                 svcutil.CTX_CACHEONLY, "1"),
79                 svcutil.CTX_GLOBAL, "1")
80         ms, err := svcutil.GetService(ctx, domainProject, providerId)
81         if ms == nil {
82                 log.Errorf(err, "caught [%s] instance [%s/%s] event, endpoints %v, get cached provider's file failed",
83                         action, providerId, providerInstanceId, instance.Endpoints)
84                 return
85         }
86
87         log.Infof("caught [%s] service[%s][%s/%s/%s/%s] isntance[%s] event, endpoints %v",
88                 action, providerId, ms.Environment, ms.AppId, ms.ServiceName, ms.Version, providerInstanceId, instance.Endpoints)
89
90         consumerIds := getCosumerIds()
91
92         log.Infof("there are %d consuemrIDs, %s", len(consumerIds), consumerIds)
93         PublishInstanceEvent(evt, domainProject, proto.MicroServiceToKey(domainProject, ms), consumerIds)
94 }
95
96 func getCosumerIds() []string {
97         var consumerIds []string
98         opts := []registry.PluginOp{
99                 registry.OpGet(registry.WithStrKey("/cse-sr/inst/files"), registry.WithPrefix()),
100         }
101         resp, err := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil)
102         if err != nil {
103                 log.Errorf(err, "get subscription from etcd failed")
104                 return consumerIds
105         }
106
107         for _, kvs := range resp.Kvs {
108                 key := kvs.Key
109                 keystring := string(key)
110                 value := kvs.Value
111
112                 var mp1Req models.ServiceInfo
113                 err = json.Unmarshal(value, &mp1Req)
114                 if err != nil {
115                         log.Errorf(err, "parse serviceInfo failed")
116                 }
117                 length := len(keystring)
118                 keystring = keystring[length-indexStart : length-indexEnd]
119                 if StringContains(consumerIds, keystring) == -1 {
120                         consumerIds = append(consumerIds, keystring)
121                 }
122         }
123         return consumerIds
124 }
125
126 func NewInstanceEtsiEventHandler() *InstanceEtsiEventHandler {
127         return &InstanceEtsiEventHandler{}
128 }
129
130 func PublishInstanceEvent(evt discovery.KvEvent, domainProject string, serviceKey *proto.MicroServiceKey, subscribers []string) {
131         defer cache.FindInstances.Remove(serviceKey)
132         if len(subscribers) == 0 {
133                 log.Warn("the subscribers size is 0")
134                 return
135         }
136
137         response := &proto.WatchInstanceResponse{
138                 Response: proto.CreateResponse(proto.Response_SUCCESS, "Watch instance successfully."),
139                 Action:   string(evt.Type),
140                 Key:      serviceKey,
141                 Instance: evt.KV.Value.(*proto.MicroServiceInstance),
142         }
143         for _, consumerId := range subscribers {
144                 job := notify.NewInstanceEventWithTime(consumerId, domainProject, evt.Revision, evt.CreateAt, response)
145                 err := notify.NotifyCenter().Publish(job)
146                 if err != nil {
147                         log.Errorf(err, "publish failed")
148                 }
149         }
150 }