Distributed Computing for Large Language Models: Scalability, Speed, and Fault Tolerance
Paged Data Access to DRAM
Basic Idea: “Split” data file (virtually or physically) and stage reads of its pages from disk to DRAM (vice versa for writes)
Page Management in DRAM Cache
- ❖ Caching: Retaining pages read from disk in DRAM
- ❖ Eviction: Removing a page frame’s content in DRAM
- ❖ Spilling: Writing out pages from DRAM to disk
Cache Replacement Policy
The algorithm that chooses which page frame(s) to evict when a new page has to be cached but the OS cache in DRAM is full
- ❖ Popular policies include Least Recently Used, Most Recently Used, etc. (more shortly)
❖ Disk I/O Cost
Abstract counting of number of page I/Os; can map to bytes given page size
❖ Communication/Network I/O Cost
Abstract counting of number of pages/bytes sent/received over network
Scaling to (Local) Disk
- ❖ In general, scalable programs stage access to pages of file on disk and efficiently use available DRAM
- ❖ Modern DRAM sizes can be 10s of GBs; so we read a “chunk”/“block” of file at a time (say, 1000s of pages)
Generic Cache Replacement Policies
Cache Replacement Policy: Algorithm to decide which page frame(s) to evict to make space
- ❖ Typical frame ranking criteria: Ø recency of use Ø frequency of use Ø number of processes reading it
- ❖ Typical optimization goal: Reduce total page I/O costs.
- ❖ A few well-known policies:
- ❖ Least Recently Used (LRU): Evict page that was used longest ago
- ❖ Most Recently Used (MRU): (Opposite of LRU)
- ❖ ML-based caching policies are “hot” nowadays!
Data Layouts and Access Patterns
- ❖ Recall that data layouts and data access patterns affect what data subset gets cached in higher level of memory hierarchy. Key Principle: Optimizing layout of data file on disk based on data access pattern can help reduce I/O costs
Straightforward: row-stored; Select C From R (cost: 6 + out), Select Max(A) From R (cost: 6 + out), SELECT A FROM R WHERE B = “3b” (cost: 6 + out)
Non-dedup Project: column-stored; Select C From R (cost: 2 + out), Select Max(A) From R (cost: 2 + out)
Not Straightforward (groupby) but row-stored: select A, sum(D) From R Group By A (cost: 6 + out)
Q: But what if hash table > DRAM size?
- ❖ Program might crash depending on backend implementation. OS may keep swapping pages of hash table to/from disk; aka “thrashing”
Q: How to scale to large number of groups?
- ❖ Divide and conquer! Split up R based on values of A
- ❖ HT for each split may fit in DRAM alone
- ❖ Reduce running info. size if possible
Scalable BGD: cost 6 + out
Stochastic Gradient Descent
- ❖ Basic Idea: Use a sample (mini-batch) of D to approximate gradient instead of “full batch” gradient.
- ❖ Done without replacement
- ❖ Randomly reorder/shuffle D before every epoch
- ❖ Sequential pass: sequence of mini-batches
❖ Mini-batch gradient computations: 1 filescan per epoch. Total I/O cost per epoch: 1 shuffle cost + 1 filescan cost
Data Parallelism
Partition large data file physically across nodes/workers; within worker: DRAM-based or disk-based. Data parallelism is technically orthogonal to these 3 paradigms — shared-nothing, shared-disk, shared-memory but most commonly paired with shared-nothing
❖ Partitioning a data file across nodes is aka sharding
❖ Part of a stage in data processing workflows called ExtractTransform-Load (ETL)
❖ ETL is an umbrella term for all kinds of processing done to the data file before it is ready for users to query, analyze, etc
❖ Sharding, compression, file format conversions, etc.
Data Partitioning Strategies
- ❖ Row-wise/horizontal partitioning is most common (sharding)
- ❖ 3 common schemes (given k nodes):
- ❖ Round-robin: assign tuple i to node i MOD k
- ❖ Hashing-based: needs hash partitioning attribute(s)
- ❖ Range-based: needs ordinal partitioning attribute(s)
- ❖ Tradeoffs:
- ❖ For Relational Algebra (RA) and SQL:
- ❖ Hashing-based most common in practice for RA/SQL
- ❖ Range-based often good for range predicates in RA/SQL
- ❖ But all 3 are often OK for many ML workloads (why?)
- ❖ Replication of partition across nodes (e.g., 3x) is common to enable “fault tolerance” and better parallel runtime performance
Other Forms of Data Partitioning
Columnar Partitioning, Hybrid/Tiled Partitioning
Cluster Architectures
Manager-Worker Architecture
- ❖ 1 (or few) special node called Manager (aka “Server” or archaic “Master”); 1 or more Workers
- ❖ Manager tells workers what to do and when to talk to other nodes
- ❖ Most common in data systems (e.g., Dask, Spark, par. RDBMS, etc.)
Peer-to-Peer Architecture
- ❖ No special manager
- ❖ Workers talk to each other directly
- ❖ E.g., Horovod
- ❖ Aka Decentralized (vs Centralized)
Bulk Synchronous Parallelism (BSP) Aka (Barrier) Synchronization
❖ Most common protocol of data parallelism in data systems (e.g., in parallel RDBMSs, Hadoop, Spark) ❖ Shared-nothing sharding + manager-worker architecture. 1. Sharded data file on workers 2. Client gives program to manager (e.g., SQL query, ML training, etc.) 3. Manager divides first piece of work among workers 4. Workers work independently on self’s data partition (cross-talk can happen if Manager asks) 5. Worker sends partial results to Manager 6. Manager waits till all k done 7. Go to step 3 for next piece
Cluster overhead factors that hurt speedup: ❖ Per-worker: startup cost; tear-down cost ❖ On manager: dividing up the work; collecting/unifying partial partial results from workers ❖ Communication costs: talk between manager-worker and across workers (when asked by manager) ❖ Barrier synchronization suffers from “stragglers” (workers that fall behind) due to skews in shard sizes and/or worker capacities
Distributed filesystem (DFS) is a cluster-resident filesystem to manage distributed files ❖ A layer of abstraction on top of local filesystems ❖ Nodes manage local data as if they are local files ❖ Remote DFS: Files reside elsewhere and read/written on demand by workers ❖ In-Situ DFS: Files resides on cluster where workers exist
Network Filesystem (NFS) ❖ Main pro: simplicity of setup and usage ❖ But many cons: Not scalable to very large files, Full data replication, High contention for concurrent reads/writes, Single-point of failure
Hadoop Distributed File System (HDFS)❖ Highly scalable; scales to 10s of 1000s of nodes, PB files ❖ Designed for clusters of cheap commodity nodes ❖ Parallel reads/writes of sharded data “blocks” ❖ Replication of blocks to improve fault tolerance ❖ Cons: Read-only + batchappend (no fine-grained updates/writes)
Data-Parallel Dataflow: A dataflow graph with ops wherein each operation is executed in a data-parallel manner, a distributed model of computation
Data-Parallel Workflow: A generalization; each vertex a whole task/process that is run in a data-parallel manner
Data-Parallel Dataflow emphasizes the flow of data through a computation, whereas Data-Parallel Workflow focuses on parallel execution of tasks operating on different subsets of the input data.
Parallel RDBMSs: ❖ Parallel RDBMSs are highly successful and widely used ❖ Typically shared-nothing data parallelism ❖ Optimized runtime performance + enterprise-grade features: ❖ ANSI SQL & more ❖ Business Intelligence (BI) dashboards/APIs ❖ Transaction management; crash recovery ❖ Indexes, auto-tuning, etc.
4 new concerns of Web giants vs RDBMSs built for enterprises: Developability, Fault Tolerance, Elasticity, Cost
MapReduce: ❖ A programming model for parallel programs on sharded data + distributed system architecture ❖ Map and Reduce are terms from functional PL; software/data/ML engineer implements logic of Map, Reduce ❖ System handles data distribution, parallelization, fault tolerance, etc ❖ Under the hood, each Mapper and Reducer is a separate process; Reducers face barrier synchronization (BSP) Fault tolerance achieved using data replication
Benefits and Catch of MapReduce: ❖ Goal: High-level functional ops to simplify data-intensive programs ❖ Key Benefits: ❖ Map() and Reduce() are highly general; any data types/structures; great for ETL, text/multimedia ❖ Native scalability, large cluster parallelism ❖ System handles fault tolerance automatically ❖ Decent FOSS stacks (Hadoop and later, Spark) ❖ Catch: Users must learn “art” of casting program as MapReduce ❖ Map operates record-wise; Reduce aggregates globally ❖ But MR libraries now available in many PLs: C/C++, Java, Python, R, Scala, etc.
Map(): Process one “record” at a time independently ❖ A record can physically batch multiple data examples/tuples ❖ Dependencies across Mappers not allowed ❖ Emit 1 or more key-value pairs as output(s) ❖ Data types of input vs. output can be different
Reduce(): Gather all Map outputs across workers sharing same key into an Iterator (list) ❖ Apply aggregation function on Iterator to get final output(s)
Do word count in SQL ❖ First step: Transform text docs into relations and load: Part of the ETL stage Suppose we pre-divide each doc into words w/ schema: DocWords (DocName, Word) ❖ Second step: a single, simple SQL query! SELECT Word, COUNT (*) FROM DocWords GROUP BY Word [ORDER BY Word]
Select Operation (Map-only jobs): ❖ Input Split: ❖ Shard table tuple-wise ❖ Map(): ❖ On tuple, apply selection condition; if satisfies, emit key-value (KV) pair with dummy key, entire tuple as value ❖ Reduce(): ❖ Not needed! No cross-shard aggregation here ❖ These kinds of MR jobs are called “Map-only” jobs
Simple Agg: ❖ Input Split: ❖ Shard table tuple-wise ❖ Map(): ❖ On agg. attribute, compute incr. stats; emit pair with single global dummy key and incr. stats as value ❖ Reduce(): ❖ Since only one global dummy key, Iterator has all sufficient stats to unify into global agg.
GROUP BY Agg: ❖ Input Split: ❖ Shard table tuple-wise ❖ Map(): ❖ On agg. attribute, compute incr. stats; emit pair with grouping attribute as key and stats as value ❖ Reduce(): ❖ Iterator has all suff. stats for a single group; unify those to get result for that group ❖ Different reducers will output different groups’ results
What is Hadoop then? FOSS system implementation with q MapReduce as programming model, and q HDFS as filesystem ❖ MR user API; input splits, data distribution, shuffling, and fault tolerances handled by Hadoop under the hood ❖ Exploded in popularity in 2010s: 100s of papers, 10s of products ❖ A “revolution” in scalable+parallel data processing that took the DB world by surprise ❖ But nowadays Hadoop largely supplanted by Spark
Dataflow programming model (subsumes most of Relational Algebra; MR) ❖ Inspired by Python Pandas style of chaining functions ❖ Unified storage of relations, text, etc.; custom programs ❖ Custom design (and redesign) from scratch ❖ Key idea vs Hadoop: exploit distributed memory to cache data ❖ Key novelty vs Hadoop: lineage-based fault tolerance
Key concept in Spark. ❖ RDD has been the primary user-facing API in Spark since its inception. At the core, an RDD is an immutable distributed collection of elements of your data. ❖ Good for dataset low-level transformation, actions and control. ❖ Good for unstructured data. ❖ Good for functional programming data manipulation. ❖ Not recommended for imposing a schema on your data. ❖ Lacks some optimization and performance benefits
Databricks now recommends SparkSQL/DataFrame API; avoid RDD API unless really needed! ❖ Key Reason: Automatic query optimization becomes more feasible
Query Optimization in Spark: ❖ Projection pushdown: ❖ Drop unneeded columns early on ❖ Selection pushdown: ❖ Apply predicates close to base tables ❖ Join order optimization: ❖ Not all joins are equally costly ❖ Fusing of aggregates
Data “Lake”: Loose coupling of data file format and data/query processing stack (vs RDBMS’s tight coupling); many frontends
Scalable BGD with MapReduce/Spark: ❖ Very similar to algebraic SQL; vector addition ❖ Input Split: Shard table tuple-wise ❖ Map(): ❖ On tuple, compute per-example gradient; add these across examples in shard; emit partial sum with single dummy key ❖ Reduce(): ❖ Only one global dummy key, Iterator has partial gradients; just add all those to get full batch gradient
Building Stage of ML Lifecycle: ❖ Perform model selection, i.e., convert prepared ML-ready data to prediction function(s) and/or other analytics outputs ❖ What makes model building challenging/time-consuming? ❖ Heterogeneity of data sources/formats/types ❖ Configuration complexity of ML models ❖ Large scale of data ❖ Long training runtimes of some models ❖ Pareto optimization on criteria for application ❖ Evolution of data-generating process/application
Feature Engineering: Umbrella term for many tasks dep. on type of ML model trained:
1. Recoding and value conversions ❖ Common on relational/tabular data ❖ Typically needs some global column stats + code to reconvert each tuple (example’s feature values)
2. Joins and/or aggregates ❖ Common on relational/tabular data ❖ Most real-world relational datasets are multi-table; require key-foreign key joins, aggregation-and-key-key-joins, etc.
3. Feature interactions ❖ Sometimes used on relational/tabular data, especially for high-bias models like GLMs ❖ Pairwise is common; ternary is not unheard of
4. Feature selection ❖ Often used on high dimensional relational/tabular data ❖ Basic Idea: Instead of using whole feature set, use a subset ❖ Formulated as a discrete optimization problem ❖ NP-Hard in #features in general
5. Dimensionality reduction ❖ Often used on relational/structured/tabular data ❖ Basic Idea: Transforms features to a different latent space ❖ Examples: Principal Component Analysis (PCA), Singular Value Decomposition (SVD), Linear Discriminant Analysis (LDA), Matrix factorization
6. Temporal feature extraction ❖ Many relational/tabular data have time/date ❖ Per-example reconversion to extract numerics/categoricals ❖ Sometimes global stats needed to calibrate time ❖ Complex temporal features studied in time series mining
7. Textual feature extraction and embeddings ❖ Many relational/tabular data have text columns; in NLP, whole example is often just text ❖ Most classifiers cannot process text/strings directly ❖ Extracting numerics from text studied in text mining (Bag-of-words, scaling global stats, reconversion, NLP)
8. Learned feature extraction in deep learning ❖ A big win of Deep Learning (DL) is no manual feature engineering on unstructured data ❖ DL is not common on structured/tabular data, but growing in popularity ❖ DL is very versatile: almost any data type as input and/or output: ❖ Convolutional NNs (CNNs) over image tensors ❖ Recurrent NNs (RNNs) and Transformers over text ❖ Graph NNs (GNNs) over graph-structured data ❖ Software 2.0: Buzzword for such “learned feature extraction” programs vs old hand-crafted feature engineering
Hyper-parameters: Knobs for an ML model or training algorithm to control bias-variance tradeoff in a dataset-specific manner to make learning effective ❖ Most common approach: grid search; pick set of values for each hyperparameter and take cartesian product ❖ Also common: random search to subsample from grid
Automated Model Selection / AutoML: ❖ Pros: Ease of use; lower human cost; easier to audit; improves ML accessibility ❖ Cons: Higher resource cost; less user control; may waste domain knowledge; may leave performance on the table
Generalized Linear Models (GLMs): from statistics; Bayesian Networks: inspired by causal reasoning; Decision Tree-based: CART, Random Forest, Gradient-Boosted; Trees (GBT): inspired by symbolic logic; Support Vector Machines (SVMs): inspired by psychology; Artificial Neural Networks (ANNs): Multi-Layer Perceptrons (MLPs), Convolutional NNs (CNNs), Recurrent NNs (RNNs), Transformers, etc.; inspired by brain neuroscience; Unsupervised: Clustering (e.g., K-Means), Matrix Factorization, Latent Dirichlet Allocation (LDA), etc.
Scalable ML Training Systems: 1. Scalability: In-memory libraries vs Scalable ML system (works on larger-than-memory datasets) 2. Target Workloads: General ML library vs Decision treeoriented vs Deep learning, etc. 3. Implementation Reuse: Layered on top of scalable data system vs Custom from-scratch framework
Batch/offline prediction: model -> database(periodically run model on new data and cache the results in a databse); Realtime/Online prediction: model within server(embedded within an application?); Model-as-a-Service: model connect client and server(run model on its own web server, the backend interact with the model by making requests to the model service and receiving responses back)
Guest Speaker:
Ray: A distributed framework for scaling AI & Python workloads(Jules S. Damji)
Why Ray? Machine learning is pervasive; Distributed computing is a necessity; Python is the default language for DS/ML
What is Ray? ● A simple/general-purpose library for distributed computing ● An ecosystem of Python libraries (for scaling ML and more) ● Runs on laptop, public cloud, K8s, on-premise; A layered cake of functionality and capabilities for scaling ML workloads
Ray AI Runtime (AIR) is a scalable runtime for end-to-end ML applications
Ray Basic Design Patterns ● Ray Parallel Tasks ○ Functions as stateless units of execution ○ Functions distributed across the cluster as tasks ● Ray Objects as Futures ○ Distributed (immutable objects) store in the cluster ○ Fetched when materialized ○ Enable massive asynchronous parallelism ● Ray Actors ○ Stateful service on a cluster ○ Enable Message passing
Ray Task — A function remotely executed in a cluster; Ray Actor — A class remotely executed in a cluster
What are pain points in training/serving LLMs? 1. Scaling is costly and hard to manage ● Need spot instance support ● Hard to run distributed workloads ● Hard to optimize CPUs/GPUs 2. Existing serving / inference solutions don’t scale ● Individual replicas can’t be distributed ● Need to be able to integrate business logic 3. Distributed Training hard to get working right ● Hyperparameters need to be tuned ● Need a platform to iterate very quickly at scale
Ray provides generic platform for LLMs. 1. Simplify orchestration and scaling ● Spot instance support for data parallel training ● Easily spin up and run distributed workloads on any cloud ● Optimize CPUs/GPUs by pipelining w/ Ray Data 2. Inference and serving ● Ability to support complex pipelines integrating business logic ● Ability to support multiple node serving 3. Training ● Integrates distributed training with distributed hyperparameter tuning w/ ML frameworks
Key Takeaways ● Distributed computing is a necessity & norm ● Ray’s vision: make distributed computing simple ○ Don’t have to be distributed programming expert ● Build your own disruptive apps & libraries with Ray ● Scale your ML workloads with Ray libraries (Ray AIR) ● Ray offers the compute substrate for Generative AI workloads
Distributed Computing (Venky Ravi)
Distributed Computing Paradigms: Different paradigms and models used in distributed computing: Batch processing: Breaking tasks into smaller sub-tasks that can be processed independently. Message passing: Communication between nodes through message passing protocols like MPI. Shared memory: Multiple nodes accessing a common memory space. MapReduce: A programming model for processing large datasets in a distributed manner. Stream processing: Real-time processing of continuous data streams.
Distributed File Systems — Fault Tolerance, Scalability, Data Locality
Challenges & considerations in distributed analysis: Storage Tradeoff, Hybrid Caching, Distributing Data, Latency Impact, Overhead in Data Transfer, Hardware Support, Serialization and Interpretation
Distributed Collaborative filtering: 1. User-Item Data: Represents the initial user-item interaction data used for collaborative filtering. 2. Data Partitioning: The data is partitioned into subsets and distributed across multiple nodes. 3. Local Similarity Computation: Each node independently computes local similarities (e.g., cosine similarity) based on the user-item interactions available on that node. 4. Data Exchange and Aggregation: The computed similarities are exchanged and aggregated across the nodes to generate a global similarity matrix. 5. Recommendation Generation: Each node utilizes the global similarity matrix and the locally available user-item interactions to generate personalized recommendations for its subset of users. 6. Result Integration and Final Recommendations: The recommendations generated by each node are integrated to produce the final distributed recommendations.
Language Model: A language model is an AI model that learns patterns and relationships in natural language text, enabling it to generate coherent and contextually relevant text. It captures the probability distribution of word sequences, predicting the likelihood of the next word given the previous context.
Language Models and Challenges in Distributed Training and Inference: 1. Computational Resources: Large language models require immense computational power, memory, and storage. Training and inference across distributed systems necessitate significant hardware resources 2. Communication Overhead: In distributed training, coordinating updates across multiple nodes introduces communication overhead. Efficient communication protocols and optimized data exchange mechanisms are essential. 3. Data Synchronization: Ensuring consistent model parameters and synchronization of large amounts of data across nodes is a challenge.In distributed inference, managing data consistency for parallel processing can be complex. 4. Scalability: Scaling distributed training and inference to accommodate growing model sizes and datasets is crucial. Load balancing and resource allocation need to be optimized for efficient scalability.
Benefits of Distributed Computing for Large Language Models: Scalability: Distributed computing enables efficient scaling of resources to handle large-scale training and inference workloads. Speed: Parallel processing across multiple nodes reduces the time required for training and inference tasks. Fault tolerance: Distributed systems provide resilience by replicating data and computations across multiple nodes, ensuring uninterrupted operation even in the face of failures
Real-world Applications: Language translation: Distributed computing facilitates the training and serving of language translation models that can handle large volumes of text. Content generation: Distributed language models enable the generation of coherent and contextually relevant content for various applications, such as chatbots or content personalization. Sentiment analysis: Large language models distributed across multiple nodes can process and analyze vast amounts of text data to derive sentiment insights.
Considerations and Challenges: Data synchronization: Ensuring consistency and synchronization of data across distributed nodes. Communication overhead: Efficient communication and coordination between nodes to minimize latency and optimize performance. Resource management: Proper allocation and management of computational resources across the distributed system.
