Lazy loaded image
🛶Flume日志采集
字数 6032阅读时长 16 分钟
2020-1-13
2025-8-13
type
status
date
slug
summary
tags
category
icon
password

flume定义

flume是cloudera提供的一个高可用、高可靠的,分布式海量日志采集、聚合和传输的系统,flume基于流式架构,灵活简单
notion image

Flume基础架构

notion image

Agent

Agent是一个JVM进程,它以时间的形式把数据从源头送至目的地。它主要有三个部分组成source、channel和sink

Source

Source是负责接收数据到Flume Agent的组件,Source组件可以处理各种类型、格式的日志数据

Sink

Sink不断地轮询Channel中的事件且批量的移除他们,并将这些时间批量写入到存储或者索引系统,或者发送到另一个FlumeAgent里面

Channel

Channel是位于Source和Sink之间的缓冲, 因此Channel允许Source和Sink运作在不同的速率上,Channel是线程安全的,可以同时处理几个Source的写入和Sink的读取操作
Flume自带两种Channel:MemoryChannel和File Channel
Memory~是内存中的队列,通常在不需要关心数据丢失的情境下使用,程序死亡,机器宕机或者重启都会导致数据丢失
FIle~将所有事件写到资产,因此安全性上比Memory要好

Event

Flume数据传输的基本单元,以Event的行事把数据从源头发送到目的地,Event由Header和Body两部分组成,
Header用来存放该event 的一些属性为KV结构
Body用来存放该条数据,形式为字节数组

Flume安装部署

监控端口数据官方案例

需求:使用flume监控一个端口, 收集该端口数据,并打印到控制台

实时监控目录下的多个追加文件

taildikr Source适合用于监听多个实时追加的文件,并且能够实现断点续传
notion image
实现步骤

Flume事务

notion image

Flume内部原理

notion image
ChannelSelector:
作用是选出Event将要被发往哪个channel,有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)
ReplicatingSelector会将同一个Event发往所有的Channel,Mutiplexing会根据相应的原则,将不同的Event发往不同的Channel
SinkProcessor:
有三种类型,分别是DefaultSinkProcessor(默认1对 1)LoadBalancingSinkProcessor(负载均衡)和FailoverSinkProcessor(故障转移)
默认1对1 对应的是单个的Sink
LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group,前者可以实现负载均衡,后者可实现错误恢复

复制

使用Flume-1监控文件变动,Flume-1将变动内容传递给Flume-2,Flume-2负责存储到HDFS。同时Flume-1将变动内容传递给Flume-3,Flume-3负责输出到Local System
notion image
实现步骤:
1. 准备工作
2.创建flume1.conf
3.创建flume2.conf
4.创建flume3.conf
5.启动hadoop
6.执行配置文件
 
7.向监控的文件传入数据
8.检查HDFS数据和/opt/module/datas/flume3目录中数据
notion image

多路复用及拦截器的使用

需求:使用Flume采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。
在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到Flume的channel selecter中的Multiplexing结构,Multiplexing的原理是,根据event中Header的某个key的值,将不同的event发送到不同的Channel中,所以我们需要自定义一个Interceptor,为不同类型的event的Header中的key赋予不同的值。 在该案例中,我们以端口数据模拟日志,以数字(单个)和字母(单个)模拟不同类型的日志,我们需要自定义interceptor区分数字和字母,将其分别发往不同的分析系统
notion image
实现步骤
1.创建一个maven项目,并引入以下依赖。
2.定义CustomInterceptor类并实现Interceptor接口。
3.编辑flume1配置文件
4.配置flume2
5.配置flume3
6分别启动flume2、flume3、flume1
7.在hadoop102上使用netcat向localhost:4444上发送字母和数字
8.观察flume2和flume3打印的日志

聚合

案例需求:
hadoop102上的Flume-1监控文件/opt/module/flume/files1/.*file.*,
hadoop103上的Flume-2监控某一个端口的数据流,
Flume-1与Flume-2将数据发送给hadoop104上的Flume-3,Flume-3将最终数据打印到控制台。
notion image
实现
1.准备工作①分发Flume②在hadoop102、hadoop103以及104上的/opt/module/flume/conf目录下创建一个group3文件夹
2.创建Flume1.conf
3.创建flume2.conf
4.创建flume3.conf
5.执行配置文件
6.在hadoop102上向/opt/module/flume/目录下的group.log追加内容
7.在hadoop103上向44444端口发送数据
8.检查hadoop104上数据
notion image

Flume数据流监控

