import cx_Oracle as cx
import pandas as pd
from sqlalchemy import create_engine
from sqlalchemy import text
# Oracle 12c database connection settings
ip = ''
port = ''
uname = ''  # user name
pwd = ''  # password
tnsname = ''  # service (instance) name
dsnStr = cx.makedsn(ip, port, service_name=tnsname)
connect_str = "oracle://%s:%s@%s" % (uname, pwd, dsnStr)
# ``encoding`` is deliberately not passed to create_engine: 'utf-8' was
# already its default in the SQLAlchemy versions that accept the argument,
# and SQLAlchemy 2.0 rejects it with a TypeError.
engine = create_engine(connect_str, echo=True, max_identifier_length=128)
# Path prefix prepended to every Excel file name this script reads or writes
default_path = ''
# Export rows of a table to an Excel file
def query_wirte(table_name, max_rows=10):
    """Dump rows of ``table_name`` into an Excel workbook under ``default_path``.

    Args:
        table_name: Name of the table to read. Identifiers cannot be sent as
            bind parameters, so the name is spliced into the SQL text as-is;
            pass trusted names only.
        max_rows: Maximum number of rows to export. The default of 10
            reproduces the previously hard-coded ``rownum < 11`` filter.
            Pass ``None`` to export the whole table.

    Raises:
        TypeError, ValueError: If ``max_rows`` cannot be converted to an int.

    The workbook is written as ``<table_name>.xls`` unless the result is too
    large for that format, in which case ``<table_name>.xlsx`` is written.
    """
    sql = "select * from " + table_name
    if max_rows is not None:
        # int() ensures that only a number can reach the SQL text.
        sql += " where rownum < " + str(int(max_rows) + 1)
    df = pd.read_sql(sql, engine)

    row_count, column_count = df.shape
    # A legacy .xls sheet holds at most 65536 rows, one of which is taken by
    # the header row that to_excel writes.
    xls_max_data_rows = 65535
    # Very wide (or very long) results do not fit the legacy format.
    if column_count >= 256 or row_count > xls_max_data_rows:
        extension = '.xlsx'
    else:
        extension = '.xls'
    df.to_excel(default_path + table_name + extension, index=False)
# Export the column definitions (structure) of a table
def query_table(table_name):
    """Export column metadata of ``table_name`` to ``<table_name>_annotation.xls``.

    For every column of the table the query returns its name, comment, data
    type, data length and nullability, read from ``user_tab_columns`` joined
    with ``user_col_comments`` and ordered by column name.

    Args:
        table_name: Table whose columns are listed. It is sent as a bind
            parameter, so quotes in the value cannot break or alter the
            statement. The comparison is an exact match.
            NOTE(review): Oracle usually stores unquoted identifiers in upper
            case, so callers probably need to pass the upper-case name; confirm.
    """
    sql = text(
        "select ut.COLUMN_NAME,"
        "uc.comments,"
        "ut.DATA_TYPE,"
        "ut.DATA_LENGTH,"
        "ut.NULLABLE "
        "from user_tab_columns ut "
        "inner JOIN user_col_comments uc "
        "on ut.TABLE_NAME = uc.table_name "
        "and ut.COLUMN_NAME = uc.column_name "
        "where ut.Table_Name = :table_name "
        "order by ut.column_name"
    )
    df = pd.read_sql(sql, engine, params={"table_name": table_name})
    df.to_excel(default_path + table_name + '_annotation.xls', index=False)
# Load an exported Excel file and append its rows to a table
def read_wirte_xls(table_name):
    """Append the rows of ``<default_path><table_name>.xls`` to ``table_name``.

    query_wirte writes ``.xlsx`` instead of ``.xls`` for tables with 256 or
    more columns, so when the ``.xls`` file does not exist the ``.xlsx`` file
    with the same base name is read instead.

    Args:
        table_name: Base name of the workbook and name of the target table.

    Raises:
        FileNotFoundError: If neither workbook exists.

    Rows are appended to the table (``if_exists='append'``) and the
    DataFrame index is not written.
    """
    workbook_base = default_path + table_name
    try:
        df = pd.read_excel(workbook_base + '.xls')
    except FileNotFoundError:
        df = pd.read_excel(workbook_base + '.xlsx')
    df.to_sql(table_name, engine, index=False, if_exists='append')
# Serialize a DataFrame to JSON
def to_json(df, orient='records'):
    """Return ``df`` serialized as a JSON string.

    Args:
        df: The DataFrame to serialize.
        orient: Layout of the JSON output, passed through to
            ``DataFrame.to_json``. Defaults to ``'records'``.

    Non-ASCII characters are written as-is instead of being escaped
    (``force_ascii=False``).
    """
    json_options = {'orient': orient, 'force_ascii': False}
    return df.to_json(**json_options)
if __name__ == '__main__':
    # Demo run against the placeholder table: export sample rows, export the
    # column metadata, then append the exported workbook back into the table.
    for step in (query_wirte, query_table, read_wirte_xls):
        step('table_name')
# Q.E.D.