0%

使用 TiDB Binlog 同步数据到 MySQL 步骤

本文档适用于 TiDB 集群已经搭建完成并且已有部分数据在集群中,需要搭建 TiDB Binlog 将 TiDB 中数据同步到 MySQL 的情况 。

说明:

  • $ 符号表示命令需要在服务器命令行中执行
  • ${xxx} 表示需要替换成实际对应的内容

步骤一、扩容 pump 节点

  1. 准备扩容 pump 节点的拓扑文件
1
2
3
4
5
$ cat scale-pump.yaml
pump_servers:
host: 10.55.12.31
host: 10.55.12.32
host: 10.55.12.33
  1. 执行扩容命令
1
$ tiup cluster scale-out ${cluster_name} ./scale-pump.yaml -uroot -p
  1. 修改 TiDB 集群,开启 Binlog
1
2
3
4
5
6
$ tiup cluster edit-config ${cluster_name}
在 server_config 部分添加对应的配置
server_configs:
tidb:
binlog.enable: true
binlog.ignore-error: true
  1. 滚动重启 tidb-server 节点开始 binlog
1
$ tiup cluster reload ${cluster_name} -R tidb
  1. 查看集群状态
1
$ tiup cluster display ${cluster_name}

步骤二、全量数据迁移

  1. 使用 mydumper 全量初始化导出
1
2
3
$ wget http://download.pingcap.org/tidb-enterprise-tools-latest-linux-amd64.tar.gz
$ tar -xzf tidb-enterprise-tools-latest-linux-amd64.tar.gz && cd tidb-enterprise-tools-latest-linux-amd64
$ ./bin/mydumper -h ${source ip} -P 4000 -u root -t 16 -F 64 --skip-tz-utc -o ./tmp/all_backup
  1. 获取 initial_commit_ts
1
2
3
4
5
6
7
$ cat /tmp/all_backup/metadata
Started dump at: 2019-12-13 15:16:44
SHOW MASTER STATUS:
Log: tidb-binlog
Pos: 413196983854432259
GTID:
Finished dump at: 2019-12-13 15:16:48

Pos 413196983854432259 即为备份的 tso 时刻点值,后续 Binlog 增量同步时需要从这个时间点开始同步

  1. 备份数据复制到 MySQL 节点并使用 loader 将全备数据导入到目标库中
1
$ ./bin/loader -h ${target ip} -u root -P 3306 -t 32 -d /tmp/all_backup

步骤三、扩容 drainer 节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
1. 准备扩容 drainer 节点的拓扑文件
$ cat scale-drainer.yaml
drainer_servers:
- host: 10.0.1.12
# commit_ts 填写 mydumper 备份文件中 metadata 记录的 tso 号信息
commit_ts: 413196983854432259
config:
syncer.db-type: "mysql"
syncer.to.host: "10.0.1.12"
syncer.to.user: "root"
syncer.to.password: ""
syncer.to.port: 3306
syncer.relay.log-dir: "/dir/to/save/log"
syncer.relay.max-file-size: 10485760
  1. 通过 tiup 执行扩容命令
1
$ tiup cluster scale-out ${cluster_name} ./scale-drainer.yaml -uroot -p
  1. 查看集群节点状态
1
$ tiup cluster display ${cluster_name}

步骤四、检查增量数据同步情况

在上游 TiDB 执行一些变更操作,在下游 MySQL 中查询验证数据是否正常同步。

使用 Github Action 替换 Travis 自动发布 Hexo 博客

之前通过 Hexo + Github Pages 的模式搭建了这个博客,并且通过 Travis 自动集成发布我更新的博客文章。但是从 2021 年 12 月份开始 Travis CI 不再免费为开源项目提供持续集成的托管服务了。也是因为我好久没写博客了,导致这个问题直到最近才被发现。寻找替代方案,找了一圈发现 Github 自己提供了 Github Actions 的持续集成服务。那就折腾折腾替换成 Github Actions 来发布博客。

准备工作

介绍

我的博客项目地址是:https://github.com/Win-Man/win-man.github.io 。repo 中分了两个分支,一个 master 分支和一个 dev 分支。dev 分支负责存放原始的 hexo 文件,master 分支负责存放 hexo generate 生成的 html 文件,然后通过 https://win-man.github.io/ 这个网址访问到的就是 master 分支上的 html 文件了。

之前 Travis CI 持续集成服务中是会监测 dev 分支的更新,每当我有将提交 push 到 dev 分支上的时候,会触发 Travis CI 的持续集成服务,基于 dev 分支的内容自动更新 master 分支。所以想用 Github Actions 替换 Travis CI 也是类似的思路:

  1. 监测到 dev 分支有更新
  2. 基于 dev 分支内容执行 hexo deploy

步骤

生成秘钥

这一步是为了让 Github Actions 有权限将更新 push 到 repo 。

  1. 创建一个新的 SSH 秘钥
1
ssh-keygen -f hexo-deploy-key

会生成 hexo-deploy-key 和 hexo-deploy-key.pub 两个文件

  1. 在 win-man.github.io 这个项目添加 Secrets

添加 hexo-deploy-key 的内容作为 Secrets
add secrets

  1. 在 win-man.github.io 这个项目添加 Deploy key

添加 hexo-deploy-key.pub 的内容作为 Deploy key
add deploy key

配置 Github Action

  1. 切换到 dev 分支下,然后创建 workflows 目录, 并配置 main.yaml 文件
1
2
mkdir -p .github/workflows/
touch .github/workflows/main.yaml
  1. 填写配置文件

配置文件内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
name: hexo-deploy
on:
push:
branches: [ dev ]
pull_request:
branches: [ dev ]

workflow_dispatch:

# A workflow run is made up of one or more jobs that can run sequentially or in parallel
jobs:
# This workflow contains a single job called "build"
build:
# The type of runner that the job will run on
runs-on: ubuntu-latest

# Steps represent a sequence of tasks that will be executed as part of the job
steps:
# Checks-out your repository under $GITHUB_WORKSPACE, so your job can access it
- name: Checkout dev branch
uses: actions/checkout@v2
with:
ref: dev
- name: Config node.js
uses: actions/setup-node@v2
with:
node-version: '13'
cache: 'npm'
- name: Install hexo
env:
ACTION_DEPLOY_KEY: ${{ secrets.HEXO_DEPLOY_SECRET }}
run: |
mkdir -p ~/.ssh/
echo "$ACTION_DEPLOY_KEY" > ~/.ssh/id_rsa
chmod 600 ~/.ssh/id_rsa
ssh-keyscan github.com >> ~/.ssh/known_hosts
git config --global user.name "Win-Man"
git config --global user.email 825895587@qq.com
npm install hexo-cli -g
npm install
- name: Generate and deploy
run: |
hexo clean
hexo deploy
  1. 提交 Github Actions 配置

  2. 查看 Github Actions 并测试

每当 dev 分支有代码更新的时候,就会触发 Github Actions 的工作流自动发布博客了。

actions

问题

node version 错误导致生成 html 为空

问题描述

Github Actions job 正常触发也正常运行了,没有报错,但是访问 https://win-man.github.io/ 这个网址却是一篇空白。检查 master 分支上的 html 文件发现每篇文章的 html 文件是生成了,但是里面的内容是空的。

问题原因

nodejs 版本太高,hexo 不支持。

解决方案

将 main.yaml 中 node-version 的配置设置为 13 版本。

参考文档

2021 年终总结

还有两天就要结束这一年,迈入新一年了,按照老习惯,趁着放假自己空闲一点,开始回顾自己这一年。想写年终总结时总是思绪万千,但是真的到了输出文字的时候总是写不出什么,可能还是表达能力太差了。

工作

去年一年,还是在 PingCAP 工作,有的变化只是我从社区部门转岗到商业部门做售前。这个决定经过了很久的思想斗争,但不能算是一个考虑周全的决定。做出决定的那一瞬间,其实也还是有很多凭感觉的地方在,考虑了很多也只是凭借感觉做了选择。之前在做 DBA 的时候有考虑过自己的职业规划,要不就是一直做技术,到可以做成技术专家的程度;另一个选择就是在技术有一定积累之后做售前或者架构师,将技术价值更大化。说实话,按照我心里的预期,我目前的工作经验以及积累还不足以让我觉得我可以做一个售前,但是当有机会时,又不想那么轻易的放弃,毕竟没有那么多等一切都准备好了才有的机会。

从 5 月份开始慢慢接触售前的工作,幸好之前已经对于 TiDB 有了解,所以在技术方面的压力没那么大,可以专心面对跟客户交流、跟人打交道方面的内容。售前的工作给了我更多与人接触的机会,之前我并不是那么喜欢跟人打交道,但是现在不得不去跟人打交道,目前已经习惯了。想想自己刚毕业的时候,连接到别人打的电话都会抵触,不愿意接,到现在主动跟人打电话也可以。感觉变化还是挺大的。

读书

从今年开始改变了一下统计我一年读书的方式,之前因为年终总结是按照农历年为跨度写的,所以之前读书也是按照一样的时间维度去统计,但是今年感觉这种方式统计可能会有不严谨的地方,所以就按照公历年份去看看自己读了多少书,这样也方便统计。

去年也是一样买了很多书,但是读完的不多。看了一下去年的统计,总共读完了 36 本书,大部分书还都是在微信读书上读完的,实体书没有读多少。读的技术方面的书越来越少了,啃不动了。也没有专门读某一领域方面的书。

