Hi!I have a problem using bioblend’s documentation.I want the workflow to end and my whole program to end, but my code still has problems. When my workflow is still running, my program shows completion. Help me check what’s wrong.Here is all my code:
from bioblend.galaxy import GalaxyInstance
from gooey import Gooey, GooeyParser
import os
from typing import (
Any,
Dict,
List,
Optional,
TYPE_CHECKING,
)
import logging
import time
from bioblend import (
CHUNK_SIZE,
TimeoutException,
)
from bioblend.galaxy.client import Client
if TYPE_CHECKING:
from bioblend.galaxy import GalaxyInstance
log = logging.getLogger(name)
INVOCATION_TERMINAL_STATES = {“cancelled”, “failed”, “scheduled”}
class InvocationClient(Client):
gi: “GalaxyInstance”
module = “invocations”
@Gooey(dump_build_config=True, program_name=“MetaWin”, default_size=(1400, 1000), timing_options={‘show_time_remaining’: True, ‘hide_time_remaining_on_complete’: False})
def main():
desc = “MetaWin is a macro-genomic analysis tool based on Windows. Its workflow includes quality control, read trimming, assembly, binning, gene prediction, and functional annotation.”
my_cool_parser = GooeyParser(description=desc)
my_cool_parser.add_argument(
"Input1 forward", help="Input1 forward(e.g. _r1.fastq.gz)", widget="FileSaver")
my_cool_parser.add_argument(
"Input1 reverse", help="Input1 reverse(e.g. _r1.fastq.gz)", widget="FileSaver")
my_cool_parser.add_argument('-k', '--Kraken Database',
choices=['Silva', 'RDP', 'Greengenes','Standard','Prebuilt Refseq indexes'],
default='Standard',
help="Select a Kraken2 database (default: Standard)",
widget='Dropdown')
my_cool_parser.add_argument('-c', '--Confidence', default=0,
help="Confidence score threshold. Must be in [0, 1] (default: 0)", dest='Confidence',
type=float)
my_cool_parser.add_argument(
"-e", "--error", action="store_true", help="Stop process on error (default: No)")
args = my_cool_parser.parse_args()
input_files = args.__dict__["Input1 forward"]
Input1_forward = args.__dict__["Input1 forward"]
Input1_reverse = args.__dict__["Input1 reverse"]
input_dir = os.path.dirname(input_files)
output_dir = os.path.join(input_dir, "output")
if not os.path.exists(output_dir):
os.makedirs(output_dir)
galaxy_url = 'https://usegalaxy.org/'
api_key = 'c962d3fae66664a11ba60297ef0d1b84'
gi = GalaxyInstance(url=galaxy_url, key=api_key)
history_name = 'MetaWin Results'
history_id = gi.histories.create_history(name=history_name)['id']
dataset_1 = gi.tools.upload_file(Input1_forward, history_id=history_id, file_type='fastqsanger.gz')
dataset_1_id = dataset_1['outputs'][0]['id']
dataset_2 = gi.tools.upload_file(Input1_reverse, history_id=history_id, file_type='fastqsanger.gz')
dataset_2_id = dataset_2['outputs'][0]['id']
collection_payload = {
'collection_type': 'list:paired',
'name': 'Paired Collection',
'element_identifiers': [
{
'collection_type': 'paired',
'name': 'first_element',
'src': 'new_collection',
'element_identifiers': [
{
'name': 'forward',
'src': 'hda',
'id': dataset_1_id
}, {
'name':'reverse',
'src': 'hda',
'id': dataset_2_id
}
]
}
]
}
uploaded_collection = gi.histories.create_dataset_collection(history_id, collection_payload)
inputs = {
'Collection of paired reads': {'src': 'hdca', 'id': uploaded_collection['id']}
}
workflow_id = '1aa75989b24978e3'
invocation = gi.workflows.invoke_workflow(workflow_id, inputs=inputs, inputs_by="name")
def wait_for_invocation(gi: GalaxyInstance, invocation_id: str, maxwait: float = 12000, interval: float = 3, check: bool = True):
time_left = maxwait
while True:
invocation = gi.invocations.show_invocation(invocation_id)
state = invocation["state"]
if state in INVOCATION_TERMINAL_STATES:
if check and state != "scheduled":
raise Exception(f"Invocation {invocation_id} is in terminal state {state}")
return invocation
if time_left > 0:
log.info(f"Invocation {invocation_id} is in non-terminal state {state}. Will wait {time_left} more s")
time.sleep(min(time_left, interval))
time_left -= interval
else:
raise TimeoutException(
f"Invocation {invocation_id} is still in non-terminal state {state} after {maxwait} s"
)
try:
wait_for_invocation(gi, invocation['id'])
except TimeoutException as e:
log.error(f"Timeout waiting for workflow invocation to complete: {e}")
if name == ‘main’:
main()