| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465 |
- package kafka
- import (
- "context"
- "errors"
- "git.shuncheng.lu/bigthing/gocommon/pkg/conf"
- "git.shuncheng.lu/bigthing/gocommon/pkg/internal/util"
- )
- var (
- defaultKafkaClient *kafkaClient
- )
- type Producer interface {
- Producer(ctx context.Context, topic, message string, key string) error
- ProducerMsg(ctx context.Context, topic, message string) error
- Close() error
- }
- func GetKafkaProducer() Producer {
- return defaultKafkaClient
- }
- type kafkaProducerConfig struct {
- kafkaClientConfig
- }
- func getKafkaProducerConfig(assertNil util.SetAndAssertNil) (*kafkaProducerConfig, error) {
- config := make(map[string]string, 0)
- if err := assertNil(config, "kafka", "host"); err != nil {
- return nil, err
- }
- if err := assertNil(config, "kafka", "log_path"); err != nil {
- return nil, err
- }
- if err := assertNil(config, "kafka", "group_name", "sarama"); err != nil {
- return nil, err
- }
- host := util.SplitIgnoreSpace(config["host"], ",")
- if host == nil || len(host) == 0 {
- return nil, errors.New("kafka host can not empty")
- }
- return &kafkaProducerConfig{
- kafkaClientConfig: kafkaClientConfig{
- Host: host,
- LogPath: config["log_path"],
- GroupName: config["group_name"],
- },
- }, nil
- }
- func InitProducer() error {
- config, err := getKafkaProducerConfig(conf.SetAndAssertNil)
- if err != nil {
- return err
- }
- util.Debugf("[KafkaProducer] load config success, config=%+v", config)
- client, err := newKafkaClient(producerClient, config.kafkaClientConfig)
- if err != nil {
- return err
- }
- defaultKafkaClient = client
- return nil
- }
|