如何使用spark从hbase读取
下面的代码将从hbase中读取,然后将其转换为json结构并转换为schemaRDD,但问题是我using List
来存储jsonstring,然后传递给javaRDD,对于大约100 GB的数据,master将会加载内存中的数据。 从hbase加载数据然后执行操作,然后转换为JavaRDD的正确方法是什么?
package hbase_reader; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.rdd.RDD; import org.apache.spark.sql.api.java.JavaSQLContext; import org.apache.spark.sql.api.java.JavaSchemaRDD; import org.apache.commons.cli.ParseException; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.mapreduce.TableInputFormat; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Text; import org.apache.spark.SparkConf; import scala.Function1; import scala.Tuple2; import scala.runtime.AbstractFunction1; import com.google.common.collect.Lists; public class hbase_reader { public static void main(String[] args) throws IOException, ParseException { List<String> jars = Lists.newArrayList(""); SparkConf spconf = new SparkConf(); spconf.setMaster("local[2]"); spconf.setAppName("HBase"); //spconf.setSparkHome("/opt/human/opt/spark-0.9.0-hdp1"); spconf.setJars(jars.toArray(new String[jars.size()])); JavaSparkContext sc = new JavaSparkContext(spconf); //spconf.set("spark.executor.memory", "1g"); JavaSQLContext jsql = new JavaSQLContext(sc); HBaseConfiguration conf = new HBaseConfiguration(); String tableName = "HBase.CounData1_Raw_Min1"; HTable table = new HTable(conf,tableName); try { ResultScanner scanner = table.getScanner(new Scan()); List<String> jsonList = new ArrayList<String>(); String json = null; for(Result rowResult:scanner) { json = ""; String rowKey = Bytes.toString(rowResult.getRow()); for(byte[] s1:rowResult.getMap().keySet()) { String s1_str = Bytes.toString(s1); String jsonSame = ""; for(byte[] s2:rowResult.getMap().get(s1).keySet()) { String s2_str = Bytes.toString(s2); for(long s3:rowResult.getMap().get(s1).get(s2).keySet()) { String s3_str = new String(rowResult.getMap().get(s1).get(s2).get(s3)); jsonSame += "\""+s2_str+"\":"+s3_str+","; } } jsonSame = jsonSame.substring(0,jsonSame.length()-1); json += "\""+s1_str+"\""+":{"+jsonSame+"}"+","; } json = json.substring(0,json.length()-1); json = "{\"RowKey\":\""+rowKey+"\","+json+"}"; jsonList.add(json); } JavaRDD<String> jsonRDD = sc.parallelize(jsonList); JavaSchemaRDD schemaRDD = jsql.jsonRDD(jsonRDD); System.out.println(schemaRDD.take(2)); } finally { table.close(); } } }
使用Spark(Scala)读取HBase数据的基本示例,您也可以使用Java来进行此操作:
import org.apache.hadoop.hbase.client.{HBaseAdmin, Result} import org.apache.hadoop.hbase.{ HBaseConfiguration, HTableDescriptor } import org.apache.hadoop.hbase.mapreduce.TableInputFormat import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.spark._ object HBaseRead { def main(args: Array[String]) { val sparkConf = new SparkConf().setAppName("HBaseRead").setMaster("local[2]") val sc = new SparkContext(sparkConf) val conf = HBaseConfiguration.create() val tableName = "table1" System.setProperty("user.name", "hdfs") System.setProperty("HADOOP_USER_NAME", "hdfs") conf.set("hbase.master", "localhost:60000") conf.setInt("timeout", 120000) conf.set("hbase.zookeeper.quorum", "localhost") conf.set("zookeeper.znode.parent", "/hbase-unsecure") conf.set(TableInputFormat.INPUT_TABLE, tableName) val admin = new HBaseAdmin(conf) if (!admin.isTableAvailable(tableName)) { val tableDesc = new HTableDescriptor(tableName) admin.createTable(tableDesc) } val hBaseRDD = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result]) println("Number of Records found : " + hBaseRDD.count()) sc.stop() } }
更新了-2016年
从Spark 1.0.x +开始,现在您可以使用Spark-HBase Connector:
Maven的依赖包括:
<dependency> <groupId>it.nerdammer.bigdata</groupId> <artifactId>spark-hbase-connector_2.10</artifactId> <version>1.0.3</version> // Version can be changed as per your Spark version, I am using Spark 1.6.x </dependency>
并find相同的以下示例代码:
import org.apache.spark._ import it.nerdammer.spark.hbase._ object HBaseRead extends App { val sparkConf = new SparkConf().setAppName("Spark-HBase").setMaster("local[4]") sparkConf.set("spark.hbase.host", "<YourHostnameOnly>") //eg 192.168.1.1 or localhost or your hostanme val sc = new SparkContext(sparkConf) // For Example If you have an HBase Table as 'Document' with ColumnFamily 'SMPL' and qualifier as 'DocID, Title' then: val docRdd = sc.hbaseTable[(Option[String], Option[String])]("Document") .select("DocID", "Title").inColumnFamily("SMPL") println("Number of Records found : " + docRdd .count()) }
更新 – 2017年
从Spark 1.6.x +开始,现在您也可以使用SHC连接器(Hortonworks或HDP用户):
Maven的依赖包括:
<dependency> <groupId>com.hortonworks</groupId> <artifactId>shc</artifactId> <version>1.0.0-2.0-s_2.11</version> // Version depends on the Spark version and is supported upto Spark 2.x </dependency>
使用这个连接器的主要优点是它在Schema定义中具有灵活性,并且不需要像在nerdammer / spark-hbase-connector中一样使用Hardcoded参数。 另外请记住,它支持Spark 2.x,所以这个连接器非常灵活,并在问题和PR中提供端到端的支持。
find最新的自述文件和示例的以下存储库path:
Hortonworks Spark HBase连接器
您也可以将此RDD转换为DataFrame并在其上运行SQL,或者可以将这些Dataset或DataFrames映射到用户定义的Java Pojo或Case类。 它工作辉煌。
请在下面评论,如果你需要别的东西。
我更喜欢从hbase中读取数据,并完成json操作。
Spark提供JavaSparkContext.newAPIHadoopRDD函数来读取hadoop存储中的数据,包括HBase。 您必须提供configuration参数和表格input格式的HBaseconfiguration,表名称和扫描,以及它的键值
您可以使用表格input格式类和它的作业参数来提供表名和扫描configuration
例:
conf.set(TableInputFormat.INPUT_TABLE, "tablename"); JavaPairRDD<ImmutableBytesWritable, Result> data = jsc.newAPIHadoopRDD(conf, TableInputFormat.class,ImmutableBytesWritable.class, Result.class);
那么你可以在spark中做json操作。 由于spark可以在内存满时进行重新计算,因此只会载入重新计算部分(cmiiw)所需的数据,所以您不必担心数据大小
只是添加如何添加扫描评论:
TableInputFormat具有以下属性:
- SCAN_ROW_START
- SCAN_ROW_STOP
conf.set(TableInputFormat.SCAN_ROW_START, "startrowkey"); conf.set(TableInputFormat.SCAN_ROW_STOP, "stoprowkey");
由于这个问题并不新鲜,现在还有一些其他的select:
- hbase-spark ,一个可直接在HBase 仓库中使用的模块
- Hortonworks公司的Spark-on-HBase
我对第一个项目了解不多,但看起来不支持Spark 2.x。 但是,它在Spark 1.6.x的RDD级别上提供了丰富的支持。
另一方面,Spark-on-HBase在Spark 2.0和即将推出的Spark 2.1上都有分支机构。 这个项目非常有希望,因为它专注于数据集/dataframeAPI。 在底层,它实现了标准的Spark Datasource API,并利用Spark Catalyst引擎进行查询优化。 开发人员声称,它能够进行分区修剪,列修剪,谓词下推和实现数据局部性。
下面介绍一个简单的例子,其中使用了com.hortonworks:shc:1.0.0-2.0-s_2.11
这个回购和Spark 2.0.2的工件。
case class Record(col0: Int, col1: Int, col2: Boolean) val spark = SparkSession .builder() .appName("Spark HBase Example") .master("local[4]") .getOrCreate() def catalog = s"""{ |"table":{"namespace":"default", "name":"table1"}, |"rowkey":"key", |"columns":{ |"col0":{"cf":"rowkey", "col":"key", "type":"int"}, |"col1":{"cf":"cf1", "col":"col1", "type":"int"}, |"col2":{"cf":"cf2", "col":"col2", "type":"boolean"} |} |}""".stripMargin val artificialData = (0 to 100).map(number => Record(number, number, number % 2 == 0)) // write spark .createDataFrame(artificialData) .write .option(HBaseTableCatalog.tableCatalog, catalog) .option(HBaseTableCatalog.newTable, "5") .format("org.apache.spark.sql.execution.datasources.hbase") .save() // read val df = spark .read .option(HBaseTableCatalog.tableCatalog, catalog) .format("org.apache.spark.sql.execution.datasources.hbase") .load() df.count()