python之操作mysql数据库(PyMySQL)

环境介绍

  1. 创建数据库TESTDB
  2. 在TESTDB数据库中创建表EMPLOYEE
  3. EMPLOYEE表字段为FIRST_NAME, LAST_NAME, AGE, SEX 和 INCOME
  4. 连接数据库TESTDB使用的用户名为 "testuser" ,密码为 "test123"

pymysql执行事务

事务机制可以确保数据一致性
事务应该具有4个属性:原子性、一致性、隔离性、持久性。这四个属性通常称为ACID特性。

  1. 原子性(atomicity):一个事务是一个不可分割的工作单位,事务中包括的诸操作要么都做,要么都不做
  2. 一致性(consistency):事务必须是使数据库从一个一致性状态变到另一个一致性状态。一致性与原子性是密切相关的
  3. 隔离性(isolation):一个事务的执行不能被其他事务干扰。即一个事务内部的操作及使用的数据对并发的其他事务是隔离的,并发执行的各个事务之间不能互相干扰
  4. 持久性(durability):持续性也称永久性(permanence),指一个事务一旦提交,它对数据库中数据的改变就应该是永久性的。接下来的其他操作或故障不应该对其有任何影响

Python DB API 2.0 的事务提供了两个方法 commit 或 rollback

数据库操作

import pymysql
 
 # 数据库连接
db = pymysql.connect("localhost","testuser",3306,"test123","TESTDB" )   #  打开数据库连接
cursor = db.cursor() # 使用 cursor() 方法创建一个游标对象 cursor
cursor.execute("SELECT VERSION()") # 使用 execute()  方法执行 SQL 查询 
data = cursor.fetchone() # 使用 fetchone() 方法获取单条数据
print ("Database version : %s " % data)
# Database version : 5.5.20-log
 
# 创建数据库表
cursor.execute("DROP TABLE IF EXISTS EMPLOYEE") # 使用 execute() 方法执行 SQL,如果表存在则删除
sql = """CREATE TABLE EMPLOYEE (
         FIRST_NAME  CHAR(20) NOT NULL,
         LAST_NAME  CHAR(20),
         AGE INT,  
         SEX CHAR(1),
         INCOME FLOAT )"""  # 使用预处理语句创建表
cursor.execute(sql)

# 数据库插入操作
sql = """INSERT INTO EMPLOYEE(FIRST_NAME,
         LAST_NAME, AGE, SEX, INCOME)
         VALUES ('Mac', 'Mohan', 20, 'M', 2000)"""  # SQL 插入语句  或者如下
# sql = "INSERT INTO EMPLOYEE(FIRST_NAME,LAST_NAME, AGE, SEX, INCOME) VALUES ('%s', '%s',  %s,  '%s',  %s)" % ('Mac', 'Mohan', 20, 'M', 2000)
try:
   cursor.execute(sql)  # 执行sql语句
   db.commit()  # 提交到数据库执行【pymysql是事务型,必须commit之后才能执行相应的操作】
except:
   db.rollback()    # 如果发生错误则回滚

# 数据库查询操作
# fetchone(): 获取下一个查询结果集,结果集是一个对象
# fetchall(): 接收全部的返回结果行
# rowcount: 这是一个只读属性,并返回执行execute()方法后影响的行数
sql = "SELECT * FROM EMPLOYEE \
       WHERE INCOME > %s" % (1000)  # SQL 查询语句
try:
   cursor.execute(sql)
   results = cursor.fetchall()  # 获取所有记录列表
   for row in results:
      fname = row[0]
      lname = row[1]
      age = row[2]
      sex = row[3]
      income = row[4]
      print ("fname=%s,lname=%s,age=%s,sex=%s,income=%s" % \
             (fname, lname, age, sex, income )) # 打印结果
             # fname=Mac, lname=Mohan, age=20, sex=M, income=2000
except:
   print ("Error: unable to fetch data")

# 数据库更新操作
sql = "UPDATE EMPLOYEE SET AGE = AGE + 1 WHERE SEX = '%c'" % ('M')  # SQL 更新语句
try:
   cursor.execute(sql)
   db.commit()
