Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比

简介: Hadoop-20 Flume 采集数据双写至本地+HDFS中 监控目录变化 3个Agent MemoryChannel Source对比

章节内容

上一节完成了如下的内容:


编写Agent Conf配置文件

收集Hive数据

汇聚到HDFS中

测试效果

背景介绍

这里是三台公网云服务器,每台 2C4G,搭建一个Hadoop的学习环境,供我学习。

之前已经在 VM 虚拟机上搭建过一次,但是没留下笔记,这次趁着前几天薅羊毛的3台机器,赶紧尝试在公网上搭建体验一下。


2C4G 编号 h121

2C4G 编号 h122

2C2G 编号 h123

文档推荐

除了官方文档以外,这里有一个写的很好的中文文档:

https://flume.liyifeng.org/

监控目录

业务需求

想要监控指定目录 收集信息并上传到HDFS中

Source

选择 spooldir,因为 spooldir 能够保证数据不丢失,且能够进行断点续传,但是延迟较高,不能实时监控。


Channel

选择 memory


Sink

选择 HDFS


需要注意

拷贝到 spool 目录下的文件 不可以再打开编辑

无法监控子目录的文件夹变动

被监控文件夹每500毫秒 扫描一次文件变动

适合用于同步新文件,但不适合对实时追加日志的文件进行监听并同步

配置文件

cd /opt/wzk/flume_test
vim flume_spooldir-hdfs.conf

我们需要写入如下内容

# Name the components on this agent
a3.sources = r3
a3.channels = c3
a3.sinks = k3
# Describe/configure the source
a3.sources.r3.type = spooldir
# 注意这里的文件夹 换成自己的!!!
a3.sources.r3.spoolDir = /opt/wzk/upload
a3.sources.r3.fileSuffix = .COMPLETED
a3.sources.r3.fileHeader = true

# 忽略以.tmp结尾的文件,不上传
a3.sources.r3.ignorePattern = ([^ ]*\.tmp)
# Use a channel which buffers events in memory
a3.channels.c3.type = memory
a3.channels.c3.capacity = 10000
a3.channels.c3.transactionCapacity = 500
# Describe the sink
a3.sinks.k3.type = hdfs
# 注意修改成你自己的IP!!!
a3.sinks.k3.hdfs.path = hdfs://h121.wzk.icu:9000/flume/upload/%Y%m%d/%H%M

# 上传文件的前缀
a3.sinks.k3.hdfs.filePrefix = upload-
# 是否使用本地时间戳
a3.sinks.k3.hdfs.useLocalTimeStamp = true
# 积攒500个Event,flush到HDFS一次
a3.sinks.k3.hdfs.batchSize = 500
# 设置文件类型
a3.sinks.k3.hdfs.fileType = DataStream
# 60秒滚动一次
a3.sinks.k3.hdfs.rollInterval = 60
# 128M滚动一次
a3.sinks.k3.hdfs.rollSize = 134217700
# 文件滚动与event数量无关
a3.sinks.k3.hdfs.rollCount = 0
# 最小冗余数
a3.sinks.k3.hdfs.minBlockReplicas = 1

# Bind the source and sink to the channel
a3.sources.r3.channels = c3
a3.sinks.k3.channel = c3

启动Agent

$FLUME_HOME/bin/flume-ng agent --name a3 \
--conf-file flume-spooldir-hdfs.conf \
-Dflume.root.logger=INFO,console

测试效果

Flume

cd /opt/wzk/upload
vim 1.txt

随便向其中写入一些内容,并保存,可以看到Flume已经有反应了。

HDFS

查看HDFS,也已经有内容了

采集双写

这里业务上需要:

  • Flume将数据写入本地
  • Flume将数据写入HDFS

分析实现

  • 需要多个Agent级联实现
  • Source选择taildir
  • Channel选择memory
  • 最终的Sink分别选择HDFS,file_roll

配置文件1

配置文件包含如下内容:

  • 1个 taildir source
  • 2个 memory channel
  • 2个 avro sink

新建文件

vim flume-taildir-avro.conf

写入如下内容

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# 将数据流复制给所有channel
a1.sources.r1.selector.type = replicating
# source
a1.sources.r1.type = taildir
# 记录每个文件最新消费位置
a1.sources.r1.positionFile = /root/flume/taildir_position.json
a1.sources.r1.filegroups = f1
# 备注:.*log 是正则表达式;这里写成 *.log 是错误的
a1.sources.r1.filegroups.f1 = /tmp/root/.*log
# sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = linux123
a1.sinks.k1.port = 9091
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = linux123
a1.sinks.k2.port = 9092
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 500
a1.channels.c2.type = memory
a1.channels.c2.capacity = 10000
a1.channels.c2.transactionCapacity = 500
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2

配置文件2

配置文件包含如下内容:

  • 1个 avro source
  • 1个 memory channel
  • 1个 hdfs sink

新建配置文件

vim flume-avro-hdfs.conf

写入如下的内容:

