PySpark StructType And StructField Examples

Apache Spark is a powerful framework for distributed data processing, and PySpark, its Python API, provides an excellent interface for working with large-scale datasets. In this article, we’ll delve into the world of PySpark StructType and StructField to understand how they can be leveraged for efficient DataFrame manipulation.

Key Points:

  1. Defining DataFrame Schemas:
    • Usage: StructType is commonly employed for defining DataFrame schemas.
    • Data Types: Ideal for structured data with diverse field data types.
  2. Nested Structures:
    • Complex Schemas: Enables the creation of intricate schemas.
    • Nesting Capability: StructType nesting within other StructType objects.
    • Application: Useful for representing hierarchical or multi-level data.
  3. Enforcing Data Structure:
    • Data Reading: Employ StructType when reading data from various sources (see the sketch after this list).
    • Interpretation & Structure: Ensures accurate interpretation and structuring.
    • Significance: Crucial for dealing with semi-structured or schema-less data sources.
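
To ground point 3, here is a minimal sketch of enforcing a schema at read time. It assumes a hypothetical file path ("people.json") and hypothetical field names; adapt both to your own data.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("schemaOnRead").getOrCreate()

# Explicit schema for a semi-structured JSON source (field names are assumed)
people_schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Enforce the schema instead of relying on inference ("people.json" is a placeholder path)
people_df = spark.read.schema(people_schema).json("people.json")
people_df.printSchema()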

Using PySpark StructType And StructField with DataFrame

Before we dive into the details, let’s understand the basics. StructType is a collection of StructField objects that define the schema of a DataFrame. Each StructField represents a column and specifies its name, data type, and whether it can contain null values.

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Create a Spark session
spark = SparkSession.builder.appName("structTypeExample").getOrCreate()

# Define a schema using StructType and StructField
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Create an empty DataFrame with the defined schema
df = spark.createDataFrame([], schema=schema)

# Show the DataFrame schema
df.printSchema()

Output:

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
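
The empty DataFrame above only carries the structure. As a quick sketch with made-up values, the same schema can be applied to in-memory rows:

# Sample values are illustrative only
people = [("Alice", 30), ("Bob", 25)]
df_people = spark.createDataFrame(people, schema=schema)
df_people.show()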

Defining Nested StructType or Struct

PySpark allows you to create nested structures within a DataFrame. This is achieved by defining a StructType within another StructType.

from pyspark.sql.functions import lit

# Define a nested schema
nested_schema = StructType([
    StructField("address", StructType([
        StructField("city", StringType(), True),
        StructField("zipcode", StringType(), True)
    ]), True)
])

# Add a 'location' column with the nested structure
# (withColumn expects a Column, so cast a null literal to the nested schema)
df = df.withColumn("location", lit(None).cast(nested_schema))

# Show the updated DataFrame schema
df.printSchema()

Output:

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- zipcode: string (nullable = true)
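
Once the nested column is in place, the individual fields can be referenced with dot notation, for example:

# Select nested fields from the struct column
df.select("location.address.city", "location.address.zipcode").printSchema()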

Adding and Changing Columns of the DataFrame

You can dynamically add or modify columns in a DataFrame using PySpark. Here’s an example of adding a new column and updating an existing one.

# Add a new column 'gender' as a null placeholder with StringType
df = df.withColumn("gender", lit(None).cast(StringType()))

# Update the 'age' column to DoubleType
df = df.withColumn("age", df["age"].cast("double"))

# Show the modified DataFrame schema
df.printSchema()

Output:

root
 |-- name: string (nullable = true)
 |-- age: double (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- zipcode: string (nullable = true)
 |-- gender: string (nullable = true)

Using SQL ArrayType and MapType

PySpark supports ArrayType and MapType to handle arrays and key-value pairs within a DataFrame.

from pyspark.sql.types import ArrayType, MapType

# Define a schema with ArrayType and MapType
array_map_schema = StructType([
    StructField("grades", ArrayType(IntegerType()), True),
    StructField("contacts", MapType(StringType(), StringType()), True)
])

# Add null placeholder columns using the ArrayType and MapType from the schema
df = df.withColumn("scores", lit(None).cast(array_map_schema["grades"].dataType)) \
       .withColumn("contact_info", lit(None).cast(array_map_schema["contacts"].dataType))

# Show the updated DataFrame schema
df.printSchema()

Output:

root
 |-- name: string (nullable = true)
 |-- age: double (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- zipcode: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- scores: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- contact_info: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
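
For a more concrete picture, here is a minimal sketch that builds a one-row DataFrame with real array and map values; the row contents and column names are made up for illustration:

# Illustrative data only: one row with an array of grades and a map of contacts
sample = spark.createDataFrame(
    [("Alice", [85, 90], {"email": "alice@example.com"})],
    schema="name STRING, grades ARRAY<INT>, contacts MAP<STRING, STRING>"
)
sample.printSchema()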

Creating StructType or Struct from JSON

PySpark can also derive a StructType from JSON data: when Spark reads a JSON string, it infers the schema, and that schema is itself a StructType object.

# Sample JSON data
sample_json = '{"name": "John", "age": 25, "location": {"city": "New York", "zipcode": "10001"}, "gender": "Male"}'

# Create DataFrame from JSON string
json_df = spark.read.json(spark.sparkContext.parallelize([sample_json]))

# The inferred schema is a StructType object
inferred_schema = json_df.schema

# Show the inferred schema
json_df.printSchema()

Output:

root
 |-- age: long (nullable = true)
 |-- gender: string (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- zipcode: string (nullable = true)
 |-- name: string (nullable = true)
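
A StructType can also be serialized to JSON and rebuilt from it, which is useful for persisting schemas. A brief sketch using the schema inferred above:

import json

# Round-trip the schema through its JSON representation
schema_json = inferred_schema.json()
rebuilt_schema = StructType.fromJson(json.loads(schema_json))
print(rebuilt_schema.simpleString())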

Creating StructType Object from DDL String

You can create a StructType object from a Data Definition Language (DDL) string.

# Define DDL string
ddl_string = "name STRING, age INT, location STRUCT<city: STRING, zipcode: STRING>"

# Create a StructType from the DDL string (StructType.fromDDL requires a recent PySpark release)
ddl_schema = StructType.fromDDL(ddl_string)

# Inspect the generated schema via an empty DataFrame
spark.createDataFrame([], ddl_schema).printSchema()

Output:

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- location: struct (nullable = true)
 |    |-- city: string (nullable = true)
 |    |-- zipcode: string (nullable = true)
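
DDL strings are also accepted directly by the DataFrameReader, which is convenient when enforcing structure on files. The file path below is a placeholder:

# "users.csv" is a hypothetical path; adjust it to your environment
users_df = spark.read.schema(ddl_string).csv("users.csv", header=True)
users_df.printSchema()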

Check if a Field Exists in a StructType

You might need to check whether a field exists in a StructType. The schema's fieldNames method makes this straightforward.

# Check whether the 'gender' field exists in the DataFrame's StructType schema
if "gender" in df.schema.fieldNames():
    print("The 'gender' field exists in the DataFrame schema.")
else:
    print("The 'gender' field does not exist in the DataFrame schema.")

Output:

The 'gender' field exists in the DataFrame schema.
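
For nested structures, the same idea applies one level at a time. A small sketch, assuming the location/address structure defined earlier:

# Drill into the nested StructType to look for a field inside 'address'
address_fields = df.schema["location"].dataType["address"].dataType.fieldNames()
print("city" in address_fields)  # True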

In conclusion, understanding and effectively utilising PySpark StructType and StructField can greatly enhance your DataFrame manipulation capabilities. Whether defining nested structures, adding or changing columns dynamically, or working with complex data types, PySpark provides powerful tools for handling diverse datasets efficiently.

PySpark StructType And StructField Complete Example

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType, MapType
from pyspark.sql.functions import lit

# Create a Spark session
spark = SparkSession.builder.appName("structTypeExample").getOrCreate()

# Define DataFrame schema
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])

# Create an empty DataFrame with the defined schema
df = spark.createDataFrame([], schema=schema)

# Show the DataFrame schema
df.printSchema()

# Define a nested schema
nested_schema = StructType([
    StructField("address", StructType([
        StructField("city", StringType(), True),
        StructField("zipcode", StringType(), True)
    ]), True)
])

# Add a 'location' column with the nested structure
# (withColumn expects a Column, so cast a null literal to the nested schema)
df = df.withColumn("location", lit(None).cast(nested_schema))

# Show the updated DataFrame schema
df.printSchema()

# Add a new column 'gender' as a null placeholder with StringType
df = df.withColumn("gender", lit(None).cast(StringType()))

# Update the 'age' column to DoubleType
df = df.withColumn("age", df["age"].cast("double"))

# Show the modified DataFrame schema
df.printSchema()

# Define a schema with ArrayType and MapType
array_map_schema = StructType([
    StructField("grades", ArrayType(IntegerType()), True),
    StructField("contacts", MapType(StringType(), StringType()), True)
])

# Add null placeholder columns using the ArrayType and MapType from the schema
df = df.withColumn("scores", lit(None).cast(array_map_schema["grades"].dataType)) \
       .withColumn("contact_info", lit(None).cast(array_map_schema["contacts"].dataType))

# Show the updated DataFrame schema
df.printSchema()

# Sample JSON data
sample_json = '{"name": "John", "age": 25, "location": {"city": "New York", "zipcode": "10001"}, "gender": "Male"}'

# Create DataFrame from JSON string
json_df = spark.read.json(spark.sparkContext.parallelize([sample_json]))

# The inferred schema is a StructType object
inferred_schema = json_df.schema

# Show the inferred schema
json_df.printSchema()

# Define DDL string
ddl_string = "name STRING, age INT, location STRUCT<city: STRING, zipcode: STRING>"

# Create a StructType from the DDL string (StructType.fromDDL requires a recent PySpark release)
ddl_schema = StructType.fromDDL(ddl_string)

# Inspect the generated schema via an empty DataFrame
spark.createDataFrame([], ddl_schema).printSchema()

# Check whether the 'gender' field exists in the DataFrame's StructType schema
if "gender" in df.schema.fieldNames():
    print("The 'gender' field exists in the DataFrame schema.")
else:
    print("The 'gender' field does not exist in the DataFrame schema.")
