Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 43 additions & 2 deletions src/Spanner/Connection/Grpc.php
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

namespace Google\Cloud\Spanner\Connection;

use Grpc\UnaryCall;
use Google\Cloud\Core\GrpcRequestWrapper;
use Google\Cloud\Core\GrpcTrait;
use Google\Cloud\Core\LongRunning\OperationResponseTrait;
Expand All @@ -25,6 +26,7 @@
use Google\Cloud\Spanner\Operation;
use Google\Cloud\Spanner\SpannerClient as ManualSpannerClient;
use Google\Cloud\Spanner\V1\SpannerClient;
use Google\GAX\AgentHeaderDescriptor;
use Google\GAX\Serializer;
use Google\Protobuf;
use Google\Protobuf\FieldMask;
Expand All @@ -34,6 +36,7 @@
use Google\Protobuf\Value;
use Google\Spanner\Admin\Database\V1\Database;
use Google\Spanner\Admin\Instance\V1\Instance;
use Google\Spanner\V1\DeleteSessionRequest;
use Google\Spanner\V1\KeySet;
use Google\Spanner\V1\Mutation;
use Google\Spanner\V1\Mutation_Delete;
Expand Down Expand Up @@ -116,6 +119,11 @@ class Grpc implements ConnectionInterface
*/
private $longRunningGrpcClients;

/**
* @var AgentHeaderDescriptor
*/
private $headerDescriptor;

/**
* @param array $config [optional]
*/
Expand All @@ -142,16 +150,20 @@ public function __construct(array $config = [])

$config['serializer'] = $this->serializer;
$this->setRequestWrapper(new GrpcRequestWrapper($config));

$grpcConfig = $this->getGaxConfig(ManualSpannerClient::VERSION);
$this->spannerClient = isset($config['gapicSpannerClient'])
? $config['gapicSpannerClient']
: new SpannerClient($grpcConfig);
$this->instanceAdminClient = new InstanceAdminClient($grpcConfig);
$this->databaseAdminClient = new DatabaseAdminClient($grpcConfig);
$this->spannerClient = new SpannerClient($grpcConfig);
$this->operationsClient = $this->instanceAdminClient->getOperationsClient();
$this->longRunningGrpcClients = [
$this->instanceAdminClient,
$this->databaseAdminClient
];
$this->headerDescriptor = new AgentHeaderDescriptor([
'gapicVersion' => trim(file_get_contents(__DIR__ . '/../VERSION'))
]);
}

/**
Expand Down Expand Up @@ -444,6 +456,35 @@ public function deleteSession(array $args)
]);
}

/**
* Note: This should be removed once GAPIC exposes the ability to execute
* concurrent requests.
*
* @access private
* @param array $args
* @return UnaryCall
* @experimental
*/
public function deleteSessionAsync(array $args)
{
$database = $this->pluck('database', $args);
$headers = $this->headerDescriptor->getHeader()
+ $this->addResourcePrefixHeader($args, $database)['userHeaders'];
$request = new DeleteSessionRequest();
$request->setName($this->pluck('name', $args));
$credentialsCallback = $this->spannerClient
->getCredentialsHelper()
->createCallCredentialsCallback();

return $this->spannerClient
->getStub()
->DeleteSession(
$request,
$headers,
['call_credentials_callback' => $credentialsCallback]
);
}

/**
* @param array $args
* @return \Generator
Expand Down
12 changes: 12 additions & 0 deletions src/Spanner/Database.php
Original file line number Diff line number Diff line change
Expand Up @@ -1457,6 +1457,18 @@ public function identity()
];
}

/**
* Returns the underlying connection.
*
* @access private
* @return ConnectionInterface
* @experimental
*/
public function connection()
{
return $this->connection;
}

