from pyspark.sql import SparkSession

# Create (or reuse) a SparkSession before running any queries
spark = SparkSession.builder.appName("pyspark-sql-examples").getOrCreate()

# Sample data
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Alice", 25)]
df = spark.createDataFrame(data, ["id", "name", "age"])
df.createOrReplaceTempView("people")
# Select distinct names
unique_names_df = spark.sql(
    "SELECT DISTINCT name "
    "FROM people"
)
# Show result
unique_names_df.show()
# Count the number of rows in the people view
count_df = spark.sql(
    "SELECT COUNT(*) AS total_count "
    "FROM people"
)
# Show result
count_df.show()
# Group by name and count occurrences
group_by_count_df = spark.sql(
    "SELECT name, COUNT(*) AS name_count "
    "FROM people "
    "GROUP BY name"
)
# Show result
group_by_count_df.show()
# Sample data
data = [("Alice", 1000), ("Bob", 1500), ("Alice", 2000)]
df2 = spark.createDataFrame(data, ["name", "salary"])
df2.createOrReplaceTempView("salaries")
# Group by name and sum salaries
sum_salaries_df = spark.sql(
    "SELECT name, SUM(salary) AS total_salary "
    "FROM salaries "
    "GROUP BY name"
)
# Show result
sum_salaries_df.show()
# Group by name and calculate average salary
avg_salaries_df = spark.sql(
    "SELECT name, AVG(salary) AS avg_salary "
    "FROM salaries "
    "GROUP BY name"
)
# Show result
avg_salaries_df.show()
# Filter records where salary is greater than 1200
filter_df = spark.sql(
    "SELECT * "
    "FROM salaries "
    "WHERE salary > 1200"
)
# Show result
filter_df.show()
# Order records by salary in descending order
order_by_df = spark.sql(
    "SELECT * "
    "FROM salaries "
    "ORDER BY salary DESC"
)
# Show result
order_by_df.show()
from pyspark.sql.functions import explode, split, col
# Sample data
data = [("Hello world",), ("Hello PySpark",), ("Spark is great",)]
df = spark.createDataFrame(data, ["text"])
# Split the text into words
words_df = df.select(explode(split(col("text"), " ")).alias("word"))
# Count occurrences of each word
word_count_df = words_df.groupBy("word").count()
# Show result
word_count_df.show()
# Sample data
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Cathy", 22)]
df = spark.createDataFrame(data, ["id", "name", "age"])
# Filter rows where age > 25
filtered_df = df.filter(col("age") > 25)
# Show result
filtered_df.show()
# Sample data for DataFrame 1
data1 = [(1, "Alice"), (2, "Bob"), (3, "Cathy")]
df1 = spark.createDataFrame(data1, ["id", "name"])
# Sample data for DataFrame 2
data2 = [(1, "HR"), (2, "Engineering"), (4, "Marketing")]
df2 = spark.createDataFrame(data2, ["id", "department"])
# Inner join on 'id'
joined_df = df1.join(df2, on="id", how="inner")
# Show result
joined_df.show()
from pyspark.sql.functions import avg
# Sample data
data = [("Alice", "HR", 25), ("Bob", "Engineering", 30), ("Cathy", "HR", 28)]
df = spark.createDataFrame(data, ["name", "department", "age"])
# Group by department and calculate average age
avg_age_df = df.groupBy("department").agg(avg("age").alias("avg_age"))
# Show result
avg_age_df.show()
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
# Sample data
data = [(1, "Alice"), (2, "Bob"), (3, "Cathy")]
df = spark.createDataFrame(data, ["id", "name"])
# Define a UDF to add a prefix to a name
def add_prefix(name):
    return "Mr./Ms. " + name
add_prefix_udf = udf(add_prefix, StringType())
# Apply the UDF
df_with_prefix = df.withColumn("name_with_prefix", add_prefix_udf(col("name")))
# Show result
df_with_prefix.show()
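# For a simple prefix like this, the built-in concat and lit functions give the
# same result without the Python UDF's serialization overhead. This is an
# optional alternative sketch, not required by the example above.
from pyspark.sql.functions import concat, lit
df_with_prefix_builtin = df.withColumn("name_with_prefix", concat(lit("Mr./Ms. "), col("name")))
df_with_prefix_builtin.show()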
# Sample data with missing values
data = [(1, "Alice", 25), (2, "Bob", None), (3, "Cathy", 28)]
df = spark.createDataFrame(data, ["id", "name", "age"])
# Fill missing values in 'age' with a default value of 0
filled_df = df.na.fill({"age": 0})
# Show result
filled_df.show()
# Sample data
data = [(1, "Alice", 25), (2, "Bob", 30), (3, "Cathy", 22)]
df = spark.createDataFrame(data, ["id", "name", "age"])
# Write DataFrame to CSV
df.write.csv("/path/to/output", header=True)
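# To sanity-check the write, the CSV output can be read back into a DataFrame.
# A minimal sketch: "/path/to/output" is the same placeholder path used above,
# and inferSchema=True asks Spark to infer column types from the data.
df_check = spark.read.csv("/path/to/output", header=True, inferSchema=True)
df_check.show()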
In this guide, we will take a deep dive into managing data lakes on Databricks using SQL within PySpark. A data lake is a centralized repository that lets you store all of your structured and unstructured data at any scale.
Let's start by loading data into the data lake with a PySpark SQL query. Spark SQL can query a file directly by prefixing the backtick-quoted path with its format (here, csv):
spark.sql("SELECT * FROM csv.`/mnt/data/sample.csv`").show()
After loading data into the data lake, you can perform transformations using SQL queries in PySpark. For example, assuming a sales table (or temporary view) has been registered, the following groups rows by category:
df_grouped = spark.sql("SELECT category, SUM(price) AS total_price FROM sales GROUP BY category")
df_grouped.show()
Once you have processed the data, you can write it back to your data lake in various formats, such as Parquet.
df_grouped.write.format("parquet").save("/mnt/data/output/")
This query finds duplicate records based on a specific column (assuming a customers table or view is available):
spark.sql("SELECT email, COUNT(email) AS email_count FROM customers GROUP BY email HAVING COUNT(email) > 1").show()
This query retrieves the top 10 categories by total price:
spark.sql("SELECT category, SUM(price) FROM sales GROUP BY category ORDER BY SUM(price) DESC LIMIT 10").show()
The same data-lake workflow can also be expressed with the PySpark DataFrame API on Databricks. Let's again start by loading data from the data lake; inferSchema is enabled so that numeric columns such as price are read with numeric types:
df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("/mnt/data/sample.csv")
df.show()
After loading data into the data lake, you can perform transformations using PySpark. For example, let's perform a group by operation.
df_grouped = df.groupBy("category").sum("price")
df_grouped.show()
Once you have processed the data, you can write it back to your data lake in various formats such as Parquet or Delta. Using mode("overwrite") replaces any existing output at the target path:
df_grouped.write.format("parquet").mode("overwrite").save("/mnt/data/output/")
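Since Delta is mentioned above, here is a minimal sketch of the equivalent Delta write. It assumes a Delta-enabled runtime (which Databricks provides out of the box), and the output path is illustrative:
df_grouped.write.format("delta").mode("overwrite").save("/mnt/data/output_delta/")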
This code finds duplicate records based on a specific column (here, assuming the loaded DataFrame has an email column):
df_duplicates = df.groupBy("email").count().filter("count > 1")
df_duplicates.show()
This code retrieves the top categories by total price, mirroring the SQL GROUP BY / ORDER BY query shown earlier.
df_top_categories = df.groupBy("category").sum("price").orderBy("sum(price)", ascending=False)
df_top_categories.show(10)
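Note that show(10) only controls how many rows are printed; to keep just the top 10 rows in the DataFrame itself (matching the SQL LIMIT 10 used earlier), limit can be used:
df_top10 = df_top_categories.limit(10)
df_top10.show()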