Elasticsearch数据库 环境配置 
安装环境
pip install elasticsearch==7.6.0
 
EsDao包装类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 """ @version: v1.0 @author: huangyc @file: EsDao.py @Description: Es统一操作类 @time: 2020/4/27 10:22 """ from  elasticsearch.helpers import  bulkfrom  elasticsearch import  Elasticsearchimport  pandas as  pdclass  EsDao (object ):    """      ElasticSearch的数据操作类     """          DEFAULT_BATCH_SIZE = 1000           BULK_BATCH_SIZE = 10000      def  __init__ (self, hosts, timeout=3600 *24  ):         self.hosts = hosts         self.timeout = timeout         self.es = Elasticsearch(hosts, timeout=self.timeout)     def  save_data_list (self, index_name, data_list ):         """          保存数据列表到es的指定索引中         :param index_name: 索引名称         :param data_list: 数据列表,列表元素代表一行数据,元素类型为dict         :return:         """         bulk_data_lst = [             data_list[i:i + self.BULK_BATCH_SIZE]             for  i in  range (0 , len (data_list), self.BULK_BATCH_SIZE)         ]         if  len (data_list) > 0  and  '_id'  in  data_list[0 ]:             for  bulk_data in  bulk_data_lst:                 actions = [{                     "_index" : index_name,                     "_type" : index_name,                     "_id" : data.pop("_id" ),                     "_source" : data                 }                     for  data in  bulk_data                 ]                 bulk(self.es, actions, index=index_name, raise_on_error=True )         else :             for  bulk_data in  bulk_data_lst:                 actions = [{                     "_index" : index_name,                     "_type" : index_name,                     "_source" : data                 }                     for  data in  bulk_data                 ]                 bulk(self.es, actions, index=index_name, raise_on_error=True )     def  is_index_exists (self, index_name ):         """          判断指定索引是否存在         :param index_name: 索引名称         :return:         """         return  self.es.indices.exists(index=index_name)     def  delete_by_query (self, index_name, query_body ):         """          按查询结果删除数据         :param index_name:         :param query_body:         :return:         """         return  self.es.delete_by_query(index_name, query_body)     def  clear_index_data (self, index_name ):         """          清空指定索引的数据         :param index_name:         :return:         """         return  self.delete_by_query(             index_name=index_name,             query_body={                 "query" : {                     "match_all" : {}                 }             }         )     def  save_df_data (self, index_name, df ):         """          保存pandas的DataFrame到es的指定索引中         :param index_name: 索引名称         :param df: 要保存的dataframe         :return:         """         col_lst = df.columns.tolist()         dic_lst = [dict ([(c, v) for  c, v in  zip (col_lst, r)]) for  r in  df.values.tolist()]         self.save_data_list(index_name=index_name, data_list=dic_lst)     def  create_index (self, index_name, mapping_properties ):         """          创建索引         :param index_name: 索引名称         :param mapping_properties: 索引mapping中的属性列表         :return:         """         if  not  self.es.indices.exists(index=index_name):             mapping = {                 "mappings" : {                     index_name: {                         "properties" : mapping_properties                     }                 }             }             res = self.es.indices.create(index=index_name, body=mapping)             if  res is  not  None  and  'acknowledged'  in  res:                 return  res.get('acknowledged' )         return  False      def  _search_with_scroll (self, index_name, query_body ):         if  "size"  not  in  query_body:             query_body["size" ] = self.DEFAULT_BATCH_SIZE         response = self.es.search(             index=index_name,             body=query_body,             search_type="dfs_query_then_fetch" ,             scroll="120m" ,             timeout="60m"          )         scroll_id = response["_scroll_id" ]         while  True :             sources = [doc["_source" ] for  doc in  response["hits" ]["hits" ]]             if  len (sources) == 0 :                 break              yield  sources             response = self.es.scroll(scroll_id=scroll_id, scroll="60m" )     def  query_for_df (self, index_name, query_body ):         """          执行查询并获取pandas.DataFrame格式的返回值         :param index_name: 索引名称         :param query_body: 查询条件         :return:         """         sources = []         for  sub_source in  self._search_with_scroll(index_name=index_name, query_body=query_body):             sources.extend(sub_source)         return  pd.DataFrame(sources)     def  query_for_df_with_batch (self, index_name, query_body, batch_size=DEFAULT_BATCH_SIZE ):         """          按批次大小查询并返回pandas.DataFrame的generator格式的返回值         :param index_name: 索引名称         :param query_body: 查询条件         :param batch_size: 批次大小         :return:         """         if  "size"  not  in  query_body:             query_body["size" ] = batch_size         for  sub_source in  self._search_with_scroll(index_name=index_name, query_body=query_body):             yield  pd.DataFrame(sub_source)     def  get_first_row_with_df (self, index_name ):         """          获取指定索引的首行数据,格式为pandas.DataFrame         可用于获取索引的元信息         :param index_name: 索引名称         :return:         """         query_body = {             "size" : 1 ,             "query" : {                 "match_all" : {}             }         }         for  sub_source in  self._search_with_scroll(index_name=index_name, query_body=query_body):             return  pd.DataFrame(sub_source) 
 
使用案例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 class  TaskMeta :    '''      数据元类     '''     def  __init__ (self, text, doc_id, sentence_id, reg_lst, flag, has_reg, text_source="primitive"  ):         self.text = text         self.doc_id = doc_id         self.sentence_id = sentence_id         self.reg_lst = reg_lst         self.flag = flag         self.has_reg = has_reg         self.text_source = text_source     def  __repr__ (self ):         return  f'{self.text}  {self.doc_id}  {self.sentence_id}  {self.reg_lst}  {self.flag}  {self.has_reg}  {self.text_source} '      def  to_dict (self ):         return  {"text" : self.text,                 "doc_id" : self.doc_id,                 "sentence_id" : self.sentence_id,                 "reg_lst" : self.reg_lst,                 "flag" : self.flag,                 "has_reg" : self.has_reg,                 "text_source" : self.text_source} 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 def  create_index (target_es_dao, index_name, mapping ):    '''      创建es索引     :return: 是否创建成功     '''     if  not  target_es_dao.is_index_exists(index_name):         target_es_dao.create_index(index_name, mapping)     else :         target_es_dao.clear_index_data(index_name)         print (f"索引{index_name} 已存在, 已清除数据" ) def  writer_fun (target_es_dao, target_index, sample_lst ):    '''      写数据到es库     '''     df_sample_lst = []     [df_sample_lst.append(sample.to_dict()) for  sample in  sample_lst]     df_sample_lst = pd.DataFrame(df_sample_lst)     target_es_dao.save_df_data(target_index, df_sample_lst)     print (f'写入数据{len (sample_lst)} 条' ) def  es_cal_test ():         source_es_dao = EsDao(f"http://{aug_config.SOURCE_IP} :{aug_config.SOURCE_PORT} /" )     query_condition = {         "query_string" : {             "default_field" : "has_reg" ,             "query" : "true"          }     }     query_body = {         "query" : query_condition     }          datas = source_es_dao.query_for_df(index_name=aug_config.SOURCE_INDEX, query_body=query_body)     records = datas.to_dict(orient='record' )     sample_lst = []     for  record in  records:         sample_lst.append(             TaskMeta(                 text=record["text" ],                 doc_id=record["doc_id" ],                 sentence_id=record["sentence_id" ],                 reg_lst=record["reg_lst" ],                 flag=record["flag" ],                 has_reg=record["has_reg" ]             )         )          create_index(target_es_dao, aug_config.TARGET_INDEX, aug_config.MAPPING)          writer_fun(target_es_dao, aug_config.TARGET_INDEX, sample_lst=sample_lst) if  __name__ == '__main__' :    es_cal_test() 
 
