diskann-benchmark/src/backend/exhaustive/product.rs (12 changes: 6 additions & 6 deletions)

@@ -85,10 +85,10 @@ mod imp {
             5,
         );
 
-        let offsets = diskann_providers::model::pq::calculate_chunk_offsets_auto(
-            data.ncols(),
-            input.num_pq_chunks.get(),
-        );
+        let dim = std::num::NonZeroUsize::new(data.ncols())
+            .ok_or_else(|| anyhow::anyhow!("data has zero columns"))?;
+        let offsets =
+            diskann_quantization::views::ChunkOffsets::partition(dim, input.num_pq_chunks)?;
 
         let base = {
             let threadpool = rayon::ThreadPoolBuilder::new()
@@ -97,7 +97,7 @@ mod imp {
            threadpool.install(|| -> anyhow::Result<_> {
                Ok(parameters.train(
                    data.as_view(),
-                   diskann_quantization::views::ChunkOffsetsView::new(offsets.as_slice())?,
+                   offsets.as_view(),
                    diskann_quantization::Parallelism::Rayon,
                    &diskann_quantization::random::StdRngBuilder::new(input.seed),
                    &diskann_quantization::cancel::DontCancel,
@@ -109,7 +109,7 @@ mod imp {
            data.ncols(),
            base.flatten().into(),
            vec![0.0; data.ncols()].into(),
-           offsets.into(),
+           offsets.as_slice().into(),
        )?;
 
        let training_time: MicroSeconds = start.elapsed().into();
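The change above swaps the free function calculate_chunk_offsets_auto for the validated ChunkOffsets::partition constructor, then borrows a typed view or a raw slice from the owned value as each consumer requires. A minimal sketch of the pattern, assuming only the API surface visible in this diff; the helper name build_offsets is illustrative:

    use std::num::NonZeroUsize;

    use diskann_quantization::views::ChunkOffsets;

    // Illustrative helper: validate the dimension once, then let ChunkOffsets
    // own the partition and hand out borrows on demand.
    fn build_offsets(ncols: usize, num_pq_chunks: NonZeroUsize) -> anyhow::Result<ChunkOffsets> {
        let dim = NonZeroUsize::new(ncols)
            .ok_or_else(|| anyhow::anyhow!("data has zero columns"))?;
        let offsets = ChunkOffsets::partition(dim, num_pq_chunks)?;
        let _view = offsets.as_view(); // for train(...)
        let _raw = offsets.as_slice(); // for FixedChunkPQTable construction
        Ok(offsets)
    }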
(file path not captured)

@@ -353,7 +353,7 @@ mod tests {
         let c = provider.query_computer(&[-0.5, -0.5]).unwrap();
         let expected: f32 = 1.5 * 1.5 * 2.0;
         assert_eq!(
-            c.evaluate_similarity(&provider.get_vector_sync(3).unwrap()),
+            c.evaluate_similarity(provider.get_vector_sync(3).unwrap().as_slice()),
             expected
         );
 
@@ -362,14 +362,14 @@
         assert_eq!(
             d.evaluate_similarity(
                 provider.get_vector_sync(0).unwrap().as_slice(),
-                provider.get_vector_sync(3).unwrap().as_slice(),
+                provider.get_vector_sync(3).unwrap().as_slice()
             ),
             2.0
         );
 
         let slice: &[f32] = &[-0.5, -0.5];
         assert_eq!(
-            d.evaluate_similarity(slice, &provider.get_vector_sync(3).unwrap()),
+            d.evaluate_similarity(slice, provider.get_vector_sync(3).unwrap().as_slice()),
             expected,
         );
     }
(file path not captured)

@@ -444,10 +444,7 @@ mod tests {
         // Query Computer.
         let c = provider.query_computer(&[-0.5, -0.5]).unwrap();
         let expected: f32 = 1.5 * 1.5 * 2.0;
-        assert_eq!(
-            c.evaluate_similarity(&provider.get_vector_sync(3)),
-            expected
-        );
+        assert_eq!(c.evaluate_similarity(provider.get_vector_sync(3)), expected);
 
         // Distance Computer.
         let d = provider.distance_computer();
diskann-providers/src/model/mod.rs (8 changes: 4 additions & 4 deletions)

@@ -11,10 +11,10 @@ pub use configuration::IndexConfiguration;
 pub mod pq;
 pub use pq::{
     FixedChunkPQTable, GeneratePivotArguments, MAX_PQ_TRAINING_SET_SIZE, NUM_KMEANS_REPS_PQ,
-    NUM_PQ_CENTROIDS, accum_row_inplace, calculate_chunk_offsets_auto, compute_pq_distance,
-    compute_pq_distance_for_pq_coordinates, direct_distance_impl, distance,
-    generate_pq_data_from_pivots_from_membuf, generate_pq_data_from_pivots_from_membuf_batch,
-    generate_pq_pivots, generate_pq_pivots_from_membuf,
+    NUM_PQ_CENTROIDS, compute_pq_distance, compute_pq_distance_for_pq_coordinates,
+    direct_distance_impl, distance, generate_pq_data_from_pivots_from_membuf,
+    generate_pq_data_from_pivots_from_membuf_batch, generate_pq_pivots,
+    generate_pq_pivots_from_membuf,
 };
 
 pub mod statistics;
diskann-providers/src/model/pq/distance/dynamic.rs (19 changes: 0 additions & 19 deletions)

@@ -101,25 +101,6 @@ where
     }
 }
 
-impl<T> PreprocessedDistanceFunction<&Vec<u8>, f32> for QueryComputer<T>
-where
-    T: Deref<Target = FixedChunkPQTable>,
-{
-    fn evaluate_similarity(&self, changing: &Vec<u8>) -> f32 {
-        self.evaluate_similarity(changing.as_slice())
-    }
-}
-
-impl<T> PreprocessedDistanceFunction<&&[u8], f32> for QueryComputer<T>
-where
-    T: Deref<Target = FixedChunkPQTable>,
-{
-    fn evaluate_similarity(&self, changing: &&[u8]) -> f32 {
-        let changing: &[u8] = changing;
-        self.evaluate_similarity(changing)
-    }
-}
-
 /// Pre-dispatched distance functions for the `FixedChunkPQTable`.
 #[derive(Debug)]
 pub struct VTable {
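The two deleted impls only forwarded &Vec<u8> and &&[u8] arguments to the slice-based impl; with them gone, call sites convert explicitly, which is what the test updates earlier in this diff do. A self-contained illustration using a local stand-in trait (not the crate's actual PreprocessedDistanceFunction, which differs in detail):

    // Local stand-in mirroring the shape of PreprocessedDistanceFunction.
    trait Preprocessed<Changing> {
        fn evaluate_similarity(&self, changing: Changing) -> f32;
    }

    struct Computer;

    // The impl that remains after this PR: slices only.
    impl Preprocessed<&[u8]> for Computer {
        fn evaluate_similarity(&self, changing: &[u8]) -> f32 {
            changing.iter().map(|&b| f32::from(b)).sum()
        }
    }

    fn main() {
        let computer = Computer;
        let codes: Vec<u8> = vec![1, 2, 3];
        // Before: `computer.evaluate_similarity(&codes)` compiled via the
        // deleted &Vec<u8> forwarding impl. Now the conversion is explicit:
        assert_eq!(computer.evaluate_similarity(codes.as_slice()), 6.0);
    }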
diskann-providers/src/model/pq/distance/test_utils.rs (10 changes: 8 additions & 2 deletions)

@@ -13,7 +13,8 @@ use diskann_vector::{
 use rand::{Rng, distr::Distribution};
 use rand_distr::{Normal, Uniform};
 
-use crate::model::{FixedChunkPQTable, pq::calculate_chunk_offsets_auto};
+use crate::model::FixedChunkPQTable;
+use diskann_quantization::views::ChunkOffsets;
 
 /// We need a way to generate random queries.
 ///
@@ -130,7 +131,12 @@ pub(crate) fn generate_expected_vector(
 /// * N + 1: The number of PQ Pivots
 pub(crate) fn seed_pivot_table(config: TableConfig) -> FixedChunkPQTable {
     // Get the chunk offsets for the selected dimension and bytes.
-    let offsets = calculate_chunk_offsets_auto(config.dim, config.pq_chunks);
+    let chunk_offsets = ChunkOffsets::partition(
+        std::num::NonZeroUsize::new(config.dim).unwrap(),
+        std::num::NonZeroUsize::new(config.pq_chunks).unwrap(),
+    )
+    .unwrap();
+    let offsets = chunk_offsets.as_slice();
 
     // Create the pivot table following the schema described in the docstring.
     let mut pivots = Vec::<f32>::new();
diskann-providers/src/model/pq/mod.rs (7 changes: 3 additions & 4 deletions)

@@ -11,10 +11,9 @@ pub use fixed_chunk_pq_table::{
 mod pq_construction;
 pub use pq_construction::{
     MAX_PQ_TRAINING_SET_SIZE, NUM_KMEANS_REPS_PQ, NUM_PQ_CENTROIDS, accum_row_inplace,
-    calculate_chunk_offsets, calculate_chunk_offsets_auto, generate_pq_data_from_pivots,
-    generate_pq_data_from_pivots_from_membuf, generate_pq_data_from_pivots_from_membuf_batch,
-    generate_pq_pivots, generate_pq_pivots_from_membuf, get_chunk_from_training_data,
-    move_train_data_by_centroid,
+    generate_pq_data_from_pivots, generate_pq_data_from_pivots_from_membuf,
+    generate_pq_data_from_pivots_from_membuf_batch, generate_pq_pivots,
+    generate_pq_pivots_from_membuf, move_train_data_by_centroid,
 };
 
 /// all metadata of individual sub-component files is written in the first 4KB for unified files

Review comment (Contributor) on the accum_row_inplace re-export; arkrishn94 marked this conversation as resolved:

    Could you please provide the locations of these two call sites in the PR description:

    • Inline accum_row_inplace from diskann-providers\src\model\pq at the two call-sites.
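The comment above concerns accum_row_inplace, which stays exported from this module even though model/mod.rs (earlier in this diff) drops its crate-level re-export. Per its doc comment later in this diff, the function's contract is "Add `y` to every row of `x`". A hypothetical sketch of that contract, with an assumed signature and panic condition; the crate's real implementation may differ:

    // Hypothetical sketch: add the row `y` to every `y.len()`-wide row of the
    // flattened matrix `x`. The panic condition here is an assumption.
    fn accum_row_inplace(x: &mut [f32], y: &[f32]) {
        assert!(!y.is_empty() && x.len() % y.len() == 0);
        for row in x.chunks_mut(y.len()) {
            for (xi, yi) in row.iter_mut().zip(y) {
                *xi += *yi;
            }
        }
    }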
diskann-providers/src/model/pq/pq_construction.rs (118 changes: 44 additions & 74 deletions)

@@ -6,6 +6,7 @@
 use std::{
     io::{Seek, SeekFrom, Write},
     mem::size_of,
+    num::NonZeroUsize,
     sync::atomic::AtomicBool,
     vec,
 };
@@ -19,6 +20,7 @@ use diskann::{
 use diskann_quantization::{
     CompressInto,
     product::{BasicTableView, TransposedTable, train::TrainQuantizer},
+    views::{ChunkOffsets, ChunkOffsetsView},
 };
 use diskann_utils::{
     io::Metadata,
@@ -94,12 +96,11 @@ where
         );
     }
 
-    let mut chunk_offsets: Vec<usize> = vec![0; parameters.num_pq_chunks() + 1];
-    calculate_chunk_offsets(
-        parameters.dim(),
-        parameters.num_pq_chunks(),
-        &mut chunk_offsets,
-    );
+    let dim = NonZeroUsize::new(parameters.dim())
+        .ok_or_else(|| ANNError::log_pq_error("dim must be non-zero"))?;
+    let num_chunks = NonZeroUsize::new(parameters.num_pq_chunks())
+        .ok_or_else(|| ANNError::log_pq_error("num_pq_chunks must be non-zero"))?;
+    let chunk_offsets = ChunkOffsets::partition(dim, num_chunks).bridge_err()?;
 
     let trainer = diskann_quantization::product::train::LightPQTrainingParameters::new(
         parameters.num_centers(),
@@ -111,8 +112,7 @@ where
        .train(
            MatrixView::try_from(train_data, parameters.num_train(), parameters.dim())
                .bridge_err()?,
-           diskann_quantization::views::ChunkOffsetsView::new(chunk_offsets.as_slice())
-               .bridge_err()?,
+           chunk_offsets.as_view(),
            diskann_quantization::Parallelism::Rayon,
            &random_provider,
            &diskann_quantization::cancel::DontCancel,
@@ -125,7 +125,7 @@ where
     pq_storage.write_pivot_data(
         &full_pivot_data,
         &centroid,
-        &chunk_offsets,
+        chunk_offsets.as_slice(),
         parameters.num_centers(),
         parameters.dim(),
         storage_provider,
@@ -202,8 +202,10 @@ pub fn generate_pq_pivots_from_membuf<T: Copy + Into<f32>>(
         }
     }
 
-    // Calculate the chunk offsets
-    calculate_chunk_offsets(parameters.dim(), parameters.num_pq_chunks(), offsets);
+    // Calculate the chunk offsets, filling the caller-owned buffer.
+    let dim = NonZeroUsize::new(parameters.dim())
+        .ok_or_else(|| ANNError::log_pq_error("dim must be non-zero"))?;
+    let chunk_offsets_view = ChunkOffsetsView::partition_into(dim, offsets).bridge_err()?;
 
     let trainer = diskann_quantization::product::train::LightPQTrainingParameters::new(
         parameters.num_centers(),
@@ -235,7 +237,7 @@ pub fn generate_pq_pivots_from_membuf<T: Copy + Into<f32>>(
                parameters.dim(),
            )
            .bridge_err()?,
-           diskann_quantization::views::ChunkOffsetsView::new(offsets).bridge_err()?,
+           chunk_offsets_view,
            diskann_quantization::Parallelism::Rayon,
            &rng_builder,
            &cancelation,
@@ -249,35 +251,6 @@ pub fn generate_pq_pivots_from_membuf<T: Copy + Into<f32>>(
     Ok(())
 }
 
-/// Gets all instances of a chunk from the training data for all records in the training data. Each vector in the
-/// training dataset is divided into chunks and the PQ algorithm handles each vector chunk individually. This method
-/// gets the same chunk from each vector in the training data and creates a new vector out of all of them.
-///
-/// # Example
-/// See tests for examples
-#[inline]
-pub fn get_chunk_from_training_data(
-    train_data: &[f32],
-    num_train: usize,
-    raw_vector_dim: usize,
-    chunk_size: usize,
-    chunk_offset: usize,
-) -> Vec<f32> {
-    let mut result: Vec<f32> = vec![0.0; num_train * chunk_size];
-
-    result
-        // group empty result data into chunks of chunk_size
-        .chunks_mut(chunk_size)
-        .enumerate()
-        // for each chunk, copy the chunk from the training data into the result vector
-        .for_each(|(chunk_number, result_chunk)| {
-            let train_data_start = chunk_number * raw_vector_dim + chunk_offset;
-            let train_data_end = train_data_start + chunk_size;
-            result_chunk.copy_from_slice(&train_data[train_data_start..train_data_end]);
-        });
-    result
-}
-
 /// Calculates the centroid if needed and moves the train_data to the centroid
 /// # Arguments
 /// * `train_data` Dataset
@@ -324,36 +297,7 @@ pub fn move_train_data_by_centroid(
     }
 }
 
-/// Calculate the number of chunks for the product quantization algorithm. Returns a vector of offsets where
-/// each offset corresponds to a chunk based on the index of the chunk in the vector.
-///
-/// # Arguments
-/// * `dimensions` Number of dimensions of the input data
-/// * `num_pq_chunks` - Number of chunks that will be used in the PQ calculation. Each vector will be split into these
-///   number of chunks and each chunk will be compressed down to one byte.
-/// * `offsets` - An output vector of offsets, where the size is equal to the number of pq chunks + 1.
-#[inline]
-pub fn calculate_chunk_offsets(dimensions: usize, num_pq_chunks: usize, offsets: &mut [usize]) {
-    // Calculate each chunk's offset.
-    // If we have 8 dimensions and 3 chunks then the offsets would be [0, 3, 6, 8].
-    let mut chunk_offset: usize = 0;
-    offsets[0] = chunk_offset;
-    for chunk_index in 0..num_pq_chunks {
-        chunk_offset += dimensions / num_pq_chunks;
-        if chunk_index < (dimensions % num_pq_chunks) {
-            chunk_offset += 1;
-        }
-        offsets[chunk_index + 1] = chunk_offset;
-    }
-}
-
-pub fn calculate_chunk_offsets_auto(dimensions: usize, num_pq_chunks: usize) -> Vec<usize> {
-    let mut offsets = vec![0; num_pq_chunks + 1];
-    calculate_chunk_offsets(dimensions, num_pq_chunks, offsets.as_mut_slice());
-    offsets
-}
-
-/// Add the row `y` to every row in `x`.
+/// Add `y` to every row of `x`.
 ///
 /// # Panics
 ///
@@ -672,6 +616,29 @@ mod pq_test {
         utils::{ParallelIteratorInPool, create_thread_pool_for_test, read_bin_from},
     };
 
+    /// Test helper: Gets all instances of a chunk from the training data for all records
+    /// in the training data. Each vector in the training dataset is divided into chunks
+    /// and the PQ algorithm handles each vector chunk individually. This helper gets the
+    /// same chunk from each vector in the training data and returns it as a flat vector.
+    fn get_chunk_from_training_data(
+        train_data: &[f32],
+        num_train: usize,
+        raw_vector_dim: usize,
+        chunk_size: usize,
+        chunk_offset: usize,
+    ) -> Vec<f32> {
+        let mut result: Vec<f32> = vec![0.0; num_train * chunk_size];
+        result
+            .chunks_mut(chunk_size)
+            .enumerate()
+            .for_each(|(chunk_number, result_chunk)| {
+                let train_data_start = chunk_number * raw_vector_dim + chunk_offset;
+                let train_data_end = train_data_start + chunk_size;
+                result_chunk.copy_from_slice(&train_data[train_data_start..train_data_end]);
+            });
+        result
+    }
+
     #[test]
     fn test_move_train_data_by_centroid() {
         let dim = 20;
@@ -1077,9 +1044,12 @@
 
         // Pre-emptively construct an offset view to compare mismatched slices.
         // We want to check that the difference in the mismatched chunks is small.
-        let mut offsets = vec![0; num_pq_chunks + 1];
-        calculate_chunk_offsets(train_dim, num_pq_chunks, &mut offsets);
-        let offset_view = diskann_quantization::views::ChunkOffsetsView::new(&offsets).unwrap();
+        let chunk_offsets = ChunkOffsets::partition(
+            NonZeroUsize::new(train_dim).unwrap(),
+            NonZeroUsize::new(num_pq_chunks).unwrap(),
+        )
+        .unwrap();
+        let offset_view = chunk_offsets.as_view();
         let full_data =
             MatrixView::try_from(full_data_vector.as_slice(), num_train, train_dim).unwrap();
         let pivot_view =
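With calculate_chunk_offsets and calculate_chunk_offsets_auto deleted, the even-as-possible partitioning rule they implemented is worth keeping at hand when reviewing the ChunkOffsets::partition call sites. Below is a reference copy of the removed logic with the worked example from its comment; that the new API produces identical boundaries is an assumption this PR relies on:

    // Reference copy of the deleted partitioning rule: split `dimensions` into
    // `num_pq_chunks` contiguous chunks, with the first
    // (dimensions % num_pq_chunks) chunks one element wider.
    fn chunk_offsets(dimensions: usize, num_pq_chunks: usize) -> Vec<usize> {
        let mut offsets = vec![0usize; num_pq_chunks + 1];
        let mut chunk_offset = 0;
        for chunk_index in 0..num_pq_chunks {
            chunk_offset += dimensions / num_pq_chunks;
            if chunk_index < dimensions % num_pq_chunks {
                chunk_offset += 1;
            }
            offsets[chunk_index + 1] = chunk_offset;
        }
        offsets
    }

    fn main() {
        // 8 dimensions across 3 chunks -> chunk widths 3, 3, 2.
        assert_eq!(chunk_offsets(8, 3), vec![0, 3, 6, 8]);
    }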
diskann-providers/src/model/pq/views.rs (16 changes: 16 additions & 0 deletions)

@@ -16,6 +16,22 @@ impl From<Bridge<diskann_quantization::views::ChunkOffsetError>> for ANNError {
     }
 }
 
+// Compatibility with ANNError.
+impl From<Bridge<diskann_quantization::views::PartitionError>> for ANNError {
+    #[track_caller]
+    fn from(value: Bridge<diskann_quantization::views::PartitionError>) -> Self {
+        ANNError::log_pq_error(value.into_inner())
+    }
+}
+
+// Compatibility with ANNError.
+impl From<Bridge<diskann_quantization::views::PartitionIntoError>> for ANNError {
+    #[track_caller]
+    fn from(value: Bridge<diskann_quantization::views::PartitionIntoError>) -> Self {
+        ANNError::log_pq_error(value.into_inner())
+    }
+}
+
 // Compatibility with ANNError.
 impl From<Bridge<diskann_quantization::views::ChunkViewError>> for ANNError {
     #[track_caller]
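These impls extend the Bridge newtype pattern already used for ChunkOffsetError and ChunkViewError: wrapping the foreign error in a local newtype makes the From impl legal under Rust's orphan rule, which is what lets the .bridge_err()? calls in pq_construction.rs convert quantization errors into ANNError. A self-contained sketch with local stand-ins for Bridge, ANNError, and bridge_err (the real ones live in the diskann crates and differ in detail):

    // Local stand-ins, not the crate's actual types.
    struct Bridge<E>(E);

    #[derive(Debug)]
    struct AnnError(String);

    #[derive(Debug)]
    struct PartitionError; // stand-in for views::PartitionError

    // The local newtype makes this From impl legal under the orphan rule even
    // when the wrapped error is (as in the real code) a foreign type.
    impl From<Bridge<PartitionError>> for AnnError {
        fn from(value: Bridge<PartitionError>) -> Self {
            AnnError(format!("pq error: {:?}", value.0))
        }
    }

    // bridge_err wraps the error so `?` can route through the From impl above.
    fn bridge_err<T, E>(r: Result<T, E>) -> Result<T, Bridge<E>> {
        r.map_err(Bridge)
    }

    fn partition() -> Result<(), AnnError> {
        let r: Result<(), PartitionError> = Err(PartitionError);
        bridge_err(r)?; // Bridge<PartitionError> -> AnnError via From
        Ok(())
    }

    fn main() {
        assert!(partition().is_err());
    }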