Custom CLI Runner

Can anyone clarify what the following block of code in cli.py is expected to do?

    if state is None:
        if ajs.job_wrapper.get_state() == model.Job.states.DELETED:
            continue

        external_metadata = not asbool(ajs.job_wrapper.job_destination.params.get("embed_metadata_in_job", DEFAULT_EMBED_METADATA_IN_JOB))
        if external_metadata:
            self._handle_metadata_externally(ajs.job_wrapper, resolve_requirements=True)

        log.debug("(%s/%s) job not found in batch state check" % (id_tag, external_job_id))
        shell_params, job_params = self.parse_destination_params(ajs.job_destination.params)
        shell, job_interface = self.get_cli_plugins(shell_params, job_params)
        cmd_out = shell.execute(job_interface.get_single_status(external_job_id))
        state = job_interface.parse_single_status(cmd_out.stdout, external_job_id)
        if not state == model.Job.states.OK:
            log.warning('(%s/%s) job not found in batch state check, but found in individual state check' % (id_tag, external_job_id))
    if state != old_state:
        log.debug("(%s/%s) state change: from %s to %s" % (id_tag, external_job_id, old_state, state))
        if not state == model.Job.states.OK:
            # No need to change_state when the state is OK, this will be handled by `self.finish_job`
            ajs.job_wrapper.change_state(state)
    if state == model.Job.states.RUNNING and not ajs.running:
        ajs.running = True
    ajs.old_state = state
    if state == model.Job.states.OK:
        log.debug('(%s/%s) job execution finished, running job wrapper finish method' % (id_tag, external_job_id))
        self.work_queue.put((self.finish_job, ajs))

If the Galaxy job gets queued by the cluster, Galaxy marks it as OK and deletes the job directories. Log output:

galaxy.jobs.mapper DEBUG 2019-04-22 15:07:56,035 [p:38344,w:0,m:2] [JobHandlerQueue.monitor_thread] (594) Mapped job to destination id: LocalShell
galaxy.jobs.handler DEBUG 2019-04-22 15:07:56,063 [p:38344,w:0,m:2] [JobHandlerQueue.monitor_thread] (594) Dispatching to cli runner
galaxy.jobs DEBUG 2019-04-22 15:07:56,086 [p:38344,w:0,m:2] [JobHandlerQueue.monitor_thread] (594) Persisting job destination (destination id: LocalShell)
galaxy.jobs DEBUG 2019-04-22 15:07:56,087 [p:38344,w:0,m:2] [JobHandlerQueue.monitor_thread] (594) Working directory for job is: /Dedicated/clingalproddata/database/jobs_directory/000/594
galaxy.jobs.runners DEBUG 2019-04-22 15:07:56,103 [p:38344,w:0,m:2] [JobHandlerQueue.monitor_thread] Job [594] queued (40.204 ms)
galaxy.jobs.handler INFO 2019-04-22 15:07:56,123 [p:38344,w:0,m:2] [JobHandlerQueue.monitor_thread] (594) Job dispatched
galaxy.jobs.command_factory INFO 2019-04-22 15:07:56,246 [p:38344,w:0,m:2] [ShellRunner.work_thread-0] Built script [/Dedicated/clingalproddata/database/jobs_directory/000/594/tool_script.sh] for tool command [echo "Running with '${GALAXY_SLOTS:-1}' threads" > "/Dedicated/clingalproddata/database/files/000/dataset_671.dat"]
galaxy.jobs.runners DEBUG 2019-04-22 15:07:56,320 [p:38344,w:0,m:2] [ShellRunner.work_thread-0] (594) command is: rm -rf working; mkdir -p working; cd working; /Dedicated/clingalproddata/database/jobs_directory/000/594/tool_script.sh; return_code=$?; cd '/Dedicated/clingalproddata/database/jobs_directory/000/594';
[ "$GALAXY_VIRTUAL_ENV" = "None" ] && GALAXY_VIRTUAL_ENV="$_GALAXY_VIRTUAL_ENV"; _galaxy_setup_environment True
python "/Dedicated/clingalproddata/database/jobs_directory/000/594/set_metadata_KNv21u.py" "/Dedicated/clingalproddata/database/jobs_directory/000/594/registry.xml" "/Dedicated/clingalproddata/database/jobs_directory/000/594/working/galaxy.json" "/Dedicated/clingalproddata/database/jobs_directory/000/594/metadata_in_HistoryDatasetAssociation_806_uXnJC8,/Dedicated/clingalproddata/database/jobs_directory/000/594/metadata_kwds_HistoryDatasetAssociation_806_hQcOzn,/Dedicated/clingalproddata/database/jobs_directory/000/594/metadata_out_HistoryDatasetAssociation_806_gc_BfH,/Dedicated/clingalproddata/database/jobs_directory/000/594/metadata_results_HistoryDatasetAssociation_806_adwo6t,/Dedicated/clingalproddata/database/files/000/dataset_671.dat,/Dedicated/clingalproddata/database/jobs_directory/000/594/metadata_override_HistoryDatasetAssociation_806_Hi6XQ8" 5242880; sh -c "exit $return_code"
galaxy.jobs.runners.cli DEBUG 2019-04-22 15:07:56,341 [p:38344,w:0,m:2] [ShellRunner.work_thread-0] (594) submitting file: /Dedicated/clingalproddata/database/jobs_directory/000/594/galaxy_594.sh
galaxy.jobs.runners.cli INFO 2019-04-22 15:07:59,556 [p:38344,w:0,m:2] [ShellRunner.work_thread-0] (594) queued with identifier: submitted
galaxy.jobs DEBUG 2019-04-22 15:07:59,556 [p:38344,w:0,m:2] [ShellRunner.work_thread-0] (594) Persisting job destination (destination id: LocalShell)
128.255.162.194 - - [22/Apr/2019:15:07:59 -0500] “GET /api/histories/3be00b2dc330bf21/contents?details=21f51087e2f8c9cb%2C820e8a68e593cae6%2Cb864d31a7696bc50%2C6905cb35e1e81f5a%2C2cd348c1fb24f9ad%2C00caf7494de597ee%2C3c35cd92a39bfa13%2C8ec91fbec3795e2f%2C82db617418e6bc1b%2Cfbd2dc1fb21882db%2C83e581a4b150a907%2C37dcab24bd9ca28b%2Cc5afba4dc1e58b9c%2Cb98bcf8e380dadee%2Cf66341e9f43e26d6%2C02234c079b2b140d%2Ccf0b1d1cc4735367%2Cc40b1fbe0607aa09&order=hid&v=dev&q=update_time-ge&q=deleted&q=purged&qv=2019-04-22T20%3A07%3A55.000Z&qv=False&qv=False HTTP/1.1” 200 - “https://clinical-galaxy.iihg.uiowa.edu/?job_id=64c391652c2ec90d&__identifer=m1o72f2talq” “Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:60.0) Gecko/20100101 Firefox/60.0”
[pid: 38337|app: 0|req: 8/8] 128.255.162.194 () {46 vars in 2172 bytes} [Mon Apr 22 15:07:59 2019] GET /api/histories/3be00b2dc330bf21/contents?details=21f51087e2f8c9cb%2C820e8a68e593cae6%2Cb864d31a7696bc50%2C6905cb35e1e81f5a%2C2cd348c1fb24f9ad%2C00caf7494de597ee%2C3c35cd92a39bfa13%2C8ec91fbec3795e2f%2C82db617418e6bc1b%2Cfbd2dc1fb21882db%2C83e581a4b150a907%2C37dcab24bd9ca28b%2Cc5afba4dc1e58b9c%2Cb98bcf8e380dadee%2Cf66341e9f43e26d6%2C02234c079b2b140d%2Ccf0b1d1cc4735367%2Cc40b1fbe0607aa09&order=hid&v=dev&q=update_time-ge&q=deleted&q=purged&qv=2019-04-22T20%3A07%3A55.000Z&qv=False&qv=False => generated 1304 bytes in 36 msecs (HTTP/1.1 200) 3 headers in 124 bytes (1 switches on core 6)
galaxy.jobs.runners.cli DEBUG 2019-04-22 15:08:03,060 [p:38344,w:0,m:2] [ShellRunner.monitor_thread] (594/submitted) job not found in batch state check
128.255.162.194 - - [22/Apr/2019:15:08:03 -0500] “GET /api/histories/3be00b2dc330bf21/contents?details=21f51087e2f8c9cb%2C820e8a68e593cae6%2Cb864d31a7696bc50%2C6905cb35e1e81f5a%2C2cd348c1fb24f9ad%2C00caf7494de597ee%2C3c35cd92a39bfa13%2C8ec91fbec3795e2f%2C82db617418e6bc1b%2Cfbd2dc1fb21882db%2C83e581a4b150a907%2C37dcab24bd9ca28b%2Cc5afba4dc1e58b9c%2Cb98bcf8e380dadee%2Cf66341e9f43e26d6%2C02234c079b2b140d%2Ccf0b1d1cc4735367%2Cc40b1fbe0607aa09&order=hid&v=dev&q=update_time-ge&q=deleted&q=purged&qv=2019-04-22T20%3A07%3A59.000Z&qv=False&qv=False HTTP/1.1” 200 - “https://clinical-galaxy.iihg.uiowa.edu/?job_id=64c391652c2ec90d&__identifer=m1o72f2talq” “Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:60.0) Gecko/20100101 Firefox/60.0”
[pid: 38337|app: 0|req: 9/9] 128.255.162.194 () {46 vars in 2172 bytes} [Mon Apr 22 15:08:03 2019] GET /api/histories/3be00b2dc330bf21/contents?details=21f51087e2f8c9cb%2C820e8a68e593cae6%2Cb864d31a7696bc50%2C6905cb35e1e81f5a%2C2cd348c1fb24f9ad%2C00caf7494de597ee%2C3c35cd92a39bfa13%2C8ec91fbec3795e2f%2C82db617418e6bc1b%2Cfbd2dc1fb21882db%2C83e581a4b150a907%2C37dcab24bd9ca28b%2Cc5afba4dc1e58b9c%2Cb98bcf8e380dadee%2Cf66341e9f43e26d6%2C02234c079b2b140d%2Ccf0b1d1cc4735367%2Cc40b1fbe0607aa09&order=hid&v=dev&q=update_time-ge&q=deleted&q=purged&qv=2019-04-22T20%3A07%3A59.000Z&qv=False&qv=False => generated 2 bytes in 28 msecs (HTTP/1.1 200) 3 headers in 124 bytes (1 switches on core 0)
galaxy.jobs.runners.cli DEBUG 2019-04-22 15:08:06,281 [p:38344,w:0,m:2] [ShellRunner.monitor_thread] (594/submitted) state change: from new to ok
galaxy.jobs.runners.cli DEBUG 2019-04-22 15:08:06,281 [p:38344,w:0,m:2] [ShellRunner.monitor_thread] (594/submitted) job execution finished, running job wrapper finish method
galaxy.jobs.runners ERROR 2019-04-22 15:08:06,286 [p:38344,w:0,m:2] [ShellRunner.work_thread-0] (594/submitted) Job output not returned from cluster: [Errno 2] No such file or directory: '/Dedicated/clingalproddata/database/jobs_directory/000/594/galaxy_594.o'
galaxy.jobs.runners.cli DEBUG 2019-04-22 15:08:06,406 [p:38344,w:0,m:2] [ShellRunner.work_thread-0] (594/submitted) User killed running job, but error encountered during termination:

Reference: https://docs.galaxyproject.org/en/release_19.01/admin/cluster.html#cli

I appreciate the link; however, I wouldn’t have been able to write the shell runner without first reading the CLI documentation. I have also read https://docs.galaxyproject.org/en/release_18.09/dev/build_a_job_runner.html.

My code:

from logging import getLogger
import re
from galaxy.model import Job
job_states = Job.states

from ..job import BaseJobExec

log = getLogger(__name__)

ERROR_MESSAGE_UNRECOGNIZED_ARG = 'Unrecognized long argument passed to sge CLI plugin: %s'

argmap = {'destination': '-q',
          'Execution_Time': '-a',
          'Account_Name': '-A',
          'Checkpoint': '-c',
          'Error_Path': '-e',
          'Group_List': '-g',
          'Hold_Types': '-h',
          'Join_Paths': '-j',
          'Keep_Files': '-k',
          'Resource_List': '-l',
          'Mail_Points': '-m',
          'Mail_Users': '-M',
          'Job_Name': '-N',
          'Output_Path': '-o',
          'Priority': '-p',
          'Rerunable': '-r',
          'Shell_Path_List': '-S',
          'job_array_request': '-t',
          'User_List': '-u',
          'Variable_List': '-v'}


class sge(BaseJobExec):

    def __init__(self, **params):
        self.params = {}
        for k, v in params.items():
            self.params[k] = v

    def job_script_kwargs(self, ofile, efile, job_name):
        pbsargs = {'-o': ofile,
                   '-e': efile,
                   '-N': job_name}
        for k, v in self.params.items():
            if k == 'plugin':
                continue
            try:
                if not k.startswith('-'):
                    k = argmap[k]
                pbsargs[k] = v
            except KeyError:
                log.warning(ERROR_MESSAGE_UNRECOGNIZED_ARG % k)
        template_pbsargs = ''
        for k, v in pbsargs.items():
            template_pbsargs += '#$ %s %s\n' % (k, v)
        return dict(headers=template_pbsargs)

    def submit(self, script_file):
        return 'qsub %s' % script_file

    def delete(self, job_id):
        return 'qdel %s' % job_id

    def get_status(self, job_ids=None):
        return 'qstat'

    def get_single_status(self, job_id):
        return "qstat -u svc-clingalprod | grep %s | awk '{print $1,$5}'" % job_id

    def parse_status(self, status, job_ids):
        rval = {}
        for line in status.strip().splitlines():
            if line.startswith("job-ID"):
                continue
            if line.startswith("-"):
                continue
            line_parts = re.compile(r"\s+").split(line)
            #if len(line_parts) < 9:
                #continue
            id = line_parts[0]
            state = line_parts[4]
            if id in job_ids:
                # map SGE job states to Galaxy job states.
                rval[id] = self._get_job_state(state)
        return rval

    def parse_single_status(self, status, job_id):
        for line in status.splitlines():
            line = line.split()
            if line[0].strip() is not None:
                return self._get_job_state(line[1].strip())
        # no state found, job has exited
        return job_states.OK

    def _get_job_state(self, state):
        try:
            return {
                'r': job_states.RUNNING,
                'Rr': job_states.RUNNING,
                'qw': job_states.QUEUED,
                'hqw': job_states.QUEUED,
            }.get(state)
        except KeyError:
            raise KeyError("Failed to map sge status code [%s] to job state." % state)

__all__ = ('sge',)

Why does parse_single_status work only intermittently? Is it a timing issue with cli.py, or have I done something incorrectly in my shell job class?

Example output from get_single_status:

[svc-clingalprod@clinical-galaxy job]$ qstat -u svc-clingalprod | grep 8791197 | awk '{print $1,$5}'
8791197 Eqw
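
To make the parsing concrete, here is a minimal standalone sketch of how the parse_single_status and _get_job_state logic above treats output like this. The state constants are plain-string stand-ins for galaxy.model.Job.states (an assumption, so the snippet runs without Galaxy), and the empty-line check is slightly simplified compared with the posted class.

```python
# Stand-ins for galaxy.model.Job.states (assumed values, for illustration only).
RUNNING, QUEUED, OK = "running", "queued", "ok"

STATE_MAP = {"r": RUNNING, "Rr": RUNNING, "qw": QUEUED, "hqw": QUEUED}


def get_job_state(state):
    # dict.get() never raises KeyError; unmapped codes come back as None.
    return STATE_MAP.get(state)


def parse_single_status(status):
    for line in status.splitlines():
        fields = line.split()
        if fields:  # simplified guard; the posted code checks line[0] instead
            return get_job_state(fields[1])
    # No qstat line matched: the job is treated as finished.
    return OK


print(parse_single_status("8791197 qw\n"))   # queued
print(parse_single_status("8791197 Eqw\n"))  # None: 'Eqw' is not in the map
print(parse_single_status(""))               # ok: no line to parse
```

An empty result, for example when grep matches no line, falls through to OK, while an error state such as Eqw maps to None rather than raising.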

It looks like your submit() method is outputting the full qsub output. The cli runner splits stdout on whitespace and uses the last element as the job id, which in your case appears to be the word submitted:

galaxy.jobs.runners.cli INFO 2019-04-22 15:07:59,556 [p:38344,w:0,m:2] [ShellRunner.work_thread-0] (594) queued with identifier: submitted
galaxy.jobs DEBUG 2019-04-22 15:07:59,556 [p:38344,w:0,m:2] [ShellRunner.work_thread-0] (594) Persisting job destination (destination id: LocalShell)
galaxy.jobs.runners.cli DEBUG 2019-04-22 15:08:03,060 [p:38344,w:0,m:2] [ShellRunner.monitor_thread] (594/submitted) job not found in batch state check

If you update your submit() method to output just the SGE job ID, that should get you past the issue you’re seeing here.
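
As a rough illustration of that parsing step (not the actual cli.py source), the sketch below applies the same split-and-take-last logic to two sample strings. The first is assumed to resemble the default Grid Engine qsub message, and the second is a bare job ID. The helper name extract_job_id is hypothetical.

```python
def extract_job_id(stdout):
    # Split on whitespace and keep the last token, as described above.
    return stdout.strip().split()[-1]


# Assumed default qsub message (illustrative; exact wording may differ).
verbose = 'Your job 8791197 ("galaxy_594.sh") has been submitted\n'
# Output consisting only of the job ID.
bare = "8791197\n"

print(extract_job_id(verbose))  # submitted
print(extract_job_id(bare))     # 8791197
```

With the verbose message, every later status query searches for a job called "submitted", which never exists, so the job looks like it has already left the queue.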

@nate THANKS!!! qsub just needed the -terse parameter added to the submission. For what it is worth, this was our best solution for getting Son of Grid Engine v8.1.9 working with Galaxy.
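
A minimal sketch of that change, assuming the only difference from the plugin posted earlier is the added flag (the final plugin code was not posted). It is written as a standalone function so it runs outside Galaxy, and the script path is made up.

```python
def submit(script_file):
    # -terse makes qsub print only the job ID instead of the full message.
    return 'qsub -terse %s' % script_file


print(submit('/path/to/galaxy_594.sh'))  # qsub -terse /path/to/galaxy_594.sh
```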

We’d be happy to add your gridengine CLI plugin to Galaxy if you wanted to clean it up to pass our linting standards and make a pull request for it.