项目背景
项目中需要用到mysql的分表场景,调研了一些常用的分库分表中间件,比如,mycat,小米的Gaea,这两个中间件太重了,学习成本较大,另外mycat不是go写的。我们需要一个轻量级的go版本的分表中间件。所以,把目光放在了如下这个开源组件上。go-gorm/sharding: High performance table sharding plugin for Gorm. (github.com)https://github.com/go-gorm/sharding
案例
自定义分表函数以及主键生成自定义函数。该案例仅用于测试,分表的逻辑通常不会基于时间进行分表。测试中使用的主键生成方法也仅仅是用于测试,实际项目中并不推荐使用,该方式对于高并发场景并不友好,实际场景中使用预先生成的方式或是其他的分布式id生成器更好。
事实上,这个库有一些默认的主键id生成方式。
const (// Use Snowflake primary key generatorPKSnowflake = iota// Use PostgreSQL sequence primary key generatorPKPGSequence// Use MySQL sequence primary key generatorPKMySQLSequence// Use custom primary key generatorPKCustom
)
但是,在这个自定义分表逻辑场景中,使用默认的主键id生成方式,出现了bug。
比如,使用KMySQLSequence这种主键生成方式,第二次执行插入sql操作就报了Error 1062 (23000): Duplicate entry '2' for key 'orders_2025.PRIMARY'这个错误。我大概看了一下导致错误的原因在于如下这个方法,这个方法的意思是主键id获取数据库中最后一次插入操作所生成的自增ID值,这就导致,第二次执行插入sql操作时,主键取的是已经存在的,导致报了上面的错误。
初步分析下来是这个原因,目前尚未验证这个错误原因。
func (s *Sharding) genMySQLSequenceKey(tableName string, index int64) int64 {var id int64err := s.DB.Exec("UPDATE `" + mySQLSeqName(tableName) + "` SET id = LAST_INSERT_ID(id + 1)").Errorif err != nil {panic(err)}err = s.DB.Raw("SELECT LAST_INSERT_ID()").Scan(&id).Errorif err != nil {panic(err)}return id
}
使用PKSnowflake这种主键生成方式,导致panic。
初步分析下来的原因是这个index源码中默认设定的最大值是1024,但是,在我们这个场景中,分表的后缀超过了1024,来到了2024,2025等。
同样,该错误原因尚未深入分析,可能不是这个原因,尚待进一步分析。
func (s *Sharding) genSnowflakeKey(index int64) int64 {return s.snowflakeNodes[index].Generate().Int64()
}
测试案例如下:
package testimport ("fmt""testing""time""gorm.io/driver/mysql""gorm.io/gorm""gorm.io/sharding"
)var globalDB *gorm.DBtype Order struct {ID int64 `gorm:"primaryKey"`OrderId string `gorm:"sharding:order_id"` // 指明 OrderId 是分片键UserID int64ProductID int64OrderDate time.Time
}// 自定义 ShardingAlgorithm
func customShardingAlgorithm(value any) (suffix string, err error) {if orderId, ok := value.(string); ok {// 截取字符串,截取前8位,获取年份orderId = orderId[0:8]orderDate, err := time.Parse("20060102", orderId)if err != nil {return "", fmt.Errorf("invalid order_date")}year := orderDate.Year()return fmt.Sprintf("_%d", year), nil}return "", fmt.Errorf("invalid order_date")
}// customePrimaryKeyGeneratorFn 自定义主键生成函数
func customePrimaryKeyGeneratorFn(tableIdx int64) int64 {var id int64seqTableName := "gorm_sharding_orders_id_seq" // 序列表名db := globalDB// 使用事务来确保主键生成的原子性tx := db.Begin()defer func() {if r := recover(); r != nil {tx.Rollback()}}()// 锁定序列表以确保并发安全(可选,取决于你的 MySQL 配置和并发级别)// 注意:在某些 MySQL 版本和配置中,使用 LOCK TABLES 可能不是最佳选择// 这里仅作为示例,实际应用中可能需要更精细的并发控制策略tx.Exec("LOCK TABLES " + seqTableName + " WRITE")// 查询当前的最大 IDtx.Raw("SELECT id FROM " + seqTableName + " ORDER BY id DESC LIMIT 1").Scan(&id)// 更新序列表(这里直接递增 1,实际应用中可能需要更复杂的逻辑)newID := id + 1tx.Exec("INSERT INTO "+seqTableName+" (id) VALUES (?)", newID) // 这里假设序列表允许插入任意 ID,实际应用中可能需要其他机制来确保 ID 的唯一性和连续性// 释放锁定tx.Exec("UNLOCK TABLES")// 提交事务if err := tx.Commit().Error; err != nil {panic(err) // 实际应用中应该使用更优雅的错误处理机制}return newID
}// Test_Gorm_Sharding 用于测试 Gorm Sharding 插件
func Test_Gorm_Sharding(t *testing.T) {// 连接到 MySQL 数据库dsn := "***:******@tcp(ip:port)/sharding_db2?charset=utf8mb4&parseTime=True&loc=Local"db, err := gorm.Open(mysql.New(mysql.Config{DSN: dsn,}), &gorm.Config{})if err != nil {panic("failed to connect database")}globalDB = db// 配置 Gorm Sharding 中间件,使用自定义的分片算法middleware := sharding.Register(sharding.Config{ShardingKey: "order_id",ShardingAlgorithm: customShardingAlgorithm, // 使用自定义的分片算法PrimaryKeyGenerator: sharding.PKCustom,PrimaryKeyGeneratorFn: customePrimaryKeyGeneratorFn,}, "orders") // 逻辑表名为 "orders"db.Use(middleware)// 创建 Order 表err = db.Exec(`CREATE TABLE IF NOT EXISTS orders_2024 (id BIGINT PRIMARY KEY,order_id VARCHAR(50),user_id INT,product_id INT,order_date DATETIME)`).Errorif err != nil {panic("failed to create table")}err = db.Exec(`CREATE TABLE IF NOT EXISTS orders_2025 (id BIGINT PRIMARY KEY,order_id VARCHAR(50),user_id INT,product_id INT,order_date DATETIME)`).Errorif err != nil {panic("failed to create table")}// 示例:插入订单数据order := Order{OrderId: "20240101ORDER0001",UserID: 1,ProductID: 100,OrderDate: time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),}err = db.Create(&order).Errorif err != nil {fmt.Println("Error creating order:", err)}order2 := Order{OrderId: "20250101ORDER0001",UserID: 1,ProductID: 100,OrderDate: time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC),}err = db.Create(&order2).Errorif err != nil {fmt.Println("Error creating order:", err)}// 查询示例var orders []Ordererr = db.Model(&Order{}).Where("order_id=?", "20240101ORDER0001").Find(&orders).Errorif err != nil {fmt.Println("Error querying orders:", err)}fmt.Printf("Selected orders: %#v\n", orders)// 更新示例err = db.Model(&Order{}).Where("order_id=? and id=?", "20240101ORDER0001", int64(14)).Update("product_id", 102).Errorif err != nil {fmt.Println("Error updating order:", err)}// 再次查询示例,验证更新是否生效err = db.Model(&Order{}).Where("order_id=?", "20240101ORDER0001").Find(&orders).Errorif err != nil {fmt.Println("Error querying orders:", err)}fmt.Printf("Selected orders: %#v\n", orders)// 删除示例err = db.Model(&Order{}).Where("order_id=? and id =?", "20240101ORDER0001", int64(16)).Delete(&Order{}).Errorif err != nil {fmt.Println("Error deleting order:", err)}// 再次查询示例,验证删除是否生效err = db.Model(&Order{}).Where("order_id=?", "20240101ORDER0001").Find(&orders).Errorif err != nil {fmt.Println("Error querying orders:", err)}fmt.Printf("Selected orders: %#v\n", orders)
}
遗留问题
1.上文中提到了两种默认主键生成方式报bug问题需要进一步定位以确定根因。
2.该组件的增删改查仅支持查询条件中包含分表主键的场景,对于,不包含主键需要全表扫描的场景并不支持,当然,全表扫描最优解并不是通过mysql的能力解决,最好是借助其他的方案,比如,通过es的方案。但是,对于,我们目前的场景中依然存在上述需求,所以,尚需考虑该场景如何解决。
3.IN查询也不支持。
本质上只支持,根据分表键路由到对应的表进行单表查询。