diff --git a/daras_ai_v2/vector_search.py b/daras_ai_v2/vector_search.py
index 87471948b..110f760fd 100644
--- a/daras_ai_v2/vector_search.py
+++ b/daras_ai_v2/vector_search.py
@@ -242,7 +242,7 @@ def get_top_k_references(
 
 def vespa_search_results_to_refs(
     search_result: dict,
-) -> typing.Iterable[SearchReference]:
+) -> typing.Iterable[tuple[str, SearchReference]]:
     for hit in search_result["root"].get("children", []):
         try:
             ref = EmbeddingsReference.objects.get(vespa_doc_id=hit["fields"]["id"])
@@ -250,7 +250,6 @@ def vespa_search_results_to_refs(
         except EmbeddingsReference.DoesNotExist:
             continue
         if "text/html" in ref.embedded_file.metadata.mime_type:
-            # logger.debug(f"Generating fragments {ref['url']} as it is a HTML file")
             ref.url = generate_text_fragment_url(url=ref.url, text=ref.snippet)
         yield (
             ref_key,
diff --git a/scripts/cleanup_vespa_db.py b/scripts/cleanup_vespa_db.py
new file mode 100644
index 000000000..5536626da
--- /dev/null
+++ b/scripts/cleanup_vespa_db.py
@@ -0,0 +1,90 @@
+import typing
+from datetime import timedelta
+
+from django.utils import timezone
+
+from daras_ai_v2 import settings
+from daras_ai_v2.vector_search import get_vespa_app
+from embeddings.models import EmbeddedFile
+
+if typing.TYPE_CHECKING:
+    from vespa.io import VespaResponse
+
+
+STALENESS_THRESHOLD_DAYS = 90
+BATCH_SIZE = 1_000
+
+
+def cleanup_stale_cache():
+    """Delete stale EmbeddedFiles and their Vespa documents in batches.
+
+    A file is stale when its ``updated_at`` is more than
+    STALENESS_THRESHOLD_DAYS old. For each batch the Vespa documents are
+    deleted first; a file's database rows are removed only if none of its
+    Vespa deletes was reported as failed, so a later run can retry them.
+    """
+    vespa = get_vespa_app()
+    # Computed once so every batch is selected against the same cutoff.
+    cutoff = timezone.now() - timedelta(days=STALENESS_THRESHOLD_DAYS)
+    # Files kept because a Vespa delete failed. They are excluded from later
+    # batches; otherwise the same rows would be selected forever.
+    skipped_file_ids = set()
+
+    while True:
+        stale_files = list(
+            EmbeddedFile.objects.prefetch_related("embeddings_references")
+            .filter(updated_at__lt=cutoff)
+            .exclude(id__in=skipped_file_ids)
+            .order_by("updated_at")[:BATCH_SIZE]
+        )
+        if not stale_files:
+            break
+
+        doc_ids_by_file = {
+            ef.id: [ref.vespa_doc_id for ref in ef.embeddings_references.all()]
+            for ef in stale_files
+        }
+        # NOTE(review): the callback may run on pyvespa worker threads, so it
+        # only appends/adds to collections instead of bumping a counter.
+        deleted_doc_ids = []
+        failed_doc_ids = set()
+
+        def vespa_callback(response: "VespaResponse", doc_id: str):
+            if response.is_successful():
+                deleted_doc_ids.append(doc_id)
+                return
+            failed_doc_ids.add(doc_id)
+            print(
+                f"Failed to delete document {doc_id} from Vespa.\n"
+                f"Status code: {response.get_status_code()}\n"
+                f"JSON: {response.get_json()}\n"
+            )
+
+        vespa.feed_iterable(
+            (
+                {"id": doc_id}
+                for doc_ids in doc_ids_by_file.values()
+                for doc_id in doc_ids
+            ),
+            schema=settings.VESPA_SCHEMA,
+            operation_type="delete",
+            callback=vespa_callback,
+        )
+        print(f"Deleted {len(deleted_doc_ids)} documents from Vespa.")
+
+        deletable_file_ids = []
+        for file_id, doc_ids in doc_ids_by_file.items():
+            if failed_doc_ids.isdisjoint(doc_ids):
+                deletable_file_ids.append(file_id)
+            else:
+                skipped_file_ids.add(file_id)
+
+        deleted_per_model = EmbeddedFile.objects.filter(
+            id__in=deletable_file_ids
+        ).delete()
+        print(f"Deleted EmbeddedFiles & related objects: {deleted_per_model}")
+
+
+def run():
+    """Run the stale-cache cleanup."""
+    cleanup_stale_cache()