# Name the components on this agent
a2.sources = r1
a2.sinks = k1
a2.channels = c1
# Describe/configure the source
a2.sources.r1.type = avro
a2.sources.r1.bind = linux123
a2.sources.r1.port = 9091
# Describe the channel
a2.channels.c1.type = memory
a2.channels.c1.capacity = 10000
a2.channels.c1.transactionCapacity = 500
# Describe the sink
a2.sinks.k1.type = hdfs
a2.sinks.k1.hdfs.path = hdfs://linux121:8020/flume2/%Y%m%d/%H
# 上传文件的前缀
a2.sinks.k1.hdfs.filePrefix = flume2-
# 是否使用本地时间戳
a2.sinks.k1.hdfs.useLocalTimeStamp = true
# 500个Event才flush到HDFS一次
a2.sinks.k1.hdfs.batchSize = 500
# 设置文件类型,可支持压缩
a2.sinks.k1.hdfs.fileType = DataStream
# 60秒生成一个新的文件
a2.sinks.k1.hdfs.rollInterval = 60
a2.sinks.k1.hdfs.rollSize = 0
a2.sinks.k1.hdfs.rollCount = 0
a2.sinks.k1.hdfs.minBlockReplicas = 1
# Bind the source and sink to the channel
a2.sources.r1.channels = c1
a2.sinks.k1.channel = c1

配置文件3

配置文件包含如下内容:

  • 1个 avro source
  • 1个 memory channel
  • 1个 file_roll sink

新建配置文件

vim flume-avro-file.conf

写入如下的内容

# Name the components on this agent
a3.sources = r1
a3.sinks = k1
a3.channels = c2
# Describe/configure the source
a3.sources.r1.type = avro
a3.sources.r1.bind = linux123
a3.sources.r1.port = 9092
# Describe the sink
a3.sinks.k1.type = file_roll
# 目录需要提前创建好
a3.sinks.k1.sink.directory = /root/flume/output
# Describe the channel
a3.channels.c2.type = memory
a3.channels.c2.capacity = 10000
a3.channels.c2.transactionCapacity = 500
# Bind the source and sink to the channel
a3.sources.r1.channels = c2
a3.sinks.k1.channel = c2

启动Agent1

$FLUME_HOME/bin/flume-ng agent --name a3 \
--conf-file ~/conf/flume-avro-file.conf \
-Dflume.root.logger=INFO,console &

启动Agent2

$FLUME_HOME/bin/flume-ng agent --name a2 \
--conf-file ~/conf/flume-avro-hdfs.conf \
-Dflume.root.logger=INFO,console &

启动Agent3

$FLUME_HOME/bin/flume-ng agent --name a1 \
--conf-file ~/conf/flume-taildir-avro.conf \
-Dflume.root.logger=INFO,console &


Hive测试

hive -e "show databases;"


目录
相关文章
|
9月前
|
XML 存储 分布式计算
【赵渝强老师】史上最详细:Hadoop HDFS的体系架构
HDFS(Hadoop分布式文件系统)由三个核心组件构成:NameNode、DataNode和SecondaryNameNode。NameNode负责管理文件系统的命名空间和客户端请求,维护元数据文件fsimage和edits;DataNode存储实际的数据块,默认大小为128MB;SecondaryNameNode定期合并edits日志到fsimage中,但不作为NameNode的热备份。通过这些组件的协同工作,HDFS实现了高效、可靠的大规模数据存储与管理。
1084 70
|
分布式计算 Kubernetes Hadoop
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
大数据-82 Spark 集群模式启动、集群架构、集群管理器 Spark的HelloWorld + Hadoop + HDFS
528 6
|
Prometheus 监控 Java
深入探索:自制Agent监控API接口耗时实践
在微服务架构中,监控API接口的调用耗时对于性能优化至关重要。通过监控接口耗时,我们可以识别性能瓶颈,优化服务响应速度。本文将分享如何自己动手实现一个Agent来统计API接口的调用耗时,提供一种实用的技术解决方案。
475 3
|
监控 数据可视化 Java
深入探索:自制Agent监控API接口耗时
在微服务架构中,监控API接口的调用耗时对于性能优化至关重要。通过监控这些指标,我们可以识别瓶颈,优化系统性能。本文将分享如何自己动手实现一个Agent来统计API接口的调用耗时,提供一种有效的监控解决方案。
360 2
|
SQL 分布式计算 关系型数据库
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-24 Sqoop迁移 MySQL到Hive 与 Hive到MySQL SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
347 0
|
SQL 分布式计算 关系型数据库
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
Hadoop-23 Sqoop 数据MySQL到HDFS(部分) SQL生成数据 HDFS集群 Sqoop import jdbc ETL MapReduce
217 0
|
存储 分布式计算 资源调度
大数据-04-Hadoop集群 集群群起 NameNode/DataNode启动 3台公网云 ResourceManager Yarn HDFS 集群启动 UI可视化查看 YarnUI(一)
大数据-04-Hadoop集群 集群群起 NameNode/DataNode启动 3台公网云 ResourceManager Yarn HDFS 集群启动 UI可视化查看 YarnUI(一)
288 5
|
资源调度 数据可视化 大数据
大数据-04-Hadoop集群 集群群起 NameNode/DataNode启动 3台公网云 ResourceManager Yarn HDFS 集群启动 UI可视化查看 YarnUI(二)
大数据-04-Hadoop集群 集群群起 NameNode/DataNode启动 3台公网云 ResourceManager Yarn HDFS 集群启动 UI可视化查看 YarnUI(二)
175 4
|
XML 分布式计算 资源调度
大数据-02-Hadoop集群 XML配置 超详细 core-site.xml hdfs-site.xml 3节点云服务器 2C4G HDFS Yarn MapRedece(一)
大数据-02-Hadoop集群 XML配置 超详细 core-site.xml hdfs-site.xml 3节点云服务器 2C4G HDFS Yarn MapRedece(一)
496 5
|
SQL 分布式计算 Hadoop
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
Hadoop-14-Hive HQL学习与测试 表连接查询 HDFS数据导入导出等操作 逻辑运算 函数查询 全表查询 WHERE GROUP BY ORDER BY(一)
254 4

相关实验场景

更多