Optimizing Parquet Schemas for ML Training Performance

Machine learning training on large datasets has become the bottleneck in modern ML workflows. While practitioners obsess over model architecture and hyperparameters, they often overlook a fundamental performance constraint: how quickly training data can be read from disk and fed into GPUs or CPUs. When training models on terabytes of data stored in Parquet files, I/O performance frequently becomes the limiting factor. GPUs sit idle waiting for data, training epochs that could take hours stretch into days, and iteration cycles slow to a crawl.

Parquet’s columnar storage format is designed for analytical workloads, but ML training has distinct access patterns that require specific schema optimizations. The way you structure your Parquet schema—column types, encoding, compression, nesting, and physical layout—determines whether your training pipeline reads data at 5 GB/s or 500 MB/s. A 10x throughput difference translates directly into 10x faster training. This article explores practical strategies for optimizing Parquet schemas specifically for ML training workloads, focusing on the decisions that have the most significant impact on data loading performance.

Understanding Parquet’s Columnar Architecture

Before optimizing, we need to understand how Parquet stores and retrieves data, particularly how its design affects ML training access patterns.

Columnar Storage Fundamentals:

Unlike row-oriented formats (CSV, JSON) that store complete records sequentially, Parquet stores data by column. All values for a single column are stored together, compressed as a unit. This design excels at analytical queries that read a few columns across many rows—exactly the pattern in SQL aggregations like “SELECT AVG(price) FROM sales.”

For ML training, the access pattern differs: you need all features (columns) for some rows (a training batch). This mixed pattern means Parquet’s columnar advantage is less pronounced for ML than for analytics. However, with proper schema design, Parquet still significantly outperforms row-oriented formats.

The key insight: ML training reads entire row groups but can skip columns entirely if they’re not needed. Schema optimization focuses on maximizing the efficiency of reading all columns in a row group while minimizing overhead from unused columns and ensuring compression doesn’t create bottlenecks.

Row Groups and Column Chunks:

Parquet organizes data hierarchically:

  • File: Top level, contains metadata and row groups
  • Row Group: Typically 64-128 MB of data, contains column chunks
  • Column Chunk: All values for one column within one row group
  • Data Page: Within a column chunk, the actual compressed data pages (typically 1 MB)

When training, you read row groups sequentially. Each row group requires reading all column chunks needed for features. Row group size determines I/O granularity—larger row groups mean fewer I/O operations but larger memory buffers.

Encoding and Compression Layers:

Parquet applies two levels of data reduction:

  1. Encoding: Data-type-specific transformations (dictionary encoding for low-cardinality strings, delta encoding for timestamps)
  2. Compression: General-purpose algorithms (Snappy, GZIP, Zstandard) applied to encoded data

These layers interact. Dictionary encoding can reduce a string column to integer indices, making compression more effective. But encoding/decoding adds CPU overhead during reads. The tradeoff between compression ratio and decompression speed critically affects training throughput.

Column Type Selection and Schema Design

The physical schema—how columns are typed and structured—has profound impacts on read performance.

Use Primitive Types Where Possible:

Parquet supports nested types (structs, arrays, maps), but nested structures add significant overhead during reads. Each nesting level requires additional metadata parsing and memory allocation.

For ML features, flatten nested structures into separate columns when possible:

import pyarrow as pa

# Inefficient nested structure
schema = pa.schema([
    ('user_id', pa.int64()),
    ('features', pa.struct([
        ('age', pa.int32()),
        ('income', pa.float64()),
        ('click_history', pa.list_(pa.int32()))
    ]))
])

# Better: flattened structure
schema = pa.schema([
    ('user_id', pa.int64()),
    ('age', pa.int32()),
    ('income', pa.float64()),
    ('click_count', pa.int32()),  # aggregate instead of list
    ('recent_clicks', pa.string())  # or serialize as string if needed
])

Flattening trades some semantic structure for faster reads. For training, where you’re materializing all features into dense arrays anyway, this tradeoff is almost always worthwhile.

Appropriate Numeric Precision:

Using more precise numeric types than necessary wastes space and slows reads. If your feature values are integers in the range 0-100, use int8 not int64. If floats don’t need double precision, use float32 not float64.

Common precision optimizations:

  • Categorical IDs: Use the smallest integer type that fits (uint8 for <256 categories, uint16 for <65K)
  • Binary features: Use boolean or int8, not int64
  • Normalized features: float32 provides sufficient precision for [-1, 1] normalized features
  • Counts and IDs: Use appropriate integer size based on actual range

Precision reduction has multiple benefits: smaller files (better I/O), less memory during reads, faster compression/decompression. A training set with 100 features unnecessarily using float64 instead of float32 is 2x larger than necessary.
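A downcasting pass before writing might look like the following sketch; the column names and value ranges are made up, and the safe target types must always be verified against your actual data:

```python
import numpy as np
import pandas as pd

# Toy data standing in for feature columns read at full precision
df = pd.DataFrame({
    'age': np.random.randint(0, 100, 100_000).astype(np.int64),
    'income_norm': np.random.rand(100_000).astype(np.float64),
    'is_active': np.random.randint(0, 2, 100_000).astype(np.int64),
})

# Downcast to the smallest type that safely holds the observed range
df['age'] = df['age'].astype(np.int8)             # 0-100 fits in int8
df['income_norm'] = df['income_norm'].astype(np.float32)
df['is_active'] = df['is_active'].astype(bool)

print(df.dtypes)  # in-memory and on-disk footprint both shrink
```

Writing this frame to Parquet preserves the narrow types, so every subsequent read pays the smaller cost.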

String Handling Strategy:

Strings are expensive in Parquet. Each string requires storing length metadata plus the actual bytes. For ML features, strings usually appear as categorical variables or text features.

For categorical features with low cardinality (<100K unique values), Parquet’s dictionary encoding is effective—it stores a dictionary of unique strings once per row group and references them by integer IDs. But dictionary encoding has overhead during reads as values must be looked up.

Better: pre-encode categorical strings to integers before writing Parquet. Store the integer encoding directly, moving the string-to-integer mapping to a separate lightweight metadata file. This eliminates dictionary lookup during training:

import json

# Pre-encode categories
category_mapping = {'cat': 0, 'dog': 1, 'bird': 2}
df['pet_type_encoded'] = df['pet_type'].map(category_mapping)

# Write only the integer encoding
df[['user_id', 'pet_type_encoded', 'age']].to_parquet('training_data.parquet')

# Save mapping separately for inference
with open('pet_type_mapping.json', 'w') as f:
    json.dump(category_mapping, f)

For text features (product descriptions, user reviews), store them as strings if you must, but consider pre-computing embeddings and storing those instead. A 768-dimensional float32 embedding array is faster to read than variable-length text that needs tokenization and embedding during training.

📐 Schema Design Principles

Optimize for Read Patterns:
• Flatten nested structures → Reduce parsing overhead
• Use minimum precision types → Smaller files, faster I/O
• Pre-encode categoricals → Eliminate dictionary lookups
• Separate features from labels → Enable column pruning
• Group related features → Improve cache locality

Typical Improvements:
• Flattening nested schemas: 2-5x faster reads
• Appropriate precision: 1.5-2x reduction in size/time
• Pre-encoded categoricals: 1.5-3x faster categorical handling

Compression Strategy for Training Workloads

Compression affects both file size and read throughput. The optimal choice depends on whether you’re I/O bound or CPU bound during training.

Compression Algorithm Selection:

Parquet supports several compression codecs with different tradeoffs:

Snappy: Fast decompression (~500 MB/s), moderate compression (2-3x). Default in many tools. Best when CPU is the bottleneck—you want decompression to be as fast as possible even if files are larger.

GZIP: Slower decompression (~200 MB/s), better compression (3-5x). Better when I/O is the bottleneck—smaller files mean less data to read from disk, compensating for slower decompression.

Zstandard (ZSTD): Configurable levels balancing speed and compression. At level 3-5, approaches Snappy’s speed with better compression. Modern best-in-class option.

LZ4: Fastest decompression (~1 GB/s), light compression (2x). Good for very fast local NVMe storage where decompression CPU is the constraint.

Uncompressed: Fastest reads but largest files. Only appropriate for extremely fast storage (local NVMe SSDs) and when storage cost is irrelevant.

Choosing Based on Storage Type:

The optimal compression depends on your storage infrastructure:

  • Cloud object storage (S3, GCS, Azure Blob): Use ZSTD or GZIP. Network bandwidth limits reads, so good compression reduces transfer time more than decompression costs.
  • Network-attached storage: Use ZSTD level 3-5. Balance between reducing network transfer and decompression overhead.
  • Local NVMe SSDs: Use Snappy or LZ4. Storage bandwidth is high (3-7 GB/s), so fast decompression matters more than compression ratio.
  • Spinning disks: Use GZIP or ZSTD. Disk I/O is slow (~150 MB/s), so aggressive compression improves throughput despite decompression cost.

Column-Level Compression Decisions:

Not all columns compress equally or benefit equally from compression. Consider per-column compression strategies:

import pyarrow as pa
import pyarrow.parquet as pq

# Define per-column compression
compression = {
    'user_id': 'none',  # High cardinality, poor compression
    'timestamp': 'zstd',  # Timestamps compress well
    'category': 'zstd',  # Low cardinality, excellent compression
    'embeddings': 'lz4',  # Float arrays, prioritize speed
    'raw_text': 'gzip'  # Text compresses very well, worth the CPU
}

# Write with column-specific compression
pq.write_table(
    table, 
    'training_data.parquet',
    compression=compression,
    use_dictionary=True
)

High-cardinality identifier columns (user IDs, transaction IDs) compress poorly and are often not features—leave them uncompressed. Embedding vectors and normalized floats compress moderately and are read-heavy—use fast compression. Text compresses extremely well—use aggressive compression if you must store it.

Measuring Actual Performance:

Don’t rely on theoretical compression ratios. Profile your actual training pipeline:

import os
import time
import pyarrow.parquet as pq

# Benchmark read performance with different compressions
compressions = ['snappy', 'gzip', 'zstd', 'lz4']

for codec in compressions:
    # Write with this compression
    pq.write_table(table, f'test_{codec}.parquet', compression=codec)
    
    # Time reading
    start = time.time()
    for _ in range(5):  # Multiple passes to average
        df = pq.read_table(f'test_{codec}.parquet').to_pandas()
    elapsed = (time.time() - start) / 5
    
    file_size = os.path.getsize(f'test_{codec}.parquet') / (1024**3)
    print(f"{codec}: {elapsed:.2f}s, {file_size:.2f} GB, {file_size/elapsed:.2f} GB/s")

Use the codec that minimizes wall-clock read time for the same dataset, not the one with the best compression ratio. A 2 GB file that reads in 1 second is better than a 1 GB file that reads in 1.5 seconds.

Row Group Sizing and Partitioning

Row group size determines I/O granularity and memory usage during training. Too small creates excessive I/O operations; too large increases memory pressure and reduces parallelism.

Optimal Row Group Size:

The default row group size is often 64-128 MB. For ML training, larger row groups (256-512 MB) often perform better because:

  • Fewer I/O operations per file
  • Better compression efficiency (more data to find patterns)
  • Reduced metadata overhead (fewer row group metadata blocks)

However, very large row groups (>1 GB) can cause problems:

  • Each row group must fit in memory during reading
  • Reduced parallelism when reading with multiple threads
  • Longer time before first batch is available

A good guideline: row groups should be 2-5x your typical training batch size in bytes. If training with 1024 samples and each sample is 100 KB, that’s ~100 MB per batch. Row groups of 256-512 MB allow reading 2-5 batches per row group, amortizing I/O overhead.

File-Level Partitioning:

Beyond row groups within files, how you partition data across multiple files affects performance:

import pyarrow.parquet as pq

# Write partitioned by date for temporal training
# (a pyarrow Table has no to_parquet method; use write_to_dataset)
pq.write_to_dataset(
    table,
    root_path='training_data/',
    partition_cols=['date']
)

# Results in structure:
# training_data/date=2024-01-01/part-0.parquet
# training_data/date=2024-01-02/part-0.parquet
# ...

For ML training, partitioning helps when:

  • Training on time-based splits (train on Jan-Nov, validate on Dec)
  • Distributed training where each worker reads different partitions
  • Iterative training where you process data in chunks

Avoid over-partitioning. Thousands of tiny files create metadata overhead. Aim for files in the 256 MB – 2 GB range. If your dataset is 1 TB, that’s 500-4000 files—manageable. If it’s 100,000 files of 10 MB each, you’ll spend more time in file opens than reading data.

Column Ordering and Layout Optimization

The physical order of columns in Parquet affects cache performance and read efficiency.

Group Related Features:

Columns stored adjacent in the Parquet file benefit from CPU cache locality when read together. Group features that are always accessed together:

# Good column ordering
columns = [
    # Target variable (often read separately for labels)
    'label',
    
    # Dense numerical features (read together as feature matrix)
    'age', 'income', 'credit_score', 'account_balance',
    
    # Categorical features (may be processed differently)
    'country_code', 'subscription_tier', 'device_type',
    
    # Embeddings (large, might be in separate tensor)
    'user_embedding_0', 'user_embedding_1', ..., 'user_embedding_767',
    
    # Metadata (rarely needed during training)
    'user_id', 'timestamp', 'session_id'
]

df[columns].to_parquet('training_data.parquet')

This ordering enables optimizations like reading only the label column when constructing target arrays, or skipping metadata columns entirely during training. Parquet’s columnar design means you only pay for columns you access, but column ordering affects cache efficiency when you do access multiple columns.

Separate Features and Labels:

If possible, store features and labels in separate Parquet files or separate row groups. This enables:

  • Reading features only during inference (no label columns)
  • Different compression for labels (often categorical, compresses well)
  • Shuffling features and labels independently during training

# Separate files approach (row_group_size counts rows, not bytes)
features_df.to_parquet('features.parquet', row_group_size=1_000_000)
labels_df.to_parquet('labels.parquet', row_group_size=1_000_000)

# During training
features = pq.read_table('features.parquet').to_pandas()
labels = pq.read_table('labels.parquet').to_pandas()

The downside: you must maintain alignment between feature and label files. If shuffling or filtering data, both files must be modified consistently.

Encoding Strategies for Common Feature Types

Parquet’s encoding schemes optimize storage for different data patterns. Choosing appropriate encodings improves both compression and read speed.

Dictionary Encoding for Categorical Features:

Parquet automatically applies dictionary encoding to columns with low cardinality. But you can control this:

import pyarrow as pa
import pyarrow.parquet as pq

# Force dictionary encoding for specific categorical columns
table = pa.Table.from_pandas(df)
with pq.ParquetWriter(
    'output.parquet',
    schema=table.schema,
    use_dictionary=['category_col_1', 'category_col_2'],
    compression='zstd'
) as writer:
    writer.write_table(table)

Dictionary encoding is most effective when:

  • Cardinality is <10,000 unique values
  • Values are repeated across the column
  • String values would otherwise be stored redundantly

Be cautious with high-cardinality columns (e.g., user IDs). Dictionary encoding can actually increase file size if there are too many unique values, as the dictionary itself becomes large.

Delta Encoding for Sequential Values:

Timestamps, sequential IDs, and monotonic values benefit from delta encoding, which stores differences between consecutive values instead of absolute values:

# pyarrow does not apply delta encoding automatically; request it explicitly
# (column_encoding requires dictionary encoding to be disabled)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.to_parquet('data.parquet', use_dictionary=False,
              column_encoding={'timestamp': 'DELTA_BINARY_PACKED'})

If you have sequential integer IDs (1, 2, 3, 4…), delta encoding stores them as (1, +1, +1, +1…), which compresses extremely well.

Run-Length Encoding for Repeated Values:

Columns with many consecutive repeated values (sorted categorical columns, boolean flags) benefit from run-length encoding. Parquet applies this automatically to appropriate columns, but sorting data by columns with low cardinality before writing can improve RLE effectiveness:

# Sort by categorical column to improve RLE
df_sorted = df.sort_values(['country', 'device_type', 'timestamp'])
df_sorted.to_parquet('sorted_data.parquet')

This can dramatically reduce file size for categorical columns, though it may affect the order you want for training. Consider sorting training and validation sets separately.

⚡ Performance Optimization Checklist

Schema Level:
✓ Flatten nested structures
✓ Use minimum precision types (float32, int8/16/32)
✓ Pre-encode categorical strings to integers
✓ Store embeddings instead of raw text

Compression Level:
✓ Match compression to storage (ZSTD for cloud, Snappy for NVMe)
✓ Benchmark actual read throughput
✓ Consider per-column compression strategies

Layout Level:
✓ Use 256-512 MB row groups
✓ Avoid over-partitioning (aim for 100s-1000s of files)
✓ Order columns by access patterns
✓ Consider separate feature/label files

Reading Strategies for Training Pipelines

How you read Parquet files during training matters as much as how you write them.

Batch Reading and Prefetching:

Don’t read entire Parquet files into memory. Use batch reading with prefetching to overlap I/O with computation:

import torch
import pyarrow.parquet as pq

# feature_cols (a list of feature column names) is assumed defined elsewhere

class ParquetDataLoader:
    def __init__(self, file_path, batch_size):
        self.parquet_file = pq.ParquetFile(file_path)
        self.batch_size = batch_size
        self.num_row_groups = self.parquet_file.num_row_groups
        
    def __iter__(self):
        for i in range(self.num_row_groups):
            # Read one row group at a time
            table = self.parquet_file.read_row_group(i)
            df = table.to_pandas()
            
            # Yield batches from this row group
            for start_idx in range(0, len(df), self.batch_size):
                batch = df.iloc[start_idx:start_idx + self.batch_size]
                # Convert to tensors
                features = torch.tensor(batch[feature_cols].values, dtype=torch.float32)
                labels = torch.tensor(batch['label'].values, dtype=torch.long)
                yield features, labels

# Usage
loader = ParquetDataLoader('training_data.parquet', batch_size=1024)
for features, labels in loader:
    # Train on batch
    optimizer.zero_grad()
    outputs = model(features)
    loss = criterion(outputs, labels)
    loss.backward()
    optimizer.step()

This approach reads one row group at a time, converts it to training batches, then moves to the next row group. Memory usage stays bounded while maintaining good throughput.
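Prefetching can be layered on top with a background thread and a bounded queue, so the next row group is decoded while the current batches train. A minimal generic sketch (it wraps any batch iterator, such as the `ParquetDataLoader` above; the `depth` parameter is an assumption about how far ahead to decode):

```python
import queue
import threading

class PrefetchIterator:
    """Wrap any iterable; a worker thread keeps up to `depth` items ready."""
    def __init__(self, iterable, depth=2):
        self._queue = queue.Queue(maxsize=depth)  # bounds memory use
        self._sentinel = object()
        self._thread = threading.Thread(
            target=self._fill, args=(iterable,), daemon=True)
        self._thread.start()

    def _fill(self, iterable):
        # Producer: runs ahead of the consumer until the queue is full
        for item in iterable:
            self._queue.put(item)
        self._queue.put(self._sentinel)  # signal end of iteration

    def __iter__(self):
        while True:
            item = self._queue.get()
            if item is self._sentinel:
                return
            yield item

# Works with any batch iterator, e.g. PrefetchIterator(loader, depth=4)
batches = PrefetchIterator(range(5), depth=2)
print(list(batches))  # [0, 1, 2, 3, 4]
```

Because the queue is bounded, memory stays capped at `depth` decoded items while I/O and training overlap.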

Parallel Reading with Multiple Workers:

For very large datasets, use multiple processes to read different Parquet files in parallel:

from torch.utils.data import DataLoader, Dataset
import pyarrow.parquet as pq
import bisect
import glob
import torch

# feature_cols (a list of feature column names) is assumed defined elsewhere

class ParquetDataset(Dataset):
    def __init__(self, file_pattern):
        self.files = sorted(glob.glob(file_pattern))
        # Compute cumulative row counts for indexing
        self.cum_lengths = [0]
        for f in self.files:
            pf = pq.ParquetFile(f)
            self.cum_lengths.append(self.cum_lengths[-1] + pf.metadata.num_rows)
        
    def __len__(self):
        return self.cum_lengths[-1]
    
    def __getitem__(self, idx):
        # Find which file contains this index
        file_idx = bisect.bisect_right(self.cum_lengths, idx) - 1
        local_idx = idx - self.cum_lengths[file_idx]
        
        # Read specific row (cache file handle in practice)
        table = pq.read_table(self.files[file_idx])
        df = table.to_pandas()
        row = df.iloc[local_idx]
        
        features = torch.tensor(row[feature_cols].values, dtype=torch.float32)
        label = torch.tensor(row['label'], dtype=torch.long)
        return features, label

# Use PyTorch DataLoader with multiple workers
dataset = ParquetDataset('training_data/*.parquet')
loader = DataLoader(dataset, batch_size=1024, num_workers=4, pin_memory=True)

This enables parallel I/O across CPU cores, significantly improving throughput when training on large datasets stored across many files.

Column Pruning:

Only read the columns you need. If your Parquet file contains metadata columns not used for training, explicitly specify columns to read:

# Only read feature columns
columns_to_read = ['feature_1', 'feature_2', ..., 'feature_n', 'label']
table = pq.read_table('training_data.parquet', columns=columns_to_read)

Parquet’s columnar format makes this efficient—you only read the column chunks for specified columns, skipping others entirely. For files with many metadata columns, this can reduce read time by 30-50%.

Validating and Monitoring Read Performance

Optimization requires measurement. Build performance monitoring into your training pipeline to catch regressions.

Profiling Data Loading:

Profile where time is spent during training:

import time

# Measure data loading vs computation, reset per epoch
for epoch in range(num_epochs):
    data_time = 0.0
    compute_time = 0.0
    batch_start = time.time()

    for features, labels in loader:
        # Time waiting on the loader plus the host-to-GPU copy
        features, labels = features.cuda(), labels.cuda()
        data_time += time.time() - batch_start

        # Forward/backward pass
        compute_start = time.time()
        optimizer.zero_grad()
        outputs = model(features)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        compute_time += time.time() - compute_start

        batch_start = time.time()

    print(f"Epoch {epoch}: Data loading: {data_time:.2f}s, Compute: {compute_time:.2f}s")

If data loading time exceeds 20-30% of total time, I/O is your bottleneck. Focus on Parquet optimizations. If compute time dominates, focus on model efficiency instead.

Bandwidth Monitoring:

Monitor actual disk/network bandwidth during training:

# Linux: monitor disk I/O while training runs
#   iostat -x 1
# Cloud: AWS CloudWatch, GCP Monitoring, Azure Monitor network metrics

# Application level: track bytes read
import os

total_bytes = sum(os.path.getsize(f) for f in parquet_files)
training_time = ...  # measured wall-clock seconds for one pass
throughput = total_bytes / training_time / (1024**3)  # GB/s
print(f"Average read throughput: {throughput:.2f} GB/s")

Compare achieved throughput to theoretical maximums (NVMe: 3-7 GB/s, cloud object storage: 0.5-2 GB/s). If you’re far below theoretical maximums, schema/compression optimizations can help. If you’re close to maximums, you need faster storage or more parallelism.

Common Anti-Patterns to Avoid

Several common mistakes undermine Parquet performance for ML training.

Avoiding Over-Normalization:

Some teams normalize data excessively, creating separate Parquet files for dimensions and facts that must be joined during training. This forces expensive joins during data loading:

# Anti-pattern: separate files requiring joins
users = pq.read_table('users.parquet').to_pandas()
transactions = pq.read_table('transactions.parquet').to_pandas()
features = pq.read_table('features.parquet').to_pandas()

# Join during training (!!! slow)
training_data = transactions.merge(users, on='user_id').merge(features, on='user_id')

For ML training, denormalize. Write wide tables with all features pre-joined. Storage is cheap; training time is expensive. Pre-compute joins during data preparation, not during training loops.

Avoiding Tiny Files:

Writing thousands of small Parquet files (e.g., one file per hour for streaming data) creates metadata overhead:

# Anti-pattern: too many small files
for hour in hours:
    data_hour.to_parquet(f'data_{hour}.parquet')  # Creates 8760 files/year

Consolidate into larger files (256 MB – 2 GB each). Use batch writes or periodic compaction jobs to merge small files into larger ones.

Avoiding Unnecessary Nesting:

Don’t nest data just because you can:

# Anti-pattern: unnecessary nesting
schema = pa.schema([
    ('user', pa.struct([
        ('demographics', pa.struct([
            ('age', pa.int32()),
            ('gender', pa.string())
        ])),
        ('behavior', pa.struct([
            ('clicks', pa.int32()),
            ('purchases', pa.int32())
        ]))
    ]))
])

# Better: flat structure
schema = pa.schema([
    ('age', pa.int32()),
    ('gender', pa.string()),
    ('clicks', pa.int32()),
    ('purchases', pa.int32())
])

Nesting adds parsing overhead with no benefit for ML training where you’re materializing all features into flat arrays anyway.

Conclusion

Optimizing Parquet schemas for ML training is about aligning storage layout with read patterns: using minimum precision types to reduce file size, choosing compression algorithms that balance ratio and decompression speed for your storage infrastructure, sizing row groups to match training batch patterns, and flattening nested structures to minimize parsing overhead. These schema decisions have multiplicative effects—proper type selection combined with appropriate compression and row group sizing can improve training throughput by 5-10x compared to default configurations, transforming training pipelines from I/O-bound crawls into GPU-saturated efficient processing.

The key principle underlying all these optimizations is that ML training has distinct characteristics from analytical workloads: you read wide tables (many columns) but only some rows, you access data sequentially in batches, and you care deeply about read latency not just throughput. By designing Parquet schemas specifically for these access patterns—favoring primitives over nesting, aggressive pre-encoding of categoricals, careful compression selection, and appropriate row group sizing—you ensure that GPUs remain the bottleneck during training, not disk I/O or decompression CPU cycles. The investment in thoughtful schema design compounds across every training run, every model iteration, and every experiment, making it one of the highest-leverage optimizations in production ML systems.
