Count task states per task prefix and expose to Prometheus

Open ntabris opened this issue 3 years ago • 7 comments

Keeps an accumulated count on task prefix of number of tasks that have been in each state, and exposes this on Prometheus endpoint.

Here's an example plot of this data in Grafana:

image
  • [x] Tests added / passed
  • [x] Passes pre-commit run --all-files
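
For readers who want the idea in miniature, here is a rough sketch of the bookkeeping described above: a per-prefix, per-state count that only ever goes up, rendered as Prometheus-style sample lines. The class, the `key_to_prefix` helper, and the `task_prefix_name` label are illustrative assumptions, not the code in this PR; the metric name is borrowed from the query quoted later in the thread.

```python
from collections import defaultdict


def key_to_prefix(key):
    # Simplified assumption: "inc-123abc" -> "inc". Dask's real key splitting handles more cases.
    return key.split("-")[0]


class PrefixStateCounter:
    """Hypothetical stand-in for the scheduler-side bookkeeping."""

    def __init__(self):
        # prefix -> state -> number of tasks that have ever entered that state
        self.totals = defaultdict(lambda: defaultdict(int))

    def transition(self, key, finish):
        """Record that task `key` has just entered state `finish`."""
        self.totals[key_to_prefix(key)][finish] += 1


def render(counter, name="dask_scheduler_prefix_state_totals_total"):
    """Format the totals as Prometheus text-exposition sample lines."""
    lines = []
    for prefix, states in sorted(counter.totals.items()):
        for state, n in sorted(states.items()):
            lines.append(f'{name}{{task_prefix_name="{prefix}",state="{state}"}} {n}')
    return lines


counts = PrefixStateCounter()
for key in ["inc-1", "inc-2", "add-1"]:
    counts.transition(key, "processing")
counts.transition("inc-1", "memory")

for line in render(counts):
    print(line)
```

Note that the `processing` total for `inc` stays at 2 after `inc-1` moves to `memory`: the values accumulate and never decrease, which is what makes them counters rather than gauges.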

ntabris avatar Sep 29 '22 19:09 ntabris

Can one of the admins verify this patch?

Admins can comment ok to test to allow this one PR to run or add to allowlist to allow all future PRs from the same author to run.

GPUtester avatar Sep 29 '22 19:09 GPUtester

Aside from the "foo" comment this all seems pretty clean to me. I recommend adding a screenshot of the prometheus chart that this enables for motivation.

cc'ing @fjetter for when he gets back. Maybe also @crusaderky in the meantime?

mrocklin avatar Sep 30 '22 14:09 mrocklin

Supersedes https://github.com/dask/distributed/pull/7083

mrocklin avatar Sep 30 '22 14:09 mrocklin

cc also @jacobtomlinson, although I suspect he won't have much to say about the implementation

mrocklin avatar Sep 30 '22 14:09 mrocklin

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files ±0, 15 suites ±0, 6h 37m 42s :stopwatch: +14m 6s
3 155 tests +2: 3 069 :heavy_check_mark: ±0, 85 :zzz: +1, 1 :x: +1
23 350 runs +15: 22 441 :heavy_check_mark: +11, 908 :zzz: +3, 1 :x: +1

For more details on these failures, see this check.

Results for commit 299ff1de. ± Comparison against base commit 044225ae.

:recycle: This comment has been updated with latest results.

github-actions[bot] avatar Sep 30 '22 15:09 github-actions[bot]

add to allowlist

quasiben avatar Sep 30 '22 16:09 quasiben

The test failure, which is specific to py38 on Windows, doesn't appear to be related to anything I changed. Can someone help with this?

ntabris avatar Sep 30 '22 22:09 ntabris

> I am wondering if totals are actually the most useful counter here. Wouldn't the current value be more useful?

If you collect current values, you'll lose what happens between the times that you sample (i.e., scrape the prometheus endpoint).

With a counter, it's easy to get the number of tasks that transitioned during the entire sample interval: plot the difference between the counts. That seems more useful if you're sampling at an interval that's large relative to the duration of task transitions, and it approximates the current value more and more closely as your sampling (i.e., scraping) interval gets smaller.
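
A toy illustration of this point, using made-up task timings: three short tasks run entirely between two scrapes, so a gauge-style reading misses them while the difference of counter readings does not. All names and numbers here are hypothetical.

```python
# (start, end) times in seconds for three short tasks; illustrative values only
tasks = [(0.5, 1.5), (1.5, 2.5), (2.5, 3.5)]
scrape_times = [0, 5, 10]


def gauge_processing(t):
    """Tasks in processing at instant t (what a gauge would report)."""
    return sum(start <= t < end for start, end in tasks)


def counter_processing(t):
    """Tasks that have entered processing by time t (what a counter would report)."""
    return sum(start <= t for start, _ in tasks)


gauge_samples = [gauge_processing(t) for t in scrape_times]
counter_samples = [counter_processing(t) for t in scrape_times]
# Difference between consecutive counter readings = transitions per scrape interval
per_interval = [b - a for a, b in zip(counter_samples, counter_samples[1:])]

print(gauge_samples)    # every scrape lands when nothing is processing
print(counter_samples)
print(per_interval)     # the three transitions show up in the first interval
```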

ntabris avatar Oct 03 '22 13:10 ntabris

> With a counter, it's easy to get the number of tasks that transitioned during the entire sample interval: plot the difference between the counts. That seems more useful if you're sampling at an interval that's large relative to the duration of task transitions, and it approximates the current value more and more closely as your sampling (i.e., scraping) interval gets smaller.

That's true, but I'm typically interested in the current value at a given point in time (i.e., a gauge in the Prometheus sense). To make the graph you are showing, you need to calculate differences between states, don't you? What does the formula you're plotting look like?

fjetter avatar Oct 03 '22 13:10 fjetter

> That's true, but I'm typically interested in the current value at a given point in time (i.e., a gauge in the Prometheus sense). To make the graph you are showing, you need to calculate differences between states, don't you? What does the formula you're plotting look like?

Something like increase(dask_scheduler_prefix_state_totals_total{cluster="$cluster",state="processing"}[10s]).
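
As a rough mental model of what such a query returns, the sketch below sums the deltas between the samples that fall inside the window and treats any drop as a counter reset. It is a simplification under stated assumptions: Prometheus additionally extrapolates toward the window boundaries, which is omitted here, and the function name and sample values are made up.

```python
def simple_increase(samples, window, now):
    """Approximate increase() over [now - window, now].

    samples: list of (timestamp, counter_value) pairs sorted by timestamp.
    Returns None when fewer than two samples fall inside the window.
    """
    in_window = [v for t, v in samples if now - window <= t <= now]
    if len(in_window) < 2:
        return None
    total = 0.0
    for prev, cur in zip(in_window, in_window[1:]):
        # A drop means the counter restarted from zero, so the new value is the increase
        total += cur - prev if cur >= prev else cur
    return total


# Scrapes every 5 s (at t = 1, 6, 11, 16), queried with a 10 s window
scrapes = [(1, 0), (6, 4), (11, 10), (16, 10)]
print(simple_increase(scrapes, window=10, now=12))  # uses the samples at t=6 and t=11
print(simple_increase(scrapes, window=10, now=17))  # no transitions between t=11 and t=16
```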

ntabris avatar Oct 03 '22 14:10 ntabris

> > That's true, but I'm typically interested in the current value at a given point in time (i.e., a gauge in the Prometheus sense). To make the graph you are showing, you need to calculate differences between states, don't you? What does the formula you're plotting look like?

> Something like increase(dask_scheduler_prefix_state_totals_total{cluster="$cluster",state="processing"}[10s]).

So the plot doesn't show prefixes in state processing but rather prefixes that were transitioned to processing in the last 10s. IIUC the following would spike once and then be flat for the entire duration of the computation. Is this what you intend to show?

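import time

from distributed import Client

# Assumption: a throwaway in-process cluster; the original comment left `client` implicit.
client = Client(processes=False)

def do_stuff(x, delay):
    time.sleep(delay)

futures = client.map(do_stuff, range(10), delay=60)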

fjetter avatar Oct 03 '22 14:10 fjetter

I think that we actually want to show the derivative of tasks that have transitioned into the memory state. This is closer to what the task stream represents today.

mrocklin avatar Oct 03 '22 21:10 mrocklin

> I think that we actually want to show the derivative of tasks that have transitioned into the memory state. This is closer to what the task stream represents today.

This is easy to get with the metrics exposed here.

> So the plot doesn't show prefixes in state processing but rather prefixes that were transitioned to processing in the last 10s. IIUC the following would spike once and then be flat for the entire duration of the computation. Is this what you intend to show?

Pretty much. (I think it would actually be transition in last 5s, since with 5s sample interval I think this gives you the difference of two samples that are 5s apart, but I'm not highly confident about this.)

Stepping back, there are three related numbers:

  1. How fast tasks (for a given prefix) are finishing (transitioning into memory state)
  2. How many tasks (for a given prefix) are being processed
  3. How long the tasks take to process

This PR exposes (1). @fjetter, I think you're asking for (2), and noting that (1) and (2) come apart more when the time to complete individual tasks is relatively long. Is that right?

(1) and (2) are also interestingly different when you're comparing tasks that take significantly different lengths of time to complete... for example, if you see that task prefix A is completing tasks much faster than task prefix B, you might want to know if this is because B is waiting on A, or because A is just a fast thing to do.

My sense here is that (1) is valuable, and (2) or (3) or both would also be valuable, but that having (2) wouldn't remove the value of also having (1). Is that wrong?
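
One way to see how the three numbers relate, assuming roughly steady-state behavior: by Little's law, the average number of tasks in processing (2) is about the completion rate (1) times the mean processing time (3). The function name and the figures below are made up for illustration.

```python
def expected_in_flight(completion_rate, mean_duration):
    """Little's law: average tasks in processing = rate (tasks/s) * mean time in processing (s)."""
    return completion_rate * mean_duration


# Prefix A finishes tasks 40x faster than prefix B, yet holds a similar number in flight
prefix_a = expected_in_flight(completion_rate=20.0, mean_duration=0.25)
prefix_b = expected_in_flight(completion_rate=0.5, mean_duration=8.0)

print(prefix_a, prefix_b)
```

So a high completion rate on its own does not say how much of the cluster a prefix occupies, which is why (1) and (2) can tell different stories.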

ntabris avatar Oct 03 '22 22:10 ntabris

(2) was done, I think, in https://github.com/dask/distributed/pull/7083

It was good! But I think that this is better. Sampling at 5s felt lossy. This feels like it gives the same information, and without loss. The reason for (2) over (1) I think is that it doesn't require any additional state in the scheduler, but the state here seems fairly innocuous.

mrocklin avatar Oct 04 '22 00:10 mrocklin

> How fast tasks (for a given prefix) are finishing (transitioning into memory state)

Yes, I see value in this derivative. Earlier I was using the scheduler_tasks Gauge to infer progress on a computation, and the slope was often a very important diagnostic tool. Thinking about it a bit longer, I'm actually pretty excited about the prospect of having this by task prefix. I think I was thrown off by the graph title, which I would choose differently.


What I was thinking about is more along the lines of this scheduler_tasks gauge but split up by prefixes.

An earlier attempt to build visualizations on task groups is, for instance, https://github.com/dask/distributed/pull/5320. That PR used durations, which is a much harder problem since we get duration measurements only for historic data. Not sure if we can publish Prometheus metrics with a custom timestamp, can we?

A similar but simpler visualization could be built if we simply exposed the current state as a gauge and create such a stacked chart w/out durations (not as powerful but a good first step).

I actually think it is best to just add both and see how we can make use of them. The only problem I see is that we do not have an easy way to manage deprecations with these metrics but I'm OK with that for now.

Explicitly, I am proposing to add

diff --git a/distributed/http/scheduler/prometheus/core.py b/distributed/http/scheduler/prometheus/core.py
index d8a8f00c5..46e962117 100644
--- a/distributed/http/scheduler/prometheus/core.py
+++ b/distributed/http/scheduler/prometheus/core.py
@@ -54,9 +54,18 @@ class SchedulerMetricCollector(PrometheusCollector):
             labels=["task_prefix_name"],
         )

+        prefix_states = GaugeMetricFamily(
+            self.build_name("prefix_states"),
+            "Current number of tasks in a given state by prefix",
+            labels=["task_prefix_name", "state"],
+        )
+
         for tp in self.server.task_prefixes.values():
             suspicious_tasks.add_metric([tp.name], tp.suspicious)
+            for st, val in tp.states.items():
+                prefix_states.add_metric([tp.name, st], val)
         yield suspicious_tasks
+        yield prefix_states

With this we could actually also "deprecate" the tasks Gauge since this prefix_states includes the same info (after an aggregation)
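
To make "after an aggregation" concrete, here is a small sketch with made-up numbers: summing the per-prefix state counts over all prefixes recovers the per-state totals that the existing tasks gauge reports. The dictionary layout is an assumption for illustration only.

```python
from collections import Counter

# Hypothetical per-prefix gauge values: prefix -> state -> current number of tasks
prefix_states = {
    "inc": {"memory": 40, "processing": 8, "waiting": 2},
    "add": {"processing": 4, "waiting": 16},
}

# Summing over prefixes gives the overall count per state
tasks_by_state = Counter()
for states in prefix_states.values():
    tasks_by_state.update(states)

print(dict(tasks_by_state))
```

On the Prometheus side the same aggregation would be a `sum by (state)` over the per-prefix metric, whatever its final name turns out to be.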

I suggest to do this in a separate PR.

fjetter avatar Oct 04 '22 08:10 fjetter

> I'm actually pretty excited about the prospect of having this by task prefix.

Yay! I've also updated the example plot in the PR description.

> not sure if we can publish prometheus metrics with a custom timestamp, can we?

I'm not currently aware of a good way to do something like that, but I'm thinking about this.

> A similar but simpler visualization could be built if we simply exposed the current state as a gauge and create such a stacked chart w/out durations (not as powerful but a good first step). [...] I suggest to do this in a separate PR.

Sounds good, agree that it makes sense to do this separately.

ntabris avatar Oct 04 '22 16:10 ntabris

I've gone through things again and everything seems fine to me. I'm +1 on merging.

However, after reading through comments and thinking a bit more about this I realize that we are losing duration information. I didn't realize this in the first pass. We had this implicitly when we were sampling. I didn't think about how losing sampling was also causing us to lose an implicit weighting on longer tasks.

I expect we'll want something slightly different if we're looking to recreate the task group progress chart (which I do think we want). Looking at the code I think that we maybe already have this state in TaskPrefix.all_durations["compute"]. I suspect that if we were to publish that as a counter that that would give us the chart that we want.

(if so, hopefully @ntabris doesn't mind the previous exercise as a good learning experience)

I think that there are now two options:

  1. If @ntabris is ready to be done then we merge this and handle the rest in follow-up work. I'm +1 to hit the green button.
  2. If @ntabris wants to do a bit more work then we also publish all_durations["compute"] and focus on that for near-term charting.

mrocklin avatar Oct 04 '22 20:10 mrocklin

> then we also publish all_durations["compute"] and focus on that for near-term charting

If metrics are cheap then maybe we publish all of all_durations, similar to how we do now with states. This would give us transfer, serialization, and disk costs as Dask sees them. This is maybe a bit redundant with actual hardware metrics, so of only modest value.
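
A sketch of why a duration counter restores the weighting on longer tasks: if the accumulated seconds per prefix and action are published as counters, the increase between two scrapes divided by the scrape interval gives compute-seconds per wall-clock second, i.e. roughly how many threads that prefix kept busy. The structure below only mimics the prefix -> action -> seconds shape of all_durations; the helper names and numbers are hypothetical.

```python
from collections import defaultdict

# prefix -> action -> accumulated seconds (same shape as the all_durations idea)
durations = defaultdict(lambda: defaultdict(float))


def record(prefix, action, seconds):
    """Accumulate time spent; the totals only ever grow, like a counter."""
    durations[prefix][action] += seconds


def scrape():
    """Snapshot of the counters, as a scraper would see them."""
    return {(p, a): s for p, acts in durations.items() for a, s in acts.items()}


before = scrape()
for _ in range(3):
    record("inc", "compute", 2.0)   # three short tasks
record("slow", "compute", 30.0)     # one long task
after = scrape()

interval = 10.0  # seconds between the two scrapes (illustrative)
busy = {k: (after[k] - before.get(k, 0.0)) / interval for k in after}
print(busy)
```

By task count `inc` looks three times as active as `slow`, but by time spent `slow` dominates, which is the implicit weighting that sampling used to provide.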

mrocklin avatar Oct 04 '22 20:10 mrocklin

I still think this is worth having. The rate of tasks in a given state is still interesting information. I could see the average or max rate being a valuable metric for comparing performance-related changes, and it is definitely a cheap metric to have.

I opened https://github.com/dask/distributed/issues/7107 for a follow up conversation about the task stream

fjetter avatar Oct 05 '22 10:10 fjetter