当前位置: 首页 > news >正文

Python 算法交易实验73 QTV200第二步: 数据清洗并写入ClickHouse

说明

先检查一下昨天启动的worker是否正常工作,然后做一些简单的清洗,存入clickhouse。

内容

1 检查数据

from Basefuncs import * 
# 将一般字符串转为UCS 名称
def dt_str2ucs_blockname(some_dt_str):some_dt_str1   =some_dt_str.replace('-','.').replace(' ','.').replace(':','.')return '.'.join(some_dt_str1.split('.')[:4])
'''
dt_str2ucs_blockname('2024-06-24 09:30:00')
'2024.06.24.09'
'''
# 测试队列声明
qm = QManager(redis_agent_host = 'http://192.168.0.4:xx/',redis_connection_hash = None,q_max_len= 1000000, batch_size=10000)
qm.info()
target_stream_name = 'xxx'
qm.stream_len(target_stream_name)
2804

获取数据(使用单worker,模式比较简单且性能足够)

data = qm.xrange(target_stream_name)['data']
data_df = pd.DataFrame(data)
keep_cols = ['rec_id', 'data_dt','open', 'close','high','low','vol', 'amt', 'data_source','code','market']
data_df1 = data_df[keep_cols].dropna().drop_duplicates(['rec_id'])# 第一次操作,把之前无关的数据删掉
data_df1 = data_df1[data_df1['data_dt'] >='2024-06-24 00:00:00']

在这里插入图片描述
向clickhouse发起query,请求每个etf的最大时间,之后要使得新增的数据大于这个时间,另外目标表的字段形如
在这里插入图片描述
这是之前做的设计,因为隔的时间有点久都有点忘了。不过这个设计是合理的,后面会看到。

要做的转换也很简单:

  • 1 将时间字符转为时间戳
  • 2 从日期中分解出shard、part、block和brick

转换段

import timedata_df1['ts'] = data_df1['data_dt'].apply(inverse_time_str).apply(int)data_df1['brick'] = data_df1['data_dt'].apply(dt_str2ucs_blockname)
data_df1['block'] =data_df1['brick'].apply(lambda x: x[:x.rfind('.')])
data_df1['part'] =data_df1['block'].apply(lambda x: x[:x.rfind('.')])
data_df1['shard'] =data_df1['part'].apply(lambda x: x[:x.rfind('.')])data_df1['pid'] = data_df1['code'].apply(str) + '_' + data_df1['ts'].apply(str)keep_cols1 = ['data_dt','open','close','high','low', 'vol','amt', 'brick','block','part', 'shard', 'code','ts', 'pid']
data_df2 =data_df1[keep_cols1]

在这里插入图片描述

今天就到这里吧,明晚接着写。

Go on …

昨天疏忽了,数据不应该直接存库,而是应该整理好之后送到队列。然后由默认的worker将数据搬到clickhouse.

2 存数规则

第二步的输入队列BUFF.xxxstream_in,输出队列BUFF.xxx.stream_out
第一次需要确保对应数据表的存在。clickhouse对数值的要求比较严格,为了避免麻烦,统一设置成Float32。(这样可以用统一的同步worker)。另外clickhouse不支持删除数据,这点倒是比较特别。
在这里插入图片描述
但可以支持全部删除数据(保留数据结构) TRUNCATE table market_data_v2

create_table_sql = '''
CREATE TABLE market_data_v2
(data_dt String,open Float32,close Float32,high Float32,low Float32,vol Float32,amt Float32,brick String,block String,part String,shard String,code String,ts Float32,pid String
)
ENGINE = MergeTree
ORDER BY (ts )
'''click_para = gb.getx('sp_global.buffer.lan.xxx.xxx.para')
chc = CHClient(**click_para)
chc._exe_sql(create_table_sql)
chc._exe_sql('show tables')
[('market_data',), ('market_data_v2',)]

etl_worker.py

