不同数据库进行同步和增量数据(SQL server 与MySQL数据库为例)
场景
最近在做的一个项目需要将远程服务器的SQL server数据库中表的数据传输到本机的MySQL数据库中,并且远程的SQL server数据库表的数据会实时进行更新,并且差不多是一分钟内传输18条数据,例如现在是2023-12-4 15:09,在15:08这个时间内有18条数据需要首先进SQL server数据库,再更新到MySQL数据库中,这种场景如果每分钟都能将18条数据放入SQL server数据库的话就非常简单了,但是在15:08的时候,这18条数据可能只来11条,剩下的7条可能在15:09或后面的时间陆续过来。我开始的想法是通过最后更新的时间的时间戳来查询新来的数据然后更新到MySQL中,但是由于在最终的时间内还会来前面时间的数据,这样会导致前面时间的数据丢失,所以我想了另外一方法。
- 首先使用python写一个程序来同步SQL sever的历史数据到MySQL数据库中
- 在SQL server中创建一个中间表。
- 在SQL server中要传输的表中创建一个触发器,当这个表更新数据则触发将更新的数据放入到中间表中
- 在python脚本中写一个循环来定期检查中间表,我的SQL server表中由两个主键定义一条数据,所以中间表也是由两个字段定义一条数据,由于入库历史数据的数据量非常大,有几十万条,在这个入库历史数据的时间段内更新了很多条数据,所以可能中间表的数据与入库到MySQL中的字段有重复,所以我需要先验证中间表中的数据MySQL是否存在。
- 存在则删除中间表中这条数据
- 不存在则插入MySQL后删除这条数据
- 最后完成了入库程序,经过验证没有数据丢失
1.历史数据入库
历史数据入库我使用的python写的,首先定义两个数据库的信息
# 使用示例
sql_server_conn_params = {'driver': '{SQL Server}','server': 'ip','database': '数据库名','uid': 'jzyg','pwd': ''
}mysql_conn_params = {'host': 'localhost','user': 'root','password': '123456','database': '数据库名','charset': 'utf8mb4'
}
定义查询语句
querySolar = 'SELECT dtime,stationID,staionName,electric,tiltSolar,levelSolar,scatterSolar,directSolar,tiltSolar_day,levelSolar_day,scatterSolar_day,directSolar_day,sunShine_day FROM realData_Solar'
定义入库历史数据函数
def transfer_wind_data(self):# 连接到 SQL Serverwith pyodbc.connect(**self.sql_server_conn_params) as sql_server_conn:with sql_server_conn.cursor() as sql_server_cursor:# 修改查询,仅选择上次同步后的数据modified_query = self.queryWind + " WHERE dtime >= ? ORDER BY dtime"sql_server_cursor.execute(modified_query, (self.wind_last_dtime,))rows = sql_server_cursor.fetchall()if not rows:return # 没有新数据# 连接到 MySQLwith pymysql.connect(**self.mysql_conn_params) as mysql_conn:with mysql_conn.cursor() as mysql_cursor:data_list = []for row in rows:observe_time = row[0].strftime("%Y-%m-%d %H:%M:00")fsz_id = row[1].replace('"', "").strip()station_name = row[2]farmName = station_name.split("-")[0]# 根据电站名查询电站号sql = "SELECT FARMID FROM com_farm WHERE FARMNAME LIKE %s"mysql_cursor.execute(sql, ('%' + farmName + '%',))result = mysql_cursor.fetchone()farm_id = Noneif result is not None:farm_id = result# 处理查询结果为空的情况if farm_id is not None:farm_id = farm_id[0]staion_name = row[2]wind_direction_instant = row[3]wind_speed_instant = row[4]wind_speed_two_min = row[5]wind_speed_ten_min = row[6]data = (observe_time,fsz_id,staion_name,farm_id,wind_direction_instant,wind_speed_instant,wind_speed_two_min,wind_speed_ten_min)data_list.append(data)self.wind_last_dtime = row[0].strftime("%Y-%m-%d %H:%M:%S")if data_list:result = mysql_cursor.executemany('INSERT INTO wind_monitor''(observe_time,fsz_id,station_name,farm_id,wind_direction_instant,wind_speed_instant,wind_speed_two_min,wind_speed_ten_min) ''VALUES (%s,%s,%s,%s,%s,%s,%s,%s)', data_list) # 根据你的表结构修改mysql_conn.commit()print(f'wind_monitor表插入了{result}行数据.' if result else '没有新数据插入。')
2.创建中间表和触发器
创建中间表
CREATE TABLE intermediateData_Wind AS SELECT * FROM realData_Wind WHERE 1=0;
创建触发器
CREATE TRIGGER CopyToIntermediateTable
ON realData_Wind
AFTER INSERT
AS
BEGIN-- 插入操作INSERT INTO intermediateData_Wind (dtime, stationID, staionName, windDirectionInstant, windSpeedInstant, windSpeed2min, windSpeed10min)SELECT dtime, stationID, staionName, windDirectionInstant, windSpeedInstant, windSpeed2min, windSpeed10minFROM inserted;
END;
3.创建轮询中间表代码
def transfer_insert_intermediateData_Wind(self):# 连接到 SQL Serverwith pyodbc.connect(**self.sql_server_conn_params) as sql_server_conn:with sql_server_conn.cursor() as sql_server_cursor:# 查询中间表中所有数据sql_server_cursor.execute(self.queryIntermediateData)intermediate_rows = sql_server_cursor.fetchall()# 用于跟踪删除和插入的数量deleted_count = 0inserted_count = 0# 连接到 MySQLwith pymysql.connect(**self.mysql_conn_params) as mysql_conn:with mysql_conn.cursor() as mysql_cursor:for row in intermediate_rows:observe_time = row[0].strftime("%Y-%m-%d %H:%M:00")fsz_id = row[1].replace('"', "").strip()# 检查wind_monitor表中是否存在相同数据check_query = "SELECT COUNT(*) FROM wind_monitor WHERE observe_time = %s AND fsz_id = %s"mysql_cursor.execute(check_query, (observe_time, fsz_id))count = mysql_cursor.fetchone()[0]dtime = row[0].strftime("%Y-%m-%d %H:%M:00.000")if count > 0:# 数据存在,从中间表删除delete_query = "DELETE FROM intermediateData_Wind WHERE dtime = ? AND stationID = ?"sql_server_cursor.execute(delete_query, (dtime, row[1]))sql_server_conn.commit()deleted_count += 1else:# 数据不存在,插入到wind_monitor并从中间表删除station_name = row[2]farmName = station_name.split("-")[0]# 根据电站名查询电站号farm_query = "SELECT FARMID FROM com_farm WHERE FARMNAME LIKE %s"mysql_cursor.execute(farm_query, ('%' + farmName + '%',))farm_result = mysql_cursor.fetchone()farm_id = farm_result[0] if farm_result else Nonewind_direction_instant = row[3]wind_speed_instant = row[4]wind_speed_two_min = row[5]wind_speed_ten_min = row[6]insert_data = (observe_time, fsz_id, station_name, farm_id, wind_direction_instant,wind_speed_instant, wind_speed_two_min, wind_speed_ten_min)insert_query = 'INSERT INTO wind_monitor (observe_time, fsz_id, station_name, farm_id, wind_direction_instant, wind_speed_instant, wind_speed_two_min, wind_speed_ten_min) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'mysql_cursor.execute(insert_query, insert_data)mysql_conn.commit()inserted_count += 1delete_query = "DELETE FROM intermediateData_Wind WHERE dtime = ? AND stationID = ?"sql_server_cursor.execute(delete_query, (dtime, row[1]))sql_server_conn.commit()# 打印删除和插入的数据统计print(f"从intermediateData_Wind表中删除了{deleted_count}条数据.")print(f"向wind_monitor表中插入了{inserted_count}条数据.")
4.总体代码
import threading
import time
import pyodbc
import pymysql
class DataTransfer:def __init__(self, sql_server_conn_params, mysql_conn_params, queryWind,queryIntermediateData, interval=1):self.sql_server_conn_params = sql_server_conn_paramsself.mysql_conn_params = mysql_conn_paramsself.queryWind = queryWindself.queryIntermediateData = queryIntermediateDataself.interval = intervalself.wind_last_dtime = '1970-01-01 00:00:00' # 初始时间def clear_mysql_tables(self):"""清空 MySQL 中的指定表格数据"""try:with pymysql.connect(**self.mysql_conn_params) as mysql_conn:with mysql_conn.cursor() as cursor:# 清空 wind_monitor 表cursor.execute("TRUNCATE TABLE wind_monitor")mysql_conn.commit()print("已清空 wind_monitor 表的数据。")except Exception as e:print(f"清空表格时发生错误: {e}")def transfer_data(self):self.transfer_wind_data()while True:try:self.transfer_insert_intermediateData_Wind()except Exception as e:print(f"发生错误: {e}")# 等待一定时间再次传输数据time.sleep(self.interval)def transfer_insert_intermediateData_Wind(self):# 连接到 SQL Serverwith pyodbc.connect(**self.sql_server_conn_params) as sql_server_conn:with sql_server_conn.cursor() as sql_server_cursor:# 查询中间表中所有数据sql_server_cursor.execute(self.queryIntermediateData)intermediate_rows = sql_server_cursor.fetchall()# 用于跟踪删除和插入的数量deleted_count = 0inserted_count = 0# 连接到 MySQLwith pymysql.connect(**self.mysql_conn_params) as mysql_conn:with mysql_conn.cursor() as mysql_cursor:for row in intermediate_rows:observe_time = row[0].strftime("%Y-%m-%d %H:%M:00")fsz_id = row[1].replace('"', "").strip()# 检查wind_monitor表中是否存在相同数据check_query = "SELECT COUNT(*) FROM wind_monitor WHERE observe_time = %s AND fsz_id = %s"mysql_cursor.execute(check_query, (observe_time, fsz_id))count = mysql_cursor.fetchone()[0]dtime = row[0].strftime("%Y-%m-%d %H:%M:00.000")if count > 0:# 数据存在,从中间表删除delete_query = "DELETE FROM intermediateData_Wind WHERE dtime = ? AND stationID = ?"sql_server_cursor.execute(delete_query, (dtime, row[1]))sql_server_conn.commit()deleted_count += 1else:# 数据不存在,插入到wind_monitor并从中间表删除station_name = row[2]farmName = station_name.split("-")[0]# 根据电站名查询电站号farm_query = "SELECT FARMID FROM com_farm WHERE FARMNAME LIKE %s"mysql_cursor.execute(farm_query, ('%' + farmName + '%',))farm_result = mysql_cursor.fetchone()farm_id = farm_result[0] if farm_result else Nonewind_direction_instant = row[3]wind_speed_instant = row[4]wind_speed_two_min = row[5]wind_speed_ten_min = row[6]insert_data = (observe_time, fsz_id, station_name, farm_id, wind_direction_instant,wind_speed_instant, wind_speed_two_min, wind_speed_ten_min)insert_query = 'INSERT INTO wind_monitor (observe_time, fsz_id, station_name, farm_id, wind_direction_instant, wind_speed_instant, wind_speed_two_min, wind_speed_ten_min) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)'mysql_cursor.execute(insert_query, insert_data)mysql_conn.commit()inserted_count += 1delete_query = "DELETE FROM intermediateData_Wind WHERE dtime = ? AND stationID = ?"sql_server_cursor.execute(delete_query, (dtime, row[1]))sql_server_conn.commit()# 打印删除和插入的数据统计print(f"从intermediateData_Wind表中删除了{deleted_count}条数据.")print(f"向wind_monitor表中插入了{inserted_count}条数据.")def transfer_wind_data(self):# 连接到 SQL Serverwith pyodbc.connect(**self.sql_server_conn_params) as sql_server_conn:with sql_server_conn.cursor() as sql_server_cursor:# 修改查询,仅选择上次同步后的数据modified_query = self.queryWind + " WHERE dtime >= ? ORDER BY dtime"sql_server_cursor.execute(modified_query, (self.wind_last_dtime,))rows = sql_server_cursor.fetchall()if not rows:return # 没有新数据# 连接到 MySQLwith pymysql.connect(**self.mysql_conn_params) as mysql_conn:with mysql_conn.cursor() as mysql_cursor:data_list = []for row in rows:observe_time = row[0].strftime("%Y-%m-%d %H:%M:00")fsz_id = row[1].replace('"', "").strip()station_name = row[2]farmName = station_name.split("-")[0]# 根据电站名查询电站号sql = "SELECT FARMID FROM com_farm WHERE FARMNAME LIKE %s"mysql_cursor.execute(sql, ('%' + farmName + '%',))result = mysql_cursor.fetchone()farm_id = Noneif result is not None:farm_id = result# 处理查询结果为空的情况if farm_id is not None:farm_id = farm_id[0]staion_name = row[2]wind_direction_instant = row[3]wind_speed_instant = row[4]wind_speed_two_min = row[5]wind_speed_ten_min = row[6]data = (observe_time,fsz_id,staion_name,farm_id,wind_direction_instant,wind_speed_instant,wind_speed_two_min,wind_speed_ten_min)data_list.append(data)self.wind_last_dtime = row[0].strftime("%Y-%m-%d %H:%M:%S")if data_list:result = mysql_cursor.executemany('INSERT INTO wind_monitor''(observe_time,fsz_id,station_name,farm_id,wind_direction_instant,wind_speed_instant,wind_speed_two_min,wind_speed_ten_min) ''VALUES (%s,%s,%s,%s,%s,%s,%s,%s)', data_list) # 根据你的表结构修改mysql_conn.commit()print(f'wind_monitor表插入了{result}行数据.' if result else '没有新数据插入。')def start(self):# 在启动线程前先清空表格self.clear_mysql_tables()thread = threading.Thread(target=self.transfer_data)thread.start()sql_server_conn_params = {'driver': '{SQL Server}','server': '','database': '','uid': '','pwd': ''
}mysql_conn_params = {'host': 'localhost','user': 'root','password': '123456','database': '','charset': 'utf8mb4'
}
queryIntermediateData = "SELECT dtime,stationID,staionName,windDirectionInstant,windSpeedInstant,windSpeed2min,windSpeed10min FROM intermediateData_Wind"
queryWind = 'SELECT dtime,stationID,staionName,windDirectionInstant,windSpeedInstant,windSpeed2min,windSpeed10min FROM realData_Wind'
data_transfer = DataTransfer(sql_server_conn_params, mysql_conn_params, queryWind,queryIntermediateData)
data_transfer.start()
相关文章:
不同数据库进行同步和增量数据(SQL server 与MySQL数据库为例)
场景 最近在做的一个项目需要将远程服务器的SQL server数据库中表的数据传输到本机的MySQL数据库中,并且远程的SQL server数据库表的数据会实时进行更新,并且差不多是一分钟内传输18条数据,例如现在是2023-12-4 15:09,在15:08这个…...

