在做Spark Dataframe操作的时候会遇到很多需求. 最近我遇到了一个问题, 就是通过agg
方法传入多种聚合函数对多列进行操作, 然后后就生成了多列的聚合结果. 但是列名就是以 原列名+聚合函数的方法, 然后就产生需求 —- 如何修改这些列名呢?
然后找到了以下方法:
-
使用
selectExpr
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
data = sqlContext.createDataFrame([("Alberto", 2), ("Dakota", 2)], ["Name", "askdaosdka"]) data.show() data.printSchema() # Output #+-------+----------+ #| Name|askdaosdka| #+-------+----------+ #|Alberto| 2| #| Dakota| 2| #+-------+----------+ #root # |-- Name: string (nullable = true) # |-- askdaosdka: long (nullable = true) df = data.selectExpr("Name as name", "askdaosdka as age") df.show() df.printSchema() # Output #+-------+---+ #| name|age| #+-------+---+ #|Alberto| 2| #| Dakota| 2| #+-------+---+ #root # |-- name: string (nullable = true) # |-- age: long (nullable = true)
-
使用
withColumnRenamed
1 2 3 4 5 6 7 8
oldColumns = data.schema.names newColumns = ["name", "age"] # 这里的reduce是python的reduce from functools import reduce df = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), data) df.printSchema() df.show()
为了便于理解这里给出python
reduce
方法的官方解释.1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
functools.reduce(function, iterable[, initializer]) ''' Apply function of two arguments cumulatively to the items of sequence, from left to right, so as to reduce the sequence to a single value. ''' # Roughly equivalent to: def reduce(function, iterable, initializer=None): it = iter(iterable) if initializer is None: value = next(it) else: value = initializer for element in it: value = function(value, element) return value
简单总结一下, 之前看到这种实现方式, 以为是
withColumnRenamed
有什么特别高级的实现, 现在结合两部分代买分析一下, 简单来说还是就是通过循环, 反复调用withColumnRenamed
方法给df的column重命名…… -
使用
alias
1 2 3 4 5 6 7 8 9 10 11 12
from pyspark.sql.functions import * data = data.select(col("Name").alias("name"), col("askdaosdka").alias("age")) data.show() # Output #+-------+---+ #| name|age| #+-------+---+ #|Alberto| 2| #| Dakota| 2| #+-------+---+
-
使用
sqlContext.sql
1 2 3 4 5 6 7 8 9 10 11 12
sqlContext.registerDataFrameAsTable(data, "myTable") df2 = sqlContext.sql("SELECT Name AS name, askdaosdka as age from myTable") df2.show() # Output #+-------+---+ #| name|age| #+-------+---+ #|Alberto| 2| #| Dakota| 2| #+-------+---+
总的来说, Pyspark并没有能有一次性对整个column names进行替换的方法, 只能通过一些trick逐个列进行操作. 从个人喜好来说, 第一种和第二种实现方式感觉会相对简单.
主要方法转载自:python – 如何更改pyspark中的数据框列名? - 代码日志