如果的程序需要以非常高的并发向mysql写入数据库时,该如何提高写入的速度呢?这里只考虑客户端程序,不考虑mysql服务端,假设mysql服务器的性能足够好。很容易想到采用多线程或者使用多进程。
使用多进程时,使用数据库连接池是没有效果的,因为数据库连接池只在多线程条件下才会使用。多线程条件下,如果你的程序与数据库之间只有一个连接,就会出现多个线程争抢一个数据库连接的情况,你不得不用多线程互斥技术避免出现混乱。
数据库连接池则很好的解决了多线程争抢数据库连接的问题。所谓数据库连接池,只是提前申请一定数量的连接,每个线程在需要向数据库写入数据时,先从连接池里申请连接,得到真实连接后进行数据库操作,操作结束后再将连接放回到连接池。假设你起的是10个多线程,综合实际并发,你或许只申请大小为5的连接池就足够了,这能保证同一时刻有5个线程同时操作数据库。
你去申请连接,未必总是能顺利的得到连接,因为连接池里的连接可能全部都处于busy状态,正在被其他的线程使用,因此你需要处理无法获取到连接的异常,或者将连接池的大小设置的与线程数相同,这样理论上每个线程都能得到一个连接。但这样做,并不理智,失去了创建连接池的部分意义。
为什么不能每个线程维护一个连接呢,这样就不需要数据库连接池了。这个想法很普遍,可这样做在技术上是一个糟糕的选择,因为在线程里维护连接是非常麻烦的。连接池则统一帮助你维护这些连接,如果某个连接断开了,连接池还可以帮你恢复连接。
使用mysql-connector 创建连接池是非常容易的
from mysql.connector.pooling import MySQLConnectionPool
# 第一步,创建连接
mysql_pool = MySQLConnectionPool(
host="10.110.30.3", # 数据库主机地址
user="flink_user", # 数据库用户名
passwd="123456", # 数据库密码
port=6606,
database='flink_db',
pool_size= 5
)
sql = "select * from city"
conn = mysql_pool.get_connection()
cursor = conn.cursor()
cursor.execute(sql)
for data in cursor.fetchall():
print(data)
cursor.close()
conn.close() # 将连接放回到连接池
接下来,我结合源码为你讲解连接池如何工作
创建连接池,用到了MySQLConnectionPool类,咱们来看一下它的初始化函数
class MySQLConnectionPool(object):
"""Class defining a pool of MySQL connections"""
def __init__(self, pool_size=5, pool_name=None, pool_reset_session=True,
**kwargs):
if kwargs:
self.set_config(**kwargs)
cnt = 0
while cnt < self._pool_size:
self.add_connection()
cnt += 1
为了易于阅读,我去掉了一些无关紧要的代码,后面的源码也做相同处理。在MySQLConnectionPool类的初始化函数里,根据_pool_size属性创建足够多的连接。在add_connecttion方法里,新创建的连接都放在了self._cnx_queue 中。
从连接池里获取连接使用get_connection方法,我们看看它是如何工作的
def get_connection(self):
with CONNECTION_POOL_LOCK:
try:
cnx = self._cnx_queue.get(block=False) # 从self._cnx_queue 里获取连接
except queue.Empty:
raise errors.PoolError(
"Failed getting connection; pool exhausted")
# pylint: disable=W0201,W0212
if not cnx.is_connected() \
or self._config_version != cnx._pool_config_version:
cnx.config(**self._cnx_config)
try:
cnx.reconnect() # 重新建立连接
except errors.InterfaceError:
# Failed to reconnect, give connection back to pool
self._queue_connection(cnx)
raise
cnx._pool_config_version = self._config_version
# pylint: enable=W0201,W0212
return PooledMySQLConnection(self, cnx) # 返回PooledMySQLConnection对象
get_connection 做了3件事情:
在使用完连接后,一定要调用close方法()释放连接,来看一下PooledMySQLConnection的close方法如何工作
def close(self):
try:
cnx = self._cnx
if self._cnx_pool.reset_session:
cnx.reset_session()
finally:
self._cnx_pool.add_connection(cnx)
self._cnx = None
close方法会重新将连接加入到连接池_cnx_pool中,在这之前,要将连接的session重置。
QQ交流群: 211426309