Elasticsearch数据库

环境配置

安装环境

pip install elasticsearch==7.6.0

EsDao包装类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
# -- coding: utf-8 --

"""
@version: v1.0
@author: huangyc
@file: EsDao.py
@Description: Es统一操作类
@time: 2020/4/27 10:22
"""

from elasticsearch.helpers import bulk
from elasticsearch import Elasticsearch
import pandas as pd


class EsDao(object):
"""
ElasticSearch的数据操作类
"""
# 查询批次大小
DEFAULT_BATCH_SIZE = 1000

# 写入批次大小
BULK_BATCH_SIZE = 10000

def __init__(self, hosts, timeout=3600*24):
self.hosts = hosts
self.timeout = timeout
self.es = Elasticsearch(hosts, timeout=self.timeout)

def save_data_list(self, index_name, data_list):
"""
保存数据列表到es的指定索引中
:param index_name: 索引名称
:param data_list: 数据列表,列表元素代表一行数据,元素类型为dict
:return:
"""
bulk_data_lst = [
data_list[i:i + self.BULK_BATCH_SIZE]
for i in range(0, len(data_list), self.BULK_BATCH_SIZE)
]

if len(data_list) > 0 and '_id' in data_list[0]:
for bulk_data in bulk_data_lst:
actions = [{
"_index": index_name,
"_type": index_name,
"_id": data.pop("_id"),
"_source": data
}
for data in bulk_data
]
bulk(self.es, actions, index=index_name, raise_on_error=True)
else:
for bulk_data in bulk_data_lst:
actions = [{
"_index": index_name,
"_type": index_name,
"_source": data
}
for data in bulk_data
]
bulk(self.es, actions, index=index_name, raise_on_error=True)

def is_index_exists(self, index_name):
"""
判断指定索引是否存在
:param index_name: 索引名称
:return:
"""
return self.es.indices.exists(index=index_name)

def delete_by_query(self, index_name, query_body):
"""
按查询结果删除数据
:param index_name:
:param query_body:
:return:
"""
return self.es.delete_by_query(index_name, query_body)

def clear_index_data(self, index_name):
"""
清空指定索引的数据
:param index_name:
:return:
"""
return self.delete_by_query(
index_name=index_name,
query_body={
"query": {
"match_all": {}
}
}
)

def save_df_data(self, index_name, df):
"""
保存pandas的DataFrame到es的指定索引中
:param index_name: 索引名称
:param df: 要保存的dataframe
:return:
"""
col_lst = df.columns.tolist()
dic_lst = [dict([(c, v) for c, v in zip(col_lst, r)]) for r in df.values.tolist()]
self.save_data_list(index_name=index_name, data_list=dic_lst)

def create_index(self, index_name, mapping_properties):
"""
创建索引
:param index_name: 索引名称
:param mapping_properties: 索引mapping中的属性列表
:return:
"""
if not self.es.indices.exists(index=index_name):
mapping = {
"mappings": {
index_name: {
"properties": mapping_properties
}
}
}
res = self.es.indices.create(index=index_name, body=mapping)
if res is not None and 'acknowledged' in res:
return res.get('acknowledged')
return False

def _search_with_scroll(self, index_name, query_body):
if "size" not in query_body:
query_body["size"] = self.DEFAULT_BATCH_SIZE
response = self.es.search(
index=index_name,
body=query_body,
search_type="dfs_query_then_fetch",
scroll="120m",
timeout="60m"
)
scroll_id = response["_scroll_id"]
while True:
sources = [doc["_source"] for doc in response["hits"]["hits"]]
if len(sources) == 0:
break
yield sources
response = self.es.scroll(scroll_id=scroll_id, scroll="60m")

def query_for_df(self, index_name, query_body):
"""
执行查询并获取pandas.DataFrame格式的返回值
:param index_name: 索引名称
:param query_body: 查询条件
:return:
"""
sources = []
for sub_source in self._search_with_scroll(index_name=index_name, query_body=query_body):
sources.extend(sub_source)
return pd.DataFrame(sources)

def query_for_df_with_batch(self, index_name, query_body, batch_size=DEFAULT_BATCH_SIZE):
"""
按批次大小查询并返回pandas.DataFrame的generator格式的返回值
:param index_name: 索引名称
:param query_body: 查询条件
:param batch_size: 批次大小
:return:
"""
if "size" not in query_body:
query_body["size"] = batch_size
for sub_source in self._search_with_scroll(index_name=index_name, query_body=query_body):
yield pd.DataFrame(sub_source)

def get_first_row_with_df(self, index_name):
"""
获取指定索引的首行数据,格式为pandas.DataFrame
可用于获取索引的元信息
:param index_name: 索引名称
:return:
"""
query_body = {
"size": 1,
"query": {
"match_all": {}
}
}
for sub_source in self._search_with_scroll(index_name=index_name, query_body=query_body):
return pd.DataFrame(sub_source)

使用案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
class TaskMeta:
'''
数据元类
'''
def __init__(self, text, doc_id, sentence_id, reg_lst, flag, has_reg, text_source="primitive"):
self.text = text
self.doc_id = doc_id
self.sentence_id = sentence_id
self.reg_lst = reg_lst
self.flag = flag
self.has_reg = has_reg
self.text_source = text_source

def __repr__(self):
return f'{self.text} {self.doc_id} {self.sentence_id} {self.reg_lst} {self.flag} {self.has_reg} {self.text_source}'

def to_dict(self):
return {"text": self.text,
"doc_id": self.doc_id,
"sentence_id": self.sentence_id,
"reg_lst": self.reg_lst,
"flag": self.flag,
"has_reg": self.has_reg,
"text_source": self.text_source}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def create_index(target_es_dao, index_name, mapping):
'''
创建es索引
:return: 是否创建成功
'''
if not target_es_dao.is_index_exists(index_name):
target_es_dao.create_index(index_name, mapping)
else:
target_es_dao.clear_index_data(index_name)
print(f"索引{index_name}已存在, 已清除数据")

def writer_fun(target_es_dao, target_index, sample_lst):
'''
写数据到es库
'''
df_sample_lst = []
[df_sample_lst.append(sample.to_dict()) for sample in sample_lst]
df_sample_lst = pd.DataFrame(df_sample_lst)
target_es_dao.save_df_data(target_index, df_sample_lst)
print(f'写入数据{len(sample_lst)}条')

