Fugue and DuckDB: Fast SQL Code in Python

Fugue and DuckDB: Fast SQL Code in Python

Motivation

As a data scientist, you might be familiar with both Pandas and SQL. However, there might be some queries, and transformations that you feel comfortable doing in SQL instead of Python.

Wouldn’t it be nice if you can query a pandas DataFrame using SQL.

…while also speeding up your code when working with large data?

That is when Fugue + DuckDB comes in handy.

In the previous article, I showed how to query a pandas DataFrame with SQL using the pandas engine.

In this article, I will show you how to speed up your query using the DuckDB engine.

Feel free to play and fork the source code of this article here.

Why Fugue and DuckDB?

Fugue is a Python library that allows users to combine Python code and SQL commands. This gives users the flexibility to switch between Python and SQL within a Jupyter Notebook or a Python script.

By default, Fugue maps your SQL code to pandas. However, pandas is not optimal to use when data size is beyond a few GBs because it:

  • only allows you to use one core at a time
  • creates a lot of intermediate copies of data, which increases memory usage

Fugue also allows you to scale your SQL code using Spark or Dask. However, smaller organizations might not have clusters to which they can port their workload.

Introduction to DuckDB

DuckDB is an in-process SQL OLAP database management system. The speed is very good on even gigabytes of data on local machines.

Thus, the combination of FugueSQL and DuckDB allows you to use SQL with Python and seamlessly speed up your code.

To install FugueSQL with DuckDB engine, type:

pip install -U fugue[duckdb,sql] 

Set Up

First, we import some setup functions for Fugue. This will let us use the %%fsql cell magic inside Jupyter notebooks. We also import the DuckDB engine.

from fugue_notebook import setup
import fugue_duckdb

setup()

Load Data

This article will use the Binance Crypto Kittens dataset. This can be downloaded from Kaggle available through the Creative Commons License.

The folder crypto-binance contains over 1000 distinct files with a total memory of over 5GB. I combined these files and save the new file as raw.parquet.

After downloading the processed file, a file name raw.parquet will be saved in your local machine. Start with getting the location of the new file:

import os

save_path = os.getcwd() + '/raw.parquet'

Now let’s compare the loading speed of pandas and DuckDB.

pandas

Start with loading the data using pandas:

Note that it took us 10.5s to load the data. This is pretty slow. Let’s see if we can speed up the process by using FugueSQL and DuckDB instead.

Fugue + DuckDB

To write SQL using DuckDB as the engine in a Jupyter Notebook cell, simply add %%fsql duck at the beginning of a cell:

In the code above,

  • PRINT allows us to print the output
  • The double brackets {{}} allow us to use the Python variable in SQL.

The code above loads the data in 303ms! Using DuckDB as the engine is over 34 times faster than using pandas as the engine.

Processing

Let’s compare the speed of processing data between pandas and DuckDB + Fugue.

pandas

Fugue + DuckDB

Note: FugueSQL allows for multiple _SELECT_ statements similar to SQL temp tables. This allows the code to be read top-down and eliminates a for of boilerplate code. If _FROM_ is not specified, the SQL statement uses the last DataFrame from the stack.

Observation

We can see that using Fugue + DuckDB is almost 4 times faster than using pandas. It is also slightly easier to write the processing code above in SQL than in pandas.

Why DuckDB Is So Much Faster?

DuckDB is faster because it uses lazy evaluation.

For example, in the code above, the PRINT statement returns a default of 10 records. DuckDB knows that only 10 records are needed by the end result, so it only fetches those records.

On the other hand, Pandas is executed eagerly. This means that only after the entire file is loaded in, the operation to fetch the first 10 rows is run.

But Why Fugue?

DuckDB has its own Python API, why should we use Fugue with DuckDB?

It is because Fugue provides custom functions that allow you to interact with your Python objects easily. In the next sections, we will learn how to improve your SQL code with some of these custom functions.

Use Fugue DataFrame

In the code above, the line

YIELD DATAFRAME AS top_symbols

…outputs a fugue DataFrame and saves it as top_symbols .

You can easily turn top_symbols into a pandas DataFrame:

top_symbols.as_pandas()

…or use top_symbols in another SQL query:

Assign Names to Intermediate Outputs

Sometimes, you might want to assign names to intermediate outputs so they can be used by other processes within the same SQL query. Fugue allows you to assign names to your intermediate outputs using = :

In the code above, I save the intermediate output to src then join src with top_symbols .

Python Extensions

Using Fugue and DuckDB together also allows you to use Python logic in SQL code through extensions. Let’s go through some of these extensions.

Output

SQL doesn’t allow you to plot the outputs. However, we can create a plotting function in Python then use it in our SQL code.


def plot_by(df:pd.DataFrame, by, y) -> None:
    for name, group in df.groupby(by):
        group.plot(x="time", y=y, title=name)

To use the plot_by function above, simply add OUPUT USING next to plot_by :

Transform

There are some functions that are easier to write in Python than in SQL. If you want to transform outputs of a SQL query using Python, use TRANSFORM .

To see how this extension works, start with creating a function called macd . This function uses pandas-ta to get a certain trend of the time series.

# schema: *,macd:double
def macd(df:pd.DataFrame) -> pd.DataFrame:
  """get macd - the indicator of the trend of the timeseries"""
  
    import pandas_ta
    macd = df.ta.macd(close='close', fast=12, slow=26, signal=9)
    return df.assign(macd=macd.iloc[:,2])

We also add the schema hint as a comment (# schema: *,macd:double) on top of the function macd so Fugue can read this schema hint and apply the schema.

Now we can use this function to transform the data in our query:

Cool! We have just transformed a SQL output using a Python function.

Learn more about TRANSFORM and PREPARTITION in Fugue here.

Fugue + DuckDB in Production

To bring FugueSQL out of Jupyter notebooks and into Python scripts, all we need to do is wrap the FugueSQL query inside a fsql class. We can then call the .run() method and choose an execution engine to be "duck".

import fugue_duckdb
query = """ src = LOAD "{{save_path}}"
            SELECT * WHERE symbol LIKE '%USDT'
            SELECT symbol, date_trunc('day',time) AS date, SUM(Number_of_trades) AS trades GROUP BY 1,2
            top_symbols = SELECT symbol, AVG(trades) AS trades GROUP BY 1 ORDER BY 2 DESC LIMIT 4
            
            SELECT src.* FROM src INNER JOIN top_symbols ON src.symbol = top_symbols.symbol
            SELECT
                symbol,
                date_trunc('day', time) AS time,
                min_by(Open, time) AS open,
                MAX(High) AS high,
                MIN(Low) AS low,
                max_by(Close, time) AS close,
                SUM(Number_of_trades) AS trades,
                SUM(Volume) AS volume
            GROUP BY 1, 2
            TRANSFORM PREPARTITION BY symbol PRESORT time
            USING macd
            SELECT * ORDER BY time
            OUTPUT USING plot_by(by='symbol', y="macd")
            """
fsql(query).run("duck")

Conclusion

Congratulations! You have just learned to use FugueSQL with DuckDB as a backend to get the most out of local execution. Because of the lazy evaluation DuckDB provides, we can pre-aggregate our data quickly before bringing it to Pandas for further analysis which is hard to do in SQL.

Using Fugue as the interface also allows us to use the strengths of DuckDB seamlessly.

Scroll to Top

Work with Khuyen Tran

Work with Khuyen Tran