
"Detach" cluster from Jupyter kernel

lukas-koschmieder opened this issue on Nov 09 '21 · 6 comments

When I start a cluster from a Jupyter notebook and then shut down the notebook, the cluster will also shut down. Is there a way to detach a cluster from a notebook kernel?

lukas-koschmieder · Nov 09 '21 11:11

Background: I would like to use ipyparallel for a long-term parametric study (about two weeks). The basic idea is to

  1. start n engines from a Jupyter notebook,
  2. schedule m>>n jobs (via DirectView.map()?),
  3. leave/shut down the notebook,
  4. eventually return/restart the notebook to post-process the results.

Would this be feasible?

lukas-koschmieder · Nov 09 '21 11:11

Yes! The shutdown_atexit attribute governs whether the cluster should be shut down when the Python process exits (during atexit teardown):

c = ipp.Cluster(shutdown_atexit=False)

Or set the attribute at any time later:

cluster.shutdown_atexit = False

This is how the ipcluster start --daemonize command works.
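
If you do detach a cluster this way, a later session can reconnect to it by its cluster id and shut it down explicitly once the study is done. A minimal sketch (it assumes the tk421 cluster id from the example below, and that the *_sync convenience wrappers mirror start_cluster_sync):

import ipyparallel as ipp

# reconnect to the previously detached cluster by its id
# (cluster_id "tk421" is just the example id used below)
cluster = ipp.Cluster.from_file(cluster_id="tk421")

# stop engines and controller explicitly now that the study is finished
cluster.stop_cluster_sync()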

Here's one way to initialize a notebook or script so that it connects to an existing cluster, starting it if it doesn't exist yet:

import ipyparallel as ipp

# specify cluster_id so we know how to connect in the future
cluster_id = "tk421"

try:
    cluster = ipp.Cluster.from_file(cluster_id=cluster_id)
    print("Connected to existing cluster")
except FileNotFoundError:
    print("Starting new cluster")
    cluster = ipp.Cluster(
        cluster_id=cluster_id,
        n=4,
        # leave the cluster running when this process exits
        shutdown_atexit=False,
    )
    cluster.start_cluster_sync()

# same from here, whether we started the cluster or not
rc = cluster.connect_client_sync()
rc.wait_for_engines(4)
print(rc.ids)

minrk · Nov 09 '21 18:11

Thanks, this was very helpful! 👍

I'm using ipyparallel.client.view.DirectView.map() to run code on my cluster. Is it somehow possible to restore the ipyparallel.client.asyncresult.AsyncMapResult object as well when reconnecting to an existing cluster (in case you do not have a shared/persistent file system for the results)?

lukas-koschmieder · Nov 10 '21 09:11

You can use the result database on the Hub for this (if you don't disable it and it doesn't fill up). It's not the most robust API, though.

If you save the msg_ids, you can construct an AsyncHubResult with client.get_result(msg_ids). It won't do the partition reconstruction of an AsyncMapResult, but you will still be able to get all the results. The main difference is that each element of the AsyncHubResult is a chunk of results (so iterating gives you a list of lists) rather than a flat list of individual results.

This workflow could be a lot better, but it works. Building on the script above:

import json
import os
import time

import ipyparallel as ipp

# specify cluster_id so we know how to connect in the future
cluster_id = "tk421"

try:
    cluster = ipp.Cluster.from_file(cluster_id=cluster_id)
    print("Connected to existing cluster")
except FileNotFoundError:
    print("Starting new cluster")
    cluster = ipp.Cluster(
        cluster_id=cluster_id,
        n=4,
        # leave the cluster running when this process exits
        shutdown_atexit=False,
    )
    cluster.start_cluster_sync()

# same from here, whether we started the cluster or not
rc = cluster.connect_client_sync()
rc.wait_for_engines(4)
print(rc.ids)

def task_function(i):
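    # import inside the function so it's available on the engine where it runs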
    import time  # noqa
    time.sleep(i)
    return i

task_file = "task.json"

if not os.path.isfile(task_file):
    # first run, submit the job
    print("submitting new job!")
    amr = rc[:].map_async(task_function, range(10))
    print(f"Saving task file to {task_file}: {amr.msg_ids}")
    with open(task_file, "w") as f:
        json.dump(amr.msg_ids, f)
    print("Interrupt me to stop waiting!")
    amr.wait_interactive()

else:
    # second run, wait for results
    print(f"Loading task from {task_file}")
    with open(task_file) as f:
        msg_ids = json.load(f)
    # get async hub result from the Hub
    ar = rc.get_result(msg_ids)
    # a reconstructed result will have as each element
    # a _chunk_ of results, not individual results, so nest iteration
    for result_chunk in ar:
        for result in result_chunk:
            print(result)
    # could also do results = itertools.chain(*list(ar))

minrk · Nov 11 '21 15:11

Thank you for the solution and the warning.

if ... it doesn't fill up

What kind of database does it use? How much data can it store?

lukas-koschmieder · Nov 12 '21 10:11

Docs are here. The database is in-memory by default, and the default limit before records start being culled is 1024 tasks or 1 GB of results, whichever comes first. These limits can be configured with c.DictDB.record_limit and c.DictDB.size_limit, respectively. If you use a SQLite database, there is no limit (the task db will grow forever unless you explicitly delete it).
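
For reference, a rough sketch of what that could look like in ipcontroller_config.py. The DictDB limits are the settings named above; the commented line for switching to SQLite assumes the usual HubFactory.db_class setting, so treat it as an illustration rather than the definitive spelling:

# ipcontroller_config.py
c = get_config()  # noqa

# raise the in-memory database limits (defaults: 1024 records / 1 GB)
c.DictDB.record_limit = 4096
c.DictDB.size_limit = 4 * 1024**3  # assumed to be in bytes

# or switch to the SQLite backend, which keeps task records on disk with no culling
# (assumes the standard HubFactory.db_class setting)
# c.HubFactory.db_class = "ipyparallel.controller.sqlitedb.SQLiteDB"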

minrk · Nov 12 '21 12:11