Ch.6 - Working with different types of Data
10 Jan 2025Only points that needed attention is jotted here.
Working with Dates and Timestamps
- Spark’s
TimestampType
class supports only second-level precision. If one wants a precision up to milliseconds or microseconds, one has to potentially operate on them aslongs
.
Example: using F.current_date()
and F.current_timestamp()
date_df = spark.range(10)\
.withColumn("today", F.current_date())\
.withColumn("now", F.current_timestamp())
date_df.show()
would give
+---+----------+--------------------+
| id| today| now|
+---+----------+--------------------+
| 0|2020-11-30|2020-11-30 15:46:...|
| 1|2020-11-30|2020-11-30 15:46:...|
| 2|2020-11-30|2020-11-30 15:46:...|
| 3|2020-11-30|2020-11-30 15:46:...|
| 4|2020-11-30|2020-11-30 15:46:...|
| 5|2020-11-30|2020-11-30 15:46:...|
| 6|2020-11-30|2020-11-30 15:46:...|
| 7|2020-11-30|2020-11-30 15:46:...|
| 8|2020-11-30|2020-11-30 15:46:...|
| 9|2020-11-30|2020-11-30 15:46:...|
+---+----------+--------------------+
The schema (obtained by date_df.printSchema()
) looks like
root
|-- id: long (nullable = false)
|-- today: date (nullable = false)
|-- now: timestamp (nullable = false)
Example: using F.date_add()
and F.date_sub()
date_df.select(
F.date_sub("today", 5).alias("5-days-ago"),
F.date_add("today", 5).alias("5-days-later"))
would give
+----------+------------+
|5-days-ago|5-days-later|
+----------+------------+
|2020-11-25| 2020-12-05|
... ...
|2020-11-25| 2020-12-05|
+----------+------------+
or in SQL
%sql
SELECT date_add(today, 5) AS `5-days-ago` FROM date_df
LIMIT 1
would give
+----------+
|5-days-ago|
+----------+
|2020-11-25|
+----------+
Difference in date/months: datediff
and months_between
- Syntax:
datediff(col_1, col_2)
- Spark would return
null
if it cannot parse a date - To parse a string to date, one can use:
F.to_date(...)
.
Example:
date_df.select(
F.to_date(F.lit('2012-20-12')).alias('invalid'),
F.to_date(F.lit('2012-12-20')).alias('valid'))
would give
+-------+----------+
|invalid| valid|
+-------+----------+
| null|2012-12-20|
+-------+----------+
Fixing invalid dates - specific our date format
to_date(column, format)
, e.g.F.to_date(F.lit("2017-20-12"), "yyyy-dd-MM")
to_timestamp(column, format)
, e.g.F.to_timestamp(F.lit("2017-20-12"), "yyyy-dd-MM")
Working with Nulls in Data
- Use
na
subpackage ofDataFrame
- Important note: when we declare a column as NOT having a null time, that is not actually enforced. The nullable signal is simply to help Spark SQL optiimize for handling that column.
Functions that can be used to deal with nulls
coalesce
ifnull
,nullif
,nvl
,nvl2
df.na.drop(how, thresh, subset)
ordf.dropna()
drop the whole row if any element in the row contains NULL. Options for how are ('any'
,'all'
)df.filter(df.col("col").isNotNull())
you drop those rows which have null only in the columncol
. One can alternatively usedf.na.drop(subset=["col"])
to achieve the same resultsdf.na.fill(value, subset)
ordf.fillna(value, subset)
df.na.replace(to_replace, value, subset=None)
-> this is to replace value that are not null when df has null values, e.g.
df3 = spark.createDataFrame([Row(a=2, b=3), Row(a=None, b=5), Row(a=4, b=None)])
df3.na.replace(to_replace=[3], value=[33]).show()
gives
+----+----+
| a| b|
+----+----+
| 2| 33|
|null| 5|
| 4|null|
+----+----+
Ordering
asc_nulls_first
,asc_nulls_last
,desc_nulls_first
,desc_ulls_last
Working with Complex Types (P. 111)
Structs
complex_df = df.select(F.struct("Description", "InvoiveNo").alias("complex")
To bring up all values in the struct, use complex_df.select("complex.*")
.
Arrays
split
F.split(...)
Array Length
F.size(...)
array_contains
F.array_contains(col, value)
explode
This is used to explode an array column.
df.withColumn("exploded", F.explode(F.col("splitted")))
In SQL, it reads like
SELECT ... FROM ...
LATERAL VIEW explode(splitted) AS exploded
Maps
Python syntax: pyspark.sql.functions.create_map(*cols)
df.select(create_map('name', 'age').alias("map")).collect()
returns
[Row(map={'Alice': 2}), Row(map={'Bob': 5})]
In SQL, the function is map
. It reads like
SELECT map(Description, InvoiceNo) AS complex_map FROM dfTable
WHERE Description IS NOT NULL
Attention: read more from documentation about the map-related functions.
Working with JSON
Attention: come back to JSON later!!