except:
   db.rollback()

# 删除操作
sql = "DELETE FROM EMPLOYEE WHERE AGE > %s" % (20)  # SQL 删除语句
try:
   cursor.execute(sql)
   db.commit()
except:
   db.rollback()

# 关闭数据库连接
db.close()

错误处理

标准错误分为两种,警告和错误。其中错误又分为接口错误和数据库错误两大类。以下两个均是StandardError的子类

  1. Warning:当有严重警告时触发,例如插入数据是被截断等等
  2. Error:警告以外所有其他错误类,以下两个均是Error的子类
    1. InterfaceError:当有数据库接口模块本身的错误(而不是数据库的错误)发生时触发
    2. DatabaseError:和数据库有关的错误发生时触发。以下6个为DatabaseError的子类
      1. DataError:当有数据处理时的错误发生时触发,例如:除零错误,数据超范围等等
      2. OperationalError 指非用户控制的,而是操作数据库时发生的错误。例如:连接意外断开、 数据库名未找到、事务处理失败、内存分配错误等等操作数据库是发生的错误
      3. IntegrityError:完整性相关的错误,例如外键检查失败等
      4. InternalError:数据库的内部错误,例如游标(cursor)失效了、事务同步失败等等
      5. ProgrammingError:程序错误,例如数据表(table)没找到或已存在、SQL语句语法错误、 参数数量错误等等
      6. NotSupportedError:不支持错误,指使用了数据库不支持的函数或API等,例如在连接对象上使用.rollback()函数,然而数据库并不支持事务或者事务已关闭

批量插入的2种方法

自动化单条insert(execute)

单条insert的话插入5w条数据大约用时5秒左右,相对来说效率不高,但是能对每条插入的数据进行操作,例如校验、查询数据库存不存在重复的数据等

import time
from pymysql import *

# 装饰器,计算插入50000条数据需要的时间
def timer(func):
    def decor(*args):
        start_time = time.time()
        func(*args)
        end_time = time.time()
        d_time = end_time - start_time
        print("the running time is : ", d_time)

    return decor

@timer
def add_test_users():
    conn = connect(host='主机名', port='端口号', user='用户名', password='密码', database='数据库名', charset='utf8')
    cs = conn.cursor()
    for num in range(0, 58000):
        try:
            sql = "insert into '表名'(字段名) values(值)"
            cs.execute(sql)
        except Exception as e:
            return
    conn.commit()
    cs.close()
    conn.close()
    print('OK')

add_test_users()

拼接sql语句(executemany)

使用这种批量插入方式插入5w条数据用时大约不到1秒,效率有所提高,但是因为pymysql是事务型的,一条数据插入不成功则所有的数据均插入失败,不适合有主键或者非重复联合索引的情况
另外,

  1. execute(sql): 接受一条语句,然后执行
  2. executemany(templet,args):能同时执行多条语句,执行同样多的语句可比execute()快很多,强烈建议执行多条语句时使用executemany
    1. templet : sql模板字符串,例如insert into table(id,name,age) values(%s,%s,%s)
    2. args: 模板字符串中的参数,是一个list,在list中的每一个元素必须是元组,例如:[(1,'mike'),(2,'jordan'),(3,'james'),(4,'rose')]
import time
from pymysql import *

# 装饰器,计算插入50000条数据需要的时间
def timer(func):
    def decor(*args):
        start_time = time.time()
        func(*args)
        end_time = time.time()
        d_time = end_time - start_time
        print("the running time is : ", d_time)

    return decor

@timer
def add_test_users():
    usersvalues = []
    for num in range(1, 50000):
        usersvalues.append(('需要插入的字段对应的value'))
    conn = connect(host='主机名', port='端口号', user='用户名', password='密码', database='数据库名', charset='utf8')
    cs = conn.cursor()
    # 注意这里使用的是executemany而不是execute,下边有对executemany的详细说明
    cs.executemany('insert into '表名'(字段名) values(%s,%s,%s,%s)', usersvalues)
    conn.commit()
    cs.close()
    conn.close()
    print('OK')

add_test_users()