from typing import Optional, Union
import numpy as np
from scipy import sparse
__author__ = "TeddyHuang-00"
__copyright__ = "TeddyHuang-00"
__license__ = "MIT"
[docs]class FlyHash:
"""FlyHash is a local sensitive hashing (LSH) algorithm,
based on the paper "A neural algorithm for fundamental computing problems"
by S. Dasgupta, C. F. Stevens, and S. Navlakha (2017).
FlyHash is a LSH algorithm that maps input data to a sparse hash embedding,
where the dimension of the hash embedding is much larger than the input,
and keeps the locality of the input data in the hash embedding.
FlyHash is designed to be cheap to compute, yet not ganranteeing
memory efficiency. It is suitable for hashing small to medium sized data
(d ~ 10-1000) to a large hash embedding (m ~ 100-10000).
Example
-------
Using a large hash_dim m=100 for a small input_dim d=10:
>>> import numpy as np
>>> from flyhash import FlyHash
>>> d = 10
>>> m = 100
>>> flyhash = FlyHash(d, m)
>>> data = np.random.randn(5, d)
>>> hashed_data = flyhash(data)
"""
[docs] def __init__(
self,
input_dim: int,
hash_dim: int,
density: Union[int, float] = 0.1,
sparsity: float = 0.05,
quant_step: Optional[float] = None,
dtype: type = np.int32,
seed: Optional[int] = None,
):
"""Initialize FlyHash object.
Parameters
----------
input_dim : int
Input dimension of the data to be hashed.
Note that FlyHash can only hash data with a fixed input dimensions.
hash_dim : int
Output dimension of the hash embeddings.
density : Union[int, float], optional
The connection density from input to output,
either a float between 0 and 1, or an integer greater than 0,
by default 0.1.
If `density` is a float, each column of the projection matrix
has non-zero entries with probability `density`.
If `density` is an integer, each column of the projection matrix
has exactly `density` non-zero entries.
sparsity : float, optional
The sparsity level of hash embeddings, must be a float
between 0 and 1, by default 0.05.
The sparsity level is defined as the fraction of non-zero
entries in the hash embeddings. The precise number of non-zero
entries is determined by `hash_dim` and `sparsity` as
`round(hash_dim * sparsity)`.
quant_step : Optional[float], optional
The quantization step size to use, either a float representing
the step size, or `None` for all-or-none clipping, by default `None`.
If `quant_step` is a float, the hash embeddings will be quantized
to an integer multiple of `quant_step`. To avoid information loss,
the ceiling of the quantized value is used, i.e. the quantized value
is always greater than 0 iff the original value is greater than 0.
dtype : type, optional
Data type of the output hash embeddings, by default np.int32.
Note that the data type must be one of the integer types.
seed : Optional[int], optional
The random seed to use when generating the projection matrix,
by default `None`.
"""
self.input_dim = input_dim
assert self.input_dim > 0, "input_dim must be positive"
self.hash_dim = hash_dim
assert self.hash_dim > 0, "hash_dim must be positive"
self.density = density
if isinstance(self.density, int):
assert self.density > 0, "density must be positive"
else:
assert 0 < self.density < 1, "density must be between 0 and 1"
self.sparsity = sparsity
assert 0 < self.sparsity < 1, "sparsity must be between 0 and 1"
self.num_winners = int(np.round(self.hash_dim * self.sparsity))
self.quant_step = quant_step
if self.quant_step is not None:
assert self.quant_step > 0, "quant_step must be positive"
self.dtype = dtype
assert np.issubdtype(
self.dtype, np.integer
), "dtype must be one of np.integer type"
self.seed = seed
self.rng = np.random.default_rng(self.seed)
self._construct_projection_matrix()
[docs] def __call__(self, input_data: np.ndarray) -> np.ndarray:
"""Hash input_data to hash_dim vector(s).
Parameters
----------
input_data : np.ndarray
Input data to be hashed, must be 1D (input_dim,)
or 2D (batch_size, input_dim).
Returns
-------
np.ndarray
Hashed data, 1D (hash_dim,) or 2D (batch_size, hash_dim)
according to the shape of input_data.
"""
assert 0 < input_data.ndim <= 2, "input_data must be 1D or 2D"
if input_data.ndim == 1:
input_data = input_data[np.newaxis, :]
assert input_data.shape[1] == self.input_dim, (
f"input_data of shape {input_data.shape[0]}x{input_data.shape[1]} has "
f"input dimension incompatible with {self.input_dim}"
)
projected_data = input_data @ self.projection_matrix.T
return np.apply_along_axis(self._winner_take_all, 1, projected_data).squeeze()
[docs] def _construct_projection_matrix(self):
"""Construct projection matrix from input_dim to hash_dim.
Notes
-----
The projection matrix `W` is a binary matrix of shape (hash_dim, input_dim).
This method constructs `W` by sampling from a binomial distribution
with parameters `n=input_dim` and `p=density` if `density` is a float,
or by sampling from a uniform distribution over `input_dim`
if `density` is an integer.
This method is called by the constructor. It SHOULD NOT be called again
later to avoid overwriting the projection matrix. Doing so will make the
hash embeddings non-deterministic.
"""
if isinstance(self.density, int):
assert self.density <= self.hash_dim, "density must be less than hash_dim"
# Create fixed number of connections when density is an integer.
# Each column of the projection matrix has exactly density non-zero entries.
# The non-zero entries are all 1.
col = np.concatenate(
[
self.rng.choice(self.input_dim, self.density, replace=False)
for _ in range(self.hash_dim)
]
)
row = np.repeat(np.arange(self.hash_dim), self.density)
self.projection_matrix = sparse.csr_matrix(
(np.ones_like(row, dtype=bool), (row, col)),
shape=(self.hash_dim, self.input_dim),
)
else:
# Create variable number of connections when density is a float.
# Each column of the projection matrix has non-zero entries
# with probability density.
# The non-zero entries are all 1.
self.projection_matrix: sparse.csr_matrix = sparse.random(
self.hash_dim,
self.input_dim,
self.density,
format="csr",
random_state=self.rng,
dtype=bool,
data_rvs=lambda n: np.ones(n, dtype=bool),
).tocsr()
[docs] def _winner_take_all(self, input_vector: np.ndarray) -> np.ndarray:
"""Winner-take-all operation.
This also includes quantization step using `quant_step` if specified,
to reduce the number of unique values.
Parameters
----------
input_vector : np.ndarray
1D input vector to be winner-take-all-ed. The number of winners
is pre-determined by `num_winners`.
Returns
-------
np.ndarray
1D winner-take-all-ed vector of given integer type `dtype`.
"""
result = np.zeros_like(input_vector, dtype=self.dtype)
indices = np.argpartition(input_vector, -self.num_winners)[-self.num_winners :]
if self.quant_step is None:
# Clip to 0 or 1.
result[indices] = 1
else:
# Quantize to steps as specified by quant_step.
# Clipped by the range of dtype.
result[indices] = np.ceil(input_vector[indices] / self.quant_step).clip(
0, np.iinfo(self.dtype).max
)
return result
def __repr__(self):
return (
f"HashEmbedding(input_dim={self.input_dim}, hash_dim={self.hash_dim}, "
f"density={self.density}, sparsity={self.sparsity}, "
f"quant_step={self.quant_step}, dtype={self.dtype}, seed={self.seed})"
)
[docs] def reset(self, seed: Optional[int] = None):
"""Reset the random seed if provided and re-construct the projection matrix.
Parameters
----------
seed : Optional[int], optional
The new seed to be used, omitted when set to `None`, by default `None`.
"""
if seed is not None:
self.rng = np.random.default_rng(seed)
self._construct_projection_matrix()