add "at_time" source
I have some optimized C++ code that consumes data from Kafka and then creates GPU DataFrames (also C++ code) directly, without having to pass each message back to Python; in my benchmarks, creating the large number of PyObjects was rather time-consuming.
It occurred to me that if there were a source that acted like a lite version of cron, allowing a user to say something like "trigger this stream every 10 seconds, 3 days, 4 months, every odd Friday, etc.", then my optimized code could be triggered in my map function at my desired interval.
I realize this could be done in the user application by simply calling "emit" at those desired intervals, but it seemed much cleaner to just offer a source that does the same.
I'm willing to open a PR with this one but wanted to get anyone's feedback first.
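To illustrate the "just call emit at the desired intervals yourself" workaround mentioned above, here is a minimal stdlib-only sketch. The `PeriodicTrigger` class is hypothetical; in a real streamz application the callback would be `source.emit` rather than a list append.

```python
import threading
import time

class PeriodicTrigger:
    """Hypothetical sketch: fire a callback every `interval` seconds
    on a timer thread. In a real streamz app the callback would be
    `source.emit`, pushing a payload into the stream."""

    def __init__(self, interval, callback):
        self.interval = interval
        self.callback = callback
        self._stopped = threading.Event()
        self._timer = None

    def _tick(self):
        if self._stopped.is_set():
            return
        self.callback(time.time())  # emit the current time as the payload
        self._schedule()

    def _schedule(self):
        self._timer = threading.Timer(self.interval, self._tick)
        self._timer.daemon = True
        self._timer.start()

    def start(self):
        self._schedule()

    def stop(self):
        self._stopped.set()
        if self._timer is not None:
            self._timer.cancel()

# Collect a few ticks, then stop
received = []
trigger = PeriodicTrigger(0.05, received.append)
trigger.start()
time.sleep(0.3)
trigger.stop()
```

After the sleep, `received` holds the timestamps of the ticks that fired during the 0.3-second window.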
I feel like there is functionality very close to this already, since there are stream nodes (e.g., timed_window) that do things on a timed basis. Would you be interested in implementing the "cron" source? Perhaps it should emit the time, or perhaps some custom payload.
@Stream.register_api(staticmethod)
class scheduler(Stream):
    """Scheduler source stream.

    Emits the return value of each scheduled function.

    Examples:
        s = scheduler()
        s.add_job(name='hello', func=lambda: 'hello',
                  seconds=5, start_date='2019-04-03 09:25:00')
        s.get_jobs()
        s.add_job(func=lambda: print('yahoo'), seconds=5)

    Parameters:
        weeks (int) – number of weeks to wait
        days (int) – number of days to wait
        hours (int) – number of hours to wait
        minutes (int) – number of minutes to wait
        seconds (int) – number of seconds to wait
        start_date (datetime|str) – starting point for the interval calculation
        end_date (datetime|str) – latest possible date/time to trigger on
        timezone (datetime.tzinfo|str) – timezone to use for the date/time calculations
        jitter (int|None) – advance or delay the job execution by jitter seconds at most
    """

    def __init__(self, start=True, **kwargs):
        from apscheduler.executors.pool import ThreadPoolExecutor
        from apscheduler.schedulers.tornado import TornadoScheduler
        import pytz

        self._scheduler = TornadoScheduler(
            # hard-coded here; should probably be a parameter
            timezone=pytz.timezone('Asia/Shanghai'),
            executors={'default': ThreadPoolExecutor(20)},
            job_defaults={
                'coalesce': False,
                'max_instances': 1,
            },
        )
        super(scheduler, self).__init__(ensure_io_loop=True, **kwargs)
        self.stopped = True
        if start:
            self.start()

    def start(self):
        if self.stopped:
            self.stopped = False
            self._scheduler.start()

    def stop(self):
        # APScheduler schedulers have no stop(); shutdown() halts the scheduler
        self._scheduler.shutdown()
        self.stopped = True

    def add_job(self, func, name=None, **kwargs):
        """Add a job.

        Example:
            s.add_job(name='hello', func=lambda: 'hello',
                      seconds=5, start_date='2019-04-03 09:25:00')
        """
        return self._scheduler.add_job(
            # wrap the user function so its return value is emitted downstream
            func=lambda: self._emit(func()),
            name=name,
            id=name,
            trigger='interval',
            **kwargs)

    def remove_job(self, name):
        return self._scheduler.remove_job(job_id=name)

    def get_jobs(self):
        return self._scheduler.get_jobs()
@zjw0358 , care to explain this?
keywords:
    from apscheduler.schedulers.tornado import TornadoScheduler
    self._scheduler = TornadoScheduler(......)
    func=lambda: self._emit(func()),
@zjw0358 So is this something that is already present in Tornado you are saying but we just aren't taking advantage of it in the scheduler? Am I understanding that correctly?
@zjw0358 the code is exactly what I need, just wonderful.
Does this code exist in a repo somewhere? streamz now supports registering nodes via entrypoints, so you can make your class more discoverable, but we would be happy to mention it in the docs too.
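For reference, a sketch of how such a node could be registered via entrypoints. The package and module names here are hypothetical, and the entry-point group name follows the convention described in the streamz plugin docs:

```python
# setup.py for a hypothetical package exposing the scheduler node
from setuptools import setup

setup(
    name='streamz-scheduler',           # hypothetical package name
    py_modules=['streamz_scheduler'],   # hypothetical module containing the class
    install_requires=['streamz', 'apscheduler', 'pytz'],
    entry_points={
        # streamz discovers plugin sources through this entry-point group
        'streamz.sources': [
            'scheduler = streamz_scheduler:scheduler',
        ],
    },
)
```

Once such a package is installed, the node would be discoverable on `Stream` without the user importing the plugin module explicitly.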