Ganglia
Ganglia由gmond、gmetad和gweb三部分组成。 gmond(Ganglia Monitoring Daemon)是一种轻量级服务,安装在每台需要收集指标数据的节点主机上。使用gmond,你可以很容易收集很多系统指标数据,如CPU、内存、磁盘、网络和活跃进程的数据等。 gmetad(Ganglia Meta Daemon)整合所有信息,并将其以RRD格式存储至磁盘的服务。 gweb(Ganglia Web)Ganglia可视化工具,gweb是一种利用浏览器显示gmetad所存储数据的PHP前端。在Web界面中以图表方式展现集群的运行状态下收集的多种不同指标数据。
安装ganglia
1.规划
2.在102、103、104上分别安装epel-release依赖
3.在102上安装监控软件的三个组成部分
4.在103和104上安装监控软件
5.在102修改配置文件/etc/httpd/conf.d/ganglia.conf
6.在102 103 104修改配置文件/etc/ganglia/gmond.conf
7.在102修改配置文件/etc/selinux/config
8.启动ganglia
9.在网页上浏览ganglia

操作flume监控

1.启动ganglia任务
2.发送数据观察到ganglia检测图
notion image
图例
字段(图表名称)
字段含义
EventPutAttemptCount
source尝试写入channel的事件总数量
EventPutSuccessCount
成功写入channel且提交的事件总数量
EventTakeAttemptCount
sink尝试从channel拉取事件的总数量。
EventTakeSuccessCount
sink成功读取的事件的总数量
StartTime
channel启动的时间(毫秒)
StopTime
channel停止的时间(毫秒)
ChannelSize
目前channel中事件的总数量
ChannelFillPercentage
channel占用百分比
ChannelCapacity
channel的容量

企业真实面试题

5.1 Flume组成,Put事务,Take事务

1)taildir source
(1)断点续传、多目录
(2)哪个flume版本产生的?Apache1.7、CDH1.6
(3)没有断点续传功能时怎么做的? 自定义
(4)taildir挂了怎么办?
不会丢数:断点续传
重复数据:
(5)怎么处理重复数据?
①不处理
生产环境通常不处理,因为会影响传输效率
②处理:
  • 自身:在taildirsource里面增加自定义事务
  • 找兄弟:下一级处理(hive dwd sparkstreaming flink布隆)、去重手段(groupby、开窗取窗口第一条、redis)
(6)taildir source 是否支持递归遍历文件夹读取文件?
不支持。  自定义,递归遍历文件夹 + 读取文件
2)file channel /memory channel
(1)file channel
数据存储于磁盘,优势:可靠性高;劣势:传输速度低
默认容量:100万event
注意:FileChannel可以通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
(2)memory channel
  • 数据存储于内存,优势:传输速度快;劣势:可靠性差
  • 默认容量:100个event
3)HDFS sink
(1)时间(1小时-2小时)or 大小128m、event个数(0禁止)
具体参数:hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0
4)事务
Source到Channel是Put事务
Channel到Sink是Take事务

5.2 Flume拦截器

1)拦截器注意事项
项目中自定义了:ETL拦截器。
采用两个拦截器的优缺点:优点,模块化开发和可移植性;缺点,性能会低一些
2)自定义拦截器步骤
(1)实现 Interceptor
(2)重写四个方法
 1. initialize 初始化
 2. public Event intercept(Event event) 处理单个Event
  1. public List<Event> intercept(List<Event> events) 处理多个Event,在这个方法中调用Event intercept(Event event)
  1. close 方法
(3)静态内部类,实现Interceptor.Builder
3)拦截器可以不用吗?
可以不用;需要在下一级hive的dwd层和SparkSteaming里面处理
优势:只处理一次,轻度处理;劣势:影响性能,不适合做实时推荐这种对实时要求比较高的场景。

5.3 Flume Channel选择器

notion image

5.4 Flume监控器

1)采用Ganglia监控器,监控到Flume尝试提交的次数远远大于最终成功的次数,说明Flume运行比较差。通常是因为内存不充足导致,所有提高内存是比较好的办法。
2)解决办法?
(1)自身:增加内存flume-env.sh   4-6g
  • Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。
(2)找朋友:增加服务器台数
搞活动 618  ⇒ 增加服务器 ⇒ 用完在退出

5.5 Flume采集数据会丢失吗?

根据Flume的架构原理,Flume是不可能丢失数据的,其内部有完善的事务机制,Source到Channel是事务性的,Channel到Sink是事务性的,因此这两个环节不会出现数据的丢失,唯一可能丢失数据的情况是Channel采用memoryChannel,agent宕机导致数据丢失,或者Channel存储数据已满,导致Source不再写入,未写入的数据丢失。
Flume不会丢失数据,但是有可能造成数据的重复,例如数据已经成功由Sink发出,但是没有接收到响应,Sink会再次发送数据,此时可能会导致数据的重复。
 
上一篇
Git总结——多人开发 Git 分支管理
下一篇
Maven包依赖工具管理