| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495 |
- package common
- import "sync/atomic"
- type GPool interface {
- AddJob(run Fun, args ...interface{})
- Stop()
- }
- type task struct {
- args []interface{}
- run Fun
- }
- type Fun func(args ...interface{})
- type gPool struct {
- runningCount int64
- maxG int64
- taskChannel chan task
- cond Condition
- down chan uint8
- isBlock bool
- }
- func New(maxG int64, isBlock bool) GPool {
- pool := new(gPool)
- pool.maxG = maxG
- pool.cond = NewCond()
- pool.taskChannel = make(chan task, maxG)
- pool.down = make(chan uint8, 0)
- pool.isBlock = isBlock
- pool.run()
- return pool
- }
- func (g *gPool) AddJob(run Fun, args ...interface{}) {
- if run == nil {
- return
- }
- task := task{
- run: run,
- args: args,
- }
- addJob := func() {
- select {
- case <-g.down:
- case g.taskChannel <- task:
- }
- }
- if g.isBlock {
- addJob()
- } else {
- go addJob()
- }
- }
- func (g *gPool) Stop() {
- close(g.down)
- close(g.taskChannel)
- g.cond.NotifyAll()
- }
- func (g *gPool) run() {
- go func() {
- defer func() {
- g.Stop()
- }()
- for {
- select {
- case job := <-g.taskChannel:
- if atomic.AddInt64(&g.runningCount, 1) > g.maxG {
- g.cond.Wait()
- }
- select {
- case <-g.down:
- default:
- GoWithRecover(func() {
- defer func() {
- atomic.AddInt64(&g.runningCount, -1)
- g.cond.Notify()
- }()
- job.run(job.args...)
- }, nil)
- }
- case <-g.down:
- return
- }
- }
- }()
- }
|