Examples

Simple

A workflow is built from tasks and the connections between them.

Tasks are functions decorated with task(), and connections between tasks are defined with the followed_by() decorator. The first argument of followed_by() is the name of the next task, which is executed once the current task has finished.

Task names are intentionally strings, so you don’t need to care about imports or the order of declarations in a file. This is not a requirement, though: followed_by() also accepts other tasks (functions decorated with task()).

import wfepy as wf


@wf.task()
@wf.start_point()
@wf.followed_by('make_coffee')
def start(ctx):
    # Every task must return True when it has finished, or False when it is
    # still waiting for some external event and must be executed again later.
    return True


@wf.task()
@wf.followed_by('drink_coffee')
def make_coffee(ctx):
    return True


@wf.task()
@wf.followed_by('end')
def drink_coffee(ctx):
    import random
    if not random.choice([True, False]):
        # Still drinking. Returning False means this task was not completed and
        # must be executed again on the next run.
        return False
    return True


@wf.task()
@wf.end_point()
def end(ctx):
    return True
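The note above that followed_by() accepts either a task name or a task function can be illustrated with a toy model. The sketch below does not use wfepy and is not its implementation: the TASKS and FOLLOWED_BY registries, the simplified task() and followed_by() decorators, and the boil_water task are all hypothetical. It only shows why a string can refer to a task that is declared later in the file, while a function reference needs the task to exist already.

```python
# Toy stand-ins for the decorators; NOT wfepy's implementation.
TASKS = {}        # task name -> function
FOLLOWED_BY = {}  # task name -> list of names of the next tasks


def task():
    def decorator(func):
        TASKS[func.__name__] = func
        return func
    return decorator


def followed_by(next_task):
    # Accept a task name or a task function and store only the name
    # (hypothetical normalization, assumed for this sketch).
    name = next_task if isinstance(next_task, str) else next_task.__name__

    def decorator(func):
        FOLLOWED_BY.setdefault(func.__name__, []).append(name)
        return func
    return decorator


@task()
@followed_by('make_coffee')  # string: make_coffee is not defined yet
def start(ctx):
    return True


@task()
def make_coffee(ctx):
    return True


@task()
@followed_by(make_coffee)    # function: must already be defined at this point
def boil_water(ctx):
    return True
```

Because only names are stored, the connection from start to make_coffee can be resolved later, once every task has been registered.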

The workflow can be converted to a graph, which is useful in documentation or for debugging. This workflow is pretty simple, but a real-world workflow can be complex, with many tasks declared across many files, conditional branches, …

digraph Workflow {
	drink_coffee [fillcolor=white style="solid,filled"]
	drink_coffee -> end
	end [fillcolor=red style="bold,filled"]
	make_coffee [fillcolor=white style="solid,filled"]
	make_coffee -> drink_coffee
	start [fillcolor=green style="bold,filled"]
	start -> make_coffee
}

Finally, the workflow can be executed. The following script executes the workflow from the example above.

import logging
import wfepy
import wfepy.utils

logging.basicConfig(level=logging.INFO)


# Import module with tasks.
import simple


# Create new workflow.
wf = wfepy.Workflow()
# Load tasks from module and add them to workflow.
wf.load_tasks(simple)
# Check if graph is OK, all tasks are defined, decorated correctly, ...
wf.check_graph()

# Render graph.
wfepy.utils.render_graph(wf, 'basic.gv')

# Create runner for workflow.
runner = wf.create_runner()

# Execute workflow.
runner.run()

# Run the workflow again until it has finished and no tasks are waiting.
while not runner.finished:
    logging.info('Workflow is not finished, trying run it again...')
    runner.run()

Output from the script:

INFO:wfepy.workflow:Executing task start
INFO:wfepy.workflow:Task start is complete
INFO:wfepy.workflow:Executing task make_coffee
INFO:wfepy.workflow:Task make_coffee is complete
INFO:wfepy.workflow:Executing task drink_coffee
INFO:wfepy.workflow:Task drink_coffee is waiting

INFO:root:Workflow is not finished, trying run it again...
INFO:wfepy.workflow:Executing task drink_coffee
INFO:wfepy.workflow:Task drink_coffee is waiting

INFO:root:Workflow is not finished, trying run it again...
INFO:wfepy.workflow:Executing task drink_coffee
INFO:wfepy.workflow:Task drink_coffee is complete
INFO:wfepy.workflow:Executing task end
INFO:wfepy.workflow:Task end is complete
INFO:wfepy.workflow:Reached end point end

Task drink_coffee was waiting for something and no other task could be executed, so the process stopped.

Waiting tasks are tasks that returned False; finished tasks must return True. This makes it possible to implement waiting for events, for example when a user must add a comment to a Jira task before the process can continue.
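The waiting behaviour can be sketched without wfepy. The following toy model is an assumption-laden illustration, not the library's runner: run_until_blocked, the state dictionary, and the wait_for_comment and close_issue tasks are hypothetical names. It shows a task that returns False until an external event (here, a comment stored in ctx) has happened, and a run that resumes from that task on the next call.

```python
def run_until_blocked(tasks, order, state, ctx):
    """Run tasks in sequence; stop at the first one that returns False."""
    while state['position'] < len(order):
        name = order[state['position']]
        if not tasks[name](ctx):
            return False           # task is waiting, try again on the next run
        state['position'] += 1     # task finished, move on to the next one
    return True                    # every task finished


def wait_for_comment(ctx):
    # Waiting task: finished only once the external event has happened.
    return ctx.get('comment') is not None


def close_issue(ctx):
    ctx['closed'] = True
    return True


tasks = {'wait_for_comment': wait_for_comment, 'close_issue': close_issue}
order = ['wait_for_comment', 'close_issue']
state = {'position': 0}
ctx = {}

first_run = run_until_blocked(tasks, order, state, ctx)   # no comment yet
ctx['comment'] = 'Looks good'                             # external event
second_run = run_until_blocked(tasks, order, state, ctx)  # resumes and finishes
```

The first call stops at wait_for_comment; the second call starts from the same task, which now returns True, and the sequence runs to the end.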

Branches

A task can also be followed by multiple tasks, in which case the process executes multiple task branches in parallel. Tasks are not executed in parallel by threads or processes, but branching still lets the process execute as many tasks as possible while a task in one branch is waiting.

Returning to the coffee drinking example, you can do other things while waiting for the coffee and while drinking it.

import random
import wfepy as wf


@wf.task()
@wf.start_point()
@wf.followed_by('make_coffee')
@wf.followed_by('check_reddit')
def start(ctx):
    return True


@wf.task()
@wf.followed_by('drink_coffee')
def make_coffee(ctx):
    return True


@wf.task()
@wf.followed_by('write_some_code')
def check_reddit(ctx):
    return True


@wf.task()
@wf.followed_by('end')
def write_some_code(ctx):
    return random.choice([True, False])


@wf.task()
@wf.followed_by('end')
def drink_coffee(ctx):
    return random.choice([True, False])


@wf.task()
@wf.join_point()
@wf.end_point()
def end(ctx):
    return True

Task start has multiple followed_by decorators, so when this task finishes, the process expands the followed-by list and starts executing tasks from both branches. At the end of the workflow, the branches are joined in the end task. Join points must be explicitly marked with the join_point decorator to avoid mistakes.

If you forget to mark a join point (or a start point or an end point), wfepy.Workflow.check_graph() will raise an error, which you should fix.
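To make the idea of an unmarked join point concrete, here is a minimal sketch of one such check. It is not how check_graph() is implemented, and the text above does not say which checks or error types wfepy uses; find_unmarked_join_points and the plain-dictionary graph are hypothetical. The sketch treats any task with more than one incoming connection as a join point that must be marked.

```python
def find_unmarked_join_points(edges, join_points):
    """Return tasks with several incoming edges that are not marked as joins."""
    incoming = {}
    for source, targets in edges.items():
        for target in targets:
            incoming.setdefault(target, set()).add(source)
    return sorted(
        name for name, sources in incoming.items()
        if len(sources) > 1 and name not in join_points
    )


# Connections of the branching coffee workflow, written as a plain dict.
edges = {
    'start': ['make_coffee', 'check_reddit'],
    'make_coffee': ['drink_coffee'],
    'check_reddit': ['write_some_code'],
    'write_some_code': ['end'],
    'drink_coffee': ['end'],
    'end': [],
}

problems_unmarked = find_unmarked_join_points(edges, join_points=set())
problems_marked = find_unmarked_join_points(edges, join_points={'end'})
```

With no join point marked, the end task is reported; once end is marked, the list of problems is empty.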

digraph Workflow {
	check_reddit [fillcolor=white style="solid,filled"]
	check_reddit -> write_some_code
	drink_coffee [fillcolor=white style="solid,filled"]
	drink_coffee -> end
	end [fillcolor=red style="bold,filled"]
	make_coffee [fillcolor=white style="solid,filled"]
	make_coffee -> drink_coffee
	start [fillcolor=green style="bold,filled"]
	start -> check_reddit
	start -> make_coffee
	write_some_code [fillcolor=white style="solid,filled"]
	write_some_code -> end
}
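Using the connections from the graph above, the branch behaviour can be simulated in a few lines. This is a toy scheduler written for illustration only, not wfepy's runner: run_once, EDGES, JOIN_POINTS, and the scripted outcomes dictionary (which replaces random.choice so the result is predictable) are all assumptions. It shows that one branch keeps going while the other is waiting, and that the join point starts only after both branches have finished.

```python
import collections

# Connections copied from the graph above.
EDGES = {
    'start': ['make_coffee', 'check_reddit'],
    'make_coffee': ['drink_coffee'],
    'check_reddit': ['write_some_code'],
    'write_some_code': ['end'],
    'drink_coffee': ['end'],
    'end': [],
}
JOIN_POINTS = {'end'}


def predecessors(name):
    return [source for source, targets in EDGES.items() if name in targets]


def run_once(active, done, outcomes):
    """Run every active task once; return (still waiting tasks, log).

    outcomes maps a task name to scripted return values, one per execution;
    tasks without scripted values return True.
    """
    queue = collections.deque(active)
    waiting, log = [], []
    while queue:
        name = queue.popleft()
        script = outcomes.get(name)
        finished = script.pop(0) if script else True
        log.append((name, finished))
        if not finished:
            waiting.append(name)   # execute this task again on the next run
            continue
        done.add(name)
        for nxt in EDGES[name]:
            if nxt in JOIN_POINTS and not all(p in done for p in predecessors(nxt)):
                continue           # join point waits for the other branches
            queue.append(nxt)
    return waiting, log


done = set()
outcomes = {'drink_coffee': [False, True]}  # still drinking on the first run
waiting, first_log = run_once(['start'], done, outcomes)
waiting, second_log = run_once(waiting, done, outcomes)
```

In the first run write_some_code finishes even though drink_coffee is still waiting, but end is not started. In the second run drink_coffee finishes and end runs as the join of both branches.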