
INT8 support #13

Open · wants to merge 40 commits into base: update.redisearch
Commits (40)
593fe98
INT8 support
filipecosta90 Jan 8, 2025
bb323e8
Updated read_data missing args; fixed log level warning on transformer
fcostaoliveira Jan 8, 2025
3358246
enabled INT8 search
filipecosta90 Jan 8, 2025
b63a4c1
using batch size 1000
filipecosta90 Jan 8, 2025
4a963b1
Adding utility to convert to int8
filipecosta90 Jan 9, 2025
f58938a
Avoid overflow
filipecosta90 Jan 9, 2025
bf0c93f
Added the gist-960-euclidean-int8 dataset to datasets.json
filipecosta90 Jan 9, 2025
547ab2c
reverted change in batch_size
filipecosta90 Jan 9, 2025
8f5e758
don't use compression on the hdf5 file for int8
filipecosta90 Jan 9, 2025
92141b0
Added the gist-960-euclidean-int8 dataset to datasets.json
filipecosta90 Jan 9, 2025
53a90dc
allow quantile .99 on quantization
filipecosta90 Jan 9, 2025
48120e0
Increased M/EF configs on int8 experiment
filipecosta90 Jan 9, 2025
82116b5
Added cohere dataset
filipecosta90 Jan 16, 2025
e8fb75a
added tqdm
filipecosta90 Jan 16, 2025
9886675
prevent overflow on cohere dataset gen
filipecosta90 Jan 16, 2025
feaaaa4
prevent overflow on cohere dataset gen
filipecosta90 Jan 16, 2025
46aadff
prevent overflow on cohere dataset gen
filipecosta90 Jan 16, 2025
803fdda
Added datasets
filipecosta90 Jan 16, 2025
e40c478
Added datasets
filipecosta90 Jan 16, 2025
74dcfe2
Added datasets
filipecosta90 Jan 16, 2025
8a3879d
Added datasets
filipecosta90 Jan 16, 2025
4fdbaaa
Added datasets
filipecosta90 Jan 16, 2025
4b569da
Added datasets
filipecosta90 Jan 16, 2025
5ad86e4
Added datasets
filipecosta90 Jan 16, 2025
47f5cde
added flat
filipecosta90 Jan 16, 2025
3fc7b87
wip on int8
filipecosta90 Jan 17, 2025
d461dab
wip on int8
filipecosta90 Jan 17, 2025
f38b877
wip on int8
filipecosta90 Jan 17, 2025
da88e93
wip on int8
filipecosta90 Jan 17, 2025
ae0627b
wip on int8
filipecosta90 Jan 17, 2025
72051e9
Cleaned up create-cohere script
filipecosta90 Jan 17, 2025
2e4b2f8
Added vector types variation
filipecosta90 Jan 17, 2025
30cf0b6
support metadata on h5 file
filipecosta90 Jan 22, 2025
4965445
FLUSHALL on Redis
filipecosta90 Jan 22, 2025
db3fbc9
Added INT8 cohere dataset with metadata
fcostaoliveira Jan 24, 2025
27bfe7d
cleaned up cohere dataset script
filipecosta90 Jan 24, 2025
2cfdc63
Adjusted dataset name
filipecosta90 Jan 24, 2025
c1cdae1
Adjusted dataset name
filipecosta90 Jan 24, 2025
b29b8a4
Added EF_CONSTRUCT 8 and 16 variations
filipecosta90 Jan 24, 2025
3362bd3
Added M 8 and 16 variations
filipecosta90 Jan 25, 2025
193 changes: 193 additions & 0 deletions create-cohere.py
@@ -0,0 +1,193 @@
from datasets import load_dataset
import numpy as np
import os
import pickle
from dotenv import load_dotenv
from benchmark import DATASETS_DIR
import h5py
from redis import Redis
from redis.commands.search.query import Query
from redis.commands.search.field import TextField, VectorField
from redis.commands.search.indexDefinition import IndexDefinition, IndexType
from tqdm import tqdm
import json

# Load environment variables
load_dotenv()

# Constants
LANG = "en"
DATASET_SIZE = int(os.getenv("DATASET_SIZE", "1000000"))
VECTOR_TYPE = os.getenv("VECTOR_TYPE", "INT8").lower()
QUERIES_NUM = 1000
K = 100

dataset_embed_type_dict = {"float32": "emb", "int8": "emb_int8"}
dataset_vector_dtype_dict = {"float32": np.float32, "int8": np.int8}
dataset_name_type_dict = {
    "float32": "Cohere/wikipedia-2023-11-embed-multilingual-v3",
    "int8": "Cohere/wikipedia-2023-11-embed-multilingual-v3-int8-binary",
}


def create_redis_index(vector_type):
    client = Redis()
    try:
        client.ft().dropindex(
            delete_documents=True
        )  # Remove existing index if it exists
    except Exception:
        pass
    index_def = IndexDefinition(index_type=IndexType.HASH)
    schema = (
        TextField("_id"),
        TextField("title"),
        TextField("text"),
        VectorField(
            "vector",
            "FLAT",
            {"TYPE": vector_type, "DIM": 1024, "DISTANCE_METRIC": "COSINE"},
        ),
    )
    client.ft().create_index(schema, definition=index_def)
    print("Redis search index created.")


def load_vectors(vector_type, vector_dtype, dataset_name, emb_fieldname):
    embeddings_file = f"{vector_type}_embeddings_{DATASET_SIZE}.pkl"
    if os.path.exists(embeddings_file):
        with open(embeddings_file, "rb") as f:
            (vectors, metadata, query_vectors) = pickle.load(f)
        print(
            f"Prepared {len(vectors)} dataset vectors, {len(metadata)} metadata, and {len(query_vectors)} query vectors"
        )
        return vectors, metadata, query_vectors
    dataset = load_dataset(
        dataset_name,
        LANG,
        split="train",
        streaming=True,
    )
    vectors, metadata = [], []
    query_vectors = []
    for num, doc in tqdm(
        enumerate(dataset.take(DATASET_SIZE + QUERIES_NUM)), desc="Loading dataset"
    ):
        vector = doc[emb_fieldname]
        if num >= DATASET_SIZE:
            query_vectors.append(vector)
        else:
            vectors.append(vector)
            metadata.append(
                {
                    "_id": doc["_id"],
                    "title": doc.get("title", ""),
                    "text": doc.get("text", ""),
                }
            )
    vectors = np.array(vectors, dtype=vector_dtype)
    with open(embeddings_file, "wb") as f:
        pickle.dump((vectors, metadata, query_vectors), f)
    print(
        f"Prepared {len(vectors)} dataset vectors, {len(metadata)} metadata, and {len(query_vectors)} query vectors"
    )
    return vectors, metadata, query_vectors


def ingest_vectors(vectors, metadata, vector_type):
    client = Redis()
    client.flushdb()  # Clean DB before ingestion
    create_redis_index(vector_type)  # Ensure index is created before ingestion
    pipeline = client.pipeline()
    for i, (vector, meta) in enumerate(
        tqdm(zip(vectors, metadata), desc="Ingesting vectors", total=len(vectors))
    ):
        pipeline.hset(f"{i}", mapping={"vector": vector.tobytes(), **meta})
        if i % 100 == 0:
            pipeline.execute()
    pipeline.execute()
    print("Vector ingestion complete.")


def verify_metadata(vectors):
    client = Redis()
    sample_indices = np.random.choice(len(vectors), 5, replace=False)
    for idx in sample_indices:
        data = client.hgetall(f"{idx}")
        if data:
            print(f"Metadata for vector {idx}: {data}")
        else:
            print(f"No metadata found for vector {idx}")


def run():
    vector_type = VECTOR_TYPE
    dataset_name = dataset_name_type_dict[VECTOR_TYPE]
    vector_dtype = dataset_vector_dtype_dict[VECTOR_TYPE]
    emb_fieldname = dataset_embed_type_dict[VECTOR_TYPE]
    print(
        f"Creating dataset for vector type={vector_type}. Using dataset {dataset_name} and field {emb_fieldname} for the embeddings"
    )
    vectors, metadata, queries = load_vectors(
        vector_type, vector_dtype, dataset_name, emb_fieldname
    )
    ingest_vectors(vectors[:DATASET_SIZE], metadata[:DATASET_SIZE], vector_type)
    verify_metadata(vectors[:DATASET_SIZE])
    assert len(queries) == QUERIES_NUM
    assert len(vectors) == DATASET_SIZE
    assert len(metadata) == DATASET_SIZE
    neighbors, distances = [], []
    client_ft = Redis().ft()
    q = (
        Query("*=>[KNN $K @vector $vec_param AS vector_score]")
        .sort_by("vector_score", asc=True)
        .paging(0, K)
        .return_fields("vector_score")
        .dialect(4)
        .timeout(12000000)
    )
    for query_vector in tqdm(queries, desc="Processing queries"):
        params_dict = {
            "vec_param": np.array(query_vector).astype(vector_dtype).tobytes(),
            "K": K,
        }
        results = client_ft.search(q, query_params=params_dict)
        nb = [int(result.id) for result in results.docs]
        if len(nb) != K:
            print(f"wrong len {len(nb)}")
            continue

        neighbors.append(nb)
        distances.append([float(result.vector_score) for result in results.docs])
    vector_dimension = len(vectors[0])
    output_dir = os.path.join(
        DATASETS_DIR,
        f"cohere-wikipedia-{vector_dimension}-angular-{vector_type}",
    )
    os.makedirs(output_dir, exist_ok=True)  # Ensure directory exists
    output_path = os.path.join(
        output_dir,
        f"cohere-wikipedia-{vector_dimension}-angular-{vector_type}.hdf5",
    )

    metadata_json = np.array(
        [json.dumps(meta) for meta in metadata[:DATASET_SIZE]], dtype="S"
    )
    assert len(metadata_json) == len(vectors)

    with h5py.File(output_path, "w") as h5f:
        h5f.create_dataset("train", data=vectors, compression=None)
        h5f.create_dataset("test", data=queries, compression=None)
        h5f.create_dataset(
            "neighbors", data=np.array(neighbors, dtype=np.int32), compression=None
        )
        h5f.create_dataset(
            "distances", data=np.array(distances, dtype=np.float32), compression=None
        )
        h5f.create_dataset("metadata", data=metadata_json, compression=None)


if __name__ == "__main__":
    run()
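
As context for the file layout this script emits, here is a minimal sketch of how the resulting HDF5 file could be inspected. The relative path is an assumption (use whatever `DATASETS_DIR` resolves to on your machine); the dataset keys and dtypes mirror the `create_dataset` calls in `run()` above, and the sorted-scores check only reflects that the KNN query sorts by `vector_score` ascending.

```python
import json
import os

import h5py
import numpy as np

# Hypothetical path; adjust to wherever DATASETS_DIR points.
path = os.path.join(
    "datasets",
    "cohere-wikipedia-1024-angular-int8",
    "cohere-wikipedia-1024-angular-int8.hdf5",
)

with h5py.File(path, "r") as h5f:
    train = h5f["train"]          # (DATASET_SIZE, 1024) dataset vectors
    test = h5f["test"]            # (QUERIES_NUM, 1024) query vectors
    neighbors = h5f["neighbors"]  # (QUERIES_NUM, K) int32 ids of the exact KNN
    distances = h5f["distances"]  # (QUERIES_NUM, K) float32 cosine scores
    metadata = h5f["metadata"]    # (DATASET_SIZE,) JSON byte strings

    print(train.shape, train.dtype)
    print(test.shape, neighbors.shape, distances.shape)

    # Ground truth rows should be sorted by ascending score (KNN sorts asc).
    assert np.all(np.diff(distances[0]) >= 0)

    # Metadata rows were written with dtype="S", so decode before json.loads.
    print(json.loads(metadata[0].decode("utf-8")))
```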
121 changes: 121 additions & 0 deletions create-gist-960-int8.py
@@ -0,0 +1,121 @@
import numpy as np
from typing import Dict
from dataset_reader.ann_h5_reader import AnnH5Reader
from benchmark import DATASETS_DIR
from dataset_reader.base_reader import BaseReader, Query, Record
import tqdm
import time
import h5py
import os

numpy_types_dict = {"float32": np.float32, "int8": np.int8, "uint8": np.uint8}


# quantize vectors per-dimension (scalar quantization)
class ScalarQuantization:
    def __init__(self, dim, precision: str = "uint8", quantile: float = 0.99):
        self.N = 255  # 2^B - 1
        self.dim = dim
        self.precision = precision
        self.quantile = quantile
        if precision == "uint8":
            self.offset = 0
        elif precision == "int8":
            self.offset = 128
        else:
            raise ValueError(f"Unsupported precision: {precision}")

    def train(self, train_dataset: np.ndarray):
        # Use quantiles to calculate x_min and x_max
        lower_quantile = 1 - self.quantile
        self.x_min = np.quantile(train_dataset, lower_quantile, axis=0)
        self.x_max = np.quantile(train_dataset, self.quantile, axis=0)
        self.delta = (
            self.x_max - self.x_min
        ) / self.N  # Calculate delta for each dimension

    def quantize(self, dataset: np.ndarray):
        q_vals = np.floor((dataset - self.x_min) / self.delta)
        # use int32 to avoid overflow during offset subtraction
        q_vals = np.clip(q_vals, 0, self.N).astype(np.int32)
        q_vals -= self.offset
        return q_vals.astype(numpy_types_dict[self.precision])

    def decompress(self, x):
        return (self.delta * (x + 0.5 + self.offset).astype(np.float32)) + self.x_min

    def get_quantization_params(self) -> Dict:
        return {"x_min": self.x_min, "x_max": self.x_max, "delta": self.delta}


if __name__ == "__main__":
    import os

    # h5py file 4 keys:
    # `train` - float vectors (num vectors 1183514)
    # `test` - float vectors (num vectors 10000)
    # `neighbors` - int - indices of nearest neighbors for test (num items 10k, each item
    #               contains info about 100 nearest neighbors)
    # `distances` - float - distances for nearest neighbors for test vectors

    test_path = os.path.join(
        DATASETS_DIR, "gist-960-euclidean", "gist-960-euclidean.hdf5"
    )

    data = AnnH5Reader(test_path).read_data()
    queries = AnnH5Reader(test_path).read_queries()

    train_dataset_size = 100000
    full_dataset_size = 1000000
    train_dataset = []
    full_dataset = []
    test = []
    neighbors = []
    distances = []
    for query in tqdm.tqdm(queries):
        test.append(np.array(query.vector).astype(np.float32))
        neighbors.append(query.expected_result)
        distances.append(query.expected_scores)

    for record in tqdm.tqdm(data):
        if len(full_dataset) >= full_dataset_size:
            break
        full_dataset.append(np.array(record.vector).astype(np.float32))
        if len(train_dataset) < train_dataset_size:
            train_dataset.append(np.array(record.vector).astype(np.float32))

    train_dataset = np.array(
        train_dataset
    )  # Convert list of vectors into a single NumPy array
    print("n vectors = ", len(train_dataset))
    print("vector shape = ", train_dataset[0].shape)
    precision = "int8"
    quantizer = ScalarQuantization(train_dataset.shape[1], precision, 0.99)
    print("Creating quantizer for type = ", precision)

    print("\nTraining dataset ... ")
    start = time.time()
    quantizer.train(train_dataset)
    dur = time.time() - start
    print(
        f"Training took {dur} seconds. \nQuantization params = {quantizer.get_quantization_params()}"
    )

    # quantize dataset
    full_dataset = np.array(full_dataset)
    test = np.array(test)
    print("\nQuantizing dataset ... ")
    start = time.time()
    quantized_dataset = quantizer.quantize(full_dataset)
    quantized_queries = quantizer.quantize(test)
    dur = time.time() - start
    print(f"Quantization took {dur} seconds.")
    print("vector 1 shape = ", quantized_dataset[0].shape)
    print("vector 1 sample = ", quantized_dataset[0])

    # Create a new HDF5 file and write the data
    output_path = os.path.join(DATASETS_DIR, "gist-960-euclidean-int8.hdf5")

    with h5py.File(output_path, "w") as h5f:
        h5f.create_dataset("train", data=quantized_dataset, compression=None)
        h5f.create_dataset("test", data=quantized_queries, compression=None)
        h5f.create_dataset("neighbors", data=neighbors, compression=None)
        h5f.create_dataset("distances", data=distances, compression=None)
19 changes: 15 additions & 4 deletions dataset_reader/ann_h5_reader.py
@@ -1,5 +1,5 @@
from typing import Iterator

import json
import h5py
import numpy as np

@@ -27,13 +27,24 @@ def read_queries(self) -> Iterator[Query]:
                expected_scores=expected_scores.tolist(),
            )

    def read_data(self) -> Iterator[Record]:
    def read_data(self, start_idx: int = 0, end_idx: int = None) -> Iterator[Record]:
        data = h5py.File(self.path)

        has_metadata = "metadata" in data  # Check if metadata exists
        for idx, vector in enumerate(data["train"]):
            if self.normalize:
                vector /= np.linalg.norm(vector)
            yield Record(id=idx, vector=vector.tolist(), metadata=None)
            metadata = None
            if has_metadata:
                try:
                    metadata_str = data["metadata"][idx].decode("utf-8").strip()
                    if metadata_str.startswith("{") and metadata_str.endswith("}"):
                        metadata = json.loads(metadata_str)
                    else:
                        metadata = None
                except (
                    IndexError,
                    AttributeError,
                    UnicodeDecodeError,
                    json.JSONDecodeError,
                ):
                    metadata = None  # Handle cases where metadata retrieval fails

            yield Record(id=idx, vector=vector.tolist(), metadata=metadata)


if __name__ == "__main__":
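
As an editorial note on the optional `metadata` dataset that `read_data()` now consumes: the sketch below shows how such a file might be written and read back, mirroring the JSON byte strings (`dtype="S"`) that create-cohere.py produces. The file name `toy.hdf5` and the field values are illustrative only.

```python
import json

import h5py
import numpy as np

# Write a tiny HDF5 file with per-record JSON metadata.
vectors = np.random.rand(3, 4).astype(np.float32)
metadata = [{"_id": str(i), "title": f"doc {i}"} for i in range(3)]

with h5py.File("toy.hdf5", "w") as h5f:
    h5f.create_dataset("train", data=vectors)
    h5f.create_dataset(
        "metadata", data=np.array([json.dumps(m) for m in metadata], dtype="S")
    )

# Read it back the same way read_data() does: decode the bytes, then json.loads.
with h5py.File("toy.hdf5", "r") as h5f:
    for idx, vector in enumerate(h5f["train"]):
        meta = json.loads(h5f["metadata"][idx].decode("utf-8"))
        print(idx, vector.shape, meta)
```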
24 changes: 24 additions & 0 deletions datasets/datasets.json
@@ -31,6 +31,22 @@
"path": "deep-image-96-angular/deep-image-96-angular.hdf5",
"link": "http://ann-benchmarks.com/deep-image-96-angular.hdf5"
},
{
"name": "cohere-wikipedia-1024-angular-int8",
"vector_size": 1024,
"distance": "cosine",
"type": "h5",
"path": "cohere-wikipedia-1024-angular-int8/cohere-wikipedia-1024-angular-int8.hdf5",
"link": "http://benchmarks.redislabs.s3.amazonaws.com/vecsim/cohere-wikipedia-1024-angular/cohere-wikipedia-1024-angular-int8.hdf5"
},
{
"name": "cohere-wikipedia-1024-angular-float32",
"vector_size": 1024,
"distance": "cosine",
"type": "h5",
"path": "cohere-wikipedia-1024-angular-float32/cohere-wikipedia-1024-angular-float32.hdf5",
"link": "http://benchmarks.redislabs.s3.amazonaws.com/vecsim/cohere-wikipedia-1024-angular/cohere-wikipedia-1024-angular-float32.hdf5"
},
{
"name": "gist-960-euclidean",
"vector_size": 960,
@@ -39,6 +55,14 @@
"path": "gist-960-euclidean/gist-960-euclidean.hdf5",
"link": "http://ann-benchmarks.com/gist-960-euclidean.hdf5"
},
{
"name": "gist-960-euclidean-int8",
"vector_size": 960,
"distance": "l2",
"type": "h5",
"path": "gist-960-euclidean-int8/gist-960-euclidean-int8.hdf5",
"link": "http://benchmarks.redislabs.s3.amazonaws.com/vecsim/gist-960-euclidean-int8/gist-960-euclidean-int8.hdf5"
},
{
"name": "laion-img-emb-512-1M-cosine",
"vector_size": 512,
2 changes: 1 addition & 1 deletion engine/clients/redis/config.py
@@ -5,7 +5,7 @@
REDIS_USER = os.getenv("REDIS_USER", None)
REDIS_CLUSTER = bool(int(os.getenv("REDIS_CLUSTER", 0)))
REDIS_HYBRID_POLICY = os.getenv("REDIS_HYBRID_POLICY", None)
REDIS_KEEP_DOCUMENTS = bool(os.getenv("REDIS_KEEP_DOCUMENTS", 1))
REDIS_KEEP_DOCUMENTS = bool(os.getenv("REDIS_KEEP_DOCUMENTS", 0))
REDIS_JUST_INDEX = bool(os.getenv("REDIS_JUST_INDEX", 0))
GPU_STATS = bool(int(os.getenv("GPU_STATS", 0)))
GPU_STATS_ENDPOINT = os.getenv("GPU_STATS_ENDPOINT", None)
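
One caveat worth flagging for the default changed above (an editorial note, not part of the diff): `bool(os.getenv(...))` treats any non-empty string as truthy, including `"0"`, whereas the `bool(int(os.getenv(...)))` pattern already used for `REDIS_CLUSTER` and `GPU_STATS` in this file actually parses the value. A minimal illustration of standard `os.getenv` behavior:

```python
import os

os.environ["REDIS_KEEP_DOCUMENTS"] = "0"

# Any non-empty string is truthy, so this still evaluates to True:
print(bool(os.getenv("REDIS_KEEP_DOCUMENTS", 0)))       # True

# Parsing the value first gives the intended flag semantics:
print(bool(int(os.getenv("REDIS_KEEP_DOCUMENTS", 0))))  # False
```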