drmaa library threads and munge: Invalid Credential Format

admin
drmaa
cluster
galaxy_1809
#21

How difficult would it be to create and use a Job Plugin for SGE (e.g., sge.py) that is in essence a copy of

galaxy/lib/galaxy/jobs/runners/util/cli/job/torque.py

with the required SGE parameters for qsub, qstat, qdel, etc.?

As it stands, drmaa-python is broken for SGE, and the CLI shell runner needs help to correctly monitor and manage jobs within Galaxy.

#22

It shouldn’t be difficult if you want to have a go at it. The major issue is that I don’t think SGE is open source, so it’d be difficult to set up automated tests for this.
It looks like the failure is with monitoring the job. What does `qstat -x` return? It should be XML.
If that command doesn't exist, we probably just need to write a parser for the text output of `qstat` or `qstat -a`.
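
For reference, a rough sketch of what such a text parser might look like (untested; the column positions are an assumption based on standard SGE qstat output, not verified against this cluster):

# Hypothetical sketch: turn plain-text `qstat` output into {job_id: state}.
# Assumes the standard SGE layout: a header line, a dashed separator, then
# one row per job with the job id in column 1 and the state in column 5.
def parse_qstat_text(status):
    states = {}
    for line in status.strip().splitlines()[2:]:
        fields = line.split()
        if len(fields) >= 5:
            states[fields[0]] = fields[4]
    return states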

#23

We are using Son of Grid Engine… same but different, I suppose. Thanks for the tip on `qstat -x`. I was able to find the correct parameter to output qstat in XML on our system. Progress: now I get a new error, a TypeError on a NoneType. As it stands, I believe we can close out the issue; SoGE will require a shell runner of its own to work properly. If I do get it written, I will submit it for review. Thanks again.

#24

@mvdbeek I have cribbed the torque.py shell runner to create a custom shell runner for SGE. I am getting an invalid XML format error and was hoping for some help as to where my coding went wrong. I will attach the error output and the new runner file I created:

[pid: 22112|app: 0|req: 17/17] 128.255.162.194 () {50 vars in 1356 bytes} [Thu Apr 11 13:57:02 2019] POST /api/tools/testing/build => generated 11411 bytes in 70 msecs (HTTP/1.1 200) 3 headers in 124 bytes (1 switches on core 0)
galaxy.jobs.mapper DEBUG 2019-04-11 13:57:03,614 [p:22119,w:0,m:2] [JobHandlerQueue.monitor_thread] (534) Mapped job to destination id: LocalShell
galaxy.jobs.handler DEBUG 2019-04-11 13:57:03,643 [p:22119,w:0,m:2] [JobHandlerQueue.monitor_thread] (534) Dispatching to cli runner
galaxy.jobs DEBUG 2019-04-11 13:57:03,666 [p:22119,w:0,m:2] [JobHandlerQueue.monitor_thread] (534) Persisting job destination (destination id: LocalShell)
galaxy.jobs DEBUG 2019-04-11 13:57:03,667 [p:22119,w:0,m:2] [JobHandlerQueue.monitor_thread] (534) Working directory for job is: /Dedicated/clingalproddata/database/jobs_directory/000/534
galaxy.jobs.runners DEBUG 2019-04-11 13:57:03,687 [p:22119,w:0,m:2] [JobHandlerQueue.monitor_thread] Job [534] queued (44.021 ms)
galaxy.jobs.handler INFO 2019-04-11 13:57:03,706 [p:22119,w:0,m:2] [JobHandlerQueue.monitor_thread] (534) Job dispatched
galaxy.jobs.command_factory INFO 2019-04-11 13:57:03,818 [p:22119,w:0,m:2] [ShellRunner.work_thread-0] Built script [/Dedicated/clingalproddata/database/jobs_directory/000/534/tool_script.sh] for tool command [echo "Running with '${GALAXY_SLOTS:-1}' threads" > "/Dedicated/clingalproddata/database/files/000/dataset_611.dat"]
galaxy.jobs.runners DEBUG 2019-04-11 13:57:03,901 [p:22119,w:0,m:2] [ShellRunner.work_thread-0] (534) command is: rm -rf working; mkdir -p working; cd working; /Dedicated/clingalproddata/database/jobs_directory/000/534/tool_script.sh; return_code=$?; cd '/Dedicated/clingalproddata/database/jobs_directory/000/534';
[ "$GALAXY_VIRTUAL_ENV" = "None" ] && GALAXY_VIRTUAL_ENV="$_GALAXY_VIRTUAL_ENV"; _galaxy_setup_environment True
python "/Dedicated/clingalproddata/database/jobs_directory/000/534/set_metadata_ozUJfS.py" "/Dedicated/clingalproddata/database/jobs_directory/000/534/registry.xml" "/Dedicated/clingalproddata/database/jobs_directory/000/534/working/galaxy.json" "/Dedicated/clingalproddata/database/jobs_directory/000/534/metadata_in_HistoryDatasetAssociation_746_CAZuzJ,/Dedicated/clingalproddata/database/jobs_directory/000/534/metadata_kwds_HistoryDatasetAssociation_746_yjwKxo,/Dedicated/clingalproddata/database/jobs_directory/000/534/metadata_out_HistoryDatasetAssociation_746_klUuOl,/Dedicated/clingalproddata/database/jobs_directory/000/534/metadata_results_HistoryDatasetAssociation_746_spgqK8,/Dedicated/clingalproddata/database/files/000/dataset_611.dat,/Dedicated/clingalproddata/database/jobs_directory/000/534/metadata_override_HistoryDatasetAssociation_746_jUWOx6" 5242880; sh -c "exit $return_code"
galaxy.jobs.runners.cli DEBUG 2019-04-11 13:57:03,923 [p:22119,w:0,m:2] [ShellRunner.work_thread-0] (534) submitting file: /Dedicated/clingalproddata/database/jobs_directory/000/534/galaxy_534.sh
128.255.162.194 - - [11/Apr/2019:13:57:06 -0500] "GET /api/histories/3be00b2dc330bf21/contents?details=00ab6520faf25f0a&order=hid&v=dev&q=update_time-ge&q=deleted&q=purged&qv=2019-04-11T18%3A57%3A02.000Z&qv=False&qv=False HTTP/1.1" 200 - "https://clinical-galaxy.iihg.uiowa.edu/?job_id=ed09665819beb35b&__identifer=snaxla4csg" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:60.0) Gecko/20100101 Firefox/60.0"
[pid: 22112|app: 0|req: 18/18] 128.255.162.194 () {46 vars in 1558 bytes} [Thu Apr 11 13:57:06 2019] GET /api/histories/3be00b2dc330bf21/contents?details=00ab6520faf25f0a&order=hid&v=dev&q=update_time-ge&q=deleted&q=purged&qv=2019-04-11T18%3A57%3A02.000Z&qv=False&qv=False => generated 1314 bytes in 38 msecs (HTTP/1.1 200) 3 headers in 124 bytes (1 switches on core 2)
galaxy.jobs.runners.cli INFO 2019-04-11 13:57:07,136 [p:22119,w:0,m:2] [ShellRunner.work_thread-0] (534) queued with identifier: submitted
galaxy.jobs DEBUG 2019-04-11 13:57:07,137 [p:22119,w:0,m:2] [ShellRunner.work_thread-0] (534) Persisting job destination (destination id: LocalShell)
galaxy.jobs.runners.util.cli.job.sge WARNING 2019-04-11 13:57:10,462 [p:22119,w:0,m:2] [ShellRunner.monitor_thread] No valid qstat XML return from `qstat -xml`, got the following: <?xml version='1.0'?>
<job_info  xmlns:xsd="http://arc.liv.ac.uk/repos/darcs/sge/source/dist/util/resources/schemas/qstat/qstat.xsd">
  <queue_info>
    <job_list state="running">
      <JB_job_number>7440828</JB_job_number>
      <JAT_prio>0.14478</JAT_prio>
      <JB_name>NVT-octyl-7ns</JB_name>
      <JB_owner>amith</JB_owner>
      <state>r</state>
      <JAT_start_time>2019-04-09T16:17:20</JAT_start_time>
      <queue_name>UI@argon-lc-i18-21.hpc</queue_name>
      <slots>56</slots>
    </job_list>
    <job_list state="running">
      <JB_job_number>6148294</JB_job_number>
      <JAT_prio>0.59493</JAT_prio>
      <JB_name>Ebola_time0</JB_name>
      <JB_owner>jiangcxu</JB_owner>
      <state>r</state>
      <JAT_start_time>2019-02-28T23:27:30</JAT_start_time>
      <queue_name>UI@argon-lc-i18-20.hpc</queue_name>
      <slots>56</slots>
    </job_list>
And here is the runner file, sge.py:

from logging import getLogger
try:
    import xml.etree.cElementTree as et
except ImportError:
    import xml.etree.ElementTree as et

try:
    from galaxy.model import Job
    job_states = Job.states
except ImportError:
    # Not in Galaxy, map Galaxy job states to Pulsar ones.
    from pulsar.util import enum
    job_states = enum(RUNNING='running', OK='complete', QUEUED='queued')

from ..job import BaseJobExec

log = getLogger(__name__)

ERROR_MESSAGE_UNRECOGNIZED_ARG = 'Unrecognized long argument passed to sge CLI plugin: %s'

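# Map destination parameter names (PBS-style attribute names, carried over
# unchanged from the torque plugin) to their qsub short options.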
argmap = {'destination': '-q',
          'Execution_Time': '-a',
          'Account_Name': '-A',
          'Checkpoint': '-c',
          'Error_Path': '-e',
          'Group_List': '-g',
          'Hold_Types': '-h',
          'Join_Paths': '-j',
          'Keep_Files': '-k',
          'Resource_List': '-l',
          'Mail_Points': '-m',
          'Mail_Users': '-M',
          'Job_Name': '-N',
          'Output_Path': '-o',
          'Priority': '-p',
          'Rerunable': '-r',
          'Shell_Path_List': '-S',
          'job_array_request': '-t',
          'User_List': '-u',
          'Variable_List': '-v'}


class sge(BaseJobExec):

    def __init__(self, **params):
        self.params = dict(params)

    def job_script_kwargs(self, ofile, efile, job_name):
        pbsargs = {'-o': ofile,
                   '-e': efile,
                   '-N': job_name}
        for k, v in self.params.items():
            if k == 'plugin':
                continue
            try:
                if not k.startswith('-'):
                    k = argmap[k]
                pbsargs[k] = v
            except KeyError:
                log.warning(ERROR_MESSAGE_UNRECOGNIZED_ARG % k)
        template_pbsargs = ''
        for k, v in pbsargs.items():
            # SGE reads embedded directives prefixed with '#$', not '#PBS'.
            template_pbsargs += '#$ %s %s\n' % (k, v)
        return dict(headers=template_pbsargs)

    def submit(self, script_file):
        return 'qsub %s' % script_file

    def delete(self, job_id):
        return 'qdel %s' % job_id

    def get_status(self, job_ids=None):
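        # NOTE: the queue list (IIHG,UI) is site-specific; '-g d' lists each
        # array-job task on its own line and '-xml' requests XML output.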
        return 'qstat -q IIHG,UI -g d -xml'

    def get_single_status(self, job_id):
        return 'qstat -j %s' % job_id

    def parse_status(self, status, job_ids):
        # in case there's noise in the output, find the big blob o' xml
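        # torque's `qstat -x` emits the whole document on one line, so try
        # each line in turn until one parses as a complete XML document.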
        tree = None
        rval = {}
        for line in status.strip().splitlines():
            try:
                tree = et.fromstring(line.strip())
                assert tree.tag == 'job_info'
                break
            except Exception:
                tree = None
        if tree is None:
            log.warning('No valid qstat XML return from `qstat -xml`, got the following: %s' % status)
            return None
        else:
            for job in tree.findall('./*/'):
                job_id = job.find('JB_job_number').text
                if job_id in job_ids:
                    state = job.find('state').text
                    # map PBS job states to Galaxy job states.
                    rval[job_id] = self._get_job_state(state)
        return rval

    def parse_single_status(self, status, job_id):
        for line in status.splitlines():
            line = line.split(' = ')
            if line[0].strip() == 'job_state':
                return self._get_job_state(line[1].strip())
        # no state found, job has exited
        return job_states.OK

    def _get_job_state(self, state):
        try:
            # Index with [] so an unknown code raises KeyError instead of
            # silently returning None.
            return {
                'E': job_states.RUNNING,
                'R': job_states.RUNNING,
                'Q': job_states.QUEUED,
                'C': job_states.OK
            }[state]
        except KeyError:
            raise KeyError("Failed to map sge status code [%s] to job state." % state)


__all__ = ('sge',)
#25

I think you almost got it right; it's probably just the XML parsing that doesn't work.
Have a look at https://github.com/galaxyproject/galaxy/pull/7803/files. If this works, we can either merge that PR, or you can copy it and create your own PR if you would like the credit to appear on GitHub.
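
The gist of the change (a rough sketch; the exact code in that PR may differ) is to hand the whole output to the XML parser at once instead of line by line, and to collect the <job_list> elements wherever they appear in SoGE's nested document:

    # Sketch: parse the entire qstat output as one XML document, since SoGE
    # pretty-prints it across many lines (torque emits it on a single line).
    def parse_status(self, status, job_ids):
        rval = {}
        try:
            tree = et.fromstring(status.strip())
            assert tree.tag == 'job_info'
        except Exception:
            log.warning('No valid qstat XML return from `qstat -xml`, got the following: %s' % status)
            return None
        # <job_list> nodes appear under both <queue_info> (running jobs) and
        # <job_info> (pending jobs), so search the whole tree for them.
        for job in tree.iter('job_list'):
            job_id = job.find('JB_job_number').text
            if job_id in job_ids:
                rval[job_id] = self._get_job_state(job.find('state').text)
        return rval

Note that _get_job_state would also need to map SoGE's lowercase state codes ('r', 'qw', and so on, as in the XML above) rather than torque's uppercase ones.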

#26

Thanks. Yeah, the SoGE XML output was not very flat. I ended up stealing from the torque and slurm shell runners and parsing tabular text instead. I am not sure about submitting a PR, because the get_single_status function requires an account name. Maybe if I can figure out how to abstract that, it will be of more benefit to the community. I did run across one other SoGE cluster user in the community; I warned them to avoid 8.1.9 and Munge for Galaxy.
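
If it helps, one way to abstract the account name (a sketch; the 'user' parameter name here is hypothetical, not an existing convention) is to read it from the destination params the plugin already receives:

    # Hypothetical sketch: take the account from a destination param, e.g.
    # <param id="user">galaxy</param> in job_conf.xml, and fall back to '*',
    # which SGE's qstat treats as "all users".
    def get_single_status(self, job_id):
        user = self.params.get('user', '*')
        return "qstat -u '%s'" % user

parse_single_status would then scan the tabular rows for job_id.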

#27

It won't be used if the parse_status function works. Just let me know if it works for you now; that'd be enough reason to merge the PR.