有奖捉虫:行业应用 & 管理与支持文档专题 HOT
文档中心 > 弹性 MapReduce > EMR 开发指南 > Spark 开发指南 > 通过 Spark Python 分析 COS 上的数据
?
本节主要是通过 Spark Python 来进行 wordcount 的工作。

开发准备

因为任务中需要访问腾讯云对象存储(COS),所以需要在 COS 中先 创建一个存储桶(Bucket)
确认您已开通腾讯云,并且创建了一个 EMR 集群。在创建 EMR 集群的时候需要在软件配置页面选择 Spark 组件,并且在基础配置页面开启对象存储的授权。

数据准备

需要处理的文件需要事先上传到 COS 中。如果文件在本地那么就可以通过 COS 控制台直接上传。如果文件在 EMR 集群上,可以使用 Hadoop 命令上传。指令如下:
[hadoop@10 hadoop]$ hadoop fs -put $testfile cosn://$bucketname/
其中 $testfile 为要统计的文件的完整路径加名字,$bucketname 为您的存储桶名。上传完成后可以查看文件是否已经在 COS 中。

运行样例

首先需要登录 EMR 集群中的任意机器,最好是登录到 Master 节点。登录 EMR 的方式请参考 登录 Linux 实例。这里我们可以选择使用 WebShell 登录。单击对应云服务器右侧的登录,进入登录界面,用户名默认为 root,密码为创建 EMR 时用户自己输入的密码。输入正确后,即可进入命令行界面。
在 EMR 命令行先使用以下指令切换到 Hadoop 用户,并进入 Spark 安装目录/usr/local/service/spark
[root@172 ~]# su hadoop
[hadoop@172 root]$ cd /usr/local/service/spark
新建一个 Python 文件 wordcount.py,并添加如下代码:
from __future__ import print_function
?
import sys
from operator import add
from pyspark.sql import SparkSession
?
if __name__ == "__main__":
if len(sys.argv) != 3:
print("Usage: wordcount <file>", file=sys.stderr)
exit(-1)
?
spark = SparkSession\\
.builder\\
.appName("PythonWordCount")\\
.getOrCreate()
?
sc = spark.sparkContext
?
lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')) \\
.map(lambda x: (x, 1)) \\
.reduceByKey(add)
?
output = counts.collect()
counts.saveAsTextFile(sys.argv[2])
?
spark.stop()
通过如下指令提交任务:
[hadoop@10 spark]$ ./bin/spark-submit --master yarn ./wordcount.py
cosn://$bucketname/$yourtestfile cosn://$bucketname/$output
其中 $bucketname 为您的 COS 存储桶名,$yourtestfile 为您的测试文件在存储桶中的完整路径加名字。$output 为您的输出文件夹。$output 为一个未创建的文件夹,如果执行指令前该文件夹已经存在,会导致程序运行失败
成功后程序自动运行,可以在目标存储桶中查看到输出文件:
[hadoop@172 spark]$ hadoop fs -ls cosn://$bucketname/$output
Found 2 items
-rw-rw-rw- 1 hadoop Hadoop 0 2018-06-29 15:35 cosn://$bucketname/$output/_SUCCESS
-rw-rw-rw- 1 hadoop Hadoop 2102 2018-06-29 15:34 cosn://$bucketname/$output/part-00000
最后的结果也可以通过如下指令查看:
[hadoop@172 spark]$ hadoop fs -cat cosn://$bucketname/$output /part-00000
(u'', 27)
(u'code', 1)
(u'both', 1)
(u'Hadoop', 1)
(u'Bureau', 1)
(u'Department', 1)
同样可以把结果输出到 HDFS 中,只需要更改指令中的输出位置即可,如下所示:
[hadoop@10spark]$ ./bin/spark-submit ./wordcount.py
cosn://$bucketname/$yourtestfile/user/hadoop/$output
其中/user/hadoop/为 HDFS 中的路径,如果不存在用户可以自己创建。
任务结束后,可以通过如下命令看到 Spark 运行日志:
[hadoop@10 spark]$ ?/usr/local/service/hadoop/bin/yarn logs -applicationId $yourId
其中 $yourId 应该替代为您的任务 ID。任务 ID 可以在 YARN 的 WebUI 上面进行查看。


http://www.vxiaotou.com