# 0 记录日志
import logging
from logging.handlers import RotatingFileHandlerlogger = logging.getLogger('MyLogger')
handler = RotatingFileHandler('/var/log/workers.log', maxBytes=1024*1024*100, backupCount=5)
logger.addHandler(handler)
logger.setLevel(logging.INFO)# ---------------------------------------- 设置日志from Basefuncs import * 
def tuple_list2dict(tuple_list):"""将包含三个元素的tuple列表转换为字典。参数:tuple_list (List[Tuple[K, V1, V2]]): 包含键和两个值的tuple的列表。返回:Dict[K, Tuple[V1, V2]]: 转换后的字典,其中值是包含两个元素的tuple。"""return {key:value1 for key, value1 in tuple_list}# 将一般字符串转为UCS 名称
def dt_str2ucs_blockname(some_dt_str):some_dt_str1   =some_dt_str.replace('-','.').replace(' ','.').replace(':','.')return '.'.join(some_dt_str1.split('.')[:4])
'''
dt_str2ucs_blockname('2024-06-24 09:30:00')
'2024.06.24.09'
'''
# ---------------------------------------- 基本函数# 测试队列声明
qm = QManager(redis_agent_host = 'http://192.168.0.4:xx/',redis_connection_hash = None,q_max_len= 1000000, batch_size=10000)
qm.info()
source_stream_name ='stream_in'
target_stream_name ='stream_out'
source_stream_len =  qm.stream_len(source_stream_name)
target_stream_len = qm.stream_len(target_stream_name)
print('source',source_stream_len)
print('target', target_stream_len)
# qm.ensure_group(target_stream_name)
cur_dt_str = get_time_str1()
if source_stream_len:is_source_recs = True
else:is_source_recs = Falselogger.info('%s %s source No Recs' %(cur_dt_str,'etl_worker'))
# 获取数据(使用单worker,模式比较简单且性能足够)# ---------------------------------------- 队列取数,有数据才执行下面
if is_source_recs:# ---------------------------------------- 取数,取出消息列表和需要的列# worker 30 秒启动一次data = qm.xrange(source_stream_name)['data']data_df = pd.DataFrame(data)msg_id_list = list(data_df['_msg_id'])keep_cols = ['rec_id', 'data_dt','open', 'close','high','low','vol', 'amt', 'data_source','code','market']data_df1 = data_df[keep_cols].dropna().drop_duplicates(['rec_id'])# 第一次操作,把之前无关的数据删掉# data_df1 = data_df1[data_df1['data_dt'] >='2024-06-24 00:00:00']import timedata_df1['ts'] = data_df1['data_dt'].apply(inverse_time_str).apply(int)data_df1['brick'] = data_df1['data_dt'].apply(dt_str2ucs_blockname)data_df1['block'] =data_df1['brick'].apply(lambda x: x[:x.rfind('.')])data_df1['part'] =data_df1['block'].apply(lambda x: x[:x.rfind('.')])data_df1['shard'] =data_df1['part'].apply(lambda x: x[:x.rfind('.')])data_df1['pid'] = data_df1['code'].apply(str) + '_' + data_df1['ts'].apply(str)keep_cols1 = ['data_dt','open','close','high','low', 'vol','amt', 'brick','block','part', 'shard', 'code','ts', 'pid']data_df2 =data_df1[keep_cols1]# ------------------------------------- 获取当前数据库已有的数据# 获取各code最大值click_para = {'database': 'xx','host': '192.168.0.4','name': 'xx','password': 'xx','port': xxx,'user': 'xx'}chc = CHClient(**click_para)'''这个 SQL 语句的作用是按照 `code` 分组,并为每个 `code` 找到对应的最新日期(`data_dt`),这个最新日期是基于 `ts` 字段的最大值来确定的。`argMax` 函数在这里用于找到每个分组中 `ts` 值最大时对应的 `data_dt` 值。具体来说,`argMax(data_dt, ts)` 会返回每个 `code` 分组中使得 `ts` 达到最大值的 `data_dt` 值。这意味着对于每个 `code`,查询会找到 `ts` 字段的最大值,并返回对应的 `data_dt` 值,即每个 `code` 的最新数据日期。最终,这个查询会返回一个结果集,其中包含每个 `code` 以及对应的最新数据日期(`last_data_dt`)。这对于分析每个代码的最新市场数据非常有用。'''latest_sql = '''SELECTcode,argMax(data_dt, ts) AS last_data_dtFROMmarket_data_v2GROUP BYcode'''# 更新时latest_date_tuple_list = chc._exe_sql(latest_sql)latest_date_dict = tuple_list2dict(latest_date_tuple_list)# ------------------------------------- 使用时间进行过滤# 筛选新数据data_df2['existed_dt'] = data_df2['code'].map(latest_date_dict).fillna('')output_sel = data_df2['data_dt'] > data_df2['existed_dt']output_df = data_df2[output_sel][keep_cols1]output_data_listofdict = output_df.to_dict(orient='records')output_data_listofdict2 = slice_list_by_batch2(output_data_listofdict, qm.batch_size)for some_data_listofdict in output_data_listofdict2:qm.parrallel_write_msg(target_stream_name, some_data_listofdict)del_msg = qm.xdel(source_stream_name, msg_id_list)logger.info('%s %s del source %s Recs' %(cur_dt_str,'etl_worker',del_msg['data'] ))

