package kafka import ( "context" "errors" "git.shuncheng.lu/bigthing/gocommon/pkg/conf" "git.shuncheng.lu/bigthing/gocommon/pkg/internal/util" ) var ( defaultKafkaClient *kafkaClient ) type Producer interface { Producer(ctx context.Context, topic, message string, key string) error ProducerMsg(ctx context.Context, topic, message string) error Close() error } func GetKafkaProducer() Producer { return defaultKafkaClient } type kafkaProducerConfig struct { kafkaClientConfig } func getKafkaProducerConfig(assertNil util.SetAndAssertNil) (*kafkaProducerConfig, error) { config := make(map[string]string, 0) if err := assertNil(config, "kafka", "host"); err != nil { return nil, err } if err := assertNil(config, "kafka", "log_path"); err != nil { return nil, err } if err := assertNil(config, "kafka", "group_name", "sarama"); err != nil { return nil, err } host := util.SplitIgnoreSpace(config["host"], ",") if host == nil || len(host) == 0 { return nil, errors.New("kafka host can not empty") } return &kafkaProducerConfig{ kafkaClientConfig: kafkaClientConfig{ Host: host, LogPath: config["log_path"], GroupName: config["group_name"], }, }, nil } func InitProducer() error { config, err := getKafkaProducerConfig(conf.SetAndAssertNil) if err != nil { return err } util.Debugf("[KafkaProducer] load config success, config=%+v", config) client, err := newKafkaClient(producerClient, config.kafkaClientConfig) if err != nil { return err } defaultKafkaClient = client return nil }