本文整理自网络,侵删。
目录
- 创建增量ID记录表
- 数据库连接类
- 增量数据服务客户端
- 结果测试
本次主要分享结合单例模式实际应用案例:实现实时增量数据加载工具的解决方案。最关键的是实现一个可进行添加、修改、删除等操作的增量ID记录表。
单例模式:提供全局访问点,确保类有且只有一个特定类型的对象。通常用于以下场景:日志记录或数据库操作等,避免对用一资源请求冲突。
创建增量ID记录表
import?sqlite3 import?datetime import?pymssql import?pandas?as?pd import?time pd.set_option('expand_frame_repr',?False)
导入所需模块
?#?创建数据表 database_path?=?r'.\Database\ID_Record.db' from?sqlite3?import?connect with?connect(database_path)?as?conn: ????conn.execute( ????????'CREATE?TABLE?IF?NOT?EXISTS?Incremental_data_max_id_record(id?INTEGER?PRIMARY?KEY?AUTOINCREMENT,F_SDaqID_MAX?TEXT,record_date?datetime)')
增量最新记录ID-F_SDaqID_MAX数据库存储
#数据保存到本地txt def?text_save(filename,?record):#filename为写入txt文件的路径,record为要写入F_SDaqID_MAX、record_date数据列表. ????file?=?open(filename,'a')?追加方式 ????#?file?=?open(filename,?'w')??#覆盖方式 ????for?i?in?range(len(record)): ????????s?=?str(record[i]).replace('[','').replace(']','') ????????s?=?s.replace("'",'').replace(',','')?+'\n'???#去除单引号,逗号,每行末尾追加换行符 ????????file.write(s) ????file.close()
增量最新记录ID-F_SDaqID_MAX临时文件存储
增量ID记录提供了两种实现方案 ,一个是数据持久化存储模式,另一个是临时文件存储模式。数据持久化模式顾名思义,也就是说在创建对象的时候,能将操作关键信息如增量ID-F_SDaqID_MAX记录下来,这种flag记录映射是常选择的设计模式。
数据库连接类
实现实时增量数据获取需要实现两个数据库连接类:增量数据ID存储类和增量目标数据源类。这里利用单例模式实现数据库操作类,将增量服务记录信息按照顺序存储到数据库或特定的日志文件中,以维护数据的一致性。
1、增量数据ID存储sqlite连接类代码
class?Database_sqlite(metaclass=MetaSingleton): ????database_path?=?r'.\Database\energy_rc_configure.db' ????connection?=?None ????def?connect(self): ????????if?self.connection?is?None: ????????????self.connection?=?sqlite3.connect(self.database_path,check_same_thread=False,isolation_level=None) ????????????self.cursorobj?=??self.connection.cursor() ????????return?self.cursorobj,self.connection ????#?插入最大记录 ????@staticmethod ????def?Insert_Max_ID_Record(f1,?f2): ????????cursor?=?Database_sqlite().connect() ????????print(cursor) ????????sql?=?f"""insert?into?Incremental_data_max_id_record(F_SDaqID_MAX,record_date)?values("{f1}","{f2}")""" ????????cursor[0].execute(sql) ????????#?sql?=?"insert??into?Incremental_data_max_id_record(F_SDaqID_MAX,record_date)?values(?,?)" ????????#?cursor[0].execute(sql,(f"{f1}",f"{f2}")) ????????cursor[1].commit() ????????print("插入成功!") ????????#?cursor[0].close() ????????return? ????#?取出增量数据库中最新一次ID记录 ????@staticmethod ????def?View_Max_ID_Records(): ????????cursor?=?Database_sqlite().connect() ????????sql?=?"select?max(F_SDaqID_MAX)?from?Incremental_data_max_id_record" ????????cursor[0].execute(sql) ????????results?=?cursor[0].fetchone()[0] ????????#?#单例模式不用关闭数据库连接 ????????#?cursor[0].close() ????????print("最新记录ID",?results) ????????return?results ????#删除数据记录ID ????@staticmethod ????def?Del_Max_ID_Records(): ????????cursor?=?Database_sqlite().connect() ????????sql?=?"delete?from?Incremental_data_max_id_record?where?record_date?=?(select?MAX(record_date)?from?Incremental_data_max_id_record)" ????????cursor[0].execute(sql) ????????#?results?=?cursor[0].fetchone()[0] ????????#?#?cursor[0].close() ????????cursor[1].commit() ????????print("删除成功") ????????return
2、增量数据源sqlserver连接类代码
class?Database_sqlserver(metaclass=MetaSingleton): ????""" ????#实时数据库 ????""" ????connection?=?None ????#?def?connect(self): ????def?__init__(self): ????????if?self.connection?is?None: ????????????self.connection?=?pymssql.connect(host="xxxxx",user="xxxxx",password="xxxxx",database="xxxxx",charset="utf8") ????????????if?self.connection: ????????????????print("连接成功!") ????????????#?打开数据库连接 ????????????self.cursorobj?=?self.connection.cursor() ????????#?return?self.cursorobj,?self.connection ????#?获取数据源中最大ID ????@staticmethod ????def?get_F_SDaqID_MAX(): ????????#?cursor_insert?=?Database_sqlserver().connect() ????????cursor_insert?=?Database_sqlserver().cursorobj ????????sql_MAXID?=?"""select?MAX(F_SDaqID)?from?T_DaqDataForEnergy""" ????????cursor_insert.execute(sql_MAXID)??#?执行查询语句,选择表中所有数据 ????????F_SDaqID_MAX?=?cursor_insert.fetchone()[0]??#?获取记录 ????????print("最大ID值:{0}".format(F_SDaqID_MAX)) ????????return?F_SDaqID_MAX ????#?提取增量数据 ????@staticmethod ????def?get_incremental_data(incremental_Max_ID): ????????#?开始获取增量数据 ????????sql_incremental_data?=?"""select?F_ID,F_Datetime,F_Data?from?T_DaqDataForEnergy??where?F_ID?>?{0}""".format( ????????????incremental_Max_ID) ????????#?cursor_find?=?Database_sqlserver().connect() ????????cursor_find?=?Database_sqlserver().cursorobj ????????cursor_find.execute(sql_incremental_data)??#?执行查询语句,选择表中所有数据 ????????Target_data_source?=?cursor_find.fetchall()??#?获取所有数据记录 ????????#?cursor_find.close() ????????cursor_find.close() ????????df?=?pd.DataFrame( ????????????Target_data_source, ????????????columns=[ ????????????????"F_ID", ????????????????"F_Datetime", ????????????????"F_Data"]) ????????print("提取数据",?df) ????????return?df
数据资源应用服务设计主要考虑数据库操作的一致性和优化数据库的各种操作,提高内存或CPU利用率。
实现多种读取和写入操作,客户端操作调用API,执行相应的DB操作。
注:
1、使用metaclass实现创建具有单例特征的类
Database_sqlserver(metaclass=MetaSingleton)
Database_sqlite(metaclass=MetaSingleton)
使用class定义新类时,数据库类Database_sqlserver由MetaSingleton装饰后即指定了metaclass,那么MetaSingleton的特殊方法__call__方法将自动执行。
class?MetaSingleton(type): ????_instances={} ????def?__call__(cls,?*args,?**kwargs): ????????if?cls?not?in?cls._instances: ????????????cls._instances[cls]?=?super(MetaSingleton,cls).__call__(*args,**kwargs) ????????return?cls._instances[cls]
以上代码基于元类的单例实现,当客户端对数据库执行某些操作时,会多次实例化数据库类,但是只创建一个对象,所以对数据库的调用是同步的。
2、多线程使用同一数据库连接资源需采取一定同步机制
如果没采用同步机制,可能出现一些意料之外的情况
相关阅读 >>
android studio连接Sqlite数据库的登录注册实现
Sqlitestudio优雅调试android手机数据库Sqlite(推荐)
android开发之contentprovider的使用详解
更多相关阅读请进入《Sqlite》频道 >>

数据库系统概念 第6版
机械工业出版社
本书主要讲述了数据模型、基于对象的数据库和XML、数据存储和查询、事务管理、体系结构等方面的内容。
转载请注明出处:木庄网络博客 » Python实现实时增量数据加载工具的解决方案
相关推荐
评论
管理员已关闭评论功能...