5 lessons from profiling a Flink Application ☕

🐿️ Background

Recently I was given a problem statement: investigate why our Flink applications were going out of memory. As a mitigation we kept increasing the heap size of the Task Managers, but that never addressed the underlying question of why the containers were OOMing. I really enjoy this kind of problem because 1) it requires you to dig deep into the application code, and 2) it is research-oriented: you have to run different experiments to get to the root cause.


👨‍🎓 Lessons Learned

It took 3-4 weeks of effort, during which I profiled 10+ heap dumps to understand the behaviour of the Flink application under different traffic loads. Here are some of the learnings from that experience.


Type of Flink Job

Set of Kafka Topics (Avro) → Flink App → Iceberg Table (ORC)


1. When there is no Memory Leak at all

The hardest profiling case is when you don't see any obvious memory-leak pattern in the dumps at all. It is still good to rule a leak out explicitly: take two heap dumps of the Task Manager (TM) application, 1) shortly after startup and 2) an hour or so after startup (the commands I used to capture dumps are sketched after the list below). In my case there was no major difference in heap size between the two, and the OOMs, when they happened, occurred pretty much during TM startup. There were two major patterns though:

Scenarios when OOMs happened:

  1. When a lot of Kafka lag had built up and had to be caught up
  2. When multiple topics were assigned to a single TM
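
For reference, here is roughly how such a dump can be captured inside a TM pod. This is only a sketch: it assumes the pod is reachable as flink-pod (as in the kubectl examples later) and that jps/jmap are present in the container image (they ship with a full JDK, but not with JRE-only images).

# list the JVMs inside the pod to find the TaskManager PID
kubectl exec flink-pod -- jps

# dump only live (reachable) objects of that PID in binary hprof format
kubectl exec flink-pod -- jmap -dump:live,format=b,file=/tmp/heap_dump_1.hprof <pid>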

2. Retained Heaps are often misleading

It's always good to start with the objects that have the largest 'Retained Heap' size. In technical terms, the retained heap of an object is the sum of the memory occupied by the object itself and the memory occupied by the objects that are reachable only through it.
However, many retained heap entries overlap. For example, ColumnVector[] and StructColumnVector were pretty much pointing to the same collection of objects, so it doesn't make sense to add their retained heap percentages together (the shallow size percentages, on the other hand, do sum to 100).
Another major gotcha: if multiple classes reference a heavy object, the retained heap will not show that object's contribution under any of the referencing classes. For example, if Class A and Class B both reference the same Class C objects, Class C's size will not be part of either A's or B's retained heap; it only shows up in the retained heap of an object further up that dominates it (i.e., one through which every path to it passes).

3. Balancing CPU vs Mem: A Tradeoff

In streaming applications, the heap is mostly occupied by:

  1. Kafka Consumer Buffers (records held in a blocking queue, waiting to be ingested)
  2. Avro to ORC converted Vectorized Row Batches
  3. Separate ORC TreeWriter batches for the different columns

Decreasing the buffer sizes or row batch sizes helps reduce the total heap footprint. However, it also lowers overall throughput and increases CPU usage, because ORC dictionary encoding and compression are more efficient on larger batches.
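
To make the buffer-size knob concrete, these are the standard Kafka consumer properties that bound how much fetched data can sit in memory at a time; the values below are only illustrative, and the ORC row batch size is a separate setting that depends on how the writers are built.

# max bytes the broker returns per partition in a single fetch
max.partition.fetch.bytes=1048576
# max bytes a single fetch request may return across all partitions
fetch.max.bytes=52428800
# max number of records returned by one poll() call
max.poll.records=500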

4. Faster Heap dump downloads

The heap dumps that needed to be profiled were just shy of 20GB in size. The Task Managers were running as K8s pods, with no ssh/scp access to the production hosts. Vanilla kubectl cp was 1) very slow (1GB took 10+ minutes to download) and 2) showed no download progress. Since multiple heap dumps had to be taken, I worked on optimizing the whole download process.

4.1 Heap profile compression

Obviously, the first solution is to compress the .hprof file, and choosing the right compression algorithm can reduce the archive size by at least 10X. If tar is the only archiving binary available on the container, I found the xz algorithm to be the most space-efficient.

tar -c --use-compress-program="xz -9 -T0" -f flink-ingestion-job_$(date +"%Y-%m-%d_%H-%M-%S").hprof.tar.xz heap_dump_1.hprof

Details about xz -9 -T0

  • -9 is the maximum compression level
  • -T0 enables multi-threaded compression, using as many threads as there are CPU cores

In case the download fails during kubectl cp

I observed that when I compressed a large 15+GB hprof file, kubectl cp often failed after downloading around 90% of the file. I am not too sure of the root cause. What worked for me was to first split the .hprof file into multiple 5GB parts, compress the directory of parts, and then run kubectl cp on the resulting archive, which worked fine.

mkdir split_parts && cd split_parts
split -b 5G ../flink-ingestion-job-taskmanager-7888b4bbb9-w8h4d.hprof flink-ingestion-job-i164-taskmanager-7888b4bbb9-w8h4d.prof_

# all the split parts are now inside split_parts/; compress the whole directory
cd .. && tar -c --use-compress-program="xz -9 -T0" -f flink_kyoto-ingestion-job_$(date +"%Y-%m-%d_%H-%M-%S")_split.hprof.tar.xz split_parts

To get a progress bar (which kubectl cp does not provide), streaming the file with kubectl exec and piping it through pv helped:

kubectl exec flink-pod -- tar cf - /data/flink/flink-ingestion-job_2025-09-08_10-54-18_split.hprof.tar.xz | pv | tar xf - -C .
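
pv on its own only shows throughput and the amount transferred so far. If you first check the archive size inside the pod (e.g. with ls -lh), you can pass it to pv to also get a percentage and an ETA; the 4g below is just an illustrative size:

kubectl exec flink-pod -- tar cf - /data/flink/flink-ingestion-job_2025-09-08_10-54-18_split.hprof.tar.xz | pv -s 4g | tar xf - -C .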

Finally, these are the commands to extract the archive and stitch the parts back together:

$ tar -xvf export/content/data/flink/flink-ingestion-job_2025-09-08_10-54-18_split.hprof.tar.xz
  x split_parts/
  x split_parts/flink-ingestion-job-taskmanager-7888b4bbb9-w8h4d.prof_aa
  x split_parts/flink-ingestion-job-taskmanager-7888b4bbb9-w8h4d.prof_ab
  x split_parts/flink-ingestion-job-taskmanager-7888b4bbb9-w8h4d.prof_ac
  x split_parts/flink-ingestion-job-taskmanager-7888b4bbb9-w8h4d.prof_ad
  x split_parts/flink-ingestion-job-taskmanager-7888b4bbb9-w8h4d.prof_ae  
  
$ cat split_parts/flink-ingestion-job-taskmanager-7888b4bbb9-w8h4d.prof_a* > flink-ingestion-job-taskmanager-7888b4bbb9-w8h4d-Sept.hprof
  
# load this hprof now in visualvm
# flink-ingestion-job-taskmanager-7888b4bbb9-w8h4d-Sept.hprof

5. Querying via OQL for faster analysis

Last but not least, to correlate object sizes with topic counts, buffer sizes, row batch sizes, etc., I needed a programmatic way to evaluate the top objects. For this, I found the OQL console in VisualVM very useful. There was some learning curve in picking up the language, but once I got the hang of it, it made getting counts, sums, mins, maxes, etc. straightforward.
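
To give a flavour of the queries (run one at a time in the OQL console): the first counts the Kafka ConsumerRecord objects sitting on the heap, the second sums the shallow sizes of all VectorizedRowBatch instances. The VectorizedRowBatch class name here is an assumption; depending on the ORC/Hive versions in the job it may live in a different or shaded package.

select count(heap.objects("org.apache.kafka.clients.consumer.ConsumerRecord"))

select sum(map(heap.objects("org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch"), "sizeof(it)"))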

👋 Conclusion

I tried both VisualVM and IntelliJ IDEA's Profiler for heap analysis. IntelliJ's profiler is fast for retained heap analysis. VisualVM is 2-3X slower when loading and calculating retained sizes, but once loaded, it persists the calculation on disk, making subsequent loads of the same heap much faster.
There are also a few CPU-side observations (heap dumps capture thread dumps as well), which I will keep for a future blog.



PS: I would love to hear about any profiling tips and tricks you have come across!
