从Apache火花的DataFrame熊猫
2015年8月12日 在工程的博客
这是一个博客的交叉发送奥利弗Girardot。奥利弗是一个软件工程师和横向思想的创始人之一,他工作在机器学习,大数据,DevOps的解决方案。
随着窗口的引入在Apache 1.4火花,你终于可以港口几乎任何相关的大熊猫DataFrame计算Apache火花并行计算框架使用SQL的DataFrame火花。如果你不熟悉火花的DataFrame,不要犹豫地看看抽样是Apache引发的新的字节码后,回到这里。
我知道一些关于如何港口现有的反馈复杂的代码可能是有用的,因此本文的目的是将几个概念熊猫DataFrame看看我们如何翻译这PySpark DataFrame使用火花1.4。
免责声明:几个操作,你可以在熊猫不转化为火花。请记住DataFrames火花就像抽样在某种意义上,他们是一个不可变的数据结构。因此,诸如:
创建一个新的“三”列
df(“三”)= df(“一个”)* df (“2”)
不能存在,只是因为这种做作违背原则的火花。另一个例子是试图通过索引访问DataFrame中的单个元素。别忘了,你使用一个分布式数据结构,而不是一个内存中的随机访问的数据结构。
很明显,这并不意味着你不能做同样的事情(即创建一个新的列)使用火花,这意味着你必须认为不可变/分布式和代码的重写部分,主要部件不纯粹的认为是对一连串的数据转换。
所以就让我们一探究竟吧。
列选择
这部分不是那么多不同的熊猫和火花,但你必须考虑DataFrame的不变的字符。首先让我们创建两个DataFrames熊猫pdf,另一个在火花df:
熊猫= > pdf
在[17]:pdf = pd.DataFrame.from_items ([(“A”, [1, 2, 3]), (" B ", (4、5、6))))
在[18]:pdf.A
[18]:
0 1
1 2
2 3
名称:dtype: int64
SQL = > df火花
在[19]:df = sqlCtx。createDataFrame ([(1,4) (2、5), (6)], [“A”、“B”))
在[20]:df
[20]:DataFrame (bigint。B:长整型数字)
在[21]:df.show ()
+ - + - +
| | | B
+ - + - +
| 1 | 4 |
| 2 | 5 |
| 3 | 6 |
+ - + - +
在SQL或火花熊猫你使用相同的语法来引用一个列:
在[27]:df.A
[27]:列
[27]:列
在[29]:pdf.A
[29]:
0 1
1 2
2 3
名称:dtype: int64
在[30]:pdf [A]
[30]:
0 1
1 2
2 3
名称:dtype: int64
输出看起来不同,但这些仍然是相同的方式引用列使用熊猫或火花。唯一的区别是,在大熊猫,它是一个可变的数据结构,你可以改变,而不是在火花。
列添加
在[31]:pdf (' C ') = 0
在[32]:pdf
[32]:
A B C
0 1 4 0
1 2 5 0
2 3 6 0
在火花SQL使用withColumn或选择方法,
但是你需要创建一个“列”,一个简单的int不会:
在[33]:df。withColumn (' C ', 0)
AttributeError回溯(最近调用最后)
df - - - - - > 1。withColumn (' C ', 0)
/用户/ ogirardot /下载/ spark-1.4.0-bin-hadoop2.4 / python / pyspark / sql / dataframe。佩克在withColumn(自我、colName坳)
1196”“”
- > 1197年回归自我。选择(‘*’,col.alias (colName))
1198年
1199年@ignore_unicode_prefix
AttributeError:“int”对象没有属性的别名
这是你新的最好的朋友“pyspark.sql.functions *”。
如果你不能从组合列创建它
这个包包含所有你需要的功能:
从pyspark [35]:。sql导入函数F
在[36]:df。withColumn (“C”, F.lit (0))
出[36]:DataFrame bigint。B:长整型数字,C: int)
在[37]:df。withColumn (“C”, F.lit(0)),告诉()
+ - + - + - +
| | B C | |
+ - + - + - +
| 1 | 4 | 0 |
| 2 | 5 | 0 |
| 3 | 6 | 0 |
+ - + - + - +
大部分时间在火花SQL可以使用字符串引用列但有两种情况下,你想要使用的列对象而不是字符串:
- 在火花SQL DataFrame列可以有相同的名字,他们将被赋予独特的名字在火花SQL,但是这意味着您不能引用的列名称只有当这是模棱两可的。
- 当你需要操作列使用表达式添加两个列,该列的值的两倍甚至列值大于0吗?,您将无法使用简单的字符串和需要列引用。
- 最后如果你需要重命名,或任何其他复杂的功能,你需要列引用。
这里有一个例子:
在[39]:df。df withColumn (“C”。* 2)
出[39]:DataFrame bigint。B:长整型数字,C:长整型数字)
在[40]:df。df withColumn (“C”。* 2),告诉()
+ - + - + - +
| | B C | |
+ - + - + - +
| 1 | 4 | 2 |
| 2 | 5 | 4 |
| 3 | 6 | 6 |
+ - + - + - +
在[41]:df。df withColumn (“C”。B > 0),告诉()
+ - + - + - - - +
| | B C | |
+ - + - + - - - +
| 1 | 4 | |
| 2 | 5 | |
| | 3 | 6 |如此
+ - + - + - - - +
当你选择列,创建另一个预计DataFrame之外,您还可以使用表达式:
在[42]:df.select (df。B > 0)
[42]:DataFrame ((B > 0):布尔)
在[43]:df.select (df。B > 0),告诉()
+ - - - - - - - +
| | (B > 0)
+ - - - - - - - +
真正| |
真正| |
真正| |
+ - - - - - - - +
年代你可以看到列名会根据您定义的表达式,计算如果你想重命名这个,你需要使用别名方法列:
在[44]:df.select (df。B > 0)。别名("is_positive")).show()
+ - - - - - - - - - - - - +
| is_positive |
+ - - - - - - - - - - - - +
真正| |
真正| |
真正| |
+ - - - - - - - - - - - - +
所有的表达式,我们在这里建立可用于过滤,添加一个新列,甚至在聚合,所以一旦你大致了解它是如何工作的,你会流利的在所有的DataFrame操作框架。
过滤
过滤是非常简单的,你可以使用RDD-like过滤器
方法和复制你的任何现有的大熊猫表达式/谓词过滤:
在[48]:pdf (pdf。B > 0) & (pdf)。0)& (df。0)& (df。一个聚合
起初可以混淆使用聚合的那一刻你写吗groupBy
你不使用一个DataFrame对象,实际使用GroupedData
对象和你需要精确的聚合得到输出DataFrame:
在[77]:df.groupBy (“A”)
[77]:
在[78]:df.groupBy .avg (“A”) (“B”)
[78]:DataFrame [bigint, AVG (B):双)
在[79]:df.groupBy .avg (A) (B),告诉()
+ - + - - - +
| | | AVG (B)
+ - + - - - +
| 1 | 4.0 |
| 2 | 5.0 |
| 3 | 6.0 |
+ - + - - - +
作为一个语法糖,如果你只需要一个聚合,可以使用最简单的函数:avg, cout,最大值、最小值、均值和总和直接在GroupedData
,但大多数时候,这将是太简单了,你会想要创建几个聚合在一个groupBy操作。毕竟(出口的。抽样是Apache引发的新的字节码)这是DataFrames的最大特征之一。这样做你会使用gg
方法:
在[83]:df.groupBy (“A”) .agg (F.avg (B), F.min (B), F.max (B)),告诉()
+ - + - - - - - - - - - - - - - - - - - - + +
| | | AVG (B)最小值(B) |马克斯(B) |
+ - + - - - - - - - - - - - - - - - - - - + +
| 1 | 4.0 | 4 | 4 |
| 2 | 5.0 | 5 | 5 |
| 3 | 6.0 | 6 | 6 |
+ - + - - - - - - - - - - - - - - - - - - + +
当然,就像之前一样,您可以使用任何表达特别是列组成,别名定义等…和其他一些重要的功能:
在[84]:df.groupBy (“A”) .agg (
....:F.first("B").alias("my first"),
....:F.last("B").alias("my last"),
....:F.sum("B").alias("my everything")
....:).show()
+ - + - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - +
| |我的姓我| | |我的一切
+ - + - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - +
| 1 | 4 | 4 | 4 |
| 2 | 5 | 5 | 5 |
| 3 | 6 | 6 | 6 |
+ - + - - - - - - - - - - + - - - - - - - - - - - - - - - - - - - - +
复杂的操作和窗户
现在1.4火花,Dataframe API提供了一个有效的和易于使用的基于窗口的框架,这一特性使任何熊猫火花迁移实际上可行项目的99%——甚至似乎考虑大熊猫的一些特性硬在分布式环境中繁殖。
我们可以选择一个简单的例子就是,在熊猫可以计算差异列,熊猫将比较的值一行最后一个,计算它们之间的差异。通常的特性在分布式环境中很难做到,因为每一行应该是独立处理,现在1.4火花窗口操作您可以定义一个窗口的火花执行一些聚合函数但相对特定的线。这是如何使用diff港口一些现有的大熊猫代码:
在[86]:df = sqlCtx。createDataFrame ([(1、4), (1、5), (2,6) (2,6) (0)], [“A”、“B”))
在[95]:pdf = df.toPandas ()
在[96]:pdf
[96]:
一个B
0 1 4
1 1 5
2 2 6
3 2 6
4 3 0
在[98]:pdf [' diff '] = pdf.B.diff ()
在[102]:pdf
[102]:
B diff
0 1 4南
1 1 5 1
2 2 6 1
3 2 6 0
4 3 0 6
在熊猫可以计算任意列上差异,没有考虑钥匙,没有把订单或任何东西。很酷…但大部分时间不是你想要的东西,你可能会清理留下的烂摊子通过设置列值回南从一行到另一个键时发生了变化。
这是如何做这种事在PySpark使用窗口函数,一个关键,如果你愿意的话,在一个特定的顺序:
在[107]:从pyspark.sql。窗口导入窗口
在[108]:window_over_A = Window.partitionBy .orderBy (“A”) (“B”)
在[109]:df。withColumn (" diff ", F.lead (B) .over (window_over_A) - df.B),告诉()
+ - - - + - - - + - - - +
| | | diff | B
+ - - - + - - - + - - - +
| 1 | 4 | 1 |
零| | 1 | 5 |
| 2 | 6 | 0 |
零| | 2 | 6 |
零| | 3 | 0 |
+ - - - + - - - + - - - +
与你现在能够逐行计算diff -命令或不给一个特定的关键。大窗口操作点在于你没有打破你的数据的结构。让我解释一下我自己。
当你计算一些聚合(再次根据键),你通常会执行groupBy
操作这个键和计算你需要的多个指标(在同一时间如果你够幸运,否则在多个reduceByKey
或aggregateByKey
转换)。
但你是否使用抽样或DataFrame,如果你不使用窗口操作就会粉碎你的数据流的一部分,然后您需要重新加入你的聚合的结果主要——数据流。窗口操作允许您执行计算和复制结果附加列没有任何明确的加入。
这是一个快速的方法来丰富你的数据直接添加滚动计算只是另一个列。两个额外的资源是值得注意的关于这些新特性,官方的砖的博客文章的窗口操作和克利斯朵夫Bourguignat文章的评价熊猫和火花DataFrame差异。
综上所述,你现在有你需要的所有工具在火花1.4端口任何熊猫计算在分布式环境中使用非常类似DataFrame API。