goroutine.go 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package common
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "math"
  7. "sync"
  8. "sync/atomic"
  9. "git.shuncheng.lu/bigthing/gocommon/pkg/internal/util"
  10. )
  11. var (
  12. GoWithRecover = util.GoWithRecover
  13. )
  14. /**
  15. fork join
  16. todo
  17. 1、添加自定义 fork 方法
  18. //2、添加限制最大运行的g数量(但是Go里面没有池的概念,需要自己实现,池的效果没有重新开辟一个G合适,优化的方式可以是,保证当前运行中的最大G个数)
  19. */
  20. // Job 并发处理,有线程安全问题
  21. // start , end 属于左闭右开
  22. // result 子任务返回的结果
  23. type Job func(ctx context.Context, startIndex, endIndex uint64) (result interface{}, err error)
  24. // JobHandler 串行处理,无线程安全问题
  25. // data 是每个job返回的结果
  26. type JobResultHandler func(ctx context.Context, result interface{}) error
  27. // ctx 上下文传递
  28. // totalCount 全部的任务数
  29. // forkCount 每个任务切分的数量
  30. // 所以开启的G的数量差不多是 totalCount/forkCount
  31. // job 并行处理子任务的逻辑
  32. // handlerData 子任务结果的处理逻辑
  33. // maxRunG 最大并发运行的g数量,目前来看 并发 100w个G=4800M内存,所以需要最大并发量限制
  34. func ParallelJobRun(ctx context.Context, totalCount, forkCount uint64, job Job, jobResultHandler JobResultHandler, maxRunG int64) error {
  35. if ctx == nil || totalCount == 0 || forkCount == 0 || job == nil || jobResultHandler == nil {
  36. return errors.New("the params has error")
  37. }
  38. if maxRunG <= 0 {
  39. maxRunG = math.MaxInt32
  40. }
  41. receiveChannel := make(chan interface{}, 0) // 串行处理,所以不需要buffer,设置了buffer可以解决的问题是可以提前释放g
  42. errChannel := make(chan error, 0)
  43. ctx, cancel := context.WithCancel(ctx)
  44. defer func() {
  45. cancel() // 最后关闭,强行通知上下文,关闭处理,防止的问题一个子任务出现问题,需要告知其他的全部出现问题,不做任何处理
  46. close(errChannel) // 这个必须关闭 channel,如果在job线程中去关闭,我们的主线程会强行收到两个通知,也就是假如rc收到后return就不会关闭ec,所以job只关闭rc,主程序退出再关闭rc
  47. }()
  48. go func() {
  49. getGNum := func() uint64 {
  50. if totalCount%forkCount != 0 {
  51. return (totalCount / forkCount) + 1
  52. }
  53. return totalCount / forkCount
  54. }
  55. var (
  56. gNum = getGNum()
  57. wg = sync.WaitGroup{}
  58. )
  59. defer func() {
  60. wg.Wait()
  61. close(receiveChannel)
  62. }()
  63. jobInfo := jobInfo{
  64. ctx: ctx,
  65. wg: &wg,
  66. forkCount: forkCount,
  67. totalCount: totalCount,
  68. errChannel: errChannel,
  69. receiveChannel: receiveChannel,
  70. job: job,
  71. }
  72. limitGoroutineRunJob(gNum, &jobInfo, maxRunG)
  73. }()
  74. for {
  75. select {
  76. case data, isOpen := <-receiveChannel:
  77. if !isOpen {
  78. return nil
  79. }
  80. // 同步执行
  81. err := jobResultHandler(ctx, data)
  82. if err != nil {
  83. return err
  84. }
  85. case err := <-errChannel:
  86. if err != nil {
  87. return err
  88. }
  89. case <-ctx.Done():
  90. return ctx.Err()
  91. }
  92. }
  93. }
  94. func runJob(jobInfo *jobInfo, num uint64) {
  95. jobInfo.wg.Add(1)
  96. defer func() {
  97. if err := recover(); err != nil {
  98. jobInfo.errChannel <- errors.New(fmt.Sprintf("panic:\n %v", err))
  99. }
  100. // 最后关闭wg,保证所有的channel都未关闭,防止panic
  101. jobInfo.wg.Done()
  102. }()
  103. start := num * jobInfo.forkCount
  104. end := (num + 1) * jobInfo.forkCount
  105. // 左闭右开
  106. if end > jobInfo.totalCount {
  107. end = jobInfo.totalCount
  108. }
  109. result, err := jobInfo.job(jobInfo.ctx, start, end)
  110. if err != nil {
  111. select {
  112. case <-jobInfo.ctx.Done():
  113. case jobInfo.errChannel <- err:
  114. }
  115. return
  116. }
  117. select {
  118. case <-jobInfo.ctx.Done():
  119. case jobInfo.receiveChannel <- result:
  120. }
  121. }
  122. type jobInfo struct {
  123. ctx context.Context
  124. wg *sync.WaitGroup
  125. forkCount, totalCount uint64
  126. errChannel chan<- error
  127. receiveChannel chan<- interface{}
  128. job Job
  129. }
  130. func limitGoroutineRunJob(gNum uint64, job *jobInfo, maxRunNum int64) {
  131. var (
  132. init int64 = 0
  133. curRunningJob = &init
  134. cond = NewCond()
  135. count uint64 = 0
  136. )
  137. for ; count < gNum; count++ {
  138. if atomic.AddInt64(curRunningJob, 1) > maxRunNum {
  139. cond.Wait()
  140. }
  141. go func(count uint64) {
  142. defer func() {
  143. atomic.AddInt64(curRunningJob, -1)
  144. cond.Notify() // 可以多次notify,所以不需要条件判断
  145. //if atomic.AddInt64(curRunningJob, -1) < maxRunNum {
  146. // cond.Notify()
  147. //}
  148. }()
  149. runJob(job, count)
  150. }(count)
  151. }
  152. }