client_python icon indicating copy to clipboard operation
client_python copied to clipboard

"last" aggregation in Multiprocess_mode (Gauge)

Open singerjess opened this issue 3 years ago • 1 comments

Related to https://github.com/prometheus/client_python/issues/154. I'm working with Fast API + Gunicorn, and I have several cases where I need to display a gauge metric, but only the "last" value received. Not the sum, or the maximum value (for example, the CPU usage of another device that reports that metric to my API).

Currently, the Multiprocess_mode doesn't have a way to provide the "last" value. I think that would be a valuable feature to add.

What I did locally, was add another metric with the timestamp of the last metric added by each node, and then with a custom collector select the process with the higher timestamp, and return the metric associated with that process id. This is a workaround, a cleaner way would be to add a new multiprocess mode to Gauges ("last"), and set the timestamp to the Gauge (currently I see it's null) when that mode is used. Want to check what you think of a potential solution like this before doing a PR.

I include here the code of the "accumulate_metrics" method from my custom collector. (There are a couple of technical details, since I can't add the "last" mode, I use the "liveall" mode and I specify the metric name starts with "last"):

@staticmethod
    def _accumulate_metrics(metrics, accumulate):

        ### Custom code to get last value
        last_pid = -1
        last_timestamp = 0.0
        for index, metric in enumerate(metrics.values()):
            for s in metric.samples:
                name, labels, value, timestamp, exemplar = s
                current_pid = -1
                if name == "process_heartbeat_timestamp":
                    for l in labels:
                        if l[0] == 'pid':
                            current_pid = l[1]
                    if float(value) > float(last_timestamp):
                        last_timestamp = value
                        last_pid = int(current_pid)

        ### End custom code
        for index, metric in enumerate(metrics.values()):
            samples = defaultdict(float)
            buckets = defaultdict(lambda: defaultdict(float))
            samples_setdefault = samples.setdefault
            for s in metric.samples:
                name, labels, value, timestamp, exemplar = s

                if metric.type == 'gauge':
                    without_pid_key = (name, tuple(l for l in labels if l[0] != 'pid'))
                    ### Custom code to get last value
                    if metric._multiprocess_mode == 'liveall' and name.startswith("last"):
                        current_pid = -1
                        for l in labels:
                            if l[0] == 'pid':
                                current_pid = int(l[1])
                        current = samples_setdefault(without_pid_key, value)
                        if current_pid == last_pid:
                            samples[without_pid_key] = value
                    ### End custom code
                    elif metric._multiprocess_mode == 'min':
                        current = samples_setdefault(without_pid_key, value)
                        if value < current:
                            samples[without_pid_key] = value
                    elif metric._multiprocess_mode == 'max':
                        current = samples_setdefault(without_pid_key, value)
                        if value > current:
                            samples[without_pid_key] = value
                    elif metric._multiprocess_mode == 'livesum':
                        samples[without_pid_key] += value
                    else:  # all/liveall
                        samples[(name, labels)] = value


                elif metric.type == 'histogram':
                    # A for loop with early exit is faster than a genexpr
                    # or a listcomp that ends up building unnecessary things
                    for l in labels:
                        if l[0] == 'le':
                            bucket_value = float(l[1])
                            # _bucket
                            without_le = tuple(l for l in labels if l[0] != 'le')
                            buckets[without_le][bucket_value] += value
                            break
                    else:  # did not find the `le` key
                        # _sum/_count
                        samples[(name, labels)] += value
                else:
                    # Counter and Summary.
                    samples[(name, labels)] += value

            # Accumulate bucket values.
            if metric.type == 'histogram':
                for labels, values in buckets.items():
                    acc = 0.0
                    for bucket, value in sorted(values.items()):
                        sample_key = (
                            metric.name + '_bucket', labels + (('le', floatToGoString(bucket)),),)
                        if accumulate:
                            acc += value
                            samples[sample_key] = acc
                        else:
                            samples[sample_key] = value
                    if accumulate:
                        samples[(metric.name + '_count', labels)] = acc

            # Convert to correct sample format.
            metric.samples = [Sample(name_, dict(labels), value) for (name_, labels), value in
                              samples.items()]
        return metrics.values()

singerjess avatar Oct 20 '22 15:10 singerjess

Hello, thank you for this issue, and the example code that others can use now!

I think I would be happy to accept a last aggregation for Gauges, as it does seem to have some real use cases where a custom collector is not ideal. How are you thinking of implementing this? Today I do not think we set the timestamp anywhere, so the db file may need to be modified to include a timestamp field if a last aggregation is used.

csmarchbanks avatar Oct 24 '22 19:10 csmarchbanks

Note for any implementer, you can find how the ruby client implemented this (and some excellent context) in https://github.com/prometheus/client_ruby/pull/172. We would do something similar, and also need to disable inc/dec when using a latest aggregation.

csmarchbanks avatar Mar 13 '23 23:03 csmarchbanks