fix: use `JoinSet` to make spawned tasks cancel-safe

Open DDtKey opened this issue 1 year ago • 9 comments

Which issue does this PR close?

Closes #9317 Closes #6513

Disallows tokio::spawn and spawn_blocking, with exceptions only in some tests.

Rationale for this change

We need to provide a cancel-safe interface, and preferably deny tokio::spawn entirely.

What changes are included in this PR?

Switches to JoinSet and removes AbortOnDropSingle and AbortOnDropMany.

Are these changes tested?

I'm not sure how to test this directly, but the existing tests pass.

Are there any user-facing changes?

No

DDtKey avatar Feb 22 '24 22:02 DDtKey

Yes, I completely agree, and I also thought about an approach like this: a wrapper for these cases. But I didn't want to mix it up with the fix.

Just one thought: I think an API like this is more intuitive:

use std::future::Future;

use tokio::task::{JoinError, JoinSet};

struct SpawnedTask<T> {
    inner: JoinSet<T>,
}

impl<T: 'static> SpawnedTask<T> {
    // Constructor: takes no `self`, so an instance always holds a task.
    pub fn spawn<F>(task: F) -> Self
    where
        F: Future<Output = T>,
        F: Send + 'static,
        T: Send,
    {
        let mut inner = JoinSet::new();
        inner.spawn(task);
        Self { inner }
    }
    // and the same for spawn_blocking, actually

    // Consumes `self`, so the single task can be joined at most once.
    pub async fn join(mut self) -> Result<T, JoinError> {
        self.inner.join_next().await.expect("instance always has 1 task")
    }
}
  • There is no way to hit a runtime error by attempting to spawn one more task
  • join takes self by value, so it cannot be called more than once on an instance (i.e. an instance is guaranteed to hold exactly one task in its lifetime). This is guaranteed at compile time, because SpawnedTask can only be created through its public methods

I'll prepare changes

DDtKey avatar Feb 23 '24 12:02 DDtKey

Ok, I implemented a wrapper for spawned tasks. It seems reasonable to provide this right away (the diff is also smaller now). It simplifies the code, and we now provide a good interface for newcomers.

Generally, there is no need for OrderedSpawnedTasks; it would just be an alias for Vec<SpawnedTask<T>>. I.e.:

  • single task: use SpawnedTask
  • many unordered: use JoinSet
  • many ordered: use Vec<SpawnedTask>

See https://github.com/apache/arrow-datafusion/pull/9318/commits/fcf70f15f5aeac5552d60293a7e8f1cbae5b461a

DDtKey avatar Feb 23 '24 13:02 DDtKey

cc @alamb @tustvold

DDtKey avatar Feb 23 '24 20:02 DDtKey

The SpawnedTask abstraction looks great! Agreed that your API is more intuitive and Vec<SpawnedTask> is sufficient without an additional wrapper. Thanks again for knocking this out!

devinjdangelo avatar Feb 23 '24 23:02 devinjdangelo

I might be missing something, but what is the issue with the AbortOnDrop interfaces? They seem like less boilerplate than the proposed solution in this PR? The SpawnedTask abstraction seems to do the same thing as AbortOnDrop, so I wonder if we can avoid this being a breaking change?

tustvold avatar Feb 24 '24 00:02 tustvold

Subjectively, I find the new interface to be less boilerplate: you have one wrapper that spawns and wraps the task, instead of dealing with JoinHandles + AbortOnDrop + tokio::spawn.

But in any case, that is not the main point here. Safety is more important. AbortOnDrop didn't provide the same guarantees and can easily be misused.

We can even see JoinHandles being sent through channels and only then wrapped into this interface. But execution can be cancelled before the receiving side is awaited, reached, or has received the task. Some functions return a JoinHandle, which is kind of confusing: they spawn a task and don't care whether it gets wrapped safely. Or a lot of tasks are spawned in a loop with await points in between and only then wrapped into AbortOnDropMany, so on cancellation you may never reach the point where they are wrapped.

There are no strict rules for how to spawn tasks and how to work with them in the current codebase, and I have personally encountered cancellation issues with datafusion several times. A note in the documentation on how to handle this just doesn't scale; we still see it happen from time to time.

So I believe we need a way to work with this that is both safe and intuitive.

Just use SpawnedTask::spawn instead of tokio::spawn and we at least won't have the obvious issues. The compiler + clippy will also prevent such code for us.

And as far as I can see, this was raised before; there is an issue for it: #6513

DDtKey avatar Feb 24 '24 00:02 DDtKey

AbortOnDrop didn't provide the same guarantees, and easily can be misused.

How about adding an AbortOnDrop::spawn method that handles this, and potentially deprecate the methods that take a JoinHandle down the line? This would avoid making a breaking change, and is also IMO a more descriptive name for such a construction?

I dunno, I don't feel strongly, but if we can avoid overloading people with yet more new abstractions for tokio-nonsense, I think that would be better :smile:

tustvold avatar Feb 24 '24 01:02 tustvold

deprecate the methods that take a JoinHandle down the line

But a lot of usages were already refactored in https://github.com/apache/arrow-datafusion/pull/6750 🤔 I thought the goal was to disallow working with JoinHandles directly, since they were misused too often.

We have a clippy warning and can tell people to use SpawnedTask if anybody tries to use tokio::spawn (see clippy.toml). I guess that should improve the dev experience.
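For readers unfamiliar with the mechanism: clippy's disallowed_methods lint reads a disallowed-methods list from clippy.toml. The fragment below is only a sketch of what such an entry could look like; the exact paths and reason strings used in the repository are not quoted in this thread and are assumptions here.

```toml
# Hypothetical clippy.toml entries; wording and paths are illustrative only.
disallowed-methods = [
    { path = "tokio::task::spawn", reason = "To provide cancel-safety, use `SpawnedTask::spawn` instead" },
    { path = "tokio::task::spawn_blocking", reason = "To provide cancel-safety, use `SpawnedTask::spawn_blocking` instead" },
]
```

With entries like these, a direct call to the listed functions produces a lint warning carrying the reason text, and tests that legitimately need the raw functions can opt out with an allow attribute for clippy::disallowed_methods.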

AbortOnDrop::spawn can only work for a single task, and the naming is kind of confusing to me (I mean "Drop::spawn"). In most cases where you need AbortOnDropMany, you just need JoinSet (the exception is ordering).

but if we can avoid overloading people with yet more new abstractions for tokio-nonsense

We actually reduced them here: it used to be 3 of our own + JoinSet + spawn/JoinHandle itself. Now it's 2 of our own (the stream wrapper and SpawnedTask), plus JoinSet.

Every month we have datafusion releases with breaking changes for users of the crate. Is it really that important not to change internal structures that affect only devs? It has good outcomes, at least for the product.

DDtKey avatar Feb 24 '24 01:02 DDtKey

The naming is definitely negotiable; I'm not claiming the current version is the right one.

Subjectively, it looks like this: we spawn a task and then we have a SpawnedTask. Since it's the only allowed place to spawn, AbortOnDrop seems redundant to me here (there is no way to have a task without these semantics). It's more like documentation of the behavior and the reason why we have such a wrapper.

DDtKey avatar Feb 24 '24 02:02 DDtKey

Thank you @DDtKey and everyone who reviewed this PR. I thought it looks very nice and easy to use. Thank you all 🙏

alamb avatar Feb 27 '24 13:02 alamb