Workflow

Developer guide for creating and modifying workflows via the Python SDK.

Client

import labelbox as lb
client = lb.Client(api_key="<YOUR_API_KEY>")

Fundamentals

📘

Preview feature

Workflow management is a preview feature.

Workflows are connected to the Project class and are generated automatically during project creation. Workflows help organize and control the flow of labeling tasks through different stages.

Key concepts:

  • Workflows are composed of nodes and edges.
  • Each node can have only one input connection, except when both Initial labeling task and Rework (All Rejected) nodes serve as inputs to a single downstream node.
  • No changes are pushed to the platform until you call update_config().
  • All nodes must be connected for the workflow to be valid.

Access a workflow

workflow = project.get_workflow()

Clone a workflow from a different project

project_source_id = "<project_source_id>"
project_target_id = "<project_target_id>"
project_source = client.get_project(project_source_id)
project_target = client.get_project(project_target_id)

project_target.clone_workflow_from(project_source.uid)

Reset a workflow

Use reset_to_initial_nodes() to only have the InitialLabeling and InitialRework nodes and start from scratch.
The method will return an object InitialNodes with two attributes labeling and rework.
You can also pass parameters to configure the initial nodes.


# Get initial nodes with basic status
initial_nodes = workflow.reset_to_initial_nodes()

initial_labeling = initial_nodes.labeling
initial_rework = initial_nodes.rework

# Configure initial nodes
# All parameters are optional 
from labelbox.schema.workflow.config import LabelingConfig, ReworkConfig
from labelbox.schema.workflow.enums import IndividualAssignment

initial_nodes = workflow.reset_to_initial_nodes(
    labeling_config=LabelingConfig(
        instructions="Instructions for labeling",
        max_contributions_per_user=10
    ),
    rework_config=ReworkConfig(
        instructions="Instructions for reworking",
        individual_assignment=[IndividualAssignment.LabelCreator],
        max_contributions_per_user=5
    )
)

Commit changes

To push changes made to a workflow, you must use update_config().
In case of validation issue, it will throw a ValueError exception.

# Commit changes without changing node locations
workflow.update_config()

# Commit changes and attempt to realign nodes
workflow.update_config(reposition=True)

Add a node

Types of nodes are accessible through the enum NodeType:

Initial nodes:

  • NodeType.InitialLabeling - Entry point for new labeling tasks
  • NodeType.InitialRework - Entry point for tasks that need to be reworked

Step nodes:

  • NodeType.Review - Review completed labels
  • NodeType.Logic - Apply filters to route tasks conditionally
  • NodeType.CustomRework - Custom rework step with configurable settings

Terminal nodes:

  • NodeType.Done - Marks tasks as completed
  • NodeType.Rework - Sends tasks back to the rework queue
  • NodeType.CustomRework can be used as a terminal node or be connected to another node.
from labelbox.schema.workflow import NodeType

review_node = workflow.add_node(type=NodeType.Review)

Delete nodes

This automatically removes connected edges.
Initial nodes can't be deleted.

# Get nodes to delete
nodes_to_delete = [
    node
    for node in workflow.get_nodes()
    if node.name == "NodeToDelete"
]

workflow.delete_nodes(nodes_to_delete)

Add an edge

Edges connect the output of a source node to the input of a target node. All nodes must be connected in the workflow. The output of the CustomRework node is optional.

Types of outputs are listed in the enum NodeOutput:

  • NodeOutput.If (default value, can be omitted)
  • NodeOutput.Else
  • NodeOutput.Approved
  • NodeOutput.Rejected

Outputs per node

NodeAvailable Outputs
InitialLabelingNodeOutput.If
InitialReworkNodeOutput.If
ReviewNodeOutput.Approved, NodeOutput.Rejected
LogicNodeOutput.If, NodeOutput.Else
CustomReworkOptional NodeOutput.If
DoneNone (terminal node)
ReworkNone (terminal node)
from labelbox.schema.workflow import NodeOutput

# Connect nodes with appropriate outputs
workflow.add_edge(initial_labeling, initial_review) # Default NodeOutput.If
workflow.add_edge(initial_rework, initial_review)
workflow.add_edge(initial_review, logic, NodeOutput.Approved)
workflow.add_edge(initial_review, rework_node, NodeOutput.Rejected)
workflow.add_edge(logic, done, NodeOutput.If) # NodeOutput.If can be omitted
workflow.add_edge(logic, custom_rework_1, NodeOutput.Else)

