新聞中心
隨著大數(shù)據(jù)時(shí)代的到來,數(shù)據(jù)處理與分析變得越來越重要。而其中,讀取數(shù)據(jù)庫中的數(shù)據(jù)也是一項(xiàng)非常重要的工作。在過去,讀取數(shù)據(jù)庫中的數(shù)據(jù)需要經(jīng)過繁瑣的準(zhǔn)備工作,比較耗時(shí)。而現(xiàn)在,我們可以通過使用Spark輕松讀取數(shù)據(jù)庫,節(jié)省時(shí)間和精力。

本文將會為大家介紹什么是Spark,如何使用Spark讀取數(shù)據(jù)庫以及優(yōu)秀的方法分享。
一、什么是Spark?
Spark是一個快速、通用的大數(shù)據(jù)處理引擎,可以支持包括Java、Scala、Python在內(nèi)的多種編程語言。Spark具有高速內(nèi)存計(jì)算和優(yōu)化引擎,能夠加速數(shù)據(jù)處理的速度。Spark是一種大數(shù)據(jù)處理框架,也是Hadoop生態(tài)系統(tǒng)中的一個重要組件。
二、如何使用Spark讀取數(shù)據(jù)庫?
使用Spark讀取數(shù)據(jù)庫時(shí),需要先進(jìn)行一些配置工作。需要在pom.xml中添加以下依賴:
“`
org.apache.spark
spark-sql_2.11
2.4.5
com.microsoft.sqlserver
mssql-jdbc
6.4.0.jre8
“`
這些依賴會將Spark SQL與Microsoft SQL Server JDBC驅(qū)動程序添加到項(xiàng)目中。
接下來,需要創(chuàng)建一個Java SparkSession對象。在這個對象中,將設(shè)置連接到數(shù)據(jù)庫的參數(shù)。需要設(shè)置數(shù)據(jù)庫的URL、用戶名和密碼。然后,在創(chuàng)建SparkSession對象時(shí),需要將這些參數(shù)傳遞給SparkConf。
示例代碼如下:
“`
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
public class SparkSQLJDBCDemo {
public static void loadSQLServerTable() {
//creating SparkSession
SparkSession sparkSession = SparkSession
.builder()
.appName(“Spark SQL JDBC example”)
.master(“l(fā)ocal[*]”)
.config(“spark.sql.warehouse.dir”, “file:///C:/temp”)
.getOrCreate();
//creating properties object
Properties properties = new Properties();
properties.setProperty(“user”, “username”);
properties.setProperty(“password”, “password”);
//reading table
Dataset jdbcDF = sparkSession.read()
.jdbc(“jdbc:sqlserver://localhost:1433;databaseName=mydatabase;”, “dbo.employeetable”, properties);
jdbcDF.show();
}
public static void mn(String[] args) {
loadSQLServerTable();
}
}
“`
在上述代碼中,我們首先創(chuàng)建了一個SparkSession對象,并設(shè)置參數(shù)。注意到我們已經(jīng)設(shè)置了用戶名、密碼,以及連接數(shù)據(jù)庫的URL。然后,我們通過SparkSession的read方法,具體地讀取了目標(biāo)數(shù)據(jù)庫中的表中的數(shù)據(jù)。在這里,我們讀取了員工表中的所有數(shù)據(jù)。我們通過show方法,將讀取的數(shù)據(jù)進(jìn)行展示。
三、優(yōu)秀的方法分享
1.使用Repartition
在讀取數(shù)據(jù)庫中的數(shù)據(jù)時(shí),一步到位將所有數(shù)據(jù)讀取出來往往會導(dǎo)致生成數(shù)據(jù)傾斜的問題。在這種情況下,我們可以使用Repartition方法,將數(shù)據(jù)按照某種規(guī)則重新劃分,避免數(shù)據(jù)傾斜的問題。示例代碼如下:
“`
Dataset jdbcDF = sparkSession.read()
.jdbc(“jdbc:sqlserver://localhost:1433;databaseName=mydatabase;”, “dbo.employeetable”, properties)
.repartition(10); //設(shè)置Repartition的數(shù)量為10
“`
2.使用Cache
如果需要經(jīng)常使用已讀取到的數(shù)據(jù),而且又希望查詢時(shí)速度更快,我們可以使用Cache方法將數(shù)據(jù)緩存起來。這樣,在后續(xù)的查詢過程中,就可以快速地讀取這些數(shù)據(jù)。示例代碼如下:
“`
Dataset jdbcDF = sparkSession.read()
.jdbc(“jdbc:sqlserver://localhost:1433;databaseName=mydatabase;”, “dbo.employeetable”, properties)
.cache(); //將讀取的數(shù)據(jù)進(jìn)行緩存
“`
3.使用PartitionBy和BucketBy
如果我們可以根據(jù)某種規(guī)則將數(shù)據(jù)進(jìn)行劃分,那么就可以使用PartitionBy或BucketBy方法。這兩種方法可以使查詢速度更快。PartitionBy方法可以將數(shù)據(jù)按照某個字段進(jìn)行分區(qū),而BucketBy方法可以將數(shù)據(jù)按照某個字段進(jìn)行分桶。示例代碼如下:
“`
Dataset jdbcDF = sparkSession.read()
.jdbc(“jdbc:sqlserver://localhost:1433;databaseName=mydatabase;”, “dbo.employeetable”, properties)
.repartition(10)
.partitionBy(“departmentid”); //按照部門ID進(jìn)行分區(qū)
“`
四、結(jié)論
在本文中,我們介紹了Spark的基本概念,以及如何使用Spark輕松讀取數(shù)據(jù)庫。我們還分享了一些優(yōu)秀的方法,包括Repartition、Cache、PartitionBy和BucketBy。
相關(guān)問題拓展閱讀:
- spark編程 mysql得不到數(shù)據(jù)
- spark讀mysql數(shù)據(jù)只出來了字段沒數(shù)據(jù)
spark編程 mysql得不到數(shù)據(jù)
這里說明一點(diǎn):本文提到的解決 Spark insertIntoJDBC找不到Mysql驅(qū)動的方法是針對單機(jī)模式(也就是local模式)。在集群環(huán)境下,下面的方法是不行的。這是因?yàn)樵诜植际江h(huán)境下,加載mysql驅(qū)動包存在一個Bug,1.3及以前的版本 –jars 分薯姿發(fā)的jar在executor端是通過 Spark自身特化的classloader加載的。而JDBC driver manager使用的則是系統(tǒng)默認(rèn)的classloader,因此無法識別??尚械姆椒ㄖ皇窃谒?executor 節(jié)點(diǎn)上預(yù)先裝好JDBC driver并放入默認(rèn)的classpath。
不過Spark 1.4應(yīng)該已經(jīng)fix了這個問題,即數(shù)高絕念汪 –jars 分發(fā)的 jar 也會納入 YARN 的 classloader 范疇。
今天在使用Spark中DataFrame往Mysql中插入RDD,但是一直報(bào)出以下的異常次信息:
$ bin/spark-submit –master local
–jars lib/mysql-connector-java-5.1.35.jar
–class spark.sparkToJDBC ./spark-test_2.10-1.0.jar
spark assembly has been built with Hive, including Datanucleus jars on classpath
Exception in thread “main” java.sql.SQLException: No suitable driver found for
jdbc:
true&characterEncoding=utf8&autoReconnect=true
at java.sql.DriverManager.getConnection(DriverManager.java:602)
at java.sql.DriverManager.getConnection(DriverManager.java:207)
at org.apache.spark.sql.DataFrame.createJDBCTable(DataFrame.scala:1189)
at spark.SparkToJDBC$.toMysqlFromJavaBean(SparkToJDBC.scala:20)
at spark.SparkToJDBC$.main(SparkToJDBC.scala:47)
at spark.SparkToJDBC.main(SparkToJDBC.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$
$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
感覺很奇怪,我在啟動作業(yè)的時(shí)候加了Mysql驅(qū)動啊在,怎么會出現(xiàn)這種異常呢??經(jīng)過查找,發(fā)現(xiàn)在–jars參數(shù)里面加入Mysql是沒有用的。通過查找,發(fā)現(xiàn)提交的作業(yè)可以通過加入–driver-class-path參數(shù)來設(shè)置driver的classpath,試了一下果然沒有出現(xiàn)錯誤!
$ bin/spark-submit –master local
–driver-class-path lib/mysql-connector-java-5.1.35.jar
–class spark.SparkToJDBC ./spark-test_2.10-1.0.jar
其實(shí),我們還可以在spark安裝包的conf/spark-env.sh通過配置SPARK_CLASSPATH來設(shè)置driver的環(huán)境變量,如下:
(這里需要注意的是,在Spark1.3版本中,在Spark配置中按如下進(jìn)行配置時(shí),運(yùn)行程序時(shí)會提示該配置方法在Spark1.0之后的版本已經(jīng)過時(shí),建議使用另外兩個方法;其中一個就是上面講到的方法。另外一個就是在配置文件中配置spark.executor.extraClassPath,具體配置格式會在試驗(yàn)之后進(jìn)行補(bǔ)充)
export SPARK_CLASSPATH=$SPARK_CLASSPATH:/iteblog/com/mysql-connector-java-5.1.35.jar
這樣也可以解決上面出現(xiàn)的異常。但是,我們不能同時(shí)在conf/spark-env.sh里面配置SPARK_CLASSPATH和提交作業(yè)加上–driver-class-path參數(shù),否則會出現(xiàn)以下異常:
查看源代碼打印幫助
$ bin/spark-submit –master local
–driver-class-path lib/mysql-connector-java-5.1.35.jar
–class spark.SparkToJDBC ./spark-test_2.10-1.0.jar
Spark assembly has been built with Hive, including Datanucleus jars on classpath
Exception in thread “main”org.apache.spark.SparkException:
Found both spark.driver.extraClassPath and SPARK_CLASSPATH. Use only the former.
at org.apache.spark.SparkConf$$anonfun$validateSettings$6$$anonfun$apply
$7.apply(SparkConf.scala:339)
at org.apache.spark.SparkConf$$anonfun$validateSettings$6$$anonfun$apply
$7.apply(SparkConf.scala:337)
at scala.collection.immutable.List.foreach(List.scala:318)
at org.apache.spark.SparkConf$$anonfun$validateSettings$6.apply(SparkConf.scala:337)
at org.apache.spark.SparkConf$$anonfun$validateSettings$6.apply(SparkConf.scala:325)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.SparkConf.validateSettings(SparkConf.scala:325)
at org.apache.spark.SparkContext.(SparkContext.scala:197)
at spark.SparkToJDBC$.main(SparkToJDBC.scala:41)
at spark.SparkToJDBC.main(SparkToJDBC.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$
deploy$SparkSubmit$$runMain(SparkSubmit.scala:569)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:166)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:189)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:110)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
“這里說明一點(diǎn):本文提到的解決 Spark insertIntoJDBC找不到Mysql驅(qū)動的方法是針對單機(jī)模式(也就是local模式)。在集群環(huán)境下,下面的方法是不行的。
編程是編定程序的中文簡稱,就是讓計(jì)算機(jī)代碼解決某個問題,對某個計(jì)算體系規(guī)定一定的運(yùn)算方式,使計(jì)算體系按照該計(jì)算方式運(yùn)行,并最終得到相應(yīng)結(jié)果的過程。
為了使計(jì)算機(jī)能夠理解人的意圖,人類就必慶扮須將需解決的問題的思路、方法和手段通過計(jì)算機(jī)能夠理解的形式告訴計(jì)算機(jī),使得計(jì)算機(jī)能夠根據(jù)人的指令一步一步去工作,完成某種特定的任務(wù)。這種人和計(jì)算體系之間交流的過程就是編程。
在計(jì)算機(jī)系統(tǒng)中,一條機(jī)器指令規(guī)定了計(jì)算機(jī)系統(tǒng)的一個特定動作。
一個系列的計(jì)算機(jī)在硬件設(shè)計(jì)制造時(shí)就用了若干指令規(guī)定了該系列計(jì)算櫻裂機(jī)能夠進(jìn)行的基本操作,這些指令一起構(gòu)成了該系列計(jì)算機(jī)的指令系統(tǒng)。在計(jì)算機(jī)應(yīng)用的初期,程序員使用機(jī)器的指令系統(tǒng)來編寫計(jì)算機(jī)應(yīng)用程序,這種程序稱為機(jī)脊差閉器語言程序。
以上內(nèi)容參考:
百度百科-編程
這里說明一點(diǎn):本文提到的解決 Spark insertIntoJDBC找不到Mysql驅(qū)動的方法是純鎮(zhèn)針對單機(jī)模式(也就是local模式)。在集群環(huán)境下,下面的方法是不行的。這是因?yàn)樵诜植际江h(huán)境下,加載mysql驅(qū)動包存在一個Bug,1.3及悄悔以前的版本 –jars 分發(fā)的jar在executor端是通過 Spark自身特化的classloader加載的。而JDBC driver manager使用的則是系統(tǒng)默認(rèn)的classloader,因此無法識別。可行的方法之一是在所有 executor 節(jié)點(diǎn)上預(yù)先裝好JDBC driver并放入默認(rèn)的classpath。
spark讀mysql數(shù)據(jù)只出來了字段沒數(shù)據(jù)
文件慶畝丟失。spark讀mysql數(shù)據(jù)只出來了字段沒數(shù)據(jù)是文件丟失導(dǎo)致,需要重新卸載仿拍該軟件,并譽(yù)大森重新下載安裝即可。
關(guān)于spark 讀取數(shù)據(jù)庫方法的介紹到此就結(jié)束了,不知道你從中找到你需要的信息了嗎 ?如果你還想了解更多這方面的信息,記得收藏關(guān)注本站。
成都服務(wù)器租用選創(chuàng)新互聯(lián),先試用再開通。
創(chuàng)新互聯(lián)(www.cdcxhl.com)提供簡單好用,價(jià)格厚道的香港/美國云服務(wù)器和獨(dú)立服務(wù)器。物理服務(wù)器托管租用:四川成都、綿陽、重慶、貴陽機(jī)房服務(wù)器托管租用。
網(wǎng)站欄目:Spark輕松讀取數(shù)據(jù)庫:優(yōu)秀的方法分享 (spark 讀取數(shù)據(jù)庫方法)
文章來源:http://m.fisionsoft.com.cn/article/dhghpph.html


咨詢
建站咨詢
