streamz icon indicating copy to clipboard operation
streamz copied to clipboard

add "at_time" source

Open jdye64 opened this issue 6 years ago • 7 comments

I have some optimized Kafka code that I need to run which consumes data from Kafka with C++ and then creates GPU DataFrames (also C++ code) without having to pass each message back to python which with my benchmarks I noted was rather time consuming to create the large number of PyObjects.

It occured to me that if there was a source that acted like a lite version of cron which would allow a user to say something like "trigger this stream every 10 seconds, 3 days, 4 months, Every odd Friday, etc" then my optimized code could be triggered in my map function at my desired interval.

I realize this could be done in the user application by simply calling "emit" at those desired intervals but it seemed much more clean to just offer a source that did the same.

I'm willing to open a PR with this one but wanted to get anyones feedback first.

jdye64 avatar Sep 18 '19 20:09 jdye64

I feel like there is functionality very close to this already, since there are stream nodes (e.g., timed_window) which do things on a timed basis. Would you be interested in implementing the "cron" source? Perhaps it should emit the time, or perhaps some custom payload.

martindurant avatar Sep 19 '19 16:09 martindurant


@Stream.register_api(staticmethod)
class scheduler(Stream):
    """schedu stream.

    Examples:
    s = scheduler()
    s.add_job(name='hello',func = lambda x:'heoo',seconds=5,start_date='2019-04-03 09:25:00')
    s.get_jobs()

    

    s.add_job(func=lambda :print('yahoo'),seconds=5)

    Parameters:
    weeks (int) – number of weeks to wait
    days (int) – number of days to wait
    hours (int) – number of hours to wait
    minutes (int) – number of minutes to wait
    seconds (int) – number of seconds to wait
    start_date (datetime|str) – starting point for the interval calculation
    end_date (datetime|str) – latest possible date/time to trigger on
    timezone (datetime.tzinfo|str) – to use for the date/time calculations.
    jitter (int|None) – advance or delay  by jitter seconds at most.

    """

    def __init__(self, start=True, **kwargs):
        from apscheduler.schedulers.tornado import TornadoScheduler
        from apscheduler.executors.pool import ThreadPoolExecutor
        import pytz

        self._scheduler = TornadoScheduler(
            timezone=pytz.timezone('Asia/Shanghai'),
            executors={'default': ThreadPoolExecutor(20)},
            job_defaults={
                'coalesce': False,
                'max_instances': 1
            }
        )
        super(scheduler, self).__init__(ensure_io_loop=True, **kwargs)
        self.stopped = True
        if start:
            self.start()

    def start(self):
        if self.stopped:
            self.stopped = False
        self._scheduler.start()

    def stop(self):
        self._scheduler.stop()
        self.stopped = True

    def add_job(self, func, name=None, **kwargs):
        """增加任务.

        Example:
         i.add_job(name='hello',func = lambda x:'heoo',
         seconds=5,start_date='2019-04-03 09:25:00')
        """
        return self._scheduler.add_job(
            func=lambda: self._emit(func()),
            name=name,
            id=name,
            trigger='interval',
            **kwargs)

    def remove_job(self, name):
        return self._scheduler.remove_job(job_id=name)

    def get_jobs(self,):
        return self._scheduler.get_jobs()

zjw0358 avatar Oct 16 '19 18:10 zjw0358

@zjw0358 , care to explain this?

martindurant avatar Oct 16 '19 19:10 martindurant

@zjw0358 , care to explain this?

keywords: from apscheduler.schedulers.tornado import TornadoScheduler self._scheduler = TornadoScheduler......

func=lambda: self._emit(func()),

zjw0358 avatar Oct 16 '19 20:10 zjw0358

@zjw0358 So is this something that is already present in Tornado you are saying but we just aren't taking advantage of it in the scheduler? Am I understanding that correctly?

jdye64 avatar Dec 09 '19 01:12 jdye64

@zjw0358 the code is what i need, just wonderful.

miaobainian36 avatar Jan 29 '21 12:01 miaobainian36

Does this code exist in a repo somewhere? streamz now supports registering nodes via entrypoints, so you can make your class more discoverable, but would be happy to mention it in the docs too.

martindurant avatar Jan 29 '21 14:01 martindurant