将 Pandas DataFrame 转换为 Spark DataFrame
-
使用
createDataFrame()函数将 Pandas DataFrame 转换为 Spark DataFrame -
使用
createDataFrame()和schema函数将 Pandas DataFrame 转换为 Spark DataFrame -
使用启用
apache arrow的createDataFrame()函数将 Pandas DataFrame 转换为 Spark DataFrame
DataFrame 是二维可变数据结构。DataFrame 中的数据存储在称为行和列的标记轴中。
Pandas 和 Spark 都有 DataFrame。本教程将讨论将 Pandas DataFrame 转换为 Spark DataFrame 的不同方法。
使用 createDataFrame() 函数将 Pandas DataFrame 转换为 Spark DataFrame
createDataFrame() 函数用于从 RDD 或 pandas.DataFrame 创建 Spark DataFrame。createDataFrame() 将数据和方案作为参数。
我们将很快讨论方案。
createDataFrame() 的语法:
createDataFrame(data, schema=None)
参数:
data= 要传递的 DataFrameschema=str或列表,可选
返回:DataFrame
方法:
- 导入
pandas库并使用DataFrame()方法创建一个 Pandas DataFrame。 - 通过从
pyspark库中导入SparkSession创建spark会话。 - 将 Pandas DataFrame 传递给
SparkSession对象的createDataFrame()方法。 - 打印 DataFrame。
以下代码使用 createDataFrame() 函数将 Pandas DataFrame 转换为 Spark DataFrame。
# import the pandas
import pandas as pd
# from pyspark library import sql
from pyspark import sql
# Creating a SparkSession
spark_session = sql.SparkSession.builder.appName("pdf to sdf").getOrCreate()
# Creating the pandas DataFrame using pandas.DataFrame()
data = pd.DataFrame(
{
"Course": ["Python", "Spark", "Java", "JavaScript", "C#"],
"Mentor": ["Robert", "Elizibeth", "Nolan", "Chris", "johnson"],
"price$": [199, 299, 99, 250, 399],
}
)
# Converting the pandas dataframe in to spark dataframe
spark_DataFrame = spark_session.createDataFrame(data)
# printing the dataframe
spark_DataFrame.show()
输出:
+----------+---------+------+
| Course| Mentor|price$|
+----------+---------+------+
| Python| Robert| 199|
| Spark|Elizibeth| 299|
| Java| Nolan| 99|
|JavaScript| Chris| 250|
| C#| johnson| 399|
+----------+---------+------+
使用 createDataFrame() 和 schema 函数将 Pandas DataFrame 转换为 Spark DataFrame
我们在前面的示例中讨论了 createDataFrame() 方法。现在我们将看到如何在转换 DataFrame 时更改 schema。
此示例将使用模式更改列名,将 Course 更改为 Technology,将 Mentor 更改为 developer,将 price 更改为 Salary。
schema:
schema 定义字段名称及其数据类型。在 Spark 中,schema 是 DataFrame 的结构,DataFrame 的 schema 可以使用 StructType 类来定义,它是 StructField 的集合。
StructField 采用字段或列的名称、数据类型和可为空的。可空参数定义该字段是否可以为空。
方法:
- 导入
pandas库并使用DataFrame()方法创建一个 Pandas DataFrame。 - 通过从
pyspark库中导入SparkSession创建Spark会话。 - 通过将
StructField的集合传递给StructType类来创建模式;StructField对象是通过传递字段的名称、数据类型和可为空来创建的。 - 将 Pandas DataFrame 和模式传递给
SparkSession对象的createDataFrame()方法。 - 打印 DataFrame。
以下代码使用 createDataFrame() 和 schema 将 Pandas DataFrame 转换为 Spark DataFrame。
# import the pandas
import pandas as pd
# from pyspark library import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# Creating a SparkSession
spark_session = sql.SparkSession.builder.appName("pdf to sdf").getOrCreate()
# Creating the pandas DataFrame using pandas.DataFrame()
data = pd.DataFrame(
{
"Course": ["Python", "Spark", "Java", "JavaScript", "C#"],
"Mentor": ["Robert", "Elizibeth", "Nolan", "Chris", "johnson"],
"price$": [199, 299, 99, 250, 399],
}
)
# Creating/Changing schema
dfSchema = StructType(
[
StructField("Technology", StringType(), True),
StructField("developer", StringType(), True),
StructField("Salary", IntegerType(), True),
]
)
# Converting the pandas dataframe in to spark dataframe
spark_DataFrame = spark_session.createDataFrame(data, schema=dfSchema)
# printing the dataframe
spark_DataFrame.show()
输出:
+----------+---------+------+
|Technology|developer|Salary|
+----------+---------+------+
| Python| Robert| 199|
| Spark|Elizibeth| 299|
| Java| Nolan| 99|
|JavaScript| Chris| 250|
| C#| johnson| 399|
+----------+---------+------+
使用启用 apache arrow 的 createDataFrame() 函数将 Pandas DataFrame 转换为 Spark DataFrame
Apache Arrow 是一种独立于语言的列式内存格式,用于平面和分层数据或任何结构化数据格式。Apache Arrow 通过创建标准的列式内存格式提高了数据分析的效率。
默认情况下禁用 apache 箭头;我们可以使用以下代码显式启用它。
SparkSession.conf.set("spark.sql.execution.arrow.enabled", "true")
方法:
- 导入
pandas库并使用DataFrame()方法创建一个 Pandas DataFrame。 - 通过从
pyspark库中导入SparkSession创建spark会话。 - 使用
conf属性启用 apache 箭头。 - 将 Pandas DataFrame 传递给
SparkSession对象的createDataFrame()方法。 - 打印 DataFrame。
以下代码通过启用 apache 箭头将 Pandas DataFrame 转换为 Spark DataFrame 来使用 createDataFrame() 函数。
# import the pandas
import pandas as pd
# from pyspark library import sql
from pyspark import sql
# Creating a SparkSession
spark_session = sql.SparkSession.builder.appName("pdf to sdf").getOrCreate()
# Creating the pandas DataFrame using pandas.DataFrame()
data = pd.DataFrame(
{
"Course": ["Python", "Spark", "Java", "JavaScript", "C#"],
"Mentor": ["Robert", "Elizibeth", "Nolan", "Chris", "johnson"],
"price$": [199, 299, 99, 250, 399],
}
)
spark_session.conf.set("spark.sql.execution.arrow.enabled", "true")
# Converting the pandas dataframe in to spark dataframe
sprak_arrow = spark_session.createDataFrame(data)
# printing the dataframe
sprak_arrow.show()
输出:
+----------+---------+------+
| Course| Mentor|price$|
+----------+---------+------+
| Python| Robert| 199|
| Spark|Elizibeth| 299|
| Java| Nolan| 99|
|JavaScript| Chris| 250|
| C#| johnson| 399|
+----------+---------+------+