Closed
Changes from all commits
Commits
44 commits
bd20a49
moved channel pooling into own file
daniel-sanche Jun 8, 2023
a01910b
added dynamic pooling and tracked channel classes
daniel-sanche Jun 8, 2023
0599799
use contextmanager for tracking rpcs
daniel-sanche Jun 9, 2023
919c1b1
added pool options
daniel-sanche Jun 9, 2023
1d2ecab
adjust next idx
daniel-sanche Jun 9, 2023
338a436
added background resize task
daniel-sanche Jun 9, 2023
711f9c0
generalized channel warmup
daniel-sanche Jun 9, 2023
097fd49
ran black
daniel-sanche Jun 9, 2023
91ffe0e
moved channel pooling out of gapic
daniel-sanche Jun 9, 2023
05915d5
made it easy to pass custom transports and channels
daniel-sanche Jun 9, 2023
0099503
client uses new dynamic pool
daniel-sanche Jun 9, 2023
4ed4ab2
improved channel pooling
daniel-sanche Jun 9, 2023
ab29b99
improved client to use new API
daniel-sanche Jun 9, 2023
5c3eb17
moved replace channel fully into client
daniel-sanche Jun 9, 2023
1c31799
moved channel refresh into separate grpc subclass
daniel-sanche Jun 9, 2023
1816ad1
added refreshable channel
daniel-sanche Jun 9, 2023
6ee52ea
moved channel clean-up into one place
daniel-sanche Jun 9, 2023
5bd973f
got tests to run
daniel-sanche Jun 9, 2023
c4b17fd
warm channel as part of create
daniel-sanche Jun 10, 2023
ac96c4c
added separate warm callback
daniel-sanche Jun 10, 2023
6fa3a29
refactoring
daniel-sanche Jun 10, 2023
dcc8572
allow warming for single instance
daniel-sanche Jun 12, 2023
438c0fa
fixed style issues
daniel-sanche Jun 12, 2023
8246297
updated gapic submodule
daniel-sanche Jun 12, 2023
e9c9f54
create channel using lambda
daniel-sanche Jun 12, 2023
234d12c
raise warnings when called outside async context
daniel-sanche Jun 12, 2023
0e1881f
got existing unit tests working
daniel-sanche Jun 12, 2023
4df6dac
tracked channel returns Call subclasses
daniel-sanche Jun 13, 2023
852f938
created super classes for all channel wrappers
daniel-sanche Jun 13, 2023
b6348c7
improved next_channel logic
daniel-sanche Jun 13, 2023
6ae98c8
created multicallable shared wrapper
daniel-sanche Jun 13, 2023
c5457d9
cleaned up client channel creation
daniel-sanche Jun 13, 2023
3a6ddea
raise exception if create_channel_fn not given
daniel-sanche Jun 13, 2023
359afc0
start background tasks on client aenter
daniel-sanche Jun 13, 2023
fea513b
added tests for _WrappedChannel
daniel-sanche Jun 13, 2023
d1c3e30
got wrapped channel tests running on refreshable channel
daniel-sanche Jun 13, 2023
de55179
added tests for refreshable channel
daniel-sanche Jun 14, 2023
1df445e
added tests for background functions
daniel-sanche Jun 14, 2023
3208449
completed and tested tracked_channel class
daniel-sanche Jun 14, 2023
146a880
simplified wrapped multicallable
daniel-sanche Jun 14, 2023
61f3d6a
added missing files
daniel-sanche Jun 14, 2023
c1c94de
fixed bugs in channels
daniel-sanche Jun 16, 2023
7860cad
added tests for pooled channels
daniel-sanche Jun 16, 2023
34ebb85
created dynamic pool test class with inherited test cases
daniel-sanche Jun 16, 2023
2 changes: 1 addition & 1 deletion gapic-generator-fork
161 changes: 161 additions & 0 deletions google/cloud/bigtable/_channel_pooling/dynamic_pooled_channel.py
@@ -0,0 +1,161 @@
# -*- coding: utf-8 -*-
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

from typing import Any, Callable, Coroutine

import asyncio
from dataclasses import dataclass

from grpc.experimental import aio # type: ignore

from .pooled_channel import PooledChannel
from .pooled_channel import StaticPoolOptions
from .tracked_channel import TrackedChannel

from google.cloud.bigtable._channel_pooling.wrapped_channel import _BackgroundTaskMixin


@dataclass
class DynamicPoolOptions:
    # starting channel count
    start_size: int = 3
    # maximum channels to keep in the pool
    max_channels: int = 10
    # minimum channels in pool
    min_channels: int = 1
    # if rpcs exceed this number, pool may expand
    max_rpcs_per_channel: int = 100
    # if rpcs exceed this number, pool may shrink
    min_rpcs_per_channel: int = 50
    # how many channels to add/remove in a single resize event
    max_resize_delta: int = 2
    # how many seconds to wait between resize attempts
    pool_refresh_interval: float = 60.0


class DynamicPooledChannel(PooledChannel, _BackgroundTaskMixin):
    def __init__(
        self,
        *args,
        create_channel_fn: Callable[..., aio.Channel] | None = None,
        pool_options: StaticPoolOptions | DynamicPoolOptions | None = None,
        warm_channel_fn: Callable[[aio.Channel], Coroutine[Any, Any, Any]]
        | None = None,
        on_remove: Callable[[aio.Channel], Coroutine[Any, Any, Any]] | None = None,
        **kwargs,
    ):
        if create_channel_fn is None:
            raise ValueError("create_channel_fn is required")
        if isinstance(pool_options, StaticPoolOptions):
            raise ValueError(
                "DynamicPooledChannel cannot be initialized with StaticPoolOptions"
            )
        self._pool: list[TrackedChannel] = []
        self._pool_options = pool_options or DynamicPoolOptions()
        # create the pool
        PooledChannel.__init__(
            self,
            # create options for starting pool
            pool_options=StaticPoolOptions(pool_size=self._pool_options.start_size),
            # all channels must be TrackedChannel instances
            create_channel_fn=lambda: TrackedChannel(create_channel_fn(*args, **kwargs)),  # type: ignore
        )
        # register callbacks
        self._on_remove = on_remove
        self._warm_channel = warm_channel_fn
        # start background resize task
        self._background_task: asyncio.Task[None] | None = None
        self.start_background_task()

    def _background_coroutine(self) -> Coroutine[Any, Any, None]:
        return self._resize_routine(interval=self._pool_options.pool_refresh_interval)

    @property
    def _task_description(self) -> str:
        return "Automatic channel pool resizing"

    async def _resize_routine(self, interval: float = 60):
        close_tasks: list[asyncio.Task[None]] = []
        while True:
            await asyncio.sleep(interval)
            added, removed = self._attempt_resize()
            # warm up new channels immediately
            if self._warm_channel:
                for channel in added:
                    await self._warm_channel(channel)
            # clear completed tasks from list
            close_tasks = [t for t in close_tasks if not t.done()]
            # add new tasks to close unneeded channels in the background
            if self._on_remove:
                for channel in removed:
                    close_routine = self._on_remove(channel)
                    close_tasks.append(asyncio.create_task(close_routine))

    def _attempt_resize(self) -> tuple[list[TrackedChannel], list[TrackedChannel]]:
        """
        Called periodically to resize the number of channels based on
        the number of active RPCs
        """
        added_list, removed_list = [], []
        # estimate the peak rpcs since last resize
        # peak finds max active value for each channel since last check
        estimated_peak = sum(
            [channel.get_and_reset_max_active_rpcs() for channel in self._pool]
        )
        # find the minimum number of channels to serve the peak
        min_channels = estimated_peak // self._pool_options.max_rpcs_per_channel
        # find the maximum channels we'd want to serve the peak
        max_channels = estimated_peak // max(self._pool_options.min_rpcs_per_channel, 1)
        # clamp the number of channels to the min and max
        min_channels = max(min_channels, self._pool_options.min_channels)
        max_channels = min(max_channels, self._pool_options.max_channels)
        # only resize the pool when thresholds are crossed
        current_size = len(self._pool)
        if current_size < min_channels or current_size > max_channels:
            # aim for the middle of the bounds, but limit the rate of change
            tentative_target = (max_channels + min_channels) // 2
            delta = tentative_target - current_size
            dampened_delta = min(
                max(delta, -self._pool_options.max_resize_delta),
                self._pool_options.max_resize_delta,
            )
            dampened_target = current_size + dampened_delta
            if dampened_target > current_size:
                added_list = [self._create_channel() for _ in range(dampened_delta)]
                self._pool.extend(added_list)
            elif dampened_target < current_size:
                # reset the next_idx if needed
                if self._next_idx >= dampened_target:
                    self._next_idx = 0
                # trim pool to the right size
                self._pool, removed_list = (
                    self._pool[:dampened_target],
                    self._pool[dampened_target:],
                )
        return added_list, removed_list
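    # Worked example for _attempt_resize (illustrative numbers, default DynamicPoolOptions):
    #   current pool size = 3, summed peak active RPCs since the last check = 450
    #   min_channels = 450 // 100 = 4 (from max_rpcs_per_channel)
    #   max_channels = 450 // 50 = 9 (from min_rpcs_per_channel)
    #   3 < 4, so a resize is attempted: tentative_target = (4 + 9) // 2 = 6, delta = 3
    #   dampened by max_resize_delta = 2, so two channels are added and the pool grows to 5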

    async def __aenter__(self):
        await _BackgroundTaskMixin.__aenter__(self)
        await PooledChannel.__aenter__(self)
        return self

    async def close(self, grace=None):
        await _BackgroundTaskMixin.close(self, grace)
        await PooledChannel.close(self, grace)

    async def __aexit__(self, *args, **kwargs):
        await _BackgroundTaskMixin.__aexit__(self, *args, **kwargs)
        await PooledChannel.__aexit__(self, *args, **kwargs)
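A minimal usage sketch of the DynamicPooledChannel added above, assuming a plain grpc.aio channel factory; make_channel, the localhost target, and the warm-up hook are illustrative placeholders rather than APIs defined in this diff:

import asyncio
from grpc.experimental import aio
from google.cloud.bigtable._channel_pooling.dynamic_pooled_channel import (
    DynamicPooledChannel,
    DynamicPoolOptions,
)


def make_channel() -> aio.Channel:
    # hypothetical factory; any callable returning a grpc.aio.Channel works
    return aio.insecure_channel("localhost:8080")


async def warm(channel: aio.Channel) -> None:
    # hypothetical warm-up hook; a real client would issue a lightweight RPC here
    await channel.channel_ready()


async def main():
    async with DynamicPooledChannel(
        create_channel_fn=make_channel,
        pool_options=DynamicPoolOptions(start_size=3, max_channels=10),
        warm_channel_fn=warm,
        on_remove=lambda channel: channel.close(),
    ) as pool:
        # RPCs issued through the pool are spread round-robin across channels,
        # and the background task resizes the pool as load changes
        ...


asyncio.run(main())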
132 changes: 132 additions & 0 deletions google/cloud/bigtable/_channel_pooling/pooled_channel.py
@@ -0,0 +1,132 @@
# -*- coding: utf-8 -*-
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import annotations

from typing import (
    Callable,
)
import asyncio
from dataclasses import dataclass
from functools import partial

import grpc # type: ignore
from grpc.experimental import aio # type: ignore
from google.cloud.bigtable._channel_pooling.wrapped_channel import (
    WrappedUnaryUnaryMultiCallable,
)
from google.cloud.bigtable._channel_pooling.wrapped_channel import (
    WrappedUnaryStreamMultiCallable,
)
from google.cloud.bigtable._channel_pooling.wrapped_channel import (
    WrappedStreamUnaryMultiCallable,
)
from google.cloud.bigtable._channel_pooling.wrapped_channel import (
    WrappedStreamStreamMultiCallable,
)


@dataclass
class StaticPoolOptions:
    pool_size: int = 3


class PooledChannel(aio.Channel):
    def __init__(
        self,
        *args,
        create_channel_fn: Callable[..., aio.Channel] | None = None,
        pool_options: StaticPoolOptions | None = None,
        **kwargs,
    ):
        if create_channel_fn is None:
            raise ValueError("create_channel_fn is required")
        self._pool: list[aio.Channel] = []
        self._next_idx = 0
        self._create_channel: Callable[[], aio.Channel] = partial(
            create_channel_fn, *args, **kwargs
        )
        pool_options = pool_options or StaticPoolOptions()
        for _ in range(pool_options.pool_size):
            self._pool.append(self._create_channel())

    def next_channel(self) -> aio.Channel:
        next_idx = self._next_idx if self._next_idx < len(self._pool) else 0
        channel = self._pool[next_idx]
        self._next_idx = (next_idx + 1) % len(self._pool)
        return channel
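    # Note: the bounds check in next_channel above keeps round-robin selection safe
    # if the pool shrinks between calls. For example, with 5 channels and
    # _next_idx == 4, shrinking the pool to 3 would leave the index out of range;
    # the `else 0` branch wraps selection back to the first channel.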

    def unary_unary(self, *args, **kwargs) -> grpc.aio.UnaryUnaryMultiCallable:
        return WrappedUnaryUnaryMultiCallable(
            lambda *call_args, **call_kwargs: self.next_channel().unary_unary(
                *args, **kwargs
            )(*call_args, **call_kwargs)
        )

    def unary_stream(self, *args, **kwargs) -> grpc.aio.UnaryStreamMultiCallable:
        return WrappedUnaryStreamMultiCallable(
            lambda *call_args, **call_kwargs: self.next_channel().unary_stream(
                *args, **kwargs
            )(*call_args, **call_kwargs)
        )

    def stream_unary(self, *args, **kwargs) -> grpc.aio.StreamUnaryMultiCallable:
        return WrappedStreamUnaryMultiCallable(
            lambda *call_args, **call_kwargs: self.next_channel().stream_unary(
                *args, **kwargs
            )(*call_args, **call_kwargs)
        )

    def stream_stream(self, *args, **kwargs) -> grpc.aio.StreamStreamMultiCallable:
        return WrappedStreamStreamMultiCallable(
            lambda *call_args, **call_kwargs: self.next_channel().stream_stream(
                *args, **kwargs
            )(*call_args, **call_kwargs)
        )

    async def close(self, grace=None):
        close_fns = [channel.close(grace=grace) for channel in self._pool]
        await asyncio.gather(*close_fns)

    async def channel_ready(self):
        ready_fns = [channel.channel_ready() for channel in self._pool]
        await asyncio.gather(*ready_fns)

    async def __aenter__(self):
        for channel in self._pool:
            await channel.__aenter__()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        for channel in self._pool:
            await channel.__aexit__(exc_type, exc_val, exc_tb)

    def get_state(self, try_to_connect: bool = False) -> grpc.ChannelConnectivity:
        raise NotImplementedError("undefined for pool of channels")

    async def wait_for_state_change(self, last_observed_state):
        raise NotImplementedError("undefined for pool of channels")

    def index_of(self, channel) -> int:
        try:
            return self._pool.index(channel)
        except ValueError:
            return -1

    @property
    def channels(self) -> list[aio.Channel]:
        return self._pool

    def __getitem__(self, item: int) -> aio.Channel:
        return self._pool[item]
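A short sketch of the static PooledChannel above, assuming the same kind of channel factory; make_channel and the localhost target are illustrative. Because each multicallable wraps a lambda, the round-robin next_channel() pick happens per call rather than once when the multicallable is created:

import asyncio
from grpc.experimental import aio
from google.cloud.bigtable._channel_pooling.pooled_channel import (
    PooledChannel,
    StaticPoolOptions,
)


def make_channel() -> aio.Channel:
    # hypothetical factory returning a plain grpc.aio channel
    return aio.insecure_channel("localhost:8080")


async def main():
    async with PooledChannel(
        create_channel_fn=make_channel,
        pool_options=StaticPoolOptions(pool_size=4),
    ) as pool:
        assert len(pool.channels) == 4
        # consecutive picks rotate through the pool
        first, second = pool.next_channel(), pool.next_channel()
        assert pool.index_of(first) == 0 and pool.index_of(second) == 1


asyncio.run(main())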