Elasticsearch数据库 环境配置
安装环境
pip install elasticsearch==7.6.0
EsDao包装类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 """ @version: v1.0 @author: huangyc @file: EsDao.py @Description: Es统一操作类 @time: 2020/4/27 10:22 """ from elasticsearch.helpers import bulkfrom elasticsearch import Elasticsearchimport pandas as pdclass EsDao (object ): """ ElasticSearch的数据操作类 """ DEFAULT_BATCH_SIZE = 1000 BULK_BATCH_SIZE = 10000 def __init__ (self, hosts, timeout=3600 *24 ): self.hosts = hosts self.timeout = timeout self.es = Elasticsearch(hosts, timeout=self.timeout) def save_data_list (self, index_name, data_list ): """ 保存数据列表到es的指定索引中 :param index_name: 索引名称 :param data_list: 数据列表,列表元素代表一行数据,元素类型为dict :return: """ bulk_data_lst = [ data_list[i:i + self.BULK_BATCH_SIZE] for i in range (0 , len (data_list), self.BULK_BATCH_SIZE) ] if len (data_list) > 0 and '_id' in data_list[0 ]: for bulk_data in bulk_data_lst: actions = [{ "_index" : index_name, "_type" : index_name, "_id" : data.pop("_id" ), "_source" : data } for data in bulk_data ] bulk(self.es, actions, index=index_name, raise_on_error=True ) else : for bulk_data in bulk_data_lst: actions = [{ "_index" : index_name, "_type" : index_name, "_source" : data } for data in bulk_data ] bulk(self.es, actions, index=index_name, raise_on_error=True ) def is_index_exists (self, index_name ): """ 判断指定索引是否存在 :param index_name: 索引名称 :return: """ return self.es.indices.exists(index=index_name) def delete_by_query (self, index_name, query_body ): """ 按查询结果删除数据 :param index_name: :param query_body: :return: """ return self.es.delete_by_query(index_name, query_body) def clear_index_data (self, index_name ): """ 清空指定索引的数据 :param index_name: :return: """ return self.delete_by_query( index_name=index_name, query_body={ "query" : { "match_all" : {} } } ) def save_df_data (self, index_name, df ): """ 保存pandas的DataFrame到es的指定索引中 :param index_name: 索引名称 :param df: 要保存的dataframe :return: """ col_lst = df.columns.tolist() dic_lst = [dict ([(c, v) for c, v in zip (col_lst, r)]) for r in df.values.tolist()] self.save_data_list(index_name=index_name, data_list=dic_lst) def create_index (self, index_name, mapping_properties ): """ 创建索引 :param index_name: 索引名称 :param mapping_properties: 索引mapping中的属性列表 :return: """ if not self.es.indices.exists(index=index_name): mapping = { "mappings" : { index_name: { "properties" : mapping_properties } } } res = self.es.indices.create(index=index_name, body=mapping) if res is not None and 'acknowledged' in res: return res.get('acknowledged' ) return False def _search_with_scroll (self, index_name, query_body ): if "size" not in query_body: query_body["size" ] = self.DEFAULT_BATCH_SIZE response = self.es.search( index=index_name, body=query_body, search_type="dfs_query_then_fetch" , scroll="120m" , timeout="60m" ) scroll_id = response["_scroll_id" ] while True : sources = [doc["_source" ] for doc in response["hits" ]["hits" ]] if len (sources) == 0 : break yield sources response = self.es.scroll(scroll_id=scroll_id, scroll="60m" ) def query_for_df (self, index_name, query_body ): """ 执行查询并获取pandas.DataFrame格式的返回值 :param index_name: 索引名称 :param query_body: 查询条件 :return: """ sources = [] for sub_source in self._search_with_scroll(index_name=index_name, query_body=query_body): sources.extend(sub_source) return pd.DataFrame(sources) def query_for_df_with_batch (self, index_name, query_body, batch_size=DEFAULT_BATCH_SIZE ): """ 按批次大小查询并返回pandas.DataFrame的generator格式的返回值 :param index_name: 索引名称 :param query_body: 查询条件 :param batch_size: 批次大小 :return: """ if "size" not in query_body: query_body["size" ] = batch_size for sub_source in self._search_with_scroll(index_name=index_name, query_body=query_body): yield pd.DataFrame(sub_source) def get_first_row_with_df (self, index_name ): """ 获取指定索引的首行数据,格式为pandas.DataFrame 可用于获取索引的元信息 :param index_name: 索引名称 :return: """ query_body = { "size" : 1 , "query" : { "match_all" : {} } } for sub_source in self._search_with_scroll(index_name=index_name, query_body=query_body): return pd.DataFrame(sub_source)
使用案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class TaskMeta : ''' 数据元类 ''' def __init__ (self, text, doc_id, sentence_id, reg_lst, flag, has_reg, text_source="primitive" ): self.text = text self.doc_id = doc_id self.sentence_id = sentence_id self.reg_lst = reg_lst self.flag = flag self.has_reg = has_reg self.text_source = text_source def __repr__ (self ): return f'{self.text} {self.doc_id} {self.sentence_id} {self.reg_lst} {self.flag} {self.has_reg} {self.text_source} ' def to_dict (self ): return {"text" : self.text, "doc_id" : self.doc_id, "sentence_id" : self.sentence_id, "reg_lst" : self.reg_lst, "flag" : self.flag, "has_reg" : self.has_reg, "text_source" : self.text_source}
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 def create_index (target_es_dao, index_name, mapping ): ''' 创建es索引 :return: 是否创建成功 ''' if not target_es_dao.is_index_exists(index_name): target_es_dao.create_index(index_name, mapping) else : target_es_dao.clear_index_data(index_name) print (f"索引{index_name} 已存在, 已清除数据" ) def writer_fun (target_es_dao, target_index, sample_lst ): ''' 写数据到es库 ''' df_sample_lst = [] [df_sample_lst.append(sample.to_dict()) for sample in sample_lst] df_sample_lst = pd.DataFrame(df_sample_lst) target_es_dao.save_df_data(target_index, df_sample_lst) print (f'写入数据{len (sample_lst)} 条' ) def es_cal_test (): source_es_dao = EsDao(f"http://{aug_config.SOURCE_IP} :{aug_config.SOURCE_PORT} /" ) query_condition = { "query_string" : { "default_field" : "has_reg" , "query" : "true" } } query_body = { "query" : query_condition } datas = source_es_dao.query_for_df(index_name=aug_config.SOURCE_INDEX, query_body=query_body) records = datas.to_dict(orient='record' ) sample_lst = [] for record in records: sample_lst.append( TaskMeta( text=record["text" ], doc_id=record["doc_id" ], sentence_id=record["sentence_id" ], reg_lst=record["reg_lst" ], flag=record["flag" ], has_reg=record["has_reg" ] ) ) create_index(target_es_dao, aug_config.TARGET_INDEX, aug_config.MAPPING) writer_fun(target_es_dao, aug_config.TARGET_INDEX, sample_lst=sample_lst) if __name__ == '__main__' : es_cal_test()
Oracle数据库
Python操作Oracle数据库:cx_Oracle
环境配置
Linux上Python连接Oracle解决报错cx_Oracle.DatabaseError: DPI-1047
安装库
链接库准备,需要将oci.dll、oraocci11.dll、oraociei11.dll
复制到sitepackages路径下,oracle client下载链接 ,并配置到系统环境变量,链接中没有的自己去官网(win64 、所有平台 、linux64 )注册一个账号下载对应的版本
1 2 SELECT * FROM v$version;
没有配置会报如下的错:
1 2 3 4 5 # Windows下报错 cx_Oracle.DatabaseError: DPI -1047: Cannot locate a 64-bit Oracle Client library : "D :\software \win_or # Linux 下报错 cx_Oracle.DatabaseError : DPI -1047: Cannot locate a 64-bit Oracle Client library : "libclntsh.so : cannot open shared object file : No such file or directory ". See https ://cx -oracle.readthedocs.io /en /latest /user_guide /installation.html for help
sql基础 建表 1 2 3 4 5 6 7 8 9 create table blob_table_tmp( id number primary key, blob_cl blob not null , clob_cl clob not null ); insert into blob_table_tmp values (1 ,rawtohex('11111000011111' ),'增加一条记录时,碰到插入blob类型数据出错' );insert into blob_table_tmp values (3 ,rawtohex('4561888' ),'增加一条记录时,碰到插入blob类型数据出错' );insert into blob_table_tmp values (4 ,rawtohex('增加一条记录时333' ),'增加一条记录时,碰到插入blob类型数据出错' );
查询
获取连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 FINANCE_DB_HOST = "192.168.x.x" FINANCE_DB_PORT = 1521 FINANCE_DB_USER = "hyc" FINANCE_DB_PASSWORD = "123456" FINANCE_DB_DB = "ORCL" class OracleConn (): config_path = '' @staticmethod def get_conn (conn_name, encoding="UTF-8" ): conn_str = str (eval ("%s_DB_USER" % (OracleConn.config_path, conn_name))) + "/" + str (eval ("%s.%s_DB_PASSWORD" % (OracleConn.config_path, conn_name))) conn_str += "@" + str (eval ("%s_DB_HOST" % (OracleConn.config_path, conn_name))) conn_str += ":" + str (eval ("%s_DB_PORT" % (OracleConn.config_path, conn_name))) conn_str += "/" + str (eval ("%s_DB_DB" % (OracleConn.config_path, conn_name))) return ora.connect(conn_str, encoding=encoding, nencoding=encoding)
读写数据库
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 def oracle_test (): conn = OracleConn.get_conn("FINANCE" ) cur = conn.cursor() sql = "select id,blob_cl,clob_cl from FINANCE.blob_table_tmp" datas = [] r = cur.execute(sql) [datas.append((gg[0 ], gg[1 ].read().decode('utf-8' ), gg[2 ].read())) for gg in r] insert_sql = "INSERT INTO new_table(id,new_name) VALUES (:ID,:NEW_NAME)" res = [] [res.append((data[0 ], data[1 ])) for data in datas] cur.executemany(insert_sql, res) cur.execute('commit' ) cur.close() conn.close() print ("写入结束" ) if __name__ == '__main__' : oracle_test()
相关操作
关于数据库的连接,查询和写入
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import cx_Oracleclass Setting : DB_USER = 'narutohyc' DB_PASSWORD = 'hyc' DB_IP = '192.168.0.1' DB_PORT = '' DB_SERVICE = 'dataBaseName' setting = Setting() def oracle_test (): conn = cx_Oracle.connect('%s/%s@%s/%s' % (setting.DB_USER, setting.DB_PASSWORD, setting.DB_IP, setting.DB_SERVICE), encoding='utf-8' ) cur = conn.cursor() sql = "select ID, name from hyc_database" datas = [] r = cur.execute(sql) [datas.append((gg[0 ], gg[1 ].read())) for gg in r] insert_sql = "INSERT INTO new_table(id,new_name) VALUES (:ID,:NEW_NAME)" res = [] [res.append((data[0 ], data[1 ])) for data in datas] cur.executemany(insert_sql, res) cur.execute('commit' ) cur.close() conn.close() print ("写入结束" ) if __name__ == '__main__' : oracle_test()
Postgresql数据库
官方文档Documentation → PostgreSQL 16
查询
数据类型
我终于学会了使用python操作postgresql
保姆级 CentOS 7离线安装PostgreSQL 14教程
易百_PostgreSQL教程
离线安装数据库
先从centos7-pg_14.2下载 下载rpm包(微云下载centos7.6_PostgreSQL14.2 ),或者直接官方下载安装教程 安装,如果离线安装就下载rpm包
1 2 3 4 5 rpm -ivh postgresql14-libs-14.2-1PGDG.rhel7.x86_64.rpm rpm -ivh postgresql14-14.2-1PGDG.rhel7.x86_64.rpm rpm -ivh postgresql14-server-14.2-1PGDG.rhel7.x86_64.rpm rpm -ivh postgresql14-contrib-14.2-1PGDG.rhel7.x86_64.rpm
出现OSError: Python library not found: libpython3.6mu.so.1.0, libpython3.6m.so.1.0, libpython3.6.so.1.0, libpython3.6m.so的解决办法
1 yum install python3-devel
创建数据库data和log文件夹
1 2 3 4 5 6 mkdir -p /home/postgres/pgsql_datamkdir -p /home/postgres/pgsql_logtouch /home/postgres/pgsql_log/pgsql.log
授权给安装数据时自动创建的postgres用户
1 2 chown -R postgres:postgres /home/postgres/pgsql_datachown -R postgres:postgres /home/postgres/pgsql_log
切换到安装数据时自动创建的postgres用户
初始化数据库到新建数据目录
1 /usr/pgsql-14/bin/initdb -D /home/postgres/pgsql_data
启动服务器(初始化数据库日志文件)
1 2 3 /usr/pgsql-14/bin/pg_ctl -D /home/postgres/pgsql_data/ -l /home/postgres/pgsql_log/pgsql.log start # 查看状态 /usr/pgsql-14/bin/pg_ctl -D /home/postgres/pgsql_data/ -l /home/postgres/pgsql_log/pgsql.log status
切换到管理员开启端口并重启防火墙
1 2 3 su root firewall-cmd --zone=public --add-port=5432/tcp --permanent firewall-cmd --reload
修改配置文件实现远程访问vi /home/postgres/pgsql_data/postgresql.conf
1 2 3 4 5 6 listen_addresses = '*' max_connections = 1000 password_encryption = md5
修改可访问的用户IP段
1 2 3 vi /home/pgsql_data/pg_hba.conf(a进入编辑模式,esc退出编辑模式,:wq并按回车保存) IPV4下修改为或新增 host all all 0.0.0.0/0 trust
postgres用户重启数据库服务
1 2 su - postgres /usr/pgsql-14/bin/pg_ctl -D /home/postgres/pgsql_data/ -l /home/postgres/pgsql_log/pgsql.log restart
数据库安装结束,管理员postgres,默认密码123456
使用navicat连接pg库后新建数据库
环境配置 python连接pg
pip install “psycopg[binary,pool]”
使用sqlalchemy
1 2 3 4 5 6 7 8 9 10 11 12 from sqlalchemy import create_enginewith engine.connect().execution_options(stream_results=True ) as connection: stream = connection.execute(sql) while 1 : streams = stream.fetchmany(1000 ) if not streams: break for idx, row in enumerate (streams): row = row._asdict() print (idx)
使用psycopg
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 """ @version: v1.0 @author: huangyc @file: cur_2_test.py @Description: @time: 2024/3/21 17:22 """ import importlibimport osfrom basic_support.comm_funcs.comm_utils import gen_uuid32from dbutils.pooled_db import PooledDBfrom psycopg import ServerCursorfrom functools import partialfrom psycopg.rows import dict_rowos.environ['NLS_LANG' ] = 'SIMPLIFIED CHINESE_CHINA.UTF8' cls = partial(ServerCursor, name=gen_uuid32()) config = dict (host='192.168.xx.xx' , port=5432 , user='xx' , password='xx' , dbname='xx' , cursor_factory=cls) dic = dict (keepalives=1 , keepalives_idle=30 , keepalives_interval=10 , keepalives_count=5 ) config.update(dic) db_creator = importlib.import_module("psycopg" ) max_cached = 0 max_connections = 16 max_usage = 0 blocking = True sql = f"SELECT * FROM t_tmp_filter_docs limit 5000" """ max_cached: 池中空闲连接的最大数量。默认为0,即无最大数量限制。(建议默认) max_connections: 被允许的最大连接数。默认为0,无最大数量限制。(视情况而定) max_usage: 连接的最大使用次数。默认0,即无使用次数限制。(建议默认) blocking: 连接数达到最大时,新连接是否可阻塞。默认False,即达到最大连接数时,再取新连接将会报错。 (建议True,达到最大连接数时,新连接阻塞,等待连接数减少再连接) """ pool = PooledDB(db_creator, maxcached=max_cached, maxconnections=max_connections, maxusage=max_usage, blocking=blocking, **config) conn = pool.connection() cur = conn.cursor(row_factory=dict_row) batch_size = 1000 params = None cur.itersize = batch_size cur.execute(sql, params) idx = 0 while records := cur.fetchmany(batch_size): idx += 1 print (idx) try : cur.close() except : pass try : conn.close() except : pass
sql语法 数据库连接 1 2 3 4 5 6 7 8 select count (* ) from pg_stat_activity;show max_connections;select * from pg_stat_activity;select usename, count (* ) from pg_stat_activity group by usename;
数据库信息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 select pg_size_pretty (pg_database_size('pg_fac_stk' ));SELECT table_schema || '.' || table_name AS table_full_name, pg_size_pretty(pg_total_relation_size('"' || table_schema || '"."' || table_name || '"' )) AS size FROM information_schema.tables where table_name like 'finance_%' ORDER BY pg_total_relation_size('"' || table_schema || '"."' || table_name || '"' ) DESC ; select relname as TABLE_NAME, reltuples as rowCounts from pg_class where relkind = 'r' order by rowCounts desc ;select pg_relation_filepath('product' );select version();EXPLAIN ANALYZE SELECT * FROM t_cfg_opinfo; select datname,xact_rollback,deadlocks from pg_stat_database;
数据备份与恢复
使用pgdump备份数据库
pgdump是PostgreSQL官方提供的备份工具,可以将数据库的数据和架构保存到一个文件中,使用pgdump备份的优点包括:
备份数据可以保持原有的结构和特性,还原时可以保证数据准确性
备份文件可以跨平台传输,方便进行远程备份
备份文件可以进行压缩,减小文件大小,方便传输和存储
可以新建数据库,建几张表做测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 # 学生表 CREATE TABLE students ( id SERIAL PRIMARY KEY, name VARCHAR (100 ) NOT NULL , gender VARCHAR (10 ) NOT NULL , age INTEGER NOT NULL , class VARCHAR (20 ) NOT NULL ); # 学科表 CREATE TABLE subjects ( id SERIAL PRIMARY KEY, name VARCHAR (100 ) NOT NULL ); # 成绩表 CREATE TABLE scores ( id SERIAL PRIMARY KEY, student_id INTEGER NOT NULL , subject_id INTEGER NOT NULL , score INTEGER NOT NULL , FOREIGN KEY (student_id) REFERENCES students (id), FOREIGN KEY (subject_id) REFERENCES subjects (id) ); # 插入一些测试数据 INSERT INTO students (name, gender, age, class)VALUES ('Alice' , 'Female' , 18 , 'Class A' ), ('Bob' , 'Male' , 17 , 'Class B' ), ('Charlie' , 'Male' , 19 , 'Class A' ), ('Diana' , 'Female' , 18 , 'Class B' ); # 插入学科表数据 INSERT INTO subjects (name)VALUES ('Mathematics' ), ('English' ), ('Science' ); INSERT INTO scores (student_id, subject_id, score)VALUES (1 , 1 , 90 ), (1 , 2 , 85 ), (1 , 3 , 92 ); INSERT INTO scores (student_id, subject_id, score)VALUES (2 , 1 , 78 ), (2 , 2 , 80 ), (2 , 3 , 75 ); INSERT INTO scores (student_id, subject_id, score)VALUES (3 , 1 , 88 ), (3 , 2 , 92 ), (3 , 3 , 90 ); INSERT INTO scores (student_id, subject_id, score)VALUES (4 , 1 , 95 ), (4 , 2 , 88 ), (4 , 3 , 92 );
备份 使用pgdump备份数据库非常简单,只需要在终端中输入相应的命令即可
备份整个数据库
1 2 3 4 pg_dump -h <数据库地址> -p <数据库端口> -U <数据库用户名> -F c -b -v -f <备份文件路径> <数据库名称> # 示例 /usr/pgsql-14 /bin/pg_dump -h 127 .0 .0 .1 -U postgres -p 5432 -F t -b -v -f build_hyc_test.sql.tar hyc_test
备份指定表或数据
1 2 3 4 5 6 7 pg_dump -h <数据库地址> -p <数据库端口> -U <数据库用户名> -F c -b -v -t <表名1 > -t <表名2 > -f <备份文件路径> <数据库名称> # 示例 -- 备份指定表到sql文件 -- '-c --if -exists' 会生成 'drop table if exist ' 命令 -- '--no-owner' 是一个选项,用于指定在导出数据库时不包括拥有者信息 pg_dump --verbose --host=192 .168 .xx.xx --port=5432 --username=postgres --file /home/huangyc/pg_bak_test/bak_hyc.sql --encoding=UTF-8 -t "public.tushare_wz_index" -t "public.tushare_us_basic" -t "public.dim_fund" -t "public.dim_index" -c --if -exists --no-owner pg_fac_stk
具体参数的含义如下:
-h:数据库服务所在主机地址,可以是本地地址localhost或者IP地址
-p:数据库服务的监听端口,一般为默认端口5432
-U:连接数据库的用户名
-F:备份文件的格式,包括自定义格式c,纯文本格式p和归档格式t
-b:在备份文件中包含备份的数据库的模式信息
-v:备份过程中输出详细的信息
-f:备份文件的保存路径和文件名
-t:只备份指定的表和数据
1 2 3 4 5 pg_dump - h 127.0 .0 .1 - p 5432 - U postgres - f postgres.sql.tar - Ft; pg_dumpall - d postgres - U postgres - f postgres.sql
还原 使用备份文件进行恢复也非常简单,只需要在终端中输入相应的命令即可
恢复整个库
1 2 3 4 pg_restore -h <数据库地址> -p <数据库端口> -U <数据库用户名> -d <数据库名称> <备份文件路径> # 示例 /usr/pgsql-14 /bin/pg_restore -h 127 .0 .0 .1 -U postgres -p 5432 -d hyc_test_bak build_hyc_test.sql.tar
恢复指定数据
1 2 3 4 5 6 pg_restore -h <数据库地址> -p <数据库端口> -U <数据库用户名> -t <表名1 > -t <表名2 > -d <数据库名称> <备份文件路径> # 示例 -- 对于pg_dump备份出来的sql文件,直接执行sql文件即可恢复 -- 还原指定sql文件到bak_test库(需要自己建库) psql --host=192 .168 .xx.xx --port=5432 --username=postgres -d bak_test --file /home/huangyc/pg_bak_test/bak_hyc.sql.tar
具体参数的含义如下:
-h:数据库服务所在主机地址,可以是本地地址localhost或者IP地址
-p:数据库服务的监听端口,一般为默认端口5432
-U:连接数据库的用户名
-d:恢复数据的目标数据库名称
-t:只恢复指定的表和数据
命令详解 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 [postgres@pg01 ~]$ pg_dump --help 用法: pg_dump [选项]... [数据库名字] **一般选项**: -f, --file=FILENAME 输出文件或目录名 -F, --format=c|d|t|p 输出文件格式 (c=custom, d=directory, t=tar,p=plain,plain就是sql纯文本 (默认值)) -j, --jobs =NUM 执行多个并行任务进行备份转储工作 -v, --verbose 详细模式 -V, --version 输出版本信息,然后退出 -Z, --compress=0-9 被压缩格式的压缩级别,0表示不压缩 --lock-wait-timeout=TIMEOUT 在等待表锁超时后操作失败 --no-sync 不用等待变化安全写入磁盘 -?, --help 显示此帮助, 然后退出 **控制输出内容选项(常用)**: -a, --data-only 只转储数据,不包括模式,只对纯文本输出有意义 -s, --schema-only 只转储模式, 不包括数据 -c, --clean 在重新创建之前,先清除(删除)数据库对象,如drop table。只对纯文本输出有意义 -C, --create 指定输出文件中是否生成create database语句,只对纯文本输出有意义 -n, --schema=PATTERN 指定要导出的schema,不指定则导出所有的非系统schema -N, --exclude-schema=PATTERN 排除导出哪些schema -O, --no-owner 在明文格式中, 忽略恢复对象所属者 -t, --table=PATTERN 指定导出的表、视图、序列,可以使用多个-t匹配多个表,使用-t之后,-n和-N就失效了 -T, --exclude-table=PATTERN 排除表 -x, --no-privileges 不要转储权限 (grant/revoke) --disable-triggers 在只恢复数据的过程中禁用触发器 --exclude-table-data=PATTERN do NOT dump data for the specified table(s) --if-exists 当删除对象时使用IF EXISTS --inserts 以INSERT命令,而不是COPY命令的形式转储数据,使用该选项可以把数据加载到非pg数据库,会使恢复非常慢 该选项为每行生成1个单独的insert命令,?在恢复过程中遇到错误,将会丢失1行而不是全部表数据 --column-inserts 以带有列名的INSERT命令形式转储数据,例如insert into table_name(column,...) values(value1,...) --load-via-partition-root 通过根表加载分区 --no-comments 不转储注释 --no-tablespaces 不转储表空间分配信息 --no-unlogged-table-data 不转储没有日志的表数据 --on-conflict-do-nothing 将ON CONFLICT DO NOTHING添加到INSERT命令 **控制输出内容选项(不常用)**: -S, --superuser=NAME 指定关闭触发器时需要用到的超级用户名。 它只有在使用了--disable-triggers时才有影响。一般情况下,最好不要输入该参数,而是用 超级用户启动生成的脚本。 -b, --blobs 在转储中包括大对象 -B, --no-blobs 排除转储中的大型对象 -E, --encoding=ENCODING 转储以ENCODING形式编码的数据 --binary-upgrade 只能由升级工具使用 --enable-row-security 启用行安全性(只转储用户能够访问的内容) --extra-float-digits=NUM 覆盖extra_float_digits的默认设置 --disable-dollar-quoting 取消美元 (符号) 引号, 使用 SQL 标准引号 --no-publications 不转储发布 --no-security-labels 不转储安全标签的分配 --no-subscriptions 不转储订阅 --no-synchronized-snapshots 在并行工作集中不使用同步快照 --quote-all-identifiers 所有标识符加引号,即使不是关键字 --rows-per-insert=NROWS 每个插入的行数;意味着--inserts --section=SECTION 备份命名的节 (数据前, 数据, 及 数据后) --serializable-deferrable 等到备份可以无异常运行 --snapshot=SNAPSHOT 为转储使用给定的快照 --strict-names 要求每个表和(或)schema包括模式以匹配至少一个实体 --use-set-session-authorization 使用 SESSION AUTHORIZATION 命令代替 ALTER OWNER 命令来设置所有权 **联接选项**: -d, --dbname=DBNAME 对数据库 DBNAME备份 -h, --host=主机名 数据库服务器的主机名或套接字目录 -p, --port=端口号 数据库服务器的端口号 -U, --username=名字 以指定的数据库用户联接 -w, --no-password 永远不提示输入口令 -W, --password 强制口令提示 (自动) --role=ROLENAME 在转储前运行SET ROLE
对于pg_dump的自定义备份custom和tar类型的备份,需要使用pg_restore进行恢复,pg_restore语法如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 [postgres@pg01 pg_backup]$ pg_restore --help pg_restore 从一个归档中恢复一个由 pg_dump 创建的 PostgreSQL 数据库. 用法: pg_restore [选项]... [文件名] 一般选项: -d, --dbname=名字 连接数据库名字 -f, --file=文件名 输出文件名(- 对于stdout) -F, --format=c|d|t 备份文件格式(应该自动进行) -l, --list 打印归档文件的 TOC 概述 -v, --verbose 详细模式 -V, --version 输出版本信息, 然后退出 -?, --help 显示此帮助, 然后退出 恢复控制选项: -a, --data-only 只恢复数据, 不包括模式 -c, --clean 在重新创建之前,先清除(删除)数据库对象 -C, --create 创建目标数据库 -e, --exit-on-error 发生错误退出, 默认为继续 -I, --index=NAME 恢复指定名称的索引 -j, --jobs =NUM 执行多个并行任务进行恢复工作 -L, --use-list=FILENAME 从这个文件中使用指定的内容表排序 输出 -n, --schema=NAME 在这个模式中只恢复对象 -N, --exclude-schema=NAME 不恢复此模式中的对象 -O, --no-owner 不恢复对象所属者 -P, --function =NAME(args) 恢复指定名字的函数 -s, --schema-only 只恢复模式, 不包括数据 -S, --superuser=NAME 使用指定的超级用户来禁用触发器 -t, --table=NAME 恢复命名关系(表、视图等) -T, --trigger=NAME 恢复指定名字的触发器 -x, --no-privileges 跳过处理权限的恢复 (grant/revoke) -1, --single-transaction 作为单个事务恢复 --disable-triggers 在只恢复数据的过程中禁用触发器 --enable-row-security 启用行安全性 --if-exists 当删除对象时使用IF EXISTS --no-comments 不恢复注释 --no-data-for-failed-tables 对那些无法创建的表不进行 数据恢复 --no-publications 不恢复发行 --no-security-labels 不恢复安全标签信息 --no-subscriptions 不恢复订阅 --no-tablespaces 不恢复表空间的分配信息 --section=SECTION 恢复命名节 (数据前、数据及数据后) --strict-names 要求每个表和(或)schema包括模式以匹配至少一个实体 --use-set-session-authorization 使用 SESSION AUTHORIZATION 命令代替 ALTER OWNER 命令来设置所有权 联接选项: -h, --host=主机名 数据库服务器的主机名或套接字目录 -p, --port=端口号 数据库服务器的端口号 -U, --username=名字 以指定的数据库用户联接 -w, --no-password 永远不提示输入口令 -W, --password 强制口令提示 (自动) --role=ROLENAME 在恢复前执行SET ROLE操作 选项 -I, -n, -N, -P, -t, -T, 以及 --section 可以组合使用和指定 多次用于选择多个对象. 如果没有提供输入文件名, 则使用标准输入.
表空间
新建表空间
1 2 3 4 mkdir /home/huangyc/t_fac_tschown postgres /home/huangyc/t_fac_ts
pg库新建表空间
1 create tablespace t_fac_ts owner postgres location '/home/huangyc/t_fac_ts' ;
表空间有关的一些语法
1 2 3 4 5 6 # 删除表空间 (需要先drop 表空间所有的表, 或者将该空间下所有的表移除才能drop 表空间) DROP TABLESPACE t_fac_ts;# 修改具体的表到指定表空间下 ALTER TABLE t_fac_tushare_stock_basic SET TABLESPACE t_fac_ts;# 修改指定库到指定表空间下 ALTER DATABASE name SET TABLESPACE new_tablespace;
锁表处理
pg锁表解锁
查看被锁的表
1 2 3 select a.locktype,a.database,a.pid,a.mode,a.relation,b.relnamefrom pg_locks ajoin pg_class b on a.relation = b.oid where relname= 't_opt_strhdk_blsj' ;
杀死被锁的pid
1 select pg_terminate_backend(pid);
表结构修改 1 2 3 4 5 6 7 8 9 10 alter table "user" rename to "ts_user";alter table table_name add column col_name varchar (50 );alter table table_name drop column col_name;alter table table_name add primary key("col_name");alter table table_name rename column old_col_name to new_col_name;
数据更新和查询
设置某字段的值
1 2 3 4 5 6 7 8 update table_name set col_name= new_value;UPDATE table1SET field_to_update = table2.new_valueFROM table2WHERE table1.common_column = table2.common_column;
删除表中重复数据
1 2 3 4 5 6 7 8 9 10 select col1,col2,count (* ) from old_table group by col1,col2;create table bak_table as select distinct * from table_name;select col1,col2,count (* ) from bak_table group by col1,col2;truncate table old_table;insert into old_table (col1,col2) select col1,col2 from bak_table;
不存在插入,存在更新
1 insert into ... on conflict(column_name) do update set ...
conflict(column_name): column_name字段是判断要查找的数据是否存在,作为判断条件
column_name必须是主键 或者其他具有唯一性的字段(如唯一键或排他键)
1 2 3 4 insert into user (id,username,address,create_date,create_by) values ('1' ,'刘德华' ,'香港' ,now(),'system' ) on conflict(id) do update set address= '中国' ,update_date= now(),update_by= 'system' ;
1 2 # 批量的方式 insert into testunnest(id, age, name) values (unnest (array [1 ,3 ]), unnest (array [18 ,10 ]), unnest (array ['valupdated' , 'val3' ])) on conflict (id) do update set age = excluded.age, name = excluded.name;
数据和结构复制 1 2 3 4 5 create table new_table as select * from old_table [WITH NO DATA];insert into new_table (col_0, col_1) select col_0, col_1 from old_table;
视图
普通视图
视图是一个虚拟表,它是根据一个或多个基本表的查询结果动态生成的,每次查询视图时都会执行相应的查询
1 2 3 4 5 6 CREATE VIEW view_name AS SELECT column1, column2, ...FROM table_nameWHERE condition ;drop view view_name;
物化视图
物化视图是一个实际存储数据的表,它的数据定期刷新,不像普通视图那样每次查询都重新计算。
1 2 3 4 5 6 CREATE MATERIALIZED VIEW materialized_view_name AS SELECT column1, column2, ...FROM table_nameWHERE condition ;drop MATERIALIZED VIEW materialized_view_name
需要注意的是,物化视图需要定期手动或自动刷新以更新数据,你可以使用 REFRESH MATERIALIZED VIEW
命令来进行刷新
分页查询 在PostgreSQL数据库中,进行分页查询通常使用LIMIT和OFFSET子句来实现
LIMIT用于限制返回的行数,OFFSET用于指定从结果集的哪一行开始返回数据,下面是一个简单的分页查询示例:
假设我们有一个名为products 的表,其中包含产品的名称和价格,我们想要获取第2页,每页显示10条数据,可以使用以下SQL查询语句:
1 2 3 4 SELECT product_name, priceFROM productsORDER BY product_idLIMIT 10 OFFSET 10 ;
在这个例子中,假设product_id是产品表中的唯一标识符,通过ORDER BY将结果按照product_id排序,LIMIT 10表示返回10行数据,OFFSET 10表示从结果集中的第11行开始返回数据,即返回第2页的数据
乱序查询 在PostgreSQL数据库中,使用ORDER BY RANDOM()语句可以实现按照随机顺序排序查询结果
这样可以让查询结果以随机的顺序返回,每次执行查询时都会得到不同的排序结果
这在需要随机抽样数据或随机展示数据时非常有用
需要注意的是,使用ORDER BY RANDOM()可能会影响查询的性能,因为需要对结果集进行随机排序操作
假设我们有一个名为products 的表,其中包含产品的名称和价格,我们想要以随机顺序返回产品列表,可以使用以下SQL查询语句:
1 2 3 4 5 6 7 SELECT product_name, price FROM products ORDER BY RANDOM();
这将返回products表中产品名称和价格的数据,并以随机的顺序排序,每次执行该查询,返回的产品列表顺序都会不同
列表查询
列表字段的创建
1 2 3 4 CREATE TABLE your_table ( id SERIAL PRIMARY KEY, list_column_name _TEXT );
在PostgreSQL中,你可以使用以下方式来查询数组字段:
查询数组字段中包含特定值的行:
1 SELECT * FROM your_table WHERE your_array_column = ANY ('{value1,value2,value3}' );
查询数组字段中包含任何查询数组中的值的行:
1 SELECT * FROM your_table WHERE your_array_column && '{value1,value2,value3}' ;
查询数组字段中包含所有查询数组中的值的行:
1 SELECT * FROM your_table WHERE your_array_column @> '{value1,value2,value3}' ;
查询数组字段的长度大于1的所有行:
1 SELECT * FROM your_table WHERE array_length(your_array_column, 1 ) > 1 ;
查询数组字段中第一个元素等于特定值的行:
1 SELECT * FROM your_table WHERE your_array_column[1 ] = 'specific_value' ;
删除重复记录
postgresql 常用的删除重复数据方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 create table hyc_tmp_del_test(id int , name varchar (255 ));create table hyc_tmp_del_test_bk (like hyc_tmp_del_test);insert into hyc_tmp_del_test select generate_series(1 , 10000 ), 'huangyc' ;insert into hyc_tmp_del_test select generate_series(1 , 10000 ), 'huangyc' ;insert into hyc_tmp_del_test_bk select * from hyc_tmp_del_test;explain analyse delete from hyc_tmp_del_test_bk a where a.ctid <> (select min (t.ctid) from hyc_tmp_del_test_bk t where a.id= t.id); explain analyse delete from hyc_tmp_del_test_bk a where a.ctid not in (select min (ctid) from hyc_tmp_del_test_bk group by id); explain analyze delete from hyc_tmp_del_test_bk a where a.ctid = any (array (select ctid from (select row_number () over (partition by id), ctid from hyc_tmp_del_test_bk) t where t.row_number > 1 ));
第二种和第三种感觉差不多,原文说是第三种快不少,这里pg库是14.x版本
关键
pg中每个表都有几个系统隐藏列:tableoid, xmin, xmax,cmin,cmax,ctid
其中tableoid表示表的oid,cmin、cmax、xmin和xmax是mvcc的实现有关
ctid表示行版本在表中的物理位置 : 它属于对象标识符类型(oid,Object Identifier Types),是一种行标识符,它的数据使用的元组标识符(tid,tuple identifier)。元组ID是一对(块号,块内的元组索引),用于标识当前行的物理位置。
引申用法
假设我们有一个表格 products
,包含产品名称和产品类别。我们希望从中找出每个产品类别中的前两个产品
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 SELECT * FROM ( SELECT product_name, category, row_number () OVER (PARTITION BY SUBSTR(p.category, 0 , LENGTH(p.category)-2 ) ORDER BY product_name) AS ranking FROM products p WHERE p.category LIKE 'Electronics:%' ) subquery WHERE ranking <= 2 ;
在这个例子中,我们有一个产品表 products
,其中包含 product_name
和 category
列
我们想要找出每个以 ‘Electronics:’ 开头的产品类别中的前两个产品
子查询部分:
子查询从 products
表中选择了产品名称 product_name
和产品类别 category
我们只关注以 ‘Electronics:’ 开头的产品类别,即 p.category LIKE 'Electronics:%'
使用 row_number()
函数,对每个以 ‘Electronics:’ 开头的类别进行编号,按产品名称排序
主查询部分:
在主查询中,我们选择了子查询的所有列 *
我们进一步筛选了结果,只选择了编号小于等于2的记录,以获取每个类别中的前两个产品
索引 1 2 3 4 5 6 7 8 select * from pg_indexes where tablename = 't_cfg_opinfo' ; create index index_name on table_name (col_0, col_1);select * from pg_indexes where tablename= 'table_name' ;drop index index_name;
什么情况下要避免使用索引?
虽然索引的目的在于提高数据库的性能,但这里有几个情况需要避免使用索引
使用索引时,需要考虑下列准则 :
索引不应该使用在较小的表上
索引不应该使用在有频繁的大批量的更新或插入操作的表上
索引不应该使用在含有大量的 NULL 值的列上
索引不应该使用在频繁操作的列上
实用sql 1 2 SELECT (CASE WHEN MAX (version) IS NULL THEN -1 ELSE MAX (version) END ) + 1 AS version FROM table_name
其他语法
筛选某列,逗号拼接
1 2 3 select string_agg(bs_org_id,',' ) as bs_org_ids from bs_org where par_org_id = '100'
日期转换
1 2 select to_char(col_name,'yyyyMMDD' )- interval '2 day' from table_name
转时间戳
1 2 select '2011-01-06 09:57:59' ::timestamp ;TO_TIMESTAMP('2011-01-06 09:57:59' , 'YYYY-MM-DD HH24:MI:S' )
postgresql 获取分组第一条数据 窗口函数
给数据分组并排名,使用 row_number() over (partition by 分组的字段名 order by 排序规则) as 排名
从上述第一步中取出,排名为第一的数据,即为第一条数据 select * from 上述第一步 where 排名=1
获取前N名的数据,将一中第二步的条件换成where 排名 < N+1
distributed key
1 2 alter table table_name set distributed by (id);alter table table_name add primary key (id);
ORM框架 ORM框架比较
一文了解 Python 的三种数据源架构模式
SQLAlchemy 和其他的 ORM 框架
SQLObject
优点:
采用了易懂的ActiveRecord 模式
一个相对较小的代码库
缺点:
方法和类的命名遵循了Java 的小驼峰风格
不支持数据库session隔离工作单元
Storm
Django’s ORM
peewee
优点:
Django式的API,使其易用
轻量实现,很容易和任意web框架集成
缺点:
不支持自动化 schema 迁移
多对多查询写起来不直观
SQLAlchemy
优点:
企业级API,使得代码有健壮性和适应性
灵活的设计,使得能轻松写复杂查询
缺点:
工作单元概念不常见
重量级API,导致长学习曲线
相比其他的ORM, SQLAlchemy 意味着,无论你何时写SQLAlchemy代码, 都专注于工作单元的前沿概念 。DB Session 的概念可能最初很难理解和正确使用,但是后来你会欣赏这额外的复杂性,这让意外的时序提交相关的数据库bug减少到0。在SQLAlchemy中处理多数据库是棘手的, 因为每个DB session 都限定了一个数据库连接。但是,这种类型的限制实际上是好事, 因为这样强制你绞尽脑汁去想在多个数据库之间的交互, 从而使得数据库交互代码很容易调试。
SQLAlchemy
SQLAlchemy 1.4 Documentation
sqlalchemy操作数据库
sqlalchemy外键和relationship查询
SQLALlchemy数据查询小集合
SQLAlchemy 的连接池机制
SQLAlchemy 中的 Session、sessionmaker、scoped_session
Contextual/Thread-local Sessions
SQLAlchemy(常用的SQLAlchemy列选项)
查询官网例子Object Relational Tutorial (1.x API)
sqlalchemy外键和relationship查询
session和scoped_session session用于创建程序和数据库之间的会话,所有对象的载入和保存都需通过session对象 。 通过sessionmaker调用创建一个工厂,并关联Engine以确保每个session都可以使用该Engine连接资源 scoped_session 实现了一个线程的隔离,保证不同的线程拿到不同的session, 同一个线程拿到的session 是同一个值
1 2 3 4 5 6 7 s1 = Session() s2 = Session() s1.add(person) s1.commit() s1.close() s2.add(person)
session 和scoped_session本质上都是用来操作数据库的,只是session 只适合在单线程下面使用 官方文档提到了scoped_session的正确使用方法。request结束后要调用scoped_session.remove()
Engine Configuration
使用 create_engine
创建我们需要的DB starting point
1 2 3 4 from sqlalchemy import create_enginescheme = 'mysql+pymysql://root:123456@localhost:3306/dev_shopping?charset=utf8' engine = create_engine(scheme, pool_size=10 , max_overflow=-1 , pool_recycle=1200 )
create_engine
函数常用参数:
pool_size=10 # 连接池的大小,0表示连接数无限制
pool_recycle=-1 # 连接池回收连接的时间,如果设置为-1,表示没有no timeout, 注意,mysql会自动断开超过8小时的连接,所以sqlalchemy沿用被mysql断开的连接会抛出MySQL has gone away
max_overflow=-1 # 连接池中允许‘溢出’的连接个数,如果设置为-1,表示连接池中可以创建任意数量的连接
pool_timeout=30 # 在连接池获取一个空闲连接等待的时间
echo=False # 如果设置True, Engine将会记录所有的日志,日志默认会输出到sys.stdout
创建Engine
之后,接下来的问题,就是如何使用Engine
在单进程中,建议在在初始化的模块的时候创建Engine
, 使Engine
成为全局变量, 而不是为每个调用Engine
的对象或者函数中创建, Engine
不同于connect
, connect
函数会创建数据库连接的资源,Engine
是管理connect
创建的连接资源
在多进程中,为每个子进程都创建各自的Engine
, 因为进程之间是不能共享Engine
几种操作方式
Working with Engines and Connections
SqlAlchemy的Engine,Connection和Session 区别?适合什么时候用?
Engine方式
Engine是SQLAlchemy中连接数据库最底层级别的对象,它维护了一个连接池,可以在应用程序需要和数据库对话时使用。在Engine.execute(close_with_result=True) close_with_result=True 表示连接自动关闭;
1 2 3 4 5 6 result = engine.execute('SELECT * FROM tablename;' ) conn = engine.connect(close_with_result=True ) result = conn.execute('SELECT * FROM tablename;' ) for row in result: print (result['columnname' ] result.close()
Connection方式
Connection,实际上是执行SQL查询的工作,每当你想更好的控制连接的属性,如何时关闭等都建议使用这个操作;比如在一个事务中,要控制它提交commit的时间,在connection控制中就可以运行多个不同的SQL语句,如果其中一个出现问题,则其他所有的语句都会撤销更改;
1 2 3 4 5 6 7 8 9 connection = engine.connect() trans = connection.begin() try : connection.execute("INSERT INTO films VALUES ('Comedy', '82 minutes');" ) connection.execute("INSERT INTO datalog VALUES ('added a comedy');" ) trans.commit() except : trans.rollback() raise
Session方式
Session,一般都是用于ORM中,因为在ORM中,会自动生成SQL语句以及自动连接数据库(自己配置),使用session.execute()也是个编辑的方法,可以将会话绑定到任何对象;如果你确定使用ORM,就建议使用session来处理execute(),否则还是使用connection更好方便;
总结: 从应用角度来看,可以把这三类分为两种:
直接使用Engine.execute() 或Connection.execute(),更加灵活,可以使用原生SQL语句
使用Session处理交易类型的数据,因为方便使用session.add(), session.rollback(), session.commit(), session.close()等,它是使用ORM时推荐的一种和数据库交互的方式