Skip to content

Commit e30894f

Browse files
revert: remove Elasticsearch changes from OpenSearch PR
These changes were unrelated to OpenSearch support and should be in a separate PR. Co-authored-by: William Easton <[email protected]>
1 parent 313e702 commit e30894f

File tree

4 files changed

+107
-91
lines changed

4 files changed

+107
-91
lines changed

key-value/key-value-aio/src/key_value/aio/stores/elasticsearch/store.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,8 +270,6 @@ async def _setup_collection(self, *, collection: str) -> None:
270270
except BadRequestError as e:
271271
if "index_already_exists_exception" in str(e).lower():
272272
return
273-
if "resource_already_exists_exception" in str(e).lower():
274-
return
275273
raise
276274

277275
def _get_index_name(self, collection: str) -> str:

key-value/key-value-aio/tests/stores/elasticsearch/test_elasticsearch.py

Lines changed: 53 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from collections.abc import AsyncGenerator
22
from datetime import datetime, timedelta, timezone
3+
from typing import TYPE_CHECKING, Any
34

45
import pytest
56
from dirty_equals import IsFloat, IsStr
@@ -19,11 +20,14 @@
1920
from tests.conftest import docker_container, should_skip_docker_tests
2021
from tests.stores.base import BaseStoreTests, ContextManagerStoreTestMixin
2122

23+
if TYPE_CHECKING:
24+
from elastic_transport._response import ObjectApiResponse
25+
2226
TEST_SIZE_LIMIT = 1 * 1024 * 1024 # 1MB
2327
ES_HOST = "localhost"
24-
CONTAINER_PORT = 9200
25-
HOST_PORT = 19200
26-
ES_URL = f"http://{ES_HOST}:{HOST_PORT}"
28+
ES_PORT = 9200
29+
ES_URL = f"http://{ES_HOST}:{ES_PORT}"
30+
ES_CONTAINER_PORT = 9200
2731

2832
WAIT_FOR_ELASTICSEARCH_TIMEOUT = 30
2933

@@ -41,7 +45,12 @@ async def ping_elasticsearch() -> bool:
4145
es_client: AsyncElasticsearch = get_elasticsearch_client()
4246

4347
async with es_client:
44-
return await es_client.ping()
48+
if not await es_client.ping():
49+
return False
50+
51+
status: ObjectApiResponse[dict[str, Any]] = await es_client.options(ignore_status=404).cluster.health(wait_for_status="green")
52+
53+
return status.body.get("status") == "green"
4554

4655

4756
async def cleanup_elasticsearch_indices(elasticsearch_client: AsyncElasticsearch):
@@ -90,7 +99,7 @@ async def setup_elasticsearch(self, request: pytest.FixtureRequest) -> AsyncGene
9099
with docker_container(
91100
f"elasticsearch-test-{version}",
92101
es_image,
93-
{str(CONTAINER_PORT): HOST_PORT},
102+
{str(ES_CONTAINER_PORT): ES_PORT},
94103
{"discovery.type": "single-node", "xpack.security.enabled": "false"},
95104
):
96105
if not await async_wait_for_true(bool_fn=ping_elasticsearch, tries=WAIT_FOR_ELASTICSEARCH_TIMEOUT, wait_time=2):
@@ -99,6 +108,11 @@ async def setup_elasticsearch(self, request: pytest.FixtureRequest) -> AsyncGene
99108

100109
yield
101110

111+
@pytest.fixture
112+
async def es_client(self) -> AsyncGenerator[AsyncElasticsearch, None]:
113+
async with AsyncElasticsearch(hosts=[ES_URL]) as es_client:
114+
yield es_client
115+
102116
@override
103117
@pytest.fixture
104118
async def store(self) -> ElasticsearchStore:
@@ -114,11 +128,10 @@ async def sanitizing_store(self) -> ElasticsearchStore:
114128
)
115129

116130
@pytest.fixture(autouse=True)
117-
async def cleanup_elasticsearch(self):
118-
async with get_elasticsearch_client() as es_client:
119-
await cleanup_elasticsearch_indices(elasticsearch_client=es_client)
120-
yield
121-
await cleanup_elasticsearch_indices(elasticsearch_client=es_client)
131+
async def cleanup_elasticsearch_indices(self, es_client: AsyncElasticsearch):
132+
await cleanup_elasticsearch_indices(elasticsearch_client=es_client)
133+
yield
134+
await cleanup_elasticsearch_indices(elasticsearch_client=es_client)
122135

