import os, subprocess, time, logging, shutil, copy
import numpy as np
from flarestack.shared import (
fs_dir,
log_dir,
fs_scratch_dir,
make_analysis_pickle,
host_server,
inj_dir_name,
name_pickle_output_dir,
cluster_dir,
)
from flarestack.core.multiprocess_wrapper import run_multiprocess
from flarestack.core.minimisation import MinimisationHandler
from flarestack.core.results import ResultsHandler
# Module-level logger, named after this module, used by all submitter classes below.
logger = logging.getLogger(__name__)
class Submitter(object):
    """
    Base class that takes care of submitting the trial calculations.

    Trials are run either locally (through ``run_multiprocess``) or on a
    cluster. The cluster specific parts (``submit_cluster``, ``wait_for_job``
    and ``get_pending_ids``) raise ``NotImplementedError`` here and are
    provided by subclasses, which register themselves for a server name with
    the ``register_submitter_class`` decorator.
    The class can also estimate the sensitivity scale before submitting.
    """

    # Registry that maps a server name to the Submitter subclass registered for it.
    submitter_dict = dict()

    def __init__(
        self,
        mh_dict,
        use_cluster,
        n_cpu=None,
        do_sensitivity_scale_estimation=False,
        remove_old_results=False,
        **cluster_kwargs,
    ):
        """
        A class that takes care of submitting the trial calculations.
        Also can estimate the sensitivity scale before submitting.

        :param mh_dict: dict, MinimisationHandler dictionary (a deep copy is stored)
        :param use_cluster: bool, whether to run the trials locally or on the cluster
        :param n_cpu: int, number of cores to use; defaults to all but one core
            (but at least one)
        :param do_sensitivity_scale_estimation: str, containing 'asimov', 'quick_injections' or both
        :param remove_old_results: bool, if True will delete directories containing injection values and pickled
            results from previous trials
        :param cluster_kwargs: keyword arguments used by the cluster
        """
        self.mh_dict = copy.deepcopy(mh_dict)
        self.use_cluster = use_cluster

        if n_cpu is None:
            # Leave one core free, but never drop below one worker:
            # os.cpu_count() may return None or 1.
            n_cpu = max(1, (os.cpu_count() or 1) - 1)
        self.n_cpu = n_cpu

        self.job_id = None
        self.remove_old_results = remove_old_results
        self.do_sensitivity_scale_estimation = do_sensitivity_scale_estimation
        self.sens_guess = self.disc_guess = None
        self.successful_guess_by_quick_injections = False
        self.cluster_kwargs = cluster_kwargs

    def __str__(self):
        """Returns a human readable summary of the submitter configuration."""
        scale_estimation = self.do_sensitivity_scale_estimation or "no"
        parts = [
            f'\n----- Submitter for {self.mh_dict["name"]} -----\n',
            f'{"" if self.use_cluster else "not "}using cluster \n',
            f"using {self.n_cpu} CPUs locally\n",
            f"job-id: {self.job_id} \n",
            f"{scale_estimation} scale estimation \n",
        ]

        if self.cluster_kwargs:
            parts.append("cluster kwargs: \n")
            parts.extend(f" {k}: {v} \n" for k, v in self.cluster_kwargs.items())

        return "".join(parts)

    def submit_cluster(self, mh_dict):
        """Splits the trials into jobs and submits them to be calculated on the cluster"""
        raise NotImplementedError

    def submit_local(self, mh_dict):
        """Uses the MultiprocessWrapper to split the trials into jobs and run them locally"""
        make_analysis_pickle(mh_dict)
        # max CPU number is all but one, but at least one so that the trials
        # can still run on a single-core machine
        max_cpu = max(1, (os.cpu_count() or 1) - 1)
        n_cpu = min(self.n_cpu, max_cpu)
        run_multiprocess(n_cpu=n_cpu, mh_dict=mh_dict)

    def submit(self, mh_dict):
        """
        Submits mh_dict to the cluster or runs it locally, depending on self.use_cluster.
        If self.remove_old_results is set, the old results stored under
        self.mh_dict["name"] are removed first.

        :param mh_dict: dict, MinimisationHandler dictionary to be submitted
        """
        if self.remove_old_results:
            self._clean_injection_values_and_pickled_results(self.mh_dict["name"])

        if self.use_cluster:
            self.submit_cluster(mh_dict)
        else:
            self.submit_local(mh_dict)

    def wait_for_job(self):
        """Waits until the cluster is finished processing the job with the ID self.job_id"""
        raise NotImplementedError

    @staticmethod
    def get_pending_ids():
        """Returns the IDs of the jobs currently pending; implemented by subclasses"""
        raise NotImplementedError

    @staticmethod
    def wait_for_cluster(job_ids=None):
        """
        Waits until the cluster is done. Wait for all jobs if job_ids is None or give a list of IDs

        :param job_ids: list, optional, if given, specifies the IDs of the jobs that will be waited on
        """
        cls = Submitter.get_submitter_class()

        # If no job IDs are specified, get all IDs currently listed for this user.
        # An explicit length check is used because the truth value of a
        # numpy array with several entries is ambiguous.
        if job_ids is None or len(job_ids) == 0:
            job_ids = cls.get_pending_ids()

        for job_id in job_ids:
            logger.info(f"waiting for job {job_id}")
            # create a submitter, it does not need the mh_dict when no functions are called
            s = cls(None, None)
            s.job_id = job_id  # set the right job_id
            s.wait_for_job()  # use the built-in function to wait for completion of that job

    @property
    def _quick_injections_name(self):
        """Name used for the quick injection trials, derived from self.mh_dict["name"]"""
        name = self.mh_dict["name"]
        return f"{name if not name.endswith(os.sep) else name[:-1]}_quick_injection/"

    def run_quick_injections_to_estimate_sensitivity_scale(self):
        """
        Roughly estimates the injection scale in order to find a better scale range.
        The quick injection trials are run locally.
        Note that a scale still has to be given in the mh_dict as a first estimate.

        :raises NotImplementedError: if the mh_dict uses the 'fit_weights' MinimisationHandler
        """
        logger.info("doing quick trials to estimate scale")

        if self.mh_dict["mh_name"] == "fit_weights":
            raise NotImplementedError(
                "This method does not work with the fit_weights MinimizationHandler "
                "because it assumes a background TS distribution median of zero! "
                "Be the hero to think of something!"
            )

        # The given scale will serve as an initial guess
        guess = self.mh_dict["scale"] if not self.disc_guess else self.disc_guess

        # make sure there are no leftovers from earlier quick injections
        self._clean_injection_values_and_pickled_results(self._quick_injections_name)

        # repeat the guessing until success:
        while not self.successful_guess_by_quick_injections:
            quick_injections_mh_dict = dict(self.mh_dict)
            quick_injections_mh_dict["name"] = self._quick_injections_name
            quick_injections_mh_dict["background_ntrials_factor"] = 1
            quick_injections_mh_dict["n_trials"] = 10
            quick_injections_mh_dict["scale"] = guess
            self.submit_local(quick_injections_mh_dict)

            # collect the quick injections
            quick_injections_rh = ResultsHandler(
                quick_injections_mh_dict, do_sens=False, do_disc=False
            )

            # guess the disc and sens scale
            (
                self.disc_guess,
                self.sens_guess,
            ) = quick_injections_rh.estimate_sens_disc_scale()

            if any((g < 0) or (g > guess) for g in [self.disc_guess, self.sens_guess]):
                logger.info(
                    f"Could not perform scale guess because "
                    f"at least one guess outside [0, {guess}]! "
                    f"Adjusting accordingly."
                )
                guess = abs(max((self.sens_guess, self.disc_guess)) * 1.5)

            elif guess > 5 * self.disc_guess:
                # Compute the new guess once so that the logged value is
                # exactly the value that is used for the retry.
                new_guess = 4 * abs(self.disc_guess)
                logger.info(
                    f"Could not perform scale guess because "
                    f"initial scale guess {guess} much larger than "
                    f"disc scale guess {self.disc_guess}. "
                    f"Adjusting initial guess to {new_guess} and retry."
                )
                guess = new_guess

            else:
                logger.info("Scale guess successful. Adjusting injection scale.")
                self.successful_guess_by_quick_injections = True

            # remove the quick injection results after every attempt
            self._clean_injection_values_and_pickled_results(quick_injections_rh.name)

    @staticmethod
    def _clean_injection_values_and_pickled_results(name):
        """
        Removes directories containing injection values and pickled results

        :param name: str, the path used in the minimisation handler dictionary (mh_dict)
        """
        directories = [name_pickle_output_dir(name), inj_dir_name(name)]
        for d in directories:
            if os.path.isdir(d):
                logger.debug(f"removing {d}")
                shutil.rmtree(d)
            else:
                logger.warning(f"Can not remove {d}! It is not a directory!")

    def do_asimov_scale_estimation(self):
        """estimate the injection scale using Asimov estimation"""
        logger.info("doing asimov estimation")
        mh = MinimisationHandler.create(self.mh_dict)
        scale_estimate = mh.guess_scale()
        logger.debug(f"estimated scale: {scale_estimate}")
        self.disc_guess = scale_estimate
        self.sens_guess = 0.3 * self.disc_guess

    def analyse(self, do_disc=False):
        """
        Submits the minimisation handler dictionary (self.mh_dict) to be analysed.
        This happens locally if self.use_cluster == False.

        :param do_disc: bool, if True, use the estimated discovery potential as
            the injection scale instead of the sensitivity.
        """
        if self.do_sensitivity_scale_estimation:
            if "asimov" in self.do_sensitivity_scale_estimation:
                self.do_asimov_scale_estimation()

            if "quick_injections" in self.do_sensitivity_scale_estimation:
                self.run_quick_injections_to_estimate_sensitivity_scale()

            # The injection scale is set to twice the estimated scale.
            scale_guess = self.disc_guess if do_disc else self.sens_guess
            self.mh_dict["scale"] = scale_guess / 0.5

        self.submit(self.mh_dict)

    @classmethod
    def register_submitter_class(cls, server_name):
        """
        Returns a class decorator that registers the decorated subclass of
        Submitter under the key "server_name" in the submitter registry.
        """

        def decorator(subclass):
            cls.submitter_dict[server_name] = subclass
            return subclass

        return decorator

    @classmethod
    def get_submitter(cls, *args, **kwargs):
        """
        Get an initialised instance of the Submitter class suited for the
        used server.

        :param args: arguments passed to the submitter
        :param kwargs: keyword arguments passed to the submitter
        :return: instance of Submitter subclass
        """
        return Submitter.get_submitter_class()(*args, **kwargs)

    @classmethod
    def get_submitter_class(cls):
        """
        Get the Submitter class suited for the used server.
        Falls back to the class registered as "local" if no class is
        registered for the host server.
        """
        if host_server not in cls.submitter_dict:
            logger.warning(
                f"No submitter implemented for host server {host_server}! "
                f"Using LocalSubmitter but you won't be able to use cluster operations!"
            )
            return cls.submitter_dict["local"]

        return cls.submitter_dict[host_server]
