In the era of Generative AI, the quality and scale of data processing have become more critical than ever. While sophisticated language models and ML algorithms steal the spotlight, the behind-the-scenes work of data preparation remains the unsung hero of successful AI implementations. From cleaning inconsistent formats to transforming raw inputs into structured information, these preparatory steps directly impact model performance and output quality. However, as data volumes grow exponentially, traditional sequential processing approaches quickly become bottlenecks, turning what should be one-time tasks into resource-intensive operations that delay model training and deployment. For organizations working with moderate to large datasets—too small to justify a full Hadoop or Spark implementation, yet too unwieldy for single-threaded processing—finding the middle ground of efficient parallelism has become essential for maintaining agile AI development cycles.
In the technical product sales domain, we faced a classic challenge: building a Retrieval-Augmented Generation (RAG) based GenAI chatbot capable of answering technical queries by leveraging historical customer service email conversations. Our analysis showed that approximately 65% of customer inquiries were repetitive, making this an ideal candidate for automation through a modern GenAI solution.
The foundation of any effective AI system is high-quality data, and in our case, this meant processing a substantial volume of historical email conversations between customers and technical experts. As is often the case with real-world AI implementations, data preparation emerged as the most challenging aspect of the entire process.
Our email data resided in two distinct sources:
- Archived emails stored on disk as Outlook .msg files
- Emails still living in Outlook mailbox folders, accessible only through the Outlook APIs
Together, these sources comprised approximately 1 million email threads with a total data footprint of around 400GB. The sheer volume made even simple operations time-consuming when approached sequentially.
Our data preparation task involved multiple steps:
- Converting proprietary .msg files into an open, parseable format
- Parsing each email to extract the subject, body, metadata, and attachments
- Cleaning and transforming the extracted content
- Saving the structured results to a database
The end goal was to transform this raw, unstructured data into a clean, structured corpus that could serve as the knowledge base for our RAG system. With properly processed data, the chatbot would be able to retrieve relevant historical conversations and generate accurate, contextually appropriate responses to customer inquiries.
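For illustration, here is a minimal sketch of the kind of structured record each message was reduced to; the field names are illustrative rather than our exact schema.
from dataclasses import dataclass, field

@dataclass
class EmailRecord:
    """One cleaned email, ready to be indexed into the RAG knowledge base."""
    thread_id: str                 # groups messages from the same conversation
    subject: str                   # cleaned subject line
    body: str                      # plain-text body with noise stripped out
    received_date: str             # ISO-8601 timestamp
    attachments: list[str] = field(default_factory=list)  # attachment file names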
Let's examine the data processing implementation we initially developed to handle these email sources.
Our approach to processing the email data required handling two distinct sources through separate workflows, as illustrated below:
[Workflow diagram: separate processing flows for the archived MSG files and the Outlook mailbox data]
We needed to process emails from two primary sources:
- Archived emails saved as .msg files on disk
- Emails still residing in Outlook mailbox folders
To handle these sources efficiently, we developed two processing flows: one converts the .msg archives to .eml and parses them with Python, while the other extracts emails directly from Outlook folders with PowerShell.
Since the MSG format is proprietary to Microsoft, we couldn't reliably load it in a Python script for content extraction; the parsers available in scripting languages weren't good enough. We eventually settled on msgconvert, a Linux CLI tool popular for exactly this purpose, which converts MSG files into the open EML format.
Example usage:
msgconvert /path/to/msg_file.msg --outfile /path/to/eml_file.eml
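For context, here is a minimal Python sketch of driving msgconvert over a whole directory sequentially, one file at a time. The paths are placeholders, and it assumes msgconvert is installed and on the PATH.
import subprocess
from pathlib import Path

src_dir = Path("/src_path/to/msg_files")     # placeholder source directory
dest_dir = Path("/dest_path/to/eml_files")   # placeholder destination directory

for msg_file in src_dir.glob("*.msg"):
    eml_file = dest_dir / (msg_file.stem + ".eml")
    # One msgconvert invocation per file: the sequential baseline that the
    # GNU Parallel one-liner later in the article speeds up
    subprocess.run(["msgconvert", str(msg_file), "--outfile", str(eml_file)], check=True)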
Here's a high-level code snippet for sequential EML parsing, cleaning, and saving to a database:
#!/usr/bin/env python3
from pathlib import Path
import argparse
from tqdm import tqdm


def process_single_email(file_path):
    """
    Process a single email file
    - Parse the email
    - Extract relevant data
    - Transform content as needed
    """
    # Implementation details abstracted away
    email_data = parse_email_file(file_path)
    if email_data:
        return transform_email_data(email_data)
    return None


def parse_email_file(file_path):
    """Parse an email file and return structured data"""
    # Implementation details abstracted away
    return {...}  # Email data dictionary


def transform_email_data(email_data):
    """Process and transform email data"""
    # Implementation details abstracted away
    return {...}  # Processed email data


def initialize_database(db_name):
    """Create or open the target database and return a connection"""
    # Implementation details abstracted away
    return None


def save_to_database(processed_data, db):
    """Save processed email data to database"""
    # Implementation details abstracted away
    pass


def main():
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="Process email files sequentially")
    parser.add_argument("--dir", type=Path, default=Path.cwd(), help="Directory with email files")
    parser.add_argument("--db-name", default="emails", help="Database name")
    args = parser.parse_args()

    # Get list of email files
    email_files = list(args.dir.glob("*.eml"))

    # Initialize database connection
    db = initialize_database(args.db_name)

    # Process emails sequentially with progress bar
    results = []
    for file_path in tqdm(email_files, desc="Processing emails"):
        processed_data = process_single_email(file_path)
        if processed_data:
            results.append(processed_data)

    # Save all results to database
    save_to_database(results, db)


if __name__ == "__main__":
    main()
The above script shows a traditional sequential approach to processing email files:
- Gather every .eml file in the target directory
- Parse, transform, and collect each file's data one at a time in a single loop
- Write the accumulated results to the database at the end
This approach is straightforward but can be slow when dealing with many files, as each file must be fully processed before moving to the next one.
Outlook provides programmatic access through its COM object model, which can be driven from PowerShell and many other languages. Given how deeply this workflow sits inside the Microsoft ecosystem, PowerShell felt like the best-integrated choice. See Microsoft's Outlook object model documentation for reference.
# High-level script for extracting emails from Outlook folders and saving to SQLite

# Configuration
$DatabasePath = "C:\path\to\emails.db"
$FolderName = "Inbox"

function Initialize-Database {
    param ($dbPath)
    # Load SQLite assembly and create connection
    # Implementation details abstracted away
    # Return the connection object
    return $connection
}

function Process-SingleEmail {
    param ($emailItem, $dbConnection)
    # Extract email properties
    $subject = $emailItem.Subject
    $body = $emailItem.Body
    $htmlBody = $emailItem.HTMLBody
    $receivedDate = $emailItem.ReceivedTime

    # Save to database
    Save-EmailToDatabase -emailData @{
        Subject      = $subject
        Body         = $body
        HtmlBody     = $htmlBody
        ReceivedDate = $receivedDate
    } -attachments $emailItem.Attachments -dbConnection $dbConnection

    # Return success
    return $true
}

function Save-EmailToDatabase {
    param ($emailData, $attachments, $dbConnection)
    # Save email data to database
    # Implementation details abstracted away

    # Save attachments
    foreach ($attachment in $attachments) {
        # Save attachment to database
        # Implementation details abstracted away
    }
}

function Main {
    # Initialize Outlook
    $outlook = New-Object -ComObject Outlook.Application
    $namespace = $outlook.GetNamespace("MAPI")

    # Connect to database
    $dbConnection = Initialize-Database -dbPath $DatabasePath

    # Get the target folder
    $folder = $namespace.Folders.Item(1).Folders.Item($FolderName)

    # Get all items
    $totalItems = $folder.Items.Count
    Write-Host "Found $totalItems emails to process"

    # Process each email sequentially
    $processedCount = 0
    $startTime = Get-Date
    foreach ($item in $folder.Items) {
        if ($item -is [Microsoft.Office.Interop.Outlook.MailItem]) {
            Process-SingleEmail -emailItem $item -dbConnection $dbConnection
            $processedCount++
            # Show progress
            Write-Progress -Activity "Processing Emails" -Status "Processing email $processedCount of $totalItems" -PercentComplete (($processedCount / $totalItems) * 100)
        }
    }
    $endTime = Get-Date
    $duration = ($endTime - $startTime).TotalSeconds

    # Close connection
    $dbConnection.Close()

    # Print summary
    Write-Host "Completed processing $processedCount emails in $duration seconds"
    Write-Host "Processing speed: $([math]::Round($processedCount / $duration, 2)) emails/second"
}

# Run the main function
Main
The above script follows a straightforward approach:
- Connect to Outlook through its COM API and open the target folder
- Iterate over the folder's items one by one, extracting the subject, body, HTML body, received date, and attachments
- Save each email (and its attachments) to the SQLite database as it is processed
This approach is simple to implement but can be slow for large mailboxes, as it handles each email sequentially.
After extracting the raw email data from Outlook, we run it through a Python cleaning script similar to the one used for EML parsing, covering the usual email-specific cleaning tasks. This sequential cleaning pass suffers the same performance limitation as our earlier EML parsing approach: each email is processed one at a time.
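To make the cleaning step concrete, here is an illustrative and deliberately simplified sketch of the kind of transformations such a script applies; the exact rules in our pipeline were more involved.
import re
from html import unescape

def clean_email_body(raw_body: str) -> str:
    """Illustrative cleanup: strip HTML tags, quoted replies, and signatures."""
    text = unescape(re.sub(r"<[^>]+>", " ", raw_body))   # drop HTML tags
    lines = []
    for line in text.splitlines():
        if line.strip().startswith(">"):                  # skip quoted reply lines
            continue
        if re.match(r"^(--\s*$|Regards,|Best regards,)", line.strip()):
            break                                         # crude signature cut-off
        lines.append(line)
    return re.sub(r"\s+", " ", " ".join(lines)).strip()   # collapse whitespace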
Sequential processing creates significant bottlenecks when handling large datasets. In our case, with approximately 1 million emails, the limitations quickly become apparent.
With each email taking 1-2 seconds to process sequentially, our complete dataset of roughly 1 million emails would require 1-2 million seconds of compute time: somewhere between 12 and 23 days of continuous processing.
This timeline creates several critical problems:
- Model training and deployment sit idle while data preparation grinds on
- Any change to the parsing or cleaning logic forces a multi-week re-run
- Iteration on the RAG knowledge base, and therefore on answer quality, slows to a crawl
The traditional answer to big data processing involves systems like Hadoop MapReduce, Apache Spark, and other distributed processing frameworks.
However, these enterprise-scale solutions introduce unnecessary complexity and cost for our scale. While powerful, they're designed for petabyte-scale operations across distributed infrastructure—overkill for our million-email dataset.
Concurrency and parallel computing are expansive fields, but we don't need their full complexity. We're looking for middle ground—more efficient than sequential processing but simpler than distributed big data frameworks.
Our sequential pipeline has a key characteristic perfect for parallelization: each file processes independently of others. This independence means we can parallelize each step without compromising data integrity.
Fortunately, Linux, Python, and PowerShell all provide robust parallelism tools tailored to different use cases. We'll now explore how these accessible approaches can dramatically reduce processing time without the overhead of enterprise distributed systems.
After identifying the bottlenecks in sequential processing, let's explore three practical parallel implementations across different platforms. Each provides a straightforward approach to parallelism without the complexity of enterprise-scale distributed systems.
GNU Parallel is a shell tool designed to execute jobs in parallel. It's particularly powerful for batch processing files in a Unix/Linux environment.
GNU Parallel takes a simple command that would normally run on a single file and automatically distributes it across multiple CPU cores. It handles the job distribution, monitoring, and output collection, making parallelism accessible without complex coding.
find "/src_path/to/msg_files" -name "*.msg" | parallel -j 8 msgconvert {} --outfile "/dest_path/to/eml_files"/{/.}.eml
In this example:
- find lists every .msg file under the source directory and pipes the paths to parallel
- -j 8 tells GNU Parallel to keep eight conversion jobs running at once
- {} is replaced with each input path, and {/.} with its basename minus the extension, so each converted file lands in the destination directory as <name>.eml
This simple one-liner can reduce processing time by a factor roughly equal to the number of CPU cores available. On an 8-core system, we might see a 6-7x speedup (accounting for some overhead), reducing a 12-day job to less than 2 days without any complex coding.
Python's multiprocessing library provides a robust way to utilize multiple cores for parallel processing. Unlike threading in Python (limited by the Global Interpreter Lock), multiprocessing creates separate processes that can truly run in parallel.
#!/usr/bin/env python3
from pathlib import Path
import argparse
from multiprocessing import Pool
from tqdm import tqdm
import time


def process_single_email(file_path):
    """
    Process a single email file
    - Parse the email
    - Extract relevant data
    - Transform content as needed
    """
    # Implementation details abstracted away
    email_data = parse_email_file(file_path)
    if email_data:
        return transform_email_data(email_data)
    return None


def parse_email_file(file_path):
    """Parse an email file and return structured data"""
    # Implementation details abstracted away
    return {...}  # Email data dictionary


def transform_email_data(email_data):
    """Process and transform email data"""
    # Implementation details abstracted away
    return {...}  # Processed email data


def initialize_database(db_name):
    """Create or open the target database and return a connection"""
    # Implementation details abstracted away
    return None


def save_to_database(processed_data, db):
    """Save processed email data to database"""
    # Implementation details abstracted away
    pass


def main():
    # Parse command-line arguments
    parser = argparse.ArgumentParser(description="Process email files in parallel")
    parser.add_argument("--dir", type=Path, default=Path.cwd(), help="Directory with email files")
    parser.add_argument("--db-name", default="emails", help="Database name")
    parser.add_argument("--jobs", type=int, default=4, help="Number of parallel processes")
    args = parser.parse_args()

    # Get list of email files
    email_files = list(args.dir.glob("*.eml"))

    # Initialize database connection
    db = initialize_database(args.db_name)

    # Start timing
    start_time = time.time()

    # Process emails in parallel
    with Pool(processes=args.jobs) as pool:
        # Map the function to all files and collect results
        # tqdm wraps the iterator to show a progress bar
        results = list(tqdm(
            pool.imap(process_single_email, email_files),
            total=len(email_files),
            desc=f"Processing emails with {args.jobs} workers"
        ))

    # Filter out None results
    results = [r for r in results if r]

    # Calculate processing time
    processing_time = time.time() - start_time

    # Save all results to database
    save_to_database(results, db)

    print(f"Processed {len(results)} emails in {processing_time:.2f} seconds")
    print(f"Processing speed: {len(results)/processing_time:.2f} emails/second")


if __name__ == "__main__":
    main()
The key parallel components in this Python script:
- Pool(processes=args.jobs) creates a configurable pool of worker processes
- pool.imap() streams the file list to the workers and yields results as each file finishes
- tqdm wraps the result iterator to provide a live progress bar across all workers
Unlike the sequential approach, where each email is parsed, transformed, and collected one at a time, this parallel implementation:
- Distributes parsing and transformation across all worker processes
- Collects the processed results back in the parent process
- Writes everything to the database in a single batch at the end
On an 8-core system, this typically yields a 5-7x speedup over sequential processing, with the bonus of built-in progress tracking and performance metrics.
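One refinement worth noting (not shown in the script above): when each email is cheap to process, handing files to the workers one at a time adds measurable inter-process overhead. Passing a chunksize to imap hands work out in batches instead; the value below is only a starting point to tune. The fragment assumes the same names defined in the script above.
# Inside main(), replacing the plain pool.imap(...) call
with Pool(processes=args.jobs) as pool:
    results = list(tqdm(
        pool.imap(process_single_email, email_files, chunksize=64),  # 64 files per hand-off
        total=len(email_files),
        desc=f"Processing emails with {args.jobs} workers"
    ))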
PowerShell offers several parallelism options, with Runspace Pools being particularly effective for I/O-bound operations like email processing.
# Parallel Outlook Email Processing
# High-level script for extracting emails from Outlook folders and saving to SQLite using parallel processing

# Configuration
$DatabasePath = "C:\path\to\emails.db"
$FolderName = "Inbox"
$MaxJobs = 4  # Number of parallel jobs

function Initialize-Database {
    param ($dbPath)
    # Load SQLite assembly and create connection
    # Implementation details abstracted away
    # Return the connection object
    return $connection
}

function Process-SingleEmail {
    param ($emailItem)
    # Extract email properties
    $subject = $emailItem.Subject
    $body = $emailItem.Body
    $htmlBody = $emailItem.HTMLBody
    $receivedDate = $emailItem.ReceivedTime

    # Process attachments if any
    $attachmentData = @()
    foreach ($attachment in $emailItem.Attachments) {
        $attachmentData += @{
            Name = $attachment.FileName
            Data = $attachment.PropertyAccessor.GetProperty("http://schemas.microsoft.com/mapi/proptag/0x37010102")
        }
    }

    # Return processed data (will be saved to DB later)
    return @{
        Subject      = $subject
        Body         = $body
        HtmlBody     = $htmlBody
        ReceivedDate = $receivedDate
        Attachments  = $attachmentData
    }
}

function Save-EmailBatchToDatabase {
    param ($emailBatch, $dbConnection)
    # Begin transaction for better performance
    $transaction = $dbConnection.BeginTransaction()
    foreach ($emailData in $emailBatch) {
        # Save email to database
        # Implementation details abstracted away

        # Save attachments if any
        foreach ($attachment in $emailData.Attachments) {
            # Save attachment to database
            # Implementation details abstracted away
        }
    }
    # Commit transaction
    $transaction.Commit()
}

function Main {
    # Initialize Outlook
    $outlook = New-Object -ComObject Outlook.Application
    $namespace = $outlook.GetNamespace("MAPI")

    # Connect to database
    $dbConnection = Initialize-Database -dbPath $DatabasePath

    # Get the target folder
    $folder = $namespace.Folders.Item(1).Folders.Item($FolderName)

    # Get all items
    $allItems = @()
    foreach ($item in $folder.Items) {
        if ($item -is [Microsoft.Office.Interop.Outlook.MailItem]) {
            $allItems += $item
        }
    }
    $totalItems = $allItems.Count
    Write-Host "Found $totalItems emails to process"

    # Process emails in parallel
    $startTime = Get-Date

    # Create a throttle limit for jobs
    $throttleLimit = $MaxJobs

    # Create runspace pool
    # Runspaces do not inherit functions from this session, so register
    # Process-SingleEmail with the pool's initial session state
    $funcDef = Get-Content function:\Process-SingleEmail
    $sessionState = [System.Management.Automation.Runspaces.InitialSessionState]::CreateDefault()
    $sessionState.Commands.Add([System.Management.Automation.Runspaces.SessionStateFunctionEntry]::new('Process-SingleEmail', $funcDef))
    $runspacePool = [runspacefactory]::CreateRunspacePool(1, $throttleLimit, $sessionState, $Host)
    $runspacePool.Open()

    # Create job tracking collections
    $jobs = [System.Collections.Generic.List[object]]::new()
    $results = @()

    # Create script block for parallel processing
    $scriptBlock = {
        param ($emailItem)
        Process-SingleEmail -emailItem $emailItem
    }

    # Start parallel jobs
    foreach ($item in $allItems) {
        $powerShell = [powershell]::Create().AddScript($scriptBlock).AddParameter("emailItem", $item)
        $powerShell.RunspacePool = $runspacePool
        $jobs.Add(@{
            PowerShell = $powerShell
            Handle     = $powerShell.BeginInvoke()
        })
    }

    # Track progress: collect each job's output exactly once as it completes
    $completed = 0
    while ($jobs.Count -gt 0) {
        $finishedJobs = @($jobs | Where-Object { $_.Handle.IsCompleted })
        foreach ($job in $finishedJobs) {
            # EndInvoke may only be called once per handle, so capture its output
            $output = $job.PowerShell.EndInvoke($job.Handle)
            if ($output) {
                $results += $output
            }
            $completed++
            # Show progress
            Write-Progress -Activity "Processing Emails in Parallel" -Status "Processed $completed of $totalItems" -PercentComplete (($completed / $totalItems) * 100)

            # Clean up resources
            $job.PowerShell.Dispose()
            [void]$jobs.Remove($job)
        }
        # Small sleep to prevent CPU spinning
        Start-Sleep -Milliseconds 100
    }

    # Save all results to database in batches
    $batchSize = 100
    for ($i = 0; $i -lt $results.Count; $i += $batchSize) {
        $batch = $results[$i..([Math]::Min($i + $batchSize - 1, $results.Count - 1))]
        Save-EmailBatchToDatabase -emailBatch $batch -dbConnection $dbConnection
    }

    # Calculate processing time
    $endTime = Get-Date
    $duration = ($endTime - $startTime).TotalSeconds

    # Close resources
    $runspacePool.Close()
    $runspacePool.Dispose()
    $dbConnection.Close()

    # Print summary
    Write-Host "Completed processing $totalItems emails in $duration seconds"
    Write-Host "Processing speed: $([math]::Round($totalItems / $duration, 2)) emails/second"
    Write-Host "Using $MaxJobs parallel jobs"
}

# Run the main function
Main
The PowerShell parallelism implementation offers several key advantages over the sequential approach:
- Multiple emails are extracted concurrently through a runspace pool, throttled by $MaxJobs
- Progress is reported as jobs complete rather than after each sequential item
- Database writes are batched (100 emails per transaction) instead of issuing one write per email
This approach separates the extraction phase from the storage phase, allowing each to be optimized independently. By processing multiple emails concurrently and then saving in efficient batches, we achieve both parallelism benefits and database optimization.
On a typical workstation, this parallel PowerShell approach can yield a 4-6x performance improvement over sequential processing for Outlook extraction tasks, turning a multi-week project into a few days of processing.
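The same batching idea carries over to the Python side of the pipeline. Here is a minimal sketch using the standard sqlite3 module; the table name and columns are illustrative rather than our production schema.
import sqlite3

BATCH_SIZE = 100  # mirrors the $batchSize used in the PowerShell script

def save_batch(db_path, batch):
    """Insert one batch of processed emails inside a single transaction."""
    conn = sqlite3.connect(db_path)
    with conn:  # one transaction per batch: commit on success, roll back on error
        conn.executemany(
            "INSERT INTO emails (subject, body, received_date) VALUES (?, ?, ?)",
            [(e["subject"], e["body"], e["received_date"]) for e in batch],
        )
    conn.close()

def save_all(db_path, results):
    """Write the collected results in fixed-size batches."""
    for i in range(0, len(results), BATCH_SIZE):
        save_batch(db_path, results[i:i + BATCH_SIZE])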
To demonstrate the real-world impact of parallel processing, we conducted benchmarks using GNU Parallel with the msgconvert tool across varying workloads and parallelism levels.
The benchmark results clearly illustrate how parallel processing transforms the performance curve:
[Benchmark chart: msgconvert processing time across workload sizes for increasing numbers of parallel jobs]
As shown in the graph, the performance gains follow Amdahl's Law: with a fraction p of the work parallelizable, the speedup from n parallel jobs is capped at 1 / ((1 - p) + p / n). Adding jobs shrinks only the parallel term, so the serial portion of the pipeline increasingly dominates the total runtime.
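A quick back-of-the-envelope calculation shows why the curve flattens. Assuming, purely for illustration, that 95% of the pipeline parallelizes cleanly (the real fraction depends on I/O and serial conversion overhead), the theoretical ceiling is reached long before 32 or 64 jobs pay off:
def amdahl_speedup(parallel_fraction: float, jobs: int) -> float:
    """Theoretical speedup for a workload with the given parallel fraction."""
    return 1.0 / ((1.0 - parallel_fraction) + parallel_fraction / jobs)

p = 0.95  # assumed parallel fraction, for illustration only
for jobs in (2, 4, 8, 16, 32, 64):
    print(f"{jobs:>2} jobs -> {amdahl_speedup(p, jobs):.1f}x theoretical speedup")
# e.g. 8 jobs -> 5.9x, 32 jobs -> 12.5x, 64 jobs -> 15.4x: far from the ideal 64x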
The most significant observations:
- Speedup stays close to linear while the job count remains at or below the physical core count
- Beyond the core count, each doubling of the job count buys progressively less
- The curves for 32 and 64 parallel jobs are nearly indistinguishable
The benchmark reveals an important practical consideration: hardware limitations establish an upper bound on parallelism benefits. Note how the lines for 32 and 64 parallel jobs nearly overlap, indicating we've reached the parallelization ceiling for this workload on the test hardware.
This plateau occurs due to several factors:
- Once every physical core is saturated, additional jobs only add scheduling and context-switching overhead
- Disk I/O becomes the shared bottleneck as dozens of jobs read and write files concurrently
- The serial portions of the pipeline (job startup, output collection, filesystem metadata) cannot be parallelized away
While these benchmarks specifically measure GNU Parallel performance, similar scaling patterns emerge when using Python's multiprocessing library and PowerShell's RunspacePool. The fundamental performance characteristics—initial linear scaling followed by diminishing returns—remain consistent across all three implementations, though the exact efficiency curve varies based on implementation details.
For optimal performance across any platform, these results suggest configuring parallelism to match your hardware capabilities—typically setting job count to match or slightly exceed your CPU core count provides the best balance of performance gain versus resource consumption.
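Applied to the parallel Python script above, that recommendation is a one-line change: default the --jobs argument to the machine's core count rather than a fixed 4 (a reasonable starting default; tune from there).
import os

parser.add_argument(
    "--jobs",
    type=int,
    default=os.cpu_count() or 4,  # fall back to 4 if the core count can't be determined
    help="Number of parallel processes (defaults to the number of CPU cores)",
)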
Our journey through parallel processing implementations reveals crucial lessons for AI practitioners working with real-world data:
- Data preparation, not modeling, is often the real schedule risk in a GenAI project
- Independent, file-level workloads parallelize almost for free with tools you already have: GNU Parallel, Python's multiprocessing, and PowerShell runspace pools
- Matching the job count to your hardware captures most of the gain; beyond the physical core count, returns diminish quickly
- Separating extraction from storage, then batching database writes, keeps parallel workers from contending on a single bottleneck
In the age of Generative AI, where model quality depends directly on data quality, these parallelization techniques serve as the unsung heroes of AI development pipelines. By reducing data processing time from weeks to days, they enable faster iteration cycles, more experimental approaches, and ultimately better AI solutions—keeping pace with the rapidly evolving GenAI landscape without requiring enterprise-scale infrastructure investments.