Groups
It's common to need to start multiple tasks in a loop - for example:
import coflux as cf
@cf.task()
def my_task(i: int):
...
@cf.workflow()
def my_workflow(n: int):
for i in range(n):
my_task(i)
But starting a large number of tasks can make it difficult to navigate the graph in the UI - especially when those tasks are themselves starting other tasks. To make the graphs easier to navigate, Coflux has the concept of task groups.
A group can be created using a context manager:
@cf.workflow()
def my_workflow(n: int):
with cf.group("My tasks"): # ←
for i in range(n):
my_task(i)
Now all of the steps will be assigned to the group. In the UI only one step from the group will be displayed at a time.

The name passed to cf.group(...)
is optional, and simply serves as a way to label the group in the UI.
Note that steps can be run in parallel by 'submitting' them :
@cf.workflow()
def my_workflow(n: int):
with cf.group("My tasks"):
for i in range(n):
my_task.submit(i) # ←
See the concurrency page for more details.
Map-reduce
A map-reduce-style pattern can be used to split work up to be processed by separate agents, and then combining the results together.
For example:
@cf.workflow()
def wordcount_workflow(n: int = 10, k: int = 5):
with cf.group("Process chapters"):
executions = [process_chapter.submit(i) for i in range(n)]
chapter_counts = [e.result() for e in executions]
merged = merge_counters(chapter_counts)
return Counter(merged).most_common(k)
Heterogeneous groups
Tasks that are called within a group don't need to be the same:
with cf.group("My tasks"):
for i in range(n):
if n % 2 == 0:
even_task(i)
else:
odd_task(i)
Multiple groups
Multiple groups can be defined within a task, and tasks can be called outside of a group:
@cf.workflow()
def my_workflow():
with cf.group("First group"):
first_task()
with cf.group("Second group"):
second_task()
third_task()