Oracle数据库 
Python操作Oracle数据库:cx_Oracle 
 
环境配置 
Linux上Python连接Oracle解决报错cx_Oracle.DatabaseError: DPI-1047 
 
安装库
 
 
链接库准备,需要将oci.dll、oraocci11.dll、oraociei11.dll复制到sitepackages路径下,oracle client下载链接 ,并配置到系统环境变量,链接中没有的自己去官网(win64 、所有平台 、linux64 )注册一个账号下载对应的版本
1 2 SELECT  *  FROM  v$version;
 
没有配置会报如下的错:
1 2 3 4 5 # Windows下报错 cx_Oracle.DatabaseError: DPI -1047: Cannot  locate  a  64-bit  Oracle  Client  library : "D :\software \win_or  # Linux 下报错 cx_Oracle.DatabaseError : DPI -1047: Cannot  locate  a  64-bit  Oracle  Client  library : "libclntsh.so : cannot  open  shared  object  file : No  such  file  or  directory ". See  https ://cx -oracle.readthedocs.io /en /latest /user_guide /installation.html  for  help 
 
 
 
sql基础 建表 1 2 3 4 5 6 7 8 9 create  table  blob_table_tmp(  id number primary  key,   blob_cl blob  not  null , 	clob_cl clob  not  null  ); insert  into  blob_table_tmp values (1 ,rawtohex('11111000011111' ),'增加一条记录时,碰到插入blob类型数据出错' );insert  into  blob_table_tmp values (3 ,rawtohex('4561888' ),'增加一条记录时,碰到插入blob类型数据出错' );insert  into  blob_table_tmp values (4 ,rawtohex('增加一条记录时333' ),'增加一条记录时,碰到插入blob类型数据出错' );
 
查询 
获取连接
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 FINANCE_DB_HOST = "192.168.x.x"  FINANCE_DB_PORT = 1521  FINANCE_DB_USER = "hyc"  FINANCE_DB_PASSWORD = "123456"  FINANCE_DB_DB = "ORCL"  class  OracleConn ():    config_path = ''      @staticmethod     def  get_conn (conn_name, encoding="UTF-8"  ):         conn_str = str (eval ("%s_DB_USER"  % (OracleConn.config_path, conn_name))) + "/"  + str (eval ("%s.%s_DB_PASSWORD"  % (OracleConn.config_path, conn_name)))         conn_str += "@"  + str (eval ("%s_DB_HOST"  % (OracleConn.config_path, conn_name)))         conn_str += ":"  + str (eval ("%s_DB_PORT"  % (OracleConn.config_path, conn_name)))         conn_str += "/"  + str (eval ("%s_DB_DB"  % (OracleConn.config_path, conn_name)))         return  ora.connect(conn_str, encoding=encoding, nencoding=encoding) 
 
读写数据库
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 def  oracle_test ():         conn = OracleConn.get_conn("FINANCE" )     cur = conn.cursor()          sql = "select id,blob_cl,clob_cl from FINANCE.blob_table_tmp"      datas = []     r = cur.execute(sql)          [datas.append((gg[0 ], gg[1 ].read().decode('utf-8' ), gg[2 ].read())) for  gg in  r]          insert_sql = "INSERT INTO new_table(id,new_name) VALUES (:ID,:NEW_NAME)"      res = []     [res.append((data[0 ], data[1 ])) for  data in  datas]     cur.executemany(insert_sql, res)     cur.execute('commit' )     cur.close()     conn.close()     print ("写入结束" ) if  __name__ == '__main__' :    oracle_test() 
 
相关操作 
关于数据库的连接,查询和写入
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import  cx_Oracleclass  Setting :    DB_USER = 'narutohyc'      DB_PASSWORD = 'hyc'      DB_IP = '192.168.0.1'      DB_PORT = ''      DB_SERVICE = 'dataBaseName'  setting = Setting() def  oracle_test ():         conn = cx_Oracle.connect('%s/%s@%s/%s'  % (setting.DB_USER, setting.DB_PASSWORD, setting.DB_IP, setting.DB_SERVICE), encoding='utf-8' )     cur = conn.cursor()          sql = "select ID, name from hyc_database"      datas = []     r = cur.execute(sql)          [datas.append((gg[0 ], gg[1 ].read())) for  gg in  r]          insert_sql = "INSERT INTO new_table(id,new_name) VALUES (:ID,:NEW_NAME)"      res = []     [res.append((data[0 ], data[1 ])) for  data in  datas]     cur.executemany(insert_sql, res)     cur.execute('commit' )     cur.close()     conn.close()     print ("写入结束" ) if  __name__ == '__main__' :    oracle_test() 
 
Postgresql数据库 
官方文档Documentation  → PostgreSQL 16 
查询  
数据类型  
 
我终于学会了使用python操作postgresql 
保姆级 CentOS 7离线安装PostgreSQL 14教程 
易百_PostgreSQL教程 
 
安装数据库 
先从centos7-pg_14.2下载 下载rpm包(微云下载centos7.6_PostgreSQL14.2 ),或者直接官方下载安装教程 安装,如果离线安装就下载rpm包
 
1 2 3 4 5 rpm -ivh postgresql14-libs-14.2-1PGDG.rhel7.x86_64.rpm rpm -ivh postgresql14-14.2-1PGDG.rhel7.x86_64.rpm rpm -ivh postgresql14-server-14.2-1PGDG.rhel7.x86_64.rpm rpm -ivh postgresql14-contrib-14.2-1PGDG.rhel7.x86_64.rpm 
 
出现OSError: Python library not found: libpython3.6mu.so.1.0, libpython3.6m.so.1.0, libpython3.6.so.1.0, libpython3.6m.so的解决办法
1 yum install python3-devel 
 
