import sys
import torch
from torchsurv.tools.validate_data import validate_log_shape, validate_loss
__all__ = [
"cumulative_hazard",
"survival_function",
"log_hazard",
"neg_log_likelihood",
]
def cumulative_hazard(
log_params: torch.Tensor,
time: torch.Tensor,
all_times: bool = True,
clamp_value: float = 1e10,
) -> torch.Tensor:
"""Cumulative hazard for the Weibull Accelerated Time Failure (AFT) survival model.
Args:
log_params (torch.Tensor, float):
Parameters of the Weibull distribution of shape = (n_samples, 1) or (n_samples, 2).
The first column corresponds to the log scale parameter. The second column
corresponds to the log shape parameter. If the log shape parameter is missing, it is
imputed with 0.
time (torch.Tensor, float):
Time-to-event or censoring of length n_samples.
        all_times (bool):
            If True, the subject-specific cumulative hazard is evaluated at all ``time`` values
            (used for evaluation metrics).
            If False, it is evaluated at each subject's own ``time``.
            Defaults to True.
        clamp_value (float):
            Upper bound used to clamp the cumulative hazard, guarding against numerical overflow.
            Defaults to 1e10.
Returns:
(torch.Tensor, float): Subject-specific cumulative hazard evaluated at ``time``.
Examples:
>>> _ = torch.manual_seed(42)
>>> time = torch.randint(low=1, high=100, size=(4,))
>>> log_params = torch.randn((4, 2))
>>> cumulative_hazard(log_params, time, all_times=False) # Cumulative hazard at respective time
tensor([ 8.6257, 112.2115, 3.5105, 112.6339])
>>> cumulative_hazard(log_params, time, all_times=True) # Default. Cumulative hazard at all time
tensor([[ 8.6257, 233.0865, 239.2167, 126.2805],
[ 12.7698, 112.2115, 114.1484, 74.9134],
[ 0.8706, 3.4725, 3.5105, 2.6850],
[ 6.9530, 212.7592, 218.5687, 112.6339]])
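
        The closed form :math:`H_i(t) = (t / \lambda_i)^{\rho_i}` can be checked directly;
        the lines below are an illustrative sanity check, not part of the API:

        >>> scale, shape = log_params[0].exp()  # subject 0's scale and shape on the natural scale
        >>> bool(torch.isclose((time[0] / scale) ** shape, cumulative_hazard(log_params, time, all_times=False)[0]))
        True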
"""
log_scale, log_shape = validate_log_shape(log_params).unbind(1)
if all_times:
# Use all times for each sample
time = time.unsqueeze(0).expand(len(time), len(time)) # expand across rows
log_scale = log_scale.unsqueeze(1).expand(
len(time), len(time)
) # expand across columns
log_shape = log_shape.unsqueeze(1).expand(
len(time), len(time)
) # expand across columns
return torch.clamp(
torch.exp(
torch.exp(log_shape)
* (torch.log(torch.clamp(time, min=1e-100, max=torch.inf)) - log_scale)
),
min=0,
max=clamp_value,
)
def log_hazard(
log_params: torch.Tensor,
time: torch.Tensor,
all_times: bool = True,
clamp_value: float = 1e10,
) -> torch.Tensor:
"""Log hazard of the Weibull Accelerated Time Failure (AFT) survival model.
Args:
log_params (torch.Tensor, float):
Parameters of the Weibull distribution of shape = (n_samples, 1) or (n_samples, 2).
The first column corresponds to the log scale parameter. The second column
corresponds to the log shape parameter. If the log shape parameter is missing, it is
imputed with 0.
time (torch.Tensor, float):
Time at which to evaluate the log hazard.
Should be of length n_samples to evaluate the log hazard at observed time-to-event or censoring,
or of length one to evaluate the log hazard at a new time.
all_times (bool):
If True, subject-specific log hazard is evaluated at all ``time`` (used for evaluation metrics).
If False, subject-specific log hazard is evaluated at respective ``time``.
            Defaults to True.
            Ignored if ``time`` is of length one.
        clamp_value (float):
            Absolute bound used to clamp the log hazard, guarding against numerical overflow.
            Defaults to 1e10.
Returns:
(torch.Tensor, float): Subject-specific log hazard evaluated at ``time``.
Examples:
>>> _ = torch.manual_seed(42)
>>> time = torch.randint(low=1, high=100, size=(4,))
>>> log_params = torch.randn((4, 2))
>>> log_hazard(log_params, time, all_times = False) # Log hazard at respective time
tensor([ 0.4392, -0.0303, -3.9672, 0.9140])
>>> log_hazard(log_params, time, all_times = True) # Default. Log hazard at all time
tensor([[ 0.4392, 1.1174, 1.1227, 0.9913],
[ 0.4148, -0.0303, -0.0338, 0.0525],
[-2.7225, -3.9575, -3.9672, -3.7279],
[ 0.2606, 1.0632, 1.0695, 0.9140]])
>>> log_hazard(log_params, time=torch.tensor(10.0)) # Log hazard at one new time (e.g., 10 years)
tensor([ 0.5316, 0.3542, -2.8907, 0.3699])
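
        When ``log_params`` has a single column, the missing log shape is imputed with 0
        (i.e., :math:`\rho_i = 1`); the check below illustrates this documented behavior:

        >>> one_col = log_params[:, :1]
        >>> two_col = torch.cat([one_col, torch.zeros(4, 1)], dim=1)  # explicit zero log shape
        >>> bool(torch.allclose(log_hazard(one_col, time), log_hazard(two_col, time)))
        True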
>>> for t in torch.tensor([100.0, 150.0]): log_hazard(log_params, time=t) # Subject-specific log hazard at multiple new times
tensor([ 1.1280, -0.0372, -3.9767, 1.0757])
tensor([ 1.2330, -0.1062, -4.1680, 1.1999])
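
        The closed form :math:`\log h_i(t) = \log\rho_i - \log\lambda_i + (\rho_i - 1)(\log t - \log\lambda_i)`
        can be checked by hand (illustrative only):

        >>> log_scale, log_shape = log_params[0]
        >>> manual = log_shape - log_scale + (log_shape.exp() - 1) * (time[0].float().log() - log_scale)
        >>> bool(torch.isclose(manual, log_hazard(log_params, time, all_times=False)[0]))
        True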
>>> log_params *= 1e2 # Increase scale
>>> log_hazard(log_params, time, all_times = False) # Check for Torch.Inf values
tensor([-1.0000e+10, -2.3197e+01, -6.8385e+01, -1.0000e+10])
"""
log_scale, log_shape = validate_log_shape(log_params).unbind(1)
if time.dim() == 0:
# Use fixed time for each sample
time = time.repeat(len(log_params))
elif time.size(0) == log_params.size(0) and all_times:
# Use all times for each sample
time = time.unsqueeze(0).expand(len(time), len(time)) # expand across rows
log_scale = log_scale.unsqueeze(1).expand(
len(time), len(time)
) # expand across columns
log_shape = log_shape.unsqueeze(1).expand(
len(time), len(time)
) # expand across columns
if time.size(0) != log_params.size(0):
raise ValueError(
f"Dimension mismatch: 'time' ({len(time)}) does not match the length of 'log_params' ({len(log_params)})."
)
return torch.clamp(
log_shape
- log_scale
+ torch.expm1(log_shape)
* (torch.log(torch.clamp(time, min=1e-100, max=torch.inf)) - log_scale),
min=-clamp_value,
max=clamp_value,
)
def neg_log_likelihood(
log_params: torch.Tensor,
event: torch.Tensor,
time: torch.Tensor,
reduction: str = "mean",
checks: bool = True,
) -> torch.Tensor:
r"""
    Negative of the log likelihood for the Weibull Accelerated Failure Time (AFT) survival model.

    Args:
log_params (torch.Tensor, float):
Parameters of the Weibull distribution of shape = (n_samples, 1) or (n_samples, 2).
The first column corresponds to the log scale parameter. The second column
corresponds to the log shape parameter. If the log shape parameter is missing, it is
imputed with 0.
event (torch.Tensor, bool):
Event indicator of length n_samples (= True if event occurred).
time (torch.Tensor, float):
Time-to-event or censoring of length n_samples.
reduction (str):
Method to reduce losses. Defaults to "mean".
Must be one of the following: "sum", "mean".
checks (bool):
Whether to perform input format checks.
Enabling checks can help catch potential issues in the input data.
Defaults to True.
Returns:
(torch.Tensor, float): Negative of the log likelihood.
    Note:
        For each subject :math:`i \in \{1, \cdots, N\}`, denote :math:`X_i` as the survival time and :math:`D_i` as the
        censoring time. Survival data consist of the event indicator, :math:`\delta_i=1(X_i\leq D_i)`
        (argument ``event``) and the time-to-event or censoring, :math:`T_i = \min(\{ X_i,D_i \})`
        (argument ``time``).

        The log hazard function for the Weibull AFT survival model :cite:p:`Carroll2003` of subject :math:`i` at time :math:`t` has the form:

        .. math::

            \log h_i(t) = \log{\rho_i} - \log{\lambda_i} + (\rho_i - 1) \left( \log{t} - \log{\lambda_i} \right)

        where :math:`\log{\lambda_i}` is the log scale parameter (first column of argument ``log_params``)
        and :math:`\log{\rho_i}` is the log shape parameter (second column of argument ``log_params``).

        The cumulative hazard for the Weibull survival model of subject :math:`i` at time :math:`t` has the form:

        .. math::

            H_i(t) = \left(\frac{t}{\lambda_i}\right)^{\rho_i}

        The survival function for the Weibull survival model of subject :math:`i` at time :math:`t` has the form:

        .. math::

            S_i(t) = 1 - F(t | \lambda_i, \rho_i)

        where :math:`F(t | \lambda, \rho)` is the cumulative distribution function (CDF) of the Weibull distribution given
        scale parameter :math:`\lambda` and shape parameter :math:`\rho`.

        The log likelihood of the Weibull survival model is

        .. math::

            ll = \sum_{i: \delta_i = 1} \log h_i(T_i) - \sum_{i = 1}^N H_i(T_i)
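
        The last expression uses :math:`\log S_i(t) = -H_i(t)`: subject :math:`i` contributes
        :math:`h_i(T_i)^{\delta_i}\, S_i(T_i)` to the likelihood, so its log contribution is
        :math:`\delta_i \log h_i(T_i) - H_i(T_i)`.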
Examples:
>>> _ = torch.manual_seed(42)
>>> n = 4
>>> log_params = torch.randn((n, 2))
>>> event = torch.randint(low=0, high=2, size=(n,), dtype=torch.bool)
>>> time = torch.randint(low=1, high=100, size=(n,))
>>> neg_log_likelihood(log_params, event, time) # Default: mean of log likelihoods across subject
tensor(47.5035)
>>> neg_log_likelihood(log_params, event, time, reduction = 'sum') # Sum of log likelihoods across subject
tensor(190.0141)
>>> neg_log_likelihood(torch.randn((n, 1)), event, time) # Missing shape: exponential decrease
tensor(66.7203)
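
        The returned value matches the definition above computed by hand (illustrative check):

        >>> lh = log_hazard(log_params, time, all_times=False)
        >>> ch = cumulative_hazard(log_params, time, all_times=False)
        >>> bool(torch.isclose(neg_log_likelihood(log_params, event, time), -(event * lh - ch).mean()))
        True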
References:
.. bibliography::
:filter: False
Carroll2003
"""
    if checks:
        validate_loss(log_params, event, time, model_type="weibull")
    # Negative log likelihood: -(delta_i * log h_i(T_i) - H_i(T_i)) per subject
    nll = torch.neg(
        event * log_hazard(log_params, time, False)
        - cumulative_hazard(log_params, time, False)
    )
    if torch.any(torch.isinf(nll)):
        # Remove infinite values before reduction
        nll = nll[~torch.isinf(nll)]
if reduction.lower() == "mean":
loss = nll.nanmean()
elif reduction.lower() == "sum":
loss = nll.sum()
else:
raise (
ValueError(
f"Reduction {reduction} is not implemented yet, should be one of ['mean', 'sum']."
)
)
return loss
def survival_function(
log_params: torch.Tensor, time: torch.Tensor, all_times: bool = True
) -> torch.Tensor:
"""Survival function for the Weibull Accelerated Time Failure (AFT) survival model.
Args:
log_params (torch.Tensor, float):
Parameters of the Weibull distribution of shape = (n_samples, 1) or (n_samples, 2).
The first column corresponds to the log scale parameter. The second column
corresponds to the log shape parameter. If the log shape parameter is missing, it is
imputed with 0.
time (torch.Tensor, float):
Time at which to evaluate the survival function.
Should be of length n_samples to evaluate the survival function at observed time-to-event or censoring,
or of length one to evaluate the survival function at a new time.
all_times (bool):
If True, subject-specific survival function is evaluated at all ``time`` (used for evaluation metrics).
If False, subject-specific survival function is evaluated at respective ``time``.
            Defaults to True.
Ignored if ``time`` is of length one.
Returns:
(torch.Tensor, float): Subject-specific survival function evaluated at ``time``.
Examples:
>>> _ = torch.manual_seed(42)
>>> time = torch.randint(low=1, high=100, size=(4,))
>>> log_params = torch.randn((4, 2))
>>> survival_function(log_params, time, all_times = False) # Survival at respective time
tensor([0.0002, 0.0000, 0.0299, 0.0000])
>>> survival_function(log_params, time, all_times = True) # Default. Survival at all observed time
tensor([[1.7941e-04, 0.0000e+00, 0.0000e+00, 0.0000e+00],
[2.8610e-06, 0.0000e+00, 0.0000e+00, 0.0000e+00],
[4.1870e-01, 3.1040e-02, 2.9881e-02, 6.8224e-02],
[9.5576e-04, 0.0000e+00, 0.0000e+00, 0.0000e+00]])
>>> survival_function(log_params, time=torch.tensor(10.0)) # Survival at one new time (e.g., 10 years)
tensor([1.3709e-06, 5.9605e-08, 3.4954e-01, 1.5438e-05])
>>> for t in torch.tensor([100.0, 150.0]): survival_function(log_params, time=t) # Subject-specific survival at multiple new times
tensor([0.0000, 0.0000, 0.0288, 0.0000])
tensor([0.0000, 0.0000, 0.0123, 0.0000])
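
        Consistency with the cumulative hazard, :math:`S_i(t) = \exp(-H_i(t))`, can be checked
        numerically (illustrative, up to float32 rounding):

        >>> S = survival_function(log_params, time, all_times=False)
        >>> H = cumulative_hazard(log_params, time, all_times=False)
        >>> bool(torch.allclose(S, torch.exp(-H), atol=1e-6))
        True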
"""
log_scale, log_shape = validate_log_shape(log_params).unbind(1)
if time.dim() == 0:
# Use one time for each sample
time = time.repeat(len(log_params))
    elif time.size(0) == log_params.size(0) and all_times:
# Use all times for each sample
time = time.unsqueeze(0).expand(len(time), len(time)) # expand across rows
log_scale = log_scale.unsqueeze(1).expand(
len(time), len(time)
) # expand across columns
log_shape = log_shape.unsqueeze(1).expand(
len(time), len(time)
) # expand across columns
if time.size(0) != log_params.size(0):
raise ValueError(
f"Dimension mismatch: 'time' ({len(time)}) does not match the length of 'log_params' ({len(log_params)})."
)
return 1 - torch.distributions.weibull.Weibull(
torch.exp(log_scale), torch.exp(log_shape)
).cdf(time)
if __name__ == "__main__":
import doctest
# Run doctest
results = doctest.testmod()
if results.failed == 0:
print("All tests passed.")
else:
print("Some doctests failed.")
sys.exit(1)