Open WebUI’s Pipelines feature is one of its most powerful and underused capabilities. Pipelines let you intercept every message that flows through Open WebUI — before it reaches the model and after the model responds — and run custom Python logic at each stage. You can use this to add RAG retrieval, apply content filters, route requests to different models based on content, log conversations, enforce rate limits, call external APIs, and much more. This guide explains how Pipelines work and walks through building several practical examples from scratch.
Pipelines are Python classes loaded and managed by the Pipelines service, which runs separately from Open WebUI and communicates with it over HTTP. This separation means a buggy pipeline cannot crash Open WebUI itself, and you can hot-reload pipelines during development without restarting the main application. Each pipeline can expose configuration valves (user-adjustable parameters that appear as settings in the Open WebUI interface), making your pipelines configurable without code changes.
Setting Up the Pipelines Service
The Pipelines service is a separate Docker container that Open WebUI connects to. Start it alongside your existing Open WebUI and Ollama setup:
docker run -d \
  --name pipelines \
  --network open-webui-network \
  -p 9099:9099 \
  -v ./pipelines:/app/pipelines \
  ghcr.io/open-webui/pipelines:main
Then connect Open WebUI to it. In Open WebUI, go to Admin Panel → Settings → Connections and add a new OpenAI API connection with the URL http://pipelines:9099 (if running in the same Docker network) or http://localhost:9099 (if running directly). Set the API key to 0p3n-w3bu! — the default key for the Pipelines service. Once connected, any pipeline files you place in the mounted ./pipelines directory are automatically loaded and appear as models in Open WebUI.
Pipeline Types
There are three types of pipelines, each serving a different purpose. A Filter pipeline sits between the user and the model — it can inspect and modify both the incoming messages (inlet) and the outgoing response (outlet). A Pipe pipeline acts as a custom model endpoint — when a user selects it, their messages go to your pipeline’s pipe function instead of to a real model, giving you complete control over what generates the response. A Manifold pipeline is a special pipe that dynamically registers multiple model IDs, useful for exposing many backend models through a single pipeline class.
For most use cases, Filter pipelines are what you want — they let you augment or control the flow without replacing the model entirely. Pipe pipelines are appropriate when you want to implement a custom model backend, such as a RAG pipeline that handles retrieval and generation itself, or a router that selects from multiple models based on message content.
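The Manifold type is the least obvious of the three, so a minimal sketch may help. It assumes the convention used in the Pipelines repository's examples, where a manifold sets self.type to "manifold" and lists its sub-models in self.pipelines. The IDs "demo", "fast", and "accurate" and the echo-style reply are hypothetical placeholders, not a real backend.

```python
from typing import List


class Pipeline:
    def __init__(self):
        # Assumption: "manifold" asks the service to register every entry
        # in self.pipelines as a separately selectable model.
        self.type = "manifold"
        self.id = "demo"
        self.name = "Demo: "
        # Hypothetical sub-models; a real manifold would query its backend here.
        self.pipelines = [
            {"id": "fast", "name": "Fast model"},
            {"id": "accurate", "name": "Accurate model"},
        ]

    def pipe(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> str:
        # model_id is assumed to identify which registered sub-model was selected.
        known = {p["id"] for p in self.pipelines}
        if model_id not in known:
            return f"Unknown model: {model_id}"
        return f"[{model_id}] would answer: {user_message}"
```

In a real manifold, the pipe method would forward the request to whichever backend model_id names instead of returning a placeholder string.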
Your First Filter Pipeline
Here is a minimal Filter pipeline that logs every message and adds a system prompt if none is present:
from typing import List, Optional

from pydantic import BaseModel


class Pipeline:
    class Valves(BaseModel):
        # Model IDs this filter applies to; "*" targets every model
        pipelines: List[str] = ["*"]
        # Filters with lower priority values run first
        priority: int = 0
        system_prompt: str = "You are a helpful assistant."
        log_messages: bool = True

    def __init__(self):
        # Register as a filter; without a type the class is treated as a pipe
        self.type = "filter"
        self.name = "System Prompt Injector"
        self.valves = self.Valves()

    async def on_startup(self):
        print(f"Pipeline {self.name} started")

    async def on_shutdown(self):
        print(f"Pipeline {self.name} stopped")

    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        """Called before the request reaches the model."""
        messages = body.get("messages", [])
        if self.valves.log_messages:
            last = next((m for m in reversed(messages) if m["role"] == "user"), None)
            if last:
                # user may be None, so fall back to an empty dict
                name = (user or {}).get("name", "unknown")
                print(f"[{name}] {str(last['content'])[:100]}")
        # Inject system prompt if missing
        has_system = any(m["role"] == "system" for m in messages)
        if not has_system and self.valves.system_prompt:
            body["messages"] = [
                {"role": "system", "content": self.valves.system_prompt}
            ] + messages
        return body

    async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
        """Called after the model responds."""
        return body

Save this as system_prompt.py in your pipelines directory. It appears in Open WebUI as a filter, and the pipelines valve controls which models it applies to (the default "*" targets all of them). The Valves inner class defines the configurable parameters, here system_prompt and log_messages alongside pipelines and priority, and they appear as editable fields in Open WebUI’s pipeline settings UI. Users with admin access can adjust these values without touching the code.
A RAG Pipeline
A Pipe pipeline that performs retrieval-augmented generation is one of the most useful things you can build. This example searches a local document collection for relevant context before sending the augmented prompt to Ollama:
import pathlib
from typing import List

import httpx
from pydantic import BaseModel


class Pipeline:
    class Valves(BaseModel):
        ollama_url: str = "http://localhost:11434"
        ollama_model: str = "llama3.2"
        embed_model: str = "nomic-embed-text"
        top_k: int = 3

    def __init__(self):
        self.name = "Local RAG"
        self.valves = self.Valves()
        self.documents = []  # List of {"text": str, "embedding": list, "name": str}

    async def on_startup(self):
        await self.load_documents()

    async def load_documents(self):
        """Load and embed documents from a local directory."""
        docs_dir = pathlib.Path("/app/pipelines/docs")
        if not docs_dir.exists():
            return
        async with httpx.AsyncClient(timeout=60) as client:
            for f in docs_dir.glob("*.txt"):
                text = f.read_text()
                resp = await client.post(
                    f"{self.valves.ollama_url}/api/embed",
                    json={"model": self.valves.embed_model, "input": text[:2000]},
                )
                emb = resp.json()["embeddings"][0]
                self.documents.append({"text": text, "embedding": emb, "name": f.name})
        print(f"Loaded {len(self.documents)} documents")

    def cosine_sim(self, a, b):
        dot = sum(x * y for x, y in zip(a, b))
        na = sum(x**2 for x in a) ** 0.5
        nb = sum(x**2 for x in b) ** 0.5
        return dot / (na * nb) if na and nb else 0.0

    def pipe(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> str:
        query = user_message
        context = ""
        with httpx.Client(timeout=120) as client:
            if self.documents and query:
                # Embed the question and rank documents by similarity to it
                resp = client.post(
                    f"{self.valves.ollama_url}/api/embed",
                    json={"model": self.valves.embed_model, "input": query},
                )
                q_emb = resp.json()["embeddings"][0]
                ranked = sorted(
                    self.documents,
                    key=lambda d: self.cosine_sim(q_emb, d["embedding"]),
                    reverse=True,
                )
                context = "\n\n".join(d["text"][:500] for d in ranked[: self.valves.top_k])
            augmented = f"Context:\n{context}\n\nQuestion: {query}" if context else query
            # Keep the conversation history; replace only the latest user message
            full_messages = list(messages)
            for i in range(len(full_messages) - 1, -1, -1):
                if full_messages[i]["role"] == "user":
                    full_messages[i] = {"role": "user", "content": augmented}
                    break
            resp = client.post(
                f"{self.valves.ollama_url}/api/chat",
                json={"model": self.valves.ollama_model, "messages": full_messages, "stream": False},
            )
        return resp.json()["message"]["content"]

Place text files in /app/pipelines/docs/ inside the container. On startup the pipeline reads and embeds each file; at query time it embeds the user’s question, finds the most similar documents by cosine similarity, and prepends them as context. Note that pipe is a regular (non-async) method that the Pipelines service calls with user_message, model_id, messages, and body. The default ollama_url of http://localhost:11434 only works if Ollama is reachable at that address from inside the Pipelines container; on a shared Docker network, point the valve at the Ollama container’s hostname instead. This is a complete local RAG implementation with no external vector database required, suitable for small document collections of up to a few hundred files.
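The example above embeds only the first 2,000 characters of each file, so long documents are only partly searchable. One possible refinement, sketched here with hypothetical names (chunk_text, chunk_size, overlap) and arbitrary default sizes, is to split each file into overlapping chunks and embed every chunk separately.

```python
from typing import List


def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> List[str]:
    """Split text into chunks of at most chunk_size characters.

    Consecutive chunks share `overlap` characters, so text cut at a chunk
    boundary is likely to appear intact in a neighbouring chunk.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be in [0, chunk_size)")
    step = chunk_size - overlap
    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop once a chunk has reached the end of the text
        if start + chunk_size >= len(text):
            break
    return chunks
```

In load_documents, you could then loop over chunk_text(text) and append one entry per chunk instead of one per file; the retrieval code would not need to change.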
A Model Router Pipeline
Route requests to different Ollama models based on message content — coding questions go to a code model, general questions go to a general model:
import re
from typing import List

import httpx
from pydantic import BaseModel


class Pipeline:
    class Valves(BaseModel):
        ollama_url: str = "http://localhost:11434"
        code_model: str = "qwen2.5-coder:7b"
        general_model: str = "llama3.2"

    # Keyword pattern compiled once at class level
    CODE_PATTERNS = re.compile(
        r"\b(code|function|script|bug|error|debug|python|javascript|bash|sql|implement|refactor)\b",
        re.IGNORECASE,
    )

    def __init__(self):
        self.name = "Smart Router"
        self.valves = self.Valves()

    def pipe(self, user_message: str, model_id: str, messages: List[dict], body: dict) -> str:
        # Pick the code model when the latest user message matches a keyword
        model = (
            self.valves.code_model
            if self.CODE_PATTERNS.search(user_message)
            else self.valves.general_model
        )
        with httpx.Client(timeout=120) as client:
            resp = client.post(
                f"{self.valves.ollama_url}/api/chat",
                json={"model": model, "messages": messages, "stream": False},
            )
        result = resp.json()["message"]["content"]
        return f"*[Routed to {model}]*\n\n{result}"

The router uses a simple regex to classify messages as code-related or general, then selects the appropriate model. The response is prefixed with the name of the model that handled it so users can see the routing decision. For a production router you would use a classifier model rather than a regex: make a fast call to a small model like llama3.2:3b asking it to classify the query, then route to the appropriate specialist model based on the classification result.
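The classifier-based approach could look roughly like the following. The prompt wording, the CODE and GENERAL labels, and the helper names are assumptions made for illustration. Only the payload construction and reply parsing are shown, so the logic can be checked without a running model.

```python
from typing import Dict

# Hypothetical prompt; tune the wording for the classifier model you use.
CLASSIFY_PROMPT = (
    "Classify the user request as CODE or GENERAL. "
    "Answer with exactly one word.\n\nRequest: {query}"
)


def build_classifier_payload(query: str, classifier_model: str = "llama3.2:3b") -> dict:
    """Build an Ollama /api/chat request body asking a small model to label the query."""
    prompt = CLASSIFY_PROMPT.format(query=query[:500])
    return {
        "model": classifier_model,
        "messages": [{"role": "user", "content": prompt}],
        "stream": False,
    }


def pick_model(classifier_reply: str, routes: Dict[str, str], default: str) -> str:
    """Map the classifier's free-text reply to a model, falling back to default."""
    words = classifier_reply.strip().upper().split()
    # Use the first word only and drop surrounding punctuation such as "CODE."
    label = words[0].strip(".,:;!*\"'") if words else ""
    return routes.get(label, default)
```

Inside pipe, you would post the payload to /api/chat, pass the reply text to pick_model, and then send the original messages to whichever model it returns. Falling back to a default keeps the router working when the classifier answers with something unexpected.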
A Rate Limiting Filter
Prevent any single user from flooding the system with requests using a simple in-memory rate limiter in the inlet filter:
import time
from collections import defaultdict
from typing import List, Optional

from fastapi import HTTPException
from pydantic import BaseModel


class Pipeline:
    class Valves(BaseModel):
        pipelines: List[str] = ["*"]  # apply to every model
        priority: int = 0  # lower values run first
        requests_per_minute: int = 10

    def __init__(self):
        self.type = "filter"
        self.name = "Rate Limiter"
        self.valves = self.Valves()
        self.request_times = defaultdict(list)  # user ID -> request timestamps

    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        if not user:
            return body
        user_id = user.get("id", "anonymous")
        now = time.time()
        window = 60
        # Drop timestamps that have fallen out of the sliding window
        self.request_times[user_id] = [
            t for t in self.request_times[user_id] if now - t < window
        ]
        if len(self.request_times[user_id]) >= self.valves.requests_per_minute:
            raise HTTPException(
                status_code=429,
                detail=f"Rate limit exceeded. Max {self.valves.requests_per_minute} requests per minute.",
            )
        self.request_times[user_id].append(now)
        return body

Raising an HTTPException from the inlet function stops the request before it reaches the model and returns the error to the Open WebUI client. The sliding window tracks request timestamps per user, pruning entries older than 60 seconds on each call. Because the timestamps live only in memory, the counters reset whenever the pipeline restarts; for limits that persist across restarts, write the timestamps to a file or a Redis instance.
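As a rough illustration of the file-based option, the sketch below keeps the same sliding-window logic but saves the timestamps to a JSON file after every request. The class name, the state file, and the injectable clock are hypothetical, and the code assumes a single pipeline process with no concurrent writers.

```python
import json
import time
from pathlib import Path
from typing import Callable, Dict, List


class SlidingWindowLimiter:
    """Per-user sliding-window limiter that saves its state to a JSON file."""

    def __init__(self, limit: int, window: float, state_file: Path,
                 clock: Callable[[], float] = time.time):
        self.limit = limit
        self.window = window
        self.state_file = Path(state_file)
        self.clock = clock  # injectable so the limiter can be tested without sleeping
        self.request_times: Dict[str, List[float]] = self._load()

    def _load(self) -> Dict[str, List[float]]:
        # Start empty if the file is missing or unreadable
        try:
            return json.loads(self.state_file.read_text())
        except (FileNotFoundError, json.JSONDecodeError):
            return {}

    def _save(self) -> None:
        self.state_file.write_text(json.dumps(self.request_times))

    def allow(self, user_id: str) -> bool:
        """Record the request and return True, or return False if over the limit."""
        now = self.clock()
        recent = [t for t in self.request_times.get(user_id, []) if now - t < self.window]
        allowed = len(recent) < self.limit
        if allowed:
            recent.append(now)
        self.request_times[user_id] = recent
        self._save()
        return allowed
```

The inlet function would call allow(user_id) and raise the HTTPException when it returns False. Writing the file on every request is simple but slow under load, which is where a store such as Redis becomes the better fit.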
Streaming Responses from a Pipe
For a better user experience, return a generator from your pipe method to stream tokens rather than waiting for the full response:
import json
from typing import Generator, List

import httpx


class Pipeline:
    # Valves and __init__ as in the RAG example above

    def pipe(
        self, user_message: str, model_id: str, messages: List[dict], body: dict
    ) -> Generator[str, None, None]:
        # body["model"] holds the pipeline's own ID, so take the Ollama model from the valves
        with httpx.Client(timeout=120) as client:
            with client.stream(
                "POST",
                f"{self.valves.ollama_url}/api/chat",
                json={"model": self.valves.ollama_model, "messages": messages, "stream": True},
            ) as resp:
                # Ollama streams one JSON object per line
                for line in resp.iter_lines():
                    if not line:
                        continue
                    chunk = json.loads(line)
                    token = chunk.get("message", {}).get("content", "")
                    if token:
                        yield token
                    if chunk.get("done"):
                        break

When the pipe method is a generator (using yield), each yielded chunk is streamed to Open WebUI, so tokens appear in the chat interface as they are generated, just as they would from a direct Ollama connection. This makes the pipeline transparent to the user in terms of streaming behaviour while still allowing you to inject arbitrary logic before and after the generation.
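The line-parsing part of the streaming loop can be pulled into a small pure function, which makes it testable without a running Ollama instance. This is a sketch under the assumption that each line is a JSON object with a message.content field and a done flag, as in the example above; the name iter_tokens is hypothetical.

```python
import json
from typing import Iterable, Iterator


def iter_tokens(lines: Iterable[str]) -> Iterator[str]:
    """Yield content tokens from newline-delimited JSON chat chunks."""
    for line in lines:
        if not line.strip():
            continue  # skip blank lines
        try:
            chunk = json.loads(line)
        except json.JSONDecodeError:
            continue  # ignore malformed lines rather than abort the stream
        if not isinstance(chunk, dict):
            continue
        token = chunk.get("message", {}).get("content", "")
        if token:
            yield token
        if chunk.get("done"):
            break
```

With this helper, the body of the streaming loop reduces to yielding from iter_tokens(resp.iter_lines()). Skipping malformed lines is a design choice; a stricter pipeline might prefer to raise instead.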
Debugging Pipelines
Pipelines run in a separate process, so debugging requires looking at the pipelines container logs rather than the Open WebUI logs. Tail them with docker logs -f pipelines while testing — any print() statements in your pipeline code appear there, making it easy to trace what is happening. For more structured logging, use Python’s logging module at the top of your pipeline file.
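A minimal logging setup for the top of a pipeline file might look like this. The logger name and format string are arbitrary choices; writing to stdout means the output shows up in docker logs alongside any print() statements.

```python
import logging
import sys


def get_pipeline_logger(name: str, level: int = logging.INFO) -> logging.Logger:
    """Return a logger that writes timestamped lines to stdout."""
    logger = logging.getLogger(name)
    logger.setLevel(level)
    # Attach the handler only once, even if the pipeline file is loaded again
    if not logger.handlers:
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s [%(name)s] %(message)s")
        )
        logger.addHandler(handler)
    # Avoid duplicate lines if the root logger also has handlers
    logger.propagate = False
    return logger


log = get_pipeline_logger("pipelines.system_prompt")
log.info("Pipeline module imported")
```

Inside inlet or outlet you can then call log.info(...) or log.warning(...) instead of print(), and raise the level to logging.DEBUG while developing.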
The Pipelines service exposes a management API at http://localhost:9099. You can list loaded pipelines with GET /v1/pipelines, upload a new pipeline file with POST /v1/pipelines/upload, and trigger a reload without restarting the container. During development, mount your pipeline files as a volume (as shown in the Docker run command above) so you can edit them locally and they update inside the container immediately — the service polls for file changes and reloads automatically.
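The listing endpoint can also be called from a script. The sketch below uses only the standard library and assumes the service accepts the API key as a Bearer token, as OpenAI-compatible APIs usually do; the helper names are hypothetical, and no particular response shape is assumed beyond it being JSON.

```python
import json
import urllib.request

DEFAULT_KEY = "0p3n-w3bu!"  # default Pipelines API key


def build_request(path: str, base_url: str = "http://localhost:9099",
                  api_key: str = DEFAULT_KEY) -> urllib.request.Request:
    """Build an authenticated GET request for a management API path."""
    url = base_url.rstrip("/") + "/" + path.lstrip("/")
    # Assumption: the key is sent as a Bearer token
    return urllib.request.Request(url, headers={"Authorization": f"Bearer {api_key}"})


def list_pipelines(base_url: str = "http://localhost:9099", api_key: str = DEFAULT_KEY):
    """Fetch and decode the JSON returned by GET /v1/pipelines."""
    request = build_request("/v1/pipelines", base_url, api_key)
    with urllib.request.urlopen(request, timeout=10) as resp:
        return json.load(resp)
```

Calling list_pipelines() requires the container to be running; printing the result is a quick way to confirm that a newly added file was picked up.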
Installing Pipeline Dependencies
If your pipeline requires Python packages beyond what is pre-installed in the Pipelines image, you can list them in the requirements field of the docstring header at the top of your pipeline file:
"""
title: My Pipeline
author: yourname
requirements: sentence-transformers, chromadb
"""

from sentence_transformers import SentenceTransformer
import chromadb
The Pipelines service reads the requirements field and installs the listed packages with pip when the pipeline is first loaded. This makes it possible to use any Python package — vector databases, PDF parsers, web scrapers, ML libraries — without building a custom Docker image. Packages are installed into the container’s Python environment and persist until the container is recreated.
A Content Moderation Filter
Use a fast local model to screen user messages for inappropriate content before they reach the main model. This is particularly useful when Open WebUI is shared across a team and you want a lightweight safety layer without sending messages to an external moderation API:
from typing import List, Optional

import httpx
from fastapi import HTTPException
from pydantic import BaseModel


class Pipeline:
    class Valves(BaseModel):
        pipelines: List[str] = ["*"]  # apply to every model
        priority: int = 10  # run after the rate limiter
        ollama_url: str = "http://localhost:11434"
        moderation_model: str = "llama3.2:3b"
        block_message: str = "Your message was flagged and could not be processed."

    def __init__(self):
        self.type = "filter"
        self.name = "Content Moderator"
        self.valves = self.Valves()

    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        messages = body.get("messages", [])
        last = next((m["content"] for m in reversed(messages) if m["role"] == "user"), "")
        if not last:
            return body
        check_prompt = (
            "Is the following message harmful, abusive, or inappropriate? "
            "Answer only YES or NO.\n"
            f"Message: {str(last)[:500]}"
        )
        try:
            async with httpx.AsyncClient(timeout=15) as client:
                resp = await client.post(
                    f"{self.valves.ollama_url}/api/chat",
                    json={
                        "model": self.valves.moderation_model,
                        "messages": [{"role": "user", "content": check_prompt}],
                        "stream": False,
                    },
                )
            verdict = resp.json()["message"]["content"].strip().upper()
        except (httpx.HTTPError, KeyError, ValueError) as e:
            # Fail open: if the check errors or times out, let the request through
            print(f"Moderation check failed: {e}")
            return body
        if "YES" in verdict:
            raise HTTPException(status_code=400, detail=self.valves.block_message)
        return body

Using a small 3B model for moderation keeps the latency overhead low, typically under a second on a machine where the model is already loaded. The 15-second timeout is intentionally short; if the moderation check fails or times out, the except branch lets the request pass through rather than blocking legitimate users. For stricter enforcement, raise the HTTPException in that branch so that failures block by default instead.
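The choice between passing and blocking on failure can be isolated in one helper. In this sketch, check stands for a hypothetical coroutine function that returns the moderation model's raw verdict; the helper name and the fail_closed flag are likewise assumptions.

```python
import asyncio
from typing import Awaitable, Callable


async def is_blocked(check: Callable[[], Awaitable[str]],
                     fail_closed: bool = True,
                     timeout: float = 15.0) -> bool:
    """Run a moderation check and decide whether to block the message.

    On any error or timeout the result is fail_closed: True blocks the
    message, False lets it through.
    """
    try:
        verdict = await asyncio.wait_for(check(), timeout=timeout)
    except Exception:
        return fail_closed
    return verdict.strip().upper().startswith("YES")
```

With fail_closed=True a moderation outage blocks every message, which is safer but more disruptive; fail_closed=False reproduces the pass-through behaviour described above.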
A Conversation Logger Pipeline
Log every conversation to a JSONL file for later analysis, fine-tuning data collection, or audit trails. This outlet filter appends each completed exchange after the model responds:
import json
import pathlib
from datetime import datetime, timezone
from typing import List, Optional

from pydantic import BaseModel


class Pipeline:
    class Valves(BaseModel):
        pipelines: List[str] = ["*"]  # apply to every model
        priority: int = 0
        log_file: str = "/app/pipelines/conversations.jsonl"

    def __init__(self):
        self.type = "filter"
        self.name = "Conversation Logger"
        self.valves = self.Valves()

    async def outlet(self, body: dict, user: Optional[dict] = None) -> dict:
        try:
            messages = body.get("messages", [])
            log_path = pathlib.Path(self.valves.log_file)
            log_path.parent.mkdir(parents=True, exist_ok=True)
            entry = {
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "user_id": user.get("id") if user else None,
                "user_name": user.get("name") if user else None,
                "model": body.get("model"),
                "messages": messages,
            }
            # One JSON object per line
            with open(log_path, "a", encoding="utf-8") as f:
                f.write(json.dumps(entry) + "\n")
        except Exception as e:
            # Never let a logging failure break the conversation
            print(f"Logger error: {e}")
        return body

The logger uses JSONL (one JSON object per line) format, which is easy to process with tools like jq, Python’s standard json module, or data processing libraries like Pandas. The try/except wrapper ensures a logging failure never breaks the conversation: the user always gets their response even if the log write fails. Mount the pipelines directory as a volume so the log file persists outside the container and survives restarts.
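As an example of processing the log with the standard json module, the following sketch counts logged exchanges per user. It assumes the entry fields written by the logger above (in particular user_name); the function name is hypothetical.

```python
import json
from collections import Counter
from typing import Dict


def count_entries_by_user(log_file: str) -> Dict[str, int]:
    """Count JSONL log entries per user_name, skipping unreadable lines."""
    counts: Counter = Counter()
    with open(log_file, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                entry = json.loads(line)
            except json.JSONDecodeError:
                continue  # for example, a partially written final line
            counts[entry.get("user_name") or "unknown"] += 1
    return dict(counts)
```

Reading line by line keeps memory use flat even when the log grows large, which is the main practical advantage of JSONL over a single JSON array.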
Combining Multiple Pipelines
Open WebUI lets you enable multiple filter pipelines simultaneously on the same model, applying them in priority order. You can stack the rate limiter, content moderator, system prompt injector, and conversation logger on a single Ollama model, and each filter runs in sequence on every request. This composable design means you build each capability in isolation — easier to test, easier to maintain — and combine them as needed per model or per user group.
Set the priority of each filter in the Open WebUI pipeline settings. Lower priority numbers run first in the inlet (before the model) and last in the outlet (after the model), so a priority 0 rate limiter blocks requests before the priority 10 content moderator even runs. This ordering matters: you want to fail fast on rate limit violations before spending compute on moderation checks, and you want moderation to run before system prompt injection to avoid wasting context window on blocked requests.
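The inlet ordering can be illustrated with a small simulation. This is not Open WebUI's code: DemoFilter and run_inlets are hypothetical stand-ins that only mimic the behaviour described above, where inlets run in ascending priority and an exception from one filter stops the chain.

```python
import asyncio
from typing import List, Optional


class DemoFilter:
    """Stand-in for a filter pipeline that records when its inlet runs."""

    def __init__(self, name: str, priority: int, trace: List[str], reject: bool = False):
        self.name = name
        self.priority = priority
        self.trace = trace
        self.reject = reject

    async def inlet(self, body: dict, user: Optional[dict] = None) -> dict:
        self.trace.append(self.name)
        if self.reject:
            raise RuntimeError(f"{self.name} rejected the request")
        return body


async def run_inlets(filters: List[DemoFilter], body: dict) -> dict:
    # Apply inlets in ascending priority order; an exception aborts the chain
    for f in sorted(filters, key=lambda f: f.priority):
        body = await f.inlet(body)
    return body
```

If the priority 0 filter rejects a request, the filters at priority 10 and above never run, which is why the cheapest checks are best given the lowest numbers.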
Pipelines are one of the features that make Open WebUI genuinely powerful for team deployments rather than just a prettier interface to Ollama. With a few hundred lines of Python spread across three or four pipeline files, you can add enterprise-grade capabilities — audit logging, content moderation, user rate limiting, automatic RAG retrieval, intelligent model routing — to any Ollama model with no changes to the underlying model setup and no modifications to Open WebUI itself.
Getting Started with Your First Pipeline
The best way to start with Pipelines is to take the System Prompt Injector example above, save it as a file in your pipelines directory, and verify it appears in Open WebUI. From there, add a print statement to the inlet function and check the container logs to confirm it fires on each message. Once you have that feedback loop working — edit the file, watch the logs, send a test message — building more complex logic is straightforward. The Pipelines GitHub repository includes a growing collection of community-contributed examples covering web search integration, image generation routing, external database lookups, and multi-model comparison, all of which follow the same class structure and valve pattern shown in this guide. Reading a few of those examples alongside your own experimentation is the fastest path to understanding what Pipelines can do and building the specific capabilities your Open WebUI deployment needs.
Pipelines transform Open WebUI from a chat interface into a programmable AI platform. Every team that runs Open WebUI for more than a handful of users will eventually hit a requirement that Pipelines can solve cleanly — whether that is controlling costs through rate limiting, ensuring compliance through conversation logging, improving answer quality through RAG retrieval, or optimising performance through intelligent model routing. The Python class structure is simple enough to get started in an afternoon, and the valve system makes your pipelines configurable by non-developers through the Open WebUI interface without any code changes required.