| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643 |
- package agollo
- import (
- "encoding/json"
- "io/ioutil"
- "net/http"
- "os"
- "sync"
- "time"
- )
- var (
- localIP = getLocalIP()
- defaultConfigFilePath = "app.properties"
- defaultCluster = "default"
- defaultNamespace = "application"
- defaultConfigType = "properties"
- defaultBackupFile = ".agollo"
- defaultClientTimeout = 90 * time.Second
- defaultNotificationID = -1
- defaultLongPollInterval = 1 * time.Second
- defaultAutoFetchOnCacheMiss = false
- defaultFailTolerantOnBackupExists = false
- defaultWatchTimeout = 500 * time.Millisecond
- watchLock sync.RWMutex
- timeZone = time.FixedZone("CST", 8*3600)
- defaultAgollo Agollo
- )
// Agollo is the client interface of this package. The descriptions below
// reflect the agollo implementation in this file.
type Agollo interface {
	// Start launches the background long-poll loop (only the first call has
	// an effect) and returns the channel on which polling errors are
	// reported.
	Start() <-chan *LongPollerError
	// Stop terminates the long-poll loop; calling it again is a no-op.
	Stop()
	// Reload re-fetches the preload namespaces from the server and falls
	// back to the backup file when that fails.
	Reload() error
	// Get returns the value stored under key as a string. Options choose the
	// namespace and the value returned when the key is missing.
	Get(key string, opts ...GetOption) string
	// GetNameSpace returns the configurations of the given namespace.
	GetNameSpace(namespace string) Configurations
	// GetAll returns the cached configurations keyed by namespace.
	GetAll() map[string]interface{}
	// Watch returns the channel that receives changes of every namespace.
	Watch() <-chan *ApolloResponse
	// WatchNamespace returns a channel for changes of a single namespace;
	// stop is used by the caller to end that watch.
	WatchNamespace(namespace string, stop chan bool) <-chan *ApolloResponse
	// Options returns the options the client was built with.
	Options() Options
}
// Configurations holds the key/value pairs of a single namespace.
type Configurations map[string]interface{}
// ApolloResponse is the change event delivered on watch channels.
type ApolloResponse struct {
	// Namespace is the namespace whose configuration changed.
	Namespace string
	// OldValue is the configuration that was cached before the change.
	OldValue Configurations
	// NewValue is the configuration that replaced it.
	NewValue Configurations
	// Error is not set by the code visible in this file.
	Error error
}
// LongPollerError describes a failure that happened during long polling. It
// is built by sendErrorsCh and delivered on the channel returned by Start.
type LongPollerError struct {
	ConfigServerURL string
	AppID           string
	Cluster         string
	Notifications   []Notification
	Namespace       string // namespace being fetched from the non-cache API after the server answered 200
	Err             error
}
// agollo is the Agollo implementation defined in this file. Its maps are
// created lazily by the methods that use them.
type agollo struct {
	opts Options

	notificationMapLock sync.Mutex     // guards notificationMap
	notificationMap     map[string]int // key: namespace value: notificationId

	namespaceMapLock sync.Mutex        // guards namespaceMap
	namespaceMap     map[string]string // key: namespace value: releaseKey

	cacheLock sync.Mutex             // taken by writers of cache such as handleConfig and backup
	cache     map[string]interface{} // key: namespace value: Configurations

	watchCh             chan *ApolloResponse            // watch all namespace
	watchNamespaceChMap map[string]chan *ApolloResponse // key: namespace value: chan *ApolloResponse

	errorsCh chan *LongPollerError // returned by Start; written by sendErrorsCh
	runOnce  sync.Once             // ensures Start launches the poll loop only once

	stop     bool          // set by Stop, under stopLock
	stopCh   chan struct{} // closed by Stop to end the poll loop
	stopLock sync.Mutex    // guards stop and the close of stopCh
}
- func New(configServerURL, appID string, opts ...Option) (Agollo, error) {
- a := &agollo{
- stopCh: make(chan struct{}),
- errorsCh: make(chan *LongPollerError),
- opts: newOptions(opts...),
- }
- a.opts.ConfigServerURL = normalizeURL(configServerURL)
- a.opts.AppID = appID
- return a.preload()
- }
- func NewWithConfigFile(configFilePath string, opts ...Option) (Agollo, error) {
- f, err := os.Open(configFilePath)
- if err != nil {
- return nil, err
- }
- defer f.Close()
- var conf struct {
- AppID string `json:"appId,omitempty"`
- Cluster string `json:"cluster,omitempty"`
- IP string `json:"ip,omitempty"`
- }
- if err := json.NewDecoder(f).Decode(&conf); err != nil {
- return nil, err
- }
- return New(
- conf.IP,
- conf.AppID,
- append(
- []Option{
- Cluster(conf.Cluster),
- },
- opts...,
- )...,
- )
- }
- func (a *agollo) preload() (Agollo, error) {
- for _, namespace := range a.opts.PreloadNamespaces {
- _, err := a.loadConfigFromNonCache(namespace)
- if err != nil {
- if a.opts.FailTolerantOnBackupExists {
- _, err = a.loadBackup(namespace)
- if err != nil {
- return nil, err
- }
- continue
- }
- return nil, err
- }
- }
- return a, nil
- }
- func (a *agollo) Reload() error {
- err := a.loadAllConfigFromNonCacheAndThenCache()
- if err == nil {
- return nil
- }
- //a.log("ReloadAllNamespace", "From api", "Error", err.Error())
- err = a.loadAllFromBackup()
- if err != nil {
- return err
- }
- return nil
- }
- func (a *agollo) GetAll() map[string]interface{} {
- return a.cache
- }
- func (a *agollo) Get(key string, opts ...GetOption) string {
- getOpts := newGetOptions(
- append(
- []GetOption{
- WithNamespace(a.opts.DefaultNamespace),
- },
- opts...,
- )...,
- )
- val, found := a.GetNameSpace(getOpts.Namespace)[key]
- if !found {
- return getOpts.DefaultValue
- }
- v, _ := ToStringE(val)
- return v
- }
- func (a *agollo) GetNameSpace(namespace string) Configurations {
- if len(a.cache) == 0 {
- a.cache = make(map[string]interface{})
- }
- //LoadOrStore 函数的意义就是当前没有关系的时候 绑定关系 有的话返回关系
- configs, val := a.cache[namespace]
- if val == false {
- a.cache[namespace] = Configurations{}
- configs = Configurations{}
- return a.loadNameSpace(namespace)
- } else {
- configs = a.cache[namespace]
- }
- //a.log("Namesapce", namespace, "From", "cache")
- return configs.(Configurations)
- }
- func (a *agollo) loadNameSpace(namespace string) Configurations {
- // 存储不存在的namespace, 之后在longPoller中拉取配置
- a.notificationMapLock.Lock()
- defer a.notificationMapLock.Unlock()
- if len(a.notificationMap) == 0 {
- a.notificationMap = make(map[string]int)
- }
- _, val := a.notificationMap[namespace]
- if val == false {
- a.notificationMap[namespace] = defaultNotificationID
- }
- if a.opts.AutoFetchOnCacheMiss {
- configs, err := a.loadConfigFromCache(namespace)
- if err == nil {
- //a.log("Namesapce", namespace, "From", "cache-api")
- return configs
- }
- }
- if a.opts.FailTolerantOnBackupExists {
- configs, err := a.loadBackup(namespace)
- if err == nil {
- a.log("Namesapce", namespace, "From", "local")
- return configs
- }
- }
- return Configurations{}
- }
// Options returns the options held by the client.
func (a *agollo) Options() Options {
	return a.opts
}
- func (a *agollo) Watch() <-chan *ApolloResponse {
- watchLock.Lock()
- defer watchLock.Unlock()
- if a.watchCh == nil {
- a.watchCh = make(chan *ApolloResponse)
- }
- return a.watchCh
- }
- func (a *agollo) WatchNamespace(namespace string, stop chan bool) <-chan *ApolloResponse {
- watchCh, val := a.watchNamespaceChMap[namespace]
- if val == false {
- go func(stop chan bool) {
- select {
- case <-stop:
- delete(a.watchNamespaceChMap, namespace)
- //a.watchNamespaceChMap.Delete(namespace)
- }
- }(stop)
- watchCh = make(chan *ApolloResponse)
- }
- return watchCh
- }
- func (a *agollo) sendWatchCh(namespace string, oldVal, newVal Configurations) {
- resp := &ApolloResponse{
- Namespace: namespace,
- OldValue: oldVal,
- NewValue: newVal,
- }
- timer := time.NewTimer(defaultWatchTimeout)
- for _, watchCh := range a.getWatchChs(namespace) {
- select {
- case watchCh <- resp:
- case <-timer.C: // 防止创建全局监听或者某个namespace监听却不消费死锁问题
- timer.Reset(defaultWatchTimeout)
- }
- }
- }
- func (a *agollo) getWatchChs(namespace string) []chan *ApolloResponse {
- var chs []chan *ApolloResponse
- watchLock.Lock()
- defer watchLock.Unlock()
- if a.watchCh != nil {
- chs = append(chs, a.watchCh)
- }
- if watchNamespaceCh, found := a.watchNamespaceChMap[namespace]; found {
- chs = append(chs, watchNamespaceCh)
- }
- return chs
- }
- func (a *agollo) sendErrorsCh(notifications []Notification, namespace string, err error) {
- longPollerError := &LongPollerError{
- ConfigServerURL: a.opts.ConfigServerURL,
- AppID: a.opts.AppID,
- Cluster: a.opts.Cluster,
- Notifications: notifications,
- Namespace: namespace,
- Err: err,
- }
- select {
- case a.errorsCh <- longPollerError:
- default:
- }
- }
- func (a *agollo) log(kvs ...interface{}) {
- timeStr := time.Now().In(timeZone).Format("2006-01-02 15:04:05")
- a.opts.Logger.Log(
- append([]interface{}{
- "[" + timeStr + "]",
- "[Agollo]",
- "ConfigServerUrl", a.opts.ConfigServerURL,
- "AppID", a.opts.AppID,
- "Cluster", a.opts.Cluster,
- },
- kvs...,
- )...,
- )
- }
- func (a *agollo) loadConfigFromCache(namespace string) (configurations Configurations, err error) {
- configurations, err = a.opts.ApolloClient.GetConfigsFromCache(
- a.opts.ConfigServerURL,
- a.opts.AppID,
- a.opts.Cluster,
- namespace)
- if err != nil {
- a.log("Namespace", namespace, "Action", "LoadConfigFromCache", "Error", err.Error())
- return
- }
- err = a.handleConfig(namespace, configurations)
- return
- }
- func (a *agollo) loadConfigFromNonCache(namespace string) (configurations Configurations, err error) {
- var (
- status int
- config *Config
- //cachedReleaseKey, _ = a.namespaceMap.LoadOrStore(namespace, "")
- )
- a.namespaceMapLock.Lock()
- defer a.namespaceMapLock.Unlock()
- if len(a.namespaceMap) == 0 {
- a.namespaceMap = make(map[string]string)
- }
- cachedReleaseKey, val := a.namespaceMap[namespace]
- if val == false {
- a.namespaceMap[namespace] = ""
- cachedReleaseKey = ""
- } else {
- cachedReleaseKey = a.namespaceMap[namespace]
- }
- status, config, err = a.opts.ApolloClient.GetConfigsFromNonCache(
- a.opts.ConfigServerURL,
- a.opts.AppID,
- a.opts.Cluster,
- namespace,
- ReleaseKey(cachedReleaseKey),
- )
- if err != nil {
- a.log("Namespace", namespace, "Action", "LoadConfigFromNonCache", "Error", err.Error())
- return
- }
- if status == http.StatusOK {
- configurations = config.Configurations
- a.namespaceMap[namespace] = config.ReleaseKey
- err = a.handleConfig(namespace, config.Configurations)
- return
- }
- return
- }
- func (a *agollo) loadAllConfigFromNonCacheAndThenCache() (err error) {
- var (
- status int
- config *Config
- //cachedReleaseKey, _ = a.namespaceMap.LoadOrStore(namespace, "")
- )
- a.namespaceMapLock.Lock()
- defer a.namespaceMapLock.Unlock()
- if len(a.cache) == 0 {
- a.cache = make(map[string]interface{})
- }
- for _, namespace := range a.opts.PreloadNamespaces {
- if len(a.namespaceMap) == 0 {
- a.namespaceMap = make(map[string]string)
- }
- cachedReleaseKey, val := a.namespaceMap[namespace]
- if val == false {
- a.namespaceMap[namespace] = ""
- cachedReleaseKey = ""
- } else {
- cachedReleaseKey = a.namespaceMap[namespace]
- }
- status, config, err = a.opts.ApolloClient.GetConfigsFromNonCache(
- a.opts.ConfigServerURL,
- a.opts.AppID,
- a.opts.Cluster,
- namespace,
- ReleaseKey(cachedReleaseKey),
- )
- if err != nil {
- a.log("Namespace", namespace, "Action", "LoadConfigFromNonCache", "Error", err.Error())
- return err
- }
- if status == http.StatusOK {
- configurations := config.Configurations
- a.namespaceMap[namespace] = config.ReleaseKey
- a.cacheLock.Lock()
- a.cache[namespace] = configurations
- a.cacheLock.Unlock()
- }
- }
- //a.log("ReloadAllNamespace", "From api")
- return a.backup()
- }
- func (a *agollo) handleConfig(namespace string, configurations Configurations) error {
- // 读取旧缓存用来给监听队列
- oldValue := a.GetNameSpace(namespace)
- // 覆盖旧缓存
- a.cacheLock.Lock()
- a.cache[namespace] = configurations
- a.cacheLock.Unlock()
- // 发送到监听channel
- a.sendWatchCh(namespace, oldValue, configurations)
- // 备份配置
- return a.backup()
- }
- func (a *agollo) backup() error {
- backup := map[string]Configurations{}
- a.cacheLock.Lock()
- defer a.cacheLock.Unlock()
- for key, val := range a.cache {
- conf, _ := val.(Configurations)
- backup[key] = conf
- }
- data, err := json.Marshal(backup)
- if err != nil {
- return err
- }
- return ioutil.WriteFile(a.opts.BackupFile, data, 0644)
- }
- func (a *agollo) loadAllFromBackup() error {
- if len(a.cache) == 0 {
- a.cache = make(map[string]interface{})
- }
- if _, err := os.Stat(a.opts.BackupFile); err != nil {
- return err
- }
- data, err := ioutil.ReadFile(a.opts.BackupFile)
- if err != nil {
- return err
- }
- backup := map[string]Configurations{}
- err = json.Unmarshal(data, &backup)
- if err != nil {
- return err
- }
- a.cacheLock.Lock()
- defer a.cacheLock.Unlock()
- for _, specifyNamespace := range a.opts.PreloadNamespaces {
- for namespace, configs := range backup {
- if namespace == specifyNamespace {
- a.cache[namespace] = configs
- }
- }
- }
- a.log("ReloadAllNamesapce", "From", "Backup")
- return nil
- }
- func (a *agollo) loadBackup(specifyNamespace string) (Configurations, error) {
- if _, err := os.Stat(a.opts.BackupFile); err != nil {
- return nil, nil
- }
- data, err := ioutil.ReadFile(a.opts.BackupFile)
- if err != nil {
- return nil, err
- }
- backup := map[string]Configurations{}
- err = json.Unmarshal(data, &backup)
- if err != nil {
- return nil, err
- }
- for namespace, configs := range backup {
- if namespace == specifyNamespace {
- if len(a.cache) == 0 {
- a.cache = make(map[string]interface{})
- }
- a.cache[namespace] = configs
- return configs, nil
- }
- }
- return nil, nil
- }
- func (a *agollo) longPoll() {
- notifications := a.notifications()
- status, notifications, err := a.opts.ApolloClient.Notifications(
- a.opts.ConfigServerURL,
- a.opts.AppID,
- a.opts.Cluster,
- notifications,
- )
- if err != nil {
- a.log("Notifications", Notifications(a.notifications()).String(),
- "Error", err.Error(), "Action", "LongPoll")
- a.sendErrorsCh(notifications, "", err)
- }
- if status == http.StatusOK {
- // 服务端判断没有改变,不会返回结果,这个时候不需要修改,遍历空数组跳过
- for _, notification := range notifications {
- _, err = a.loadConfigFromNonCache(notification.NamespaceName)
- if err == nil {
- a.notificationMapLock.Lock()
- a.notificationMap[notification.NamespaceName] = notification.NotificationID
- a.notificationMapLock.Unlock()
- continue
- } else {
- a.sendErrorsCh(notifications, notification.NamespaceName, err)
- }
- }
- }
- }
- func (a *agollo) notifications() []Notification {
- var notifications []Notification
- a.notificationMapLock.Lock()
- for key, val := range a.notificationMap {
- notifications = append(notifications, Notification{
- NamespaceName: key,
- NotificationID: val,
- })
- }
- a.notificationMapLock.Unlock()
- return notifications
- }
- // 启动goroutine去轮训apollo通知接口
- func (a *agollo) Start() <-chan *LongPollerError {
- a.runOnce.Do(func() {
- go func() {
- timer := time.NewTimer(a.opts.LongPollerInterval)
- defer timer.Stop()
- for !a.shouldStop() {
- select {
- case <-timer.C:
- a.longPoll()
- timer.Reset(a.opts.LongPollerInterval)
- case <-a.stopCh:
- return
- }
- }
- }()
- })
- return a.errorsCh
- }
- func (a *agollo) Stop() {
- a.stopLock.Lock()
- defer a.stopLock.Unlock()
- if a.stop {
- return
- }
- a.stop = true
- close(a.stopCh)
- }
- func (a *agollo) shouldStop() bool {
- select {
- case <-a.stopCh:
- return true
- default:
- return false
- }
- }
- func Init(configServerURL, appID string, opts ...Option) (err error) {
- defaultAgollo, err = New(configServerURL, appID, opts...)
- return
- }
- func InitWithConfigFile(configFilePath string, opts ...Option) (err error) {
- defaultAgollo, err = NewWithConfigFile(configFilePath, opts...)
- return
- }
// InitWithDefaultConfigFile is InitWithConfigFile with the path
// defaultConfigFilePath.
func InitWithDefaultConfigFile(opts ...Option) error {
	return InitWithConfigFile(defaultConfigFilePath, opts...)
}
// Start calls Start on the default client.
// NOTE(review): defaultAgollo is only assigned by the Init functions; calling
// this before a successful Init presumably panics on a nil interface.
func Start() <-chan *LongPollerError {
	return defaultAgollo.Start()
}
// Stop calls Stop on the default client installed by the Init functions.
func Stop() {
	defaultAgollo.Stop()
}
// Reload calls Reload on the default client installed by the Init functions.
func Reload() error {
	return defaultAgollo.Reload()
}
// GetAll calls GetAll on the default client installed by the Init functions.
func GetAll() map[string]interface{} {
	return defaultAgollo.GetAll()
}
// Get calls Get on the default client installed by the Init functions.
func Get(key string, opts ...GetOption) string {
	return defaultAgollo.Get(key, opts...)
}
// GetNameSpace calls GetNameSpace on the default client installed by the Init
// functions.
func GetNameSpace(namespace string) Configurations {
	return defaultAgollo.GetNameSpace(namespace)
}
// Watch calls Watch on the default client installed by the Init functions.
func Watch() <-chan *ApolloResponse {
	return defaultAgollo.Watch()
}
// WatchNamespace calls WatchNamespace on the default client installed by the
// Init functions.
func WatchNamespace(namespace string, stop chan bool) <-chan *ApolloResponse {
	return defaultAgollo.WatchNamespace(namespace, stop)
}
// GetAgollo returns the default client; it is nil until one of the Init
// functions has assigned it.
func GetAgollo() Agollo {
	return defaultAgollo
}
|