A Few Flume Exercises
Published: 2019-06-13


1. Requirement: listen on TCP/UDP port 41414 and print the received data to the console. (Flume's netcat source covers the TCP case; see the UDP note after the config below.)

 

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
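The netcat source above speaks TCP only. For the UDP half of the requirement, newer Flume releases (1.8+) ship a netcatudp source; a minimal sketch, not part of the original post:

# UDP variant of the source (Flume 1.8+), sketch only
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 41414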

Start command:

bin/flume-ng agent --conf conf/ --conf-file conf/one.conf  --name a1 -Dflume.root.logger=INFO,console &

Telnet:

root@Ubuntu-1:~# telnet 0.0.0.0 41414
Trying 0.0.0.0...
Connected to 0.0.0.0.
Escape character is '^]'.
huxing
OK

Result: the logger sink prints the received line on the console. (screenshot omitted)

2. Requirement: ship the log file access.log from machine A to machine B, and print it to the console on B.

 

Here I assume machine A is 131 and machine B is 132. The config file goes on 132, which is started as a normal agent; on 131 we only run avro-client, which serializes the file with Avro and pushes it to 132.

Config file on 132:

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start Flume on 132:

bin/flume-ng agent --conf conf/ --conf-file conf/two.conf --name a1 -Dflume.root.logger=INFO,console &

Run avro-client on 131:

bin/flume-ng avro-client --host 192.168.22.132 --port 44444 --filename logs/avro.log
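Note that avro-client is a one-shot tool: it reads the file, sends each line as one Avro event to the listening source, and exits. If memory serves, the client also accepts a --dirname option to stream every file in a directory rather than a single --filename; treat the flag and path below as an assumption to verify against bin/flume-ng help:

bin/flume-ng avro-client --host 192.168.22.132 --port 44444 --dirname logs/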

Check the console on 132: success. (screenshot omitted)

3. Requirement: watch a log file access.log and print appended lines to the console as they arrive. What if it is a very large file (heap pressure)?

Config:

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/logs/access.log

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000000
a1.channels.c1.transactionCapacity = 1000000

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Start command:

bin/flume-ng agent --conf conf/ --conf-file conf/three.conf  --name a1 -Dflume.root.logger=INFO,console &

Append data to the log file:

root@Ubuntu-1:/usr/local/apache-flume/logs# cat hu.log >> avro.log

Success. (console screenshot omitted)

----------------------------------------------------------------------------------

What about a very large file?

-- Uncomment the relevant lines in the config above, that is, run with the enlarged memory-channel capacity and transactionCapacity (10000000 and 1000000). A more robust option is sketched below.
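A sturdier answer for big, growing files is the Taildir source added in Flume 1.7: unlike exec with tail -F, it records its read offset in a position file, so an agent restart neither re-reads the whole file nor drops data. A minimal sketch (the positionFile path is an assumption chosen for illustration):

# Taildir source (Flume 1.7+); positionFile path is an assumption
a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /usr/local/apache-flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/logs/access.log
a1.sources.r1.channels = c1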

4. Requirement: aggregate access.log from machines A and B onto machine C, then collect everything to HDFS, stored by day.

Write four_avro_sink.conf on 132 and 135:

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/apache-flume/logs/avro.log

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.22.131
a1.sinks.k1.port = 41414

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

In other words, the exec source continuously feeds the newest lines into the channel, and the avro sink serializes them and forwards them to the avro source listening on 131.

启动flume:

root@Ubuntu-135:/usr/local/apache-flume# bin/flume-ng agent --conf conf/ --conf-file conf/four_avro_sink.conf --name a1 -Dflume.root.logger=INFO,console &

 

Write four.conf on 131:

# Name the agent's source, channel and sink
access.sources = r1
access.channels = c1
access.sinks = k1

# Configure the source
access.sources.r1.type = avro
access.sources.r1.bind = 0.0.0.0
access.sources.r1.port = 41414

# Configure the channel
access.channels.c1.type = memory
access.channels.c1.capacity = 1000
access.channels.c1.transactionCapacity = 100

# Interceptor: stamp each event with a timestamp header
access.sources.r1.interceptors = i1
access.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# Configure the sink
access.sinks.k1.type = hdfs
access.sinks.k1.hdfs.path = hdfs://Ubuntu-1:9000/%Y%m%d
access.sinks.k1.hdfs.filePrefix = events-
access.sinks.k1.hdfs.fileType = DataStream
#access.sinks.k1.hdfs.fileType = CompressedStream
#access.sinks.k1.hdfs.codeC = gzip
# Do not roll files by event count
access.sinks.k1.hdfs.rollCount = 0
# Roll a new file once it reaches 64 MB on HDFS
access.sinks.k1.hdfs.rollSize = 67108864
access.sinks.k1.hdfs.rollInterval = 0

# Wire source, channel and sink together
access.sources.r1.channels = c1
access.sinks.k1.channel = c1
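The %Y%m%d escape in hdfs.path is resolved from the timestamp header that the interceptor above adds to every event. Two standard equivalents, for reference rather than from the original post: the interceptor's short alias, or letting the HDFS sink stamp events with local time.

# Alias form of the timestamp interceptor
access.sources.r1.interceptors.i1.type = timestamp
# Or skip the interceptor and let the sink stamp events itself
access.sinks.k1.hdfs.useLocalTimeStamp = true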

Start Hadoop:

root@Ubuntu-1:/usr/local/hadoop-2.6.0# sbin/start-dfs.sh

Start Flume:

root@Ubuntu-1:/usr/local/apache-flume# bin/flume-ng agent --conf conf/ --conf-file conf/four.conf  --name access -Dflume.root.logger=INFO,console &

 

 

5. Requirement: aggregate access.log, ugcheader.log and ugctail.log from machines A and B onto machine C, then collect them into different HDFS directories.

Change the HDFS sink path in 131's four.conf to:

access.sinks.k1.hdfs.path = hdfs://Ubuntu-1:9000/%{type}/%Y%m%d

And the config file on 132:

# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1

# Describe/configure the sources
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /usr/local/apache-flume/logs/avro.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access

a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /usr/local/apache-flume/logs/flume.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = ugchead

a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /usr/local/apache-flume/logs/hu.log
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = ugctail

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.22.131
a1.sinks.k1.port = 41414
#a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the sources and sink to the channel
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1
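Each static interceptor tags its source's events with a type header, which the %{type} escape on 131 expands into a per-log directory. A quick way to check, assuming events have already flowed (directory names follow the hdfs.path set above):

root@Ubuntu-1:/usr/local/hadoop-2.6.0# bin/hdfs dfs -ls /access /ugchead /ugctail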

 

6. Requirement: send the collected access.log to multiple destinations at once (print to the console and write to HDFS).

On 131:

# Name the agent's source, channels and sinks
access.sources = r1
access.channels = c1 c2
access.sinks = k1 k2

# Configure the source
access.sources.r1.type = avro
access.sources.r1.bind = 0.0.0.0
access.sources.r1.port = 41414

# Configure the channels
access.channels.c1.type = memory
access.channels.c1.capacity = 1000
access.channels.c1.transactionCapacity = 100
access.channels.c2.type = memory
access.channels.c2.capacity = 1000
access.channels.c2.transactionCapacity = 100

# The key point: a second sink k2 that logs to the console
access.sinks.k2.type = logger

# Interceptor: stamp each event with a timestamp header
access.sources.r1.interceptors = i1
access.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

# Configure the HDFS sink
access.sinks.k1.type = hdfs
access.sinks.k1.hdfs.path = hdfs://Ubuntu-1:9000/source/%{type}/%Y%m%d
access.sinks.k1.hdfs.filePrefix = events-
access.sinks.k1.hdfs.fileType = DataStream
#access.sinks.k1.hdfs.fileType = CompressedStream
#access.sinks.k1.hdfs.codeC = gzip
# Do not roll files by event count
access.sinks.k1.hdfs.rollCount = 0
# Roll a new file once it reaches 64 MB on HDFS
access.sinks.k1.hdfs.rollSize = 67108864
access.sinks.k1.hdfs.rollInterval = 0

# Wire source, channels and sinks together
access.sources.r1.channels = c1 c2
access.sinks.k1.channel = c1
access.sinks.k2.channel = c2
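Because r1 lists two channels, Flume's default channel selector, replicating, copies every event into both c1 and c2, which is what feeds the console and HDFS at the same time. Making the default explicit is optional:

# Optional; replicating is already the default selector
access.sources.r1.selector.type = replicating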

132 keeps the same config as in question 5.

 

7. Requirement: log to Flume from a program, routing different business logs to different destinations (console, avro), and inspect the log4j headers on the events.

pom file:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>cn.hx</groupId>
  <artifactId>FlumeSource</artifactId>
  <version>1.0-SNAPSHOT</version>
  <packaging>jar</packaging>
  <name>FlumeSource</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-jar-plugin</artifactId>
        <configuration>
          <archive>
            <manifest>
              <mainClass>cn.hx.test</mainClass>
              <addClasspath>true</addClasspath>
              <classpathPrefix>lib/</classpathPrefix>
            </manifest>
          </archive>
        </configuration>
      </plugin>
    </plugins>
  </build>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>2.6.0</version>
    </dependency>
    <dependency>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
      <version>1.2.17</version>
    </dependency>
  </dependencies>
</project>
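One dependency the pom does not declare is the Flume log4j appender that the properties file below points at (org.apache.flume.clients.log4jappender.Log4jAppender). A hedged addition, with the version assumed to match the installed Flume:

<dependency>
  <groupId>org.apache.flume.flume-ng-clients</groupId>
  <artifactId>flume-ng-log4jappender</artifactId>
  <version>1.6.0</version>
</dependency>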

log4j properties file:

### set log levels ###
# Root logger: only messages at INFO and above go to the stdout1 appender (ERROR, WARN, INFO)
log4j.rootLogger=INFO,stdout1

# Custom loggers
#log4j.logger.accessLogger=INFO,flume
#log4j.logger.ugcLogger=INFO,flume
log4j.logger.std1Logger=INFO,stdout1
log4j.logger.std2Logger=INFO,stdout2
log4j.logger.access=INFO,flume
log4j.logger.ugchead=INFO,flume
log4j.logger.ugctail=INFO,flume
# Per-package appender
#log4j.logger.com.zenith.flume = INFO,flume

### flume ###
log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.flume.layout=org.apache.log4j.PatternLayout
log4j.appender.flume.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n
log4j.appender.flume.Hostname=192.168.22.131
log4j.appender.flume.Port=41414
log4j.appender.flume.UnsafeMode = true

### stdout1 ###
log4j.appender.stdout1=org.apache.log4j.ConsoleAppender
log4j.appender.stdout1.Threshold=DEBUG
log4j.appender.stdout1.Target=System.out
log4j.appender.stdout1.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout1.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n

### stdout2 ###
log4j.appender.stdout2=org.apache.log4j.ConsoleAppender
log4j.appender.stdout2.Threshold=DEBUG
log4j.appender.stdout2.Target=System.out
log4j.appender.stdout2.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout2.layout.ConversionPattern=%d{yyyy-MM-dd hh:mm:ss} %c{1} [%p] %m%n

### access ###
log4j.appender.access=org.apache.log4j.DailyRollingFileAppender
log4j.appender.access.Threshold=INFO
log4j.appender.access.File=/usr/local/apache-flume/logs/avro.log
log4j.appender.access.Append=true
log4j.appender.access.DatePattern='.'yyyy-MM-dd
log4j.appender.access.layout=org.apache.log4j.PatternLayout
log4j.appender.access.layout.ConversionPattern=%m%n

### ugchead ###
log4j.appender.ugchead=org.apache.log4j.DailyRollingFileAppender
log4j.appender.ugchead.Threshold=INFO
log4j.appender.ugchead.File=/usr/local/apache-flume/logs/flume.log
log4j.appender.ugchead.Append=true
log4j.appender.ugchead.DatePattern='.'yyyy-MM-dd
log4j.appender.ugchead.layout=org.apache.log4j.PatternLayout
log4j.appender.ugchead.layout.ConversionPattern=%m%n

### ugctail ###
log4j.appender.ugctail=org.apache.log4j.DailyRollingFileAppender
log4j.appender.ugctail.Threshold=INFO
log4j.appender.ugctail.File=/usr/local/apache-flume/logs/hu.log
log4j.appender.ugctail.Append=true
log4j.appender.ugctail.DatePattern='.'yyyy-MM-dd
log4j.appender.ugctail.layout=org.apache.log4j.PatternLayout
log4j.appender.ugctail.layout.ConversionPattern=%m%n

The program:

package cn.hx;

import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;

/**
 * Created by hushiwei on 2017/8/20.
 * Emits one event per second to the "access" and "ugchead" loggers,
 * which log4j.properties routes to the Flume appender.
 */
public class test {

    protected static final Logger loggerAccess = Logger.getLogger("access");
    protected static final Logger loggerUgc = Logger.getLogger("ugchead");

    public static void main(String[] args) throws Exception {
        BasicConfigurator.configure();
        while (true) {
            loggerAccess.info("this is access log");
            loggerUgc.info("ugc");
            //KafkaUtil util = new KafkaUtil();
            //util.initProducer();
            //util.produceData("crxy", "time", String.valueOf(new Date().getTime()));
            Thread.sleep(1000);
        }
    }
}

Run on 131:

root@Ubuntu-1:/usr/local/apache-flume# bin/flume-ng agent --conf conf/ --conf-file conf/avro_source.conf --name agent1 -Dflume.root.logger=INFO,console &

The avro_source.conf file here is the one used in an earlier question above.

Build the jar and run it on 131.

But it fails with an error that was not resolved at the time:

Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/log4j/Logger

Caused by: java.lang.ClassNotFoundException: org.apache.log4j.Logger
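The error means log4j is missing from the runtime classpath: the manifest built above expects dependency jars under a lib/ directory next to FlumeSource-1.0-SNAPSHOT.jar, and a plain mvn package does not put them there. One hedged workaround is to pass the dependencies explicitly, for example by borrowing Flume's own lib directory (paths are assumptions):

root@Ubuntu-1:~# java -cp "FlumeSource-1.0-SNAPSHOT.jar:/usr/local/apache-flume/lib/*" cn.hx.test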

 

8. Requirement: collect access.log on machine A and fan it out to B and C with load balancing, printing to their consoles (load_balance).

On 132 and 135:

Use avro_source.conf as the config file.

Start:

root@Ubuntu-2:/usr/local/apache-flume# bin/flume-ng agent --conf conf/ --conf-file conf/avro_source.conf --name agent1 -Dflume.root.logger=INFO,console &

On 131:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /usr/local/apache-flume/logs/xing.log

# Define sink groups
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = round_robin

# Define sink 1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.22.132
a1.sinks.k1.port = 41414

# Define sink 2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.22.135
a1.sinks.k2.port = 41414

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sinks to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1
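Besides round_robin, the load_balance processor also supports a random selector, and backoff=true temporarily blacklists a sink that fails so traffic shifts to the healthy one:

# Alternative selector for the sink group
a1.sinkgroups.g1.processor.selector = random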

Start 131:

root@Ubuntu-1:/usr/local/apache-flume# bin/flume-ng agent --conf conf/ --conf-file conf/eight.conf --name a1 -Dflume.root.logger=INFO,console &

Output on 131, 132 and 135 (console screenshots omitted): with round_robin load balancing, the events alternate between 132 and 135.

9. Requirement: collect access.log on machine A and send it to B and C with failover, printing to their consoles (failover).

On 132 and 135, start agents with the avro_source.conf file.

On 131, use:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.channels = c1
a1.sources.r1.command = tail -F /usr/local/apache-flume/logs/xing.log

# Define sink groups
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.maxpenalty = 10000

# Define sink 1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = 192.168.22.132
a1.sinks.k1.port = 41414

# Define sink 2
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = 192.168.22.135
a1.sinks.k2.port = 41414

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sinks to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c1

Start 131:

root@Ubuntu-1:/usr/local/apache-flume# bin/flume-ng agent --conf conf/ --conf-file conf/nine.conf --name a1 -Dflume.root.logger=INFO,console &

Check: after shutting down Flume on 132 (the higher-priority sink), the data fails over straight to 135. (console screenshots omitted)

 

Reposted from: https://www.cnblogs.com/huxinga/p/7298465.html
