Mastering PySpark Filter Function: A Power Guide with Real Examples

The PySpark filter() function is a powerhouse for data analysis. In this guide, we delve into its intricacies, walk through real-world examples, and show you how to optimize your data filtering in PySpark.

1. Understanding PySpark DataFrame

A PySpark DataFrame, a distributed collection of data organized into named columns, forms the canvas for our data filtering endeavors. Let's explore its significance.
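Before we load real data, here is a minimal sketch of building a DataFrame from a local Python list (the column names here are purely illustrative):

# A tiny illustrative DataFrame built from local data
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DoWhileLearn.com").getOrCreate()

df = spark.createDataFrame([(1, "Falcon"), (2, "Delta")], ["id", "name"])
df.show()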

2. Creating PySpark DataFrame: Analyzing Space Launch Data

To illustrate our journey, we’ll use space launch data, loading it into a PySpark DataFrame and inspecting a sample.

# PySpark DataFrame creation for space launch data (using JSON records)
from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("DoWhileLearn.com").getOrCreate()

# Sample JSON data: one JSON record per launch
json_records = [
    '{"id": 1, "launch_date": "2020-01-01", "name": "Falcon", "status": "Success", "country": "USA", "rocket": "Falcon 9"}',
    '{"id": 2, "launch_date": "2020-01-05", "name": "Delta", "status": "Success", "country": "USA", "rocket": "Delta IV"}',
    '{"id": 3, "launch_date": "2020-01-10", "name": "Atlas", "status": "Failure", "country": "USA", "rocket": "Atlas V"}',
    '{"id": 4, "launch_date": "2020-01-15", "name": "Titan", "status": "Success", "country": "Russia", "rocket": "Titan II"}',
    '{"id": 5, "launch_date": "2020-01-20", "name": "Soyuz", "status": "Failure", "country": "Russia", "rocket": "Soyuz"}',
    '{"id": 6, "launch_date": "2020-01-25", "name": "Proton", "status": "Success", "country": "Russia", "rocket": "Proton"}',
    '{"id": 7, "launch_date": "2020-01-30", "name": "Zenit", "status": "Success", "country": "Ukraine", "rocket": "Zenit"}',
    '{"id": 8, "launch_date": "2020-02-04", "name": "Ariane", "status": "Failure", "country": "France", "rocket": "Ariane"}',
    '{"id": 9, "launch_date": "2020-02-09", "name": "Long", "status": "Success", "country": "China", "rocket": "Long March"}',
    '{"id": 10, "launch_date": "2020-02-14", "name": "Vega", "status": "Success", "country": "Italy", "rocket": "Vega"}'
]

# Read the JSON records into a DataFrame (one row per record),
# then order the columns for display
launch_data = spark.read.json(spark.sparkContext.parallelize(json_records)) \
    .select("id", "launch_date", "name", "status", "country", "rocket")
launch_data.show(10)

Output:

+---+-----------+------+-------+-------+----------+
| id|launch_date|  name| status|country|    rocket|
+---+-----------+------+-------+-------+----------+
|  1| 2020-01-01|Falcon|Success|    USA|  Falcon 9|
|  2| 2020-01-05| Delta|Success|    USA|  Delta IV|
|  3| 2020-01-10| Atlas|Failure|    USA|   Atlas V|
|  4| 2020-01-15| Titan|Success| Russia|  Titan II|
|  5| 2020-01-20| Soyuz|Failure| Russia|     Soyuz|
|  6| 2020-01-25|Proton|Success| Russia|    Proton|
|  7| 2020-01-30| Zenit|Success|Ukraine|     Zenit|
|  8| 2020-02-04|Ariane|Failure| France|    Ariane|
|  9| 2020-02-09|  Long|Success|  China|Long March|
| 10| 2020-02-14|  Vega|Success|  Italy|      Vega|
+---+-----------+------+-------+-------+----------+

3. PySpark filter() Syntax Demystified

Let’s start our journey by understanding the fundamental syntax of the filter function.

# Syntax
filtered_data = df.filter(condition)

The condition can be either a Column expression or a SQL expression string; understanding both forms is crucial for effective DataFrame filtering.
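As a minimal sketch, assuming a DataFrame df with a status column, the two forms express the same predicate:

from pyspark.sql.functions import col

# Column expression form
filtered_data = df.filter(col("status") == "Success")

# Equivalent SQL expression string form
filtered_data = df.filter("status = 'Success'")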

4. Column-Based Filtering in PySpark

Unlock the power of column conditions for efficient data filtering with examples.

Using equals condition

# Using equals condition
equals_condition = launch_data.filter(launch_data.status == "Success")
equals_condition.show(5)

Output:

+---+-----------+------+-------+-------+----------+
| id|launch_date|  name| status|country|    rocket|
+---+-----------+------+-------+-------+----------+
|  1| 2020-01-01|Falcon|Success|    USA|  Falcon 9|
|  2| 2020-01-05| Delta|Success|    USA|  Delta IV|
|  4| 2020-01-15| Titan|Success| Russia|  Titan II|
|  6| 2020-01-25|Proton|Success| Russia|    Proton|
|  9| 2020-02-09|  Long|Success|  China|Long March|
+---+-----------+------+-------+-------+----------+
only showing top 5 rows

Not equals condition

# Using not equals condition
not_equals_condition = launch_data.filter(launch_data.status != "Success")
not_equals_condition.show(5)

Output:

+---+-----------+------+-------+-------+-------+
| id|launch_date|  name| status|country| rocket|
+---+-----------+------+-------+-------+-------+
|  3| 2020-01-10| Atlas|Failure|    USA|Atlas V|
|  5| 2020-01-20| Soyuz|Failure| Russia|  Soyuz|
|  8| 2020-02-04|Ariane|Failure| France| Ariane|
+---+-----------+------+-------+-------+-------+

5. SQL Expressions in PySpark filter()

For SQL aficionados, PySpark enables the use of SQL expressions for seamless DataFrame filtering.

Using SQL Expression

# Using SQL Expression
sql_expression = launch_data.filter("status = 'Success'")
sql_expression.show(5)

Output:

+---+-----------+------+-------+-------+----------+
| id|launch_date|  name| status|country|    rocket|
+---+-----------+------+-------+-------+----------+
|  1| 2020-01-01|Falcon|Success|    USA|  Falcon 9|
|  2| 2020-01-05| Delta|Success|    USA|  Delta IV|
|  4| 2020-01-15| Titan|Success| Russia|  Titan II|
|  6| 2020-01-25|Proton|Success| Russia|    Proton|
|  9| 2020-02-09|  Long|Success|  China|Long March|
+---+-----------+------+-------+-------+----------+
only showing top 5 rows

For not equal

# For not equal
not_equal_sql = launch_data.filter("status != 'Success'")
not_equal_sql.show(5)

Output:

+---+-----------+------+-------+-------+-------+
| id|launch_date|  name| status|country| rocket|
+---+-----------+------+-------+-------+-------+
|  3| 2020-01-10| Atlas|Failure|    USA|Atlas V|
|  5| 2020-01-20| Soyuz|Failure| Russia|  Soyuz|
|  8| 2020-02-04|Ariane|Failure| France| Ariane|
+---+-----------+------+-------+-------+-------+

6. PySpark where() vs. filter()

In PySpark, filter() and where() are interchangeable: where() is simply an alias for filter(), provided for SQL familiarity, so the two behave identically. The sketch below shows the equivalence.
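Here is a quick sketch on our launch data; both calls produce identical results:

# where() is an alias for filter(); these two lines are equivalent
launch_data.filter(launch_data.status == "Success").show(5)
launch_data.where(launch_data.status == "Success").show(5)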

7. Advanced Filtering Techniques in PySpark

Unlock the potential of advanced functions like isin(), like(), and rlike() for handling complex filtering scenarios.
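Here is a brief sketch of each on our launch data (the match patterns are illustrative):

# isin(): keep rows whose column value appears in a list
launch_data.filter(launch_data.country.isin("USA", "Russia")).show()

# like(): SQL-style wildcard match (% matches any sequence of characters)
launch_data.filter(launch_data.rocket.like("Falcon%")).show()

# rlike(): regular-expression match (names starting with A or S)
launch_data.filter(launch_data.name.rlike("^[AS]")).show()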

8. Real-world Application: Filtering Space Launch Data

Let's apply our knowledge to filter the space launch data and extract meaningful insights.

# Filtering space launch data for successful launches in the USA
us_successful_launches = launch_data.filter((launch_data.status == "Success") & (launch_data.country == "USA"))
us_successful_launches.show(10)

Output:

+---+-----------+------+-------+-------+--------+
| id|launch_date|  name| status|country|  rocket|
+---+-----------+------+-------+-------+--------+
|  1| 2020-01-01|Falcon|Success|    USA|Falcon 9|
|  2| 2020-01-05| Delta|Success|    USA|Delta IV|
+---+-----------+------+-------+-------+--------+

9. Code Example: PySpark Filtering with Multiple Conditions

Note that each comparison must be wrapped in parentheses, because Python's & operator binds more tightly than ==.

# Filter on multiple conditions combined with & (AND)
multiple_conditions = launch_data.filter((launch_data.status == "Success") & (launch_data.id >= 5))
multiple_conditions.show(10)

Output:

+---+-----------+------+-------+-------+----------+
| id|launch_date|  name| status|country|    rocket|
+---+-----------+------+-------+-------+----------+
|  6| 2020-01-25|Proton|Success| Russia|    Proton|
|  7| 2020-01-30| Zenit|Success|Ukraine|     Zenit|
|  9| 2020-02-09|  Long|Success|  China|Long March|
| 10| 2020-02-14|  Vega|Success|  Italy|      Vega|
+---+-----------+------+-------+-------+----------+

10. Complete Code: Mastering PySpark Filter Function

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder.appName("DoWhileLearn.com").getOrCreate()

# Sample JSON data: one JSON record per launch
json_records = [
    '{"id": 1, "launch_date": "2020-01-01", "name": "Falcon", "status": "Success", "country": "USA", "rocket": "Falcon 9"}',
    '{"id": 2, "launch_date": "2020-01-05", "name": "Delta", "status": "Success", "country": "USA", "rocket": "Delta IV"}',
    '{"id": 3, "launch_date": "2020-01-10", "name": "Atlas", "status": "Failure", "country": "USA", "rocket": "Atlas V"}',
    '{"id": 4, "launch_date": "2020-01-15", "name": "Titan", "status": "Success", "country": "Russia", "rocket": "Titan II"}',
    '{"id": 5, "launch_date": "2020-01-20", "name": "Soyuz", "status": "Failure", "country": "Russia", "rocket": "Soyuz"}',
    '{"id": 6, "launch_date": "2020-01-25", "name": "Proton", "status": "Success", "country": "Russia", "rocket": "Proton"}',
    '{"id": 7, "launch_date": "2020-01-30", "name": "Zenit", "status": "Success", "country": "Ukraine", "rocket": "Zenit"}',
    '{"id": 8, "launch_date": "2020-02-04", "name": "Ariane", "status": "Failure", "country": "France", "rocket": "Ariane"}',
    '{"id": 9, "launch_date": "2020-02-09", "name": "Long", "status": "Success", "country": "China", "rocket": "Long March"}',
    '{"id": 10, "launch_date": "2020-02-14", "name": "Vega", "status": "Success", "country": "Italy", "rocket": "Vega"}'
]

# Create DataFrame from the JSON records, ordering the columns for display
launch_data = spark.read.json(spark.sparkContext.parallelize(json_records)) \
    .select("id", "launch_date", "name", "status", "country", "rocket")

# Output 1: Display sample data
launch_data.show(10)

# Output 2: Using equals condition
equals_condition = launch_data.filter(launch_data.status == "Success")
equals_condition.show(5)

# Output 3: Not equals condition
not_equals_condition = launch_data.filter(launch_data.status != "Success")
not_equals_condition.show(5)

# Output 4: Using SQL Expression
sql_expression = launch_data.filter("status = 'Success'")
sql_expression.show(5)

# Output 5: For not equal using SQL Expression
not_equal_sql = launch_data.filter("status != 'Success'")
not_equal_sql.show(5)

# Output 6: Filtering space launch data for successful launches in the USA
us_successful_launches = launch_data.filter((launch_data.status == "Success") & (launch_data.country == "USA"))
us_successful_launches.show(10)

# Output 7: Filter multiple conditions
multiple_conditions = launch_data.filter((launch_data.status == "Success") & (launch_data.id >= 5))
multiple_conditions.show(10)

11. Conclusion: Mastering PySpark Filter Function

In conclusion, mastering PySpark’s filter() function is essential for efficient data analysis. With the ability to handle multiple conditions and leverage advanced techniques, PySpark proves to be a powerful tool for processing large datasets.

Frequently Asked Questions

How to filter multiple columns in PySpark?

Combine conditions on multiple columns with logical operators (& for AND, | for OR), wrapping each condition in parentheses.

Example:

# Filter multiple conditions
df.filter((df.country == "USA") & (df.status == "Success")).show(truncate=False)

What is the difference between where and filter in PySpark?

In PySpark, both filter() and where() are used to filter out rows based on a condition. They are interchangeable and perform exactly the same operation: where() is simply an alias for filter().
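Example (both lines are equivalent):

# where() is simply an alias for filter()
df.filter(df.status == "Success").show(truncate=False)
df.where(df.status == "Success").show(truncate=False)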
