有奖捉虫:行业应用 & 管理与支持文档专题 HOT
文档中心 > 最佳实践 > 流计算 Oceanus > 视频直播解决方案之实时 BI 分析
本方案结合腾讯云消息队列 CKafka、流计算 Oceanus、私有网络 VPC、商业智能分析 BI 等,对视频直播行业数字化运营进行实时可视化分析。分析指标包含观看直播人员的地区分布、各级别会员统计、各模块打赏礼物情况、在线人数等。
?
?

方案架构

根据以上视频直播场景所涉及的产品,包括流计算 Oceanus、私有网络 VPC、消息队列 CKafka、云数据库 MySQL、弹性 MapReduce 和商业智能分析 BI,设计的架构图如下:
?
?

前期准备

购买并创建相应的大数据组件。

创建私有网络 VPC

私有网络是一块您在腾讯云上自定义的逻辑隔离网络空间,在构建流计算 Oceanus、消息队列 CKafka、云数据库 MySQL、弹性 MapReduce 等服务时,选择的网络必须保持一致,网络才能互通。否则就需要使用对等连接、VPN 等方式打通网络。登录 私有网络控制台 创建私有网络,详情请参见 创建私有网络

创建 Oceanus 集群

流计算 Oceanus 服务兼容原生的 Flink 任务。登录 流计算 Oceanus 控制台 选择计算资源 > 新建创建集群,选择地域、可用区、VPC、日志、存储、设置密码等。VPC 及子网使用刚创建好的网络,详情可参见 创建独享集群。创建完后 Flink 的集群如下:
?
?

创建消息队列 Ckafka

消息队列 CKafka(Cloud Kafka)是基于开源 Apache Kafka 消息队列引擎,提供高吞吐性能、高可扩展性的消息队列服务。消息队列 CKafka 完美兼容 Apache kafka 0.9、0.10、1.1、2.4、2.8版本接口,在性能、扩展性、业务安全保障、运维等方面具有超强优势,让您在享受低成本、超强功能的同时,免除繁琐运维工作。

创建 Ckafka 集群

登录 消息队列 CKafka 控制台,单击新建,开始创建 Ckafka 集群,详情请参见 创建实例
注意
私有网络和子网需选择之前创建的网络和子网。
?
?
?

创建 topic

Ckafka 集群创建成功后,在实例列表中,单击新建的实例 ID/名称,进入实例详情页。
?
在实例详情页,切换到 topic 管理页签,单击新建,开始创建 topic。
?
?

模拟发送数据到 topic

kafka 客户端 进入同子网的 CVM 下,启动 kafka 客户端,模拟发送数据,具体操作参见 运行 Kafka 客户端
使用脚本发送
脚本一:Java 参考地址:使用 SDK 收发消息?
脚本二:Python 脚本生成模拟数据,具体如下:
#!/usr/bin/python3
# 首次使用该脚本,需 "pip3 install kafka" 安装kafka模块
import json
import random
import time
from kafka import KafkaProducer
?
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
PROVINCES = ["北京", "广东", "山东", "江苏", "河南", "上海", "河北", "浙江", "香港",
"陕西", "湖南", "重庆", "福建", "天津", "云南", "四川", "广西", "安徽",
"海南", "江西", "湖北", "山西", "辽宁", "台湾", "黑龙江", "内蒙古",
"澳门", "贵州", "甘肃", "青海", "新疆", "西藏", "吉林", "宁夏"]
?
broker_lists = ['172.28.28.13:9092']
topic_live_gift_total = 'live_gift_total'
topic_live_streaming_log = 'live_streaming_log'
?
producer = KafkaProducer(bootstrap_servers=broker_lists,
value_serializer=lambda m: json.dumps(m).encode('ascii'))
?
# 模拟几天前,几小时前的数据
pre_day_count = 0
pre_hour_count = 0
hour_unit = 3600
day_unit = 3600 * 24
?
def generate_data_live_gift_total():
# construct time
update_time = time.time() - day_unit * pre_day_count
update_time_str = time.strftime(TIME_FORMAT, time.localtime(update_time))
create_time = update_time - hour_unit * pre_hour_count
create_time_str = time.strftime(TIME_FORMAT, time.localtime(create_time))
results = []
?
for _ in range(0, 10):
user_id = random.randint(2000, 4000)
random_gift_type = random.randint(1, 10)
random_gift_total = random.randint(1, 100)
msg_kv = {"user_id": user_id, "gift_type": random_gift_type,
"gift_total_amount": random_gift_total,
"create_time": create_time_str, "update_time": update_time_str}
results.append(msg_kv)
return results
?
?
def generate_live_streaming_log():
# construct time
update_time = time.time() - day_unit * pre_day_count
leave_time_str = time.strftime(TIME_FORMAT, time.localtime(update_time))
create_time = update_time - hour_unit * pre_hour_count
create_time_str = time.strftime(TIME_FORMAT, time.localtime(create_time))
results = []
?
for _ in range(0, 10):
user_id = random.randint(2000, 4000)
random_province = random.randint(0, len(PROVINCES) - 1)
province_name = PROVINCES[random_province]
grade = random.randint(1, 5)
msg_kv = {"user_id": user_id, "ip": "123.0.0." + str(user_id % 255),
"room_id": 20210813, "arrive_time": create_time_str,
"create_time": create_time_str, "leave_time": leave_time_str,
"region": 1122, "grade": (user_id % 5 + 1), "province": province_name}
results.append(msg_kv)
return results
?
?
def send_data(topic, msgs):
count = 0
?
# produce asynchronously
for msg in msgs:
import time
time.sleep(1)
count += 1
producer.send(topic, msg)
print(" send %d data...\\n %s" % (count, msg))
?
producer.flush()
?
?
if __name__ == '__main__':
count = 1
while True:
time.sleep(60)
#for _ in range(count):
msg_live_stream_logs = generate_live_streaming_log()
send_data(topic_live_streaming_log, msg_live_stream_logs)
?
msg_topic_live_gift_totals = generate_data_live_gift_total()
send_data(topic_live_gift_total, msg_topic_live_gift_totals)

