1)with cls.lock加锁
class?MetaSingleton(type): ????_instances={} ????lock?=?threading.Lock() ????def?__call__(cls,?*args,?**kwargs): ????????with?cls.lock: ????????????if?cls?not?in?cls._instances: ????????????????time.sleep(0.05)??#模拟耗时 ????????????????cls._instances[cls]?=?super(MetaSingleton,cls).__call__(*args,**kwargs) ????????????return?cls._instances[cls]
锁的创建和释放需要消耗资源,上面代码每次创建都必须获得锁。
3、如果我们开发的程序非单个应用,而是集群化的,即多个客户端共享单个数据库,导致数据库操作无法同步,而数据库连接池是更好的选择。大大节省了内存,提高了服务器地服务效率,能够支持更多的客户服务。
数据库连接池的解决方案是在应用程序启动时建立足够的数据库连接,并讲这些连接组成一个连接池,由应用程序动态地对池中的连接进行申请、使用和释放。对于多于连接池中连接数的并发请求,应该在请求队列中排队等待。
增量数据服务客户端
增量处理策略:第一次加载先判断增量数据表中是否存在最新记录,若有直接加载;否则,记录一下最大/最新的数据记录ID或时间点,保存到一个增量数据库或记录文件中。
从第二次加载开始只加载最大/最新的ID或时间点以后的数据。当加载过程全部成功完成之后并同步更新增量数据库或记录文件,更新这次数据记录的最后记录ID或时间点。
一般这类数据记录表有自增长列,那么也可以使用自增长列来实现这个标识特征。比如本次我用到数据表增长列F_ID。
class?IncrementalRecordServer: ????_servers?=?[] ????_instance?=?None ????def?__new__(cls,?*args,?**kwargs): ????????if?not?IncrementalRecordServer._instance: ????????????#?IncrementalRecordServer._instance?=?super().__new__(cls) ????????????IncrementalRecordServer._instance?=?super(IncrementalRecordServer,cls).__new__(cls) ????????return?IncrementalRecordServer._instance ????def?__init__(self,changeServersID=None): ????????""" ????????变量初始化过程 ????????""" ????????self.F_SDaqID_MAX?=?Database_sqlserver().get_F_SDaqID_MAX() ????????self.record_date?=?datetime.datetime.now().strftime('%Y-%m-%d?%H:%M:%S') ????????self.changeServersID?=?changeServersID ????#?回调更新本地记录,清空记录替换,临时记录 ????def?record(func): ????????def?Server_record(self): ????????????v?=?func(self) ????????????text_save(filename=r"F:\AutoOps_platform\Database\Server_record.txt",record=IncrementalRecordServer._servers) ????????????print("保存成功") ????????????return?v ????????return?Server_record ????#增加服务记录 ????@record ????def?addServer(self): ????????self._servers.append([int(self.F_SDaqID_MAX),self.record_date]) ????????print("添加记录") ????????Database_sqlite.Insert_Max_ID_Record(f1=self.F_SDaqID_MAX,?f2=self.record_date) ????#修改服务记录 ????@record ????def?changeServers(self): ????????#?self._servers.pop() ????????#?此处传入手动修改的记录ID ????????self._servers.append([self.changeServersID,self.record_date]) ????????#先删除再插入实现修改 ????????Database_sqlite.Del_Max_ID_Records() ????????Database_sqlite.Insert_Max_ID_Record(f1=self.changeServersID,?f2=self.record_date) ????????print("更新记录") ????#删除服务记录 ????@record ????def?popServers(self): ????????#?self._servers.pop() ????????print("删除记录") ????????Database_sqlite.Del_Max_ID_Records() ????#?最新服务记录 ????def?getServers(self): ????????#?print(self._servers[-1]) ????????Max_ID_Records?=?Database_sqlite.View_Max_ID_Records() ????????print("查看记录",Max_ID_Records) ????????return?Max_ID_Records ????#提取数据 ????def?Incremental_data_client(self): ????????""" ????????#?提取数据(增量数据MAXID获取,并提取增量数据) ????????""" ????????#?实时数据库 ????????#?第一次加载先判断是否存在最新记录 ????????if?self.getServers()?==?None: ????????????#?插入增量数据库ID ????????????self.addServer() ????????????#?提取增量数据 ????????????data?=?Database_sqlserver.get_incremental_data(self.F_SDaqID_MAX) ????????????return?data ????????#?获取增量数据库中已有的最新最大ID记录 ????????incremental_Max_ID?=?self.getServers() ????????#添加记录 ????????self.addServer() ????????#?提取增量数据 ????????Target_data_source?=?Database_sqlserver.get_incremental_data(incremental_Max_ID) ????????return?Target_data_source
优化策略:
1、延迟加载方式
以上增量记录服务类IncrementalRecordServer通过覆盖__new__方法来控制对象的创建,我们在创建对象的时候会先检查对象是否存在。也可以通过懒加载的方式实现,节约资源优化如下。
class?IncrementalRecordServer: ????_servers?=?[] ????_instance?=?None ????def?__init__(self,changeServersID=None): ????????""" ????????变量初始化过程 ????????""" ????????self.F_SDaqID_MAX?=?Database_sqlserver().get_F_SDaqID_MAX() ????????self.record_date?=?datetime.datetime.now().strftime('%Y-%m-%d?%H:%M:%S') ????????self.changeServersID?=?changeServersID ????????if?not?IncrementalRecordServer._instance: ????????????print("__init__对象创建") ????????else: ????????????print("对象已经存在:",IncrementalRecordServer._instance) ????????????self.getInstance() ????@classmethod ????def?getInstance(cls): ????????if?not?cls._instance: ????????????cls._instance?=?IncrementalRecordServer() ????????return?cls._instance
懒汉式实例化能够确保实际需要时才创建对象,实例化a= IncrementalRecordServer()时,调用初始化__init__方法,但是没有新的对象创建。懒汉式这种方式加载类对象,也称为延迟加载方式。
2、单例模式能有效利用空间资源,每次利用同一空间资源。
不同操作对象的内存地址相同,且不同对象初始化将上一个对象初始化变量覆盖,确保最新记录实时更新。表面上以上代码实现了单例模式没问题,但多线程并发情况下,存在线程安全问题,可能同时创建不同的对象空间。考虑到线程安全,也可以进一步加锁处理.
3、适用范围及注意事项
本次代码适用于部署生产指定时间点运行之后产出的增量数据,长时间未启用再启动需要清空历史记录即增量数据库或文件ID需清空,一般实时数据增量实现一次加载没有什么问题,所以这一点也不用很关注(文件方式代码可自行完善);当加载历史数据库或定时间隔产生数据量过大时,需要进一步修改代码,需要判断数据规模,指定起始节点及加载数据量,综合因素考虑,下次分享一下亿级数据量提取方案。
4、进一步了解Python垃圾回收机制;并发情况下,通过优化线程池来管理资源。
最后可以添加一个函数来释放资源
def?__del__(self): ????class_name?=?self.__class__.__name__ ????print(class_name,"销毁")
del obj 调用__del__() 销毁对象,释放其空间;只有Python 对象在不再引用对象时被释放。当程序中有其它变量引用该实例对象时,即便手动调用 __del__() 方法,该方法也不会立即执行。这和 Python 的垃圾回收机制的实现有关。
结果测试
if?__name__?==?'__main__': ????for?i?in?range(6): ????????hc1?=?IncrementalRecordServer() ????????hc1.addServer() ????????print("Record_ID",hc1._servers[i]) ????????#?del?hc1 ????????time.sleep(60) ????#Server2-客户端client ????#?最新服务记录 ????hc2?=?IncrementalRecordServer() ????hc2.getServers() ????#查看增量数据 ????hc2.Incremental_data_client()
插入记录
模拟每1分钟插入一条记录,向增量数据库插入7条
if?__name__?==?'__main__': ????#?Server3-客户端client ????#?手动添加增量起始ID记录 ????hc3?=?IncrementalRecordServer(changeServersID='346449980') ????hc3.changeServers()
if?__name__?==?'__main__': ????#删除ID ????hc3?=?IncrementalRecordServer(changeServersID='346449980') ????#?hc3.changeServers() ????hc3.popServers()
以上就是Python实现实时增量数据加载工具的解决方案的详细内容,更多关于Python增量数据加载的资料请关注其它相关文章!
标签:SQLite
相关阅读 >>
微软官方sqlhelper类 数据库辅助操作类 font color=red原创font
php登录验证功能示例【用户名、密码、验证码、数据库、已登陆验证、自动登录和注销登录等】
Sqlitestudio优雅调试android手机数据库Sqlite(推荐)
更多相关阅读请进入《Sqlite》频道 >>

数据库系统概念 第6版
本书主要讲述了数据模型、基于对象的数据库和XML、数据存储和查询、事务管理、体系结构等方面的内容。