当前位置: 首页 > news >正文

Flink实时电商数仓(八)

用户域登录各窗口汇总表

  • 主要任务:从kafka页面日志主题读取数据,统计
    • 七日回流用户:之前活跃的用户,有一段时间不活跃了,之后又开始活跃,称为回流用户
    • 当日独立用户数:同一个用户当天重复登录,只算作一个独立用户。

思路分析

  1. 读取kafka页面主题数据
  2. 转换数据结构:String -> JSONObject
  3. 过滤数据,uid不为null
    • 登录的两种情况
      • 用户打开应用后自动登录
      • 用户打印应用后没有登录,浏览后跳转到登录页面
    • 过滤条件:
      • uid不为null且last_page_id is null
      • last_page_id = login
  4. 设置水位线
  5. 按照uid分组
  6. 统计回流用户数和独立用户数
  7. 开窗聚合
  8. 写入doris

具体实现

  1. 设置端口、并行度、消费者组、kafka主题
  2. 读取dwd页面主题数据
    - stream.print()
  3. 对数据进行清洗过滤:uid不为空
    • stream.flatMap()使用flatMap过滤
    • new FlatMapFunction<>(){}在该方法内部转换为JSONObject, 并且获取uid和lastPageId, try-catch这段代码
    • 判断是否满足思路分析中的条件,如果中途发生异常,直接catch后打印到控制台清理掉即可。
  4. 先注册水位线
    • jsonObjStream.assignTimestampAndWatermark
    • new SerializableTimestampAssigner<>, 提取数据中的ts
  5. 按照uid分组
    • stream.keyby()按照uid进行分组
  6. 判断独立用户和回流用户
    • 创建UserLoginBean, 使用状态保存用户的登录信息
    • 在open方法中,getRuntimeContext().getState(new ValueStateDescriptor<>("last_login_dt",String.class))创建状态记录用户上一次的登录时间
    • processElement方法中比较当前登录的日期和状态存储的日期
      • 如果lastLoginDt==null是新用户
      • 如果不为空,判断上次登录时间和当前时间的差值是否大于7天;如果大于7天,说明是回流用户。
      • 如果小于7天,还需要判断上次登录时间是否是今天,如果不是今天,则说明该用户本次是独立用户。
  7. 开窗聚合
    • 使用滚动窗口开窗聚合
    • reduce算子中写聚合逻辑
    • process算子中获取窗口信息
  8. 写入doris
    • 创建doris sink,写出到doris

核心代码