给自己记录一本之后有机会还可以再读一遍的书 《鳗鱼的旅行》。

生活

21 年有一个字很火,那就是 ”卷“。但是我不是一个爱卷的人,所以我选择反卷。21 年选择了更多的时间去生活,或者只是说选择了更少的时间去工作。21 年完成了几件事,首先是去把摩托车驾照考了,也买了自己人生中的第一辆摩托车,在提车回来的第一天就骑着车开了 200 公里(主要的原因是因为杭州禁摩,只能选择绕路将车开回来)。考驾照时是骑的三轮摩托车,从来没有骑过二轮的。胆子很大的,提车那天自己一个人就去了,提车之前在网上看了很久的骑车教程。结果还好,找了一段路练习了一下换挡,然后就 200 公里一路骑回来了。还曾一度骑到了 90+ 码,现在想来还是有点后怕,第一天就这么猛,真的不要命。而且忘记了新车应该要磨合一下。

21 年还完成了连续骑自行车 100 公里的目标,花了一天时间从杭州出发,绕青山湖一圈再骑回杭州,刚好一圈。百公里消耗炒面一碗。也是因为骑自行车在 21 年摔了一跤,把右手手腕摔坏了,本来以为没有伤到骨头没有什么大事,没想到很长时间都不能正常运动,现在做俯卧撑手腕都感觉有点不一样(给自己减肥失败找个理由)。

21 年想学滑雪,没有完成。

最后

最近学到一个新的词叫深度年(Year of Deepening)。主要意思是这一年让自己往已经了解的领域深挖,而不是不断扩展新的领域或者新的爱好,让自己走的更深,而不是更广。觉得 22 年可以往这个方向靠一靠。

原文地址:https://cstack.github.io/db_tutorial/parts/part4.html
原文作者: cstack
译者:Win-Man

目前为止,我们的数据库具备了插入记录和输出所有记录行的能力。让我们花点时间测试一下目前数据库的能力。

我准备使用 rspec 写测试,因为我对这个比较熟悉,并且语法比较易读。

我会定义一个简短的助手去发送一系列命令给我们的数据库程序,然后对返回的输出结果做断言。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
describe 'database' do
def run_script(commands)
raw_output = nil
IO.popen("./db", "r+") do |pipe|
commands.each do |command|
pipe.puts command
end

pipe.close_write

# Read entire output
raw_output = pipe.gets(nil)
end
raw_output.split("\n")
end

it 'inserts and retrieves a row' do
result = run_script([
"insert 1 user1 person1@example.com",
"select",
".exit",
])
expect(result).to match_array([
"db > Executed.",
"db > (1, user1, person1@example.com)",
"Executed.",
"db > ",
])
end
end

这个简单的测试确保了我们得到了和我们输入一样的输出结果,并且这个测试确实通过了:

1
2
3
4
5
bundle exec rspec
.

Finished in 0.00871 seconds (files took 0.09506 seconds to load)
1 example, 0 failures

然后就可以测试往数据库中插入大量的记录了:

1
2
3
4
5
6
7
8
it 'prints error message when table is full' do
script = (1..1401).map do |i|
"insert #{i} user#{i} person#{i}@example.com"
end
script << ".exit"
result = run_script(script)
expect(result[-2]).to eq('db > Error: Table full.')
end

再重新运行测试。

1
2
3
4
5
bundle exec rspec
..

Finished in 0.01553 seconds (files took 0.08156 seconds to load)
2 examples, 0 failures

测试通过了,我们的数据库可以存储 1400 行数据目前,因为我们设置了最大的数据页大小是 100,并且每个数据页上可以存储 14 条记录。

重新翻看我们写的代码,我意识到我们可能没有正确的处理如何存储 text 字段。可以很容易的验证这个 case:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
it 'allows inserting strings that are the maximum length' do
long_username = "a"*32
long_email = "a"*255
script = [
"insert 1 #{long_username} #{long_email}",
"select",
".exit",
]
result = run_script(script)
expect(result).to match_array([
"db > Executed.",
"db > (1, #{long_username}, #{long_email})",
"Executed.",
"db > ",
])
end

然后测试就跑不通了

1
2
3
4
5
6
7
8
9
10
Failures:

1) database allows inserting strings that are the maximum length
Failure/Error: raw_output.split("\n")

ArgumentError:
invalid byte sequence in UTF-8
# ./spec/main_spec.rb:14:in `split'
# ./spec/main_spec.rb:14:in `run_script'
# ./spec/main_spec.rb:48:in `block (2 levels) in <top (required)>'

如果我们手动测试这个例子,我们可以在输出的记录中看到一些奇怪的字符(我对字符串进行了缩写):

1
2
3
4
5
6
db > insert 1 aaaaa... aaaaa...
Executed.
db > select
(1, aaaaa...aaa\�, aaaaa...aaa\�)
Executed.
db >

这是怎么回事?我们再看一下我们对于 Row 的定义,我们给 username 字段分配了 32 字节的空间,email 字段分配了 255 字节的空间。但是 C 语言中 string 类型的字段应该是以 null 字符结尾的,我们并没有给这个结束符分配空间。所以这个问题的结局方案就是多分配一个字节的空间:

1
2
3
4
5
6
7
8
 const uint32_t COLUMN_EMAIL_SIZE = 255;
typedef struct {
uint32_t id;
- char username[COLUMN_USERNAME_SIZE];
- char email[COLUMN_EMAIL_SIZE];
+ char username[COLUMN_USERNAME_SIZE + 1];
+ char email[COLUMN_EMAIL_SIZE + 1];
} Row;

再次运行测试,可以看到测试通过了

1
2
3
4
5
 bundle exec rspec
...

Finished in 0.0188 seconds (files took 0.08516 seconds to load)
3 examples, 0 failures

我们不应该允许插入的 username 和 email 字段的值超过我们定义的 column size ,可以增加一个测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
it 'prints error message if strings are too long' do
long_username = "a"*33
long_email = "a"*256
script = [
"insert 1 #{long_username} #{long_email}",
"select",
".exit",
]
result = run_script(script)
expect(result).to match_array([
"db > String is too long.",
"db > Executed.",
"db > ",
])
end

为了实现这个限制,我们需要更新一下我们的 parser 解析器。目前使用的是 scanf():

1
2
3
4
5
6
7
8
9
10
if (strncmp(input_buffer->buffer, "insert", 6) == 0) {
statement->type = STATEMENT_INSERT;
int args_assigned = sscanf(
input_buffer->buffer, "insert %d %s %s", &(statement->row_to_insert.id),
statement->row_to_insert.username, statement->row_to_insert.email);
if (args_assigned < 3) {
return PREPARE_SYNTAX_ERROR;
}
return PREPARE_SUCCESS;
}

但是 scanf() 有一些缺点。如果读取的字符串大小超过了分配的空间,会导致缓存溢出并且数据会被写入到未知的空间。我们需要在拷贝数据到 Row 结构体之前先检查一下字符串的长度。为了实现这个,我们需要将输入通过空格切割一下。

