conn.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. package common_db
  2. import (
  3. "context"
  4. _ "database/sql"
  5. "reflect"
  6. "sync/atomic"
  7. "time"
  8. "git.shuncheng.lu/bigthing/gocommon/pkg/trace"
  9. "git.shuncheng.lu/bigthing/gocommon/pkg/conf"
  10. "git.shuncheng.lu/bigthing/gocommon/pkg/internal/constant"
  11. . "git.shuncheng.lu/bigthing/gocommon/pkg/internal/util"
  12. _ "github.com/go-sql-driver/mysql"
  13. "xorm.io/xorm"
  14. )
  15. type DbConn interface {
  16. Close() error
  17. DbName() string
  18. NewSlaveEngine() *xorm.Engine
  19. NewMasterEngine() *xorm.Engine
  20. NewSlaveSession() *xorm.Session
  21. NewMasterSession() *xorm.Session
  22. NewSlaveCtxSession(ctx context.Context) *xorm.Session
  23. NewMasterCtxSession(ctx context.Context) *xorm.Session
  24. }
  25. const (
  26. defaultPingConnection = 30
  27. defaultRefreshConfigTime = 30
  28. )
  29. type dbConn struct {
  30. dbName string
  31. realDbName string
  32. config map[string]string
  33. /**
  34. // 原子包的速度
  35. BenchmarkName-8 1000000000 0.817 ns/op
  36. // 速写锁的速度
  37. BenchmarkName-8 28494505 39.3 ns/op
  38. // 完全不是一个量级
  39. */
  40. engineGroup atomic.Value
  41. }
  42. func (d *dbConn) getConfig(key string) string {
  43. return d.config[key]
  44. }
  45. func (d *dbConn) setConfig(conf map[string]string) {
  46. d.config = conf
  47. }
  48. func (d *dbConn) getEngineGroup() *xorm.EngineGroup {
  49. group := d.engineGroup.Load()
  50. if group == nil {
  51. return nil
  52. }
  53. return group.(*xorm.EngineGroup)
  54. }
  55. func (d *dbConn) setEngineGroup(group *xorm.EngineGroup) {
  56. d.engineGroup.Store(group)
  57. }
  58. //主库从库engine group
  59. func NewDbConn(dbName string) (DbConn, error) {
  60. conn := &dbConn{
  61. dbName: dbName,
  62. }
  63. err := conn.reloadEngine()
  64. if err != nil {
  65. return nil, err
  66. }
  67. // 热加载
  68. GoWithRecover(func() {
  69. conn.checkRefreshConfig()
  70. }, func(error interface{}) {
  71. Errorf("[database.checkConn] run panic find err, err=%v", err)
  72. })
  73. // 健康检测 v1.0.2.3 取消,go std driver的feature支持 自动处理断开的连接, https://github.com/go-sql-driver/mysql#features
  74. //go func() {
  75. // conn.checkConn()
  76. //}()
  77. return conn, nil
  78. }
  79. func (d *dbConn) NewSlaveEngine() *xorm.Engine {
  80. return d.getEngineGroup().Slave()
  81. }
  82. func (d *dbConn) NewMasterEngine() *xorm.Engine {
  83. return d.getEngineGroup().Master()
  84. }
  85. func (d *dbConn) NewSlaveSession() *xorm.Session {
  86. return d.NewSlaveEngine().NewSession()
  87. }
  88. func (d *dbConn) NewMasterSession() *xorm.Session {
  89. return d.NewMasterEngine().NewSession()
  90. }
  91. func (d *dbConn) NewSlaveCtxSession(ctx context.Context) *xorm.Session {
  92. ctx = d.withContext(ctx)
  93. return d.NewSlaveEngine().Context(ctx)
  94. }
  95. func (d *dbConn) NewMasterCtxSession(ctx context.Context) *xorm.Session {
  96. ctx = d.withContext(ctx)
  97. return d.NewMasterEngine().Context(ctx)
  98. }
  99. func (d *dbConn) withContext(ctx context.Context) context.Context {
  100. if ctx == nil {
  101. ctx = context.Background()
  102. }
  103. ctx = context.WithValue(ctx, constant.DbName, d.dbName)
  104. span, err := trace.NewMysqlExitSpan(ctx, d.dbName)
  105. if span != nil && err == nil {
  106. ctx = trace.SetMySqlExitSpan(ctx, span)
  107. }
  108. return ctx
  109. }
  110. func (d *dbConn) DbName() string {
  111. return d.realDbName
  112. }
  113. func (d *dbConn) Close() error {
  114. return closeEngine(d.getEngineGroup())
  115. }
