API

class wfepy.Workflow

Workflow graph - collection of tasks.

Variables: task – collection of tasks, a dict keyed by task name
check_graph()

Check the workflow graph: whether any task is missing, whether all tasks are properly marked as start, join or end points, …

Raises: WorkflowError – when there are problems with the workflow graph
create_runner(*args, **kwargs)

Create Runner from this workflow.

end_points

List of names of tasks that are marked as end points.

load_tasks(module)

Load tasks from module and add them to the workflow graph. module can also be a module name, in which case the module is looked up in sys.modules by that name.

Raises: WorkflowError – if the name of a loaded task is not unique
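
The name lookup described above can be pictured with a small sketch. The helper resolve_module is hypothetical and not part of wfepy; it only assumes that a string argument is looked up in sys.modules, as the description states.

```python
import sys
import types


def resolve_module(module):
    """Return a module object for either a module or a module name.

    Hypothetical helper, not part of wfepy.
    """
    if isinstance(module, types.ModuleType):
        return module
    # A name only resolves if the module has already been imported.
    return sys.modules[module]
```

One consequence of such a lookup is that a module can pass its own __name__, since a module is already present in sys.modules while its code runs.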
start_points

List of names of tasks that are marked as start points.

class wfepy.Runner(workflow, context=None)

Workflow execution engine.

Variables:
  • workflow – Workflow
  • context – arbitrary user object, passed to all tasks
  • state – state of execution
dump(file_path)

Dump runner to file. The stored dump contains the context and state, so runner execution can be restored and finished later.

finished

Workflow execution finished. True when the end points have been reached and no task remains to be executed.

load(file_path)

Load runner from file. See also dump().

run()

Execute tasks from workflow.

Some tasks might end in a state in which they cannot be executed (waiting for an external event, or a join point waiting for preceding tasks). If no task can be executed, run stops and the finished property is False. In that case run should be called again, either after some delay or after the runner has been dumped to a file with dump() and is executed later.

See TaskState for list of task states.
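
The "call run again" pattern can be sketched as a small polling loop. The helper run_until_finished, its delay and its max_rounds parameter are assumptions made for illustration; it relies only on run() and finished as documented here and accepts any object that provides them.

```python
import time


def run_until_finished(runner, delay=1.0, max_rounds=10):
    """Call runner.run() until runner.finished is true or rounds run out.

    Hypothetical helper; uses only run() and finished as documented.
    """
    for _ in range(max_rounds):
        runner.run()
        if runner.finished:
            return True
        # Nothing more was runnable; give external events time to arrive.
        time.sleep(delay)
    return False
```

A bounded number of rounds keeps a workflow that never receives its external event from looping forever; for long waits, dumping the runner and resuming in a later process may be the better fit.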

task_execute(task)

Execute Task.

transition_eval(transition)

Evaluate Transition.cond.

class wfepy.Task(func, name=NOTHING, labels=NOTHING)

Workflow task. Wraps function for use in workflow.

The wrapped function must accept the context from Runner as its only parameter and should return True or False to indicate whether the task was completed and execution can continue with the following tasks.

If the wrapped function returns False, execution stops and the task is executed again in the next run. Waiting, e.g. for an external event, can be implemented this way.
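
A function following this contract might look like the sketch below. The function name and the context keys report_path and report_ready are assumptions for illustration, and the context is assumed to be a dict; the wfepy decorators are left out so the snippet runs on its own.

```python
import os


def wait_for_report(context):
    """Task body: complete only once the expected file exists.

    The 'report_path' key is an assumption about the user-supplied context.
    """
    if not os.path.exists(context["report_path"]):
        # Not done yet: returning False asks for another attempt on the next run.
        return False
    context["report_ready"] = True
    return True
```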

Variables:
  • function – wrapped function
  • name – task name (by default function name)
  • labels – task labels
  • followed_by – connection to next tasks (set of Transition)
  • preceded_by – names of preceding tasks, generated by Workflow
  • is_start_point – task is start point of workflow
  • is_join_point – task is join point of multiple tasks
  • is_end_point – task is end point of workflow
has_labels(labels, reducer=<built-in function any>)

Check if task has labels.

The reducer reduces the per-label checks to a single boolean value: all checks that the task has all of the labels, any checks that the task has at least one of them.
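
The effect of the reducer can be illustrated with a stand-in function. This is a sketch of the described semantics, not wfepy's own code; the free function has_labels and its task_labels parameter are hypothetical.

```python
def has_labels(task_labels, labels, reducer=any):
    """Stand-in for the check described above, not wfepy's own code."""
    return reducer(label in task_labels for label in labels)


task_labels = {"io", "slow"}
print(has_labels(task_labels, ["io", "net"]))               # any: one match is enough
print(has_labels(task_labels, ["io", "net"], reducer=all))  # all: every label must match
```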

class wfepy.TaskState

Enumeration of task states.

Variables:
  • NEW – task new in queue
  • WAITING – task is waiting, function returned False
  • BLOCKED – task is waiting for completion of preceding tasks
  • READY – task is ready for execution
  • COMPLETE – task was executed and will be expanded
  • CANCELED – task was not executed because transition condition was not met
digraph TaskState {
    _start [label="" style=invis]
    _end [label="" style=invis]
    _start_canceled [label="" style=invis]
    _end_canceled [label="" style=invis]

    { rank=same; _start, _start_canceled }
    { rank=same; _end, _end_canceled }

    { rank=same; COMPLETE, CANCELED }
    { rank=same; READY, WAITING, BLOCKED }

    _start -> NEW -> READY -> COMPLETE -> _end
    _start_canceled -> CANCELED -> _end_canceled

    READY -> WAITING [label="executed but not done\n(task returned False)"]
    WAITING -> READY [label="rescheduled\non next run"]

    NEW -> BLOCKED [label="task is join point"]
    BLOCKED -> READY [label="preceding tasks finished"]
}
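
The edges of the diagram can also be written down as a lookup table, which makes the legal moves easy to check. The State enum below is a stand-in that models the diagram only; its member values and the can_move helper are assumptions and do not reflect how TaskState is implemented.

```python
from enum import Enum


class State(Enum):
    """Stand-in for TaskState, used only to model the diagram."""
    NEW = "new"
    WAITING = "waiting"
    BLOCKED = "blocked"
    READY = "ready"
    COMPLETE = "complete"
    CANCELED = "canceled"


# Edges copied from the diagram; COMPLETE and CANCELED have no outgoing edges.
ALLOWED = {
    State.NEW: {State.READY, State.BLOCKED},
    State.BLOCKED: {State.READY},
    State.READY: {State.COMPLETE, State.WAITING},
    State.WAITING: {State.READY},
    State.COMPLETE: set(),
    State.CANCELED: set(),
}


def can_move(src, dst):
    """Return True if the diagram has an edge from src to dst."""
    return dst in ALLOWED[src]
```

Note that in the diagram CANCELED is entered directly rather than from NEW, so no other state lists it as a target.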
class wfepy.Transition(dest, cond=None)

Transition to following task.

Variables:
  • dest – name of following task
  • cond – condition deciding whether the following task should be executed; a function that receives the context from Runner and must return bool (this allows conditional branching and looping in the graph)
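
Condition functions of this shape might look like the following sketch. The function names and the context keys amount and attempts are assumptions for illustration, and the context is assumed to be a dict.

```python
def needs_approval(context):
    """Branching condition: follow this transition only for large orders.

    'amount' is an assumed key in the user-supplied context.
    """
    return context["amount"] > 1000


def retry_allowed(context):
    """Looping condition: allow a transition back to an earlier task."""
    return context.get("attempts", 0) < 3
```

Both return a plain bool, as required; comparisons already do, so no explicit conversion is needed.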
class wfepy.WorkflowError

Generic workflow error.

Decorators

class wfepy.DecoratorStack(function, decorator_list=NOTHING)

Utility to collect function decorators and execute them in reverse order at once.

classmethod add(decorator)

Create decorator function that will create DecoratorStack using create() and add decorator to list of decorators.

add_decorator(decorator)

Add decorator to stack.

apply_to(func)

Apply decorators to func and return new func created by chain of decorators.

The return value of each decorator is passed as the argument to the next one; the first decorator receives func as its argument.
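
The chaining described above amounts to a left fold over the decorators. The sketch below uses functools.reduce to show the idea; apply_chain, add_one and double are hypothetical names, and the list is given in application order, which is an assumption about how the stack is traversed.

```python
from functools import reduce


def apply_chain(func, decorators):
    """Feed func through decorators; each result is the next one's input.

    Stand-in for the chaining described above, not wfepy's own code.
    """
    return reduce(lambda current, decorator: decorator(current), decorators, func)


def add_one(f):
    return lambda x: f(x) + 1


def double(f):
    return lambda x: f(x) * 2


wrapped = apply_chain(lambda x: x, [add_one, double])
print(wrapped(3))  # add_one wraps first, double wraps its result: (3 + 1) * 2 = 8
```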

classmethod create(func)

Create new DecoratorStack from function or other stack.

classmethod reduce(decorator)

Create decorator function that will create DecoratorStack using create(), add decorator to list of decorators and apply decorators from stack to decorated function.

wfepy.task(*args, **kwargs)

Decorator to mark function as workflow task. See Task for arguments documentation.

wfepy.followed_by(*args, **kwargs)

Add transition to next task. See Transition for arguments documentation.

wfepy.start_point()

Mark task as start point. See Task.

wfepy.join_point()

Mark task as join point. See Task.

wfepy.end_point()

Mark task as end point. See Task.