事件总线的数据
原创大约 2 分钟
事件总线
中的数据包括两大类。
客户端埋点上报的数据(原始数据保存在
/var/logs/client_upload.log
)。服务端接口调用的数据(原始数据保存在
/var/logs/*_api.log
)。
这些原始数据经过ETL工具的即时合并和简单清洗后,结果被保存在/var/logs/eventbus.log
文件中,格式统一如下。
{
"eid": 20815976325148963257,
"etype": "LOGIN",
"ename": "login_success",
"esource": "Android",
"etime": "2024-01-02 03:04:05",
"userid": 20815976325148963257,
"aid": 587,
"aname": "CLICK",
"tid": "8e69466a-9d69-4b24-98df-c7a03da1c831",
"tname": "Tide",
"context": {
"device":{
"ip": "14.119.104.189",
"province": "北京",
"city": "北京",
"area": "朝阳区",
"imei": "868540050954128",
"imsi": "460080585306741"
},
"profile":{
"grade": "L1"
},
"product":{
"id": "8e69466a-9d69-4b24-98df-c7a03da1c831",
"price": "26.0",
"category": "Washing"
}
}
}
字段 | 说明 |
---|---|
id | 事件编码,20位长整型 |
type | 事件类型,例如,登录 、领券 、下单 、支付 等 |
name | 事件名称,例如,登录成功 、领券成功 等 |
source | 事件来源,例如,Android 、IOS 、PC 、API (业务接口调用)等 |
time | 事件发生的时间,格式为yyyy-MM-dd HH:mm:ss ,这个字段非常重要,后续计算都需要用到 |
userid | 触发事件的用户编码,20位长整型。另外,将用户编码放到这里是便于后续计算 |
aid | 行为编码 |
aname | 行为名称,例如,单击 、拖拽 、滑动 等 |
tid | 触发事件的目标编码,事件可能会因为产品/订单/优惠券而触发,这里的编码就是对应的产品/订单/优惠券的编码 |
tname | 事件目标名称,例如,Tide (汰渍洗衣粉) |
context | 触发事件时的上下文,例如,设备型号 、用户等级 等 |
通过Flume将/var/logs/eventbus.log
日志文件上传到Kafka。
> cd /home/work/flume-1.11.0/
> cp conf/ conf-log-to-kafka/
> cd conf-log-to-kafka
> mv flume.conf log-to-kafka.conf
> vi log-to-kafka.conf
# 指定source和sink到channel
a1.sources = r1
a1.channels = c1
a1.sinks = k1
# 配置source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/logs/eventbus.log
# 配置channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 配置sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 172.16.185.176:9092
a1.sinks.k1.kafka.topic = eventbus
a1.sinks.k1.kafka.consumer.group.id = clickhouse
a1.sinks.k1.kafka.flumeBatchSize = 1
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
a1.sinks.k1.kafka.producer.compression.type = snappy
# 绑定source和sink到channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
然后启动Flume。
> cd /home/work/flume-1.11.0/
> nohup ./bin/flume-ng agent -n a1 -c conf-log-to-kafka/ -f conf-log-to-kafka/log-to-kafka.conf -Dflume.root.logger=INFO,console > /dev/null 2>/home/work/logs/flume-error.log &
感谢支持
更多内容,请移步《超级个体》。