type
status
date
slug
summary
tags
category
icon
password
flume定义
flume是cloudera提供的一个高可用、高可靠的,分布式海量日志采集、聚合和传输的系统,flume基于流式架构,灵活简单

Flume基础架构

Agent
Agent是一个JVM进程,它以时间的形式把数据从源头送至目的地。它主要有三个部分组成source、channel和sink
Source
Source是负责接收数据到Flume Agent的组件,Source组件可以处理各种类型、格式的日志数据
Sink
Sink不断地轮询Channel中的事件且批量的移除他们,并将这些时间批量写入到存储或者索引系统,或者发送到另一个FlumeAgent里面
Channel
Channel是位于Source和Sink之间的缓冲, 因此Channel允许Source和Sink运作在不同的速率上,Channel是线程安全的,可以同时处理几个Source的写入和Sink的读取操作
Flume自带两种Channel:MemoryChannel和File Channel
Memory~是内存中的队列,通常在不需要关心数据丢失的情境下使用,程序死亡,机器宕机或者重启都会导致数据丢失
FIle~将所有事件写到资产,因此安全性上比Memory要好
Event
Flume数据传输的基本单元,以Event的行事把数据从源头发送到目的地,Event由Header和Body两部分组成,
Header用来存放该event 的一些属性为KV结构
Body用来存放该条数据,形式为字节数组
Flume安装部署
监控端口数据官方案例
需求:使用flume监控一个端口, 收集该端口数据,并打印到控制台
实时监控目录下的多个追加文件
taildikr Source适合用于监听多个实时追加的文件,并且能够实现断点续传

实现步骤
Flume事务

Flume内部原理

ChannelSelector:
作用是选出Event将要被发往哪个channel,有两种类型,分别是Replicating(复制)和Multiplexing(多路复用)
ReplicatingSelector会将同一个Event发往所有的Channel,Mutiplexing会根据相应的原则,将不同的Event发往不同的Channel
SinkProcessor:
有三种类型,分别是DefaultSinkProcessor(默认1对 1)LoadBalancingSinkProcessor(负载均衡)和FailoverSinkProcessor(故障转移)
默认1对1 对应的是单个的Sink
LoadBalancingSinkProcessor和FailoverSinkProcessor对应的是Sink Group,前者可以实现负载均衡,后者可实现错误恢复
复制
使用Flume-1监控文件变动,Flume-1将变动内容传递给Flume-2,Flume-2负责存储到HDFS。同时Flume-1将变动内容传递给Flume-3,Flume-3负责输出到Local System

实现步骤:
1. 准备工作
2.创建flume1.conf
3.创建flume2.conf
4.创建flume3.conf
5.启动hadoop
6.执行配置文件
7.向监控的文件传入数据
8.检查HDFS数据和/opt/module/datas/flume3目录中数据

多路复用及拦截器的使用
需求:使用Flume采集服务器本地日志,需要按照日志类型的不同,将不同种类的日志发往不同的分析系统。
在实际的开发中,一台服务器产生的日志类型可能有很多种,不同类型的日志可能需要发送到不同的分析系统。此时会用到Flume的channel selecter中的Multiplexing结构,Multiplexing的原理是,根据event中Header的某个key的值,将不同的event发送到不同的Channel中,所以我们需要自定义一个Interceptor,为不同类型的event的Header中的key赋予不同的值。
在该案例中,我们以端口数据模拟日志,以数字(单个)和字母(单个)模拟不同类型的日志,我们需要自定义interceptor区分数字和字母,将其分别发往不同的分析系统

实现步骤
1.创建一个maven项目,并引入以下依赖。
2.定义CustomInterceptor类并实现Interceptor接口。
3.编辑flume1配置文件
4.配置flume2
5.配置flume3
6分别启动flume2、flume3、flume1
7.在hadoop102上使用netcat向localhost:4444上发送字母和数字
8.观察flume2和flume3打印的日志
聚合
案例需求:
hadoop102上的Flume-1监控文件/opt/module/flume/files1/.*file.*,
hadoop103上的Flume-2监控某一个端口的数据流,
Flume-1与Flume-2将数据发送给hadoop104上的Flume-3,Flume-3将最终数据打印到控制台。

实现
1.准备工作①分发Flume②在hadoop102、hadoop103以及104上的/opt/module/flume/conf目录下创建一个group3文件夹
2.创建Flume1.conf
3.创建flume2.conf
4.创建flume3.conf
5.执行配置文件
6.在hadoop102上向/opt/module/flume/目录下的group.log追加内容
7.在hadoop103上向44444端口发送数据
8.检查hadoop104上数据