/**
* Represent the class in a more readable and digestable fashion.
*
Expand Down
152 changes: 109 additions & 43 deletions src/Spanner/Session/CacheSessionPool.php
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
use Google\Cloud\Core\Lock\SemaphoreLock;
use Google\Cloud\Core\SysvTrait;
use Google\Cloud\Spanner\Database;
use Grpc\UnaryCall;
use Psr\Cache\CacheItemPoolInterface;

/**
Expand All @@ -39,19 +40,18 @@
* recommended way to bootstrap the session pool.
*
* Sessions are created on demand up to the maximum session value set during
* instantiation of the pool. After peak usage hours, you may find that more
* sessions are available than your demand may require. It is important to make
* sure the number of active sessions managed by the Spanner backend is kept
* as minimal as possible. In order to help maintain this balance, please use
* the {@see Google\Cloud\Spanner\Session\CacheSessionPool::downsize()} method
* on an interval that matches when you expect to see a decrease in traffic.
* This will help ensure you never run into issues where the Spanner backend is
* instantiation of the pool. To help ensure the minimum number of sessions
* required are managed by the pool, attempts will be made to automatically
* downsize after every 10 minute window. This feature is configurable and one
* may also downsize at their own choosing via
* {@see Google\Cloud\Spanner\Session\CacheSessionPool::downsize()}. Downsizing
* will help ensure you never run into issues where the Spanner backend is
* locked up after having met the maximum number of sessions assigned per node.
* For reference, the current maximum sessions per database per node is 10k. For
* more information on limits please see
* [here](https://cloud.google.com/spanner/docs/limits).
*
* Additionally, when expecting a long period of inactivity (such as a
* When expecting a long period of inactivity (such as a
* maintenance window), please make sure to call
* {@see Google\Cloud\Spanner\Session\CacheSessionPool::clear()} in order to
* delete any active sessions.
Expand Down Expand Up @@ -83,9 +83,9 @@ class CacheSessionPool implements SessionPoolInterface
use SysvTrait;

const CACHE_KEY_TEMPLATE = 'cache-session-pool.%s.%s.%s';

const DURATION_TWENTY_MINUTES = 1200;
const DURATION_ONE_MINUTE = 60;
const WINDOW_SIZE = 600;

/**
* @var array
Expand All @@ -95,7 +95,8 @@ class CacheSessionPool implements SessionPoolInterface
'minSessions' => 1,
'shouldWaitForSession' => true,
'maxCyclesToWaitForSession' => 30,
'sleepIntervalSeconds' => .5
'sleepIntervalSeconds' => .5,
'shouldAutoDownsize' => true
];

/**
Expand All @@ -118,6 +119,16 @@ class CacheSessionPool implements SessionPoolInterface
*/
private $database;

/**
* @var UnaryCall[]
*/
private $deleteCalls = [];

/**
* @var array
*/
private $deleteQueue = [];

/**
* @param CacheItemPoolInterface $cacheItemPool A PSR-6 compatible cache
* implementation used to store the session data.
Expand All @@ -140,6 +151,9 @@ class CacheSessionPool implements SessionPoolInterface
* **Defaults to** a semaphore based implementation if the
* required extensions are installed, otherwise an flock based
* implementation.
* @type bool $shouldAutoDownsize Determines whether or not to
* automatically attempt to downsize the pool after every 10
* minute window. **Defaults to** `true`.
* }
* @throws \InvalidArgumentException
*/
Expand Down Expand Up @@ -228,10 +242,13 @@ public function acquire($context = SessionPoolInterface::CONTEXT_READ)

if (!$exception) {
$session = array_shift($data['queue']);

$data['inUse'][$session['name']] = $session + [
'lastActive' => $this->time()
];

if ($this->config['shouldAutoDownsize']) {
$this->manageSessionsToDelete($data);
}
}

$this->cacheItemPool->save($item->set($data));
Expand All @@ -257,6 +274,11 @@ public function acquire($context = SessionPoolInterface::CONTEXT_READ)
$session = $this->waitForNextAvailableSession();
}

if ($this->deleteQueue) {
$this->deleteSessions($this->deleteQueue);

This comment was marked as spam.

This comment was marked as spam.

$this->deleteQueue = [];
}

return $this->database->session($session['name']);
}

