Pipeline Pattern

The Pipeline pattern provides a clean way to pass data through a series of operations (pipes). Each pipe transforms the data and passes it to the next pipe. It's ideal for multi-step processes like order processing, data validation, file uploads, and request handling.

Key features:

You can create a pipeline using the pipeline() helper function:

$result = pipeline($data)
    ->through([
        ValidateData::class,
        TransformData::class,
        SaveData::class,
    ])
    ->run();

Basic Usage

Simple Transformation

$result = pipeline(10)
    ->through([
        fn($n) => $n * 2,  // 10 * 2 = 20
        fn($n) => $n + 5,  // 20 + 5 = 25
    ])
    ->run();

// $result = 25

String Processing

$result = pipeline('hello world')
    ->through([
        fn($str) => strtoupper($str),           // HELLO WORLD
        fn($str) => str_replace(' ', '_', $str), // HELLO_WORLD
    ])
    ->run();

// $result = 'HELLO_WORLD'

Using Pipe Classes

Instead of closures, you can use dedicated pipe classes for better organization and reusability.

Creating a Pipe Class

<?php

namespace App\Pipes;

class UpperCasePipe
{
    public function __invoke($value)
    {
        return strtoupper($value);
    }
}

Using the Pipe

use App\Pipes\UpperCasePipe;

$result = pipeline('hello')
    ->through([
        UpperCasePipe::class,  // Resolved from container
    ])
    ->run();

// $result = 'HELLO'

Dependency Injection

Pipe classes support automatic dependency injection through the container.

Pipe with Dependencies

<?php

namespace App\Pipes\Order;

use App\Services\InventoryService;

class ValidateInventory
{
    protected $inventoryService;

    // Dependencies automatically injected
    public function __construct(InventoryService $inventoryService)
    {
        $this->inventoryService = $inventoryService;
    }

    public function __invoke($order)
    {
        foreach ($order->items as $item) {
            if (!$this->inventoryService->isAvailable($item)) {
                throw new \Exception('Product out of stock');
            }
        }

        return $order;
    }
}

Using It

$order = pipeline($order)
    ->through([
        ValidateInventory::class,  // InventoryService injected automatically
    ])
    ->run();

Real-World Examples

Order Processing Pipeline

<?php

namespace App\Controllers;

use App\Pipes\Order\ValidateInventory;
use App\Pipes\Order\ApplyDiscounts;
use App\Pipes\Order\CalculateTax;
use App\Pipes\Order\ProcessPayment;
use App\Pipes\Order\ReserveInventory;
use App\Pipes\Order\SendConfirmationEmail;

class OrderController
{
    public function checkout($request)
    {
        $order = new Order;
        $order->customer_id = $request->customer_id;
        $order->save();

        $order = pipeline($order)
            ->through([
                ValidateInventory::class,
                ApplyDiscounts::class,
                CalculateTax::class,
                ProcessPayment::class,
                ReserveInventory::class,
                SendConfirmationEmail::class,
            ])
            ->run();

        return response()->json($order);
    }
}

File Upload Pipeline

$file = pipeline($request->file('document'))
    ->through([
        ValidateFileType::class,
        ValidateFileSize::class,
        ScanForVirus::class,
        OptimizeFile::class,
        UploadToS3::class,
        CreateDatabaseRecord::class,
    ])
    ->run();

Data Validation Pipeline

$data = pipeline($request->all())
    ->through([
        SanitizeInput::class,
        ValidateEmail::class,
        CheckDuplicateEmail::class,
        ValidatePassword::class,
        NormalizePhone::class,
    ])
    ->run();

$user = User::create($data);

Lead Scoring Pipeline (CRM)

$lead = pipeline($lead)
    ->through([
        ScoreByCompanySize::class,    // +20 if > 100 employees
        ScoreByIndustry::class,       // +15 if tech
        ScoreByEngagement::class,     // +10 per email open
        ScoreByJobTitle::class,       // +25 if C-level
        AssignSalesRep::class,        // Auto-assign if score > 80
    ])
    ->run();

Mixing Pipe Types

You can mix class names, instances, and closures in the same pipeline:

$result = pipeline($data)
    ->through([
        ValidateData::class,              // Class name (DI)
        new TransformData($config),       // Instance
        fn($data) => $data['result'],     // Closure
        ProcessData::class,               // Class name (DI)
    ])
    ->run();

Conditional Pipes

You can conditionally add pipes based on your business logic by building the pipes array dynamically:

Basic Conditional Pipes

// Build pipes array conditionally
$pipes = [
    ValidateInventory::class,
];

if ($order->hasCoupon()) {
    $pipes[] = ApplyDiscount::class;
}

if ($order->isInternational()) {
    $pipes[] = CalculateInternationalTax::class;
} else {
    $pipes[] = CalculateTax::class;
}

$pipes[] = ProcessPayment::class;

// Run pipeline with conditional pipes
$result = pipeline($order)
    ->through($pipes)
    ->run();

Order Type Based Pipes

$pipes = [ValidateInventory::class];

// Different pipes for different order types
switch ($order->type) {
    case 'wholesale':
        $pipes[] = ApplyWholesaleDiscount::class;
        // No tax for wholesale
        break;

    case 'international':
        $pipes[] = ApplyStandardDiscount::class;
        $pipes[] = CalculateInternationalTax::class;
        $pipes[] = CheckCustomsRestrictions::class;
        break;

    default:
        $pipes[] = ApplyStandardDiscount::class;
        $pipes[] = CalculateTax::class;
}

$pipes[] = ProcessPayment::class;

$result = pipeline($order)->through($pipes)->run();

User Role Based Pipes

$pipes = [ValidateData::class];

