2 * Copyright 2020 Huawei Technologies Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
28 "github.com/apache/servicecomb-service-center/pkg/log"
29 "github.com/apache/servicecomb-service-center/server/core/backend"
30 "github.com/apache/servicecomb-service-center/server/core/proto"
31 "github.com/apache/servicecomb-service-center/server/notify"
32 "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
34 "mepserver/mp1/models"
// Websocket bundles the watcher with the ticker and channels that the methods
// in this file operate on. Only the watcher field is visible in this listing;
// the methods additionally read or write ws.ticker, ws.needPingWatcher,
// ws.free, ws.closed and ws.serviceID.
37 type Websocket struct {
// watcher is the instance-event list watcher that Init registers with the
// notify center; Pick reads its Err() result and its Job channel.
38 watcher *notify.InstanceEventListWatcher
// Init prepares the Websocket before it is used: it creates the heartbeat
// ticker and the free/closed channels, then registers the watcher with the
// notify center.
// NOTE(review): the failure branch and the tail of this function are not
// visible in this listing — presumably the AddSubscriber error is returned
// to the caller; confirm against the full file.
47 func (ws *Websocket) Init() error {
// One tick per notify.HeartbeatInterval; Pick receives from ws.ticker.C.
48 ws.ticker = time.NewTicker(notify.HeartbeatInterval)
49 ws.needPingWatcher = true
// Buffered with capacity 1; SetReady performs a send on this channel.
50 ws.free = make(chan struct{}, 1)
// Unbuffered; HandleWatchWebSocketJob receives from it and logs
// "websocket channel closed".
51 ws.closed = make(chan struct{})
// Subscribe the watcher so the notify center can deliver events to it.
53 if err := notify.NotifyCenter().AddSubscriber(ws.watcher); err != nil {
60 func (ws *Websocket) ReadTimeout() time.Duration {
61 return notify.ReadTimeout
64 func (ws *Websocket) SendTimeout() time.Duration {
65 return notify.SendTimeout
// HandleWatchWebSocketControlMessage takes no arguments and returns nothing.
// NOTE(review): the body is not visible in this listing — presumably a no-op
// placeholder for websocket control frames; confirm against the full file.
68 func (ws *Websocket) HandleWatchWebSocketControlMessage() {
// HandleWatchWebSocketJob processes one payload and dispatches on its dynamic
// type: an error is logged together with the watcher's subject and group, a
// *notify.InstanceEvent is forwarded to SendMsgToApp, and any other type is
// logged as unknown input.
// NOTE(review): several lines of this function (case labels, returns, the
// definition of resp, the select guards) are not visible in this listing;
// the comments below describe only what is shown.
72 func (ws *Websocket) HandleWatchWebSocketJob(payload interface{}) {
75 job *notify.InstanceEvent
// v holds the payload's concrete value; it is used for the %T log below.
77 switch v := payload.(type) {
// Error payload: report it with the watcher identity.
79 err := payload.(error)
80 log.Errorf(err, "watcher catch an error, subject: %s, group: %s", ws.watcher.Subject(), ws.watcher.Group())
// Instance event: notify the subscribers of ws.serviceID.
83 case *notify.InstanceEvent:
84 serviceID := ws.serviceID
85 job = payload.(*notify.InstanceEvent)
// NOTE(review): resp is assigned on a line not shown here — presumably taken
// from job; confirm.
87 SendMsgToApp(resp, serviceID)
// Any other payload type is unexpected; log its type.
89 log.Errorf(nil, "watcher unknown input type %T, subject: %s, group: %s", v, ws.watcher.Subject(), ws.watcher.Group())
// Receive from ws.closed; ok distinguishes a closed channel from a sent value.
94 case _, ok := <-ws.closed:
96 log.Warn("websocket channel closed")
// SetReady signals readiness by sending an empty struct on ws.free from inside
// a select case.
// NOTE(review): the other select arms are not visible in this listing —
// presumably a default arm makes the send non-blocking; confirm.
103 func (ws *Websocket) SetReady() {
105 case ws.free <- struct{}{}:
// Pick selects the next item to be handled for this websocket. Once the
// Ready() channel yields, it returns the watcher's error if one is set, and
// otherwise receives either a heartbeat tick from ws.ticker.C or a job from
// ws.watcher.Job.
// NOTE(review): the select keywords, ok guards, default arms and most return
// statements are not visible in this listing; the comments below describe
// only the lines shown.
111 func (ws *Websocket) Pick() interface{} {
// Wait for the readiness signal.
113 case _, ok := <-ws.Ready():
115 log.Warn("websocket ready channel closed")
// A watcher error takes priority over ticks and jobs.
117 if ws.watcher.Err() != nil {
118 return ws.watcher.Err()
// Heartbeat tick from the ticker created in Init.
122 case t, ok := <-ws.ticker.C:
124 log.Warn("websocket ticker C channel closed")
// Job delivered to the watcher.
127 case j, ok := <-ws.watcher.Job:
129 log.Warn("websocket watcher job channel closed")
// NOTE(review): the condition guarding this shutdown log is not shown —
// presumably a nil job; confirm.
132 err := fmt.Errorf("server shutdown")
133 log.Error("server shutdown", err)
// Ready returns the chan struct{} that Pick receives from before picking work.
// NOTE(review): the body is not visible in this listing — presumably it
// returns ws.free, the channel SetReady sends on; confirm.
144 func (ws *Websocket) Ready() chan struct{} {
// Stop takes no arguments and returns nothing.
// NOTE(review): the body is not visible in this listing — presumably it
// signals shutdown through ws.closed, which HandleWatchWebSocketJob receives
// from; confirm against the full file.
148 func (ws *Websocket) Stop() {
152 func getCallBackUris(serviceID string, instanceID string, serName string) []string {
153 var callbackUris []string
154 opts := []registry.PluginOp{
155 registry.OpGet(registry.WithStrKey("/cse-sr/etsi/subscribe/"+serviceID), registry.WithPrefix()),
157 resp, err := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil)
159 log.Errorf(err, "get subcription from etcd failed!")
162 for _, v := range resp.Kvs {
163 var notifyInfo models.SerAvailabilityNotificationSubscription
165 log.Warn("the value is nil in etcd")
168 err = json.Unmarshal(v.Value, ¬ifyInfo)
170 log.Warn("notify json can not be parsed to notifyInfo")
173 callbackURI := notifyInfo.CallbackReference
174 filter := notifyInfo.FilteringCriteria
176 if (len(filter.SerInstanceIds) != 0 && StringContains(filter.SerInstanceIds, instanceID) != -1) ||
177 (len(filter.SerNames) != 0 && StringContains(filter.SerNames, serviceID) != -1) {
178 callbackUris = append(callbackUris, callbackURI)
181 log.Infof("send to consumerIds: %s", callbackUris)
// StringContains reports the position of the first element of arr that equals
// val, or -1 when arr holds no such element (including when arr is nil or
// empty).
func StringContains(arr []string, val string) (index int) {
	for pos, item := range arr {
		if item == val {
			return pos
		}
	}
	return -1
}
196 func SendMsgToApp(data *proto.WatchInstanceResponse, serviceID string) {
197 // transfer data to instanceInfo, and get instaceid, serviceName
198 instanceID := data.Instance.ServiceId + data.Instance.InstanceId
199 serName := data.Instance.Properties["serName"]
200 action := data.Action
201 instanceInfo := data.Instance
202 instanceStr, err := json.Marshal(instanceInfo)
204 log.Errorf(err, "parse instanceInfo failed!")
208 callbackUris := getCallBackUris(serviceID, instanceID, serName)
209 body := strings.NewReader(string(instanceStr))
210 doSend(action, body, callbackUris)
213 func doSend(action string, body io.Reader, callbackUris []string) {
214 for _, callbackURI := range callbackUris {
215 log.Debugf("action: %s with callbackURI:%s", action, callbackURI)
216 if !strings.HasPrefix(callbackURI, "http") {
217 callbackURI = "http://" + callbackURI
219 client := http.Client{}
221 if action == "CREATE" {
222 contentType := "application/x-www-form-urlencoded"
223 _, err := http.Post(callbackURI, contentType, body)
225 log.Warn("the consumer handle post action failed!")
227 } else if action == "DELETE" {
228 req, err := http.NewRequest("delete", callbackURI, body)
230 _, err := client.Do(req)
232 log.Warn("the consumer handle delete action failed!")
236 log.Errorf(err, "crate request failed!")
238 } else if action == "UPDATE" {
239 req, err := http.NewRequest("put", callbackURI, body)
241 _, err := client.Do(req)
243 log.Warn("the consumer handle update action failed!")
247 log.Errorf(err, "crate request failed!")
253 func DoWebSocketListAndWatchV2(ctx context.Context, id string, id2 string, f func() ([]*proto.WatchInstanceResponse, int64)) {