Data Prep
Code
import os
import pandas as pd
# Load every ".parquet" file in base_path into a single pandas DataFrame.
base_path = "path/to/the/folder"
# os.listdir() returns entries in arbitrary order; sort so the concatenated
# row order (and therefore float summation order downstream) is reproducible.
parquet_files = sorted(
    os.path.join(base_path, file)
    for file in os.listdir(base_path)
    if file.endswith('.parquet')
)
if not parquet_files:
    # pd.concat([]) would otherwise fail with the opaque
    # "No objects to concatenate" error.
    raise FileNotFoundError(f"No .parquet files found in {base_path!r}")
dfs = [pd.read_parquet(file) for file in parquet_files]
df_pd = pd.concat(dfs, ignore_index=True)
# Benchmark function
def calculate_monthly_taxi_stats_pandas(
    df: pd.DataFrame,
    *,
    start_year: int = 2021,
    end_year: int = 2024,
) -> pd.DataFrame:
    """Aggregate taxi trip records into per-month statistics with pandas.

    Rows whose ``tpep_pickup_datetime`` year lies in ``[start_year, end_year]``
    (inclusive) are grouped by pickup month, formatted as ``"YYYY-MM"``.

    Args:
        df: Trip records with datetime columns ``tpep_pickup_datetime`` and
            ``tpep_dropoff_datetime`` and numeric columns ``trip_distance``,
            ``total_amount`` and ``tip_amount``.
        start_year: First pickup year to include (default 2021).
        end_year: Last pickup year to include (default 2024).

    Returns:
        One row per period, sorted ascending by ``period`` with a fresh
        0..n-1 index. The columns are:

        - ``period``
        - ``num_rides``: every row in the month, like SQL ``count(*)``
        - ``avg_trip_duration``: seconds
        - ``avg_trip_distance``
        - ``total_trip_distance``
        - ``avg_trip_price``
        - ``total_trip_price``
        - ``avg_tip_amount``

        Values are not rounded.
    """
    pickup = df["tpep_pickup_datetime"]
    stats = (
        df.assign(
            # .dt.year is vectorised. Formatting to "%Y" and parsing back was
            # slow and raised on NaT pickups.
            trip_year=pickup.dt.year,
            period=pickup.dt.strftime("%Y-%m"),
            trip_duration=(df["tpep_dropoff_datetime"] - pickup).dt.total_seconds(),
        )
        # between() is inclusive on both ends; NaN years (NaT pickups) drop out.
        .loc[lambda d: d["trip_year"].between(start_year, end_year)]
        .groupby("period")
        .agg(
            # "size" counts every row, matching SQL count(*). "count" would
            # skip rows whose trip_duration is NaN (e.g. a missing dropoff).
            num_rides=("trip_duration", "size"),
            avg_trip_duration=("trip_duration", "mean"),
            avg_trip_distance=("trip_distance", "mean"),
            total_trip_distance=("trip_distance", "sum"),
            avg_trip_price=("total_amount", "mean"),
            total_trip_price=("total_amount", "sum"),
            avg_tip_amount=("tip_amount", "mean"),
        )
        .reset_index()
    )
    return stats.sort_values(by="period", ignore_index=True)
# Execute the pandas benchmark on the DataFrame assembled from the Parquet folder.
res_pandas = calculate_monthly_taxi_stats_pandas(df_pd)
- DuckDB
- parquet_scan()으로 Parquet 파일 읽기(Read) 지원
import duckdb
# Open an in-memory DuckDB database (":memory:" is duckdb.connect's default).
conn = duckdb.connect(database=":memory:")
# Benchmark function
def calculate_monthly_taxi_stats_duckdb(
    conn: duckdb.DuckDBPyConnection,
    path: str,
    *,
    start_year: int = 2021,
    end_year: int = 2024,
) -> pd.DataFrame:
    """Compute per-month taxi trip statistics directly from Parquet with DuckDB.

    Args:
        conn: Open DuckDB connection used to run the query.
        path: Parquet file path or glob pattern handed to ``parquet_scan``.
        start_year: First pickup year to include (inclusive, default 2021).
        end_year: Last pickup year to include (inclusive, default 2024).

    Returns:
        A pandas DataFrame with one row per ``period``, ordered by period.
        ``period`` is the ``"%Y-%m"`` formatting of ``tpep_pickup_datetime``.
        The remaining columns are:

        - ``num_rides``: ``count(*)``
        - ``avg_trip_duration``: seconds, via ``epoch`` of the interval
        - ``avg_trip_distance``
        - ``total_trip_distance``
        - ``avg_trip_price``
        - ``total_trip_price``
        - ``avg_tip_amount``

        Every column except ``num_rides`` is rounded to 2 decimals.
    """
    # Emit the path as a single-quoted SQL string literal, doubling any
    # embedded quote. Double quotes denote identifiers in SQL, and the raw
    # interpolation broke on paths containing quote characters.
    path_literal = "'" + path.replace("'", "''") + "'"
    query = f"""
        select
            period,
            count(*) AS num_rides,
            round(avg(trip_duration), 2) AS avg_trip_duration,
            round(avg(trip_distance), 2) AS avg_trip_distance,
            round(sum(trip_distance), 2) as total_trip_distance,
            round(avg(total_amount), 2) as avg_trip_price,
            round(sum(total_amount), 2) as total_trip_price,
            round(avg(tip_amount), 2) as avg_tip_amount
        from (
            select
                date_part('year', tpep_pickup_datetime) as trip_year,
                strftime(tpep_pickup_datetime, '%Y-%m') as period,
                epoch(tpep_dropoff_datetime - tpep_pickup_datetime) as trip_duration,
                trip_distance,
                total_amount,
                tip_amount
            from parquet_scan({path_literal})
            where trip_year >= {int(start_year)} and trip_year <= {int(end_year)}
        )
        group by period
        order by period
    """
    return conn.sql(query).df()
# Run the DuckDB benchmark over the same folder the pandas loader reads.
# "*.parquet" matches that loader's .endswith('.parquet') filter; the previous
# "*parquet" also matched names such as "x_notparquet".
res_duckdb = calculate_monthly_taxi_stats_duckdb(conn=conn, path=f"{base_path}/*.parquet")
Result
- Parquet 파일을 로드한 뒤 월별 통계 요약(Monthly Stat Summary)을 계산하는 방식으로 테스트
Limitation
- 동일 쿼리에 블로킹 연산자(Blocking Operator)가 여러 번 등장하면 연산자 간 상호작용으로 복잡도가 증가해 OOM이 발생할 수 있음
- list(), string_agg() 등 일부 집계 함수(Aggregation Function)는 디스크 오프로딩(Offloading to Disk)을 지원하지 않음
Reference