创建 EMR 集群

弹性 MapReduce 是云端托管的弹性开源泛 Hadoop 服务,支持 Spark、HBase、Presto、Flink、Druid 等大数据框架,本次示例主要需要使用 Hbase 组件。
1. 登录 弹性 MapReduce 控制台,选择集群 > 新建集群,开始新建集群,具体可参考 创建 EMR 集群。新建集群时,需选择安装 HBase 组件。
?
如果是生产环境,服务器配置可根据实际情况选择。网络需要选择之前创建好的 VPC 网络,始终保持服务组件在同一 VPC 下。
?
?
2. 在集群列表中,单击新建的集群 ID/名称,进入集群详情页。选择集群资源 > 资源管理,即进入 HBase 的 Master 节点。
?
?
3. 进入 云服务器控制台,搜索 EMR 实例 ID,然后单击登录进入服务器。
?
?
4. 创建 Hbase 表。
# 进入HBase命令
[root@172~]# hbase shell
# 建表语句
create 'dim_hbase', 'cf'

创建云数据库 MySQL

云数据库 MySQL(TencentDB for MySQL)是腾讯云基于开源数据库 MySQL 专业打造的高性能分布式数据存储服务,让用户能够在云中更轻松地设置、操作和扩展关系数据库。
登录 云数据库 TencentDB 控制台,单击新建,新建 MySQL 服务。网络选择需为上文创建的网络。
?
创建完 MySQL 服务后,需要修改 binlog 参数,如图修改为 FULL(默认值为 MINIMAL)。
?
?
修改完参数后,登录 MySQL 创建示例所需要的数据库和数据库表。
1. 进入实例详情页,单击登录,登录 MySQL 云数据库。
?
?
2. 新建数据库。 打开 SQL 窗口或可视化页面创建数据库和表。
CREATE DATABASE livedb; --创建数据库列表

创建商业智能分析

商业智能分析 BI(Business Intelligence,BI)支持自服务数据准备、探索式分析和企业级管控,是新一代的敏捷自助型 BI 服务平台。只需几分钟,您就可以在云端轻松自如地完成数据分析、业务数据探查、报表制作等一系列数据可视化操作。便捷的拖拉拽式交互操作方式,让您无需依赖 IT 人员,无需担心试错成本,快速洞察数据背后的关联、趋势和逻辑。

购买商业智能分析

1. 登录 商业智能分析 BI 控制台,使用主账号购买资源,购买时需根据创建的子账号数来进行购买。
?
?
2. 子用户提出申请。
?
?
3. 主账号审核通过。并给子用户授予添加数据源、创建数据集、查看报告的权限。

添加 MySQL 数据源

说明
这里选用开启外网方式连接,更多连接方式可参见 数据库连接方式概览
1. 打开购买的 MySQL 实例,开启外网。
?
2. 将 SaaS BI(119.29.66.144:3306)添加到 MySQL 数据库安全组。
?
?
?
这里添加的是 MySQL 3306 端口,不是外网映射的端口
?
?
3. 创建 MySQL 账户并配置权限。 创建账户,并设置账号密码,**主机 IP 设置为%**。
?
?
?
设置账号权限。
?
?
?
?
4. 进入智能商业分析 BI,连接 MySQL 数据库。添加数据源 > MySQL,填写完成后单击测试连接

方案实现

