Data Collection Configuration
Creating Topics
Once ZooKeeper and Kafka have been deployed and started, you can create the required topics in Kafka.
> cd /home/work/kafka
# Stores all collected log data
> ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 40 --replication-factor 2 --topic all_type_r2p40
# Stores server-side log data
> ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 5 --replication-factor 2 --topic user_follow
# Stores content-reporting related data
> ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 5 --replication-factor 2 --topic live_info
# Stores client-side log data
> ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 5 --replication-factor 2 --topic user_active
# Stores any abnormal data that may be missing the type field
> ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 5 --replication-factor 2 --topic default_r2p5
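If you want to double-check that the topics were created with the expected partition and replication settings, the same kafka-topics.sh tool can list and describe them (a quick sanity check, using the same local ZooKeeper address as above):
> cd /home/work/kafka
# List all topics known to the cluster
> ./bin/kafka-topics.sh --list --zookeeper localhost:2181
# Show partition count and replica placement for one topic
> ./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic all_type_r2p40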
Log Collection
> cd /home/work/filebeat-8.7.0-linux-x86_64
> vi filebeat.yml
- type: filestream
id: my-filestream-id
enabled: true
# Change this to /var/logs
paths:
- /var/logs/*.log
output.kafka:
# initial brokers for reading cluster metadata
hosts: ["172.16.185.176:9092"]
# message topic selection + partitioning
topic: 'all_type_r2p40'
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
# Format the output: send only the contents of the message field
codec.format:
string: '%{[message]}'
After configuring Filebeat, do not start it yet.
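Before moving on, it can be worth validating the file with Filebeat's built-in test commands; this is only a sanity check and does not start any harvesting (the broker address is the one configured above):
> cd /home/work/filebeat-8.7.0-linux-x86_64
# Verify that filebeat.yml is syntactically valid
> ./filebeat test config -c filebeat.yml
# Verify that the configured Kafka output is reachable
> ./filebeat test output -c filebeat.yml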
Data Distribution
Use Flume to distribute the data in a given Kafka topic to other topics.
> cd /home/work/flume-1.11.0
> cp -r /home/work/flume-1.11.0/conf /home/work/flume-1.11.0/conf-kafka-to-kafka
> vi conf-kafka-to-kafka/kafka-to-kafka.conf
# -------- Common configuration --------
agent1.sources = kksr
agent1.channels = kkc1
agent1.sinks = kks1
agent1.sources.kksr.channels = kkc1
agent1.sinks.kks1.channel = kkc1
# -------- kksr configuration --------
agent1.sources.kksr.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kksr.batchSize = 1000
agent1.sources.kksr.batchDurationMillis = 1000
agent1.sources.kksr.kafka.bootstrap.servers = 172.16.185.176:9092
agent1.sources.kksr.kafka.topics = all_type_r2p40
agent1.sources.kksr.kafka.consumer.group.id = group_id_1
# -------- Interceptor configuration --------
# Define the interceptors
agent1.sources.kksr.interceptors = i2 i1
# Interceptor type
agent1.sources.kksr.interceptors.i1.type = regex_extractor
# Regex that matches the type field; the captured value is added to the event header as topic=<matched value>
agent1.sources.kksr.interceptors.i1.regex = "type":"(\\w+)"
agent1.sources.kksr.interceptors.i1.serializers = s1
agent1.sources.kksr.interceptors.i1.serializers.s1.name = topic
# Assign a default topic to data that has no type field
agent1.sources.kksr.interceptors.i2.type = static
agent1.sources.kksr.interceptors.i2.key = topic
agent1.sources.kksr.interceptors.i2.preserveExisting = false
agent1.sources.kksr.interceptors.i2.value = default_r2p5
# -------- kkc1 configuration --------
# Channel type
agent1.channels.kkc1.type = file
agent1.channels.kkc1.checkpointDir = /data/channel/all_type/checkpoint
agent1.channels.kkc1.dataDirs = /data/channel/all_type/data
# -------- kks1 configuration --------
agent1.sinks.kks1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.kks1.kafka.topic = default
agent1.sinks.kks1.kafka.bootstrap.servers = 172.16.185.176:9092
agent1.sinks.kks1.kafka.flumeBatchSize = 1
agent1.sinks.kks1.kafka.producer.acks = 1
agent1.sinks.kks1.kafka.producer.linger.ms = 1
agent1.sinks.kks1.kafka.producer.compression.type = snappy
Also modify the log4j2.xml file so that the output of the different agents is not mixed together.
> cd /home/work/flume-1.11.0/conf-kafka-to-kafka
> vi log4j2.xml
......
<RollingFile name="LogFile" fileName="${LOG_DIR}/flume-kafka-to-kafka.log" filePattern="${LOG_DIR}/archive/flume-kafka-to-kafka.log.%d{yyyyMMdd}-%i">
......
<IfFileName glob="flume-kafka-to-kafka.log.*">
After configuring Flume, do not start it yet either.
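To make the routing concrete: the regex_extractor interceptor looks for a "type":"..." fragment in each event body and copies the captured value into the topic header, which the Kafka sink then uses as the destination topic; events without a type keep the default_r2p5 value set by the static interceptor. Once the services are started later on, a rough end-to-end check with the Kafka console tools might look like this (the sample message is made up and assumes the topic names above):
> cd /home/work/kafka
# Produce a test record into the source topic
> echo '{"type":"user_follow","uid":100001}' | ./bin/kafka-console-producer.sh --broker-list 172.16.185.176:9092 --topic all_type_r2p40
# The record should be re-routed by Flume into the user_follow topic
> ./bin/kafka-console-consumer.sh --bootstrap-server 172.16.185.176:9092 --topic user_follow --from-beginning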
Data Storage
Save the content-related metric data reported by the server side to HDFS for later offline computation.
> cd /home/work/flume-1.11.0
> cp -r /home/work/flume-1.11.0/conf /home/work/flume-1.11.0/conf-live-info-to-hdfs
> vi conf-live-info-to-hdfs/live-info-to-hdfs.conf
# -------- Common configuration --------
agent2.sources = kksr
agent2.channels = kkc1
agent2.sinks = kks1
agent2.sources.kksr.channels = kkc1
agent2.sinks.kks1.channel = kkc1
# -------- kksr configuration --------
agent2.sources.kksr.type = org.apache.flume.source.kafka.KafkaSource
agent2.sources.kksr.kafka.bootstrap.servers = 172.16.185.176:9092
agent2.sources.kksr.kafka.topics = live_info
agent2.sources.kksr.kafka.consumer.group.id = group_id_2
# -------- kkc1 configuration --------
agent2.channels.kkc1.type = file
agent2.channels.kkc1.checkpointDir = /data/channel/live_info/checkpoint
agent2.channels.kkc1.dataDirs = /data/channel/live_info/data
# -------- kks1 configuration --------
agent2.sinks.kks1.type = hdfs
# Write output into a dated subdirectory
agent2.sinks.kks1.hdfs.path = hdfs://172.16.185.176:9000/data/live_info/%Y%m%d
agent2.sinks.kks1.hdfs.writeFormat = Text
agent2.sinks.kks1.hdfs.fileType = DataStream
agent2.sinks.kks1.hdfs.callTimeout = 3600000
# Roll the temporary file into a target file once it reaches 104857600 bytes (100 MB)
agent2.sinks.kks1.hdfs.rollSize = 104857600
# Roll the temporary file into a target file after this many events (0 disables count-based rolling)
agent2.sinks.kks1.hdfs.rollCount = 0
# Roll the temporary file into a target file every N seconds
agent2.sinks.kks1.hdfs.rollInterval = 3600
# Configure the file name prefix and suffix
agent2.sinks.kks1.hdfs.filePrefix=run
agent2.sinks.kks1.hdfs.fileSuffix=.data
Also modify the log4j2.xml file so that the output of the different agents is not mixed together.
> cd /home/work/flume-1.11.0/conf-live-info-to-hdfs
> vi log4j2.xml
......
<RollingFile name="LogFile" fileName="${LOG_DIR}/flume-live-info-to-hdfs.log" filePattern="${LOG_DIR}/archive/flume-live-info-to-hdfs.log.%d{yyyyMMdd}-%i">
......
<IfFileName glob="flume-live-info-to-hdfs.log.*">
Then save the user behavior data reported by the client side to HDFS for later offline computation.
> cd /home/work/flume-1.11.0
> cp -r /home/work/flume-1.11.0/conf /home/work/flume-1.11.0/conf-user-active-to-hdfs
> vi conf-user-active-to-hdfs/user-active-to-hdfs.conf
# -------- Common configuration --------
agent3.sources = kksr
agent3.channels = kkc1
agent3.sinks = kks1
agent3.sources.kksr.channels = kkc1
agent3.sinks.kks1.channel = kkc1
# -------- kksr configuration --------
agent3.sources.kksr.type = org.apache.flume.source.kafka.KafkaSource
agent3.sources.kksr.kafka.bootstrap.servers = 172.16.185.176:9092
agent3.sources.kksr.kafka.topics = user_active
agent3.sources.kksr.kafka.consumer.group.id = group_id_3
# -------- kkc1 configuration --------
agent3.channels.kkc1.type = file
agent3.channels.kkc1.checkpointDir = /data/channel/user_active/checkpoint
agent3.channels.kkc1.dataDirs = /data/channel/user_active/data
# -------- kks1 configuration --------
agent3.sinks.kks1.type = hdfs
# Write output into a dated subdirectory
agent3.sinks.kks1.hdfs.path = hdfs://172.16.185.176:9000/data/user_active/%Y%m%d
agent3.sinks.kks1.hdfs.writeFormat = Text
agent3.sinks.kks1.hdfs.fileType = DataStream
agent3.sinks.kks1.hdfs.callTimeout = 3600000
# Roll the temporary file into a target file once it reaches 104857600 bytes (100 MB)
agent3.sinks.kks1.hdfs.rollSize = 104857600
# Roll the temporary file into a target file after this many events (0 disables count-based rolling)
agent3.sinks.kks1.hdfs.rollCount = 0
# Roll the temporary file into a target file every N seconds
agent3.sinks.kks1.hdfs.rollInterval = 3600
# Configure the file name prefix and suffix
agent3.sinks.kks1.hdfs.filePrefix=run
agent3.sinks.kks1.hdfs.fileSuffix=.data
Also modify the log4j2.xml file so that the output of the different agents is not mixed together.
> cd /home/work/flume-1.11.0/conf-user-active-to-hdfs
> vi log4j2.xml
......
<RollingFile name="LogFile" fileName="${LOG_DIR}/flume-user-active-to-hdfs.log" filePattern="${LOG_DIR}/archive/flume-user-active-to-hdfs.log.%d{yyyyMMdd}-%i">
......
<IfFileName glob="flume-user-active-to-hdfs.log.*">
Starting the Services
Services are generally started in the reverse order of the data flow, from the downstream end back toward the source.
# Start the data storage Flume agent for user_active
> cd /home/work/flume-1.11.0
> nohup bin/flume-ng agent --name agent3 --conf conf-user-active-to-hdfs/ --conf-file conf-user-active-to-hdfs/user-active-to-hdfs.conf &
# Start the data storage Flume agent for live_info
> cd /home/work/flume-1.11.0
> nohup bin/flume-ng agent --name agent2 --conf conf-live-info-to-hdfs/ --conf-file conf-live-info-to-hdfs/live-info-to-hdfs.conf &
# Start the data distribution Flume agent
> cd /home/work/flume-1.11.0
> nohup bin/flume-ng agent --name agent1 --conf conf-kafka-to-kafka/ --conf-file conf-kafka-to-kafka/kafka-to-kafka.conf &
# Start the Filebeat log collector
> cd /home/work/filebeat-8.7.0-linux-x86_64
> nohup ./filebeat -c filebeat.yml &
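After everything is up, a quick way to confirm the pipeline is alive is to check the processes and tail the per-agent log files configured in the log4j2.xml changes above (the log path shown here is an assumption; use whatever LOG_DIR resolves to in your log4j2.xml):
# Confirm that the Flume agents and Filebeat are running
> ps -ef | grep -E 'flume|filebeat' | grep -v grep
# Tail the distribution agent's log to watch for routing errors (assumed LOG_DIR)
> tail -f /home/work/flume-1.11.0/logs/flume-kafka-to-kafka.log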