[Flume] Advanced Components: Sink Processors and Hands-On Practice (Sink Load Balancing and Failover)

Source: Turing Education (图灵教育)
Published: 2023-06-26 15:46:05

Contents
  • 1. Component Overview
  • 2. Hands-On Practice
  • 2.1 Load Balancing
  • 2.1.1 Requirements
  • 2.1.2 Configuration
  • 2.1.3 Running
  • 2.2 Failover
  • 2.2.1 Requirements
  • 2.2.2 Configuration
  • 2.2.3 Running

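1. Component Overview

There are three types of Sink Processors: the Default Sink Processor, the Load balancing Sink Processor, and the Failover Sink Processor.

  • The Default Sink Processor is the default. It needs no sink group and is the most common setup, the one used so far: a channel followed by a single sink;
  • The Load balancing Sink Processor lets one channel feed several sinks that belong to a sink group. Events are dispatched round-robin or at random, according to the configured algorithm, which reduces the load on any single sink;
  • The Failover Sink Processor also lets one channel feed several sinks that belong to a sink group. By default, the sink with the highest priority handles the data. If that sink fails, a sink with a lower priority takes over, so that data is not lost.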

2. Hands-On Practice

2.1 Load Balancing

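This example uses the Load balancing Sink Processor: one channel feeds several sinks that belong to a sink group, and events are dispatched round-robin or at random, according to the configured algorithm, to reduce the load on any single sink. Its parameters are: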

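  • sinks: the sinks that belong to this sink group, given by name; separate multiple names with spaces;
  • processor.type: set to load_balance for the load-balancing sink processor;
  • processor.selector: two built-in values are supported, round_robin and random. round_robin (the default) cycles through the sinks in order; random picks a sink at random;
  • processor.backoff: false by default. When set to true, a failed sink is temporarily blacklisted and only retried after a timeout. If it still fails, the timeout grows exponentially until it reaches the maximum. When backoff is off, the failed sink is retried on every round, which hurts throughput for as long as the sink stays down;
  • processor.selector.maxTimeOut: the maximum blacklist time, 30000 ms (30 seconds) by default.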
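
How round_robin, processor.backoff, and processor.selector.maxTimeOut interact can be sketched with a toy model. The Python below is only an illustration under stated assumptions, not Flume's implementation: the class RoundRobinSelector, its method names, and the 1000 ms starting delay are made up for this sketch; only the 30000 ms cap mirrors the default listed above.

```python
class RoundRobinSelector:
    """Toy model of round-robin sink selection with optional backoff."""

    def __init__(self, sinks, backoff=False, max_timeout_ms=30000, base_ms=1000):
        self.sinks = list(sinks)
        self.backoff = backoff
        self.max_timeout_ms = max_timeout_ms  # cap, in the spirit of maxTimeOut
        self.base_ms = base_ms                # assumed first blacklist period
        self.next_index = 0
        self.failures = {s: 0 for s in self.sinks}
        self.blocked_until = {s: 0 for s in self.sinks}

    def candidates(self, now_ms):
        """Return the sinks to try for the next batch, in order."""
        n = len(self.sinks)
        order = [self.sinks[(self.next_index + i) % n] for i in range(n)]
        self.next_index = (self.next_index + 1) % n
        if self.backoff:
            # Blacklisted sinks are skipped until their timeout has passed.
            order = [s for s in order if self.blocked_until[s] <= now_ms]
        return order

    def report_failure(self, sink, now_ms):
        """Blacklist a sink; the delay doubles per failure up to the cap."""
        self.failures[sink] += 1
        delay = min(self.base_ms * 2 ** (self.failures[sink] - 1),
                    self.max_timeout_ms)
        self.blocked_until[sink] = now_ms + delay

    def report_success(self, sink):
        """A successful delivery clears the sink's failure history."""
        self.failures[sink] = 0
        self.blocked_until[sink] = 0


selector = RoundRobinSelector(["k1", "k2"], backoff=True)
print(selector.candidates(now_ms=0))     # both sinks are healthy
selector.report_failure("k1", now_ms=0)  # k1 is blacklisted for base_ms
print(selector.candidates(now_ms=500))   # k1 is skipped while blacklisted
```

With backoff=False the model never filters the list, so a dead sink is offered again on every round, which is the inefficiency described in the processor.backoff entry.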
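
2.1.1 Requirements

Collect data from a given port and load-balance it across two sinks, dispatching events round-robin. To make the effect visible, the sinks are avro sinks with batch-size set to 1, so each event is sent as soon as it arrives (by default, an avro sink accumulates 100 events before sending a batch).

2.1.2 Configuration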

Flume configuration for the agent on bigdata01:

[root@bigdata01 apache-flume-1.9.0-bin]# cat conf/load-balancing.conf
# The agent is named a1
# Names of the source, channel, and sink components
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

# Source configuration
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# Channel configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink configuration (batch-size is set to 1 to make the demo easy to follow)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.152.101
a1.sinks.k1.port = 41414
a1.sinks.k1.batch-size = 1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.152.102
a1.sinks.k2.port = 41414
a1.sinks.k2.batch-size = 1

# Sink group strategy
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

# Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
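
A sink group only works if every sink it lists is declared on the agent and attached to a channel, and typos in these keys are easy to miss. The following Python sketch is a hypothetical pre-flight check and not part of Flume: the helpers parse_properties and check_sink_groups and the trimmed SAMPLE text are assumptions for illustration, and the parser handles only plain `key = value` lines and `#` comments.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blanks and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_sink_groups(props, agent="a1"):
    """Return a list of human-readable problems found in the sink groups."""
    problems = []
    declared = set(props.get(f"{agent}.sinks", "").split())
    for group in props.get(f"{agent}.sinkgroups", "").split():
        prefix = f"{agent}.sinkgroups.{group}"
        members = props.get(f"{prefix}.sinks", "").split()
        if not members:
            problems.append(f"{group}: no sinks listed")
        for sink in members:
            if sink not in declared:
                problems.append(
                    f"{group}: sink {sink} is not declared in {agent}.sinks")
            if f"{agent}.sinks.{sink}.channel" not in props:
                problems.append(f"{group}: sink {sink} has no channel")
        # Sinks in one group are expected to drain the same channel.
        channels = {props.get(f"{agent}.sinks.{s}.channel") for s in members}
        channels.discard(None)
        if len(channels) > 1:
            problems.append(f"{group}: sinks use different channels")
    return problems


# Trimmed-down version of the sink group settings from load-balancing.conf.
SAMPLE = """
a1.sinks = k1 k2
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
"""

print(check_sink_groups(parse_properties(SAMPLE)))
```

An empty list means the check found nothing to complain about; it says nothing about whether the remote hosts are reachable.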

Flume configuration for the agent on bigdata02:

[root@bigdata02 apache-flume-1.9.0-bin]# cat conf/load-balancing-101.conf
# Names of the source, channel, and sink components
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source configuration
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

# Channel configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink configuration (filePrefix differs per agent so the files written by the two sinks can be told apart)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.152.100:9000/load_balance
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = data101
a1.sinks.k1.hdfs.fileSuffix = .log

# Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Flume configuration for the agent on bigdata03:

[root@bigdata03 apache-flume-1.9.0-bin]# cat conf/load-balancing-102.conf
# Names of the source, channel, and sink components
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source configuration
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

# Channel configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink configuration (filePrefix differs per agent so the files written by the two sinks can be told apart)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.152.100:9000/load_balance
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = data102
a1.sinks.k1.hdfs.fileSuffix = .log

# Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.1.3 Running

Start the agents on bigdata02 and bigdata03 first, then start the agent on bigdata01:

[root@bigdata02 apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf --conf-file conf/load-balancing-101.conf -Dflume.root.logger=INFO,console

[root@bigdata03 apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf --conf-file conf/load-balancing-102.conf -Dflume.root.logger=INFO,console

[root@bigdata01 apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf --conf-file conf/load-balancing.conf -Dflume.root.logger=INFO,console

Send data to the port to simulate input:

[root@bigdata01 apache-flume-1.9.0-bin]# telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
hehe
OK
haha
OK
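
When telnet is not installed, or when the test input should be scripted, the same lines can be sent over a plain TCP socket. This is a minimal Python sketch, not something from the original setup: the function name send_events is hypothetical, and it assumes the netcat source answers every line with one reply line, as the OK responses in the telnet session suggest.

```python
import socket


def send_events(host, port, events, timeout=5.0):
    """Send each event as one line and collect the reply line for each."""
    replies = []
    with socket.create_connection((host, port), timeout=timeout) as conn:
        reader = conn.makefile("r", encoding="utf-8", newline="\n")
        for event in events:
            # One event per line, terminated by a newline.
            conn.sendall((event + "\n").encode("utf-8"))
            # Wait for the acknowledgement before sending the next event.
            replies.append(reader.readline().strip())
    return replies
```

Calling send_events("localhost", 44444, ["hehe", "haha"]) on bigdata01 while the agent is running would be the scripted equivalent of the telnet session above.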

Check the result stored in HDFS:

[root@bigdata01 apache-flume-1.9.0-bin]# hdfs dfs -ls -R /
-rw-r--r--   2 root supergroup        175 2023-06-22 00:08 /README.txt
drwxr-xr-x   - root supergroup          0 2023-06-22 00:47 /load_balance
-rw-r--r--   2 root supergroup          6 2023-06-22 00:47 /load_balance/data101.168766028.log.tmp
-rw-r--r--   2 root supergroup          6 2023-06-22 00:47 /load_balance/data102.16876602466.log.tmp
[root@bigdata01 apache-flume-1.9.0-bin]# hdfs dfs -cat /load_balance/data101.168766028.log.tmp
haha
[root@bigdata01 apache-flume-1.9.0-bin]# hdfs dfs -cat /load_balance/data102.16876602466.log.tmp
hehe

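2.2 Failover

This example uses the Failover Sink Processor: one channel feeds several sinks that belong to a sink group. By default, the sink with the highest priority handles the data. If that sink fails, a sink with a lower priority takes over, so that data is not lost. Its parameters are: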

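  • processor.type: set to failover for the failover sink processor;
  • processor.priority.<sinkName>: the priority of each sink in the sink group. A larger value means a higher priority, and by default the data in the channel is taken by the sink with the highest priority;
  • processor.maxpenalty: the maximum time, in milliseconds, to back off from a sink after it fails.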
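
The selection rule described by these parameters can be sketched as a small model. The Python below is an illustration under stated assumptions, not Flume's code: the class FailoverSelector, its method names, the 1000 ms starting penalty, and the doubling rule are invented for this sketch. It also assumes priorities are distinct.

```python
class FailoverSelector:
    """Toy model: always use the highest-priority sink that is not penalized."""

    def __init__(self, priorities, max_penalty_ms=30000, base_penalty_ms=1000):
        # Assumption of this sketch: no two sinks share a priority.
        if len(set(priorities.values())) != len(priorities):
            raise ValueError("each sink needs a distinct priority")
        self.priorities = dict(priorities)
        self.max_penalty_ms = max_penalty_ms    # cap, like processor.maxpenalty
        self.base_penalty_ms = base_penalty_ms  # assumed first penalty
        self.failures = {name: 0 for name in self.priorities}
        self.retry_at = {name: 0 for name in self.priorities}

    def active_sink(self, now_ms):
        """Return the highest-priority sink whose penalty has expired."""
        live = [n for n in self.priorities if self.retry_at[n] <= now_ms]
        return max(live, key=self.priorities.get) if live else None

    def report_failure(self, sink, now_ms):
        """Penalize a failed sink; the penalty grows but never exceeds the cap."""
        self.failures[sink] += 1
        penalty = min(self.base_penalty_ms * 2 ** (self.failures[sink] - 1),
                      self.max_penalty_ms)
        self.retry_at[sink] = now_ms + penalty

    def report_success(self, sink):
        """A successful delivery clears the sink's penalty."""
        self.failures[sink] = 0
        self.retry_at[sink] = 0


selector = FailoverSelector({"k1": 5, "k2": 10}, max_penalty_ms=10000)
print(selector.active_sink(now_ms=0))    # the sink with the larger priority
selector.report_failure("k2", now_ms=0)
print(selector.active_sink(now_ms=500))  # falls back while k2 is penalized
```

Once the penalty on the preferred sink expires, active_sink returns it again, which matches the behaviour of traffic moving back to the higher-priority sink after it recovers.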

2.2.1 Requirements

Implement failover between two sinks.

2.2.2 Configuration

Flume configuration for the agent on bigdata01:

[root@bigdata01 conf]# cat failover.conf
# The agent is named a1
# Names of the source, channel, and sink components
a1.sources = r1
a1.channels = c1
a1.sinks = k1 k2

# Source configuration
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# Channel configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink configuration (batch-size is set to 1 to make the demo easy to follow)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.152.101
a1.sinks.k1.port = 41414
a1.sinks.k1.batch-size = 1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.152.102
a1.sinks.k2.port = 41414
a1.sinks.k2.batch-size = 1

# Sink group strategy
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

Flume configuration for the agent on bigdata02:

[root@bigdata02 conf]# cat failover-101.conf
# The agent is named a1
# Names of the source, channel, and sink components
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source configuration
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

# Channel configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink configuration (filePrefix differs per agent so the files written by the two sinks can be told apart)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.152.100:9000/failover
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = data101
a1.sinks.k1.hdfs.fileSuffix = .log

# Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Flume configuration for the agent on bigdata03:

[root@bigdata03 conf]# cat failover-102.conf
# The agent is named a1
# Names of the source, channel, and sink components
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Source configuration
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

# Channel configuration
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Sink configuration (filePrefix differs per agent so the files written by the two sinks can be told apart)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://192.168.152.100:9000/failover
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.rollInterval = 3600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.filePrefix = data102
a1.sinks.k1.hdfs.fileSuffix = .log

# Wire the components together
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2.2.3 Running

  1. Start the agents on bigdata02 and bigdata03 first, then start the agent on bigdata01:

[root@bigdata02 apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf --conf-file conf/failover-101.conf -Dflume.root.logger=INFO,console

[root@bigdata03 apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf --conf-file conf/failover-102.conf -Dflume.root.logger=INFO,console

[root@bigdata01 apache-flume-1.9.0-bin]# bin/flume-ng agent --name a1 --conf conf --conf-file conf/failover.conf -Dflume.root.logger=INFO,console

  2. Send data to the port, entering two test events, test1 and test2:

[root@bigdata01 apache-flume-1.9.0-bin]# telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
test1
OK
test2
OK

  3. Check the result stored in HDFS:

Because bigdata03 has the higher priority, both events were written by it.

[root@bigdata01 hadoop-3.3.5]# hdfs dfs -ls -R /
-rw-r--r--   2 root supergroup        175 2023-06-22 00:08 /README.txt
drwxr-xr-x   - root supergroup          0 2023-06-22 09:51 /failover
-rw-r--r--   2 root supergroup          7 2023-06-22 09:51 /failover/data102.1687965.log.tmp
drwxr-xr-x   - root supergroup          0 2023-06-22 00:52 /load_balance
-rw-r--r--   2 root supergroup          6 2023-06-22 00:52 /load_balance/data101.168766028.log
-rw-r--r--   2 root supergroup          6 2023-06-22 00:52 /load_balance/data102.16876602466.log
[root@bigdata01 hadoop-3.3.5]# hdfs dfs -cat /failover/data102.1687965.log.tmp
test1
test2

  4. Shut down the agent on bigdata03, then enter another test event, test3:

[root@bigdata01 apache-flume-1.9.0-bin]# telnet localhost 44444
Trying ::1...
Connected to localhost.
Escape character is '^]'.
test1
OK
test2
OK
test3
OK

  5. Check the result stored in HDFS:

With bigdata03 shut down, the data is written by the lower-priority bigdata02, so no data is lost and failover is achieved. If bigdata03 is brought back up at this point, data will again be delivered through bigdata03.

[root@bigdata01 hadoop-3.3.5]# hdfs dfs -ls -R /
-rw-r--r--   2 root supergroup        175 2023-06-22 00:08 /README.txt
drwxr-xr-x   - root supergroup          0 2023-06-22 09:54 /failover
-rw-r--r--   2 root supergroup          7 2023-06-22 09:54 /failover/data101.168739863636.log.tmp
-rw-r--r--   2 root supergroup         14 2023-06-22 09:53 /failover/data102.1687965.log
drwxr-xr-x   - root supergroup          0 2023-06-22 00:52 /load_balance
-rw-r--r--   2 root supergroup          6 2023-06-22 00:52 /load_balance/data101.168766028.log
-rw-r--r--   2 root supergroup          6 2023-06-22 00:52 /load_balance/data102.16876602466.log
[root@bigdata01 hadoop-3.3.5]# hdfs dfs -cat /failover/data101.168739863636.log.tmp
test3