Tag: pyspark

火花杀死运行应用程序

我有一个正在运行的Spark应用程序,它占用了我的其他应用程序不会被分配任何资源的所有核心。 我做了一些快速的研究,人们build议使用YARN kill或/ bin / spark-class来终止命令。 但是,我使用的CDH版本和/ bin / spark-class根本不存在,YARN kill应用程序也不起作用。 任何人都可以用这个吗?

如何在Spark DataFrame中添加一个常量列?

我想在DataFrame添加一个任意值的列(每行都是一样的)。 我在使用withColumn时出现错误,如下所示: dt.withColumn('new_column', 10).head(5) ————————————————————————— AttributeError Traceback (most recent call last) <ipython-input-50-a6d0257ca2be> in <module>() 1 dt = (messages 2 .select(messages.fromuserid, messages.messagetype, floor(messages.datetime/(1000*60*5)).alias("dt"))) —-> 3 dt.withColumn('new_column', 10).head(5) /Users/evanzamir/spark-1.4.1/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col) 1166 [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)] 1167 """ -> 1168 return self.select('*', col.alias(colName)) 1169 1170 @ignore_unicode_prefix AttributeError: 'int' object has no attribute 'alias' […]

如何将数据框转换回正常的RDD在pyspark?

我需要使用 (rdd.)partitionBy(npartitions, custom_partitioner) 方法在DataFrame上不可用。 所有的DataFrame方法只涉及DataFrame结果。 那么如何从DataFrame数据创build一个RDD呢? 注意:这是从1.2.0更改(在1.3.0)。 从@dpangmao的答案更新 :方法是.rdd。 我有兴趣了解是否(a)它是公开的和(b)什么是性能影响。 那么(a)是肯定的,(b) – 你可以看到这里有很大的性能影响:必须通过调用mapPartitions来创build一个新的RDD: 在dataframe.py (注意文件名也改变了(是sql.py): @property def rdd(self): """ Return the content of the :class:`DataFrame` as an :class:`RDD` of :class:`Row` s. """ if not hasattr(self, '_lazy_rdd'): jrdd = self._jdf.javaToPython() rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer())) schema = self.schema def applySchema(it): cls = _create_cls(schema) return itertools.imap(cls, it) […]

如何在spark中设置驱动程序的python版本?

我正在使用spark 1.4.0-rc2,所以我可以使用python 3与火花。 如果我将export PYSPARK_PYTHON=python3添加到我的.bashrc文件中,我可以用python 3交互地运行spark。但是,如果我想以本地模式运行独立程序,则会出现以下错误: Exception: Python in worker has different version 3.4 than that in driver 2.7, PySpark cannot run with different minor versions 我如何指定驱动程序的Python版本? 设置export PYSPARK_DRIVER_PYTHON=python3不起作用。

更新spark中的dataframe列

看看新的spark数据框api,目前还不清楚是否可以修改dataframe列。 我将如何去改变数据框的行x列y中的值? 在pandas这将是df.ix[x,y] = new_value 编辑:合并下面说的,你不能修改现有的数据框,因为它是不可变的,但你可以返回一个新的数据框与所需的修改。 如果您只是想根据条件replace列中的值,如np.where : from pyspark.sql import functions as F update_func = (F.when(F.col('update_col') == replace_val, new_value) .otherwise(F.col('update_col'))) df = df.withColumn('new_column_name', update_func) 如果要对列执行一些操作并创build一个添加到数据框的新列: import pyspark.sql.functions as F import pyspark.sql.types as T def my_func(col): do stuff to column here return transformed_value # if we assume that my_func returns a string my_udf = F.UserDefinedFunction(my_func, […]

如何添加一个新的列到Spark DataFrame(使用PySpark)?

我有一个Spark DataFrame(使用PySpark 1.5.1),并想添加一个新的列。 我已经尝试了以下没有任何成功: type(randomed_hours) # => list # Create in Python and transform to RDD new_col = pd.DataFrame(randomed_hours, columns=['new_col']) spark_new_col = sqlContext.createDataFrame(new_col) my_df_spark.withColumn("hours", spark_new_col["new_col"]) 还有一个错误使用这个: my_df_spark.withColumn("hours", sc.parallelize(randomed_hours)) 那么如何使用PySpark将新的列(基于Python向量)添加到现有的DataFrame?

如何运行2个函数在单个RDD上使用pyspark并行执行完全独立的转换?

我试图运行2个函数在单个RDD上使用pyspark并行执行完全独立的转换。 有什么办法可以做同样的事情? def doXTransforms(sampleRDD): (X transforms) def doYTransforms(sampleRDD): (Y Transforms) if __name__ == "__main__": sc = SparkContext(appName="parallelTransforms") sqlContext = SQLContext(sc) hive_context = HiveContext(sc) rows_rdd = hive_context.sql("select * from tables.X_table") p1 = Process(target=doXTransforms , args=(rows_rdd,)) p1.start() p2 = Process(target=doYTransforms, args=(rows_rdd,)) p2.start() p1.join() p2.join() sc.stop() 这不起作用,我现在明白这是行不通的。 但是有没有其他方法可以使这项工作? 具体是否有任何python-spark特定的解决scheme?

PySpark DataFrames – 枚举的方式不转换为pandas?

我有一个非常大的名为df的pyspark.sql.dataframe.DataFrame 。 我需要一些枚举logging的方法,从而能够访问具有特定索引的logging。 (或select索引范围的logging组) pandas,我可以做 indexes=[2,3,6,7] df[indexes] 在这里我想要类似的东西(并且不把数据框转换成pandas) 我能find的最接近的是: 通过以下方式枚举原始数据框中的所有对象: indexes=np.arange(df.count()) df_indexed=df.withColumn('index', indexes) 使用where()函数search我需要的值。 问题: 为什么它不工作,如何使它工作? 如何将一行添加到数据框? 稍后会做出如下的工作: indexes=[2,3,6,7] df1.where("index in indexes").collect() 任何更快,更简单的方法来处理它?

我们如何使用SQL-esque“LIKE”标准来join两个Spark SQL数据框?

我们正在使用与Spark 1.3.1接口的PySpark库。 我们有两个数据框: documents_df := {document_id, document_text}和keywords_df := {keyword} 。 我们希望join两个数据框,并使用keyword_df.keyword出现在document_df.document_textstring中的条件返回带有{document_id, keyword}对的结果数据框。 例如,在PostgreSQL中,我们可以使用以下forms的ON子句来实现: document_df.document_text ilike '%' || keyword_df.keyword || '%' 然而,在PySpark中,我无法获得任何forms的连接语法。 有没有人做过这样的事情? 亲切的问候, 将

如何将Vector分割成列 – 使用PySpark

上下文:我有一个DataFrame 2列:单词和vector。 其中“向量”的列types是VectorUDT 。 一个例子: word | vector assert | [435,323,324,212…] 我想得到这个: word | v1 | v2 | v3 | v4 | v5 | v6 …… assert | 435 | 5435| 698| 356|…. 题: 如何使用pyspark为每个维度在多个列中使用向量分隔列? 提前致谢