国内的几款强大的AI智能—AI语言模型
R5Ai智能助手是一款由百度研发的文心一言,它支持gpt4 / gpt-3.5 / claude,也支持AI绘画,每天提供十次免费使用机会,无需魔法。该智能助手具有以下优点:会画画,没有使用次数限制,可以在界面上找到…...
linux下恶意软件的七种反分析技术
7 类主流的 Linux 恶意软件反分析/检测躲避技术 反调试(Anti-Debug): 软件调试是恶意软件分析的常⽤⼿段之⼀,但恶意软件可以通过识别调试器特征,实现⾃⾝恶意⾏为的隐藏,或导致调试失败,从⽽规避分析与检测…...
Spring Security OAuth2 认证服务器自定义异常处理
目录 前言WebResponseExceptionTranslator自定义异常处理1、自定义我们响应实体类2、定义响应结果枚举类3、自定义异常转换类4、配置自定义异常转换器5、测试 前言 Spring Security OAuth2 认证失败的格式如下 {"error": "unsupported_grant_type","…...
selenium环境安装
一、下载安装python 下载python安装python设置python环境变量安装selenium (1)下载python 您可以从Python官方网站(https://www.python.org/downloads/)下载Python。在页面上,您将看到不同版本的Python供您选择。根…...

(C++)和为s的两个数字--双指针算法
个人主页:Lei宝啊 愿所有美好如期而遇 和为S的两个数字_牛客题霸_牛客网输入一个升序数组 array 和一个数字S,在数组中查找两个数,使得他们的和正好是S,如果。题目来自【牛客题霸】https://www.nowcoder.com/practice/390da4f7a…...
鸿蒙(HarmonyOS)应用开发——构建页面(题目答案)
判断题 1.在Column容器中的子组件默认是按照从上到下的垂直方向布局的,其主轴的方向是垂直方向,在Row容器中的组件默认是按照从左到右的水平方向布局的,其主轴的方向是水平方向。 正确(True) 2.List容器可以沿水平方向排列,也可…...

Python基础快速过一遍
文章目录 一、变量及基本概念1、变量2、变量类型3、变量格式化输出4、type()函数5、input()函数6、类型转换函数7、注释 二、Python运算/字符1、算数运算2、比较运算3、逻辑运算4、赋值运算符5、转义字符6、成员运算符 三、判断/循环语句1、if判断语句2、while循环语句3、for循…...

等保测评报价相差很大,里面有什么门道
等保测评报价的差异主要源于以下几点: 服务质量评估标准不同:不同的测评机构在测评过程中所提供的服务范围、深度、细节等方面可能存在差异,因此导致报价有所不同。一些机构可能提供全面且细致的测评服务,致力于提供高质量的等保测…...

MATLAB的rvctools工具箱熟悉运动学【机械臂机器人示例】
1、rvctools下载安装 rvctools下载地址:rvctools下载 截图如下,点击红色箭头指示的“Download Shared Folder” 即可下载 下载之后进行解压,解压到D:\MATLAB\toolbox这个工具箱目录,这个安装路径根据自己的情况来选择,…...

如何精准操作无人机自动停机坪?
无人机自动停机坪通过自主导航和避障功能,实现了无人机的自主降落和起飞,在无人机技术领域起到了至关重要的作用。停机坪不仅仅是无人机的起降平台,还具备自动换电或充电等功能,为无人机的自动化提供了关键支持。为更有效地操作无…...

【蓝桥杯】带分数
带分数 题目要求用一个ab/c的形式得到一个值,而且只能在1~9里面不重复的组合。 可以对1~9进行全排列,然后不断划分区间。 #include<iostream> #include<vector> using namespace std; int st[15]; int num[15]; int res; int n;int calc(i…...
软件工程 课堂测验 选择填空
系统流程图用图形符号表示系统中各个元素,表达了系统中各个元素之间的 信息流动 喷泉模型是一种以用户需求为动力,以 对象 为驱动的模型。 软件生存周期中最长的是 维护 阶段。 变换流的DFD由三部分组成,不属于其中一部分的是 事务中心 软…...

计算机网络的分类
目录 一、按照传输介质进行分类 1、有线网络 2、无线网络 二、按照使用者进行分类 1、公用网 (public network) 2、专用网(private network) 三、按照网络规模和作用范围进行分类 1、PAN 个人局域网 2、LAN 局域网 3、MAN 城域网 4、 WAN 广域网 5、Internet 因特…...

百度收录批量查询工具,免费SEO优化排名工具
拥有一个在搜索引擎中得到良好收录的网站对于个人和企业都至关重要。而百度,作为中国最大的搜索引擎,其收录情况直接影响着网站的曝光度和流量。 百度搜索引擎是中文用户获取信息的重要途径之一。而在这个竞争激烈的网络环境中,了解自己网站…...

select选择框里填充图片,下拉选项带图片
遇到一个需求,选择下拉框选取图标,填充到框里 1、效果展示 2、代码 <el-form-item label"工种图标" class"Form_icon Form_label"><el-select ref"select" :value"formLabelAlign.icon" placeholder&…...

轨道交通数字孪生可视化平台,助力城市交通运营智慧化
随着经济和科技的快速发展,轨道交通运营管理在日常操作者面临各种挑战。数字孪生技术被认为是未来轨道交通运营管理的重要手段之一。它可以提高轨道交通的运营效率和安全性,助力城市交通运营智慧化。以城市轨道交通运维管理业务需求为导向,从数据感知、融…...

【每日OJ —— 101. 对称二叉树】
每日OJ —— 101. 对称二叉树 1.题目:101. 对称二叉树2.解法2.1.算法讲解2.2.代码实现2.3.提交通过展示 1.题目:101. 对称二叉树 2.解法 2.1.算法讲解 1.该题是判断二叉树是否对称,关键在于,左子树等于右子树,而所给的…...

善网商城上线洁柔产品 公益人专享爱心价官方正品
近日,中国善网慈善商城(以下简称善网商城)系统经升级后重新上线。目前善网商城线上销售的中顺洁柔旗下慈善产品已顺利获得中顺洁柔纸业股份有限公司授权,双方就合作事宜达成共识,并于近日签订线上经营授权书。 &#x…...

禁止谷歌浏览器自动更新
禁止谷歌浏览器自动更新 在使用Python包selenium的时候浏览器版版本发生变化后产生很多问题如: 1、直接版本不对应无法运行 2、版本不一致导致debug启动浏览器超级慢 这里是已谷歌浏览器为代表的。 禁止自动更新的方法如下: 1、WinR调出运行&#x…...

智慧医疗能源事业线深度画像分析(上)
引言 医疗行业作为现代社会的关键基础设施,其能源消耗与环境影响正日益受到关注。随着全球"双碳"目标的推进和可持续发展理念的深入,智慧医疗能源事业线应运而生,致力于通过创新技术与管理方案,重构医疗领域的能源使用模式。这一事业线融合了能源管理、可持续发…...

突破不可导策略的训练难题:零阶优化与强化学习的深度嵌合
强化学习(Reinforcement Learning, RL)是工业领域智能控制的重要方法。它的基本原理是将最优控制问题建模为马尔可夫决策过程,然后使用强化学习的Actor-Critic机制(中文译作“知行互动”机制),逐步迭代求解…...

聊聊 Pulsar:Producer 源码解析
一、前言 Apache Pulsar 是一个企业级的开源分布式消息传递平台,以其高性能、可扩展性和存储计算分离架构在消息队列和流处理领域独树一帜。在 Pulsar 的核心架构中,Producer(生产者) 是连接客户端应用与消息队列的第一步。生产者…...
论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一)
宇树机器人多姿态起立控制强化学习框架论文解析 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(一) 论文解读:交大&港大&上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化…...

GC1808高性能24位立体声音频ADC芯片解析
1. 芯片概述 GC1808是一款24位立体声音频模数转换器(ADC),支持8kHz~96kHz采样率,集成Δ-Σ调制器、数字抗混叠滤波器和高通滤波器,适用于高保真音频采集场景。 2. 核心特性 高精度:24位分辨率,…...

Mysql中select查询语句的执行过程
目录 1、介绍 1.1、组件介绍 1.2、Sql执行顺序 2、执行流程 2.1. 连接与认证 2.2. 查询缓存 2.3. 语法解析(Parser) 2.4、执行sql 1. 预处理(Preprocessor) 2. 查询优化器(Optimizer) 3. 执行器…...
代码随想录刷题day30
1、零钱兑换II 给你一个整数数组 coins 表示不同面额的硬币,另给一个整数 amount 表示总金额。 请你计算并返回可以凑成总金额的硬币组合数。如果任何硬币组合都无法凑出总金额,返回 0 。 假设每一种面额的硬币有无限个。 题目数据保证结果符合 32 位带…...

深度学习水论文:mamba+图像增强
🧀当前视觉领域对高效长序列建模需求激增,对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模,以及动态计算优势,在图像质量提升和细节恢复方面有难以替代的作用。 🧀因此短时间内,就有不…...
tomcat入门
1 tomcat 是什么 apache开发的web服务器可以为java web程序提供运行环境tomcat是一款高效,稳定,易于使用的web服务器tomcathttp服务器Servlet服务器 2 tomcat 目录介绍 -bin #存放tomcat的脚本 -conf #存放tomcat的配置文件 ---catalina.policy #to…...
k8s从入门到放弃之HPA控制器
k8s从入门到放弃之HPA控制器 Kubernetes中的Horizontal Pod Autoscaler (HPA)控制器是一种用于自动扩展部署、副本集或复制控制器中Pod数量的机制。它可以根据观察到的CPU利用率(或其他自定义指标)来调整这些对象的规模,从而帮助应用程序在负…...