2243的竖式怎么列,3153的竖式怎么列

首页 > 实用技巧 > 作者:YD1662023-10-29 01:10:45


1. spark 介绍

由加州大学伯克利分校AMP实验室 开发的大数据计算引擎,以RDD (弹性分布式数据集)为最小数据单,基于内存的列式数据结构DataFrame,计算速度较快。spark 专注于大数据一栈式服务生态,包含sparkCore/sparkSql/sparkStreaming/sparkMllib 等。

2. 测试环境架构

Linux 7.0 zk3.4 集群 Kafka 2.11 集群 scala 2.12 jdk 1.8 spark 2.1.0 集群

3. 环境搭建步骤

特别说明:

· scala2.12 版本需要jdk 1.8 环境支持:

· spark2.0 及以上版本默认配置scala2.1l

· Springboot 1.5 基于的SpringFrameWork4.3 不支持jdk8 ,因此只能用kafka 0.11 版本,即Spring-kafka 最高为1.37版

· SpringBoot 2.0 基于的SpringFrameWorld5.0 支持jdk8, 因此可以使用kafka 2.0 及以上版本

首先修改host:

1) 安装scala(与jdk 安装方式一样),以192.168.52.40 服务器为例。

a. 上传安装包到服务器路径下解压,例如 /opt

b.设置环境变量 SCALA_HOME, vim /etc/profile

使配置生效 source /etc/profile

测试安装结果:

分别在192.168.52.35 和192.168.52.41 服务器按照相同的方法安装scala

2) 安装spark (前提是需要先安装hadoop、jdk、scala)

(Hadoop 集群之前已经发出过安装说明)

根据架构配置,40 服务器为master,35 和41 服务器都当做worker计算节点使用,以40服务器安装为例。

A. 传压缩包到 服务器路径下并且解压:

B. 修改环境变量

source /etc/profile

C. 修改spark配置环境

cd /opt/spark/conf

//复制一份配置模板

cp spark-env.sh.template spark-env.sh

//做如下修改,vim spark-env.sh

SCALA_HOME

java_HOME

HADOOP_CONF_DIR //hadoop 配置文件路径

SPARK_MASTER_IP //master 地址

SPARK_WORKER_MEMORY //每个work节点所能够分配到的内存

D. 配置slave(worker)

cp slaves.template slaves

vim slaves //此处严格地讲是不应该将master 也当做worker的,40 服务器已经装太多东东了

E. 在35 和41 服务器以相同的方式安装spark

(或者 将scp /opt/spark 到35 和41 服务器,但是确保scala、jdk和hadoop的安装路径相同,否则需要单独修改)

F. 启动spark 集群

cd /opt/spark/sbin

./start-all.sh

写个测试用例试试:

Spark 集群环境搭建完成。

3) 安装ZK集群,以40服务器安装为例

上传安装包并且解压

修改配置文件

cd ./conf

cp zoo_sample.cfg zoo.cfg

vim zoo.cfg

在dataDir 文件路径下创建myid文件

touch myid

vim myid

为了在集群环境下,各机器之间的识别标识,唯一

将40服务器的ZK 分发到其他服务器,分别修改myid 文件为2/3,然后分别启动ZKServer

./zkServer.sh

// 查看节点状态

./zkServer.sh status

可以看出41 的ZK是leader,另两台是follower

4) 安装kafka集群

上传安装包并且进行解压

修改配置文件

cd ./config

vim server.properties

配置完成后,分发到35 和41 服务器,并且分别修改broker.id为1 和2 ,并且将listeners 改为对应服务器地址即可。

以守护进程方式 启动三台服务器的kafka

./kafka-server-start.sh -daemon ../config/server.properties

测试:

测试成功,kafka 集群安装完成。

至此环境已经搭建完成,接下来从开发层面demo展示

4. Spark 项目Demo开发

本地开发调试也需要安装scala、jdk环境,方式同上

应用场景

A. 读取静态数据

通过SparkSql 从静态数据源(RDDs、csv、json、hive、jdbc 数据源等)读取数据进行处理。

创建测试文件sparkSql.txt(json格式)

{"chuangwei":"H44ddddddddddddddddd01","pv":13}

{"chuangwei":"H4402","pv":11}

{"chuangwei":"H4401","pv":12}

{"chuangwei":"H4403","pv":10}

{"chuangwei":"H4405","pv":13}

{"chuangwei":"H4401","pv":140}

SparkSQL 分为SparkContext 和 HiveContext(继承自SparkContext),可以通过SparkSession创建

B. 流式数据处理

从kafka、flume等数据源读取数据进行实时分析

· Kafka与SpringBoot整合

由于Springboot不同的版本支持不同的jdk,因此需要不同版本的kafka 支持

Springboot 1.5 最高只能使用1.37 版本的Spring-kafka

Springboot2.0 可以使用2.0 以上版本的Spring-kafka

