When running distributed training with torchrun across multiple nodes or processes, all workers can log metrics to the same experiment using a shared run_id. This enables unified tracking of distributed training runs.

How It Works

Pluto supports attaching multiple processes to the same experiment run. When the first process calls pluto.init(), a new run is created. Subsequent processes with the same run_id will attach to that existing run instead of creating new ones.
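
The sketch below illustrates this behavior with an explicit run_id; the project and run_id values are placeholders, and each block is meant to execute in a separate process (for example, a different rank):
import pluto

# Process A: the first call with this run_id creates a new run
run = pluto.init(project="demo", name="shared-run", run_id="demo-001")
print(run.resumed)  # False: this process created the run

# Process B: any later process using the same run_id attaches to the run above
run = pluto.init(project="demo", name="shared-run", run_id="demo-001")
print(run.resumed)  # True: this process attached to the existing run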

Setting Up Distributed Logging

1. Set a Shared Run ID

Before launching your distributed job, set the PLUTO_RUN_ID environment variable to a unique identifier:
export PLUTO_RUN_ID="experiment-$(date +%Y%m%d-%H%M%S)"
All processes that share this environment variable will log to the same experiment.
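
As a defensive measure, you can have the training script fail fast when the variable was not exported; this is a minimal sketch, assuming that without PLUTO_RUN_ID each process would otherwise end up in its own run:
import os

# Abort early if the launcher forgot to export the shared run ID
if "PLUTO_RUN_ID" not in os.environ:
    raise RuntimeError("PLUTO_RUN_ID is not set; export it before launching torchrun")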

2. Initialize Pluto in Your Training Script

In your training script, initialize Pluto after setting up the distributed process group:
import pluto
import torch.distributed as dist

# Initialize distributed training
dist.init_process_group()
rank = dist.get_rank()

# Initialize Pluto - all ranks will attach to the same run
run = pluto.init(project="my-project", name="ddp-training")

# Check if this process resumed an existing run
print(f"Rank {rank}: resumed={run.resumed}, run_id={run.id}")

3. Log Metrics with Rank Prefixes

To distinguish metrics from different processes, prefix them with the rank:
# Log rank-specific metrics
run.log({f"train/loss/rank{rank}": loss})

# Or log only from rank 0 for global metrics
if rank == 0:
    run.log({"global/epoch": epoch, "global/learning_rate": lr})

Complete Example

Here’s a full example for a Konduktor task that runs distributed training with Pluto logging:
name: distributed-training-with-logging

resources:
    image_id: nvcr.io/nvidia/pytorch:23.10-py3
    accelerators: H100:8
    cpus: 60
    memory: 500
    labels:
      kueue.x-k8s.io/queue-name: user-queue

num_nodes: 2

run: |
    # Set shared run ID for all processes
    export PLUTO_RUN_ID="training-$(date +%Y%m%d-%H%M%S)"

    # Launch distributed training
    torchrun --nproc_per_node=$NUM_GPUS_PER_NODE \
        --nnodes=$NUM_NODES \
        --node_rank=$RANK \
        --master_addr=$MASTER_ADDR \
        --master_port=8008 \
        train.py
And the corresponding training script:
# train.py
import os
import pluto
import torch
import torch.distributed as dist

def main():
    # Initialize distributed
    dist.init_process_group(backend="nccl")
    rank = dist.get_rank()
    world_size = dist.get_world_size()
    local_rank = int(os.environ.get("LOCAL_RANK", 0))

    # Set device
    torch.cuda.set_device(local_rank)

    # Initialize Pluto - all processes attach to same run via PLUTO_RUN_ID
    config = {
        "world_size": world_size,
        "learning_rate": 0.001,
        "batch_size": 32,
    }
    run = pluto.init(
        project="distributed-training",
        name="ddp-experiment",
        config=config if rank == 0 else None,  # Only rank 0 sets config
    )

    # Training loop (model, train_loader, train_step, and num_epochs are
    # assumed to be defined earlier in the script; setup omitted for brevity)
    for epoch in range(num_epochs):
        for batch_idx, (data, target) in enumerate(train_loader):
            # ... training step ...
            loss = train_step(model, data, target)

            # Log loss from each rank
            run.log({f"train/loss/rank{rank}": loss.item()})

        # Log epoch-level metrics from rank 0
        if rank == 0:
            run.log({"train/epoch": epoch})

    run.finish()
    dist.destroy_process_group()

if __name__ == "__main__":
    main()
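
To verify that every rank really attached to the same run, you can add an optional sanity check right after pluto.init() in the script above. This is a sketch that reuses rank, world_size, dist, and run from train.py and relies only on run.id and torch.distributed.all_gather_object:
# Gather the server-assigned run ID from every rank and confirm they all match
ids = [None] * world_size
dist.all_gather_object(ids, run.id)
if rank == 0:
    assert len(set(ids)) == 1, f"ranks attached to different runs: {ids}"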

Run Properties

The run object provides useful properties for distributed scenarios:
Property | Type | Description
run.resumed | bool | True if this process attached to an existing run
run.run_id | str | The user-provided external run ID
run.id | int | The server-assigned numeric run ID
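
A common use of run.resumed is to guard work that should happen only once per run, such as logging one-time setup values. A minimal sketch (the metric name is just an example):
import pluto

run = pluto.init(project="my-project", name="experiment")
if not run.resumed:
    # This process created the run, so one-time values are logged exactly once
    run.log({"setup/seed": 42})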

Alternative: Pass run_id Directly

Instead of using the environment variable, you can pass run_id directly to pluto.init():
import os
run_id = os.environ.get("SHARED_RUN_ID", "default-run")
run = pluto.init(project="my-project", name="experiment", run_id=run_id)
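
If you would rather not depend on environment variables at all, one option is to generate the ID on rank 0 and broadcast it with torch.distributed before calling pluto.init(). This is a sketch that assumes the process group is already initialized (and, with the NCCL backend, that the CUDA device has been set):
import uuid
import pluto
import torch.distributed as dist

rank = dist.get_rank()

# Rank 0 generates a unique ID; all other ranks receive it via broadcast
holder = [f"experiment-{uuid.uuid4().hex[:8]}" if rank == 0 else None]
dist.broadcast_object_list(holder, src=0)

run = pluto.init(project="my-project", name="experiment", run_id=holder[0])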

Environment Variables

Pluto recognizes the following environment variables for distributed logging:
Variable | Description
PLUTO_RUN_ID | Primary environment variable for shared run identification
MLOP_RUN_ID | Fallback environment variable (for compatibility)
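
If you need to reproduce that lookup order in your own launcher or wrapper code, the fallback chain is simply (a sketch mirroring the table above):
import os

# PLUTO_RUN_ID takes precedence; MLOP_RUN_ID is only consulted as a fallback
shared_run_id = os.environ.get("PLUTO_RUN_ID") or os.environ.get("MLOP_RUN_ID")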

Best Practices

  1. Set run_id before launching: Ensure PLUTO_RUN_ID is set before calling torchrun so all processes inherit the same value.
  2. Use rank prefixes: Prefix metrics with the rank to distinguish data from different processes in the dashboard.
  3. Log config from rank 0 only: Pass config only from rank 0 to avoid duplicate metadata.
  4. Unique run IDs: Include a timestamp or UUID in your run ID to ensure each training run is distinct.
  5. Handle the name parameter: The name parameter is only used when creating a new run. Processes that resume an existing run will ignore this parameter (a warning is logged to indicate this).