如何使用Spark DataFrame查询JSON数据列?

我有一个卡桑德拉表为简单起见如下所示:

key: text jsonData: text blobData: blob 

我可以使用spark和spark-cassandra连接器为此创build一个基本数据框:

 val df = sqlContext.read .format("org.apache.spark.sql.cassandra") .options(Map("table" -> "mytable", "keyspace" -> "ks1")) .load() 

尽pipe我将JSON数据扩展到其底层结构,但我仍在苦苦挣扎。 我最终希望能够根据jsonstring中的属性进行过滤并返回blob数据。 像jsonData.foo =“bar”,并返回blobData。 目前这是可能的吗?

Spark 2.1+

你可以使用from_json函数:

 import org.apache.spark.sql.functions.from_json import org.apache.spark.sql.types._ val schema = StructType(Seq( StructField("k", StringType, true), StructField("v", DoubleType, true) )) df.withColumn("jsonData", from_json($"jsonData", schema)) 

Spark 1.6+

你可以使用get_json_object ,它有一个列和一个path:

 import org.apache.spark.sql.functions.get_json_object val exprs = Seq("k", "v").map( c => get_json_object($"jsonData", s"$$.$c").alias(c)) df.select($"*" +: exprs: _*) 

并将字段提取到可以进一步转换为预期types的​​单个string。

Spark <= 1.5

目前这是可能的吗?

据我所知,这是不可能的。 你可以尝试类似这样的事情:

 val df = sc.parallelize(Seq( ("1", """{"k": "foo", "v": 1.0}""", "some_other_field_1"), ("2", """{"k": "bar", "v": 3.0}""", "some_other_field_2") )).toDF("key", "jsonData", "blobData") 

我假设blob字段不能用JSON表示。 否则,你的驾驶室省略了拆分和连接:

 import org.apache.spark.sql.Row val blobs = df.drop("jsonData").withColumnRenamed("key", "bkey") val jsons = sqlContext.read.json(df.drop("blobData").map{ case Row(key: String, json: String) => s"""{"key": "$key", "jsonData": $json}""" }) val parsed = jsons.join(blobs, $"key" === $"bkey").drop("bkey") parsed.printSchema // root // |-- jsonData: struct (nullable = true) // | |-- k: string (nullable = true) // | |-- v: double (nullable = true) // |-- key: long (nullable = true) // |-- blobData: string (nullable = true) 

另一种方法(更便宜,但更复杂)的方法是使用UDF来parsingJSON并输出structmap列。 比如像这样的东西:

 import net.liftweb.json.parse case class KV(k: String, v: Int) val parseJson = udf((s: String) => { implicit val formats = net.liftweb.json.DefaultFormats parse(s).extract[KV] }) val parsed = df.withColumn("parsedJSON", parseJson($"jsonData")) parsed.show // +---+--------------------+------------------+----------+ // |key| jsonData| blobData|parsedJSON| // +---+--------------------+------------------+----------+ // | 1|{"k": "foo", "v":...|some_other_field_1| [foo,1]| // | 2|{"k": "bar", "v":...|some_other_field_2| [bar,3]| // +---+--------------------+------------------+----------+ parsed.printSchema // root // |-- key: string (nullable = true) // |-- jsonData: string (nullable = true) // |-- blobData: string (nullable = true) // |-- parsedJSON: struct (nullable = true) // | |-- k: string (nullable = true) // | |-- v: integer (nullable = false) 

from_json函数正是你正在寻找的。 你的代码看起来像这样:

 val df = sqlContext.read .format("org.apache.spark.sql.cassandra") .options(Map("table" -> "mytable", "keyspace" -> "ks1")) .load() //You can define whatever struct type that your json states val schema = StructType(Seq( StructField("key", StringType, true), StructField("value", DoubleType, true) )) df.withColumn("jsonData", from_json(col("jsonData"), schema)) 

底层的JSONstring是

 "{ \"column_name1\":\"value1\",\"column_name2\":\"value2\",\"column_name3\":\"value3\",\"column_name5\":\"value5\"}"; 

以下是过滤JSON并将所需数据加载到Cassandra的脚本。

  sqlContext.read.json(rdd).select("column_name1 or fields name in Json", "column_name2","column_name2") .write.format("org.apache.spark.sql.cassandra") .options(Map("table" -> "Table_name", "keyspace" -> "Key_Space_name")) .mode(SaveMode.Append) .save()