Pandas Aggregation을 대체하는 RDB 라이브러리, DuckDB

Pandas Aggregation을 대체하는 RDB 라이브러리, DuckDB
Photo by Christian Bowen / Unsplash

Data Prep

Code

  • Pandas
    • Single Threaded Library
import os  
import pandas as pd  
  
  
# Load the data: gather every ".parquet" file found directly inside base_path,
# read each one into its own frame, then stack them into a single frame with a
# fresh 0..n-1 index.
base_path = "path/to/the/folder"
parquet_files = [
	os.path.join(base_path, entry)
	for entry in os.listdir(base_path)
	if entry.endswith(".parquet")
]
dfs = list(map(pd.read_parquet, parquet_files))
df_pd = pd.concat(dfs, ignore_index=True)
  
# Benchmark function  
def calculate_monthly_taxi_stats_pandas(df: pd.DataFrame) -> pd.DataFrame:
	"""Summarise taxi trips per pickup month for pickup years 2021 through 2024.

	Args:
		df: Trip records. Must contain the datetime columns
			``tpep_pickup_datetime`` and ``tpep_dropoff_datetime`` and the
			columns ``trip_distance``, ``total_amount`` and ``tip_amount``.

	Returns:
		One row per ``period`` (pickup time formatted as ``"%Y-%m"``), ordered
		by period, with the columns ``period``, ``num_rides`` (count of
		non-null trip durations), ``avg_trip_duration`` (seconds),
		``avg_trip_distance``, ``total_trip_distance``, ``avg_trip_price``,
		``total_trip_price`` and ``avg_tip_amount``.
	"""
	# Compare the numeric year directly instead of formatting it as text and
	# casting back to int32: that cast raised on rows whose pickup time is
	# missing, whereas a NaN year simply fails the range check and is dropped.
	in_range = df["tpep_pickup_datetime"].dt.year.between(2021, 2024)

	# Filter first so the string formatting and timedelta arithmetic below
	# only run on the rows that are actually aggregated.
	kept = df.loc[
		in_range,
		[
			"tpep_pickup_datetime",
			"tpep_dropoff_datetime",
			"trip_distance",
			"total_amount",
			"tip_amount",
		],
	]
	pickup = kept["tpep_pickup_datetime"]
	trips = kept.assign(
		period=pickup.dt.strftime("%Y-%m"),
		trip_duration=(kept["tpep_dropoff_datetime"] - pickup).dt.total_seconds(),
	)

	# Named aggregation yields the final flat column names in one step;
	# groupby sorts its keys, so the result is already ordered by period.
	return (
		trips
		.groupby("period", sort=True)
		.agg(
			num_rides=("trip_duration", "count"),
			avg_trip_duration=("trip_duration", "mean"),
			avg_trip_distance=("trip_distance", "mean"),
			total_trip_distance=("trip_distance", "sum"),
			avg_trip_price=("total_amount", "mean"),
			total_trip_price=("total_amount", "sum"),
			avg_tip_amount=("tip_amount", "mean"),
		)
		.reset_index()
	)
  
  
# Run the pandas benchmark on the concatenated in-memory frame.
res_pandas = calculate_monthly_taxi_stats_pandas(df=df_pd)
  • DuckDB
    • parquet_scan()으로 Parquet Read 지원
import duckdb  
  
  
# Database connection: connect() is called without a database path
# (presumably this opens an in-memory DuckDB database; confirm in the DuckDB docs).
conn = duckdb.connect()  
  
# Benchmark function  
def calculate_monthly_taxi_stats_duckdb(conn: duckdb.DuckDBPyConnection, path: str) -> pd.DataFrame:
	"""Compute per-month taxi statistics in SQL and return them as a DataFrame.

	The inner query derives the pickup year, the ``period`` (pickup time
	formatted as ``'%Y-%m'``) and the trip duration via ``epoch(dropoff - pickup)``,
	keeping only pickup years 2021 through 2024. The outer query groups by
	``period``, counts rows, rounds every average and sum to 2 decimals, and
	orders the result by ``period``.

	Args:
		conn: Connection the query is run on.
		path: Text placed inside ``parquet_scan("...")``. It is interpolated
			into the SQL as-is, without escaping, so pass trusted values only.

	Returns:
		The query result converted with ``.df()``, with the columns
		``period``, ``num_rides``, ``avg_trip_duration``, ``avg_trip_distance``,
		``total_trip_distance``, ``avg_trip_price``, ``total_trip_price`` and
		``avg_tip_amount``.
	"""
	monthly_stats_sql = f"""  
			select  
				period,  
				count(*) AS num_rides,  
				round(avg(trip_duration), 2) AS avg_trip_duration,  
				round(avg(trip_distance), 2) AS avg_trip_distance,  
				round(sum(trip_distance), 2) as total_trip_distance,  
				round(avg(total_amount), 2) as avg_trip_price,  
				round(sum(total_amount), 2) as total_trip_price,  
				round(avg(tip_amount), 2) as avg_tip_amount  
				from (  
					select  
						date_part('year', tpep_pickup_datetime) as trip_year,  
						strftime(tpep_pickup_datetime, '%Y-%m') as period,  
						epoch(tpep_dropoff_datetime - tpep_pickup_datetime) as trip_duration,  
						trip_distance,  
						total_amount,  
						tip_amount  
						from parquet_scan("{path}")  
					where trip_year >= 2021 and trip_year <= 2024  
			)  
			group by period  
			order by period  
			"""
	relation = conn.sql(monthly_stats_sql)
	return relation.df()
  
  
# Run the DuckDB benchmark; the path argument ends in a "*parquet" wildcard
# rather than naming a single file.
res_duckdb = calculate_monthly_taxi_stats_duckdb(conn=conn, path="path/to/the/folder/*parquet")

Result

  • Parquet Load 이후, Monthly Stat Summary하는 형태로 Test

Limitation

Reference