drmaa library threads and munge: Invalid Credential Format


How difficult would it be to create and use a Job Plugin for SGE (e.g., sge.py) that is in essence a copy of


with the required SGE parameters for qsub, qstat, qdel, etc.?

As it stands, drmaa-python is broken for SGE, and the CLI Shell runner needs help to correctly monitor and manage jobs within Galaxy.


It shouldn’t be difficult if you want to have a go at it. The major issue is that I don’t think SGE is open source, so it’d be difficult to set up automated tests for this.
It looks like the failure is with monitoring the job. What does `qstat -x` return? It should be XML.
If that command doesn't exist, we probably just need to write a parser for the text output of `qstat` or `qstat -a`.
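A parser for the plain-text output could look something like the sketch below. This is a hypothetical example, not code from Galaxy: the function name `parse_qstat_text` is made up, and the column positions and state codes (`r`, `qw`, ...) follow Grid Engine's default tabular `qstat` format, which may need adjusting for a given site.

```python
# Hypothetical sketch: parse plain-text `qstat` output when no XML mode exists.
# Assumes Grid Engine's default tabular layout: job-ID in column 0, state in
# column 4. Adjust the field indices for your site's qstat output.

def parse_qstat_text(output, job_ids):
    """Map job id -> state code for the requested job ids."""
    states = {}
    for line in output.strip().splitlines():
        fields = line.split()
        # data rows start with a numeric job-ID; skip header/separator rows
        if not fields or not fields[0].isdigit():
            continue
        job_id, state = fields[0], fields[4]
        if job_id in job_ids:
            states[job_id] = state
    return states

sample = """\
job-ID  prior   name       user   state submit/start at     queue         slots
-------------------------------------------------------------------------------
    534 0.55500 galaxy_534 galaxy r     04/11/2019 13:57:05 all.q@node001     1
    535 0.55500 galaxy_535 galaxy qw    04/11/2019 13:57:06                   1
"""
print(parse_qstat_text(sample, {'534', '535'}))  # -> {'534': 'r', '535': 'qw'}
```

The state codes returned here would still need to be mapped to Galaxy job states, as the runner's `_get_job_state` method does.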


We are using Son of Grid Engine… same but different, I suppose. Thanks for the tip on `qstat -x`; I was able to find the correct parameter to output qstat as XML on our system. Progress: there is now a new error, a TypeError on NoneType. As it stands, I believe we can close out the issue; SoGE will require a JobShell of its own to work properly. If I do get it written I will submit it for review. Thanks again.


@mvdbeek I have cribbed the Torque.py shell runner to create a custom shell runner for SGE. I am getting an invalid XML format error. I was hoping for some help as to where my coding went wrong. I will attach the error output and the new runner file I created:

[pid: 22112|app: 0|req: 17/17] () {50 vars in 1356 bytes} [Thu Apr 11 13:57:02 2019] POST /api/tools/testing/build => generated 11411 bytes in 70 msecs (HTTP/1.1 200) 3 headers in 124 bytes (1 switches on core 0)
galaxy.jobs.mapper DEBUG 2019-04-11 13:57:03,614 [p:22119,w:0,m:2] [JobHandlerQueue.monitor_thread] (534) Mapped job to destination id: LocalShell
galaxy.jobs.handler DEBUG 2019-04-11 13:57:03,643 [p:22119,w:0,m:2] [JobHandlerQueue.monitor_thread] (534) Dispatching to cli runner
galaxy.jobs DEBUG 2019-04-11 13:57:03,666 [p:22119,w:0,m:2] [JobHandlerQueue.monitor_thread] (534) Persisting job destination (destination id: LocalShell)
galaxy.jobs DEBUG 2019-04-11 13:57:03,667 [p:22119,w:0,m:2] [JobHandlerQueue.monitor_thread] (534) Working directory for job is: /Dedicated/clingalproddata/database/jobs_directory/000/534
galaxy.jobs.runners DEBUG 2019-04-11 13:57:03,687 [p:22119,w:0,m:2] [JobHandlerQueue.monitor_thread] Job [534] queued (44.021 ms)
galaxy.jobs.handler INFO 2019-04-11 13:57:03,706 [p:22119,w:0,m:2] [JobHandlerQueue.monitor_thread] (534) Job dispatched
galaxy.jobs.command_factory INFO 2019-04-11 13:57:03,818 [p:22119,w:0,m:2] [ShellRunner.work_thread-0] Built script [/Dedicated/clingalproddata/database/jobs_directory/000/534/tool_script.sh] for tool command [echo "Running with '${GALAXY_SLOTS:-1}' threads" > "/Dedicated/clingalproddata/database/files/000/dataset_611.dat"]
galaxy.jobs.runners DEBUG 2019-04-11 13:57:03,901 [p:22119,w:0,m:2] [ShellRunner.work_thread-0] (534) command is: rm -rf working; mkdir -p working; cd working; /Dedicated/clingalproddata/database/jobs_directory/000/534/tool_script.sh; return_code=$?; cd '/Dedicated/clingalproddata/database/jobs_directory/000/534';
[ "$GALAXY_VIRTUAL_ENV" = "None" ] && GALAXY_VIRTUAL_ENV="$_GALAXY_VIRTUAL_ENV"; _galaxy_setup_environment True
python "/Dedicated/clingalproddata/database/jobs_directory/000/534/set_metadata_ozUJfS.py" "/Dedicated/clingalproddata/database/jobs_directory/000/534/registry.xml" "/Dedicated/clingalproddata/database/jobs_directory/000/534/working/galaxy.json" "/Dedicated/clingalproddata/database/jobs_directory/000/534/metadata_in_HistoryDatasetAssociation_746_CAZuzJ,/Dedicated/clingalproddata/database/jobs_directory/000/534/metadata_kwds_HistoryDatasetAssociation_746_yjwKxo,/Dedicated/clingalproddata/database/jobs_directory/000/534/metadata_out_HistoryDatasetAssociation_746_klUuOl,/Dedicated/clingalproddata/database/jobs_directory/000/534/metadata_results_HistoryDatasetAssociation_746_spgqK8,/Dedicated/clingalproddata/database/files/000/dataset_611.dat,/Dedicated/clingalproddata/database/jobs_directory/000/534/metadata_override_HistoryDatasetAssociation_746_jUWOx6" 5242880; sh -c "exit $return_code"
galaxy.jobs.runners.cli DEBUG 2019-04-11 13:57:03,923 [p:22119,w:0,m:2] [ShellRunner.work_thread-0] (534) submitting file: /Dedicated/clingalproddata/database/jobs_directory/000/534/galaxy_534.sh
- - [11/Apr/2019:13:57:06 -0500] "GET /api/histories/3be00b2dc330bf21/contents?details=00ab6520faf25f0a&order=hid&v=dev&q=update_time-ge&q=deleted&q=purged&qv=2019-04-11T18%3A57%3A02.000Z&qv=False&qv=False HTTP/1.1" 200 - "https://clinical-galaxy.iihg.uiowa.edu/?job_id=ed09665819beb35b&__identifer=snaxla4csg" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:60.0) Gecko/20100101 Firefox/60.0"
[pid: 22112|app: 0|req: 18/18] () {46 vars in 1558 bytes} [Thu Apr 11 13:57:06 2019] GET /api/histories/3be00b2dc330bf21/contents?details=00ab6520faf25f0a&order=hid&v=dev&q=update_time-ge&q=deleted&q=purged&qv=2019-04-11T18%3A57%3A02.000Z&qv=False&qv=False => generated 1314 bytes in 38 msecs (HTTP/1.1 200) 3 headers in 124 bytes (1 switches on core 2)
galaxy.jobs.runners.cli INFO 2019-04-11 13:57:07,136 [p:22119,w:0,m:2] [ShellRunner.work_thread-0] (534) queued with identifier: submitted
galaxy.jobs DEBUG 2019-04-11 13:57:07,137 [p:22119,w:0,m:2] [ShellRunner.work_thread-0] (534) Persisting job destination (destination id: LocalShell)
galaxy.jobs.runners.util.cli.job.sge WARNING 2019-04-11 13:57:10,462 [p:22119,w:0,m:2] [ShellRunner.monitor_thread] No valid qstat XML return from `qstat -xml`, got the following: <?xml version='1.0'?>
<job_info  xmlns:xsd="http://arc.liv.ac.uk/repos/darcs/sge/source/dist/util/resources/schemas/qstat/qstat.xsd">
    <job_list state="running">
    <job_list state="running">
from logging import getLogger

try:
    import xml.etree.cElementTree as et
except ImportError:
    import xml.etree.ElementTree as et

try:
    from galaxy.model import Job
    job_states = Job.states
except ImportError:
    # Not in Galaxy, map Galaxy job states to Pulsar ones.
    from pulsar.util import enum
    job_states = enum(RUNNING='running', OK='complete', QUEUED='queued')

from ..job import BaseJobExec

log = getLogger(__name__)

ERROR_MESSAGE_UNRECOGNIZED_ARG = 'Unrecognized long argument passed to sge CLI plugin: %s'

argmap = {'destination': '-q',
          'Execution_Time': '-a',
          'Account_Name': '-A',
          'Checkpoint': '-c',
          'Error_Path': '-e',
          'Group_List': '-g',
          'Hold_Types': '-h',
          'Join_Paths': '-j',
          'Keep_Files': '-k',
          'Resource_List': '-l',
          'Mail_Points': '-m',
          'Mail_Users': '-M',
          'Job_Name': '-N',
          'Output_Path': '-o',
          'Priority': '-p',
          'Rerunable': '-r',
          'Shell_Path_List': '-S',
          'job_array_request': '-t',
          'User_List': '-u',
          'Variable_List': '-v'}

class sge(BaseJobExec):

    def __init__(self, **params):
        self.params = {}
        for k, v in params.items():
            self.params[k] = v

    def job_script_kwargs(self, ofile, efile, job_name):
        pbsargs = {'-o': ofile,
                   '-e': efile,
                   '-N': job_name}
        for k, v in self.params.items():
            if k == 'plugin':
                continue
            try:
                if not k.startswith('-'):
                    k = argmap[k]
                pbsargs[k] = v
            except KeyError:
                log.warning(ERROR_MESSAGE_UNRECOGNIZED_ARG % k)
        template_pbsargs = ''
        for k, v in pbsargs.items():
            template_pbsargs += '#PBS %s %s\n' % (k, v)
        return dict(headers=template_pbsargs)

    def submit(self, script_file):
        return 'qsub %s' % script_file

    def delete(self, job_id):
        return 'qdel %s' % job_id

    def get_status(self, job_ids=None):
        # site-specific queue names are hard-coded here; no space after the
        # comma, or the shell would treat the second queue as a separate argument
        return 'qstat -q IIHG,UI -g d -xml'

    def get_single_status(self, job_id):
        return 'qstat -j %s' % job_id

    def parse_status(self, status, job_ids):
        # in case there's noise in the output, find the big blob of xml
        tree = None
        rval = {}
        for line in status.strip().splitlines():
            try:
                tree = et.fromstring(line.strip())
                assert tree.tag == 'job_info'
                break
            except Exception:
                tree = None
        if tree is None:
            log.warning('No valid qstat XML return from `qstat -xml`, got the following: %s' % status)
            return None
        else:
            for job in tree.findall('./*/'):
                id = job.find('JB_job_number').text
                if id in job_ids:
                    state = job.find('state').text
                    # map SGE job states to Galaxy job states.
                    rval[id] = self._get_job_state(state)
        return rval

    def parse_single_status(self, status, job_id):
        for line in status.splitlines():
            line = line.split(' = ')
            if line[0].strip() == 'job_state':
                return self._get_job_state(line[1].strip())
        # no state found, job has exited
        return job_states.OK

    def _get_job_state(self, state):
        try:
            return {
                'E': job_states.RUNNING,
                'R': job_states.RUNNING,
                'Q': job_states.QUEUED,
                'C': job_states.OK
            }[state]
        except KeyError:
            raise KeyError("Failed to map sge status code [%s] to job state." % state)

__all__ = ('sge',)

I think you almost got it right; it's probably just the XML parsing that doesn't work.
Have a look at https://github.com/galaxyproject/galaxy/pull/7803/files. If this works, we can either merge that PR, or you can copy it and create your own PR if you would like the credit to appear on GitHub.
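For reference, the likely reason the per-line loop in `parse_status` fails is that SoGE's `qstat -xml` output spans many lines, so no single line is valid XML. A sketch of parsing the whole blob at once is below; the function name `parse_soge_xml` and the sample document are illustrative (real SoGE output also carries an `xmlns:xsd` declaration on the root, which does not affect the element names used here).

```python
import xml.etree.ElementTree as et

# Hypothetical sketch of parsing Son of Grid Engine's `qstat -xml` output.
# The whole blob is parsed at once rather than line by line, since the XML
# spans many lines; job entries sit under <queue_info> (running) and
# <job_info> (pending) as <job_list> elements.

sample = """<?xml version='1.0'?>
<job_info>
  <queue_info>
    <job_list state="running">
      <JB_job_number>534</JB_job_number>
      <state>r</state>
    </job_list>
  </queue_info>
  <job_info>
    <job_list state="pending">
      <JB_job_number>535</JB_job_number>
      <state>qw</state>
    </job_list>
  </job_info>
</job_info>"""

def parse_soge_xml(blob, job_ids):
    tree = et.fromstring(blob)          # parse the whole document, not lines
    states = {}
    for job in tree.iter('job_list'):   # matches entries at any nesting depth
        job_id = job.findtext('JB_job_number')
        if job_id in job_ids:
            states[job_id] = job.findtext('state')
    return states

print(parse_soge_xml(sample, {'534', '535'}))  # -> {'534': 'r', '535': 'qw'}
```

`iter('job_list')` walks the tree recursively, which sidesteps having to know whether a job lives under `queue_info` or the pending `job_info` section.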


Thanks. Yeah, the SoGE XML output was not very flat. I ended up stealing from the torque-slurm.py shell runner and parsing tabular text. I am not sure about submitting a PR, because the get-single-status function requires an account name. Maybe if I can figure out how to abstract that, it will be of more benefit to the community. I did run across one other SoGE cluster user in the community; I warned them to avoid 8.1.9 and Munge for Galaxy.
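One way to abstract the site-specific account and queue names would be to read them from the params that the plugin's constructor already receives from the job configuration, rather than hard-coding them in the command string. The sketch below is hypothetical: the class name `SgeStatus` and the parameter names `queues` and `account` are made up for illustration (the real plugin would use whatever param names the destination defines).

```python
# Hypothetical sketch: build the qstat command from plugin params instead of
# hard-coding site-specific names. 'queues' and 'account' are made-up param
# names; qstat's -q (queue list) and -u (user) flags are standard Grid Engine.

class SgeStatus(object):
    def __init__(self, **params):
        self.params = params

    def get_status(self, job_ids=None):
        cmd = 'qstat'
        queues = self.params.get('queues')     # e.g. "IIHG,UI"
        if queues:
            cmd += ' -q %s' % queues
        account = self.params.get('account')   # site-specific account name
        if account:
            cmd += ' -u %s' % account
        return cmd + ' -xml'

print(SgeStatus(queues='IIHG,UI', account='galaxy').get_status())
# -> qstat -q IIHG,UI -u galaxy -xml
```

With no params supplied, the command falls back to a plain `qstat -xml`, so the same plugin works on sites that don't need queue or account filtering.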


It won't be used if the parse_status function works. Just let me know if it works for you now; that'd be enough reason to merge the PR.