gpool.go 1.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package common
  2. import "sync/atomic"
  3. type GPool interface {
  4. AddJob(run Fun, args ...interface{})
  5. Stop()
  6. }
  7. type task struct {
  8. args []interface{}
  9. run Fun
  10. }
  11. type Fun func(args ...interface{})
  12. type gPool struct {
  13. runningCount int64
  14. maxG int64
  15. taskChannel chan task
  16. cond Condition
  17. down chan uint8
  18. isBlock bool
  19. }
  20. func New(maxG int64, isBlock bool) GPool {
  21. pool := new(gPool)
  22. pool.maxG = maxG
  23. pool.cond = NewCond()
  24. pool.taskChannel = make(chan task, maxG)
  25. pool.down = make(chan uint8, 0)
  26. pool.isBlock = isBlock
  27. pool.run()
  28. return pool
  29. }
  30. func (g *gPool) AddJob(run Fun, args ...interface{}) {
  31. if run == nil {
  32. return
  33. }
  34. task := task{
  35. run: run,
  36. args: args,
  37. }
  38. addJob := func() {
  39. select {
  40. case <-g.down:
  41. case g.taskChannel <- task:
  42. }
  43. }
  44. if g.isBlock {
  45. addJob()
  46. } else {
  47. go addJob()
  48. }
  49. }
  50. func (g *gPool) Stop() {
  51. close(g.down)
  52. close(g.taskChannel)
  53. g.cond.NotifyAll()
  54. }
  55. func (g *gPool) run() {
  56. go func() {
  57. defer func() {
  58. g.Stop()
  59. }()
  60. for {
  61. select {
  62. case job := <-g.taskChannel:
  63. if atomic.AddInt64(&g.runningCount, 1) > g.maxG {
  64. g.cond.Wait()
  65. }
  66. select {
  67. case <-g.down:
  68. default:
  69. GoWithRecover(func() {
  70. defer func() {
  71. atomic.AddInt64(&g.runningCount, -1)
  72. g.cond.Notify()
  73. }()
  74. job.run(job.args...)
  75. }, nil)
  76. }
  77. case <-g.down:
  78. return
  79. }
  80. }
  81. }()
  82. }