package elastic import ( "context" "fmt" "os" "git.shuncheng.lu/bigthing/gocommon/pkg/internal/util" "git.shuncheng.lu/bigthing/gocommon/pkg/conf" "log" "github.com/olivere/elastic" ) var ( elasticClient *elastic.Client elasticHost string ) func Init() error { return initElastic() } func initElastic() error { ctx := context.Background() config := make(map[string]string, 0) if err := conf.SetAndAssertNil(config, "elastic", "host"); err != nil { return err } if err := conf.SetAndAssertNil(config, "elastic", "log_file"); err != nil { return err } var ( host = config["host"] ) errorLog, err := getLogger(config["log_file"]) client, err := elastic.NewClient(elastic.SetErrorLog(errorLog), elastic.SetURL(host)) if err != nil { panic(err) } info, code, err := client.Ping(host).Do(ctx) if err != nil { return err } esVersion, err := client.ElasticsearchVersion(host) if err != nil { return err } util.Infof("[Elasticsearch] version %s, returned with code %d and version %s ", esVersion, code, info.Version.Number) elasticClient = client elasticHost = host return nil } func ping() { ctx := context.Background() info, code, err := elasticClient.Ping(elasticHost).Do(ctx) if err != nil { initElastic() return } fmt.Printf("Elasticsearch Ping returned with code %d and version %s\n", code, info.Version.Number) } func getLogger(logFile string) (*log.Logger, error) { file, err := os.OpenFile(logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666) if err != nil { return nil, err } return log.New(file, "report", log.LstdFlags), nil } func getElasticClient() *elastic.Client { ping() return elasticClient } func CreateIndex(index, mapping string) { ctx := context.Background() client := getElasticClient() exists := IndexExists(index) if !exists { // Create a new index. createIndex, err := client.CreateIndex(index).BodyString(mapping).Do(ctx) if err != nil { // Handle error panic(err) } if !createIndex.Acknowledged { // Not acknowledged } fmt.Printf("CreateIndex Index %s , mapping %s, createIndex:%+v \n", index, mapping, createIndex) } } func IndexExists(index string) bool { ctx := context.Background() client := getElasticClient() exists, err := client.IndexExists(index).Do(ctx) if err != nil { panic(err) } return exists } func PutBodyJson(index, indexType, id string, bodyJson interface{}) bool { ctx := context.Background() client := getElasticClient() exists := IndexExists(index) if exists { put, err := client.Index(). Index(index). Type(indexType). Id(id). BodyJson(bodyJson). Do(ctx) if err != nil { // Handle error panic(err) } fmt.Printf("PutBodyJson Index put %+v id:%s to index:%s, type %s , put %+v \n", bodyJson, put.Id, put.Index, put.Type, put) return put.Status == 0 } else { return false } } func PutBodyString(index, indexType, id, BodyString string) bool { ctx := context.Background() client := getElasticClient() exists := IndexExists(index) if exists { put, err := client.Index(). Index(index). Type(indexType). Id(id). BodyString(BodyString). Do(ctx) if err != nil { // Handle error panic(err) } fmt.Printf("PutBodyString Index put %+v id:%s to index:%s, type %s , put %+v \n", BodyString, put.Id, put.Index, put.Type, put) return put.Status == 0 } else { return false } } func DeleteIndex(index string) bool { ctx := context.Background() client := getElasticClient() exists := IndexExists(index) if exists { deleteIndex, err := client.DeleteIndex(index).Do(ctx) if err != nil { panic(err) } if !deleteIndex.Acknowledged { } fmt.Printf("DeleteIndex Index index:%s, deleteIndex: %+v \n", index, deleteIndex) return deleteIndex.Acknowledged } else { return false } } func Flush(index string) bool { ctx := context.Background() client := getElasticClient() exists := IndexExists(index) if exists { // Flush to make sure the documents got written. _, err := client.Flush().Index(index).Do(ctx) if err != nil { panic(err) } return true } else { return false } } func GetResult(index, indexType, id string) (*elastic.GetResult, error) { ctx := context.Background() client := getElasticClient() exists := IndexExists(index) if exists { get, err := client.Get(). Index(index). Type(indexType). Id(id). Do(ctx) if err != nil { // Handle error panic(err) } if get.Found { fmt.Printf("Got document %s in version %d from index %s, type %s\n", get.Id, get.Version, get.Index, get.Type) } return get, err } else { return nil, fmt.Errorf("index not found") } }