ahcore package#

Subpackages#

Submodules#

ahcore.entrypoints module#

Entrypoints

ahcore.entrypoints.create_datamodule(config: DictConfig) tuple[ahcore.utils.data.DataDescription, pytorch_lightning.core.datamodule.LightningDataModule][source]#
ahcore.entrypoints.inference(config: DictConfig) None[source]#

Contains the inference pipeline.

Parameters:
config : DictConfig

Configuration composed by Hydra.

Returns:
None
ahcore.entrypoints.train(config: DictConfig) Tensor | None[source]#

Contains the training pipeline. Can additionally evaluate the model on a test set, using the best weights achieved during training.

Parameters:
config : DictConfig

Configuration composed by Hydra.

Returns:
Optional[float]

Metric score for hyperparameter optimization.
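
Both entrypoints expect a Hydra-composed DictConfig. A minimal sketch of composing a config and calling train() from a script is shown below; the config path, config name, and override are assumptions for illustration and are not part of ahcore.

from hydra import compose, initialize

from ahcore.entrypoints import train

# The config_path and config_name below are hypothetical; use your own Hydra setup.
with initialize(version_base=None, config_path="config"):
    config = compose(config_name="train", overrides=["trainer.max_epochs=1"])
    metric = train(config)  # optional metric score for hyperparameter optimization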

ahcore.exceptions module#

Exceptions for ahcore

exception ahcore.exceptions.ConfigurationError(message: str | None)[source]#

Bases: Exception

exception ahcore.exceptions.RecordNotFoundError[source]#

Bases: Exception

Exception for the database manager.
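
A minimal sketch of how these exceptions might be raised and handled; the lookup logic below is hypothetical and only illustrates the two exception types.

from ahcore.exceptions import ConfigurationError, RecordNotFoundError

def get_record(records: dict[str, str], key: str) -> str:
    # Hypothetical lookup helper, used only for illustration.
    if key not in records:
        raise RecordNotFoundError(f"No record found for {key!r}")
    return records[key]

try:
    get_record({}, "patient_0")
except RecordNotFoundError as exc:
    raise ConfigurationError(f"Database lookup failed: {exc}") from exc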

ahcore.lit_module module#

This module contains the core Lightning module for ahcore. This module is responsible for:

  • Training, validation and inference

  • Wrapping models

class ahcore.lit_module.AhCoreLightningModule(model: Module, optimizer: Optimizer, data_description: DataDescription, loss: Module | None = None, augmentations: dict[str, torch.nn.modules.module.Module] | None = None, metrics: dict[str, ahcore.metrics.metrics.MetricFactory | ahcore.metrics.metrics.WSIMetricFactory] | None = None, scheduler: LRScheduler | None = None)[source]#

Bases: LightningModule

RELEVANT_KEYS = ['coordinates', 'mpp', 'path', 'region_index', 'grid_local_coordinates', 'grid_index']#
configure_optimizers() Any[source]#

Choose what optimizers and learning-rate schedulers to use in your optimization. Normally you’d need one. But in the case of GANs or similar you might have multiple. Optimization with multiple optimizers only works in the manual optimization mode.

Return:

Any of these options.

  • Single optimizer.

  • List or Tuple of optimizers.

  • Two lists - The first list has multiple optimizers, and the second has multiple LR schedulers (or multiple lr_scheduler_config).

  • Dictionary, with an "optimizer" key, and (optionally) a "lr_scheduler" key whose value is a single LR scheduler or lr_scheduler_config.

  • None - Fit will run without any optimizer.

The lr_scheduler_config is a dictionary which contains the scheduler and its associated configuration. The default configuration is shown below.

lr_scheduler_config = {
    # REQUIRED: The scheduler instance
    "scheduler": lr_scheduler,
    # The unit of the scheduler's step size, could also be 'step'.
    # 'epoch' updates the scheduler on epoch end whereas 'step'
    # updates it after an optimizer update.
    "interval": "epoch",
    # How many epochs/steps should pass between calls to
    # `scheduler.step()`. 1 corresponds to updating the learning
    # rate after every epoch/step.
    "frequency": 1,
    # Metric to monitor for schedulers like `ReduceLROnPlateau`
    "monitor": "val_loss",
    # If set to `True`, will enforce that the value specified in 'monitor'
    # is available when the scheduler is updated, thus stopping
    # training if it is not found. If set to `False`, it will only produce a warning
    "strict": True,
    # If using the `LearningRateMonitor` callback to monitor the
    # learning rate progress, this keyword can be used to specify
    # a custom logged name
    "name": None,
}

When there are schedulers in which the .step() method is conditioned on a value, such as the torch.optim.lr_scheduler.ReduceLROnPlateau scheduler, Lightning requires that the lr_scheduler_config contains the keyword "monitor" set to the metric name that the scheduler should be conditioned on.

Metrics can be made available to monitor by simply logging them using self.log('metric_to_track', metric_val) in your LightningModule.
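
For example, a configure_optimizers() that conditions a ReduceLROnPlateau scheduler on a logged metric could look like the sketch below. This is illustrative, not the AhCoreLightningModule implementation; the optimizer choice and the "val_loss" metric name are assumptions.

import torch

def configure_optimizers(self):
    optimizer = torch.optim.SGD(self.parameters(), lr=1e-3)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min")
    return {
        "optimizer": optimizer,
        "lr_scheduler": {
            "scheduler": scheduler,
            # "monitor" is required for ReduceLROnPlateau and must be logged
            # elsewhere via self.log("val_loss", ...).
            "monitor": "val_loss",
        },
    }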

Note:

Some things to know:

  • Lightning calls .backward() and .step() automatically in case of automatic optimization.

  • If a learning rate scheduler is specified in configure_optimizers() with key "interval" (default “epoch”) in the scheduler configuration, Lightning will call the scheduler’s .step() method automatically in case of automatic optimization.

  • If you use 16-bit precision (precision=16), Lightning will automatically handle the optimizer.

  • If you use torch.optim.LBFGS, Lightning handles the closure function automatically for you.

  • If you use multiple optimizers, you will have to switch to ‘manual optimization’ mode and step them yourself.

  • If you need to control how often the optimizer steps, override the optimizer_step() hook.

property data_description: DataDescription#
do_step(batch: dict[str, Any], batch_idx: int, stage: TrainerFn | str) dict[str, Any][source]#
forward(sample: Tensor) Any[source]#

This function is only used during inference

property name: str#
predict_step(batch: Any, batch_idx: int, dataloader_idx: int = 0) Any[source]#

Step function called during predict(). By default, it calls forward(). Override to add any processing logic.

The predict_step() is used to scale inference on multiple devices.

To prevent an OOM error, it is possible to use BasePredictionWriter callback to write the predictions to disk or database after each batch or on epoch end.

The BasePredictionWriter should be used while using a spawn-based accelerator. This happens for Trainer(strategy="ddp_spawn") or training on 8 TPU cores with Trainer(accelerator="tpu", devices=8), as predictions won't be returned.
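
A hedged sketch of such a BasePredictionWriter callback is shown below; the output directory and file naming are assumptions, and the actual post-processing would depend on your pipeline.

import torch
from pytorch_lightning.callbacks import BasePredictionWriter

class DiskPredictionWriter(BasePredictionWriter):
    def __init__(self, output_dir: str, write_interval: str = "batch"):
        super().__init__(write_interval)
        self.output_dir = output_dir

    def write_on_batch_end(self, trainer, pl_module, prediction, batch_indices, batch, batch_idx, dataloader_idx):
        # Persist each batch of predictions instead of keeping them in memory.
        torch.save(prediction, f"{self.output_dir}/predictions_{dataloader_idx}_{batch_idx}.pt")

# Usage: Trainer(callbacks=[DiskPredictionWriter("predictions")], ...)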

Args:

batch: The output of your data iterable, normally a DataLoader.
batch_idx: The index of this batch.
dataloader_idx: The index of the dataloader that produced this batch (only if multiple dataloaders are used).

Return:

Predicted output (optional).

Example

class MyModel(LightningModule):

    def predict_step(self, batch, batch_idx, dataloader_idx=0):
        return self(batch)

dm = ...
model = MyModel()
trainer = Trainer(accelerator="gpu", devices=2)
predictions = trainer.predict(model, dm)
training_step(batch: dict[str, Any], batch_idx: int) dict[str, Any][source]#

Here you compute and return the training loss and some additional metrics, e.g. for the progress bar or logger.

Args:

batch: The output of your data iterable, normally a DataLoader.
batch_idx: The index of this batch.
dataloader_idx: The index of the dataloader that produced this batch (only if multiple dataloaders are used).

Return:
  • Tensor - The loss tensor

  • dict - A dictionary. Can include any keys, but must include the key 'loss'.

  • None - Skip to the next batch. This is only supported for automatic optimization.

    This is not supported for multi-GPU, TPU, IPU, or DeepSpeed.

In this step you’d normally do the forward pass and calculate the loss for a batch. You can also do fancier things like multiple forward passes or something model specific.

Example:

def training_step(self, batch, batch_idx):
    x, y, z = batch
    out = self.encoder(x)
    loss = self.loss(out, x)
    return loss

To use multiple optimizers, you can switch to ‘manual optimization’ and control their stepping:

def __init__(self):
    super().__init__()
    self.automatic_optimization = False

# Multiple optimizers (e.g.: GANs)
def training_step(self, batch, batch_idx):
    opt1, opt2 = self.optimizers()

    # do training_step with encoder
    ...
    opt1.step()
    # do training_step with decoder
    ...
    opt2.step()
Note:

When accumulate_grad_batches > 1, the loss returned here will be automatically normalized by accumulate_grad_batches internally.

validation_step(batch: dict[str, Any], batch_idx: int) dict[str, Any][source]#

Operates on a single batch of data from the validation set. In this step you might generate examples or calculate anything of interest, such as accuracy.

Args:

batch: The output of your data iterable, normally a DataLoader.
batch_idx: The index of this batch.
dataloader_idx: The index of the dataloader that produced this batch (only if multiple dataloaders are used).

Return:
  • Tensor - The loss tensor

  • dict - A dictionary. Can include any keys, but must include the key 'loss'.

  • None - Skip to the next batch.

# if you have one val dataloader:
def validation_step(self, batch, batch_idx):
    ...

# if you have multiple val dataloaders:
def validation_step(self, batch, batch_idx, dataloader_idx=0):
    ...

Examples:

# CASE 1: A single validation dataset
def validation_step(self, batch, batch_idx):
    x, y = batch

    # implement your own
    out = self(x)
    loss = self.loss(out, y)

    # log 6 example images
    # or generated text... or whatever
    sample_imgs = x[:6]
    grid = torchvision.utils.make_grid(sample_imgs)
    self.logger.experiment.add_image('example_images', grid, 0)

    # calculate acc
    labels_hat = torch.argmax(out, dim=1)
    val_acc = torch.sum(y == labels_hat).item() / (len(y) * 1.0)

    # log the outputs!
    self.log_dict({'val_loss': loss, 'val_acc': val_acc})

If you pass in multiple val dataloaders, validation_step() will have an additional argument. We recommend setting the default value of 0 so that you can quickly switch between single and multiple dataloaders.

# CASE 2: multiple validation dataloaders
def validation_step(self, batch, batch_idx, dataloader_idx=0):
    # dataloader_idx tells you which dataset this is.
    ...
Note:

If you don’t need to validate you don’t need to implement this method.

Note:

When the validation_step() is called, the model has been put in eval mode and PyTorch gradients have been disabled. At the end of validation, the model goes back to training mode and gradients are enabled.

property wsi_metrics: WSIMetricFactory | None#

ahcore.losses module#

Loss factory

All the relevant loss modules. In ahcore, losses are returned per sample in the batch.

class ahcore.losses.LossFactory(losses: list[dict[str, Callable[[torch.Tensor, torch.Tensor, Optional[torch.Tensor], Optional[torch.Tensor]], torch.Tensor]]], weights: list[Union[torch.Tensor, float]] | None = None, class_proportions: Tensor | None = None)[source]#

Bases: Module

Loss factory to construct the total loss.

Parameters:
losses : list[dict[str, Callable[[torch.Tensor, torch.Tensor, torch.Tensor | None, torch.Tensor | None], torch.Tensor]]]

List of losses, given as functions which accept (input, target, roi, weight). The weight will be applied per class.

weights : list

List of the same length as losses. The weights scale the total contribution, so weight_0 * loss_0_val + … will be the resulting loss.

class_proportions : torch.Tensor, optional

The class proportions are used to weight the loss per class. This is useful for class imbalance.
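
A hypothetical construction of a weighted total loss is sketched below; the loss choices, weights, and tensor shapes are example values, not ahcore defaults.

import torch
from ahcore.losses import LossFactory, cross_entropy, soft_dice

criterion = LossFactory(
    losses=[{"cross_entropy": cross_entropy}, {"soft_dice": soft_dice}],
    weights=[1.0, 0.5],  # total = 1.0 * cross_entropy + 0.5 * soft_dice
)

prediction = torch.randn(2, 3, 64, 64)  # (N, C, H, W) predictions
target = torch.nn.functional.one_hot(
    torch.randint(0, 3, (2, 64, 64)), num_classes=3
).permute(0, 3, 1, 2).float()  # one-hot target of shape (N, C, H, W)
loss = criterion(prediction, target)  # per-sample losses, following the convention above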

forward(input: Tensor, target: Tensor, roi: Tensor | None = None) Tensor[source]#

Defines the computation performed at every call.

Should be overridden by all subclasses.

Note

Although the recipe for forward pass needs to be defined within this function, one should call the Module instance afterwards instead of this since the former takes care of running the registered hooks while the latter silently ignores them.

ahcore.losses.cross_entropy(input: Tensor, target: Tensor, roi: Tensor | None = None, weight: Tensor | None = None, ignore_index: int | None = None, topk: float | None = None, label_smoothing: float = 0.0, limit: float | None = None) Tensor[source]#

Compute an ROI-weighted cross entropy. The resulting output is a per-sample cross entropy.

Parameters:
input : torch.Tensor

Input of shape (N, C, H, W).

target : torch.Tensor

One-hot encoded target of shape (N, C, H, W).

roi : torch.Tensor

ROI of shape (N, 1, H, W).

weight : torch.Tensor, optional

Per-class weight.

ignore_index : int, optional

Specifies a target value that is ignored and does not contribute to the input gradient.

topk : float, optional

Apply top-k in the loss.

label_smoothing : float, optional

Float in [0, 1]. Amount of smoothing, see Rethinking the Inception Architecture for Computer Vision. Default: \(0.0\).

limit : float, optional

If set, this is the value the cross entropy is clipped to (from below). This has to be a negative value.

Returns:
torch.Tensor

Output as a float torch.Tensor.
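
A minimal usage sketch, assuming the shapes given in the parameter description above; the tensors are dummies for illustration.

import torch
from ahcore.losses import cross_entropy

logits = torch.randn(4, 2, 32, 32)             # (N, C, H, W)
target = torch.zeros_like(logits)
target[:, 0] = 1.0                             # trivial one-hot target, (N, C, H, W)
roi = torch.ones(4, 1, 32, 32)                 # only pixels inside the ROI contribute

loss = cross_entropy(logits, target, roi=roi)  # one cross entropy value per sample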

ahcore.losses.soft_dice(input: Tensor, target: Tensor, roi: Tensor | None = None, weight: Tensor | None = None, ignore_index: int | None = None, eps: float = 1e-17) Tensor[source]#

Criterion that computes Sørensen-Dice Coefficient loss.

According to [1], we compute the Sørensen-Dice Coefficient as follows:

\[\text{Dice}(x, class) = \frac{2 |X \cap Y|}{|X| + |Y|}\]
where:
  • \(X\) is the tensor of scores for each class.

  • \(Y\) is the one-hot tensor with the class labels.

The loss is finally computed as:

\[\text{loss}(x, class) = 1 - \text{Dice}(x, class)\]

[1] https://en.wikipedia.org/wiki/S%C3%B8rensen%E2%80%93Dice_coefficient

The shapes of input and target need to be \((N, C, H, W)\) where \(C\) = number of classes.

Parameters:
input : torch.Tensor

Input of shape (N, C, H, W).

target : torch.Tensor

One-hot encoded target of shape (N, C, H, W).

roi : torch.Tensor

ROI of shape (N, 1, H, W).

weight : torch.Tensor, optional

Per-class weight.

ignore_index : int, optional

Specifies a target value that is ignored and does not contribute to the input gradient.

eps : float

Regularizer in the division.

Returns:
torch.Tensor

Output as a float torch.Tensor.
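
To make the formula concrete, a plain-torch illustration of the Dice term (not ahcore's implementation) is sketched below; the shapes and the eps value mirror the description above.

import torch

scores = torch.softmax(torch.randn(1, 3, 8, 8), dim=1)  # X: class scores, (N, C, H, W)
one_hot = torch.nn.functional.one_hot(
    torch.randint(0, 3, (1, 8, 8)), num_classes=3
).permute(0, 3, 1, 2).float()  # Y: one-hot labels, (N, C, H, W)

intersection = (scores * one_hot).sum(dim=(2, 3))
cardinality = scores.sum(dim=(2, 3)) + one_hot.sum(dim=(2, 3))
dice = 2.0 * intersection / (cardinality + 1e-17)  # eps regularizes the division
loss = 1.0 - dice  # per sample and per class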

ahcore.readers module#

Reader classes.

  • H5FileImageReader: to read files written using the ahcore.writers.H5FileImageWriter.

class ahcore.readers.H5FileImageReader(filename: Path, stitching_mode: StitchingMode)[source]#

Bases: object

close() None[source]#
classmethod from_file_path(filename: Path, stitching_mode: StitchingMode = StitchingMode.CROP) H5FileImageReader[source]#
get_mpp(scaling: float | None) float[source]#
get_scaling(mpp: float | None) float[source]#

Inverse of get_mpp().

property mpp: float#
read_region(location: tuple[int, int], scaling: float, size: tuple[int, int]) ndarray[Any, dtype[generic]][source]#
Parameters:
location : tuple[int, int]

Location from the top left (x, y) in pixel coordinates given at the requested scaling.

scaling : float
size : tuple[int, int]

Size of the output region.

Returns:
np.ndarray

The requested region.
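
A hedged usage sketch for reading a stitched region; the file path, mpp, and region below are illustrative assumptions.

from pathlib import Path

from ahcore.readers import H5FileImageReader, StitchingMode

reader = H5FileImageReader.from_file_path(
    Path("predictions/slide_0.h5"), stitching_mode=StitchingMode.CROP
)
scaling = reader.get_scaling(mpp=1.0)  # scaling corresponding to 1.0 mpp
region = reader.read_region(location=(0, 0), scaling=scaling, size=(512, 512))
reader.close()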

read_region_raw(location: tuple[int, int], size: tuple[int, int]) ndarray[Any, dtype[generic]][source]#

Reads a region in the stored h5 file. This function stitches the regions as saved in the h5 file, taking into account:

  1. The region overlap. Several region merging strategies are implemented: cropping, averaging across borders, and taking the maximum across borders.

  2. Whether tiles are saved or not. In case tiles are skipped due to a background mask, an empty tile is returned.

Parameters:
location : tuple[int, int]

Coordinates (x, y) of the upper left corner of the region.

size : tuple[int, int]

The (h, w) size of the extracted region.

Returns:
np.ndarray

Extracted region

property size: tuple[int, int]#
class ahcore.readers.StitchingMode(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: str, Enum

AVERAGE = 'average'#
CROP = 'crop'#
MAXIMUM = 'maximum'#
ahcore.readers.crop_to_bbox(array: ndarray[Any, dtype[generic]], bbox: tuple[tuple[int, int], tuple[int, int]]) ndarray[Any, dtype[generic]][source]#

ahcore.writers module#

This module contains writer classes. Currently implemented:

  • H5FileImageWriter: class to write h5 files based on iterators, for instance the output of a dataset class. It can, for example, be used to store model outputs. The ahcore.readers module contains classes to read these h5 files.

class ahcore.writers.H5FileImageWriter(filename: Path, size: tuple[int, int], mpp: float, tile_size: tuple[int, int], tile_overlap: tuple[int, int], num_samples: int, is_compressed_image: bool = False, color_profile: bytes | None = None, progress: Any | None = None, extra_metadata: dict[str, Any] | None = None, precision: InferencePrecision | None = None, grid: Grid | None = None)[source]#

Bases: object

Image writer that writes tile-by-tile to h5.

add_associated_images(images: tuple[tuple[str, numpy.ndarray[Any, numpy.dtype[numpy.uint8]]], ...], description: str | None = None) None[source]#

Adds associated images to the h5 file.

adjust_batch_precision(batch: ndarray[Any, dtype[generic]]) ndarray[Any, dtype[generic]][source]#

Adjusts the batch precision based on the precision set in the writer.

consume(batch_generator: Generator[tuple[numpy.ndarray[Any, numpy.dtype[numpy.generic]], numpy.ndarray[Any, numpy.dtype[numpy.generic]]], None, None], connection_to_parent: Connection | None = None) None[source]#

Consumes tiles one-by-one from a generator and writes them to the h5 file.

init_writer(first_coordinates: ndarray[Any, dtype[generic]], first_batch: ndarray[Any, dtype[generic]], h5file: File) None[source]#

Initializes the image_dataset based on the first tile.
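
A hedged sketch of writing tiles with H5FileImageWriter; the sizes, file name, and the generator's coordinate and tile shapes are assumptions for illustration, not ahcore defaults.

from pathlib import Path

import numpy as np

from ahcore.writers import H5FileImageWriter

writer = H5FileImageWriter(
    filename=Path("predictions/slide_0.h5"),
    size=(4096, 4096),        # full image size at the target mpp (assumption)
    mpp=1.0,
    tile_size=(512, 512),
    tile_overlap=(0, 0),
    num_samples=64,           # number of tiles the generator will yield
)

def batch_generator():
    # Yields (coordinates, batch-of-tiles) pairs; the shapes are assumptions.
    for idx in range(64):
        coordinates = np.array([[(idx % 8) * 512, (idx // 8) * 512]])
        tiles = np.zeros((1, 3, 512, 512), dtype=np.float32)
        yield coordinates, tiles

writer.consume(batch_generator())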

class ahcore.writers.H5TileFeatureWriter(filename: Path, size: tuple[int, int])[source]#

Bases: object

Feature writer that writes tile-by-tile feature representation to h5.

consume_features(feature_generator: Generator[tuple[numpy.ndarray[Any, numpy.dtype[numpy.generic]], numpy.ndarray[Any, numpy.dtype[numpy.generic]]], None, None], connection_to_parent: Connection | None = None) None[source]#

Consumes tiles one-by-one from a generator and writes them to the h5 file.

init_writer(first_features: ndarray[Any, dtype[generic]], h5file: File) None[source]#
ahcore.writers.decode_array_to_pil(array: ndarray[Any, dtype[uint8]]) Image[source]#

Convert an encoded array to a PIL image.

Parameters:
array : npt.NDArray[np.uint8]

The encoded array

Returns:
PIL.Image.Image

The decoded image

Module contents#

Main ahcore module