123136
@pytest.mark.skip(reason="Distributed Caches are unbounded")
124137
@override
@@ -145,19 +158,18 @@ async def test_long_key_name(self, store: ElasticsearchStore, sanitizing_store:
145158
await sanitizing_store.put(collection="test_collection", key="test_key" * 100, value={"test": "test"})
146159
assert await sanitizing_store.get(collection="test_collection", key="test_key" * 100) == {"test": "test"}
147160

148-
async def test_put_put_two_indices(self, store: ElasticsearchStore):
161+
async def test_put_put_two_indices(self, store: ElasticsearchStore, es_client: AsyncElasticsearch):
149162
await store.put(collection="test_collection", key="test_key", value={"test": "test"})
150163
await store.put(collection="test_collection_2", key="test_key", value={"test": "test"})
151164
assert await store.get(collection="test_collection", key="test_key") == {"test": "test"}
152165
assert await store.get(collection="test_collection_2", key="test_key") == {"test": "test"}
153166

154-
async with get_elasticsearch_client() as es_client:
155-
indices = await es_client.options(ignore_status=404).indices.get(index="kv-store-e2e-test-*")
156-
assert len(indices.body) == 2
157-
index_names: list[str] = [str(key) for key in indices]
158-
assert index_names == snapshot(["kv-store-e2e-test-test_collection", "kv-store-e2e-test-test_collection_2"])
167+
indices = await es_client.options(ignore_status=404).indices.get(index="kv-store-e2e-test-*")
168+
assert len(indices.body) == 2
169+
index_names: list[str] = [str(key) for key in indices]
170+
assert index_names == snapshot(["kv-store-e2e-test-test_collection", "kv-store-e2e-test-test_collection_2"])
159171

160-
async def test_value_stored_as_flattened_object(self, store: ElasticsearchStore):
172+
async def test_value_stored_as_flattened_object(self, store: ElasticsearchStore, es_client: AsyncElasticsearch):
161173
"""Verify values are stored as flattened objects, not JSON strings"""
162174
await store.put(collection="test", key="test_key", value={"name": "Alice", "age": 30})
163175

@@ -166,32 +178,30 @@ async def test_value_stored_as_flattened_object(self, store: ElasticsearchStore)
166178
index_name = store._get_index_name(collection="test") # pyright: ignore[reportPrivateUsage]
167179
doc_id = store._get_document_id(key="test_key") # pyright: ignore[reportPrivateUsage]
168180

169-
async with get_elasticsearch_client() as es_client:
170-
response = await es_client.get(index=index_name, id=doc_id)
171-
assert response.body["_source"] == snapshot(
172-
{
173-
"version": 1,
174-
"key": "test_key",
175-
"collection": "test",
176-
"value": {"flattened": {"name": "Alice", "age": 30}},
177-
"created_at": IsStr(min_length=20, max_length=40),
178-
}
179-
)
180-
181-
# Test with TTL
182-
await store.put(collection="test", key="test_key", value={"name": "Bob", "age": 25}, ttl=10)
183-
184-
response = await es_client.get(index=index_name, id=doc_id)
185-
assert response.body["_source"] == snapshot(
186-
{
187-
"version": 1,
188-
"key": "test_key",
189-
"collection": "test",
190-
"value": {"flattened": {"name": "Bob", "age": 25}},
191-
"created_at": IsStr(min_length=20, max_length=40),
192-
"expires_at": IsStr(min_length=20, max_length=40),
193-
}
194-
)
181+
response = await es_client.get(index=index_name, id=doc_id)
182+
assert response.body["_source"] == snapshot(
183+
{
184+
"version": 1,
185+
"key": "test_key",
186+
"collection": "test",
187+
"value": {"flattened": {"name": "Alice", "age": 30}},
188+
"created_at": IsStr(min_length=20, max_length=40),
189+
}
190+
)
191+
192+
# Test with TTL
193+
await store.put(collection="test", key="test_key", value={"name": "Bob", "age": 25}, ttl=10)
194+
response = await es_client.get(index=index_name, id=doc_id)
195+
assert response.body["_source"] == snapshot(
196+
{
197+
"version": 1,
198+
"key": "test_key",
199+
"collection": "test",
200+
"value": {"flattened": {"name": "Bob", "age": 25}},
201+
"created_at": IsStr(min_length=20, max_length=40),
202+
"expires_at": IsStr(min_length=20, max_length=40),
203+
}
204+
)
195205

196206
@override
197207
async def test_special_characters_in_collection_name(self, store: ElasticsearchStore, sanitizing_store: ElasticsearchStore): # pyright: ignore[reportIncompatibleMethodOverride]

key-value/key-value-sync/src/key_value/sync/code_gen/stores/elasticsearch/store.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
get_source_from_body,
3939
)
4040
except ImportError as e:
41-
msg = "ElasticsearchStore requires py-key-value-sync[elasticsearch]"
41+
msg = "ElasticsearchStore requires py-key-value-aio[elasticsearch]"
4242
raise ImportError(msg) from e
4343