@Submitter.register_submitter_class("local")
class LocalSubmitter(Submitter):
    """Submitter for hosts without cluster access: trials can only be run locally."""

    def __init__(
        self,
        mh_dict,
        use_cluster,
        n_cpu=None,
        do_sensitivity_scale_estimation=False,
        remove_old_results=False,
        **cluster_kwargs,
    ):
        """
        Initialises a LocalSubmitter instance.

        :param mh_dict: dict, MinimisationHandler dictionary
        :param use_cluster: bool, has to evaluate to False for this class
        :param n_cpu: int, number of cores to use
        :param do_sensitivity_scale_estimation: str, containing 'asimov', 'quick_injections' or both
        :param remove_old_results: bool, if True will delete directories containing injection values and pickled
            results from previous trials
        :param cluster_kwargs: keyword arguments, forwarded to the Submitter class
        :raises NotImplementedError: if use_cluster evaluates to True
        """
        if use_cluster:
            raise NotImplementedError(
                "No cluster operation implemented because you are using the LocalSubmitter!"
            )

        # remove_old_results is accepted explicitly so that the signature
        # matches the one of the Submitter base class, also for positional calls.
        super().__init__(
            mh_dict,
            use_cluster,
            n_cpu,
            do_sensitivity_scale_estimation,
            remove_old_results,
            **cluster_kwargs,
        )