创建数据库data和log文件夹
1 2 3 4 5 6 mkdir  -p /home/postgres/pgsql_datamkdir  -p /home/postgres/pgsql_logtouch  /home/postgres/pgsql_log/pgsql.log
 
授权给安装数据时自动创建的postgres用户
1 2 chown  -R postgres:postgres /home/postgres/pgsql_datachown  -R postgres:postgres /home/postgres/pgsql_log
 
切换到安装数据时自动创建的postgres用户
 
初始化数据库到新建数据目录
1 /usr/pgsql-14/bin/initdb -D /home/postgres/pgsql_data 
 
启动服务器(初始化数据库日志文件)
1 2 3 /usr/pgsql-14/bin/pg_ctl -D  /home/postgres/pgsql_data/ -l /home/postgres/pgsql_log/pgsql.log start # 查看状态 /usr/pgsql-14/bin/pg_ctl -D /home/postgres/pgsql_data/ -l /home/postgres/pgsql_log/pgsql.log status 
 
切换到管理员开启端口并重启防火墙
1 2 3 su root firewall-cmd --zone=public --add-port=5432/tcp --permanent firewall-cmd --reload 
 
修改配置文件实现远程访问vi /home/postgres/pgsql_data/postgresql.conf
1 2 3 4 5 6 listen_addresses = '*'  max_connections = 1000 password_encryption = md5 
 
修改可访问的用户IP段
1 2 3 vi /home/pgsql_data/pg_hba.conf(a进入编辑模式,esc退出编辑模式,:wq并按回车保存) IPV4下修改为或新增 host    all             all             0.0.0.0/0               trust 
 
postgres用户重启数据库服务
1 2 su - postgres /usr/pgsql-14/bin/pg_ctl -D  /home/postgres/pgsql_data/ -l /home/postgres/pgsql_log/pgsql.log restart 
 
数据库安装结束,管理员postgres,默认密码123456
使用navicat连接pg库后新建数据库  
环境配置 python连接pg 
pip install “psycopg[binary,pool]”
 
使用sqlalchemy
1 2 3 4 5 6 7 8 9 10 11 12 from  sqlalchemy import  create_enginewith  engine.connect().execution_options(stream_results=True ) as  connection:    stream = connection.execute(sql)     while  1 :         streams = stream.fetchmany(1000 )         if  not  streams:             break          for  idx, row in  enumerate (streams):             row = row._asdict()             print (idx) 
 
 
使用psycopg
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 """ @version: v1.0 @author: huangyc @file: cur_2_test.py @Description:  @time: 2024/3/21 17:22 """ import  importlibimport  osfrom  basic_support.comm_funcs.comm_utils import  gen_uuid32from  dbutils.pooled_db import  PooledDBfrom  psycopg import  ServerCursorfrom  functools import  partialfrom  psycopg.rows import  dict_rowos.environ['NLS_LANG' ] = 'SIMPLIFIED CHINESE_CHINA.UTF8'  cls = partial(ServerCursor, name=gen_uuid32()) config = dict (host='192.168.xx.xx' , port=5432 , user='xx' , password='xx' , dbname='xx' , cursor_factory=cls) dic = dict (keepalives=1 , keepalives_idle=30 , keepalives_interval=10 , keepalives_count=5 ) config.update(dic) db_creator = importlib.import_module("psycopg" ) max_cached = 0  max_connections = 16  max_usage = 0  blocking = True  sql = f"SELECT * FROM t_tmp_filter_docs limit 5000"  """ max_cached: 池中空闲连接的最大数量。默认为0,即无最大数量限制。(建议默认) max_connections: 被允许的最大连接数。默认为0,无最大数量限制。(视情况而定) max_usage: 连接的最大使用次数。默认0,即无使用次数限制。(建议默认) blocking: 连接数达到最大时,新连接是否可阻塞。默认False,即达到最大连接数时,再取新连接将会报错。          (建议True,达到最大连接数时,新连接阻塞,等待连接数减少再连接) """ pool = PooledDB(db_creator,                 maxcached=max_cached,                 maxconnections=max_connections,                 maxusage=max_usage,                 blocking=blocking,                 **config) conn = pool.connection() cur = conn.cursor(row_factory=dict_row) batch_size = 1000  params = None  cur.itersize = batch_size cur.execute(sql, params) idx = 0  while  records := cur.fetchmany(batch_size):    idx += 1      print (idx) try :    cur.close() except :    pass  try :    conn.close() except :    pass      
 
 
 
sql语法 数据库连接 1 2 3 4 5 6 7 8 select  count (* ) from  pg_stat_activity;show  max_connections;select  *  from  pg_stat_activity;select  usename, count (* ) from  pg_stat_activity group  by  usename;
 
数据库信息 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 select  pg_size_pretty (pg_database_size('pg_fac_stk' ));SELECT     table_schema ||  '.'  ||  table_name AS  table_full_name,     pg_size_pretty(pg_total_relation_size('"'  ||  table_schema ||  '"."'  ||  table_name ||  '"' )) AS  size FROM  information_schema.tables where  table_name like  'finance_%' ORDER  BY     pg_total_relation_size('"'  ||  table_schema ||  '"."'  ||  table_name ||  '"' ) DESC ;      select  relname as  TABLE_NAME, reltuples as  rowCounts from  pg_class where  relkind =  'r'  order  by  rowCounts desc ;select  pg_relation_filepath('product' );select  version();EXPLAIN ANALYZE SELECT  *  FROM  t_cfg_opinfo; select  datname,xact_rollback,deadlocks from  pg_stat_database;
 
数据备份与恢复 
使用pgdump备份数据库 
 
pgdump是PostgreSQL官方提供的备份工具,可以将数据库的数据和架构保存到一个文件中,使用pgdump备份的优点包括:
备份数据可以保持原有的结构和特性,还原时可以保证数据准确性 
备份文件可以跨平台传输,方便进行远程备份 
备份文件可以进行压缩,减小文件大小,方便传输和存储 
 