在 SpringbootApplication.yml 主配置文件关于kafak 配置如下

引入依赖包

模拟生产者发出消息(Spring 容器启动后,注入Bean成功后将每个1 s 发出消息)

模拟消费者接受消息

· 通过SparkStreaming 实时读取流数据

由于 idea 编辑器对scala 函数式——面向对象编程有更加友好的支持,因此测试过程中spark 项目都是idea 编辑

关于idea 的使用不做赘述,针对spark 项目开发只需要安装插件scala 即可

添加依赖:

<properties> <spark.version>2.3.2</spark.version> <scala.version>2.11</scala.version></properties><dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.3.2</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-mllib_${scala.version}</artifactId> <version>${spark.version}</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.3.0</version> </dependency></dependencies>

测试案例

package com.fengmang.statimport org.apache.kafka.common.serialization.StringDeserializerimport org.apache.spark.SparkConfimport org.apache.spark.sql.SparkSessionimport org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}import org.apache.spark.streaming.{Seconds, StreamingContext}import org.joda.time.DateTimeobject StatKafkaStreaming { def main(args: Array[String]): Unit = { val sparkConf = new SparkConf().setAppName("statKafkaStreaming").setMaster("local[2]"); val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate(); val sc = sparkSession.sparkContext; sc.setLogLevel("WARN") //设置日志级别 val kafkaParams = Map( "bootstrap.servers" -> "192.168.52.40:9092,192.168.52.35:9092,192.168.52.41:9092", //brokers "key.deserializer" -> classOf[StringDeserializer], "value.deserializer" -> classOf[StringDeserializer], "group.id" -> "spark_group" //kafka 消费者group.id ) val streamingContext = new StreamingContext(sc,Seconds(10)); //实时流数据对象,每10s 从kafka 读取一次数据 val topic = Array("stat") //主题 val inputDStream = KafkaUtils.createDirectStream[String, String]( //读取数据流 streamingContext, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topic, kafkaParams) ) val date = new DateTime()// inputDStream.foreachRDD(_.foreach(println(_))) //输出数据 inputDStream.foreachRDD(_.foreach(reccord => println( date ":" reccord.value()))) //输出数据,与上面函数表达式功能相同 streamingContext.start() //开启实时数据流任务 streamingContext.awaitTermination() }}

同时开启上面springboot 项目的消息生产者和StatKafkaStreaming 流数据读取任务

5. 问题点

1) 异常问题一:调试Spark遇到的问题

spark 程序项目打包执行时,出现scala 版本与spark 不匹配所致

Exception in thread "main" java.lang.BootstrapMethodError: java.lang.NoClassDefFoundError: scala/runtime/java8/JFunction1$mcII$sp

查看当前运行版本spark 所支持的scala 版本,在spark 安装目录下查看jar 包中的scala 包版本,然后替换成相对应的版本

2) 异常问题二

component.AbstractLifeCycle: FAILED ServerConnector@4a9cc6cb{HTTP/1.1}{0.0.0.0:4040}: java.net.BindException: Address already in use

java.net.BindException: Address already in use

每启动一个SparkContext 时sparkUI 就会默认使用4040 端口,当其被占用(即已经开启了SaprkCotext),新开启的SparkContext 会尝试连接4041端口

3) SparkContext 冲突的问题

一个JVM 中默认只运行一个SparkContext,SparkContext 可以通过new StreamingContext(sc,Seconds(10)) 获取

但是通过new StreamingContext(sparkConf.Seconds(10)) 就会冲突

该报错是因为创建了多个sparkContext, 一般是因为在创建StreamingContext的时候使用了SparkContext而非SparkConf,如下:

val sc = new SparkContext(new SparkConf()) val ssc = new StreamingContext(sc,Minutes(5))

Exception in thread "main" org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at:

4) DataFrame 操作时内存溢出

dataFrame.take()

dataFrame.takeAsList(n) //获取前n 行并且以list 的方式展示

take 和 takeAsList 会将获取的数据返回到driver端,因此需要特别注意使用这两个方法时,返回的数据量,避免OOM 内存溢出

5) 引入SparkSession 下的隐式函数失败

val sparkSession= SparkSession.builder.appName("SparkSqlTest").master("local").getOrCreate()

//需要引入当前sparkSession 下的隐式函数,否则 $ 将被当做不可识别符号

import sparkSession.implicits._

dataFrame.select($"weight" 10)

6) 远程服务无法访问producer和consumer,异常报连接超时和

Marking the coordinator dead for group(Kafka)

问题在于服务器没有配置kafka 服务器需要监听的端口号,如果服务是在本地运行可不用配置,会使用localhost默认地址。

如是远程访问得必须配置 listeners=PLAINTEXT:// 192.168.1.1:9092

7) Kafka 与springboot 版本兼容问题(上文已经提到)

栏目热文

文档排行

本站推荐

Copyright © 2018 - 2021 www.yd166.com., All Rights Reserved.