|
| 1 | +import asyncio |
1 | 2 | import json |
2 | 3 |
|
3 | 4 | import weaviate |
4 | 5 | from weaviate.classes.config import DataType, Property |
5 | 6 | from weaviate.embedded import EmbeddedOptions |
| 7 | +from weaviate.util import generate_uuid5 |
6 | 8 |
|
7 | | -from utils.embedding_util import generate_embeddings |
8 | | - |
9 | | -json_files = [ |
10 | | - "data/archived.jsonl", |
11 | | - "data/deprecated.jsonl", |
12 | | - "data/malicious.jsonl", |
13 | | -] |
14 | | - |
15 | | - |
16 | | -def setup_schema(client): |
17 | | - if not client.collections.exists("Package"): |
18 | | - client.collections.create( |
19 | | - "Package", |
20 | | - properties=[ |
21 | | - Property(name="name", data_type=DataType.TEXT), |
22 | | - Property(name="type", data_type=DataType.TEXT), |
23 | | - Property(name="status", data_type=DataType.TEXT), |
24 | | - Property(name="description", data_type=DataType.TEXT), |
25 | | - ], |
26 | | - ) |
| 9 | +from codegate.inference.inference_engine import LlamaCppInferenceEngine |
27 | 10 |
|
28 | 11 |
|
29 | | -def generate_vector_string(package): |
30 | | - vector_str = f"{package['name']}" |
31 | | - # add description |
32 | | - package_url = "" |
33 | | - if package["type"] == "pypi": |
34 | | - vector_str += " is a Python package available on PyPI" |
35 | | - package_url = f"https://trustypkg.dev/pypi/{package['name']}" |
36 | | - elif package["type"] == "npm": |
37 | | - vector_str += " is a JavaScript package available on NPM" |
38 | | - package_url = f"https://trustypkg.dev/npm/{package['name']}" |
39 | | - elif package["type"] == "go": |
40 | | - vector_str += " is a Go package. " |
41 | | - package_url = f"https://trustypkg.dev/go/{package['name']}" |
42 | | - elif package["type"] == "crates": |
43 | | - vector_str += " is a Rust package available on Crates. " |
44 | | - package_url = f"https://trustypkg.dev/crates/{package['name']}" |
45 | | - elif package["type"] == "java": |
46 | | - vector_str += " is a Java package. " |
47 | | - package_url = f"https://trustypkg.dev/java/{package['name']}" |
48 | | - |
49 | | - # add extra status |
50 | | - if package["status"] == "archived": |
51 | | - vector_str += f". However, this package is found to be archived and no longer \ |
52 | | -maintained. For additional information refer to {package_url}" |
53 | | - elif package["status"] == "deprecated": |
54 | | - vector_str += f". However, this package is found to be deprecated and no \ |
55 | | -longer recommended for use. For additional information refer to {package_url}" |
56 | | - elif package["status"] == "malicious": |
57 | | - vector_str += f". However, this package is found to be malicious. For \ |
58 | | -additional information refer to {package_url}" |
59 | | - return vector_str |
60 | | - |
61 | | - |
62 | | -def add_data(client): |
63 | | - collection = client.collections.get("Package") |
64 | | - |
65 | | - # read all the data from db, we will only add if there is no data, or is different |
66 | | - existing_packages = list(collection.iterator()) |
67 | | - packages_dict = {} |
68 | | - for package in existing_packages: |
69 | | - key = package.properties["name"] + "/" + package.properties["type"] |
70 | | - value = { |
71 | | - "status": package.properties["status"], |
72 | | - "description": package.properties["description"], |
| 12 | +class PackageImporter: |
| 13 | + def __init__(self): |
| 14 | + self.client = weaviate.WeaviateClient( |
| 15 | + embedded_options=EmbeddedOptions( |
| 16 | + persistence_data_path="./weaviate_data", |
| 17 | + grpc_port=50052 |
| 18 | + ) |
| 19 | + ) |
| 20 | + self.json_files = [ |
| 21 | + "data/archived.jsonl", |
| 22 | + "data/deprecated.jsonl", |
| 23 | + "data/malicious.jsonl", |
| 24 | + ] |
| 25 | + self.client.connect() |
| 26 | + self.inference_engine = LlamaCppInferenceEngine() |
| 27 | + self.model_path = "./models/all-minilm-L6-v2-q5_k_m.gguf" |
| 28 | + |
| 29 | + def setup_schema(self): |
| 30 | + if not self.client.collections.exists("Package"): |
| 31 | + self.client.collections.create( |
| 32 | + "Package", |
| 33 | + properties=[ |
| 34 | + Property(name="name", data_type=DataType.TEXT), |
| 35 | + Property(name="type", data_type=DataType.TEXT), |
| 36 | + Property(name="status", data_type=DataType.TEXT), |
| 37 | + Property(name="description", data_type=DataType.TEXT), |
| 38 | + ], |
| 39 | + ) |
| 40 | + |
| 41 | + def generate_vector_string(self, package): |
| 42 | + vector_str = f"{package['name']}" |
| 43 | + package_url = "" |
| 44 | + type_map = { |
| 45 | + "pypi": "Python package available on PyPI", |
| 46 | + "npm": "JavaScript package available on NPM", |
| 47 | + "go": "Go package", |
| 48 | + "crates": "Rust package available on Crates", |
| 49 | + "java": "Java package" |
| 50 | + } |
| 51 | + status_messages = { |
| 52 | + "archived": "However, this package is found to be archived and no longer maintained.", |
| 53 | + "deprecated": "However, this package is found to be deprecated and no longer " |
| 54 | + "recommended for use.", |
| 55 | + "malicious": "However, this package is found to be malicious." |
| 56 | + } |
| 57 | + vector_str += f" is a {type_map.get(package['type'], 'unknown type')} " |
| 58 | + package_url = f"https://trustypkg.dev/{package['type']}/{package['name']}" |
| 59 | + |
| 60 | + # Add extra status |
| 61 | + status_suffix = status_messages.get(package["status"], "") |
| 62 | + if status_suffix: |
| 63 | + vector_str += f"{status_suffix} For additional information refer to {package_url}" |
| 64 | + return vector_str |
| 65 | + |
| 66 | + async def process_package(self, batch, package): |
| 67 | + vector_str = self.generate_vector_string(package) |
| 68 | + vector = await self.inference_engine.embed(self.model_path, [vector_str]) |
| 69 | + # This is where the synchronous call is made |
| 70 | + batch.add_object(properties=package, vector=vector[0]) |
| 71 | + |
| 72 | + async def add_data(self): |
| 73 | + collection = self.client.collections.get("Package") |
| 74 | + existing_packages = list(collection.iterator()) |
| 75 | + packages_dict = { |
| 76 | + f"{package.properties['name']}/{package.properties['type']}": { |
| 77 | + "status": package.properties["status"], |
| 78 | + "description": package.properties["description"] |
| 79 | + } for package in existing_packages |
73 | 80 | } |
74 | | - packages_dict[key] = value |
75 | | - |
76 | | - for json_file in json_files: |
77 | | - with open(json_file, "r") as f: |
78 | | - print("Adding data from", json_file) |
79 | 81 |
|
80 | | - # temporary, just for testing |
81 | | - with collection.batch.dynamic() as batch: |
| 82 | + for json_file in self.json_files: |
| 83 | + with open(json_file, "r") as f: |
| 84 | + print("Adding data from", json_file) |
| 85 | + packages_to_insert = [] |
82 | 86 | for line in f: |
83 | 87 | package = json.loads(line) |
| 88 | + package["status"] = json_file.split('/')[-1].split('.')[0] |
| 89 | + key = f"{package['name']}/{package['type']}" |
| 90 | + |
| 91 | + if key in packages_dict and packages_dict[key] == { |
| 92 | + "status": package["status"], |
| 93 | + "description": package["description"] |
| 94 | + }: |
| 95 | + print("Package already exists", key) |
| 96 | + continue |
| 97 | + |
| 98 | + vector_str = self.generate_vector_string(package) |
| 99 | + vector = await self.inference_engine.embed(self.model_path, [vector_str]) |
| 100 | + packages_to_insert.append((package, vector[0])) |
| 101 | + |
| 102 | + # Synchronous batch insert after preparing all data |
| 103 | + with collection.batch.dynamic() as batch: |
| 104 | + for package, vector in packages_to_insert: |
| 105 | + batch.add_object(properties=package, vector=vector, |
| 106 | + uuid=generate_uuid5(package)) |
84 | 107 |
|
85 | | - # now add the status column |
86 | | - if "archived" in json_file: |
87 | | - package["status"] = "archived" |
88 | | - elif "deprecated" in json_file: |
89 | | - package["status"] = "deprecated" |
90 | | - elif "malicious" in json_file: |
91 | | - package["status"] = "malicious" |
92 | | - else: |
93 | | - package["status"] = "unknown" |
94 | | - |
95 | | - # check for the existing package and only add if different |
96 | | - key = package["name"] + "/" + package["type"] |
97 | | - if key in packages_dict: |
98 | | - if ( |
99 | | - packages_dict[key]["status"] == package["status"] |
100 | | - and packages_dict[key]["description"] |
101 | | - == package["description"] |
102 | | - ): |
103 | | - print("Package already exists", key) |
104 | | - continue |
105 | | - |
106 | | - # prepare the object for embedding |
107 | | - print("Generating data for", key) |
108 | | - vector_str = generate_vector_string(package) |
109 | | - vector = generate_embeddings(vector_str) |
110 | | - |
111 | | - batch.add_object(properties=package, vector=vector) |
112 | | - |
113 | | - |
114 | | -def run_import(): |
115 | | - client = weaviate.WeaviateClient( |
116 | | - embedded_options=EmbeddedOptions( |
117 | | - persistence_data_path="./weaviate_data", grpc_port=50052 |
118 | | - ), |
119 | | - ) |
120 | | - with client: |
121 | | - client.connect() |
122 | | - print("is_ready:", client.is_ready()) |
123 | | - |
124 | | - setup_schema(client) |
125 | | - add_data(client) |
| 108 | + async def run_import(self): |
| 109 | + self.setup_schema() |
| 110 | + await self.add_data() |
126 | 111 |
|
127 | 112 |
|
128 | 113 | if __name__ == "__main__": |
129 | | - run_import() |
| 114 | + importer = PackageImporter() |
| 115 | + asyncio.run(importer.run_import()) |
| 116 | + try: |
| 117 | + assert importer.client.is_live() |
| 118 | + pass |
| 119 | + finally: |
| 120 | + importer.client.close() |
0 commit comments