如何使用JDBC源在(Py)Spark中写入和读取数据?

这个问题的目的是logging:

  • 在PySpark中使用JDBC连接读取和写入数据所需的步骤

  • JDBC源和可能的解决scheme可能存在的问题

只需稍作更改,这些方法就可以与Scala和R等其他支持的语言一起工作。

写数据

  1. 提交应用程序或启动shell时,请包含适用的JDBC驱动程序。 你可以使用例如--packages

     bin/pyspark --packages group:name:version 

    或结合driver-class-pathjars

     bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR 

    这些属性也可以在JVM实例启动之前使用PYSPARK_SUBMIT_ARGS环境variables来设置,或者使用conf/spark-defaults.conf来设置spark.jars.packages或者spark.jars / spark.driver.extraClassPath

  2. select所需的模式。 Spark JDBC编写器支持以下模式:

    • append :追加这个:class: DataFrame到现有的数据。
    • overwrite :覆盖现有数据。
    • ignore :如果数据已经存在,则默认忽略此操作。
    • error (默认情况下):如果数据已经存在,则抛出exception。

    不支持Upsert或其他精细修改

     mode = ... 
  3. 准备JDBC URI,例如:

     # You can encode credentials in URI or pass # separately using properties argument # of jdbc method or options url = "jdbc:postgresql://localhost/foobar" 
  4. (可选)创build一个JDBC参数字典。

     properties = { "user": "foo", "password": "bar" } 
  5. 使用DataFrame.write.jdbc

     df.write.jdbc(url=url, table="baz", mode=mode, properties=properties) 

    保存数据(详情请参阅pyspark.sql.DataFrameWriter )。

已知问题

  • 使用--packagesjava.sql.SQLException: No suitable driver found for jdbc: ...java.sql.SQLException: No suitable driver found for jdbc: ...

    假设没有驱动程序版本不匹配来解决这个问题,您可以将driver类添加到properties 。 例如:

     properties = { ... "driver": "org.postgresql.Driver" } 
  • 使用df.write.format("jdbc").options(...).save()可能会导致:

    java.lang.RuntimeException:org.apache.spark.sql.execution.datasources.jdbc.DefaultSource不允许将表创build为select。

    解决scheme未知

  • 在Pyspark 1.3中,您可以尝试直接调用Java方法:

     df._jdf.insertIntoJDBC(url, "baz", True) 

读取数据

  1. 按照写入数据中的步骤1-4
  2. 使用sqlContext.read.jdbc

     sqlContext.read.jdbc(url=url, table="baz", properties=properties) 

    sqlContext.read.format("jdbc")

     (sqlContext.read.format("jdbc") .options(url=url, dbtable="baz", **properties) .load()) 

已知的问题和疑难问题

  • 无法find合适的驱动程序 – 请参阅:写入数据
  • Spark SQL支持使用JDBC源的谓词下推,尽pipe不是所有的谓词都可以推下来。 它也没有委托限制或聚合。 可能的解决方法是用有效的子查询replacedbtable / table参数。 看例如:
    • 是否启动谓词下推使用JDBC?
    • (4)一个多小时执行pyspark.sql.DataFrame.take(4)
  • 默认情况下,JDBC数据源使用单个执行程序线程按顺序加载数据。 要确保分布式数据加载,您可以:

    • 提供分区column (必须是IntegeType ), lowerBoundupperBoundnumPartitions
    • 提供一个相互排斥谓词predicates的列表,每个期望的分区一个predicates
  • 在分布式模式下(使用分区列或谓词),每个执行程序都在自己的事务中进行操作。 如果源数据库同时被修改,则不能保证最终的视图是一致的。

哪里可以find合适的司机:

  • Maven Repository (获取--packages所需的坐标,从--packages选项卡以compile-group:name:versionreplace各个字段的Gradle选项卡中select所需的版本和复制数据)或Maven Central Repository :

    • PostgreSQL的
    • MySQL的

下载mysql-connector-java驱动并保存在spark jar文件夹中,在这里观察下面的python代码将数据写入“acotr1”,我们必须在mysql数据库中创buildacotr1表结构

spark(火花)= SparkSession.builder.appName(“prasadad”)。master('local').config('spark.driver.extraClassPath','D:\ spark-2.1.0-bin-hadoop2.7 \ jars \ mysql-连接器的Java-5.1.41-bin.jar')。getOrCreate()

sc = spark.sparkContext

从pyspark.sql导入SQLContext

sqlContext = SQLContext(sc)

df = sqlContext.read.format(“jdbc”).options(url =“jdbc:mysql:// localhost:3306 / sakila”,driver =“com.mysql.jdbc.Driver”,dbtable =“actor”,user = “根”,口令= “Ramyam01”)。负载()

mysql_url = “JDBC:MySQL的://本地主机:3306 / sakila的用户=根&密码= Ramyam01”

df.write.jdbc(mysql_url,表= “actor1”,模式= “追加”)