def es_cal_test():
# 获取连接
source_es_dao = EsDao(f"http://{aug_config.SOURCE_IP}:{aug_config.SOURCE_PORT}/")
query_condition = {
"query_string": {
"default_field": "has_reg",
"query": "true"
}
}
query_body = {
"query": query_condition
}
# 查询数据
datas = source_es_dao.query_for_df(index_name=aug_config.SOURCE_INDEX, query_body=query_body)
records = datas.to_dict(orient='record')
sample_lst = []
for record in records:
sample_lst.append(
TaskMeta(
text=record["text"],
doc_id=record["doc_id"],
sentence_id=record["sentence_id"],
reg_lst=record["reg_lst"],
flag=record["flag"],
has_reg=record["has_reg"]
)
)

# 创建索引
create_index(target_es_dao, aug_config.TARGET_INDEX, aug_config.MAPPING)
# 写入数据
writer_fun(target_es_dao, aug_config.TARGET_INDEX, sample_lst=sample_lst)

if __name__ == '__main__':
es_cal_test()

Oracle数据库

Python操作Oracle数据库:cx_Oracle

环境配置

Linux上Python连接Oracle解决报错cx_Oracle.DatabaseError: DPI-1047

  1. 安装库

    1
    pip install cx-Oracle
  2. 链接库准备,需要将oci.dll、oraocci11.dll、oraociei11.dll复制到sitepackages路径下,oracle client下载链接,并配置到系统环境变量,链接中没有的自己去官网(win64所有平台linux64)注册一个账号下载对应的版本

    1
    2
    -- 查看oracle版本
    SELECT * FROM v$version;

    没有配置会报如下的错:

    1
    2
    3
    4
    5
    # Windows下报错
    cx_Oracle.DatabaseError: DPI-1047: Cannot locate a 64-bit Oracle Client library: "D:\software\win_or

    # Linux下报错
    cx_Oracle.DatabaseError: DPI-1047: Cannot locate a 64-bit Oracle Client library: "libclntsh.so: cannot open shared object file: No such file or directory". See https://cx-oracle.readthedocs.io/en/latest/user_guide/installation.html for help
    • windows下安装完客户端后,配置oracle客户端的环境变量

      1
      D:\software\win_oracle_dlls\instantclient_11_2
    • linux下可以使用rpm安装包安装

      1
      sudo rpm -ivh oracle-instantclient11.2-basic-11.2.0.4.0-1.x86_64.rpm

      然后将环境变量配置到/etc/profile

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      # 配置oracle客户端
      export ORACLE_CLIENT_HOME=/lib/oracle/11.2/client64
      export TNS_ADMIN=$ORACLE_CLIENT_HOME
      export LD_LIBRARY_PATH=$ORACLE_CLIENT_HOME/lib
      export ORABIN=$ORACLE_CLIENT_HOME/bin

      PATH=$PATH:$ORABIN
      export PATH

      export PATH=$ORACLE_HOME:$PATH
      export PATH=$PATH:$HOME/bin:$ORACLE_CLIENT_HOME/bin

      其他类似找不到libclntsh.sod的错误,如果出现这个错误,请进行软连接挂载文件,让系统的路径能正确的获取到该文件,操作如下:

      1
      2
      3
      sudo sh -c "/usr/lib/oracle/instantclient_11_1 > /etc/ld.so.conf.d/oracle-instantclient.conf"

      sudo ldconfig

sql基础

建表

1
2
3
4
5
6
7
8
9
--blob字段插入实例
create table blob_table_tmp(
id number primary key,
blob_cl blob not null,
clob_cl clob not null
);
insert into blob_table_tmp values(1,rawtohex('11111000011111'),'增加一条记录时,碰到插入blob类型数据出错');
insert into blob_table_tmp values(3,rawtohex('4561888'),'增加一条记录时,碰到插入blob类型数据出错');
insert into blob_table_tmp values(4,rawtohex('增加一条记录时333'),'增加一条记录时,碰到插入blob类型数据出错');

查询

获取连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
FINANCE_DB_HOST = "192.168.x.x"
FINANCE_DB_PORT = 1521
FINANCE_DB_USER = "hyc"
FINANCE_DB_PASSWORD = "123456"
FINANCE_DB_DB = "ORCL"

class OracleConn():
config_path = ''
@staticmethod
def get_conn(conn_name, encoding="UTF-8"):
conn_str = str(eval("%s_DB_USER" % (OracleConn.config_path, conn_name))) + "/" + str(eval("%s.%s_DB_PASSWORD" % (OracleConn.config_path, conn_name)))
conn_str += "@" + str(eval("%s_DB_HOST" % (OracleConn.config_path, conn_name)))
conn_str += ":" + str(eval("%s_DB_PORT" % (OracleConn.config_path, conn_name)))
conn_str += "/" + str(eval("%s_DB_DB" % (OracleConn.config_path, conn_name)))
return ora.connect(conn_str, encoding=encoding, nencoding=encoding)

读写数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
def oracle_test():
# 获取数据库连接
conn = OracleConn.get_conn("FINANCE")
cur = conn.cursor()

# 查询数据
sql = "select id,blob_cl,clob_cl from FINANCE.blob_table_tmp"
datas = []
r = cur.execute(sql)
# 假设name是clob字段类型
[datas.append((gg[0], gg[1].read().decode('utf-8'), gg[2].read())) for gg in r]

# 写入数据
insert_sql = "INSERT INTO new_table(id,new_name) VALUES (:ID,:NEW_NAME)"
res = []
[res.append((data[0], data[1])) for data in datas]
cur.executemany(insert_sql, res)
cur.execute('commit')

cur.close()
conn.close()
print("写入结束")


if __name__ == '__main__':
oracle_test()

相关操作

关于数据库的连接,查询和写入

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import cx_Oracle

class Setting:
DB_USER = 'narutohyc'
DB_PASSWORD = 'hyc'
DB_IP = '192.168.0.1'
DB_PORT = ''
DB_SERVICE = 'dataBaseName'
setting = Setting()

def oracle_test():
# 获取数据库连接
conn = cx_Oracle.connect('%s/%s@%s/%s' % (setting.DB_USER, setting.DB_PASSWORD, setting.DB_IP, setting.DB_SERVICE), encoding='utf-8')
cur = conn.cursor()

# 查询数据
sql = "select ID, name from hyc_database"
datas = []
r = cur.execute(sql)
# 假设name是clob字段类型
[datas.append((gg[0], gg[1].read())) for gg in r]