4444
logger = logging.getLogger(__name__)
@@ -242,8 +242,6 @@ def _setup_collection(self, *, collection: str) -> None:
242242
except BadRequestError as e:
243243
if "index_already_exists_exception" in str(e).lower():
244244
return
245-
if "resource_already_exists_exception" in str(e).lower():
246-
return
247245
raise
248246

249247
def _get_index_name(self, collection: str) -> str:

key-value/key-value-sync/tests/code_gen/stores/elasticsearch/test_elasticsearch.py

Lines changed: 53 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
# DO NOT CHANGE! Change the original file instead.
44
from collections.abc import Generator
55
from datetime import datetime, timedelta, timezone
6+
from typing import TYPE_CHECKING, Any
67

78
import pytest
89
from dirty_equals import IsFloat, IsStr
@@ -22,11 +23,14 @@
2223
from tests.code_gen.conftest import docker_container, should_skip_docker_tests
2324
from tests.code_gen.stores.base import BaseStoreTests, ContextManagerStoreTestMixin
2425

26+
if TYPE_CHECKING:
27+
from elastic_transport._response import ObjectApiResponse
28+
2529
TEST_SIZE_LIMIT = 1 * 1024 * 1024 # 1MB
2630
ES_HOST = "localhost"
27-
CONTAINER_PORT = 9200
28-
HOST_PORT = 19200
29-
ES_URL = f"http://{ES_HOST}:{HOST_PORT}"
31+
ES_PORT = 9200
32+
ES_URL = f"http://{ES_HOST}:{ES_PORT}"
33+
ES_CONTAINER_PORT = 9200
3034

3135
WAIT_FOR_ELASTICSEARCH_TIMEOUT = 30
3236
# Released Apr 2025
@@ -42,7 +46,12 @@ def ping_elasticsearch() -> bool:
4246
es_client: Elasticsearch = get_elasticsearch_client()
4347

4448
with es_client:
45-
return es_client.ping()
49+
if not es_client.ping():
50+
return False
51+
52+
status: ObjectApiResponse[dict[str, Any]] = es_client.options(ignore_status=404).cluster.health(wait_for_status="green")
53+
54+
return status.body.get("status") == "green"
4655

4756

