Python 操作 PostgreSQL

Python 操作 PostgreSQL

这里利用 psycopg2 库连接PostgreSQL。我使用的 Python 版本为 3.5。官方还未支持,非官方的库可以从Unofficial Windows Binaries for Python Extension Packages下载。

1. 利用 Python 查看 PostgreSQL 版本

本例的作用主要是实现 Python 和 PostgreSQL 的连接。

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import psycopg2
import sys

con = None

try:
    con = psycopg2.connect(database='postgres',user='postgres', password='123')
    cur = con.cursor()
    cur.execute('SELECT version()')
    ver = cur.fetchone()
    print(ver)
except psycopg2.DatabaseError as e:
    print('Error %s' % e)
    sys.exit(1)
finally:
    if con:
        con.close()

执行结果:

('PostgreSQL 9.5.2, compiled by Visual C++ build 1800, 64-bit',)
>>>

psycopg2.connect表示对 PostgreSQL 数据库的连接,参数包括数据库名称、用户名和密码。con.cursor()用来在数据库和 Python 之间传递命令和结果。cur.execute()表示执行命令,括号里面的命令为 PostgreSQL 格式的命令行。cur.fetchone()表示取回一行结果。cur.fetchall()表示取回所有结果。
except语句中,sys.exit()会引发一个异常:SystemExit,如果这个异常没有被捕获,那么python解释器将会退出。如果有捕获此异常的代码,那么这些代码还是会执行。1 表示状态码 status=1,异常退出。
finally语句中,表示不论上面的程序执行是否异常,con.close()最后断开数据库连接。

2. 创建表

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import psycopg2
import sys

con = None

try:
    con = psycopg2.connect(database='postgres',user='postgres', password='123')
    cur = con.cursor()
    cur.execute("DROP TABLE IF EXISTS cars")
    cur.execute("CREATE TABLE Cars(Id INTEGER PRIMARY KEY, Name VARCHAR(20), Price INT)")
    cur.execute("INSERT INTO Cars VALUES(1,'Audi',52642)")
    cur.execute("INSERT INTO Cars VALUES(2,'Mercedes',57127)")
    cur.execute("INSERT INTO Cars VALUES(3,'Skoda',9000)")
    cur.execute("INSERT INTO Cars VALUES(4,'Volvo',29000)")
    cur.execute("INSERT INTO Cars VALUES(5,'Bentley',350000)")
    cur.execute("INSERT INTO Cars VALUES(6,'Citroen',21000)")
    cur.execute("INSERT INTO Cars VALUES(7,'Hummer',41400)")
    cur.execute("INSERT INTO Cars VALUES(8,'Volkswagen',21600)")
    cur.execute("SELECT * FROM cars")
    con.commit()
    rows = cur.fetchall()
    for row in rows:
        print(row)
except psycopg2.DatabaseError as e:
    if con:
        con.rollback()
    print('Error %s' % e)
    sys.exit(1)
finally:
    if con:
        con.close()

创建的表可以在 PostgreSQL 中查看。这里为了方便通过SELECT * FROM cars将结果取回到 Python 中显示,结果如下:

(1, 'Audi', 52642)
(2, 'Mercedes', 57127)
(3, 'Skoda', 9000)
(4, 'Volvo', 29000)
(5, 'Bentley', 350000)
(6, 'Citroen', 21000)
(7, 'Hummer', 41400)
(8, 'Volkswagen', 21600)
>>>

可以看到,可以通过execute一次输入多组命令,通过con.commit()执行。这里调用了cur.fetchall()命令,所有即使没有con.commit(),上面的execute命令也会被执行。但是如果没有fetchall命令,就一定要有commit,命令才可以被执行。简单地说,就是execute命令自己是无法执行的,必须有commit或者要取回命令fetchall fetchone存在才可以。
except语句表示如果发生错误,使用con.rollback()命令回滚可能已经发生的任何改变。确保数据库不会发生损坏。

3. executemany() 命令

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import psycopg2
import sys

cars = (
    (1, 'Audi', 52643),
    (2, 'Mercedes', 57127),
    (3, 'Skoda', 9000),
    (4, 'Volvo', 29000),
    (5, 'Bentley', 350000),
    (6, 'Citroen', 21000),
    (7, 'Hummer', 41400),
    (8, 'Volkswagen', 21600)
)

con = None

