f921fdee1e6531e7af9a23e82b2fc7f8fcdba4fb
[ealt-edge.git] / mep / mepserver / mp1 / websocket.go
1 /*
2  * Copyright 2020 Huawei Technologies Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package mp1
18
19 import (
20         "context"
21         "encoding/json"
22         "fmt"
23         "io"
24         "net/http"
25         "strings"
26         "time"
27
28         "github.com/apache/servicecomb-service-center/pkg/log"
29         "github.com/apache/servicecomb-service-center/server/core/backend"
30         "github.com/apache/servicecomb-service-center/server/core/proto"
31         "github.com/apache/servicecomb-service-center/server/notify"
32         "github.com/apache/servicecomb-service-center/server/plugin/pkg/registry"
33
34         "mepserver/mp1/models"
35 )
36
37 type Websocket struct {
38         watcher         *notify.InstanceEventListWatcher
39         ticker          *time.Ticker
40         ctx             context.Context
41         needPingWatcher bool
42         free            chan struct{}
43         closed          chan struct{}
44         serviceID       string
45 }
46
47 func (ws *Websocket) Init() error {
48         ws.ticker = time.NewTicker(notify.HeartbeatInterval)
49         ws.needPingWatcher = true
50         ws.free = make(chan struct{}, 1)
51         ws.closed = make(chan struct{})
52         ws.SetReady()
53         if err := notify.NotifyCenter().AddSubscriber(ws.watcher); err != nil {
54                 return err
55         }
56         publisher.Accept(ws)
57         return nil
58 }
59
60 func (ws *Websocket) ReadTimeout() time.Duration {
61         return notify.ReadTimeout
62 }
63
64 func (ws *Websocket) SendTimeout() time.Duration {
65         return notify.SendTimeout
66 }
67
68 func (ws *Websocket) HandleWatchWebSocketControlMessage() {
69
70 }
71
72 func (ws *Websocket) HandleWatchWebSocketJob(payload interface{}) {
73         defer ws.SetReady()
74         var (
75                 job *notify.InstanceEvent
76         )
77         switch v := payload.(type) {
78         case error:
79                 err := payload.(error)
80                 log.Errorf(err, "watcher catch an error, subject: %s, group: %s", ws.watcher.Subject(), ws.watcher.Group())
81         case time.Time:
82                 return
83         case *notify.InstanceEvent:
84                 serviceID := ws.serviceID
85                 job = payload.(*notify.InstanceEvent)
86                 resp := job.Response
87                 SendMsgToApp(resp, serviceID)
88         default:
89                 log.Errorf(nil, "watcher unknown input type %T, subject: %s, group: %s", v, ws.watcher.Subject(), ws.watcher.Group())
90                 return
91         }
92
93         select {
94         case _, ok := <-ws.closed:
95                 if !ok {
96                         log.Warn("websocket channel closed")
97                 }
98                 return
99         default:
100         }
101 }
102
103 func (ws *Websocket) SetReady() {
104         select {
105         case ws.free <- struct{}{}:
106         default:
107         }
108
109 }
110
111 func (ws *Websocket) Pick() interface{} {
112         select {
113         case _, ok := <-ws.Ready():
114                 if !ok {
115                         log.Warn("websocket ready channel closed")
116                 }
117                 if ws.watcher.Err() != nil {
118                         return ws.watcher.Err()
119                 }
120
121                 select {
122                 case t, ok := <-ws.ticker.C:
123                         if !ok {
124                                 log.Warn("websocket ticker C channel closed")
125                         }
126                         return t
127                 case j, ok := <-ws.watcher.Job:
128                         if !ok {
129                                 log.Warn("websocket watcher job channel closed")
130                         }
131                         if j == nil {
132                                 err := fmt.Errorf("server shutdown")
133                                 log.Error("server shutdown", err)
134                         }
135                         return j
136                 default:
137                         ws.SetReady()
138                 }
139         default:
140         }
141         return nil
142 }
143
144 func (ws *Websocket) Ready() chan struct{} {
145         return ws.free
146 }
147
148 func (ws *Websocket) Stop() {
149         close(ws.closed)
150 }
151
152 func getCallBackUris(serviceID string, instanceID string, serName string) []string {
153         var callbackUris []string
154         opts := []registry.PluginOp{
155                 registry.OpGet(registry.WithStrKey("/cse-sr/etsi/subscribe/"+serviceID), registry.WithPrefix()),
156         }
157         resp, err := backend.Registry().TxnWithCmp(context.Background(), opts, nil, nil)
158         if err != nil {
159                 log.Errorf(err, "get subcription from etcd failed!")
160                 return callbackUris
161         }
162         for _, v := range resp.Kvs {
163                 var notifyInfo models.SerAvailabilityNotificationSubscription
164                 if v.Value == nil {
165                         log.Warn("the value is nil in etcd")
166                         continue
167                 }
168                 err = json.Unmarshal(v.Value, &notifyInfo)
169                 if err != nil {
170                         log.Warn("notify json can not be parsed to notifyInfo")
171                         continue
172                 }
173                 callbackURI := notifyInfo.CallbackReference
174                 filter := notifyInfo.FilteringCriteria
175
176                 if (len(filter.SerInstanceIds) != 0 && StringContains(filter.SerInstanceIds, instanceID) != -1) ||
177                         (len(filter.SerNames) != 0 && StringContains(filter.SerNames, serviceID) != -1) {
178                         callbackUris = append(callbackUris, callbackURI)
179                 }
180         }
181         log.Infof("send to consumerIds: %s", callbackUris)
182         return callbackUris
183 }
184
// StringContains returns the position of the first element of arr equal to
// val, or -1 when arr has no such element (including when arr is empty).
func StringContains(arr []string, val string) (index int) {
	for pos, item := range arr {
		if item == val {
			return pos
		}
	}
	return -1
}
195
196 func SendMsgToApp(data *proto.WatchInstanceResponse, serviceID string) {
197         // transfer data to instanceInfo, and get instaceid, serviceName
198         instanceID := data.Instance.ServiceId + data.Instance.InstanceId
199         serName := data.Instance.Properties["serName"]
200         action := data.Action
201         instanceInfo := data.Instance
202         instanceStr, err := json.Marshal(instanceInfo)
203         if err != nil {
204                 log.Errorf(err, "parse instanceInfo failed!")
205                 return
206         }
207
208         callbackUris := getCallBackUris(serviceID, instanceID, serName)
209         body := strings.NewReader(string(instanceStr))
210         doSend(action, body, callbackUris)
211 }
212
213 func doSend(action string, body io.Reader, callbackUris []string) {
214         for _, callbackURI := range callbackUris {
215                 log.Debugf("action: %s with callbackURI:%s", action, callbackURI)
216                 if !strings.HasPrefix(callbackURI, "http") {
217                         callbackURI = "http://" + callbackURI
218                 }
219                 client := http.Client{}
220
221                 if action == "CREATE" {
222                         contentType := "application/x-www-form-urlencoded"
223                         _, err := http.Post(callbackURI, contentType, body)
224                         if err != nil {
225                                 log.Warn("the consumer handle post action failed!")
226                         }
227                 } else if action == "DELETE" {
228                         req, err := http.NewRequest("delete", callbackURI, body)
229                         if err != nil {
230                                 _, err := client.Do(req)
231                                 if err != nil {
232                                         log.Warn("the consumer handle delete action failed!")
233                                 }
234
235                         } else {
236                                 log.Errorf(err, "crate request failed!")
237                         }
238                 } else if action == "UPDATE" {
239                         req, err := http.NewRequest("put", callbackURI, body)
240                         if err != nil {
241                                 _, err := client.Do(req)
242                                 if err != nil {
243                                         log.Warn("the consumer handle update action failed!")
244                                 }
245
246                         } else {
247                                 log.Errorf(err, "crate request failed!")
248                         }
249                 }
250         }
251 }
252
253 func DoWebSocketListAndWatchV2(ctx context.Context, id string, id2 string, f func() ([]*proto.WatchInstanceResponse, int64)) {
254         //TBD
255 }