将该脚本发布为任务,30秒执行一次同步。

exe_qtv200_etl_worker.sh

#!/bin/bash# 记录
# sh /home/test_exe.sh com_info_change_pattern running# 有些情况需要把source替换为 .
# . /root/anaconda3/etc/profile.d/conda.sh
# 激活 base 环境(或你创建的特定环境)
source /root/miniconda3/etc/profile.d/conda.sh#conda init
conda activate basecd  /home/workers && python3 etl_worker.py

存数成功,后续就自动运行了。
在这里插入图片描述

相关文章:

Python 算法交易实验73 QTV200第二步: 数据清洗并写入ClickHouse

说明 先检查一下昨天启动的worker是否正常工作,然后做一些简单的清洗,存入clickhouse。 内容 1 检查数据 from Basefuncs import * # 将一般字符串转为UCS 名称 def dt_str2ucs_blockname(some_dt_str):some_dt_str1 some_dt_str.replace(-,.).re…...

记录:有趣的C#多元运算符 ? : 表达式写法

有时候用 if //...Whatre you she wanna go else if //...do do do else //...and i know something just like this... 感觉代码太多了怎么优雅的、高端的替换? 看个高端的栗子菊: LedCOM["parity"] ledData[4] "N" ? …...

华宽通中标长沙市政务共性能力建设项目,助力智慧政务建设新飞跃

在数字化浪潮的推动下,长沙市政府正积极拥抱智慧城市建设,以科技力量提升政务服务效能。华宽通凭借其卓越的技术实力与丰富的项目经验,成功中标长沙市政务共性能力建设项目,这无疑是对华宽通在智慧城市领域实力的高度认可。 华宽…...

[面试题]计算机网络

[面试题]Java【基础】[面试题]Java【虚拟机】[面试题]Java【并发】[面试题]Java【集合】[面试题]MySQL[面试题]Maven[面试题]Spring Boot[面试题]Spring Cloud[面试题]Spring MVC[面试题]Spring[面试题]MyBatis[面试题]Nginx[面试题]缓存[面试题]Redis[面试题]消息队列[面试题]…...

企业级低代码开发效率变革赋能业务增长

企业级低代码开发已经成为当今软件开发领域的一大趋势,它为企业带来了前所未有的效率变革,从而赋能业务增长。本文将围绕这一主题,深入探讨低代码开发的概念、优势以及如何在企业级应用中实现高效的低代码开发,以助力我国企业实现…...

2024最新总结:1500页金三银四面试宝典 记录35轮大厂面试(都是面试重点)

