如何修改 PySpark Dataframe 的列名

从零开始Spark

Posted by Lyon Ling on August 15, 2019

在做Spark Dataframe操作的时候会遇到很多需求. 最近我遇到了一个问题, 就是通过agg方法传入多种聚合函数对多列进行操作, 然后后就生成了多列的聚合结果. 但是列名就是以 原列名+聚合函数的方法, 然后就产生需求 —- 如何修改这些列名呢?

然后找到了以下方法:

  • 使用selectExpr

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    
    data = sqlContext.createDataFrame([("Alberto", 2), ("Dakota", 2)], 
                                      ["Name", "askdaosdka"])
    data.show()
    data.printSchema()
    # Output
    #+-------+----------+
    #|   Name|askdaosdka|
    #+-------+----------+
    #|Alberto|         2|
    #| Dakota|         2|
    #+-------+----------+
    #root
    # |-- Name: string (nullable = true)
    # |-- askdaosdka: long (nullable = true)
      
    df = data.selectExpr("Name as name", "askdaosdka as age")
    df.show()
    df.printSchema()
    # Output
    #+-------+---+
    #|   name|age|
    #+-------+---+
    #|Alberto|  2|
    #| Dakota|  2|
    #+-------+---+
    #root
    # |-- name: string (nullable = true)
    # |-- age: long (nullable = true)
    
  • 使用withColumnRenamed

    1
    2
    3
    4
    5
    6
    7
    8
    
    oldColumns = data.schema.names
    newColumns = ["name", "age"]
      
    # 这里的reduce是python的reduce
    from functools import reduce
    df = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], newColumns[idx]), range(len(oldColumns)), data)
    df.printSchema()
    df.show()
    

    为了便于理解这里给出python reduce方法的官方解释.

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    
    functools.reduce(function, iterable[, initializer])
    '''
    Apply function of two arguments cumulatively to the items of sequence, from left to right, so as to reduce the sequence to a single value. 
    '''
      
    # Roughly equivalent to:
    def reduce(function, iterable, initializer=None):
        it = iter(iterable)
        if initializer is None:
            value = next(it)
        else:
            value = initializer
        for element in it:
            value = function(value, element)
        return value
    

    简单总结一下, 之前看到这种实现方式, 以为是withColumnRenamed有什么特别高级的实现, 现在结合两部分代买分析一下, 简单来说还是就是通过循环, 反复调用withColumnRenamed方法给df的column重命名……

  • 使用alias

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    
    from pyspark.sql.functions import *
      
    data = data.select(col("Name").alias("name"), col("askdaosdka").alias("age"))
    data.show()
      
    # Output
    #+-------+---+
    #|   name|age|
    #+-------+---+
    #|Alberto|  2|
    #| Dakota|  2|
    #+-------+---+
    
  • 使用sqlContext.sql

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    
    sqlContext.registerDataFrameAsTable(data, "myTable")
    df2 = sqlContext.sql("SELECT Name AS name, askdaosdka as age from myTable")
      
    df2.show()
      
    # Output
    #+-------+---+
    #|   name|age|
    #+-------+---+
    #|Alberto|  2|
    #| Dakota|  2|
    #+-------+---+
    

总的来说, Pyspark并没有能有一次性对整个column names进行替换的方法, 只能通过一些trick逐个列进行操作. 从个人喜好来说, 第一种和第二种实现方式感觉会相对简单.

主要方法转载自:python – 如何更改pyspark中的数据框列名? - 代码日志