Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions daras_ai_v2/vector_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,15 +242,14 @@ def get_top_k_references(

def vespa_search_results_to_refs(
search_result: dict,
) -> typing.Iterable[SearchReference]:
) -> typing.Iterable[tuple[str, SearchReference]]:
for hit in search_result["root"].get("children", []):
try:
ref = EmbeddingsReference.objects.get(vespa_doc_id=hit["fields"]["id"])
ref_key = ref.url
except EmbeddingsReference.DoesNotExist:
continue
if "text/html" in ref.embedded_file.metadata.mime_type:
# logger.debug(f"Generating fragments {ref['url']} as it is a HTML file")
ref.url = generate_text_fragment_url(url=ref.url, text=ref.snippet)
yield (
ref_key,
Expand Down
62 changes: 62 additions & 0 deletions scripts/cleanup_vespa_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import typing
from datetime import timedelta

from django.utils import timezone

from daras_ai_v2 import settings
from daras_ai_v2.vector_search import get_vespa_app
from embeddings.models import EmbeddedFile

if typing.TYPE_CHECKING:
from vespa.io import VespaResponse


STALENESS_THRESHOLD_DAYS = 90
BATCH_SIZE = 1_000


def cleanup_stale_cache():
vespa = get_vespa_app()

while True:
stale_files = EmbeddedFile.objects.prefetch_related(
"embeddings_references"
).filter(
updated_at__lt=timezone.now() - timedelta(days=STALENESS_THRESHOLD_DAYS)
)[:BATCH_SIZE]
if not stale_files:
break

docs_to_delete = (
{"id": ref.vespa_doc_id}
for ef in stale_files
for ref in ef.embeddings_references.all()
)
if docs_to_delete:
total_deleted = 0

def vespa_callback(response: "VespaResponse", id: str):
nonlocal total_deleted
if response.is_successful:
total_deleted += 1
else:
print(
f"Failed to delete document {id} from Vespa: {response.status_code} - {response.get_json()}"
)

vespa.feed_iterable(
docs_to_delete,
schema=settings.VESPA_SCHEMA,
operation_type="delete",
callback=vespa_callback,
)
print(f"Deleted {total_deleted} documents from Vespa.")

deleted_per_model = EmbeddedFile.objects.filter(
id__in=[ef.id for ef in stale_files]
).delete()
print(f"Deleted EmbeddedFiles & related objects: {deleted_per_model}")


def run():
cleanup_stale_cache()