Skip to content

Commit 97215a9

Browse files
GromNaNfabpot
authored andcommitted
[Lock] Split PdoStore into DoctrineDbalStore
1 parent e8b4424 commit 97215a9

13 files changed

+903
-224
lines changed

CHANGELOG.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
CHANGELOG
22
=========
33

4+
5.4.0
5+
-----
6+
7+
* added `DoctrineDbalStore` identical to `PdoStore` for `Doctrine\DBAL\Connection` or DBAL url
8+
* deprecated usage of `PdoStore` with `Doctrine\DBAL\Connection` or DBAL url
9+
* added `DoctrineDbalPostgreSqlStore` identical to `PdoPostgreSqlStore` for `Doctrine\DBAL\Connection` or DBAL url
10+
* deprecated usage of `PdoPostgreSqlStore` with `Doctrine\DBAL\Connection` or DBAL url
11+
412
5.2.0
513
-----
614

Store/DatabaseTableTrait.php

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Lock\Store;
13+
14+
use Symfony\Component\Lock\Exception\InvalidArgumentException;
15+
use Symfony\Component\Lock\Exception\InvalidTtlException;
16+
use Symfony\Component\Lock\Key;
17+
18+
/**
19+
* @internal
20+
*/
21+
trait DatabaseTableTrait
22+
{
23+
private $table = 'lock_keys';
24+
private $idCol = 'key_id';
25+
private $tokenCol = 'key_token';
26+
private $expirationCol = 'key_expiration';
27+
private $gcProbability;
28+
private $initialTtl;
29+
30+
private function init(array $options, float $gcProbability, int $initialTtl)
31+
{
32+
if ($gcProbability < 0 || $gcProbability > 1) {
33+
throw new InvalidArgumentException(sprintf('"%s" requires gcProbability between 0 and 1, "%f" given.', __METHOD__, $gcProbability));
34+
}
35+
if ($initialTtl < 1) {
36+
throw new InvalidTtlException(sprintf('"%s()" expects a strictly positive TTL, "%d" given.', __METHOD__, $initialTtl));
37+
}
38+
39+
$this->table = $options['db_table'] ?? $this->table;
40+
$this->idCol = $options['db_id_col'] ?? $this->idCol;
41+
$this->tokenCol = $options['db_token_col'] ?? $this->tokenCol;
42+
$this->expirationCol = $options['db_expiration_col'] ?? $this->expirationCol;
43+
44+
$this->gcProbability = $gcProbability;
45+
$this->initialTtl = $initialTtl;
46+
}
47+
48+
/**
49+
* Returns a hashed version of the key.
50+
*/
51+
private function getHashedKey(Key $key): string
52+
{
53+
return hash('sha256', (string) $key);
54+
}
55+
56+
private function getUniqueToken(Key $key): string
57+
{
58+
if (!$key->hasState(__CLASS__)) {
59+
$token = base64_encode(random_bytes(32));
60+
$key->setState(__CLASS__, $token);
61+
}
62+
63+
return $key->getState(__CLASS__);
64+
}
65+
66+
/**
67+
* Prune the table randomly, based on GC probability.
68+
*/
69+
private function randomlyPrune(): void
70+
{
71+
if ($this->gcProbability > 0 && (1.0 === $this->gcProbability || (random_int(0, \PHP_INT_MAX) / \PHP_INT_MAX) <= $this->gcProbability)) {
72+
$this->prune();
73+
}
74+
}
75+
}
Lines changed: 235 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the Symfony package.
5+
*
6+
* (c) Fabien Potencier <fabien@symfony.com>
7+
*
8+
* For the full copyright and license information, please view the LICENSE
9+
* file that was distributed with this source code.
10+
*/
11+
12+
namespace Symfony\Component\Lock\Store;
13+
14+
use Doctrine\DBAL\Connection;
15+
use Doctrine\DBAL\DriverManager;
16+
use Doctrine\DBAL\Platforms\PostgreSQLPlatform;
17+
use Symfony\Component\Lock\BlockingSharedLockStoreInterface;
18+
use Symfony\Component\Lock\BlockingStoreInterface;
19+
use Symfony\Component\Lock\Exception\InvalidArgumentException;
20+
use Symfony\Component\Lock\Exception\LockConflictedException;
21+
use Symfony\Component\Lock\Key;
22+
use Symfony\Component\Lock\SharedLockStoreInterface;
23+
24+
/**
25+
* DoctrineDbalPostgreSqlStore is a PersistingStoreInterface implementation using
26+
* PostgreSql advisory locks with a Doctrine DBAL Connection.
27+
*
28+
* @author Jérémy Derussé <jeremy@derusse.com>
29+
*/
30+
class DoctrineDbalPostgreSqlStore implements BlockingSharedLockStoreInterface, BlockingStoreInterface
31+
{
32+
private $conn;
33+
private static $storeRegistry = [];
34+
35+
/**
36+
* You can either pass an existing database connection a Doctrine DBAL Connection
37+
* or a URL that will be used to connect to the database.
38+
*
39+
* @param Connection|string $connOrUrl A Connection instance or Doctrine URL
40+
*
41+
* @throws InvalidArgumentException When first argument is not Connection nor string
42+
*/
43+
public function __construct($connOrUrl)
44+
{
45+
if ($connOrUrl instanceof Connection) {
46+
if (!$connOrUrl->getDatabasePlatform() instanceof PostgreSQLPlatform) {
47+
throw new InvalidArgumentException(sprintf('The adapter "%s" does not support the "%s" platform.', __CLASS__, \get_class($connOrUrl->getDatabasePlatform())));
48+
}
49+
$this->conn = $connOrUrl;
50+
} elseif (\is_string($connOrUrl)) {
51+
if (!class_exists(DriverManager::class)) {
52+
throw new InvalidArgumentException(sprintf('Failed to parse the DSN "%s". Try running "composer require doctrine/dbal".', $connOrUrl));
53+
}
54+
$this->conn = DriverManager::getConnection(['url' => $this->filterDsn($connOrUrl)]);
55+
} else {
56+
throw new \TypeError(sprintf('Argument 1 passed to "%s()" must be "%s" or string, "%s" given.', Connection::class, __METHOD__, get_debug_type($connOrUrl)));
57+
}
58+
}
59+
60+
public function save(Key $key)
61+
{
62+
// prevent concurrency within the same connection
63+
$this->getInternalStore()->save($key);
64+
65+
$sql = 'SELECT pg_try_advisory_lock(:key)';
66+
$result = $this->conn->executeQuery($sql, [
67+
'key' => $this->getHashedKey($key),
68+
]);
69+
70+
// Check if lock is acquired
71+
if (true === $result->fetchOne()) {
72+
$key->markUnserializable();
73+
// release sharedLock in case of promotion
74+
$this->unlockShared($key);
75+
76+
return;
77+
}
78+
79+
throw new LockConflictedException();
80+
}
81+
82+
public function saveRead(Key $key)
83+
{
84+
// prevent concurrency within the same connection
85+
$this->getInternalStore()->saveRead($key);
86+
87+
$sql = 'SELECT pg_try_advisory_lock_shared(:key)';
88+
$result = $this->conn->executeQuery($sql, [
89+
'key' => $this->getHashedKey($key),
90+
]);
91+
92+
// Check if lock is acquired
93+
if (true === $result->fetchOne()) {
94+
$key->markUnserializable();
95+
// release lock in case of demotion
96+
$this->unlock($key);
97+
98+
return;
99+
}
100+
101+
throw new LockConflictedException();
102+
}
103+
104+
public function putOffExpiration(Key $key, float $ttl)
105+
{
106+
// postgresql locks forever.
107+
// check if lock still exists
108+
if (!$this->exists($key)) {
109+
throw new LockConflictedException();
110+
}
111+
}
112+
113+
public function delete(Key $key)
114+
{
115+
// Prevent deleting locks own by an other key in the same connection
116+
if (!$this->exists($key)) {
117+
return;
118+
}
119+
120+
$this->unlock($key);
121+
122+
// Prevent deleting Readlocks own by current key AND an other key in the same connection
123+
$store = $this->getInternalStore();
124+
try {
125+
// If lock acquired = there is no other ReadLock
126+
$store->save($key);
127+
$this->unlockShared($key);
128+
} catch (LockConflictedException $e) {
129+
// an other key exists in this ReadLock
130+
}
131+
132+
$store->delete($key);
133+
}
134+
135+
public function exists(Key $key)
136+
{
137+
$sql = "SELECT count(*) FROM pg_locks WHERE locktype='advisory' AND objid=:key AND pid=pg_backend_pid()";
138+
$result = $this->conn->executeQuery($sql, [
139+
'key' => $this->getHashedKey($key),
140+
]);
141+
142+
if ($result->fetchOne() > 0) {
143+
// connection is locked, check for lock in internal store
144+
return $this->getInternalStore()->exists($key);
145+
}
146+
147+
return false;
148+
}
149+
150+
public function waitAndSave(Key $key)
151+
{
152+
// prevent concurrency within the same connection
153+
// Internal store does not allow blocking mode, because there is no way to acquire one in a single process
154+
$this->getInternalStore()->save($key);
155+
156+
$sql = 'SELECT pg_advisory_lock(:key)';
157+
$this->conn->executeStatement($sql, [
158+
'key' => $this->getHashedKey($key),
159+
]);
160+
161+
// release lock in case of promotion
162+
$this->unlockShared($key);
163+
}
164+
165+
public function waitAndSaveRead(Key $key)
166+
{
167+
// prevent concurrency within the same connection
168+
// Internal store does not allow blocking mode, because there is no way to acquire one in a single process
169+
$this->getInternalStore()->saveRead($key);
170+
171+
$sql = 'SELECT pg_advisory_lock_shared(:key)';
172+
$this->conn->executeStatement($sql, [
173+
'key' => $this->getHashedKey($key),
174+
]);
175+
176+
// release lock in case of demotion
177+
$this->unlock($key);
178+
}
179+
180+
/**
181+
* Returns a hashed version of the key.
182+
*/
183+
private function getHashedKey(Key $key): int
184+
{
185+
return crc32((string) $key);
186+
}
187+
188+
private function unlock(Key $key): void
189+
{
190+
do {
191+
$sql = "SELECT pg_advisory_unlock(objid::bigint) FROM pg_locks WHERE locktype='advisory' AND mode='ExclusiveLock' AND objid=:key AND pid=pg_backend_pid()";
192+
$result = $this->conn->executeQuery($sql, [
193+
'key' => $this->getHashedKey($key),
194+
]);
195+
} while (0 !== $result->rowCount());
196+
}
197+
198+
private function unlockShared(Key $key): void
199+
{
200+
do {
201+
$sql = "SELECT pg_advisory_unlock_shared(objid::bigint) FROM pg_locks WHERE locktype='advisory' AND mode='ShareLock' AND objid=:key AND pid=pg_backend_pid()";
202+
$result = $this->conn->executeQuery($sql, [
203+
'key' => $this->getHashedKey($key),
204+
]);
205+
} while (0 !== $result->rowCount());
206+
}
207+
208+
/**
209+
* Check driver and remove scheme extension from DSN.
210+
* From pgsql+advisory://server/ to pgsql://server/.
211+
*
212+
* @throws InvalidArgumentException when driver is not supported
213+
*/
214+
private function filterDsn(string $dsn): string
215+
{
216+
if (!str_contains($dsn, '://')) {
217+
throw new InvalidArgumentException(sprintf('String "%" is not a valid DSN for Doctrine DBAL.', $dsn));
218+
}
219+
220+
[$scheme, $rest] = explode(':', $dsn, 2);
221+
$driver = strtok($scheme, '+');
222+
if (!\in_array($driver, ['pgsql', 'postgres', 'postgresql'])) {
223+
throw new InvalidArgumentException(sprintf('The adapter "%s" does not support the "%s" driver.', __CLASS__, $driver));
224+
}
225+
226+
return sprintf('%s:%s', $driver, $rest);
227+
}
228+
229+
private function getInternalStore(): SharedLockStoreInterface
230+
{
231+
$namespace = spl_object_hash($this->conn);
232+
233+
return self::$storeRegistry[$namespace] ?? self::$storeRegistry[$namespace] = new InMemoryStore();
234+
}
235+
}

0 commit comments

Comments
 (0)