Fluxly — Flexible, Typed DAG Workflows

Fluxly is a flexible framework for designing, running, and shipping DAG-based workflows with strong typing, lifecycle hooks, and maintainable code structure.
It’s built for developers who want predictable, debuggable, and production-ready workflows that can be triggered via CLI, API, or environment variables.


Consistent, Strongly Typed Interfaces

Each workflow exposes a consistent entrypoint. Inputs from your WorkflowInput class are automatically mapped to CLI flags, API payloads, or environment variables, including type hints, defaults, and descriptions.
This eliminates manual parsing and ensures consistent interfaces across scripts and production pipelines.

EntryPoint Example

from fluxly import Fluxly
from fluxly.workflow import Workflow, WorkflowInput
from fluxly.node import Node

class MyInput(WorkflowInput):
    message: str = "hello"

class Echo(Node):
    @property
    def workflow_input(self) -> MyInput:
        return self._workflow_input

    def _logic(self) -> None:
        self._logger.info(f"Message: {self.workflow_input.message}")

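# Build a one-node workflow so there is something to register.
workflow = Workflow(name="echo")
workflow.add_node(Echo(name="echo"))

app = Fluxly()
app.add_endpoint("echo-msg", workflow, MyInput)
app.run()  # CLI, API, or environment triggers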
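
To make the mapping idea concrete, here is a minimal standard-library sketch of how typed fields can become CLI flags with environment-variable fallbacks. It is not Fluxly's implementation: the dataclass stand-in `EchoInput`, the `parse_input` helper, the extra `repeat` field, and the upper-cased variable names (`MESSAGE`, `REPEAT`) are all assumptions made for illustration.

```python
import argparse
from dataclasses import dataclass, fields
from typing import get_type_hints


@dataclass
class EchoInput:
    # Hypothetical stand-in for a WorkflowInput subclass.
    message: str = "hello"
    repeat: int = 1


def parse_input(input_cls, argv, environ):
    """Build one --flag per field; precedence is CLI flag, then env var, then default."""
    hints = get_type_hints(input_cls)
    parser = argparse.ArgumentParser()
    for f in fields(input_cls):
        cast = hints[f.name]  # the annotation doubles as the converter
        env_value = environ.get(f.name.upper())  # assumed naming convention
        default = cast(env_value) if env_value is not None else f.default
        flag = "--" + f.name.replace("_", "-")
        parser.add_argument(flag, type=cast, default=default)
    return input_cls(**vars(parser.parse_args(argv)))
```

Calling `parse_input(EchoInput, ["--repeat", "3"], {"MESSAGE": "hi"})` yields an `EchoInput` with `message="hi"` and `repeat=3`. The sketch assumes every field has a default and a simple callable type such as `str` or `int`.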

Modern Python Throughout

Workflows and nodes are implemented using idiomatic Python classes and type annotations.
No DSLs or YAML schemas are needed: inputs are validated and serialized with Pydantic.

Type-safe Node Example

from fluxly.node import Node
from fluxly.workflow import WorkflowInput

class CounterInput(WorkflowInput):
    count: int = 5

class Counter(Node):
    @property
    def workflow_input(self) -> CounterInput:
        return self._workflow_input

    def _logic(self) -> None:
        for i in range(self.workflow_input.count):
            self._logger.info(f"Step {i+1}")

Node-to-Node Communication

Nodes communicate via typed execution classes, in one of two ways:

  • Importing and referencing the upstream node instance after it runs.
  • Using conditional edges based on upstream status.

Nodes typically live in their own files; import node classes and create instances in the workflow.

producer.py

from fluxly.node import Node, NodeExecution, NodeOutput

class ProducerOutput(NodeOutput):
    value: int | None = None

class ProducerExecution(NodeExecution):
    output: ProducerOutput = ProducerOutput()

class Producer(Node):
    def _create_execution(self) -> ProducerExecution:
        return ProducerExecution()

    def _logic(self) -> None:
        self.current_execution.output.value = 42

consumer.py

from fluxly.node import Node
from producer import Producer

class Consumer(Node):
    producer: Producer

    def _logic(self) -> None:
        value = self.producer.last_execution.output.value
        self._logger.info(value)

workflow.py

from fluxly.workflow import Workflow
from producer import Producer
from consumer import Consumer

wf = Workflow(name="demo")
producer = Producer(name="producer")
consumer = Consumer(name="consumer", producer=producer)

wf.add_node(producer)
wf.add_node(consumer)
wf.add_edge(producer, consumer)

Conditional edge example

wf.add_edge_if_source_completed(producer, consumer)
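
The idea behind an edge that depends on upstream status can be sketched without Fluxly at all. The following is an illustrative model only: the `Status` enum, the `run` helper, and the `source_completed` predicate are hypothetical names, and the real scheduler may behave differently (for example, in how it treats skipped nodes).

```python
from enum import Enum
from typing import Callable


class Status(Enum):
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"


def source_completed(status: Status) -> bool:
    """Edge predicate: follow the edge only if the source finished successfully."""
    return status is Status.COMPLETED


def run(order, edges, actions):
    """Run nodes in an already dependency-sorted order.

    edges is a list of (source, target, predicate) tuples. A node runs only
    when every incoming edge's predicate accepts its source's final status.
    """
    statuses = {}
    for name in order:
        incoming = [(src, cond) for src, dst, cond in edges if dst == name]
        if all(cond(statuses[src]) for src, cond in incoming):
            try:
                actions[name]()
                statuses[name] = Status.COMPLETED
            except Exception:
                statuses[name] = Status.FAILED
        else:
            statuses[name] = Status.SKIPPED
    return statuses
```

With a single edge `("producer", "consumer", source_completed)`, a failing producer leaves the consumer with `Status.SKIPPED` instead of running it.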

DAG Orchestration with Validation and Retries

Workflows are automatically validated as DAGs.
Execution respects dependencies and conditional edges.

DAG Example

wf = Workflow(name="demo")

node_a = Echo(name="A", timeout_seconds=5)
node_b = Echo(name="B")
node_c = Echo(name="C")

wf.add_node(node_a)
wf.add_node(node_b)
wf.add_node(node_c)

wf.add_edge(node_a, node_b)
wf.add_conditional_edge(node_b, node_c, condition=lambda: True)
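
For readers curious what DAG validation involves, here is a small sketch using Python's standard `graphlib` module. It only illustrates the general technique (topological sorting with cycle detection); the `execution_order` helper is a hypothetical name, and Fluxly's own validation logic is not shown in this document.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(edges: list[tuple[str, str]]) -> list[str]:
    """Return a dependency-respecting order, or raise ValueError on a cycle."""
    sorter = TopologicalSorter()
    for source, target in edges:
        sorter.add(target, source)  # target depends on source
    try:
        return list(sorter.static_order())
    except CycleError as exc:
        # exc.args[1] holds the nodes that form the detected cycle.
        raise ValueError(f"workflow is not a DAG: {exc.args[1]}") from exc
```

For the edges A to B and B to C, the helper returns the nodes in the order A, B, C. Adding an edge from C back to A makes it raise `ValueError`.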

Lifecycle Hooks

Add cross-cutting behavior without touching business logic. Hooks exist at node and workflow levels:

  • on_start – before execution
  • on_success – after success
  • on_failure – after failure
  • on_finish – always called

Lifecycle Example

class Echo(Node):
    def on_start(self) -> None:
        self._logger.info(f"Starting {self.name}")

    def on_success(self) -> None:
        self._logger.info(f"Finished {self.name}")
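
The order in which the four hooks fire can be modeled with a plain `try`/`except`/`else`/`finally` block. This is a sketch of the contract described in the list above, not Fluxly's runner: `HookedTask` and its `run` method are hypothetical stand-ins.

```python
class HookedTask:
    """Stand-in for a node that records the order in which hooks fire."""

    def __init__(self, fail: bool = False) -> None:
        self.fail = fail
        self.calls: list[str] = []

    def on_start(self) -> None:
        self.calls.append("on_start")

    def on_success(self) -> None:
        self.calls.append("on_success")

    def on_failure(self) -> None:
        self.calls.append("on_failure")

    def on_finish(self) -> None:
        self.calls.append("on_finish")

    def _logic(self) -> None:
        if self.fail:
            raise RuntimeError("boom")
        self.calls.append("_logic")

    def run(self) -> bool:
        self.on_start()  # before execution
        try:
            self._logic()
        except Exception:
            self.on_failure()  # after failure
            return False
        else:
            self.on_success()  # after success
            return True
        finally:
            self.on_finish()  # always called, even on the return paths above
```

A successful run records `on_start`, `_logic`, `on_success`, `on_finish`; a failing run records `on_start`, `on_failure`, `on_finish`.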

Typed Exceptions and Consistent Exit Codes

  • Raise custom exceptions mapped to StatusCodes.
  • Entry points (CLI, API, schedulers) receive consistent exit codes.
  • Consistent exit codes keep CI/CD pipelines predictable.

Exception Example

from fluxly.exceptions import WorkflowException
from fluxly.node import Node
from fluxly.status import StatusCodes

class MyNode(Node):
    def _logic(self) -> None:
        raise WorkflowException("Something went wrong", exit_code=StatusCodes.FAILURE)
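
As a rough illustration of how typed exceptions can translate into process exit codes, consider the standalone sketch below. The `ExitCode` values, the `TaskError` class, and the `run_entrypoint` helper are assumptions for this example; they are not Fluxly's `StatusCodes` or its actual entry-point code.

```python
import sys
from enum import IntEnum


class ExitCode(IntEnum):
    # Hypothetical values; Fluxly's StatusCodes may differ.
    SUCCESS = 0
    FAILURE = 1
    INVALID_INPUT = 2


class TaskError(Exception):
    """An exception that carries the exit code the process should return."""

    def __init__(self, message: str, exit_code: ExitCode = ExitCode.FAILURE) -> None:
        super().__init__(message)
        self.exit_code = exit_code


def run_entrypoint(task) -> int:
    """Run a callable and convert any TaskError into its integer exit code."""
    try:
        task()
    except TaskError as exc:
        print(f"error: {exc}", file=sys.stderr)
        return int(exc.exit_code)
    return int(ExitCode.SUCCESS)
```

A script would typically finish with `sys.exit(run_entrypoint(main))`, so a CI job sees the same code regardless of how the task was triggered.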

Auto-Generated Documentation and Diagrams

Fluxly can generate Markdown documentation and DAG diagrams automatically.
This helps teams review workflow structure and share knowledge.

Simple workflow

Clear, linear steps for quick tasks.

Complex workflow

Branching, conditional edges, and parallel paths.
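
To give a sense of how a diagram can be derived from a workflow's edges, here is a tiny sketch that emits Mermaid flowchart text. The output format is an assumption chosen for illustration (this page does not state which format Fluxly generates), and `to_mermaid` is a hypothetical helper.

```python
def to_mermaid(edges: list[tuple[str, str]]) -> str:
    """Render (source, target) pairs as a top-down Mermaid flowchart."""
    lines = ["graph TD"]
    for source, target in edges:
        lines.append(f"    {source} --> {target}")
    return "\n".join(lines)
```

Pasting the result of `to_mermaid([("A", "B"), ("B", "C")])` into any Mermaid renderer draws a three-node linear chain.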


Extensible Architecture

  • Wrap workflows and nodes to add logging, metrics, policies, and integrations.
  • Define a base layer for shared defaults and configuration.
  • Scale from local scripts to production pipelines with minimal rewrites.
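
One common way to wrap a unit of work with metrics, without editing its logic, is a higher-order function. The sketch below shows that general pattern only; `with_timing` and the dictionary used as a metrics sink are hypothetical, and Fluxly's own extension points may look different.

```python
import time
from typing import Callable


def with_timing(
    logic: Callable[[], None], sink: dict[str, float], name: str
) -> Callable[[], None]:
    """Return a wrapper that records how long logic took, even if it raises."""

    def wrapped() -> None:
        started = time.perf_counter()
        try:
            logic()
        finally:
            sink[name] = time.perf_counter() - started

    return wrapped
```

Calling `with_timing(step, metrics, "step")()` runs `step` and stores its duration in seconds under `metrics["step"]`.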

Tested and Type-Checked

  • Unit tests cover workflow behavior.
  • Continuous integration enforces linting, type-checks, and formatting.
  • Strong typing ensures safe refactors and long-term maintainability.