"last" aggregation in Multiprocess_mode (Gauge)
Related to https://github.com/prometheus/client_python/issues/154. I'm working with FastAPI + Gunicorn, and I have several cases where I need to expose a gauge metric using only the "last" value received, not the sum or the maximum (for example, the CPU usage of another device that reports that metric to my API).
Currently, multiprocess_mode doesn't offer a way to expose the "last" value, and I think that would be a valuable feature to add.
What I did locally was add another metric holding the timestamp of the last update made by each process, and then use a custom collector to select the process with the highest timestamp and return the metric associated with that process id. This is a workaround; a cleaner approach would be to add a new multiprocess mode for Gauges ("last") and set the timestamp on the Gauge (currently I see it's null) when that mode is used. I want to check what you think of a solution like this before opening a PR.
Here is the code of the "_accumulate_metrics" method from my custom collector (a couple of technical details: since I can't add a "last" mode, I use the "liveall" mode and require the metric name to start with "last"). A sketch of the application-side instrumentation follows the collector code:
# Imports used by the snippet below (the method lives on my custom collector class).
from collections import defaultdict

from prometheus_client.samples import Sample
from prometheus_client.utils import floatToGoString


@staticmethod
def _accumulate_metrics(metrics, accumulate):
    ### Custom code to get last value
    # First pass: find the pid of the process that reported the most recent heartbeat.
    last_pid = -1
    last_timestamp = 0.0
    for metric in metrics.values():
        for s in metric.samples:
            name, labels, value, timestamp, exemplar = s
            current_pid = -1
            if name == "process_heartbeat_timestamp":
                for l in labels:
                    if l[0] == 'pid':
                        current_pid = l[1]
                if float(value) > float(last_timestamp):
                    last_timestamp = value
                    last_pid = int(current_pid)
    ### End custom code
    for metric in metrics.values():
        samples = defaultdict(float)
        buckets = defaultdict(lambda: defaultdict(float))
        samples_setdefault = samples.setdefault
        for s in metric.samples:
            name, labels, value, timestamp, exemplar = s
            if metric.type == 'gauge':
                without_pid_key = (name, tuple(l for l in labels if l[0] != 'pid'))
                ### Custom code to get last value
                if metric._multiprocess_mode == 'liveall' and name.startswith("last"):
                    current_pid = -1
                    for l in labels:
                        if l[0] == 'pid':
                            current_pid = int(l[1])
                    # Seed the series with the first value seen, then overwrite it
                    # with the value from the most recently updated process.
                    samples_setdefault(without_pid_key, value)
                    if current_pid == last_pid:
                        samples[without_pid_key] = value
                ### End custom code
                elif metric._multiprocess_mode == 'min':
                    current = samples_setdefault(without_pid_key, value)
                    if value < current:
                        samples[without_pid_key] = value
                elif metric._multiprocess_mode == 'max':
                    current = samples_setdefault(without_pid_key, value)
                    if value > current:
                        samples[without_pid_key] = value
                elif metric._multiprocess_mode == 'livesum':
                    samples[without_pid_key] += value
                else:  # all/liveall
                    samples[(name, labels)] = value
            elif metric.type == 'histogram':
                # A for loop with early exit is faster than a genexpr
                # or a listcomp that ends up building unnecessary things.
                for l in labels:
                    if l[0] == 'le':
                        bucket_value = float(l[1])
                        # _bucket
                        without_le = tuple(l for l in labels if l[0] != 'le')
                        buckets[without_le][bucket_value] += value
                        break
                else:  # did not find the `le` key
                    # _sum/_count
                    samples[(name, labels)] += value
            else:
                # Counter and Summary.
                samples[(name, labels)] += value

        # Accumulate bucket values.
        if metric.type == 'histogram':
            for labels, values in buckets.items():
                acc = 0.0
                for bucket, value in sorted(values.items()):
                    sample_key = (
                        metric.name + '_bucket',
                        labels + (('le', floatToGoString(bucket)),),
                    )
                    if accumulate:
                        acc += value
                        samples[sample_key] = acc
                    else:
                        samples[sample_key] = value
                if accumulate:
                    samples[(metric.name + '_count', labels)] = acc

        # Convert to correct sample format.
        metric.samples = [Sample(name_, dict(labels), value)
                          for (name_, labels), value in samples.items()]
    return metrics.values()
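For context, this is roughly what the application side of the workaround looks like. The heartbeat metric name and the "last" prefix match the collector code above; the specific gauge and helper function below are only illustrative, and PROMETHEUS_MULTIPROC_DIR has to be set as usual for multiprocess mode.

import time

from prometheus_client import Gauge

# Heartbeat gauge: each worker records the wall-clock time of its last update.
# The collector above uses it to find the most recently updated pid.
heartbeat = Gauge(
    'process_heartbeat_timestamp',
    'Unix timestamp of the last update from this worker',
    multiprocess_mode='liveall',
)

# The value to expose; the "last" name prefix is what the collector keys on.
# (This particular gauge is only an example.)
last_device_cpu_usage = Gauge(
    'last_device_cpu_usage',
    'Most recent CPU usage reported by the device',
    multiprocess_mode='liveall',
)


def record_device_cpu(value):
    # Hypothetical helper called from the API endpoint that receives the report.
    last_device_cpu_usage.set(value)
    heartbeat.set(time.time())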
Hello, thank you for this issue and for the example code that others can now use!
I think I would be happy to accept a "last" aggregation for Gauges, as it does seem to have real use cases where a custom collector is not ideal. How are you thinking of implementing this? Today I don't think we set the timestamp anywhere, so the db file may need to be modified to include a timestamp field when a "last" aggregation is used.
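Roughly, I imagine that once a timestamp is stored next to each value, the merge step would boil down to something like this (just a sketch; the names are illustrative, the on-disk format is the open question, and this is not existing client_python code):

# Sketch only: assumes each per-process gauge sample is available as a
# (value, timestamp) pair once the db file stores timestamps.
def merge_last(samples_by_pid):
    """Return the value written most recently across all processes."""
    value, _timestamp = max(samples_by_pid.values(), key=lambda vt: vt[1])
    return value


# Example: pid 12 wrote 0.42 at t=100.0, pid 13 wrote 0.55 at t=101.5.
assert merge_last({12: (0.42, 100.0), 13: (0.55, 101.5)}) == 0.55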
Note for any implementer: you can see how the Ruby client implemented this (along with some excellent context) in https://github.com/prometheus/client_ruby/pull/172. We would do something similar, and would also need to disable inc/dec when a "last" aggregation is used.
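For the inc/dec restriction, something along these lines is what I have in mind (a toy stand-in, not the real Gauge class):

class LastGaugeSketch:
    """Toy stand-in showing the guard the real Gauge would need."""

    def __init__(self, multiprocess_mode='all'):
        self._multiprocess_mode = multiprocess_mode
        self._value = 0.0

    def set(self, value):
        self._value = float(value)

    def inc(self, amount=1):
        # Only set() is meaningful when the aggregation keeps the most
        # recently written value, so reject increments and decrements.
        if self._multiprocess_mode == 'last':
            raise RuntimeError(
                "inc()/dec() are not supported with the 'last' aggregation; use set()")
        self._value += amount

    def dec(self, amount=1):
        self.inc(-amount)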