第2节,mysql-connector 正确的批量写入数据

批量写入数据需要使用executemany方法,这个方法并不是要执行很多条sql,而是一条有待格式化的字符串和多条需要写入数据库的数据。

1. 新建user 表

为了试验批量写入,新建一张user表

create table user(
    id int NOT NULL AUTO_INCREMENT,
    `name` varchar(50) NOT NULL,
    `age` int NOT NULL,
    PRIMARY KEY (`id`)
)ENGINE=InnoDB DEFAULT CHARSET=utf8;

2. executemany

import time
import mysql.connector

# 第一步,创建连接
mydb = mysql.connector.connect(
    host="10.110.30.3",  # 数据库主机地址
    user="flink_user",  # 数据库用户名
    passwd="123456",  # 数据库密码
    port=6606,
    database='flink_db'
)

sql = "insert into user(name, age)values(%s,%s)"

citys = [
    ('小明', 14) for i in range(100000)
]

t1 = time.time()
mycursor = mydb.cursor()            # 创建cursor
mycursor.executemany(sql, citys)    # 批量执行
mydb.commit()                       # 提交
t2 = time.time()

print(t2-t1)        # 71秒

想要使用executemany方法,必须遵循以下两条规则和一条潜规则,咱们先说前两条规则

  1. sql语句中,values不符需要使用%s占位符,不管字段是什么类型
  2. seq_params 参数要么是[tuple, tuple...]的形式,要么是(tuple, tuple...) 的形式

这两条规则是明处的规则,我上面的代码已经遵守了这两条规则,但是呢,插入10万条数据竟然用了71秒,这个速度着实不乐观,我确定是executemany方法使用的不正确,那么问题出在哪里呢?

3. executemany 与 execute

很多文章里说,executemany就是多次调用执行execute方法,现在实验一下,执行10万次execute,看需要多久

sql = "insert into user(name, age)values('小明', 14)"

t1 = time.time()
mycursor = mydb.cursor()            # 创建cursor
for i in range(100000):
    mycursor.execute(sql)
mydb.commit()                       # 提交
t2 = time.time()

print(t2-t1)        # 68.127

批量写入10万条数据,与10万次写入操作耗时几乎相同,这么看来,似乎executemany 多次调用了execute方法,只有这样才能解释实验现象。

但如果是这样,还有必要分出executemany 和 execute方法么,岂不是多此一举?

4. 正确的使用executemany批量写入数据

批量写入一定比多次单条写入要快,就算是多次单条写入是在最后进行commit也是如此,这是批量写入的性能优势,数据是分批发送到mysql的,减少了网络往返所需要的时间。

事情到了这一步,就需要深入到源码来一探究竟。

    def executemany(self, operation, seq_params):
        """Execute the given operation multiple times"""

        # Optimize INSERTs by batching them
        if re.match(RE_SQL_INSERT_STMT, operation):
            if not seq_params:
                self._rowcount = 0
                return None
            stmt = self._batch_insert(operation, seq_params)
            if stmt is not None:
                return self.execute(stmt)

        rowcnt = 0
        try:
            for params in seq_params:
                self.execute(operation, params)
        except (ValueError, TypeError) as err:
            raise errors.ProgrammingError(
                "Failed executing the operation; {0}".format(err))

        self._rowcount = rowcnt
        return None

我去除掉了一些无关紧要的代码,只保留了最核心的部分, 注意看这部分代码

# Optimize INSERTs by batching them

这里明显是批处理优化,它采用优化的前提条件是sql语句要匹配上RE_SQL_INSERT_STMT,否则就要逐条执行execute 方法,也就是很多文章里的说法。那么什么样的sql才能是开启批处理优化的sql呢,我的sql为什么不行呢,让我们看一下这个正则表达式

RE_SQL_INSERT_STMT = re.compile(
    r"({0}|\s)*INSERT({0}|\s)*INTO\s+[`'\"]?.+[`'\"]?(?:\.[`'\"]?.+[`'\"]?)"
    r"(0, 2)\s+VALUES\s*\(.+(?:\s*,.+)*\)".format(SQL_COMMENT),
    re.I | re.M | re.S)

这个正则不是特别容易理解,但是我逐一到VALUES的前面有一个\s+,它表示1个或多个空格,而我的sql是“insert into user(name, age)values(%s,%s)”, 刚好没有空格,难不成是这个原因导致sql语句没有匹配上正则表达式,最终没有执行批处理优化,修改代码测试一下

sql = "insert into user(name, age) values(%s,%s)"

citys = [
    ('小明', 14) for i in range(100000)
]

t1 = time.time()
mycursor = mydb.cursor()            # 创建cursor
mycursor.executemany(sql, citys)    # 批量执行
mydb.commit()                       # 提交
t2 = time.time()

print(t2-t1)        # 1.6 秒

修改后,写入10万条数据,仅仅需要1.6秒,原来是书写不规范导致不能进行批量插入操作,可是这么多年来,我一直都是这样写sql语句,values前面紧跟着右括号,也符合sql语法,这真是一个大坑啊。

扫描关注, 与我技术互动

QQ交流群: 211426309

加入知识星球, 每天收获更多精彩内容

分享日常研究的python技术和遇到的问题及解决方案