Spark - Merge / Union Dataframe With Different Schema (column Names And Sequence) To A Dataframe With Master Common Schema
I tried taking one DataFrame's schema (df.schema) as a common schema and loading all the CSV files with it, but it fails: the headers of the other CSV files do not match the assigned schema. Any s
Solution 1:
As I understand it, you want to union/merge files with different schemas (each one a subset of a single master schema). I wrote this function, unionPro, which I think suits your requirement:
EDIT: Added a PySpark version.
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{StructField, StructType}

def unionPro(DFList: List[DataFrame], caseDiff: String = "Y"): DataFrame = {
  /**
   * This function accepts DataFrames with the same or different schema/column order,
   * with some or no common columns, and creates a unioned DataFrame.
   * caseDiff = "N" keeps column names as-is; any other value lower-cases them first.
   */
  val spark: SparkSession = SparkSession.active

  // "This doesn't preserve order":
  // val MasterColList2 = DFList.map(_.columns.toSet).flatMap(x => x).toSet
  val inputDFList = if (caseDiff == "N")
    DFList
  else {
    DFList.map(df => {
      val cols = df.columns
      val selector = cols.map(x => col(x).alias(x.toLowerCase))
      df.select(selector: _*)
    })
  }
  // "This preserves order":
  val masterColStrList: Array[String] = inputDFList.map(x => x.columns).reduce((x, y) => x ++ y).distinct

  // Create masterSchema: one field per name (case-insensitive), keeping the first
  // field's data type and making every field nullable
  val ignoreNullable: StructField => StructField = x => StructField(x.name, x.dataType, true)
  val masterSchema = StructType(inputDFList.map(_.schema.fields.map(ignoreNullable)).reduce((x, y) => x ++ y).groupBy(_.name.toLowerCase).map(_._2.head).toArray)
  def unionExpr(myCols: Seq[String], allCols: Seq[String]): Seq[org.apache.spark.sql.Column] = {
    // Keep the columns this DataFrame has; fill the rest with null literals
    allCols.toList.map(x => x match {
      case x if myCols.contains(x) => col(x)
      case _ => lit(null).as(x)
    })
  }
  // Create an empty DataFrame with the master schema, columns in master order
  val masterEmptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], masterSchema).select(masterColStrList.head, masterColStrList.tail: _*)

  /*
  val df1 = DFList(0)
  val df1cols = df1.columns
  val masterEmptyDF = df1.select(unionExpr(df1cols, MasterColList): _*).where(lit(1) === lit(2))
  val DFColumns: List[Array[Column]] = DFList.map(_.columns).map(unionExpr(_, MasterColList).toArray)
  val unioned_Data = DFList.zip(DFColumns).map(x => x._1.select(x._2: _*)).foldLeft(masterEmptyDF)((x, y) => x.union(y))
  */

  // For union/unionAll the sequence of columns must be the same; use unionByName otherwise.
  // Passing masterColStrList ensures the columns are in the correct order.
  inputDFList.map(df => df.select(unionExpr(df.columns, masterColStrList): _*)).foldLeft(masterEmptyDF)((x, y) => x.unionByName(y))
  // inputDFList.map(df => df.select(unionExpr(df.columns, masterColStrList): _*)).foldLeft(masterEmptyDF)((x, y) => x.union(y))
}
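To make the algorithm easier to follow outside Spark, here is a minimal pure-Python model of what unionPro does, assuming each "DataFrame" is represented as a column list plus a list of row dicts. The names `Frame`, `union_pro_model`, and `lowercase` are hypothetical and not Spark APIs. The sketch builds the master column list in first-seen order, pads each frame's missing columns with None (the role of `lit(null)` in unionExpr), and appends the padded rows in input order (the role of the foldLeft over unionByName).

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical stand-in for a DataFrame: (column names, rows as dicts).
Frame = Tuple[List[str], List[Dict[str, object]]]


def union_pro_model(frames: List[Frame], lowercase: bool = True) -> Frame:
    """Pure-Python model of unionPro's column alignment; not a Spark API."""
    if lowercase:
        # Mirrors caseDiff != "N": rename every column to its lower-case form.
        frames = [
            ([c.lower() for c in cols],
             [{k.lower(): v for k, v in row.items()} for row in rows])
            for cols, rows in frames
        ]

    # Master column list in first-seen order (the role of masterColStrList).
    # A set would lose this order, like the commented-out toSet line.
    master: List[str] = []
    for cols, _ in frames:
        for c in cols:
            if c not in master:
                master.append(c)

    # Lay out each row in master order, padding missing columns with None,
    # then append the rows in input order.
    out_rows: List[Dict[str, Optional[object]]] = []
    for cols, rows in frames:
        for row in rows:
            out_rows.append({c: row[c] if c in cols else None for c in master})
    return master, out_rows


# The sample from the answer, with case preserved (like caseDiff = "N").
a: Frame = (["Name", "ID"], [{"Name": "A", "ID": 1}, {"Name": "B", "ID": 2}])
b: Frame = (["Name", "Sal"], [{"Name": "C", "Sal": 1}, {"Name": "D", "Sal": 2}])
cols, rows = union_pro_model([a, b], lowercase=False)
for r in rows:
    print([r[c] for c in cols])
```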
Here is a sample test for it:
import spark.implicits._

val aDF = Seq(("A", 1), ("B", 2)).toDF("Name", "ID")
val bDF = Seq(("C", 1), ("D", 2)).toDF("Name", "Sal")
unionPro(List(aDF, bDF), caseDiff = "N").show
which gives the following output:
+----+----+----+
|Name|  ID| Sal|
+----+----+----+
|   A|   1|null|
|   B|   2|null|
|   C|null|   1|
|   D|null|   2|
+----+----+----+
Here's the PySpark version:
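from functools import reduce
from typing import List

from pyspark.sql import Column, DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructField, StructType


def unionPro(DFList: List[DataFrame], caseDiff: str = "N") -> DataFrame:
    """
    :param DFList: list of DataFrames to union
    :param caseDiff: "N" keeps column names as-is; any other value lower-cases them first
    :return: the unioned DataFrame

    This function accepts DataFrames with the same or different schema/column order,
    with some or no common columns, and creates a unioned DataFrame.
    """
    spark = SparkSession.builder.getOrCreate()
    inputDFList = DFList if caseDiff == "N" else [df.select([F.col(x).alias(x.lower()) for x in df.columns]) for df in DFList]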
    # This preserves order (OrderedDict)
    from collections import OrderedDict

    # Column names (strings) are hashable, so they can be OrderedDict keys
    masterColStrList = list(OrderedDict.fromkeys(reduce(lambda x, y: x + y, [df.columns for df in inputDFList])))
    # Create masterSchema: one field per name, keeping the first field's data type
    # and making every field nullable
    ignoreNullable = lambda x: StructField(x.name, x.dataType, True)
    import itertools

    # To get reliable results from groupby, the iterable must be sorted by the grouping key.
    # In sorted(), the key function (lambda) must be passed as a named (keyword) argument.
    # Sorting loses the original column order, so masterColStrList is used when returning the final DF.
    masterSchema = StructType([list(y)[0] for x, y in itertools.groupby(
        sorted(reduce(lambda x, y: x + y, [[ignoreNullable(x) for x in df.schema.fields] for df in inputDFList]),
               key=lambda x: x.name),
        lambda x: x.name)])
    def unionExpr(myCols: List[str], allCols: List[str]) -> List[Column]:
        # Keep the columns this DataFrame has; fill the rest with null literals
        return [F.col(x) if x in myCols else F.lit(None).alias(x) for x in allCols]
    # Create an empty DataFrame with the master schema
    masterEmptyDF = spark.createDataFrame([], masterSchema)
    return reduce(lambda x, y: x.unionByName(y),
                  [df.select(unionExpr(df.columns, masterColStrList)) for df in inputDFList],
                  masterEmptyDF).select(masterColStrList)
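The PySpark version leans on two standard-library behaviors: `OrderedDict.fromkeys` drops duplicate column names while keeping first-seen order, and `itertools.groupby` only groups *consecutive* equal keys, which is why the schema fields are sorted by name first. The plain-Python sketch below demonstrates both without Spark; `Field` is a hypothetical namedtuple standing in for `StructField`, and the type names are plain strings for illustration.

```python
import itertools
from collections import OrderedDict, namedtuple
from functools import reduce

# Hypothetical stand-in for pyspark.sql.types.StructField.
Field = namedtuple("Field", ["name", "dataType", "nullable"])

columns_per_df = [["Name", "ID"], ["Name", "Sal"]]

# Order-preserving de-duplication, as used for masterColStrList.
master_cols = list(OrderedDict.fromkeys(reduce(lambda x, y: x + y, columns_per_df)))

fields = [
    Field("Name", "string", False), Field("ID", "int", False),
    Field("Name", "string", True), Field("Sal", "int", False),
]

# Unsorted input: groupby only merges adjacent equal keys, so "Name" appears twice.
unsorted_names = [k for k, _ in itertools.groupby(fields, key=lambda f: f.name)]

# Sorted by the grouping key: one group per name, in alphabetical order.
# sorted() is stable, so list(g)[0] is the first occurrence of each name.
sorted_fields = sorted(fields, key=lambda f: f.name)
merged = [list(g)[0] for _, g in itertools.groupby(sorted_fields, key=lambda f: f.name)]
merged_names = [f.name for f in merged]

print(master_cols)
print(unsorted_names)
print(merged_names)
```

Because the merged schema comes out alphabetical, the function's final `.select(masterColStrList)` is what restores the original column order. Since Python 3.7, a plain `dict.fromkeys` also preserves insertion order, so `OrderedDict` is not strictly required there.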