@Submitter.register_submitter_class("DESY")
class DESYSubmitter(Submitter):
    """Submitter that sends the trials to the DESY cluster using qsub and monitors them using qstat."""

    # shell script that is handed to the submit command
    submit_file = os.path.join(cluster_dir, "SubmitDESY.sh")
    # The user name is taken from the name of the home directory.
    # os.path.expanduser uses $HOME when it is set but, unlike
    # os.environ["HOME"], does not fail at import time when it is not.
    username = os.path.basename(os.path.expanduser("~"))
    status_cmd = f"qstat -u {username}"
    submit_cmd = "qsub"
    root_dir = os.path.dirname(fs_dir[:-1])

    def __init__(
        self,
        mh_dict,
        use_cluster,
        n_cpu=None,
        do_sensitivity_scale_estimation=False,
        remove_old_results=False,
        **cluster_kwargs,
    ):
        """
        Initialises a DESYSubmitter instance.

        :param mh_dict: the MinimisationHandler dict
        :type mh_dict: dict
        :param use_cluster: whether to use the cluster
        :type use_cluster: bool
        :param n_cpu: how many CPUs to use on the local machine
        :type n_cpu: int
        :param do_sensitivity_scale_estimation: containing 'asimov', 'quick_injections' or both
        :type do_sensitivity_scale_estimation: str
        :param remove_old_results: if True, results from previous trials are deleted before submitting
        :type remove_old_results: bool
        :param cluster_kwargs: keyword arguments for the cluster, available are:
            h_cpu in the form "hh:mm:ss": how long the cluster jobs run
            trials_per_task: int, how many trials to run per cluster job
            cluster_cpu: int, how many CPUs to use on the cluster machines
            ram_per_core in the form "<number>G": e.g. 6G to use 6GB RAM for each cluster job
            remove_old_logs: bool, whether to clear the log directory before submitting
            manual_submit: bool, if True the submit command is only displayed and
                has to be launched by the user
        """
        super().__init__(
            mh_dict,
            use_cluster,
            n_cpu,
            do_sensitivity_scale_estimation,
            remove_old_results,
            **cluster_kwargs,
        )

        # extract information that will be used by the cluster script
        self.h_cpu = self.cluster_kwargs.get("h_cpu", "23:59:00")
        self.trials_per_task = self.cluster_kwargs.get("trials_per_task", 1)
        self.cluster_cpu = self.cluster_kwargs.get("cluster_cpu", self.n_cpu)
        # The divisor is clamped to one so that the default can also be
        # calculated when cluster_cpu is zero.
        default_ram = "{0:.1f}G".format(6.0 / max(float(self.cluster_cpu), 1.0) + 2.0)
        self.ram_per_core = self.cluster_kwargs.get("ram_per_core", default_ram)
        self.remove_old_logs = self.cluster_kwargs.get("remove_old_logs", True)

        self.manual_submit = self.cluster_kwargs.get("manual_submit", False)
        if not self.manual_submit and shutil.which(DESYSubmitter.submit_cmd) is None:
            logger.warning(
                f"Submit command {DESYSubmitter.submit_cmd} is not available on the current host. Forcing 'manual_submit' mode."
            )
            self.manual_submit = True

    @staticmethod
    def _qstat_output(qstat_command):
        """return the output of the qstat_command"""
        # Query the cluster. subprocess.run waits for the process and closes
        # the pipe, which a bare Popen followed by stdout.read() does not.
        result = subprocess.run(qstat_command, stdout=subprocess.PIPE, shell=True)
        return result.stdout.decode()

    @staticmethod
    def get_ids(qstat_command):
        """
        Takes a command that queries the DESY cluster and returns the job IDs,
        one entry for each listed task.

        :param qstat_command: str, the command that queries the cluster
        :return: numpy array of int, empty if there are no tasks left
        """
        st = DESYSubmitter._qstat_output(qstat_command)
        # The first two lines are the table header, the last element is the
        # empty string after the final newline. If the output is an empty
        # string there are no tasks left and the selection is empty.
        job_lines = st.split("\n")[2:-1]
        # The job ID is the first column. Splitting on any whitespace makes
        # this independent of how many digits the ID has.
        ids = [int(line.split()[0]) for line in job_lines if line.strip()]
        return np.array(ids, dtype=int)

    def _ntasks_from_qstat_command(self, qstat_command):
        """Returns the number of tasks of the job self.job_id from the output of qstat_command"""
        ids = np.asarray(self.get_ids(qstat_command))
        ntasks = 0 if len(ids) == 0 else len(ids[ids == self.job_id])
        return ntasks

    @property
    def ntasks_total(self):
        """Returns the total number of tasks"""
        return self._ntasks_from_qstat_command(DESYSubmitter.status_cmd)

    @property
    def ntasks_running(self):
        """Returns the number of running tasks"""
        return self._ntasks_from_qstat_command(DESYSubmitter.status_cmd + " -s r")

    def wait_for_job(self):
        """
        Waits until the cluster is finished processing the job with the ID self.job_id.
        The cluster is queried every 30 seconds, the progress is logged on
        every fifth query and the full status output on every eighth progress message.
        """
        if not self.job_id:
            logger.info("No Job ID!")
            return

        time.sleep(10)

        # The counters start above their thresholds so that the first query
        # is logged directly.
        queries_since_log = 31
        logs_since_full_output = 6

        while True:
            # Query the cluster once per iteration and reuse the number
            n_total = self.ntasks_total
            if n_total == 0:
                break

            if queries_since_log > 3:
                n_running = self.ntasks_running
                logger.info(
                    f"{time.asctime(time.localtime())} - Job{self.job_id}:"
                    f" {n_total} entries in queue. "
                    f"Of these, {n_running} are running tasks, and "
                    f"{n_total - n_running} are tasks still waiting to be executed."
                )
                queries_since_log = 0
                logs_since_full_output += 1

            if logs_since_full_output > 7:
                logger.info(self._qstat_output(self.status_cmd))
                logs_since_full_output = 0

            time.sleep(30)
            queries_since_log += 1

    def make_cluster_submission_script(self):
        """Produces the shell script used to run on the DESY cluster."""
        flarestack_scratch_dir = os.path.dirname(fs_scratch_dir[:-1]) + "/"

        text = (
            "#!/bin/zsh \n"
            "## \n"
            "##(otherwise the default shell would be used) \n"
            "#$ -S /bin/zsh \n"
            "## \n"
            "##(the running time for this job) \n"
            f"#$ -l h_cpu={self.h_cpu} \n"
            "#$ -l h_rss=" + str(self.ram_per_core) + "\n"
            "## \n"
            "## \n"
            "##(send mail on job's abort) \n"
            "#$ -m a \n"
            "## \n"
            "##(stderr and stdout are merged together to stdout) \n"
            "#$ -j y \n"
            "## \n"
            "## name of the job \n"
            "## -N Flarestack script " + DESYSubmitter.username + " \n"
            "## \n"
            "##(redirect output to:) \n"
            "#$ -o /dev/null \n"
            "## \n"
            "sleep $(( ( RANDOM % 60 ) + 1 )) \n"
            'exec > "$TMPDIR"/${JOB_ID}_stdout.txt '
            '2>"$TMPDIR"/${JOB_ID}_stderr.txt \n'
            "eval $(/cvmfs/icecube.opensciencegrid.org/py3-v4.1.0/setup.sh) \n"
            "export PYTHONPATH=" + DESYSubmitter.root_dir + "/ \n"
            "export FLARESTACK_SCRATCH_DIR=" + flarestack_scratch_dir + " \n"
            "python " + fs_dir + "core/multiprocess_wrapper.py -f $1 -n $2 \n"
            "cp $TMPDIR/${JOB_ID}_stdout.txt " + log_dir + "\n"
            "cp $TMPDIR/${JOB_ID}_stderr.txt " + log_dir + "\n "
        )

        logger.info("Creating file at {0}".format(DESYSubmitter.submit_file))

        with open(DESYSubmitter.submit_file, "w") as f:
            f.write(text)

        logger.debug("Bash file created: \n {0}".format(text))

        # Make the script executable. The argument list avoids the shell, so
        # the path is passed on verbatim even if it contains spaces.
        subprocess.run(["chmod", "+x", DESYSubmitter.submit_file])

    def submit_cluster(self, mh_dict):
        """
        Submits the job to the cluster.

        :param mh_dict: dict, MinimisationHandler dictionary, it is not modified
        """
        # if specified, remove old logs from log directory
        if self.remove_old_logs:
            self.clear_log_dir()

        # Get the number of tasks that will have to be submitted in order to get ntrials.
        # The number is rounded up so that at least ntrials trials are
        # performed and the task range is never empty.
        ntrials = mh_dict["n_trials"]
        n_tasks = int(np.ceil(ntrials / self.trials_per_task))
        logger.debug(f"running {ntrials} trials in {n_tasks} tasks")

        # The mh_dict will be submitted n_task times and will perform mh_dict['n_trials'] each time.
        # Therefore we have to adjust mh_dict['n_trials'] in order to actually perform the number
        # specified in mh_dict['n_trials']. This is done on a copy to leave the
        # dictionary of the caller untouched.
        task_mh_dict = dict(mh_dict)
        task_mh_dict["n_trials"] = self.trials_per_task
        path = make_analysis_pickle(task_mh_dict)

        # assemble the submit command
        submit_cmd = DESYSubmitter.submit_cmd
        if self.cluster_cpu > 1:
            submit_cmd += " -pe multicore {0} -R y".format(self.cluster_cpu)
        submit_cmd += (
            f" -t 1-{n_tasks}:1 {DESYSubmitter.submit_file} {path} {self.cluster_cpu}"
        )

        logger.info(f"Ram per core: {self.ram_per_core}")
        logger.info(f"{time.asctime(time.localtime())}: {submit_cmd}")

        self.make_cluster_submission_script()

        if not self.manual_submit:
            result = subprocess.run(submit_cmd, stdout=subprocess.PIPE, shell=True)
            msg = result.stdout.decode()
            logger.info(str(msg))
            try:
                self.job_id = int(str(msg).split("job-array")[1].split(".")[0])
            except (IndexError, ValueError):
                # The output does not have the expected form
                logger.error(f"Could not get the job ID from the output of '{submit_cmd}'!")
                raise
        else:
            input(
                f"Running in 'manual_submit' mode. Login to a submission host and launch the following command:\n"
                f"{submit_cmd}\n"
                f"Press enter to continue after the jobs are finished.\n"
                f"[ENTER]"
            )

    @staticmethod
    def get_pending_ids():
        """Returns the sorted unique IDs of all jobs currently listed for this user"""
        return np.unique(DESYSubmitter.get_ids(DESYSubmitter.status_cmd))

    @staticmethod
    def clear_log_dir():
        """Removes all files from the log directory; subdirectories are left untouched"""
        for f in os.listdir(log_dir):
            ff = os.path.join(log_dir, f)
            if os.path.isdir(ff) and not os.path.islink(ff):
                # os.remove can not remove directories
                logger.debug(f"skipping directory {ff}")
                continue
            logger.debug(f"removing {ff}")
            os.remove(ff)