# 写入数据
insert_sql = "INSERT INTO new_table(id,new_name) VALUES (:ID,:NEW_NAME)"
res = []
[res.append((data[0], data[1])) for data in datas]
cur.executemany(insert_sql, res)
cur.execute('commit')

cur.close()
conn.close()
print("写入结束")

if __name__ == '__main__':
oracle_test()

Postgresql数据库

官方文档DocumentationPostgreSQL 16

  1. 查询
  2. 数据类型

我终于学会了使用python操作postgresql

保姆级 CentOS 7离线安装PostgreSQL 14教程

易百_PostgreSQL教程

离线安装数据库

先从centos7-pg_14.2下载下载rpm包(微云下载centos7.6_PostgreSQL14.2),或者直接官方下载安装教程安装,如果离线安装就下载rpm包

1
2
3
4
5
# 离线安装执行以下命令安装
rpm -ivh postgresql14-libs-14.2-1PGDG.rhel7.x86_64.rpm
rpm -ivh postgresql14-14.2-1PGDG.rhel7.x86_64.rpm
rpm -ivh postgresql14-server-14.2-1PGDG.rhel7.x86_64.rpm
rpm -ivh postgresql14-contrib-14.2-1PGDG.rhel7.x86_64.rpm

出现OSError: Python library not found: libpython3.6mu.so.1.0, libpython3.6m.so.1.0, libpython3.6.so.1.0, libpython3.6m.so的解决办法

1
yum install python3-devel

创建数据库data和log文件夹

1
2
3
4
5
6
# 创建数据库data和log文件夹
mkdir -p /home/postgres/pgsql_data
mkdir -p /home/postgres/pgsql_log

# 创建日志文件
touch /home/postgres/pgsql_log/pgsql.log

授权给安装数据时自动创建的postgres用户

1
2
chown -R postgres:postgres /home/postgres/pgsql_data
chown -R postgres:postgres /home/postgres/pgsql_log

切换到安装数据时自动创建的postgres用户

1
su - postgres

初始化数据库到新建数据目录

1
/usr/pgsql-14/bin/initdb -D /home/postgres/pgsql_data

启动服务器(初始化数据库日志文件)

1
2
3
/usr/pgsql-14/bin/pg_ctl -D  /home/postgres/pgsql_data/ -l /home/postgres/pgsql_log/pgsql.log start
# 查看状态
/usr/pgsql-14/bin/pg_ctl -D /home/postgres/pgsql_data/ -l /home/postgres/pgsql_log/pgsql.log status

切换到管理员开启端口并重启防火墙

1
2
3
su root
firewall-cmd --zone=public --add-port=5432/tcp --permanent
firewall-cmd --reload

修改配置文件实现远程访问vi /home/postgres/pgsql_data/postgresql.conf

1
2
3
4
5
6
# 修改监听地址
listen_addresses = '*'
# 修改最大连接数(按需)
max_connections = 1000
# 修改密码认证
password_encryption = md5

修改可访问的用户IP段

1
2
3
vi /home/pgsql_data/pg_hba.conf(a进入编辑模式,esc退出编辑模式,:wq并按回车保存)
IPV4下修改为或新增
host all all 0.0.0.0/0 trust

postgres用户重启数据库服务

1
2
su - postgres
/usr/pgsql-14/bin/pg_ctl -D /home/postgres/pgsql_data/ -l /home/postgres/pgsql_log/pgsql.log restart

数据库安装结束,管理员postgres,默认密码123456

img

使用navicat连接pg库后新建数据库

环境配置

python连接pg

pip install “psycopg[binary,pool]”

  1. 使用sqlalchemy

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    from sqlalchemy import create_engine

    with engine.connect().execution_options(stream_results=True) as connection:
    stream = connection.execute(sql)

    while 1:
    streams = stream.fetchmany(1000)
    if not streams:
    break
    for idx, row in enumerate(streams):
    row = row._asdict()
    print(idx)
  2. 使用psycopg

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    #!/usr/bin/env Python
    # -- coding: utf-8 --

    """
    @version: v1.0
    @author: huangyc
    @file: cur_2_test.py
    @Description:
    @time: 2024/3/21 17:22
    """
    import importlib
    import os

    from basic_support.comm_funcs.comm_utils import gen_uuid32
    from dbutils.pooled_db import PooledDB
    from psycopg import ServerCursor
    from functools import partial
    from psycopg.rows import dict_row

    os.environ['NLS_LANG'] = 'SIMPLIFIED CHINESE_CHINA.UTF8'

    # 解决游标的问题, 默认使用的是ClientCursor
    cls = partial(ServerCursor, name=gen_uuid32())

    config = dict(host='192.168.xx.xx', port=5432, user='xx', password='xx', dbname='xx', cursor_factory=cls)
    dic = dict(keepalives=1, keepalives_idle=30, keepalives_interval=10, keepalives_count=5)
    config.update(dic)

    db_creator = importlib.import_module("psycopg")

    max_cached = 0
    max_connections = 16
    max_usage = 0
    blocking = True

    sql = f"SELECT * FROM t_tmp_filter_docs limit 5000"

    """
    max_cached: 池中空闲连接的最大数量。默认为0,即无最大数量限制。(建议默认)
    max_connections: 被允许的最大连接数。默认为0,无最大数量限制。(视情况而定)
    max_usage: 连接的最大使用次数。默认0,即无使用次数限制。(建议默认)
    blocking: 连接数达到最大时,新连接是否可阻塞。默认False,即达到最大连接数时,再取新连接将会报错。
    (建议True,达到最大连接数时,新连接阻塞,等待连接数减少再连接)
    """
    pool = PooledDB(db_creator,
    maxcached=max_cached,
    maxconnections=max_connections,
    maxusage=max_usage,
    blocking=blocking,
    **config)

    conn = pool.connection()

    cur = conn.cursor(row_factory=dict_row)
    batch_size = 1000
    params = None
    cur.itersize = batch_size
    cur.execute(sql, params)

    idx = 0
    while records := cur.fetchmany(batch_size):
    idx += 1
    print(idx)

    try:
    cur.close()
    except:
    pass

    try:
    conn.close()
    except:
    pass

sql语法

数据库连接

1
2
3
4
5
6
7
8
-- 获取数据库实例连接数
select count(*) from pg_stat_activity;
-- 获取数据库最大连接数
show max_connections;
-- 查询当前连接数详细信息
select * from pg_stat_activity;
-- 查询数据库中各个用户名对应的数据库连接数
select usename, count(*) from pg_stat_activity group by usename;