学习是你这个职业一辈子的事 手里有个 1 2 3,不要想着去怼别人的 4 5 6,因为还有你不知道的 7 8 9。保持空瓶心态从 0 开始才能学到 10 全。 毕竟也是跳槽高峰期,我还是为大家准备了这份1500页金三银四宝典,记录的都是真实大厂面…...

使用Spring Boot和Thymeleaf构建动态Web页面

使用Spring Boot和Thymeleaf构建动态Web页面 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿!今天,我们将探讨如何利用Spring Boot和Thymeleaf构建动…...

扫盲之webSocket

介绍 webSocket 是一种协议,设计用于在客户端和服务器之间提供低延迟、全双工、和长期运行的连接。 全双工:通信的两个参与方可以同时发送和接收数据,不需要等待对方的响应或传输完成 websocket就是为了解决实时通信的问题 建立webSocke…...

一些硬件知识(十二)

1、请说明一下滤波磁珠和滤波电感的区别。 因此磁珠通常用于模数地的连接。 磁珠由导线穿过铁氧体组成,直流电阻很小,在低频时阻抗也很小,对直流信号几乎没有影响。 在高频(几十兆赫兹以上)时磁珠阻抗比较大&#xff0…...

Adobe Acrobat编辑器最新版下载安装 Adobe Acrobat版本齐全!

功能强大,Adobe Acrobat无疑是PDF文档处理领域的翘楚。这款软件集多种PDF文档处理功能于一身,不仅使得用户可以轻松地编辑PDF文档,更能轻松应对转换和合并等多种需求。 在编辑功能上,Adobe Acrobat的表现尤为出色。无论是添加文字…...

k8s如何使用 HPA 实现自动扩展

使用Horizontal Pod Autoscaler (HPA) 实验目标: 学习如何使用 HPA 实现自动扩展。 实验步骤: 创建一个 Deployment,并设置 CPU 或内存的资源请求。创建一个 HPA,设置扩展策略。生成负载,观察 HPA 如何自动扩展 Pod…...

Hi3861 OpenHarmony嵌入式应用入门--0.96寸液晶屏 iic驱动ssd1306

使用iic驱动ssd1306,代码来源hihope\hispark_pegasus\demo\12_ssd1306 本样例提供了一个HarmonyOS IoT硬件接口的SSD1306 OLED屏驱动库,其功能如下: 内置了128*64 bit的内存缓冲区,支持全屏刷新;优化了屏幕刷新速率,…...

代码随想录训练营第二十二天 77组合

第一题: 原题链接:77. 组合 - 力扣(LeetCode) 思路: 经典的回溯模板题: 终止条件,当中间变量用来存储单个结果的大小等于k,则将中间变量存放到结果数组中。 一个for循环横向遍历…...

Unity踩坑记录

1. 如果同时在父物体和子物体上挂载BoxCollider&#xff0c;那么当使用&#xff1a; private void OnTriggerEnter2D(Collider2D collision){if (collision.CompareTag("CardGroup")){_intersectCardGroups.Add(collision.GetComponent<CardGroup>());}} 来判…...

内容安全复习 1 - 信息内容安全概述

文章目录 信息内容安全简介网络空间信息内容安全大模型 人工智能简介 信息内容安全简介 网络空间 网络空间是融合物理域、信息域、认知域和社会域&#xff0c;控制实体行为的信息活动空间。 上图展示了网络空间安全的结构。可以看到将网络空间划分为了网络域和内容域两个部分。…...

【深度学习】python之人工智能应用篇--跨模态生成技术

跨模态生成技术概述 跨模态生成技术是一种将不同模态的数据&#xff08;如文本、图像、音频、视频等&#xff09;进行融合和转换的技术。其目标是通过将一个模态的数据作为输入&#xff0c;生成与之对应的另一个模态的输出。这种技术对于突破单一模态的局限性&#xff0c;提高…...

springboot中获取某个注解下面的某个方法的方法名,参数值等等详细实例

