In this tutorial, we’ll build an end-to-end machine learning pipeline using xorq expressions to predict the number of comments a Hacker News story will receive based on its title. The pipeline fetches live data, processes text, trains a model, and makes predictions - all expressed as a single composable expression.

Fetching Live Data

First, we create a function to fetch the latest stories from Hacker News:

import pandas as pd
import pyarrow as pa
import requests


def hackernews_stories():
    # Get most recent item ID
    latest_item = requests.get(
        "https://hacker-news.firebaseio.com/v0/maxitem.json"
    ).json()

    # Fetch the 1000 most recent items
    results = []
    scope = range(latest_item - 1000, latest_item)
    for item_id in scope:
        item = requests.get(
            f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json"
        ).json()
        results.append(item)

    # Filter for valid stories
    df = pd.DataFrame(results)
    if len(df) > 0:
        df = df[df.type == "story"]
        df = df[~df.title.isna()]

    return pa.Table.from_pandas(df).to_reader()

We then load this data into xorq:

import xorq as xo

con = xo.connect()
t = con.read_record_batches(hackernews_stories())

Text Vectorization

To process the story titles, we use TF-IDF vectorization:

import toolz
from sklearn.feature_extraction.text import TfidfVectorizer

vectorizer = TfidfVectorizer()

@toolz.curry
def unbound_fit_transform(data, model):
    return model.fit_transform(data.to_pylist()).toarray()

@toolz.curry
def unbound_transform(data, model):
    return model.transform(data.to_pylist()).toarray()

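The inner_fit_transform and inner_transform callables referenced by the UDFs below are these curried functions with the shared vectorizer bound in, so the same fitted vocabulary is applied to both the training and test titles:

inner_fit_transform = unbound_fit_transform(model=vectorizer)
inner_transform = unbound_transform(model=vectorizer)
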
The bound functions are then wrapped as PyArrow UDFs so they can be used in expressions:

@ibis.udf.scalar.pyarrow
def fit_transform(title: dt.string) -> dt.Array(dt.float64):
    # Fit the vectorizer on the training titles and return dense TF-IDF rows
    transformed = inner_fit_transform(title)
    return pa.array(transformed.tolist())

@ibis.udf.scalar.pyarrow
def transform(title: dt.string) -> dt.Array(dt.float64):
    # Transform titles with the already-fitted vectorizer
    transformed = inner_transform(title)
    return pa.array(transformed.tolist())
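
As a quick standalone illustration (separate from the pipeline), TF-IDF turns each title into a dense vector of term weights, one dimension per word in the fitted vocabulary:

from sklearn.feature_extraction.text import TfidfVectorizer

titles = ["Show HN: a new database", "Ask HN: how do you test pipelines?"]
weights = TfidfVectorizer().fit_transform(titles).toarray()
print(weights.shape)  # (2 titles, vocabulary size)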

Train/Test Split and Feature Transformation

We split the data into train and test sets, using the combination of id and title as the unique key:

(train, test) = xo.train_test_splits(
    t,
    unique_key=("id", "title",),
    test_sizes=0.2,
)

And transform both sets using our vectorization UDFs:

transformed_train_data = train.select(
    title=fit_transform(train.title),
    descendants=train.descendants.fill_null(0)
)

transformed_test_data = test.select(
    title=transform(test.title),
    descendants=test.descendants.fill_null(0)
)

Model Training and Prediction

We define functions for training an XGBoost regressor and making predictions with it:

import xgboost as xgb
from sklearn.metrics import mean_absolute_error

@toolz.curry
def train_xgboost_model(df, features, target, seed=0):
    # Expand the array-valued feature column into one column per TF-IDF dimension
    X = pd.DataFrame(df[list(features)].squeeze().tolist())
    y = df[target]

    xgb_r = xgb.XGBRegressor(
        objective="reg:squarederror",
        eval_metric=mean_absolute_error,
        n_estimators=20,
        seed=seed
    )
    xgb_r.fit(X, y)
    return xgb_r

@toolz.curry
def predict_xgboost_model(df, model, features):
    X = pd.DataFrame(df[list(features)].squeeze().tolist())
    return model.predict(X)

Building the Pipeline Expression

Finally, we compose everything into a single expression that works as a pipeline. The snippet below uses a few names not defined earlier: features, target, model_key, prediction_key, and the curried train_fn and predict_fn.
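
A minimal sketch of those definitions, assuming the vectorized title is the only feature, the comment count (descendants) is the target, and the key names are arbitrary labels:

features = ("title",)
target = "descendants"
model_key = "model"
prediction_key = "predicted"

# Bind the feature/target names into the curried training and prediction functions
train_fn = train_xgboost_model(features=features, target=target)
predict_fn = predict_xgboost_model(features=features)

The udf.agg.pandas_df and make_pandas_expr_udf helpers below come from xorq's UDF utilities, and wrap_model is assumed to serialize the fitted model (for example with pickle) so the aggregation can return it as dt.binary. With those pieces in place, the pipeline expression is: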

# Create model training UDAF
model_udaf = udf.agg.pandas_df(
    fn=toolz.compose(wrap_model(model_key=model_key), train_fn),
    schema=transformed_train_data.schema(),
    return_type=dt.binary,
    name=model_key,
)

# Create prediction UDF
predict_expr_udf = make_pandas_expr_udf(
    computed_kwargs_expr=model_udaf.on_expr(transformed_train_data),
    fn=predict_fn,
    schema=transformed_test_data[features].schema(),
    return_type=dt.float32,
    name=prediction_key,
)

# Create final expression
expr = transformed_test_data.mutate(
    predict_expr_udf.on_expr(transformed_test_data).name(prediction_key)
)

To run the pipeline and get predictions:

expr.execute()
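
Executing the expression returns a pandas DataFrame containing the transformed test rows plus the prediction column. Assuming the prediction_key of "predicted" from the sketch above, we can compare predictions against the actual comment counts:

df = expr.execute()
print(df[["descendants", "predicted"]].head())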

The key advantage of this approach is that everything - from data fetching through prediction - is expressed as a single composable pipeline. xorq handles the execution details, optimization, and can even cache intermediate results.

This makes it easy to:

  • Update the pipeline with new data
  • Modify individual steps without rewriting the whole pipeline
  • Cache and reuse expensive computations
  • Execute different parts of the pipeline on different engines

This expression-based approach provides a clean, declarative way to build ML pipelines while maintaining the flexibility to use powerful ML libraries like scikit-learn and XGBoost.