When we first started building and deploying Flink jobs in our development environment, we found that our jobs were using a lot more memory than we had expected. The problem was that Flink's dashboard gave us little information about real-time JVM memory usage (heap size, etc.) to work with.
The Flink dashboard does provide basic JVM memory values, but they didn't seem to reflect up-to-date memory usage. As a result, it was difficult to keep track of what was going on with the application: we needed a quick way to see how much memory our jobs were using while we fed in data, so that we could estimate resource requirements going forward.
Reading through the documentation led me to Flink Metrics. Among other things (including the ability to register your own metrics from functions that extend Flink's RichFunction), Flink provides System metrics, which include CPU load and JVM memory consumption.
JMX is available by default as a metrics reporter, so all we had to do was configure Flink to use it. Other reporters, such as Graphite, InfluxDB, and Prometheus, are also supported, but they need additional setup and configuration.
We used the following values (in flink-conf.yaml) to tell Flink to use the JMX reporter and which port its JMX server should listen on:
metrics.reporters: jmx
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
metrics.reporter.jmx.port: 8961
metrics.delimiter: .
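Before setting up a GUI, one way to check that the reporter is publishing anything is to connect to the reporter port and list the registered metric MBeans. The Java sketch below is illustrative rather than part of our setup, and it rests on two assumptions: that the reporter accepts connections at the standard RMI URL form service:jmx:rmi:///jndi/rmi://<host>:<port>/jmxrmi, and that Flink registers its metric MBeans under domains starting with org.apache.flink. The class name FlinkJmxProbe and the localhost default are hypothetical.

```java
import java.io.IOException;
import java.util.Set;
import java.util.TreeSet;
import javax.management.MBeanServerConnection;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hypothetical helper: lists Flink metric MBeans exposed over a JMX connection.
public class FlinkJmxProbe {

    // Assumption: the reporter is reachable via the standard RMI/JNDI service URL form.
    static String serviceUrl(String host, int port) {
        return "service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi";
    }

    // Assumption: Flink metric MBeans live in domains starting with "org.apache.flink.".
    static Set<String> flinkMetricNames(MBeanServerConnection conn) throws IOException {
        ObjectName pattern;
        try {
            // Domain wildcard plus "*" property pattern matches every MBean in those domains.
            pattern = new ObjectName("org.apache.flink.*:*");
        } catch (MalformedObjectNameException e) {
            throw new IllegalStateException(e); // constant pattern, should never happen
        }
        Set<String> names = new TreeSet<>(); // sorted for readable output
        for (ObjectName name : conn.queryNames(pattern, null)) {
            names.add(name.getCanonicalName());
        }
        return names;
    }

    public static void main(String[] args) throws IOException {
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 8961; // metrics.reporter.jmx.port
        JMXServiceURL url = new JMXServiceURL(serviceUrl(host, port));
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            for (String name : flinkMetricNames(connector.getMBeanServerConnection())) {
                System.out.println(name);
            }
        }
    }
}
```

If the connection succeeds but nothing is printed, no MBeans matched the assumed org.apache.flink. domain prefix on that JVM.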
Since our Flink cluster was running in an Amazon EMR-powered development environment and we wanted to monitor the metrics through a GUI with as little setup as possible, we also added the following to flink-conf.yaml. These options enable the JVM's built-in remote JMX agent on port 9999, with authentication and SSL disabled:
env.java.opts: -Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.port=9999 -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false -Djava.rmi.server.hostname=hostname.compute-1.amazonaws.com
That's about it for configuration. Now, when we submit a job to the Flink cluster, we see the following lines in the Task Manager logs:
2019-05-16 18:27:25,931 INFO org.apache.flink.runtime.metrics.MetricRegistryImpl - Configuring jmx with {port=8961, class=org.apache.flink.metrics.jmx.JMXReporter}.
2019-05-16 18:27:25,959 INFO org.apache.flink.metrics.jmx.JMXReporter - Started JMX server on port 8961.
2019-05-16 18:27:25,959 INFO org.apache.flink.metrics.jmx.JMXReporter - Configured JMXReporter with {port:8961}
2019-05-16 18:27:25,959 INFO org.apache.flink.runtime.metrics.MetricRegistryImpl - Reporting metrics for reporter jmx of type org.apache.flink.metrics.jmx.JMXReporter.
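The "Started JMX server on port ..." line is the one to look for: if metrics.reporter.jmx.port is set to a range instead of a single port, it is where a Task Manager reports the port it actually bound. Below is a small Java sketch for extracting that port from a log file. It assumes the message text matches the lines above (it may differ in other Flink versions), and the class name JmxPortFromLog is hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.OptionalInt;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the JMX reporter port from Task Manager log lines.
public class JmxPortFromLog {

    // Assumption: the reporter logs "Started JMX server on port <n>." as in the lines above.
    private static final Pattern STARTED = Pattern.compile("Started JMX server on port (\\d+)");

    // Returns the port from the first matching line, or empty if none matches.
    static OptionalInt findPort(List<String> lines) {
        for (String line : lines) {
            Matcher m = STARTED.matcher(line);
            if (m.find()) {
                return OptionalInt.of(Integer.parseInt(m.group(1)));
            }
        }
        return OptionalInt.empty(); // reporter not started, or message text differs
    }

    public static void main(String[] args) throws IOException {
        Path log = Paths.get(args[0]); // path to a Task Manager log file
        OptionalInt port = findPort(Files.readAllLines(log));
        System.out.println(port.isPresent() ? "JMX port: " + port.getAsInt() : "JMX port not found");
    }
}
```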
We chose VisualVM as our GUI tool for viewing the metrics that Flink produces. Once it was installed, getting the metrics flowing in was essentially a two-step process:
- Add Remote Host: if Flink is running on a YARN cluster, specify the host name of the machine running the Flink Task Manager you want to monitor.
- Add JMX Connection: specify the port set by the metrics.reporter.jmx.port configuration. If you configured a port range, check the Task Manager logs for the entries above to find the specific port in use.
With VisualVM connected, we could now visualize the metrics we wanted.
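VisualVM's memory and CPU graphs are, as far as we know, built from the JVM's standard platform MXBeans read over the JMX connection. For a scripted check, such as logging heap usage while feeding in test data, roughly the same numbers can be read with the JDK's own JMX client API. The Java sketch below assumes that the JMX endpoint exposes the JVM's platform MBeans and that the target runs on a HotSpot-based JVM (for com.sun.management.OperatingSystemMXBean); the class name RemoteJvmStats is hypothetical.

```java
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import javax.management.MBeanServerConnection;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

// Hypothetical helper: samples heap usage and process CPU load over a JMX connection.
public class RemoteJvmStats {

    // Heap usage as reported by the target JVM's MemoryMXBean.
    static MemoryUsage heapUsage(MBeanServerConnection conn) throws IOException {
        MemoryMXBean memory = ManagementFactory.newPlatformMXBeanProxy(
                conn, ManagementFactory.MEMORY_MXBEAN_NAME, MemoryMXBean.class);
        return memory.getHeapMemoryUsage();
    }

    // Recent CPU load of the target JVM process in [0.0, 1.0], or negative if unavailable.
    // Assumption: a HotSpot-based JVM that provides com.sun.management.OperatingSystemMXBean.
    static double processCpuLoad(MBeanServerConnection conn) throws IOException {
        com.sun.management.OperatingSystemMXBean os = ManagementFactory.newPlatformMXBeanProxy(
                conn, ManagementFactory.OPERATING_SYSTEM_MXBEAN_NAME,
                com.sun.management.OperatingSystemMXBean.class);
        return os.getProcessCpuLoad();
    }

    public static void main(String[] args) throws IOException {
        String host = args.length > 0 ? args[0] : "localhost";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 8961; // metrics.reporter.jmx.port
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://" + host + ":" + port + "/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            MemoryUsage heap = heapUsage(conn);
            // Shift by 20 bits to convert bytes to MiB; max may be -1 if undefined.
            System.out.printf("heap used=%d MiB committed=%d MiB max=%d MiB cpu=%.2f%n",
                    heap.getUsed() >> 20, heap.getCommitted() >> 20, heap.getMax() >> 20,
                    processCpuLoad(conn));
        }
    }
}
```

Because heapUsage and processCpuLoad accept any MBeanServerConnection, they can also be tried against the local JVM via ManagementFactory.getPlatformMBeanServer(). Sampling them in a loop while a job runs gives a rough, text-only version of the VisualVM memory graph.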
This quick setup proved very useful for keeping track of our jobs. We can visualize memory and CPU usage and generate heap dumps, which helps us gather the data we need to size our jobs going forward!