当前位置: 首页 > news >正文

基于快照行情的股票/基金 1分钟 K 线合成指南

1. 概述

由于不同交易所不同资产的交易规则是有差异的,导致不同交易所基于快照行情或逐笔成交合成不同资产1分钟 K 线的计算方法是不同的。

本教程旨在提高 DolphinDB 在具体业务场景下的落地效率,降低 DolphinDB 在实际业务使用中的开发难度。

本教程中会学习到:

  • 如何基于历史快照行情数据合成1分钟 K 线
  • 如何基于实时快照行情数据合成1分钟 K 线

本教程适用范围如下:

表1-1 教程支持 K 线合成的资产范围

交易所是否支持
深交所-股票支持
深交所-基金支持
上交所-股票支持
上交所-基金支持
北交所-股票支持

沪深北三大交易所股票和基金资产的交易时间段如下:

  • 早盘集合竞价:09:15:00 - 09:25:00
  • 连续竞价:09:30:00 - 11:30:00
  • 连续竞价:13:00:00 - 14:57:00
  • 午盘集合竞价:14:57:00 - 15:00:00

2. 基于历史快照行情合成 K 线

2.1 快照行情数据特征

交易所 Level-1或 Level-2 快照行情的数据频率约3秒(即每个股票每隔3秒推送一条快照数据),但是并不是3秒等间隔推送。实盘行情下,平均每个股票和基金1天的快照行情数据条数约4000条左右。

图2-1 活跃股票快照行情非3秒等间隔推送

图2-2 不活跃基金快照行情非3秒等间隔推送

图2-3 2023年2月沪深交易所股票和基金快照行情数据量统计

不同的数据供应商、不同的交易所,提供的快照行情字段都会有差异。

本教程针对沪深北交易所的股票和基金快照行情做了统一处理,数据表中的重要字段如下:

表2-1 沪深北交易所的股票和基金快照行情表结构

字段名称数据类型数据说明
TradeTimeTIMESTAMP日期时间
SecurityIDSYMBOL证券代码
OpenPriceDOUBLE开盘价
PreCloPriceDOUBLE昨收价
HighPriceDOUBLE当日最高价
LowPriceDOUBLE当日最低价
LastPriceDOUBLE当日最新价
PreCloseIOPVDOUBLE昨日 IOPV(基金)
IOPVDOUBLEIOPV(基金)
TotalVolumeTradeLONG日累计成交量
TotalValueTradeDOUBLE日累计成交金额
NumTradesLONG日累计成交笔数
UpLimitPxDOUBLE当日涨停价
DownLimitPxDOUBLE当日跌停价
…………快照行情字段太多,只展示一部分

本教程基于沪深北交易所的股票和基金快照行情数据合成的1分钟 K 线的表结构如下:

表2-2 1分钟 K 线表结构

字段名称数据类型数据说明计算规则
SecurityIDSYMBOL证券代码证券代码
TradeTimeTIMESTAMP日期时间早盘集合竞价数据纳入第一根 K 线


第一根 K 线的输出时间是09:30:00,计算窗口为 [09:25:00, 09:31:00)


K 线的输出时间为计算窗口的左边界,计算窗口规则为左闭右开


合成 K 线一共240根,包括11:30:00, 14:57:00和15:00:00,不包括14:58:00和14:59:00
OpenPriceDOUBLE开始价计算窗口内的第一条快照行情的最新价


如果开盘后没有成交,为0


如果盘中计算窗口缺失快照行情,填充上一根 K 线的收盘价
HighPriceDOUBLE最高价计算窗口内的最高价


如果开盘后没有成交,为0


如果盘中计算窗口缺失快照行情,填充上一根 K 线的收盘价
LowPriceDOUBLE最低价计算窗口内的最低价如果开盘后没有成交,为0如果盘中计算窗口缺失快照行情,填充上一根 K 线的收盘价
ClosePriceDOUBLE收盘价计算窗口内的最后一条快照行情的最新价


如果开盘后没有成交,为0


如果盘中计算窗口缺失快照行情,填充上一根 K 线的收盘价
VolumeLONG成交量计算窗口内的所有快照行情成交量求和


如果缺失快照行情,填充0
TurnoverDOUBLE成交金额计算窗口内的所有快照行情成交金额求和

如果缺失快照行情,填充0
TradesCountINT成交笔数计算窗口内的所有快照行情成交笔数求和

如果缺失快照行情,填充0
PreClosePriceDOUBLE昨收价当日快照行情中的昨收价

如果盘中计算窗口缺失快照行情,填充上一根 K 线的昨日收盘价
PreCloseIOPVDOUBLE昨日 IOPV(基金)基金当日快照行情中的昨日 IOPV,深交所基金有该字段,上交所基金没有该字段

如果盘中计算窗口缺失快照行情,填充上一根 K 线的昨日IOPV
IOPVDOUBLEIOPV(基金)基金当日快照行情中的 IOPV,深交所基金没有该字段,上交所基金有该字段

如果盘中计算窗口缺失快照行情,填充上一根 K 线的昨日IOPV
UpLimitPxDOUBLE涨停价当日快照行情中的涨停价,深交所有该字段,上交所没有该字段

如果盘中计算窗口缺失快照行情,填充上一根 K 线的涨停价
DownLimitPxDOUBLE跌停价当日快照行情中的跌停价,深交所有该字段,上交所没有该字段

如果盘中计算窗口缺失快照行情,填充上一根 K 线的跌停价
ChangeRateDOUBLE上涨或下跌幅度当前计算窗口 K 线的收盘价相比上一根 K 线的收盘价的变化幅度

开盘第一根 K 线的涨跌幅等于 [09:25:00, 09:31:00) 计算窗口内最后一笔快照的最新价和第一笔快照的最新价的变化幅度


如果缺失快照行情,填充0

2.2 快照行情合成 K 线的规则

(1)最高价和最低价的处理

因为快照行情是非等时间间隔的数据切片,所以会出现如下情况:

  • 当日最高价(HighPrice)在当前计算窗口内发生变化,但是当前计算窗口内的所有最新价(LastPrice)并不包含发生在当前计算窗口内的当日最高价。

图2-4 最高价计算规则说明

  • 当日最低价(LowPrice)在当前计算窗口内发生变化,但是当前计算窗口内的所有最新价(LastPrice)并不包含发生在当前计算窗口内的当日最低价。

图2-5 最低价计算规则说明

因此,1分钟 K 线的最高价的自定义计算函数如下:

defg high(DeltasHighPrice, HighPrice, LastPrice){if(sum(DeltasHighPrice)>0.000001){return max(HighPrice)}else{return max(LastPrice)}
}

参数说明:

  • DeltasHighPrice:同一个股票或基金两笔相邻快照的当日最高价(HighPrice)的差
  • HighPrice:快照行情的当日最高价
  • LastPrice:快照行情的当日最新价

函数计算逻辑说明:

  • 1分钟 K 线计算窗口内,如果同一个股票或基金两笔相邻快照的当日最高价发生变化,那么1分钟 K 线的最高价等于计算窗口内当日最高价的最大值
  • 1分钟 K 线计算窗口内,如果同一个股票或基金两笔相邻快照的当日最高价没有发生变化,那么1分钟 K 线的最高价等于计算窗口内当日最新价的最大值

因此,1分钟 K 线的最低价的自定义计算函数如下:

defg low(DeltasLowPrice, LowPrice, LastPrice){sumDeltas = sum(DeltasLowPrice)if(sumDeltas<-0.000001 and sumDeltas!=NULL){return min(iif(LowPrice==0.0, NULL, LowPrice))}else{return min(LastPrice)}
}

参数说明:

  • DeltasLowPrice:同一个股票或基金两笔相邻快照的当日最低价(LowPrice)的差
  • LowPrice:快照行情的当日最低价
  • LastPrice:快照行情的当日最新价

函数计算逻辑说明:

  • 1分钟 K 线计算窗口内,如果同一个股票或基金两笔相邻快照的当日最低价发生变化,那么1分钟 K 线的最低价等于计算窗口内当日最低价的最小值
  • 开盘没有成交的股票或基金,推送的快照行情的最新价和最低价都为0,在当日第一个有成交的窗口内,最低价发生变化,此时计算窗口 K 线的最低价等于不为0的快照行情当日最低价的最小值,min(iif(LowPrice==0.0, NULL, LowPrice)) 就是在处理此种特殊情况
  • 1分钟 K 线计算窗口内,如果同一个股票或基金两笔相邻快照的当日最低价没有发生变化,那么1分钟 K 线的最低价等于计算窗口内当日最新价的最小值
  • 性能优化小技巧:第三行代码的 if 判断条件,两次引用 sum(DeltasLowPrice),建议先用变量 sumDeltas = sum(DeltasLowPrice) 进行一次聚合计算。如果在 if 判断条件写成 if(sum(DeltasLowPrice)<-0.000001 and sum(DeltasLowPrice)!=NULL) ,则此处进行了两次聚合计算