可以新建数据库,建几张表做测试
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 # 学生表 CREATE  TABLE  students (    id SERIAL PRIMARY  KEY,     name VARCHAR (100 ) NOT  NULL ,     gender VARCHAR (10 ) NOT  NULL ,     age INTEGER  NOT  NULL ,     class VARCHAR (20 ) NOT  NULL  ); # 学科表 CREATE  TABLE  subjects (    id SERIAL PRIMARY  KEY,     name VARCHAR (100 ) NOT  NULL  ); # 成绩表 CREATE  TABLE  scores (    id SERIAL PRIMARY  KEY,     student_id INTEGER  NOT  NULL ,     subject_id INTEGER  NOT  NULL ,     score INTEGER  NOT  NULL ,     FOREIGN  KEY (student_id) REFERENCES  students (id),     FOREIGN  KEY (subject_id) REFERENCES  subjects (id) ); # 插入一些测试数据 INSERT  INTO  students (name, gender, age, class)VALUES     ('Alice' , 'Female' , 18 , 'Class A' ),     ('Bob' , 'Male' , 17 , 'Class B' ),     ('Charlie' , 'Male' , 19 , 'Class A' ),     ('Diana' , 'Female' , 18 , 'Class B' ); # 插入学科表数据 INSERT  INTO  subjects (name)VALUES     ('Mathematics' ),     ('English' ),     ('Science' ); INSERT  INTO  scores (student_id, subject_id, score)VALUES     (1 , 1 , 90 ),     (1 , 2 , 85 ),     (1 , 3 , 92 ); INSERT  INTO  scores (student_id, subject_id, score)VALUES     (2 , 1 , 78 ),     (2 , 2 , 80 ),     (2 , 3 , 75 ); INSERT  INTO  scores (student_id, subject_id, score)VALUES     (3 , 1 , 88 ),     (3 , 2 , 92 ),     (3 , 3 , 90 ); INSERT  INTO  scores (student_id, subject_id, score)VALUES     (4 , 1 , 95 ),     (4 , 2 , 88 ),     (4 , 3 , 92 ); 
 
备份 使用pgdump备份数据库非常简单,只需要在终端中输入相应的命令即可
备份整个数据库
1 2 3 4 pg_dump -h <数据库地址> -p <数据库端口> -U <数据库用户名> -F c -b -v -f <备份文件路径> <数据库名称> # 示例 /usr/pgsql-14 /bin/pg_dump -h 127 .0 .0 .1  -U postgres -p 5432  -F t -b -v -f build_hyc_test.sql.tar hyc_test 
 
 
备份指定表或数据
1 2 3 4 5 6 7 pg_dump -h <数据库地址> -p <数据库端口> -U <数据库用户名> -F c -b -v -t <表名1 > -t <表名2 > -f <备份文件路径> <数据库名称> # 示例 -- 备份指定表到sql文件 -- '-c --if -exists' 会生成 'drop table if  exist ' 命令 -- '--no-owner' 是一个选项,用于指定在导出数据库时不包括拥有者信息 pg_dump --verbose --host=192 .168 .xx.xx --port=5432  --username=postgres --file /home/huangyc/pg_bak_test/bak_hyc.sql --encoding=UTF-8  -t "public.tushare_wz_index" -t "public.tushare_us_basic" -t "public.dim_fund" -t "public.dim_index" -c --if -exists --no-owner pg_fac_stk 
 
 
 
具体参数的含义如下:
-h:数据库服务所在主机地址,可以是本地地址localhost或者IP地址 
-p:数据库服务的监听端口,一般为默认端口5432 
-U:连接数据库的用户名 
-F:备份文件的格式,包括自定义格式c,纯文本格式p和归档格式t 
-b:在备份文件中包含备份的数据库的模式信息 
-v:备份过程中输出详细的信息 
-f:备份文件的保存路径和文件名 
-t:只备份指定的表和数据 
 
1 2 3 4 5 pg_dump - h 127.0 .0 .1  - p 5432  - U postgres - f postgres.sql.tar - Ft; pg_dumpall - d postgres - U postgres - f postgres.sql  
 
还原 使用备份文件进行恢复也非常简单,只需要在终端中输入相应的命令即可
恢复整个库
1 2 3 4 pg_restore -h <数据库地址> -p <数据库端口> -U <数据库用户名> -d <数据库名称> <备份文件路径> # 示例 /usr/pgsql-14 /bin/pg_restore -h 127 .0 .0 .1  -U postgres -p 5432  -d hyc_test_bak build_hyc_test.sql.tar 
 
 
恢复指定数据
1 2 3 4 5 6 pg_restore -h <数据库地址> -p <数据库端口> -U <数据库用户名> -t <表名1 > -t <表名2 > -d <数据库名称> <备份文件路径> # 示例 -- 对于pg_dump备份出来的sql文件,直接执行sql文件即可恢复 -- 还原指定sql文件到bak_test库(需要自己建库) psql --host=192 .168 .xx.xx --port=5432  --username=postgres -d bak_test --file /home/huangyc/pg_bak_test/bak_hyc.sql.tar 
 
 
 
具体参数的含义如下:
-h:数据库服务所在主机地址,可以是本地地址localhost或者IP地址 
-p:数据库服务的监听端口,一般为默认端口5432 
-U:连接数据库的用户名 
-d:恢复数据的目标数据库名称 
-t:只恢复指定的表和数据 
 
命令详解 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 [postgres@pg01 ~]$ pg_dump --help  用法:   pg_dump [选项]... [数据库名字] **一般选项**:   -f, --file=FILENAME          输出文件或目录名   -F, --format=c|d|t|p         输出文件格式 (c=custom, d=directory, t=tar,p=plain,plain就是sql纯文本 (默认值))   -j, --jobs =NUM               执行多个并行任务进行备份转储工作   -v, --verbose                详细模式   -V, --version                输出版本信息,然后退出   -Z, --compress=0-9           被压缩格式的压缩级别,0表示不压缩   --lock-wait-timeout=TIMEOUT  在等待表锁超时后操作失败   --no-sync                    不用等待变化安全写入磁盘   -?, --help                    显示此帮助, 然后退出 **控制输出内容选项(常用)**:   -a, --data-only              只转储数据,不包括模式,只对纯文本输出有意义   -s, --schema-only            只转储模式, 不包括数据   -c, --clean                  在重新创建之前,先清除(删除)数据库对象,如drop table。只对纯文本输出有意义   -C, --create                 指定输出文件中是否生成create database语句,只对纯文本输出有意义   -n, --schema=PATTERN         指定要导出的schema,不指定则导出所有的非系统schema   -N, --exclude-schema=PATTERN 排除导出哪些schema   -O, --no-owner               在明文格式中, 忽略恢复对象所属者   -t, --table=PATTERN          指定导出的表、视图、序列,可以使用多个-t匹配多个表,使用-t之后,-n和-N就失效了   -T, --exclude-table=PATTERN  排除表   -x, --no-privileges          不要转储权限 (grant/revoke)   --disable-triggers           在只恢复数据的过程中禁用触发器   --exclude-table-data=PATTERN do  NOT dump data for  the specified table(s)   --if-exists                  当删除对象时使用IF EXISTS   --inserts                    以INSERT命令,而不是COPY命令的形式转储数据,使用该选项可以把数据加载到非pg数据库,会使恢复非常慢                                该选项为每行生成1个单独的insert命令,?在恢复过程中遇到错误,将会丢失1行而不是全部表数据   --column-inserts             以带有列名的INSERT命令形式转储数据,例如insert into table_name(column,...) values(value1,...)   --load-via-partition-root    通过根表加载分区   --no-comments                不转储注释   --no-tablespaces             不转储表空间分配信息   --no-unlogged-table-data     不转储没有日志的表数据   --on-conflict-do-nothing     将ON CONFLICT DO NOTHING添加到INSERT命令 **控制输出内容选项(不常用)**:   -S, --superuser=NAME         指定关闭触发器时需要用到的超级用户名。 它只有在使用了--disable-triggers时才有影响。一般情况下,最好不要输入该参数,而是用 超级用户启动生成的脚本。   -b, --blobs                  在转储中包括大对象   -B, --no-blobs               排除转储中的大型对象   -E, --encoding=ENCODING      转储以ENCODING形式编码的数据   --binary-upgrade             只能由升级工具使用   --enable-row-security        启用行安全性(只转储用户能够访问的内容)   --extra-float-digits=NUM     覆盖extra_float_digits的默认设置   --disable-dollar-quoting     取消美元 (符号) 引号, 使用 SQL 标准引号   --no-publications            不转储发布   --no-security-labels         不转储安全标签的分配   --no-subscriptions           不转储订阅   --no-synchronized-snapshots  在并行工作集中不使用同步快照   --quote-all-identifiers      所有标识符加引号,即使不是关键字   --rows-per-insert=NROWS      每个插入的行数;意味着--inserts   --section=SECTION            备份命名的节 (数据前, 数据, 及 数据后)   --serializable-deferrable    等到备份可以无异常运行   --snapshot=SNAPSHOT          为转储使用给定的快照   --strict-names               要求每个表和(或)schema包括模式以匹配至少一个实体   --use-set-session-authorization                                使用 SESSION AUTHORIZATION 命令代替                                ALTER OWNER 命令来设置所有权 **联接选项**:   -d, --dbname=DBNAME      对数据库 DBNAME备份   -h, --host=主机名        数据库服务器的主机名或套接字目录   -p, --port=端口号        数据库服务器的端口号   -U, --username=名字      以指定的数据库用户联接   -w, --no-password        永远不提示输入口令   -W, --password           强制口令提示 (自动)   --role=ROLENAME          在转储前运行SET ROLE 
 
