agollo.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643
  1. package agollo
  2. import (
  3. "encoding/json"
  4. "io/ioutil"
  5. "net/http"
  6. "os"
  7. "sync"
  8. "time"
  9. )
  10. var (
  11. localIP = getLocalIP()
  12. defaultConfigFilePath = "app.properties"
  13. defaultCluster = "default"
  14. defaultNamespace = "application"
  15. defaultConfigType = "properties"
  16. defaultBackupFile = ".agollo"
  17. defaultClientTimeout = 90 * time.Second
  18. defaultNotificationID = -1
  19. defaultLongPollInterval = 1 * time.Second
  20. defaultAutoFetchOnCacheMiss = false
  21. defaultFailTolerantOnBackupExists = false
  22. defaultWatchTimeout = 500 * time.Millisecond
  23. watchLock sync.RWMutex
  24. timeZone = time.FixedZone("CST", 8*3600)
  25. defaultAgollo Agollo
  26. )
  27. type Agollo interface {
  28. Start() <-chan *LongPollerError
  29. Stop()
  30. Reload() error
  31. Get(key string, opts ...GetOption) string
  32. GetNameSpace(namespace string) Configurations
  33. GetAll() map[string]interface{}
  34. Watch() <-chan *ApolloResponse
  35. WatchNamespace(namespace string, stop chan bool) <-chan *ApolloResponse
  36. Options() Options
  37. }
  38. type Configurations map[string]interface{}
  39. type ApolloResponse struct {
  40. Namespace string
  41. OldValue Configurations
  42. NewValue Configurations
  43. Error error
  44. }
  45. type LongPollerError struct {
  46. ConfigServerURL string
  47. AppID string
  48. Cluster string
  49. Notifications []Notification
  50. Namespace string // 服务响应200后去非缓存接口拉取时的namespace
  51. Err error
  52. }
  53. type agollo struct {
  54. opts Options
  55. notificationMapLock sync.Mutex
  56. notificationMap map[string]int // key: namespace value: notificationId
  57. namespaceMapLock sync.Mutex
  58. namespaceMap map[string]string // key: namespace value: releaseKey
  59. cacheLock sync.Mutex
  60. cache map[string]interface{} // key: namespace value: Configurations
  61. watchCh chan *ApolloResponse // watch all namespace
  62. watchNamespaceChMap map[string]chan *ApolloResponse // key: namespace value: chan *ApolloResponse
  63. errorsCh chan *LongPollerError
  64. runOnce sync.Once
  65. stop bool
  66. stopCh chan struct{}
  67. stopLock sync.Mutex
  68. }
  69. func New(configServerURL, appID string, opts ...Option) (Agollo, error) {
  70. a := &agollo{
  71. stopCh: make(chan struct{}),
  72. errorsCh: make(chan *LongPollerError),
  73. opts: newOptions(opts...),
  74. }
  75. a.opts.ConfigServerURL = normalizeURL(configServerURL)
  76. a.opts.AppID = appID
  77. return a.preload()
  78. }
  79. func NewWithConfigFile(configFilePath string, opts ...Option) (Agollo, error) {
  80. f, err := os.Open(configFilePath)
  81. if err != nil {
  82. return nil, err
  83. }
  84. defer f.Close()
  85. var conf struct {
  86. AppID string `json:"appId,omitempty"`
  87. Cluster string `json:"cluster,omitempty"`
  88. IP string `json:"ip,omitempty"`
  89. }
  90. if err := json.NewDecoder(f).Decode(&conf); err != nil {
  91. return nil, err
  92. }
  93. return New(
  94. conf.IP,
  95. conf.AppID,
  96. append(
  97. []Option{
  98. Cluster(conf.Cluster),
  99. },
  100. opts...,
  101. )...,
  102. )
  103. }
  104. func (a *agollo) preload() (Agollo, error) {
  105. for _, namespace := range a.opts.PreloadNamespaces {
  106. _, err := a.loadConfigFromNonCache(namespace)
  107. if err != nil {
  108. if a.opts.FailTolerantOnBackupExists {
  109. _, err = a.loadBackup(namespace)
  110. if err != nil {
  111. return nil, err
  112. }
  113. continue
  114. }
  115. return nil, err
  116. }
  117. }
  118. return a, nil
  119. }
  120. func (a *agollo) Reload() error {
  121. err := a.loadAllConfigFromNonCacheAndThenCache()
  122. if err == nil {
  123. return nil
  124. }
  125. //a.log("ReloadAllNamespace", "From api", "Error", err.Error())
  126. err = a.loadAllFromBackup()
  127. if err != nil {
  128. return err
  129. }
  130. return nil
  131. }
  132. func (a *agollo) GetAll() map[string]interface{} {
  133. return a.cache
  134. }
  135. func (a *agollo) Get(key string, opts ...GetOption) string {
  136. getOpts := newGetOptions(
  137. append(
  138. []GetOption{
  139. WithNamespace(a.opts.DefaultNamespace),
  140. },
  141. opts...,
  142. )...,
  143. )
  144. val, found := a.GetNameSpace(getOpts.Namespace)[key]
  145. if !found {
  146. return getOpts.DefaultValue
  147. }
  148. v, _ := ToStringE(val)
  149. return v
  150. }
  151. func (a *agollo) GetNameSpace(namespace string) Configurations {
  152. if len(a.cache) == 0 {
  153. a.cache = make(map[string]interface{})
  154. }
  155. //LoadOrStore 函数的意义就是当前没有关系的时候 绑定关系 有的话返回关系
  156. configs, val := a.cache[namespace]
  157. if val == false {
  158. a.cache[namespace] = Configurations{}
  159. configs = Configurations{}
  160. return a.loadNameSpace(namespace)
  161. } else {
  162. configs = a.cache[namespace]
  163. }
  164. //a.log("Namesapce", namespace, "From", "cache")
  165. return configs.(Configurations)
  166. }
  167. func (a *agollo) loadNameSpace(namespace string) Configurations {
  168. // 存储不存在的namespace, 之后在longPoller中拉取配置
  169. a.notificationMapLock.Lock()
  170. defer a.notificationMapLock.Unlock()
  171. if len(a.notificationMap) == 0 {
  172. a.notificationMap = make(map[string]int)
  173. }
  174. _, val := a.notificationMap[namespace]
  175. if val == false {
  176. a.notificationMap[namespace] = defaultNotificationID
  177. }
  178. if a.opts.AutoFetchOnCacheMiss {
  179. configs, err := a.loadConfigFromCache(namespace)
  180. if err == nil {
  181. //a.log("Namesapce", namespace, "From", "cache-api")
  182. return configs
  183. }
  184. }
  185. if a.opts.FailTolerantOnBackupExists {
  186. configs, err := a.loadBackup(namespace)
  187. if err == nil {
  188. a.log("Namesapce", namespace, "From", "local")
  189. return configs
  190. }
  191. }
  192. return Configurations{}
  193. }
  194. func (a *agollo) Options() Options {
  195. return a.opts
  196. }
  197. func (a *agollo) Watch() <-chan *ApolloResponse {
  198. watchLock.Lock()
  199. defer watchLock.Unlock()
  200. if a.watchCh == nil {
  201. a.watchCh = make(chan *ApolloResponse)
  202. }
  203. return a.watchCh
  204. }
  205. func (a *agollo) WatchNamespace(namespace string, stop chan bool) <-chan *ApolloResponse {
  206. watchCh, val := a.watchNamespaceChMap[namespace]
  207. if val == false {
  208. go func(stop chan bool) {
  209. select {
  210. case <-stop:
  211. delete(a.watchNamespaceChMap, namespace)
  212. //a.watchNamespaceChMap.Delete(namespace)
  213. }
  214. }(stop)
  215. watchCh = make(chan *ApolloResponse)
  216. }
  217. return watchCh
  218. }
  219. func (a *agollo) sendWatchCh(namespace string, oldVal, newVal Configurations) {
  220. resp := &ApolloResponse{
  221. Namespace: namespace,
  222. OldValue: oldVal,
  223. NewValue: newVal,
  224. }
  225. timer := time.NewTimer(defaultWatchTimeout)
  226. for _, watchCh := range a.getWatchChs(namespace) {
  227. select {
  228. case watchCh <- resp:
  229. case <-timer.C: // 防止创建全局监听或者某个namespace监听却不消费死锁问题
  230. timer.Reset(defaultWatchTimeout)
  231. }
  232. }
  233. }
  234. func (a *agollo) getWatchChs(namespace string) []chan *ApolloResponse {
  235. var chs []chan *ApolloResponse
  236. watchLock.Lock()
  237. defer watchLock.Unlock()
  238. if a.watchCh != nil {
  239. chs = append(chs, a.watchCh)
  240. }
  241. if watchNamespaceCh, found := a.watchNamespaceChMap[namespace]; found {
  242. chs = append(chs, watchNamespaceCh)
  243. }
  244. return chs
  245. }
  246. func (a *agollo) sendErrorsCh(notifications []Notification, namespace string, err error) {
  247. longPollerError := &LongPollerError{
  248. ConfigServerURL: a.opts.ConfigServerURL,
  249. AppID: a.opts.AppID,
  250. Cluster: a.opts.Cluster,
  251. Notifications: notifications,
  252. Namespace: namespace,
  253. Err: err,
  254. }
  255. select {
  256. case a.errorsCh <- longPollerError:
  257. default:
  258. }
  259. }
  260. func (a *agollo) log(kvs ...interface{}) {
  261. timeStr := time.Now().In(timeZone).Format("2006-01-02 15:04:05")
  262. a.opts.Logger.Log(
  263. append([]interface{}{
  264. "[" + timeStr + "]",
  265. "[Agollo]",
  266. "ConfigServerUrl", a.opts.ConfigServerURL,
  267. "AppID", a.opts.AppID,
  268. "Cluster", a.opts.Cluster,
  269. },
  270. kvs...,
  271. )...,
  272. )
  273. }
  274. func (a *agollo) loadConfigFromCache(namespace string) (configurations Configurations, err error) {
  275. configurations, err = a.opts.ApolloClient.GetConfigsFromCache(
  276. a.opts.ConfigServerURL,
  277. a.opts.AppID,
  278. a.opts.Cluster,
  279. namespace)
  280. if err != nil {
  281. a.log("Namespace", namespace, "Action", "LoadConfigFromCache", "Error", err.Error())
  282. return
  283. }
  284. err = a.handleConfig(namespace, configurations)
  285. return
  286. }
  287. func (a *agollo) loadConfigFromNonCache(namespace string) (configurations Configurations, err error) {
  288. var (
  289. status int
  290. config *Config
  291. //cachedReleaseKey, _ = a.namespaceMap.LoadOrStore(namespace, "")
  292. )
  293. a.namespaceMapLock.Lock()
  294. defer a.namespaceMapLock.Unlock()
  295. if len(a.namespaceMap) == 0 {
  296. a.namespaceMap = make(map[string]string)
  297. }
  298. cachedReleaseKey, val := a.namespaceMap[namespace]
  299. if val == false {
  300. a.namespaceMap[namespace] = ""
  301. cachedReleaseKey = ""
  302. } else {
  303. cachedReleaseKey = a.namespaceMap[namespace]
  304. }
  305. status, config, err = a.opts.ApolloClient.GetConfigsFromNonCache(
  306. a.opts.ConfigServerURL,
  307. a.opts.AppID,
  308. a.opts.Cluster,
  309. namespace,
  310. ReleaseKey(cachedReleaseKey),
  311. )
  312. if err != nil {
  313. a.log("Namespace", namespace, "Action", "LoadConfigFromNonCache", "Error", err.Error())
  314. return
  315. }
  316. if status == http.StatusOK {
  317. configurations = config.Configurations
  318. a.namespaceMap[namespace] = config.ReleaseKey
  319. err = a.handleConfig(namespace, config.Configurations)
  320. return
  321. }
  322. return
  323. }
  324. func (a *agollo) loadAllConfigFromNonCacheAndThenCache() (err error) {
  325. var (
  326. status int
  327. config *Config
  328. //cachedReleaseKey, _ = a.namespaceMap.LoadOrStore(namespace, "")
  329. )
  330. a.namespaceMapLock.Lock()
  331. defer a.namespaceMapLock.Unlock()
  332. if len(a.cache) == 0 {
  333. a.cache = make(map[string]interface{})
  334. }
  335. for _, namespace := range a.opts.PreloadNamespaces {
  336. if len(a.namespaceMap) == 0 {
  337. a.namespaceMap = make(map[string]string)
  338. }
  339. cachedReleaseKey, val := a.namespaceMap[namespace]
  340. if val == false {
  341. a.namespaceMap[namespace] = ""
  342. cachedReleaseKey = ""
  343. } else {
  344. cachedReleaseKey = a.namespaceMap[namespace]
  345. }
  346. status, config, err = a.opts.ApolloClient.GetConfigsFromNonCache(
  347. a.opts.ConfigServerURL,
  348. a.opts.AppID,
  349. a.opts.Cluster,
  350. namespace,
  351. ReleaseKey(cachedReleaseKey),
  352. )
  353. if err != nil {
  354. a.log("Namespace", namespace, "Action", "LoadConfigFromNonCache", "Error", err.Error())
  355. return err
  356. }
  357. if status == http.StatusOK {
  358. configurations := config.Configurations
  359. a.namespaceMap[namespace] = config.ReleaseKey
  360. a.cacheLock.Lock()
  361. a.cache[namespace] = configurations
  362. a.cacheLock.Unlock()
  363. }
  364. }
  365. //a.log("ReloadAllNamespace", "From api")
  366. return a.backup()
  367. }
  368. func (a *agollo) handleConfig(namespace string, configurations Configurations) error {
  369. // 读取旧缓存用来给监听队列
  370. oldValue := a.GetNameSpace(namespace)
  371. // 覆盖旧缓存
  372. a.cacheLock.Lock()
  373. a.cache[namespace] = configurations
  374. a.cacheLock.Unlock()
  375. // 发送到监听channel
  376. a.sendWatchCh(namespace, oldValue, configurations)
  377. // 备份配置
  378. return a.backup()
  379. }
  380. func (a *agollo) backup() error {
  381. backup := map[string]Configurations{}
  382. a.cacheLock.Lock()
  383. defer a.cacheLock.Unlock()
  384. for key, val := range a.cache {
  385. conf, _ := val.(Configurations)
  386. backup[key] = conf
  387. }
  388. data, err := json.Marshal(backup)
  389. if err != nil {
  390. return err
  391. }
  392. return ioutil.WriteFile(a.opts.BackupFile, data, 0644)
  393. }
  394. func (a *agollo) loadAllFromBackup() error {
  395. if len(a.cache) == 0 {
  396. a.cache = make(map[string]interface{})
  397. }
  398. if _, err := os.Stat(a.opts.BackupFile); err != nil {
  399. return err
  400. }
  401. data, err := ioutil.ReadFile(a.opts.BackupFile)
  402. if err != nil {
  403. return err
  404. }
  405. backup := map[string]Configurations{}
  406. err = json.Unmarshal(data, &backup)
  407. if err != nil {
  408. return err
  409. }
  410. a.cacheLock.Lock()
  411. defer a.cacheLock.Unlock()
  412. for _, specifyNamespace := range a.opts.PreloadNamespaces {
  413. for namespace, configs := range backup {
  414. if namespace == specifyNamespace {
  415. a.cache[namespace] = configs
  416. }
  417. }
  418. }
  419. a.log("ReloadAllNamesapce", "From", "Backup")
  420. return nil
  421. }
  422. func (a *agollo) loadBackup(specifyNamespace string) (Configurations, error) {
  423. if _, err := os.Stat(a.opts.BackupFile); err != nil {
  424. return nil, nil
  425. }
  426. data, err := ioutil.ReadFile(a.opts.BackupFile)
  427. if err != nil {
  428. return nil, err
  429. }
  430. backup := map[string]Configurations{}
  431. err = json.Unmarshal(data, &backup)
  432. if err != nil {
  433. return nil, err
  434. }
  435. for namespace, configs := range backup {
  436. if namespace == specifyNamespace {
  437. if len(a.cache) == 0 {
  438. a.cache = make(map[string]interface{})
  439. }
  440. a.cache[namespace] = configs
  441. return configs, nil
  442. }
  443. }
  444. return nil, nil
  445. }
  446. func (a *agollo) longPoll() {
  447. notifications := a.notifications()
  448. status, notifications, err := a.opts.ApolloClient.Notifications(
  449. a.opts.ConfigServerURL,
  450. a.opts.AppID,
  451. a.opts.Cluster,
  452. notifications,
  453. )
  454. if err != nil {
  455. a.log("Notifications", Notifications(a.notifications()).String(),
  456. "Error", err.Error(), "Action", "LongPoll")
  457. a.sendErrorsCh(notifications, "", err)
  458. }
  459. if status == http.StatusOK {
  460. // 服务端判断没有改变,不会返回结果,这个时候不需要修改,遍历空数组跳过
  461. for _, notification := range notifications {
  462. _, err = a.loadConfigFromNonCache(notification.NamespaceName)
  463. if err == nil {
  464. a.notificationMapLock.Lock()
  465. a.notificationMap[notification.NamespaceName] = notification.NotificationID
  466. a.notificationMapLock.Unlock()
  467. continue
  468. } else {
  469. a.sendErrorsCh(notifications, notification.NamespaceName, err)
  470. }
  471. }
  472. }
  473. }
  474. func (a *agollo) notifications() []Notification {
  475. var notifications []Notification
  476. a.notificationMapLock.Lock()
  477. for key, val := range a.notificationMap {
  478. notifications = append(notifications, Notification{
  479. NamespaceName: key,
  480. NotificationID: val,
  481. })
  482. }
  483. a.notificationMapLock.Unlock()
  484. return notifications
  485. }
  486. // 启动goroutine去轮训apollo通知接口
  487. func (a *agollo) Start() <-chan *LongPollerError {
  488. a.runOnce.Do(func() {
  489. go func() {
  490. timer := time.NewTimer(a.opts.LongPollerInterval)
  491. defer timer.Stop()
  492. for !a.shouldStop() {
  493. select {
  494. case <-timer.C:
  495. a.longPoll()
  496. timer.Reset(a.opts.LongPollerInterval)
  497. case <-a.stopCh:
  498. return
  499. }
  500. }
  501. }()
  502. })
  503. return a.errorsCh
  504. }
  505. func (a *agollo) Stop() {
  506. a.stopLock.Lock()
  507. defer a.stopLock.Unlock()
  508. if a.stop {
  509. return
  510. }
  511. a.stop = true
  512. close(a.stopCh)
  513. }
  514. func (a *agollo) shouldStop() bool {
  515. select {
  516. case <-a.stopCh:
  517. return true
  518. default:
  519. return false
  520. }
  521. }
  522. func Init(configServerURL, appID string, opts ...Option) (err error) {
  523. defaultAgollo, err = New(configServerURL, appID, opts...)
  524. return
  525. }
  526. func InitWithConfigFile(configFilePath string, opts ...Option) (err error) {
  527. defaultAgollo, err = NewWithConfigFile(configFilePath, opts...)
  528. return
  529. }
  530. func InitWithDefaultConfigFile(opts ...Option) error {
  531. return InitWithConfigFile(defaultConfigFilePath, opts...)
  532. }
  533. func Start() <-chan *LongPollerError {
  534. return defaultAgollo.Start()
  535. }
  536. func Stop() {
  537. defaultAgollo.Stop()
  538. }
  539. func Reload() error {
  540. return defaultAgollo.Reload()
  541. }
  542. func GetAll() map[string]interface{} {
  543. return defaultAgollo.GetAll()
  544. }
  545. func Get(key string, opts ...GetOption) string {
  546. return defaultAgollo.Get(key, opts...)
  547. }
  548. func GetNameSpace(namespace string) Configurations {
  549. return defaultAgollo.GetNameSpace(namespace)
  550. }
  551. func Watch() <-chan *ApolloResponse {
  552. return defaultAgollo.Watch()
  553. }
  554. func WatchNamespace(namespace string, stop chan bool) <-chan *ApolloResponse {
  555. return defaultAgollo.WatchNamespace(namespace, stop)
  556. }
  557. func GetAgollo() Agollo {
  558. return defaultAgollo
  559. }