elastic.go 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. package elastic
  2. import (
  3. "context"
  4. "fmt"
  5. "os"
  6. "git.shuncheng.lu/bigthing/gocommon/pkg/internal/util"
  7. "git.shuncheng.lu/bigthing/gocommon/pkg/conf"
  8. "log"
  9. "github.com/olivere/elastic"
  10. )
  11. var (
  12. elasticClient *elastic.Client
  13. elasticHost string
  14. )
  15. func Init() error {
  16. return initElastic()
  17. }
  18. func initElastic() error {
  19. ctx := context.Background()
  20. config := make(map[string]string, 0)
  21. if err := conf.SetAndAssertNil(config, "elastic", "host"); err != nil {
  22. return err
  23. }
  24. if err := conf.SetAndAssertNil(config, "elastic", "log_file"); err != nil {
  25. return err
  26. }
  27. var (
  28. host = config["host"]
  29. )
  30. errorLog, err := getLogger(config["log_file"])
  31. client, err := elastic.NewClient(elastic.SetErrorLog(errorLog), elastic.SetURL(host))
  32. if err != nil {
  33. panic(err)
  34. }
  35. info, code, err := client.Ping(host).Do(ctx)
  36. if err != nil {
  37. return err
  38. }
  39. esVersion, err := client.ElasticsearchVersion(host)
  40. if err != nil {
  41. return err
  42. }
  43. util.Infof("[Elasticsearch] version %s, returned with code %d and version %s ", esVersion, code, info.Version.Number)
  44. elasticClient = client
  45. elasticHost = host
  46. return nil
  47. }
  48. func ping() {
  49. ctx := context.Background()
  50. info, code, err := elasticClient.Ping(elasticHost).Do(ctx)
  51. if err != nil {
  52. initElastic()
  53. return
  54. }
  55. fmt.Printf("Elasticsearch Ping returned with code %d and version %s\n", code, info.Version.Number)
  56. }
  57. func getLogger(logFile string) (*log.Logger, error) {
  58. file, err := os.OpenFile(logFile, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0666)
  59. if err != nil {
  60. return nil, err
  61. }
  62. return log.New(file, "report", log.LstdFlags), nil
  63. }
  64. func getElasticClient() *elastic.Client {
  65. ping()
  66. return elasticClient
  67. }
  68. func CreateIndex(index, mapping string) {
  69. ctx := context.Background()
  70. client := getElasticClient()
  71. exists := IndexExists(index)
  72. if !exists {
  73. // Create a new index.
  74. createIndex, err := client.CreateIndex(index).BodyString(mapping).Do(ctx)
  75. if err != nil {
  76. // Handle error
  77. panic(err)
  78. }
  79. if !createIndex.Acknowledged {
  80. // Not acknowledged
  81. }
  82. fmt.Printf("CreateIndex Index %s , mapping %s, createIndex:%+v \n", index, mapping, createIndex)
  83. }
  84. }
  85. func IndexExists(index string) bool {
  86. ctx := context.Background()
  87. client := getElasticClient()
  88. exists, err := client.IndexExists(index).Do(ctx)
  89. if err != nil {
  90. panic(err)
  91. }
  92. return exists
  93. }
  94. func PutBodyJson(index, indexType, id string, bodyJson interface{}) bool {
  95. ctx := context.Background()
  96. client := getElasticClient()
  97. exists := IndexExists(index)
  98. if exists {
  99. put, err := client.Index().
  100. Index(index).
  101. Type(indexType).
  102. Id(id).
  103. BodyJson(bodyJson).
  104. Do(ctx)
  105. if err != nil {
  106. // Handle error
  107. panic(err)
  108. }
  109. fmt.Printf("PutBodyJson Index put %+v id:%s to index:%s, type %s , put %+v \n", bodyJson, put.Id, put.Index, put.Type, put)
  110. return put.Status == 0
  111. } else {
  112. return false
  113. }
  114. }
  115. func PutBodyString(index, indexType, id, BodyString string) bool {
  116. ctx := context.Background()
  117. client := getElasticClient()
  118. exists := IndexExists(index)
  119. if exists {
  120. put, err := client.Index().
  121. Index(index).
  122. Type(indexType).
  123. Id(id).
  124. BodyString(BodyString).
  125. Do(ctx)
  126. if err != nil {
  127. // Handle error
  128. panic(err)
  129. }
  130. fmt.Printf("PutBodyString Index put %+v id:%s to index:%s, type %s , put %+v \n", BodyString, put.Id, put.Index, put.Type, put)
  131. return put.Status == 0
  132. } else {
  133. return false
  134. }
  135. }
  136. func DeleteIndex(index string) bool {
  137. ctx := context.Background()
  138. client := getElasticClient()
  139. exists := IndexExists(index)
  140. if exists {
  141. deleteIndex, err := client.DeleteIndex(index).Do(ctx)
  142. if err != nil {
  143. panic(err)
  144. }
  145. if !deleteIndex.Acknowledged {
  146. }
  147. fmt.Printf("DeleteIndex Index index:%s, deleteIndex: %+v \n", index, deleteIndex)
  148. return deleteIndex.Acknowledged
  149. } else {
  150. return false
  151. }
  152. }
  153. func Flush(index string) bool {
  154. ctx := context.Background()
  155. client := getElasticClient()
  156. exists := IndexExists(index)
  157. if exists {
  158. // Flush to make sure the documents got written.
  159. _, err := client.Flush().Index(index).Do(ctx)
  160. if err != nil {
  161. panic(err)
  162. }
  163. return true
  164. } else {
  165. return false
  166. }
  167. }
  168. func GetResult(index, indexType, id string) (*elastic.GetResult, error) {
  169. ctx := context.Background()
  170. client := getElasticClient()
  171. exists := IndexExists(index)
  172. if exists {
  173. get, err := client.Get().
  174. Index(index).
  175. Type(indexType).
  176. Id(id).
  177. Do(ctx)
  178. if err != nil {
  179. // Handle error
  180. panic(err)
  181. }
  182. if get.Found {
  183. fmt.Printf("Got document %s in version %d from index %s, type %s\n", get.Id, get.Version, get.Index, get.Type)
  184. }
  185. return get, err
  186. } else {
  187. return nil, fmt.Errorf("index not found")
  188. }
  189. }