Caching System

xorq provides a sophisticated caching system that enables efficient iterative development of ML pipelines. The caching system allows you to:

  • Cache results from upstream query engines
  • Persist data locally or in remote storage
  • Automatically invalidate cache when source data changes
  • Chain caches across multiple engines

Storage Types

xorq supports two main types of cache storage:

1. SourceStorage

  • Automatically invalidates cache when upstream data changes
  • Persistence depends on the source backend
  • Supports both remote (Snowflake, Postgres) and in-process (pandas, DuckDB) backends
import xorq as xo
from xorq.common.caching import SourceStorage

# Connect to source database
pg = xo.postgres.connect_env()
con = xo.connect()  # empty connection

# Create source storage
storage = SourceStorage(source=con)

# Register table from postgres and cache it
batting = pg.table("batting")

# Cache the filtered data in the source backend
cached = (
    batting.filter(batting.yearID == 2015)
    .cache(storage=storage)  # cache expression
)

# Execute the query - results will be cached
result = xo.execute(cached)

2. SnapshotStorage

  • No automatic invalidation
  • Ideal for one-off analyses
  • Persistence depends on source backend

3. ParquetCacheStorage

  • Special case of SourceStorage
  • Caches results as Parquet files on local disk
  • Uses source backend for writing
  • Ensures durable persistence

Hashing Strategies

Cache invalidation uses different hashing strategies based on the storage type:

Storage TypeHash Components
In-MemoryData bytes + Schema
Disk-BasedQuery plan + Schema
RemoteTable metadata + Last modified time

Key Benefits

  1. Faster Iteration:
    • Reduce network calls to source systems
    • Minimize recomputation of expensive operations
    • Cache intermediate results for complex pipelines
  2. Declarative Integration:
    • Chain cache operations anywhere in the expression
    • Transparent integration with existing pipelines
    • Multiple storage options for different use cases
  3. Automatic Management:
    • Smart invalidation based on source changes
    • No manual cache management required
    • Efficient storage utilization
  4. Multi-Engine Support:
    • Cache data between different engines
    • Optimize storage location for performance
    • Flexible persistence options

Multi-Engine System

xorq’s multi-engine system enables seamless data movement between different query engines, allowing you to leverage the strengths of each engine while maintaining a unified workflow.

The into_backend Operator

The core of xorq’s multi-engine capability is the into_backend operator, which enables:

  • Transparent data movement between engines
  • Zero-copy data transfer using Apache Arrow
  • Automatic optimization of data placement
import xorq as xo
from xorq.expr.relations import into_backend

# Connect to different engines
pg = xo.postgres.connect_env()
db = xo.duckdb.connect()

# Get tables from different sources
batting = pg.table("batting")

# Load awards_players into DuckDB
awards_players = xo.examples.awards_players.fetch(backend=db)

# Filter data in respective engines
left = batting.filter(batting.yearID == 2015)
right = awards_players.filter(awards_players.lgID == "NL").drop("yearID", "lgID")

# Move right table into postgres for efficient join
expr = left.join(
    into_backend(right, pg),
    ["playerID"],
    how="semi"
)[["yearID", "stint"]]

# Execute the multi-engine query
result = expr.execute()

Supported Engines

xorq currently supports:

  1. In-Process Engines
    • DuckDB
    • DataFusion
    • Pandas
  2. Distributed Engines
    • Trino
    • Snowflake
    • BigQuery

Engine Selection Guidelines

Choose engines based on their strengths:

  1. DuckDB: Local processing, AsOf joins, efficient file formats
  2. DataFusion: Custom UDFs, streaming processing
  3. Trino: Distributed queries, federation, security
  4. Snowflake/BigQuery: Managed infrastructure, scalability

Data Transfer

Data movement between engines is handled through:

  1. Arrow Flight: Zero-copy data transfer protocol
  2. Memory Management: Automatic spilling to disk
  3. Batching: Efficient chunk-based processing

Custom UD(X)F System

xorq provides a powerful system for extending query engines with custom User-Defined Functions (UDFs). Here are three key types supported:

1. Scalar UDF with Model Integration

import xorq as xo
from xorq.expr.ml import make_quickgrove_udf
from pathlib import Path
from xorq import _