接下来通过案例为您介绍如何利用流计算服务 Oceanus 实现视频直播数字化运营的实时可视化数据处理与分析。

解决方案

业务目标

这里只列举以下3种统计指标:
全站观看直播用户分布
礼物总和统计
各模块进入直播间人数统计

源数据格式

事件 log:live_streaming_log(topic)
字段
类型
含义
user_id
bigint
客户号
ip
varchar
客户 IP 地址
room_id
bigint
房间号
arrive_time
varchar
进入房间时间
leave_time
varchar
离开房间时间
create_time
varchar
创建时间
region_code
int
地区编码
grade
int
会员等级
province
varchar
所在省份
Ckafka 内部采用 json 格式存储,展现出来的数据如下所示:
{
'user_id': 3165
, 'ip': '123.0.0.105'
, 'room_id': 20210813
, 'arrive_time': '2021-08-16 09:48:01'
, 'create_time': '2021-08-16 09:48:01'
, 'leave_time': '2021-08-16 09:48:01'
, 'region': 1122
, 'grade': 1
, 'province': '浙江'
}
礼物记录:live_gift_total(topic 名)
字段
类型
含义
user_id
bigint
客户号
gift_type
int
礼物类型
gift_total_amount
bigint
礼物数量
create_time
varchar
创建时间
update_time
varchar
更新时间
{
'user_id': 3994
, 'gift_type': 3
, 'gift_total_amount': 28
, 'create_time': '2021-08-16 09:46:51'
, 'update_time': '2021-08-16 09:46:51'
}
模块记录表:dim_hbase(Hbase 维表)
字段
例子
含义
rowkey
20210813
房间号
module_id
0000
所属直播模块

Oceanus SQL 作业编写

