如何使用JDBC源在(Py)Spark中写入和读取数据?
这个问题的目的是logging:
-
在PySpark中使用JDBC连接读取和写入数据所需的步骤
-
JDBC源和可能的解决scheme可能存在的问题
只需稍作更改,这些方法就可以与Scala和R等其他支持的语言一起工作。
写数据
-
提交应用程序或启动shell时,请包含适用的JDBC驱动程序。 你可以使用例如
--packages
:bin/pyspark --packages group:name:version
或结合
driver-class-path
和jars
bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
这些属性也可以在JVM实例启动之前使用
PYSPARK_SUBMIT_ARGS
环境variables来设置,或者使用conf/spark-defaults.conf
来设置spark.jars.packages
或者spark.jars
/spark.driver.extraClassPath
。 -
select所需的模式。 Spark JDBC编写器支持以下模式:
-
append
:追加这个:class:DataFrame
到现有的数据。 -
overwrite
:覆盖现有数据。 -
ignore
:如果数据已经存在,则默认忽略此操作。 -
error
(默认情况下):如果数据已经存在,则抛出exception。
不支持Upsert或其他精细修改
mode = ...
-
-
准备JDBC URI,例如:
# You can encode credentials in URI or pass # separately using properties argument # of jdbc method or options url = "jdbc:postgresql://localhost/foobar"
-
(可选)创build一个JDBC参数字典。
properties = { "user": "foo", "password": "bar" }
-
使用
DataFrame.write.jdbc
df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
保存数据(详情请参阅
pyspark.sql.DataFrameWriter
)。
已知问题 :
-
使用
--packages
(java.sql.SQLException: No suitable driver found for jdbc: ...
)java.sql.SQLException: No suitable driver found for jdbc: ...
假设没有驱动程序版本不匹配来解决这个问题,您可以将
driver
类添加到properties
。 例如:properties = { ... "driver": "org.postgresql.Driver" }
-
使用
df.write.format("jdbc").options(...).save()
可能会导致:java.lang.RuntimeException:org.apache.spark.sql.execution.datasources.jdbc.DefaultSource不允许将表创build为select。
解决scheme未知
-
在Pyspark 1.3中,您可以尝试直接调用Java方法:
df._jdf.insertIntoJDBC(url, "baz", True)
读取数据
- 按照写入数据中的步骤1-4
-
使用
sqlContext.read.jdbc
:sqlContext.read.jdbc(url=url, table="baz", properties=properties)
或
sqlContext.read.format("jdbc")
:(sqlContext.read.format("jdbc") .options(url=url, dbtable="baz", **properties) .load())
已知的问题和疑难问题 :
- 无法find合适的驱动程序 – 请参阅:写入数据
- Spark SQL支持使用JDBC源的谓词下推,尽pipe不是所有的谓词都可以推下来。 它也没有委托限制或聚合。 可能的解决方法是用有效的子查询replace
dbtable
/table
参数。 看例如:- 是否启动谓词下推使用JDBC?
- (4)一个多小时执行pyspark.sql.DataFrame.take(4)
-
默认情况下,JDBC数据源使用单个执行程序线程按顺序加载数据。 要确保分布式数据加载,您可以:
- 提供分区
column
(必须是IntegeType
),lowerBound
,upperBound
,numPartitions
。 - 提供一个相互排斥谓词
predicates
的列表,每个期望的分区一个predicates
。
- 提供分区
-
在分布式模式下(使用分区列或谓词),每个执行程序都在自己的事务中进行操作。 如果源数据库同时被修改,则不能保证最终的视图是一致的。
哪里可以find合适的司机:
-
Maven Repository (获取
--packages
所需的坐标,从--packages
选项卡以compile-group:name:version
replace各个字段的Gradle选项卡中select所需的版本和复制数据)或Maven Central Repository :- PostgreSQL的
- MySQL的
下载mysql-connector-java驱动并保存在spark jar文件夹中,在这里观察下面的python代码将数据写入“acotr1”,我们必须在mysql数据库中创buildacotr1表结构
spark(火花)= SparkSession.builder.appName(“prasadad”)。master('local').config('spark.driver.extraClassPath','D:\ spark-2.1.0-bin-hadoop2.7 \ jars \ mysql-连接器的Java-5.1.41-bin.jar')。getOrCreate()
sc = spark.sparkContext
从pyspark.sql导入SQLContext
sqlContext = SQLContext(sc)
df = sqlContext.read.format(“jdbc”).options(url =“jdbc:mysql:// localhost:3306 / sakila”,driver =“com.mysql.jdbc.Driver”,dbtable =“actor”,user = “根”,口令= “Ramyam01”)。负载()
mysql_url = “JDBC:MySQL的://本地主机:3306 / sakila的用户=根&密码= Ramyam01”
df.write.jdbc(mysql_url,表= “actor1”,模式= “追加”)