Apache Spark is the go-to platform for processing large-scale datasets due to its speed and scalability. However, heavy workloads often run into performance bottlenecks caused by inefficient shuffle operations, improper resource utilization, and lack of insights into Spark jobs. Identifying and fixing these issues can be challenging without the right tools. There is a pressing need for a profiler to analyze Spark workloads and provide recommendations on how to fix the bottlenecks and improve performance.
"Why do we need something new when we already have Spark UI?" is a question that’s lingered in the minds of Apache Spark enthusiasts for some time. While Spark UI serves as a valuable tool for monitoring resources, it often leaves us craving a deeper understanding of our data processing landscape.
The Spark UI, despite all its merits, occasionally falls short in furnishing these vital insights and solutions. Hence, there's a growing need for a fresh approach: a Spark profiling and optimization tool that not only bridges these gaps but also helps resolve a range of optimization bottlenecks, including heavy shuffle.
In this blog, we introduce you to ZettaProf (a comprehensive Spark profiler developed by Zettabolt) and AOCL (AMD Optimizing CPU Libraries). ZettaProf is an advanced profiling tool that demystifies Spark profiling: it provides a bird's-eye view of performance, resource utilization reports, and actionable recommendations for improvement. Once bottlenecks, like heavy shuffle operations, are identified, AOCL comes into play to optimize the underlying computation and fix the performance issues.
The AOCL compression library, particularly its GZIP implementation, redefines the boundaries of what’s possible with compression in distributed systems. By making GZIP fast and efficient enough to be used for shuffle operations, AOCL GZIP brings a perfect balance of compression ratio and performance, unlocking new levels of optimization for Spark and other big data platforms.
This blog showcases the combined power of ZettaProf and AOCL to analyze, identify, and resolve shuffle-heavy operations, driving better performance for Spark applications.
Shuffle operations in Spark occur when data moves between partitions or nodes during tasks like joins, groupBy, or repartitioning, often leading to significant challenges. These include performance impacts such as increased query latency due to disk I/O, network communication, and serialization, as well as straggler tasks caused by data skew. Resource utilization is another concern, with high network bandwidth usage, disk I/O overhead, and memory pressure potentially causing bottlenecks or Out Of Memory (OOM) errors. Additionally, shuffle operations can impact stability and scalability, as they may overwhelm nodes, leading to failures or scalability limits as dataset sizes grow. Finally, these issues often result in higher infrastructure costs due to the need for additional resources and job reruns caused by shuffle bottlenecks.
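To make the mechanics concrete, the toy Python sketch below (not Spark code; the dataset, partition count, and `target_partition` helper are all hypothetical stand-ins) models how a groupBy-style shuffle routes every record to the partition that owns its key, and counts how many payload bytes cross partition boundaries in the process:

```python
import hashlib
from collections import defaultdict

NUM_PARTITIONS = 4

def target_partition(key, num_partitions=NUM_PARTITIONS):
    # deterministic stand-in for Spark's hash partitioner (illustrative only)
    return int(hashlib.md5(str(key).encode()).hexdigest(), 16) % num_partitions

# toy dataset: 4 input partitions of (key, payload) records
input_partitions = [
    [(i % 10, "x" * 100) for i in range(p, 1000, 4)] for p in range(4)
]

moved_bytes = 0
shuffled = defaultdict(list)
for src, partition in enumerate(input_partitions):
    for key, payload in partition:
        dst = target_partition(key)
        if dst != src:
            moved_bytes += len(payload)  # payload crosses a partition (node) boundary
        shuffled[dst].append((key, payload))

print(f"{moved_bytes} bytes moved between partitions by the toy groupBy shuffle")
```

In a real cluster each destination partition lives on some executor, so every cross-partition byte here corresponds to serialization, disk I/O, and network transfer, which is exactly the cost shuffle optimization tries to shrink.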
These bottlenecks can degrade performance, increase resource costs, and make jobs unreliable at scale.
To address these bottlenecks, several approaches can be applied:

- Mitigate data skew using salting, custom partitioners, or broadcast joins to balance uneven data distribution.
- Tune shuffle partitions by adjusting spark.sql.shuffle.partitions to match workload size and cluster resources.
- Enable speculative execution to rerun slow tasks on other nodes.
- Choose efficient compression codecs like GZIP, LZ4, or Snappy to reduce shuffle data size.
- Enable shuffle file consolidation to reduce intermediate file overhead.
- Allocate sufficient executor memory to prevent OOM errors during shuffle operations.
- Optimize disk and network settings with faster SSDs and appropriate configurations to handle shuffle data efficiently.
- Apply partition pruning to filter data early and minimize shuffle volumes.
- Repartition datasets to distribute the workload evenly across nodes.
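As an illustration of the salting technique, the pure-Python sketch below (hypothetical data and helper names; real Spark salting would rewrite keys in a DataFrame before the wide operation) shows how appending a random salt to a hot key spreads its records across shuffle partitions:

```python
import hashlib
import random
from collections import Counter

NUM_PARTITIONS = 8
SALT_BUCKETS = 8

def partition_for(key, num_partitions=NUM_PARTITIONS):
    # deterministic stand-in for Spark's hash partitioner
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % num_partitions

# skewed dataset: 90% of records share a single hot key
records = ["hot_key"] * 9000 + [f"key_{i}" for i in range(1000)]

# without salting, every "hot_key" record lands in one partition
plain_load = Counter(partition_for(k) for k in records)

# with salting, the hot key is split into SALT_BUCKETS synthetic keys
random.seed(0)
salted_load = Counter(
    partition_for(f"{k}#{random.randrange(SALT_BUCKETS)}") for k in records
)

print("max partition load without salting:", max(plain_load.values()))
print("max partition load with salting:   ", max(salted_load.values()))
```

In real Spark code the salted key is used for the join or aggregation and the salt is stripped (or partial results merged) afterwards; for a join, the other side must be exploded with all salt values so matches are preserved.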
In this context, compression emerges as a powerful solution. By reducing the size of intermediate data, compression minimizes disk I/O, lowers network usage, and speeds up data processing during shuffles. Advanced compression algorithms like AOCL GZIP not only deliver superior compression ratios but also operate at high speeds, making them an ideal choice for optimizing shuffle-heavy workloads without compromising performance.
When introducing compression to optimize shuffle operations, there are trade-offs between various techniques that must be considered. High-compression algorithms like GZIP achieve excellent data size reduction, minimizing disk and network overhead, but their slower processing speeds can increase latency, making them less suitable for real-time workloads. Faster codecs like LZ4 and Snappy prioritize speed over compression ratio, reducing processing time but resulting in larger data sizes that may strain network and storage resources.
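The trade-off is easy to observe even with Python's standard library: the sketch below (illustrative payload, not real Spark shuffle blocks) compares zlib at its fastest setting (level 1, roughly the LZ4/Snappy end of the spectrum) against its highest setting (level 9, classic GZIP-style compression) on the same data:

```python
import time
import zlib

# illustrative payload: repetitive, text-like data, similar in spirit to shuffle blocks
payload = b"".join(
    b'{"user_id": %d, "event": "click", "page": "/product/%d"}\n' % (i % 500, i % 50)
    for i in range(20000)
)

for level, label in [(1, "fast (LZ4/Snappy-like)"), (9, "high (GZIP-like)")]:
    start = time.perf_counter()
    compressed = zlib.compress(payload, level)
    elapsed = time.perf_counter() - start
    assert zlib.decompress(compressed) == payload  # round-trip check
    ratio = len(payload) / len(compressed)
    print(f"level {level} {label}: ratio {ratio:.1f}x in {elapsed * 1000:.1f} ms")
```

The higher level yields a smaller output but takes longer, which is precisely the ratio-versus-speed dilemma that hardware-optimized implementations like AOCL GZIP aim to dissolve.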
See also: Comparative Study of Various Compression Techniques
ZettaProf is a next-generation profiling tool designed to optimize Spark solutions. It provides comprehensive insights into Spark jobs and helps users analyze and improve performance through its detailed reports and actionable recommendations.
ZettaProf offers comprehensive capabilities for Spark applications, including summary dashboards that provide a high-level view of job performance, runtimes, resource utilization, and bottlenecks. It enables detailed resource utilization analysis by tracking CPU, memory, disk reads/writes, and network usage to identify inefficiencies. With stage and query runtime analysis, ZettaProf dives deep into specific Spark stages and operators to pinpoint slow stages causing delays. Its shuffle diagnostics identify bottlenecks such as large data movements between nodes, disk spills due to insufficient memory, and imbalanced partitions leading to skewed workloads. Additionally, ZettaProf provides actionable recommendations to optimize shuffle performance and address other bottlenecks effectively.
When a Spark application experiences shuffle-related performance degradation, ZettaProf helps pinpoint the issue, as shown below:
Image: ZettaProf identifying High Shuffle issue
The profiling output reveals that shuffle operations are causing high disk spills and underutilized CPU cores.
To overcome this problem, several compression libraries are available, such as GZIP, Snappy, LZ4, and Zstd. AOCL, however, is especially tailored for AMD processors, offering optimized compression and decompression performance, particularly for large-scale workloads on AMD EPYC-based systems. AOCL (AMD Optimizing CPU Libraries) is especially beneficial for Velox Spark because Velox relies on highly efficient, hardware-optimized libraries to enhance computation, memory usage, and data processing speed.
AMD Optimizing CPU Libraries (AOCL) is a suite of high-performance libraries optimized for AMD EPYC™ processors and modern AMD hardware. It provides computational efficiency to fix shuffle bottlenecks and boost Spark performance.
The AOCL compression library is a state-of-the-art solution that significantly enhances data compression and decompression performance, particularly in distributed computing environments like Apache Spark. One of its standout features is its highly optimized implementation of GZIP, which sets it apart from other GZIP solutions.
The AOCL Compression Library offers advanced capabilities designed for performance and efficiency. Its hardware-aware optimization leverages AMD's architecture, utilizing vectorized instructions and efficient memory management for faster compression and decompression. With parallelized processing, AOCL GZIP is tailored for multi-core systems, ensuring low-latency operations even in shuffle-heavy scenarios. The library reduces CPU overhead during compression, freeing resources for critical computations and boosting system efficiency. It delivers enhanced throughput, outperforming standard GZIP implementations, and facilitates faster data movement during shuffle stages, essential for large-scale distributed tasks. Additionally, its lightweight memory usage minimizes pressure, reducing the risk of OutOfMemory (OOM) errors during operations.
Here's why AOCL GZIP is vastly superior and transformative for shuffle-intensive workloads:
AOCL GZIP stands out for its superior speed, efficiency, and scalability, achieving significantly faster compression and decompression compared to traditional GZIP implementations, making it ideal for real-time and large-scale data processing. Its ability to handle massive datasets ensures scalability for workloads like TPC-DS. Historically, GZIP was less favored for shuffle operations due to slower performance compared to algorithms like LZ4, but AOCL GZIP's transformative speed and efficiency make it an optimal choice for shuffle compression. This advancement allows users to achieve high compression ratios without sacrificing performance, enabling more data to fit into network transfers or disk storage and reducing overall shuffle costs. By combining high compression ratios with fast speeds, AOCL GZIP minimizes resource overhead, translating to lower infrastructure costs while maintaining system stability.
The journey begins with a meticulous analysis of your Spark application. ZettaProf examines every aspect, from CPU and memory use to potential spills and skewness, with the goal of identifying areas for improvement. If there are opportunities to optimize CPU and memory usage or eliminate major spills and skew issues, ZettaProf provides you with insights and recommendations. It's your green light to fine-tune your data processing application.
With new insights on how your application is running, you can take specific actions to optimize your Spark workflow for efficiency, such as tuning shuffle partitions, switching compression codecs, or rebalancing skewed data.
A large e-commerce company processes massive datasets daily to generate business insights, such as customer behavior analysis and inventory optimization. The company's workloads closely resemble the TPC-DS dataset and queries, and it asked us to recommend a solution to shuffle runtime issues in shuffle-heavy queries similar to those in the TPC-DS benchmark suite.
As the dataset scaled to 2TB+, shuffle operations became a major bottleneck, causing high latency due to slow disk I/O and network utilization, straggler tasks from data skew, and resource exhaustion leading to job failures. To address this, the team enabled shuffle compression but faced a trade-off: GZIP offered excellent compression ratios but was too slow, while LZ4 and Snappy were faster but had lower compression efficiency, resulting in larger intermediate data and higher network costs.
The team needed a compression algorithm that combined high compression ratios with fast compression and decompression speeds. AMD's AOCL compression library introduced a highly optimized version of GZIP, which fulfilled both performance and compression needs.
The team integrated AOCL GZIP into Spark's shuffle compression configuration.
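A hedged sketch of the kind of configuration involved, based on the codec properties shown later in this post (the library path is a placeholder), looks like:

```properties
# point executors at the AOCL shared libraries (path is a placeholder)
spark.executor.extraLibraryPath=/path/to/aocl/lib

# route Gluten's columnar shuffle through the GZIP codec
spark.gluten.sql.columnar.shuffle.codec=gzip
```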
To assess AOCL GZIP's impact on shuffle-intensive Spark workloads, we profiled and benchmarked the application before and after the change.
Our journey began with a meticulous analysis of our Spark application using ZettaProf. This analysis encompassed every aspect, from CPU and memory utilization to potential spills and skewness. The primary goal was to identify areas ripe for improvement. Whenever opportunities arose to enhance CPU and memory utilization or mitigate spills and skew issues, ZettaProf provided us with precise insights and recommendations.
The table below shows benchmarking results for queries executed on the TPC-DS 2TB dataset using the default Spark configuration and with AOCL GZIP. In these tests, Velox-Gluten Spark with standard GZIP served as the pre-implementation baseline for AOCL GZIP.
Codec configurations (how to enable):

| Codec | Configuration |
|---|---|
| OSS Spark (Vanilla) - LZ4 | spark.io.compression.codec = lz4 |
| GZIP | spark.gluten.sql.columnar.shuffle.codec = gzip |
| LZ4 | spark.gluten.sql.columnar.shuffle.codec = lz4 |

Table 1
| Query | OSS Spark (open-source Spark) | Gluten-Velox OSS, GCC compiler, GZIP compression | Gluten-Velox OSS, GCC compiler, LZ4 compression | Gluten-Velox OSS, AOCC compiler, GZIP compression | AOCL GZIP improvement over Vanilla | AOCL GZIP improvement over GCC GZIP | AOCL GZIP improvement over GCC LZ4 |
|---|---|---|---|---|---|---|---|
| q17 | 313.449 | 73.82 | 63.261 | 47.203 | 512.20% | 56.00% | 33.60% |
| q23a | 472.453 | 163.436 | 183.414 | 135.86 | 245.70% | 23.10% | 39.90% |
| q23b | 435.973 | 169.87 | 193.071 | 142.187 | 223.70% | 19.90% | 38.00% |
| q25 | 467.903 | 82.235 | 94.252 | 66.228 | 469.10% | 21.90% | 39.40% |
| q29 | 242.307 | 73.364 | 71.258 | 47.281 | 478.10% | 56.20% | 48.90% |
| q64 | 990.266 | 149.584 | 178.931 | 135.831 | 578.60% | 11.70% | 31.80% |
| q78 | 739.057 | 140.516 | 153.519 | 109.406 | 570.20% | 25.10% | 39.50% |
| q80 | 647.748 | 130.909 | 126.169 | 91.858 | 575.10% | 39.90% | 39.70% |
| Total | 4309.156 | 983.734 | 1063.875 | 775.854 | 435.50% | 26.70% | 38.20% |
* All numbers are in seconds.

Note: Gluten does not support GZIP by default, so we made code changes to add GZIP support to Gluten.
Configure your Spark cluster to use AOCL:

spark.executor.extraLibraryPath=/path/to/aocl/lib
spark.executor.extraJavaOptions=-XX:+UseNUMA
In the ZettaDiff comparison in Fig. 1, the blue bars denote the pre-implementation runs (before AOCC-AOCL) and the pink bars denote the post-implementation runs (with AOCC-AOCL).
Fig.1
Image: ZettaProf identifying High Shuffle issue
Fig.2 & 3
Fig. 4
The image above (Fig. 4) displays metrics for shuffle operations, highlighting data read and written during distributed tasks. In both "Total Shuffle Read" and "Total Shuffle Write," the blue bars (153.86 GB) represent the total data processed, while the pink bars (101.68 GB) indicate the portion of data spilled to disk, likely due to memory constraints. These metrics provide insight into shuffle efficiency: higher disk spills (pink bars) suggest potential performance bottlenecks, such as insufficient memory allocation or suboptimal configuration.
We tested a 2TB TPC-DS benchmark workload with heavy shuffle operations. Here's the performance comparison.
Fig. 5
This analysis highlights the potential of advanced compiler and library optimizations in large-scale distributed computing scenarios, such as TPC-DS benchmarks.
Fig. 6
The combination of ZettaProf and AOCL provides a complete solution for optimizing Spark applications: ZettaProf diagnoses the bottlenecks, and AOCL resolves them with hardware-optimized computation.
By leveraging ZettaProf for insights and AOCL for hardware-optimized computation, organizations can reduce job runtimes, lower costs, and scale efficiently.
This blog highlights the synergy between ZettaProf, an advanced profiling tool for Spark, and AOCL, a high-performance library optimized for AMD hardware, to tackle shuffle-heavy workloads. ZettaProf identifies performance bottlenecks, such as high latency, resource inefficiencies, and data skew in shuffle operations, providing actionable insights and recommendations. AOCL, particularly its optimized GZIP implementation, addresses these bottlenecks by delivering superior compression ratios and high-speed performance, enabling its use in shuffle-heavy scenarios where traditional GZIP would fall short.
The blog showcases a case study using a 2TB TPC-DS workload, demonstrating significant performance improvements when AOCL GZIP replaces standard compression codecs like LZ4 and GCC GZIP. The optimized solution reduces shuffle overhead, enhances scalability, and lowers infrastructure costs. By combining ZettaProf's diagnostic capabilities with AOCL's hardware-aware optimization, organizations can achieve faster job runtimes, improved resource utilization, and efficient scaling of Spark applications.
Data engineers working with Spark often face challenges in optimizing performance. These include identifying bottlenecks, managing resources effectively, addressing data and time skew issues, reducing spills and errors, and selecting the right join algorithms and configurations. Traditional tools lack intelligent optimization recommendations. ZettaProf provides holistic insights, simplifying optimization by pinpointing issues, offering resource guidance, and delivering intelligent suggestions. This streamlines the process and empowers engineers to maximize Spark applications' potential.