Local LLM applications face unique challenges when tasks extend beyond simple queries and responses. Analyzing hundreds of documents, generating comprehensive reports, processing entire codebases, or conducting multi-hour research requires architectures fundamentally different from chat interfaces. These long-running tasks introduce concerns about reliability, progress tracking, resource management, and graceful failure handling that quick queries never encounter. A system that works perfectly for 10-second interactions often fails catastrophically when a task takes 10 hours.
The complexity emerges from multiple fronts simultaneously. Hardware can fail mid-task. Memory usage accumulates over thousands of operations. Users need visibility into progress for tasks taking hours. Partial results must be preserved when failures inevitably occur. These aren’t minor implementation details—they’re architectural requirements that must be designed in from the start. Understanding how to build robust long-running task systems separates demos from production-ready local LLM applications.
Understanding Long-Running Task Characteristics
Long-running tasks differ qualitatively from interactive queries in ways that demand different architectural approaches.
What Makes a Task “Long-Running”
Duration defines the boundary. Tasks taking minutes to hours rather than seconds cross into long-running territory. A 3-second chat response is interactive. A 3-minute document analysis is transitional. A 3-hour codebase audit is definitively long-running.
The practical implications:
- Users can’t wait for completion in a single session
- System must handle interruptions (user closes browser, network drops, computer sleeps)
- Resources must be managed carefully to avoid exhaustion
- Progress visibility becomes essential for user confidence
- Failure recovery must preserve work rather than restarting
Examples of long-running tasks:
- Processing 500+ documents for analysis or summarization
- Generating comprehensive technical reports with research
- Code review across entire repositories
- Multi-step research requiring iteration and synthesis
- Content generation for books, courses, or extensive documentation
- Data pipeline processing with LLM components
Resource Consumption Patterns
Long tasks consume resources differently than quick queries. A single 3-second query uses 3 seconds of GPU time. A 3-hour task uses 3 hours continuously—a 3,600x difference. This creates distinct challenges.
GPU occupancy: The GPU remains occupied for the entire duration, preventing other work. For single-user systems, this might be acceptable. For shared systems, it requires sophisticated scheduling or dedicated task workers.
Memory accumulation: Each operation allocates memory—tokenization buffers, intermediate results, cached states. Over thousands of operations, small leaks become system-killing memory exhaustion. Tasks that would run fine for 10 iterations crash at iteration 1,000.
Thermal management: Consumer GPUs running at full load for hours hit thermal limits, triggering throttling that degrades performance. A task might start at 50 tokens/second but drop to 30 tokens/second after 2 hours of continuous operation.
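One way to catch this kind of degradation in software is to track a rolling tokens-per-second average and compare it against the rate observed early in the run. A minimal sketch (the class name and thresholds are illustrative, not from any particular library):

```python
from collections import deque

class ThroughputMonitor:
    """Warn when generation speed drops well below the early-run baseline,
    a common symptom of thermal throttling."""

    def __init__(self, window=20, degradation_ratio=0.7):
        self.samples = deque(maxlen=window)  # recent tokens/sec samples
        self.baseline = None                 # average rate once window fills
        self.degradation_ratio = degradation_ratio

    def record(self, tokens, seconds):
        self.samples.append(tokens / seconds)
        # Freeze the baseline the first time the window is full
        if self.baseline is None and len(self.samples) == self.samples.maxlen:
            self.baseline = sum(self.samples) / len(self.samples)

    def degraded(self):
        if self.baseline is None or not self.samples:
            return False
        recent = sum(self.samples) / len(self.samples)
        return recent < self.baseline * self.degradation_ratio
```

A task runner could check `degraded()` periodically and insert cool-down pauses, complementing the direct temperature monitoring shown later.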
Task State Management
The foundation of reliable long-running systems is proper state management enabling resume from interruption.
Checkpoint Architecture
Regular checkpointing saves progress at intervals, enabling recovery without starting over. The checkpoint frequency balances safety against overhead.
Checkpoint design considerations:
```python
import glob
import json
import os
import time

class TaskCheckpoint:
    def __init__(self, task_id):
        self.task_id = task_id
        self.checkpoint_dir = f"checkpoints/{task_id}"
        os.makedirs(self.checkpoint_dir, exist_ok=True)

    def save(self, progress_data):
        checkpoint = {
            'timestamp': time.time(),
            'current_step': progress_data['step'],
            'total_steps': progress_data['total'],
            'completed_items': progress_data['completed'],
            'partial_results': progress_data['results'],
            'state': progress_data['state']
        }
        # Zero-pad the step so lexicographic sort matches numeric order
        path = f"{self.checkpoint_dir}/checkpoint_{progress_data['step']:06d}.json"
        with open(path, 'w') as f:
            json.dump(checkpoint, f)
        # Keep only recent checkpoints to save space
        self._cleanup_old_checkpoints(keep=5)

    def _cleanup_old_checkpoints(self, keep=5):
        checkpoints = sorted(glob.glob(f"{self.checkpoint_dir}/checkpoint_*.json"))
        for old in checkpoints[:-keep]:
            os.remove(old)

    def load_latest(self):
        checkpoints = sorted(glob.glob(f"{self.checkpoint_dir}/checkpoint_*.json"))
        if not checkpoints:
            return None
        with open(checkpoints[-1], 'r') as f:
            return json.load(f)
```
What to checkpoint:
- Current position in the task (document 47 of 200)
- Intermediate results accumulated so far
- State information needed to continue (conversation context, gathered facts)
- Timestamps for progress estimation
- Error counts and retry states
Checkpoint frequency trade-offs: Checkpoint every item for maximum safety but high overhead. Checkpoint every 10-50 items for balance. Checkpoint only at major milestones for minimal overhead but greater loss on failure.
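This trade-off can be made concrete with a small heuristic: overhead per item is roughly the checkpoint cost divided by the interval, so the smallest interval that keeps overhead below a target fraction of runtime balances safety against cost. A sketch (the function name and the 1% default are assumptions):

```python
import math

def choose_checkpoint_interval(item_seconds, checkpoint_seconds, max_overhead=0.01):
    """Pick the smallest interval (in items) that keeps checkpoint
    overhead below max_overhead as a fraction of total runtime."""
    # Overhead per item = checkpoint_seconds / interval; solve for interval.
    interval = checkpoint_seconds / (max_overhead * item_seconds)
    return max(1, math.ceil(interval))
```

For example, items averaging 5 seconds with a 0.5-second checkpoint cost and a 1% overhead budget yield an interval of 10 items, matching the middle option above.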
Resume Logic
Detecting resumable tasks requires checking for existing checkpoints on task start:
```python
async def process_documents(task_id, documents):
    checkpoint_manager = TaskCheckpoint(task_id)
    checkpoint = checkpoint_manager.load_latest()

    if checkpoint:
        print(f"Resuming from step {checkpoint['current_step']}")
        start_index = checkpoint['current_step']
        results = checkpoint['partial_results']
    else:
        print("Starting new task")
        start_index = 0
        results = []

    for i in range(start_index, len(documents)):
        try:
            result = await process_single_document(documents[i])
            results.append(result)
            # Checkpoint every 10 documents
            if (i + 1) % 10 == 0:
                checkpoint_manager.save({
                    'step': i + 1,
                    'total': len(documents),
                    'completed': results,
                    'results': results,
                    'state': {}
                })
        except Exception as e:
            logger.error(f"Failed on document {i}: {e}")
            # Save checkpoint before potentially retrying
            checkpoint_manager.save({
                'step': i,
                'total': len(documents),
                'completed': results,
                'results': results,
                'state': {'last_error': str(e)}
            })
            raise

    return results
```
Resume considerations:
- Verify checkpoint integrity before trusting it
- Handle cases where checkpointed state is corrupted
- Provide user visibility into resume vs. restart decisions
- Consider whether partial results remain valid after time elapses
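The first two considerations can be implemented by storing a checksum next to the serialized payload and refusing to trust a checkpoint that fails verification. A sketch (the wrapper format with a `sha256` field is an assumption, not the checkpoint schema used above):

```python
import hashlib
import json

def save_verified(path, data):
    # Serialize deterministically, then record a digest of the exact bytes
    payload = json.dumps(data, sort_keys=True)
    wrapper = {
        'sha256': hashlib.sha256(payload.encode()).hexdigest(),
        'payload': payload
    }
    with open(path, 'w') as f:
        json.dump(wrapper, f)

def load_verified(path):
    with open(path) as f:
        wrapper = json.load(f)
    payload = wrapper['payload']
    if hashlib.sha256(payload.encode()).hexdigest() != wrapper['sha256']:
        return None  # corrupted: fall back to an older checkpoint or restart
    return json.loads(payload)
```

A resume routine would walk checkpoints newest-first, using the first one that verifies, which also covers the corrupted-state case.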
Idempotency for Reliability
Idempotent operations can be safely retried without causing duplicate effects or corrupted state. This property is crucial for long-running tasks.
Designing for idempotency:
- Use deterministic IDs for output files (based on input, not random generation)
- Check for existing results before processing items
- Use atomic file operations (write to temp, then rename)
- Make database operations idempotent (upserts instead of inserts)
Example pattern:
```python
async def process_document_idempotent(doc_id, document):
    result_path = f"results/{doc_id}.json"

    # Skip if already processed
    if os.path.exists(result_path):
        with open(result_path, 'r') as f:
            return json.load(f)

    # Process document
    result = await analyze_document(document)

    # Atomic write: temp file then rename
    temp_path = f"{result_path}.tmp"
    with open(temp_path, 'w') as f:
        json.dump(result, f)
    os.rename(temp_path, result_path)

    return result
```
This design ensures that if the process crashes after processing but before checkpointing, resuming will skip already-completed items rather than processing them again.
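The same idea extends to the database bullet above: in SQLite, `INSERT ... ON CONFLICT DO UPDATE` makes a retried write overwrite the existing row instead of inserting a duplicate. A sketch (table and column names are illustrative):

```python
import sqlite3

def save_result_idempotent(db, doc_id, result_json):
    # Re-running a completed item replaces its row; no duplicates accumulate
    db.execute("""
        INSERT INTO results (doc_id, result) VALUES (?, ?)
        ON CONFLICT(doc_id) DO UPDATE SET result = excluded.result
    """, (doc_id, result_json))
    db.commit()

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE results (doc_id TEXT PRIMARY KEY, result TEXT)")
save_result_idempotent(db, "doc-1", '{"summary": "first pass"}')
save_result_idempotent(db, "doc-1", '{"summary": "retry"}')  # safe to repeat
```

The upsert requires a unique or primary-key constraint on `doc_id`, which is also what makes the operation's idempotency enforceable by the database rather than by application logic.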
Long-Running Task Design Principles
Progress Tracking and User Feedback
For tasks taking hours, users need confidence the system is working and visibility into completion timing.
Progress Calculation
Simple progress tracking counts completed vs. total items:
```python
import time
from datetime import datetime, timedelta

class ProgressTracker:
    def __init__(self, total_items):
        self.total = total_items
        self.completed = 0
        self.start_time = time.time()
        self.errors = 0

    def update(self, increment=1, error=False):
        self.completed += increment
        if error:
            self.errors += increment
        return self.get_status()

    def get_status(self):
        percent = (self.completed / self.total) * 100
        elapsed = time.time() - self.start_time

        if self.completed > 0:
            time_per_item = elapsed / self.completed
            remaining_items = self.total - self.completed
            eta_seconds = time_per_item * remaining_items
            eta = datetime.now() + timedelta(seconds=eta_seconds)
        else:
            eta = None

        return {
            'percent': percent,
            'completed': self.completed,
            'total': self.total,
            'errors': self.errors,
            'elapsed': elapsed,
            'eta': eta.isoformat() if eta else None
        }
```
Advanced progress estimation considers varying item complexity. Some documents take 10 seconds, others 60 seconds. Simple counting-based ETA becomes inaccurate.
Weighted progress assigns complexity estimates:
```python
class WeightedProgressTracker:
    def __init__(self, items_with_weights):
        self.items = items_with_weights
        self.total_weight = sum(w for _, w in items_with_weights)
        self.completed_weight = 0

    def complete_item(self, item_index):
        weight = self.items[item_index][1]
        self.completed_weight += weight
        return {
            'percent': (self.completed_weight / self.total_weight) * 100,
            'completed': item_index + 1,
            'total': len(self.items)
        }
```
Real-Time Updates
WebSocket connections enable pushing progress updates to user interfaces without polling:
```python
from fastapi import WebSocket

class TaskManager:
    def __init__(self):
        self.active_tasks = {}
        self.websockets = {}

    async def run_task(self, task_id, documents, websocket):
        self.websockets[task_id] = websocket
        progress = ProgressTracker(len(documents))

        for i, doc in enumerate(documents):
            result = await process_document(doc)
            status = progress.update()
            # Push update to connected client
            await websocket.send_json({
                'type': 'progress',
                'data': status
            })

        await websocket.send_json({
            'type': 'complete',
            'data': {'status': 'success'}
        })
```
Polling alternative for simpler setups:
```python
# Server endpoint
@app.get("/task/{task_id}/status")
async def get_task_status(task_id: str):
    if task_id not in task_registry:
        raise HTTPException(404, "Task not found")
    task = task_registry[task_id]
    return task.get_progress()
```

```javascript
// Client polls every 2 seconds
async function pollProgress(taskId) {
    while (true) {
        const response = await fetch(`/task/${taskId}/status`);
        const progress = await response.json();
        updateUI(progress);
        if (progress.status === 'complete' || progress.status === 'failed') {
            break;
        }
        await sleep(2000);
    }
}
```
Status Persistence
Store task status in database for tasks that outlive server restarts:
```python
import json
import sqlite3
import time

class TaskStatus:
    PENDING = "pending"
    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    FAILED = "failed"

class TaskRegistry:
    def __init__(self, db_path):
        self.db = sqlite3.connect(db_path)
        self._init_tables()

    def _init_tables(self):
        self.db.execute("""
            CREATE TABLE IF NOT EXISTS tasks (
                task_id TEXT PRIMARY KEY,
                status TEXT,
                progress TEXT,
                error TEXT,
                created_at REAL,
                updated_at REAL
            )
        """)
        self.db.commit()

    def update_status(self, task_id, status, progress=None, error=None):
        self.db.execute("""
            UPDATE tasks
            SET status = ?, progress = ?, error = ?, updated_at = ?
            WHERE task_id = ?
        """, (status, json.dumps(progress), error, time.time(), task_id))
        self.db.commit()

    def get_status(self, task_id):
        cursor = self.db.execute("""
            SELECT status, progress, error, created_at, updated_at
            FROM tasks WHERE task_id = ?
        """, (task_id,))
        row = cursor.fetchone()
        if not row:
            return None
        return {
            'status': row[0],
            'progress': json.loads(row[1]) if row[1] else None,
            'error': row[2],
            'created_at': row[3],
            'updated_at': row[4]
        }
```
Error Handling and Recovery
Long-running tasks encounter many failure modes. Robust systems handle these gracefully.
Partial Failure Strategies
Item-level failures shouldn’t kill entire tasks. If document 47 of 200 fails to process, the system should log the error, skip that document, and continue with document 48.
Failure tracking:
```python
import asyncio

class ResilientTaskRunner:
    def __init__(self, max_retries=3):
        self.max_retries = max_retries
        self.failures = []

    async def process_with_retry(self, item, process_func):
        for attempt in range(self.max_retries):
            try:
                return await process_func(item)
            except Exception as e:
                logger.warning(f"Attempt {attempt + 1} failed: {e}")
                if attempt == self.max_retries - 1:
                    # Final attempt failed, record and continue
                    self.failures.append({
                        'item': item,
                        'error': str(e),
                        'attempts': self.max_retries
                    })
                    return None
                await asyncio.sleep(2 ** attempt)  # Exponential backoff

    async def run_batch(self, items, process_func):
        results = []
        for item in items:
            result = await self.process_with_retry(item, process_func)
            results.append(result)
        return {
            'results': results,
            'failures': self.failures,
            'success_rate': (len(results) - len(self.failures)) / len(results)
        }
```
Reporting failures to users provides transparency:
```python
def generate_failure_report(failures):
    if not failures:
        return "All items processed successfully."

    report = f"Completed with {len(failures)} failures:\n\n"
    for i, failure in enumerate(failures[:10], 1):  # Show first 10
        report += f"{i}. Item: {failure['item']}\n"
        report += f"   Error: {failure['error']}\n\n"
    if len(failures) > 10:
        report += f"... and {len(failures) - 10} more failures"
    return report
```
Timeout Management
Individual operations need timeouts to prevent hanging indefinitely:
```python
async def process_with_timeout(item, timeout_seconds=300):
    try:
        result = await asyncio.wait_for(
            process_item(item),
            timeout=timeout_seconds
        )
        return result
    except asyncio.TimeoutError:
        logger.error(f"Item {item} timed out after {timeout_seconds}s")
        return None
```
Total task timeouts prevent runaway tasks:
```python
async def run_task_with_limit(task_func, max_duration_hours=8):
    max_seconds = max_duration_hours * 3600
    # asyncio.wait_for cancels the task and raises TimeoutError in *this*
    # coroutine. A separate monitor task that raises after checking the clock
    # would not work: its exception never propagates to the awaiting caller.
    try:
        return await asyncio.wait_for(task_func(), timeout=max_seconds)
    except asyncio.TimeoutError:
        logger.error(f"Task killed after exceeding {max_duration_hours} hour limit")
        raise
```
Resource Management for Extended Operation
Hours-long tasks require careful resource management to avoid exhaustion or hardware damage.
Memory Leak Prevention
Python’s garbage collection doesn’t always clean up promptly. Long-running tasks accumulate memory over thousands of iterations.
Explicit cleanup strategies:
```python
import gc
import psutil

async def process_documents_cleanly(documents):
    results = []
    for i, doc in enumerate(documents):
        result = await process_document(doc)
        results.append(result)

        # Explicit cleanup every 50 documents
        if (i + 1) % 50 == 0:
            gc.collect()
            # Log memory usage
            memory_mb = psutil.Process().memory_info().rss / 1024 / 1024
            logger.info(f"Memory usage after {i+1} docs: {memory_mb:.1f} MB")
    return results
```
Detecting memory leaks through monitoring:
```python
import psutil

class MemoryMonitor:
    def __init__(self, threshold_mb=8000, check_interval=50):
        self.threshold = threshold_mb
        self.interval = check_interval
        self.baseline = None

    def check(self, iteration):
        if iteration % self.interval != 0:
            return True
        memory_mb = psutil.Process().memory_info().rss / 1024 / 1024
        if self.baseline is None:
            self.baseline = memory_mb
        growth = memory_mb - self.baseline
        if memory_mb > self.threshold:
            logger.error(f"Memory exceeded threshold: {memory_mb:.1f}MB")
            return False
        if growth > 1000:  # 1GB growth
            logger.warning(f"Possible memory leak: {growth:.1f}MB growth")
        return True
```
Thermal Management
Continuous GPU load generates heat. Consumer GPUs not designed for sustained workloads may throttle or shut down.
Temperature monitoring:
```python
import asyncio
import pynvml

class ThermalManager:
    def __init__(self, max_temp=80, cool_down_temp=70):
        pynvml.nvmlInit()
        self.handle = pynvml.nvmlDeviceGetHandleByIndex(0)
        self.max_temp = max_temp
        self.cool_down_temp = cool_down_temp

    def get_temperature(self):
        return pynvml.nvmlDeviceGetTemperature(
            self.handle, pynvml.NVML_TEMPERATURE_GPU
        )

    async def thermal_pause_if_needed(self):
        temp = self.get_temperature()
        if temp >= self.max_temp:
            logger.warning(f"GPU temp {temp}°C exceeds limit, pausing")
            while True:
                await asyncio.sleep(30)
                temp = self.get_temperature()
                if temp <= self.cool_down_temp:
                    logger.info(f"GPU cooled to {temp}°C, resuming")
                    break

    async def process_with_thermal_management(self, items, process_func):
        for i, item in enumerate(items):
            result = await process_func(item)
            # Check temperature every 10 items
            if (i + 1) % 10 == 0:
                await self.thermal_pause_if_needed()
            yield result
```
Rate Limiting and Throttling
For tasks calling external APIs (even local ones), rate limiting prevents overwhelming services:
```python
import asyncio
import time

class RateLimiter:
    def __init__(self, max_per_minute=60):
        self.max_per_minute = max_per_minute
        self.calls = []

    async def acquire(self):
        now = time.time()
        # Remove calls older than 1 minute
        self.calls = [t for t in self.calls if now - t < 60]
        if len(self.calls) >= self.max_per_minute:
            # Wait until oldest call ages out
            wait_time = 60 - (now - self.calls[0])
            logger.info(f"Rate limit reached, waiting {wait_time:.1f}s")
            await asyncio.sleep(wait_time)
            self.calls = self.calls[1:]
        self.calls.append(now)

# Usage
rate_limiter = RateLimiter(max_per_minute=30)

async def process_with_rate_limit(item):
    await rate_limiter.acquire()
    return await process_item(item)
```
Task Execution Architecture
Task Prioritization and Scheduling
When running multiple long tasks, intelligent scheduling improves overall system performance.
Background Task Queues
Separate interactive and background work:
```python
import asyncio
import uuid
from queue import Queue

class TaskScheduler:
    def __init__(self):
        self.interactive_queue = Queue()
        self.background_queue = Queue()

    async def submit_task(self, task, priority='background'):
        task_id = str(uuid.uuid4())
        if priority == 'interactive':
            self.interactive_queue.put((task_id, task))
        else:
            self.background_queue.put((task_id, task))
        return task_id

    async def worker(self):
        while True:
            # Check interactive queue first
            if not self.interactive_queue.empty():
                task_id, task = self.interactive_queue.get()
            elif not self.background_queue.empty():
                task_id, task = self.background_queue.get()
            else:
                await asyncio.sleep(1)
                continue
            await self.execute_task(task_id, task)
```
Pause and Resume Capabilities
Users should be able to pause long tasks:
```python
class PausableTask:
    def __init__(self, task_id):
        self.task_id = task_id
        self.paused = False
        self.cancelled = False

    async def pause(self):
        self.paused = True
        logger.info(f"Task {self.task_id} paused")

    async def resume(self):
        self.paused = False
        logger.info(f"Task {self.task_id} resumed")

    async def cancel(self):
        self.cancelled = True
        logger.info(f"Task {self.task_id} cancelled")

    async def run(self, items, process_func):
        for item in items:
            # Check pause state
            while self.paused and not self.cancelled:
                await asyncio.sleep(1)
            if self.cancelled:
                logger.info("Task cancelled, cleaning up")
                break
            await process_func(item)
```
Practical Implementation Patterns
Combining these concepts into working systems requires thoughtful architecture.
Worker Pool Pattern
Dedicated worker processes handle long tasks separately from the web server:
```python
# FastAPI server
@app.post("/tasks/analyze-documents")
async def create_analysis_task(files: List[UploadFile]):
    task_id = str(uuid.uuid4())
    # Save files for worker to access
    save_uploaded_files(task_id, files)
    # Queue task for background worker
    await task_queue.enqueue({
        'task_id': task_id,
        'type': 'document_analysis',
        'file_count': len(files)
    })
    return {'task_id': task_id, 'status': 'queued'}

# Separate worker process
class Worker:
    async def run(self):
        while True:
            task = await task_queue.dequeue()
            await self.execute_task(task)

    async def execute_task(self, task):
        tracker = ProgressTracker(task['file_count'])
        checkpoint = TaskCheckpoint(task['task_id'])
        # Resume from checkpoint if one exists
        # Process items with error handling
        # Save progress regularly
        # Update status in database
        ...
```
Notification Systems
Alert users when long tasks complete:
```python
async def notify_completion(user_id, task_id, results):
    # Email notification
    await send_email(
        to=get_user_email(user_id),
        subject=f"Task {task_id} completed",
        body=f"Your analysis is complete. {results['summary']}"
    )
    # WebSocket notification if user is connected
    if user_id in active_connections:
        await active_connections[user_id].send_json({
            'type': 'task_complete',
            'task_id': task_id,
            'results': results
        })
```
Conclusion
Designing local LLM systems for long-running tasks requires fundamentally different architectures than interactive applications. The essential components—regular checkpointing, progress visibility, graceful error handling, resource monitoring, and thermal management—aren’t optional niceties but requirements for reliability. Tasks taking hours expose every architectural weakness that quick queries hide: memory leaks, thermal throttling, partial failures, and interruption handling all become critical concerns demanding careful design.
The successful patterns share common themes: assume failures will occur and design for recovery, provide users visibility into progress and control over execution, manage resources explicitly rather than hoping they’re sufficient, and separate long-running work from interactive interfaces through background workers. These principles transform fragile demos that work under ideal conditions into robust systems that reliably complete multi-hour tasks despite the inevitable interruptions, errors, and resource constraints of real-world deployment. Build for the worst case and you’ll handle the average case gracefully.