住房建设厅官方网站/郑州模板网站建设
(1)概述:
Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提交事务。事务一旦被提交,该 Channel 从自己的内部缓冲区删除事件。
Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 Sink。
自定义 sink 的接口:https://flume.apache.org/releases/content/1.11.0/FlumeDeveloperGuide.html#sink
MySink 需要继承 AbstractSink 类并实现 Configurable 接口。
实现相应方法:
configure(Context context)//初始化 context(读取配置文件内容)
process()//从 Channel 读取获取数据(event),这个方法将被循环调用。
适用于:读取 Channel 数据写入 MySQL 或者其他文件系统。
(2)需求:
使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。
(3)分析:
步骤:
(1)创建一个 maven 项目,并引入以下pom依赖。
<dependency><groupId>org.apache.flume</groupId><artifactId>flume-ng-core</artifactId><version>1.9.0</version>
</dependency>
(2)自定义MySink ,继承 AbstractSink 类并实现 Configurable 接口,并打包,将jar包放到/opt/module/flume-1.9.0/lib目录下。
package com.study.sink;import org.apache.flume.*;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;public class MySink extends AbstractSink implements Configurable
{//声明前后缀private String perfix;private String subfix;//创建logger对象用于打印到控制台private Logger logger = LoggerFactory.getLogger(MySink.class);@Overridepublic void configure(Context context) {perfix = context.getString("per","per-");subfix = context.getString("sub","-sub");}@Overridepublic Status process() throws EventDeliveryException {//1.获取channel并开启事务Channel channel = getChannel();Transaction transaction = channel.getTransaction();transaction.begin();//2.从channel中抓取数据打印到控制台try {//2.1抓取数据//创建事件Event event;while(true){//防止空数据event = channel.take();if (event != null)break;}//2.2处理事件logger.info(perfix+new String(event.getBody())+subfix);//2.3提交事务transaction.commit();return Status.READY;} catch (Exception e) {e.printStackTrace();//回滚transaction.rollback();return Status.BACKOFF;} finally {transaction.close();}}
}
(3)在/opt/module/flume-1.9.0/job下创建文件夹group6,在该文件夹下创建配置文件netcat-flume-mysink.conf。
# Name the components on this agent
#组件声明
a1.sources = r1
a1.sinks = k1
a1.channels = c1# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port =44444# Describe the sink
a1.sinks.k1.type = com.study.sink.MySink
a1.sinks.per = per
a1.sinks.sub = sub# Use a channel whichbuffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
(4)开启任务
bin/flume-ng agent -c conf/ -n a1 job/group6/netcat-flume-mysink.conf -Dflume.root.logger=INFO,console
(5)开启端口并发送消息
nc localhost 44444
(6)结果
相关文章:

【大数据之Flume】七、Flume进阶之自定义Sink
(1)概述: Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。 Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Chan…...

