Flume Usage Examples

Case 1: Simple log transfer with the avro client

flume-avro-client.conf

avro-client-agent.sources = r1
avro-client-agent.sinks = k1
avro-client-agent.channels = c1

# Describe/configure the source
avro-client-agent.sources.r1.type = avro
avro-client-agent.sources.r1.bind = localhost
avro-client-agent.sources.r1.port = 41414
# Note this port; it is used again later in this tutorial

# Describe the sink
avro-client-agent.sinks.k1.type = logger

# Use a channel which buffers events in memory
avro-client-agent.channels.c1.type = memory

# Bind the source and sink to the channel
avro-client-agent.sources.r1.channels = c1
avro-client-agent.sinks.k1.channel = c1

Prepare a test file

[hadoop@hadoop001 ~]$ echo "flume" >> data/flume-avro-client.test
[hadoop@hadoop001 ~]$ cat data/flume-avro-client.test
flume

Start command for the sink side (the agent)

flume-ng agent \
--name avro-client-agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume-avro-client.conf \
-Dflume.root.logger=INFO,console

Start command for the source side (the avro client)

flume-ng avro-client \
--conf $FLUME_HOME/conf \
-H localhost \
-p 41414 \
-F ~/data/flume-avro-client.test

Results:

Source side:

The command runs and exits once the events have been transferred. Data appended to the file after it exits is not transferred to the sink side, so the avro client approach is not suitable for incrementally growing data.

Sink side:

[INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 66 6C 75 6D 65                                  flume }

Case 2: Monitoring data on a port

Requirement: listen on port 44444 of localhost and sink the received data to the terminal.

  1. Create the agent configuration file

    [hadoop@hadoop001 ~]$ cd app/flume/conf/
    [hadoop@hadoop001 conf]$ vi example.conf

    example.conf

    # example.conf: A single-node Flume configuration
    # Requirement: listen on port 44444 of localhost and sink the received data to the terminal

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory

    # Describe the sink
    a1.sinks.k1.type = logger

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    This configuration defines a single agent named a1. a1 has a source that listens for data on port 44444, a channel that buffers event data in memory, and a sink that logs event data to the console. The configuration file names the various components, then describes their types and configuration parameters. A given configuration file might define several named agents; when a given Flume process is launched a flag is passed telling it which named agent to manifest.

  2. Start Flume agent a1

    flume-ng agent \
    --name a1 \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/conf/example.conf \
    -Dflume.root.logger=INFO,console
  3. In another terminal, telnet localhost 44444 and send events

    [hadoop@hadoop001 ~]$ telnet localhost 44444
    Trying ::1...
    telnet: connect to address ::1: Connection refused
    Trying 127.0.0.1...
    Connected to localhost.
    Escape character is '^]'.
    1
    OK
    2
    OK
    3
    OK
    a
    OK
    b
    OK

    OK
  4. The log console in the first terminal shows the corresponding output

    [hadoop@hadoop001 conf]$ flume-ng agent \
    > --name a1 \
    > --conf $FLUME_HOME/conf \
    > --conf-file $FLUME_HOME/conf/example.conf \
    > -Dflume.root.logger=INFO,console
    Info: Sourcing environment configuration script /home/hadoop/app/flume/conf/flume-env.sh
    Info: Including Hadoop libraries found via (/home/hadoop/app/hadoop/bin/hadoop) for HDFS access
    Info: Including Hive libraries found via (/home/hadoop/app/hive) for Hive access
    .
    .
    .
    2022-05-16 21:57:12,297 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:166)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]
    2022-05-16 21:58:30,762 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 0D 1. }
    2022-05-16 21:58:36,151 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 32 0D 2. }
    2022-05-16 21:58:38,455 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 33 0D 3. }
    2022-05-16 21:58:39,071 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 61 0D a. }
    2022-05-16 21:58:39,847 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 62 0D b. }
    2022-05-16 22:02:54,721 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: E5 95 8A 0D .... }

    One note on the last event above: the logger sink prints the event body as hex plus printable ASCII, so non-ASCII input (here a Chinese character whose UTF-8 bytes are E5 95 8A) shows up only as hex rather than readable text; the bytes themselves are still transferred.

HDFS Sink configuration related to writing files

hdfs.path -> HDFS directory path

hdfs.filePrefix -> File name prefix. Default: FlumeData

hdfs.fileSuffix -> File name suffix

hdfs.rollInterval -> How long before the current HDFS file is closed. Unit: seconds, default 30. Set to 0 to disable time-based rolling

hdfs.rollSize -> Close the file once it exceeds this size. Default 1024, unit: bytes. Set to 0 to disable size-based rolling

hdfs.rollCount -> Close the file after this many events have been written. Default 10. Set to 0 to disable count-based rolling

hdfs.fileType -> File format; three options: SequenceFile, DataStream or CompressedStream

hdfs.batchSize -> Batch size, i.e. the number of events the HDFS Sink takes from the channel at a time. Default 100 (tied to the transaction)

hdfs.minBlockReplicas -> Minimum number of replicas per HDFS block; if not set, the value from the Hadoop configuration is used

hdfs.maxOpenFiles -> Maximum number of files allowed to be open at once, default 5000. If exceeded, the oldest files are closed

serializer -> The HDFS Sink serializes events when writing files. It calls the corresponding Serializer interface; you can implement a custom Serializer to fit your needs

hdfs.retryInterval -> Delay, in seconds, before retrying after a failed attempt to close an HDFS file

hdfs.callTimeout -> Time allowed for HDFS operations such as open, write, flush and close. Unit: milliseconds, default 10000

The five settings hdfs.rollInterval, hdfs.rollSize, hdfs.rollCount, hdfs.minBlockReplicas and hdfs.batchSize together determine when an HDFS file is closed.

Note that these five settings apply to a single HDFS file. When an HDFS file is closed, the counters start over: the open method of BucketWriter calls resetCounters, which resets the counters, and the timedRollFuture used for hdfs.rollInterval is cancelled in the close method. So whenever a file is closed and a new one is opened, all five counters start counting from zero again.

hdfs.rollInterval is time-based: once the configured number of seconds has elapsed, the file is closed.

hdfs.rollSize is based on the byte size of the events: once the accumulated event bytes reach or exceed hdfs.rollSize, the file is closed.

hdfs.rollCount is based on the number of events: once the event count reaches or exceeds hdfs.rollCount, the file is closed.

hdfs.batchSize is the number of events the HDFS Sink takes from the channel per batch. Only when all of the events in a batch have been written to the same HDFS file, and none of the other four conditions has been met, does this condition come into play and cause the file to be closed.

hdfs.minBlockReplicas is the minimum number of block replicas expected for the file. Sometimes hdfs.rollInterval, hdfs.rollSize and hdfs.rollCount are all configured, none of those conditions is met, and yet multiple files are still produced; that is caused by this parameter, and it takes priority over hdfs.rollSize and hdfs.rollCount. (If it is not set, the value from the Hadoop configuration is used.)
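
To make the interaction concrete, here is a minimal sketch of an HDFS sink configured to roll purely by time. The agent name a1, sink name k1 and the path are placeholders rather than names taken from the cases in this article, and the values are illustrative only.

# Hypothetical sink k1 on agent a1: roll HDFS files by time only
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hadoop001:9000/user/hadoop/data/flume/roll-demo
a1.sinks.k1.hdfs.fileType = DataStream
# close the current file every 10 minutes ...
a1.sinks.k1.hdfs.rollInterval = 600
# ... and disable size-based and count-based rolling
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.rollCount = 0
# pin the expected replica count so replication changes do not force extra files
a1.sinks.k1.hdfs.minBlockReplicas = 1
# events taken from the channel per transaction
a1.sinks.k1.hdfs.batchSize = 100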

Case 3: Monitoring a single appended file in real time (no support for resuming from a breakpoint)

flume-exec-hdfs.conf

#define agent
exec-hdfs-agent.sources = exec-source
exec-hdfs-agent.channels = exec-memory-channel
exec-hdfs-agent.sinks = hdfs-sink

#define source
exec-hdfs-agent.sources.exec-source.type = exec
exec-hdfs-agent.sources.exec-source.command = tail -F ~/data/flume-exec.test
exec-hdfs-agent.sources.exec-source.shell = /bin/bash -c

#define channel
exec-hdfs-agent.channels.exec-memory-channel.type = memory

#define sink
exec-hdfs-agent.sinks.hdfs-sink.type = hdfs
exec-hdfs-agent.sinks.hdfs-sink.hdfs.path = hdfs://hadoop001:9000/user/hadoop/data/flume/tail
exec-hdfs-agent.sinks.hdfs-sink.hdfs.fileType = DataStream
exec-hdfs-agent.sinks.hdfs-sink.hdfs.writeFormat = Text
exec-hdfs-agent.sinks.hdfs-sink.hdfs.batchSize = 5

#bind source and sink to channel
exec-hdfs-agent.sources.exec-source.channels = exec-memory-channel
exec-hdfs-agent.sinks.hdfs-sink.channel = exec-memory-channel

Agent start command

flume-ng agent \
--name exec-hdfs-agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/flume-exec-hdfs.conf \
-Dflume.root.logger=INFO,console

Test commands

[hadoop@hadoop001 ~]$ echo "a" >> ~/data/flume-exec.test
[hadoop@hadoop001 ~]$ for i in {1..100}; do echo "hadoop $i" >> ~/data/flume-exec.test;sleep 0.1;done

Results

  1. If ~/data/flume-exec.test already contains data, the first file is created as soon as the agent starts.
  2. If the amount of data in the first file already exceeds rollSize, the newly written input starts from the second file.
  3. In total there are 1 (the pre-existing data) + 10 ("a" and 1-99, rolled because hdfs.rollCount was reached) + 1 (the number 100, rolled because no input arrived for 30 s and hdfs.rollInterval was reached) = 12 files.
  4. When no data comes in, no extra empty files are created.
  5. You can increase the roll thresholds via the parameters discussed above to avoid the small-file problem.

Case 4: hdfs-round-agent

The HDFS sink rolls to a new directory every minute, using a time-formatted output path.

hdfs-round-agent.conf

hdfs-round-agent.sources = r1
hdfs-round-agent.channels = c1
hdfs-round-agent.sinks = k1

hdfs-round-agent.sources.r1.type = netcat
hdfs-round-agent.sources.r1.bind = localhost
hdfs-round-agent.sources.r1.port = 44444
hdfs-round-agent.sources.r1.interceptors = i1
hdfs-round-agent.sources.r1.interceptors.i1.type = timestamp

hdfs-round-agent.channels.c1.type = memory

hdfs-round-agent.sinks.k1.type = hdfs
hdfs-round-agent.sinks.k1.hdfs.path = data/flume/events/%y-%m-%d/%H%M
hdfs-round-agent.sinks.k1.hdfs.filePrefix = events-
hdfs-round-agent.sinks.k1.hdfs.round = true
hdfs-round-agent.sinks.k1.hdfs.roundValue = 1
hdfs-round-agent.sinks.k1.hdfs.roundUnit = minute
#hdfs-round-agent.sinks.k1.hdfs.useLocalTimeStamp = true

hdfs-round-agent.sources.r1.channels = c1
hdfs-round-agent.sinks.k1.channel = c1

Because the sink path uses time escape sequences, a timestamp interceptor is needed to put a timestamp into the event headers; otherwise the sink fails with an error:

(SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:459)] process failed
java.lang.NullPointerException: Expected timestamp in the Flume event headers, but it was null

The official documentation covers this: either use the Timestamp Interceptor to add a timestamp header, or set hdfs.useLocalTimeStamp = true on the sink.

Note For all of the time related escape sequences, a header with the key “timestamp” must exist among the headers of the event (unless hdfs.useLocalTimeStamp is set to true). One way to add this automatically is to use the TimestampInterceptor.
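
As a point of comparison, here is a minimal sketch of the second option, which corresponds to the line commented out in hdfs-round-agent.conf above: drop the timestamp interceptor and let the sink stamp each event with the local time.

# Alternative sketch: remove the r1.interceptors lines above and enable local timestamps instead
hdfs-round-agent.sinks.k1.hdfs.useLocalTimeStamp = true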

Agent start command

flume-ng agent \
--name hdfs-round-agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/hdfs-round-agent.conf \
-Dflume.root.logger=INFO,console

Test commands

[root@hadoop001 ~]# telnet localhost 44444
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
a
OK
b
OK
c
OK

d
OK
e
OK
g
OK

Results

[hadoop@hadoop001 ~]$ hdfs dfs -ls /user/hadoop/data/flume/events/22-05-17/
Found 3 items
drwxr-xr-x - hadoop supergroup 0 2022-05-17 02:47 /user/hadoop/data/flume/events/22-05-17/0247
drwxr-xr-x - hadoop supergroup 0 2022-05-17 02:48 /user/hadoop/data/flume/events/22-05-17/0248
drwxr-xr-x - hadoop supergroup 0 2022-05-17 02:49 /user/hadoop/data/flume/events/22-05-17/0249
[hadoop@hadoop001 ~]$ hdfs dfs -text /user/hadoop/data/flume/events/22-05-17/0247/*
1652755625761 61 0d
1652755633572 62 0d
1652755634589 63 0d
[hadoop@hadoop001 ~]$ hdfs dfs -text /user/hadoop/data/flume/events/22-05-17/0248/*
1652755722065 64 0d
1652755726784 65 0d
[hadoop@hadoop001 ~]$ hdfs dfs -text /user/hadoop/data/flume/events/22-05-17/0249/*
1652755754582 67 0d

Case 5: Monitoring a directory for new files in real time (file contents are not synced in real time)

spooldir-hdfs-agent.conf

spooldir-hdfs-agent.sources = r1
spooldir-hdfs-agent.sinks = k1
spooldir-hdfs-agent.channels = c1

# Describe/configure the source
spooldir-hdfs-agent.sources.r1.type = spooldir
# spoolDir: the directory to monitor
spooldir-hdfs-agent.sources.r1.spoolDir = /home/hadoop/data/flume
# fileSuffix: suffix automatically appended to a local file once it has been fully ingested
spooldir-hdfs-agent.sources.r1.fileSuffix = .COMPLETED
spooldir-hdfs-agent.sources.r1.fileHeader = true
# Ignore (do not upload) files ending in .tmp
spooldir-hdfs-agent.sources.r1.ignorePattern = ([^ ]*\.tmp)

# Describe the sink
spooldir-hdfs-agent.sinks.k1.type = hdfs
spooldir-hdfs-agent.sinks.k1.hdfs.path = hdfs://hadoop001:9000/user/hadoop/data/flume/events/%Y%m%d/%H

#Prefix of the files written to HDFS
spooldir-hdfs-agent.sinks.k1.hdfs.filePrefix = upload-
#Whether to round down the timestamp used in the directory path
spooldir-hdfs-agent.sinks.k1.hdfs.round = true
#How many time units before a new directory is created
spooldir-hdfs-agent.sinks.k1.hdfs.roundValue = 1
#The time unit used for rounding
spooldir-hdfs-agent.sinks.k1.hdfs.roundUnit = second
#Whether to use the local timestamp
spooldir-hdfs-agent.sinks.k1.hdfs.useLocalTimeStamp = true
#Number of events to accumulate before flushing to HDFS
spooldir-hdfs-agent.sinks.k1.hdfs.batchSize = 100
#File type; compression is supported
spooldir-hdfs-agent.sinks.k1.hdfs.fileType = DataStream
#How long before rolling to a new file
spooldir-hdfs-agent.sinks.k1.hdfs.rollInterval = 60
#Roll the file at roughly 128 MB
spooldir-hdfs-agent.sinks.k1.hdfs.rollSize = 134217700
#Roll the file after 3 events
spooldir-hdfs-agent.sinks.k1.hdfs.rollCount = 3

# Use a channel which buffers events in memory
spooldir-hdfs-agent.channels.c1.type = memory

# Bind the source and sink to the channel
spooldir-hdfs-agent.sources.r1.channels = c1
spooldir-hdfs-agent.sinks.k1.channel = c1

Prepare the files

[hadoop@hadoop001 data]$ pwd
/home/hadoop/data
[hadoop@hadoop001 data]$ cat flume.text1
a
b
c
[hadoop@hadoop001 data]$ cat flume.text2
d
e
f

Start command

flume-ng agent \
--name spooldir-hdfs-agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/spooldir-hdfs-agent.conf \
-Dflume.root.logger=INFO,console

Test

[hadoop@hadoop001 data]$ echo "hello world" > flume/flume.text0
[hadoop@hadoop001 data]$ cp flume.text1 flume/
[hadoop@hadoop001 data]$ cp flume.text2 flume/
[hadoop@hadoop001 data]$ echo "hello world" > flume/flume.text4

Results

[hadoop@hadoop001 data]$ ll flume
total 16
-rw-rw-r--. 1 hadoop hadoop 12 May 19 16:09 flume.text0.COMPLETED
-rw-rw-r--. 1 hadoop hadoop 6 May 19 16:09 flume.text1.COMPLETED
-rw-rw-r--. 1 hadoop hadoop 6 May 19 16:09 flume.text2.COMPLETED
-rw-rw-r--. 1 hadoop hadoop 12 May 19 16:10 flume.text4.COMPLETED
[hadoop@hadoop001 data]$ hadoop fs -cat /user/hadoop/data/flume/events/20220519/16/upload-.1652976566664
hello world
a
b
[hadoop@hadoop001 data]$ hadoop fs -cat /user/hadoop/data/flume/events/20220519/16/upload-.1652976566665
c
d
e
[hadoop@hadoop001 data]$ hadoop fs -cat /user/hadoop/data/flume/events/20220519/16/upload-.1652976566666
f
hello world

If a file that has already been marked .COMPLETED is processed again, or a new file reuses the same name, an error is raised. The content of a .COMPLETED file is never changed.

[hadoop@hadoop001 data]$ echo "hello world22" > flume/flume.text4
[ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:296)] FATAL: Spool Directory source r1: { spoolDir: /home/hadoop/data/flume }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.lang.IllegalStateException: File name has been re-used with different files. Spooling assumptions violated for /home/hadoop/data/flume/flume.text4.COMPLETED

Case 6: Resumable collection with the Taildir Source

The usual Flume log-collection approaches can collect the same data twice: for example, when a Flume process crashes and is restarted, logs that were already collected get collected again.
The fix is the resumable Taildir source, which records the last collection position so that after a restart collection continues from the recorded position.

Note: on Linux, the area that stores file metadata is called the inode. Every inode has a number, and the operating system identifies files by inode number; internally, Unix/Linux does not use file names. Renaming a file does not change its inode.
TAILDIR uses (inode + full file path) as the unique identifier of a monitored file (you could consider patching the source code to use only the inode).

taildir-hdfs-agent.conf

taildir-hdfs-agent.sources=r1
taildir-hdfs-agent.sinks=k1
taildir-hdfs-agent.channels=c1

# Source configuration
# Source type
taildir-hdfs-agent.sources.r1.type = TAILDIR
# Where the tailing position metadata is stored
taildir-hdfs-agent.sources.r1.positionFile = /home/hadoop/data/flume/meta/taildir_position.json
# File group(s) to monitor
taildir-hdfs-agent.sources.r1.filegroups = f1
taildir-hdfs-agent.sources.r1.filegroups.f1=/home/hadoop/data/bd/.*log

# Describe the sink
taildir-hdfs-agent.sinks.k1.type = hdfs
taildir-hdfs-agent.sinks.k1.hdfs.path = hdfs://hadoop001:9000/user/hadoop/data/flume/events/%Y%m%d/%H

#Prefix of the files written to HDFS
taildir-hdfs-agent.sinks.k1.hdfs.filePrefix = upload-
#Whether to round down the timestamp used in the directory path
taildir-hdfs-agent.sinks.k1.hdfs.round = true
#How many time units before a new directory is created
taildir-hdfs-agent.sinks.k1.hdfs.roundValue = 1
#The time unit used for rounding
taildir-hdfs-agent.sinks.k1.hdfs.roundUnit = hour
#Whether to use the local timestamp
taildir-hdfs-agent.sinks.k1.hdfs.useLocalTimeStamp = true
#Number of events to accumulate before flushing to HDFS
taildir-hdfs-agent.sinks.k1.hdfs.batchSize = 100
#File type; compression is supported
taildir-hdfs-agent.sinks.k1.hdfs.fileType = DataStream
#How long before rolling to a new file
taildir-hdfs-agent.sinks.k1.hdfs.rollInterval = 60
#Roll the file at roughly 128 MB
taildir-hdfs-agent.sinks.k1.hdfs.rollSize = 134217700
#Roll the file after 3 events
taildir-hdfs-agent.sinks.k1.hdfs.rollCount = 3

# Channel configuration
taildir-hdfs-agent.channels.c1.type = file
taildir-hdfs-agent.channels.c1.checkpointDir = /home/hadoop/data/checkpoint
taildir-hdfs-agent.channels.c1.dataDirs = /home/hadoop/data/flume-data
taildir-hdfs-agent.channels.c1.capacity = 10000000
taildir-hdfs-agent.channels.c1.transactionCapacity = 5000

# Bind the source and sink to the channel
taildir-hdfs-agent.sources.r1.channels = c1
taildir-hdfs-agent.sinks.k1.channel =c1

Start command

(Make sure the monitored directory has been created before starting.)

flume-ng agent \
--name taildir-hdfs-agent \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/taildir-hdfs-agent.conf \
-Dflume.root.logger=INFO,console

Test

[hadoop@hadoop001 data]$ echo "1" >> /home/hadoop/data/bd/bd.log
[hadoop@hadoop001 data]$ echo "2" >> /home/hadoop/data/bd/bd.log
[hadoop@hadoop001 data]$ echo "3" >> /home/hadoop/data/bd/bd.log
[hadoop@hadoop001 data]$ echo "4" >> /home/hadoop/data/bd/bd1.log
[hadoop@hadoop001 data]$ echo "5" >> /home/hadoop/data/bd/bd1.log
[hadoop@hadoop001 data]$ echo "6" >> /home/hadoop/data/bd/bd1.log
[hadoop@hadoop001 data]$ echo "7" >> /home/hadoop/data/bd/bd2.log
[hadoop@hadoop001 data]$ echo "8" >> /home/hadoop/data/bd/bd2.log
[hadoop@hadoop001 data]$ echo "9" >> /home/hadoop/data/bd/bd2.log
[hadoop@hadoop001 data]$ echo "10" >> /home/hadoop/data/bd/bd3.log
[hadoop@hadoop001 data]$ echo "11" >> /home/hadoop/data/bd/bd3.log
[hadoop@hadoop001 data]$ ll /home/hadoop/data/flume/meta
total 0
-rw-rw-r--. 1 hadoop hadoop 0 May 19 16:45 taildir_position.json
[hadoop@hadoop001 data]$ ll /home/hadoop/data/checkpoint
total 78152
-rw-rw-r--. 1 hadoop hadoop 80008232 May 19 16:46 checkpoint
-rw-rw-r--. 1 hadoop hadoop 37 May 19 16:46 checkpoint.meta
-rw-rw-r--. 1 hadoop hadoop 32 May 19 16:46 inflightputs
-rw-rw-r--. 1 hadoop hadoop 32 May 19 16:46 inflighttakes
-rw-rw-r--. 1 hadoop hadoop 0 May 19 16:45 in_use.lock
drwxrwxr-x. 2 hadoop hadoop 4096 May 19 16:45 queueset
[hadoop@hadoop001 conf]$ hadoop fs -text /user/hadoop/data/flume/events/20220519/17/*
2022-05-19 17:56:32,881 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
2022-05-19 17:56:32,893 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 26dc7b4620ff16bb6f1fdd48f915ce5fb8222d6f]
1
2
3
7
4
5
6
8
9
10
11
[hadoop@hadoop001 data]$ echo "12" >> /home/hadoop/data/bd/bd.log
[hadoop@hadoop001 data]$ echo "13" >> /home/hadoop/data/bd/bd.log
[hadoop@hadoop001 data]$ cat /home/hadoop/data/flume/meta/taildir_position.json
[{"inode":1267448,"pos":12,"file":"/home/hadoop/data/bd/bd.log"},{"inode":1267450,"pos":6,"file":"/home/hadoop/data/bd/bd1.log"},{"inode":1267451,"pos":6,"file":"/home/hadoop/data/bd/bd2.log"},{"inode":1267449,"pos":6,"file":"/home/hadoop/data/bd/bd3.log"}][hadoop@hadoop001 data]$

Stop the agent, then write more data

[hadoop@hadoop001 data]$ echo "14" >> /home/hadoop/data/bd/bd3.log
[hadoop@hadoop001 data]$ echo "15" >> /home/hadoop/data/bd/bd3.log
[hadoop@hadoop001 data]$ echo "16" >> /home/hadoop/data/bd/bd4.log
[hadoop@hadoop001 data]$ cat /home/hadoop/data/flume/meta/taildir_position.json
[{"inode":1267448,"pos":12,"file":"/home/hadoop/data/bd/bd.log"},{"inode":1267450,"pos":6,"file":"/home/hadoop/data/bd/bd1.log"},{"inode":1267451,"pos":6,"file":"/home/hadoop/data/bd/bd2.log"},{"inode":1267449,"pos":6,"file":"/home/hadoop/data/bd/bd3.log"}]

Restart the agent and test again

[hadoop@hadoop001 data]$ echo "17" >> /home/hadoop/data/bd/bd4.log
[hadoop@hadoop001 data]$ echo "18" >> /home/hadoop/data/bd/bd4.log
[hadoop@hadoop001 data]$ cat /home/hadoop/data/flume/meta/taildir_position.json
[{"inode":1267448,"pos":12,"file":"/home/hadoop/data/bd/bd.log"},
{"inode":1267450,"pos":6,"file":"/home/hadoop/data/bd/bd1.log"},{"inode":1267451,"pos":6,"file":"/home/hadoop/data/bd/bd2.log"},{"inode":1267449,"pos":12,"file":"/home/hadoop/data/bd/bd3.log"},{"inode":1267439,"pos":9,"file":"/home/hadoop/data/bd/bd4.log"}]

[hadoop@hadoop001 conf]$ hadoop fs -text /user/hadoop/data/flume/events/20220519/17/*
2022-05-19 17:56:32,881 INFO lzo.GPLNativeCodeLoader: Loaded native gpl library from the embedded binaries
2022-05-19 17:56:32,893 INFO lzo.LzoCodec: Successfully loaded & initialized native-lzo library [hadoop-lzo rev 26dc7b4620ff16bb6f1fdd48f915ce5fb8222d6f]
1
2
3
7
4
5
6
8
9
10
11
12
13
14
15
16
17
18

Everything works as expected: no data is lost or duplicated, and collection resumes from where it left off.

Flume Channel Selectors

Flume supports fanning out the flow from one source to multiple channels. There are two fan-out modes: replicating and multiplexing.

replicating selector

In the replicating flow, the event is sent to all configured channels. In the multiplexing case, the event is sent only to a qualifying subset of the channels. To fan out the flow, you specify the source's list of channels and the policy for fanning out. This is done by adding a channel "selector" that is either replicating or multiplexing; if it is multiplexing, you also specify the selection rules. If no selector is specified, it replicates by default:

a1.sources = r1
a1.channels = c1 c2 c3

a1.sources.r1.selector.type = replicating
a1.sources.r1.channels = c1 c2 c3
# This makes c3 optional: a failure to write to c3 is ignored, but a failure to write to c1 or c2 is an error
a1.sources.r1.selector.optional = c3

The example above does not declare any sinks; c3 is configured as optional. If sending data to c3 fails, the failure is ignored. c1 and c2 are not optional, so a failure to write to them causes the transaction to fail and roll back.

multiplexing selector

The multiplexing selector has a further set of properties for splitting the flow. It requires a mapping from an event attribute to a set of channels. The selector checks the configured attribute in each event's headers. If it matches the specified value, the event is sent to all channels mapped to that value. If there is no match, the event is sent to the channels configured as the default:

# Mapping for multiplexing selector
<Agent>.sources.<Source1>.selector.type = multiplexing
<Agent>.sources.<Source1>.selector.header = <someHeader>
<Agent>.sources.<Source1>.selector.mapping.<Value1> = <Channel1>
<Agent>.sources.<Source1>.selector.mapping.<Value2> = <Channel1> <Channel2>
<Agent>.sources.<Source1>.selector.mapping.<Value3> = <Channel2>
#...
<Agent>.sources.<Source1>.selector.default = <Channel2>

The mappings allow channels to overlap across values.

The following example has a single flow multiplexed into two paths. The agent named agent_foo has a single avro source and two channels linked to two sinks:

# list the sources, sinks and channels in the agent
agent_foo.sources = avro-AppSrv-source1
agent_foo.sinks = hdfs-Cluster1-sink1 avro-forward-sink2
agent_foo.channels = mem-channel-1 file-channel-2
# set channels for source
agent_foo.sources.avro-AppSrv-source1.channels = mem-channel-1 file-channel-2
# set channel for sinks
agent_foo.sinks.hdfs-Cluster1-sink1.channel = mem-channel-1
agent_foo.sinks.avro-forward-sink2.channel = file-channel-2
# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

The selector checks a header called "State". If its value is "CA", the event is sent to mem-channel-1; if it is "AZ", to file-channel-2; if it is "NY", to both. If the "State" header is not set or matches none of the three, the event goes to mem-channel-1, which is designated the "default".

The selector also supports optional channels. To specify optional channels for a header value, use the config parameter "optional" as follows:

# channel selector configuration
agent_foo.sources.avro-AppSrv-source1.selector.type = multiplexing
agent_foo.sources.avro-AppSrv-source1.selector.header = State
agent_foo.sources.avro-AppSrv-source1.selector.mapping.CA = mem-channel-1
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.NY = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.optional.CA = mem-channel-1 file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.mapping.AZ = file-channel-2
agent_foo.sources.avro-AppSrv-source1.selector.default = mem-channel-1

The selector attempts to write to the required channels first and fails the transaction if any of those channels fails to consume the event. The transaction is then retried on all of the channels. Once all required channels have consumed the event, the selector attempts to write to the optional channels. A failure by any optional channel to consume the event is ignored and not retried.

If there is an overlap between the optional and required channels for a given header value, the channel is considered required, and a failure in that channel causes the entire set of required channels to be retried. For instance, in the example above, for the header value "CA", mem-channel-1 is considered required even though it is listed as both required and optional, and a failure to write to it causes the event to be retried on all channels configured for the selector.

Note that if a header value has no required channels, the event is written to the default channels and an attempt is made to write it to the optional channels for that value. Specifying optional channels without required channels still causes the event to be written to the default channels. If no channels are designated as default and there are no required channels, the selector attempts to write the event to the optional channels only; any failure in that case is simply ignored.

Custom selector
A custom selector is simply your own implementation of the org.apache.flume.ChannelSelector interface. As usual, your implementation class and the jars it depends on must be on Flume's classpath at startup.

a1.sources = r1
a1.channels = c1

a1.sources.r1.selector.type = com.lxk.flume.custom.BalanceChannelSelector

Why write a custom channel selector, such as the load-balancing selector above? The two built-in selectors either replicate everything or selectively redirect the flow; if you want to add more channels to relieve pressure, events need to be distributed evenly across the n declared channels, and to implement that you have to understand how a selector works. A channel is the buffer that temporarily holds events on an agent: events are added by a source and removed after being consumed by a sink.

Case 7: Flume Channel Multiplexing Selectors

Flume multiplexing:
  At any given moment the source sends an event to only one channel; the source decides which channel based on the event header.
  In other words, Flume multiplexing is used together with the Static Interceptor. For example: data arriving from port A gets key=from, value=A and goes to a logger sink; data from port B gets key=from, value=B and goes to an HDFS sink. The event header decides which channel each event is sent to.

Case:
  Flume-1 listens on port 44444;
  Flume-2 listens on port 44441, its avro sink sends to port 44444;
  Flume-3 listens on port 44442, its avro sink sends to port 44444;

Components:
 Flume-1: avro source + memory channel + Multiplexing Channel Selector + logger sink + hdfs sink
 Flume-2: netcat source + memory channel + avro sink
 Flume-3: netcat source + memory channel + avro sink

Configuration files

multiplexing-flume-1.conf

# Name the components on this agent
multiplexing-flume-1.sources = r1
multiplexing-flume-1.sinks = k1 k2
multiplexing-flume-1.channels = c1 c2

# Describe/configure the source
multiplexing-flume-1.sources.r1.type = avro
multiplexing-flume-1.sources.r1.bind = localhost
multiplexing-flume-1.sources.r1.port = 44444

# Multiplexing Channel Selector configuration
multiplexing-flume-1.sources.r1.selector.type = multiplexing
multiplexing-flume-1.sources.r1.selector.header = from
multiplexing-flume-1.sources.r1.selector.mapping.A = c1
multiplexing-flume-1.sources.r1.selector.mapping.B = c2

# Describe the sink
multiplexing-flume-1.sinks.k1.type = logger

multiplexing-flume-1.sinks.k2.type = hdfs
multiplexing-flume-1.sinks.k2.hdfs.path = data/flume/events/%y-%m-%d/%H-%M
multiplexing-flume-1.sinks.k2.hdfs.filePrefix = multiplexing-
multiplexing-flume-1.sinks.k2.hdfs.round = true
multiplexing-flume-1.sinks.k2.hdfs.roundValue = 1
multiplexing-flume-1.sinks.k2.hdfs.roundUnit = minute
multiplexing-flume-1.sinks.k2.hdfs.useLocalTimeStamp = true

# Use a channel which buffers events in memory
multiplexing-flume-1.channels.c1.type = memory
multiplexing-flume-1.channels.c1.capacity = 1000
multiplexing-flume-1.channels.c1.transactionCapacity = 100

multiplexing-flume-1.channels.c2.type = memory
multiplexing-flume-1.channels.c2.capacity = 1000
multiplexing-flume-1.channels.c2.transactionCapacity = 100

# Bind the source and sink to the channel
multiplexing-flume-1.sources.r1.channels = c1 c2
multiplexing-flume-1.sinks.k1.channel = c1
multiplexing-flume-1.sinks.k2.channel = c2

multiplexing-flume-2.conf

# Name the components on this agent
multiplexing-flume-2.sources = r1
multiplexing-flume-2.sinks = k1
multiplexing-flume-2.channels = c1

# Describe/configure the source
multiplexing-flume-2.sources.r1.type = netcat
multiplexing-flume-2.sources.r1.bind = localhost
multiplexing-flume-2.sources.r1.port = 44441
multiplexing-flume-2.sources.r1.interceptors = i1
multiplexing-flume-2.sources.r1.interceptors.i1.type = static
multiplexing-flume-2.sources.r1.interceptors.i1.key = from
multiplexing-flume-2.sources.r1.interceptors.i1.value = A

# Use a channel which buffers events in memory
multiplexing-flume-2.channels.c1.type = memory

# Describe the sink
multiplexing-flume-2.sinks.k1.type = avro
multiplexing-flume-2.sinks.k1.hostname = localhost
multiplexing-flume-2.sinks.k1.port = 44444

# Bind the source and sink to the channel
multiplexing-flume-2.sources.r1.channels = c1
multiplexing-flume-2.sinks.k1.channel = c1

multiplexing-flume-3.conf

# Name the components on this agent
multiplexing-flume-3.sources = r1
multiplexing-flume-3.sinks = k1
multiplexing-flume-3.channels = c1

# Describe/configure the source
multiplexing-flume-3.sources.r1.type = netcat
multiplexing-flume-3.sources.r1.bind = localhost
multiplexing-flume-3.sources.r1.port = 44442
multiplexing-flume-3.sources.r1.interceptors = i1
multiplexing-flume-3.sources.r1.interceptors.i1.type = static
multiplexing-flume-3.sources.r1.interceptors.i1.key = from
multiplexing-flume-3.sources.r1.interceptors.i1.value = B

# Use a channel which buffers events in memory
multiplexing-flume-3.channels.c1.type = memory

# Describe the sink
multiplexing-flume-3.sinks.k1.type = avro
multiplexing-flume-3.sinks.k1.hostname = localhost
multiplexing-flume-3.sinks.k1.port = 44444

# Bind the source and sink to the channel
multiplexing-flume-3.sources.r1.channels = c1
multiplexing-flume-3.sinks.k1.channel = c1

Startup

Start order: multiplexing-flume-1, then multiplexing-flume-2 and multiplexing-flume-3

Window 1: multiplexing-flume-1

flume-ng agent \
--name multiplexing-flume-1 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/multiplexing-flume-1.conf \
-Dflume.root.logger=INFO,console

Window 2: multiplexing-flume-2

flume-ng agent \
--name multiplexing-flume-2 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/multiplexing-flume-2.conf \
-Dflume.root.logger=INFO,console

Window 3: multiplexing-flume-3

flume-ng agent \
--name multiplexing-flume-3 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/multiplexing-flume-3.conf \
-Dflume.root.logger=INFO,console

Test

Window 4

[root@hadoop001 ~]# telnet localhost 44441
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
1
OK
2
OK
3
OK

Window 5

[root@hadoop001 ~]# telnet localhost 44442
Trying ::1...
telnet: connect to address ::1: Connection refused
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
4
OK
5
OK
6
OK

Window 1

2022-05-19 20:00:47,539 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{from=A} body: 31 0D                                           1. }
2022-05-19 20:00:47,539 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{from=A} body: 32 0D 2. }
2022-05-19 20:00:47,539 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{from=A} body: 33 0D 3. }
2022-05-19 20:00:54,825 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.HDFSSequenceFile.configure(HDFSSequenceFile.java:63)] writeFormat = Writable, UseRawLocalFileSystem = false
2022-05-19 20:00:55,007 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:246)] Creating data/flume/events/22-05-19/20-00/multiplexing-.1652990454821.tmp
2022-05-19 20:00:55,231 (hdfs-k2-call-runner-0) [WARN - org.apache.hadoop.util.NativeCodeLoader.<clinit>(NativeCodeLoader.java:60)] Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2022-05-19 20:01:26,692 (hdfs-k2-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink$1.run(HDFSEventSink.java:393)] Writer callback called.
2022-05-19 20:01:26,693 (hdfs-k2-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doClose(BucketWriter.java:438)] Closing data/flume/events/22-05-19/20-00/multiplexing-.1652990454821.tmp
2022-05-19 20:01:26,716 (hdfs-k2-call-runner-6) [INFO - org.apache.flume.sink.hdfs.BucketWriter$7.call(BucketWriter.java:681)] Renaming data/flume/events/22-05-19/20-00/multiplexing-.1652990454821.tmp to data/flume/events/22-05-19/20-00/multiplexing-.1652990454821

Logger output for c1

 [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{from=A} body: 31 0D                                           1. }
2022-05-19 20:00:47,539 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{from=A} body: 32 0D 2. }
2022-05-19 20:00:47,539 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{from=A} body: 33 0D 3. }

HDFS output for c2

[hadoop@hadoop001 ~]$ hadoop fs -text data/flume/events/22-05-19/20-00/multiplexing-.1652990454821
2022-05-19 20:03:35,791 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
1652990456695 34 0d
1652990456698 35 0d
1652990456699 36 0d
[hadoop@hadoop001 ~]$

The test succeeds.

Flume Sink Processors

Sink groups allow users to group multiple sinks into one entity. Sink processors can provide load-balancing over all the sinks in a group, or failover from one sink to another in case of temporary failure.

Required properties are shown in bold.

Property Name    Default    Description
sinks            -          Space-separated list of sinks participating in the group
processor.type   default    The component type name; must be default, failover or load_balance
Example for agent named a1:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance

Default Sink Processor

The default sink processor accepts only a single sink. You do not have to create a processor (sink group) for a single sink; instead you can follow the source - channel - sink pattern explained earlier in this guide.

Failover Sink Processor

The failover sink processor maintains a prioritized list of sinks, guaranteeing that as long as one is available, events will be processed (delivered).

The failover mechanism works by relegating failed sinks to a pool where they are assigned a cool-down period that grows with sequential failures before they are retried. Once a sink successfully sends an event, it is restored to the live pool. Sinks have a priority associated with them; the larger the number, the higher the priority. If a sink fails while sending an event, the sink with the next highest priority is tried next. For example, a sink with priority 100 is activated before one with priority 80. If no priority is specified, priorities are determined by the order in which the sinks appear in the configuration.

To configure this, set the sink group processor to failover and set a priority for every individual sink. All specified priorities must be unique. Additionally, an upper limit on the failover time can be set (in milliseconds) with the maxpenalty property.

Required properties are shown in bold.

Property Name                   Default    Description
sinks                           -          Space-separated list of sinks participating in the group
processor.type                  default    The component type name; must be failover
processor.priority.<sinkName>   -          Priority value. <sinkName> must be one of the sink instances associated with the current sink group. A sink with a higher priority value is activated earlier; a larger absolute value indicates a higher priority
processor.maxpenalty            30000      Maximum backoff period for a failed sink, in milliseconds
Example for agent named a1:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

Load balancing Sink Processor

The load balancing sink processor provides the ability to load-balance the flow over multiple sinks. It maintains an indexed list of active sinks over which the load must be distributed, and supports distributing the load with either the round_robin or the random selection mechanism. The selection mechanism defaults to round_robin but can be overridden via configuration. Custom selection mechanisms are supported through custom classes that inherit from AbstractSinkSelector.

When invoked, the selector picks the next sink using its configured selection mechanism and invokes it. For round_robin and random, if the selected sink fails to deliver the event, the processor picks the next available sink via the configured selection mechanism. This implementation does not blacklist the failing sink; it keeps optimistically trying every available sink. If all sink invocations fail, the selector propagates the failure to the sink runner.

If backoff is enabled, the sink processor blacklists sinks that fail, removing them from selection for a given timeout. When the timeout ends, if the sink is still unresponsive, the timeout increases exponentially to avoid getting stuck waiting on an unresponsive sink. With backoff disabled, under round-robin all the load of a failed sink is simply passed on to the next sink in line, so the distribution is not balanced.

Required properties are shown in bold.

Property Name                   Default        Description
processor.sinks                 -              Space-separated list of sinks participating in the group
processor.type                  default        The component type name; must be load_balance
processor.backoff               false          Whether failed sinks should back off exponentially
processor.selector              round_robin    Selection mechanism. Must be round_robin, random, or the FQCN of a custom class that inherits from AbstractSinkSelector
processor.selector.maxTimeOut   30000          Used by the backoff selector to cap the exponential backoff, in milliseconds
Example for agent named a1:
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random

Custom Sink Processor

Custom sink processors are not supported at the moment.

Case 8: Failover (Failover Sink Processor)

Requirement:
  Use Flume-1 to monitor a file for changes. Flume-1 passes the new content to Flume-2, and Flume-2 prints it to the console.
  If Flume-2 goes down, Flume-1 passes the new content to Flume-3, and Flume-3 prints it to the console.

Components:
 Flume-1: taildir source + memory channel + avro sink + Failover Sink Processor
 Flume-2: avro source + memory channel + logger sink
 Flume-3: avro source + memory channel + logger sink

Configuration files

Flume-Failover-01.conf

# Name the components on this agent
Flume-Failover-01.sources = r1
Flume-Failover-01.sinks = k1 k2
Flume-Failover-01.channels = c1
Flume-Failover-01.sinkgroups = g1

# Failover configure
Flume-Failover-01.sinkgroups.g1.processor.type = failover
Flume-Failover-01.sinkgroups.g1.processor.priority.k1 = 5
Flume-Failover-01.sinkgroups.g1.processor.priority.k2 = 10
Flume-Failover-01.sinkgroups.g1.processor.maxpenalty = 10000

# Describe/configure the source
Flume-Failover-01.sources.r1.type = TAILDIR
Flume-Failover-01.sources.r1.positionFile = /home/hadoop/data/flume/meta/taildir_position.json
Flume-Failover-01.sources.r1.filegroups = f1
Flume-Failover-01.sources.r1.filegroups.f1 = /home/hadoop/data/flume/test.log

# Describe the sink
Flume-Failover-01.sinks.k1.type = avro
Flume-Failover-01.sinks.k1.hostname = localhost
Flume-Failover-01.sinks.k1.port = 44441

Flume-Failover-01.sinks.k2.type = avro
Flume-Failover-01.sinks.k2.hostname = localhost
Flume-Failover-01.sinks.k2.port = 44442

# Use a channel which buffers events in memory
Flume-Failover-01.channels.c1.type = memory
Flume-Failover-01.channels.c1.capacity = 1000
Flume-Failover-01.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
Flume-Failover-01.sinkgroups.g1.sinks = k1 k2
Flume-Failover-01.sources.r1.channels = c1
Flume-Failover-01.sinks.k1.channel = c1
Flume-Failover-01.sinks.k2.channel = c1

Flume-Failover-02.conf

# Name the components on this agent
Flume-Failover-02.sources = r1
Flume-Failover-02.sinks = k1
Flume-Failover-02.channels = c1

# Describe/configure the source
Flume-Failover-02.sources.r1.type = avro
Flume-Failover-02.sources.r1.bind = localhost
Flume-Failover-02.sources.r1.port = 44441

# Describe the sink
Flume-Failover-02.sinks.k1.type = logger

# Use a channel which buffers events in memory
Flume-Failover-02.channels.c1.type = memory
Flume-Failover-02.channels.c1.capacity = 1000
Flume-Failover-02.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
Flume-Failover-02.sources.r1.channels = c1
Flume-Failover-02.sinks.k1.channel = c1

Flume-Failover-03.conf

# Name the components on this agent
Flume-Failover-03.sources = r1
Flume-Failover-03.sinks = k1
Flume-Failover-03.channels = c1

# Describe/configure the source
Flume-Failover-03.sources.r1.type = avro
Flume-Failover-03.sources.r1.bind = localhost
Flume-Failover-03.sources.r1.port = 44442

# Describe the sink
Flume-Failover-03.sinks.k1.type = logger

# Use a channel which buffers events in memory
Flume-Failover-03.channels.c1.type = memory
Flume-Failover-03.channels.c1.capacity = 1000
Flume-Failover-03.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
Flume-Failover-03.sources.r1.channels = c1
Flume-Failover-03.sinks.k1.channel = c1

Startup

Start order: Flume-Failover-02 and Flume-Failover-03, then Flume-Failover-01

Window 2: Flume-Failover-02

flume-ng agent \
--name Flume-Failover-02 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/Flume-Failover-02.conf \
-Dflume.root.logger=INFO,console

Window 3: Flume-Failover-03

flume-ng agent \
--name Flume-Failover-03 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/Flume-Failover-03.conf \
-Dflume.root.logger=INFO,console

Window 1: Flume-Failover-01

flume-ng agent \
--name Flume-Failover-01 \
--conf $FLUME_HOME/conf \
--conf-file $FLUME_HOME/conf/Flume-Failover-01.conf \
-Dflume.root.logger=INFO,console

Test

Once all three agents are up and running normally, write data to the monitored file:

[hadoop@hadoop001 ~]$ echo "hello world" >> /home/hadoop/data/flume/test.log
[hadoop@hadoop001 ~]$ echo "1" >> /home/hadoop/data/flume/test.log
[hadoop@hadoop001 ~]$ echo "2" >> /home/hadoop/data/flume/test.log
[hadoop@hadoop001 ~]$ echo "3" >> /home/hadoop/data/flume/test.log

Because of the priorities, Flume-Failover-03 in window 3 receives the data and its logger sink prints it to the console, while Flume-Failover-02 receives nothing.

2022-05-19 23:12:27,708 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x14cc1069, /127.0.0.1:33077 => /127.0.0.1:44442] CONNECTED: /127.0.0.1:33077
2022-05-19 23:13:24,482 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }
2022-05-19 23:14:02,488 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 31 1 }
2022-05-19 23:14:04,416 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 32 2 }
2022-05-19 23:14:09,421 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 33 3 }

Now stop the Flume-Failover-03 process and keep writing to the monitored file.

[hadoop@hadoop001 ~]$ echo "4" >> /home/hadoop/data/flume/test.log
[hadoop@hadoop001 ~]$ echo "5" >> /home/hadoop/data/flume/test.log

Flume-Failover-02 in window 2 now receives and prints the data.

2022-05-19 23:12:27,430 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0xfc8b75d1, /127.0.0.1:56336 => /127.0.0.1:44441] CONNECTED: /127.0.0.1:56336
2022-05-19 23:17:12,485 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 34 4 }
2022-05-19 23:17:12,485 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 35 5 }

Restart Flume-Failover-03 and write to the monitored file again.

[hadoop@hadoop001 ~]$ echo "6" >> /home/hadoop/data/flume/test.log
[hadoop@hadoop001 ~]$ echo "7" >> /home/hadoop/data/flume/test.log

Flume-Failover-03 in window 3 receives and prints the data, while Flume-Failover-02 does not; things are back to the original state.

2022-05-19 23:18:46,590 (New I/O worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x85849a45, /127.0.0.1:33086 => /127.0.0.1:44442] CONNECTED: /127.0.0.1:33086
2022-05-19 23:18:50,233 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 36 6 }
2022-05-19 23:18:50,234 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:95)] Event: { headers:{} body: 37 7 }

Summary

The sink pointing at Flume-2 has priority 5 and the sink pointing at Flume-3 has priority 10, so Flume-3 receives the data first. When Flume-3 is stopped manually, the Flume-1 log reports Caused by: java.io.IOException: Error connecting to /192.168.204.203:41414 (the address of the failed avro sink), and then Flume-2 starts receiving the data. That is failover.

Case 9: Load balancing (Load balancing Sink Processor)

Configuration files

For load balancing, only the sinkgroups configuration differs from the failover case; everything else is the same:

Flume-Failover-01.sinkgroups.g1.processor.type = load_balance
Flume-Failover-01.sinkgroups.g1.processor.backoff = true
Flume-Failover-01.sinkgroups.g1.processor.selector = random

Flume-2 and Flume-3 share the load according to the configured strategy in sinkgroups.g1.processor.selector. If Flume-3 is shut down, all events are sinked to Flume-2. That is Flume load balancing.

Case 10: Streaming data into Flume with log4j

log4j+flume

For integrating log4j with Flume, two appenders are provided officially for writing log4j logs into Flume: the Log4J Appender and the Load Balancing Log4J Appender.

Log4J Appender

The Log4J Appender sends log data to a single Flume avro source; downstream in Flume you can attach whatever sinks you need.

The Log4j Appender takes the following configuration parameters (required ones in bold):

Parameter                Default    Description
Hostname                 -          Host of the avro source, e.g. 110.110.110.100
Port                     -          Port the avro source listens on, e.g. 9999
UnsafeMode               false      If true, the appender will not throw an exception on failure to send an event
AvroReflectionEnabled    false      Use Avro Reflection to serialize Log4j events
AvroSchemaUrl            -          URL of the Avro schema

Load Balancing Log4J Appender

This appender sends log data to multiple Flume avro sources, providing load balancing.

It takes the following configuration parameters (required ones in bold):

Parameter                Default        Description
Hosts                    -              Space-separated list of host:port of the avro sources, e.g. 10.10.10.10:9999 10.10.10.11:9999
Selector                 ROUND_ROBIN    Selection mechanism. Must be ROUND_ROBIN, RANDOM, or a custom fully-qualified class name
MaxBackoff               -              Maximum time, in milliseconds, that the load-balancing client will back off from a node that failed to consume an event
UnsafeMode               false          If true, the appender will not throw an exception on failure to send an event
AvroReflectionEnabled    false          Use Avro Reflection to serialize Log4j events
AvroSchemaUrl            -              URL of the Avro schema

The Load Balancing Log4J Appender is effectively multiple Log4J Appenders combined to achieve load balancing. On the Flume side,
it requires multiple avro sources to be configured to listen for input.

Implementation

pom dependencies

<!-- https://mvnrepository.com/artifact/org.apache.flume.flume-ng-clients/flume-ng-log4jappender -->
<dependency>
<groupId>org.apache.flume.flume-ng-clients</groupId>
<artifactId>flume-ng-log4jappender</artifactId>
<version>1.9.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.flume/flume-ng-sdk -->
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-sdk</artifactId>
<version>1.9.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>

Test class

package com.upupfeng;

import org.apache.log4j.Logger;

public class Log4j2Flume {
    public static void main(String[] args) {
        Logger logger = Logger.getLogger(Log4j2Flume.class);
        logger.info("test");
    }
}

log4j.properties

log4j.rootLogger=debug,stdout,flume

# Console output
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target=System.out
log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern =[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] %-5p %l - %m%n

# Log4j Appender
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.Hostname=localhost
log4j.appender.flume.Port=44444
log4j.appender.flume.UnsafeMode=true
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] %-5p %l - %m%n

# Load Balancing Log4J Appender
log4j.appender.flume2 = org.apache.flume.clients.log4jappender.LoadBalancingLog4jAppender
log4j.appender.flume2.Hosts = 192.168.168.200:9001 192.168.168.200:9002 192.168.168.200:9003
log4j.appender.flume2.Selector = ROUND_ROBIN
log4j.appender.flume2.MaxBackoff = 30000
log4j.appender.flume2.UnsafeMode = true
log4j.appender.flume2.Threshold=ERROR
log4j.appender.flume2.layout=org.apache.log4j.PatternLayout
log4j.appender.flume2.layout.ConversionPattern=[%d{yyyy-MM-dd HH:mm:ss.SSS}] [%thread] %-5p %l - %m%n

flume-conf.properties

Agent configuration. The Log4J Appender only needs one agent; the Load Balancing Log4J Appender needs several.

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
## Event capacity of the channel
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

For the Log4j Appender approach, start one agent to receive, run the code, and the data shows up at the Flume sink.

For the Load Balancing Log4J Appender approach, start multiple agents to receive, and the data is received in a load-balanced fashion; a sketch of one such receiving agent follows.
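
One receiving agent is needed per host:port listed in log4j.appender.flume2.Hosts. Below is a minimal sketch of the first one; the agent name lb1 is hypothetical, the port 9001 comes from the log4j.properties above, and the other two agents would differ only in name and port (9002, 9003).

# Sketch of one receiving agent for the Load Balancing Log4J Appender
lb1.sources = r1
lb1.sinks = k1
lb1.channels = c1

# avro source matching the first entry in log4j.appender.flume2.Hosts
lb1.sources.r1.type = avro
lb1.sources.r1.bind = 0.0.0.0
lb1.sources.r1.port = 9001

lb1.sinks.k1.type = logger

lb1.channels.c1.type = memory

lb1.sources.r1.channels = c1
lb1.sinks.k1.channel = c1

Each of these agents would be started with its own flume-ng agent --name command, in the same way as the earlier cases.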