环境介绍
- 创建数据库TESTDB
- 在TESTDB数据库中创建表EMPLOYEE
- EMPLOYEE表字段为FIRST_NAME, LAST_NAME, AGE, SEX 和 INCOME
- 连接数据库TESTDB使用的用户名为 "testuser" ,密码为 "test123"
pymysql执行事务
事务机制可以确保数据一致性
事务应该具有4个属性:原子性、一致性、隔离性、持久性。这四个属性通常称为ACID特性。
- 原子性(atomicity):一个事务是一个不可分割的工作单位,事务中包括的诸操作要么都做,要么都不做
- 一致性(consistency):事务必须是使数据库从一个一致性状态变到另一个一致性状态。一致性与原子性是密切相关的
- 隔离性(isolation):一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对并发的其他事务是隔离的,并发执行的各个事务之间不能互相干扰
- 持久性(durability):持续性也称永久性(permanence),指一个事务一旦提交,它对数据库中数据的改变就应该是永久性的。接下来的其他操作或故障不应该对其有任何影响
Python DB API 2.0 的事务提供了两个方法 commit 或 rollback
数据库操作
import pymysql
# 数据库连接
db = pymysql.connect("localhost","testuser",3306,"test123","TESTDB" ) # 打开数据库连接
cursor = db.cursor() # 使用 cursor() 方法创建一个游标对象 cursor
cursor.execute("SELECT VERSION()") # 使用 execute() 方法执行 SQL 查询
data = cursor.fetchone() # 使用 fetchone() 方法获取单条数据
print ("Database version : %s " % data)
# Database version : 5.5.20-log
# 创建数据库表
cursor.execute("DROP TABLE IF EXISTS EMPLOYEE") # 使用 execute() 方法执行 SQL,如果表存在则删除
sql = """CREATE TABLE EMPLOYEE (
FIRST_NAME CHAR(20) NOT NULL,
LAST_NAME CHAR(20),
AGE INT,
SEX CHAR(1),
INCOME FLOAT )""" # 使用预处理语句创建表
cursor.execute(sql)
# 数据库插入操作
sql = """INSERT INTO EMPLOYEE(FIRST_NAME,
LAST_NAME, AGE, SEX, INCOME)
VALUES ('Mac', 'Mohan', 20, 'M', 2000)""" # SQL 插入语句 或者如下
# sql = "INSERT INTO EMPLOYEE(FIRST_NAME,LAST_NAME, AGE, SEX, INCOME) VALUES ('%s', '%s', %s, '%s', %s)" % ('Mac', 'Mohan', 20, 'M', 2000)
try:
cursor.execute(sql) # 执行sql语句
db.commit() # 提交到数据库执行【pymysql是事务型,必须commit之后才能执行相应的操作】
except:
db.rollback() # 如果发生错误则回滚
# 数据库查询操作
# fetchone(): 获取下一个查询结果集,结果集是一个对象
# fetchall(): 接收全部的返回结果行
# rowcount: 这是一个只读属性,并返回执行execute()方法后影响的行数
sql = "SELECT * FROM EMPLOYEE \
WHERE INCOME > %s" % (1000) # SQL 查询语句
try:
cursor.execute(sql)
results = cursor.fetchall() # 获取所有记录列表
for row in results:
fname = row[0]
lname = row[1]
age = row[2]
sex = row[3]
income = row[4]
print ("fname=%s,lname=%s,age=%s,sex=%s,income=%s" % \
(fname, lname, age, sex, income )) # 打印结果
# fname=Mac, lname=Mohan, age=20, sex=M, income=2000
except:
print ("Error: unable to fetch data")
# 数据库更新操作
sql = "UPDATE EMPLOYEE SET AGE = AGE + 1 WHERE SEX = '%c'" % ('M') # SQL 更新语句
try:
cursor.execute(sql)
db.commit()
except:
db.rollback()
# 删除操作
sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20) # SQL 删除语句
try:
cursor.execute(sql)
db.commit()
except:
db.rollback()
# 关闭数据库连接
db.close()
错误处理
标准错误分为两种,警告和错误。其中错误又分为接口错误和数据库错误两大类。以下两个均是StandardError的子类
:
- Warning:当有严重警告时触发,例如插入数据是被截断等等
- Error:警告以外所有其他错误类,
以下两个均是Error的子类
- InterfaceError:当有数据库接口模块本身的错误(而不是数据库的错误)发生时触发
- DatabaseError:和数据库有关的错误发生时触发。
以下6个为DatabaseError的子类
- DataError:当有数据处理时的错误发生时触发,例如:除零错误,数据超范围等等
- OperationalError 指非用户控制的,而是操作数据库时发生的错误。例如:连接意外断开、 数据库名未找到、事务处理失败、内存分配错误等等操作数据库是发生的错误
- IntegrityError:完整性相关的错误,例如外键检查失败等
- InternalError:数据库的内部错误,例如游标(cursor)失效了、事务同步失败等等
- ProgrammingError:程序错误,例如数据表(table)没找到或已存在、SQL语句语法错误、 参数数量错误等等
- NotSupportedError:不支持错误,指使用了数据库不支持的函数或API等,例如在连接对象上使用.rollback()函数,然而数据库并不支持事务或者事务已关闭
批量插入的2种方法
自动化单条insert(execute)
单条insert的话插入5w条数据大约用时5秒左右,相对来说效率不高,但是能对每条插入的数据进行操作,例如校验、查询数据库存不存在重复的数据等
import time
from pymysql import *
# 装饰器,计算插入50000条数据需要的时间
def timer(func):
def decor(*args):
start_time = time.time()
func(*args)
end_time = time.time()
d_time = end_time - start_time
print("the running time is : ", d_time)
return decor
@timer
def add_test_users():
conn = connect(host='主机名', port='端口号', user='用户名', password='密码', database='数据库名', charset='utf8')
cs = conn.cursor()
for num in range(0, 58000):
try:
sql = "insert into '表名'(字段名) values(值)"
cs.execute(sql)
except Exception as e:
return
conn.commit()
cs.close()
conn.close()
print('OK')
add_test_users()
拼接sql语句(executemany)
使用这种批量插入方式插入5w条数据用时大约不到1秒,效率有所提高,但是因为pymysql是事务型的,一条数据插入不成功则所有的数据均插入失败,不适合有主键或者非重复联合索引的情况。
另外,
execute(sql)
: 接受一条语句,然后执行executemany(templet,args)
:能同时执行多条语句,执行同样多的语句可比execute()快很多,强烈建议执行多条语句时使用executemanytemplet
: sql模板字符串,例如insert into table(id,name,age) values(%s,%s,%s)
args
: 模板字符串中的参数,是一个list,在list中的每一个元素必须是元组,例如:[(1,'mike'),(2,'jordan'),(3,'james'),(4,'rose')]
import time
from pymysql import *
# 装饰器,计算插入50000条数据需要的时间
def timer(func):
def decor(*args):
start_time = time.time()
func(*args)
end_time = time.time()
d_time = end_time - start_time
print("the running time is : ", d_time)
return decor
@timer
def add_test_users():
usersvalues = []
for num in range(1, 50000):
usersvalues.append(('需要插入的字段对应的value'))
conn = connect(host='主机名', port='端口号', user='用户名', password='密码', database='数据库名', charset='utf8')
cs = conn.cursor()
# 注意这里使用的是executemany而不是execute,下边有对executemany的详细说明
cs.executemany('insert into '表名'(字段名) values(%s,%s,%s,%s)', usersvalues)
conn.commit()
cs.close()
conn.close()
print('OK')
add_test_users()