数据库信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
-- 查询数据库大小
select pg_size_pretty (pg_database_size('pg_fac_stk'));

-- 查询各表磁盘占用
SELECT
table_schema || '.' || table_name AS table_full_name,
pg_size_pretty(pg_total_relation_size('"' || table_schema || '"."' || table_name || '"')) AS size
FROM information_schema.tables where table_name like 'finance_%'
ORDER BY
pg_total_relation_size('"' || table_schema || '"."' || table_name || '"') DESC;

-- 获取各个表中的数据记录数
select relname as TABLE_NAME, reltuples as rowCounts from pg_class where relkind = 'r' order by rowCounts desc;

-- 查看数据库表对应的数据文件
select pg_relation_filepath('product');

-- 查看数据库实例的版本
select version();

-- 分析评估SQL执行情况
EXPLAIN ANALYZE SELECT * FROM t_cfg_opinfo;

-- 获取数据库当前的回滚事务数以及死锁数
select datname,xact_rollback,deadlocks from pg_stat_database;

数据备份与恢复

使用pgdump备份数据库

pgdump是PostgreSQL官方提供的备份工具,可以将数据库的数据和架构保存到一个文件中,使用pgdump备份的优点包括:

  1. 备份数据可以保持原有的结构和特性,还原时可以保证数据准确性
  2. 备份文件可以跨平台传输,方便进行远程备份
  3. 备份文件可以进行压缩,减小文件大小,方便传输和存储

可以新建数据库,建几张表做测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
# 学生表
CREATE TABLE students (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL,
gender VARCHAR(10) NOT NULL,
age INTEGER NOT NULL,
class VARCHAR(20) NOT NULL
);
# 学科表
CREATE TABLE subjects (
id SERIAL PRIMARY KEY,
name VARCHAR(100) NOT NULL
);
# 成绩表
CREATE TABLE scores (
id SERIAL PRIMARY KEY,
student_id INTEGER NOT NULL,
subject_id INTEGER NOT NULL,
score INTEGER NOT NULL,
FOREIGN KEY (student_id) REFERENCES students (id),
FOREIGN KEY (subject_id) REFERENCES subjects (id)
);

# 插入一些测试数据
INSERT INTO students (name, gender, age, class)
VALUES
('Alice', 'Female', 18, 'Class A'),
('Bob', 'Male', 17, 'Class B'),
('Charlie', 'Male', 19, 'Class A'),
('Diana', 'Female', 18, 'Class B');

# 插入学科表数据
INSERT INTO subjects (name)
VALUES
('Mathematics'),
('English'),
('Science');

-- Alice 的成绩
INSERT INTO scores (student_id, subject_id, score)
VALUES
(1, 1, 90),
(1, 2, 85),
(1, 3, 92);
-- Bob 的成绩
INSERT INTO scores (student_id, subject_id, score)
VALUES
(2, 1, 78),
(2, 2, 80),
(2, 3, 75);
-- Charlie 的成绩
INSERT INTO scores (student_id, subject_id, score)
VALUES
(3, 1, 88),
(3, 2, 92),
(3, 3, 90);
-- Diana 的成绩
INSERT INTO scores (student_id, subject_id, score)
VALUES
(4, 1, 95),
(4, 2, 88),
(4, 3, 92);

备份

使用pgdump备份数据库非常简单,只需要在终端中输入相应的命令即可

  • 备份整个数据库

    1
    2
    3
    4
    pg_dump -h <数据库地址> -p <数据库端口> -U <数据库用户名> -F c -b -v -f <备份文件路径> <数据库名称>

    # 示例
    /usr/pgsql-14/bin/pg_dump -h 127.0.0.1 -U postgres -p 5432 -F t -b -v -f build_hyc_test.sql.tar hyc_test
  • 备份指定表或数据

    1
    2
    3
    4
    5
    6
    7
    pg_dump -h <数据库地址> -p <数据库端口> -U <数据库用户名> -F c -b -v -t <表名1> -t <表名2> -f <备份文件路径> <数据库名称>

    # 示例
    -- 备份指定表到sql文件
    -- '-c --if-exists' 会生成 'drop table if exist' 命令
    -- '--no-owner' 是一个选项,用于指定在导出数据库时不包括拥有者信息
    pg_dump --verbose --host=192.168.xx.xx --port=5432 --username=postgres --file /home/huangyc/pg_bak_test/bak_hyc.sql --encoding=UTF-8 -t "public.tushare_wz_index" -t "public.tushare_us_basic" -t "public.dim_fund" -t "public.dim_index" -c --if-exists --no-owner pg_fac_stk

具体参数的含义如下:

  • -h:数据库服务所在主机地址,可以是本地地址localhost或者IP地址
  • -p:数据库服务的监听端口,一般为默认端口5432
  • -U:连接数据库的用户名
  • -F:备份文件的格式,包括自定义格式c,纯文本格式p和归档格式t
  • -b:在备份文件中包含备份的数据库的模式信息
  • -v:备份过程中输出详细的信息
  • -f:备份文件的保存路径和文件名
  • -t:只备份指定的表和数据
1
2
3
4
5
-- 备份postgres库并tar打包
pg_dump -h 127.0.0.1 -p 5432 -U postgres -f postgres.sql.tar -Ft;

-- 备份postgres库,转储数据为带列名的INSERT命令
pg_dumpall -d postgres -U postgres -f postgres.sql --column-inserts;

还原

使用备份文件进行恢复也非常简单,只需要在终端中输入相应的命令即可

  • 恢复整个库

    1
    2
    3
    4
    pg_restore -h <数据库地址> -p <数据库端口> -U <数据库用户名> -d <数据库名称> <备份文件路径>

    # 示例
    /usr/pgsql-14/bin/pg_restore -h 127.0.0.1 -U postgres -p 5432 -d hyc_test_bak build_hyc_test.sql.tar
  • 恢复指定数据

    1
    2
    3
    4
    5
    6
    pg_restore -h <数据库地址> -p <数据库端口> -U <数据库用户名> -t <表名1> -t <表名2> -d <数据库名称> <备份文件路径>

    # 示例
    -- 对于pg_dump备份出来的sql文件,直接执行sql文件即可恢复
    -- 还原指定sql文件到bak_test库(需要自己建库)
    psql --host=192.168.xx.xx --port=5432 --username=postgres -d bak_test --file /home/huangyc/pg_bak_test/bak_hyc.sql.tar

