Python 算法交易实验73 QTV200第二步: 数据清洗并写入ClickHouse
说明
先检查一下昨天启动的worker是否正常工作,然后做一些简单的清洗,存入clickhouse。
内容
1 检查数据
from Basefuncs import *
# 将一般字符串转为UCS 名称
def dt_str2ucs_blockname(some_dt_str):some_dt_str1 =some_dt_str.replace('-','.').replace(' ','.').replace(':','.')return '.'.join(some_dt_str1.split('.')[:4])
'''
dt_str2ucs_blockname('2024-06-24 09:30:00')
'2024.06.24.09'
'''
# 测试队列声明
qm = QManager(redis_agent_host = 'http://192.168.0.4:xx/',redis_connection_hash = None,q_max_len= 1000000, batch_size=10000)
qm.info()
target_stream_name = 'xxx'
qm.stream_len(target_stream_name)
2804
获取数据(使用单worker,模式比较简单且性能足够)
data = qm.xrange(target_stream_name)['data']
data_df = pd.DataFrame(data)
keep_cols = ['rec_id', 'data_dt','open', 'close','high','low','vol', 'amt', 'data_source','code','market']
data_df1 = data_df[keep_cols].dropna().drop_duplicates(['rec_id'])# 第一次操作,把之前无关的数据删掉
data_df1 = data_df1[data_df1['data_dt'] >='2024-06-24 00:00:00']
向clickhouse发起query,请求每个etf的最大时间,之后要使得新增的数据大于这个时间,另外目标表的字段形如
这是之前做的设计,因为隔的时间有点久都有点忘了。不过这个设计是合理的,后面会看到。
要做的转换也很简单:
- 1 将时间字符转为时间戳
- 2 从日期中分解出shard、part、block和brick
转换段
import timedata_df1['ts'] = data_df1['data_dt'].apply(inverse_time_str).apply(int)data_df1['brick'] = data_df1['data_dt'].apply(dt_str2ucs_blockname)
data_df1['block'] =data_df1['brick'].apply(lambda x: x[:x.rfind('.')])
data_df1['part'] =data_df1['block'].apply(lambda x: x[:x.rfind('.')])
data_df1['shard'] =data_df1['part'].apply(lambda x: x[:x.rfind('.')])data_df1['pid'] = data_df1['code'].apply(str) + '_' + data_df1['ts'].apply(str)keep_cols1 = ['data_dt','open','close','high','low', 'vol','amt', 'brick','block','part', 'shard', 'code','ts', 'pid']
data_df2 =data_df1[keep_cols1]
今天就到这里吧,明晚接着写。
Go on …
昨天疏忽了,数据不应该直接存库,而是应该整理好之后送到队列。然后由默认的worker将数据搬到clickhouse.
2 存数规则
第二步的输入队列BUFF.xxxstream_in
,输出队列BUFF.xxx.stream_out
。
第一次需要确保对应数据表的存在。clickhouse对数值的要求比较严格,为了避免麻烦,统一设置成Float32。(这样可以用统一的同步worker)。另外clickhouse不支持删除数据,这点倒是比较特别。
但可以支持全部删除数据(保留数据结构) TRUNCATE table market_data_v2
create_table_sql = '''
CREATE TABLE market_data_v2
(data_dt String,open Float32,close Float32,high Float32,low Float32,vol Float32,amt Float32,brick String,block String,part String,shard String,code String,ts Float32,pid String
)
ENGINE = MergeTree
ORDER BY (ts )
'''click_para = gb.getx('sp_global.buffer.lan.xxx.xxx.para')
chc = CHClient(**click_para)
chc._exe_sql(create_table_sql)
chc._exe_sql('show tables')
[('market_data',), ('market_data_v2',)]
etl_worker.py
# 0 记录日志
import logging
from logging.handlers import RotatingFileHandlerlogger = logging.getLogger('MyLogger')
handler = RotatingFileHandler('/var/log/workers.log', maxBytes=1024*1024*100, backupCount=5)
logger.addHandler(handler)
logger.setLevel(logging.INFO)# ---------------------------------------- 设置日志from Basefuncs import *
def tuple_list2dict(tuple_list):"""将包含三个元素的tuple列表转换为字典。参数:tuple_list (List[Tuple[K, V1, V2]]): 包含键和两个值的tuple的列表。返回:Dict[K, Tuple[V1, V2]]: 转换后的字典,其中值是包含两个元素的tuple。"""return {key:value1 for key, value1 in tuple_list}# 将一般字符串转为UCS 名称
def dt_str2ucs_blockname(some_dt_str):some_dt_str1 =some_dt_str.replace('-','.').replace(' ','.').replace(':','.')return '.'.join(some_dt_str1.split('.')[:4])
'''
dt_str2ucs_blockname('2024-06-24 09:30:00')
'2024.06.24.09'
'''
# ---------------------------------------- 基本函数# 测试队列声明
qm = QManager(redis_agent_host = 'http://192.168.0.4:xx/',redis_connection_hash = None,q_max_len= 1000000, batch_size=10000)
qm.info()
source_stream_name ='stream_in'
target_stream_name ='stream_out'
source_stream_len = qm.stream_len(source_stream_name)
target_stream_len = qm.stream_len(target_stream_name)
print('source',source_stream_len)
print('target', target_stream_len)
# qm.ensure_group(target_stream_name)
cur_dt_str = get_time_str1()
if source_stream_len:is_source_recs = True
else:is_source_recs = Falselogger.info('%s %s source No Recs' %(cur_dt_str,'etl_worker'))
# 获取数据(使用单worker,模式比较简单且性能足够)# ---------------------------------------- 队列取数,有数据才执行下面
if is_source_recs:# ---------------------------------------- 取数,取出消息列表和需要的列# worker 30 秒启动一次data = qm.xrange(source_stream_name)['data']data_df = pd.DataFrame(data)msg_id_list = list(data_df['_msg_id'])keep_cols = ['rec_id', 'data_dt','open', 'close','high','low','vol', 'amt', 'data_source','code','market']data_df1 = data_df[keep_cols].dropna().drop_duplicates(['rec_id'])# 第一次操作,把之前无关的数据删掉# data_df1 = data_df1[data_df1['data_dt'] >='2024-06-24 00:00:00']import timedata_df1['ts'] = data_df1['data_dt'].apply(inverse_time_str).apply(int)data_df1['brick'] = data_df1['data_dt'].apply(dt_str2ucs_blockname)data_df1['block'] =data_df1['brick'].apply(lambda x: x[:x.rfind('.')])data_df1['part'] =data_df1['block'].apply(lambda x: x[:x.rfind('.')])data_df1['shard'] =data_df1['part'].apply(lambda x: x[:x.rfind('.')])data_df1['pid'] = data_df1['code'].apply(str) + '_' + data_df1['ts'].apply(str)keep_cols1 = ['data_dt','open','close','high','low', 'vol','amt', 'brick','block','part', 'shard', 'code','ts', 'pid']data_df2 =data_df1[keep_cols1]# ------------------------------------- 获取当前数据库已有的数据# 获取各code最大值click_para = {'database': 'xx','host': '192.168.0.4','name': 'xx','password': 'xx','port': xxx,'user': 'xx'}chc = CHClient(**click_para)'''这个 SQL 语句的作用是按照 `code` 分组,并为每个 `code` 找到对应的最新日期(`data_dt`),这个最新日期是基于 `ts` 字段的最大值来确定的。`argMax` 函数在这里用于找到每个分组中 `ts` 值最大时对应的 `data_dt` 值。具体来说,`argMax(data_dt, ts)` 会返回每个 `code` 分组中使得 `ts` 达到最大值的 `data_dt` 值。这意味着对于每个 `code`,查询会找到 `ts` 字段的最大值,并返回对应的 `data_dt` 值,即每个 `code` 的最新数据日期。最终,这个查询会返回一个结果集,其中包含每个 `code` 以及对应的最新数据日期(`last_data_dt`)。这对于分析每个代码的最新市场数据非常有用。'''latest_sql = '''SELECTcode,argMax(data_dt, ts) AS last_data_dtFROMmarket_data_v2GROUP BYcode'''# 更新时latest_date_tuple_list = chc._exe_sql(latest_sql)latest_date_dict = tuple_list2dict(latest_date_tuple_list)# ------------------------------------- 使用时间进行过滤# 筛选新数据data_df2['existed_dt'] = data_df2['code'].map(latest_date_dict).fillna('')output_sel = data_df2['data_dt'] > data_df2['existed_dt']output_df = data_df2[output_sel][keep_cols1]output_data_listofdict = output_df.to_dict(orient='records')output_data_listofdict2 = slice_list_by_batch2(output_data_listofdict, qm.batch_size)for some_data_listofdict in output_data_listofdict2:qm.parrallel_write_msg(target_stream_name, some_data_listofdict)del_msg = qm.xdel(source_stream_name, msg_id_list)logger.info('%s %s del source %s Recs' %(cur_dt_str,'etl_worker',del_msg['data'] ))
将该脚本发布为任务,30秒执行一次同步。
exe_qtv200_etl_worker.sh
#!/bin/bash# 记录
# sh /home/test_exe.sh com_info_change_pattern running# 有些情况需要把source替换为 .
# . /root/anaconda3/etc/profile.d/conda.sh
# 激活 base 环境(或你创建的特定环境)
source /root/miniconda3/etc/profile.d/conda.sh#conda init
conda activate basecd /home/workers && python3 etl_worker.py
存数成功,后续就自动运行了。
相关文章:
Python 算法交易实验73 QTV200第二步: 数据清洗并写入ClickHouse
说明 先检查一下昨天启动的worker是否正常工作,然后做一些简单的清洗,存入clickhouse。 内容 1 检查数据 from Basefuncs import * # 将一般字符串转为UCS 名称 def dt_str2ucs_blockname(some_dt_str):some_dt_str1 some_dt_str.replace(-,.).re…...
记录:有趣的C#多元运算符 ? : 表达式写法
有时候用 if //...Whatre you she wanna go else if //...do do do else //...and i know something just like this... 感觉代码太多了怎么优雅的、高端的替换? 看个高端的栗子菊: LedCOM["parity"] ledData[4] "N" ? …...
华宽通中标长沙市政务共性能力建设项目,助力智慧政务建设新飞跃
在数字化浪潮的推动下,长沙市政府正积极拥抱智慧城市建设,以科技力量提升政务服务效能。华宽通凭借其卓越的技术实力与丰富的项目经验,成功中标长沙市政务共性能力建设项目,这无疑是对华宽通在智慧城市领域实力的高度认可。 华宽…...
[面试题]计算机网络
[面试题]Java【基础】[面试题]Java【虚拟机】[面试题]Java【并发】[面试题]Java【集合】[面试题]MySQL[面试题]Maven[面试题]Spring Boot[面试题]Spring Cloud[面试题]Spring MVC[面试题]Spring[面试题]MyBatis[面试题]Nginx[面试题]缓存[面试题]Redis[面试题]消息队列[面试题]…...
企业级低代码开发效率变革赋能业务增长
企业级低代码开发已经成为当今软件开发领域的一大趋势,它为企业带来了前所未有的效率变革,从而赋能业务增长。本文将围绕这一主题,深入探讨低代码开发的概念、优势以及如何在企业级应用中实现高效的低代码开发,以助力我国企业实现…...
2024最新总结:1500页金三银四面试宝典 记录35轮大厂面试(都是面试重点)
学习是你这个职业一辈子的事 手里有个 1 2 3,不要想着去怼别人的 4 5 6,因为还有你不知道的 7 8 9。保持空瓶心态从 0 开始才能学到 10 全。 毕竟也是跳槽高峰期,我还是为大家准备了这份1500页金三银四宝典,记录的都是真实大厂面…...
使用Spring Boot和Thymeleaf构建动态Web页面
使用Spring Boot和Thymeleaf构建动态Web页面 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿!今天,我们将探讨如何利用Spring Boot和Thymeleaf构建动…...
扫盲之webSocket
介绍 webSocket 是一种协议,设计用于在客户端和服务器之间提供低延迟、全双工、和长期运行的连接。 全双工:通信的两个参与方可以同时发送和接收数据,不需要等待对方的响应或传输完成 websocket就是为了解决实时通信的问题 建立webSocke…...
一些硬件知识(十二)
1、请说明一下滤波磁珠和滤波电感的区别。 因此磁珠通常用于模数地的连接。 磁珠由导线穿过铁氧体组成,直流电阻很小,在低频时阻抗也很小,对直流信号几乎没有影响。 在高频(几十兆赫兹以上)时磁珠阻抗比较大࿰…...
Adobe Acrobat编辑器最新版下载安装 Adobe Acrobat版本齐全!
功能强大,Adobe Acrobat无疑是PDF文档处理领域的翘楚。这款软件集多种PDF文档处理功能于一身,不仅使得用户可以轻松地编辑PDF文档,更能轻松应对转换和合并等多种需求。 在编辑功能上,Adobe Acrobat的表现尤为出色。无论是添加文字…...
k8s如何使用 HPA 实现自动扩展
使用Horizontal Pod Autoscaler (HPA) 实验目标: 学习如何使用 HPA 实现自动扩展。 实验步骤: 创建一个 Deployment,并设置 CPU 或内存的资源请求。创建一个 HPA,设置扩展策略。生成负载,观察 HPA 如何自动扩展 Pod…...
Hi3861 OpenHarmony嵌入式应用入门--0.96寸液晶屏 iic驱动ssd1306
使用iic驱动ssd1306,代码来源hihope\hispark_pegasus\demo\12_ssd1306 本样例提供了一个HarmonyOS IoT硬件接口的SSD1306 OLED屏驱动库,其功能如下: 内置了128*64 bit的内存缓冲区,支持全屏刷新;优化了屏幕刷新速率,…...
代码随想录训练营第二十二天 77组合
第一题: 原题链接:77. 组合 - 力扣(LeetCode) 思路: 经典的回溯模板题: 终止条件,当中间变量用来存储单个结果的大小等于k,则将中间变量存放到结果数组中。 一个for循环横向遍历…...
Unity踩坑记录
1. 如果同时在父物体和子物体上挂载BoxCollider,那么当使用: private void OnTriggerEnter2D(Collider2D collision){if (collision.CompareTag("CardGroup")){_intersectCardGroups.Add(collision.GetComponent<CardGroup>());}} 来判…...
内容安全复习 1 - 信息内容安全概述
文章目录 信息内容安全简介网络空间信息内容安全大模型 人工智能简介 信息内容安全简介 网络空间 网络空间是融合物理域、信息域、认知域和社会域,控制实体行为的信息活动空间。 上图展示了网络空间安全的结构。可以看到将网络空间划分为了网络域和内容域两个部分。…...
【深度学习】python之人工智能应用篇--跨模态生成技术
跨模态生成技术概述 跨模态生成技术是一种将不同模态的数据(如文本、图像、音频、视频等)进行融合和转换的技术。其目标是通过将一个模态的数据作为输入,生成与之对应的另一个模态的输出。这种技术对于突破单一模态的局限性,提高…...
springboot中获取某个注解下面的某个方法的方法名,参数值等等详细实例
在Spring Boot应用中,获取某个类或方法上的注解及其相关信息,包括方法名称、参数值等,通常涉及到反射和Spring的AOP(面向切面编程)特性。下面是一个示例,展示如何利用Spring AOP的Around注解来拦截带有特定…...
代码随想录——跳跃游戏Ⅱ(Leetcode 45)
题目链接 贪心 class Solution {public int jump(int[] nums) {if(nums.length 1){return 0;}int count 0;// 当前覆盖最远距离下标int curDistance 0;// 下一步覆盖距离最远下标int nextDistance 0;for(int i 0; i < nums.length; i){nextDistance Math.max(nums[…...
从0-1搭建一个web项目(package.json)详解
本章分析package.json文件详解 本文主要对packge.json配置子文件详解 ObJack-Admin一款基于 Vue3.3、TypeScript、Vite3、Pinia、Element-Plus 开源的后台管理框架。在一定程度上节省您的开发效率。另外本项目还封装了一些常用组件、hooks、指令、动态路由、按钮级别权限控制等…...
图解ReentrantLock的基石AQS-独占锁的获取与释放
大家好,我是呼噜噜,我们之前聊过Java中以互斥同步的方式保证线程安全:Sychronized,这次我们来再聊聊另一种互斥同步的方式Lock,本文会介绍ReentrantLock及其它的基石AQS的源码解析,一个非常重要的同步框架 …...
Perl语言入门学习读物
1. PERL 是什么? Perl 最初的设计者为Larry Wall,Perl借取了C、sed、awk、shell scripting以及很多其他程序语言的特性。Perl一般被称为“实用报表提取语言”(PracticalExtraction andReportLanguage),有时也被称做“病态折中垃圾列表器”(Pathologica…...
电脑浏览器问题
网络连接正常,但是浏览器就是打不开网页,显示未连接什么的。 搞了半天,不是代理服务器问题。 也不是端口问题。 也不是软件版本问题。 竟然是浏览器插件的问题,插件禁用,奇迹般的好了。 参考: 电脑有网…...
[Docker] Ubuntu安装Home Assistant
本文主要记载一些Ubuntu安装Home Assistant的细节,方便后面重装。 1. 安装Docker 安装依赖 $ sudo apt-get install \apt-transport-https \ca-certificates \curl \gnupg-agent \software-properties-common添加 Docker 官方 GPG 密钥 $ curl -fsSL https://mirrors.ustc…...
浅谈请求中数据转换
目录 1. 前端 JS 数据类型2. JSON 数据类型(数据传输格式)3. 后端 Java 数据类型4. 后端序列化框架 Fastjson && Jackson 转换4.1. JSON 转换 Java4.2. Java 转换 JSON 1. 前端 JS 数据类型 数据类型示例Stringvar str 张三Number (数字)var a…...
Flutter学习:从搭建环境到运行
一、开发环境的搭建 本文所示内容都是在Windows系统下进行的。 1、下载 Flutter SDK Flutter 官网(https://docs.flutter.cn/release/archive?tabwindows) 或者通过 git clone -b master https://github.com/flutter/flutter.git 下载 2、配置环境…...
sheng的学习笔记-AI-聚类(Clustering)
ai目录 sheng的学习笔记-AI目录-CSDN博客 基础知识 什么是聚类 在“无监督学习”(unsupervised learning)中,训练样本的标记信息是未知的,目标是通过对无标记训练样本的学习来揭示数据的内在性质及规律,为进一步的数据分析提供基础。此类学…...
从0构建一个录制UI测试工具
很多UI自动化测试工具都具备录制UI自动化测试的能力,例如playwright,可以通过playwright vscode插件完成录制,如下图所示,当选择录制脚本时,会打开一个浏览器,在浏览器中输入被测应用url,用户在…...
代码随想录算法训练营第五十一天|LeetCode72 编辑距离、LeetCode647 回文子串、LeetCode516 最长回文子序列、动态规划的小总结
题1: 指路:72. 编辑距离 - 力扣(LeetCode) 思路与代码: 关于dp数组的定义,我们定义一个二维数组dp[i][j],其含义为以i-1为结尾的字符串word1和以j-1为结尾的字符串word2,最近编辑…...
sessionStorage 能在多个标签页之间共享数据吗?
🧑💻 写在开头 点赞 收藏 学会🤣🤣🤣 最近,我的一个朋友在面试中被一个关于 sessionStorage 的问题难住了。我们来聊聊这个话题。 sessionStorage 能在多个标签页之间共享数据吗? 在回答…...
鸿蒙期末项目(完结)
两天仅睡3个小时的努力奋斗之下,终于写完了这个无比拉跨的项目,最后一篇博客总体展示一下本项目运行效果兼测试,随后就是答辩被同学乱沙(悲 刚打开软件,会看到如下欢迎界面,介绍本app的功能和优点 随后我们…...
网站开发工程师职业定位/新网站推广最直接的方法
20150527.C语言—1人已学习 课程介绍 尹成老师带你步入 C 语言的殿堂,讲课生动风趣、深入浅出,全套视频内容充实,整个教程以 C 语言为核心,完整精彩的演练了数据结构、算法、设计模式、数据库、大数据高并发检索、文件重定向、…...
哪里有好看的网站/设计网站官网
很多搞性能测试的人员,只会跟着网上、前辈教导的方法进行测试:挑选业务逻辑中并发量、访问量最高的业务逻辑、结合读写等业务进行测试,然后取整条业务逻辑(模拟用户全流程动作)的逻辑进行测试;结果就是&…...
狐表做网站/网站推广公司大家好
1 class和struct的区别和联系 在c中,class和struct只有一点不同,它们是可以完全替代使用的。唯一的不同在于,class中的成员默认是private的,而struct中默认是public的。 2 指针和引用的不同 2.1 引用在编译后,本质上还…...
php动态网站开发论文/什么是百度竞价推广
转自:https://blog.csdn.net/paincupid/article/details/49924299 经常会接触到VO,DO,DTO的概念,本文从领域建模中的实体划分和项目中的实际应用情况两个角度,对这几个概念进行简析。 得出的主要结论是:在项…...
网站改版后百度不收录/怎么开发网站
入口import moment from moment// 里面的字符可以根据自己的需要进行调整moment.locale(zh-cn, {months: 一月_二月_三月_四月_五月_六月_七月_八月_九月_十月_十一月_十二月.split(_),monthsShort: 1月_2月_3月_4月_5月_6月_7月_8月_9月_10月_11月_12月.split(_),weekdays: 星…...
牡丹江整站优化/2024最火的十大新闻有哪些
这几天把c语言过了一遍,基本上算是入门了,常用语法、函数的使用。c语言是比较古老的语言了,很多系统的底层、工业控制软件都是使用C语言编写,过一遍之后觉得c语言屹立不倒是有原因。c程序员有一句话:使用c语言时间长了…...