Getting Started with Fluxly

Fluxly lets you quickly design, run, and manage DAG-based workflows.
Workflows can be triggered via CLI, API, or environment variables, so the same workflow works in any context.
This guide walks you through creating your first workflow, adding nodes, handling inputs, and executing it through your chosen entrypoint.


1. Install Fluxly

Install via pip:

Install via pip

pip install fluxly

Or clone the repository for development:

Clone for development

git clone https://github.com/ShaharBand/fluxly.git
cd fluxly
pip install -e .

2. Define a Workflow Input

Workflows use typed inputs with Pydantic for validation.
These inputs can be provided via CLI flags, API payloads, or environment variables, keeping your workflows flexible and consistent.
Annotate fields with descriptions and constraints to improve validation and automatically generate help text for any entrypoint.

Workflow Input Example

from typing import Annotated

from pydantic import Field
from fluxly.workflow import WorkflowInput

class MyWorkflowInput(WorkflowInput):
    message: Annotated[str, Field(min_length=1, max_length=100, description="Message to print")] = "Hello, Fluxly!"
    repeat: Annotated[int, Field(ge=1, le=10, description="How many times to repeat")] = 3
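
To see what these constraints do at runtime, here is a minimal sketch that assumes WorkflowInput behaves like an ordinary Pydantic model. `DemoInput` and `try_build` are hypothetical stand-ins (a plain `BaseModel` is used so the snippet runs without Fluxly installed); they are not part of Fluxly.

```python
from typing import Annotated

from pydantic import BaseModel, Field, ValidationError


# Hypothetical stand-in for a WorkflowInput subclass (assumption: same validation rules).
class DemoInput(BaseModel):
    message: Annotated[str, Field(min_length=1, max_length=100)] = "Hello, Fluxly!"
    repeat: Annotated[int, Field(ge=1, le=10)] = 3


def try_build(**values):
    """Return (model, None) on success, or (None, names of the fields that failed)."""
    try:
        return DemoInput(**values), None
    except ValidationError as exc:
        return None, [str(err["loc"][0]) for err in exc.errors()]


ok, _ = try_build(message="Hi", repeat=2)          # both values satisfy the constraints
_, bad_fields = try_build(message="", repeat=11)   # empty message, repeat above le=10
print(ok)
print(sorted(bad_fields))
```

Invalid values are rejected when the model is built, so a node never sees a `repeat` outside 1 to 10.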

3. Create Nodes

Nodes encapsulate single units of work. Each node receives the workflow input and communicates results via typed outputs, avoiding ad-hoc dictionaries or positional references.

Node Example

from fluxly.node import Node

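class PrintNode(Node):
    # Narrow the type of the shared workflow input so attribute access is type-checked.
    @property
    def workflow_input(self) -> MyWorkflowInput:
        return self._workflow_input

    # _logic holds the node's work: log the message `repeat` times.
    def _logic(self) -> None:
        for i in range(self.workflow_input.repeat):
            self._logger.info(f"{i+1}: {self.workflow_input.message}")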

4. Assemble a Workflow

Create a workflow, add nodes, and define their dependencies.

Assemble Workflow

from fluxly.workflow import Workflow

workflow = Workflow(name="demo-workflow")

# Nodes
node_a = PrintNode(name="PrintMessage")

# Add nodes
workflow.add_node(node_a)

# Optionally, add edges if multiple nodes exist
# workflow.add_edge(node_a, node_b)
# workflow.add_edge_if_source_completed(node_a, node_b)  # run node_b only if node_a completed
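
Edges define which node must finish before another may start. The sketch below illustrates that idea with the standard library's `graphlib`; it is not Fluxly's scheduler, and the node names and the `execution_order` helper are hypothetical.

```python
from graphlib import TopologicalSorter


def execution_order(edges):
    """Return one valid run order for a DAG given (source, target) edges."""
    sorter = TopologicalSorter()
    for source, target in edges:
        # target depends on source, so source must run first
        sorter.add(target, source)
    return list(sorter.static_order())


edges = [("load", "transform"), ("transform", "report")]
print(execution_order(edges))  # ['load', 'transform', 'report']
```

If the edges form a cycle, `static_order()` raises `graphlib.CycleError`, which is why a workflow graph has to stay acyclic.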

5. Execute the Workflow

Fluxly exposes your workflow as an entrypoint that can be triggered via CLI, API, or environment variables.
Your WorkflowInput is the single source of truth for both CLI flags and API payload validation.

Execute Workflow

from fluxly import Fluxly

app = Fluxly()
app.add_endpoint("run-demo", workflow, MyWorkflowInput)
app.run()  # auto: CLI if args present, otherwise API server

Run from the terminal (CLI example):

Run CLI

python your_script.py run-demo --message "Hi there!" --repeat 5
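
The flag names mirror the input field names, with underscores written as hyphens. The following sketch shows one way such a mapping could be built with `argparse` and a dataclass. It is an illustration only, not Fluxly's implementation; `DemoInput`, `build_parser`, and `parse_input` are hypothetical names.

```python
import argparse
from dataclasses import dataclass, fields


# Hypothetical stand-in for a workflow input; field names follow this guide.
@dataclass
class DemoInput:
    message: str = "Hello, Fluxly!"
    repeat: int = 3
    md_file_path: str = "workflow_doc.md"


def build_parser(input_cls):
    """Create one --flag per field, e.g. md_file_path -> --md-file-path."""
    parser = argparse.ArgumentParser(prog="run-demo")
    for f in fields(input_cls):
        flag = "--" + f.name.replace("_", "-")
        parser.add_argument(flag, type=f.type, default=f.default, dest=f.name)
    return parser


def parse_input(argv):
    """Parse CLI arguments into a DemoInput, falling back to field defaults."""
    namespace = build_parser(DemoInput).parse_args(argv)
    return DemoInput(**vars(namespace))


print(parse_input(["--message", "Hi there!", "--repeat", "5"]))
```

Because the parser is derived from the field list, adding a field to the input class adds the matching flag without touching the CLI code.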

Trigger via an HTTP POST request:

Run via API

import requests

url = "http://localhost:8000/run-demo/run"
payload = {"message": "Hello!"}

response = requests.post(url, json=payload)
print(response.status_code, response.json())

Trigger via environment variables (with FLUXLY_ prefix to avoid collisions):

Run via Environment Variables

export FLUXLY_MESSAGE="Hello!"
export FLUXLY_REPEAT=5
python your_script.py run-demo
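
The prefix convention can be pictured as a simple lookup: each field name is upper-cased and prefixed with FLUXLY_. This sketch is an assumption about the mapping, not Fluxly's code, and `read_prefixed_env` is a hypothetical helper.

```python
import os


def read_prefixed_env(field_names, prefix="FLUXLY_", environ=None):
    """Collect values for the given fields from PREFIX + UPPERCASE_NAME variables."""
    environ = os.environ if environ is None else environ
    values = {}
    for name in field_names:
        key = prefix + name.upper()
        if key in environ:
            values[name] = environ[key]
    return values


# A fake environment keeps the example deterministic; unprefixed variables are ignored.
fake_env = {"FLUXLY_MESSAGE": "Hello!", "FLUXLY_REPEAT": "5", "PATH": "/usr/bin"}
print(read_prefixed_env(["message", "repeat"], environ=fake_env))
# {'message': 'Hello!', 'repeat': '5'}
```

Environment values always arrive as strings, so the typed input is what turns `"5"` into the integer `5` during validation.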

Choose your entrypoint explicitly

Use one of the following based on your deployment context:

  • app.run() — auto-selects: runs CLI when arguments are provided, otherwise starts the API server.
  • app.run_cli() — force CLI mode.
  • app.run_api() — force API server.

Explicit run modes

from fluxly import Fluxly

app = Fluxly()
app.add_endpoint("run-demo", workflow, MyWorkflowInput)

# Force CLI
# app.run_cli()

# Force API
# app.run_api()
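
The auto-selection rule for app.run() can be summarized in a few lines. This is a sketch of the behavior described in this section, not Fluxly's source; `choose_mode` is a hypothetical function.

```python
def choose_mode(argv):
    """Return "cli" when arguments follow the script name, otherwise "api"."""
    # argv[0] is the script itself, so anything beyond it counts as CLI input.
    return "cli" if len(argv) > 1 else "api"


print(choose_mode(["your_script.py", "run-demo", "--repeat", "5"]))  # cli
print(choose_mode(["your_script.py"]))                               # api
```

In a deployment where stray arguments may be passed to the process, forcing the mode with app.run_cli() or app.run_api() avoids surprises.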

Configure the FastAPI/Uvicorn server

You can pass FastAPI and Uvicorn settings via ApiConfig and app.configure_api().

API configuration

from fluxly import Fluxly
from fluxly.api import ApiConfig

app = Fluxly()
app.add_endpoint("run-demo", workflow, MyWorkflowInput)

app.configure_api(
    ApiConfig(
        host="0.0.0.0",
        port=9000,
        log_level="info",
        fastapi_kwargs={
            "docs_url": "/docs",
            "openapi_url": "/openapi.json",
        },
        uvicorn_kwargs={
            "reload": True,
            # any other uvicorn.run kwargs
        },
    )
)

app.run_api()

6. Handle Lifecycle Hooks

Extend nodes with hooks for logging, metrics, cleanup, or notifications. Hooks exist for both nodes and workflows:

  • on_start – before execution
  • on_success – after successful completion
  • on_failure – after failure
  • on_finish – always called

Lifecycle Hooks Example

class LoggingNode(PrintNode):
    def on_start(self) -> None:
        self._logger.info(f"Starting node: {self.name}")

    def on_success(self) -> None:
        self._logger.info(f"Finished node: {self.name}")

    def on_failure(self, error) -> None:
        # Log failures at error level so they stand out from routine messages.
        self._logger.error(f"Node {self.name} failed: {error}")
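
The order in which the four hooks fire can be sketched with a plain try/except/else/finally. This is an illustration of the ordering listed in this section, not Fluxly's runner: `run_with_hooks` is hypothetical, and it swallows the exception purely so the demo keeps running.

```python
def run_with_hooks(logic, calls):
    """Run logic() and record which hooks would fire, in order."""
    calls.append("on_start")
    try:
        logic()
    except Exception as error:
        calls.append(f"on_failure({error})")
    else:
        calls.append("on_success")
    finally:
        # on_finish is recorded on both the success and the failure path
        calls.append("on_finish")
    return calls


def failing():
    raise ValueError("boom")


print(run_with_hooks(lambda: None, []))  # ['on_start', 'on_success', 'on_finish']
print(run_with_hooks(failing, []))       # ['on_start', 'on_failure(boom)', 'on_finish']
```

Cleanup that must always happen belongs in on_finish, since it is the only hook reached on every path.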

7. Enable Auto-Generated Documentation

Fluxly can automatically generate Markdown documentation and DAG diagrams.
You can configure this in either of two ways:

  • Programmatically: set defaults in workflow.inputs.
  • At runtime: provide flags, API payloads, or environment variables; all map to WorkflowInput fields.

Programmatic defaults (set once)

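# Set default documentation behavior on the workflow template.
# These fields are not declared on MyWorkflowInput in step 2, so they are presumably inherited from WorkflowInput.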
workflow.inputs = MyWorkflowInput(
    auto_generate_md=True,
    md_file_path="workflow_doc.md",
    diagram_file_path="workflow_diagram.png",
)

Run from the terminal (CLI example):

Override via CLI flags

python your_script.py run-demo \
  --auto-generate-md \
  --md-file-path workflow_doc.md \
  --diagram-file-path workflow_diagram.png

Trigger via an API POST request:

Override via API

import requests

url = "http://localhost:8000/run-demo/run"
payload = {
    "auto_generate_md": True,
    "md_file_path": "workflow_doc.md",
    "diagram_file_path": "workflow_diagram.png",
}

response = requests.post(url, json=payload)
print(response.status_code, response.json())

Trigger via environment variables (with FLUXLY_ prefix):

Override via Environment Variables

export FLUXLY_AUTO_GENERATE_MD=1
export FLUXLY_MD_FILE_PATH="workflow_doc.md"
export FLUXLY_DIAGRAM_FILE_PATH="workflow_diagram.png"
python your_script.py run-demo

Note

When generation is enabled, Fluxly writes a readable Markdown summary and a diagram of the workflow at the end of execution.
This is useful for reviews, knowledge sharing, and onboarding new developers.


8. Next Steps

  • Add conditional edges and parallel execution groups.
  • Explore typed exceptions for reliable error handling.
  • Integrate with logging, metrics, or CI/CD pipelines.
  • Write unit tests for workflows and nodes to ensure predictable behavior.

Note

Once you are comfortable with the basics, you can start building production-ready pipelines using Fluxly, scaling from local scripts to distributed workflows.