110 Further scikit-learn pipelines#

COM6018

Copyright © 2023–2025 Jon Barker, University of Sheffield. All rights reserved.

1. Introduction#

In this tutorial, we will revisit the concept of pipelines in scikit-learn. Previously we saw some basic examples of how pipelines can be used to streamline machine learning workflows. Here, we will explore more advanced features, including pipelines for handling complex data types or multiple feature sets, and how to write custom pipeline components.

2. Recap: What is a Pipeline?#

A pipeline in scikit-learn is a way to streamline a sequence of data processing steps, such as data preprocessing, feature extraction, and model training. It allows you to chain multiple steps together into a single object that can be treated like a standard estimator.

The benefits of using pipelines include:

  • Simplified Code: Pipelines help to organize code and reduce redundancy by encapsulating the entire workflow in a single object.

  • Reproducibility: Pipelines ensure that the same sequence of steps is applied consistently during training and prediction.

  • Hyperparameter Tuning: Pipelines can be integrated with hyperparameter tuning tools like GridSearchCV, allowing for optimization of the entire workflow.

  • Avoiding Data Leakage: By encapsulating preprocessing steps within the pipeline, you can prevent data leakage during cross-validation (see the sketch below).
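
To illustrate the last two points, a pipeline can be passed directly to scikit-learn's cross-validation utilities, so that in every fold the preprocessing is fitted only on that fold's training data. Below is a minimal sketch (it uses logistic regression purely for illustration; it is not part of the examples in the rest of this tutorial):

from sklearn.datasets import load_iris
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = load_iris(return_X_y=True)

# The scaler is refitted on the training portion of each fold, so no
# information from the held-out fold leaks into the preprocessing.
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('classifier', LogisticRegression(max_iter=1000)),
])

scores = cross_val_score(pipe, X, y, cv=5)
print(f'Mean cross-validation accuracy: {scores.mean():.2f}')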

3. Creating a Basic Pipeline#

Let’s start by creating a simple pipeline that includes data preprocessing and a classifier. We’ll use the Pipeline class from sklearn.pipeline. We’ll consider the Iris dataset for this example. The pipeline will include standard scaling, PCA for dimensionality reduction, and a Random Forest classifier.

The full code is as follows:

from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from sklearn.ensemble import RandomForestClassifier 
from sklearn.model_selection import train_test_split
from sklearn.datasets import load_iris

# Load dataset
data = load_iris()
X = data.data
y = data.target

# Split data into training and testing sets
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Build the pipeline
pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('pca', PCA(n_components=2)),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])

# Train the pipeline
pipeline.fit(X_train, y_train)
# Evaluate the pipeline
accuracy = pipeline.score(X_test, y_test)
print(f'Accuracy: {accuracy:.2f}')  
Accuracy: 0.90

This code creates a pipeline that standardizes the data, reduces its dimensionality using PCA, and then trains a Random Forest classifier. The pipeline is then evaluated on the test set.
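
The fitted pipeline behaves like a single estimator: calling predict applies the scaler, the PCA projection, and the classifier in sequence. For example (output not shown):

# Predict the classes of the first five test samples using the whole pipeline
print(pipeline.predict(X_test[:5]))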

When building the pipeline, each step is given a name (e.g., ‘scaler’, ‘pca’, ‘classifier’). These names can be used later to access specific steps in the pipeline, or to retrieve parameter values after fitting. The names must be unique within the pipeline, which also means there is no ambiguity even if the pipeline contains multiple instances of the same type of transformer or estimator.

So for example, to retrieve the scaler step’s scale and mean values after fitting, you can do:

scaler = pipeline.named_steps['scaler']
print(scaler.scale_)
print(scaler.mean_)
[0.82036535 0.44724776 1.74502786 0.74914766]
[5.80916667 3.06166667 3.72666667 1.18333333]

4. Advanced Pipeline: Pairwise Data#

Now let’s imagine that rather than a classification problem, we wanted to consider this as a verification problem. In this case, we have a pair of samples from the Iris dataset, and we want to determine if they belong to the same class or not.

We will first need to create a dataset of pairs of samples, along with labels indicating whether they belong to the same class. Then we can build a pipeline that processes these pairs and trains a classifier to predict whether the samples in each pair are from the same class.

First we will create the dataset of pairs:

import numpy as np
def create_pairs_flat(X, y, seed=42):
    np.random.seed(seed)
    
    pairs = []
    labels = []
    num_classes = len(np.unique(y))
    class_indices = [np.where(y == i)[0] for i in range(num_classes)]
    
    for idx1 in range(len(X)):
        x1 = X[idx1]
        label1 = y[idx1]
        
        # ---- Positive pair ----
        idx2 = np.random.choice(class_indices[label1])
        x2 = X[idx2]
        # flatten: (4,) + (4,) -> (8,)
        pairs.append(np.concatenate([x1, x2]))
        labels.append(1)
        
        # ---- Negative pair ----
        neg_label = (label1 + np.random.randint(1, num_classes)) % num_classes
        idx2 = np.random.choice(class_indices[neg_label])
        x2 = X[idx2]
        pairs.append(np.concatenate([x1, x2]))
        labels.append(0)
    
    pairs = np.array(pairs)      # shape = (2 * len(X), 8)
    labels = np.array(labels)    # shape = (2 * len(X),)
    
    return pairs, labels

X_train, y_train = create_pairs_flat(X_train, y_train)
X_test, y_test = create_pairs_flat(X_test, y_test)
print(X_train.shape, y_train.shape)
print(X_test.shape, y_test.shape)
(240, 8) (240,)
(60, 8) (60,)

Note that the shape of X_train is now (240, 8), indicating that we have 240 samples, each of which is a pair of 4-dimensional Iris samples flattened into an 8-dimensional vector.

In this task, the samples are pairs of simple 4-dimensional vectors, but in other applications, they could be more complex data types, such as images or text. We will generally want to treat each element of the pair separately in the pipeline, e.g. we might want to apply PCA to each element of the pair independently.

To handle this we can use the ColumnTransformer to apply different transformations to different subsets of the features. In this case, we will apply PCA separately to the first and second elements of each pair. The ColumnTransformer is defined using a list of tuples, where each tuple specifies a name for the transformation, the transformer itself, and the columns to which it should be applied. A full example of this pipeline is shown below:

from sklearn.decomposition import PCA
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline


pipeline = Pipeline([
    ('pca_pairs', ColumnTransformer(
        transformers=[
            # first item in pair: columns 0..3
            ('pca1', PCA(n_components=2), slice(0, 4)),
            # second item in pair: columns 4..7
            ('pca2', PCA(n_components=2), slice(4, 8)),
        ],
        remainder='drop',   # default, but explicit is nice
    )),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
])

# Train the pipeline
pipeline.fit(X_train, y_train)

# evaluate the pipeline
accuracy = pipeline.score(X_test, y_test)

print(f'Accuracy: {accuracy:.2f}')
Accuracy: 0.88

We can extend the above example to include the scaling step as well, by including it in the ColumnTransformer. Notice how each transformer in the ColumnTransformer is itself a Pipeline, i.e. Pipelines can be used anywhere an estimator is expected, including inside other Pipelines or ColumnTransformers. This allows very flexible and complex workflows to be constructed.

from sklearn.decomposition import PCA
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

pipeline = Pipeline([
    ('preprocess', ColumnTransformer(
        transformers=[
            ('pair1', Pipeline([
                ('scaler', StandardScaler()),
                ('pca', PCA(n_components=2)),
            ]), slice(0, 4)),
            ('pair2', Pipeline([
                ('scaler', StandardScaler()),
                ('pca', PCA(n_components=2)),
            ]), slice(4, 8)),
        ],
        remainder='drop',
    )),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
])


# Train the pipeline
pipeline.fit(X_train, y_train)
# evaluate the pipeline
accuracy = pipeline.score(X_test, y_test)

print(f'Accuracy: {accuracy:.2f}')
Accuracy: 0.82

When accessing the parameters in these nested pipelines, you can use the double underscore (__) notation to specify the path to the desired parameter. For example, to access the n_components parameter of the PCA applied to the first item in the pair, you would do:

n_components_pair1 = pipeline.get_params()['preprocess__pair1__pca__n_components']
print(n_components_pair1)
2
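
The same double-underscore names are used when setting parameters, for example with set_params, and they are also the names you would use in a parameter grid for GridSearchCV. A minimal sketch (the value used here is purely illustrative):

# Change the number of PCA components applied to the first item in the pair,
# then refit the pipeline with the new setting
pipeline.set_params(preprocess__pair1__pca__n_components=3)
pipeline.fit(X_train, y_train)

# The equivalent GridSearchCV parameter grid would look like:
# param_grid = {'preprocess__pair1__pca__n_components': [1, 2, 3]}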

5. Writing Components using FunctionTransformer#

For more complex workflows, you may need to create your own transformers or estimators to use within your pipeline.

If you have a very simple transformation which does not require any fitting, you can use the FunctionTransformer from sklearn.preprocessing. This allows you to wrap a simple function as a transformer. For example, going back to our verification task, say you want to compute the difference between the two items in each pair before passing them to the classifier. You can do this using a FunctionTransformer as follows:

from sklearn.preprocessing import FunctionTransformer

def compute_difference(X):
    return X[:, :4] - X[:, 4:]

pipeline = Pipeline([
    ('difference', FunctionTransformer(compute_difference)),
    ('scaler', StandardScaler()),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])

# Train the pipeline
pipeline.fit(X_train, y_train)

# Evaluate the pipeline
accuracy = pipeline.score(X_test, y_test)
print(f'Accuracy: {accuracy:.2f}')
Accuracy: 0.93
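
Since the FunctionTransformer has nothing to learn, it can also be applied on its own, which is a convenient way to check the transformation. For example, the difference transform maps the 8-dimensional pairs to 4-dimensional difference vectors (the shape in the comment is what we would expect for the training pairs):

# Sanity check: (240, 8) pairs -> (240, 4) difference vectors
diff = FunctionTransformer(compute_difference)
print(diff.fit_transform(X_train).shape)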

This pattern can be used even if the function has its own parameters, as long as those parameters are fixed when the FunctionTransformer is created, i.e. they do not need to be learned from the data. The values of these parameters can be passed using the kw_args argument of the FunctionTransformer.

For example, consider a function to add Gaussian noise to the input data:

import numpy as np
from sklearn.preprocessing import FunctionTransformer

NOISE_LEVEL = 0.8

def add_noise(X, noise_level=0.1, rng=None):
    if rng is None:
        rng = np.random.default_rng()
    noise = rng.normal(0, noise_level, X.shape)
    return X + noise


pipeline = Pipeline([
    ('scaler', StandardScaler()),
    ('gaussian', FunctionTransformer(add_noise, kw_args={'noise_level':NOISE_LEVEL, 'rng': np.random.default_rng(42)})),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])

# Train the pipeline
pipeline.fit(X_train, y_train)

# Evaluate the pipeline
accuracy = pipeline.score(X_test, y_test)
print(f'Accuracy with noise level {NOISE_LEVEL}: {accuracy:.2f}')
Accuracy with noise level 0.8: 0.72

6. Writing Custom Transformers#

If you need a more complex transformer that requires fitting to the data, you can create a custom transformer by defining a new class.

To create a custom transformer, you need to define a class that implements the fit and transform methods. The class should also inherit from BaseEstimator and TransformerMixin from sklearn.base: TransformerMixin provides fit_transform automatically (you can override it for efficiency), and BaseEstimator provides get_params and set_params so that the transformer works with tools such as GridSearchCV.

As an example, imagine a transformer that is designed to mitigate the effect of outliers in the data. We will create a transformer called OutlierClipper that (i) learns the interquartile range of the data during fitting and then (ii) clips any values that fall outside a given factor times the IQR during transformation.

The learnable parameter here is the IQR (interquartile range). This parameter is stored as an attribute of the class after fitting. We then use this to compute the clipping bounds – which are also stored as attributes. The bounds are then used during transformation to clip the data.

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin 

class OutlierClipper(BaseEstimator, TransformerMixin):
    def __init__(self, factor=1.5):
        self.factor = factor

    def fit(self, X, y=None):
        Q1 = np.percentile(X, 25, axis=0)
        Q3 = np.percentile(X, 75, axis=0)
        self.IQR_ = Q3 - Q1
        self.lower_bound_ = Q1 - self.factor * self.IQR_
        self.upper_bound_ = Q3 + self.factor * self.IQR_
        return self

    def transform(self, X):
        X_clipped = np.clip(X, self.lower_bound_, self.upper_bound_)
        return X_clipped

Once we have defined the custom transformer, we can use it in a pipeline just like any other transformer. Here is an example of using the OutlierClipper in a pipeline:

pipeline = Pipeline([
    ('clipper', OutlierClipper(factor=1.5)),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42))
])  

# Train the pipeline
pipeline.fit(X_train, y_train)

# Evaluate the pipeline
accuracy = pipeline.score(X_test, y_test)
print(f'Accuracy with outlier clipping: {accuracy:.2f}')
Accuracy with outlier clipping: 0.95

After fitting, we could inspect the learned bounds of the Clipper as follows:

print(f'Clipper lower bounds: {pipeline.named_steps["clipper"].lower_bound_}')
print(f'Clipper upper bounds: {pipeline.named_steps["clipper"].upper_bound_}')
Clipper lower bounds: [ 3.15  1.9  -3.9  -1.95  3.15  1.9  -3.9  -2.2 ]
Clipper upper bounds: [ 8.35  4.3  10.5   4.05  8.35  4.3  10.5   4.2 ]
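
Because OutlierClipper inherits from BaseEstimator, its factor parameter is exposed through get_params and can therefore be tuned along with the rest of the pipeline. A minimal sketch (the grid of factor values is purely illustrative):

from sklearn.model_selection import GridSearchCV

# Tune the clipping factor using the usual nested parameter naming
param_grid = {'clipper__factor': [1.0, 1.5, 2.0, 3.0]}
search = GridSearchCV(pipeline, param_grid, cv=5)
search.fit(X_train, y_train)
print(search.best_params_)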

7. Summary#

In this tutorial, we revisited scikit-learn pipelines and saw how they can be used to organise end-to-end machine learning workflows. We started from a simple pipeline combining scaling, dimensionality reduction and classification on the Iris dataset. We then extended this idea to pairwise (verification-style) data using ColumnTransformer and nested Pipeline objects, allowing different transformations to be applied to different parts of the input. Finally, we explored how to customise pipelines by wrapping simple functions with FunctionTransformer and by implementing fully custom transformers using BaseEstimator and TransformerMixin, enabling learnable preprocessing steps such as outlier clipping.