| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208 |
- package elastic
- import (
- "context"
- "fmt"
- "os"
- "git.shuncheng.lu/bigthing/gocommon/pkg/internal/util"
- "git.shuncheng.lu/bigthing/gocommon/pkg/conf"
- "log"
- "github.com/olivere/elastic"
- )
- var (
- elasticClient *elastic.Client
- elasticHost string
- )
- func Init() error {
- return initElastic()
- }
- func initElastic() error {
- ctx := context.Background()
- config := make(map[string]string, 0)
- if err := conf.SetAndAssertNil(config, "elastic", "host"); err != nil {
- return err
- }
- if err := conf.SetAndAssertNil(config, "elastic", "log_file"); err != nil {
- return err
- }
- var (
- host = config["host"]
- )
- errorLog, err := getLogger(config["log_file"])
- client, err := elastic.NewClient(elastic.SetErrorLog(errorLog), elastic.SetURL(host))
- if err != nil {
- panic(err)
- }
- info, code, err := client.Ping(host).Do(ctx)
- if err != nil {
- return err
- }
- esVersion, err := client.ElasticsearchVersion(host)
- if err != nil {
- return err
- }
- util.Infof("[Elasticsearch] version %s, returned with code %d and version %s ", esVersion, code, info.Version.Number)
- elasticClient = client
- elasticHost = host
- return nil
- }
- func ping() {
- ctx := context.Background()
- info, code, err := elasticClient.Ping(elasticHost).Do(ctx)
- if err != nil {
- initElastic()
- return
- }
- fmt.Printf("Elasticsearch Ping returned with code %d and version %s\n", code, info.Version.Number)
- }
- func getLogger(logFile string) (*log.Logger, error) {
- file, err := os.OpenFile(logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
- if err != nil {
- return nil, err
- }
- return log.New(file, "report", log.LstdFlags), nil
- }
- func getElasticClient() *elastic.Client {
- ping()
- return elasticClient
- }
- func CreateIndex(index, mapping string) {
- ctx := context.Background()
- client := getElasticClient()
- exists := IndexExists(index)
- if !exists {
- // Create a new index.
- createIndex, err := client.CreateIndex(index).BodyString(mapping).Do(ctx)
- if err != nil {
- // Handle error
- panic(err)
- }
- if !createIndex.Acknowledged {
- // Not acknowledged
- }
- fmt.Printf("CreateIndex Index %s , mapping %s, createIndex:%+v \n", index, mapping, createIndex)
- }
- }
- func IndexExists(index string) bool {
- ctx := context.Background()
- client := getElasticClient()
- exists, err := client.IndexExists(index).Do(ctx)
- if err != nil {
- panic(err)
- }
- return exists
- }
- func PutBodyJson(index, indexType, id string, bodyJson interface{}) bool {
- ctx := context.Background()
- client := getElasticClient()
- exists := IndexExists(index)
- if exists {
- put, err := client.Index().
- Index(index).
- Type(indexType).
- Id(id).
- BodyJson(bodyJson).
- Do(ctx)
- if err != nil {
- // Handle error
- panic(err)
- }
- fmt.Printf("PutBodyJson Index put %+v id:%s to index:%s, type %s , put %+v \n", bodyJson, put.Id, put.Index, put.Type, put)
- return put.Status == 0
- } else {
- return false
- }
- }
- func PutBodyString(index, indexType, id, BodyString string) bool {
- ctx := context.Background()
- client := getElasticClient()
- exists := IndexExists(index)
- if exists {
- put, err := client.Index().
- Index(index).
- Type(indexType).
- Id(id).
- BodyString(BodyString).
- Do(ctx)
- if err != nil {
- // Handle error
- panic(err)
- }
- fmt.Printf("PutBodyString Index put %+v id:%s to index:%s, type %s , put %+v \n", BodyString, put.Id, put.Index, put.Type, put)
- return put.Status == 0
- } else {
- return false
- }
- }
- func DeleteIndex(index string) bool {
- ctx := context.Background()
- client := getElasticClient()
- exists := IndexExists(index)
- if exists {
- deleteIndex, err := client.DeleteIndex(index).Do(ctx)
- if err != nil {
- panic(err)
- }
- if !deleteIndex.Acknowledged {
- }
- fmt.Printf("DeleteIndex Index index:%s, deleteIndex: %+v \n", index, deleteIndex)
- return deleteIndex.Acknowledged
- } else {
- return false
- }
- }
- func Flush(index string) bool {
- ctx := context.Background()
- client := getElasticClient()
- exists := IndexExists(index)
- if exists {
- // Flush to make sure the documents got written.
- _, err := client.Flush().Index(index).Do(ctx)
- if err != nil {
- panic(err)
- }
- return true
- } else {
- return false
- }
- }
- func GetResult(index, indexType, id string) (*elastic.GetResult, error) {
- ctx := context.Background()
- client := getElasticClient()
- exists := IndexExists(index)
- if exists {
- get, err := client.Get().
- Index(index).
- Type(indexType).
- Id(id).
- Do(ctx)
- if err != nil {
- // Handle error
- panic(err)
- }
- if get.Found {
- fmt.Printf("Got document %s in version %d from index %s, type %s\n", get.Id, get.Version, get.Index, get.Type)
- }
- return get, err
- } else {
- return nil, fmt.Errorf("index not found")
- }
- }
|