package common

import "sync/atomic"

// GPool is a pool that runs submitted jobs with a bounded number of
// concurrently executing jobs.
type GPool interface {
	// AddJob submits run to be called with args. A nil run is ignored.
	AddJob(run Fun, args ...interface{})
	// Stop shuts the pool down. It is safe to call more than once.
	Stop()
}

// task pairs a job function with the arguments it is invoked with.
type task struct {
	args []interface{}
	run  Fun
}

// Fun is the signature of a job that can be submitted to a GPool.
type Fun func(args ...interface{})

// gPool is the GPool implementation returned by New.
type gPool struct {
	// runningCount is the number of jobs that currently hold a slot. It is
	// accessed atomically and kept as the first field so that it is 64-bit
	// aligned on 32-bit platforms.
	runningCount int64
	// maxG is the concurrency limit and also the capacity of taskChannel.
	maxG int64
	// taskChannel queues jobs for the dispatcher goroutine started by run.
	taskChannel chan task
	// cond is waited on by the dispatcher when the limit is exceeded and
	// notified whenever a job finishes or the pool is stopped.
	cond Condition
	// down is closed exactly once by Stop to signal shutdown.
	down chan uint8
	// isBlock selects whether AddJob enqueues synchronously.
	isBlock bool
	// stopped is set to 1 atomically by the first call to Stop.
	stopped int32
}

// New creates a pool whose queue holds up to maxG pending jobs and whose
// dispatcher waits on the pool's condition once more than maxG jobs hold a
// slot. When isBlock is true AddJob enqueues on the caller's goroutine;
// otherwise it enqueues from a new goroutine. The dispatcher is started
// before New returns.
func New(maxG int64, isBlock bool) GPool {
	pool := new(gPool)
	pool.maxG = maxG
	pool.cond = NewCond()
	pool.taskChannel = make(chan task, maxG)
	pool.down = make(chan uint8)
	pool.isBlock = isBlock
	pool.run()
	return pool
}

// AddJob queues run(args...) for execution. A nil run is ignored.
//
// In blocking mode the call returns once the job has been queued or the pool
// has been stopped. In non-blocking mode the same wait happens on a separate
// goroutine and AddJob returns immediately. No job is started after Stop.
func (g *gPool) AddJob(run Fun, args ...interface{}) {
	if run == nil {
		return
	}
	job := task{
		run:  run,
		args: args,
	}
	enqueue := func() {
		// Selecting on down keeps a sender from blocking forever once
		// the dispatcher has gone away.
		select {
		case <-g.down:
		case g.taskChannel <- job:
		}
	}
	if g.isBlock {
		enqueue()
		return
	}
	go enqueue()
}

// Stop shuts the pool down: it closes the down channel and wakes everything
// waiting on the pool's condition. Only the first call has any effect.
func (g *gPool) Stop() {
	// Closing an already closed channel panics, so only the first caller
	// (a user or the dispatcher's deferred call) may proceed.
	if !atomic.CompareAndSwapInt32(&g.stopped, 0, 1) {
		return
	}
	close(g.down)
	// taskChannel is intentionally left open: AddJob may still be sending
	// on it, and a send on a closed channel panics. Senders and the
	// dispatcher both observe shutdown through down instead.
	g.cond.NotifyAll()
}

// run starts the dispatcher goroutine. It takes jobs from taskChannel,
// reserves a slot for each, and hands the job to GoWithRecover. The
// dispatcher exits once the pool is stopped.
func (g *gPool) run() {
	go func() {
		// Stop is idempotent, so this is a no-op when the loop exits
		// because Stop was already called.
		defer g.Stop()
		for {
			select {
			case <-g.down:
				return
			case job := <-g.taskChannel:
				// Reserve a slot; wait for a notification when the
				// reservation exceeds the limit.
				if atomic.AddInt64(&g.runningCount, 1) > g.maxG {
					g.cond.Wait()
				}
				select {
				case <-g.down:
					// The pool was stopped while the job was pending:
					// give the slot back and drop the job.
					atomic.AddInt64(&g.runningCount, -1)
					return
				default:
				}
				// NOTE(review): GoWithRecover is defined elsewhere; it
				// presumably runs the function on its own goroutine with
				// panic recovery — confirm.
				GoWithRecover(func() {
					defer func() {
						atomic.AddInt64(&g.runningCount, -1)
						g.cond.Notify()
					}()
					job.run(job.args...)
				}, nil)
			}
		}
	}()
}