对于pg_dump的自定义备份custom和tar类型的备份,需要使用pg_restore进行恢复,pg_restore语法如下
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 [postgres@pg01 pg_backup]$ pg_restore --help  pg_restore 从一个归档中恢复一个由 pg_dump 创建的 PostgreSQL 数据库. 用法:   pg_restore [选项]... [文件名] 一般选项:   -d, --dbname=名字        连接数据库名字   -f, --file=文件名       输出文件名(- 对于stdout)   -F, --format=c|d|t       备份文件格式(应该自动进行)   -l, --list               打印归档文件的 TOC 概述   -v, --verbose            详细模式   -V, --version            输出版本信息, 然后退出   -?, --help                显示此帮助, 然后退出 恢复控制选项:   -a, --data-only              只恢复数据, 不包括模式   -c, --clean                  在重新创建之前,先清除(删除)数据库对象   -C, --create                 创建目标数据库   -e, --exit-on-error          发生错误退出, 默认为继续   -I, --index=NAME             恢复指定名称的索引   -j, --jobs =NUM               执行多个并行任务进行恢复工作   -L, --use-list=FILENAME      从这个文件中使用指定的内容表排序                                输出   -n, --schema=NAME            在这个模式中只恢复对象   -N, --exclude-schema=NAME    不恢复此模式中的对象   -O, --no-owner               不恢复对象所属者   -P, --function =NAME(args)    恢复指定名字的函数   -s, --schema-only            只恢复模式, 不包括数据   -S, --superuser=NAME         使用指定的超级用户来禁用触发器   -t, --table=NAME             恢复命名关系(表、视图等)   -T, --trigger=NAME           恢复指定名字的触发器   -x, --no-privileges          跳过处理权限的恢复 (grant/revoke)   -1, --single-transaction     作为单个事务恢复   --disable-triggers           在只恢复数据的过程中禁用触发器   --enable-row-security        启用行安全性   --if-exists                  当删除对象时使用IF EXISTS   --no-comments                不恢复注释   --no-data-for-failed-tables  对那些无法创建的表不进行                                数据恢复   --no-publications            不恢复发行   --no-security-labels         不恢复安全标签信息   --no-subscriptions           不恢复订阅   --no-tablespaces             不恢复表空间的分配信息   --section=SECTION            恢复命名节 (数据前、数据及数据后)   --strict-names               要求每个表和(或)schema包括模式以匹配至少一个实体   --use-set-session-authorization                                使用 SESSION AUTHORIZATION 命令代替                                ALTER OWNER 命令来设置所有权 联接选项:   -h, --host=主机名        数据库服务器的主机名或套接字目录   -p, --port=端口号        数据库服务器的端口号   -U, --username=名字      以指定的数据库用户联接   -w, --no-password        永远不提示输入口令   -W, --password           强制口令提示 (自动)   --role=ROLENAME          在恢复前执行SET ROLE操作 选项 -I, -n, -N, -P, -t, -T, 以及 --section 可以组合使用和指定 多次用于选择多个对象. 如果没有提供输入文件名, 则使用标准输入. 
 
表空间 
新建表空间
 
1 2 3 4 mkdir  /home/huangyc/t_fac_tschown  postgres /home/huangyc/t_fac_ts
 
pg库新建表空间
 
1 create tablespace t_fac_ts owner postgres location '/home/huangyc/t_fac_ts' ; 
 
表空间有关的一些语法
 
1 2 3 4 5 6 # 删除表空间 (需要先drop 表空间所有的表, 或者将该空间下所有的表移除才能drop 表空间) DROP  TABLESPACE t_fac_ts;# 修改具体的表到指定表空间下 ALTER  TABLE  t_fac_tushare_stock_basic SET  TABLESPACE t_fac_ts;# 修改指定库到指定表空间下 ALTER  DATABASE name SET  TABLESPACE new_tablespace;
 
锁表处理 
pg锁表解锁
 
查看被锁的表
1 2 3 select  a.locktype,a.database,a.pid,a.mode,a.relation,b.relnamefrom  pg_locks ajoin  pg_class b on  a.relation =  b.oid where  relname= 't_opt_strhdk_blsj' ;
 
 
杀死被锁的pid
1 select  pg_terminate_backend(pid);
 
 
 
表结构修改 1 2 3 4 5 6 7 8 9 10 alter  table  "user" rename to  "ts_user";alter  table  table_name add  column  col_name varchar (50 );alter  table  table_name drop  column  col_name;alter  table  table_name add  primary  key("col_name");alter  table  table_name rename column  old_col_name to  new_col_name;
 
数据更新和查询 
设置某字段的值
 
