diff --git a/.gitignore b/.gitignore
index b3e0d2d28..716693f42 100644
--- a/.gitignore
+++ b/.gitignore
@@ -207,3 +207,5 @@
 metricsdiff.txt
 annotated/
 tmp_ingest/
+
+data/
\ No newline at end of file
diff --git a/.vscode/launch.json b/.vscode/launch.json
new file mode 100644
index 000000000..579735b39
--- /dev/null
+++ b/.vscode/launch.json
@@ -0,0 +1,14 @@
+{
+    "version": "0.2.0",
+    "configurations": [
+        {
+            "name": "Test ingest nomicdb",
+            "env": {"PYTHONPATH": "/workspaces/unstructured-ingest/"},
+            "type": "debugpy",
+            "request": "launch",
+            "program": "./test_e2e/python/test-ingest-nomicdb.py",
+            "console": "integratedTerminal",
+            "justMyCode": false
+        }
+    ]
+}
\ No newline at end of file
diff --git a/requirements/connectors/nomicdb.in b/requirements/connectors/nomicdb.in
new file mode 100644
index 000000000..228c1f246
--- /dev/null
+++ b/requirements/connectors/nomicdb.in
@@ -0,0 +1,3 @@
+-c ../common/constraints.txt
+
+nomic
\ No newline at end of file
diff --git a/requirements/connectors/nomicdb.txt b/requirements/connectors/nomicdb.txt
new file mode 100644
index 000000000..e69de29bb
diff --git a/test_e2e/python/.env.sample b/test_e2e/python/.env.sample
new file mode 100644
index 000000000..aea284088
--- /dev/null
+++ b/test_e2e/python/.env.sample
@@ -0,0 +1,5 @@
+UNSTRUCTURED_API_KEY=
+UNSTRUCTURED_API_URL=
+NOMIC_ORG_NAME=
+NOMIC_ADS_NAME=
+NOMIC_API_KEY=
\ No newline at end of file
diff --git a/test_e2e/python/test-ingest-nomicdb.py b/test_e2e/python/test-ingest-nomicdb.py
new file mode 100644
index 000000000..ebc41ae4a
--- /dev/null
+++ b/test_e2e/python/test-ingest-nomicdb.py
@@ -0,0 +1,79 @@
+import os
+from unstructured_ingest.connector.nomicdb import (
+    NomicAccessConfig,
+    NomicWriteConfig,
+    SimpleNomicConfig
+)
+
+from unstructured_ingest.connector.local import SimpleLocalConfig
+
+from unstructured_ingest.interfaces import PartitionConfig, ProcessorConfig, ReadConfig
+from unstructured_ingest.runner import LocalRunner
+from unstructured_ingest.runner.writers.base_writer import Writer
+from unstructured_ingest.runner.writers.nomicdb import (
+    NomicWriter,
+)
+from dotenv import load_dotenv
+load_dotenv('./test_e2e/python/.env')
+
+def get_writer(
+    organisation_name: str,
+    dataset_name: str,
+    description: str,
+    api_key: str,
+    domain: str = "atlas.nomic.ai",
+    tenant: str = "production",
+    is_public: bool = False
+) -> Writer:
+    return NomicWriter(
+        connector_config=SimpleNomicConfig(
+            organisation_name=organisation_name,
+            dataset_name=dataset_name,
+            description=description,
+            domain=domain,
+            tenant=tenant,
+            is_public=is_public,
+            access_config=NomicAccessConfig(
+                api_key=api_key
+            ),
+        ),
+        write_config=NomicWriteConfig(
+            num_processes=2,
+            batch_size=80
+        )
+    )
+
+def main():
+    """
+    Parse the example docs, ingest them into a Nomic dataset, and build a Nomic Atlas map.
+    """
+    writer = get_writer(
+        organisation_name=os.getenv('NOMIC_ORG_NAME'),
+        dataset_name=os.getenv('NOMIC_ADS_NAME'),
+        description='a dataset created by test-ingest-nomicdb',
+        api_key=os.getenv('NOMIC_API_KEY'),
+        is_public=True)
+
+    runner = LocalRunner(
+        processor_config=ProcessorConfig(
+            verbose=True,
+            output_dir="./data/local-ingest-output/",
+            num_processes=2
+        ),
+        read_config=ReadConfig(),
+        writer=writer,
+        partition_config=PartitionConfig(
+            partition_by_api=True,
+            api_key=os.getenv("UNSTRUCTURED_API_KEY"),
+            partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
+            strategy="auto",
+        ),
+        connector_config=SimpleLocalConfig(
+            input_path='./example-docs/',
+            recursive=False,
+        ),
+    )
+    runner.run()
+
+if __name__ == '__main__':
+    main()
\ No newline at end of file
diff --git a/unstructured_ingest/connector/nomicdb.py b/unstructured_ingest/connector/nomicdb.py
new file mode 100644
index 000000000..5b6f28cd3
--- /dev/null
+++ b/unstructured_ingest/connector/nomicdb.py
@@ -0,0 +1,124 @@
+import multiprocessing as mp
+import typing as t
+from dataclasses import dataclass
+
+from unstructured_ingest.enhanced_dataclass import enhanced_field
+from unstructured_ingest.error import DestinationConnectionError, WriteError
+from unstructured_ingest.interfaces import (
+    AccessConfig,
+    BaseConnectorConfig,
+    BaseDestinationConnector,
+    ConfigSessionHandleMixin,
+    IngestDocSessionHandleMixin,
+    WriteConfig,
+)
+from unstructured_ingest.logger import logger
+from unstructured_ingest.utils.data_prep import batch_generator
+from unstructured_ingest.utils.dep_check import requires_dependencies
+import nomic
+
+if t.TYPE_CHECKING:
+    from nomic import AtlasDataset
+
+@dataclass
+class NomicAccessConfig(AccessConfig):
+    api_key: t.Optional[str] = enhanced_field(sensitive=True)
+
+@dataclass
+class SimpleNomicConfig(ConfigSessionHandleMixin, BaseConnectorConfig):
+    organisation_name: str
+    dataset_name: str
+    description: str
+    domain: t.Optional[str] = None
+    tenant: t.Optional[str] = None
+    is_public: t.Optional[bool] = False
+    access_config: t.Optional[NomicAccessConfig] = None
+
+@dataclass
+class NomicWriteConfig(WriteConfig):
+    batch_size: int = 50
+    num_processes: int = 1
+
+@dataclass
+class NomicDestinationConnector(IngestDocSessionHandleMixin, BaseDestinationConnector):
+    write_config: NomicWriteConfig
+    connector_config: SimpleNomicConfig
+    _dataset: t.Optional["AtlasDataset"] = None
+
+    @property
+    def nomic_dataset(self):
+        if self._dataset is None:
+            self._dataset = self.create_dataset()
+        return self._dataset
+
+    def initialize(self):
+        nomic.cli.login(
+            token=self.connector_config.access_config.api_key,
+            domain=self.connector_config.domain,
+            tenant=self.connector_config.tenant
+        )
+
+    @requires_dependencies(["nomic"], extras="nomic")
+    def create_dataset(self) -> "AtlasDataset":
+        from nomic import AtlasDataset
+
+        dataset = AtlasDataset(
+            identifier=f"{self.connector_config.organisation_name}/{self.connector_config.dataset_name}",
+            unique_id_field='element_id',
+            description=self.connector_config.description,
+            is_public=self.connector_config.is_public,
+        )
+
+        return dataset
+
+    @DestinationConnectionError.wrap
+    def check_connection(self):
+        nomic.cli.login(
+            token=self.connector_config.access_config.api_key,
+            domain=self.connector_config.domain,
+            tenant=self.connector_config.tenant
+        )
+
+    @DestinationConnectionError.wrap
+    @requires_dependencies(["nomic"], extras="nomic")
+    def upsert_batch(self, batch: t.List[t.Dict[str, t.Any]]):
+        dataset = self.nomic_dataset
+        try:
+            dataset.add_data(list(batch))
+            # logger.debug(f"Successfully added {len(batch)} elements to dataset {dataset.id}")
+        except Exception as api_error:
+            raise WriteError(f"Nomic error: {api_error}") from api_error
+
+    def write_dict(self, *args, elements_dict: t.List[t.Dict[str, t.Any]], **kwargs) -> None:
+        logger.info(
+            f"Upserting {len(elements_dict)} elements to "
+            f"{self.connector_config.organisation_name}",
+        )
+
+        nomicdb_batch_size = self.write_config.batch_size
+
+        logger.info(f'using {self.write_config.num_processes} processes to upload')
+        if self.write_config.num_processes == 1:
+            for chunk in batch_generator(elements_dict, nomicdb_batch_size):
+                self.upsert_batch(chunk)
+        else:
+            with mp.Pool(
+                processes=self.write_config.num_processes,
+            ) as pool:
+                pool.map(self.upsert_batch, list(batch_generator(elements_dict, nomicdb_batch_size)))
+
+        dataset = self.nomic_dataset
+        dataset.create_index(
+            indexed_field='text',
+            topic_model=True,
+            duplicate_detection=True,
+            projection=None
+        )
+
+    def normalize_dict(self, element_dict: dict) -> dict:
+        return {
+            "element_id": element_dict['element_id'],
+            "text": element_dict['text'],
+            "type": element_dict['type'],
+            "filename": element_dict['metadata']['filename']
+        }
\ No newline at end of file
diff --git a/unstructured_ingest/runner/writers/__init__.py b/unstructured_ingest/runner/writers/__init__.py
index 8b07adb9e..60a842d37 100644
--- a/unstructured_ingest/runner/writers/__init__.py
+++ b/unstructured_ingest/runner/writers/__init__.py
@@ -21,6 +21,7 @@
 from .sql import SqlWriter
 from .vectara import VectaraWriter
 from .weaviate import WeaviateWriter
+from .nomicdb import NomicWriter
 
 writer_map: t.Dict[str, t.Type[Writer]] = {
     "astradb": AstraDBWriter,
@@ -43,6 +44,7 @@
     "sql": SqlWriter,
     "vectara": VectaraWriter,
     "weaviate": WeaviateWriter,
+    "nomic": NomicWriter,
 }
 
 __all__ = ["writer_map"]
diff --git a/unstructured_ingest/runner/writers/nomicdb.py b/unstructured_ingest/runner/writers/nomicdb.py
new file mode 100644
index 000000000..4bf97dff9
--- /dev/null
+++ b/unstructured_ingest/runner/writers/nomicdb.py
@@ -0,0 +1,19 @@
+import typing as t
+from dataclasses import dataclass
+
+from unstructured_ingest.interfaces import BaseDestinationConnector
+from unstructured_ingest.runner.writers.base_writer import Writer
+
+if t.TYPE_CHECKING:
+    from unstructured_ingest.connector.nomicdb import NomicWriteConfig, SimpleNomicConfig
+
+
+@dataclass
+class NomicWriter(Writer):
+    write_config: "NomicWriteConfig"
+    connector_config: "SimpleNomicConfig"
+
+    def get_connector_cls(self) -> t.Type[BaseDestinationConnector]:
+        from unstructured_ingest.connector.nomicdb import NomicDestinationConnector
+
+        return NomicDestinationConnector
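
A minimal usage sketch, separate from the diff above, showing how the newly registered "nomic" key resolves to the writer through writer_map (added in writers/__init__.py). The config values mirror the dataclasses added in connector/nomicdb.py and reuse the variable names from the .env.sample above; the description string and the choice to read everything from environment variables are illustrative assumptions, not part of the change.

import os

from unstructured_ingest.connector.nomicdb import (
    NomicAccessConfig,
    NomicWriteConfig,
    SimpleNomicConfig,
)
from unstructured_ingest.runner.writers import writer_map

# "nomic" is the key this change registers in writer_map; it resolves to NomicWriter.
writer_cls = writer_map["nomic"]

# Construct the writer the same way test-ingest-nomicdb.py does, via the
# config dataclasses added in connector/nomicdb.py.
writer = writer_cls(
    connector_config=SimpleNomicConfig(
        organisation_name=os.getenv("NOMIC_ORG_NAME"),
        dataset_name=os.getenv("NOMIC_ADS_NAME"),
        description="a dataset created via writer_map",  # illustrative
        domain="atlas.nomic.ai",
        tenant="production",
        access_config=NomicAccessConfig(api_key=os.getenv("NOMIC_API_KEY")),
    ),
    write_config=NomicWriteConfig(batch_size=80, num_processes=2),
)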