api.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package localcache
  2. import (
  3. "context"
  4. "sync"
  5. "time"
  6. "git.shuncheng.lu/bigthing/gocommon/pkg/conf"
  7. "git.shuncheng.lu/bigthing/gocommon/pkg/logger"
  8. )
  9. const (
  10. defaultRefreshTime int64 = 600
  11. defaultCacheName = "localcache"
  12. )
  13. type Cache interface {
  14. Get(key interface{}) interface{}
  15. Put(key interface{}, value interface{})
  16. Delete(key interface{})
  17. KeySet() []interface{}
  18. Refresh()
  19. }
  20. // 用于刷新任务的cache,不能替换cache无效,比如cache=cache1,刷新任务最好recover一下,防止程序出问题
  21. type Refresh func(context context.Context, cache Cache)
  22. type Option func(cache *localCache)
  23. type cacheEntry struct {
  24. // 这个name主要是用来设置ttl时间,直接读取config
  25. cacheName string
  26. // 普通map 并发读写会抛出 fatal error: concurrent map read and map write
  27. // 读多写少,使用sync-map
  28. cache sync.Map
  29. // 刷新缓存的脚本
  30. refresh Refresh
  31. // 刷新时间
  32. refreshTime time.Duration
  33. }
  34. // 对外暴漏的接口
  35. func (this *cacheEntry) Put(key interface{}, value interface{}) {
  36. this.cache.Store(key, value)
  37. }
  38. func (this *cacheEntry) Delete(key interface{}) {
  39. this.cache.Delete(key)
  40. }
  41. func (this *cacheEntry) Get(key interface{}) interface{} {
  42. value, _ := this.cache.Load(key)
  43. return value
  44. }
  45. func (this *cacheEntry) KeySet() []interface{} {
  46. keys := make([]interface{}, 0)
  47. this.cache.Range(func(key, value interface{}) bool {
  48. keys = append(keys, key)
  49. return true
  50. })
  51. return keys
  52. }
  53. type localCache struct {
  54. localCacheName string
  55. lock sync.Mutex
  56. entries map[string]*cacheEntry
  57. start bool
  58. defaultRefreshTime int64 //单位s
  59. }
  60. // 实例化
  61. func NewLocalCache(option ...Option) *localCache {
  62. cache := new(localCache)
  63. for _, e := range option {
  64. if e != nil {
  65. e(cache)
  66. }
  67. }
  68. if cache.localCacheName == "" {
  69. cache.localCacheName = defaultCacheName
  70. }
  71. if cache.entries == nil {
  72. cache.entries = map[string]*cacheEntry{}
  73. }
  74. if cache.defaultRefreshTime == 0 {
  75. cache.defaultRefreshTime = defaultRefreshTime
  76. }
  77. cache.start = false
  78. return cache
  79. }
  80. // 获取数据,不返回bool,是因为业务中只需要判断nil
  81. func (this *localCache) Get(cacheName string, keyName interface{}) interface{} {
  82. entry, isExist := this.entries[cacheName]
  83. if !isExist || entry == nil {
  84. return nil
  85. }
  86. return entry.Get(keyName)
  87. }
  88. // 注册
  89. func (this *localCache) Register(configName string, refresh Refresh) {
  90. this.lock.Lock()
  91. defer this.lock.Unlock()
  92. if this.start {
  93. return
  94. }
  95. cache, isExist := this.entries[configName]
  96. if isExist || cache != nil {
  97. return
  98. }
  99. entry := cacheEntry{
  100. cacheName: configName,
  101. refresh: refresh,
  102. refreshTime: this.getRefreshTime(configName),
  103. }
  104. this.entries[configName] = &entry
  105. }
  106. func (this *localCache) Start() {
  107. this.lock.Lock()
  108. defer this.lock.Unlock()
  109. if this.start {
  110. return
  111. }
  112. // 同步初始化数据
  113. for index, _ := range this.entries {
  114. this.entries[index].Refresh()
  115. }
  116. // goroutine 跑需要recover
  117. for _, elem := range this.entries {
  118. go func(job *cacheEntry) {
  119. timer := time.NewTimer(job.refreshTime)
  120. for {
  121. <-timer.C
  122. job.Refresh()
  123. job.refreshTime = this.getRefreshTime(job.cacheName)
  124. timer.Reset(job.refreshTime)
  125. }
  126. }(elem)
  127. }
  128. this.start = true
  129. }
  130. func (this *cacheEntry) Refresh() {
  131. defer func() {
  132. if p := recover(); p != nil {
  133. logger.Errorf("[local-cache] cache-job has panic,cache-name=%s,panic_info=%v", this.cacheName, p)
  134. }
  135. }()
  136. start := time.Now()
  137. ctx := logger.NewTraceIdContext()
  138. logger.Infoc(ctx, "[local-cache] %s start, ttl=%.0fs", this.cacheName, this.refreshTime.Seconds())
  139. this.refresh(ctx, this)
  140. logger.Infoc(ctx, "[local-cache] %s end, ttl=%.0fs,refresh_spend=%fs", this.cacheName, this.refreshTime.Seconds(), time.Now().Sub(start).Seconds())
  141. }
  142. func (this *localCache) getRefreshTime(cacheName string) time.Duration {
  143. return time.Duration(conf.GetInt64(this.localCacheName, cacheName, this.defaultRefreshTime)) * time.Second
  144. }