具体参数的含义如下:

  • -h:数据库服务所在主机地址,可以是本地地址localhost或者IP地址
  • -p:数据库服务的监听端口,一般为默认端口5432
  • -U:连接数据库的用户名
  • -d:恢复数据的目标数据库名称
  • -t:只恢复指定的表和数据

命令详解

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
[postgres@pg01 ~]$ pg_dump --help
用法:
pg_dump [选项]... [数据库名字]
**一般选项**:
-f, --file=FILENAME 输出文件或目录名
-F, --format=c|d|t|p 输出文件格式 (c=custom, d=directory, t=tar,p=plain,plain就是sql纯文本 (默认值))
-j, --jobs=NUM 执行多个并行任务进行备份转储工作
-v, --verbose 详细模式
-V, --version 输出版本信息,然后退出
-Z, --compress=0-9 被压缩格式的压缩级别,0表示不压缩
--lock-wait-timeout=TIMEOUT 在等待表锁超时后操作失败
--no-sync 不用等待变化安全写入磁盘
-?, --help 显示此帮助, 然后退出
**控制输出内容选项(常用)**:
-a, --data-only 只转储数据,不包括模式,只对纯文本输出有意义
-s, --schema-only 只转储模式, 不包括数据
-c, --clean 在重新创建之前,先清除(删除)数据库对象,如drop table。只对纯文本输出有意义
-C, --create 指定输出文件中是否生成create database语句,只对纯文本输出有意义
-n, --schema=PATTERN 指定要导出的schema,不指定则导出所有的非系统schema
-N, --exclude-schema=PATTERN 排除导出哪些schema
-O, --no-owner 在明文格式中, 忽略恢复对象所属者
-t, --table=PATTERN 指定导出的表、视图、序列,可以使用多个-t匹配多个表,使用-t之后,-n和-N就失效了
-T, --exclude-table=PATTERN 排除表
-x, --no-privileges 不要转储权限 (grant/revoke)
--disable-triggers 在只恢复数据的过程中禁用触发器
--exclude-table-data=PATTERN do NOT dump data for the specified table(s)
--if-exists 当删除对象时使用IF EXISTS
--inserts 以INSERT命令,而不是COPY命令的形式转储数据,使用该选项可以把数据加载到非pg数据库,会使恢复非常慢
该选项为每行生成1个单独的insert命令,?在恢复过程中遇到错误,将会丢失1行而不是全部表数据
--column-inserts 以带有列名的INSERT命令形式转储数据,例如insert into table_name(column,...) values(value1,...)
--load-via-partition-root 通过根表加载分区
--no-comments 不转储注释
--no-tablespaces 不转储表空间分配信息
--no-unlogged-table-data 不转储没有日志的表数据
--on-conflict-do-nothing 将ON CONFLICT DO NOTHING添加到INSERT命令
**控制输出内容选项(不常用)**:
-S, --superuser=NAME 指定关闭触发器时需要用到的超级用户名。 它只有在使用了--disable-triggers时才有影响。一般情况下,最好不要输入该参数,而是用 超级用户启动生成的脚本。
-b, --blobs 在转储中包括大对象
-B, --no-blobs 排除转储中的大型对象
-E, --encoding=ENCODING 转储以ENCODING形式编码的数据
--binary-upgrade 只能由升级工具使用
--enable-row-security 启用行安全性(只转储用户能够访问的内容)
--extra-float-digits=NUM 覆盖extra_float_digits的默认设置
--disable-dollar-quoting 取消美元 (符号) 引号, 使用 SQL 标准引号
--no-publications 不转储发布
--no-security-labels 不转储安全标签的分配
--no-subscriptions 不转储订阅
--no-synchronized-snapshots 在并行工作集中不使用同步快照
--quote-all-identifiers 所有标识符加引号,即使不是关键字
--rows-per-insert=NROWS 每个插入的行数;意味着--inserts
--section=SECTION 备份命名的节 (数据前, 数据, 及 数据后)
--serializable-deferrable 等到备份可以无异常运行
--snapshot=SNAPSHOT 为转储使用给定的快照
--strict-names 要求每个表和(或)schema包括模式以匹配至少一个实体
--use-set-session-authorization
使用 SESSION AUTHORIZATION 命令代替
ALTER OWNER 命令来设置所有权
**联接选项**:
-d, --dbname=DBNAME 对数据库 DBNAME备份
-h, --host=主机名 数据库服务器的主机名或套接字目录
-p, --port=端口号 数据库服务器的端口号
-U, --username=名字 以指定的数据库用户联接
-w, --no-password 永远不提示输入口令
-W, --password 强制口令提示 (自动)
--role=ROLENAME 在转储前运行SET ROLE

对于pg_dump的自定义备份custom和tar类型的备份,需要使用pg_restore进行恢复,pg_restore语法如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
[postgres@pg01 pg_backup]$ pg_restore --help
pg_restore 从一个归档中恢复一个由 pg_dump 创建的 PostgreSQL 数据库.
用法:
pg_restore [选项]... [文件名]
一般选项:
-d, --dbname=名字 连接数据库名字
-f, --file=文件名 输出文件名(- 对于stdout)
-F, --format=c|d|t 备份文件格式(应该自动进行)
-l, --list 打印归档文件的 TOC 概述
-v, --verbose 详细模式
-V, --version 输出版本信息, 然后退出
-?, --help 显示此帮助, 然后退出
恢复控制选项:
-a, --data-only 只恢复数据, 不包括模式
-c, --clean 在重新创建之前,先清除(删除)数据库对象
-C, --create 创建目标数据库
-e, --exit-on-error 发生错误退出, 默认为继续
-I, --index=NAME 恢复指定名称的索引
-j, --jobs=NUM 执行多个并行任务进行恢复工作
-L, --use-list=FILENAME 从这个文件中使用指定的内容表排序
输出
-n, --schema=NAME 在这个模式中只恢复对象
-N, --exclude-schema=NAME 不恢复此模式中的对象
-O, --no-owner 不恢复对象所属者
-P, --function=NAME(args) 恢复指定名字的函数
-s, --schema-only 只恢复模式, 不包括数据
-S, --superuser=NAME 使用指定的超级用户来禁用触发器
-t, --table=NAME 恢复命名关系(表、视图等)
-T, --trigger=NAME 恢复指定名字的触发器
-x, --no-privileges 跳过处理权限的恢复 (grant/revoke)
-1, --single-transaction 作为单个事务恢复
--disable-triggers 在只恢复数据的过程中禁用触发器
--enable-row-security 启用行安全性
--if-exists 当删除对象时使用IF EXISTS
--no-comments 不恢复注释
--no-data-for-failed-tables 对那些无法创建的表不进行
数据恢复
--no-publications 不恢复发行
--no-security-labels 不恢复安全标签信息
--no-subscriptions 不恢复订阅
--no-tablespaces 不恢复表空间的分配信息
--section=SECTION 恢复命名节 (数据前、数据及数据后)
--strict-names 要求每个表和(或)schema包括模式以匹配至少一个实体
--use-set-session-authorization
使用 SESSION AUTHORIZATION 命令代替
ALTER OWNER 命令来设置所有权
联接选项:
-h, --host=主机名 数据库服务器的主机名或套接字目录
-p, --port=端口号 数据库服务器的端口号
-U, --username=名字 以指定的数据库用户联接
-w, --no-password 永远不提示输入口令
-W, --password 强制口令提示 (自动)
--role=ROLENAME 在恢复前执行SET ROLE操作
选项 -I, -n, -N, -P, -t, -T, 以及 --section 可以组合使用和指定
多次用于选择多个对象.
如果没有提供输入文件名, 则使用标准输入.

