95308ab711249f729d7afac98b23ab1d37a38e4f
[ealt-edge.git] / mep / mepserver / mp1 / controller.go
1 /*
2  * Copyright 2020 Huawei Technologies Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package mp1
18
19 import (
20         "context"
21         "net/http"
22
23         "github.com/apache/servicecomb-service-center/pkg/log"
24         "github.com/apache/servicecomb-service-center/pkg/rest"
25         "github.com/apache/servicecomb-service-center/pkg/util"
26         "github.com/apache/servicecomb-service-center/server/core/proto"
27         "github.com/apache/servicecomb-service-center/server/notify"
28         v4 "github.com/apache/servicecomb-service-center/server/rest/controller/v4"
29         svcutil "github.com/apache/servicecomb-service-center/server/service/util"
30
31         "mepserver/mp1/arch/workspace"
32         "mepserver/mp1/models"
33 )
34
35 const (
36         basePath         = "/mep/mec_service_mgmt/v1"
37         servicesPath     = basePath + "/services"
38         appServicesPath  = basePath + "/applications/:appInstanceId" + "/services"
39         appSubscribePath = basePath + "/applications/:appInstanceId/subscriptions"
40 )
41
42 const (
43         SerErrFailBase         workspace.ErrCode = workspace.TaskFail
44         SerErrServiceNotFound                    = 2
45         SerInstanceNotFound                      = 3
46         ParseInfoErr                             = 4
47         SubscriptionNotFound                     = 5
48         OperateDataWithEtcdErr                   = 6
49         SerErrServiceDelFailed                   = 7
50         SerErrServiceUpdFailed                   = 8
51 )
52
53 type APIHookFunc func() models.EndPointInfo
54
55 type APIGwHook struct {
56         APIHook APIHookFunc
57 }
58
59 var apihook APIGwHook
60
61 func SetAPIHook(hook APIGwHook) {
62         apihook = hook
63 }
64
65 func GetApiHook() APIGwHook {
66         return apihook
67 }
68
69 func init() {
70         initRouter()
71 }
72
73 func initRouter() {
74         rest.
75                 RegisterServant(&Mp1Service{})
76 }
77
78 type Mp1Service struct {
79         v4.MicroServiceService
80 }
81
82 func (m *Mp1Service) URLPatterns() []rest.Route {
83         return []rest.Route{
84                 // appSubscriptions
85                 {Method: rest.HTTP_METHOD_POST, Path: appSubscribePath, Func: doAppSubscribe},
86                 {Method: rest.HTTP_METHOD_GET, Path: appSubscribePath, Func: getAppSubscribes},
87                 {Method: rest.HTTP_METHOD_GET, Path: appSubscribePath + "/:subscriptionId", Func: getOneAppSubscribe},
88                 {Method: rest.HTTP_METHOD_DELETE, Path: appSubscribePath + "/:subscriptionId", Func: delOneAppSubscribe},
89                 // appServices
90                 {Method: rest.HTTP_METHOD_POST, Path: appServicesPath, Func: serviceRegister},
91                 {Method: rest.HTTP_METHOD_GET, Path: appServicesPath, Func: serviceDiscover},
92                 {Method: rest.HTTP_METHOD_PUT, Path: appServicesPath + "/:serviceId", Func: serviceUpdate},
93                 {Method: rest.HTTP_METHOD_GET, Path: appServicesPath + "/:serviceId", Func: getOneService},
94                 {Method: rest.HTTP_METHOD_DELETE, Path: appServicesPath + "/:serviceId", Func: serviceDelete},
95                 // services
96                 {Method: rest.HTTP_METHOD_GET, Path: servicesPath, Func: serviceDiscover},
97                 {Method: rest.HTTP_METHOD_GET, Path: servicesPath + "/:serviceId", Func: getOneService},
98         }
99 }
100
101 func doAppSubscribe(w http.ResponseWriter, r *http.Request) {
102
103         workPlan := NewWorkSpace(w, r)
104         workPlan.Try(
105                 (&DecodeRestReq{}).WithBody(&models.SerAvailabilityNotificationSubscription{}),
106                 &SubscribeIst{})
107         workPlan.Finally(&SendHttpRspCreated{})
108
109         workspace.WkRun(workPlan)
110 }
111
112 func getAppSubscribes(w http.ResponseWriter, r *http.Request) {
113
114         workPlan := NewWorkSpace(w, r)
115         workPlan.Try(
116                 &DecodeRestReq{},
117                 &GetSubscribes{})
118         workPlan.Finally(&SendHttpRsp{})
119
120         workspace.WkRun(workPlan)
121 }
122
123 func getOneAppSubscribe(w http.ResponseWriter, r *http.Request) {
124
125         workPlan := NewWorkSpace(w, r)
126         workPlan.Try(
127                 &DecodeRestReq{},
128                 &GetOneSubscribe{})
129         workPlan.Finally(&SendHttpRsp{})
130
131         workspace.WkRun(workPlan)
132 }
133
134 func delOneAppSubscribe(w http.ResponseWriter, r *http.Request) {
135
136         workPlan := NewWorkSpace(w, r)
137         workPlan.Try(
138                 &DecodeRestReq{},
139                 &DelOneSubscribe{})
140         workPlan.Finally(&SendHttpRsp{})
141
142         workspace.WkRun(workPlan)
143 }
144
145 func serviceRegister(w http.ResponseWriter, r *http.Request) {
146         log.Info("Register service start...")
147
148         workPlan := NewWorkSpace(w, r)
149         workPlan.Try(
150                 (&DecodeRestReq{}).WithBody(&models.ServiceInfo{}),
151                 &RegisterServiceId{},
152                 &RegisterServiceInst{})
153         workPlan.Finally(&SendHttpRspCreated{})
154
155         workspace.WkRun(workPlan)
156 }
157
158 func serviceDiscover(w http.ResponseWriter, r *http.Request) {
159         log.Info("Discover service service start...")
160
161         workPlan := NewWorkSpace(w, r)
162         workPlan.Try(
163                 &DiscoverDecode{},
164                 &DiscoverService{},
165                 &ToStrDiscover{},
166                 &RspHook{})
167         workPlan.Finally(&SendHttpRsp{})
168
169         workspace.WkRun(workPlan)
170 }
171
172 func serviceUpdate(w http.ResponseWriter, r *http.Request) {
173         log.Info("Update a service start...")
174
175         workPlan := NewWorkSpace(w, r)
176         workPlan.Try(
177                 (&DecodeRestReq{}).WithBody(&models.ServiceInfo{}),
178                 &UpdateInstance{})
179         workPlan.Finally(&SendHttpRsp{})
180
181         workspace.WkRun(workPlan)
182 }
183
184 func getOneService(w http.ResponseWriter, r *http.Request) {
185         log.Info("Register service start...")
186
187         workPlan := NewWorkSpace(w, r)
188         workPlan.Try(
189                 &GetOneDecode{},
190                 &GetOneInstance{})
191         workPlan.Finally(&SendHttpRsp{})
192
193         workspace.WkRun(workPlan)
194
195 }
196
197 func serviceDelete(w http.ResponseWriter, r *http.Request) {
198         log.Info("Delete a service start...")
199
200         workPlan := NewWorkSpace(w, r)
201         workPlan.Try(
202                 &DecodeRestReq{},
203                 &DeleteService{})
204         workPlan.Finally(&SendHttpRsp{})
205
206         workspace.WkRun(workPlan)
207 }
208
209 func WebsocketListAndWatch(ctx context.Context, req *proto.WatchInstanceRequest, consumerSvcId string) {
210         if req == nil || len(req.SelfServiceId) == 0 {
211                 log.Warn("request fomat invalid!")
212                 return
213         }
214         domainProject := util.ParseDomainProject(ctx)
215         if !svcutil.ServiceExist(ctx, domainProject, req.SelfServiceId) {
216                 log.Warn("service does not exist!")
217                 return
218         }
219         DoWebsocketListAndWatch(ctx, req.SelfServiceId, consumerSvcId, func() ([]*proto.WatchInstanceResponse, int64) {
220                 return svcutil.QueryAllProvidersInstances(ctx, req.SelfServiceId)
221         })
222 }
223
224 func DoWebsocketListAndWatch(ctx context.Context, serviceId string, consumerSvcId string, f func() ([]*proto.WatchInstanceResponse, int64)) {
225         domainProject := util.ParseDomainProject(ctx)
226         socket := &Websocket{
227                 ctx:       ctx,
228                 watcher:   notify.NewInstanceEventListWatcher(serviceId, domainProject, f),
229                 serviceID: consumerSvcId,
230         }
231         ProcessSocket(socket)
232 }
233
234 func ProcessSocket(socket *Websocket) {
235         if err := socket.Init(); err != nil {
236                 return
237         }
238         socket.HandleWatchWebSocketControlMessage()
239 }