Async task / queue management

I’ve hit a situation and would appreciate advice on how to proceed. At the point my playbook starts, there are some number of predefined “jobs,” each of which should run on a predetermined “cluster”, and there are several clusters. The playbook targets “localhost”; each job’s “cluster” is a parameter to a simple task that uses a proprietary connection/execution module to run the jobs, the details of which aren’t relevant. (I promise!)

What we’ve done until now is fire off all the jobs on each cluster asynchronously. That works fine as long as there are fewer than about 3 jobs on any given cluster. More than that, and users start to notice degraded performance. When this was first put together, more than 3 jobs wasn’t an issue; now it is.

What I’d like to implement is a per-cluster queue wherein 3 of N jobs can run simultaneously, and when any of the 3 running jobs completes we can start another, etc. until there are no more pending jobs, then each of the last jobs completes in turn and that cluster is done. Simultaneously, the other clusters are off jobbing their respective jobs the same way.
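As a sketch of that per-cluster queue outside Ansible, here is the same idea in Python's asyncio: one `asyncio.Semaphore(3)` per cluster, all clusters running at the same time, and a new job admitted on a cluster whenever any of its running jobs releases its slot. This is a swapped-in technique, not an Ansible feature; `run_job` is a hypothetical stand-in for the proprietary module (it only sleeps), and the cluster names, job names, and durations are made up.

```python
import asyncio
from collections import defaultdict

MAX_PER_CLUSTER = 3  # the "about 3 jobs per cluster" limit from the post


async def run_job(cluster: str, job: str, seconds: float) -> str:
    """Hypothetical stand-in for the proprietary module that runs one job."""
    await asyncio.sleep(seconds)
    return f"{cluster}:{job}"


async def run_all(jobs):
    """Run each cluster's jobs with at most MAX_PER_CLUSTER in flight per cluster.

    `jobs` maps cluster name -> list of (job name, duration) pairs. All clusters
    proceed at the same time; on each cluster a semaphore admits a waiting job
    as soon as any running job releases its slot.
    Returns (results per cluster in submission order, peak concurrency per cluster).
    """
    slots = {cluster: asyncio.Semaphore(MAX_PER_CLUSTER) for cluster in jobs}
    running = defaultdict(int)  # jobs currently holding a slot, per cluster
    peak = defaultdict(int)     # highest value `running` reached, per cluster

    async def guarded(cluster, job, seconds):
        async with slots[cluster]:
            running[cluster] += 1
            peak[cluster] = max(peak[cluster], running[cluster])
            try:
                return await run_job(cluster, job, seconds)
            finally:
                running[cluster] -= 1

    # Create every task up front so all clusters start immediately.
    tasks = {
        cluster: [asyncio.create_task(guarded(cluster, name, secs)) for name, secs in pending]
        for cluster, pending in jobs.items()
    }
    results = {cluster: await asyncio.gather(*ts) for cluster, ts in tasks.items()}
    return results, dict(peak)


if __name__ == "__main__":
    demo = {
        "cluster-a": [(f"job{i}", 0.1) for i in range(7)],
        "cluster-b": [(f"job{i}", 0.05) for i in range(4)],
    }
    results, peak = asyncio.run(run_all(demo))
    print(peak)
```

The semaphore does the "start another when any finishes" bookkeeping, so there is no need to know which job ID finished first.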

This all worked really well in my head this morning! That was before I (re)discovered (A) that until: can’t be used on include_tasks: and (B) that async_status requires a particular job ID. Ansible doesn’t seem to have the equivalent of bash’s wait, which can wait for the completion of any child process, or of any process ID from a list. Maybe such a thing could be implemented by snooping around the temporary async job task files in ~/.ansible_async/? I don’t know, but implementing job queues in vanilla Ansible is kind of tough given limitations (A) and (B).
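For comparison, Python's `concurrent.futures.wait(..., return_when=FIRST_COMPLETED)` is the kind of "wait for any of these" primitive described above, roughly what bash's `wait -n` provides. A minimal sketch of a 3-slot queue built on it, assuming a hypothetical blocking `run_job` function that runs one job to completion:

```python
import time
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def run_job(job):
    """Hypothetical stand-in: run one (name, seconds) job and block until it ends."""
    name, seconds = job
    time.sleep(seconds)
    return name


def run_queue(jobs, limit=3):
    """Keep at most `limit` jobs in flight; refill a slot whenever ANY job finishes.

    Returns job names in the order their completion was observed.
    """
    pending = list(jobs)
    in_flight = set()
    finished = []
    with ThreadPoolExecutor(max_workers=limit) as pool:
        while pending or in_flight:
            # Top up the window from the head of the queue.
            while pending and len(in_flight) < limit:
                in_flight.add(pool.submit(run_job, pending.pop(0)))
            # Block until at least one in-flight job completes (cf. bash `wait -n`).
            done, in_flight = wait(in_flight, return_when=FIRST_COMPLETED)
            finished.extend(f.result() for f in done)
    return finished


if __name__ == "__main__":
    print(run_queue([("long", 0.5), ("a", 0.1), ("b", 0.1), ("c", 0.1), ("d", 0.1)]))
```

The loop never needs a specific job ID: it hands the whole in-flight set to `wait` and refills whatever slots came back.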

It wouldn’t be nearly as hard (i.e. it might be possible) to use the batch filter and group the jobs into sets of 3 per cluster and run each batch to completion. But we’re trying to minimize time, and that approach leaves gaps in utilization.
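A back-of-the-envelope comparison of the two schemes, assuming fixed, known job durations and zero launch or polling overhead (both idealizations). The durations are the sleep values used in the test.yml example later in this thread; `batched_makespan` and `queued_makespan` are illustrative helpers, not anything from Ansible.

```python
import heapq


def batched_makespan(durations, size=3):
    """Fixed groups of `size` jobs; each group waits for its slowest member."""
    return sum(max(durations[i:i + size]) for i in range(0, len(durations), size))


def queued_makespan(durations, slots=3):
    """Start each job, in order, as soon as any of `slots` slots is free."""
    free_at = [0] * slots  # time at which each slot next becomes free (a min-heap)
    finish = 0
    for d in durations:
        start = heapq.heappop(free_at)  # earliest free slot
        heapq.heappush(free_at, start + d)
        finish = max(finish, start + d)
    return finish


# Sleep values from test.yml: 0, 45, 5, 40, ..., 40, 5
durations = [x for pair in zip(range(0, 45, 5), range(45, 0, -5)) for x in pair]
print(batched_makespan(durations), queued_makespan(durations))  # → 215 150
```

Under these assumptions batching takes 215 time units versus 150 for the sliding queue; with 405 units of total work spread over 3 slots, no schedule can finish in under 135.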

So that’s where I sit at the end of the day, with text buffers full of non-working code and a wall-sore head full of shattered ideas. I’d appreciate reading your suggestions.


Hi,

Have you tried a different approach, using the host_pinned strategy with serial set and forks >= 2× serial?

I read your post quickly, but I think something can be done that way: you can simulate a sort of queue in which a host performs all of its tasks in sequence because it is pinned.
When it finishes, it frees one slot in the queue and a new host proceeds.

For example: 6 hosts, strategy host_pinned, serial 3, forks 6 (to be sure the queue is available). The play starts on the first 3 hosts, and each of them proceeds through the tasks independently. Suppose 2 hosts need more time to perform one task, but the third host completes all of its tasks.
Instead of waiting for the other hosts, one slot becomes available and the fourth host proceeds.

In any case, in my opinion and experience, AWX should be considered an “orchestration” tool only for scenarios that aren’t too complex.
For flows that require complex strategies, it is probably not the simplest tool, and it takes a lot of brainpower to design and orchestrate everything.

Good luck


until can’t be used on include_tasks, but that doesn’t mean you can’t redo include_tasks until a condition is true. You just have to use tail recursion.

test.yml:

```yaml
- hosts: localhost
  gather_facts: false
  tasks:
    - ansible.builtin.set_fact:
        queued_tasks: "{{ range(0, 45, 5) | zip(range(45, 0, -5)) | flatten }}"
        queue_start: "{{ lookup('pipe', 'date +%s') | int }}"

    - ansible.builtin.include_tasks:
        file: tq.yml
```

tq.yml:

```yaml
- when: (running_tasks | default([]) | length < 3) and (queued_tasks | length > 0)
  block:
    - name: Launch a new task
      ansible.builtin.command: sleep {{ queued_tasks[0] }}
      async: 60
      poll: 0
      register: result

    - ansible.builtin.debug:
        msg: Launched {{ queued_tasks[0] }} at {{ (lookup('pipe', 'date +%s') | int) - queue_start }}

    - name: Move launched task between queues
      ansible.builtin.set_fact:
        queued_tasks: "{{ queued_tasks[1:] }}"
        running_tasks: "{{ running_tasks | default([]) | union([result.ansible_job_id]) }}"

# consider adding a sleep here to make the loop less busy
# - ansible.builtin.wait_for:
#     timeout: 5
#   when: running_tasks | length >= 3

- name: Check on tasks
  ansible.builtin.async_status:
    jid: "{{ item }}"
  register: result
  loop: "{{ running_tasks | default([]) }}"

- name: Remove finished tasks from check queue
  ansible.builtin.set_fact:
    running_tasks: "{{ result.results | rejectattr('finished') | map(attribute='item') | list }}"

- name: Run again if needed
  ansible.builtin.include_tasks:
    file: tq.yml
  when: (queued_tasks | length > 0) or (running_tasks | length > 0)
```

Output:

```text
$ ansible-playbook test.yml | grep Launched
    msg: Launched 0 at 1
    msg: Launched 45 at 2
    msg: Launched 5 at 3
    msg: Launched 40 at 5
    msg: Launched 10 at 10
    msg: Launched 35 at 24
    msg: Launched 15 at 48
    msg: Launched 30 at 50
    msg: Launched 20 at 62
    msg: Launched 25 at 65
    msg: Launched 25 at 85
    msg: Launched 20 at 89
    msg: Launched 30 at 95
    msg: Launched 15 at 112
    msg: Launched 35 at 115
    msg: Launched 10 at 128
    msg: Launched 40 at 131
    msg: Launched 5 at 143
```

Alas, this tail recursion is not optimized. It still blows the stack after ~250 iterations. That may be okay in some cases, but it’s orders of magnitude below what we need.

This is one of those cases where perhaps Ansible isn’t the right tool for the job.
Thanks anyway.

Hi @utoddl,

Why don’t you adopt a solution that runs outside AWX and uses AWX only to run the individual jobs?
AWX has an API, a CLI, and a Python library, so you can have a very simple Python script, run from cron or in a “while true” loop, that performs better and is more stable than an AWX job.

What I mean by stable is this: in my experience, AWX is not the best fit for long, very long runs, and even less so for recursive and/or infinite loops.
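A minimal sketch of that kind of script, written as a plain loop rather than recursion, so its stack use does not grow with the number of iterations. It follows the same launch/poll/prune shape as tq.yml, with `Popen.poll()` playing the role of async_status. `launch` is hypothetical: here it starts a local Python child process that only sleeps; a real version would start the job through whatever interface you use (for example the AWX API or CLI mentioned above), which is not shown.

```python
import subprocess
import sys
import time


def launch(seconds):
    """Hypothetical launcher: starts a local child process that just sleeps.

    Like `async: ... poll: 0` in tq.yml, it returns immediately with a handle.
    """
    return subprocess.Popen([sys.executable, "-c", f"import time; time.sleep({seconds})"])


def drain(queued, limit=3, poll_interval=0.05):
    """Run every queued job with at most `limit` in flight, using a plain loop.

    Returns (return codes in completion order, peak number of jobs in flight).
    """
    queued = list(queued)
    running = []
    returncodes = []
    peak = 0
    while queued or running:
        # Launch new jobs while there are free slots (the block in tq.yml).
        while queued and len(running) < limit:
            running.append(launch(queued.pop(0)))
        peak = max(peak, len(running))
        # Check each running job once (async_status) and drop finished ones.
        still_running = []
        for proc in running:
            if proc.poll() is None:
                still_running.append(proc)
            else:
                returncodes.append(proc.returncode)
        running = still_running
        # Sleep between checks so the loop is not busy (the commented-out wait_for).
        time.sleep(poll_interval)
    return returncodes, peak


if __name__ == "__main__":
    codes, peak = drain([0.1, 0.5, 0.1, 0.3, 0.1], limit=3)
    print(f"{len(codes)} jobs finished, at most {peak} at once")
```

Failed jobs show up as non-zero return codes; this sketch only collects them and does not retry.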