表空间

新建表空间

1
2
3
4
# 新建表空间目录 t_fac_ts
mkdir /home/huangyc/t_fac_ts
# 修改表空间的用户权限
chown postgres /home/huangyc/t_fac_ts

pg库新建表空间

1
create tablespace t_fac_ts owner postgres location '/home/huangyc/t_fac_ts';

表空间有关的一些语法

1
2
3
4
5
6
# 删除表空间 (需要先drop表空间所有的表, 或者将该空间下所有的表移除才能drop表空间)
DROP TABLESPACE t_fac_ts;
# 修改具体的表到指定表空间下
ALTER TABLE t_fac_tushare_stock_basic SET TABLESPACE t_fac_ts;
# 修改指定库到指定表空间下
ALTER DATABASE name SET TABLESPACE new_tablespace;

锁表处理

pg锁表解锁

  1. 查看被锁的表

    1
    2
    3
    select a.locktype,a.database,a.pid,a.mode,a.relation,b.relname
    from pg_locks a
    join pg_class b on a.relation = b.oid where relname='t_opt_strhdk_blsj';
  2. 杀死被锁的pid

    1
    select pg_terminate_backend(pid);

表结构修改

1
2
3
4
5
6
7
8
9
10
-- 修改表名
alter table "user" rename to "ts_user";
-- 添加新字段
alter table table_name add column col_name varchar(50);
-- 丢弃某列
alter table table_name drop column col_name;
-- 添加主键
alter table table_name add primary key("col_name");
-- 修改字段名
alter table table_name rename column old_col_name to new_col_name;

数据更新和查询

设置某字段的值

1
2
3
4
5
6
7
8
-- 设置某字段的值
update table_name set col_name=new_value;

-- 更新某个字段并关联其他表
UPDATE table1
SET field_to_update = table2.new_value
FROM table2
WHERE table1.common_column = table2.common_column;

删除表中重复数据

1
2
3
4
5
6
7
8
9
10
-- 查询[旧表]数据的重复情况
select col1,col2,count(*) from old_table group by col1,col2;

-- 所有字段都一样的情况
create table bak_table as select distinct * from table_name;

-- 查询[新表]数据的重复情况
select col1,col2,count(*) from bak_table group by col1,col2;
truncate table old_table;
insert into old_table (col1,col2) select col1,col2 from bak_table;

不存在插入,存在更新

1
insert into ... on conflict(column_name) do update set ...

conflict(column_name): column_name字段是判断要查找的数据是否存在,作为判断条件

column_name必须是主键或者其他具有唯一性的字段(如唯一键或排他键)

1
2
3
4
insert into user(id,username,address,create_date,create_by) 
values('1','刘德华','香港',now(),'system')
on conflict(id)
do update set address='中国',update_date=now(),update_by='system';
1
2
# 批量的方式
insert into testunnest(id, age, name) values (unnest(array[1,3]), unnest(array[18,10]), unnest(array['valupdated', 'val3'])) on conflict (id) do update set age = excluded.age, name = excluded.name;

数据和结构复制

1
2
3
4
5
-- [复制表和数据] 复制表结构和数据 自动建表,不会复制主键什么的
create table new_table as select * from old_table [WITH NO DATA];

-- [复制数据] 复制数据到 新表 表需要提前建,并且表字段要一致,不会复制主键什么的
insert into new_table (col_0, col_1) select col_0, col_1 from old_table;

视图

普通视图

视图是一个虚拟表,它是根据一个或多个基本表的查询结果动态生成的,每次查询视图时都会执行相应的查询

1
2
3
4
5
6
CREATE VIEW view_name AS
SELECT column1, column2, ...
FROM table_name
WHERE condition;

drop view view_name;

物化视图

物化视图是一个实际存储数据的表,它的数据定期刷新,不像普通视图那样每次查询都重新计算。

1
2
3
4
5
6
CREATE MATERIALIZED VIEW materialized_view_name AS
SELECT column1, column2, ...
FROM table_name
WHERE condition;

drop MATERIALIZED VIEW materialized_view_name

需要注意的是,物化视图需要定期手动或自动刷新以更新数据,你可以使用 REFRESH MATERIALIZED VIEW 命令来进行刷新

分页查询

在PostgreSQL数据库中,进行分页查询通常使用LIMIT和OFFSET子句来实现

LIMIT用于限制返回的行数,OFFSET用于指定从结果集的哪一行开始返回数据,下面是一个简单的分页查询示例:

假设我们有一个名为products的表,其中包含产品的名称和价格,我们想要获取第2页,每页显示10条数据,可以使用以下SQL查询语句:

1
2
3
4
SELECT product_name, price
FROM products
ORDER BY product_id
LIMIT 10 OFFSET 10;

在这个例子中,假设product_id是产品表中的唯一标识符,通过ORDER BY将结果按照product_id排序,LIMIT 10表示返回10行数据,OFFSET 10表示从结果集中的第11行开始返回数据,即返回第2页的数据

乱序查询

在PostgreSQL数据库中,使用ORDER BY RANDOM()语句可以实现按照随机顺序排序查询结果

这样可以让查询结果以随机的顺序返回,每次执行查询时都会得到不同的排序结果

这在需要随机抽样数据或随机展示数据时非常有用