1 2 3 4 5 6 7 8 update  table_name set  col_name= new_value;UPDATE  table1SET  field_to_update =  table2.new_valueFROM  table2WHERE  table1.common_column =  table2.common_column;
 
删除表中重复数据
 
1 2 3 4 5 6 7 8 9 10 select  col1,col2,count (* ) from  old_table group  by  col1,col2;create  table  bak_table as  select  distinct  *  from  table_name;select  col1,col2,count (* ) from  bak_table group  by  col1,col2;truncate  table  old_table;insert  into  old_table (col1,col2) select  col1,col2 from  bak_table;
 
不存在插入,存在更新
 
1 insert  into  ... on  conflict(column_name) do update  set  ...
 
conflict(column_name): column_name字段是判断要查找的数据是否存在,作为判断条件
column_name必须是主键 或者其他具有唯一性的字段(如唯一键或排他键)
 
1 2 3 4 insert  into  user (id,username,address,create_date,create_by) values ('1' ,'刘德华' ,'香港' ,now(),'system' ) on  conflict(id) do update  set  address= '中国' ,update_date= now(),update_by= 'system' ; 
 
1 2 # 批量的方式 insert  into  testunnest(id, age, name) values  (unnest (array [1 ,3 ]), unnest (array [18 ,10 ]), unnest (array ['valupdated' , 'val3' ])) on  conflict (id) do update  set  age =  excluded.age, name =  excluded.name;
 
数据和结构复制 1 2 3 4 5 create  table  new_table as  select  *  from  old_table [WITH  NO  DATA];insert  into  new_table (col_0, col_1) select  col_0, col_1 from  old_table;
 
数据导入 1 2 3 4 5 6 7 8 9 10 timescaledb-parallel-copy  \   --connection "host=192 .168 .123 .xx port=xx user=postgres password=xx dbname=xx" \   --table xxx \   --file /extension/outside_data/small.csv \   --workers 16  \   --copy -options "CSV ENCODING 'UTF8'" \   --skip-header \   --columns "id,ts_code,factor_name,xx,xx,biz_datetime,cycle_unit,insert_time,op_instruct,inc_flag" \   --batch-size 5000  \   --verbose 
 
csv示例
 
1 2 3 start_biz_datetime,end_biz_datetime,factor_num_val,inc_flag,factor_name,ts_code,cycle_unit,version,id,insert_time 1991-04-04 00:00:00+08:00,1991-04-04 23:59:59,0.0,False,移动平均线_差比_3d_90d,000001.SZ,day,0,c6ef34aee1994ef9834fee9d5d95e884,2025-06-19 02:06:02 1991-04-05 00:00:00+08:00,1991-04-07 23:59:59,0.0,False,移动平均线_差比_3d_90d,000001.SZ,day,0,0d5f77ff302e45bfbd1796f16b0cdc05,2025-06-19 02:06:02 
 
视图 
普通视图
 
视图是一个虚拟表,它是根据一个或多个基本表的查询结果动态生成的,每次查询视图时都会执行相应的查询
1 2 3 4 5 6 CREATE  VIEW  view_name AS SELECT  column1, column2, ...FROM  table_nameWHERE  condition ;drop  view  view_name;
 
物化视图
 
物化视图是一个实际存储数据的表,它的数据定期刷新,不像普通视图那样每次查询都重新计算。
1 2 3 4 5 6 CREATE  MATERIALIZED VIEW  materialized_view_name AS SELECT  column1, column2, ...FROM  table_nameWHERE  condition ;drop  MATERIALIZED VIEW  materialized_view_name
 
需要注意的是,物化视图需要定期手动或自动刷新以更新数据,你可以使用 REFRESH MATERIALIZED VIEW 命令来进行刷新
分页查询 在PostgreSQL数据库中,进行分页查询通常使用LIMIT和OFFSET子句来实现
LIMIT用于限制返回的行数,OFFSET用于指定从结果集的哪一行开始返回数据,下面是一个简单的分页查询示例:
假设我们有一个名为products 的表,其中包含产品的名称和价格,我们想要获取第2页,每页显示10条数据,可以使用以下SQL查询语句:
1 2 3 4 SELECT  product_name, priceFROM  productsORDER  BY  product_idLIMIT 10  OFFSET  10 ; 
 
在这个例子中,假设product_id是产品表中的唯一标识符,通过ORDER BY将结果按照product_id排序,LIMIT 10表示返回10行数据,OFFSET 10表示从结果集中的第11行开始返回数据,即返回第2页的数据
乱序查询 在PostgreSQL数据库中,使用ORDER BY RANDOM()语句可以实现按照随机顺序排序查询结果
这样可以让查询结果以随机的顺序返回,每次执行查询时都会得到不同的排序结果
这在需要随机抽样数据或随机展示数据时非常有用
需要注意的是,使用ORDER BY RANDOM()可能会影响查询的性能,因为需要对结果集进行随机排序操作
假设我们有一个名为products 的表,其中包含产品的名称和价格,我们想要以随机顺序返回产品列表,可以使用以下SQL查询语句:
1 2 3 4 5 6 7 SELECT     product_name,     price FROM     products ORDER  BY     RANDOM(); 
 
这将返回products表中产品名称和价格的数据,并以随机的顺序排序,每次执行该查询,返回的产品列表顺序都会不同
列表查询 
列表字段的创建
 
1 2 3 4 CREATE  TABLE  your_table (    id SERIAL PRIMARY  KEY,     list_column_name _TEXT ); 
 
在PostgreSQL中,你可以使用以下方式来查询数组字段:
查询数组字段中包含特定值的行: 
 
1 SELECT  *  FROM  your_table WHERE  your_array_column =  ANY ('{value1,value2,value3}' );
 
查询数组字段中包含任何查询数组中的值的行: 
 
1 SELECT  *  FROM  your_table WHERE  your_array_column &&  '{value1,value2,value3}' ;
 
查询数组字段中包含所有查询数组中的值的行: 
 
1 SELECT  *  FROM  your_table WHERE  your_array_column @>  '{value1,value2,value3}' ;
 
查询数组字段的长度大于1的所有行: 
 
1 SELECT  *  FROM  your_table WHERE  array_length(your_array_column, 1 ) >  1 ;
 
查询数组字段中第一个元素等于特定值的行: 
 
1 SELECT  *  FROM  your_table WHERE  your_array_column[1 ] =  'specific_value' ;
 
