前言
针对PostgreSQL进行压缩,有很多相关的工具。有同学又要问了,为何还要再搞一个?比如,pgbench, sysbench之类的,已经很强大了。是的,它们都很强大。但有时候,在一些特殊的场景,可能自己构造一个更能接近真实的生产环境。
这里,我半写,半借助于ChatGPT,搞出一个代码片段来模拟启动一段多线程并发SQL请求,作用于PostgreSQL数据库。然后,你可以对请求执行完以后的结果进行观测,尤其是表膨胀,受影响记录条数之类的。
基于此,我们还可以进行持续改造,快速用于工作之中。
实作
需求:
实现一段代码,读取一个sql文件,然后分段分批执行,并且是以多线程(比如10个线程,go里边可能就是协程,非常高效)去执行这个SQL中的所有SQL语句。再加一个时间限制,比如持续执行120秒。
实现:
package mainimport ("bufio""context""database/sql""fmt""io""os""strings""sync""time"_ "github.com/lib/pq"
)const (host = "localhost"port = 5555user = "postgres"password = "password"dbname = "mydb"
)func execute_sqls(ctx context.Context, sqls []string, wg *sync.WaitGroup, thread int) {defer wg.Done()psqlInfo := fmt.Sprintf("host=%s port=%d user=%s password=%s dbname=%s sslmode=disable", host, port, user, password, dbname)db, err := sql.Open("postgres", psqlInfo)if err != nil {panic(err)}defer db.Close()start := time.Now()for {for _, sql := range sqls {select {case <-ctx.Done():elapsed := time.Since(start)fmt.Printf("Thread %d stopped. It executed SQLs for %s \n", thread, elapsed)returndefault:_, err := db.Exec(sql)if err != nil {fmt.Println(err)}}}}
}func read_sqls(file string) []string {f, err := os.Open(file)if err != nil {panic(err)}defer f.Close()sqls := make([]string, 0)r := bufio.NewReader(f)for {line, err := r.ReadString(';')if err == io.EOF {break} else if err != nil {panic(err)}sql := strings.TrimSpace(line)if sql != "" {sqls = append(sqls, sql)}}return sqls
}func main() {filepath := "file.sql" // Replace with your file pathnumThreads := 10 // Number of threadssqls := read_sqls(filepath)var wg sync.WaitGroupctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) // 60 secondsfor i := 0; i < numThreads; i++ {wg.Add(1)go execute_sqls(ctx, sqls, &wg, i)}wg.Wait()cancel()fmt.Println("All goroutines stopped")
}
上边的代码,关于输入文件:file.sql, 线程数:10, 运行时间:60秒,都是硬编码进去的。你可以根据实际情况,进行参数化。
体验:
在你的go环境已经安装了"github.com/lib/pq"等必备包之后(go get github.com/lib/pq
),就可以直接执行了。我们准备一个pg的基本环境。database: mydb, 端口:5555, 就用postgres用户及相应密码(仅用于测试目的),不缀述。
目标表的准备:
\c mydb
create table t(id int, col2 varchar(32));
file.sql文件内容如下:
insert into t values ((10000*random())::int, md5(random()::varchar));
with updates as (select (10000*random())::int as id) update t set col2 = 'update' || updates.id from updates where t.id=updates.id returning updates.id;
这个测试的代码片段,就是插入一条随机记录,并且再随机更新一条记录,使用CTE语法,把对应的id值返回来,有可能找不到对应的记录,就返回的是空值。在并发大的情况下,update语句慢慢就起作用了。这样就可以反复执行。
来看看效果:
go run ./stress.gohread 7 stopped. It executed SQLs for 59.999324s
Thread 1 stopped. It executed SQLs for 1m0.000756208s
Thread 2 stopped. It executed SQLs for 1m0.000604792s
Thread 4 stopped. It executed SQLs for 1m0.001703583s
Thread 0 stopped. It executed SQLs for 1m0.008518875s
Thread 9 stopped. It executed SQLs for 1m0.008456083s
Thread 5 stopped. It executed SQLs for 1m0.007964375s
Thread 6 stopped. It executed SQLs for 1m0.007968292s
Thread 3 stopped. It executed SQLs for 1m0.008145042s
Thread 8 stopped. It executed SQLs for 1m0.008202209s
All goroutines stopped
1分钟跑完之后,我们看到这样的部分记录结果:
mydb=# select * from t limit 10;id | col2
------+------------4792 | update47923416 | update34169290 | update9290887 | update8878778 | update87787472 | update74724602 | update46023454 | update34542604 | update26041990 | update1990
(10 rows)
总记录条数:
mydb=# select count(*) from t;count
--------126056
(1 row)
引申:可以认为单个C+U操作,10个线程并发,1分钟入库12.6万。
表大小:
mydb=# select pg_total_relation_size('t');pg_total_relation_size
------------------------8372224
(1 row)
使用下边的SQL看看相关指标:
WITH cteTableInfo AS
(SELECT COUNT(1) AS ct,SUM(length(t::text)) AS TextLength ,'public.t'::regclass AS TableName FROM public.t AS t
)
,cteRowSize AS
(SELECT ARRAY [pg_relation_size(TableName), pg_relation_size(TableName, 'vm'), pg_relation_size(TableName, 'fsm'), pg_table_size(TableName), pg_indexes_size(TableName), pg_total_relation_size(TableName), TextLength] AS val, ARRAY ['Relation Size', 'Visibility Map', 'Free Space Map', 'Table Included Toast Size', 'Indexes Size', 'Total Relation Size', 'Live Row Byte Size'] AS NameFROM cteTableInfo
)
SELECT unnest(name) AS Description,unnest(val) AS Bytes,pg_size_pretty(unnest(val)) AS BytesPretty,unnest(val) / ct AS bytes_per_row
FROM cteTableInfo, cteRowSizeUNION ALL SELECT '------------------------------', NULL, NULL, NULL
UNION ALL SELECT 'TotalRows', ct, NULL, NULL FROM cteTableInfo
UNION ALL SELECT 'LiveTuples', pg_stat_get_live_tuples(TableName), NULL, NULL FROM cteTableInfo
UNION ALL SELECT 'DeadTuples', pg_stat_get_dead_tuples(TableName), NULL, NULL FROM cteTableInfo;
结果:
description | bytes | bytespretty | bytes_per_row
--------------------------------+---------+-------------+---------------Relation Size | 8339456 | 8144 kB | 66Visibility Map | 8192 | 8192 bytes | 0Free Space Map | 24576 | 24 kB | 0Table Included Toast Size | 8372224 | 8176 kB | 66Indexes Size | 0 | 0 bytes | 0Total Relation Size | 8372224 | 8176 kB | 66Live Row Byte Size | 2338451 | 2284 kB | 18------------------------------ | | |TotalRows | 126056 | |LiveTuples | 126056 | |DeadTuples | 12274 | |
(11 rows)
里边有涉及到的死元组为12274行。
mydb=# create extension pgstattuple;
CREATE EXTENSIONmydb=# select * from pgstattuple('public.t') \gx
-[ RECORD 1 ]------+--------
table_len | 8339456
tuple_count | 126056
tuple_len | 5125646
tuple_percent | 61.46
dead_tuple_count | 12110
dead_tuple_len | 483172
dead_tuple_percent | 5.79
free_space | 1451672
free_percent | 17.41
这两种统计结果也都比较接近。
当你针对相同的表,进行随机多次测试,发现上边的值也会不断变化(update的命中率会越来越高)。
总结:
本文的目的,只是作一个抛砖引玉,可以随时使用go, python甚至rust去构建一个小的压缩环境,对各种复杂的压力环境进行模拟,并得出相关结论。当然,作为一个团队,可以开发出使用Java之类的接近业务逻辑的工具也是可以的。也有的测试团队,原意使用JMeter + jdbc来构建测试套集,都不失为一种方式。这类工具,是介于pgbench 和 真实业务场景压测之间的一种使用方式。哪个更方便,就可以用哪个。
上边的代码片段,稍加改造,就可以用到实际的实验当中。
关于表膨胀,可以看看我前边的文章:
也聊聊PostgreSQL中的空间膨胀与AutoVacuum
PG中的一例简单的update看表膨胀