需要注意的是,使用ORDER BY RANDOM()可能会影响查询的性能,因为需要对结果集进行随机排序操作

假设我们有一个名为products的表,其中包含产品的名称和价格,我们想要以随机顺序返回产品列表,可以使用以下SQL查询语句:

1
2
3
4
5
6
7
SELECT
product_name,
price
FROM
products
ORDER BY
RANDOM();

这将返回products表中产品名称和价格的数据,并以随机的顺序排序,每次执行该查询,返回的产品列表顺序都会不同

列表查询

列表字段的创建

1
2
3
4
CREATE TABLE your_table (
id SERIAL PRIMARY KEY,
list_column_name _TEXT
);

在PostgreSQL中,你可以使用以下方式来查询数组字段:

  1. 查询数组字段中包含特定值的行:
1
SELECT * FROM your_table WHERE your_array_column = ANY('{value1,value2,value3}');
  1. 查询数组字段中包含任何查询数组中的值的行:
1
SELECT * FROM your_table WHERE your_array_column && '{value1,value2,value3}';
  1. 查询数组字段中包含所有查询数组中的值的行:
1
SELECT * FROM your_table WHERE your_array_column @> '{value1,value2,value3}';
  1. 查询数组字段的长度大于1的所有行:
1
SELECT * FROM your_table WHERE array_length(your_array_column, 1) > 1;
  1. 查询数组字段中第一个元素等于特定值的行:
1
SELECT * FROM your_table WHERE your_array_column[1] = 'specific_value';

删除重复记录

postgresql 常用的删除重复数据方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
-- 初始化数据
create table hyc_tmp_del_test(id int, name varchar(255));
create table hyc_tmp_del_test_bk (like hyc_tmp_del_test);
insert into hyc_tmp_del_test select generate_series(1, 10000), 'huangyc';
insert into hyc_tmp_del_test select generate_series(1, 10000), 'huangyc';
insert into hyc_tmp_del_test_bk select * from hyc_tmp_del_test;

-- 最容易想到的方法就是判断数据是否重复,对于重复的数据只保留ctid最小(或最大)的数据,删除其他的
-- id相同的数据,保留ctid最小的,其他的删除
explain analyse delete from hyc_tmp_del_test_bk a where a.ctid <> (select min(t.ctid) from hyc_tmp_del_test_bk t where a.id=t.id); -- 17.112s

-- group by方法通过分组找到ctid最小的数据,然后删除其他数据
explain analyse delete from hyc_tmp_del_test_bk a where a.ctid not in (select min(ctid) from hyc_tmp_del_test_bk group by id); -- 0.052s

-- 高效删除方法
explain analyze delete from hyc_tmp_del_test_bk a where a.ctid = any(array (select ctid from (select row_number() over (partition by id), ctid from hyc_tmp_del_test_bk) t where t.row_number > 1)); -- 0.055s

第二种和第三种感觉差不多,原文说是第三种快不少,这里pg库是14.x版本

关键

pg中每个表都有几个系统隐藏列:tableoid, xmin, xmax,cmin,cmax,ctid

其中tableoid表示表的oid,cmin、cmax、xmin和xmax是mvcc的实现有关

ctid表示行版本在表中的物理位置: 它属于对象标识符类型(oid,Object Identifier Types),是一种行标识符,它的数据使用的元组标识符(tid,tuple identifier)。元组ID是一对(块号,块内的元组索引),用于标识当前行的物理位置。

引申用法

假设我们有一个表格 products,包含产品名称和产品类别。我们希望从中找出每个产品类别中的前两个产品

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
SELECT
*
FROM
(
SELECT
product_name,
category,
row_number() OVER (PARTITION BY SUBSTR(p.category, 0, LENGTH(p.category)-2) ORDER BY product_name) AS ranking
FROM
products p
WHERE
p.category LIKE 'Electronics:%'
) subquery
WHERE
ranking <= 2;

在这个例子中,我们有一个产品表 products,其中包含 product_namecategory

我们想要找出每个以 ‘Electronics:’ 开头的产品类别中的前两个产品

  • 子查询部分:
    • 子查询从 products 表中选择了产品名称 product_name 和产品类别 category
    • 我们只关注以 ‘Electronics:’ 开头的产品类别,即 p.category LIKE 'Electronics:%'
    • 使用 row_number() 函数,对每个以 ‘Electronics:’ 开头的类别进行编号,按产品名称排序
  • 主查询部分:
    • 在主查询中,我们选择了子查询的所有列 *
    • 我们进一步筛选了结果,只选择了编号小于等于2的记录,以获取每个类别中的前两个产品

索引

1
2
3
4
5
6
7
8
-- 获取数据库表中的索引
select * from pg_indexes where tablename = 't_cfg_opinfo';
-- 创建索引
create index index_name on table_name (col_0, col_1);
-- 查询索引
select * from pg_indexes where tablename='table_name';
-- 删除索引
drop index index_name;

什么情况下要避免使用索引?

虽然索引的目的在于提高数据库的性能,但这里有几个情况需要避免使用索引

使用索引时,需要考虑下列准则

  • 索引不应该使用在较小的表上
  • 索引不应该使用在有频繁的大批量的更新或插入操作的表上
  • 索引不应该使用在含有大量的 NULL 值的列上
  • 索引不应该使用在频繁操作的列上

实用sql

1
2
-- 查询库中的最大版本
SELECT (CASE WHEN MAX(version) IS NULL THEN -1 ELSE MAX(version) END) + 1 AS version FROM table_name

其他语法

筛选某列,逗号拼接

1
2
3
select string_agg(bs_org_id,',') as bs_org_ids 
from bs_org
where par_org_id ='100'

日期转换

1
2
select to_char(col_name,'yyyyMMDD')-interval '2 day' from table_name
-- -interval '2 day' 表示往前2天

转时间戳

1
2
select '2011-01-06 09:57:59'::timestamp;
TO_TIMESTAMP('2011-01-06 09:57:59', 'YYYY-MM-DD HH24:MI:S')

postgresql 获取分组第一条数据 窗口函数

  1. 给数据分组并排名,使用 row_number() over (partition by 分组的字段名 order by 排序规则) as 排名
  2. 从上述第一步中取出,排名为第一的数据,即为第一条数据 select * from 上述第一步 where 排名=1
  3. 获取前N名的数据,将一中第二步的条件换成where 排名 < N+1

distributed key

1
2
alter table table_name set distributed by (id);
alter table table_name add primary key (id);

ORM框架