删除重复记录 
postgresql 常用的删除重复数据方法 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 create  table  hyc_tmp_del_test(id int , name varchar (255 ));create  table  hyc_tmp_del_test_bk (like  hyc_tmp_del_test);insert  into  hyc_tmp_del_test select  generate_series(1 , 10000 ), 'huangyc' ;insert  into  hyc_tmp_del_test select  generate_series(1 , 10000 ), 'huangyc' ;insert  into  hyc_tmp_del_test_bk select  *  from  hyc_tmp_del_test;explain analyse delete  from  hyc_tmp_del_test_bk a where  a.ctid <>  (select  min (t.ctid) from  hyc_tmp_del_test_bk t where  a.id= t.id);  explain analyse delete  from  hyc_tmp_del_test_bk a where  a.ctid not  in  (select  min (ctid) from  hyc_tmp_del_test_bk group  by  id);  explain analyze delete  from  hyc_tmp_del_test_bk a where  a.ctid =  any (array  (select  ctid from  (select  row_number () over  (partition  by  id), ctid from  hyc_tmp_del_test_bk) t where  t.row_number >  1 ));  
 
第二种和第三种感觉差不多,原文说是第三种快不少,这里pg库是14.x版本
关键
 
pg中每个表都有几个系统隐藏列:tableoid, xmin, xmax,cmin,cmax,ctid
其中tableoid表示表的oid,cmin、cmax、xmin和xmax是mvcc的实现有关
ctid表示行版本在表中的物理位置 : 它属于对象标识符类型(oid,Object Identifier Types),是一种行标识符,它的数据使用的元组标识符(tid,tuple identifier)。元组ID是一对(块号,块内的元组索引),用于标识当前行的物理位置。
引申用法
 
假设我们有一个表格 products,包含产品名称和产品类别。我们希望从中找出每个产品类别中的前两个产品
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 SELECT   *  FROM   (     SELECT        product_name,       category,       row_number () OVER  (PARTITION  BY  SUBSTR(p.category, 0 , LENGTH(p.category)-2 ) ORDER  BY  product_name) AS  ranking     FROM        products p     WHERE        p.category LIKE  'Electronics:%'    ) subquery WHERE   ranking <=  2 ; 
 
在这个例子中,我们有一个产品表 products,其中包含 product_name 和 category 列
我们想要找出每个以 ‘Electronics:’ 开头的产品类别中的前两个产品
子查询部分:
子查询从 products 表中选择了产品名称 product_name 和产品类别 category 
我们只关注以 ‘Electronics:’ 开头的产品类别,即 p.category LIKE 'Electronics:%' 
使用 row_number() 函数,对每个以 ‘Electronics:’ 开头的类别进行编号,按产品名称排序 
 
 
主查询部分:
在主查询中,我们选择了子查询的所有列 * 
我们进一步筛选了结果,只选择了编号小于等于2的记录,以获取每个类别中的前两个产品 
 
 
 
索引 1 2 3 4 5 6 7 8 select  *  from  pg_indexes where  tablename =  't_cfg_opinfo' ; create  index index_name on  table_name (col_0, col_1);select  *  from  pg_indexes where  tablename= 'table_name' ;drop  index index_name;
 
什么情况下要避免使用索引?
 
虽然索引的目的在于提高数据库的性能,但这里有几个情况需要避免使用索引
使用索引时,需要考虑下列准则 :
索引不应该使用在较小的表上 
索引不应该使用在有频繁的大批量的更新或插入操作的表上 
索引不应该使用在含有大量的 NULL 值的列上 
索引不应该使用在频繁操作的列上 
 
后台建索引
 
1 2 3 4 5 export PGPASSWORD='xxxxx' nohup psql -h localhost --port=xxxx -U postgres -d xxx -c "CREATE INDEX CONCURRENTLY idx_t_tushare_stk_mins_1min_trade_time_desc ON t_tushare_stk_mins_1min (trade_time DESC)" > index.log 2 >&1  & # 用下面命令看进度 SELECT * FROM pg_stat_progress_create_index; 
 
1 2 3 4 5 6 7 8 # 数据导出和导入 docker exec -it timescaledb /bin/bash export PGPASSWORD='xxx' nohup pg_dump --verbose --host=127 .0 .0 .1  --port=xxx --username=xxx --file=/outside_data/bak_tushare_cold_start_stk_1min.sql --encoding=UTF-8  -t "t_tushare_stk_mins_1min" -t "public.t_log_increase_infos" -c --if -exists --no-owner db_finance > pg_tushare_cold_dump_stk_1min.log 2 >&1  &   这是我备份的命令  export PGPASSWORD='xx' nohup psql --host=127 .0 .0 .1  --port=xxx --username=xxx -d db_finance --file /outside_data/pg_data_bak_tushare_stk_1min_250529/bak_tushare_cold_start_stk_1min_filter_two.sql > restore .log 2 >&1  & 
 
实用sql 1 2 SELECT  (CASE  WHEN  MAX (version) IS  NULL  THEN  -1  ELSE  MAX (version) END ) +  1  AS  version FROM  table_name
 
表触发器 
在数据库中添加触发器,在表上建立触发器,在每次 INSERT/UPDATE/DELETE 时发出通知
1 2 3 4 5 6 7 8 9 10 11 12 13 CREATE OR REPLACE  FUNCTION notify_system_config_change() RETURNS trigger AS $$ BEGIN   PERFORM pg_notify('system_config_channel', 'changed');   RETURN NEW; END; $$ LANGUAGE plpgsql; DROP TRIGGER IF  EXISTS system_config_change_trigger ON system_config; CREATE TRIGGER system_config_change_trigger AFTER INSERT OR UPDATE OR DELETE ON system_config FOR  EACH STATEMENTEXECUTE FUNCTION notify_system_config_change(); 
 
 
在你的后台(FastAPI / Django / 独立线程)中监听这个 channel:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 import  psycopg2import  selectimport  threadingdef  listen_system_config_changes ():    conn = psycopg2.connect(         dbname="postgres" ,         user="postgres" ,         password="password" ,         host="localhost" ,         port="5432"      )     conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)     cur = conn.cursor()     cur.execute("LISTEN system_config_channel;" )     print ("🟢 正在监听 system_config_channel..." )     while  True :         if  select.select([conn], [], [], 5 ) == ([], [], []):             continue          conn.poll()         while  conn.notifies:             notify = conn.notifies.pop(0 )             print (f"🔔 收到通知: {notify.payload} " )                          reload_system_config() def  reload_system_config ():    global  system_config_dict     data = system_config_cache.get_all()     system_config_dict.clear()     system_config_dict.update(data)     for  func in  cfg_from_db_funcs:         func(system_config_dict)     logger.trace(f"system_config_dict自动更新: {system_config_dict} " ) threading.Thread(target=listen_system_config_changes, daemon=True ).start() 
 
 
 
✅ 优点:
无需 Django ORM,也能实时触发。 
不会频繁轮询数据库。 
性能优异,官方机制 
 
其他语法 
筛选某列,逗号拼接
 
1 2 3 select  string_agg(bs_org_id,',' ) as  bs_org_ids   from  bs_org    where  par_org_id = '100'  
 
