本方案结合腾讯云消息队列 CKafka、流计算 Oceanus、私有网络 VPC、商业智能分析 BI 等,对视频直播行业数字化运营进行实时可视化分析。分析指标包含观看直播人员的地区分布、各级别会员统计、各模块打赏礼物情况、在线人数等。
?
?方案架构
根据以上视频直播场景所涉及的产品,包括流计算 Oceanus、私有网络 VPC、消息队列 CKafka、云数据库 MySQL、弹性 MapReduce 和商业智能分析 BI,设计的架构图如下:
?
?前期准备
购买并创建相应的大数据组件。
创建私有网络 VPC
私有网络是一块您在腾讯云上自定义的逻辑隔离网络空间,在构建流计算 Oceanus、消息队列 CKafka、云数据库 MySQL、弹性 MapReduce 等服务时,选择的网络必须保持一致,网络才能互通。否则就需要使用对等连接、VPN 等方式打通网络。登录 私有网络控制台 创建私有网络,详情请参见 创建私有网络。
创建 Oceanus 集群
流计算 Oceanus 服务兼容原生的 Flink 任务。登录 流计算 Oceanus 控制台 选择计算资源 > 新建创建集群,选择地域、可用区、VPC、日志、存储、设置密码等。VPC 及子网使用刚创建好的网络,详情可参见 创建独享集群。创建完后 Flink 的集群如下:
?
?创建消息队列 Ckafka
消息队列 CKafka(Cloud Kafka)是基于开源 Apache Kafka 消息队列引擎,提供高吞吐性能、高可扩展性的消息队列服务。消息队列 CKafka 完美兼容 Apache kafka 0.9、0.10、1.1、2.4、2.8版本接口,在性能、扩展性、业务安全保障、运维等方面具有超强优势,让您在享受低成本、超强功能的同时,免除繁琐运维工作。
创建 Ckafka 集群
注意
私有网络和子网需选择之前创建的网络和子网。
?
?
?创建 topic
Ckafka 集群创建成功后,在实例列表中,单击新建的实例 ID/名称,进入实例详情页。
?
在实例详情页,切换到 topic 管理页签,单击新建,开始创建 topic。
?
?模拟发送数据到 topic
kafka 客户端
进入同子网的 CVM 下,启动 kafka 客户端,模拟发送数据,具体操作参见 运行 Kafka 客户端。
使用脚本发送
脚本一:Java 参考地址:使用 SDK 收发消息?
脚本二:Python 脚本生成模拟数据,具体如下:
#!/usr/bin/python3# 首次使用该脚本,需 "pip3 install kafka" 安装kafka模块import jsonimport randomimport timefrom kafka import KafkaProducer?TIME_FORMAT = "%Y-%m-%d %H:%M:%S"PROVINCES = ["北京", "广东", "山东", "江苏", "河南", "上海", "河北", "浙江", "香港","陕西", "湖南", "重庆", "福建", "天津", "云南", "四川", "广西", "安徽","海南", "江西", "湖北", "山西", "辽宁", "台湾", "黑龙江", "内蒙古","澳门", "贵州", "甘肃", "青海", "新疆", "西藏", "吉林", "宁夏"]?broker_lists = ['172.28.28.13:9092']topic_live_gift_total = 'live_gift_total'topic_live_streaming_log = 'live_streaming_log'?producer = KafkaProducer(bootstrap_servers=broker_lists,value_serializer=lambda m: json.dumps(m).encode('ascii'))?# 模拟几天前,几小时前的数据pre_day_count = 0pre_hour_count = 0hour_unit = 3600day_unit = 3600 * 24?def generate_data_live_gift_total():# construct timeupdate_time = time.time() - day_unit * pre_day_countupdate_time_str = time.strftime(TIME_FORMAT, time.localtime(update_time))create_time = update_time - hour_unit * pre_hour_countcreate_time_str = time.strftime(TIME_FORMAT, time.localtime(create_time))results = []?for _ in range(0, 10):user_id = random.randint(2000, 4000)random_gift_type = random.randint(1, 10)random_gift_total = random.randint(1, 100)msg_kv = {"user_id": user_id, "gift_type": random_gift_type,"gift_total_amount": random_gift_total,"create_time": create_time_str, "update_time": update_time_str}results.append(msg_kv)return results??def generate_live_streaming_log():# construct timeupdate_time = time.time() - day_unit * pre_day_countleave_time_str = time.strftime(TIME_FORMAT, time.localtime(update_time))create_time = update_time - hour_unit * pre_hour_countcreate_time_str = time.strftime(TIME_FORMAT, time.localtime(create_time))results = []?for _ in range(0, 10):user_id = random.randint(2000, 4000)random_province = random.randint(0, len(PROVINCES) - 1)province_name = PROVINCES[random_province]grade = random.randint(1, 5)msg_kv = {"user_id": user_id, "ip": "123.0.0." + str(user_id % 255),"room_id": 20210813, "arrive_time": create_time_str,"create_time": create_time_str, "leave_time": leave_time_str,"region": 1122, "grade": (user_id % 5 + 1), "province": province_name}results.append(msg_kv)return results??def send_data(topic, msgs):count = 0?# produce asynchronouslyfor msg in msgs:import timetime.sleep(1)count += 1producer.send(topic, msg)print(" send %d data...\\n %s" % (count, msg))?producer.flush()??if __name__ == '__main__':count = 1while True:time.sleep(60)#for _ in range(count):msg_live_stream_logs = generate_live_streaming_log()send_data(topic_live_streaming_log, msg_live_stream_logs)?msg_topic_live_gift_totals = generate_data_live_gift_total()send_data(topic_live_gift_total, msg_topic_live_gift_totals)
创建 EMR 集群
弹性 MapReduce 是云端托管的弹性开源泛 Hadoop 服务,支持 Spark、HBase、Presto、Flink、Druid 等大数据框架,本次示例主要需要使用 Hbase 组件。
1. 登录 弹性 MapReduce 控制台,选择集群 > 新建集群,开始新建集群,具体可参考 创建 EMR 集群。新建集群时,需选择安装 HBase 组件。
?
如果是生产环境,服务器配置可根据实际情况选择。网络需要选择之前创建好的 VPC 网络,始终保持服务组件在同一 VPC 下。
?
?2. 在集群列表中,单击新建的集群 ID/名称,进入集群详情页。选择集群资源 > 资源管理,即进入 HBase 的 Master 节点。
?
?3. 进入 云服务器控制台,搜索 EMR 实例 ID,然后单击登录进入服务器。
?
?4. 创建 Hbase 表。
# 进入HBase命令[root@172~]# hbase shell# 建表语句create 'dim_hbase', 'cf'
创建云数据库 MySQL
云数据库 MySQL(TencentDB for MySQL)是腾讯云基于开源数据库 MySQL 专业打造的高性能分布式数据存储服务,让用户能够在云中更轻松地设置、操作和扩展关系数据库。
登录 云数据库 TencentDB 控制台,单击新建,新建 MySQL 服务。网络选择需为上文创建的网络。
?
创建完 MySQL 服务后,需要修改 binlog 参数,如图修改为 FULL(默认值为 MINIMAL)。
?
?修改完参数后,登录 MySQL 创建示例所需要的数据库和数据库表。
1. 进入实例详情页,单击登录,登录 MySQL 云数据库。
?
?2. 新建数据库。
打开 SQL 窗口或可视化页面创建数据库和表。
CREATE DATABASE livedb; --创建数据库列表
创建商业智能分析
商业智能分析 BI(Business Intelligence,BI)支持自服务数据准备、探索式分析和企业级管控,是新一代的敏捷自助型 BI 服务平台。只需几分钟,您就可以在云端轻松自如地完成数据分析、业务数据探查、报表制作等一系列数据可视化操作。便捷的拖拉拽式交互操作方式,让您无需依赖 IT 人员,无需担心试错成本,快速洞察数据背后的关联、趋势和逻辑。
购买商业智能分析
1. 登录 商业智能分析 BI 控制台,使用主账号购买资源,购买时需根据创建的子账号数来进行购买。
?
?2. 子用户提出申请。
?
?3. 主账号审核通过。并给子用户授予添加数据源、创建数据集、查看报告的权限。
添加 MySQL 数据源
说明
1. 打开购买的 MySQL 实例,开启外网。
?
2. 将 SaaS BI(119.29.66.144:3306)添加到 MySQL 数据库安全组。
?
??
这里添加的是 MySQL 3306 端口,不是外网映射的端口。
?
?3. 创建 MySQL 账户并配置权限。
创建账户,并设置账号密码,**主机 IP 设置为%**。
?
??
设置账号权限。
?
??
?4. 进入智能商业分析 BI,连接 MySQL 数据库。添加数据源 > MySQL,填写完成后单击测试连接。
方案实现
接下来通过案例为您介绍如何利用流计算服务 Oceanus 实现视频直播数字化运营的实时可视化数据处理与分析。
解决方案
业务目标
这里只列举以下3种统计指标:
全站观看直播用户分布
礼物总和统计
各模块进入直播间人数统计
源数据格式
事件 log:live_streaming_log(topic)
字段 | 类型 | 含义 |
user_id | bigint | 客户号 |
ip | varchar | 客户 IP 地址 |
room_id | bigint | 房间号 |
arrive_time | varchar | 进入房间时间 |
leave_time | varchar | 离开房间时间 |
create_time | varchar | 创建时间 |
region_code | int | 地区编码 |
grade | int | 会员等级 |
province | varchar | 所在省份 |
Ckafka 内部采用 json 格式存储,展现出来的数据如下所示:
{'user_id': 3165, 'ip': '123.0.0.105', 'room_id': 20210813, 'arrive_time': '2021-08-16 09:48:01', 'create_time': '2021-08-16 09:48:01', 'leave_time': '2021-08-16 09:48:01', 'region': 1122, 'grade': 1, 'province': '浙江'}
礼物记录:live_gift_total(topic 名)
字段 | 类型 | 含义 |
user_id | bigint | 客户号 |
gift_type | int | 礼物类型 |
gift_total_amount | bigint | 礼物数量 |
create_time | varchar | 创建时间 |
update_time | varchar | 更新时间 |
{'user_id': 3994, 'gift_type': 3, 'gift_total_amount': 28, 'create_time': '2021-08-16 09:46:51', 'update_time': '2021-08-16 09:46:51'}
模块记录表:dim_hbase(Hbase 维表)
字段 | 例子 | 含义 |
rowkey | 20210813 | 房间号 |
module_id | 0000 | 所属直播模块 |
Oceanus SQL 作业编写
全网观看直播用户分布(需提前在 MySQL 建表)
1. 定义 source
CREATE TABLE `live_streaming_log_source` (`user_id` BIGINT,`ip` VARCHAR,`room_id` BIGINT,`arrive_time` VARCHAR,`leave_time` VARCHAR,`create_time` VARCHAR,`region_code` INT,`grade` INT,`province` VARCHAR) WITH ('connector' = 'kafka','topic' = 'live_streaming_log','scan.startup.mode' = 'earliest-offset','properties.bootstrap.servers' = 'xx.xx.xx.xx:xxxx','properties.group.id' = 'joylyu-consumer-2','format' = 'json','json.ignore-parse-errors' = 'false','json.fail-on-missing-field' = 'false');
2. 定义 sink
CREATE TABLE `live_streaming_log_sink` (`user_id` BIGINT,`ip` VARCHAR,`room_id` VARCHAR,`arrive_time` TIMESTAMP,`leave_time` TIMESTAMP,`create_time` TIMESTAMP,`region_code` VARCHAR,`grade` INT,`province` VARCHAR,primary key(`user_id`, `ip`,`room_id`,`arrive_time`) not enforced) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://xx.xx.xx.xx:xxxx/livedb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- 请替换为您的实际 MySQL 连接参数'table-name' = 'live_streaming_log', -- 需要写入的数据表'username' = 'root', -- 数据库访问的用户名(需要提供 INSERT 权限)'password' = 'xxxxxxxxx', -- 数据库访问的密码'sink.buffer-flush.max-rows' = '5000','sink.buffer-flush.interval' = '2s','sink.max-retries' = '3');
3. 业务逻辑
INSERT INTO `live_streaming_log_sink`SELECTCASE WHEN `user_id` IS NULL THEN 0000 ELSE `user_id` END AS `user_id`, `ip`, CAST(`room_id` AS VARCHAR) AS `room_id`, CAST(`arrive_time` AS TIMESTAMP) AS `arrive_time`, CAST(`leave_time` AS TIMESTAMP) AS `leave_time`, CAST(`create_time` AS TIMESTAMP) AS `create_time`, CAST(`region_code` AS VARCHAR) AS `region_code`, `grade`, `province`FROM `live_streaming_log_source`;
礼物总和统计(需提前在 MySQL 建表)
1. 定义 source
CREATE TABLE `live_gift_total_source` (`user_id` BIGINT,`gift_type` INT,`gift_total_amount` BIGINT,`create_time` VARCHAR,`update_time` VARCHAR) WITH ('connector' = 'kafka','topic' = 'live_gift_total', -- 替换为您要消费的 Topic'scan.startup.mode' = 'earliest-offset','properties.bootstrap.servers' = 'xx.xx.xx.xx:xxxx', -- 替换为您的 Kafka 连接地址'properties.group.id' = 'demo3Group2', -- 必选参数, 一定要指定 Group ID'format' = 'json','json.ignore-parse-errors' = 'false','json.fail-on-missing-field' = 'false');
2. 定义 sink
CREATE TABLE `live_gift_total_amount_sink ` (`gift_type` VARCHAR,`gift_total_amount` BIGINT,primary key(`gift_type`) not enforced) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://xx.xx.xx.xx:xxxx/livedb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai','table-name' = 'live_gift_total_amount', -- 需要写入的数据表'username' = 'root', -- 数据库访问的用户名(需要提供 INSERT 权限)'password' = 'xxxxxxxxxxxxx', -- 数据库访问的密码'sink.buffer-flush.max-rows' = '5000','sink.buffer-flush.interval' = '2s','sink.max-retries' = '3');
3. 业务逻辑
INSERT INTO `live_gift_total_amount_sink`SELECTCAST(`gift_type` AS VARCHAR) AS `gift_type`, SUM(`gift_total_amount`) AS `gift_total_amount_all`FROM `live_gift_total_source`GROUP BY CAST(`gift_type` AS VARCHAR);
各模块进入直播间人数统计(需提前在 MySQL 建表)
1. 定义 source
CREATE TABLE `live_streaming_log_source` (`user_id` BIGINT,`ip` VARCHAR,`room_id` BIGINT,`arrive_time` VARCHAR,`leave_time` VARCHAR,`create_time` VARCHAR,`region_code` INT,`grade` INT,`province` VARCHAR,`proc_time` AS PROCTIME()) WITH ('connector' = 'kafka','topic' = 'live_streaming_log', -- 替换为您要消费的 Topic'scan.startup.mode' = 'earliest-offset','properties.bootstrap.servers' = 'xx.xx.xx.xx:xxxx', -- 替换为您的 Kafka 连接地址'properties.group.id' = 'demo3Group3', -- 必选参数, 一定要指定 Group ID'format' = 'json','json.ignore-parse-errors' = 'false','json.fail-on-missing-field' = 'false');
2. 定义 Hbase 维表
CREATE TABLE `dim_hbase` (`rowkey` STRING,`cf` ROW <`module_id` STRING>,PRIMARY KEY (`rowkey`) NOT ENFORCED) WITH ('connector' = 'hbase-1.4','table-name' = 'dim_hbase','zookeeper.quorum' = 'xx.xx.xx.xx:8121,xx.xx.xx.xx:8121,xx.xx.xx.xx:8121');
3. 定义 sink
CREATE TABLE `live_module_number_count_sink` (`module_id` BIGINT,`module_number_count` BIGINT,primary key(`module_id`) not enforced) WITH ('connector' = 'jdbc','url' = 'jdbc:mysql://xx.xx.xx.xx:xxxx/livedb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai','table-name' = 'live_module_number_count','username' = 'root','password' = 'xxxxxxxxxxx','sink.buffer-flush.max-rows' = '5000','sink.buffer-flush.interval' = '2s','sink.max-retries' = '3');
4. 业务逻辑
INSERT INTO `live_module_number_count_sink`SELECTCAST(dim_hbase.cf.module_id AS BIGINT) AS module_id,COUNT(live_streaming_log_source.`user_id`) AS module_number_countFROM `live_streaming_log_source`JOIN `dim_hbase` for SYSTEM_TIME as of live_streaming_log_source.proc_timeON CAST(live_streaming_log_source.room_id AS STRING) = dim_hbase.rowkeyGROUP BY CAST(dim_hbase.cf.module_id AS BIGINT);
实时大屏可视化展示
添加数据源
创建数据集
选择创建数据集 > SQL 数据集(可根据实际业务场景选择其他数据集),从刚才的数据源中添加数据集,单击保存。
制作报告
选择制作报告 > 新建报告(可选择任意模板),拖拽组件到中间空白处完成报告的制作。
设置实时刷新。选择左上角报告设置 > 高级,勾选获取实时数据,刷新间隔设置为3s(根据实际业务情况自行选择),这样可以根据 MySQL 数据源间隔3s一次自动刷新报告。完成后,单击保存即可。
查看报告
图表1:用户地区分布。表示观看直播客户在全国范围内的地区分布。
图表2:各级别会员人数。表示各个会员等级的总人数。
图表3:礼物类型总和。表示收到各礼物类型的总和。
图表4:最近6h礼物总数统计。表示最近6小时收到的礼物总计和。
图表5:刷礼物排行前10。表示刷礼物最多的10个客户。
图表6:在线人数。当天每个时间段进入直播间的人数。
?
?
?