全网观看直播用户分布(需提前在 MySQL 建表)
1. 定义 source
CREATE TABLE `live_streaming_log_source` (
`user_id` BIGINT,
`ip` VARCHAR,
`room_id` BIGINT,
`arrive_time` VARCHAR,
`leave_time` VARCHAR,
`create_time` VARCHAR,
`region_code` INT,
`grade` INT,
`province` VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'live_streaming_log',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'xx.xx.xx.xx:xxxx',
'properties.group.id' = 'joylyu-consumer-2',
'format' = 'json',
'json.ignore-parse-errors' = 'false',
'json.fail-on-missing-field' = 'false'
);
2. 定义 sink
CREATE TABLE `live_streaming_log_sink` (
`user_id` BIGINT,
`ip` VARCHAR,
`room_id` VARCHAR,
`arrive_time` TIMESTAMP,
`leave_time` TIMESTAMP,
`create_time` TIMESTAMP,
`region_code` VARCHAR,
`grade` INT,
`province` VARCHAR,
primary key(`user_id`, `ip`,`room_id`,`arrive_time`) not enforced
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xx.xx.xx.xx:xxxx/livedb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai', -- 请替换为您的实际 MySQL 连接参数
'table-name' = 'live_streaming_log', -- 需要写入的数据表
'username' = 'root', -- 数据库访问的用户名(需要提供 INSERT 权限)
'password' = 'xxxxxxxxx', -- 数据库访问的密码
'sink.buffer-flush.max-rows' = '5000',
'sink.buffer-flush.interval' = '2s',
'sink.max-retries' = '3'
);
3. 业务逻辑
INSERT INTO `live_streaming_log_sink`
SELECT
CASE WHEN `user_id` IS NULL THEN 0000 ELSE `user_id` END AS `user_id`
, `ip`
, CAST(`room_id` AS VARCHAR) AS `room_id`
, CAST(`arrive_time` AS TIMESTAMP) AS `arrive_time`
, CAST(`leave_time` AS TIMESTAMP) AS `leave_time`
, CAST(`create_time` AS TIMESTAMP) AS `create_time`
, CAST(`region_code` AS VARCHAR) AS `region_code`
, `grade`
, `province`
FROM `live_streaming_log_source`;
礼物总和统计(需提前在 MySQL 建表)
1. 定义 source
CREATE TABLE `live_gift_total_source` (
`user_id` BIGINT,
`gift_type` INT,
`gift_total_amount` BIGINT,
`create_time` VARCHAR,
`update_time` VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'live_gift_total', -- 替换为您要消费的 Topic
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'xx.xx.xx.xx:xxxx', -- 替换为您的 Kafka 连接地址
'properties.group.id' = 'demo3Group2', -- 必选参数, 一定要指定 Group ID
'format' = 'json',
'json.ignore-parse-errors' = 'false',
'json.fail-on-missing-field' = 'false'
);
2. 定义 sink
CREATE TABLE `live_gift_total_amount_sink ` (
`gift_type` VARCHAR,
`gift_total_amount` BIGINT,
primary key(`gift_type`) not enforced
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xx.xx.xx.xx:xxxx/livedb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
'table-name' = 'live_gift_total_amount', -- 需要写入的数据表
'username' = 'root', -- 数据库访问的用户名(需要提供 INSERT 权限)
'password' = 'xxxxxxxxxxxxx', -- 数据库访问的密码
'sink.buffer-flush.max-rows' = '5000',
'sink.buffer-flush.interval' = '2s',
'sink.max-retries' = '3'
);
3. 业务逻辑
INSERT INTO `live_gift_total_amount_sink`
SELECT
CAST(`gift_type` AS VARCHAR) AS `gift_type`
, SUM(`gift_total_amount`) AS `gift_total_amount_all`
FROM `live_gift_total_source`
GROUP BY CAST(`gift_type` AS VARCHAR);
各模块进入直播间人数统计(需提前在 MySQL 建表)
1. 定义 source
CREATE TABLE `live_streaming_log_source` (
`user_id` BIGINT,
`ip` VARCHAR,
`room_id` BIGINT,
`arrive_time` VARCHAR,
`leave_time` VARCHAR,
`create_time` VARCHAR,
`region_code` INT,
`grade` INT,
`province` VARCHAR,
`proc_time` AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'live_streaming_log', -- 替换为您要消费的 Topic
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'xx.xx.xx.xx:xxxx', -- 替换为您的 Kafka 连接地址
'properties.group.id' = 'demo3Group3', -- 必选参数, 一定要指定 Group ID
'format' = 'json',
'json.ignore-parse-errors' = 'false',
'json.fail-on-missing-field' = 'false'
);
2. 定义 Hbase 维表
CREATE TABLE `dim_hbase` (
`rowkey` STRING,
`cf` ROW <`module_id` STRING>,
PRIMARY KEY (`rowkey`) NOT ENFORCED
) WITH (
'connector' = 'hbase-1.4',
'table-name' = 'dim_hbase',
'zookeeper.quorum' = 'xx.xx.xx.xx:8121,xx.xx.xx.xx:8121,xx.xx.xx.xx:8121'
);
3. 定义 sink
CREATE TABLE `live_module_number_count_sink` (
`module_id` BIGINT,
`module_number_count` BIGINT,
primary key(`module_id`) not enforced
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://xx.xx.xx.xx:xxxx/livedb?rewriteBatchedStatements=true&serverTimezone=Asia/Shanghai',
'table-name' = 'live_module_number_count',
'username' = 'root',
'password' = 'xxxxxxxxxxx',
'sink.buffer-flush.max-rows' = '5000',
'sink.buffer-flush.interval' = '2s',
'sink.max-retries' = '3'
);
4. 业务逻辑
INSERT INTO `live_module_number_count_sink`
SELECT
CAST(dim_hbase.cf.module_id AS BIGINT) AS module_id,
COUNT(live_streaming_log_source.`user_id`) AS module_number_count
FROM `live_streaming_log_source`
JOIN `dim_hbase` for SYSTEM_TIME as of live_streaming_log_source.proc_time
ON CAST(live_streaming_log_source.room_id AS STRING) = dim_hbase.rowkey
GROUP BY CAST(dim_hbase.cf.module_id AS BIGINT);

实时大屏可视化展示

添加数据源

进入 商业智能分析 BI 控制台,选择添加数据源 > MySQL 数据库,根据上面方法连接到指定 MySQL 数据库,单击保存

创建数据集

选择创建数据集 > SQL 数据集(可根据实际业务场景选择其他数据集),从刚才的数据源中添加数据集,单击保存

制作报告

选择制作报告 > 新建报告(可选择任意模板),拖拽组件到中间空白处完成报告的制作。
设置实时刷新。选择左上角报告设置 > 高级,勾选获取实时数据,刷新间隔设置为3s(根据实际业务情况自行选择),这样可以根据 MySQL 数据源间隔3s一次自动刷新报告。完成后,单击保存即可。
具体步骤可参见 基本操作

查看报告

单击查看报告,选择刚才保存的报告,可以动态展示报告。此报告只做演示使用,可参见 通用美化方法 优化报告。如下图所示,大屏中总共6个图表。
图表1:用户地区分布。表示观看直播客户在全国范围内的地区分布。
图表2:各级别会员人数。表示各个会员等级的总人数。
图表3:礼物类型总和。表示收到各礼物类型的总和。
图表4:最近6h礼物总数统计。表示最近6小时收到的礼物总计和。
图表5:刷礼物排行前10。表示刷礼物最多的10个客户。
图表6:在线人数。当天每个时间段进入直播间的人数。
?
?
?


http://www.vxiaotou.com