import contextlib
import functools
import inspect
import os
import platform
import random
import tempfile
from contextvars import ContextVar
from dataclasses import dataclass
from typing import (
Any,
Callable,
Dict,
List,
Mapping,
Optional,
Sequence,
Tuple,
TypeVar,
Union,
cast,
)
import numpy
from packaging.version import Version
try:
from pydantic.v1 import ValidationError, create_model
except ImportError:
from pydantic import ValidationError, create_model # type: ignore
from wasabi import table # type: ignore
from .compat import (
cupy,
cupy_from_dlpack,
has_cupy,
has_cupy_gpu,
has_gpu,
has_mxnet,
has_tensorflow,
has_torch,
has_torch_cuda_gpu,
has_torch_mps,
)
from .compat import mxnet as mx
from .compat import tensorflow as tf
from .compat import torch
DATA_VALIDATION: ContextVar[bool] = ContextVar("DATA_VALIDATION", default=False)
from typing import TYPE_CHECKING
from . import types # noqa: E402
from .types import ArgsKwargs, ArrayXd, FloatsXd, IntsXd, Padded, Ragged # noqa: E402
if TYPE_CHECKING:
from .api import Ops
def get_torch_default_device() -> "torch.device":
    """Return the torch device that matches the currently active ops backend."""
if torch is None:
raise ValueError("Cannot get default Torch device when Torch is not available.")
from .backends import get_current_ops
from .backends.cupy_ops import CupyOps
from .backends.mps_ops import MPSOps
ops = get_current_ops()
if isinstance(ops, CupyOps):
device_id = torch.cuda.current_device()
return torch.device(f"cuda:{device_id}")
elif isinstance(ops, MPSOps):
return torch.device("mps")
return torch.device("cpu")
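
# A minimal usage sketch (assuming PyTorch is installed and the default
# NumpyOps backend is active, so the CPU device is returned):
#
#     >>> get_torch_default_device()
#     device(type='cpu')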
def get_array_module(arr): # pragma: no cover
if is_numpy_array(arr):
return numpy
elif is_cupy_array(arr):
return cupy
else:
raise ValueError(
"Only numpy and cupy arrays are supported"
f", but found {type(arr)} instead. If "
"get_array_module module wasn't called "
"directly, this might indicate a bug in Thinc."
)
def gpu_is_available():
    """Return True if a supported GPU (CUDA or MPS) was detected."""
    return has_gpu
def fix_random_seed(seed: int = 0) -> None: # pragma: no cover
"""Set the random seed across random, numpy.random and cupy.random."""
random.seed(seed)
numpy.random.seed(seed)
if has_torch:
torch.manual_seed(seed)
if has_cupy_gpu:
cupy.random.seed(seed)
if has_torch and has_torch_cuda_gpu:
torch.cuda.manual_seed_all(seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False
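
# Usage sketch: call once at startup so random, numpy.random and any installed
# GPU frameworks produce reproducible numbers:
#
#     >>> fix_random_seed(0)
#     >>> numpy.random.uniform()  # deterministic across runs
#     0.5488135039273248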
def is_xp_array(obj: Any) -> bool:
"""Check whether an object is a numpy or cupy array."""
return is_numpy_array(obj) or is_cupy_array(obj)
def is_cupy_array(obj: Any) -> bool:  # pragma: no cover
    """Check whether an object is a cupy array."""
    if not has_cupy:
        return False
    return isinstance(obj, cupy.ndarray)
def is_numpy_array(obj: Any) -> bool:
    """Check whether an object is a numpy array."""
    return isinstance(obj, numpy.ndarray)
def is_torch_array(obj: Any) -> bool:  # pragma: no cover
    """Check whether an object is a PyTorch tensor."""
    if torch is None:
        return False
    return isinstance(obj, torch.Tensor)
def is_torch_cuda_array(obj: Any) -> bool:  # pragma: no cover
    """Check whether an object is a PyTorch CUDA tensor."""
    return is_torch_array(obj) and obj.is_cuda


def is_torch_gpu_array(obj: Any) -> bool:  # pragma: no cover
    """Check whether an object is a PyTorch GPU (CUDA or MPS) tensor."""
    return is_torch_cuda_array(obj) or is_torch_mps_array(obj)


def is_torch_mps_array(obj: Any) -> bool:  # pragma: no cover
    """Check whether an object is a PyTorch MPS tensor."""
    return is_torch_array(obj) and hasattr(obj, "is_mps") and obj.is_mps


def is_tensorflow_array(obj: Any) -> bool:  # pragma: no cover
    """Check whether an object is a TensorFlow tensor."""
    if not has_tensorflow:
        return False
    return isinstance(obj, tf.Tensor)  # type: ignore


def is_tensorflow_gpu_array(obj: Any) -> bool:  # pragma: no cover
    """Check whether an object is a TensorFlow tensor on a GPU device."""
    return is_tensorflow_array(obj) and "GPU:" in obj.device


def is_mxnet_array(obj: Any) -> bool:  # pragma: no cover
    """Check whether an object is an MXNet array."""
    if not has_mxnet:
        return False
    return isinstance(obj, mx.nd.NDArray)  # type: ignore


def is_mxnet_gpu_array(obj: Any) -> bool:  # pragma: no cover
    """Check whether an object is an MXNet array on a GPU context."""
    return is_mxnet_array(obj) and obj.context.device_type != "cpu"
def to_numpy(data):  # pragma: no cover
    """Convert a numpy or cupy array (or other array-like data) to numpy."""
if isinstance(data, numpy.ndarray):
return data
elif has_cupy and isinstance(data, cupy.ndarray):
return data.get()
else:
return numpy.array(data)
def set_active_gpu(gpu_id: int) -> "cupy.cuda.Device": # pragma: no cover
"""Set the current GPU device for cupy and torch (if available)."""
if not has_cupy_gpu:
raise ValueError("No CUDA GPU devices detected")
device = cupy.cuda.device.Device(gpu_id)
device.use()
if has_torch_cuda_gpu:
torch.cuda.set_device(gpu_id)
return device
def require_cpu() -> bool: # pragma: no cover
"""Use CPU through best available backend."""
from .backends import get_ops, set_current_ops
ops = get_ops("cpu")
set_current_ops(ops)
return True
def prefer_gpu(gpu_id: int = 0) -> bool: # pragma: no cover
"""Use GPU if it's available. Returns True if so, False otherwise."""
if has_gpu:
require_gpu(gpu_id=gpu_id)
return has_gpu
def require_gpu(gpu_id: int = 0) -> bool:  # pragma: no cover
    """Use the GPU, raising an error if no supported GPU backend is available."""
    from .backends import CupyOps, MPSOps, set_current_ops
if platform.system() == "Darwin" and not has_torch_mps:
if has_torch:
raise ValueError("Cannot use GPU, installed PyTorch does not support MPS")
raise ValueError("Cannot use GPU, PyTorch is not installed")
elif platform.system() != "Darwin" and not has_cupy:
raise ValueError("Cannot use GPU, CuPy is not installed")
elif not has_gpu:
raise ValueError("No GPU devices detected")
if has_cupy_gpu:
set_current_ops(CupyOps())
set_active_gpu(gpu_id)
else:
set_current_ops(MPSOps())
return True
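
# Typical pattern (a sketch): prefer the GPU when present but fall back to the
# CPU silently; use require_gpu() instead when a missing GPU should be an error:
#
#     >>> is_gpu = prefer_gpu()  # True on a CUDA/MPS machine, False otherwise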
def copy_array(dst: ArrayXd, src: ArrayXd) -> None:  # pragma: no cover
    """Copy the contents of src into dst, handling numpy and cupy arrays."""
if isinstance(dst, numpy.ndarray) and isinstance(src, numpy.ndarray):
dst[:] = src
elif is_cupy_array(dst):
src = cupy.array(src, copy=False)
cupy.copyto(dst, src)
else:
numpy.copyto(dst, src) # type: ignore
def to_categorical(
Y: IntsXd,
n_classes: Optional[int] = None,
*,
label_smoothing: float = 0.0,
) -> FloatsXd:
    """Convert an array of class indices into a 2d one-hot array. If
    label_smoothing is nonzero, gold labels get probability 1 - label_smoothing
    and the remainder is distributed evenly over the non-gold classes.
    """
if n_classes is None:
n_classes = int(numpy.max(Y) + 1) # type: ignore
if label_smoothing < 0.0:
raise ValueError(
"Label-smoothing parameter has to be greater than or equal to 0"
)
if label_smoothing == 0.0:
if n_classes == 0:
raise ValueError("n_classes should be at least 1")
nongold_prob = 0.0
else:
if not n_classes > 1:
raise ValueError(
"n_classes should be greater than 1 when label smoothing is enabled,"
f"but {n_classes} was provided."
)
nongold_prob = label_smoothing / (n_classes - 1)
max_smooth = (n_classes - 1) / n_classes
if n_classes > 1 and label_smoothing >= max_smooth:
raise ValueError(
f"For {n_classes} classes "
"label_smoothing parameter has to be less than "
f"{max_smooth}, but found {label_smoothing}."
)
xp = get_array_module(Y)
label_distr = xp.full((n_classes, n_classes), nongold_prob, dtype="float32")
xp.fill_diagonal(label_distr, 1 - label_smoothing)
return label_distr[Y]
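
# Worked example: with three classes and label_smoothing=0.1, gold labels keep
# probability 0.9 and the remaining 0.1 is split over the two non-gold classes:
#
#     >>> to_categorical(numpy.asarray([0, 2]), n_classes=3, label_smoothing=0.1)
#     array([[0.9 , 0.05, 0.05],
#            [0.05, 0.05, 0.9 ]], dtype=float32)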
def get_width(
X: Union[ArrayXd, Ragged, Padded, Sequence[ArrayXd]], *, dim: int = -1
) -> int:
"""Infer the 'width' of a batch of data, which could be any of: Array,
Ragged, Padded or Sequence of Arrays.
"""
if isinstance(X, Ragged):
return get_width(X.data, dim=dim)
elif isinstance(X, Padded):
return get_width(X.data, dim=dim)
elif hasattr(X, "shape") and hasattr(X, "ndim"):
X = cast(ArrayXd, X)
if len(X.shape) == 0:
return 0
elif len(X.shape) == 1:
return int(X.max()) + 1
else:
return X.shape[dim]
elif isinstance(X, (list, tuple)):
if len(X) == 0:
return 0
else:
return get_width(X[0], dim=dim)
else:
err = "Cannot get width of object: has neither shape nor __getitem__"
raise ValueError(err)
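
# Behaviour sketch: for >=2d arrays the width is the trailing dimension, for 1d
# (class-label) arrays it is max + 1, and sequences defer to their first element:
#
#     >>> get_width(numpy.zeros((4, 8), dtype="f"))
#     8
#     >>> get_width(numpy.asarray([0, 3, 1]))
#     4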
def assert_tensorflow_installed() -> None: # pragma: no cover
"""Raise an ImportError if TensorFlow is not installed."""
template = "TensorFlow support requires {pkg}: pip install thinc[tensorflow]\n\nEnable TensorFlow support with thinc.api.enable_tensorflow()"
if not has_tensorflow:
raise ImportError(template.format(pkg="tensorflow>=2.0.0,<2.6.0"))
def assert_mxnet_installed() -> None: # pragma: no cover
"""Raise an ImportError if MXNet is not installed."""
if not has_mxnet:
raise ImportError(
"MXNet support requires mxnet: pip install thinc[mxnet]\n\nEnable MXNet support with thinc.api.enable_mxnet()"
)
def assert_pytorch_installed() -> None: # pragma: no cover
"""Raise an ImportError if PyTorch is not installed."""
if not has_torch:
raise ImportError("PyTorch support requires torch: pip install thinc[torch]")
def convert_recursive(
is_match: Callable[[Any], bool], convert_item: Callable[[Any], Any], obj: Any
) -> Any:
"""Either convert a single value if it matches a given function, or
recursively walk over potentially nested lists, tuples and dicts applying
the conversion, and returns the same type. Also supports the ArgsKwargs
dataclass.
"""
if is_match(obj):
return convert_item(obj)
elif isinstance(obj, ArgsKwargs):
converted = convert_recursive(is_match, convert_item, list(obj.items()))
return ArgsKwargs.from_items(converted)
elif isinstance(obj, dict):
converted = {}
for key, value in obj.items():
key = convert_recursive(is_match, convert_item, key)
value = convert_recursive(is_match, convert_item, value)
converted[key] = value
return converted
elif isinstance(obj, list):
return [convert_recursive(is_match, convert_item, item) for item in obj]
elif isinstance(obj, tuple):
return tuple(convert_recursive(is_match, convert_item, item) for item in obj)
else:
return obj
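
# Illustrative example: double every int in a nested structure, preserving the
# container types:
#
#     >>> convert_recursive(lambda v: isinstance(v, int), lambda v: v * 2,
#     ...                   {"a": [1, (2, 3)]})
#     {'a': [2, (4, 6)]}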
def iterate_recursive(is_match: Callable[[Any], bool], obj: Any) -> Any:
"""Either yield a single value if it matches a given function, or recursively
walk over potentially nested lists, tuples and dicts yielding matching
values. Also supports the ArgsKwargs dataclass.
"""
if is_match(obj):
yield obj
elif isinstance(obj, ArgsKwargs):
yield from iterate_recursive(is_match, list(obj.items()))
elif isinstance(obj, dict):
for key, value in obj.items():
yield from iterate_recursive(is_match, key)
yield from iterate_recursive(is_match, value)
elif isinstance(obj, list) or isinstance(obj, tuple):
for item in obj:
yield from iterate_recursive(is_match, item)
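
# Illustrative example: collect every int from a nested structure:
#
#     >>> list(iterate_recursive(lambda v: isinstance(v, int), [1, {"k": (2, 3)}]))
#     [1, 2, 3]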
def xp2torch(
xp_tensor: ArrayXd,
requires_grad: bool = False,
device: Optional["torch.device"] = None,
) -> "torch.Tensor": # pragma: no cover
"""Convert a numpy or cupy tensor to a PyTorch tensor."""
assert_pytorch_installed()
if device is None:
device = get_torch_default_device()
if hasattr(xp_tensor, "toDlpack"):
dlpack_tensor = xp_tensor.toDlpack() # type: ignore
torch_tensor = torch.utils.dlpack.from_dlpack(dlpack_tensor)
elif hasattr(xp_tensor, "__dlpack__"):
torch_tensor = torch.utils.dlpack.from_dlpack(xp_tensor)
else:
torch_tensor = torch.from_numpy(xp_tensor)
torch_tensor = torch_tensor.to(device)
if requires_grad:
torch_tensor.requires_grad_()
return torch_tensor
def torch2xp(
torch_tensor: "torch.Tensor", *, ops: Optional["Ops"] = None
) -> ArrayXd: # pragma: no cover
"""Convert a torch tensor to a numpy or cupy tensor depending on the `ops` parameter.
If `ops` is `None`, the type of the resultant tensor will be determined by the source tensor's device.
"""
from .api import NumpyOps
assert_pytorch_installed()
if is_torch_cuda_array(torch_tensor):
if isinstance(ops, NumpyOps):
return torch_tensor.detach().cpu().numpy()
else:
return cupy_from_dlpack(torch.utils.dlpack.to_dlpack(torch_tensor))
else:
if isinstance(ops, NumpyOps) or ops is None:
return torch_tensor.detach().cpu().numpy()
else:
return cupy.asarray(torch_tensor)
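
# Round-trip sketch (CPU, assuming PyTorch is installed): xp2torch wraps the
# numpy buffer via torch.from_numpy, and torch2xp converts back to numpy:
#
#     >>> t = xp2torch(numpy.zeros((2, 2), dtype="f"))
#     >>> torch2xp(t).shape
#     (2, 2)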
def xp2tensorflow(
xp_tensor: ArrayXd, requires_grad: bool = False, as_variable: bool = False
) -> "tf.Tensor": # type: ignore # pragma: no cover
"""Convert a numpy or cupy tensor to a TensorFlow Tensor or Variable"""
assert_tensorflow_installed()
if hasattr(xp_tensor, "toDlpack"):
dlpack_tensor = xp_tensor.toDlpack() # type: ignore
tf_tensor = tf.experimental.dlpack.from_dlpack(dlpack_tensor) # type: ignore
elif hasattr(xp_tensor, "__dlpack__"):
dlpack_tensor = xp_tensor.__dlpack__() # type: ignore
tf_tensor = tf.experimental.dlpack.from_dlpack(dlpack_tensor) # type: ignore
else:
tf_tensor = tf.convert_to_tensor(xp_tensor) # type: ignore
if as_variable:
        # tf.Variable() would place the variable on GPU automatically if one
        # is available, so pin it to the source tensor's device explicitly.
with tf.device(tf_tensor.device): # type: ignore
tf_tensor = tf.Variable(tf_tensor, trainable=requires_grad) # type: ignore
if requires_grad is False and as_variable is False:
        # tf.stop_gradient() would likewise place its result on GPU if one
        # is available, so keep it on the source tensor's device.
with tf.device(tf_tensor.device): # type: ignore
tf_tensor = tf.stop_gradient(tf_tensor) # type: ignore
return tf_tensor
def tensorflow2xp(
tf_tensor: "tf.Tensor", *, ops: Optional["Ops"] = None # type: ignore
) -> ArrayXd: # pragma: no cover
"""Convert a Tensorflow tensor to numpy or cupy tensor depending on the `ops` parameter.
If `ops` is `None`, the type of the resultant tensor will be determined by the source tensor's device.
"""
from .api import NumpyOps
assert_tensorflow_installed()
if is_tensorflow_gpu_array(tf_tensor):
if isinstance(ops, NumpyOps):
return tf_tensor.numpy()
else:
dlpack_tensor = tf.experimental.dlpack.to_dlpack(tf_tensor) # type: ignore
return cupy_from_dlpack(dlpack_tensor)
else:
if isinstance(ops, NumpyOps) or ops is None:
return tf_tensor.numpy()
else:
return cupy.asarray(tf_tensor.numpy())
def xp2mxnet(
xp_tensor: ArrayXd, requires_grad: bool = False
) -> "mx.nd.NDArray": # type: ignore # pragma: no cover
"""Convert a numpy or cupy tensor to a MXNet tensor."""
assert_mxnet_installed()
if hasattr(xp_tensor, "toDlpack"):
dlpack_tensor = xp_tensor.toDlpack() # type: ignore
mx_tensor = mx.nd.from_dlpack(dlpack_tensor) # type: ignore
else:
mx_tensor = mx.nd.from_numpy(xp_tensor) # type: ignore
if requires_grad:
mx_tensor.attach_grad()
return mx_tensor
def mxnet2xp(
mx_tensor: "mx.nd.NDArray", *, ops: Optional["Ops"] = None # type: ignore
) -> ArrayXd: # pragma: no cover
"""Convert a MXNet tensor to a numpy or cupy tensor."""
from .api import NumpyOps
assert_mxnet_installed()
if is_mxnet_gpu_array(mx_tensor):
if isinstance(ops, NumpyOps):
return mx_tensor.detach().asnumpy()
else:
return cupy_from_dlpack(mx_tensor.to_dlpack_for_write())
else:
if isinstance(ops, NumpyOps) or ops is None:
return mx_tensor.detach().asnumpy()
else:
return cupy.asarray(mx_tensor.asnumpy())
# This is how functools.partial seems to do it, too, to retain the return type
PartialT = TypeVar("PartialT")
def partial(
func: Callable[..., PartialT], *args: Any, **kwargs: Any
) -> Callable[..., PartialT]:
"""Wrapper around functools.partial that retains docstrings and can include
other workarounds if needed.
"""
partial_func = functools.partial(func, *args, **kwargs)
partial_func.__doc__ = func.__doc__
return partial_func
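
# Example: unlike a bare functools.partial object, the wrapped callable keeps
# the original docstring:
#
#     >>> smooth_onehot = partial(to_categorical, label_smoothing=0.1)
#     >>> smooth_onehot.__doc__ == to_categorical.__doc__
#     True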
class DataValidationError(ValueError):
def __init__(
self,
name: str,
X: Any,
Y: Any,
        errors: Union[Sequence[Mapping[str, Any]], List[Dict[str, Any]]] = (),  # avoid a mutable default
) -> None:
"""Custom error for validating inputs / outputs at runtime."""
message = f"Data validation error in '{name}'"
type_info = f"X: {type(X)} Y: {type(Y)}"
data = []
for error in errors:
err_loc = " -> ".join([str(p) for p in error.get("loc", [])])
data.append((err_loc, error.get("msg")))
result = [message, type_info, table(data)]
        super().__init__("\n\n" + "\n".join(result))
class _ArgModelConfig:
extra = "forbid"
arbitrary_types_allowed = True
def validate_fwd_input_output(
name: str, func: Callable[[Any, Any, bool], Any], X: Any, Y: Any
) -> None:
"""Validate the input and output of a forward function against the type
annotations, if available. Used in Model.initialize with the input and
output samples as they pass through the network.
"""
sig = inspect.signature(func)
empty = inspect.Signature.empty
params = list(sig.parameters.values())
if len(params) != 3:
bad_params = f"{len(params)} ({', '.join([p.name for p in params])})"
err = f"Invalid forward function. Expected 3 arguments (model, X , is_train), got {bad_params}"
raise DataValidationError(name, X, Y, [{"msg": err}])
annot_x = params[1].annotation
annot_y = sig.return_annotation
sig_args: Dict[str, Any] = {"__config__": _ArgModelConfig}
args = {}
if X is not None and annot_x != empty:
if isinstance(X, list) and len(X) > 5:
X = X[:5]
sig_args["X"] = (annot_x, ...)
args["X"] = X
if Y is not None and annot_y != empty:
if isinstance(Y, list) and len(Y) > 5:
Y = Y[:5]
sig_args["Y"] = (annot_y, ...)
args["Y"] = (Y, lambda x: x)
ArgModel = create_model("ArgModel", **sig_args)
# Make sure the forward refs are resolved and the types used by them are
# available in the correct scope. See #494 for details.
ArgModel.update_forward_refs(**types.__dict__)
try:
ArgModel.parse_obj(args)
except ValidationError as e:
raise DataValidationError(name, X, Y, e.errors()) from None
@contextlib.contextmanager
def make_tempfile(mode="r"):
    """Create a named temporary file and remove it when the context exits."""
    f = tempfile.NamedTemporaryFile(mode=mode, delete=False)
    try:
        yield f
    finally:
        f.close()
        os.remove(f.name)
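
# Usage sketch: the file persists for the duration of the block and is removed
# afterwards, even if the body raises:
#
#     >>> with make_tempfile(mode="w") as f:
#     ...     _ = f.write("temporary")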
@contextlib.contextmanager
def data_validation(validation):
    """Temporarily set whether runtime data validation is enabled, restoring
    the previous setting on exit. DATA_VALIDATION is a ContextVar, so the
    setting is local to the current context and no locking is required.
    """
    prev = DATA_VALIDATION.get()
    DATA_VALIDATION.set(validation)
    try:
        yield
    finally:
        DATA_VALIDATION.set(prev)
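
# Usage sketch: enable validation while wiring up a model, with the previous
# setting restored on exit:
#
#     >>> with data_validation(True):
#     ...     assert DATA_VALIDATION.get() is True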
@contextlib.contextmanager
def use_nvtx_range(message: str, id_color: int = -1):
    """Context manager to register the executed code as an NVTX range. The
    ranges can be used as markers in CUDA profiling."""
    if has_cupy:
        cupy.cuda.nvtx.RangePush(message, id_color)
        try:
            yield
        finally:
            # Pop the range even if the body raises, so ranges stay balanced.
            cupy.cuda.nvtx.RangePop()
    else:
        yield
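
# Usage sketch (the range is only recorded when CuPy is available; otherwise
# the body simply runs): label a region for profilers such as Nsight Systems:
#
#     >>> with use_nvtx_range("forward pass"):
#     ...     pass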
@dataclass
class ArrayInfo:
"""Container for info for checking array compatibility."""
shape: types.Shape
dtype: types.DTypes
@classmethod
def from_array(cls, arr: ArrayXd):
return cls(shape=arr.shape, dtype=arr.dtype)
def check_consistency(self, arr: ArrayXd):
if arr.shape != self.shape:
raise ValueError(
f"Shape mismatch in backprop. Y: {self.shape}, dY: {arr.shape}"
)
if arr.dtype != self.dtype:
raise ValueError(
f"Type mismatch in backprop. Y: {self.dtype}, dY: {arr.dtype}"
)
# fmt: off
__all__ = [
    "get_array_module",
    "get_torch_default_device",
    "fix_random_seed",
    "gpu_is_available",
    "is_cupy_array",
    "is_numpy_array",
    "set_active_gpu",
    "prefer_gpu",
    "require_gpu",
    "require_cpu",
    "copy_array",
    "to_categorical",
    "get_width",
    "xp2torch",
    "torch2xp",
    "tensorflow2xp",
    "xp2tensorflow",
    "xp2mxnet",
    "mxnet2xp",
    "validate_fwd_input_output",
    "DataValidationError",
    "make_tempfile",
    "use_nvtx_range",
    "ArrayInfo",
    "has_cupy",
    "has_torch",
]
# fmt: on