Remove BPA from Makefile
[icn.git] / cmd / bpa-operator / vendor / github.com / modern-go / concurrent / unbounded_executor.go
1 package concurrent
2
3 import (
4         "context"
5         "fmt"
6         "runtime"
7         "runtime/debug"
8         "sync"
9         "time"
10         "reflect"
11 )
12
13 // HandlePanic logs goroutine panic by default
14 var HandlePanic = func(recovered interface{}, funcName string) {
15         ErrorLogger.Println(fmt.Sprintf("%s panic: %v", funcName, recovered))
16         ErrorLogger.Println(string(debug.Stack()))
17 }
18
19 // UnboundedExecutor is an executor with no limit on the number of live goroutines.
20 // It tracks the goroutines it starts and cancels their shared context on shutdown.
21 type UnboundedExecutor struct {
22         ctx                   context.Context // passed to every handler started via Go
23         cancel                context.CancelFunc // cancels ctx; called by Stop and StopAndWait
24         activeGoroutinesMutex *sync.Mutex // guards activeGoroutines
25         activeGoroutines      map[string]int // live goroutine count keyed by the handler's "file:line"
26         HandlePanic           func(recovered interface{}, funcName string) // when non-nil, used instead of the package-level HandlePanic
27 }
28
29 // GlobalUnboundedExecutor has the life cycle of the program itself.
30 // Any goroutine that should be shut down before main exits can be started from this executor.
31 // GlobalUnboundedExecutor expects the main function to call one of its Stop methods;
32 // it does not magically know when the main function exits.
33 var GlobalUnboundedExecutor = NewUnboundedExecutor()
34
35 // NewUnboundedExecutor creates a new UnboundedExecutor,
36 // UnboundedExecutor can not be created by &UnboundedExecutor{}
37 // HandlePanic can be set with a callback to override global HandlePanic
38 func NewUnboundedExecutor() *UnboundedExecutor {
39         ctx, cancel := context.WithCancel(context.TODO())
40         return &UnboundedExecutor{
41                 ctx:                   ctx,
42                 cancel:                cancel,
43                 activeGoroutinesMutex: &sync.Mutex{},
44                 activeGoroutines:      map[string]int{},
45         }
46 }
47
48 // Go starts a new goroutine and tracks its lifecycle.
49 // Panic will be recovered and logged automatically, except for StopSignal
50 func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
51         pc := reflect.ValueOf(handler).Pointer()
52         f := runtime.FuncForPC(pc)
53         funcName := f.Name()
54         file, line := f.FileLine(pc)
55         executor.activeGoroutinesMutex.Lock()
56         defer executor.activeGoroutinesMutex.Unlock()
57         startFrom := fmt.Sprintf("%s:%d", file, line)
58         executor.activeGoroutines[startFrom] += 1
59         go func() {
60                 defer func() {
61                         recovered := recover()
62                         // if you want to quit a goroutine without trigger HandlePanic
63                         // use runtime.Goexit() to quit
64                         if recovered != nil {
65                                 if executor.HandlePanic == nil {
66                                         HandlePanic(recovered, funcName)
67                                 } else {
68                                         executor.HandlePanic(recovered, funcName)
69                                 }
70                         }
71                         executor.activeGoroutinesMutex.Lock()
72                         executor.activeGoroutines[startFrom] -= 1
73                         executor.activeGoroutinesMutex.Unlock()
74                 }()
75                 handler(executor.ctx)
76         }()
77 }
78
79 // Stop cancels the context shared by all goroutines started by this executor and returns without waiting for them.
80 func (executor *UnboundedExecutor) Stop() {
81         executor.cancel()
82 }
83
84 // StopAndWaitForever cancels all goroutines started by this executor and
85 // waits, with no deadline (context.Background()), until all of them have exited.
86 func (executor *UnboundedExecutor) StopAndWaitForever() {
87         executor.StopAndWait(context.Background())
88 }
89
90 // StopAndWait cancel all goroutines started by this executor and wait.
91 // Wait can be cancelled by the context passed in.
92 func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
93         executor.cancel()
94         for {
95                 oneHundredMilliseconds := time.NewTimer(time.Millisecond * 100)
96                 select {
97                 case <-oneHundredMilliseconds.C:
98                         if executor.checkNoActiveGoroutines() {
99                                 return
100                         }
101                 case <-ctx.Done():
102                         return
103                 }
104         }
105 }
106
107 func (executor *UnboundedExecutor) checkNoActiveGoroutines() bool {
108         executor.activeGoroutinesMutex.Lock()
109         defer executor.activeGoroutinesMutex.Unlock()
110         for startFrom, count := range executor.activeGoroutines {
111                 if count > 0 {
112                         InfoLogger.Println("UnboundedExecutor is still waiting goroutines to quit",
113                                 "startFrom", startFrom,
114                                 "count", count)
115                         return false
116                 }
117         }
118         return true
119 }