4857
def cleanup_elasticsearch_indices(elasticsearch_client: Elasticsearch):
@@ -91,7 +100,7 @@ def setup_elasticsearch(self, request: pytest.FixtureRequest) -> Generator[None,
91100
with docker_container(
92101
f"elasticsearch-test-{version}",
93102
es_image,
94-
{str(CONTAINER_PORT): HOST_PORT},
103+
{str(ES_CONTAINER_PORT): ES_PORT},
95104
{"discovery.type": "single-node", "xpack.security.enabled": "false"},
96105
):
97106
if not wait_for_true(bool_fn=ping_elasticsearch, tries=WAIT_FOR_ELASTICSEARCH_TIMEOUT, wait_time=2):
@@ -100,6 +109,11 @@ def setup_elasticsearch(self, request: pytest.FixtureRequest) -> Generator[None,
100109

101110
yield
102111

112+
@pytest.fixture
113+
def es_client(self) -> Generator[Elasticsearch, None, None]:
114+
with Elasticsearch(hosts=[ES_URL]) as es_client:
115+
yield es_client
116+
103117
@override
104118
@pytest.fixture
105119
def store(self) -> ElasticsearchStore:
@@ -115,11 +129,10 @@ def sanitizing_store(self) -> ElasticsearchStore:
115129
)
116130

117131
@pytest.fixture(autouse=True)
118-
def cleanup_elasticsearch(self):
119-
with get_elasticsearch_client() as es_client:
120-
cleanup_elasticsearch_indices(elasticsearch_client=es_client)
121-
yield
122-
cleanup_elasticsearch_indices(elasticsearch_client=es_client)
132+
def cleanup_elasticsearch_indices(self, es_client: Elasticsearch):
133+
cleanup_elasticsearch_indices(elasticsearch_client=es_client)
134+
yield
135+
cleanup_elasticsearch_indices(elasticsearch_client=es_client)
123136

124137
@pytest.mark.skip(reason="Distributed Caches are unbounded")
125138
@override
@@ -146,19 +159,18 @@ def test_long_key_name(self, store: ElasticsearchStore, sanitizing_store: Elasti
146159
sanitizing_store.put(collection="test_collection", key="test_key" * 100, value={"test": "test"})
147160
assert sanitizing_store.get(collection="test_collection", key="test_key" * 100) == {"test": "test"}
148161

149-
def test_put_put_two_indices(self, store: ElasticsearchStore):
162+
def test_put_put_two_indices(self, store: ElasticsearchStore, es_client: Elasticsearch):
150163
store.put(collection="test_collection", key="test_key", value={"test": "test"})
151164
store.put(collection="test_collection_2", key="test_key", value={"test": "test"})
152165
assert store.get(collection="test_collection", key="test_key") == {"test": "test"}
153166
assert store.get(collection="test_collection_2", key="test_key") == {"test": "test"}
154167

155-
with get_elasticsearch_client() as es_client:
156-
indices = es_client.options(ignore_status=404).indices.get(index="kv-store-e2e-test-*")
157-
assert len(indices.body) == 2
158-
index_names: list[str] = [str(key) for key in indices]
159-
assert index_names == snapshot(["kv-store-e2e-test-test_collection", "kv-store-e2e-test-test_collection_2"])
168+
indices = es_client.options(ignore_status=404).indices.get(index="kv-store-e2e-test-*")
169+
assert len(indices.body) == 2
170+
index_names: list[str] = [str(key) for key in indices]
171+
assert index_names == snapshot(["kv-store-e2e-test-test_collection", "kv-store-e2e-test-test_collection_2"])
160172

161-
def test_value_stored_as_flattened_object(self, store: ElasticsearchStore):
173+
def test_value_stored_as_flattened_object(self, store: ElasticsearchStore, es_client: Elasticsearch):
162174
"""Verify values are stored as flattened objects, not JSON strings"""
163175
store.put(collection="test", key="test_key", value={"name": "Alice", "age": 30})
164176

@@ -167,32 +179,30 @@ def test_value_stored_as_flattened_object(self, store: ElasticsearchStore):
167179
index_name = store._get_index_name(collection="test") # pyright: ignore[reportPrivateUsage]
168180
doc_id = store._get_document_id(key="test_key") # pyright: ignore[reportPrivateUsage]
169181

170-
with get_elasticsearch_client() as es_client:
171-
response = es_client.get(index=index_name, id=doc_id)
172-
assert response.body["_source"] == snapshot(
173-
{
174-
"version": 1,
175-
"key": "test_key",
176-
"collection": "test",
177-
"value": {"flattened": {"name": "Alice", "age": 30}},
178-
"created_at": IsStr(min_length=20, max_length=40),
179-
}
180-
)
181-
182-
# Test with TTL
183-
store.put(collection="test", key="test_key", value={"name": "Bob", "age": 25}, ttl=10)
184-
185-
response = es_client.get(index=index_name, id=doc_id)
186-
assert response.body["_source"] == snapshot(
187-
{
188-
"version": 1,
189-
"key": "test_key",
190-
"collection": "test",
191-
"value": {"flattened": {"name": "Bob", "age": 25}},
192-
"created_at": IsStr(min_length=20, max_length=40),
193-
"expires_at": IsStr(min_length=20, max_length=40),
194-
}
195-
)
182+
response = es_client.get(index=index_name, id=doc_id)
183+
assert response.body["_source"] == snapshot(
184+
{
185+
"version": 1,
186+
"key": "test_key",
187+
"collection": "test",
188+
"value": {"flattened": {"name": "Alice", "age": 30}},
189+
"created_at": IsStr(min_length=20, max_length=40),
190+
}
191+
)
192+
193+
# Test with TTL
194+
store.put(collection="test", key="test_key", value={"name": "Bob", "age": 25}, ttl=10)
195+
response = es_client.get(index=index_name, id=doc_id)
196+
assert response.body["_source"] == snapshot(
197+
{
198+
"version": 1,
199+
"key": "test_key",
200+
"collection": "test",
201+
"value": {"flattened": {"name": "Bob", "age": 25}},
202+
"created_at": IsStr(min_length=20, max_length=40),
203+
"expires_at": IsStr(min_length=20, max_length=40),
204+
}
205+
)
196206

197207
@override
198208
def test_special_characters_in_collection_name(self, store: ElasticsearchStore, sanitizing_store: ElasticsearchStore): # pyright: ignore[reportIncompatibleMethodOverride]

0 commit comments

Comments
 (0)