from contextlib import suppress
import glob
import os.path
import subprocess
import time
from ipyparallel.error import NoEnginesRegistered
from hpc05.client import Client
from hpc05.ssh_utils import setup_ssh
from hpc05.utils import print_same_line

# If True, print the full startup log; if False, show a single updating
# status line and silence the ipcluster launcher.
VERBOSE = True


def watch_file(fname):
    """Yield new lines of the file `fname` as they appear, like ``tail -f``."""
    with open(fname) as fp:
        while True:
            new = fp.readline()
            # Once all lines are read this just returns ''
            # until the file changes and a new line appears.
            if new:
                yield new.strip()
            else:
                time.sleep(0.01)


def watch_stdout(stdout):
    """Yield lines from a stdout stream (e.g. from `ssh.exec_command`)
    as they arrive."""
    text_iter = iter(stdout.readline, b"")
    while True:
        # A pty mixes '\r' and '\n'; normalize before splitting into lines.
        lines = [
            l.strip() for l in next(text_iter).replace("\r", "\n").strip().split("\n")
        ]
        for line in lines:
            if line:
                yield line


def wait_for_succesful_start(log_file, timeout=300):
    """Block until `log_file` reports that the engines started successfully.

    `log_file` is either the path of the ipcluster log file or a stdout
    stream; an exception is raised after `timeout` seconds.
    """
t_start = time.time()
watch = watch_file if isinstance(log_file, str) else watch_stdout
for line in watch(log_file):
        if VERBOSE:
            print(line)
        else:
            print_same_line(line)
if "Engines appear to have started successfully" in line:
break
if "Cluster is already running with" in line:
# Currently not working!
raise Exception(
"Failed to start a ipcluster because a cluster is "
"already running, run "
"`hpc05.kill_remote_ipcluster()`."
)
if time.time() - t_start > timeout:
raise Exception(f"Failed to start a ipcluster in {timeout} seconds.")
time.sleep(0.01)
msg = 'The log-file reports "Engines appear to have started successfully".'
print_same_line(msg, new_line_end=True)


def start_ipcluster(n, profile, env_path=None, timeout=300):
"""Start an `ipcluster` locally.
Parameters
----------
n : int
Number of engines to be started.
profile : str, default 'pbs'
Profile name of IPython profile.
env_path : str, default=None
Path of the Python environment, '/path/to/ENV/' if Python is in /path/to/ENV/bin/python.
Examples '~/miniconda3/envs/dev/', 'miniconda3/envs/dev', '~/miniconda3'.
Defaults to the environment that is sourced in `.bashrc` or `.bash_profile`.
timeout : int
Time limit after which the connection attempt is cancelled.
Returns
-------
None
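
    Examples
    --------
    A hypothetical call on the cluster headnode, assuming an IPython
    profile named 'pbs' exists on this machine:

    >>> start_ipcluster(100, "pbs")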
"""
log_file_pattern = os.path.expanduser(
f"~/.ipython/profile_{profile}/log/ipcluster-*.log"
)
for f in glob.glob(log_file_pattern):
# Remove old log files.
os.remove(f)
pid_pattern = os.path.expanduser(f"~/.ipython/profile_{profile}/pid/*")
for f in glob.glob(pid_pattern):
# Remove old pid files.
os.remove(f)
ipcluster = "ipcluster"
if env_path:
ipcluster = os.path.join(os.path.expanduser(env_path), "bin", ipcluster)
print(f"Launching {n} engines in a ipcluster.")
cmd = f"{ipcluster} start --profile={profile} --n={n} --log-to-file --daemon &"
# For an unknown reason `subprocess.Popen(cmd.split())` doesn't work when
# running `start_remote_ipcluster` and connecting to it, so we use os.system.
os.system(cmd + ("> /dev/null 2>&1" if not VERBOSE else ""))
    for i in range(timeout):
        print_same_line(f"Waiting for the log-file to appear ({i} seconds).")
        time.sleep(1)  # We wait a bit since we need the log file to exist.
        # We don't PIPE stdout of the process above because we need a detached
        # process, so we tail the log file instead.
        with suppress(IndexError):
            log_file = glob.glob(log_file_pattern)[0]
            break
    else:
        # The loop ran out without ever finding a log file.
        raise Exception(f"The log-file didn't appear within {timeout} seconds.")
    print(f"Found the log-file ({log_file}) in {i} seconds.")
wait_for_succesful_start(log_file, timeout=timeout)


def start_remote_ipcluster(
n,
profile="pbs",
hostname="hpc05",
username=None,
password=None,
env_path=None,
timeout=300,
):
"""Starts an `ipcluster` over ssh on `hostname` and wait untill it's
successfully started.
Parameters
----------
n : int
Number of engines to be started.
profile : str, default 'pbs'
Profile name of IPython profile.
hostname : str
Hostname of machine where the ipcluster runs.
username : str
Username to log into `hostname`. If not provided, it tries to look it up in
your `.ssh/config`.
password : str
Password for `ssh username@hostname`.
env_path : str, default=None
Path of the Python environment, '/path/to/ENV/' if Python is in `/path/to/ENV/bin/python`.
Examples '~/miniconda3/envs/dev/', 'miniconda3/envs/dev', '~/miniconda3'.
Defaults to the environment that is sourced in `.bashrc` or `.bash_profile`.
timeout : int
Time for which we try to connect to get all the engines.
Returns
-------
None
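
    Examples
    --------
    A hypothetical call, assuming `hpc05` is an entry in your `.ssh/config`:

    >>> start_remote_ipcluster(100, profile="pbs", hostname="hpc05")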
"""
if env_path is None:
env_path = ""
python_exec = "python"
else:
python_exec = os.path.join(env_path, "bin", "python")
with setup_ssh(hostname, username, password) as ssh:
cmd = f"import hpc05; hpc05.start_ipcluster({n}, '{profile}', '{env_path}', {timeout})"
cmd = f'{python_exec} -c "{cmd}"'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
wait_for_succesful_start(stdout, timeout=timeout)


def connect_ipcluster(
n,
profile="pbs",
hostname="hpc05",
username=None,
password=None,
culler=True,
culler_args=None,
env_path=None,
local=True,
timeout=300,
folder=None,
client_kwargs=None,
):
"""Connect to an `ipcluster` on the cluster headnode.
Parameters
----------
n : int
Number of engines to be started.
profile : str, default 'pbs'
Profile name of IPython profile.
hostname : str
Hostname of machine where the ipcluster runs. If connecting
via the headnode use: `socket.gethostname()` or set `local=True`.
username : str
Username to log into `hostname`. If not provided, it tries to look it up in
your `.ssh/config`.
password : str
Password for `ssh username@hostname`.
culler : bool
Controls starting of the culler. Default: True.
culler_args : str
Add arguments to the culler. e.g. '--timeout=200'
env_path : str, default: None
Path of the Python environment, '/path/to/ENV/' if Python is in /path/to/ENV/bin/python.
Examples '~/miniconda3/envs/dev/', 'miniconda3/envs/dev', '~/miniconda3'.
Defaults to the environment that is sourced in `.bashrc` or `.bash_profile`.
local : bool, default: True
Connect to the client locally or over ssh. Set it False if
a connection over ssh is needed.
timeout : int
Time for which we try to connect to get all the engines.
folder : str, optional
Folder that is added to the path of the engines, e.g. "~/Work/my_current_project".
client_kwargs : dict
Keyword arguments that are passed to `hpc05.Client()`.
Returns
-------
client : ipython.Client object
An IPyparallel client.
dview : ipyparallel.client.view.DirectView object
Direct view, equivalent to `client[:]`.
lview : ipyparallel.client.view.LoadBalancedView
LoadedBalancedView, equivalent to `client.load_balanced_view()`.
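
    Examples
    --------
    A minimal sketch, assuming an `ipcluster` with 100 engines is already
    running under the 'pbs' profile on this machine:

    >>> client, dview, lview = connect_ipcluster(100, profile="pbs")
    >>> lview.map_sync(lambda x: x ** 2, range(10))
    [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]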
"""
client = Client(
profile=profile,
hostname=hostname,
username=username,
password=password,
culler=culler,
culler_args=culler_args,
env_path=env_path,
local=local,
timeout=timeout,
**(client_kwargs or {}),
)
print("Connected to the `ipcluster` using an `ipyparallel.Client`.")
t_start = time.time()
done = False
n_engines_old = 0
while not done:
n_engines = len(client)
done = n_engines >= n
        with suppress(NoEnginesRegistered):
            # `client[:]` raises this while no engines are up yet; keep waiting.
            dview = client[:]
t = int(time.time() - t_start)
msg = f"Connected to {n_engines} out of {n} engines after {t} seconds."
print_same_line(msg, new_line_end=(n_engines_old != n_engines))
if t > timeout:
            raise Exception(
                f"Not all engines ({n_engines}/{n}) connected after {timeout} seconds."
            )
n_engines_old = n_engines
time.sleep(1)
dview.use_dill()
lview = client.load_balanced_view()
if folder is not None:
print(f"Adding {folder} to path.")
cmd = f"import sys, os; sys.path.append(os.path.expanduser('{folder}'))"
dview.execute(cmd).result()
return client, dview, lview


def start_and_connect(
n,
profile="pbs",
hostname="hpc05",
culler=True,
culler_args=None,
env_path=None,
local=True,
timeout=300,
folder=None,
client_kwargs=None,
kill_old_ipcluster=True,
):
"""Start an `ipcluster` locally and connect to it.
Parameters
----------
n : int
Number of engines to be started.
profile : str, default 'pbs'
Profile name of IPython profile.
hostname : str
Hostname of machine where the ipcluster runs. If connecting
via the headnode use: `socket.gethostname()` or set `local=True`.
culler : bool
Controls starting of the culler. Default: True.
culler_args : str
Add arguments to the culler. e.g. '--timeout=200'
env_path : str, default: None
Path of the Python environment, '/path/to/ENV/' if Python is in /path/to/ENV/bin/python.
Examples '~/miniconda3/envs/dev/', 'miniconda3/envs/dev', '~/miniconda3'.
Defaults to the environment that is sourced in `.bashrc` or `.bash_profile`.
local : bool, default: True
Connect to the client locally or over ssh. Set it False if
a connection over ssh is needed.
timeout : int
Time for which we try to connect to get all the engines.
folder : str, optional
Folder that is added to the path of the engines, e.g. "~/Work/my_current_project".
client_kwargs : dict
Keyword arguments that are passed to `hpc05.Client()`.
kill_old_ipcluster : bool
If True, it cleansup any old instances of `ipcluster` and kills
your jobs in qstat or squeue.
Returns
-------
client : ipython.Client object
An IPyparallel client.
dview : ipyparallel.client.view.DirectView object
Direct view, equivalent to `client[:]`.
lview : ipyparallel.client.view.LoadBalancedView
LoadedBalancedView, equivalent to `client.load_balanced_view()`.
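
    Examples
    --------
    A hypothetical session on the headnode; the `folder` value is an
    example path:

    >>> client, dview, lview = start_and_connect(
    ...     100, profile="pbs", folder="~/Work/my_current_project"
    ... )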
"""
if kill_old_ipcluster:
kill_ipcluster(profile)
print("Killed old intances of ipcluster.")
start_ipcluster(n, profile, env_path, timeout)
# all arguments for `connect_ipcluster` except `username` and `password`.
return connect_ipcluster(
n,
profile=profile,
hostname=hostname,
culler=culler,
culler_args=culler_args,
env_path=env_path,
local=local,
timeout=timeout,
folder=folder,
client_kwargs=client_kwargs,
)


def start_remote_and_connect(
n,
profile="pbs",
hostname="hpc05",
username=None,
password=None,
culler=True,
culler_args=None,
env_path=None,
timeout=300,
folder=None,
client_kwargs=None,
kill_old_ipcluster=True,
):
"""Start a remote `ipcluster` on `hostname` and connect to it.
Parameters
----------
n : int
Number of engines to be started.
profile : str, default 'pbs'
Profile name of IPython profile.
hostname : str
Hostname of machine where the ipcluster runs. If connecting
via the headnode use: `socket.gethostname()` or set `local=True`.
username : str
Username to log into `hostname`. If not provided, it tries to look it up in
your `.ssh/config`.
password : str
Password for `ssh username@hostname`.
culler : bool
Controls starting of the culler. Default: True.
culler_args : str
Add arguments to the culler. e.g. '--timeout=200'
env_path : str, default: None
Path of the Python environment, '/path/to/ENV/' if Python is in /path/to/ENV/bin/python.
Examples '~/miniconda3/envs/dev/', 'miniconda3/envs/dev', '~/miniconda3'.
Defaults to the environment that is sourced in `.bashrc` or `.bash_profile`.
timeout : int
Time for which we try to connect to get all the engines.
folder : str, optional
Folder that is added to the path of the engines, e.g. "~/Work/my_current_project".
client_kwargs : dict
Keyword arguments that are passed to `hpc05.Client()`.
kill_old_ipcluster : bool
If True, it cleansup any old instances of `ipcluster` and kills
your jobs in qstat or squeue.
Returns
-------
client : ipython.Client object
An IPyparallel client.
dview : ipyparallel.client.view.DirectView object
Direct view, equivalent to `client[:]`.
lview : ipyparallel.client.view.LoadBalancedView
LoadedBalancedView, equivalent to `client.load_balanced_view()`.
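
    Examples
    --------
    A hypothetical call from your local machine, assuming `hpc05` is an
    entry in your `.ssh/config`:

    >>> client, dview, lview = start_remote_and_connect(
    ...     100, profile="pbs", hostname="hpc05"
    ... )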
"""
if kill_old_ipcluster:
kill_remote_ipcluster(hostname, username, password, env_path)
print("Killed old intances of ipcluster.")
start_remote_ipcluster(n, profile, hostname, username, password, env_path, timeout)
time.sleep(2)
# all arguments for `connect_ipcluster` except `local`.
return connect_ipcluster(
n,
profile=profile,
hostname=hostname,
username=username,
password=password,
culler=culler,
culler_args=culler_args,
env_path=env_path,
local=False,
timeout=timeout,
folder=folder,
client_kwargs=client_kwargs,
)


def kill_ipcluster(name=None):
    """Kill your ipcluster and clean up the files.

    This should do the same as the following bash function (recommended:
    add it to your `.bash_profile` / `.bashrc`):
```bash
del() {
qselect -u $USER | xargs qdel
rm -f *.hpc05.hpc* ipengine* ipcontroller* pbs_*
pkill -f hpc05_culler 2> /dev/null
pkill -f ipcluster 2> /dev/null
pkill -f ipengine 2> /dev/null
pkill -f ipyparallel.controller 2> /dev/null
pkill -f ipyparallel.engines 2> /dev/null
}
```
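
    Examples
    --------
    >>> kill_ipcluster()  # clean up the default leftovers
    >>> kill_ipcluster("my-job")  # also `scancel` SLURM jobs named 'my-job'

    Here 'my-job' is a placeholder job name.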
"""
clean_up_cmds = [
"qselect -u $USER | xargs qdel",
"rm -f *.hpc05.hpc* ipengine* ipcontroller* pbs_*",
"pkill -f hpc05_culler",
"pkill -f ipcluster",
"pkill -f ipengine",
"pkill -f ipyparallel.controller",
"pkill -f ipyparallel.engines",
"scancel --name='ipy-engine-' --user=$USER", # SLURM
"scancel --name='ipy-controller-' --user=$USER", # SLURM
]
if name is not None:
clean_up_cmds.append(f"scancel --name='{name}' --user=$USER")
clean_up_cmds = [cmd + " 2> /dev/null" for cmd in clean_up_cmds]
for cmd in clean_up_cmds:
        # `shell=True` because the commands use pipes and `$USER` expansion.
        subprocess.run(cmd, shell=True, stdout=subprocess.PIPE)


def kill_remote_ipcluster(
    hostname="hpc05", username=None, password=None, env_path=None
):
"""Kill your remote ipcluster and cleanup the files.
This should do the same as the following bash function (recommended:
add this in your `.bash_profile` / `.bashrc`):
```bash
del() {
qselect -u $USER | xargs qdel
rm -f *.hpc05.hpc* ipengine* ipcontroller* pbs_*
pkill -f hpc05_culler 2> /dev/null
pkill -f ipcluster 2> /dev/null
pkill -f ipengine 2> /dev/null
pkill -f ipyparallel.controller 2> /dev/null
pkill -f ipyparallel.engines 2> /dev/null
}
```
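
    Examples
    --------
    A hypothetical call, assuming `hpc05` is an entry in your `.ssh/config`:

    >>> kill_remote_ipcluster(hostname="hpc05")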
"""
if env_path is None:
env_path = ""
python_exec = "python"
else:
python_exec = os.path.join(env_path, "bin", "python")
with setup_ssh(hostname, username, password) as ssh:
cmd = f"import hpc05; hpc05.connect.kill_ipcluster()"
cmd = f'{python_exec} -c "{cmd}"'
stdin, stdout, stderr = ssh.exec_command(cmd, get_pty=True)
        with suppress(Exception):
            # Echo whatever the remote kill printed; ignore broken streams.
            for line in stdout.readlines():
                print(line.rstrip())