11import contextlib
22from collections .abc import AsyncGenerator
33from datetime import datetime , timedelta , timezone
4+ from typing import Any
45
56import pytest
6- from dirty_equals import IsFloat
7+ from dirty_equals import IsFloat , IsStr
8+ from elasticsearch import AsyncElasticsearch
79from inline_snapshot import snapshot
810from key_value .shared .stores .wait import async_wait_for_true
911from key_value .shared .utils .managed_entry import ManagedEntry
1012from opensearchpy import AsyncOpenSearch
1113from typing_extensions import override
1214
15+ from key_value .aio .protocols .key_value import AsyncKeyValueProtocol
1316from key_value .aio .stores .base import BaseStore
1417from key_value .aio .stores .opensearch import OpenSearchStore
1518from key_value .aio .stores .opensearch .store import (
2124from tests .stores .base import BaseStoreTests , ContextManagerStoreTestMixin
2225
2326TEST_SIZE_LIMIT = 1 * 1024 * 1024 # 1MB
24- OS_HOST = "localhost"
25- OS_PORT = 9200
26- OS_URL = f"http://{ OS_HOST } :{ OS_PORT } "
27- OS_CONTAINER_PORT = 9200
27+ LOCALHOST = "localhost"
28+
29+ CONTAINER_PORT = 9200
30+ HOST_PORT = 19200
31+
32+ OPENSEARCH_URL = f"http://{ LOCALHOST } :{ HOST_PORT } "
33+
2834
2935WAIT_FOR_OPENSEARCH_TIMEOUT = 30
3036
3541
3642
3743def get_opensearch_client () -> AsyncOpenSearch :
38- return AsyncOpenSearch (hosts = [OS_URL ], use_ssl = False , verify_certs = False )
44+ return AsyncOpenSearch (hosts = [OPENSEARCH_URL ], use_ssl = False , verify_certs = False )
3945
4046
4147async def ping_opensearch () -> bool :
42- os_client : AsyncOpenSearch = get_opensearch_client ()
48+ opensearch_client : AsyncOpenSearch = get_opensearch_client ()
4349
44- async with os_client :
50+ async with opensearch_client :
4551 try :
46- return await os_client .ping ()
52+ return await opensearch_client .ping ()
4753 except Exception :
4854 return False
4955
@@ -69,7 +75,7 @@ def test_managed_entry_document_conversion():
6975
7076 assert document == snapshot (
7177 {
72- "value" : {"flattened " : {"test" : "test" }},
78+ "value" : {"f " : {"test" : "test" }},
7379 "created_at" : "2025-01-01T00:00:00+00:00" ,
7480 "expires_at" : "2025-01-01T00:00:10+00:00" ,
7581 }
@@ -94,7 +100,7 @@ async def setup_opensearch(self, request: pytest.FixtureRequest) -> AsyncGenerat
94100 with docker_container (
95101 f"opensearch-test-{ version } " ,
96102 os_image ,
97- {str (OS_CONTAINER_PORT ): OS_PORT },
103+ {str (CONTAINER_PORT ): HOST_PORT },
98104 {
99105 "discovery.type" : "single-node" ,
100106 "DISABLE_SECURITY_PLUGIN" : "true" ,
@@ -109,16 +115,16 @@ async def setup_opensearch(self, request: pytest.FixtureRequest) -> AsyncGenerat
109115
110116 @pytest .fixture
111117 async def opensearch_client (self , setup_opensearch : None ) -> AsyncGenerator [AsyncOpenSearch , None ]:
112- os_client = get_opensearch_client ()
118+ opensearch_client = get_opensearch_client ()
113119
114- async with os_client :
115- await cleanup_opensearch_indices (opensearch_client = os_client )
120+ async with opensearch_client :
121+ await cleanup_opensearch_indices (opensearch_client = opensearch_client )
116122
117- yield os_client
123+ yield opensearch_client
118124
119125 @override
120126 @pytest .fixture
121- async def default_store (self , opensearch_client : AsyncOpenSearch ) -> AsyncGenerator [BaseStore , None ]:
127+ async def store (self , opensearch_client : AsyncOpenSearch ) -> AsyncGenerator [BaseStore , None ]:
122128 store = OpenSearchStore (
123129 opensearch_client = opensearch_client ,
124130 index_prefix = "opensearch-kv-store-e2e-test" ,
@@ -130,40 +136,86 @@ async def default_store(self, opensearch_client: AsyncOpenSearch) -> AsyncGenera
130136
131137 @override
132138 @pytest .fixture
133- async def collection_sanitized_store (self , opensearch_client : AsyncOpenSearch ) -> AsyncGenerator [BaseStore , None ]:
139+ async def sanitizing_store (self , opensearch_client : AsyncOpenSearch ) -> AsyncGenerator [BaseStore , None ]:
134140 store = OpenSearchStore (
135141 opensearch_client = opensearch_client ,
136142 index_prefix = "opensearch-kv-store-e2e-test" ,
137143 default_collection = "test-collection" ,
144+ key_sanitization_strategy = OpenSearchV1KeySanitizationStrategy (),
138145 collection_sanitization_strategy = OpenSearchV1CollectionSanitizationStrategy (),
139146 )
140147
141148 async with store :
142149 yield store
143150
144151 @override
145- @pytest .fixture
146- async def key_sanitized_store (self , opensearch_client : AsyncOpenSearch ) -> AsyncGenerator [BaseStore , None ]:
147- store = OpenSearchStore (
148- opensearch_client = opensearch_client ,
149- index_prefix = "opensearch-kv-store-e2e-test" ,
150- default_collection = "test-collection" ,
151- key_sanitization_strategy = OpenSearchV1KeySanitizationStrategy (),
152- )
152+ @pytest .mark .timeout (120 )
153+ async def test_store (self , store : BaseStore ):
154+ """Tests that the store is a valid AsyncKeyValueProtocol."""
155+ assert isinstance (store , AsyncKeyValueProtocol ) is True
153156
154- async with store :
155- yield store
157+ @pytest .mark .skip (reason = "Distributed Caches are unbounded" )
158+ @override
159+ async def test_not_unbounded (self , store : BaseStore ): ...
156160
161+ @pytest .mark .skip (reason = "Skip concurrent tests on distributed caches" )
157162 @override
158- @pytest .fixture
159- async def fully_sanitized_store (self , opensearch_client : AsyncOpenSearch ) -> AsyncGenerator [BaseStore , None ]:
160- store = OpenSearchStore (
161- opensearch_client = opensearch_client ,
162- index_prefix = "opensearch-kv-store-e2e-test" ,
163- default_collection = "test-collection" ,
164- key_sanitization_strategy = OpenSearchV1KeySanitizationStrategy (),
165- collection_sanitization_strategy = OpenSearchV1CollectionSanitizationStrategy (),
163+ async def test_concurrent_operations (self , store : BaseStore ): ...
164+
165+ @override
166+ async def test_long_collection_name (self , store : OpenSearchStore , sanitizing_store : OpenSearchStore ): # pyright: ignore[reportIncompatibleMethodOverride]
167+ with pytest .raises (Exception ): # noqa: B017, PT011
168+ await store .put (collection = "test_collection" * 100 , key = "test_key" , value = {"test" : "test" })
169+
170+ await sanitizing_store .put (collection = "test_collection" * 100 , key = "test_key" , value = {"test" : "test" })
171+ assert await sanitizing_store .get (collection = "test_collection" * 100 , key = "test_key" ) == {"test" : "test" }
172+
173+ @override
174+ async def test_long_key_name (self , store : OpenSearchStore , sanitizing_store : OpenSearchStore ): # pyright: ignore[reportIncompatibleMethodOverride]
175+ """Tests that a long key name will not raise an error."""
176+ with pytest .raises (Exception ): # noqa: B017, PT011
177+ await store .put (collection = "test_collection" , key = "test_key" * 100 , value = {"test" : "test" })
178+
179+ await sanitizing_store .put (collection = "test_collection" , key = "test_key" * 100 , value = {"test" : "test" })
180+ assert await sanitizing_store .get (collection = "test_collection" , key = "test_key" * 100 ) == {"test" : "test" }
181+
182+ async def test_put_put_two_indices (self , store : OpenSearchStore , opensearch_client : AsyncOpenSearch ):
183+ await store .put (collection = "test_collection" , key = "test_key" , value = {"test" : "test" })
184+ await store .put (collection = "test_collection_2" , key = "test_key" , value = {"test" : "test" })
185+ assert await store .get (collection = "test_collection" , key = "test_key" ) == {"test" : "test" }
186+ assert await store .get (collection = "test_collection_2" , key = "test_key" ) == {"test" : "test" }
187+
188+ indices : dict [str , Any ] = await opensearch_client .indices .get (index = "opensearch-kv-store-e2e-test-*" )
189+ index_names : list [str ] = list (indices .keys ())
190+ assert index_names == snapshot (["opensearch-kv-store-e2e-test-test_collection" , "opensearch-kv-store-e2e-test-test_collection_2" ])
191+
192+ async def test_value_stored_as_f_object (self , store : OpenSearchStore , opensearch_client : AsyncElasticsearch ):
193+ """Verify values are stored as f objects, not JSON strings"""
194+ await store .put (collection = "test" , key = "test_key" , value = {"name" : "Alice" , "age" : 30 })
195+
196+ index_name = store ._get_index_name (collection = "test" ) # pyright: ignore[reportPrivateUsage]
197+ doc_id = store ._get_document_id (key = "test_key" ) # pyright: ignore[reportPrivateUsage]
198+
199+ response = await opensearch_client .get (index = index_name , id = doc_id )
200+ assert response .body ["_source" ] == snapshot (
201+ {
202+ "value" : {"f" : {"name" : "Alice" , "age" : 30 }},
203+ "created_at" : IsStr (min_length = 20 , max_length = 40 ),
204+ }
166205 )
167206
168- async with store :
169- yield store
207+ # Test with TTL
208+ await store .put (collection = "test" , key = "test_key" , value = {"name" : "Bob" , "age" : 25 }, ttl = 10 )
209+ response = await opensearch_client .get (index = index_name , id = doc_id )
210+ assert response .body ["_source" ] == snapshot (
211+ {
212+ "value" : {"f" : {"name" : "Bob" , "age" : 25 }},
213+ "created_at" : IsStr (min_length = 20 , max_length = 40 ),
214+ "expires_at" : IsStr (min_length = 20 , max_length = 40 ),
215+ }
216+ )
217+
218+ @override
219+ async def test_special_characters_in_collection_name (self , store : OpenSearchStore , sanitizing_store : OpenSearchStore ): # pyright: ignore[reportIncompatibleMethodOverride]
220+ """Tests that a special characters in the collection name will not raise an error."""
221+ await super ().test_special_characters_in_collection_name (store = sanitizing_store )
0 commit comments