vue对于时间的处理
2023-08-05 11:25:45 假如这个就是我们要传的时间字符串 比如今天是2023-08-05(同一天):现在把这个时间字符串传入到 formatDate()这个方法,就会给你返回 11:25 比如今天是2023-08-06(前一天&a…...

Apache DolphinScheduler 3.1.8 版本发布,修复 SeaTunnel 相关 Bug
近日,Apache DolphinScheduler 发布了 3.1.8 版本。此版本主要基于 3.1.7 版本进行了 bug 修复,共计修复 16 个 bug, 1 个 doc, 2 个 chore。 其中修复了以下几个较为重要的问题: 修复在构建 SeaTunnel 任务节点的参数时错误的判断条件修复 …...

科技云报道:一波未平一波又起?AI大模型再出邪恶攻击工具
AI大模型的快速向前奔跑,让我们见识到了AI的无限可能,但也展示了AI在虚假信息、深度伪造和网络攻击方面的潜在威胁。 据安全分析平台Netenrich报道,近日,一款名为FraudGPT的AI工具近期在暗网上流通,并被犯罪分子用于编…...

深度对话|如何设计合适的网络经济激励措施
近日,我们与Mysten Labs的首席经济学家Alonso de Gortari进行了对话,讨论了如何在网络运营商和参与者之间找到激励措施的平衡,以及Sui的经济如何不断发展。 是什么让您选择将自己的经济学背景应用于区块链和Web3领域? 起初&…...

opencv带GStreamer之Windows编译
目录 1、下载GStreamer和安装2. GSTReamer CMake配置3. 验证是否配置成功 1、下载GStreamer和安装 下载地址如下: gstreamer-1.0-msvc-x86_64-1.18.2.msi gstreamer-1.0-devel-msvc-x86_64-1.18.2.msi 安装目录无要求,主要是安装完设置环境变量 xxx\1…...

Java并发编程之锁的升级
Java 中的锁机制是多线程编程中的一部分。锁一共有4种状态,级别从低到高依次是:无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态,这几个状态会随着竞争情况逐渐升级。 锁可以升级但不能降级,意味着偏向锁升级成轻量级锁后不能…...

多核异构处理器A核与M核通信过程
多核异构处理器是指集成了不同类型或架构的CPU的系统级芯片(SoC)。 例如,有些处理器同时包含了高性能的A核(如Cortex-A)和低功耗的M核(如Cortex-M)。 这样的设计可以让不同的CPU负责不同的任务…...

面试热题(反转链表)
给你单链表的头指针 head 和两个整数 left 和 right ,其中 left < right 。请你反转从位置 left 到位置 right 的链表节点,返回 反转后的链表 。 链表的题,大部分都可以用指针或者递归可以做,指针如果做不出来的话,…...

竞赛项目 深度学习的水果识别 opencv python
文章目录 0 前言2 开发简介3 识别原理3.1 传统图像识别原理3.2 深度学习水果识别 4 数据集5 部分关键代码5.1 处理训练集的数据结构5.2 模型网络结构5.3 训练模型 6 识别效果7 最后 0 前言 🔥 优质竞赛项目系列,今天要分享的是 🚩 深度学习…...

Java项目部署云windows细节
springboot项目 pom文件中必须要有这个插件(正常其实都有就是我手贱以前不小心删除了) 他的作用是查找主类 <build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-…...

软件功能测试有什么注意事项?功能测试报告起到什么作用?
软件功能测试是软件开发过程中至关重要的一环,它用于评估软件功能的质量和稳定性,并确保软件能够按照预期进行工作。然而,在进行功能测试时,有一些注意事项需要特别关注,以确保测试的准确性和有效性。 一、软件功能测…...

Kubernetes 调度 约束
调度约束 Kubernetes 是通过 List-Watch 的机制进行每个组件的协作,保持数据同步的,每个组件之间的设计实现了解耦。 用户是通过 kubectl 根据配置文件,向 APIServer 发送命令,在 Node 节点上面建立 Pod 和 Container。 APIServer…...

Grafana技术文档-概念-《十分钟扫盲》
Grafana官网链接 Grafana: The open observability platform | Grafana Labs 基本概念 Grafana是一个开源的度量分析和可视化套件,常用于对大量数据进行实时分析和可视化。以下是Grafana的基本概念: 数据源(Data Source)&#…...

【JavaEE进阶】Spring 更简单的读取和存储对象
文章目录 一. 存储Bean对象1. 配置扫描路径2. 添加注解存储 Bean 对象2.1 使用五大类注解存储Bean2.2 为什么要有五大类注解?2.3 有关获取Bean参数的命名规则 3. 使用方法注解储存 Bean 对象3.1 方法注解储存对象的用法3.2 Bean的重命名3.3 同⼀类型多个 Bean 报错 …...

KafKa集群搭建和知识点
一、KafKa概述 1.1 定义 KafKa是一个分布式的基于发布/订阅模式的消息队列,主要应用于大数据试试处理领域 是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统&a…...

剑指 Offer 56 - I. 数组中数字出现的次数题解
题目描述:剑指 Offer 56 - I. 数组中数字出现的次数 - 力扣(LeetCode) 一个整型数组 nums 里除两个数字之外,其他数字都出现了两次。请写程序找出这两个只出现一次的数字。要求时间复杂度是O(n),空间复杂度是O(1)。 示…...

CSDN付费专栏写作协议
一、总则 1.1、欢迎您选用CSDN付费专栏服务(“本服务”)。以下所述条款和条件即构成您与CSDN就使用本服务所达成的协议(“本协议)。本协议被视为《CSDN用户服务条款》(链接:https://passport.csdn.net/ser…...

[保研/考研机试] KY30 进制转换-大整数转二进制 清华大学复试上机题 C++实现
描述 将一个长度最多为30位数字的十进制非负整数转换为二进制数输出。 输入描述: 多组数据,每行为一个长度不超过30位的十进制非负整数。 (注意是10进制数字的个数可能有30个,而非30bits的整数) 输出描述ÿ…...

vue3多条件搜索功能
搜索功能在后台管理页面中非常常见,本篇就着重讲一下vue3-admin-element框架中如何实现一个顶部多条件搜索功能 一、首先需要在vue页面的<template></template>中写入对应的结构 <!-- 搜索 --><div style"display: flex; justify-content…...

C++20协程
目录 协程原理: 进程、线程和协程的区别和联系编辑 协程在IO多路复用中 协程的目的: 协程的优势: 协程原理: (学习来源:幼麟实验室) 线程是进程中的执行体,拥有一个…...

Zabbix 6.0 监控其他
文章目录 一、Zabbix 监控 Windows 系统1)下载 Windows 客户端 Zabbix agent 22)安装客户端,配置3)在服务端 Web 页面添加主机,关联模板 二、Zabbix 监控 java 应用1)客户端开启 java jmxremote 远程监控功…...

