# Distributed Logging

> Log metrics from multinode and multiprocessing training jobs with torchrun

When running distributed training with `torchrun` across multiple nodes or processes, all workers can log metrics to the same experiment using a shared `run_id`. This enables unified tracking of distributed training runs.

## How It Works

Pluto supports attaching multiple processes to the same experiment run. When the first process calls `pluto.init()`, a new run is created. Subsequent processes that call `pluto.init()` with the same `run_id` attach to that existing run instead of creating a new one.
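
The attach-or-create behavior described above can be pictured with a small in-memory model. This is purely illustrative: `RunRegistry` and `ToyRun` are hypothetical names, and the sketch makes no claim about how Pluto's client or server is actually implemented.

```python
from dataclasses import dataclass
from itertools import count


@dataclass
class ToyRun:
    id: int        # numeric ID assigned by the toy registry
    run_id: str    # shared external ID supplied by the caller
    resumed: bool  # True when this handle attached to an existing run


class RunRegistry:
    """Hypothetical stand-in for the tracking server; not Pluto's implementation."""

    def __init__(self):
        self._ids = {}  # external run_id -> numeric id
        self._next_id = count(1)

    def init(self, run_id: str) -> ToyRun:
        if run_id in self._ids:
            # A later process with the same run_id attaches to the existing run.
            return ToyRun(self._ids[run_id], run_id, resumed=True)
        # The first process to arrive creates the run.
        self._ids[run_id] = next(self._next_id)
        return ToyRun(self._ids[run_id], run_id, resumed=False)


registry = RunRegistry()
first = registry.init("experiment-123")
second = registry.init("experiment-123")
print(first.resumed, second.resumed, first.id == second.id)
```

In this toy model the first caller sees `resumed=False`, and every later caller with the same ID sees `resumed=True` and the same numeric `id`.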

## Setting Up Distributed Logging

### 1. Set a Shared Run ID

Before launching your distributed job, set the `PLUTO_RUN_ID` environment variable to a unique identifier:

```bash theme={null}
export PLUTO_RUN_ID="experiment-123"
```

All processes that share this environment variable will log to the same experiment.

<Note>
  When using `konduktor launch`, `PLUTO_RUN_ID` is set to the job name by default, so you don't need to set it explicitly.
</Note>
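
If you launch from a Python wrapper rather than a shell, the same idea applies: place the ID in the environment that the child processes inherit. The following is a minimal sketch. `child_env` and `torchrun_command` are hypothetical helpers, and the GPU count and script name are placeholders.

```python
import os


def child_env(run_id, base_env=None):
    """Return a copy of the environment with PLUTO_RUN_ID filled in.

    An existing PLUTO_RUN_ID (for example, one set by the job launcher)
    is kept; run_id is used only when the variable is absent.
    """
    env = dict(os.environ if base_env is None else base_env)
    env.setdefault("PLUTO_RUN_ID", run_id)
    return env


def torchrun_command(script, nproc_per_node):
    """Build a single-node torchrun command line as an argument list."""
    return ["torchrun", f"--nproc_per_node={nproc_per_node}", script]


env = child_env("experiment-123", base_env={"PATH": "/usr/bin"})
cmd = torchrun_command("train.py", nproc_per_node=8)
print(env["PLUTO_RUN_ID"], cmd)
# To actually launch: subprocess.run(cmd, env=env, check=True)
```

Using `setdefault` means a value already provided by the launcher is left untouched, which fits the default behavior described in the note above.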

### 2. Initialize Pluto in Your Training Script

In your training script, initialize Pluto after setting up the distributed process group:

```python theme={null}
import pluto
import torch.distributed as dist

# Initialize distributed training
dist.init_process_group()
rank = dist.get_rank()

# Initialize Pluto - all ranks will attach to the same run
run = pluto.init(project="my-project", name="ddp-training")

# Check if this process resumed an existing run
print(f"Rank {rank}: resumed={run.resumed}, run_id={run.run_id}, id={run.id}")
```

### 3. Log Metrics with Rank Prefixes

To distinguish metrics from different processes, prefix them with the rank:

```python theme={null}
# Log rank-specific metrics
run.log({f"train/loss/rank{rank}": loss})

# Or log only from rank 0 for global metrics
if rank == 0:
    run.log({"global/epoch": epoch, "global/learning_rate": lr})
```
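
The rank tagging can be factored into a small helper so that call sites stay uniform. This is a sketch: `with_rank` is a hypothetical function that only builds the dictionary passed to `run.log()`, following the `train/loss/rank{rank}` naming pattern used above.

```python
def with_rank(metrics, rank):
    """Append a /rank{N} segment to every metric name."""
    return {f"{name}/rank{rank}": value for name, value in metrics.items()}


tagged = with_rank({"train/loss": 0.42, "train/grad_norm": 1.7}, rank=3)
print(tagged)
# In a training script: run.log(with_rank({"train/loss": loss}, rank))
```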

## Complete Example

Here's a full example for a Konduktor task that runs distributed training with Pluto logging:

```yaml theme={null}
name: distributed-training-with-logging

resources:
    image_id: nvcr.io/nvidia/pytorch:23.10-py3
    accelerators: H100:8
    cpus: 60
    memory: 500
    labels:
      kueue.x-k8s.io/queue-name: user-queue

num_nodes: 2

run: |
    # (optional) Set shared run ID for all processes
    export PLUTO_RUN_ID="training-123"

    # Launch distributed training
    torchrun --nproc_per_node=$NUM_GPUS_PER_NODE \
        --nnodes=$NUM_NODES \
        --node_rank=$RANK \
        --master_addr=$MASTER_ADDR \
        --master_port=8008 \
        train.py
```

And the corresponding training script:

```python theme={null}
# train.py
import os
import pluto
import torch
import torch.distributed as dist

def main():
    # Initialize distributed
    dist.init_process_group(backend="nccl")
    rank = dist.get_rank()
    world_size = dist.get_world_size()
    local_rank = int(os.environ.get("LOCAL_RANK", 0))

    # Set device
    torch.cuda.set_device(local_rank)

    # Initialize Pluto - all processes attach to same run via PLUTO_RUN_ID
    config = {
        "world_size": world_size,
        "learning_rate": 0.001,
        "batch_size": 32,
    }
    run = pluto.init(
        project="distributed-training",
        name="ddp-experiment",
        config=config if rank == 0 else None,  # Only rank 0 sets config
    )

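    # Training loop (num_epochs, train_loader, model, and train_step
    # are assumed to be defined elsewhere in your script)
    for epoch in range(num_epochs):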
        for batch_idx, (data, target) in enumerate(train_loader):
            # ... training step ...
            loss = train_step(model, data, target)

            # Log loss from each rank
            run.log({f"train/loss/rank{rank}": loss.item()})

        # Log epoch-level metrics from rank 0
        if rank == 0:
            run.log({"train/epoch": epoch})

    run.finish()
    dist.destroy_process_group()

if __name__ == "__main__":
    main()
```

## Run Properties

The run object provides useful properties for distributed scenarios:

| Property      | Type   | Description                                        |
| ------------- | ------ | -------------------------------------------------- |
| `run.resumed` | `bool` | `True` if this process attached to an existing run |
| `run.run_id`  | `str`  | The user-provided external run ID                  |
| `run.id`      | `int`  | The server-assigned numeric run ID                 |
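
As one possible use of these properties, each process could emit a startup line saying whether it created the run or attached to it. The sketch below uses a `SimpleNamespace` stand-in so that it runs without Pluto installed. `describe_run` is a hypothetical helper, and the property values are made up for illustration.

```python
from types import SimpleNamespace


def describe_run(run, rank):
    """Summarize how this process is connected to the run."""
    action = "attached to existing" if run.resumed else "created new"
    return f"rank {rank}: {action} run {run.run_id!r} (server id {run.id})"


# Stand-in object exposing the three properties from the table above.
fake_run = SimpleNamespace(resumed=True, run_id="experiment-123", id=42)
print(describe_run(fake_run, rank=1))
```

In a real script you would pass the object returned by `pluto.init()` instead of `fake_run`.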

## Environment Variables

Pluto recognizes the following environment variables for distributed logging:

| Variable       | Description                                                |
| -------------- | ---------------------------------------------------------- |
| `PLUTO_RUN_ID` | Primary environment variable for shared run identification |
| `MLOP_RUN_ID`  | Fallback environment variable (for compatibility)          |
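
The table describes `PLUTO_RUN_ID` as the primary variable and `MLOP_RUN_ID` as the fallback, which suggests a lookup order like the one below. This is a sketch of that assumed precedence, not Pluto's actual code. `resolve_run_id` is a hypothetical helper that can be handy for checking which ID your processes will see.

```python
import os


def resolve_run_id(env=None):
    """Return the shared run ID, or None if neither variable is set.

    Assumed precedence: PLUTO_RUN_ID first, then MLOP_RUN_ID.
    An empty string is treated the same as an unset variable.
    """
    env = os.environ if env is None else env
    return env.get("PLUTO_RUN_ID") or env.get("MLOP_RUN_ID")


print(resolve_run_id({"PLUTO_RUN_ID": "experiment-123", "MLOP_RUN_ID": "old"}))
print(resolve_run_id({"MLOP_RUN_ID": "legacy-run"}))
print(resolve_run_id({}))
```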

## Best Practices

1. **Set run\_id before launching**: Ensure `PLUTO_RUN_ID` is set before calling `torchrun` so all processes inherit the same value.
2. **Use rank prefixes**: Prefix metrics with the rank to distinguish data from different processes in the dashboard.
3. **Log config from rank 0 only**: Pass `config` only from rank 0 to avoid duplicate metadata.
4. **Unique run IDs**: Include a timestamp or UUID in your run ID to ensure each training run is distinct.
5. **Handle the `name` parameter**: The `name` parameter is used only when a new run is created. Processes that attach to an existing run ignore it, and a warning is logged to indicate this.
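
For the unique run ID recommendation, one simple scheme combines a readable prefix, a UTC timestamp, and a short random suffix. This is a sketch only. `make_run_id` is a hypothetical helper, and the format is just one reasonable choice.

```python
import uuid
from datetime import datetime, timezone


def make_run_id(prefix):
    """Build an ID of the form '<prefix>-<YYYYmmdd>-<HHMMSS>-<8 hex chars>'."""
    stamp = datetime.now(timezone.utc).strftime("%Y%m%d-%H%M%S")
    return f"{prefix}-{stamp}-{uuid.uuid4().hex[:8]}"


run_id = make_run_id("ddp")
print(run_id)
```

Generate the ID once, in the launcher, and export it as `PLUTO_RUN_ID` before `torchrun` starts. Calling a helper like this inside `train.py` would give every rank a different ID, so the processes would no longer share a run.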