日期转换
 
1 2 select  to_char(col_name,'yyyyMMDD' )- interval  '2 day'  from  table_name
 
转时间戳
 
1 2 select  '2011-01-06 09:57:59' ::timestamp ;TO_TIMESTAMP('2011-01-06 09:57:59' , 'YYYY-MM-DD HH24:MI:S' ) 
 
postgresql 获取分组第一条数据 窗口函数
 
给数据分组并排名,使用 row_number() over (partition by 分组的字段名 order by 排序规则) as 排名 
从上述第一步中取出,排名为第一的数据,即为第一条数据 select * from 上述第一步 where 排名=1 
获取前N名的数据,将一中第二步的条件换成where 排名 < N+1 
 
 distributed key
 
1 2 alter  table  table_name set  distributed by  (id);alter  table  table_name add  primary  key (id);
 
ORM框架 ORM框架比较 
一文了解 Python 的三种数据源架构模式 
SQLAlchemy 和其他的 ORM 框架 
SQLObject
 
优点:
采用了易懂的ActiveRecord 模式
一个相对较小的代码库
 
缺点:
方法和类的命名遵循了Java 的小驼峰风格
不支持数据库session隔离工作单元
 
 
Storm
 
Django’s ORM
 
peewee
 
优点:
Django式的API,使其易用
轻量实现,很容易和任意web框架集成
 
缺点:
不支持自动化 schema 迁移
多对多查询写起来不直观
 
 
SQLAlchemy
 
优点:
企业级API,使得代码有健壮性和适应性
灵活的设计,使得能轻松写复杂查询
 
缺点:
工作单元概念不常见
重量级API,导致长学习曲线
 
 
相比其他的ORM, SQLAlchemy 意味着,无论你何时写SQLAlchemy代码, 都专注于工作单元的前沿概念 。DB Session 的概念可能最初很难理解和正确使用,但是后来你会欣赏这额外的复杂性,这让意外的时序提交相关的数据库bug减少到0。在SQLAlchemy中处理多数据库是棘手的, 因为每个DB session 都限定了一个数据库连接。但是,这种类型的限制实际上是好事, 因为这样强制你绞尽脑汁去想在多个数据库之间的交互, 从而使得数据库交互代码很容易调试。
SQLAlchemy 
SQLAlchemy 1.4 Documentation 
sqlalchemy操作数据库 
sqlalchemy外键和relationship查询 
SQLALlchemy数据查询小集合 
SQLAlchemy 的连接池机制 
SQLAlchemy 中的 Session、sessionmaker、scoped_session 
Contextual/Thread-local Sessions 
SQLAlchemy(常用的SQLAlchemy列选项) 
查询官网例子Object Relational Tutorial (1.x API) 
sqlalchemy外键和relationship查询 
 
session和scoped_session session用于创建程序和数据库之间的会话,所有对象的载入和保存都需通过session对象 。 通过sessionmaker调用创建一个工厂,并关联Engine以确保每个session都可以使用该Engine连接资源 scoped_session 实现了一个线程的隔离,保证不同的线程拿到不同的session, 同一个线程拿到的session 是同一个值
1 2 3 4 5 6 7 s1 = Session() s2 = Session() s1.add(person) s1.commit() s1.close() s2.add(person) 
 
session 和scoped_session本质上都是用来操作数据库的,只是session 只适合在单线程下面使用 官方文档提到了scoped_session的正确使用方法。request结束后要调用scoped_session.remove()
Engine Configuration 
 
使用 create_engine创建我们需要的DB starting point
1 2 3 4 from  sqlalchemy import  create_enginescheme = 'mysql+pymysql://root:123456@localhost:3306/dev_shopping?charset=utf8'  engine = create_engine(scheme, pool_size=10  , max_overflow=-1 , pool_recycle=1200 ) 
 
create_engine 函数常用参数:
pool_size=10 # 连接池的大小,0表示连接数无限制 
pool_recycle=-1 # 连接池回收连接的时间,如果设置为-1,表示没有no timeout, 注意,mysql会自动断开超过8小时的连接,所以sqlalchemy沿用被mysql断开的连接会抛出MySQL has gone away 
max_overflow=-1 # 连接池中允许‘溢出’的连接个数,如果设置为-1,表示连接池中可以创建任意数量的连接 
pool_timeout=30 # 在连接池获取一个空闲连接等待的时间 
echo=False # 如果设置True, Engine将会记录所有的日志,日志默认会输出到sys.stdout 
 
创建Engine之后,接下来的问题,就是如何使用Engine
在单进程中,建议在在初始化的模块的时候创建Engine, 使Engine成为全局变量, 而不是为每个调用Engine的对象或者函数中创建, Engine不同于connect, connect函数会创建数据库连接的资源,Engine是管理connect创建的连接资源
在多进程中,为每个子进程都创建各自的Engine, 因为进程之间是不能共享Engine
几种操作方式 
Working with Engines and Connections 
SqlAlchemy的Engine,Connection和Session 区别?适合什么时候用? 
Engine方式
 
Engine是SQLAlchemy中连接数据库最底层级别的对象,它维护了一个连接池,可以在应用程序需要和数据库对话时使用。在Engine.execute(close_with_result=True) close_with_result=True 表示连接自动关闭;
1 2 3 4 5 6 result = engine.execute('SELECT * FROM tablename;' )  conn = engine.connect(close_with_result=True ) result = conn.execute('SELECT * FROM tablename;' ) for  row in  result:    print (result['columnname' ] result.close() 
 
Connection方式
 
Connection,实际上是执行SQL查询的工作,每当你想更好的控制连接的属性,如何时关闭等都建议使用这个操作;比如在一个事务中,要控制它提交commit的时间,在connection控制中就可以运行多个不同的SQL语句,如果其中一个出现问题,则其他所有的语句都会撤销更改;
1 2 3 4 5 6 7 8 9 connection = engine.connect() trans = connection.begin() try :    connection.execute("INSERT INTO films VALUES ('Comedy', '82 minutes');" )     connection.execute("INSERT INTO datalog VALUES ('added a comedy');" )     trans.commit() except :    trans.rollback()     raise  
 
Session方式
 
Session,一般都是用于ORM中,因为在ORM中,会自动生成SQL语句以及自动连接数据库(自己配置),使用session.execute()也是个编辑的方法,可以将会话绑定到任何对象;如果你确定使用ORM,就建议使用session来处理execute(),否则还是使用connection更好方便;
总结: 从应用角度来看,可以把这三类分为两种:
 
直接使用Engine.execute() 或Connection.execute(),更加灵活,可以使用原生SQL语句
 
使用Session处理交易类型的数据,因为方便使用session.add(), session.rollback(), session.commit(), session.close()等,它是使用ORM时推荐的一种和数据库交互的方式