13 // HandlePanic logs goroutine panic by default
14 var HandlePanic = func(recovered interface{}, funcName string) {
15 ErrorLogger.Println(fmt.Sprintf("%s panic: %v", funcName, recovered))
16 ErrorLogger.Println(string(debug.Stack()))
19 // UnboundedExecutor is a executor without limits on counts of alive goroutines
20 // it tracks the goroutine started by it, and can cancel them when shutdown
21 type UnboundedExecutor struct {
23 cancel context.CancelFunc
24 activeGoroutinesMutex *sync.Mutex
25 activeGoroutines map[string]int
26 HandlePanic func(recovered interface{}, funcName string)
29 // GlobalUnboundedExecutor has the life cycle of the program itself
30 // any goroutine want to be shutdown before main exit can be started from this executor
31 // GlobalUnboundedExecutor expects the main function to call stop
32 // it does not magically knows the main function exits
33 var GlobalUnboundedExecutor = NewUnboundedExecutor()
35 // NewUnboundedExecutor creates a new UnboundedExecutor,
36 // UnboundedExecutor can not be created by &UnboundedExecutor{}
37 // HandlePanic can be set with a callback to override global HandlePanic
38 func NewUnboundedExecutor() *UnboundedExecutor {
39 ctx, cancel := context.WithCancel(context.TODO())
40 return &UnboundedExecutor{
43 activeGoroutinesMutex: &sync.Mutex{},
44 activeGoroutines: map[string]int{},
48 // Go starts a new goroutine and tracks its lifecycle.
49 // Panic will be recovered and logged automatically, except for StopSignal
50 func (executor *UnboundedExecutor) Go(handler func(ctx context.Context)) {
51 pc := reflect.ValueOf(handler).Pointer()
52 f := runtime.FuncForPC(pc)
54 file, line := f.FileLine(pc)
55 executor.activeGoroutinesMutex.Lock()
56 defer executor.activeGoroutinesMutex.Unlock()
57 startFrom := fmt.Sprintf("%s:%d", file, line)
58 executor.activeGoroutines[startFrom] += 1
61 recovered := recover()
62 // if you want to quit a goroutine without trigger HandlePanic
63 // use runtime.Goexit() to quit
65 if executor.HandlePanic == nil {
66 HandlePanic(recovered, funcName)
68 executor.HandlePanic(recovered, funcName)
71 executor.activeGoroutinesMutex.Lock()
72 executor.activeGoroutines[startFrom] -= 1
73 executor.activeGoroutinesMutex.Unlock()
79 // Stop cancel all goroutines started by this executor without wait
80 func (executor *UnboundedExecutor) Stop() {
84 // StopAndWaitForever cancel all goroutines started by this executor and
85 // wait until all goroutines exited
86 func (executor *UnboundedExecutor) StopAndWaitForever() {
87 executor.StopAndWait(context.Background())
90 // StopAndWait cancel all goroutines started by this executor and wait.
91 // Wait can be cancelled by the context passed in.
92 func (executor *UnboundedExecutor) StopAndWait(ctx context.Context) {
95 oneHundredMilliseconds := time.NewTimer(time.Millisecond * 100)
97 case <-oneHundredMilliseconds.C:
98 if executor.checkNoActiveGoroutines() {
107 func (executor *UnboundedExecutor) checkNoActiveGoroutines() bool {
108 executor.activeGoroutinesMutex.Lock()
109 defer executor.activeGoroutinesMutex.Unlock()
110 for startFrom, count := range executor.activeGoroutines {
112 InfoLogger.Println("UnboundedExecutor is still waiting goroutines to quit",
113 "startFrom", startFrom,