图2-6 开盘没有成交的股票或基金的快照行情

(2)成交量、成交额和成交笔数的处理

快照行情中的成交量、成交金额和成交笔数都是日累计求和值,所以按照窗口为1分钟、步长为1分钟的滚动窗口计算前,需要先求出两笔相邻快照的增量。

图2-7 快照行情中的日累计成交量、日累计成交额和成日累计交笔数

可以使用 DolphinDB 内置的 deltas 函数和 context by SQL 语句进行数据预处理,具体处理代码下文会详细介绍。

(3)开盘以后没有成交

部分成交不活跃的股票和基金,09:15:00 开始交易后一直没有成交,但是快照行情会正常推送。

图2-8 开盘没有成交的股票或基金的快照行情

针对开盘以后一直没有成交的计算窗口,本教程处理规则如下:

  • OpenPrice, HighPrice, LowPrice, ClosePrice, Volume, Turnover, TradesCount, ChangeRate 为0
  • PreClosePrice, PreCloseIOPV, IOPV, UpLimitPx, DownLimitPx 为快照行情中对应的值

(4)盘中计算窗口内没有成交

部分成交不活跃的股票和基金,会出现盘中某些计算窗口内完全没有成交,但是快照行情会正常推送。

图2-9 盘中计算窗口内没有成交的快照行情

针对盘中没有成交的计算窗口,本教程处理规则如下:

  • OpenPrice, HighPrice, LowPrice, ClosePrice 等于前一根 K 线的 ClosePrice
  • Volume, Turnover, TradesCount, ChangeRate 为0
  • PreClosePrice, PreCloseIOPV, IOPV, UpLimitPx, DownLimitPx 等于前一根 K 线对应的值

2.3 基于历史快照行情合成 K 线

第一步:部署测试环境

  • 部署 DolphinDB 单节点:单节点部署教程
  • 下载测试数据,并上传到 DolphinDB 部署的 server 目录下: testData.csv
  • 根据部署教程打开节点 web 编程界面,登陆后运行后续步骤测试代码,默认 admin 账户的密码是 123456

图2-10 DolphinDB web 编程界面

第二步:创建数据库和分区表

下述数据库和分区表创建代码适用于沪深两市股票和基金 Level-2 行情快照数据共同存储在同一表中,更多关于数据库和分区表创建的方法可以参考:存储金融数据的分区方案最佳实践。

粘贴下述代码至 web 编程界面,选中需要执行代码点击执行(执行快捷键:Ctrl+E)即可:

//创建数据库
create database "dfs://snapshotDB"
partitioned by VALUE(2020.01.01..2021.01.01), HASH([SYMBOL, 50])
engine='TSDB'
//创建分区表
create table "dfs://snapshotDB"."snapshotTB"(Market SYMBOLTradeTime TIMESTAMPMDStreamID SYMBOLSecurityID SYMBOLSecurityIDSource SYMBOLTradingPhaseCode SYMBOLImageStatus INTPreCloPrice DOUBLENumTrades LONGTotalVolumeTrade LONGTotalValueTrade DOUBLELastPrice DOUBLEOpenPrice DOUBLEHighPrice DOUBLELowPrice DOUBLEClosePrice DOUBLEDifPrice1 DOUBLEDifPrice2 DOUBLEPE1 DOUBLEPE2 DOUBLEPreCloseIOPV DOUBLEIOPV DOUBLETotalBidQty LONGWeightedAvgBidPx DOUBLEAltWAvgBidPri DOUBLETotalOfferQty LONGWeightedAvgOfferPx DOUBLEAltWAvgAskPri DOUBLEUpLimitPx DOUBLEDownLimitPx DOUBLEOpenInt INTOptPremiumRatio DOUBLEOfferPrice DOUBLE[]BidPrice DOUBLE[]OfferOrderQty LONG[]BidOrderQty LONG[]BidNumOrders INT[]OfferNumOrders INT[]ETFBuyNumber INTETFBuyAmount LONGETFBuyMoney DOUBLEETFSellNumber INTETFSellAmount LONGETFSellMoney DOUBLEYieldToMatu DOUBLETotWarExNum DOUBLEWithdrawBuyNumber INTWithdrawBuyAmount LONGWithdrawBuyMoney DOUBLEWithdrawSellNumber INTWithdrawSellAmount LONGWithdrawSellMoney DOUBLETotalBidNumber INTTotalOfferNumber INTMaxBidDur INTMaxSellDur INTBidNum INTSellNum INTLocalTime TIMESeqNo INTOfferOrders LONG[]BidOrders LONG[]
)
partitioned by TradeTime, SecurityID,
sortColumns=[`Market,`SecurityID,`TradeTime],
keepDuplicates=ALL

运行不报错,即执行成功,可以执行下述代码查看分区表的表结构信息:

loadTable("dfs://snapshotDB", "snapshotTB").schema().colDefs

返回:

图2-11 行情快照分区表的部分表结构信息

第三步:导入 csv 测试数据

执行下述代码,导入测试 csv 文本数据。注意,执行下述代码前,必须把测试数据 testData.csv 文件上传到DolphinDB 部署的 server 目录下。也可以把 csv 文件上传到自定义路径,例如 /data/testData.csv,此时需要下述代码的 loadTextEx 导入函数的 filename="/data/testData.csv"

tmp = loadTable("dfs://snapshotDB", "snapshotTB").schema().colDefs
schemaTB = table(tmp.name as name, tmp.typeString as type)
loadTextEx(dbHandle=database("dfs://snapshotDB"), tableName="snapshotTB", partitionColumns=`TradeDate`SecurityID, filename="./testData.csv", schema=schemaTB)

成功导入后可以执行下述代码,查询前10行数据至内存中查看:

data = select top 10 * from loadTable("dfs://snapshotDB", "snapshotTB")

返回:

图2-12 数据预览方法

第四步:自定义最高价最低价计算函数

执行下述代码,在当前会话中声明自定义函数,可以在同一个会话的后续操作中直接引用函数 high 和 low

defg high(DeltasHighPrice, HighPrice, LastPrice){if(sum(DeltasHighPrice)>0.000001){return max(HighPrice)}else{return max(LastPrice)}
}defg low(DeltasLowPrice, LowPrice, LastPrice){sumDeltas = sum(DeltasLowPrice)if(sumDeltas<-0.000001 and sumDeltas!=NULL){return min(iif(LowPrice==0.0, NULL, LowPrice))}else{return min(LastPrice)}
}

第五步:加载少量数据至内存调试

为了方便临时调试代码的正确性,可以先加载少量数据到内存中,方便后续复杂业务代码调试。

执行下述代码,加载1天2个票的数据到内存中,并赋值给表变量 snapshotTB,后续代码中,可以直接引用 snapshotTB 这个表变量调试代码:

snapshotTB =	select	TradeTime, SecurityID, OpenPrice,PreCloPrice, HighPrice, LowPrice,LastPrice, PreCloseIOPV, IOPV,TotalVolumeTrade, TotalValueTrade, NumTrades,UpLimitPx, DownLimitPxfrom loadTable("dfs://snapshotDB", "snapshotTB")where TradeTime.date()=2023.02.01, SecurityID in `888888`999999

第六步:原始行情快照数据处理

执行下述代码,对原始的快照行情进行处理,主要进行了以下几个方面的数据加工:

  • 把09:25:00-09:30:00的数据归入第一根 K 线:
  • 计算同一个股票或者基金的两笔相邻快照的最高价和最低价的变化幅度
  • 计算同一个股票或者基金的两笔相邻快照的成交量、成交金额和成交笔数的增量
tempTB1 =	select	TradeTime.date() as TradeDate,iif(TradeTime.time()<=09:30:00.000, 09:30:00.000, TradeTime.time()) as TradeTime,SecurityID,OpenPrice,PreCloPrice,HighPrice,LowPrice,LastPrice,PreCloseIOPV,IOPV,UpLimitPx,DownLimitPx,iif(deltas(HighPrice)>0.000001, 1, 0) as DeltasHighPrice,iif(abs(deltas(LowPrice))>0.000001, -1, 0) as DeltasLowPrice,iif(deltas(TotalVolumeTrade)==NULL, TotalVolumeTrade, deltas(TotalVolumeTrade)) as DeltasVolume,iif(deltas(TotalValueTrade)==NULL, TotalValueTrade, deltas(TotalValueTrade)) as DeltasTurnover,iif(deltas(NumTrades)==NULL, NumTrades, deltas(NumTrades)) as DeltasTradesCountfrom snapshotTBwhere TradeTime.time()>=09:25:00.000context by SecurityID

第七步:1分钟窗口和1分钟步长的聚合计算

执行下述代码,针对处理后的快照行情做窗口为1分钟、步长为1分钟的滚动窗口计算,重要处理步骤如下:

  • 调用了自定义函数 high 和 low 计算 K 线的最高价和最低价
  • FirstBarChangeRate表示当前1分钟计算窗口内,最后一条最新价不为0的快照相对第一条最新价不为0的快照的变化幅度,用于开盘第一根 K 线的涨跌幅计算
  • 数据降频处理调用了 DolphinDB 内置函数interval,在此步骤时,针对盘中计算窗口缺失快照行情的情况,统一用0填充,后面会针对全部为0的 K 线填充数据做进一步处理
tempTB2 =	select	firstNot(LastPrice, 0) as OpenPrice,high(DeltasHighPrice, HighPrice, LastPrice) as HighPrice,low(DeltasLowPrice, LowPrice, LastPrice) as LowPrice,last(LastPrice) as ClosePrice,sum(DeltasVolume) as Volume,sum(DeltasTurnover) as Turnover,sum(DeltasTradesCount) as TradesCount,last(PreCloPrice) as PreClosePrice,last(PreCloseIOPV) as PreCloseIOPV,last(IOPV) as IOPV,last(UpLimitPx) as UpLimitPx,last(DownLimitPx) as DownLimitPx,lastNot(LastPrice, 0)\firstNot(LastPrice, 0)-1 as FirstBarChangeRate	from tempTB1group by SecurityID, TradeDate, interval(X=TradeTime, duration=60s, label='left', fill=0) as TradeTime

第八步:对齐每日240根 K 线

本教程实例中,合成的1分钟 K 线一共240根:

  • 第一根 K 线的输出时间是09:30:00,计算窗口为 [09:25:00, 09:31:00)
  • 全部输出包括11:30:00, 14:57:00和15:00:00,不包括14:58:00和14:59:00

如果用户需要自定义输出规则,比如需要输出14:58:00和14:59:00这两个特殊时间段的 K 线,可以调整下述代码实现。

codes = select distinct(SecurityID) as SecurityID from tempTB2 order by SecurityID
allTime = table((take(0..120, 121)*60*1000+09:30:00.000).join(take(0..117, 118)*60*1000+13:00:00.000).join(15:00:00.000) as TradeTime)
tempTB3 = cj(codes, allTime)

第九步:缺失快照行情计算窗口填充

执行下述代码,针对对齐每日240根 K 线的结果数据进行如下处理,得到最终结果:

  • 针对盘中没有成交的计算窗口,OpenPrice, HighPrice, LowPrice, ClosePrice 用前一根 K 线的 ClosePrice 填充
  • 针对盘中没有成交的计算窗口,PreClosePrice, PreCloseIOPV, IOPV, UpLimitPx, DownLimitPx 用前一根 K 线对应的值填充
  • 开盘第一根 K 线的涨跌幅等于 [09:25:00, 09:31:00) 计算窗口内最后一笔快照的最新价和第一笔快照的最新价的变化幅度
result = select	SecurityID,concatDateTime(TradeDate, TradeTime) as TradeTime,iif(OpenPrice==0.0 and PreClosePrice==0.0, cumlastNot(ClosePrice, 0.0), OpenPrice) as OpenPrice,iif(HighPrice==0.0 and PreClosePrice==0.0, cumlastNot(ClosePrice, 0.0), HighPrice) as HighPrice,iif(LowPrice==0.0 and PreClosePrice==0.0, cumlastNot(ClosePrice, 0.0), LowPrice) as LowPrice,iif(ClosePrice==0.0 and PreClosePrice==0.0, cumlastNot(ClosePrice, 0.0), ClosePrice) as ClosePrice,Volume,Turnover,TradesCount,iif(PreClosePrice==0.0, cumlastNot(PreClosePrice, 0.0), PreClosePrice) as PreClosePrice,iif(PreCloseIOPV==0.0 and PreClosePrice==0.0, cumlastNot(PreCloseIOPV, 0.0), PreCloseIOPV).nullFill(0.0) as PreCloseIOPV,iif(IOPV==0.0 and PreCloseIOPV==0.0, cumlastNot(IOPV, 0.0), IOPV).nullFill(0.0) as IOPV,iif(UpLimitPx==0.0, cumlastNot(UpLimitPx, 0.0), UpLimitPx).nullFill(0.0) as UpLimitPx,iif(DownLimitPx==0.0, cumlastNot(DownLimitPx, 0.0), DownLimitPx).nullFill(0.0) as DownLimitPx,iif(	time(TradeTime)==09:30:00.000,iif(FirstBarChangeRate!=NULL, FirstBarChangeRate, 0.0),iif(ratios(ClosePrice)!=NULL and ClosePrice!=0.0, ratios(ClosePrice)-1, 0.0)) as ChangeRatefrom lj(tempTB3, tempTB2, `TradeTime`SecurityID)context by SecurityID

返回:

图2-13 1分钟 K 线计算结果

第十步:封装并行计算代码

完成前面九个步骤,已经可以针对少量快照行情数据完成 K 线合成任务,但是针对海量历史行情数据还需进一步封装,才能实现并行计算。

可以先执行下述代码,情况当前会话中的所有临时内存变量:

undef all

然后执行下述代码,定义并行计算相关函数:

defg high(DeltasHighPrice, HighPrice, LastPrice){if(sum(DeltasHighPrice)>0.000001){return max(HighPrice)}else{return max(LastPrice)}
}defg low(DeltasLowPrice, LowPrice, LastPrice){sumDeltas = sum(DeltasLowPrice)if(sumDeltas<-0.000001 and sumDeltas!=NULL){return min(iif(LowPrice==0.0, NULL, LowPrice))}else{return min(LastPrice)}
}def calOHLCBaseOnSnapshotMapFuc(snapshotTB){//Processing the original snapshot market table for calculating OHLCtempTB1 =	select	TradeTime.date() as TradeDate,iif(TradeTime.time()<=09:30:00.000, 09:30:00.000, TradeTime.time()) as TradeTime,SecurityID,OpenPrice,PreCloPrice,HighPrice,LowPrice,LastPrice,PreCloseIOPV,IOPV,UpLimitPx,DownLimitPx,iif(deltas(HighPrice)>0.000001, 1, 0) as DeltasHighPrice,iif(abs(deltas(LowPrice))>0.000001, -1, 0) as DeltasLowPrice,iif(deltas(TotalVolumeTrade)==NULL, TotalVolumeTrade, deltas(TotalVolumeTrade)) as DeltasVolume,iif(deltas(TotalValueTrade)==NULL, TotalValueTrade, deltas(TotalValueTrade)) as DeltasTurnover,iif(deltas(NumTrades)==NULL, NumTrades, deltas(NumTrades)) as DeltasTradesCountfrom snapshotTBwhere TradeTime.time()>=09:25:00.000context by SecurityID//Aggregate Calculating: temporary 1-minute OHLC tabletempTB2 =	select	firstNot(LastPrice, 0.0) as OpenPrice,high(DeltasHighPrice, HighPrice, LastPrice) as HighPrice,low(DeltasLowPrice, LowPrice, LastPrice) as LowPrice,last(LastPrice) as ClosePrice,sum(DeltasVolume) as Volume,sum(DeltasTurnover) as Turnover,sum(DeltasTradesCount) as TradesCount,last(PreCloPrice) as PreClosePrice,last(PreCloseIOPV) as PreCloseIOPV,last(IOPV) as IOPV,last(UpLimitPx) as UpLimitPx,last(DownLimitPx) as DownLimitPx,lastNot(LastPrice, 0.0)\firstNot(LastPrice, 0.0)-1 as FirstBarChangeRate	from tempTB1group by SecurityID, TradeDate, interval(X=TradeTime, duration=60s, label='left', fill=0) as TradeTime//240 bars per daycodes = select distinct(SecurityID) as SecurityID from tempTB2 order by SecurityIDallTime = table((take(0..120, 121)*60*1000+09:30:00.000).join(take(0..117, 118)*60*1000+13:00:00.000).join(15:00:00.000) as TradeTime)tempTB3 = cj(codes, allTime)//Processing missing data calculation window, excluding openingresult = select	SecurityID,concatDateTime(TradeDate, TradeTime) as TradeTime,iif(OpenPrice==0.0 and PreClosePrice==0.0, cumlastNot(ClosePrice, 0.0), OpenPrice) as OpenPrice,iif(HighPrice==0.0 and PreClosePrice==0.0, cumlastNot(ClosePrice, 0.0), HighPrice) as HighPrice,iif(LowPrice==0.0 and PreClosePrice==0.0, cumlastNot(ClosePrice, 0.0), LowPrice) as LowPrice,iif(ClosePrice==0.0 and PreClosePrice==0.0, cumlastNot(ClosePrice, 0.0), ClosePrice) as ClosePrice,Volume,Turnover,TradesCount,iif(PreClosePrice==0.0, cumlastNot(PreClosePrice, 0.0), PreClosePrice) as PreClosePrice,iif(PreCloseIOPV==0.0 and PreClosePrice==0.0, cumlastNot(PreCloseIOPV, 0.0), PreCloseIOPV).nullFill(0.0) as PreCloseIOPV,iif(IOPV==0.0 and PreCloseIOPV==0.0, cumlastNot(IOPV, 0.0), IOPV).nullFill(0.0) as IOPV,iif(UpLimitPx==0.0, cumlastNot(UpLimitPx, 0.0), UpLimitPx).nullFill(0.0) as UpLimitPx,iif(DownLimitPx==0.0, cumlastNot(DownLimitPx, 0.0), DownLimitPx).nullFill(0.0) as DownLimitPx,iif(	time(TradeTime)==09:30:00.000,iif(FirstBarChangeRate!=NULL, FirstBarChangeRate, 0.0),iif(ratios(ClosePrice)!=NULL and ClosePrice!=0.0, ratios(ClosePrice)-1, 0.0)) as ChangeRatefrom lj(tempTB3, tempTB2, `TradeTime`SecurityID)context by SecurityIDreturn result
}def calOHLCBaseOnSnapshot(calStartDate, calEndDate, dbName, tbName){//Generate data source: If SQL only contains the required columns for calculation, it can improve calculation efficiencydataSource = sqlDS(<	select	TradeTime, SecurityID, OpenPrice,PreCloPrice, HighPrice, LowPrice,LastPrice, PreCloseIOPV, IOPV,TotalVolumeTrade, TotalValueTrade, NumTrades,UpLimitPx, DownLimitPxfrom loadTable(dbName, tbName)where TradeTime.date()>=calStartDate, TradeTime.date()<=calEndDate>)result = mr(ds=dataSource, mapFunc=calOHLCBaseOnSnapshotMapFuc, finalFunc=unionAll{,false}, parallel=true)return result
}

最后执行下述代码,针对一天的沪深全市场快照行情并行计算1分钟 K 线:

calStartDate = 2023.02.01
calEndDate = 2023.02.01
dbName = "dfs://snapshotDB"
tbName = "snapshotTB"
oneDayResult = calOHLCBaseOnSnapshot(calStartDate, calEndDate, dbName, tbName)

第十一步:1分钟 K 线数据存储

首次存储数据前,需要执行下述代码创建存储1分钟 K 线所需的数据库和分区表:

def createStockFundOHLCDfsTB(dbName="dfs://stockFundOHLC", tbName="stockFundOHLC"){if(existsDatabase(dbUrl=dbName)){print(dbName + " has been created !")print(tbName + " has been created !")}else{db = database(dbName, VALUE, 2021.01.01..2021.12.31)print(dbName + " created successfully.")colNames = `SecurityID`TradeTime`OpenPrice`HighPrice`LowPrice`ClosePrice`Volume`Turnover`TradesCount`PreClosePrice`PreCloseIOPV`IOPV`UpLimitPx`DownLimitPx`ChangeRatecolTypes = [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]schemaTable = table(1:0, colNames, colTypes)db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`TradeTime)print(tbName + " created successfully.")}return loadTable(dbName, tbName).schema().colDefs
}
dbName = "dfs://stockFundOHLC"
tbName = "stockFundOHLC"
createStockFundOHLCDfsTB(dbName, tbName)

执行下述代码将内存中的计算结果存入分区表:

loadTable("dfs://stockFundOHLC", "stockFundOHLC").append!(oneDayResult)

查询前10行数据至内存中查看:

data = select top 10 * from loadTable("dfs://stockFundOHLC", "stockFundOHLC")

返回:

图2-14 1分钟 K 线分区表查询结果

2.4 并行计算性能测试

表2-3 测试环境配置表

配置项信息
OS(操作系统)CentOS Linux 7 (Core)
内核3.10.0-1160.el7.x86_64
CPUIntel(R) Xeon(R) Gold 5220R CPU @ 2.20GHz16 逻辑 CPU 核心
内存256 GB


表2-4 计算性能测试结果表

测试场景原始快照行情数据计算结果行数计算结果大小计算耗时
1天24,285,866 行1,555,440 行172 MB4.3 s
10天238,691,947 行15,563,760 行1.7 GB37.3 s
1个月(20天)474,919,708 行31,158,240 行3.4 GB76.5 s


如果是社区版用户,在进行上述海量数据并行计算的时候,可能会因为内存不足(社区版限制最大内存使用为8 GB)导致计算任务失败,请联系 DolphinDB 官方知乎账号申请试用 license。

3. 基于实时快照行情数据合成 K 线

实时快照行情数据特征和1分钟 K 线合成规则与2.1和2.2章节中的介绍一致。

基于实时快照行情数据合成 K 线的流程图如下:

图3-1 基于实时快照行情数据合成 K 线的流程图

接下来将详细介绍如何在 DolphinDB 中实现1分钟 K 线的实时计算代码开发。关于 DolphinDB 的流数据功能的基础概念本教程中不再展开,可以参考官网教程:流数据。

3.1 定义创建流表所需的函数

执行下述代码,定义创建流数据表所需的函数:

def getMDLSnapshotTB(tableCapacity=1000000){colNames = `Market`TradeTime`MDStreamID`SecurityID`SecurityIDSource`TradingPhaseCode`ImageStatus`PreCloPrice`NumTrades`TotalVolumeTrade`TotalValueTrade`LastPrice`OpenPrice`HighPrice`LowPrice`ClosePrice`DifPrice1`DifPrice2`PE1`PE2`PreCloseIOPV`IOPV`TotalBidQty`WeightedAvgBidPx`AltWAvgBidPri`TotalOfferQty`WeightedAvgOfferPx`AltWAvgAskPri`UpLimitPx`DownLimitPx`OpenInt`OptPremiumRatio`OfferPrice`BidPrice`OfferOrderQty`BidOrderQty`BidNumOrders`OfferNumOrders`ETFBuyNumber`ETFBuyAmount`ETFBuyMoney`ETFSellNumber`ETFSellAmount`ETFSellMoney`YieldToMatu`TotWarExNum`WithdrawBuyNumber`WithdrawBuyAmount`WithdrawBuyMoney`WithdrawSellNumber`WithdrawSellAmount`WithdrawSellMoney`TotalBidNumber`TotalOfferNumber`MaxBidDur`MaxSellDur`BidNum`SellNum`LocalTime`SeqNo`OfferOrders`BidOrderscolTypes = [SYMBOL,TIMESTAMP,SYMBOL,SYMBOL,SYMBOL,SYMBOL,INT,DOUBLE,LONG,LONG,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,DOUBLE,DOUBLE,LONG,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,DOUBLE[],DOUBLE[],LONG[],LONG[],INT[],INT[],INT,LONG,DOUBLE,INT,LONG,DOUBLE,DOUBLE,DOUBLE,INT,LONG,DOUBLE,INT,LONG,DOUBLE,INT,INT,INT,INT,INT,INT,TIME,INT,LONG[],LONG[]]return streamTable(tableCapacity:0, colNames, colTypes)
}def getMDLSnapshotProcessTB(tableCapacity=1000000){colNames = `SecurityID`TradeTime`UpLimitPx`DownLimitPx`PreCloPrice`HighPrice`LowPrice`LastPrice`PreCloseIOPV`IOPV`DeltasHighPrice`DeltasLowPrice`DeltasVolume`DeltasTurnover`DeltasTradesCountcolTypes = [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, INT]return streamTable(tableCapacity:0, colNames, colTypes)
}def getMDLStockFundOHLCTempTB(tableCapacity=1000000){colNames = `TradeTime`SecurityID`OpenPrice`HighPrice`LowPrice`ClosePrice`Volume`Turnover`TradesCount`PreClosePrice`PreCloseIOPV`IOPV`UpLimitPx`DownLimitPx`FirstBarChangeRatecolTypes = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]return streamTable(tableCapacity:0, colNames, colTypes)
}def getMDLStockFundOHLCTB(tableCapacity=1000000){colNames = `SecurityID`TradeTime`OpenPrice`HighPrice`LowPrice`ClosePrice`Volume`Turnover`TradesCount`PreClosePrice`PreCloseIOPV`IOPV`UpLimitPx`DownLimitPx`ChangeRatecolTypes = [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]return streamTable(tableCapacity:0, colNames, colTypes)
}