Django rest_framework Serializer中的create、Views中的create/perform_create的区别
Django rest_framework Serializer中的create、Views中的create/perform_create的区别 对于后端来说,前后端分离的方式能让前后端的开发都爽。和所有的爽一样,每爽一次都要付出一定的代价。而前后端分离的代价,就是后端要面对巨量的模块化的功…...

差异性分析傻瓜版
path1输入你的第一个Excel path2输入你的第二个Excel DEG.dig <- function(path1,path2) { require(xlsx) require(tidyverse) require(limma) require(edgeR) E<- read.xlsx (path1,sheetIndex 1,header 1) %>% column_to_rownames(var &…...

Keystone Automotive EDI 需求分析
Keystone Automotive 是一家知名的汽车零部件销售卖场,自创立以来,在汽车行业取得了卓越的成就。作为一家专业的汽车零部件供应商,Keystone Automotive 致力于为客户提供优质的产品和卓越的服务。公司的经营范围涵盖广泛,涉及多个…...

jmeter创建一个压测项目
1.jemeter新建一个项目: 2.接下来对Thread进行描述,也可以先使用默认的Thread进行操作。 3.添加http请求头的信息。按照如图所示操作 4.在请求头里面添加必要的字段,可以只填必要字段就可以 5.添加Http请求信息,如下图ÿ…...

CEC2013(MATLAB):淘金优化算法GRO求解CEC2013的28个函数
一、淘金优化算法GRO 淘金优化算法(Gold rush optimizer,GRO)由Kamran Zolf于2023年提出,其灵感来自淘金热,模拟淘金者进行黄金勘探行为。淘金优化算法(Gold rush optimizer,GRO)提…...

AI Deep Reinforcement Learning Autonomous Driving(深度强化学习自动驾驶)
AI Deep Reinforcement Learning Autonomous Driving(深度强化学习自动驾驶) 背景介绍研究背景研究目的及意义项目设计内容算法介绍马尔可夫链及马尔可夫决策过程强化学习神经网络 仿真平台OpenAI gymTorcs配置GTA5 参数选择行动空间奖励函数 环境及软件…...

Java super
在Java中,关键字"super"用于引用一个类的父类。它可以有以下几种用法: 1. 访问父类成员:通过使用"super"后跟一个点,你可以从子类中访问父类的成员(方法或字段)。当子类重写一个方法或…...

【人工智能前沿弄潮】——生成式AI系列:Diffusers学习(1)了解Pipeline 、模型和scheduler
Diffusers旨在成为一个用户友好且灵活的工具箱,用于构建针对您的用例量身定制的扩散系统。工具箱的核心是模型和scheduler。虽然DiffusionPipeline为了方便起见将这些组件捆绑在一起,但您也可以拆分管道并单独使用模型和scheduler来创建新的扩散系统。 …...