@Submitter.register_submitter_class("WIPAC")
class WIPACSubmitter(Submitter):
    """Submitter that sends the trials to the WIPAC cluster using condor_submit via ssh."""

    wipac_cluster_dir = os.path.join(cluster_dir, "WIPAC")
    # os.path.expanduser uses $HOME when it is set but, unlike
    # os.environ["HOME"], does not fail at import time when it is not.
    home_dir = os.path.expanduser("~")
    username = os.path.basename(home_dir)
    status_cmd = f"condor_q {username}"
    root_dir = os.path.dirname(fs_dir[:-1])
    scratch_on_nodes = f"/scratch/{username}"

    def __init__(self, *args, **kwargs):
        """
        A class that takes care of submitting the trial calculations.
        Also can estimate the sensitivity scale before submitting.

        :param args: arguments to be passed to Submitter class
        :param kwargs: keyword arguments used by the cluster, available are:
            manual_submit: bool, if True only prints out the location of the submit file without actually
                submitting to the cluster
            trials_per_task: int, how many trials to run per cluster job
            cluster_cpu: int, how many CPUs to use per cluster job
            ram_per_core: float, how much RAM for each cluster jobs in MB
        """
        super().__init__(*args, **kwargs)

        # imported here and not at the top of the file, as in the original code
        from flarestack.data.icecube import icecube_dataset_dir

        self.icecube_dataset_dir = icecube_dataset_dir

        self.manual_submit = self.cluster_kwargs.get("manual_submit", False)
        self.trials_per_task = self.cluster_kwargs.get("trials_per_task", 1)
        self.cluster_cpu = self.cluster_kwargs.get("cluster_cpu", self.n_cpu)
        self.ram_per_core = self.cluster_kwargs.get("ram_per_core", "2000")

        # mh_dict is None when the instance is only used to wait for a job
        self.cluster_files_directory = os.path.join(
            WIPACSubmitter.wipac_cluster_dir,
            self.mh_dict["name"] if self.mh_dict else "",
        )
        self.submit_file = os.path.join(self.cluster_files_directory, "job.submit")
        self.executable_file = os.path.join(self.cluster_files_directory, "job.sh")
        self.submit_cmd = (
            f"ssh {WIPACSubmitter.username}@submit-1.icecube.wisc.edu "
            f"'condor_submit " + self.submit_file + "'"
        )

        # output of the last status query, set by collect_condor_status()
        self._status_output = None

    def make_executable_file(self, path):
        """
        Produces the executable that will be submitted to the NPX cluster.

        :param path: str, path to the file that is passed to the multiprocess wrapper
        """
        flarestack_scratch_dir = os.path.dirname(fs_scratch_dir[:-1]) + "/"

        txt = (
            f"#!/bin/sh \n"
            f"eval $(/cvmfs/icecube.opensciencegrid.org/py3-v4.2.0/setup.sh) \n"
            f"export PYTHONPATH={WIPACSubmitter.root_dir}/ \n"
            f"export FLARESTACK_SCRATCH_DIR={flarestack_scratch_dir} \n"
            f"export HOME={WIPACSubmitter.home_dir} \n "
            f"export FLARESTACK_DATASET_DIR={self.icecube_dataset_dir} \n"
            f"python {fs_dir}core/multiprocess_wrapper.py -f {path} -n {self.cluster_cpu}"
        )

        logger.debug("writing executable to " + self.executable_file)
        with open(self.executable_file, "w") as f:
            f.write(txt)

    def make_submit_file(self, n_tasks):
        """
        Produces the submit file that will be submitted to the NPX cluster.

        :param n_tasks: Number of jobs that will be created
        """
        text = (
            f"executable = {self.executable_file} \n"
            f"log = {WIPACSubmitter.scratch_on_nodes}/$(cluster)_$(process)job.log \n"
            f"output = {WIPACSubmitter.scratch_on_nodes}/$(cluster)_$(process)job.out \n"
            f"error = {WIPACSubmitter.scratch_on_nodes}/$(cluster)_$(process)job.err \n"
            f"should_transfer_files = YES \n"
            f"when_to_transfer_output = ON_EXIT \n"
            f"arguments = $(process) \n"
            f"RequestMemory = {self.ram_per_core} \n"
            f"\n"
            f"queue {n_tasks}"
        )

        logger.debug("writing submitfile at " + self.submit_file)
        with open(self.submit_file, "w") as f:
            f.write(text)

    def submit_cluster(self, mh_dict):
        """
        Submits the job to the cluster.

        :param mh_dict: dict, MinimisationHandler dictionary, it is not modified
        """
        # Get the number of tasks that will have to be submitted in order to get ntrials.
        # The number is rounded up so that at least ntrials trials are
        # performed and at least one job is queued.
        ntrials = self.mh_dict["n_trials"]
        n_tasks = int(np.ceil(ntrials / self.trials_per_task))
        logger.debug(f"running {ntrials} trials in {n_tasks} tasks")

        # The mh_dict will be submitted n_task times and will perform mh_dict['n_trials'] each time.
        # Therefore we have to adjust mh_dict['n_trials'] in order to actually perform the number
        # specified in self.mh_dict['n_trials']. This is done on a copy so that
        # neither the dictionary of the caller nor self.mh_dict is changed.
        task_mh_dict = dict(mh_dict)
        task_mh_dict["n_trials"] = self.trials_per_task
        path = make_analysis_pickle(task_mh_dict)

        # make the executable and the submit file
        if not os.path.isdir(self.cluster_files_directory):
            logger.debug(f"making directory {self.cluster_files_directory}")
            os.makedirs(self.cluster_files_directory)

        self.make_executable_file(path)
        self.make_submit_file(n_tasks)

        if not self.manual_submit:
            # self.submit_cmd is the ssh command assembled in __init__
            cmd = self.submit_cmd
            logger.debug(f"command is {cmd}")
            result = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE)
            msg = result.stdout.decode()
            logger.info(msg)

            # the job ID is the text between the last "cluster " and the following "."
            self.job_id = str(msg).split("cluster ")[-1].split(".")[0]

        else:
            input(
                f"You selected manual submit mode: \n"
                f"\tThe submit file can be found here: \n"
                f"\t{self.submit_file} \n"
                f"\tPlease submit this to the cluster and hit enter when all jobs are done! \n"
                f"[ENTER]"
            )

    @staticmethod
    def get_condor_status():
        """
        Queries condor to get cluster status.

        :return: str, output of query command
        """
        cmd = [
            "ssh",
            f"{WIPACSubmitter.username}@submit-1.icecube.wisc.edu",
            "'condor_q'",
        ]
        return subprocess.check_output(cmd).decode()

    def collect_condor_status(self):
        """Gets the condor status and saves it to private attribute"""
        self._status_output = self.get_condor_status()

    @property
    def condor_status(self):
        """
        Get the status of jobs running on condor, taken from the output saved
        by collect_condor_status().

        :return: number of jobs that are done, running, waiting, total, held;
            all None if the job self.job_id is not listed
        """
        # the first four and the last six lines do not contain jobs
        job_lines = self._status_output.split("\n")[4:-6]

        done = running = waiting = total = held = None
        for line in job_lines:
            li = line.split()
            # skip lines that are too short to contain a job ID
            if len(li) > 2 and li[2] == self.job_id:
                done, running, waiting = li[5:8]
                # a line with ten entries has no column for held jobs
                held = 0 if len(li) == 10 else li[8]
                total = li[-2]

        return done, running, waiting, total, held

    def wait_for_job(self):
        """
        Waits until the job with the ID self.job_id is not listed by condor anymore.
        The status is queried and logged every 90 seconds, the full status
        output is logged on every eighth query.
        """
        if not self.job_id:
            logger.info("No Job ID!")
            return

        logger.info("waiting for job with ID " + str(self.job_id))
        time.sleep(5)

        self.collect_condor_status()
        status = self.condor_status
        j = 0

        # all entries are None as soon as the job is not listed anymore
        while not all(entry is None for entry in status):
            d, r, w, t, h = status
            logger.info(
                f"{time.asctime(time.localtime())} - Job{self.job_id}: "
                f"{d} done, {r} running, {w} waiting, {h} held of total {t}"
            )
            j += 1
            if j > 7:
                logger.info(self._status_output)
                j = 0
            time.sleep(90)
            self.collect_condor_status()
            status = self.condor_status

        logger.info("Done waiting for job with ID " + str(self.job_id))

    @staticmethod
    def get_pending_ids():
        """
        Queries condor and returns the IDs of the listed jobs.

        :return: numpy array of str
        """
        condor_status = WIPACSubmitter.get_condor_status()
        # Use the same tokenisation as the condor_status property: split on
        # whitespace so that padded columns do not shift the position of the ID.
        tokens = (line.split() for line in condor_status.split("\n")[4:-6])
        ids = np.array([li[2] for li in tokens if len(li) > 2])
        return ids