3.2 定义最高价最低价计算函数

执行下述代码,定义计算最高价最低价所需函数:

defg high(DeltasHighPrice, HighPrice, LastPrice){if(sum(DeltasHighPrice)>0.000001){return max(HighPrice)}else{return max(LastPrice)}
}defg low(DeltasLowPrice, LowPrice, LastPrice){sumDeltas = sum(DeltasLowPrice)if(sumDeltas<-0.000001 and sumDeltas!=NULL){return min(iif(LowPrice==0.0, NULL, LowPrice))}else{return min(LastPrice)}
}

3.3 注册原始行情快照数据处理引擎

第一步:创建相关流数据表

执行下述代码:

//Declare parameters
tableCapacity = 1000000
mdlSnapshotTBName = "mdlSnapshot"
mdlSnapshotProcessTBName = "mdlSnapshotProcess"
//Create MDL snapshot table
share(getMDLSnapshotTB(tableCapacity), mdlSnapshotTBName)
//Create MDL processed snapshot table
share(getMDLSnapshotProcessTB(tableCapacity), mdlSnapshotProcessTBName)

说明:

  • tableCapacity 表示创建流数据表时预先分配的内存空间大小,如果该值小于实际接收的行情数量,则会自动扩容,扩容的时间点会有时延波动。该值也不建议设置太大,设置太大会造成内存浪费。合理的值应该是稍大于当天的全部行情数据量,这个值可以根据过去一段时间的行情数据统计得到。
  • mdlSnapshotTBName 流数据表用于接收实时快照行情,并把增量数据实时推送给原始行情快照数据处理引擎。
  • mdlSnapshotProcessTBName 流数据表用于接收引擎处理的结果数据。