public static void main(String[] args) {new DwsUserUserLoginWindow().start(10024,4,"dws_user_user_login_window", Constant.TOPIC_DWD_TRAFFIC_PAGE);}@Overridepublic void handle(StreamExecutionEnvironment env, DataStreamSource<String> stream) {//1.读取dwd页面数据//stream.print();//2. 对数据进行清洗过滤SingleOutputStreamOperator<JSONObject> jsonObjStream = etl(stream);//3. 注册水位线SingleOutputStreamOperator<JSONObject> withWatermarkStream = addWatermark(jsonObjStream);//4. 按照uid分组KeyedStream<JSONObject, String> keyedStream = getKeyedStream(withWatermarkStream);//5. 判断独立用户和回流用户SingleOutputStreamOperator<UserLoginBean> processedStream = getUserLoginBeanStream(keyedStream);//processedStream.print();//开窗聚合SingleOutputStreamOperator<UserLoginBean> reducedStream = getReducedStream(processedStream);//reducedStream.print();//写入DorisreducedStream.map(new DorisMapFunction<>()).sinkTo(FlinkSinkUtil.getDorisSink(Constant.DWS_USER_USER_LOGIN_WINDOW));}

[gitee仓库地址:(https://gitee.com/langpaian/gmall2023-realtime)

相关文章:

Flink实时电商数仓(八)

用户域登录各窗口汇总表 主要任务&#xff1a;从kafka页面日志主题读取数据&#xff0c;统计 七日回流用户&#xff1a;之前活跃的用户&#xff0c;有一段时间不活跃了&#xff0c;之后又开始活跃&#xff0c;称为回流用户当日独立用户数&#xff1a;同一个用户当天重复登录&a…...

Python Pymysql实现数据存储

什么是 PyMySQL&#xff1f; PyMySQL 是在 Python3.x 版本中用于连接 MySQL 服务器的一个库&#xff0c;Python2 中则使用mysqldb。 PyMySQL 遵循 Python 数据库 API v2.0 规范&#xff0c;并包含了 pure-Python MySQL 客户端库。 PyMySQL 安装 在使用 PyMySQL 之前&#xf…...

软件测试/测试开发丨Python 常用第三方库 pymysql

pymysql 概述 Python 的数据库接口标准是 Python DB-APIPyMySQL 是从 Python 连接到 MySQL 数据库服务器的接口PyMySQL 的目标是成为 MySQLdb 的替代品官方文档&#xff1a;pymysql.readthedocs.io/ pymysql 安装 使用 pip 安装使用 Pycharm 界面安装 pip install pymysqlp…...

第二节 linux操作系统安装与配置

一&#xff1a;Vmware虚拟机安装与使用   ①VMware是一个虚拟PC的软件&#xff0c;可以在现有的操作系统上虚拟出一个新的硬件环境&#xff0c;相当于模拟出一台新的PC &#xff0c;以此来实现在一台机器上真正同时运行多个独立的操作系统。   ②VMware主要特点&#xff1a…...

ChatGPT 对SEO的影响

ChatGPT 的兴起是否预示着 SEO 的终结&#xff1f; 一点也不。事实上&#xff0c;如果使用得当&#xff0c;它可以让你的 SEO 工作变得更加容易。 强调“正确使用时”。 你可以使用ChatGPT来帮助进行关键字研究的头脑风暴部分、重新措辞你的内容、生成架构标记等等。 但你不…...

光伏逆变器MPPT的作用、原理及算法

MPPT是逆变器非常核心的技术&#xff0c;MPPT电压在进行光伏电站设计时一项非常关键的参数。 一、什么是MPPT&#xff1f; &#xff08;单块光伏组件的I-V、P-V曲线&#xff09; 上图中&#xff0c;光伏组件的输出电压和电流遵循I-V曲线(绿色)、P-V曲线(蓝色)&#xff0c;如果…...

一文详解pyspark常用算子与API

rdd.glom() 对rdd的数据进行嵌套&#xff0c;嵌套按照分区来进行 rdd sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 2)print(rdd.glom().collect()) 输出&#xff1a;[[1,2,3,4],[5,6,7,8,9]] 参考 PySpark基础入门&#xff08;2&#xff09;&#xff1a;RDD及其常用算子…...

使用Rollup 搭建开发环境

1 什么是Rollup Rollup 是一个用于 JavaScript 的模块打包工具&#xff0c;它将小的代码片段编译成更大、更复杂的代码&#xff0c;例如库或应用程序。它使用 JavaScript 的 ES6 版本中包含的新标准化代码模块格式&#xff0c;而不是以前的 CommonJS 和 AMD 等特殊解决方案。(开…...

ubuntu:beyond compare 4 This license key has been revoked 解决办法

https://www.cnblogs.com/zhibei/p/12095431.html 错误如图所示&#xff1a; 解决办法&#xff1a; &#xff08;1&#xff09;先用find命令找到bcompare所在位置&#xff1a;sudo find /home/ -name *bcompare &#xff08;2&#xff09;进入 /home/whf/.config,删除/bco…...

华为交换机生成树STP配置案例

企业内部网络怎么防止网络出现环路&#xff1f;学会STP生成树技术就可以解决啦。 STP简介 在二层交换网络中&#xff0c;一旦存在环路就会造成报文在环路内不断循环和增生&#xff0c;产生广播风暴&#xff0c;从而占用所有的有效带宽&#xff0c;使网络变得无法正常通信。 在…...

Avalonia框架下实现热更新

在Avalonia框架下实现热更新&#xff08;也称为动态加载或模块化更新&#xff09;&#xff0c;通常涉及程序集的动态加载与卸载&#xff0c;以及UI元素、视图模型或其他应用程序逻辑部分的实时替换。由于Avalonia本身是一个跨平台的GUI框架&#xff0c;并没有直接内置热更新机制…...

适用于各种危险区域的火焰识别摄像机,实时监测、火灾预防、安全监控,为安全保驾护航

火灾是一种极具破坏力的灾难&#xff0c;对人们的生命和财产造成了严重的威胁。为了更好地预防和防范火灾&#xff0c;火焰识别摄像机作为一种先进的监控设备&#xff0c;正逐渐受到人们的重视和应用。本文将介绍火焰识别摄像机在安全监控和火灾预防方面的全面应用方案。 一、火…...

react-router-dom5升级到6

前言 升级前版本为5.1.2 下载与运行 下载 npm install react-router-dom6运行 运行发现报错: 将node_modules删除&#xff0c;重新执行npm i即可 运行发现如下报错 这是因为之前有引用react-router-dom.min&#xff0c;v6中取消了该文件&#xff0c;所以未找到文件导致报错。…...

Linux调试工具—gdb

&#x1f3ac;慕斯主页&#xff1a;修仙—别有洞天 ♈️今日夜电波&#xff1a;HEART BEAT—YOASOBI 2:20━━━━━━️&#x1f49f;──────── 5:35 &#x1f504; ◀️ ⏸ ▶️ ☰ …...

SpringCloud(H版alibaba)框架开发教程之nacos做配置中心——附源码(2)

上篇主要讲了使用eureka&#xff0c;zk&#xff0c;nacos当注册中心 这篇内容是nacos配置中心 代码改动部分mysql驱动更新到8.0&#xff0c;数据库版本升级到了8.0&#xff0c;nacos版本更新到了2.x nacos2.x链接 链接&#xff1a;https://pan.baidu.com/s/11nObzgTjWisAfOp…...

网络摄像头爆破实战

*** 重要说明&#xff1a;仅用于交流网络安全测试技术&#xff0c;并唤起大家对网络安全的重视&#xff0c;如用本文的技术干违法的事情&#xff0c;博主概不负责。*** 文章目录 前言1. 发现摄像头2. 发现端口3. 确定品牌信息4. 确定RTSP地址5. 获取视频流6. 获取密码7. 再次获…...

亚信安慧AntDB数据并行加载工具的实现(二)

3.功能性说明 本节对并行加载工具的部分支持的功能进行简要说明。 1) 支持表类型 并行加载工具支持普通表、分区表。 2) 支持指定导入字段 文件中并不是必须包含表中所有的字段&#xff0c;用户可以指定导入某些字段&#xff0c;但是指定的字段数要和文件中的字段数保持一…...

【Java进阶篇】JDK新版本中的新特性都有哪些

JDK新版本中的新特性都有哪些 ✔️经典解析✔️拓展知识仓✔️本地变量类型推断✔️Switch 表达式✔️Text Blocks✔️Records✔️封装类✔️instanceof 模式匹配✔️switch 模式匹配 ✅✔️虚拟线程 ✔️经典解析 JDK 8中推出了Lambda表达式、Stream、Optional、新的日期API等…...

力扣labuladong一刷day49天迪杰斯特拉

力扣labuladong一刷day49天迪杰斯特拉 文章目录 力扣labuladong一刷day49天迪杰斯特拉一、743. 网络延迟时间二、1631. 最小体力消耗路径三、1514. 概率最大的路径 一、743. 网络延迟时间 题目链接&#xff1a;https://leetcode.cn/problems/network-delay-time/ 使用迪杰斯特…...

MCS接口技术----定时/计数,中断

目录 一.中断系统相关寄存器 1.51单片机中断系统的总体结构&#xff1a; 2.中断源的中断级别&#xff08;由高到低&#xff09;&#xff1a; 3.与中断有关的四个寄存器&#xff1a; &#xff08;1&#xff09;TCON---定时控制寄存器 &#xff08;2&#xff09;IE---中断允…...

Java开发框架和中间件面试题(10)

目录 104.怎么保证缓存和数据库数据的一致性&#xff1f; 105.什么是缓存穿透&#xff0c;什么是缓存雪崩&#xff1f;怎么解决&#xff1f; 106.如何对数据库进行优化&#xff1f; 107.使用索引时有哪些原则&#xff1f; 108.存储过程如何进行优化&#xff1f; 109.说说…...

C++ 具名要求-基本概念-指定该类型对象可以从右值构造

指定该类型对象可以从右值构造 指定该类型的实例可以从一个右值实参构造。 要求 以下情况下&#xff0c;类型 T 满足可移动构造 (MoveConstructible) &#xff1a; 给定 T 类型的右值表达式 rv任意标识符 u 下列表达式必须合法且拥有其指定的效果 表达式后条件T u rv;u…...

Python如何把类当做字典来访问及浅谈Python类命名空间

Python如何把类当做字典来访问 Python把类当做字典来访问 定义一个类将它实例化&#xff0c;我们可以通过obj.属性来访问类的属性&#xff0c;如果想获取类的所有实例变量&#xff0c;我们可以使用obj.__dict__来访问&#xff0c;如下&#xff1a; class A:def __init__(self)…...

简述Redis备份策略以及对应的实现机制

引言 Redis作为高性能的内存数据库&#xff0c;数据的安全性至关重要。一旦数据丢失&#xff0c;可能会对业务造成重大影响。因此&#xff0c;备份Redis数据是每个Redis使用者都必须考虑的问题。本文将介绍Redis的备份策略以及对应的实现机制。 一、备份策略 1.1 定期备份 …...

【5G PHY】5G 物理层加速卡介绍

博主未授权任何人或组织机构转载博主任何原创文章&#xff0c;感谢各位对原创的支持&#xff01; 博主链接 本人就职于国际知名终端厂商&#xff0c;负责modem芯片研发。 在5G早期负责终端数据业务层、核心网相关的开发工作&#xff0c;目前牵头6G算力网络技术标准研究。 博客…...

lftp学习笔记

目录 0. ftp vs. lftp1. 安装2. 常用命令2.1 登录2.2 文件管理2.3 文件传输 3. 脚本编程4. 实践中的问题排查参考 0. ftp vs. lftp lftp是一款文件传输工具&#xff0c;支持FTP、HTTP、SFTP、FISH等多种协议。 功能ftplftp数据传输文件文件、文件夹多线程传输支持断点续传支持…...

idea 插件开发之 HelloWorld

前言 本文使用的 idea 2023.3 版本进行插件入门开发&#xff0c;首先要说明的是 idea 2023 版本及以后的 idea&#xff0c;对插件开发进行了一定程度的变动&#xff1a; 1、创建项目时不再支持 maven 选项 2、必须是 jdk17 及以后版本&#xff08;点击查看官网版本对应关系&…...

极速文件搜索工具Everything结合内网穿透实现远程搜索本地文件

文章目录 前言1.软件安装完成后&#xff0c;打开Everything2.登录cpolar官网 设置空白数据隧道3.将空白数据隧道与本地Everything软件结合起来总结 前言 要搭建一个在线资料库&#xff0c;我们需要两个软件的支持&#xff0c;分别是cpolar&#xff08;用于搭建内网穿透数据隧道…...

【PowerMockito:编写单元测试过程中采用when打桩失效的问题】

问题描述 正如上图所示&#xff0c;采用when打桩了&#xff0c;但是&#xff0c;实际执行的时候还是返回null。 解决方案 打桩时直接用any() 但是这样可能出现一个mybatisplus的异常&#xff0c;所以在测试类中需要加入以下代码片段&#xff1a; Beforepublic void setUp() …...

[蓝桥杯 2018省赛]回家路费

回家路费 题目描述 本题为填空题&#xff0c;只需要算出结果后&#xff0c;在代码中使用输出语句将所填结果输出即可。 小明被不明势力劫持。后莫名其妙被扔到 X 星站再无问津。小明得知每天都有飞船飞往地球&#xff0c;但需要 108108 元的船票&#xff0c;而他却身无分文。…...