Developer guide for creating and modifying workflows via the Python SDK.
Client
import labelbox as lb
client = lb.Client(api_key="<YOUR_API_KEY>")
Fundamentals
Preview feature
Workflow management is a preview feature.
Workflows are connected to the Project class and are generated automatically during project creation. Workflows help organize and control the flow of labeling tasks through different stages.
Key concepts:
- Workflows are composed of nodes and edges.
- Each node can have only one input connection, except when both Initial labeling task and Rework (All Rejected) nodes serve as inputs to a single downstream node.
- No changes are pushed to the platform until you call update_config().
- All nodes must be connected for the workflow to be valid.
Access a workflow
workflow = project.get_workflow()
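Once you have the workflow object, you can inspect its current structure. A minimal sketch, assuming the project variable from the Client section, that lists each node's name and type:
# List the nodes of the existing workflow
for node in workflow.get_nodes():
    print(node.name, node.definition_id)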
Clone a workflow from a different project
project_source_id = "<project_source_id>"
project_target_id = "<project_target_id>"
project_source = client.get_project(project_source_id)
project_target = client.get_project(project_target_id)
project_target.clone_workflow_from(project_source.uid)
Reset a workflow
Use reset_to_initial_nodes() to start from scratch with only the InitialLabeling and InitialRework nodes.
The method returns an InitialNodes object with two attributes, labeling and rework.
You can also pass parameters to configure the initial nodes.
# Get initial nodes with basic status
initial_nodes = workflow.reset_to_initial_nodes()
initial_labeling = initial_nodes.labeling
initial_rework = initial_nodes.rework
# Configure initial nodes
# All parameters are optional
from labelbox.schema.workflow.config import LabelingConfig, ReworkConfig
from labelbox.schema.workflow.enums import IndividualAssignment
initial_nodes = workflow.reset_to_initial_nodes(
labeling_config=LabelingConfig(
instructions="Instructions for labeling",
max_contributions_per_user=10
),
rework_config=ReworkConfig(
instructions="Instructions for reworking",
individual_assignment=[IndividualAssignment.LabelCreator],
max_contributions_per_user=5
)
)
Commit changes
To push changes made to a workflow, you must use update_config().
If validation fails, the method raises a ValueError exception.
# Commit changes without changing node locations
workflow.update_config()
# Commit changes and attempt to realign nodes
workflow.update_config(reposition=True)
Add a node
Types of nodes are accessible through the NodeType enum:
Initial nodes:
- NodeType.InitialLabeling - Entry point for new labeling tasks
- NodeType.InitialRework - Entry point for tasks that need to be reworked
Step nodes:
- NodeType.Review - Review completed labels
- NodeType.Logic - Apply filters to route tasks conditionally
- NodeType.CustomRework - Custom rework step with configurable settings
Terminal nodes:
- NodeType.Done - Marks tasks as completed
- NodeType.Rework - Sends tasks back to the rework queue
NodeType.CustomRework can be used as a terminal node or be connected to another node.
from labelbox.schema.workflow import NodeType
review_node = workflow.add_node(type=NodeType.Review)
Delete nodes
This automatically removes connected edges.
Initial nodes can't be deleted.
# Get nodes to delete
nodes_to_delete = [
node
for node in workflow.get_nodes()
if node.name == "NodeToDelete"
]
workflow.delete_nodes(nodes_to_delete)
Add an edge
Edges connect the output of a source node to the input of a target node. All nodes must be connected in the workflow. The output of the CustomRework node is optional.
Types of outputs are listed in the NodeOutput enum:
- NodeOutput.If (default value, can be omitted)
- NodeOutput.Else
- NodeOutput.Approved
- NodeOutput.Rejected
Outputs per node
Node | Available Outputs |
---|---|
InitialLabeling | NodeOutput.If |
InitialRework | NodeOutput.If |
Review | NodeOutput.Approved, NodeOutput.Rejected |
Logic | NodeOutput.If, NodeOutput.Else |
CustomRework | Optional NodeOutput.If |
Done | None (terminal node) |
Rework | None (terminal node) |
from labelbox.schema.workflow import NodeOutput
# Connect nodes with appropriate outputs
workflow.add_edge(initial_labeling, initial_review) # Default NodeOutput.If
workflow.add_edge(initial_rework, initial_review)
workflow.add_edge(initial_review, logic, NodeOutput.Approved)
workflow.add_edge(initial_review, rework_node, NodeOutput.Rejected)
workflow.add_edge(logic, done, NodeOutput.If) # NodeOutput.If can be omitted
workflow.add_edge(logic, custom_rework_1, NodeOutput.Else)
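Because the CustomRework output is optional, you can either leave the node as a terminal step or route it onward to its own downstream node. A minimal sketch (the extra Done node and its name are illustrative):
from labelbox.schema.workflow import NodeType
# Optional: give CustomRework its own downstream node; omit this edge to keep it as a terminal node
custom_rework_done = workflow.add_node(type=NodeType.Done, name="Reworked and done")
workflow.add_edge(custom_rework_1, custom_rework_done)  # default NodeOutput.If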
Node attributes
The following attributes can be configured for each node type:
Node | Configurable Attributes |
---|---|
InitialLabeling | instructions, max_contributions_per_user |
InitialRework | instructions, individual_assignment, max_contributions_per_user |
Review | instructions, group_assignment, max_contributions_per_user |
Logic | name, match_filters, filters |
CustomRework | name, instructions, group_assignment, individual_assignment, max_contributions_per_user |
Done | name |
Rework | name |
Common attributes:
- max_contributions_per_user: Positive integer representing the maximum number of labels per task queue (empty for no limit)
- instructions: Custom instructions for labelers working on this node
- group_assignment: List of user group IDs assigned to this node
- individual_assignment: Individual assignment strategy (you can use the IndividualAssignment enum)
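These attributes are passed to add_node when creating a step. A minimal sketch with placeholder values (the instructions text, user group ID, and node name are illustrative):
from labelbox.schema.workflow import NodeType
from labelbox.schema.workflow.enums import IndividualAssignment
# Review node with instructions, a group assignment, and a per-user limit
review = workflow.add_node(
    type=NodeType.Review,
    instructions="Check that every object is labeled",
    group_assignment=["<user_group_id>"],
    max_contributions_per_user=5
)
# CustomRework node routed back to the original label creator
custom_rework = workflow.add_node(
    type=NodeType.CustomRework,
    name="Targeted rework",
    individual_assignment=IndividualAssignment.LabelCreator
)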
Logic node
The Logic node contains filters that determine how tasks flow through the workflow. The match_filters attribute controls how multiple filters are evaluated:
- MatchFilters.Any: Match any of the filters (OR logic)
- MatchFilters.All: Match all of the filters (AND logic)
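A minimal sketch of a Logic node that requires every filter to match (the node name and filter values are placeholders; the individual filters are detailed in the next section):
from labelbox.schema.workflow import NodeType, ProjectWorkflowFilter, sample, labeling_time
from labelbox.schema.workflow.enums import MatchFilters
# Route only tasks that satisfy both filters (AND logic)
logic = workflow.add_node(
    type=NodeType.Logic,
    name="Quality gate",
    match_filters=MatchFilters.All,
    filters=ProjectWorkflowFilter([
        sample(25),
        labeling_time.greater_than(60)
    ])
)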
Available filters
Each filter type can be used at most once per Logic node.
The FilterField enum identifies the available filter types when searching for or removing filters on a Logic node:
FilterField.Annotation
FilterField.Batch
FilterField.ConsensusAverage
FilterField.Dataset
FilterField.FeatureConsensusAverage
FilterField.IssueCategory
FilterField.LabelingTime
FilterField.LabeledAt
FilterField.LabeledBy
FilterField.Metadata
FilterField.ModelPrediction
FilterField.NlSearch
FilterField.ReviewTime
FilterField.Sample
annotation
Filter by the presence of specific annotations. schema_node_ids is a list of schema node IDs that correspond to tools or classifications defined in the project's ontology schema.
Operators: None (direct list filter)
from labelbox.schema.workflow import annotation
# Using named parameter
annotation(schema_node_ids=["<schema_node_id>"])
# Using positional parameter
annotation(["<schema_node_id>"])
batch
Filter by batch membership.
Operators:
is_one_of
is_not_one_of
from labelbox.schema.workflow import batch
# Using named parameter
batch.is_one_of(values=["<batch_id>"])
# Using positional parameter
batch.is_one_of(["<batch_id>"])
consensus_average
Filter by overall consensus score.
Operators: None (range filter with min/max)
from labelbox.schema.workflow import consensus_average
# Using named parameters
consensus_average(min=0.17, max=0.61)
# Using positional parameters
consensus_average(0.17, 0.61)
dataset
Filter by dataset membership.
Operators: None (direct list filter)
from labelbox.schema.workflow import dataset
# Using named parameter
dataset(dataset_ids=["<dataset_id>"])
# Using positional parameter
dataset(["<dataset_id>"])
feature_consensus_average
Filter by consensus score for specific features. annotations is a list of schema node IDs that correspond to tools or classifications defined in the project's ontology schema.
Operators: None (range filter with min/max and annotation list)
from labelbox.schema.workflow import feature_consensus_average
# Using named parameters
feature_consensus_average(min=0.17, max=0.67, annotations=["<schema_node_id>"])
# Using positional parameters
feature_consensus_average(0.17, 0.67, ["<schema_node_id>"])
issue_category
Filter by issue categories flagged during review.
Operators:
is_one_of
from labelbox.schema.workflow import issue_category
# Using named parameter
issue_category.is_one_of(category_ids=["<issue_category_id>"])
# Using positional parameter
issue_category.is_one_of(["<issue_category_id>"])
labeled_at
Filter by when the label was created.
Operators:
between
from labelbox.schema.workflow import labeled_at
from datetime import datetime
# Using named parameters
labeled_at.between(
start=datetime(2024, 3, 9, 5, 5, 42),
end=datetime(2025, 4, 28, 13, 5, 42)
)
# Using positional parameters
labeled_at.between(
datetime(2024, 3, 9, 5, 5, 42),
datetime(2025, 4, 28, 13, 5, 42)
)
labeled_by
Filter by the user who created the label.
Operators:
is_one_of
from labelbox.schema.workflow import labeled_by
# Using named parameter
labeled_by.is_one_of(user_ids=["<user_1_id>", "<user_2_id>"])
# Using positional parameter
labeled_by.is_one_of(["<user_1_id>", "<user_2_id>"])
labeling_time
Filter by how long it took to create the label. The value represents a number of seconds.
Operators:
greater_than
less_than
greater_than_or_equal
less_than_or_equal
between
from labelbox.schema.workflow import labeling_time
# Using named parameter
labeling_time.greater_than(seconds=1000)
# Using positional parameter
labeling_time.greater_than(1000)
metadata
Filter by data row metadata values.
Each condition (m_condition) takes a metadata schema ID (key) and a list of strings (value) as parameters.
Operators:
contains
starts_with
ends_with
does_not_contain
is_any
is_not_any
from labelbox.schema.workflow import metadata, m_condition
# Using named parameters
metadata(conditions=[m_condition.contains(key="<metadata_schema_id>", value=["test"])])
# Using positional parameters
metadata([m_condition.contains("<metadata_schema_id>", ["test"])])
model_prediction
Filter by model predictions. Model predictions use a list of conditions named mp_condition.
Each condition takes a list of model IDs, followed by an integer for the minimum score (min_score) and an optional integer for the maximum score (max_score).
is_none takes precedence over other operators.
Operators:
is_one_of
is_not_one_of
is_none
from labelbox.schema.workflow import model_prediction, mp_condition
# Using named parameter
model_prediction(conditions=[
mp_condition.is_one_of(models=["<model_id>"], min_score=1),
mp_condition.is_not_one_of(models=["<model_id>"], min_score=2, max_score=6),
mp_condition.is_none()
])
# Using positional parameter
model_prediction([
mp_condition.is_one_of(["<model_id>"], 1),
mp_condition.is_not_one_of(["<model_id>"], 2, 6),
mp_condition.is_none()
])
natural_language
Filter using semantic search. The content (or prompt) follows this format:
"Find this / more of this / not this / bias_value"
where bias_value is a decimal between 0 and 1.
Operators: None (semantic search with score range)
from labelbox.schema.workflow import natural_language
# Using named parameters
natural_language(
content="Birds in the sky/Blue sky/clouds/0.5",
min_score=0.178,
max_score=0.768
)
# Using positional parameters
natural_language("Birds in the sky/Blue sky/clouds/0.5", 0.178, 0.768)
review_time
Filter by how long it took to review the label.
Operators:
greater_than
less_than
greater_than_or_equal
less_than_or_equal
between
from labelbox.schema.workflow import review_time
# Using named parameter
review_time.less_than_or_equal(seconds=100)
# Using positional parameter
review_time.less_than_or_equal(100)
sample
Filter by percentage sampling. The percentage is entered as an integer.
Operators: None (percentage value)
from labelbox.schema.workflow import sample
# Using named parameter
sample(percentage=23)
# Using positional parameter
sample(23)
Managing filters on Logic nodes
from labelbox.schema.workflow.enums import WorkflowDefinitionId, FilterField
from labelbox.schema.workflow import mp_condition, model_prediction
workflow = project.get_workflow()
# Get the Logic node
logic = next(
node for node in workflow.get_nodes()
if node.definition_id == WorkflowDefinitionId.Logic
)
# Alternative: get by node ID
# logic = workflow.get_node_by_id("0359113a-6081-4f48-83d1-175062a0259b")
# Remove a filter based on its type
logic.remove_filter(FilterField.ModelPrediction)
# Add a filter
logic.add_filter(
model_prediction([
mp_condition.is_none()
])
)
# Apply changes
workflow.update_config()
Example: Create a minimal workflow
The following creates a basic workflow with three nodes:
- Initial labeling task
- Rework (all rejected)
- Done
import labelbox as lb
from labelbox.schema.workflow import NodeType
# Initialize client and project
client = lb.Client(api_key="<YOUR_API_KEY>")
project_id = "<project_id>"
project = client.get_project(project_id)
# Get workflow
workflow = project.get_workflow()
# Create nodes
initial_nodes = workflow.reset_to_initial_nodes()
done = workflow.add_node(type=NodeType.Done)
# Connect nodes
workflow.add_edge(initial_nodes.labeling, done)
workflow.add_edge(initial_nodes.rework, done)
# Validate and commit changes
try:
updated_workflow = workflow.update_config(reposition=True)
print("Workflow updated successfully!")
except ValueError as e:
print(f"Validation failed: {e}")
Example: Complete workflow showcase
The following example demonstrates all node types and filter options:
import labelbox as lb
from labelbox.schema.workflow import (
NodeType,
NodeOutput,
ProjectWorkflowFilter,
LabelingConfig,
ReworkConfig,
labeled_by,
metadata,
sample,
labeled_at,
mp_condition,
m_condition,
labeling_time,
review_time,
issue_category,
batch,
dataset,
annotation,
consensus_average,
model_prediction,
natural_language,
feature_consensus_average
)
from labelbox.schema.workflow.enums import IndividualAssignment, MatchFilters
from datetime import datetime
# Initialize client and project
client = lb.Client(api_key="<YOUR_API_KEY>")
project_id = "<project_id>"
project = client.get_project(project_id)
# Get workflow
workflow = project.get_workflow()
# Reset config to start a fresh workflow
initial_nodes = workflow.reset_to_initial_nodes(
labeling_config=LabelingConfig(
instructions="This is the entry point",
max_contributions_per_user=10
),
rework_config=ReworkConfig(
individual_assignment=[IndividualAssignment.LabelCreator]
)
)
initial_labeling = initial_nodes.labeling
initial_rework = initial_nodes.rework
initial_review = workflow.add_node(
type=NodeType.Review,
name="Initial review task",
group_assignment=["<user_group_id_1>", "<user_group_id_2>"]
)
logic = workflow.add_node(
type=NodeType.Logic,
name="Logic node",
match_filters=MatchFilters.Any,
filters=ProjectWorkflowFilter([
labeled_by.is_one_of(["<user_id_1>", "<user_id_2>", "<user_id_3>"]),
metadata([m_condition.contains("<metadata_schema_id>", ["test"])]),
sample(23),
labeled_at.between(
datetime(2024, 3, 9, 5, 5, 42),
datetime(2025, 4, 28, 13, 5, 42)
),
labeling_time.greater_than(1000),
review_time.less_than_or_equal(100),
issue_category.is_one_of(["<issue_category_id>"]),
batch.is_one_of(["<batch_id>"]),
dataset(["<dataset_id>"]),
annotation(["<schema_node_id>"]),
consensus_average(0.17, 0.61),
model_prediction([
mp_condition.is_one_of(["<model_id_1>"], 1),
mp_condition.is_not_one_of(["<model_id_2>"], 2, 6),
mp_condition.is_none()
]),
natural_language("Birds in the sky/Blue sky/clouds/0.5", 0.178, 0.768),
feature_consensus_average(0.17, 0.67, ["<schema_node_id>"])
])
)
# Terminal and step nodes
done = workflow.add_node(type=NodeType.Done)
rework = workflow.add_node(type=NodeType.Rework, name="To rework")
custom_rework_1 = workflow.add_node(
type=NodeType.CustomRework,
name="Custom Rework 1",
individual_assignment=IndividualAssignment.LabelCreator,
group_assignment=["<user_group_id_1>", "<user_group_id_2>"]
)
review_2 = workflow.add_node(
type=NodeType.Review,
name="Review 2"
)
custom_rework_2 = workflow.add_node(
type=NodeType.CustomRework,
name="Custom Rework 2",
instructions="Additional rework instructions"
)
done_2 = workflow.add_node(
type=NodeType.Done,
name="Ready for final review"
)
# Create edges between nodes
workflow.add_edge(initial_labeling, initial_review)
workflow.add_edge(initial_rework, initial_review)
workflow.add_edge(initial_review, logic, NodeOutput.Approved)
workflow.add_edge(initial_review, rework, NodeOutput.Rejected)
workflow.add_edge(logic, review_2, NodeOutput.If)
workflow.add_edge(logic, custom_rework_1, NodeOutput.Else)
workflow.add_edge(review_2, done, NodeOutput.Approved)
workflow.add_edge(review_2, custom_rework_2, NodeOutput.Rejected)
workflow.add_edge(custom_rework_2, done_2)
# Commit the workflow
try:
updated_workflow = workflow.update_config(reposition=True)
print("Workflow updated successfully!")
except ValueError as e:
print(f"Validation failed: {e}")