Can't add custom metrics to Spark

I’m trying to publish custom metrics using the Spark CustomMetric API, which is supported since Spark 3.2 (see the Spark API documentation).

I have created a custom metric implementing CustomMetric with a default constructor, overriding name and description. I have also created a custom task metric implementing CustomTaskMetric that returns the same name as the CustomMetric, and I return it from currentMetricsValues of my PartitionReader. The metric values are static for now.
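Roughly, the classes look like this (the names and the static value are placeholders, and the interface bodies are copied inline only to keep the sketch self-contained; the real ones come from org.apache.spark.sql.connector.metric):

```java
// Reduced stand-ins for the Spark 3.2 interfaces, reproduced so this
// sketch compiles on its own; real code imports them from
// org.apache.spark.sql.connector.metric instead.
interface CustomMetric {
  String name();
  String description();
  String aggregateTaskMetrics(long[] taskMetrics);
}

interface CustomTaskMetric {
  String name();
  long value();
}

// Driver-side metric: aggregates the per-task values for display.
class Spark32CustomMetric implements CustomMetric {
  public Spark32CustomMetric() {} // default constructor

  @Override
  public String name() { return "myCustomMetric"; } // illustrative name

  @Override
  public String description() { return "an example custom metric"; }

  @Override
  public String aggregateTaskMetrics(long[] taskMetrics) {
    long sum = 0;
    for (long value : taskMetrics) {
      sum += value;
    }
    return String.valueOf(sum);
  }
}

// Executor-side value, returned from PartitionReader.currentMetricsValues().
class Spark32CustomTaskMetric implements CustomTaskMetric {
  @Override
  public String name() { return "myCustomMetric"; } // same name as the metric above

  @Override
  public long value() { return 1234L; } // static value for now
}
```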

When I run the application, the metric’s value shows as N/A on the Spark history page. I have added logs in aggregateTaskMetrics, and the flow appears to go through it: Spark’s SQLAppStatusListener.aggregateMetrics is loading my class and calling aggregateTaskMetrics, yet I’m still seeing N/A in the Spark UI.

Driver log:

23/06/23 19:23:53 INFO Spark32CustomMetric: Spark32CustomMetric in aggregateTaskMetrics start
23/06/23 19:23:53 INFO Spark32CustomMetric: Spark32CustomMetric in aggregateTaskMetrics sum:1234end
+---------+----------+-------+-----------+
|     word|word_count| corpus|corpus_date|
+---------+----------+-------+-----------+
|     LVII|         1|sonnets|          0|
|   augurs|         1|sonnets|          0|
|   dimm'd|         1|sonnets|          0|

Am I missing something? Any help is greatly appreciated.

Spark UI page:


Your setup sounds close, but the idea of calling an update method does not match the actual API: CustomTaskMetric has no update method. The interface only exposes name() and value(); Spark itself polls currentMetricsValues() on your PartitionReader and ships the values to the driver. Likewise, aggregateTaskMetrics is declared on CustomMetric, not on any task-side class: it receives all collected per-task values as a long[] and returns the string the UI displays.

Here’s what a correct aggregateTaskMetrics implementation looks like:

@Override
public String aggregateTaskMetrics(long[] taskMetrics) {
  long sum = 0;
  for (long taskMetric : taskMetrics) {
    sum += taskMetric;
  }
  return String.valueOf(sum); // this string is what the SQL tab displays
}

Since your driver log shows aggregateTaskMetrics running and computing 1234, the aggregation side appears to work. Two things worth verifying on the registration side:

- supportedCustomMetrics() on your Scan implementation must return an instance of your CustomMetric class; that is what tells the SQL tab the metric exists at all.

- The string returned by CustomTaskMetric.name() must match CustomMetric.name() exactly. Spark correlates task values with their metric by name, and a mismatch can leave the value stuck at N/A.

Once you have made these changes, run your application again and check the Spark UI page to see if the metric value is now being displayed correctly.
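To illustrate the registration side, here is a sketch of how the pieces tie together (interface shapes are again reduced inline so the snippet stands alone; a real implementation extends org.apache.spark.sql.connector.read.Scan and PartitionReader, and the rowCount metric here is made up for illustration):

```java
// Reduced stand-ins for the Spark connector interfaces, limited to the
// methods relevant here; real code implements the interfaces from
// org.apache.spark.sql.connector.metric instead.
interface CustomMetric {
  String name();
  String description();
  String aggregateTaskMetrics(long[] taskMetrics);
}

interface CustomTaskMetric {
  String name();
  long value();
}

// Driver-side metric definition.
class RowCountMetric implements CustomMetric {
  public String name() { return "rowCount"; }
  public String description() { return "number of rows read"; }
  public String aggregateTaskMetrics(long[] taskMetrics) {
    long sum = 0;
    for (long v : taskMetrics) sum += v;
    return String.valueOf(sum);
  }
}

// Executor-side snapshot of the metric's current value.
class RowCountTaskMetric implements CustomTaskMetric {
  private final long rows;
  RowCountTaskMetric(long rows) { this.rows = rows; }
  // Must return the same string as RowCountMetric.name(), or the UI
  // cannot correlate the task values with the metric.
  public String name() { return "rowCount"; }
  public long value() { return rows; }
}

class MyScan /* implements Scan */ {
  // Registers the metric with the SQL tab; without this the UI has
  // nothing to attach the task values to.
  public CustomMetric[] supportedCustomMetrics() {
    return new CustomMetric[] { new RowCountMetric() };
  }
}

class MyPartitionReader /* implements PartitionReader<InternalRow> */ {
  private long rowsRead = 0;

  // Would be called from next()/get() as rows are produced (not shown).
  void recordRow() { rowsRead++; }

  // Spark polls this during and at the end of the task.
  public CustomTaskMetric[] currentMetricsValues() {
    return new CustomTaskMetric[] { new RowCountTaskMetric(rowsRead) };
  }
}
```

The key point is that data flows one way: the reader reports values, Spark collects them by metric name, and the driver-side CustomMetric turns the collected long[] into the displayed string.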