第二步:定义原始行情快照处理引擎规则

执行下述代码:

//Original columns in the snapshot table
colNames = `TradeTime`UpLimitPx`DownLimitPx`PreCloPrice`HighPrice`LowPrice`LastPrice`PreCloseIOPV`IOPV
//Derived columns processed based on the original snapshot table
convert = sqlCol(colNames).append!(sqlColAlias(<iif(deltas(HighPrice)>0.000001, 1, 0)>, `DeltasHighPrice)).append!(sqlColAlias(<iif(abs(deltas(LowPrice))>0.000001, -1, 0)>, `DeltasLowPrice)).append!(sqlColAlias(<iif(deltas(TotalVolumeTrade)==NULL, TotalVolumeTrade, deltas(TotalVolumeTrade))>, `DeltasVolume)).append!(sqlColAlias(<iif(deltas(TotalValueTrade)==NULL, TotalValueTrade, deltas(TotalValueTrade))>, `DeltasTurnover)).append!(sqlColAlias(<iif(deltas(NumTrades)==NULL, NumTrades, deltas(NumTrades))>, `DeltasTradesCount))

可以点击本地变量查看定义的计算规则。

图3-2 自定义行情数据处理规则

原始行情的加工处理包含两部分内容:

  • 保留一部分原始字段的值,如市场时间、涨停价、跌停价等,在 colNames 变量中表示。
  • 对原始字段加工处理,如基于日累计成交量计算相邻两笔快照的成交量增量,在 DolphinDB 中的处理表达式为 <iif(deltas(TotalVolumeTrade)==NULL, TotalVolumeTrade, deltas(TotalVolumeTrade))> ,deltas 函数表示计算两条相邻数据的差,第一条快照的 deltas(TotalVolumeTrade) 计算结果为 NULL,这里用 iif 函数处理这种 if-else 的逻辑,第一条快照数据的成交量增量等于该条快照的 TotalVolumeTrade

第三步:注册原始行情快照处理引擎(流处理引擎1)

执行下述代码:

mdlSnapshotProcessEngineName = "mdlSnapshotProcessEngine"
createReactiveStateEngine(name=mdlSnapshotProcessEngineName,metrics =convert,dummyTable=objByName(mdlSnapshotTBName),outputTable=objByName(mdlSnapshotProcessTBName),keyColumn="SecurityID",filter=<TradeTime.time() between 09:25:00.000:11:31:00.000 or TradeTime.time() between 13:00:00.000:14:57:00.000 or TradeTime.time()>=15:00:00.000>,keepOrder = true)

说明:

  • 此处使用了 DolphinDB 的响应式状态引擎 ReactiveStateEngine ,主要功能是针对输入数据做滑动窗口处理,即对每一条输入引擎的数据按照指定的计算逻辑处理,该引擎可以进行无状态和有状态计算,有状态计算函数都已经过算法优化,比如增量计算、避免重复计算等。
  • 该引擎支持过滤输出,此处指定了只输出以下时间段的处理结果:
    • 09:25:00.000-11:31:00.000
    • 13:00:00.000-14:57:00.000
    • 大于等于15:00:00.000

执行下述代码,可以查看引擎注册情况,正常返回如下内容:

getStreamEngineStat()

图3-3 引擎注册情况

第四步:订阅原始行情流数据表

执行下述代码:

subscribeTable(tableName=mdlSnapshotTBName,actionName=mdlSnapshotProcessEngineName,handler=getStreamEngine(mdlSnapshotProcessEngineName),msgAsTable=true,batchSize=100,throttle=0.002,hash=0,reconnect=true)

说明:

  • 被订阅的流数据表是原始快照行情表
  • 消费者是原始行情快照处理引擎,即原始行情表实时流入的数据会及时发布到引擎,实时完成原始行情的加工

执行下述代码,可以查看订阅情况,正常返回如下内容:

getStreamingStat().pubTables

图3-4 订阅情况

3.4 注册缺失行情填充处理引擎

按照 K 线合成流程图,正常流程应该先注册流处理引擎2,再注册流处理引擎3。但是本教程中,流处理引擎2到流处理引擎3的用到了引擎级联的功能,所以需要先定义流处理引擎3,然后定义流处理引擎2的时候,才可以指定其输出为流处理引擎3的输入。

第一步:创建相关流数据表

执行下述代码:

//Declare parameters
mdlStockFundOHLCTBName = "mdlStockFundOHLC"
//Create MDL 1-minute OHLC table
share(getMDLStockFundOHLCTB(100000), mdlStockFundOHLCTBName)

说明:

  • mdlStockFundOHLCTBName 流数据表用于接收流处理引擎3最终的输出,即1分钟的 K 线数据。它可以被外部应用订阅,如 Python, C++, Java, C# 等。

第二步:注册缺失行情填充处理引擎(流处理引擎3)

执行下述代码:

//Declare parameters
mdlStockFundOHLCEngineName = "mdlStockFundOHLCEngine"
//Define engine calculation methods
convert = <[TradeTime,iif(OpenPrice==0, ClosePrice, OpenPrice).nullFill(0.0),iif(HighPrice==0, ClosePrice, HighPrice).nullFill(0.0),iif(LowPrice==0, ClosePrice, LowPrice).nullFill(0.0),ClosePrice.nullFill(0.0),Volume,Turnover,TradesCount,PreClosePrice,PreCloseIOPV.nullFill(0.0),IOPV.nullFill(0.0),UpLimitPx,DownLimitPx,iif(time(TradeTime)==09:30:00.000, FirstBarChangeRate, iif(ratios(ClosePrice)!=NULL, ratios(ClosePrice)-1, 0)).nullFill(0.0)
]>
//Create ReactiveStateEngine: mdlStockFundOHLCEngineName
createReactiveStateEngine(name=mdlStockFundOHLCEngineName,metrics =convert,dummyTable=getMDLStockFundOHLCTempTB(1),outputTable=objByName(mdlStockFundOHLCTBName),keyColumn="SecurityID",keepOrder = true)

说明:

  • 流处理引擎2会基于处理后的快照行情数据做步长和窗口为1分钟的滚动窗口计算,其正常输出是每个股票或基金每分钟都有一条数据。但是会存在特殊情况,非常不活跃的股票或基金会存在某一分钟内一条行情快照数据都不存在的情况,此时流处理引擎2统一用0进行填充。为了符合 K 线的计算规则,比如计算窗口缺失快照行情的 K 线,OpenPrice, HighPrice, LowPrice, ClosePrice 用前一根 K 线的 ClosePrice 填充,上述引擎就是在处理这些异常情况。
  • 流处理引擎3的输入是流处理引擎2,所以流处理引擎3不需要订阅上游流数据表。

3.5 注册1分钟窗口和1分钟步长的滚动计算引擎

执行下述代码:

mdlStockFundOHLCTempEngineName = "mdlStockFundOHLCTempEngine"
//Define engine calculation methods
barConvert = <[firstNot(LastPrice, 0),high(DeltasHighPrice, HighPrice, LastPrice),low(DeltasLowPrice, LowPrice, LastPrice),lastNot(LastPrice, 0),sum(DeltasVolume),sum(DeltasTurnover),sum(DeltasTradesCount),first(PreCloPrice),first(PreCloseIOPV),lastNot(IOPV, 0),last(UpLimitPx),last(DownLimitPx),lastNot(LastPrice, 0)\firstNot(LastPrice, 0)-1
]>
//Define engine fill methods
fillList = [0, 0, 0, 'ffill', 0, 0, 0, 'ffill', 'ffill', 'ffill', 'ffill', 'ffill', 0]
createDailyTimeSeriesEngine(name=mdlStockFundOHLCTempEngineName,windowSize=60000,step=60000,metrics=barConvert,dummyTable=objByName(mdlSnapshotProcessTBName),outputTable=getStreamEngine(mdlStockFundOHLCEngineName),timeColumn=`TradeTime,keyColumn=`SecurityID,useWindowStartTime=true,forceTriggerTime=1000,fill=fillList,sessionBegin=09:30:00.000 13:00:00.000 15:00:00.000,sessionEnd=11:31:00.000 14:58:00.000 15:01:00.000,mergeSessionEnd=true,forceTriggerSessionEndTime=30000)
//Subscribe to the processed snapshot table, input incremental data into the DailyTimeSeriesEngine of mdlStockFundOHLCTempEngineName
subscribeTable(tableName=mdlSnapshotProcessTBName,actionName=mdlStockFundOHLCTempEngineName,handler=getStreamEngine(mdlStockFundOHLCTempEngineName),msgAsTable=true,batchSize=100,throttle=0.01,hash=0,reconnect=true)

说明:

  • 此处使用了 DolphinDB 的日级时间序列聚合引擎 DailyTimeSeriesEngine,主要用于实时计算场景下滚动窗口和滑动窗口计算。
  • forceTriggerTime=1000 表示整个市场上,任意1个股票或基金的最新快照行情时间 TradeTime 大于等于窗口关闭时间(本教程中是整分时间)1000 毫秒(1秒)时,强制触发不活跃股票和基金的最新1根 K 线的计算窗口关闭并输出 K 线结果。该强制输出的时延用户可以根据实际业务需求设置。

图3-5 forceTriggerTime 参数使用场景说明

  • forceTriggerSessionEndTime=30000 表示机器时间到达每一个 sessionEnd 的时间点后,机器时间再过 30000 毫秒(3秒)时间后,强制关闭没有输出 sessionEnd 的最后一根 K 线的股票或基金。

图3-6 forceTriggerSessionEndTime 参数使用场景说明

3.6 实时合成 K 线存入分区表

本教程的实时 K 线合成流程中,有三张流数据表,数据内容分别是:

  • 原始快照行情数据: mdlSnapshotTBName
  • 对原始快照行情加工后的数据:mdlSnapshotProcessTBName
  • 1分钟 K 线结果数据:mdlStockFundOHLCTBName

流数据表中的数据需要存入分区表才能满足永久保存的需求,下面以1分钟 K 线结果数据为例,详细介绍如何把流数据表中实时增加的数据存入分区表中。

第一步:创建数据库和分区表

数据库和分区表创建代码只需执行一次,创建成功后,分区表会一直存在,不需要反复执行:

def createStockFundOHLCDfsTB(dbName, tbName){if(existsDatabase(dbUrl=dbName)){print(dbName + " has been created !")print(tbName + " has been created !")}else{db = database(dbName, VALUE, 2021.01.01..2021.12.31)print(dbName + " created successfully.")colNames = `SecurityID`TradeTime`OpenPrice`HighPrice`LowPrice`ClosePrice`Volume`Turnover`TradesCount`PreClosePrice`PreCloseIOPV`IOPV`UpLimitPx`DownLimitPx`ChangeRatecolTypes = [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]schemaTable = table(1:0, colNames, colTypes)db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`TradeTime)print(tbName + " created successfully.")}return loadTable(dbName, tbName).schema().colDefs
}
dbName = "dfs://stockFundStreamOHLC"
tbName = "stockFundStreamOHLC"
createStockFundOHLCDfsTB(dbName, tbName)

第二步:订阅1分钟 K 线结果表实时存储

执行下述代码:

subscribeTable(tableName=mdlStockFundOHLCTBName,actionName=mdlStockFundOHLCTBName,handler=loadTable("dfs://stockFundStreamOHLC", "stockFundStreamOHLC"),msgAsTable=true,batchSize=5000,throttle=1,hash=0,reconnect=true)

说明:

  • 为了提高实时数据写入分区表的吞吐量,建议 batchSize 可以设置为5000条,throttle可以设置为1秒,起到批量写入的效果。

3.7 实时行情数据接入

完成上述步骤后,把实时快照行情接入至原始行情表 mdlSnapshotTBName,即可触发已经定义好的 K 线计算任务。

实时行情数据可以通过 DolphinDB 行情插件、消息中间件插件或各种语言的 API 接入,可以参考官网对应教程或者联系 DolphinDB 知乎官方账号咨询。

本教程通过 DolphinDB 的 replay 回放功能把数据库中的历史行情回放成流数据,进行调试开发。

执行下述代码,启动回放任务:

replayData =	select *from loadTable("dfs://snapshotDB", "snapshotTB")where TradeTime.date()=2023.02.01order by TradeTime
replay(inputTables=replayData,outputTables=mdlSnapshot,dateColumn=`TradeTime,timeColumn=`TradeTime,replayRate=-1)

回放结束后,可以执行下述代码查看流数据表中的结果数据:

result = select * from mdlStockFundOHLC order by SecurityID

图3-7 流数据表中的1分钟 K 线计算结果

可以执行下述代码查看分区表中的结果数据:

result = select *from loadTable("dfs://stockFundStreamOHLC", "stockFundStreamOHLC")where SecurityID=`666666

图3-8 分区表中的1分钟 K 线计算结果

3.8 Python 客户端订阅

DolphinDB 提供了各种语言的 API 订阅接口,包括 Python, C++, Java, C# 等,具体使用方法可以参考官方 API 相关教程。

本教程以 Python 为例,展示一个简单的第三方客户端消费 DolphinDB 流数据的例子,在 Python 环境执行下述代码:

import dolphindb as ddb# 与 DolphinDB 建立会话和连接
s = ddb.session()
s.connect(host="192.198.1.39", port=8988, userid="admin", password="123456")# 定义 Python 端的回调函数
def handlerTestPython(msg):print(msg)# python 客户端开启 DolphinDB 订阅功能
s.enableStreaming(0)# 订阅
s.subscribe(host="192.198.1.39",port=8988,handler=handlerTestPython,tableName="mdlStockFundOHLC",actionName="testStream",offset=0,batchSize=2,throttle=0.1,msgAsTable=True)

返回:

图3-9 Python 客户端订阅消费结果

说明:

  • 例子中 offset 设置为0,表示从流数据表中的内存中的第一条历史数据开始订阅消费。所以在 Python 启动该订阅时,Python 客户端可以消费到订阅流数据表中的所有记录。
  • 实盘中,offset 一般设置为-1,表示从订阅启动开始,订阅消费该时刻起被订阅流数据表中的增量数据,即最新数据。

3.9 DolphinDB DashBoard 面板配置

DolphinDB 提供了便捷的可视化数据面板配置,具体使用方法可以参考官网教程:数据面板。

基于该教程合成的 K 线数据配置数据面板效果如下:

图3-10 DolphinDB DashBoard 资产 K 线图

3.10 清理环境

实时计算主要依赖 DolphinDB 的流数据功能,包括订阅发布、流数据表和流计算引擎,所以清理相关环境的时候也需要把上述定义内容全部删除掉。

环境清理步骤:

  • 删除流数据表时必须先删除所有相关订阅发布,使用 unsubscribeTable 函数。
  • 删除流数据表使用 dropStreamTable 函数。
  • 删除流计算引擎使用 dropStreamEngine 函数。

可以执行下述代码清理本教程中流数据功能相关环境:

//Declare parameters
mdlSnapshotTBName = "mdlSnapshot"
mdlSnapshotProcessTBName = "mdlSnapshotProcess"
mdlSnapshotProcessEngineName = "mdlSnapshotProcessEngine"
mdlStockFundOHLCTempEngineName = "mdlStockFundOHLCTempEngine"
mdlStockFundOHLCTBName = "mdlStockFundOHLC"
mdlStockFundOHLCEngineName = "mdlStockFundOHLCEngine"
//Cancel related subscriptions
try{unsubscribeTable(tableName=mdlSnapshotTBName, actionName=mdlSnapshotProcessEngineName)} catch(ex){print(ex)}
try{unsubscribeTable(tableName=mdlSnapshotProcessTBName, actionName=mdlStockFundOHLCTempEngineName)} catch(ex){print(ex)}
try{unsubscribeTable(tableName=mdlStockFundOHLCTBName, actionName=mdlStockFundOHLCTBName)} catch(ex){print(ex)}
//Cancel the definition of related stream tables
try{dropStreamTable(mdlSnapshotTBName)} catch(ex){print(ex)}
try{dropStreamTable(mdlSnapshotProcessTBName)} catch(ex){print(ex)}
try{dropStreamTable(mdlStockFundOHLCTBName)} catch(ex){print(ex)}
//Cancel the definition of related stream calculation engines
try{dropStreamEngine(mdlSnapshotProcessEngineName)} catch(ex){print(ex)}
try{dropStreamEngine(mdlStockFundOHLCEngineName)} catch(ex){print(ex)}
try{dropStreamEngine(mdlStockFundOHLCTempEngineName)} catch(ex){print(ex)}

3.11 实时计算性能测试

表3-1 测试环境配置表

配置项信息
OS(操作系统)CentOS Linux 7 (Core)
内核3.10.0-1160.el7.x86_64
CPUIntel(R) Xeon(R) Gold 5220R CPU @ 2.20GHz16 逻辑 CPU 核心
内存256 GB

表3-2 实时计算性能测试结果表

测试场景平均单个股票或基金单次响应计算的时延
实盘全市场股票和基金(6481个代码)<0.50 ms

3.12 流批一体 K 线合成

为了满足产研一体化需求,需要共用一套代码完成基于历史和实时快照行情合成 K 线的计算。

该需求的用户只需根据本章节基于实时快照行情数据合成 K 线的教程,把全量历史数据全速回放,然后把回放所得的 K 线计算结果存入分区表中即可。

实盘实时计算,按照本章节3.7小节所述,接入实时行情数据即可。如有问题,请联系 DolphinDB 官方知乎账号咨询。

4. 总结

本教程详细介绍了如何在 DolphinDB 中基于历史和实时快照行情数据合成沪深北交易所股票和基金的1分钟 K 线,旨在提高 DolphinDB 在具体业务场景下的落地效率。

DolphinDB 在海量数据的批计算性能上表现优异,基于16个 CPU 核心完成1天沪深全市场24,285,866 行原始快照行情的降频 K 线计算只需4.7秒,输出1分钟 K 线数据量为 1,555,440 行。

DolphinDB 在大流量的实时流计算性能上表现优异,可以达到微秒级别。基于2.20GHz 主频的 CPU 实时计算沪深全市场股票和基金的1分钟 K 线的平均单票单次响应计算的时延为 500 微秒。

本教程合成 K 线的规则可能和用户实际场景有差异,用户可以根据本教程提供的源码修改后快速完成项目开发。

5. 常见问题解答(FAQ)

5.1 out of memory 错误导致任务失败

在使用示例教程进行海量数据并行计算的时候,可能会因为内存不足导致计算任务失败,请联系 DolphinDB 官方知乎账号申请测试 license。

5.2 计算结果与所述计算规则不符

本教程是基于2023年某天的全市场股票和基金快照行情开发的,可能会有一些特殊情况并没有考虑周全。用户如果在使用教程代码中发现计算结果与所属计算规则不符的情况,请及时私信 DolphinDB 官方知乎账号反馈。

6. 附录

  • 测试数据:testData.csv
  • 基于历史快照行情合成 K 线的代码:calHistoryOHLC.dos
  • 基于实时行情快照合成 K 线的代码:calStreamOHLC.dos

相关文章:

基于快照行情的股票/基金 1分钟 K 线合成指南

1. 概述 由于不同交易所不同资产的交易规则是有差异的&#xff0c;导致不同交易所基于快照行情或逐笔成交合成不同资产1分钟 K 线的计算方法是不同的。 本教程旨在提高 DolphinDB 在具体业务场景下的落地效率&#xff0c;降低 DolphinDB 在实际业务使用中的开发难度。 本教程…...

新质生产力崛起:精益化能力助力企业转型升级

在智能制造、物联网、大数据、大模型、AI风起云涌的时代背景下&#xff0c;一个崭新的概念——“新质生产力”逐渐进入了人们的视野。这一热词不仅成为今年两会的讨论焦点&#xff0c;更代表了企业、国家乃至社会未来发展的核心动能。那么&#xff0c;什么是新质生产力&#xf…...

开发了一个在线客服系统

开发了一个在线客服系统 作为程序员&#xff0c;我一直在寻找能够提高工作效率和用户体验的方法。最近&#xff0c;我成功开发了一个在线客服系统&#xff0c;这个系统旨在帮助企业更高效地管理客户咨询和服务流程。 技术栈 我选择了以下的技术栈来构建这个系统&#xff1a;…...

cowa新的数据筛选代码

cowa新的数据筛选代码 代码地址&#xff1a; https://git.cowarobot.com/lhb/data_extracting 一阶段筛选 修改配置文件 config/common_stage.yamlversion: 3 services:de:image: harbor.cowarobot.cn/lhb/data:crpilot2.5-torch2.2environment:- CRPILOT_INSTALL_VERSIONx86…...

项目篇 | 图书管理系统 | 管理员模块 | 图书管理 | 删除

项目篇 | 图书管理系统 | 管理员模块 | 图书管理 | 删除 概述 图书管理页通过列表展示所有图书的相关信息,集成了搜索、添加、删除、修改的功能。 函数简介 // admin.h void delBook(); // 删除图书 void openDelBookMessage(); // 打开删除图书弹框 void closeDelBookMessa…...

自己动手封装axios通用方法并上传至私有npm仓库:详细步骤与实现指南

文章目录 一、构建方法1、api/request.js2、api/requestHandler.js3、api/index.js 二、测试方法1、api/axios.js2、main.js3、app.vue4、vue.config.js5、index.html 三、打包1、配置package.json2、生成库包3、配置发布信息4、发布 四、使用1、安装2、使用 五、维护1、维护和…...

【Sql Server】锁表如何解锁,模拟会话事务方式锁定一个表然后进行解锁

大家好&#xff0c;我是全栈小5&#xff0c;欢迎来到《小5讲堂》。 这是《Sql Server》系列文章&#xff0c;每篇文章将以博主理解的角度展开讲解。 温馨提示&#xff1a;博主能力有限&#xff0c;理解水平有限&#xff0c;若有不对之处望指正&#xff01; 目录 前言创建表模拟…...

【大语言模型】轻松本地部署Stable Diffusion

硬件要求&#xff1a; 配备至少8GB VRAM的GPU&#xff0c;如果你的电脑只有CPU&#xff0c;请看到最后。根据部署规模&#xff0c;需要足够的CPU和RAM。 软件要求&#xff1a; Python 3.7或更高版本。支持NVIDIA GPU的PyTorch。Hugging Face的Diffusers库。Hugging Face的Tr…...

【github主页】优化简历

【github主页】优化简历 写在最前面一、新建秘密仓库二、插件卡片配置1、仓库状态统计2、Most used languages&#xff08;GitHub 常用语言统计&#xff09;使用细则 3、Visitor Badge&#xff08;GitHub 访客徽章&#xff09;4、社交统计5、打字特效6、省略展示小猫 &#x1f…...

dnspy逆向和de4dot脱壳

拿到一个软件&#xff0c;使用dnspy查看&#xff0c;发现反汇编后关键部分的函数名和代码有很多乱码&#xff1a; 这样的函数非常多&#xff0c;要想进一步调试和逆向&#xff0c;就只能在dnspy中看反汇编代码了&#xff0c;而无法看到c#代码&#xff0c;当时的整个逆向过程只剩…...

python之flask安装以及使用

1 flask介绍 Flask是一个非常小的Python Web框架&#xff0c;被称为微型框架&#xff1b;只提供了一个稳健的核心&#xff0c;其他功能全部是通过扩展实现的&#xff1b;意思就是我们可以根据项目的需要量身定制&#xff0c;也意味着我们需要学习各种扩展库的使用。 2 python…...

汽车笔记-保险

保险 1.交强险 上路必须买的&#xff0c; 国家规定必须要买。交强险不管你是有责还是无责&#xff0c;它都是可以赔偿的。交强险还有一个18000的垫付功能&#xff0c;比如说我们出了交通事故后&#xff0c;对方住院治疗需要你垫付钱&#xff0c;那么这个时候就可以用到交强险…...

人工智能时代的图像识别:机遇与挑战并存

人工智能&#xff08;AI&#xff09;时代为图像识别领域带来了前所未有的机遇&#xff0c;同时也伴随着一系列挑战。这一领域的发展不仅深刻影响了科技、医疗、教育、娱乐等多个行业&#xff0c;还在一定程度上改变了人们的生活方式。 机遇&#xff1a; 技术突破与创新&#…...

工作 9 年后,回老家当计算机老师的真实感受

北京某程序员发帖&#xff0c;他说自己工作了整整 9 年后&#xff0c;今年六月就告别了北京这个大都市&#xff0c;安安心心地回老家当起了计算机老师。 工作日&#xff0c;每天早上 8 点就得按点上班儿&#xff0c;到了下午 4 点半&#xff0c;下班儿的铃声一响&#xff0c;就…...

二叉树的镜像【c++】

#include <iostream> #include <vector> using namespace std;//双链表节点结构 typedef struct treeNode {int value;struct treeNode* left;struct treeNode* right;treeNode(int x) : value(x), left(nullptr), right(nullptr) {} } TreeNode;void mirrorTree(T…...

记录Python的pandas库详解

如何生成一个pd import pandas as pd df pd.DataFrame([[1,2,3],[4,5,6]],index[A,B],columns[C1,C2,C3])df ---------------------------------------------------------------------------C1 C2 C3 A 1 2 3 B 4 5 6df.T -------------------------------------------------…...

阻碍团队使用工具的原因竟然是……

本文首发于个人网站「BY林子」&#xff0c;转载请参考版权声明。 工具化、自动化、数字化&#xff0c;这些都是逐步改善工作的质量和效率的方式&#xff0c;是时代不断进步的表现。然而&#xff0c;还是有很多软件开发团队的工作还处于手工阶段&#xff0c;这是为什么呢&#x…...

【并发】第九篇 Atomic原子操作类 - 字段更新器类详解

导航 简介AtomicIntegerFieldUpdater简介 Atomic的字段更新器类是Java中一种用于实现线程安全的字段更新操作的类。它提供了一组原子操作,可以对字段进行原子性的更新。在并发环境中,多个线程同时更新一个字段可能会出现竞态条件(Race Condition)导致数据不一致的问题。At…...

FFmpeg: 自实现ijkplayer播放器--03UI界面设计

文章目录 UI设计流程图UI设计界面点击播放功能实现 UI设计流程图 UI设计界面 主界面 控制条 播放列表 画面显示 标题栏 设置界面 提示框 点击播放功能实现 槽函数实现&#xff1a; connect(ui->ctrlBarWind, &CtrlBar::SigPlayOrPause, this, &Main…...

【安装部署】Apache SeaTunnel 和 Web快速安装详解

版本说明 由于作者目前接触当前最新版本为2.3.4 但是官方提供的web版本未1.0.0&#xff0c;不兼容2.3.4&#xff0c;因此这里仍然使用2.3.3版本。 可以自定义兼容处理&#xff0c;官方提供了文档&#xff1a;https://mp.weixin.qq.com/s/Al1VmBoOKu2P02sBOTB6DQ 因为大部分用…...

泰迪智能科技携手洛阳理工学院共建“泰迪·洛阳理工数据智能工作室”

为深化校企合作&#xff0c;实现应用型人才培养目标&#xff0c;4月11日&#xff0c;洛阳理工学院携手广东泰迪智能科技股份有限公司举行“泰迪洛阳理工数据智能工作室”揭牌仪式暨工作室成员动员会在洛阳理工学院举行。洛阳理工学院计算机与信息工程学院院长石念峰、副院长李明…...

jenkins构建微信小程序并展示二维码

测试小程序的过程中&#xff0c;很多都是在回头和前端开发说一句&#xff0c;兄弟帮我打一个测试版本的测试码&#xff0c;开发有时间的情况下还好&#xff0c;就直接协助了&#xff0c;但是很多时候他们只修复了其中几个bug&#xff0c;其他需要修复的bug代码正在编写&#xf…...

阿里云大学考试python中级题目及解析-python中级

阿里云大学考试python中级题目及解析 1.WEB开发中&#xff0c;下列选项中能够实现客户端重定向的设置是&#xff08;&#xff09; A.响应头设置Location状态码200 B.响应头设置Location状态码302 C.响应头设置Accept-Location状态码301 D.响应头设置Accept-Location状态码…...

攻防演练作为红方,怎么绕过Web应用防火墙

在攻防演练中&#xff0c;作为红方尝试绕过Web应用防火墙&#xff08;WAF&#xff09;是一项常见且具有挑战性的任务。这要求你对WAF的工作原理有深入的理解&#xff0c;并且能够创造性地应用各种技术来测试WAF的防御限制。以下是一些更专业且可操作的策略&#xff0c;用于尝试…...

AI音乐,8大变现方式——Suno:音乐版的ChatGPT - 第505篇

悟纤之歌 这是利用AI为自己制作的一首歌&#xff0c;如果你也感兴趣&#xff0c;可以花点时间阅读下本篇文章。 ​ 导读 随着新一代AI音乐创作工具Suno V3、Stable audio2.0、天工SkyMusic的发布&#xff0c;大家玩自创音乐歌曲&#xff0c;玩的不亦乐乎。而有创业头脑的朋友…...

【C++】模拟list

list的模拟真的很震撼&#xff0c;第一次学习时给我幼小的心灵留下了极大地冲击 接下来我们一起看看list模拟究竟是怎样一回事 目录 节点的封装&#xff1a;list类的实现&#xff1a;私有成员变量&#xff1a;构造函数&#xff1a;push_back && pop_back: 迭代器类的实…...

SAP项目任务一览表

根据SAP Activate项目管理方法论的主要精神&#xff0c;浓缩到一些主要的团队和任务。 主要的团队有&#xff1a; 项目管理(办公室)Project Management(office)&#xff1a;项目经理团队&#xff0c;包括项目办公室。负责项目整体运行和监控&#xff0c;项目办公室负责项目的…...

130个学术网站和26个科研工具

我们平时可以见到不少学术资源&#xff0c;但是很多信息里会有一些重叠网站、无效网站&#xff0c;导致我们虽然收藏了很多网址&#xff0c;但是却并不都能用&#xff0c;学妹特地整合了130个学术资源网站和26个科研工具&#xff0c;每一个都是亲自试过有效的&#xff0c;希望能…...

《一键搞定!揭秘微信公众号文章批量下载的终极神器》

大家好&#xff01;今天我要给大家介绍一个超级好用的小工具&#xff0c;能帮你轻松批量下载微信公众号的文章&#xff0c;还不需要安装任何证书哦&#xff01;无论你是学生还是普通爱好者&#xff0c;只要你想保存一些精彩的公众号内容&#xff0c;这个工具都能帮到你。 概览 …...

鸿蒙入门02-首次安装和配置

注&#xff1a;还没有安装编辑器&#xff08; deveco studio &#xff09;的小伙伴请看鸿蒙入门01-下载和安装-CSDN博客 首次安装配置 编辑器&#xff08; deveco studio &#xff09;安装完毕以后需要进入配置界面进行相关配置配置完毕以后才可以正常使用 环境配置&#xf…...

福州做网站的个体户电话查询/百度竞价广告推广

http://codeforces.com/problemset/problem/730/A 题意&#xff1a;有n个人打天梯&#xff0c;想让这n个人的分数相同&#xff0c;每场比赛必须有2-5个人参赛&#xff0c;参赛的人会降低一分&#xff0c;问一个合理方案让所有人的分数相同。 思路&#xff1a;不限制比赛场数&am…...

公司网站建设考核/百度点击软件

例如&#xff1a;准备的英文意思则是Reday修复(Repair)&#xff1a;修复是我们察觉到人际关系谐调破裂时&#xff0c;会采取的例行动作。修复动作一般由一位伙伴提出&#xff0c;而由另一位伙伴接受。英文意思&#xff1a; the act of putting .例如&#xff1a;预备的英辞意思…...

网站图片要多少像素/网站引流推广怎么做

我们今天来讲一个大部分竞赛生都最喜欢的一个数据结构——并查集 我们先从一道题入手&#xff1a;亲戚 这时并查集最经典的例题(注意&#xff01;没有之一&#xff01;) 我们来看一下这道题&#xff0c;大佬们肯定第一思路就是传递闭包(但是我认为没有没学过并查集的大佬) …...

美容培训东莞网站建设/广告投放平台有哪些

镜像翻转 flip()函数&#xff1a; flip函数是矩阵或者图像翻转&#xff0c;其实图像的本质也是矩阵。 void flip(InputArray src, OutputArray dst, int flipCode) 参数声明&#xff1a; src&#xff1a;输入矩阵 dst&#xff1a;翻转后矩阵&#xff0c;类型与src一致 flipCode…...

重庆网站建设培训机构/百度软件应用中心

copyfrom:http://blog.csdn.net/yicomm/article/details/6681404 java应用程序利用Exe4j打包exe文件 今天上课给学生介绍利用Myeclipse打jar包和exe文件&#xff0c;随便就随手写了下文档供大家参考&#xff0c;希望对大家有用。 1. 使用简介: 把java应用程序打成exe文件我…...

wordpress 媒体/网页设计与制作教程

SQL Server 2014 Visual Studio 2015&#xff08;VS&#xff09;遇到的错误以及解决方法作者&#xff1a;张国军_Suger开发工具与关键技术&#xff1a;SQL Server 2014、Visual Studio 2015&#xff08;VS&#xff09;这些错题主要是为了防止自己再错记录一下自己曾经出现的错…...