try:
    con = psycopg2.connect(database='postgres',user='postgres', password='123')
    cur = con.cursor()
    cur.execute("DROP TABLE IF EXISTS Cars")
    cur.execute("CREATE TABLE Cars(Id INT PRIMARY KEY, Name VARCHAR(20), Price INT)")
    query = "INSERT INTO Cars (Id, Name, Price) VALUES (%s, %s, %s)"
    cur.executemany(query, cars)
    con.commit()
except psycopg2.DatabaseError as e:
    if con:
        con.rollback()
    print('Error %s' % e)
    sys.exit(1)
finally:
    if con:
        con.close()

重复的代码总是不好的,上例中要输入的数据其实都是同一类型的,可以利用类似print函数的方法:query定义语句内容,元组cars插入数据内容。使用cur.executemany(query, cars)一次性执行多条语句。

4. 取回数据 fetchall() & fecthone()

fetchall()命令:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import psycopg2
import sys

con = None

try:
    con = psycopg2.connect("dbname='postgres' user='postgres' password='123'")
    cur = con.cursor()
    cur.execute("SELECT * FROM Cars")
    rows = cur.fetchall()
    for row in rows:
        print(row)
except psycopg2.DatabaseError as e:
    print('Error %s' % e)
    sys.exit(1)
finally:
    if con:
        con.close()

执行结果:

(1, 'Audi', 52643)
(2, 'Mercedes', 57127)
(3, 'Skoda', 9000)
(4, 'Volvo', 29000)
(5, 'Bentley', 350000)
(6, 'Citroen', 21000)
(7, 'Hummer', 41400)
(8, 'Volkswagen', 21600)
>>>

fetchone()命令:

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import psycopg2
import sys

con = None

try:
    con = psycopg2.connect("dbname='postgres' user='postgres' password='123'")
    cur = con.cursor()
    cur.execute("SELECT * FROM Cars")
    while True:
        row = cur.fetchone()
        if row == None:
            break
        print(row[0], row[1], row[2])
except psycopg2.DatabaseError as e:
    print('Error %s' % e)
    sys.exit(1)
finally:
    if con:
        con.close()

执行结果:

1 Audi 52643
2 Mercedes 57127
3 Skoda 9000
4 Volvo 29000
5 Bentley 350000
6 Citroen 21000
7 Hummer 41400
8 Volkswagen 21600
>>>

5. 输出字典数据

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import psycopg2
import psycopg2.extras
import sys

con = None

try:
    con = psycopg2.connect("dbname='postgres' user='postgres' password='123'")
    cursor = con.cursor(cursor_factory=psycopg2.extras.DictCursor)
    cursor.execute("SELECT * FROM Cars")
    rows = cursor.fetchall()
    for row in rows:
        print ("%s %s %s" % (row["id"], row["name"], row["price"]))
except psycopg2.DatabaseError as e:
    print('Error %s' % e)
    sys.exit(1)
finally:
    if con:
        con.close()

执行结果:

1 Audi 52643
2 Mercedes 57127
3 Skoda 9000
4 Volvo 29000
5 Bentley 350000
6 Citroen 21000
7 Hummer 41400
8 Volkswagen 21600
>>>

6. 参数化 UPDATE 和 SELECT

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import psycopg2
import sys

con = None
uId = 1
uPrice = 10000

try:
    con = psycopg2.connect("dbname='postgres' user='postgres' password='123'")
    cur = con.cursor()
    cur.execute("UPDATE Cars SET Price=%s WHERE Id=%s", (uPrice, uId))
    cur.execute("SELECT * FROM Cars WHERE Id=%s", (uId,))
    con.commit()
    print("Number of rows updated: %d" % cur.rowcount)
    print(cur.fetchall())
except psycopg2.DatabaseError as e:
    if con:
        con.rollback()
    print('Error %s' % e)
    sys.exit(1)
finally:
    if con:
        con.close()

执行结果:

Number of rows updated: 1
[(1, 'Audi', 10000)]
>>>

在执行参数化命令时,参数必须放在元组中,如上面的查询命令,即使只有以一个参数。

7. 字典格式的参数化查询

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import psycopg2
import sys

con = None
uid = 3

try:
    con = psycopg2.connect("dbname='postgres' user='postgres' password='123'")
    cur = con.cursor()
    cur.execute("SELECT * FROM Cars WHERE Id=%(id)s", {'id': uid })
    print(cur.fetchone())
except psycopg2.DatabaseError as e:
    print('Error: %s' % e)
    sys.exit(1)
finally:
    if con:
        con.close()

执行结果:

(3, 'Skoda', 9000)
>>>

8. 插入和读取图片

插入图片:

# #!/usr/bin/env python3
# # -*- coding: utf-8 -*-

import psycopg2
import sys

def readImage():
    try:
        fin = open("pic.jpg", "rb")
        img = fin.read()
        return img
    except IOError as e:
        print("Error %d: %s" % (e.args[0], e.args[1]))
        sys.exit(1)
    finally:
        if fin:
            fin.close()

try:
    con = psycopg2.connect(database="postgres", user="postgres", password="123")
    cur = con.cursor()
    data = readImage()
    binary = psycopg2.Binary(data)
    cur.execute("DROP TABLE IF EXISTS Images")
    cur.execute("CREATE TABLE Images(Id INTEGER PRIMARY KEY, Data BYTEA)")
    cur.execute("INSERT INTO Images VALUES (1, %s)", (binary,))
    con.commit()
except psycopg2.DatabaseError as e:
    if con:
        con.rollback()
    print('Error %s' % e)
    sys.exit(1)
finally:
    if con:
        con.close()

此程序在数据库 Images 表中插入图片 pic.jpg。
postgresql-python-small.png

读取图片:

# !/usr/bin/env python3
# -*- coding: utf-8 -*-

import psycopg2
import sys

def writeImage(data):
    try:
        fout = open('zju.jpg','wb')
        fout.write(data)
    except IOError as e:
        print("Error %d: %s" % (e.args[0], e.args[1]))
        sys.exit(1)
    finally:
        if fout:
            fout.close()

try:
    con = psycopg2.connect(database="postgres", user="postgres", password="123")
    cur = con.cursor()
    cur.execute("SELECT Data FROM Images LIMIT 1")  # 限制读的行数
    data = cur.fetchone()[0]
    writeImage(data)
except psycopg2.DatabaseError as e:
    print('Error %s' % e)
    sys.exit(1)
finally:
    if con:
        con.close()

最终在文件夹内生成一个 zju.jpg 的图像。LIMIT 1表示读取前1行。由于之前 Images 表中只有一行,因此,第一行就是图像的二进制数据。

9. 打印标题

# !/usr/bin/env python3
# -*- coding: utf-8 -*-

import psycopg2
import sys

con = None

try:
    con = psycopg2.connect("dbname='postgres' user='postgres' password='123'")
    cur = con.cursor()
    cur.execute('SELECT * FROM Cars')
    col_names = [cn[0] for cn in cur.description]
    rows = cur.fetchall()
    print("%s %-10s %s" % (col_names[0], col_names[1], col_names[2]))
    for row in rows:
        print("%2s %-10s %s" % row)
    print(type(cur.description))
    for cn in cur.description:
        print(cn)
except psycopg2.DatabaseError as e:
    print('Error %s' % e)
    sys.exit(1)
finally:
    if con:
        con.close()

执行结果:

id name       price
 2 Mercedes   57127
 3 Skoda      9000
 4 Volvo      29000
 5 Bentley    350000
 6 Citroen    21000
 7 Hummer     41400
 8 Volkswagen 21600
 1 Audi       10000
\<class 'tuple'>
Column(name='id', type_code=23, display_size=None, internal_size=4, precision=None, scale=None, null_ok=None)
Column(name='name', type_code=1043, display_size=None, internal_size=20, precision=None, scale=None, null_ok=None)
Column(name='price', type_code=23, display_size=None, internal_size=4, precision=None, scale=None, null_ok=None)
>>>

通过cur.descripton可以获得列的名称。

10. 列出数据库中所有表

# !/usr/bin/env python3
# -*- coding: utf-8 -*-

import psycopg2
import sys

con = None

try:
    con = psycopg2.connect(database='postgres', user='postgres', password='123')
    cur = con.cursor()
    cur.execute("""SELECT table_name FROM information_schema.tables
       WHERE table_schema = 'public'""")
    rows = cur.fetchall()
    for row in rows:
        print(row[0])
except psycopg2.DatabaseError as e:
    print('Error %s' % e)
    sys.exit(1)
finally:
    if con:
        con.close()

执行结果:

score
class
student
student_bak
images
friends
cars
>>>

这是数据库 postgres 中的所有表。注意在这里不能使用\d命令。

11. 导出导入数据

11.1 导出数据:

# !/usr/bin/env python3
# -*- coding: utf-8 -*-

import psycopg2
import sys

con = None
fout = None

try:
    con = psycopg2.connect(database='postgres', user='postgres', password='123')
    cur = con.cursor()
    fout = open('cars.txt', 'w')
    cur.copy_to(fout, 'cars', sep="|")
