Skip to content

Fluxly

Fluxly Logo

Fluxly - Lightweight framework for portable, self-contained DAG-based workflows.

Python 3.10+ License MIT


Introduction

Fluxly is a lightweight framework for building and running directed acyclic graph (DAG)-based workflows.

The entire workflow acts as a self-contained execution endpoint:

  • Run them locally or via CLI commands, API calls, or environment triggers.
  • Package them in containers for portability.
  • Integrate seamlessly with higher-level orchestrators (Argo, Airflow, Prefect, CI/CD) without extra glue code.

With Fluxly, pipelines are highly structured, enabling safer execution, easier debugging, and better modularity.
Workflows can run standalone or as part of a larger system, making Fluxly both lightweight and flexible.

Why Fluxly (the problem it solves)

  • Unstructured container pipelines become spaghetti: per‑container scripts, mismatched inputs/outputs, and duplicated retries/timeouts/logging without a shared wrapper.
  • Heavy solutions like Airflow require schedulers, DBs, and webservers—great for platforms, heavy for simple, portable pipelines.
  • Orchestrator‑centric SDKs (e.g., Prefect) add remote control planes and runtime coupling that don’t fit autonomous, fire‑and‑forget containers.
  • Fluxly keeps logic structured and isolated inside a single repo and image: typed I/O models, explicit DAG, consistent entrypoints (CLI/API/env), and clear node boundaries. Any orchestrator can trigger it, but your code stays clean and portable.
  • Ideal when each Docker should remain simple and self‑contained, with minimal coordination overhead.
  • In monorepos—or by wrapping Fluxly—you can centralize shared services, typed inputs, validations, outputs, and metadata to fit org standards. This thin wrapper standardizes containers and removes boilerplate across pipelines.

Complex workflow

Complex Workflow Diagram

Clear representation of workflow DAG with branching and parallel execution.


Key Features

  • Flexible entrypoints – workflows can be triggered via CLI, API calls, or environment variables.
  • DAG-based workflows – define arbitrary connections between ndoes and their dependencies.
  • Highly structured workflows – strict validation ensures safer pipelines, easier debugging, and predictable behavior.
  • Self-orchestrated nodes – each node manages its own execution, retries, and dependencies.
  • Lightweight building blocks – workflows are self-contained units that can run independently in any environment.
  • Extensible by design – wrap workflows with custom classes to add logging, metrics, or integrations.
  • Local-first development – debug and run workflows standalone, then scale seamlessly to CI/CD or external orchestrators.

Installation

Install Fluxly

pip install fluxly

Quick Start

1) Define Workflow Input

Define a typed WorkflowInput to specify all inputs your workflow expects.
These inputs are agnostic to how the workflow is triggered — they can come from CLI flags, API requests, or environment variables — giving you maximum flexibility.

WorkflowInput Example

from fluxly.workflow import WorkflowInput

class MyInput(WorkflowInput):
    message: str = "world"

2) Make nodes

Implement Node logic in _logic() and type-narrow workflow_input for convenience.

Node Example

from fluxly.node import Node

class Echo(Node):
    @property
    def workflow_input(self) -> MyInput:
        return self._workflow_input

    def _logic(self) -> None:
        self._logger.info(f"Echo: {self.workflow_input.message}")

3) Build a workflow

Create a Workflow, add nodes to it, and wire edges/conditions to express execution order and data dependencies.
Workflows orchestrate retries, timeouts, and overall execution.

Workflow Example

from fluxly.workflow import Workflow

def build_workflow() -> Workflow:
    wf = Workflow(name="demo", description="Demo flow", version="0.1")

    # Nodes: demonstrate node-level timeout and retries
    alpha = Echo(name="alpha", description="prints a message", timeout_seconds=5, max_retries=1, retry_delay_seconds=1)
    beta = Echo(name="beta", description="prints after alpha")
    gamma = Echo(name="gamma", description="conditional step")

    # Register nodes to the workflow
    wf.add_node(alpha)
    wf.add_node(beta)
    wf.add_node(gamma)

    # Edges (ordering and conditions)
    wf.add_edge(alpha, beta)  # run beta after alpha
    wf.add_conditional_edge(beta, gamma, condition=lambda: True)  # example condition
    # Alternatively, run a node only if its parent completed successfully
    wf.add_edge_if_source_completed(alpha, gamma)

    return wf

4) Expose Your Workflow

Use Fluxly to expose your workflow as a single, consistent entrypoint.
Inputs from your WorkflowInput are automatically mapped to CLI flags, API payloads, or environment variables, keeping your interface consistent and type-safe across all triggers.

Expose Workflow

from fluxly import Fluxly

if __name__ == "__main__":
    app = Fluxly()
    wf = build_workflow()
    # register workflow with its input type
    app.add_endpoint("my-workflow", wf, MyInput)
    app.run()  # can be triggered via CLI, API, or env vars

Run it via CLI:

Run CLI

python main.py my-workflow --message hello

# discover generated flags and help text
python main.py my-workflow --help

Trigger via HTTP POST request:

Run via API

import requests

url = "http://localhost:8000/my-workflow/run"
payload = {"message": "hello"}

response = requests.post(url, json=payload)
print(response.status_code, response.json())

Trigger via environment variables (with FLUXLY_ prefix):

Run via Environment Variables

export FLUXLY_MESSAGE="hello"
python main.py my-workflow

Generated Entrypoints (CLI and API)

Fluxly auto-generates a CLI menu and API routes per registered workflow.

CLI Menu

Fluxly CLI Menu

CLI commands generated for your workflows.

Swagger UI

Fluxly Swagger UI

API endpoints for submitting runs and fetching statuses.


Where to go next

  • Getting started: project layout, environment, and running the included demo — Getting started
  • Core concepts: workflows, nodes, execution groups, orchestration controls, and node-to-node communication — Core concepts