"Detach" cluster from Jupyter kernel
When I start a cluster from a Jupyter notebook and then shut down the notebook, the cluster will also shut down. Is there a way to detach a cluster from a notebook kernel?
Background: I would like to use ipyparallel for a long-term parametric study (~2 weeks). The basic idea is to
- start n engines from a Jupyter notebook,
- schedule m >> n jobs (via DirectView.map()?),
- leave/shut down the notebook,
- eventually return/restart the notebook to post-process the results.
Would this be feasible?
Yes! The shutdown_atexit attribute governs whether the cluster should be shut down during Python's atexit process-teardown stage:
c = ipp.Cluster(shutdown_atexit=False)
Or set the attribute at any time later:
cluster.shutdown_atexit = False
This is how the ipcluster start --daemonize command works.
Here's one way to initialize a notebook or script that will connect to an existing cluster, starting it if it doesn't exist yet:
import ipyparallel as ipp

# specify cluster_id so we know how to connect in the future
cluster_id = "tk421"
try:
    cluster = ipp.Cluster.from_file(cluster_id=cluster_id)
    print("Connected to existing cluster")
except FileNotFoundError:
    print("Starting new cluster")
    cluster = ipp.Cluster(
        cluster_id=cluster_id,
        n=4,
        # leave the cluster running when this process exits
        shutdown_atexit=False,
    )
    cluster.start_cluster_sync()

# same from here, whether we started the cluster or not
rc = cluster.connect_client_sync()
rc.wait_for_engines(4)
print(rc.ids)
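Since nothing will shut the cluster down automatically anymore, remember to stop it explicitly once the study is done. A minimal sketch, assuming stop_cluster_sync() is available as the synchronous counterpart of the start/connect *_sync methods used above:

import ipyparallel as ipp

# reconnect to the still-running cluster by its id
cluster = ipp.Cluster.from_file(cluster_id="tk421")
# tear down engines and controller explicitly,
# since shutdown_atexit=False means no one else will
cluster.stop_cluster_sync()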
Thanks, this was very helpful! 👍
I'm using ipyparallel.client.view.DirectView.map() to run code on my cluster. Is it somehow possible to restore the ipyparallel.client.asyncresult.AsyncMapResult object as well when reconnecting to an existing cluster (in case you do not have a shared/persistent file system for the results)?
You can use the result database on the Hub for this (if you don't disable it and it doesn't fill up). It's not the most robust API, though.
If you save the msg_ids, you can construct an AsyncHubResult with client.get_result(msg_ids). It won't do the partition-reconstruction of an AsyncMapResult, but you will be able to get all the results. The main difference is that each element in the AsyncHubResult will be a chunk of results (e.g. list of lists) rather than the flat result list.
This workflow could be a lot better, but the following works, building on the script above:
import json
import os
import time

import ipyparallel as ipp

# specify cluster_id so we know how to connect in the future
cluster_id = "tk421"
try:
    cluster = ipp.Cluster.from_file(cluster_id=cluster_id)
    print("Connected to existing cluster")
except FileNotFoundError:
    print("Starting new cluster")
    cluster = ipp.Cluster(
        cluster_id=cluster_id,
        n=4,
        # leave the cluster running when this process exits
        shutdown_atexit=False,
    )
    cluster.start_cluster_sync()

# same from here, whether we started the cluster or not
rc = cluster.connect_client_sync()
rc.wait_for_engines(4)
print(rc.ids)


def task_function(i):
    import time  # noqa

    time.sleep(i)
    return i


task_file = "task.json"
if not os.path.isfile(task_file):
    # first run, submit the job
    print("submitting new job!")
    amr = rc[:].map_async(task_function, range(10))
    print(f"Saving task file to {task_file}: {amr.msg_ids}")
    with open(task_file, "w") as f:
        json.dump(amr.msg_ids, f)
    print("Interrupt me to stop waiting!")
    amr.wait_interactive()
else:
    # second run, wait for results
    print(f"Loading task from {task_file}")
    with open(task_file) as f:
        msg_ids = json.load(f)
    # get async hub result from the Hub
    ar = rc.get_result(msg_ids)
    # a reconstructed result will have as each element
    # a _chunk_ of results, not individual results, so nest iteration
    for result_chunk in ar:
        for result in result_chunk:
            print(result)
    # could also do results = itertools.chain(*list(ar))
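If you prefer the flat list that map_async would normally give you, the chunked AsyncHubResult from the script above can be flattened in one step (a small sketch reusing ar from the else branch):

import itertools

# each element of ar is a chunk (one sub-list per scheduled partition),
# so chain the chunks together to recover a flat list of results
flat_results = list(itertools.chain.from_iterable(ar))
print(flat_results)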
Thank you for the solution and the warning.
if ... it doesn't fill up
What kind of database does it use? How much data can it store?
The docs are here. It's in-memory by default, and the default limit to start culling records is 1024 tasks or 1 GB of results, whichever comes first. These can be configured with c.DictDB.record_limit and c.DictDB.size_limit, respectively. If you use an SQLite database, there is no limit (the task db will grow forever unless you explicitly delete it).
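For reference, here is a minimal sketch of an ipcontroller_config.py setting those options. The record_limit/size_limit traits are the ones named above; the example values and the HubFactory.db_class line for switching to SQLite are illustrative, so check them against your ipyparallel version:

# ipcontroller_config.py (sketch)
c = get_config()  # noqa: F821 -- provided by the config loader

# raise the in-memory DictDB culling thresholds (defaults: 1024 tasks, 1 GB)
c.DictDB.record_limit = 4096
c.DictDB.size_limit = 4 * 1024**3

# or switch to the SQLite backend, which never culls
# (the task database grows until you delete it yourself)
# c.HubFactory.db_class = "SQLiteDB"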