consumer.go 685 B

1234567891011121314151617181920212223242526272829303132333435363738
  1. package rocketmq
  2. import (
  3. "git.shuncheng.lu/bigthing/gocommon/pkg/conf"
  4. )
  5. // Global 对象
  6. var (
  7. _consumer *rocketMqConsumer
  8. )
  9. // 获取Consumer(Init后使用)
  10. func GetConsumer() Consumer {
  11. //if _consumer == nil {
  12. // return nil, errors.New("rocket-mq consumer not start, please boot init rocket-mq consumer")
  13. //}
  14. return _consumer
  15. }
  16. // 初始化
  17. func InitConsumer() error {
  18. config, err := GetRocketMQConfig(ConsumerMQ, conf.MustValue)
  19. if err != nil {
  20. return err
  21. }
  22. initLogger(config)
  23. con, err := newConsumer(config.ToConsumerConfig()...)
  24. if err != nil {
  25. return err
  26. }
  27. _consumer = con
  28. return nil
  29. }
  30. // 启动
  31. func StartConsumer() error {
  32. return _consumer.start()
  33. }