Merge "mecm-mepm uninstall playbook added"
[ealt-edge.git] / mep / mepserver / mp1 / publisher.go
1 /*
2  * Copyright 2020 Huawei Technologies Co., Ltd.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *     http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 package mp1
18
19 import (
20         "sync"
21         "time"
22
23         "github.com/apache/servicecomb-service-center/pkg/gopool"
24         "golang.org/x/net/context"
25 )
26
27 type Publisher struct {
28         wss       []*Websocket
29         goroutine *gopool.Pool
30         lock      sync.Mutex
31 }
32
33 func (p *Publisher) Run() {
34         gopool.Go(publisher.loop)
35 }
36
37 func (p *Publisher) loop(ctx context.Context) {
38         defer p.Stop()
39         ticker := time.NewTicker(500 * time.Millisecond)
40         for {
41                 select {
42                 case <-ctx.Done():
43                         return
44                 case <-ticker.C:
45                         var removes []int
46                         for i, ws := range p.wss {
47                                 payload := ws.Pick()
48                                 if payload == nil {
49                                         continue
50                                 }
51                                 _, ok := payload.(error)
52                                 if ok {
53                                         removes = append(removes, i)
54                                 }
55                                 p.dispatch(ws, payload)
56                         }
57                         if len(removes) == 0 {
58                                 continue
59                         }
60                         p.lock.Lock()
61                         var (
62                                 news []*Websocket
63                                 s    int
64                         )
65                         for _, e := range removes {
66                                 news = append(news, p.wss[s:e]...)
67                                 s = e + 1
68                         }
69                         if s < len(p.wss) {
70                                 news = append(news, p.wss[s:]...)
71                         }
72                         p.wss = news
73                         p.lock.Unlock()
74                 }
75         }
76 }
77
78 func (p *Publisher) Stop() {
79         p.goroutine.Close(true)
80 }
81
82 func (p *Publisher) dispatch(ws *Websocket, payload interface{}) {
83         p.goroutine.Do(func(ctx context.Context) {
84                 ws.HandleWatchWebSocketJob(payload)
85         })
86 }
87
88 func (p *Publisher) Accept(ws *Websocket) {
89         p.lock.Lock()
90         p.wss = append(p.wss, ws)
91         p.lock.Unlock()
92 }
93
94 var publisher *Publisher
95
96 func init() {
97         publisher = NewPublisher()
98         publisher.Run()
99 }
100
101 func NewPublisher() *Publisher {
102         return &Publisher{
103                 goroutine: gopool.New(context.Background()),
104         }
105 }