基于 Spark 的数据分析实践

  • 时间:
  • 浏览:8
  • 来源:技术爱好者_提供腾讯爱好者技术_QQ技术网资讯

转载本文需注明出处:微信公众号EAWorld,违者必究。

引言:

Spark是在借鉴了MapReduce之上发展而来的,继承了其分布式并行计算的优点并改进了MapReduce明显的严重不足。Spark主要带有了Spark Core、Spark SQL、Spark Streaming、MLLib和GraphX等组件。

本文主要分析了 Spark RDD 以及 RDD 作为开发的严重不足之处,介绍了 SparkSQL 对已有的常见数据系统的操作最好的妙招,以及重点介绍了普元在众多数据开发项目中总结的基于 SparkSQL Flow 开发框架。

目录:

一、Spark RDD

二、基于Spark RDD数据开发的严重不足

三、SparkSQL

四、SparkSQL Flow一、Spark RDD

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一有5个不可变、可分区、元素可并行计算的集合。

RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。

//Scala 在内存中使用列表创建

val lines = List(“A”, “B”, “C”, “D” …)

val rdd:RDD = sc.parallelize(lines);

//以文本文件创建

val rdd:RDD[String] = sc.textFile(“hdfs://path/filename”)

Spark RDD Partition 分区划分

新版本的 Hadoop 机会把 BlockSize 改为 128M,也却说说每个分区出理 的数据量更大。

Spark 读取文件分区的核心原理

本质上,Spark 是利用了 Hadoop 的底层对数据进行分区的 API(InputFormat):

public abstract class InputFormat{

public abstract List getSplits(JobContextcontext

) throwsIOException,InterruptedException;

public abstract RecordReader createRecordReader(InputSplitsplit,

TaskAttemptContextcontext

)throwsIOException,InterruptedException;

}

Spark 任务提交后通过对输入进行 Split,在 RDD 构造阶段,却说判断是否是可 Split(机会参数异常一定在此阶段报出异常),很多 Split 后每个 InputSplit 回会一有5个分区。可不都都能不能都都能不能 在Action 算子提交后,才真正用 getSplits 返回的 InputSplit 通过 createRecordReader 获得每个 Partition 的连接。

很多通过 RecordReader 的 next() 遍历分区内的数据。

Spark RDD 转换函数和提交函数

Spark RDD 的众多函数可分为两大类Transformation 与 Action。Transformation 与 Action 的区别在于,对 RDD 进行 Transformation 不不说会触发计算:Transformation 最好的妙招所产生的 RDD 对象只会记录住该 RDD 所依赖的 RDD 以及计算产生该 RDD 的数据的最好的妙招;可不都都能不能都都能不能 在用户进行 Action 操作时,Spark 才会调度 RDD 计算任务,依次为各个 RDD 计算数据。这却说 Spark RDD 内函数的“懒加载”特征。二、基于Spark RDD数据开发的严重不足

机会MapReduce的shuffle过程需写磁盘,比较影响性能;而Spark利用RDD技术,计算在内存中流式进行。另外 MapReduce计算框架(API)比较局限, 使用时需关注的参数众多,而Spark则是上方结果自动推断,通过对数据集上链式执行函数具备一定的灵活性。

即使 SparkRDD 相对于 MapReduce 提高很大的便利性,但在使用上仍然有很多问题图片。体现在一下几条方面:

  1. RDD 函数众多,开发者不容易掌握,每种函数使用不当 shuffle时造成数据倾斜影响性能;
  2. RDD 关注点仍然是Spark太底层的 API,基于 Spark RDD的开发是基于特定语言(Scala,Python,Java)的函数开发,无法以数据的视界来开发数据;
  3. 对 RDD 转换算子函数内每种常量、变量、广播变量使用不当,会造成不可控的异常;
  4. 对多种数据开发,需每各人开发RDD的转换,样板代码较多,无法有效重利用;
  5. 其它在运行期机会占据 的异常。如:对象无法序列化等运行期都都能不能发现的异常。

三、SparkSQL

Spark 从 1.3 版本刚开始原有 SchemaRDD 的基础上提供了相似 Pandas DataFrame API。新的DataFrame API不仅可不都都能不能大幅度降低普通开发者的学习门槛,同去还支持Scala、Java与Python一种 语言。更重要的是,机会脱胎自SchemaRDD,DataFrame天然植物适用于分布式大数据场景。

一般的数据出理 步骤:读入数据 -> 对数据进行出理 -> 分析结果  -> 写入结果

SparkSQL 特征化数据

  • 出理 特征化数据(如 CSV,JSON,Parquet 等);
  • 把机会特征化数据抽象成 DataFrame (HiveTable);
  • 非特征化数据通过 RDD.map.filter 转加带特征化进行出理 ;
  • 按照列式数据库,只加载非特征化中可特征化的每种列(Hbase,MongoDB);

出理 非特征化数据,可不都都能不能 简单的用 DataFrame 装载。却说要用 SparkRDD 把数据读入,在通过一系列的 Transformer Method 把非特征化的数据加工为特征化,机会过滤到不合法的数据。

SparkSQL DataFrame

SparkSQL 中一切回会 DataFrame,all in DataFrame. DataFrame是一种 以RDD为基础的分布式数据集,相似 于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。机会熟悉 Python Pandas 库中的 DataFrame 特征,则会对 SparkSQL DataFrame 概念非常熟悉。

TextFile DataFrame

import.org.apache.spark.sql._

//定义数据的列名称和类型

valdt=StructType(List(id:String,name:String,gender:String,age:Int))

//导入user_info.csv文件并指定分隔符

vallines = sc.textFile(“/path/user_info.csv”).map(_.split(“,”))

//将表特征和数据关联起来,把读入的数据user.csv映射成行,构成数据集

valrowRDD = lines.map(x=>Row(x(0),x(1),x(2),x(3).toInt))

//通过SparkSession.createDataFrame()创建表,很多数据表表头

val df= spark.createDataFrame(rowRDD, dt)

读取规则数据文件作为DataFrame

SparkSession.Builder builder = SparkSession.builder()

Builder.setMaster(“local”).setAppName(“TestSparkSQLApp”)

SparkSession spark = builder.getOrCreate();

SQLContext sqlContext = spark.sqlContext();

# 读取 JSON 数据,path 可为文件机会目录

valdf=sqlContext.read().json(path);

# 读取 HadoopParquet 文件

vardf=sqlContext.read().parquet(path);

# 读取 HadoopORC 文件

vardf=sqlContext.read().orc(path);

JSON 文件为每行一有5个 JSON 对象的文件类型,行尾不不说逗号。文件头很多用说[]指定为数组;SparkSQL 读取是却说按照每行一根绳子 JSON Record序列化;

Parquet文件

Configurationconfig = new Configuration();

ParquetFileReaderreader = ParquetFileReader.open(

HadoopInputFile.fromPath(new Path(“hdfs:///path/file.parquet”),conf));

Mapschema = reader.getFileMetaData().getKeyValueMetaData();

String allFields= schema.get(“org.apache.spark.sql.parquet.row.metadata”);

allFiedls 的值却说各字段的名称和具体的类型,整体是一有5个json格式进行展示。

读取 Hive 表作为 DataFrame

Spark2 API 推荐通过 SparkSession.Builder 的 Builder 模式创建 SparkContext。 Builder.getOrCreate() 用于创建 SparkSession,SparkSession 是 SparkContext 的封装。

在Spark1.6饱带有5个核心组件SQLcontext和HiveContext。SQLContext 用于出理 在 SparkSQL 中动态注册的表,HiveContext 用于出理 Hive 中的表。

从Spark2.0以上的版本刚开始,spark是使用全新的SparkSession接口代替Spark1.6中的SQLcontext和HiveContext。SQLContext.sql 即可执行 Hive 中的表,也可执行组织组织结构注册的表;

在时需执行 Hive 表时,只时需在 SparkSession.Builder 中开启 Hive 支持即可(enableHiveSupport())。

SparkSession.Builder builder = SparkSession.builder().enableHiveSupport();

SparkSession spark = builder.getOrCreate();

SQLContext sqlContext = spark.sqlContext();

// db 指 Hive 库中的数据库名,机会不写默认为 default

// tableName 指 hive 库的数据表名

sqlContext.sql(“select * from db.tableName”)

SparkSQL ThriftServer

//首先打开 Hive 的 Metastore服务

hive$bin/hive –-service metastore –p 3093

//把 Spark 的相关 jar 上传到hadoophdfs指定目录,用于指定sparkonyarn的依赖 jar

spark$hadoop fs –put jars/*.jar /lib/spark2

// 启动 spark thriftserver 服务

spark$ sbin/start-thriftserver.sh –master yarn-client –driver-memory 1G –conf

spark.yarn.jars=hdfs:///lib/spark2/*.jar

当hdfs 上传了spark 依赖 jar 时,通过spark.yarn.jars 可看完日志 spark 不不说每个job 都上传jar,可节省启动时间

19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.0.5.jar

19/06/1114:08:26 INFO Client: Source and destination file systems are the same. Notcopying hdfs://localhost:9000/lib/spark2/snappy-java-1.1.7.3.jar

//通过 spark bin 下的 beeline 工具,可不都都能不能连接到 spark ThriftServer(SparkOnHive)

bin/beeline -u jdbc:hive2://ip:300/default -n hadoop

-u 是指定 beeline 的执行驱动地址;

-n 是指定登陆到 spark Session 上的用户名称;

Beeline 还支持传入-e 可传入一行 SQL, 

-e <query>                      query that should be executed

也可通过 –f 指定一有5个 SQL File,组织组织结构可用逗号分隔的多个 SQL(存储过程)

-f <exec file>                  script file that should be executed

SparkSQL Beeline 的执行效果展示

SparkSQL ThriftServer

对于 SparkSQL ThriftServer 服务,每个登陆的用户回会创建的 SparkSession,很多执行的对个 SQL 会通过时间顺序列表展示。

SparkSQL ThriftServer 服务可用于很多支持的数据库工具创建查询,也用于第三方的 BI 工具,如 tableau。四、SparkSQL Flow

SparkSQL Flow 是以 SparkSQL 为基础,开发的统一的基于 XML 配置化的可执行一连串的 SQL 操作,你这名 连串的 SQL 操作定义为一有5个 Flow。下文刚开始 SparkSQL Flow 的介绍:

SparkSQL Flow 是基于 SparkSQL 开发的一种 基于 XML 配置化的 SQL 数据流转出理 模型。该模型复杂化了 SparkSQL 、Spark RDD的开发,很多降低开发了难度,适合了解数据业务但无法驾驭大数据以及 Spark 技术的开发者。

  • 一有5个由普元技术部提供的基于 SparkSQL 的开发模型;
  • 一有5个可二次定制开发的大数据开发框架,提供了灵活的可扩展 API;
  • 一有5个提供了 对文件,数据库,NoSQL 等统一的数据开发视界语义;
  • 基于 SQL 的开发语言和 XML 的模板配置,支持 Spark UDF 的扩展管理;
  • 支持基于 Spark Standlone,Yarn,Mesos 资源管理平台;
  • 支持开源、华为、星环等平台统一认证。

SparkSQL Flow 适合的场景:

  1. 批量 ETL;
  2. 非实八时析服务;

SparkSQL Flow XML 概览

  1. Properties 内定义一组变量,可用于宏替换;
  2. Methods 内可注册 udf 和 udaf 一种 函数;
  3. Prepare 内可定义前置 SQL,用于执行 source 前的 sql 操作;
  4. Sources 内定义一有5个到多个数据表视图;
  5. Transformer 内可定义 0 到多个基于 SQL 的数据转换操作(支持 join);
  6. Targets 用于定义 1 到多个数据输出;
  7. After 可定义 0到多个任务日志;

如你所见,source 的 type 参数用于区分 source 的类型,source 支持的种类直接决定SparkSQL Flow 的数据源加载广度;很多,根据 type 不同,source 也时需配置不同的参数,如数据库还时需 driver,url,user和 password 参数。

Transformer 是基于 source 定的数据视图可执行的一组转换 SQL,该 SQL 符合 SparkSQL 的语法(SQL99)。Transform 的 SQL 的执行结果被作为上方表命名为 table_name 指定的值。

Targets 为定义输出,table_name 的值需在 source 机会 Transformer 中定义。

SparkSQL Flow 支持的Sourse

  • 支持从 Hive 获得数据;
  • 支持文件:JSON,TextFile(CSV),ParquetFile,AvroFile

  • 支持RDBMS数据库:PostgreSQL, MySQL,Oracle

  • 支持 NOSQL 数据库:Hbase,MongoDB

SparkSQL Flow TextFile Source

textfile 为读取文本文件,把文本文件每行按照 delimiter 指定的字符进行切分,切分严重不足的列使用 null 填充。

  1. Tablename 为该文件映射的数据表名,可理解为数据的视图;
  2. Fields 为切分后的字段,使用逗号分隔,字段后可紧跟该字段的类型,使用冒号分隔;
  3. Delimiter 为每行的分隔符;
  4. Path 用于指定文件地址,可不都都能不能是文件,也另一有5个文件夹;
  5. Path 指定地址时需使用协议,如:file:// 、 hdfs://,很多跟 core-site.xml 配置密切相关;

SparkSQL Flow DB Source

RDBMS 是从数据库使用 JDBC读取 数据集。支持 type 为:db、mysql、oracle、postgres、mssql;

  1. tablename 为该数据表的抽象 table 名称(视图);
  2. url、driver、user,password 为数据库 JDBC 驱动信息,为时需字段;
  3. SparkSQL 会加载该表的全表数据,无法使用 where 条件。

SparkSQL Flow Transformer

SELECT c_phone,c_type,c_num, CONCAT_VAL(cust_id) as cust_ids

FROM user_concat_testx

group by c_phone,c_type,c_num

Transform 支持 cached 属性,默认为 false;机会设置为 true,相当于把该结果缓存到内存中,缓存到内存中的数据在后续其它 Transform 中使用能提高计算下行速率 。很多需使用少许内存,开发者时需评估该数据集可不都都能不能放上去内存中,出理 出现 OutofMemory 的异常。

SparkSQL Flow Targets

SparkSQL Flow Targets 支持输出数据到一有5个机会多个目标。那此目标,基本覆盖了 Source 带有的组织组织结构系统。下面以 Hive 举例说明:

  1. table_name 为 source 机会 Transform 定义的表名称;
  2. target_table_name 为 hive 中的表结果,Hive 表可不占据 也可占据 ,sparksql 会根据 DataFrame 的数据类型自动创建表;
  3. savemode 默认为 overwrite 覆盖写入,当写入目标已占据 时删除源表再写入;支持 append 模式, 可增量写入。

Target 有一有5个特殊的 show 类型的 target。用于直接在控制台输出一有5个 DataFrame 的结果到控制台(print),该 target 用于开发和测试。

Rows 用于控制输出几条行数据。

SparkSQL Around

After 用于 Flow 在运行刚开始后执行的一有5个环绕,用于记录日志和写入情況。相似 Java 的 try {} finally{ round.execute() }

多个 round 回会执行,round 异常不不原因分析分析任务失败。 



SparkSQL Around的执行效果

Prepare round 可做插入(insert)动作,after round 可做更新 (update)动作,相当于在数据库表中从执行刚开始到刚开始有了全部的日志记录。SparkSQL Flow 会保证round 一定能被执行,很多 round 的执行不影响任务的情況。

SparkSQL Flow 提交

bin/spark-submit –master yarn-client –driver-memory 1G \

–num-executors 10 –executor-memory 2G \

–jars /lib/jsoup-1.11.3.jarlib/jsqlparser-0.9.6.jar,/lib/mysql-connector-java-5.1.46.jar \

–conf spark.yarn.jars=hdfs:///lib/spark2/*.jar \

–queue default –name FlowTest \

etl-flow-0.2.0.jar -f hive-flow-test.xml

接收时需的参数 –f,可选的参数为支持 Kerberos 认证的租户名称principal,和其认证时需的密钥文件。

usage: spark-submit –jars etl-flow.jar –class

com.yiidata.etl.flow.source.FlowRunner

-f,–xml-file Flow XML File Path

–keytabFile keytab File Path(Huawei)

–krb5File krb5 File Path(Huawei)

–principal principal for hadoop(Huawei)

SparkSQL Execution Plan

每个Spark Flow 任务本质上是一连串的 SparkSQL 操作,在 SparkUI SQL tab 可不都都上能不能看完 flow 中重要的数据表操作。

regiserDataFrameAsTable 是每个 source 和 Transform 的数据在 SparkSQL 中的数据视图,每个视图回会在 SparkContex 中注册一次。

对RegisterDataFrameAsTable的分析

通过单个 regiserDataFrameAsTable 项进行分析,SparkSQL 并回会把source 的数据立即计算把数据放上去内存,却说每次执行 source 时却说生成了一有5个 Logical Plan,可不都都能不能都都能不能 遇到时需提交的算子(Action),SparkSQL 才会触发前面所依赖的的 plan 执行。总结

这是一有5个开发框架,回会一有5个心智心智心智成熟图片 图片 的句子图片 图片 是什么期的句子是什么的产品,也回会一种 架构。他却说基于 SparkSQL 整合了大多数的组织组织结构系统,能通过 XML 的模板配置完成数据开发。面向的是理解数据业务但不了解 Spark 的数据开发人员。整个框架完成了大多数的组织组织结构系统对接,开发者只时需使用 type 获得数据,完成数据开发后通过 target 回写到目标系统中。整个过程基本不不说系统多多线程 开发,除非当前的 SQL 函数无法满足使用的情況下,时需自行开发一下特定的 UDF。很多本框架在对 SparkSQL 做了二次开发基础上,大大复杂化了 Spark 的开发,可降低了开发者使用难度。

精选提问:

问1:和Fink平台有那此优势么?

答:Flink 应该对标 Spark Streaming 的出理 方案,是另一种 可选流数据引擎。Flink 也采用了 Scala 语言,组织组织结构原理和操作数据最好的妙招颇有相似 之处,是 SparkStreaming 之外流数据出理 一种 选型。基于 SparkSQL Flow 的架构主要侧重批量数据分析,非实时 ETL 方面。

问2:那此应该是源数据库吧,请问目标数据库支持那此?

答:目前的实现目标数据基本支持所有的源。

问3:亲戚亲戚亲戚朋友产品是软件开发平台,spark和亲戚亲戚亲戚朋友开发平台啥关系?

答:普元针对每种心智心智心智成熟图片 图片 的句子图片 图片 是什么期的句子是什么场景提供了很多开发平台和工具,也在参与了很多大数据项目建设。对于大规模数据的数据报表,数据质量分析也时需适应大数据的技术场景,Spark 作为Hadoop 内比较心智心智心智成熟图片 图片 的句子图片 图片 是什么期的句子是什么的出理 方案,很多作为主要的选型工具。在参与每种项目实施过程中,通过对很多开发中的痛点针对性的提取了应用框架。

问4:对于ETL中占据 的merge、update的数据匹配、整合出理 ,Spark SQL Flow有这麼好的出理 最好的妙招?

答:merge 和 update 在数据开发过程不可出理 ,往往对数据库造成较大压力。大数据场景下不建议逐条对数据做 update 操作,更好的最好的妙招是在数据出理 阶段通过 join 把结果集在写入目标前准备好,统一一次性写入到目标数据库。查询操作通过换库使用新库,这中操作一般适合数据量比较大,数据更新频率较低的情況。机会目标库是 HBase 机会很多 MPP 类基于列式的数据库,适当的可不都都能不能更新。很多当每天有 30% 以上的数据都时需更新时,建议还是一次性生成新表。

问5: blink和flink 应该如可取舍?

答:blink 是阿里巴巴在 flink 基础上做了每种场景优化(却说每种社区有介绍,不不说明确)很多开源,很多考虑到国内那此机构开源往往是这麼持久动力的。要看采用 Blink 是否是用了比较关键的特征。回会消息说 Blink 和 Flink 会合并,毕竟阿里 Dubbo 前期买车人发展,后期还是捐给了 Apache,很多两者合并也是有机会。建议选型 Flink。

问6:etl 同步数据中主要用那此工具?

答:你这名 要区分场景。传统数据库之间,可采用日志同步,回会每种心智心智心智成熟图片 图片 的句子图片 图片 是什么期的句子是什么的工具;

传统数据库和Hadoop 生态内(HBase,HIVE) 同步可使用 apache sqoop。 SparkSQL Flow 可不都都能不能作为数据同步的另一种 方案,可用在实时性不高的场景。SparkSQL Flow 更侧重大数据工具,偏向数据分析和非实时 ETL。

关于作者:震秦,普元资深开发工程师,专注于大数据开发 8 年,擅长 Hadoop 生态内各工具的使用和优化。参与某公关广告(上市)公司DMP 建设,负责数据分层设计和批出理 ,调度实现,完成交付使用;参与国内多省市公安社交网络项目部署,负责产品开发(Spark 分析应用);参与数据清洗加工为我方主题库并部署上层应用。

关于EAWorld:微服务,DevOps,数据治理,移动架构原创技术分享。

本文由

EAWorld

发布在

ITPUB

,转载此文请保持文章全部性,并请附上文章来源(ITPUB)及本页链接。

原文链接:http://www.itpub.net/2019/06/20/2235/