/**
Periodic health check of the connections (currently disabled: the implementation below is commented out).
*/
  119. //func (d *dbConn) checkConn() {
  120. // getRefreshTime := func() time.Duration {
  121. // return time.Second * time.Duration(String2Int64(d.getConfig("ping_connection"), defaultPingConnection))
  122. // }
  123. // timer := time.NewTimer(getRefreshTime())
  124. // for {
  125. // <-timer.C
  126. // func() {
  127. // defer func() {
  128. // if err := recover(); err != nil {
  129. // Errorf("[database.checkConn] find panic,err=%v", err)
  130. // }
  131. // }()
  132. // serr := d.NewSlaveSession().Ping()
  133. // merr := d.NewMasterSession().Ping()
  134. // if serr != nil || merr != nil {
  135. // Errorf("[database.checkConn] ping err,err=%v", serr, merr)
  136. // oldEngine := d.getEngineGroup()
  137. // err := d.reloadEngine()
  138. // if err != nil {
  139. // Errorf("[database.checkConn] reload err,err=%v", err)
  140. // return
  141. // }
  142. // go func() {
  143. // err := closeEngine(oldEngine) // 异步关闭
  144. // if err != nil {
  145. // Errorf("[database.checkConn] close err,err=%v", err)
  146. // }
  147. // return
  148. // }()
  149. // }
  150. // }()
  151. // timer.Reset(getRefreshTime())
  152. // }
  153. //}
  154. /**
  155. 定期刷新配置,查看db是否变更!!
  156. */
  157. func (d *dbConn) checkRefreshConfig() {
  158. getRefreshTime := func() time.Duration {
  159. return time.Second * time.Duration(String2Int64(d.getConfig("refresh_config_time"), defaultRefreshConfigTime))
  160. }
  161. timer := time.NewTimer(getRefreshTime())
  162. for {
  163. <-timer.C
  164. func() {
  165. defer func() {
  166. if err := recover(); err != nil {
  167. Errorf("[database.checkRefreshConfig] panic,err=%v", err)
  168. }
  169. }()
  170. newConf, err := getMysqlConfig(d.dbName, conf.SetAndAssertNil)
  171. if err != nil {
  172. Errorf("[database.checkRefreshConfig] getMysqlConfig err,err=%v", err)
  173. return
  174. }
  175. isSame := reflect.DeepEqual(d.config, newConf)
  176. if !isSame {
  177. oldEngine := d.getEngineGroup()
  178. if err := d.reloadEngine(); err != nil {
  179. Errorf("[database.checkRefreshConfig] reload err,err=%v", err)
  180. return
  181. }
  182. go func(oldEngine *xorm.EngineGroup) {
  183. defer func() {
  184. if err := recover(); err != nil {
  185. Errorf("[database.checkRefreshConfig] close engine panic, err=%v", err)
  186. }
  187. }()
  188. err := closeEngine(oldEngine) // 异步关闭
  189. if err != nil {
  190. Errorf("[database.checkConn] close err, err=%v", err)
  191. }
  192. }(oldEngine)
  193. }
  194. }()
  195. timer.Reset(getRefreshTime())
  196. }
  197. }
  198. func (d *dbConn) reloadEngine() error {
  199. group, config, err := newEngine(d.dbName)
  200. if err != nil {
  201. return err
  202. }
  203. if err := testConn(group); err != nil {
  204. return err
  205. }
  206. d.realDbName = If(config["dbname"] == "", d.dbName, config["dbname"]).(string)
  207. d.setConfig(config)
  208. d.setEngineGroup(group)
  209. return nil
  210. }
  211. /**
  212. 测试 engine 连接, 创建 Engine时候需要测试连接
  213. */
  214. func testConn(engine *xorm.EngineGroup) error {
  215. if engine == nil {
  216. return NewError("the engine group is nil can not test")
  217. }
  218. if engine.Slave() == nil {
  219. return NewError("the slave engine is nil can not test")
  220. }
  221. if err := engine.Slave().Ping(); err != nil {
  222. return NewError("slave engine ping find err, err: %s", err)
  223. }
  224. if engine.Master() == nil {
  225. return NewError("the master engine is nil can not test")
  226. }
  227. if err := engine.Master().Ping(); err != nil {
  228. return NewError("master engine ping find err, err: %s", err)
  229. }
  230. return nil
  231. }