# Workflow

> Developer guide for creating and modifying workflows via the Python SDK.

# Client

```python theme={null}
import labelbox as lb
client = lb.Client(api_key="<YOUR_API_KEY>")
```

# Fundamentals

Every workflow belongs to a `Project` and is generated automatically when the project is created. Workflows help organize and control the flow of labeling tasks through different stages.

Key concepts:

* Workflows are composed of **nodes** and **edges**.
* Each node can have only one input connection, except when both `Initial labeling task` and `Rework (All Rejected)` nodes serve as inputs to a single downstream node.
* No changes are pushed to the platform until you call `update_config()`.
* All nodes must be connected for the workflow to be valid.
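
The rules above can be sketched in plain Python. This is only a conceptual model, not the SDK's own validation: the function name `check_workflow`, the string node names, and the tuple-based edge list are illustrative assumptions.

```python
# Hypothetical node names used only for this illustration.
INITIAL_NODES = {"InitialLabeling", "InitialRework"}


def check_workflow(nodes, edges):
    """Return a list of rule violations for a toy workflow graph.

    nodes: iterable of node names (strings).
    edges: iterable of (source, target) tuples.
    """
    problems = []
    edges = list(edges)

    # Rule: every node must take part in at least one edge.
    connected = {name for edge in edges for name in edge}
    for node in nodes:
        if node not in connected:
            problems.append(f"{node} is not connected")

    # Rule: a node accepts a single input, unless its inputs are
    # exactly the two initial nodes.
    inputs = {}
    for source, target in edges:
        inputs.setdefault(target, []).append(source)
    for target, sources in inputs.items():
        if len(sources) > 1 and set(sources) != INITIAL_NODES:
            problems.append(f"{target} has too many inputs")
    return problems


nodes = ["InitialLabeling", "InitialRework", "Review", "Done"]
edges = [("InitialLabeling", "Review"), ("InitialRework", "Review")]
print(check_workflow(nodes, edges))  # ['Done is not connected']
```

In the SDK itself, these checks run when you call `update_config()`; the sketch only illustrates what a valid graph looks like.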

## Access a workflow

```python theme={null}
project = client.get_project("<project_id>")
workflow = project.get_workflow()
```

## Clone a workflow from a different project

```python theme={null}
project_source_id = "<project_source_id>"
project_target_id = "<project_target_id>"
project_source = client.get_project(project_source_id)
project_target = client.get_project(project_target_id)

project_target.clone_workflow_from(project_source.uid)
```

## Reset a workflow

Use `reset_to_initial_nodes()` to remove every node except the `InitialLabeling` and `InitialRework` nodes and start from scratch. The method returns an `InitialNodes` object with two attributes, `labeling` and `rework`. You can also pass parameters to configure the initial nodes.

```python theme={null}
# Reset the workflow and get the initial nodes with default settings
initial_nodes = workflow.reset_to_initial_nodes()

initial_labeling = initial_nodes.labeling
initial_rework = initial_nodes.rework

# Configure initial nodes
# All parameters are optional 
from labelbox.schema.workflow.config import LabelingConfig, ReworkConfig
from labelbox.schema.workflow.enums import IndividualAssignment

initial_nodes = workflow.reset_to_initial_nodes(
    labeling_config=LabelingConfig(
        instructions="Instructions for labeling",
        max_contributions_per_user=10
    ),
    rework_config=ReworkConfig(
        instructions="Instructions for reworking",
        individual_assignment=[IndividualAssignment.LabelCreator],
        max_contributions_per_user=5
    )
)
```

## Commit changes

To push changes made to a workflow, call `update_config()`. If the workflow fails validation, the method raises a `ValueError`.

```python theme={null}
# Commit changes without changing node locations
workflow.update_config()

# Commit changes and attempt to realign nodes
workflow.update_config(reposition=True)
```

# Nodes

## Add a node

Types of nodes are listed in the enum `NodeType`:

**Initial nodes:**

* `NodeType.InitialLabeling` - Entry point for new labeling tasks
* `NodeType.InitialRework` - Entry point for tasks that need to be reworked

**Step nodes:**

* `NodeType.Review` - Review completed labels
* `NodeType.Logic` - Apply filters to route tasks conditionally
* `NodeType.CustomRework` - Custom rework step with configurable settings

**Terminal nodes:**

* `NodeType.Done` - Marks tasks as completed
* `NodeType.Rework` - Sends tasks back to the rework queue

`NodeType.CustomRework` can be used as a terminal node or be connected to another node.

```python theme={null}
from labelbox.schema.workflow import NodeType

review_node = workflow.add_node(type=NodeType.Review)
```

## Delete a node

Deleting a node automatically removes its connected edges. Initial nodes can't be deleted.

```python theme={null}
# Get nodes to delete
nodes_to_delete = [
    node
    for node in workflow.get_nodes()
    if node.name == "NodeToDelete"
]

workflow.delete_nodes(nodes_to_delete)
```

# Edges

## Add an edge

Edges connect the output of a source node to the input of a target node. All nodes must be connected in the workflow.

Types of outputs are listed in the enum `NodeOutput`:

* `NodeOutput.If` (default value, can be omitted)
* `NodeOutput.Else`
* `NodeOutput.Approved`
* `NodeOutput.Rejected`

### Outputs per node

| Node              | Available outputs                            |
| ----------------- | -------------------------------------------- |
| `InitialLabeling` | `NodeOutput.If`                              |
| `InitialRework`   | `NodeOutput.If`                              |
| `Review`          | `NodeOutput.Approved`, `NodeOutput.Rejected` |
| `Logic`           | `NodeOutput.If`, `NodeOutput.Else`           |
| `CustomRework`    | Optional `NodeOutput.If`                     |
| `Done`            |                                              |
| `Rework`          |                                              |

```python theme={null}
from labelbox.schema.workflow import NodeOutput

# Connect nodes with appropriate outputs
workflow.add_edge(initial_labeling, initial_review) # Default NodeOutput.If
workflow.add_edge(initial_rework, initial_review)
workflow.add_edge(initial_review, logic, NodeOutput.Approved)
workflow.add_edge(initial_review, rework_node, NodeOutput.Rejected)
workflow.add_edge(logic, done, NodeOutput.If) # NodeOutput.If can be omitted
workflow.add_edge(logic, custom_rework_1, NodeOutput.Else)
```
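
One way to catch a wrong output before calling `add_edge()` is a small lookup built from the outputs table. This is a sketch, not part of the SDK: `ALLOWED_OUTPUTS` and `is_valid_output` are hypothetical names, and node types and outputs are written as plain strings rather than the `NodeType` and `NodeOutput` enums.

```python
# Allowed outputs per node type, copied from the "Outputs per node" table.
ALLOWED_OUTPUTS = {
    "InitialLabeling": {"If"},
    "InitialRework": {"If"},
    "Review": {"Approved", "Rejected"},
    "Logic": {"If", "Else"},
    "CustomRework": {"If"},  # optional: the node may also be terminal
    "Done": set(),
    "Rework": set(),
}


def is_valid_output(node_type, output="If"):
    """Return True if `output` may leave a node of type `node_type`."""
    return output in ALLOWED_OUTPUTS.get(node_type, set())


print(is_valid_output("Review", "Approved"))  # True
print(is_valid_output("Review"))              # False
print(is_valid_output("Done"))                # False
```

The second call shows why edges leaving a `Review` node need an explicit output: according to the table, the default `If` is not one of its outputs.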

## Node attributes

The following attributes can be configured for each node type:

| **Node**          | **Configurable attributes**                                                                       |
| :---------------- | :------------------------------------------------------------------------------------------------ |
| `InitialLabeling` | `instructions`, `max_contributions_per_user`                                                      |
| `InitialRework`   | `instructions`, `individual_assignment`, `max_contributions_per_user`                             |
| `Review`          | `instructions`, `group_assignment`, `max_contributions_per_user`                                  |
| `Logic`           | `name`, `match_filters`, `filters`                                                                |
| `CustomRework`    | `name`, `instructions`, `group_assignment`, `individual_assignment`, `max_contributions_per_user` |
| `Done`            | `name`                                                                                            |
| `Rework`          | `name`                                                                                            |

**Common attributes:**

* `max_contributions_per_user`: Positive integer representing the maximum number of labels each user can contribute in the task queue (leave empty for no limit)
* `instructions`: Custom instructions for labelers working on this node
* `group_assignment`: List of user group IDs assigned to this node
* `individual_assignment`: Individual assignment strategy (you can use the `IndividualAssignment` enum)

# Filters

## Logic node

The Logic node contains filters that determine how tasks flow through the workflow. The `match_filters` attribute controls how multiple filters are evaluated:

* `MatchFilters.Any`: Match any of the filters (OR logic)
* `MatchFilters.All`: Match all of the filters (AND logic)
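
The difference between the two modes maps onto Python's built-in `any()` and `all()`. The sketch below is a conceptual illustration only: `logic_node_matches` is a hypothetical function, and the modes are plain strings rather than the `MatchFilters` enum.

```python
def logic_node_matches(filter_results, match_filters="Any"):
    """Combine per-filter results the way the two modes describe.

    filter_results: list of booleans, one per filter on the node.
    """
    if match_filters == "Any":
        return any(filter_results)  # OR logic
    if match_filters == "All":
        return all(filter_results)  # AND logic
    raise ValueError(f"Unknown mode: {match_filters}")


results = [True, False, True]  # e.g. three filters, one did not match
print(logic_node_matches(results, "Any"))  # True
print(logic_node_matches(results, "All"))  # False
```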

## Manage filters on Logic nodes

```python theme={null}
from labelbox.schema.workflow.enums import WorkflowDefinitionId, FilterField
from labelbox.schema.workflow import mp_condition, model_prediction

workflow = project.get_workflow()

# Get the Logic node
logic = next(
    node for node in workflow.get_nodes()
    if node.definition_id == WorkflowDefinitionId.Logic
)
# Alternative: get by node ID
# logic = workflow.get_node_by_id("0359113a-6081-4f48-83d1-175062a0259b")

# Remove a filter based on its type 
logic.remove_filter(FilterField.ModelPrediction)

# Add a filter
logic.add_filter(
    model_prediction([
        mp_condition.is_none()
    ])
)

# Apply changes
workflow.update_config()
```

## Available filters

Each filter type can be used at most once per Logic node. The enum `FilterField` provides available filters for search or deletion:

* `FilterField.Annotation`
* `FilterField.Batch`
* `FilterField.ConsensusAverage`
* `FilterField.Dataset`
* `FilterField.FeatureConsensusAverage`
* `FilterField.IssueCategory`
* `FilterField.LabelingTime`
* `FilterField.LabeledAt`
* `FilterField.LabeledBy`
* `FilterField.Metadata`
* `FilterField.ModelPrediction`
* `FilterField.NlSearch`
* `FilterField.ReviewTime`
* `FilterField.Sample`
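
Because each filter type can appear only once per Logic node, it can help to check a planned filter list for repeats before building the node. The helper below is a hypothetical sketch that works on plain field names (strings), not on SDK filter objects.

```python
from collections import Counter


def duplicate_filter_fields(fields):
    """Return the field names that appear more than once, sorted."""
    counts = Counter(fields)
    return sorted(name for name, count in counts.items() if count > 1)


planned = ["Sample", "Batch", "LabeledBy", "Sample"]
print(duplicate_filter_fields(planned))  # ['Sample']
```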

### annotation

Filter by the presence of specific annotations. `schema_node_ids` is a list of schema node IDs that correspond to tools or classifications defined in the project's ontology schema.

**Operators:** None (direct list filter)

```python theme={null}
from labelbox.schema.workflow import annotation

# Using named parameter
annotation(schema_node_ids=["<schema_node_id>"])

# Using positional parameter
annotation(["<schema_node_id>"])
```

### batch

Filter by batch membership.

**Operators:**

* `is_one_of`
* `is_not_one_of`

```python theme={null}
from labelbox.schema.workflow import batch

# Using named parameter
batch.is_one_of(values=["<batch_id>"])

# Using positional parameter
batch.is_one_of(["<batch_id>"])
```

### consensus\_average

Filter by overall consensus score.

**Operators:** None (range filter with min/max)

```python theme={null}
from labelbox.schema.workflow import consensus_average

# Using named parameters
consensus_average(min=0.17, max=0.61)

# Using positional parameters
consensus_average(0.17, 0.61)
```

### dataset

Filter by dataset membership.

**Operators:** None (direct list filter)

```python theme={null}
from labelbox.schema.workflow import dataset

# Using named parameter
dataset(dataset_ids=["<dataset_id>"])

# Using positional parameter
dataset(["<dataset_id>"])
```

### feature\_consensus\_average

Filter by consensus score for specific features. `annotations` is a list of schema node IDs that correspond to tools or classifications defined in the project's ontology schema.

**Operators:** None (range filter with min/max and annotation list)

```python theme={null}
from labelbox.schema.workflow import feature_consensus_average

# Using named parameters
feature_consensus_average(min=0.17, max=0.67, annotations=["<schema_node_id>"])

# Using positional parameters
feature_consensus_average(0.17, 0.67, ["<schema_node_id>"])
```

### issue\_category

Filter by issue categories flagged during review.

**Operators:**

* `is_one_of`

```python theme={null}
from labelbox.schema.workflow import issue_category

# Using named parameter
issue_category.is_one_of(category_ids=["<issue_category_id>"])

# Using positional parameter
issue_category.is_one_of(["<issue_category_id>"])
```

### labeling\_time

Filter by how long it took to create the label. The value represents a number of seconds.

**Operators:**

* `greater_than`
* `less_than`
* `greater_than_or_equal`
* `less_than_or_equal`
* `between`

```python theme={null}
from labelbox.schema.workflow import labeling_time

# Using named parameter
labeling_time.greater_than(seconds=1000)

# Using positional parameter
labeling_time.greater_than(1000)
```
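
Since the threshold is expressed in seconds, longer durations are easier to read when derived from a `timedelta`. This is a minimal sketch using only the standard library; `to_seconds` is a hypothetical helper, and it assumes the filter expects a whole number of seconds, as in the example above.

```python
from datetime import timedelta


def to_seconds(duration):
    """Convert a timedelta to a whole number of seconds."""
    return int(duration.total_seconds())


threshold = to_seconds(timedelta(minutes=15))
print(threshold)  # 900

# The value could then be passed to the filter, for example:
# labeling_time.greater_than(threshold)
```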

### labeled\_at

Filter by when the label was created.

**Operators:**

* `between`

```python theme={null}
from labelbox.schema.workflow import labeled_at
from datetime import datetime

# Using named parameters
labeled_at.between(
    start=datetime(2024, 3, 9, 5, 5, 42), 
    end=datetime(2025, 4, 28, 13, 5, 42)
)

# Using positional parameters
labeled_at.between(
    datetime(2024, 3, 9, 5, 5, 42), 
    datetime(2025, 4, 28, 13, 5, 42)
)
```
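
For a rolling window such as "the last 7 days", the two bounds can be computed rather than hard-coded. The sketch below uses only the standard library; `last_n_days` is a hypothetical helper, and it produces naive `datetime` values like the ones in the example above (time zone handling is not covered here).

```python
from datetime import datetime, timedelta


def last_n_days(days, now=None):
    """Return (start, end) covering the `days` days before `now`."""
    end = now if now is not None else datetime.now()
    start = end - timedelta(days=days)
    return start, end


start, end = last_n_days(7, now=datetime(2025, 4, 28, 13, 5, 42))
print(start, end)  # 2025-04-21 13:05:42 2025-04-28 13:05:42

# The pair could then be passed to the filter, for example:
# labeled_at.between(start, end)
```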

### labeled\_by

Filter by the user who created the label.

**Operators:**

* `is_one_of`

```python theme={null}
from labelbox.schema.workflow import labeled_by

# Using named parameter
labeled_by.is_one_of(user_ids=["<user_1_id>", "<user_2_id>"])

# Using positional parameter
labeled_by.is_one_of(["<user_1_id>", "<user_2_id>"])
```

### metadata

Filter by data row metadata values.

Each condition (`m_condition`) takes a metadata schema ID (`key`) and a list of strings (`value`) as parameters.

**Operators:**

* `contains`
* `starts_with`
* `ends_with`
* `does_not_contain`
* `is_any`
* `is_not_any`

```python theme={null}
from labelbox.schema.workflow import metadata, m_condition

# Using named parameters
metadata(conditions=[m_condition.contains(key="<metadata_schema_id>", value=["test"])])

# Using positional parameters
metadata([m_condition.contains("<metadata_schema_id>", ["test"])])
```

### model\_prediction

Filter by model predictions. Model predictions use a list of conditions named `mp_condition`.

Each condition takes a list of model IDs, followed by an integer for the minimum score (`min_score`) and an optional integer for the maximum score (`max_score`).

`is_none` takes precedence over other operators.

**Operators:**

* `is_one_of`
* `is_not_one_of`
* `is_none`

```python theme={null}
from labelbox.schema.workflow import model_prediction, mp_condition

# Using named parameter
model_prediction(conditions=[
    mp_condition.is_one_of(models=["<model_id>"], min_score=1),
    mp_condition.is_not_one_of(models=["<model_id>"], min_score=2, max_score=6),
    mp_condition.is_none()
])

# Using positional parameter
model_prediction([
    mp_condition.is_one_of(["<model_id>"], 1),
    mp_condition.is_not_one_of(["<model_id>"], 2, 6),
    mp_condition.is_none()
])
```

### natural\_language

Filter using semantic search. The `content` (or prompt) follows this format:

`"Find this / more of this / not this / bias_value"`

where `bias_value` is a decimal between 0 and 1.

**Operators:** None (semantic search with score range)

```python theme={null}
from labelbox.schema.workflow import natural_language

# Using named parameters
natural_language(
    content="Birds in the sky/Blue sky/clouds/0.5", 
    min_score=0.178, 
    max_score=0.768
)

# Using positional parameters
natural_language("Birds in the sky/Blue sky/clouds/0.5", 0.178, 0.768)
```
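
The four-part `content` string can be assembled with a small helper instead of being typed by hand. This is a sketch under assumptions: `build_nl_content` is a hypothetical function, and rejecting `/` inside a segment is a precaution chosen here because `/` is the separator, not a documented SDK rule.

```python
def build_nl_content(find, more_of, not_this, bias):
    """Build a "find/more of/not/bias" prompt string."""
    if not 0 <= bias <= 1:
        raise ValueError("bias must be between 0 and 1")
    for part in (find, more_of, not_this):
        if "/" in part:
            raise ValueError("segments must not contain '/'")
    return "/".join([find, more_of, not_this, str(bias)])


content = build_nl_content("Birds in the sky", "Blue sky", "clouds", 0.5)
print(content)  # Birds in the sky/Blue sky/clouds/0.5
```

The result can be passed as the `content` argument of `natural_language()`.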

### review\_time

Filter by how long it took to review the label. The value represents a number of seconds.

**Operators:**

* `greater_than`
* `less_than`
* `greater_than_or_equal`
* `less_than_or_equal`
* `between`

```python theme={null}
from labelbox.schema.workflow import review_time

# Using named parameter
review_time.less_than_or_equal(seconds=100)

# Using positional parameter
review_time.less_than_or_equal(100)
```

### sample

Filter by percentage sampling. The percentage is entered as an integer.

**Operators:** None (percentage value)

```python theme={null}
from labelbox.schema.workflow import sample

# Using named parameter
sample(percentage=23)

# Using positional parameter
sample(23)
```

# Example: Create a minimal workflow

The following example creates a basic workflow with three nodes:

* Initial labeling task
* Rework (all rejected)
* Done

```python theme={null}
import labelbox as lb
from labelbox.schema.workflow import NodeType

# Initialize client and project
client = lb.Client(api_key="<YOUR_API_KEY>")
project_id = "<project_id>"
project = client.get_project(project_id)

# Get workflow
workflow = project.get_workflow()

# Create nodes
initial_nodes = workflow.reset_to_initial_nodes()
done = workflow.add_node(type=NodeType.Done)

# Connect nodes
workflow.add_edge(initial_nodes.labeling, done)
workflow.add_edge(initial_nodes.rework, done)


# Validate and commit changes
try:
    updated_workflow = workflow.update_config(reposition=True)
    print("Workflow updated successfully!")
except ValueError as e:
    print(f"Validation failed: {e}")
```

# Example: Showcase complete workflow

The following example shows all the nodes and filters.

```python theme={null}
import labelbox as lb
from labelbox.schema.workflow import (
    NodeType, 
    NodeOutput, 
    ProjectWorkflowFilter,
    LabelingConfig,
    ReworkConfig,
    labeled_by,
    metadata,
    sample,
    labeled_at,
    mp_condition,
    m_condition,
    labeling_time,
    review_time,
    issue_category,
    batch,
    dataset,
    annotation,
    consensus_average,
    model_prediction,
    natural_language,
    feature_consensus_average
)
from labelbox.schema.workflow.enums import IndividualAssignment, MatchFilters
from datetime import datetime

# Initialize client and project
client = lb.Client(api_key="<YOUR_API_KEY>")
project_id = "<project_id>"
project = client.get_project(project_id)

# Get workflow
workflow = project.get_workflow()

# Reset config to start a fresh workflow
initial_nodes = workflow.reset_to_initial_nodes(
    labeling_config=LabelingConfig(
        instructions="This is the entry point",
        max_contributions_per_user=10
    ),
    rework_config=ReworkConfig(
        individual_assignment=[IndividualAssignment.LabelCreator]
    )
)
initial_labeling = initial_nodes.labeling
initial_rework = initial_nodes.rework

initial_review = workflow.add_node(
    type=NodeType.Review,
    name="Initial review task",
    group_assignment=["<user_group_id_1>", "<user_group_id_2>"]
)

logic = workflow.add_node(
    type=NodeType.Logic,
    name="Logic node",
    match_filters=MatchFilters.Any,
    filters=ProjectWorkflowFilter([
        labeled_by.is_one_of(["<user_id_1>", "<user_id_2>", "<user_id_3>"]),
        metadata([m_condition.contains("<metadata_schema_id>", ["test"])]),
        sample(23),
        labeled_at.between(
            datetime(2024, 3, 9, 5, 5, 42), 
            datetime(2025, 4, 28, 13, 5, 42)
        ),
        labeling_time.greater_than(1000),
        review_time.less_than_or_equal(100),
        issue_category.is_one_of(["<issue_category_id>"]),
        batch.is_one_of(["<batch_id>"]),
        dataset.is_one_of(["<dataset_id>"]),
        annotation.is_one_of(["<schema_node_id>"]),
        consensus_average(0.17, 0.61),
        model_prediction([
            mp_condition.is_one_of(["<model_id_1>"], 1),
            mp_condition.is_not_one_of(["<model_id_2>"], 2, 6),
            mp_condition.is_none()
        ]),
        natural_language("Birds in the sky/Blue sky/clouds/0.5", 0.178, 0.768),
        feature_consensus_average(0.17, 0.67, ["<schema_node_id>"])
    ])
)

# Terminal and step nodes
done = workflow.add_node(type=NodeType.Done)
rework = workflow.add_node(type=NodeType.Rework, name="To rework")

custom_rework_1 = workflow.add_node(
    type=NodeType.CustomRework,
    name="Custom Rework 1",
    individual_assignment=IndividualAssignment.LabelCreator,
    group_assignment=["<user_group_id_1>", "<user_group_id_2>"]
)

review_2 = workflow.add_node(
    type=NodeType.Review,
    name="Review 2"
)

custom_rework_2 = workflow.add_node(
    type=NodeType.CustomRework,
    name="Custom Rework 2",
    instructions="Additional rework instructions"
)

done_2 = workflow.add_node(
    type=NodeType.Done,
    name="Ready for final review"
)

# Create edges between nodes
workflow.add_edge(initial_labeling, initial_review)
workflow.add_edge(initial_rework, initial_review)
workflow.add_edge(initial_review, logic, NodeOutput.Approved)
workflow.add_edge(initial_review, rework, NodeOutput.Rejected)
workflow.add_edge(logic, review_2, NodeOutput.If)
workflow.add_edge(logic, custom_rework_1, NodeOutput.Else)
workflow.add_edge(review_2, done, NodeOutput.Approved)
workflow.add_edge(review_2, custom_rework_2, NodeOutput.Rejected)
workflow.add_edge(custom_rework_2, done_2)

# Commit the workflow
try:
    updated_workflow = workflow.update_config(reposition=True)
    print("Workflow updated successfully!")
except ValueError as e:
    print(f"Validation failed: {e}")
```
