aiomysql 是一个支持异步访问mysql的python库,它依赖并重用PyMySQL的大部分部分代码,tornado 结合aiomysql 能够异步且高效的对mysql进行访问操作。
安装方式
pip install aiomysql
异步代码的编写对于初学者而言,只需要注意使用await关键字就可以了,其他的与同步代码几乎相同,这也是async/await 关键字的优点,它让异步代码看起来与同步代码相似。
为了能在tornado 服务里更好的使用aiomysql, 我封装了一个AioMysqlClient 类,用来实现对mysql数据库的访问功能
db.py
import aiomysql.cursors
import asyncio
import traceback
from tornado.ioloop import IOLoop
class AioMysqlClient():
def __init__(self, host, port, username, password, db_name, **kwargs):
self.host = host
self.port = port
self.username = username
self.password = password
self.db_name = db_name
self.kwargs = kwargs # 其他参数
self.conn_pool = None # 连接池
self.is_connected = False # 是否处于连接状态
self.lock = asyncio.Lock() # 异步锁
async def init_pool(self):
"""
创建数据库连接
:return:
"""
print("init_pool")
try:
self.conn_pool = await aiomysql.create_pool(
host=self.host,
port=self.port,
user=self.username,
password=self.password,
db=self.db_name,
**self.kwargs
)
self.is_connected = True
except:
print(traceback.format_exc())
self.is_connected = False
return self
async def insert(self, sql, args= None):
conn = await self.conn_pool.acquire()
cur = await self.execute(conn, sql, args)
await conn.commit()
conn.close() # 不是关闭连接,而是还到连接池中
return cur
async def fetch_one(self, sql, args=None):
conn = await self.conn_pool.acquire()
cur = await self.execute(conn, sql, args)
if cur.rowcount == 0:
return None
return await cur.fetchone()
async def fetch_all(self, sql, args=None):
conn = await self.conn_pool.acquire()
cur = await self.execute(conn, sql, args)
if cur.rowcount == 0:
return None
return await cur.fetchall()
async def execute(self, conn, sql, args=None):
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute(sql, args) # 执行sql
return cur
async def test_insert():
mysql_client = AioMysqlClient("127.0.0.1", 6606, 'root', 'root', 'test')
await mysql_client.init_pool()
sql = "insert into student(name, age)values('小明', 14)"
print("执行sql")
cur = await mysql_client.insert(sql)
print(cur.lastrowid)
async def test_select():
mysql_client = AioMysqlClient("127.0.0.1", 6606, 'root', 'root', 'test')
await mysql_client.init_pool()
sql = "select * from student"
data = await mysql_client.fetch_one(sql)
print(data)
datas = await mysql_client.fetch_all(sql)
print(datas)
if __name__ == "__main__":
asyncio.run(test_select())
AioMysqlClient 在内部维护了一个连接池,使用的时候,通过acquire方法获取一个空闲的连接,连接使用结束后,通过close方法归还给连接池。
app.py
import json
import asyncio
import tornado.ioloop
from tornado.web import RequestHandler, Application
from tornado.httpserver import HTTPServer
from tornado.options import options, define
from tornado.ioloop import IOLoop
from db import AioMysqlClient
define('port', default=8000, help='监听端口')
async def connect_mysql():
mysql_client = AioMysqlClient("127.0.0.1", 6606, 'root', 'root', 'test')
await mysql_client.init_pool()
return mysql_client
class BaseHnadler(RequestHandler):
def initialize(self):
self.mysql_client = self.settings['mysql']
class StudentsHandler(BaseHnadler):
async def post(self):
data = json.loads(self.request.body)
sql = f"insert into student(name, age)values('{data['name']}', {data['age']})"
cur = await self.mysql_client.insert(sql)
return self.write(json.dumps({'id': cur.lastrowid}))
class StudentInfoHandler(BaseHnadler):
async def get(self, id):
sql = f"select * from student where id = {id}"
data = await self.mysql_client.fetch_one(sql)
return self.write(json.dumps(data))
def make_app(config):
options.parse_command_line()
handlers_routes = [
(r'/students', StudentsHandler),
(r'/students/(.*)', StudentInfoHandler)
]
app = Application(handlers=handlers_routes, **config)
return app
def main():
config = {
'mysql': IOLoop.current().run_sync(connect_mysql)
}
app = make_app(config)
http_server = HTTPServer(app)
http_server.listen(options.port)
print('ok')
tornado.ioloop.IOLoop.current().start()
if __name__ == '__main__':
main()
RequestHandler 可以共用一个数据库连接池,初始化操作放在connect_mysql函数中进行,通过IOLoop.current().run_sync 方法运行,如果connect_mysql是异步的,那么run_sync 会一直运行直到有结果,必须这样处理,才能在server启动前完成mysql连接的初始化操作。
通过向Application 初始化函数传入config, 在RequestHandler 中就可以通过self.settings['mysql'] 获得AioMysqlClient 实例对象。
奇怪的是,以上代码在windows系统下无法正常运行,只能在linux系统下正确执行,问题就出在run_sync 上,总是报错
RuntimeError: no running event loop
关于这个问题,慢慢研究吧,等有了结果,我会更新这篇教程。
BaseHnadler 是所有处理请求类的父类,在这里可以编写所有子类都会用到的代码,RequestHandler 在被创建以后,会调用initialize 做初始化操作,在这里我定义了mysql_client, 后续的调用会更加方便。
现在,服务已经可以正常启动了,接下来,要多一些测试来验证我们的代码是正确的
import requests
def test_add():
data = {
'name': '小刚',
'age': 15
}
res = requests.post('http://127.0.0.1:8000/students', json=data)
print(res.json())
if __name__ == '__main__':
test_add()
这段代码用来测试mysql新增,测试查询请求,可以直接通过浏览器访问
http://127.0.0.1:8000/students/1
QQ交流群: 211426309