Examples
Simple
A workflow is built from tasks and the connections between them. Tasks are functions decorated with `task()`, and connections between tasks are defined by the `followed_by()` decorator. The first argument of `followed_by()` is the name of the next task, which is executed when the current task finishes.
Task names are intentionally strings, so you don't need to worry about imports or the order of declarations in a file. That is not a requirement, though: `followed_by()` also accepts other tasks (functions decorated with `task()`).
```python
import random

import wfepy as wf


@wf.task()
@wf.start_point()
@wf.followed_by('make_coffee')
def start(ctx):
    # Every task must return True if it finished, or False if it is waiting
    # for some external event and must be executed again later.
    return True


@wf.task()
@wf.followed_by('drink_coffee')
def make_coffee(ctx):
    return True


@wf.task()
@wf.followed_by('end')
def drink_coffee(ctx):
    if not random.choice([True, False]):
        # Still drinking. Returning False means this task was not completed
        # and must be executed again on the next run.
        return False
    return True


@wf.task()
@wf.end_point()
def end(ctx):
    return True
```
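The note about string task names can be illustrated with a minimal sketch of a decorator-based registry that stores successor names and resolves them only after all tasks are declared. This is not wfepy's implementation; `REGISTRY`, `task`, `followed_by`, and `resolve` below are hypothetical names defined only for this illustration.

```python
REGISTRY = {}  # hypothetical: task name -> function


def task():
    """Register the decorated function under its own name."""
    def decorator(func):
        REGISTRY[func.__name__] = func
        return func
    return decorator


def followed_by(successor):
    """Record a successor given either as a name or as a function."""
    def decorator(func):
        name = successor if isinstance(successor, str) else successor.__name__
        func.successors = getattr(func, 'successors', []) + [name]
        return func
    return decorator


@task()
@followed_by('second')  # 'second' is not defined yet; a string is fine
def first(ctx):
    return True


@task()
def second(ctx):
    return True


def resolve(func):
    """Look up successor functions once every task has been registered."""
    return [REGISTRY[name] for name in getattr(func, 'successors', [])]


print(resolve(first))
```

Because the lookup happens in `resolve`, after both functions exist, `first` can name `second` before it is declared. Passing the function object instead of a string would require `second` to be defined first.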
A workflow can be converted to a graph, which is useful in documentation and for debugging. This workflow is simple, but a real-world workflow can be complex, with many tasks declared across many files, conditional branches, and so on.
![digraph Workflow {
drink_coffee [fillcolor=white style="solid,filled"]
drink_coffee -> end
end [fillcolor=red style="bold,filled"]
make_coffee [fillcolor=white style="solid,filled"]
make_coffee -> drink_coffee
start [fillcolor=green style="bold,filled"]
start -> make_coffee
}](_images/graphviz-61dcce8b23fe779dbdcbd9c68a177766948dce9a.png)
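As a rough idea of what such a conversion involves, the following sketch builds Graphviz DOT source from a plain dictionary of task names and successors. It does not use wfepy; `to_dot` and the `edges` dictionary are hypothetical and only mirror the node styling visible in the graph above.

```python
def to_dot(edges, start, end):
    """Build DOT source from {task: [successors]} (hypothetical helper)."""
    lines = ['digraph Workflow {']
    for name in sorted(edges):
        if name == start:
            attrs = 'fillcolor=green style="bold,filled"'
        elif name == end:
            attrs = 'fillcolor=red style="bold,filled"'
        else:
            attrs = 'fillcolor=white style="solid,filled"'
        lines.append(f'    {name} [{attrs}]')
        for successor in sorted(edges[name]):
            lines.append(f'    {name} -> {successor}')
    lines.append('}')
    return '\n'.join(lines)


edges = {
    'start': ['make_coffee'],
    'make_coffee': ['drink_coffee'],
    'drink_coffee': ['end'],
    'end': [],
}
dot = to_dot(edges, start='start', end='end')
print(dot)
```

The resulting text can be saved to a `.gv` file and rendered with the Graphviz `dot` command.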
Finally, the workflow can be executed. The following example script executes the workflow from the example above.
```python
import logging

import wfepy
import wfepy.utils

logging.basicConfig(level=logging.INFO)

# Import module with tasks.
import simple

# Create new workflow.
wf = wfepy.Workflow()

# Load tasks from module and add them to workflow.
wf.load_tasks(simple)

# Check if graph is OK, all tasks are defined, decorated correctly, ...
wf.check_graph()

# Render graph.
wfepy.utils.render_graph(wf, 'basic.gv')

# Create runner for workflow.
runner = wf.create_runner()

# Execute workflow.
runner.run()

# Check if workflow finished, no tasks are waiting.
while not runner.finished:
    logging.info('Workflow is not finished, trying run it again...')
    runner.run()
```
Output from the script:

```text
INFO:wfepy.workflow:Executing task start
INFO:wfepy.workflow:Task start is complete
INFO:wfepy.workflow:Executing task make_coffee
INFO:wfepy.workflow:Task make_coffee is complete
INFO:wfepy.workflow:Executing task drink_coffee
INFO:wfepy.workflow:Task drink_coffee is waiting
INFO:root:Workflow is not finished, trying run it again...
INFO:wfepy.workflow:Executing task drink_coffee
INFO:wfepy.workflow:Task drink_coffee is waiting
INFO:root:Workflow is not finished, trying run it again...
INFO:wfepy.workflow:Executing task drink_coffee
INFO:wfepy.workflow:Task drink_coffee is complete
INFO:wfepy.workflow:Executing task end
INFO:wfepy.workflow:Task end is complete
INFO:wfepy.workflow:Reached end point end
```
Task `drink_coffee` was waiting for something and no other tasks could be executed, so the process stopped. Waiting tasks are tasks that returned `False`, while finished tasks must return `True`. This makes it possible to wait for events, for example when a user must add a comment to a Jira task before the process can continue.
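The waiting pattern described above can be sketched without wfepy as a task that reports `False` until some external state changes. The `events` dictionary, `wait_for_comment`, and `run_once` are hypothetical stand-ins for a real event source (such as a Jira comment check) and for the runner.

```python
events = {'comment_added': False}  # hypothetical external state


def wait_for_comment(ctx):
    """Return False (waiting) until the external event has happened."""
    return events['comment_added']


def run_once(task, ctx):
    """Execute the task once and report whether it finished."""
    return task(ctx) is True


ctx = {}
attempts = []
attempts.append(run_once(wait_for_comment, ctx))  # still waiting
events['comment_added'] = True                    # the user adds a comment
attempts.append(run_once(wait_for_comment, ctx))  # now complete
print(attempts)  # prints [False, True]
```

The task itself never blocks: it only checks the current state and returns, so the caller decides when to try again.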
Branches
A task can also be followed by multiple tasks, so the process executes multiple task branches in parallel. Tasks are not executed in parallel by threads or processes, but branching still lets the runner execute as many tasks as possible while a task in one branch is waiting.
Returning to the coffee example, you can do other things while waiting for the coffee and while drinking it.
```python
import random

import wfepy as wf


@wf.task()
@wf.start_point()
@wf.followed_by('make_coffee')
@wf.followed_by('check_reddit')
def start(ctx):
    return True


@wf.task()
@wf.followed_by('drink_coffee')
def make_coffee(ctx):
    return True


@wf.task()
@wf.followed_by('write_some_code')
def check_reddit(ctx):
    return True


@wf.task()
@wf.followed_by('end')
def write_some_code(ctx):
    return random.choice([True, False])


@wf.task()
@wf.followed_by('end')
def drink_coffee(ctx):
    return random.choice([True, False])


@wf.task()
@wf.join_point()
@wf.end_point()
def end(ctx):
    return True
```
Task `start` has multiple `followed_by` decorators, so when it finishes, the process expands the followed-by list and starts executing tasks from both branches. At the end of the workflow, the branches are joined in the `end` task. Join points must be explicitly marked with the `join_point` decorator to avoid mistakes.
If you forget to mark a join point (or a start point or end point), `wfepy.Workflow.check_graph()` raises an error that you should fix.
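To make the join-point rule concrete, here is a small sketch of the kind of check a graph validator might perform: any task with more than one incoming connection must be marked as a join point. This is an assumption about what such a check looks like, not the actual logic of `check_graph()`; `find_unmarked_joins` and the `edges` dictionary are hypothetical.

```python
from collections import Counter


def find_unmarked_joins(edges, join_points):
    """Return tasks with several predecessors that are not marked as joins."""
    incoming = Counter(s for successors in edges.values() for s in successors)
    return sorted(
        name for name, count in incoming.items()
        if count > 1 and name not in join_points
    )


edges = {
    'start': ['make_coffee', 'check_reddit'],
    'make_coffee': ['drink_coffee'],
    'check_reddit': ['write_some_code'],
    'write_some_code': ['end'],
    'drink_coffee': ['end'],
    'end': [],
}

print(find_unmarked_joins(edges, join_points={'end'}))  # prints []
print(find_unmarked_joins(edges, join_points=set()))    # prints ['end']
```

With `end` marked, nothing is reported. Without the mark, `end` is flagged because both `write_some_code` and `drink_coffee` lead to it.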
![digraph Workflow {
check_reddit [fillcolor=white style="solid,filled"]
check_reddit -> write_some_code
drink_coffee [fillcolor=white style="solid,filled"]
drink_coffee -> end
end [fillcolor=red style="bold,filled"]
make_coffee [fillcolor=white style="solid,filled"]
make_coffee -> drink_coffee
start [fillcolor=green style="bold,filled"]
start -> check_reddit
start -> make_coffee
write_some_code [fillcolor=white style="solid,filled"]
write_some_code -> end
}](_images/graphviz-ce2b1e736a797fcaf7bbd7bb92c18419127a825f.png)
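The way branches let work continue while one task is waiting can be sketched with a toy single-threaded runner. It does not use wfepy and is not its algorithm. It assumes a join point starts only after all of its predecessors have finished, and it scripts `drink_coffee` to wait once so the result is deterministic. All names here (`run_pass`, `make_task`, `answers`) are hypothetical.

```python
edges = {
    'start': ['make_coffee', 'check_reddit'],
    'make_coffee': ['drink_coffee'],
    'check_reddit': ['write_some_code'],
    'write_some_code': ['end'],
    'drink_coffee': ['end'],
    'end': [],
}
joins = {'end'}

# Scripted results: drink_coffee waits on its first call, then completes.
answers = {'drink_coffee': iter([False, True])}


def make_task(name):
    """Build a task that follows its script, or always completes."""
    def run():
        script = answers.get(name)
        return next(script) if script is not None else True
    return run


tasks = {name: make_task(name) for name in edges}


def run_pass(active, done, log):
    """Execute tasks until only waiting ones remain; return the waiting list."""
    queue, waiting = list(active), []
    while queue:
        name = queue.pop(0)
        if not tasks[name]():
            log.append(f'{name}: waiting')
            waiting.append(name)
            continue
        done.add(name)
        log.append(f'{name}: complete')
        for successor in edges[name]:
            predecessors = {t for t, succ in edges.items() if successor in succ}
            # Assumed rule: a join point waits for all incoming branches.
            if successor in joins and not predecessors <= done:
                continue
            queue.append(successor)
    return waiting


log, done = [], set()
active, passes = ['start'], 0
while active:
    active = run_pass(active, done, log)
    passes += 1

print('\n'.join(log))
```

In the first pass `write_some_code` completes even though `drink_coffee` is waiting, and `end` runs only in the second pass, once both branches are done.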