Node attributes

The following attributes can be configured for each node type:

NodeConfigurable Attributes
InitialLabelinginstructions, max_contributions_per_user
InitialReworkinstructions, individual_assignment, max_contributions_per_user
Reviewinstructions, group_assignment, max_contributions_per_user
Logicname, match_filters, filters
CustomReworkname, instructions, group_assignment, individual_assignment, max_contributions_per_user
Donename
Reworkname

Common attributes:

  • max_contributions_per_user: Positive integer representing the maximum number of labels per task queue (empty for no limit)
  • instructions: Custom instructions for labelers working on this node
  • group_assignment: List of user group IDs assigned to this node
  • individual_assignment: Individual assignment strategy (you can use the IndividualAssignment enum)

Logic node

The Logic node contains filters that determine how tasks flow through the workflow. The match_filters attribute controls how multiple filters are evaluated:

  • MatchFilters.Any: Match any of the filters (OR logic)
  • MatchFilters.All: Match all of the filters (AND logic)

Available filters

Each filter type can be used at most once per Logic node.
The enum FilterField provides available filters for search or deletion:

  • FilterField.Annotation
  • FilterField.Batch
  • FilterField.ConsensusAverage
  • FilterField.Dataset
  • FilterField.FeatureConsensusAverage
  • FilterField.IssueCategory
  • FilterField.LabelingTime
  • FilterField.LabeledAt
  • FilterField.LabeledBy
  • FilterField.Metadata
  • FilterField.ModelPrediction
  • FilterField.NlSearch
  • FilterField.ReviewTime
  • FilterField.Sample

annotation

Filter by the presence of specific annotations. schema_node_ids is a list of schema node IDs that correspond to tools or classifications defined in the project's ontology schema.

Operators: None (direct list filter)

from labelbox.schema.workflow import annotation

# Using named parameter
annotation(schema_node_ids=["<schema_node_id>"])

# Using positional parameter
annotation(["<schema_node_id>"])

batch

Filter by batch membership.

Operators:

  • is_one_of
  • is_not_one_of
from labelbox.schema.workflow import batch

# Using named parameter
batch.is_one_of(values=["<batch_id>"])

# Using positional parameter
batch.is_one_of(["<batch_id>"])

consensus_average

Filter by overall consensus score.

Operators: None (range filter with min/max)

from labelbox.schema.workflow import consensus_average

# Using named parameters
consensus_average(min=0.17, max=0.61)

# Using positional parameters
consensus_average(0.17, 0.61)

dataset

Filter by dataset membership.

Operators: None (direct list filter)

from labelbox.schema.workflow import dataset

# Using named parameter
dataset(dataset_ids=["<dataset_id>"])

# Using positional parameter
dataset(["<dataset_id>"])

feature_consensus_average

Filter by consensus score for specific features. annotations is a list of schema node IDs that correspond to tools or classifications defined in the project's ontology schema.

Operators: None (range filter with min/max and annotation list)

from labelbox.schema.workflow import feature_consensus_average

# Using named parameters
feature_consensus_average(min=0.17, max=0.67, annotations=["<schema_node_id>"])

# Using positional parameters
feature_consensus_average(0.17, 0.67, ["<schema_node_id>"])

issue_category

Filter by issue categories flagged during review.

Operators:

  • is_one_of
from labelbox.schema.workflow import issue_category

# Using named parameter
issue_category.is_one_of(category_ids=["<issue_category_id>"])

# Using positional parameter
issue_category.is_one_off(["<issue_category_id>"])

labeled_at

Filter by when the label was created.

Operators:

  • between
from labelbox.schema.workflow import labeled_at
from datetime import datetime

# Using named parameters
labeled_at.between(
    start=datetime(2024, 3, 9, 5, 5, 42), 
    end=datetime(2025, 4, 28, 13, 5, 42)
)

# Using positional parameters
labeled_at.between(
    datetime(2024, 3, 9, 5, 5, 42), 
    datetime(2025, 4, 28, 13, 5, 42)
)

labeled_by

Filter by the user who created the label.

Operators:

  • is_one_of
from labelbox.schema.workflow import labeled_by

# Using named parameter
labeled_by.is_one_of(user_ids=["<user_1_id>", "<user_2_id>"])

# Using positional parameter
labeled_by.is_one_of(["<user_1_id>", "<user_2_id>"])

