-
Notifications
You must be signed in to change notification settings - Fork 73
[HZ-5276] [HZ-5277] Asyncio Module Cloud Support #754
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
cbe98dc
162fd17
9931956
856e3df
35384bf
fdda120
fee5b45
fc2c38b
1772031
170cf89
539c904
22449a8
5406bc6
a417a4a
d00c480
baa3bc1
ebfc9e2
6928837
3e03cbf
51ced7a
e635b94
4f103f6
042cc58
265a2b4
293975d
2718478
58783dc
3cf9982
7e97ec7
6a558e8
a630706
c1798ea
e92936a
6ced889
c313bfa
6222c6b
120a58a
80880b8
6431acc
e9a9b5e
5334cd1
a14290a
492ccc1
e8a2600
24eb6bf
6ab9365
3f3a9c5
bfb805d
5f59992
91bf1d1
ef7570f
2128f5e
62697e3
a87a5c6
5023568
8d7eede
00a2d12
539466b
2aff5e4
284de6d
767bfd5
e673679
ab4a746
319bb35
8e325ea
eed53b3
a6d5949
f61ec8e
4446ba7
1ca7fd6
1c1699d
d9acede
bd23f41
76759ec
74a9aca
e33af1d
81b5041
2a2d6e8
6da4226
0a829d4
3cf71d5
d61731d
9e1a2e6
6c75f4a
7ad8c4d
550a006
2c8f10e
7f92a93
4a741ae
264e9fa
df561e1
a4697a4
207d9e6
f863f58
082f02c
fddd197
f2378cd
eb60360
3a9d4f5
59b667e
f86e6f0
d73b3a9
759c97e
2db5c1d
ab8ca27
b724f52
0f191c8
9093e27
f724d2d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,28 @@ | ||
| import asyncio | ||
|
|
||
| from hazelcast.asyncio import HazelcastClient | ||
|
|
||
|
|
||
| async def amain(): | ||
| client = await HazelcastClient.create_and_start( | ||
| # Set up cluster name for authentication | ||
| cluster_name="asyncio", | ||
| # Set the token of your cloud cluster | ||
| cloud_discovery_token="wE1w1USF6zOnaLVjLZwbZHxEoZJhw43yyViTbe6UBTvz4tZniA", | ||
| ssl_enabled=True, | ||
| ssl_cafile="/path/to/ca.pem", | ||
| ssl_certfile="/path/to/cert.pem", | ||
| ssl_keyfile="/path/to/key.pem", | ||
| ssl_password="05dd4498c3f", | ||
| ) | ||
| my_map = await client.get_map("map-on-the-cloud") | ||
| await my_map.put("key", "value") | ||
|
|
||
| value = await my_map.get("key") | ||
| print(value) | ||
|
|
||
| await client.shutdown() | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| asyncio.run(amain()) | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -183,7 +183,7 @@ def __init__( | |
| self._cluster_id = None | ||
| self._load_balancer = None | ||
| self._use_public_ip = ( | ||
| isinstance(address_provider, DefaultAddressProvider) and config.use_public_ip | ||
| isinstance(address_provider, DefaultAsyncioAddressProvider) and config.use_public_ip | ||
| ) | ||
| # asyncio tasks are weakly referenced | ||
| # storing tasks here in order not to lose them midway | ||
|
|
@@ -385,9 +385,10 @@ async def _get_or_connect_to_address(self, address): | |
| for connection in list(self.active_connections.values()): | ||
| if connection.remote_address == address: | ||
| return connection | ||
| translated = self._translate(address) | ||
| connection = await self._create_connection(translated) | ||
| response = await self._authenticate(connection) | ||
| translated = await self._translate(address) | ||
| connection = self._create_connection(translated) | ||
| await connection._create_task | ||
| response = self._authenticate(connection) | ||
| await self._on_auth(response, connection) | ||
| return connection | ||
|
|
||
|
|
@@ -396,23 +397,24 @@ async def _get_or_connect_to_member(self, member): | |
| if connection: | ||
| return connection | ||
|
|
||
| translated = self._translate_member_address(member) | ||
| connection = await self._create_connection(translated) | ||
| response = await self._authenticate(connection) | ||
| translated = await self._translate_member_address(member) | ||
| connection = self._create_connection(translated) | ||
| await connection._create_task | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What's the reason not returning a "future" on
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Still, it looks odd to me. Instead of tracking an internal field, I would make it a method which is called after initializing the object. Also, I think both connect and auth can be combined into one. Nevertheless, I won't stand on it.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I would still have to call it with So that doesn't buy us much. |
||
| response = self._authenticate(connection) | ||
| await self._on_auth(response, connection) | ||
| return connection | ||
|
|
||
| async def _create_connection(self, address): | ||
| return await self._reactor.connection_factory( | ||
| def _create_connection(self, address): | ||
| return self._reactor.connection_factory( | ||
| self, | ||
| self._connection_id_generator.get_and_increment(), | ||
| address, | ||
| self._config, | ||
| self._invocation_service.handle_client_message, | ||
| ) | ||
|
|
||
| def _translate(self, address): | ||
| translated = self._address_provider.translate(address) | ||
| async def _translate(self, address): | ||
| translated = await self._address_provider.translate(address) | ||
| if not translated: | ||
| raise ValueError( | ||
| "Address provider %s could not translate address %s" | ||
|
|
@@ -421,15 +423,15 @@ def _translate(self, address): | |
|
|
||
| return translated | ||
|
|
||
| def _translate_member_address(self, member): | ||
| async def _translate_member_address(self, member): | ||
| if self._use_public_ip: | ||
| public_address = member.address_map.get(_CLIENT_PUBLIC_ENDPOINT_QUALIFIER, None) | ||
| if public_address: | ||
| return public_address | ||
|
|
||
| return member.address | ||
|
|
||
| return self._translate(member.address) | ||
| return await self._translate(member.address) | ||
|
|
||
| async def _trigger_cluster_reconnection(self): | ||
| if self._reconnect_mode == ReconnectMode.OFF: | ||
|
|
@@ -529,7 +531,8 @@ async def _sync_connect_to_cluster(self): | |
| if connection: | ||
| return | ||
|
|
||
| for address in self._get_possible_addresses(): | ||
| addresses = await self._get_possible_addresses() | ||
| for address in addresses: | ||
| self._check_client_active() | ||
| if address in tried_addresses_per_attempt: | ||
| # We already tried this address on from the member list | ||
|
|
@@ -614,6 +617,7 @@ def _authenticate(self, connection) -> asyncio.Future: | |
|
|
||
| async def _on_auth(self, response, connection): | ||
| try: | ||
| response = await response | ||
| response = client_authentication_codec.decode_response(response) | ||
| except Exception as e: | ||
| await connection.close_connection("Failed to authenticate connection", e) | ||
|
|
@@ -790,8 +794,8 @@ def _check_client_active(self): | |
| if not self._lifecycle_service.running: | ||
| raise HazelcastClientNotActiveError() | ||
|
|
||
| def _get_possible_addresses(self): | ||
| primaries, secondaries = self._address_provider.load_addresses() | ||
| async def _get_possible_addresses(self): | ||
| primaries, secondaries = await self._address_provider.load_addresses() | ||
| if self._shuffle_member_list: | ||
| # The relative order between primary and secondary addresses should | ||
| # not be changed. So we shuffle the lists separately and then add | ||
|
|
@@ -1028,17 +1032,13 @@ def __hash__(self): | |
| return self._id | ||
|
|
||
|
|
||
| class DefaultAddressProvider: | ||
| """Provides initial addresses for client to find and connect to a node. | ||
|
|
||
| It also provides a no-op translator. | ||
| """ | ||
|
|
||
| class DefaultAsyncioAddressProvider: | ||
| def __init__(self, addresses): | ||
| self._addresses = addresses | ||
|
|
||
| def load_addresses(self): | ||
| async def load_addresses(self): | ||
| """Returns the possible primary and secondary member addresses to connect to.""" | ||
| # NOTE: This method is marked with async since the caller assumes that. | ||
| configured_addresses = self._addresses | ||
|
|
||
| if not configured_addresses: | ||
|
|
@@ -1053,9 +1053,10 @@ def load_addresses(self): | |
|
|
||
| return primaries, secondaries | ||
|
|
||
| def translate(self, address): | ||
| async def translate(self, address): | ||
| """No-op address translator. | ||
|
|
||
| It is there to provide the same API with other address providers. | ||
| """ | ||
| # NOTE: This method is marked with async since the caller assumes that. | ||
| return address | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,58 @@ | ||
| import asyncio | ||
| import logging | ||
|
|
||
| from hazelcast.discovery import HazelcastCloudDiscovery | ||
|
|
||
| _logger = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class HazelcastCloudAddressProvider: | ||
| """Provides initial addresses for client to find and connect to a node | ||
| and resolves private IP addresses of Hazelcast Cloud service. | ||
| """ | ||
|
|
||
| def __init__(self, token, connection_timeout): | ||
| self.cloud_discovery = HazelcastCloudDiscovery(token, connection_timeout) | ||
| self._private_to_public = dict() | ||
|
|
||
| async def load_addresses(self): | ||
| """Loads member addresses from Hazelcast Cloud endpoint. | ||
|
|
||
| Returns: | ||
| tuple[list[hazelcast.core.Address], list[hazelcast.core.Address]]: The possible member addresses | ||
| as primary addresses to connect to. | ||
| """ | ||
| try: | ||
| nodes = await asyncio.to_thread(self.cloud_discovery.discover_nodes) | ||
| # Every private address is primary | ||
| return list(nodes.keys()), [] | ||
| except Exception as e: | ||
| _logger.warning("Failed to load addresses from Hazelcast Cloud: %s", e) | ||
| return [], [] | ||
|
|
||
| async def translate(self, address): | ||
| """Translates the given address to another address specific to network or service. | ||
|
|
||
| Args: | ||
| address (hazelcast.core.Address): Private address to be translated | ||
|
|
||
| Returns: | ||
| hazelcast.core.Address: New address if given address is known, otherwise returns None | ||
| """ | ||
| if address is None: | ||
| return None | ||
|
|
||
| public_address = self._private_to_public.get(address, None) | ||
| if public_address: | ||
| return public_address | ||
|
|
||
| await self.refresh() | ||
|
|
||
| return self._private_to_public.get(address, None) | ||
|
|
||
| async def refresh(self): | ||
| """Refreshes the internal lookup table if necessary.""" | ||
| try: | ||
| self._private_to_public = self.cloud_discovery.discover_nodes() | ||
| except Exception as e: | ||
| _logger.warning("Failed to load addresses from Hazelcast Cloud: %s", e) |
Uh oh!
There was an error while loading. Please reload this page.