在Spring Boot应用中&#xff0c;获取某个类或方法上的注解及其相关信息&#xff0c;包括方法名称、参数值等&#xff0c;通常涉及到反射和Spring的AOP&#xff08;面向切面编程&#xff09;特性。下面是一个示例&#xff0c;展示如何利用Spring AOP的Around注解来拦截带有特定…...

代码随想录——跳跃游戏Ⅱ(Leetcode 45)

题目链接 贪心 class Solution {public int jump(int[] nums) {if(nums.length 1){return 0;}int count 0;// 当前覆盖最远距离下标int curDistance 0;// 下一步覆盖距离最远下标int nextDistance 0;for(int i 0; i < nums.length; i){nextDistance Math.max(nums[…...

从0-1搭建一个web项目(package.json)详解

本章分析package.json文件详解 本文主要对packge.json配置子文件详解 ObJack-Admin一款基于 Vue3.3、TypeScript、Vite3、Pinia、Element-Plus 开源的后台管理框架。在一定程度上节省您的开发效率。另外本项目还封装了一些常用组件、hooks、指令、动态路由、按钮级别权限控制等…...

图解ReentrantLock的基石AQS-独占锁的获取与释放

大家好&#xff0c;我是呼噜噜&#xff0c;我们之前聊过Java中以互斥同步的方式保证线程安全&#xff1a;Sychronized&#xff0c;这次我们来再聊聊另一种互斥同步的方式Lock&#xff0c;本文会介绍ReentrantLock及其它的基石AQS的源码解析&#xff0c;一个非常重要的同步框架 …...

Perl语言入门学习读物

1. PERL 是什么? Perl 最初的设计者为Larry Wall&#xff0c;Perl借取了C、sed、awk、shell scripting以及很多其他程序语言的特性。Perl一般被称为“实用报表提取语言”(PracticalExtraction andReportLanguage)&#xff0c;有时也被称做“病态折中垃圾列表器”(Pathologica…...

电脑浏览器问题

网络连接正常&#xff0c;但是浏览器就是打不开网页&#xff0c;显示未连接什么的。 搞了半天&#xff0c;不是代理服务器问题。 也不是端口问题。 也不是软件版本问题。 竟然是浏览器插件的问题&#xff0c;插件禁用&#xff0c;奇迹般的好了。 参考&#xff1a; 电脑有网…...

[Docker] Ubuntu安装Home Assistant

本文主要记载一些Ubuntu安装Home Assistant的细节,方便后面重装。 1. 安装Docker 安装依赖 $ sudo apt-get install \apt-transport-https \ca-certificates \curl \gnupg-agent \software-properties-common添加 Docker 官方 GPG 密钥 $ curl -fsSL https://mirrors.ustc…...

浅谈请求中数据转换

目录 1. 前端 JS 数据类型2. JSON 数据类型&#xff08;数据传输格式&#xff09;3. 后端 Java 数据类型4. 后端序列化框架 Fastjson && Jackson 转换4.1. JSON 转换 Java4.2. Java 转换 JSON 1. 前端 JS 数据类型 数据类型示例Stringvar str 张三Number (数字)var a…...

Flutter学习:从搭建环境到运行

一、开发环境的搭建 本文所示内容都是在Windows系统下进行的。 1、下载 Flutter SDK Flutter 官网&#xff08;https://docs.flutter.cn/release/archive?tabwindows&#xff09; 或者通过 git clone -b master https://github.com/flutter/flutter.git 下载 2、配置环境…...

sheng的学习笔记-AI-聚类(Clustering)

ai目录 sheng的学习笔记-AI目录-CSDN博客 基础知识 什么是聚类 在“无监督学习”(unsupervised learning)中&#xff0c;训练样本的标记信息是未知的&#xff0c;目标是通过对无标记训练样本的学习来揭示数据的内在性质及规律&#xff0c;为进一步的数据分析提供基础。此类学…...

从0构建一个录制UI测试工具

很多UI自动化测试工具都具备录制UI自动化测试的能力&#xff0c;例如playwright&#xff0c;可以通过playwright vscode插件完成录制&#xff0c;如下图所示&#xff0c;当选择录制脚本时&#xff0c;会打开一个浏览器&#xff0c;在浏览器中输入被测应用url&#xff0c;用户在…...

代码随想录算法训练营第五十一天|LeetCode72 编辑距离、LeetCode647 回文子串、LeetCode516 最长回文子序列、动态规划的小总结

题1&#xff1a; 指路&#xff1a;72. 编辑距离 - 力扣&#xff08;LeetCode&#xff09; 思路与代码&#xff1a; 关于dp数组的定义&#xff0c;我们定义一个二维数组dp[i][j]&#xff0c;其含义为以i-1为结尾的字符串word1和以j-1为结尾的字符串word2&#xff0c;最近编辑…...

sessionStorage 能在多个标签页之间共享数据吗?

&#x1f9d1;‍&#x1f4bb; 写在开头 点赞 收藏 学会&#x1f923;&#x1f923;&#x1f923; 最近&#xff0c;我的一个朋友在面试中被一个关于 sessionStorage 的问题难住了。我们来聊聊这个话题。 sessionStorage 能在多个标签页之间共享数据吗&#xff1f; 在回答…...

鸿蒙期末项目(完结)

两天仅睡3个小时的努力奋斗之下&#xff0c;终于写完了这个无比拉跨的项目&#xff0c;最后一篇博客总体展示一下本项目运行效果兼测试&#xff0c;随后就是答辩被同学乱沙&#xff08;悲 刚打开软件&#xff0c;会看到如下欢迎界面&#xff0c;介绍本app的功能和优点 随后我们…...

网站开发工程师职业定位/新网站推广最直接的方法

20150527.C语言—1人已学习 课程介绍 尹成老师带你步入 C 语言的殿堂&#xff0c;讲课生动风趣、深入浅出&#xff0c;全套视频内容充实&#xff0c;整个教程以 C 语言为核心&#xff0c;完整精彩的演练了数据结构、算法、设计模式、数据库、大数据高并发检索、文件重定向、…...

哪里有好看的网站/设计网站官网

很多搞性能测试的人员&#xff0c;只会跟着网上、前辈教导的方法进行测试&#xff1a;挑选业务逻辑中并发量、访问量最高的业务逻辑、结合读写等业务进行测试&#xff0c;然后取整条业务逻辑&#xff08;模拟用户全流程动作&#xff09;的逻辑进行测试&#xff1b;结果就是&…...

狐表做网站/网站推广公司大家好

1 class和struct的区别和联系 在c中&#xff0c;class和struct只有一点不同&#xff0c;它们是可以完全替代使用的。唯一的不同在于&#xff0c;class中的成员默认是private的&#xff0c;而struct中默认是public的。 2 指针和引用的不同 2.1 引用在编译后&#xff0c;本质上还…...

php动态网站开发论文/什么是百度竞价推广

转自&#xff1a;https://blog.csdn.net/paincupid/article/details/49924299 经常会接触到VO&#xff0c;DO&#xff0c;DTO的概念&#xff0c;本文从领域建模中的实体划分和项目中的实际应用情况两个角度&#xff0c;对这几个概念进行简析。 得出的主要结论是&#xff1a;在项…...

网站改版后百度不收录/怎么开发网站

入口import moment from moment// 里面的字符可以根据自己的需要进行调整moment.locale(zh-cn, {months: 一月_二月_三月_四月_五月_六月_七月_八月_九月_十月_十一月_十二月.split(_),monthsShort: 1月_2月_3月_4月_5月_6月_7月_8月_9月_10月_11月_12月.split(_),weekdays: 星…...

牡丹江整站优化/2024最火的十大新闻有哪些

这几天把c语言过了一遍&#xff0c;基本上算是入门了&#xff0c;常用语法、函数的使用。c语言是比较古老的语言了&#xff0c;很多系统的底层、工业控制软件都是使用C语言编写&#xff0c;过一遍之后觉得c语言屹立不倒是有原因。c程序员有一句话&#xff1a;使用c语言时间长了…...