Add Async::Waiter
WIP
As discussed here, I introduce a new class called Waiter, which can be a drop-in replacement for Barrier. The only difference is that it can wait for the first N tasks to finish.
If and when the approach is approved, I'll add more specific tests.
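To make the "first N" semantics concrete, here is a minimal sketch built on stdlib threads rather than Async tasks, so it runs without any gems. It is not the implementation in this PR: `ThreadWaiter` and its internals are hypothetical names, and only the `async` / `wait_for(n)` shape returning `[done, pending]` is taken from the example further down.

```ruby
# Hypothetical stand-in for the proposed Waiter, using threads instead of Async tasks.
class ThreadWaiter
  def initialize
    @threads = []                       # every unit of work that was started
    @done = []                          # finished threads, in completion order
    @mutex = Mutex.new
    @finished = ConditionVariable.new
  end

  # Start a unit of work and record it once it completes (normally or not).
  def async(&block)
    thread = Thread.new do
      begin
        block.call
      ensure
        @mutex.synchronize do
          @done << Thread.current
          @finished.broadcast
        end
      end
    end
    @mutex.synchronize { @threads << thread }
    thread
  end

  # Block until at least `count` units have finished, then return [done, pending].
  # @done is never drained, so repeated calls return the same first `count` units.
  # Assumes `count` does not exceed the number of started units.
  def wait_for(count)
    @mutex.synchronize do
      @finished.wait(@mutex) while @done.size < count
      done = @done.first(count)
      [done, @threads - done]
    end
  end
end

waiter = ThreadWaiter.new
[0.3, 0.1, 0.2].each { |delay| waiter.async { sleep(delay); delay } }

done, pending = waiter.wait_for(2)
puts "done: #{done.map(&:value).inspect}"       # values of the two fastest units
puts "pending: #{pending.map(&:value).inspect}" # Thread#value joins the slowest one
```

Calling `wait_for(2)` a second time on the same waiter returns the same two threads, which mirrors the "all waiters see the same first N" choice rather than a draining barrier.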
Questions:
- Better name?
- Replace Barrier?
- How to get rid of warnings in logs when a task raises an exception? See example:
Async do |task|
  waiter = Async::Waiter.new

  waiter.async do
    task.sleep(1)
    raise StandardError, "Error!"
  end

  waiter.async do
    task.sleep(2)
    2
  end

  waiter.async do
    task.sleep(3)
    3
  end

  waiter.async do
    task.sleep(3)
    raise StandardError, "Error!"
  end

  done, pending = waiter.wait_for(2)

  done.each do |task|
    Console.logger.info(self, "Done result: #{task.wait}")
  rescue StandardError => e
    Console.logger.info(self, e)
  end

  pending.each do |task|
    Console.logger.info(self, "Pending result: #{task.wait}")
  rescue StandardError => e
    Console.logger.info(self, e)
  end

  Console.logger.info(self, "Done")
end
Even though the tasks that raise are eventually awaited, I still see warnings in the logs:
0.0s warn: Async::Task [oid=0x8c] [ec=0xa0] [pid=453505] [2022-07-30 23:31:16 +0200]
| Task may have ended with unhandled exception.
| StandardError: Error!
| → ./waiter.rb:56 in `block (2 levels) in <main>'
| ./waiter.rb:20 in `block in async'
| /home/zhulik/.asdf/installs/ruby/3.1.2/lib/ruby/gems/3.1.0/gems/async-2.0.3/lib/async/task.rb:255 in `block in schedule'
1.0s info: Object [oid=0xb4] [ec=0xc8] [pid=453505] [2022-07-30 23:31:17 +0200]
| StandardError: Error!
| → ./waiter.rb:56 in `block (2 levels) in <main>'
| ./waiter.rb:20 in `block in async'
| /home/zhulik/.asdf/installs/ruby/3.1.2/lib/ruby/gems/3.1.0/gems/async-2.0.3/lib/async/task.rb:255 in `block in schedule'
1.0s info: Object [oid=0xb4] [ec=0xc8] [pid=453505] [2022-07-30 23:31:17 +0200]
| Done result: 2
2.0s info: Object [oid=0xb4] [ec=0xc8] [pid=453505] [2022-07-30 23:31:18 +0200]
| Pending result: 3
2.0s info: Object [oid=0xb4] [ec=0xc8] [pid=453505] [2022-07-30 23:31:18 +0200]
| StandardError: Error!
| → ./waiter.rb:68 in `block (2 levels) in <main>'
| ./waiter.rb:20 in `block in async'
| /home/zhulik/.asdf/installs/ruby/3.1.2/lib/ruby/gems/3.1.0/gems/async-2.0.3/lib/async/task.rb:255 in `block in schedule'
2.0s info: Object [oid=0xb4] [ec=0xc8] [pid=453505] [2022-07-30 23:31:18 +0200]
| Done
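One possible workaround for the warning, sketched here independently of Async: have each block rescue its own exception and return it as a value, so the task itself finishes normally and the caller decides when to re-raise. `Outcome` and `capture` are hypothetical helpers, not part of Async, and I have not verified this against the library's logging; the assumption is that a task whose block does not raise is not reported.

```ruby
# Hypothetical helper: the outcome of a block, held as a plain value.
Outcome = Struct.new(:value, :error) do
  def ok?
    error.nil?
  end

  # Return the value, or re-raise the captured error for the caller to handle.
  def unwrap
    raise error if error
    value
  end
end

# Run the block and capture either its result or the StandardError it raised.
def capture
  Outcome.new(yield, nil)
rescue StandardError => e
  Outcome.new(nil, e)
end

ok = capture { 2 }
failed = capture { raise StandardError, "Error!" }

puts ok.unwrap             # prints 2
puts failed.ok?            # prints false
puts failed.error.message  # prints Error!
```

In the example above this would mean writing `waiter.async { capture { ... } }` and calling `task.wait.unwrap` inside the existing `rescue` blocks. The trade-off is that every consumer has to unwrap the result explicitly.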
Note: I'm leaving on vacation on August 2 (the day after tomorrow) and won't be able to code until August 9.
Types of Changes
- New feature.
Testing
- [ ] I added tests for my changes.
- [x] I tested my changes locally.
- [ ] I tested my changes in staging.
- [ ] I tested my changes in production.
This looks good to me - I like using a sub-class since you need to opt into it - there is extra overhead. It will require people to know ahead of time that they want to wait on a subset of items. Is there any other design we should consider?
I wonder if it's worth looking at how other implementations work and how they handle this particular overhead.
From #62:
The idea of a barrier is that it's stateful, you can call wait(2) several times until it's exhausted.
Does the current implementation enable this feature? Since @done never changes I think #wait(2) will always return the same result, no?
@bruno- Waiter passes all tests written for Barrier, at least for the case when all tasks are awaited. For the case when N tasks are awaited, I decided that it's better if all coroutines waiting on a waiter get the same first N results.
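For comparison, the stateful behaviour described in #62, where repeated `wait(2)` calls drain the barrier until it is exhausted, could look roughly like the sketch below. It uses stdlib threads and a `Queue` rather than Async, and `DrainingWaiter` is a hypothetical name, not something in this PR or in Async.

```ruby
# Hypothetical stateful variant: each wait(count) consumes the next completions.
# Assumes async and wait are only called from a single coordinating thread.
class DrainingWaiter
  def initialize
    @completed = Queue.new  # finished threads, in completion order
    @remaining = 0          # units started but not yet handed out by wait
  end

  # Start a unit of work and enqueue it once it completes.
  def async(&block)
    @remaining += 1
    Thread.new do
      begin
        block.call
      ensure
        @completed << Thread.current
      end
    end
  end

  # Return up to `count` finished threads that no earlier call has returned.
  # Blocks until they finish; an exhausted waiter returns [].
  def wait(count)
    taken = [count, @remaining].min
    @remaining -= taken
    Array.new(taken) { @completed.pop }
  end
end

waiter = DrainingWaiter.new
[0.1, 0.2, 0.3, 0.4].each { |delay| waiter.async { sleep(delay); delay } }

first = waiter.wait(2).map(&:value)   # the two fastest units
second = waiter.wait(2).map(&:value)  # the next two, not the same ones again
third = waiter.wait(2)                # nothing left to wait for

puts first.inspect, second.inspect, third.inspect
```

With this design concurrent waiters would each receive different tasks, which is the opposite of the "same first N for everyone" choice made in this PR.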
Closing in favour of #189 as discussed.