Flume数据流监控
Ganglia
Ganglia由gmond、gmetad和gweb三部分组成。
gmond(Ganglia Monitoring Daemon)是一种轻量级服务,安装在每台需要收集指标数据的节点主机上。使用gmond,你可以很容易收集很多系统指标数据,如CPU、内存、磁盘、网络和活跃进程的数据等。
gmetad(Ganglia Meta Daemon)整合所有信息,并将其以RRD格式存储至磁盘的服务。
gweb(Ganglia Web)Ganglia可视化工具,gweb是一种利用浏览器显示gmetad所存储数据的PHP前端。在Web界面中以图表方式展现集群的运行状态下收集的多种不同指标数据。
安装ganglia
1.规划
2.在102、103、104上分别安装epel-release依赖
3.在102上安装监控软件的三个组成部分
4.在103和104上安装监控软件
5.在102修改配置文件/etc/httpd/conf.d/ganglia.conf
6.在102 103 104修改配置文件/etc/ganglia/gmond.conf
7.在102修改配置文件/etc/selinux/config
8.启动ganglia
9.在网页上浏览ganglia
操作flume监控
1.启动ganglia任务
2.发送数据观察到ganglia检测图

图例
ㅤ | ㅤ |
字段(图表名称) | 字段含义 |
EventPutAttemptCount | source尝试写入channel的事件总数量 |
EventPutSuccessCount | 成功写入channel且提交的事件总数量 |
EventTakeAttemptCount | sink尝试从channel拉取事件的总数量。 |
EventTakeSuccessCount | sink成功读取的事件的总数量 |
StartTime | channel启动的时间(毫秒) |
StopTime | channel停止的时间(毫秒) |
ChannelSize | 目前channel中事件的总数量 |
ChannelFillPercentage | channel占用百分比 |
ChannelCapacity | channel的容量 |
企业真实面试题
5.1 Flume组成,Put事务,Take事务
1)taildir source
(1)断点续传、多目录
(2)哪个flume版本产生的?Apache1.7、CDH1.6
(3)没有断点续传功能时怎么做的? 自定义
(4)taildir挂了怎么办?
不会丢数:断点续传
重复数据:
(5)怎么处理重复数据?
①不处理
生产环境通常不处理,因为会影响传输效率
②处理:
- 自身:在taildirsource里面增加自定义事务
- 找兄弟:下一级处理(hive dwd sparkstreaming flink布隆)、去重手段(groupby、开窗取窗口第一条、redis)
(6)taildir source 是否支持递归遍历文件夹读取文件?
不支持。 自定义,递归遍历文件夹 + 读取文件
2)file channel /memory channel
(1)file channel
数据存储于磁盘,优势:可靠性高;劣势:传输速度低
默认容量:100万event
注意:FileChannel可以通过配置dataDirs指向多个路径,每个路径对应不同的硬盘,增大Flume吞吐量。
(2)memory channel
- 数据存储于内存,优势:传输速度快;劣势:可靠性差
- 默认容量:100个event
3)HDFS sink
(1)时间(1小时-2小时)or 大小128m、event个数(0禁止)
具体参数:hdfs.rollInterval=3600,hdfs.rollSize=134217728,hdfs.rollCount =0
4)事务
Source到Channel是Put事务
Channel到Sink是Take事务
5.2 Flume拦截器
1)拦截器注意事项
项目中自定义了:ETL拦截器。
采用两个拦截器的优缺点:优点,模块化开发和可移植性;缺点,性能会低一些
2)自定义拦截器步骤
(1)实现 Interceptor
(2)重写四个方法
1. initialize 初始化
2. public Event intercept(Event event) 处理单个Event
- public List<Event> intercept(List<Event> events) 处理多个Event,在这个方法中调用Event intercept(Event event)
- close 方法
(3)静态内部类,实现Interceptor.Builder
3)拦截器可以不用吗?
可以不用;需要在下一级hive的dwd层和SparkSteaming里面处理
优势:只处理一次,轻度处理;劣势:影响性能,不适合做实时推荐这种对实时要求比较高的场景。
5.3 Flume Channel选择器

5.4 Flume监控器
1)采用Ganglia监控器,监控到Flume尝试提交的次数远远大于最终成功的次数,说明Flume运行比较差。通常是因为内存不充足导致,所有提高内存是比较好的办法。
2)解决办法?
(1)自身:增加内存flume-env.sh 4-6g
- Xmx与-Xms最好设置一致,减少内存抖动带来的性能影响,如果设置不一致容易导致频繁fullgc。
(2)找朋友:增加服务器台数
搞活动 618 ⇒ 增加服务器 ⇒ 用完在退出
5.5 Flume采集数据会丢失吗?
根据Flume的架构原理,Flume是不可能丢失数据的,其内部有完善的事务机制,Source到Channel是事务性的,Channel到Sink是事务性的,因此这两个环节不会出现数据的丢失,唯一可能丢失数据的情况是Channel采用memoryChannel,agent宕机导致数据丢失,或者Channel存储数据已满,导致Source不再写入,未写入的数据丢失。
Flume不会丢失数据,但是有可能造成数据的重复,例如数据已经成功由Sink发出,但是没有接收到响应,Sink会再次发送数据,此时可能会导致数据的重复。
- 作者:tacjin
- 链接:http://jin.wiki/article/98b6eb50-be1b-432b-baba-7f9614e6c050
- 声明:本文采用 CC BY-NC-SA 4.0 许可协议,转载请注明出处。