Expand Down Expand Up @@ -414,40 +436,24 @@ public function warmup()
/**
* Clear the cache and attempt to delete all sessions in the pool.
*
* Please note this method will attempt to synchronously delete sessions and
* will block until complete.
* A session may be removed from the cache, but still tracked as active by
* the Spanner backend if a delete operation failed. To ensure you do not
* exceed the maximum number of sessions available per node, please be sure
* to check the return value of this method to be certain all sessions have
* been deleted.
*/
public function clear()
{
$sessions = $this->config['lock']->synchronize(function () {
$sessions = [];
$item = $this->cacheItemPool->getItem($this->cacheKey);
$data = (array) $item->get() ?: $this->initialize();

foreach ($data['queue'] as $session) {
$sessions[] = $session['name'];
}

foreach ($data['inUse'] as $session) {
$sessions[] = $session['name'];
}

$sessions = $data['queue'] + $data['inUse'];
$this->cacheItemPool->clear();

return $sessions;
});

foreach ($sessions as $sessionName) {
$session = $this->database->session($sessionName);

try {
$session->delete();
} catch (\Exception $ex) {
if ($ex instanceof NotFoundException) {
continue;
}
}
}
$this->deleteSessions($sessions);
}

/**
Expand Down Expand Up @@ -557,7 +563,9 @@ private function initialize()
return [
'queue' => [],
'inUse' => [],
'toCreate' => []
'toCreate' => [],
'windowStart' => $this->time(),
'maxInUseSessions' => 0
];
}

Expand All @@ -570,17 +578,13 @@ private function initialize()
*/
private function getSessionCount(array $data)
{
$count = 0;

foreach ($data as $sessionType) {
$count += count($sessionType);
}

return $count;
return count($data['queue'])
+ count($data['inUse'])
+ count($data['toCreate']);
}

/**
* Gets the next session in the queue, clearing out which are expired.
* Gets the next session in the queue, clearing out any which are expired.
*
* @param array $data
* @return array|null
Expand All @@ -597,6 +601,10 @@ private function getSession(array &$data)
$data['inUse'][$session['name']] = $session + [
'lastActive' => $this->time()
];

if ($this->config['shouldAutoDownsize']) {
$this->manageSessionsToDelete($data);
}
}

return $session;
Expand Down Expand Up @@ -746,4 +754,62 @@ private function validateConfig()
);
}
}

/**
* Delete the provided sessions.
*
* @param array $sessions
*/
private function deleteSessions(array $sessions)
{
// gRPC calls appear to cancel when the corresponding UnaryCall object
// goes out of scope. Keeping the calls in scope allows time for the
// calls to complete at the expense of a small memory footprint.
$this->deleteCalls = [];

This comment was marked as spam.


foreach ($sessions as $session) {
$this->deleteCalls[] = $this->database->connection()
->deleteSessionAsync([
'name' => $session['name'],
'database' => $this->database->name()
]);
}
}

/**
* Checks the maximum number of sessions in use over the last window(s) then
* removes the sessions from the cache and prepares them to be deleted from
* the Spanner backend.
*
* @param array $data
*/
private function manageSessionsToDelete(array &$data)
{
$secondsSinceLastWindow = $this->time() - $data['windowStart'];
$inUseCount = count($data['inUse']);

if ($secondsSinceLastWindow < self::WINDOW_SIZE + 1) {
if ($data['maxInUseSessions'] < $inUseCount) {
$data['maxInUseSessions'] = $inUseCount;
}

return;
}

$totalCount = $inUseCount + count($data['queue']) + count($data['toCreate']);
$windowsPassed = (int) ($secondsSinceLastWindow / self::WINDOW_SIZE);
$deletionCount = min(
$totalCount - (int) round($data['maxInUseSessions'] / $windowsPassed),
$totalCount - $this->config['minSessions']
);
$data['maxInUseSessions'] = $inUseCount;
$data['windowStart'] = $this->time();

if ($deletionCount) {
$this->deleteQueue += array_splice(
$data['queue'],
(int) -$deletionCount
);
}
}
}
Loading