labeling_time

Filter by how long it took to create the label. The value represents a number of seconds.

Operators:

  • greater_than
  • less_than
  • greater_than_or_equal
  • less_than_or_equal
  • between
from labelbox.schema.workflow import labeling_time

# Using named parameter
labeling_time.greater_than(seconds=1000)

# Using positional parameter
labeling_time.greater_than(1000)

metadata

Filter by data row metadata values.
Each condition (m_condition) takes a metadata schema ID (key) and list of strings as parameters (value).

Operators:

  • contains
  • starts_with
  • ends_with
  • does_not_contain
  • is_any
  • is_not_any
from labelbox.schema.workflow import metadata, m_condition

# Using named parameters
metadata(conditions=[m_condition.contains(key="<metadata_schema_id>", value=["test"])])

# Using positional parameters
metadata([m_condition.contains("<metadata_schema_id>", ["test"])])

model_prediction

Filter by model predictions. Model predictions use a list of conditions named mp_condition.
Each condition takes a list of model IDs, followed by an integer for the minimum score (min_score) and an integer for the optional maxim score (max_score).
is_none takes precedence over other operators.

Operators:

  • is_one_of
  • is_not_one_of
  • is_none
from labelbox.schema.workflow import model_prediction, mp_condition

# Using named parameter
model_prediction(conditions=[
    mp_condition.is_one_of(models=["<model_id>"], min_score=1),
    mp_condition.is_not_one_of(models=["<model_id>"], min_score=2, max_score=6),
    mp_condition.is_none()
])

# Using positional parameter
model_prediction([
    mp_condition.is_one_of(["<model_id>"], 1),
    mp_condition.is_not_one_of(["<model_id>"], 2, 6),
    mp_condition.is_none()
])

natural_language

Filter using semantic search. The content (or prompt) follows this format:
"Find this / more of this / not this / bias_value"
where bias_value is a decimal between 0 and 1.

Operators: None (semantic search with score range)

from labelbox.schema.workflow import natural_language

# Using named parameters
natural_language(
    content="Birds in the sky/Blue sky/clouds/0.5", 
    min_score=0.178, 
    max_score=0.768
)

# Using positional parameters
natural_language("Birds in the sky/Blue sky/clouds/0.5", 0.178, 0.768)

review_time

Filter by how long it took to review the label.

Operators:

  • greater_than
  • less_than
  • greater_than_or_equal
  • less_than_or_equal
  • between
from labelbox.schema.workflow import review_time

# Using named parameter
review_time.less_than_or_equal(seconds=100)

# Using positional parameter
review_time.less_than_or_equal(100)

sample

Filter by percentage sampling. The percentage is entered as an integer.

Operators: None (percentage value)

from labelbox.schema.workflow import sample

# Using named parameter
sample(percentage=23)

# Using positional parameter
sample(23)

Managing filters on Logic nodes

from labelbox.schema.workflow.enums import WorkflowDefinitionId, FilterField
from labelbox.schema.workflow import mp_condition, model_prediction

workflow = project.get_workflow()

# Get the Logic node
logic = next(
    node for node in workflow.get_nodes()
    if node.definition_id == WorkflowDefinitionId.Logic
)
# Alternative: get by node ID
# logic = workflow.get_node_by_id("0359113a-6081-4f48-83d1-175062a0259b")

# Remove a filter based on its type 
logic.remove_filter(FilterField.ModelPrediction)

# Add a filter
logic.add_filter(
    model_prediction([
        mp_condition.is_none()
    ])
)

# Apply changes
workflow.update_config()

Example: Create a minimal workflow

The following creates a basic workflow with three nodes:

  • Initial labeling task
  • Rework (all rejected)
  • Done
import labelbox as lb
from labelbox.schema.workflow import NodeType

# Initialize client and project
client = lb.Client(api_key="<YOUR_API_KEY>")
project_id = "<project_id>"
project = client.get_project(project_id)

# Get workflow
workflow = project.get_workflow()

# Create nodes
initial_nodes = workflow.reset_to_initial_nodes()
done = workflow.add_node(type=NodeType.Done)

# Connect nodes
workflow.add_edge(initial_nodes.labeling, done)
workflow.add_edge(initial_nodes.rework, done)


# Validate and commit changes
try:
    updated_workflow = workflow.update_config(reposition=True)
    print("Workflow updated successfully!")