// Add pipes based on user role
if ($user->isAdmin()) {
    $pipes[] = SkipApproval::class;
} else {
    $pipes[] = RequireApproval::class;
    $pipes[] = NotifyManager::class;
}

$pipes[] = SaveData::class;

$result = pipeline($data)->through($pipes)->run();

Feature Flag Based Pipes

$pipes = [ProcessOrder::class];

// Add pipes based on feature flags
if (feature('new_discount_engine')) {
    $pipes[] = ApplyNewDiscounts::class;
} else {
    $pipes[] = ApplyLegacyDiscounts::class;
}

if (feature('fraud_detection')) {
    $pipes[] = CheckFraud::class;
}

$pipes[] = CompleteOrder::class;

$result = pipeline($order)->through($pipes)->run();

Using Helper Method

For complex conditional logic, extract to a helper method:

class OrderPipeline
{
    public static function getPipes(Order $order): array
    {
        $pipes = [ValidateInventory::class];

        // Discount logic
        if ($order->hasCoupon()) {
            $pipes[] = ApplyDiscount::class;
        }

        // Tax logic
        if ($order->isInternational()) {
            $pipes[] = CalculateInternationalTax::class;
            $pipes[] = CheckCustomsRestrictions::class;
        } else {
            $pipes[] = CalculateTax::class;
        }

        // Payment
        $pipes[] = ProcessPayment::class;

        // Post-processing
        if ($order->requiresShipping()) {
            $pipes[] = CalculateShipping::class;
            $pipes[] = NotifyWarehouse::class;
        }

        return $pipes;
    }
}

// Usage
$result = pipeline($order)
    ->through(OrderPipeline::getPipes($order))
    ->run();

This approach keeps your controller clean while maintaining flexibility.

Error Handling

Pipes can throw exceptions to stop the pipeline:

class ValidateInventory
{
    public function __invoke($order)
    {
        if (!$this->hasStock($order)) {
            throw new OutOfStockException('Insufficient inventory');
        }

        return $order;
    }
}

Usage with error handling:

try {
    $order = pipeline($order)
        ->through([
            ValidateInventory::class,
            ProcessPayment::class,
        ])
        ->run();
} catch (OutOfStockException $e) {
    return response()->json(['error' => $e->getMessage()], 400);
}

Testing Pipes

Each pipe can be tested independently:

<?php

namespace Tests\Pipes;

use Tests\TestCase;
use App\Pipes\Order\ValidateInventory;
use App\Models\Order;
use App\Models\Product;

class ValidateInventoryTest extends TestCase
{
    public function testValidatesSuccessfully()
    {
        // Arrange
        $product = new Product;
        $product->stock = 100;
        $product->save();

        $order = new Order;
        $order->save();
        $order->items()->create([
            'product_id' => $product->id,
            'quantity' => 10,
        ]);

        // Act
        $pipe = new ValidateInventory($this->inventoryService);
        $result = $pipe($order);

        // Assert
        $this->assertInstanceOf(Order::class, $result);
    }

    public function testThrowsExceptionWhenOutOfStock()
    {
        // Test exception case
        $this->expectException(OutOfStockException::class);

        // ... test code
    }
}

Best Practices

Keep Pipes Small

Each pipe should have a single responsibility:

// ✅ Good - single responsibility
class ValidateInventory { ... }
class ProcessPayment { ... }
class SendEmail { ... }

// ❌ Bad - doing too much
class ProcessOrderAndSendEmailAndUpdateInventory { ... }

Make Pipes Reusable

Design pipes to work in different contexts:

// ✅ Good - reusable
class ValidateEmail
{
    public function __invoke($data)
    {
        if (!filter_var($data['email'], FILTER_VALIDATE_EMAIL)) {
            throw new ValidationException('Invalid email');
        }
        return $data;
    }
}

// Can be used in multiple pipelines:
// - User registration
// - Profile update
// - Contact form

Use Dependency Injection

Let the container handle dependencies:

// ✅ Good - uses DI
class ProcessPayment
{
    public function __construct(PaymentGateway $gateway)
    {
        $this->gateway = $gateway;
    }
}

// ❌ Bad - creates dependencies
class ProcessPayment
{
    public function __invoke($order)
    {
        $gateway = new PaymentGateway(); // Hard to test
    }
}

Return the Passable

Always return the data being passed through:

// ✅ Good
public function __invoke($order)
{
    $order->validated = true;
    return $order;  // Return it!
}

// ❌ Bad
public function __invoke($order)
{
    $order->validated = true;
    // Forgot to return - breaks pipeline!
}

When to Use Pipeline

Use Pipeline for:

Don't use Pipeline for:

Configuration-Driven Pipelines

You can make pipelines configurable:

// config/pipelines.php
return [
    'order_processing' => [
        'standard' => [
            ValidateInventory::class,
            ApplyDiscounts::class,
            CalculateTax::class,
            ProcessPayment::class,
        ],
        'wholesale' => [
            ValidateInventory::class,
            ApplyWholesaleDiscounts::class,
            ProcessPayment::class,  // No tax for wholesale
        ],
    ],
];

// Usage
$pipes = config("pipelines.order_processing.{$order->type}");

$order = pipeline($order)
    ->through($pipes)
    ->run();

This allows you to change business logic without code changes.

API Reference

pipeline($passable)

Create a new pipeline instance.

Parameters:

Returns: Pipeline instance

through(array $pipes)

Set the pipes to pass data through.

Parameters:

Returns: Pipeline instance (chainable)

run()

Execute the pipeline and return the result.

Returns: The transformed passable

Example:

$result = pipeline($data)
    ->through([...])
    ->run();