我准备试用 strtok() 来实现这个。这个比较容易理解:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
+PrepareResult prepare_insert(InputBuffer* input_buffer, Statement* statement) {
+ statement->type = STATEMENT_INSERT;
+
+ char* keyword = strtok(input_buffer->buffer, " ");
+ char* id_string = strtok(NULL, " ");
+ char* username = strtok(NULL, " ");
+ char* email = strtok(NULL, " ");
+
+ if (id_string == NULL || username == NULL || email == NULL) {
+ return PREPARE_SYNTAX_ERROR;
+ }
+
+ int id = atoi(id_string);
+ if (strlen(username) > COLUMN_USERNAME_SIZE) {
+ return PREPARE_STRING_TOO_LONG;
+ }
+ if (strlen(email) > COLUMN_EMAIL_SIZE) {
+ return PREPARE_STRING_TOO_LONG;
+ }
+
+ statement->row_to_insert.id = id;
+ strcpy(statement->row_to_insert.username, username);
+ strcpy(statement->row_to_insert.email, email);
+
+ return PREPARE_SUCCESS;
+}
+
PrepareResult prepare_statement(InputBuffer* input_buffer,
Statement* statement) {
if (strncmp(input_buffer->buffer, "insert", 6) == 0) {
+ return prepare_insert(input_buffer, statement);
- statement->type = STATEMENT_INSERT;
- int args_assigned = sscanf(
- input_buffer->buffer, "insert %d %s %s", &(statement->row_to_insert.id),
- statement->row_to_insert.username, statement->row_to_insert.email);
- if (args_assigned < 3) {
- return PREPARE_SYNTAX_ERROR;
- }
- return PREPARE_SUCCESS;
}

在输入缓冲区上连续调用strtok,每当缓冲区到达分隔符(在本例中是空格)时,就插入一个空字符,从而将它分解为子字符串。它返回一个指向子字符串开头的指针。

可以调用 strlen() 函数确认每个 text 字段值是不是超过了长度限制。

我们可以像处理别的错误一样处理这个错误:

1
2
3
4
5
6
 enum PrepareResult_t {
PREPARE_SUCCESS,
+ PREPARE_STRING_TOO_LONG,
PREPARE_SYNTAX_ERROR,
PREPARE_UNRECOGNIZED_STATEMENT
};
1
2
3
4
5
6
7
8
9
 switch (prepare_statement(input_buffer, &statement)) {
case (PREPARE_SUCCESS):
break;
+ case (PREPARE_STRING_TOO_LONG):
+ printf("String is too long.\n");
+ continue;
case (PREPARE_SYNTAX_ERROR):
printf("Syntax error. Could not parse statement.\n");
continue;

再次运行测试,测试通过

1
2
3
4
5
bundle exec rspec
....

Finished in 0.02284 seconds (files took 0.116 seconds to load)
4 examples, 0 failures

既然已经增加了一种错误情况,我们不妨再处理一个错误情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
it 'prints an error message if id is negative' do
script = [
"insert -1 cstack foo@bar.com",
"select",
".exit",
]
result = run_script(script)
expect(result).to match_array([
"db > ID must be positive.",
"db > Executed.",
"db > ",
])
end
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
 enum PrepareResult_t {
PREPARE_SUCCESS,
+ PREPARE_NEGATIVE_ID,
PREPARE_STRING_TOO_LONG,
PREPARE_SYNTAX_ERROR,
PREPARE_UNRECOGNIZED_STATEMENT
@@ -148,9 +147,6 @@ PrepareResult prepare_insert(InputBuffer* input_buffer, Statement* statement) {
}

int id = atoi(id_string);
+ if (id < 0) {
+ return PREPARE_NEGATIVE_ID;
+ }
if (strlen(username) > COLUMN_USERNAME_SIZE) {
return PREPARE_STRING_TOO_LONG;
}
@@ -230,9 +226,6 @@ int main(int argc, char* argv[]) {
switch (prepare_statement(input_buffer, &statement)) {
case (PREPARE_SUCCESS):
break;
+ case (PREPARE_NEGATIVE_ID):
+ printf("ID must be positive.\n");
+ continue;
case (PREPARE_STRING_TOO_LONG):
printf("String is too long.\n");
continue;

目前这些测试已经足够了,下一个章节会是一个重要的特性:持久化。我们需要将数据存储到文件中,并且中文件中读取出来。

那会很棒的。

在这给出这个部分中与之前代码不同的部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
@@ -22,6 +22,8 @@

enum PrepareResult_t {
PREPARE_SUCCESS,
+ PREPARE_NEGATIVE_ID,
+ PREPARE_STRING_TOO_LONG,
PREPARE_SYNTAX_ERROR,
PREPARE_UNRECOGNIZED_STATEMENT
};
@@ -34,8 +36,8 @@
#define COLUMN_EMAIL_SIZE 255
typedef struct {
uint32_t id;
- char username[COLUMN_USERNAME_SIZE];
- char email[COLUMN_EMAIL_SIZE];
+ char username[COLUMN_USERNAME_SIZE + 1];
+ char email[COLUMN_EMAIL_SIZE + 1];
} Row;

@@ -150,18 +152,40 @@ MetaCommandResult do_meta_command(InputBuffer* input_buffer, Table *table) {
}
}

-PrepareResult prepare_statement(InputBuffer* input_buffer,
- Statement* statement) {
- if (strncmp(input_buffer->buffer, "insert", 6) == 0) {
+PrepareResult prepare_insert(InputBuffer* input_buffer, Statement* statement) {
statement->type = STATEMENT_INSERT;
- int args_assigned = sscanf(
- input_buffer->buffer, "insert %d %s %s", &(statement->row_to_insert.id),
- statement->row_to_insert.username, statement->row_to_insert.email
- );
- if (args_assigned < 3) {
+
+ char* keyword = strtok(input_buffer->buffer, " ");
+ char* id_string = strtok(NULL, " ");
+ char* username = strtok(NULL, " ");
+ char* email = strtok(NULL, " ");
+
+ if (id_string == NULL || username == NULL || email == NULL) {
return PREPARE_SYNTAX_ERROR;
}
+
+ int id = atoi(id_string);
+ if (id < 0) {
+ return PREPARE_NEGATIVE_ID;
+ }
+ if (strlen(username) > COLUMN_USERNAME_SIZE) {
+ return PREPARE_STRING_TOO_LONG;
+ }
+ if (strlen(email) > COLUMN_EMAIL_SIZE) {
+ return PREPARE_STRING_TOO_LONG;
+ }
+
+ statement->row_to_insert.id = id;
+ strcpy(statement->row_to_insert.username, username);
+ strcpy(statement->row_to_insert.email, email);
+
return PREPARE_SUCCESS;
+
+}
+PrepareResult prepare_statement(InputBuffer* input_buffer,
+ Statement* statement) {
+ if (strncmp(input_buffer->buffer, "insert", 6) == 0) {
+ return prepare_insert(input_buffer, statement);
}
if (strcmp(input_buffer->buffer, "select") == 0) {
statement->type = STATEMENT_SELECT;
@@ -223,6 +247,12 @@ int main(int argc, char* argv[]) {
switch (prepare_statement(input_buffer, &statement)) {
case (PREPARE_SUCCESS):
break;
+ case (PREPARE_NEGATIVE_ID):
+ printf("ID must be positive.\n");
+ continue;
+ case (PREPARE_STRING_TOO_LONG):
+ printf("String is too long.\n");
+ continue;
case (PREPARE_SYNTAX_ERROR):
printf("Syntax error. Could not parse statement.\n");
continue;

以及我们增加的测试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
+describe 'database' do
+ def run_script(commands)
+ raw_output = nil
+ IO.popen("./db", "r+") do |pipe|
+ commands.each do |command|
+ pipe.puts command
+ end
+
+ pipe.close_write
+
+ # Read entire output
+ raw_output = pipe.gets(nil)
+ end
+ raw_output.split("\n")
+ end
+
+ it 'inserts and retrieves a row' do
+ result = run_script([
+ "insert 1 user1 person1@example.com",
+ "select",
+ ".exit",
+ ])
+ expect(result).to match_array([
+ "db > Executed.",
+ "db > (1, user1, person1@example.com)",
+ "Executed.",
+ "db > ",
+ ])
+ end
+
+ it 'prints error message when table is full' do
+ script = (1..1401).map do |i|
+ "insert #{i} user#{i} person#{i}@example.com"
+ end
+ script << ".exit"
+ result = run_script(script)
+ expect(result[-2]).to eq('db > Error: Table full.')
+ end
+
+ it 'allows inserting strings that are the maximum length' do
+ long_username = "a"*32
+ long_email = "a"*255
+ script = [
+ "insert 1 #{long_username} #{long_email}",
+ "select",
+ ".exit",
+ ]
+ result = run_script(script)
+ expect(result).to match_array([
+ "db > Executed.",
+ "db > (1, #{long_username}, #{long_email})",
+ "Executed.",
+ "db > ",
+ ])
+ end
+
+ it 'prints error message if strings are too long' do
+ long_username = "a"*33
+ long_email = "a"*256
+ script = [
+ "insert 1 #{long_username} #{long_email}",
+ "select",
+ ".exit",
+ ]
+ result = run_script(script)
+ expect(result).to match_array([
+ "db > String is too long.",
+ "db > Executed.",
+ "db > ",
+ ])
+ end
+
+ it 'prints an error message if id is negative' do
+ script = [
+ "insert -1 cstack foo@bar.com",
+ "select",
+ ".exit",
+ ]
+ result = run_script(script)
+ expect(result).to match_array([
+ "db > ID must be positive.",
+ "db > Executed.",
+ "db > ",
+ ])
+ end
+end

原文地址:https://cstack.github.io/db_tutorial/parts/part3.html
原文作者: cstack
译者:Win-Man

我们通过在我们数据库上设置很多限制,从一个小的数据库开始。这一节,我们会做这些内容:

  • 支持两种操作:插入一行数据和查询出所有数据行
  • 将数据存储在内存中(不持久化到硬盘上)
  • 支持单个、表结构固定的表

我们表结构固定的表示用于存储用户的,结构是:

column type
id integer
username varchar(32)
email varchar(255)

这是一个简单的表结构,但是可以让我们去支持多种数据格式和多种数据大小。

insert 语句会类似于这个形式

1
insert 1 cstack foo@bar.com

这意味着我们需要更新我们的 prepare_statement 函数去解析参数

1
2
3
4
5
6
7
8
9
10
11
   if (strncmp(input_buffer->buffer, "insert", 6) == 0) {
statement->type = STATEMENT_INSERT;
+ int args_assigned = sscanf(
+ input_buffer->buffer, "insert %d %s %s", &(statement->row_to_insert.id),
+ statement->row_to_insert.username, statement->row_to_insert.email);
+ if (args_assigned < 3) {
+ return PREPARE_SYNTAX_ERROR;
+ }
return PREPARE_SUCCESS;
}
if (strcmp(input_buffer->buffer, "select") == 0) {

我们将这些解析出来的参数存储到一个 Row 数据结构中:

1
2
3
4
5
6
7
8
9
10
11
12
+#define COLUMN_USERNAME_SIZE 32
+#define COLUMN_EMAIL_SIZE 255
+typedef struct {
+ uint32_t id;
+ char username[COLUMN_USERNAME_SIZE];
+ char email[COLUMN_EMAIL_SIZE];
+} Row;
+
typedef struct {
StatementType type;
+ Row row_to_insert; // only used by insert statement
} Statement;

现在我们需要将 Row 结构体中的数据拷贝到能代表 table 数据的结构中。 SQLite 使用了 B-tree 数据结构,可以提升查询、插入、删除的速度。我们开始的会更简单一点。类似于 B-tree, 会将行记录分组到 page 页上,但是是将这些数据页以数据形式存储,而不是以树的形式存储。

我们的计划是:

  • 将数据行存储在内存页中
  • 每个数据页可以存储尽可能多的行
  • 记录行被序列化成一种压缩的形式
  • 数据页只有在需要的时候才会分配
  • 保持一个固定大小的数据页指针数组

首先,我们需要定义一下一行记录的压缩格式是怎么样的:

1
2
3
4
5
6
7
8
9
+#define size_of_attribute(Struct, Attribute) sizeof(((Struct*)0)->Attribute)
+
+const uint32_t ID_SIZE = size_of_attribute(Row, id);
+const uint32_t USERNAME_SIZE = size_of_attribute(Row, username);
+const uint32_t EMAIL_SIZE = size_of_attribute(Row, email);
+const uint32_t ID_OFFSET = 0;
+const uint32_t USERNAME_OFFSET = ID_OFFSET + ID_SIZE;
+const uint32_t EMAIL_OFFSET = USERNAME_OFFSET + USERNAME_SIZE;
+const uint32_t ROW_SIZE = ID_SIZE + USERNAME_SIZE + EMAIL_SIZE;

这意味着被序列化之后的一行数据会是这样的:

column size(bytes) offset
id 4 0
username 32 4
email 255 36
total 291

我们也需要将编写转换、反转换的代码.

1
2
3
4
5
6
7
8
9
10
11
+void serialize_row(Row* source, void* destination) {
+ memcpy(destination + ID_OFFSET, &(source->id), ID_SIZE);
+ memcpy(destination + USERNAME_OFFSET, &(source->username), USERNAME_SIZE);
+ memcpy(destination + EMAIL_OFFSET, &(source->email), EMAIL_SIZE);
+}
+
+void deserialize_row(void* source, Row* destination) {
+ memcpy(&(destination->id), source + ID_OFFSET, ID_SIZE);
+ memcpy(&(destination->username), source + USERNAME_OFFSET, USERNAME_SIZE);
+ memcpy(&(destination->email), source + EMAIL_OFFSET, EMAIL_SIZE);
+}

接着,Table 的数据结构就是一个数据页指针和记录这个表内有多少数据:

1
2
3
4
5
6
7
8
9
+const uint32_t PAGE_SIZE = 4096;
+#define TABLE_MAX_PAGES 100
+const uint32_t ROWS_PER_PAGE = PAGE_SIZE / ROW_SIZE;
+const uint32_t TABLE_MAX_ROWS = ROWS_PER_PAGE * TABLE_MAX_PAGES;
+
+typedef struct {
+ uint32_t num_rows;
+ void* pages[TABLE_MAX_PAGES];
+} Table;

我将我们数据页的大小设置为了 4KB ,因为这也是大多数操作系统使用的数据页大小。意味着我们的使用的数据页大小和操作系统数据页大小是一致的。操作系统会将数据页作为一个整体进行交换,而不是将他们拆散成多个数据页。

我随意设置了一个限制,限制我们只能分配 100 个数据页。当我们转成树结构的时候,我们数据库的最大大小受限于单个文件的大小(尽管我们还是有设置我们在内存中保留多少数据页)

一行数据不能跨数据页存储。因为相邻的数据页可能不会同时存在在内存中,这个限制可以让读写行数据更加容易。

说到这个,看一下我们是如果在内存中定位到需要读取的某一行数据的:

1
2
3
4
5
6
7
8
9
10
11
+void* row_slot(Table* table, uint32_t row_num) {
+ uint32_t page_num = row_num / ROWS_PER_PAGE;
+ void* page = table->pages[page_num];
+ if (page == NULL) {
+ // Allocate memory only when we try to access page
+ page = table->pages[page_num] = malloc(PAGE_SIZE);
+ }
+ uint32_t row_offset = row_num % ROWS_PER_PAGE;
+ uint32_t byte_offset = row_offset * ROW_SIZE;
+ return page + byte_offset;
+}

现在我们可以让 execute_statement 从表结构体中读取或者写入数据了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
-void execute_statement(Statement* statement) {
+ExecuteResult execute_insert(Statement* statement, Table* table) {
+ if (table->num_rows >= TABLE_MAX_ROWS) {
+ return EXECUTE_TABLE_FULL;
+ }
+
+ Row* row_to_insert = &(statement->row_to_insert);
+
+ serialize_row(row_to_insert, row_slot(table, table->num_rows));
+ table->num_rows += 1;
+
+ return EXECUTE_SUCCESS;
+}
+
+ExecuteResult execute_select(Statement* statement, Table* table) {
+ Row row;
+ for (uint32_t i = 0; i < table->num_rows; i++) {
+ deserialize_row(row_slot(table, i), &row);
+ print_row(&row);
+ }
+ return EXECUTE_SUCCESS;
+}
+
+ExecuteResult execute_statement(Statement* statement, Table* table) {
switch (statement->type) {
case (STATEMENT_INSERT):
- printf("This is where we would do an insert.\n");
- break;
+ return execute_insert(statement, table);
case (STATEMENT_SELECT):
- printf("This is where we would do a select.\n");
- break;
+ return execute_select(statement, table);
}
}

最后,我们需要初始化一下表,创建释放各自内存的函数以及处理更多错误的情况:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
+ Table* new_table() {
+ Table* table = malloc(sizeof(Table));
+ table->num_rows = 0;
+ for (uint32_t i = 0; i < TABLE_MAX_PAGES; i++) {
+ table->pages[i] = NULL;
+ }
+ return table;
+}
+
+void free_table(Table* table) {
+ for (int i = 0; table->pages[i]; i++) {
+ free(table->pages[i]);
+ }
+ free(table);
+}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
int main(int argc, char* argv[]) {
+ Table* table = new_table();
InputBuffer* input_buffer = new_input_buffer();
while (true) {
print_prompt();
@@ -105,13 +203,22 @@ int main(int argc, char* argv[]) {
switch (prepare_statement(input_buffer, &statement)) {
case (PREPARE_SUCCESS):
break;
+ case (PREPARE_SYNTAX_ERROR):
+ printf("Syntax error. Could not parse statement.\n");
+ continue;
case (PREPARE_UNRECOGNIZED_STATEMENT):
printf("Unrecognized keyword at start of '%s'.\n",
input_buffer->buffer);
continue;
}

- execute_statement(&statement);
- printf("Executed.\n");
+ switch (execute_statement(&statement, table)) {
+ case (EXECUTE_SUCCESS):
+ printf("Executed.\n");
+ break;
+ case (EXECUTE_TABLE_FULL):
+ printf("Error: Table full.\n");
+ break;
+ }
}
}

经过这些修改,我们就可以真正的在我们的数据库中存储数据了

1
2
3
4
5
6
7
8
9
10
11
12
~ ./db
db > insert 1 cstack foo@bar.com
Executed.
db > insert 2 bob bob@example.com
Executed.
db > select
(1, cstack, foo@bar.com)
(2, bob, bob@example.com)
Executed.
db > insert foo bar 1
Syntax error. Could not parse statement.
db > .exit

这时候是写一些测试的好时候了,因为以下几个原因:

  • 我们计划修改数据结构去存储我们的表,有测试的话可以方便捕捉到回归错误
  • 有一些边缘 case 我们没有手动测试(比如填满一张表)

我们会在下一个部分做这些内容。现在先看下这个部分完整修改的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
@@ -2,6 +2,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
+#include <stdint.h>

typedef struct {
char* buffer;
@@ -10,6 +11,105 @@ typedef struct {
} InputBuffer;

+typedef enum { EXECUTE_SUCCESS, EXECUTE_TABLE_FULL } ExecuteResult;
+
+typedef enum {
+ META_COMMAND_SUCCESS,
+ META_COMMAND_UNRECOGNIZED_COMMAND
+} MetaCommandResult;
+
+typedef enum {
+ PREPARE_SUCCESS,
+ PREPARE_SYNTAX_ERROR,
+ PREPARE_UNRECOGNIZED_STATEMENT
+ } PrepareResult;
+
+typedef enum { STATEMENT_INSERT, STATEMENT_SELECT } StatementType;
+
+#define COLUMN_USERNAME_SIZE 32
+#define COLUMN_EMAIL_SIZE 255
+typedef struct {
+ uint32_t id;
+ char username[COLUMN_USERNAME_SIZE];
+ char email[COLUMN_EMAIL_SIZE];
+} Row;
+
+typedef struct {
+ StatementType type;
+ Row row_to_insert; //only used by insert statement
+} Statement;
+
+#define size_of_attribute(Struct, Attribute) sizeof(((Struct*)0)->Attribute)
+
+const uint32_t ID_SIZE = size_of_attribute(Row, id);
+const uint32_t USERNAME_SIZE = size_of_attribute(Row, username);
+const uint32_t EMAIL_SIZE = size_of_attribute(Row, email);
+const uint32_t ID_OFFSET = 0;
+const uint32_t USERNAME_OFFSET = ID_OFFSET + ID_SIZE;
+const uint32_t EMAIL_OFFSET = USERNAME_OFFSET + USERNAME_SIZE;
+const uint32_t ROW_SIZE = ID_SIZE + USERNAME_SIZE + EMAIL_SIZE;
+
+const uint32_t PAGE_SIZE = 4096;
+#define TABLE_MAX_PAGES 100
+const uint32_t ROWS_PER_PAGE = PAGE_SIZE / ROW_SIZE;
+const uint32_t TABLE_MAX_ROWS = ROWS_PER_PAGE * TABLE_MAX_PAGES;
+
+typedef struct {
+ uint32_t num_rows;
+ void* pages[TABLE_MAX_PAGES];
+} Table;
+
+void print_row(Row* row) {
+ printf("(%d, %s, %s)\n", row->id, row->username, row->email);
+}
+
+void serialize_row(Row* source, void* destination) {
+ memcpy(destination + ID_OFFSET, &(source->id), ID_SIZE);
+ memcpy(destination + USERNAME_OFFSET, &(source->username), USERNAME_SIZE);
+ memcpy(destination + EMAIL_OFFSET, &(source->email), EMAIL_SIZE);
+}
+
+void deserialize_row(void *source, Row* destination) {
+ memcpy(&(destination->id), source + ID_OFFSET, ID_SIZE);
+ memcpy(&(destination->username), source + USERNAME_OFFSET, USERNAME_SIZE);
+ memcpy(&(destination->email), source + EMAIL_OFFSET, EMAIL_SIZE);
+}
+
+void* row_slot(Table* table, uint32_t row_num) {
+ uint32_t page_num = row_num / ROWS_PER_PAGE;
+ void *page = table->pages[page_num];
+ if (page == NULL) {
+ // Allocate memory only when we try to access page
+ page = table->pages[page_num] = malloc(PAGE_SIZE);
+ }
+ uint32_t row_offset = row_num % ROWS_PER_PAGE;
+ uint32_t byte_offset = row_offset * ROW_SIZE;
+ return page + byte_offset;
+}
+
+Table* new_table() {
+ Table* table = malloc(sizeof(Table));
+ table->num_rows = 0;
+ for (uint32_t i = 0; i < TABLE_MAX_PAGES; i++) {
+ table->pages[i] = NULL;
+ }
+ return table;
+}
+
+void free_table(Table* table) {
+ for (int i = 0; table->pages[i]; i++) {
+ free(table->pages[i]);
+ }
+ free(table);
+}
+
InputBuffer* new_input_buffer() {
InputBuffer* input_buffer = malloc(sizeof(InputBuffer));
input_buffer->buffer = NULL;
@@ -40,17 +140,105 @@ void close_input_buffer(InputBuffer* input_buffer) {
free(input_buffer);
}

+MetaCommandResult do_meta_command(InputBuffer* input_buffer, Table *table) {
+ if (strcmp(input_buffer->buffer, ".exit") == 0) {
+ close_input_buffer(input_buffer);
+ free_table(table);
+ exit(EXIT_SUCCESS);
+ } else {
+ return META_COMMAND_UNRECOGNIZED_COMMAND;
+ }
+}
+
+PrepareResult prepare_statement(InputBuffer* input_buffer,
+ Statement* statement) {
+ if (strncmp(input_buffer->buffer, "insert", 6) == 0) {
+ statement->type = STATEMENT_INSERT;
+ int args_assigned = sscanf(
+ input_buffer->buffer, "insert %d %s %s", &(statement->row_to_insert.id),
+ statement->row_to_insert.username, statement->row_to_insert.email
+ );
+ if (args_assigned < 3) {
+ return PREPARE_SYNTAX_ERROR;
+ }
+ return PREPARE_SUCCESS;
+ }
+ if (strcmp(input_buffer->buffer, "select") == 0) {
+ statement->type = STATEMENT_SELECT;
+ return PREPARE_SUCCESS;
+ }
+
+ return PREPARE_UNRECOGNIZED_STATEMENT;
+}
+
+ExecuteResult execute_insert(Statement* statement, Table* table) {
+ if (table->num_rows >= TABLE_MAX_ROWS) {
+ return EXECUTE_TABLE_FULL;
+ }
+
+ Row* row_to_insert = &(statement->row_to_insert);
+
+ serialize_row(row_to_insert, row_slot(table, table->num_rows));
+ table->num_rows += 1;
+
+ return EXECUTE_SUCCESS;
+}
+
+ExecuteResult execute_select(Statement* statement, Table* table) {
+ Row row;
+ for (uint32_t i = 0; i < table->num_rows; i++) {
+ deserialize_row(row_slot(table, i), &row);
+ print_row(&row);
+ }
+ return EXECUTE_SUCCESS;
+}
+
+ExecuteResult execute_statement(Statement* statement, Table *table) {
+ switch (statement->type) {
+ case (STATEMENT_INSERT):
+ return execute_insert(statement, table);
+ case (STATEMENT_SELECT):
+ return execute_select(statement, table);
+ }
+}
+
int main(int argc, char* argv[]) {
+ Table* table = new_table();
InputBuffer* input_buffer = new_input_buffer();
while (true) {
print_prompt();
read_input(input_buffer);

- if (strcmp(input_buffer->buffer, ".exit") == 0) {
- close_input_buffer(input_buffer);
- exit(EXIT_SUCCESS);
- } else {
- printf("Unrecognized command '%s'.\n", input_buffer->buffer);
+ if (input_buffer->buffer[0] == '.') {
+ switch (do_meta_command(input_buffer, table)) {
+ case (META_COMMAND_SUCCESS):
+ continue;
+ case (META_COMMAND_UNRECOGNIZED_COMMAND):
+ printf("Unrecognized command '%s'\n", input_buffer->buffer);
+ continue;
+ }
+ }
+
+ Statement statement;
+ switch (prepare_statement(input_buffer, &statement)) {
+ case (PREPARE_SUCCESS):
+ break;
+ case (PREPARE_SYNTAX_ERROR):
+ printf("Syntax error. Could not parse statement.\n");
+ continue;
+ case (PREPARE_UNRECOGNIZED_STATEMENT):
+ printf("Unrecognized keyword at start of '%s'.\n",
+ input_buffer->buffer);
+ continue;
+ }
+
+ switch (execute_statement(&statement, table)) {
+ case (EXECUTE_SUCCESS):
+ printf("Executed.\n");
+ break;
+ case (EXECUTE_TABLE_FULL):
+ printf("Error: Table full.\n");
+ break;
}
}
}

问题记录

前段时间给 github page 换了一个 hexo 主题,换成了 next 主题,但是在更换主题之后,发现 travis 上的 CI 无法跑过了,查看日志内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
$ node --version
v6.9.4
$ npm --version
3.10.10
$ nvm --version
0.37.2
before_install.1
0.01s$ export TZ='Asia/Shanghai'
before_install.2
6.90s$ npm install -g hexo
before_install.3
2.81s$ npm install -g hexo-cli
install
1.54s$ npm install
before_script.1
0.01s$ git config --global user.name "Win-Man"
before_script.2
0.00s$ git config --global user.email 825895587@qq.com
before_script.3
0.00s$ sed -i'' "s~git@github.com:Win-Man/win-man.github.io.git~https://${CI_TOKEN}@github.com/Win-Man/win-man.github.io.git~" _config.yml
0.67s$ hexo clean
The command "hexo clean" exited with 0.
1.17s$ hexo generate
INFO Start processing
FATAL Something's wrong. Maybe you can find the solution here: https://hexo.io/docs/troubleshooting.html
TypeError: Object.values is not a function
at points.views.forEach.type (/home/travis/build/Win-Man/win-man.github.io/themes/next/scripts/events/lib/injects.js:82:46)
at Array.forEach (native)
at module.exports.hexo (/home/travis/build/Win-Man/win-man.github.io/themes/next/scripts/events/lib/injects.js:67:16)
at Hexo.hexo.on (/home/travis/build/Win-Man/win-man.github.io/themes/next/scripts/events/index.js:9:27)
at emitNone (events.js:86:13)
at Hexo.emit (events.js:185:7)
at Hexo._generate (/home/travis/build/Win-Man/win-man.github.io/node_modules/hexo/lib/hexo/index.js:399:8)
at loadDatabase.then.then (/home/travis/build/Win-Man/win-man.github.io/node_modules/hexo/lib/hexo/index.js:249:22)
at tryCatcher (/home/travis/build/Win-Man/win-man.github.io/node_modules/bluebird/js/release/util.js:16:23)
at Promise._settlePromiseFromHandler (/home/travis/build/Win-Man/win-man.github.io/node_modules/bluebird/js/release/promise.js:547:31)
at Promise._settlePromise (/home/travis/build/Win-Man/win-man.github.io/node_modules/bluebird/js/release/promise.js:604:18)
at Promise._settlePromise0 (/home/travis/build/Win-Man/win-man.github.io/node_modules/bluebird/js/release/promise.js:649:10)
at Promise._settlePromises (/home/travis/build/Win-Man/win-man.github.io/node_modules/bluebird/js/release/promise.js:729:18)
at Promise._fulfill (/home/travis/build/Win-Man/win-man.github.io/node_modules/bluebird/js/release/promise.js:673:18)
at PromiseArray._resolve (/home/travis/build/Win-Man/win-man.github.io/node_modules/bluebird/js/release/promise_array.js:127:19)
at PromiseArray._promiseFulfilled (/home/travis/build/Win-Man/win-man.github.io/node_modules/bluebird/js/release/promise_array.js:145:14)
at Promise._settlePromise (/home/travis/build/Win-Man/win-man.github.io/node_modules/bluebird/js/release/promise.js:609:26)
The command "hexo generate" exited with 2.
cache.2
store build cache

但是在我本地电脑执行 hexo g,hexo s,hexo d 都没啥问题,怀疑到了 node js 版本的问题,因为这个 repo 中的 .travis.yml 配置文件还是好几年前的了,于是尝试修改 node js 版本与本地版本一致。修改版本并提交之后还是有报错,但是报错信息变了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
$ node --version
v13.11.0
$ npm --version
6.13.7
$ nvm --version
0.37.2
before_install.1
0.01s$ export TZ='Asia/Shanghai'
before_install.2
5.40s$ npm install -g hexo
3.16s$ npm install -g hexo-cli
npm WARN optional SKIPPING OPTIONAL DEPENDENCY: fsevents@~2.3.1 (node_modules/hexo-cli/node_modules/chokidar/node_modules/fsevents):
npm WARN notsup SKIPPING OPTIONAL DEPENDENCY: Unsupported platform for fsevents@2.3.2: wanted {"os":"darwin","arch":"any"} (current: {"os":"linux","arch":"x64"})
npm ERR! code EEXIST
npm ERR! syscall symlink
npm ERR! path ../lib/node_modules/hexo-cli/bin/hexo
npm ERR! dest /home/travis/.nvm/versions/node/v13.11.0/bin/hexo
npm ERR! errno -17
npm ERR! EEXIST: file already exists, symlink '../lib/node_modules/hexo-cli/bin/hexo' -> '/home/travis/.nvm/versions/node/v13.11.0/bin/hexo'
npm ERR! File exists: /home/travis/.nvm/versions/node/v13.11.0/bin/hexo
npm ERR! Remove the existing file and try again, or run npm
npm ERR! with --force to overwrite files recklessly.
npm ERR! A complete log of this run can be found in:
npm ERR! /home/travis/.npm/_logs/2021-04-12T06_49_56_362Z-debug.log
The command "npm install -g hexo-cli" failed and exited with 239 during .

根据错误信息查找到一个链接
https://segmentfault.com/a/1190000018759308

解决方案

参考链接中的结果方案在 npm install 中加上 -f 选项。
附上完整的 .travis.yml 内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
language: node_js
node_js:
- '13.11.0'
branches:
only:
- dev
cache:
directories:
- node_modules
before_install:
- export TZ='Asia/Shanghai'
- npm install -g hexo
- npm install -g hexo-cli -f
before_script:
- git config --global user.name "Win-Man"
- git config --global user.email 825895587@qq.com
- sed -i'' "s~git@github.com:Win-Man/win-man.github.io.git~https://${CI_TOKEN}@github.com/Win-Man/win-man.github.io.git~" _config.yml
install:
- npm install -f
script:
- hexo clean
- hexo generate
after_success:
- hexo deploy

原文地址:https://cstack.github.io/db_tutorial/parts/part2.html
原文作者: cstack
译者:Win-Man

世界上最简单的 SQL 编译器和字节码虚拟机

我们正在写一个 sqlite 的克隆版本。sqlite 中 SQL 编译器是一个解析一个 string 输入并输出一个内部可识别的字节码。然后 sqlite 编译之后的字节码传给字节码虚拟机,在虚拟机中执行 SQL。

SQLite Architecture (https://www.sqlite.org/arch.html)

将这些内容拆成两个步骤有一些优势点:

  • 减少每个部门的复杂度(例如:字节码虚拟机不用考虑语法错误)
  • 对于相同的查询可以只需要编译一次,将编译后的字节码缓存住,以此可以提高性能

基于这个想法,我们重构一下我们的主函数来支持两个新的关键字:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
int main(int argc, char* argv[]) {
InputBuffer* input_buffer = new_input_buffer();
while (true) {
print_prompt();
read_input(input_buffer);

- if (strcmp(input_buffer->buffer, ".exit") == 0) {
- exit(EXIT_SUCCESS);
- } else {
- printf("Unrecognized command '%s'.\n", input_buffer->buffer);
+ if (input_buffer->buffer[0] == '.') {
+ switch (do_meta_command(input_buffer)) {
+ case (META_COMMAND_SUCCESS):
+ continue;
+ case (META_COMMAND_UNRECOGNIZED_COMMAND):
+ printf("Unrecognized command '%s'\n", input_buffer->buffer);
+ continue;
+ }
}
+
+ Statement statement;
+ switch (prepare_statement(input_buffer, &statement)) {
+ case (PREPARE_SUCCESS):
+ break;
+ case (PREPARE_UNRECOGNIZED_STATEMENT):
+ printf("Unrecognized keyword at start of '%s'.\n",
+ input_buffer->buffer);
+ continue;
+ }
+
+ execute_statement(&statement);
+ printf("Executed.\n");
}
}

.exit 这种非 SQL 的语句被称为“元指令”。这些元指令都是以 . 开始的,所以我们可以将这类语句单独在一个独立的函数中处理。

接下来,我们增加一个步骤用于将输入的行转换为数据库内部表达式。这就是我们对于 sqlite 前端的 hack 版本。

最后,我们将预处理好的语句传递给 execute_statement。这个函数后续会演进成我们的字节码虚拟机。

需要需要的是我们新增加了两个函数,并且这两个函数返回 enum 来表示成功或者失败。

1
2
3
4
5
6
typedef enum {
META_COMMAND_SUCCESS,
META_COMMAND_UNRECOGNIZED_COMMAND
} MetaCommandResult;

typedef enum { PREPARE_SUCCESS, PREPARE_UNRECOGNIZED_STATEMENT } PrepareResult;

“Unrecognized statement” 语句表示这个语句像是一个表达式,但是表达式有错误,所以我们在任何情况都使用 enum 枚举结果代码。C 语言的编译器会报错如果我的 switch 语句不能处理枚举的项,所以我们可以确保处理了这个函数的每个结果。在将来可能会添加更多的结果代码。

do_meta_command 只是现有函数的一个封装,为了之后可处理更多的命令:

1
2
3
4
5
6
7
MetaCommandResult do_meta_command(InputBuffer* input_buffer) {
if (strcmp(input_buffer->buffer, ".exit") == 0) {
exit(EXIT_SUCCESS);
} else {
return META_COMMAND_UNRECOGNIZED_COMMAND;
}
}

目前我们的预处理语句只是包含了有两个可能结果的枚举数组。这可能会包含更多的数据如果我们允许在语句中传入参数:

1
2
3
4
5
typedef enum { STATEMENT_INSERT, STATEMENT_SELECT } StatementType;

typedef struct {
StatementType type;
} Statement;

prepare_statement(我们的 SQL 编译器)目前还无法识别 SQL 语句。事实上这个函数只能识别两个单词:

1
2
3
4
5
6
7
8
9
10
11
12
13
PrepareResult prepare_statement(InputBuffer* input_buffer,
Statement* statement) {
if (strncmp(input_buffer->buffer, "insert", 6) == 0) {
statement->type = STATEMENT_INSERT;
return PREPARE_SUCCESS;
}
if (strcmp(input_buffer->buffer, "select") == 0) {
statement->type = STATEMENT_SELECT;
return PREPARE_SUCCESS;
}

return PREPARE_UNRECOGNIZED_STATEMENT;
}

需要注意的是我们使用 strncmp 对比 insert 因为 insert 关键字之后会跟着插入的数据。(例如:insert 1 cstack foo@bar.com

最后,execute_statement 包含了一些分支:

1
2
3
4
5
6
7
8
9
10
void execute_statement(Statement* statement) {
switch (statement->type) {
case (STATEMENT_INSERT):
printf("This is where we would do an insert.\n");
break;
case (STATEMENT_SELECT):
printf("This is where we would do a select.\n");
break;
}
}

这边并没有返回任何错误码因为目前为止这边还不会发生任何错误。

经过这些重构之后,我们现在的 sqlite 可以认识两个新的关键字。

1
2
3
4
5
6
7
8
9
10
11
12
13
~ ./db
db > insert foo bar
This is where we would do an insert.
Executed.
db > delete foo
Unrecognized keyword at start of 'delete foo'.
db > select
This is where we would do a select.
Executed.
db > .tables
Unrecognized command '.tables'
db > .exit
~

我们数据库的基本框架基本已经形成了,如果不用存储数据的话,这是不是很好?在下一个部分,我们会去实现 insert 和 select 语句,创造世界上最差劲的存储引擎。同时在这给出这一差异部分的内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
@@ -10,6 +10,23 @@ struct InputBuffer_t {
} InputBuffer;

+typedef enum {
+ META_COMMAND_SUCCESS,
+ META_COMMAND_UNRECOGNIZED_COMMAND
+} MetaCommandResult;
+
+typedef enum { PREPARE_SUCCESS, PREPARE_UNRECOGNIZED_STATEMENT } PrepareResult;
+
+typedef enum { STATEMENT_INSERT, STATEMENT_SELECT } StatementType;
+
+typedef struct {
+ StatementType type;
+} Statement;
+
InputBuffer* new_input_buffer() {
InputBuffer* input_buffer = malloc(sizeof(InputBuffer));
input_buffer->buffer = NULL;
@@ -40,17 +57,67 @@ void close_input_buffer(InputBuffer* input_buffer) {
free(input_buffer);
}

+MetaCommandResult do_meta_command(InputBuffer* input_buffer) {
+ if (strcmp(input_buffer->buffer, ".exit") == 0) {
+ close_input_buffer(input_buffer);
+ exit(EXIT_SUCCESS);
+ } else {
+ return META_COMMAND_UNRECOGNIZED_COMMAND;
+ }
+}
+
+PrepareResult prepare_statement(InputBuffer* input_buffer,
+ Statement* statement) {
+ if (strncmp(input_buffer->buffer, "insert", 6) == 0) {
+ statement->type = STATEMENT_INSERT;
+ return PREPARE_SUCCESS;
+ }
+ if (strcmp(input_buffer->buffer, "select") == 0) {
+ statement->type = STATEMENT_SELECT;
+ return PREPARE_SUCCESS;
+ }
+
+ return PREPARE_UNRECOGNIZED_STATEMENT;
+}
+
+void execute_statement(Statement* statement) {
+ switch (statement->type) {
+ case (STATEMENT_INSERT):
+ printf("This is where we would do an insert.\n");
+ break;
+ case (STATEMENT_SELECT):
+ printf("This is where we would do a select.\n");
+ break;
+ }
+}
+
int main(int argc, char* argv[]) {
InputBuffer* input_buffer = new_input_buffer();
while (true) {
print_prompt();
read_input(input_buffer);

- if (strcmp(input_buffer->buffer, ".exit") == 0) {
- close_input_buffer(input_buffer);
- exit(EXIT_SUCCESS);
- } else {
- printf("Unrecognized command '%s'.\n", input_buffer->buffer);
+ if (input_buffer->buffer[0] == '.') {
+ switch (do_meta_command(input_buffer)) {
+ case (META_COMMAND_SUCCESS):
+ continue;
+ case (META_COMMAND_UNRECOGNIZED_COMMAND):
+ printf("Unrecognized command '%s'\n", input_buffer->buffer);
+ continue;
+ }
}
+
+ Statement statement;
+ switch (prepare_statement(input_buffer, &statement)) {
+ case (PREPARE_SUCCESS):
+ break;
+ case (PREPARE_UNRECOGNIZED_STATEMENT):
+ printf("Unrecognized keyword at start of '%s'.\n",
+ input_buffer->buffer);
+ continue;
+ }
+
+ execute_statement(&statement);
+ printf("Executed.\n");
}
}

原文地址:https://cstack.github.io/db_tutorial/parts/part1.html
原文作者: cstack
译者:Win-Man

介绍并设置 REPL

作为一个 Web 开发人员,我每天的工作中都会使用到关系型数据库,但是这些数据库对于我来说就是一个黑盒。因此我有一些疑问:

  • 在内存中、在磁盘上数据存储格式是怎么样的?
  • 什么时候数据库从内存转移到磁盘?
  • 为什么每个表只能有一个主键?
  • 事务回滚是如何工作的?
  • 索引的格式是怎么样的?
  • 什么时候会发生全表扫以及全表扫是如何工作的?
  • 预处理语句是以什么形式存储的?

简而言之,问题就是数据库是怎么工作的?

为了搞清楚这些问题,我从头开始写了一个数据库。它是模拟 sqlite 数据库实现的,因为 sqlite 设计的就是一个比 MySQL 或者 PostgreSQL 特性更少的数据库。所以我更有希望理解它。这整个数据库都存储在一个文件中。

Sqlite

sqlite 官网上有很多关于 sqlite 内核的文档,除此之外我还有一份资料 SQLite Database System: Design and Implementation

sqlite architecture(https://www.sqlite.org/zipvfs/doc/trunk/www/howitworks.wiki)

一个查询如果要获得数据或者修改数据的话,需要经过一系列的组件。前端包括:

  • tokenizer
  • parser
  • code generator

前端的输入是一个 SQL 查询. 输出的是 sqlite 虚拟机字节码(本质上是一个可以在数据库上操作的编译程序)

后端包括:

  • virtual machine
  • B-tree
  • pager
  • os interface

virtual machine 虚拟机接收前端生成的字节码作为指令。这些指令可以操作一个或多个表、索引,表和索引都是以 B 树的数据结构存储的。虚拟机本质上是一个大的语句与字节码指令转换器。

每棵 B-tree 包含很多节点。每个节点的长度是一页。B 树可以从磁盘上读取一页的数据或者通过命令将数据写回到数据库。

pager 组件接收读取或者写入数据的指令。它需要在正确的数据库数据文件位置写入或者读取数据,也需要将最近访问到的数据页缓存到内存中,并且决定什么时候将这些数据缓存页写到磁盘上。

os interface 操作系统接口层取决于 sqlite 是在哪个操作系统层编译的。在这个课程中,我不会支持多个操作系统平台。

千里之行,始于足下,让我们从一些比较简单的内容开始:REPL(交互式顶层构件)。

实现一个简单的 REPL

当你从命令行启动 sqlite client 的时候,会启动一个循环读取命令并执行命令:

1
2
3
4
5
6
7
8
9
10
~ sqlite3
SQLite version 3.16.0 2016-11-04 19:09:39
Enter ".help" for usage hints.
Connected to a transient in-memory database.
Use ".open FILENAME" to reopen on a persistent database.
sqlite> create table users (id int, username varchar(255), email varchar(255));
sqlite> .tables
users
sqlite> .exit
~

为了实现这个,我们的主函数会有一个无限输出提示符的循环,它会从读取一行输入,然后处理一行输入:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
int main(int argc, char* argv[]) {
InputBuffer* input_buffer = new_input_buffer();
while (true) {
print_prompt();
read_input(input_buffer);

if (strcmp(input_buffer->buffer, ".exit") == 0) {
close_input_buffer(input_buffer);
exit(EXIT_SUCCESS);
} else {
printf("Unrecognized command '%s'.\n", input_buffer->buffer);
}
}
}

我们需要定义一个 InputBuffer 结构体来存储 getline() 函数得到的内容及信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
typedef struct {
char* buffer;
size_t buffer_length;
ssize_t input_length;
} InputBuffer;

InputBuffer* new_input_buffer() {
InputBuffer* input_buffer = (InputBuffer*)malloc(sizeof(InputBuffer));
input_buffer->buffer = NULL;
input_buffer->buffer_length = 0;
input_buffer->input_length = 0;

return input_buffer;
}

print_prompt() 函数用于输出提示符。我们需要在读取输入之前需要打印提示符。

1
void print_prompt() { printf("db > "); }

通过 getline() 函数读取一行输入

1
ssize_t getline(char **lineptr, size_t *n, FILE *stream);

lineptr: 存储输入内容的缓冲区地址指针。如果被 getline() 函数 mallocated 了 NULL 值,那么用户需要手动释放该空间,即使命令执行失败了。

n:存储分配的缓冲区大小值的变量指针

stream: 读取输入流,我们会从标准输入中读取。

返回值:读取的字节数,有可能比缓冲区大小小。

我们通过 getline() 函数,将读取的输入存储在 input_buffer->buffer 中,分配的缓冲区大小存储在 input_buffer->buffer_length 中。并且将返回结构存储在 input_buffer->input_length

初始环境下,缓冲区是空的,所以 getline 需要分配足够的内存用于存放输入并将指针指向缓冲区。

1
2
3
4
5
6
7
8
9
10
11
12
13
void read_input(InputBuffer* input_buffer) {
ssize_t bytes_read =
getline(&(input_buffer->buffer), &(input_buffer->buffer_length), stdin);

if (bytes_read <= 0) {
printf("Error reading input\n");
exit(EXIT_FAILURE);
}

// Ignore trailing newline
input_buffer->input_length = bytes_read - 1;
input_buffer->buffer[bytes_read - 1] = 0;
}

然后就该定义一个函数用来释放 InputBuffer * 实例的内存以及相应结构中的元素内存(getline 会在 read_input 的时候分配内存给 input_buffer->buffer)

1
2
3
4
void close_input_buffer(InputBuffer* input_buffer) {
free(input_buffer->buffer);
free(input_buffer);
}

最后,我们需要解析并执行命令。目前这只能识别一个命令 .exit,用于退出程序。其他的输入命令我们会输出一个错误然后继续读取新的输入。

1
2
3
4
5
6
if (strcmp(input_buffer->buffer, ".exit") == 0) {
close_input_buffer(input_buffer);
exit(EXIT_SUCCESS);
} else {
printf("Unrecognized command '%s'.\n", input_buffer->buffer);
}

来尝试一下!

1
2
3
4
5
~ ./db
db > .tables
Unrecognized command '.tables'.
db > .exit
~

好了,我们现在有了一个可以使用的 REPL 程序。在下一部分中,我们会开始开发我们的命令语言。同时,在这给出这个部分的完整程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

typedef struct {
char* buffer;
size_t buffer_length;
ssize_t input_length;
} InputBuffer;

InputBuffer* new_input_buffer() {
InputBuffer* input_buffer = malloc(sizeof(InputBuffer));
input_buffer->buffer = NULL;
input_buffer->buffer_length = 0;
input_buffer->input_length = 0;

return input_buffer;
}

void print_prompt() { printf("db > "); }

void read_input(InputBuffer* input_buffer) {
ssize_t bytes_read =
getline(&(input_buffer->buffer), &(input_buffer->buffer_length), stdin);

if (bytes_read <= 0) {
printf("Error reading input\n");
exit(EXIT_FAILURE);
}

// Ignore trailing newline
input_buffer->input_length = bytes_read - 1;
input_buffer->buffer[bytes_read - 1] = 0;
}

void close_input_buffer(InputBuffer* input_buffer) {
free(input_buffer->buffer);
free(input_buffer);
}

int main(int argc, char* argv[]) {
InputBuffer* input_buffer = new_input_buffer();
while (true) {
print_prompt();
read_input(input_buffer);

if (strcmp(input_buffer->buffer, ".exit") == 0) {
close_input_buffer(input_buffer);
exit(EXIT_SUCCESS);
} else {
printf("Unrecognized command '%s'.\n", input_buffer->buffer);
}
}
}

原文地址:https://cstack.github.io/db_tutorial/
原文作者: cstack
译者:Win-Man

一个数据库是如何工作的?

  • 在内存中、在磁盘上数据存储格式是怎么样的?
  • 什么时候数据库从内存转移到磁盘?
  • 为什么每个表只能有一个主键?
  • 事务回滚是如何工作的?
  • 索引的格式是怎么样的?
  • 什么时候会发生全表扫以及全表扫是如何工作的?
  • 预处理语句是以什么形式存储的?

简而言之,问题就是数据库是怎么工作的?

为了搞明白这个问题,我用 C 写了一个 sqlite 的克隆版本,并且记录了我的整个过程。

目录

“What I cannot create, I do not understand” - Richard Feynman

sqlite architecture

sqlite architecture (https://www.sqlite.org/arch.html)

MySQL query rewrite

介绍

MySQL 从 5.7.6 版本开始支持 SQL 改写的功能,对于符合条件的 SQL 可以进行对应的修改。在 8.0.12 之前的版本只支持 SELECT 语句的改写,8.0.12 版本开始支持 SELECT/INSERT/REPLACE/UPDATE/DELETE 语句的改写。

Query Rewrite Plugin 安装

安装 Query Rewrite Plugin 直接通过运行 install_rewriter.sql 中的 SQL 来进行安装即可,如果需要卸载的话,执行 uninstall_rewriter.sql 就可以,这两个 sql 文本文件存放在安装目录的 share 目录下。

  • 执行 install_rewriter.sql 安装插件
1
# mysql -uroot -p -h127.0.0.1 < install_rewriter.sql
  • 通过变量确认插件开启
1
2
3
4
5
6
7
root@127.0.0.1 : (none) 03:53:58> SHOW GLOBAL VARIABLES LIKE 'rewriter_enabled';
+------------------+-------+
| Variable_name | Value |
+------------------+-------+
| rewriter_enabled | ON |
+------------------+-------+
1 row in set (0.00 sec)
  • 如果需要每次重启 rewriter_enabled 参数都是开启的话,可以在配置文件中配置对应参数
1
2
[mysqld]
rewriter_enabled=ON
  • 动态修改参数可以通过以下方式开启或者关闭插件,在刚安装完插件的时候,默认是开启的
1
2
SET GLOBAL rewriter_enabled = ON;
SET GLOBAL rewriter_enabled = OFF;

安装完 Query Rewrite Plugin 插件之后自从创建一个 query_rewrite 的 database,该 database 下有一张 rewrite_rules 表,用于记录对应的改写规则。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
root@127.0.0.1 : query_rewrite 06:48:28> show create table query_rewrite.rewrite_rules\G
*************************** 1. row ***************************
Table: rewrite_rules
Create Table: CREATE TABLE `rewrite_rules` (
`id` int NOT NULL AUTO_INCREMENT,
`pattern` varchar(5000) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
`pattern_database` varchar(20) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL,
`replacement` varchar(5000) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL,
`enabled` enum('YES','NO') CHARACTER SET utf8mb4 COLLATE utf8mb4_bin NOT NULL DEFAULT 'YES',
`message` varchar(1000) CHARACTER SET utf8mb4 COLLATE utf8mb4_bin DEFAULT NULL,
`pattern_digest` varchar(64) DEFAULT NULL,
`normalized_pattern` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci
1 row in set (0.00 sec)

Query Rewrite Plugin 使用

使用语句改写的话,只需要将改写规则写入到 rewrite_rules 表中并且通过 CALL query_rewrite.flush_rewrite_rules() 加载生效即可。

比如将 SELECT ? 语句改写为 SELECT ? + 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
root@127.0.0.1 : query_rewrite 07:32:41> INSERT INTO query_rewrite.rewrite_rules (pattern, replacement)
-> VALUES('SELECT ?', 'SELECT ? + 1');
Query OK, 1 row affected (0.01 sec)

root@127.0.0.1 : query_rewrite 07:32:51> CALL query_rewrite.flush_rewrite_rules();
Query OK, 1 row affected (0.01 sec)

root@127.0.0.1 : query_rewrite 07:33:01> select 2;
+-------+
| 2 + 1 |
+-------+
| 3 |
+-------+
1 row in set, 1 warning (0.00 sec)

Note (Code 1105): Query 'select 2' rewritten to 'SELECT 2 + 1' by a query rewrite plugin

SQL 改写的话没有语句类型的限制,改写前的语句类型可以跟改写后的语句类型不一致,比如可以将 SELECT 语句改写为 INSERT 语句。如果调用 CALL query_rewrite.flush_rewrite_rules(); 使改写规则生效的时候报错了,可以查看 rewrite_rules 表中对应的 message 字段,会有具体的错误信息提示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
root@127.0.0.1 : query_rewrite 07:34:51> use gangshen;
Database changed
root@127.0.0.1 : gangshen 07:37:13> create table t(id int primary key,name varchar(20));
Query OK, 0 rows affected (0.03 sec)

root@127.0.0.1 : gangshen 07:37:36> INSERT INTO query_rewrite.rewrite_rules (pattern, replacement) VALUES('SELECT * from t', 'insert into t values(3,"aa")');
Query OK, 1 row affected (0.01 sec)

root@127.0.0.1 : gangshen 07:39:12> CALL query_rewrite.flush_rewrite_rules();
ERROR 1644 (45000): Loading of some rule(s) failed.
root@127.0.0.1 : gangshen 07:39:26> select * from query_rewrite.rewrite_rules;
+----+-----------------+------------------+------------------------------+---------+--------------------------------------------------+------------------------------------------------------------------+--------------------+
| id | pattern | pattern_database | replacement | enabled | message | pattern_digest | normalized_pattern |
+----+-----------------+------------------+------------------------------+---------+--------------------------------------------------+------------------------------------------------------------------+--------------------+
| 2 | SELECT ? | NULL | SELECT ? + 1 | YES | NULL | d1b44b0c19af710b5a679907e284acd2ddc285201794bc69a2389d77baedddae | select ? |
| 3 | SELECT * from t | NULL | insert into t values(3,"aa") | YES | Parse error in pattern: >>No database selected<< | NULL | NULL |
+----+-----------------+------------------+------------------------------+---------+--------------------------------------------------+------------------------------------------------------------------+--------------------+
2 rows in set (0.00 sec)

root@127.0.0.1 : gangshen 07:39:36> update query_rewrite.rewrite_rules set pattern='SELECT * from gangshen.t' where id = 3;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0

root@127.0.0.1 : gangshen 07:40:18> update query_rewrite.rewrite_rules set replacement='insert into gangshen.t values(3,"aa")' where id = 3;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 Changed: 1 Warnings: 0

root@127.0.0.1 : gangshen 07:40:50> CALL query_rewrite.flush_rewrite_rules();
Query OK, 1 row affected (0.01 sec)

root@127.0.0.1 : gangshen 07:40:54> select * from gangshen.t limit 5;
Empty set (0.00 sec)

root@127.0.0.1 : gangshen 07:41:55> select * from gangshen.t;
Query OK, 1 row affected, 1 warning (0.00 sec)

Note (Code 1105): Query 'select * from gangshen.t' rewritten to 'insert into gangshen.t values(3,"aa")' by a query rewrite plugin
root@127.0.0.1 : gangshen 07:42:00> select * from gangshen.t limit 5;
+----+------+
| id | name |
+----+------+
| 3 | aa |
+----+------+
1 row in set (0.00 sec)

Query Rewrite Plugin 的限制

  • 8.0.12 之前的版本只支持 SELECT 语句的改写,8.0.12 版本开始支持 SELECT/INSERT/REPLACE/UPDATE/DELETE 语句的改写
  • 只支持单独语句以及 prepare 语句的改写,视图或者存储过程相关的语句无法改写
  • 改写没有语句类型的限制,比如 SELECT 语句可以改写为 INSERT 语句

Query Rewrite Plugin 场景及总结

个人能想到的 SQL 改写可以使用到的场景:

  1. 对于危险 SQL 比如 delete from table_name 这种,可以做到拦截,避免不小心将表数据完全删除
  2. 业务上线后,发现某条 SQL 对数据库造成很大压力影响,可以将该 SQL 改写为 SELECT 1 这种语句,临时降低对数据库的负载
  3. 数据库升级之后,SQL 执行计划不正确导致数据库负载过高,在不修改业务代码的情况下,可以通过 SQL 改写的方式让 SQL 走更优的执行计划
  4. 历史库场景,过滤 DELETE 操作
  5. SQL 改写可以理解为简单的触发器功能,可以参考 pt-osc 的方式支持在线字段变更