kafak_producer.go 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package kafka
  2. import (
  3. "context"
  4. "errors"
  5. "git.shuncheng.lu/bigthing/gocommon/pkg/conf"
  6. "git.shuncheng.lu/bigthing/gocommon/pkg/internal/util"
  7. )
  8. var (
  9. defaultKafkaClient *kafkaClient
  10. )
  11. type Producer interface {
  12. Producer(ctx context.Context, topic, message string, key string) error
  13. ProducerMsg(ctx context.Context, topic, message string) error
  14. Close() error
  15. }
  16. func GetKafkaProducer() Producer {
  17. return defaultKafkaClient
  18. }
  19. type kafkaProducerConfig struct {
  20. kafkaClientConfig
  21. }
  22. func getKafkaProducerConfig(assertNil util.SetAndAssertNil) (*kafkaProducerConfig, error) {
  23. config := make(map[string]string, 0)
  24. if err := assertNil(config, "kafka", "host"); err != nil {
  25. return nil, err
  26. }
  27. if err := assertNil(config, "kafka", "log_path"); err != nil {
  28. return nil, err
  29. }
  30. if err := assertNil(config, "kafka", "group_name", "sarama"); err != nil {
  31. return nil, err
  32. }
  33. host := util.SplitIgnoreSpace(config["host"], ",")
  34. if host == nil || len(host) == 0 {
  35. return nil, errors.New("kafka host can not empty")
  36. }
  37. return &kafkaProducerConfig{
  38. kafkaClientConfig: kafkaClientConfig{
  39. Host: host,
  40. LogPath: config["log_path"],
  41. GroupName: config["group_name"],
  42. },
  43. }, nil
  44. }
  45. func InitProducer() error {
  46. config, err := getKafkaProducerConfig(conf.SetAndAssertNil)
  47. if err != nil {
  48. return err
  49. }
  50. util.Debugf("[KafkaProducer] load config success, config=%+v", config)
  51. client, err := newKafkaClient(producerClient, config.kafkaClientConfig)
  52. if err != nil {
  53. return err
  54. }
  55. defaultKafkaClient = client
  56. return nil
  57. }