except psycopg2.DatabaseError as e:
    print('Error %s' % e)
    sys.exit(1)
except IOError as e:
    print('Error %s' % e)
    sys.exit(1)
finally:
    if con:
        con.close()
    if fout:
        fout.close()

执行结果:

car.txt

2|Mercedes|57127
3|Skoda|9000
4|Volvo|29000
5|Bentley|350000
6|Citroen|21000
7|Hummer|41400
8|Volkswagen|21600
1|Audi|10000

11.2 导入数据:

# !/usr/bin/env python3
# -*- coding: utf-8 -*-

import psycopg2
import sys

con = None
f = None

try:
    con = psycopg2.connect(database='postgres', user='postgres', password='123')
    cur = con.cursor()
    cur.execute("DROP TABLE IF EXISTS Cars")
    cur.execute("CREATE TABLE Cars(Id INT PRIMARY KEY, Name VARCHAR(20), Price INT)")
    f = open('cars.txt', 'r')
    cur.copy_from(f, 'cars', sep="|")
    con.commit()
except psycopg2.DatabaseError as e:
    if con:
        con.rollback()
    print('Error %s' % e)
    sys.exit(1)
except IOError as e:
    if con:
        con.rollback()
    print('Error %s' % e)
    sys.exit(1)
finally:
    if con:
        con.close()
    if f:
        f.close()

这里的导入没有 UPDATE 功能,如果导入数据的 KEY 和表中原有的 KEY 重合,则会报错,如何改为 UPDATE 我还没有找到方法。谁有好方法请告诉我。

12. Transactions

事务(Transaction)是访问并可能更新数据库中各种数据项的一个程序执行单元(unit)。事务通常由高级数据库操纵语言或编程语言(如SQL,C++或Java)书写的用户程序的执行所引起,并用形如begin transaction和commit transaction或 rollback transaction语句(或函数调用)来界定。事务由事务开始(begin transaction)和事务结束(commit transaction或 rollback transaction)之间执行的全体操作组成。SQL Server中事务语句开始或结束时transaction可简写为tran。

如果输入execute语句后不执行commit(),则execute语句不会在 PostgreSQL 中执行。

# !/usr/bin/env python3
# -*- coding: utf-8 -*-

import psycopg2
import sys

con = None

try:
    con = psycopg2.connect(database='postgres', user='postgres', password='123')
    cur = con.cursor()
    cur.execute("DROP TABLE IF EXISTS Friends")
    cur.execute("CREATE TABLE Friends(Id serial PRIMARY KEY, Name VARCHAR(10))")
    cur.execute("INSERT INTO Friends(Name) VALUES ('Tom')")
    cur.execute("INSERT INTO Friends(Name) VALUES ('Rebecca')")
    cur.execute("INSERT INTO Friends(Name) VALUES ('Jim')")
    cur.execute("INSERT INTO Friends(Name) VALUES ('Robert')")
    #con.commit()
except psycopg2.DatabaseError as e:
    if con:
        con.rollback()
    print('Error %s' % e)
    sys.exit(1)
finally:
    if con:
        con.close()

以上丢弃、创建、插入命令都不会执行。但如果将autocommit设置为 True。则不用commit()命令,每条execute()语句都会自动执行。

# !/usr/bin/env python3
# -*- coding: utf-8 -*-
import psycopg2
import sys

con = None

try:
    con = psycopg2.connect(database='postgres', user='postgres', password='123')
    con.autocommit = True
    cur = con.cursor()
    cur.execute("DROP TABLE IF EXISTS Friends")
    cur.execute("CREATE TABLE Friends(Id serial PRIMARY KEY, Name VARCHAR(10))")
    cur.execute("INSERT INTO Friends(Name) VALUES ('Jane')")
    cur.execute("INSERT INTO Friends(Name) VALUES ('Tom')")
    cur.execute("INSERT INTO Friends(Name) VALUES ('Rebecca')")
    cur.execute("INSERT INTO Friends(Name) VALUES ('Jim')")
    cur.execute("INSERT INTO Friends(Name) VALUES ('Robert')")
    cur.execute("INSERT INTO Friends(Name) VALUES ('Patrick')")
except psycopg2.DatabaseError as e:
    print('Error %s' % e)
    sys.exit(1)
finally:
    if con:
        con.close()

通过以上命令,在 PostgreSQL 中创建表 Friends。

[TOC]

参考资料:

  1. PostgreSQL Python tutorial
  2. Psycopg – PostgreSQL database adapter for Python