except ValueError as e:
    print(f"Validation failed: {e}")

Example: Complete workflow showcase

The following example demonstrates all node types and filter options:

import labelbox as lb
from labelbox.schema.workflow import (
    NodeType, 
    NodeOutput, 
    ProjectWorkflowFilter,
    LabelingConfig,
    ReworkConfig,
    labeled_by,
    metadata,
    sample,
    labeled_at,
    mp_condition,
    m_condition,
    labeling_time,
    review_time,
    issue_category,
    batch,
    dataset,
    annotation,
    consensus_average,
    model_prediction,
    natural_language,
    feature_consensus_average
)
from labelbox.schema.workflow.enums import IndividualAssignment, MatchFilters
from datetime import datetime

# Initialize client and project
client = lb.Client(api_key="<YOUR_API_KEY>")
project_id = "<project_id>"
project = client.get_project(project_id)

# Get workflow
workflow = project.get_workflow()

# Reset config to start a fresh workflow
initial_nodes = workflow.reset_to_initial_nodes(
    labeling_config=LabelingConfig(
        instructions="This is the entry point",
        max_contributions_per_user=10
    ),
    rework_config=ReworkConfig(
        individual_assignment=[IndividualAssignment.LabelCreator]
    )
)
initial_labeling = initial_nodes.labeling
initial_rework = initial_nodes.rework

initial_review = workflow.add_node(
    type=NodeType.Review,
    name="Initial review task",
    group_assignment=["<user_group_id_1>", "<user_group_id_2>"]
)

logic = workflow.add_node(
    type=NodeType.Logic,
    name="Logic node",
    match_filters=MatchFilters.Any,
    filters=ProjectWorkflowFilter([
        labeled_by.is_one_of(["<user_id_1>", "<user_id_2>", "<user_id_3>"]),
        metadata([m_condition.contains("<metadata_schema_id>", ["test"])]),
        sample(23),
        labeled_at.between(
            datetime(2024, 3, 9, 5, 5, 42), 
            datetime(2025, 4, 28, 13, 5, 42)
        ),
        labeling_time.greater_than(1000),
        review_time.less_than_or_equal(100),
        issue_category.is_one_of(["<issue_category_id>"]),
        batch.is_one_of(["<batch_id>"]),
        dataset.is_one_of(["<dataset_id>"]),
        annotation.is_one_of(["<schema_node_id>"]),
        consensus_average(0.17, 0.61),
        model_prediction([
            mp_condition.is_one_of(["<model_id_1>"], 1),
            mp_condition.is_not_one_of(["<model_id_2>"], 2, 6),
            mp_condition.is_none()
        ]),
        natural_language("Birds in the sky/Blue sky/clouds/0.5", 0.178, 0.768),
        feature_consensus_average(0.17, 0.67, ["<schema_node_id>"])
    ])
)

# Terminal and step nodes
done = workflow.add_node(type=NodeType.Done)
rework = workflow.add_node(type=NodeType.Rework, name="To rework")

custom_rework_1 = workflow.add_node(
    type=NodeType.CustomRework,
    name="Custom Rework 1",
    individual_assignment=IndividualAssignment.LabelCreator,
    group_assignment=["<user_group_id_1>", "<user_group_id_2>"]
)

review_2 = workflow.add_node(
    type=NodeType.Review,
    name="Review 2"
)

custom_rework_2 = workflow.add_node(
    type=NodeType.CustomRework,
    name="Custom Rework 2",
    instructions="Additional rework instructions"
)

done_2 = workflow.add_node(
    type=NodeType.Done,
    name="Ready for final review"
)

# Create edges between nodes
workflow.add_edge(initial_labeling, initial_review)
workflow.add_edge(initial_rework, initial_review)
workflow.add_edge(initial_review, logic, NodeOutput.Approved)
workflow.add_edge(initial_review, rework, NodeOutput.Rejected)
workflow.add_edge(logic, review_2, NodeOutput.If)
workflow.add_edge(logic, custom_rework_1, NodeOutput.Else)
workflow.add_edge(review_2, done, NodeOutput.Approved)
workflow.add_edge(review_2, custom_rework_2, NodeOutput.Rejected)
workflow.add_edge(custom_rework_2, done_2)

# Commit the workflow
try:
    updated_workflow = workflow.update_config(reposition=True)
    print("Workflow updated successfully!")
except ValueError as e:
    print(f"Validation failed: {e}")