2 * Copyright 2020 Huawei Technologies Co., Ltd.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
23 "github.com/apache/servicecomb-service-center/pkg/gopool"
24 "golang.org/x/net/context"
27 type Publisher struct {
29 goroutine *gopool.Pool
33 func (p *Publisher) Run() {
34 gopool.Go(publisher.loop)
37 func (p *Publisher) loop(ctx context.Context) {
39 ticker := time.NewTicker(500 * time.Millisecond)
46 for i, ws := range p.wss {
51 _, ok := payload.(error)
53 removes = append(removes, i)
55 p.dispatch(ws, payload)
57 if len(removes) == 0 {
65 for _, e := range removes {
66 news = append(news, p.wss[s:e]...)
70 news = append(news, p.wss[s:]...)
78 func (p *Publisher) Stop() {
79 p.goroutine.Close(true)
82 func (p *Publisher) dispatch(ws *Websocket, payload interface{}) {
83 p.goroutine.Do(func(ctx context.Context) {
84 ws.HandleWatchWebSocketJob(payload)
88 func (p *Publisher) Accept(ws *Websocket) {
90 p.wss = append(p.wss, ws)
94 var publisher *Publisher
97 publisher = NewPublisher()
101 func NewPublisher() *Publisher {
103 goroutine: gopool.New(context.Background()),