Pulsar configuration

Hi,
I’m trying to setup a galaxy server with a pulsar setup. I am following the documentation here:

I have encountered an issue though. After initializing the rabbitmq server I can see three channels connecting to the broker. One for each of the queues (pulsar__kill_ack, pulsar__setup_ack, pulsar__status_update)


However when I submit a job from galaxy a new channel appears with no consumers and all messages get dropped.

Here are my config files.
galaxy.yml

    job_config:

        runners:
            local_runner:
                load: galaxy.jobs.runners.local:LocalJobRunner
                workers: 4
            pulsar_runner:
                load: galaxy.jobs.runners.pulsar:PulsarMQJobRunner
                amqp_url: "pyamqp://pulsar_au:pulsar_pass@rabbit-host:5671//pulsar/pulsar_au?ssl=1"
                amqp_acknowledge: true
                amqp_ack_republish_time: 1200
                amqp_consumer_timeout: 2
                amqp_publish_retry: true
                amqp_publish_retry_max_retries: 60
                galaxy_url: "https://galaxy.rcs.ucalgary.ca"
                manager: _default_

        execution:
            default: tpv_dispatcher
            environments:
                local_env:
                    runner: local_runner
                    tmp_dir: true
                tpv_dispatcher:
                    runner: dynamic
                    type: python
                    function: map_tool_to_destination
                    rules_module: tpv.rules
                    tpv_config_files:
                        - https://gxy.io/tpv/db.yml
                        - /galaxy/server/config/tpv_rules_local.yml
        tools:
        -   class: local
            environment: local_env

definitions.json

{
    "rabbit_version": "3.11.28",
    "rabbitmq_version": "3.11.28",
    "users": [
        {
            "hashing_algorithm": "rabbit_password_hashing_sha256",
            "limits": {},
            "name": "admin",
            "password_hash": "jkhslkhgkluw",
            "tags": [
                "administrator"
            ]
        },
        {
            "hashing_algorithm": "rabbit_password_hashing_sha256",
            "limits": {},
            "name": "pulsar_au",
            "password_hash": "jkshdfljhih",
            "tags": []
        }
    ],
    "vhosts": [
        {
            "limits": [],
            "metadata": {
                "description": "",
                "tags": []
            },
            "name": "/pulsar/pulsar_au"
        },
        {
            "limits": [],
            "metadata": {
                "description": "Default virtual host",
                "tags": []
            },
            "name": "/"
        }
    ],
    "permissions": [
        {
            "user": "admin",
            "vhost": "/",
            "configure": ".*",
            "read": ".*",
            "write": ".*"
        },
        {
            "user": "admin",
            "vhost": "/pulsar/pulsar_au",
            "configure": ".*",
            "read": ".*",
            "write": ".*"
        },
        {
            "user": "pulsar_au",
            "vhost": "/pulsar/pulsar_au",
            "configure": ".*",
            "read": ".*",
            "write": ".*"
        }
    ],
    "parameters": [],
    "policies": [],
    "queues": [],
    "exchanges": [],
    "bindings": []
}

tpv_rules_local.yml

global:
  default_inherits: default

tools:
  default:
    abstract: true
    cores: 1
    mem: cores * 4
    params:
      walltime: 8
    scheduling:
      require:
        - pulsar

destinations:
  pulsar:
    runner: pulsar_runner
    params:
      default_file_action: remote_transfer
      dependency_resolution: remote
      jobs_directory: /mnt/pulsar/files/staging
      persistence_directory: /mnt/pulsar/files/persisted_data
      remote_metadata: false
      rewrite_parameters: true
      transport: curl
      outputs_to_working_directory: false
    scheduling:
      require:
        - pulsar

My question is what could be causing this issue? How to correctly reroute galaxy requests to the proper queue and exchange?

Best regards,
Pedro

Welcome, @pemartin

Thanks for posting all of the details!! Let’s ask for expert help at the Admin chat. They may reply here or there, and feel free to join → You're invited to talk on Matrix

Let’s continue the discussion on github: Galaxy not including appropriate routing_key for RabbitMQ · galaxyproject/galaxy · Discussion #17910 · GitHub