1. spark 介绍
由加州大学伯克利分校AMP实验室 开发的大数据计算引擎,以RDD (弹性分布式数据集)为最小数据单,基于内存的列式数据结构DataFrame,计算速度较快。spark 专注于大数据一栈式服务生态,包含sparkCore/sparkSql/sparkStreaming/sparkMllib 等。
2. 测试环境架构Linux 7.0 zk3.4 集群 Kafka 2.11 集群 scala 2.12 jdk 1.8 spark 2.1.0 集群
3. 环境搭建步骤特别说明:
· scala2.12 版本需要jdk 1.8 环境支持:
· spark2.0 及以上版本默认配置scala2.1l
· Springboot 1.5 基于的SpringFrameWork4.3 不支持jdk8 ,因此只能用kafka 0.11 版本,即Spring-kafka 最高为1.37版
· SpringBoot 2.0 基于的SpringFrameWorld5.0 支持jdk8, 因此可以使用kafka 2.0 及以上版本
首先修改host:
1) 安装scala(与jdk 安装方式一样),以192.168.52.40 服务器为例。
a. 上传安装包到服务器路径下解压,例如 /opt
b.设置环境变量 SCALA_HOME, vim /etc/profile
使配置生效 source /etc/profile
测试安装结果:
分别在192.168.52.35 和192.168.52.41 服务器按照相同的方法安装scala
2) 安装spark (前提是需要先安装hadoop、jdk、scala)
(Hadoop 集群之前已经发出过安装说明)
根据架构配置,40 服务器为master,35 和41 服务器都当做worker计算节点使用,以40服务器安装为例。
A. 传压缩包到 服务器路径下并且解压:
B. 修改环境变量
source /etc/profile
C. 修改spark配置环境
cd /opt/spark/conf
//复制一份配置模板
cp spark-env.sh.template spark-env.sh
//做如下修改,vim spark-env.sh
SCALA_HOME
java_HOME
HADOOP_CONF_DIR //hadoop 配置文件路径
SPARK_MASTER_IP //master 地址
SPARK_WORKER_MEMORY //每个work节点所能够分配到的内存
D. 配置slave(worker)
cp slaves.template slaves
vim slaves //此处严格地讲是不应该将master 也当做worker的,40 服务器已经装太多东东了
E. 在35 和41 服务器以相同的方式安装spark
(或者 将scp /opt/spark 到35 和41 服务器,但是确保scala、jdk和hadoop的安装路径相同,否则需要单独修改)
F. 启动spark 集群
cd /opt/spark/sbin
./start-all.sh
写个测试用例试试:
Spark 集群环境搭建完成。
3) 安装ZK集群,以40服务器安装为例
上传安装包并且解压
修改配置文件
cd ./conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
在dataDir 文件路径下创建myid文件
touch myid
vim myid
为了在集群环境下,各机器之间的识别标识,唯一
将40服务器的ZK 分发到其他服务器,分别修改myid 文件为2/3,然后分别启动ZKServer
./zkServer.sh
// 查看节点状态
./zkServer.sh status
可以看出41 的ZK是leader,另两台是follower
4) 安装kafka集群
上传安装包并且进行解压
修改配置文件
cd ./config
vim server.properties
配置完成后,分发到35 和41 服务器,并且分别修改broker.id为1 和2 ,并且将listeners 改为对应服务器地址即可。
以守护进程方式 启动三台服务器的kafka
./kafka-server-start.sh -daemon ../config/server.properties
测试:
测试成功,kafka 集群安装完成。
至此环境已经搭建完成,接下来从开发层面demo展示
4. Spark 项目Demo开发本地开发调试也需要安装scala、jdk环境,方式同上
应用场景
A. 读取静态数据
通过SparkSql 从静态数据源(RDDs、csv、json、hive、jdbc 数据源等)读取数据进行处理。
创建测试文件sparkSql.txt(json格式)
{"chuangwei":"H44ddddddddddddddddd01","pv":13}
{"chuangwei":"H4402","pv":11}
{"chuangwei":"H4401","pv":12}
{"chuangwei":"H4403","pv":10}
{"chuangwei":"H4405","pv":13}
{"chuangwei":"H4401","pv":140}
SparkSQL 分为SparkContext 和 HiveContext(继承自SparkContext),可以通过SparkSession创建
B. 流式数据处理
从kafka、flume等数据源读取数据进行实时分析
· Kafka与SpringBoot整合
由于Springboot不同的版本支持不同的jdk,因此需要不同版本的kafka 支持
Springboot 1.5 最高只能使用1.37 版本的Spring-kafka
Springboot2.0 可以使用2.0 以上版本的Spring-kafka
在 SpringbootApplication.yml 主配置文件关于kafak 配置如下
引入依赖包
模拟生产者发出消息(Spring 容器启动后,注入Bean成功后将每个1 s 发出消息)
模拟消费者接受消息
· 通过SparkStreaming 实时读取流数据
由于 idea 编辑器对scala 函数式——面向对象编程有更加友好的支持,因此测试过程中spark 项目都是idea 编辑
关于idea 的使用不做赘述,针对spark 项目开发只需要安装插件scala 即可
添加依赖:
<properties> <spark.version>2.3.2</spark.version> <scala.version>2.11</scala.version></properties><dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.3.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.3.0</version> </dependency></dependencies>
测试案例
package com.fengmang.statimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.SparkConfimport org.apache.spark.sql.SparkSessionimport org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}import org.apache.spark.streaming.{Seconds, StreamingContext}import org.joda.time.DateTimeobject StatKafkaStreaming { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("statKafkaStreaming").setMaster("local[2]"); val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate(); val sc = sparkSession.sparkContext; sc.setLogLevel("WARN") //设置日志级别 val kafkaParams = Map( "bootstrap.servers" -> "192.168.52.40:9092,192.168.52.35:9092,192.168.52.41:9092", //brokers "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "spark_group" //kafka 消费者group.id ) val streamingContext = new StreamingContext(sc,Seconds(10)); //实时流数据对象,每10s 从kafka 读取一次数据 val topic = Array("stat") //主题 val inputDStream = KafkaUtils.createDirectStream[String, String]( //读取数据流 streamingContext, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topic, kafkaParams) ) val date = new DateTime()// inputDStream.foreachRDD(_.foreach(println(_))) //输出数据 inputDStream.foreachRDD(_.foreach(reccord => println( date ":" reccord.value()))) //输出数据,与上面函数表达式功能相同 streamingContext.start() //开启实时数据流任务 streamingContext.awaitTermination() }}
同时开启上面springboot 项目的消息生产者和StatKafkaStreaming 流数据读取任务
5. 问题点1) 异常问题一:调试Spark遇到的问题
spark 程序项目打包执行时,出现scala 版本与spark 不匹配所致
Exception in thread "main" java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction1$mcII$sp
查看当前运行版本spark 所支持的scala 版本,在spark 安装目录下查看jar 包中的scala 包版本,然后替换成相对应的版本
2) 异常问题二
component.AbstractLifeCycle: FAILED ServerConnector@4a9cc6cb{HTTP/1.1}{0.0.0.0:4040}: java.net.BindException: Address already in use
java.net.BindException: Address already in use
每启动一个SparkContext 时sparkUI 就会默认使用4040 端口,当其被占用(即已经开启了SaprkCotext),新开启的SparkContext 会尝试连接4041端口
3) SparkContext 冲突的问题
一个JVM 中默认只运行一个SparkContext,SparkContext 可以通过new StreamingContext(sc,Seconds(10)) 获取
但是通过new StreamingContext(sparkConf.Seconds(10)) 就会冲突
该报错是因为创建了多个sparkContext, 一般是因为在创建StreamingContext的时候使用了SparkContext而非SparkConf,如下:
val sc = new SparkContext(new SparkConf()) val ssc = new StreamingContext(sc,Minutes(5))
Exception in thread "main" org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:
4) DataFrame 操作时内存溢出
dataFrame.take()
dataFrame.takeAsList(n) //获取前n 行并且以list 的方式展示
take 和 takeAsList 会将获取的数据返回到driver端,因此需要特别注意使用这两个方法时,返回的数据量,避免OOM 内存溢出
5) 引入SparkSession 下的隐式函数失败
val sparkSession= SparkSession.builder.appName("SparkSqlTest").master("local").getOrCreate()
//需要引入当前sparkSession 下的隐式函数,否则 $ 将被当做不可识别符号
import sparkSession.implicits._
dataFrame.select($"weight" 10)
6) 远程服务无法访问producer和consumer,异常报连接超时和
Marking the coordinator dead for group(Kafka)
问题在于服务器没有配置kafka 服务器需要监听的端口号,如果服务是在本地运行可不用配置,会使用localhost默认地址。
如是远程访问得必须配置 listeners=PLAINTEXT:// 192.168.1.1:9092
7) Kafka 与springboot 版本兼容问题(上文已经提到)
。