ORM框架比较

一文了解 Python 的三种数据源架构模式

SQLAlchemy 和其他的 ORM 框架

SQLObject

  • 优点:

    采用了易懂的ActiveRecord 模式

    一个相对较小的代码库

  • 缺点:

    方法和类的命名遵循了Java 的小驼峰风格

    不支持数据库session隔离工作单元

Storm

  • 优点:

    清爽轻量的API,短学习曲线和长期可维护性

    不需要特殊的类构造函数,也没有必要的基类

  • 缺点:

    迫使程序员手工写表格创建的DDL语句,而不是从模型类自动派生

    Storm的贡献者必须把他们的贡献的版权给Canonical公司

Django’s ORM

  • 优点:

    易用,学习曲线短

    和Django紧密集合,用Django时使用约定俗成的方法去操作数据库

  • 缺点:

    不好处理复杂的查询,强制开发者回到原生SQL

    紧密和Django集成,使得在Django环境外很难使用

peewee

  • 优点:

    Django式的API,使其易用

    轻量实现,很容易和任意web框架集成

  • 缺点:

    不支持自动化 schema 迁移

    多对多查询写起来不直观

SQLAlchemy

  • 优点:

    企业级API,使得代码有健壮性和适应性

    灵活的设计,使得能轻松写复杂查询

  • 缺点:

    工作单元概念不常见

    重量级API,导致长学习曲线

相比其他的ORM, SQLAlchemy 意味着,无论你何时写SQLAlchemy代码, 都专注于工作单元的前沿概念 。DB Session 的概念可能最初很难理解和正确使用,但是后来你会欣赏这额外的复杂性,这让意外的时序提交相关的数据库bug减少到0。在SQLAlchemy中处理多数据库是棘手的, 因为每个DB session 都限定了一个数据库连接。但是,这种类型的限制实际上是好事, 因为这样强制你绞尽脑汁去想在多个数据库之间的交互, 从而使得数据库交互代码很容易调试。

SQLAlchemy

SQLAlchemy 1.4 Documentation

sqlalchemy操作数据库

sqlalchemy外键和relationship查询

SQLALlchemy数据查询小集合

SQLAlchemy 的连接池机制

SQLAlchemy 中的 Session、sessionmaker、scoped_session

Contextual/Thread-local Sessions

SQLAlchemy(常用的SQLAlchemy列选项)

查询官网例子Object Relational Tutorial (1.x API)

sqlalchemy外键和relationship查询

session和scoped_session

session用于创建程序和数据库之间的会话,所有对象的载入和保存都需通过session对象 。
通过sessionmaker调用创建一个工厂,并关联Engine以确保每个session都可以使用该Engine连接资源
scoped_session 实现了一个线程的隔离,保证不同的线程拿到不同的session, 同一个线程拿到的session 是同一个值

1
2
3
4
5
6
7
s1 = Session()
s2 = Session()
s1.add(person)
s1.commit()
# 必须先close,s2才能继续操作person
s1.close()
s2.add(person)

session 和scoped_session本质上都是用来操作数据库的,只是session 只适合在单线程下面使用
官方文档提到了scoped_session的正确使用方法。request结束后要调用scoped_session.remove()

Engine Configuration

使用 create_engine创建我们需要的DB starting point

1
2
3
4
from sqlalchemy import create_engine

scheme = 'mysql+pymysql://root:123456@localhost:3306/dev_shopping?charset=utf8'
engine = create_engine(scheme, pool_size=10 , max_overflow=-1, pool_recycle=1200)

create_engine 函数常用参数:

  • pool_size=10 # 连接池的大小,0表示连接数无限制
  • pool_recycle=-1 # 连接池回收连接的时间,如果设置为-1,表示没有no timeout, 注意,mysql会自动断开超过8小时的连接,所以sqlalchemy沿用被mysql断开的连接会抛出MySQL has gone away
  • max_overflow=-1 # 连接池中允许‘溢出’的连接个数,如果设置为-1,表示连接池中可以创建任意数量的连接
  • pool_timeout=30 # 在连接池获取一个空闲连接等待的时间
  • echo=False # 如果设置True, Engine将会记录所有的日志,日志默认会输出到sys.stdout

创建Engine之后,接下来的问题,就是如何使用Engine

在单进程中,建议在在初始化的模块的时候创建Engine, 使Engine成为全局变量, 而不是为每个调用Engine的对象或者函数中创建, Engine不同于connect, connect函数会创建数据库连接的资源,Engine是管理connect创建的连接资源

在多进程中,为每个子进程都创建各自的Engine, 因为进程之间是不能共享Engine

几种操作方式

Working with Engines and Connections

SqlAlchemy的Engine,Connection和Session 区别?适合什么时候用?

Engine方式

Engine是SQLAlchemy中连接数据库最底层级别的对象,它维护了一个连接池,可以在应用程序需要和数据库对话时使用。在Engine.execute(close_with_result=True) close_with_result=True 表示连接自动关闭;

1
2
3
4
5
6
result = engine.execute('SELECT * FROM tablename;') 
conn = engine.connect(close_with_result=True)
result = conn.execute('SELECT * FROM tablename;')
for row in result:
print(result['columnname']
result.close()

Connection方式

Connection,实际上是执行SQL查询的工作,每当你想更好的控制连接的属性,如何时关闭等都建议使用这个操作;比如在一个事务中,要控制它提交commit的时间,在connection控制中就可以运行多个不同的SQL语句,如果其中一个出现问题,则其他所有的语句都会撤销更改;

1
2
3
4
5
6
7
8
9
connection = engine.connect()
trans = connection.begin()
try:
connection.execute("INSERT INTO films VALUES ('Comedy', '82 minutes');")
connection.execute("INSERT INTO datalog VALUES ('added a comedy');")
trans.commit()
except:
trans.rollback()
raise

Session方式

Session,一般都是用于ORM中,因为在ORM中,会自动生成SQL语句以及自动连接数据库(自己配置),使用session.execute()也是个编辑的方法,可以将会话绑定到任何对象;如果你确定使用ORM,就建议使用session来处理execute(),否则还是使用connection更好方便;

总结: 从应用角度来看,可以把这三类分为两种:

  1. 直接使用Engine.execute() 或Connection.execute(),更加灵活,可以使用原生SQL语句

  2. 使用Session处理交易类型的数据,因为方便使用session.add(), session.rollback(), session.commit(), session.close()等,它是使用ORM时推荐的一种和数据库交互的方式