t = xo.examples.diamonds.fetch()

model_path = Path(xo.options.pins.get_path("diamonds-model"))
model = make_quickgrove_udf(model_path, model_name="diamonds_model")
expr = t.mutate(pred=model.on_expr).filter(_.carat < 1).select(_.pred).execute()

2. User Defined Aggregate Functions

from xorq.expr import udf
import xorq.vendor.ibis.expr.datatypes as dt

alltypes = xo.examples.functional_alltypes.fetch()
cols = (by, _) = ["year", "month"]
name = "sum_sum"

@udf.agg.pandas_df(
    schema=alltypes[cols].schema(),
    return_type=dt.int64(),
    name=name,
)
def sum_sum(df):
    return df.sum().sum()

actual = (
    alltypes.group_by(by)
    .agg(sum_sum(*(alltypes[c] for c in cols)).name(name))
    .execute()
)

Additionally you can use UDAF for training ML models, see the example for training an XGBoost model

3. Window UDF for Analysis

import pyarrow as pa
from xorq.expr.udf import pyarrow_udwf
from xorq.vendor import ibis


@pyarrow_udwf(
    schema=ibis.schema({"a": float}),
    return_type=ibis.dtype(float),
    alpha=0.9,
)
def exp_smooth(self, values: list[pa.Array], num_rows: int) -> pa.Array:
    results = []
    curr_value = 0.0
    values = values[0]
    for idx in range(num_rows):
        if idx == 0:
            curr_value = values[idx].as_py()
        else:
            curr_value = values[idx].as_py() * self.alpha + curr_value * (
                1.0 - self.alpha
            )
        results.append(curr_value)

    return pa.array(results)


df = pd.DataFrame(
    [
        (0, 7, "A"),
        (1, 4, "A"),
        (2, 3, "A"),
        (3, 8, "A"),
        (4, 9, "B"),
        (5, 1, "B"),
        (6, 6, "B"),
    ],
    columns=["a", "b", "c"],
)

t = xo.register(df, table_name="t")

expr = t.select(
    t.a,
    udwf=exp_smooth.on_expr(t).over(ibis.window()),
).order_by(t.a)

result = expr.execute()

Ephemeral Flight Service

xorq’s Ephemeral Flight Service provides a high-performance data transfer mechanism between engines using Apache Arrow Flight. Unlike traditional data transfer methods, this service provides:

  1. Automatic Lifecycle Management
  2. Zero-Copy Data Movement
    • Direct memory transfer between processes
    • No serialization/deserialization overhead
    • Efficient handling of large datasets
  3. Process Isolation
    • Separate processes for different engines
    • Independent resource management
    • Fault isolation
  4. Resource Management
  5. Security Integration

Comparison with Ibis

While xorq is built on top of Ibis, it extends its capabilities in several key ways. Here’s a comprehensive example showing the differences:

import xorq as xo
from xorq.common.caching import ParquetCacheStorage
from xorq.expr.relations import into_backend
import pathlib

# xorq multi-engine workflow
# 1. Connect to different engines
pg = xo.postgres.connect_env()
ddb = xo.duckdb.connect()

# 2. Set up caching
storage = ParquetCacheStorage(
    source=ddb,
    path=pathlib.Path.cwd(),
)

# 3. Create a complex workflow
# - Load data from postgres
# - Cache intermediate results
# - Process in DuckDB
# - Return to postgres
result = (
    pg.table("batting")
    .filter(lambda t: t.yearID > 2014)
    .cache(storage=storage)
    .pipe(into_backend, ddb)
    .join(
        xo.examples.awards_players.fetch(backend=ddb),
        "playerID"
    )
    .pipe(into_backend, pg)
    .execute()
)

# The above workflow demonstrates:
# 1. Multi-engine execution
# 2. Persistent caching
# 3. Seamless data movement
# 4. Integrated example data
# 5. Pipeline optimization

Key Advantages over Ibis

  1. Unified Experience
    • Consistent API across engines
    • Seamless engine transitions
    • Integrated caching system
  2. ML Focus
    • Built-in ML tooling
    • Efficient feature engineering
    • Model training integration
  3. Development Workflow
    • Interactive development support
    • Caching for iteration