Skip to content

Commit 8277166

Browse files
committed
Initial inclusion of IndexRotator.
1 parent 4cce6c8 commit 8277166

File tree

4 files changed

+387
-0
lines changed

4 files changed

+387
-0
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<?php
2+
3+
namespace Zumba\ElasticsearchRotator\Exception;
4+
5+
class MissingPrimaryIndex extends \RuntimeException
6+
{
7+
8+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
<?php
2+
3+
namespace Zumba\ElasticsearchRotator\Exception;
4+
5+
class PrimaryIndexCopyFailure extends \RuntimeException
6+
{
7+
8+
}

src/IndexRotator.php

Lines changed: 252 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,252 @@
1+
<?php
2+
3+
namespace Zumba\ElasticsearchRotator;
4+
5+
use Psr\Log\LoggerInterface;
6+
use Psr\Log\NullLogger;
7+
use Elasticsearch\Client;
8+
9+
class IndexRotator
10+
{
11+
const INDEX_NAME_CONFIG = '.%s_configuration';
12+
const TYPE_CONFIGURATION = 'configuration';
13+
const PRIMARY_ID = 'primary';
14+
const RETRY_TIME_COPY = 500000;
15+
const MAX_RETRY_COUNT = 5;
16+
17+
/**
18+
* Elasticsearch client instance.
19+
*
20+
* @var \Elasticsearch\Client
21+
*/
22+
private $engine;
23+
24+
/**
25+
* Prefix identifier for this index.
26+
*
27+
* @var string
28+
*/
29+
private $prefix;
30+
31+
/**
32+
* Configuration index name for this index.
33+
*
34+
* @var string
35+
*/
36+
private $configurationIndexName;
37+
38+
/**
39+
* Mapping for configuration index.
40+
*
41+
* @var array
42+
*/
43+
public static $elasticSearchConfigurationMapping = [
44+
'mappings' => [
45+
'configuration' => [
46+
'properties' => [
47+
'name' => ['type' => 'string', 'index' => 'not_analyzed'],
48+
'timestamp' => ['type' => 'date']
49+
]
50+
]
51+
]
52+
];
53+
/**
54+
* Constructor.
55+
*
56+
* @param \Elasticsearch\Client $engine
57+
* @param string $prefix Identifier for who's configuration this is intended.
58+
* @param Psr\Log\LoggerInterface $logger
59+
*/
60+
public function __construct(\Elasticsearch\Client $engine, $prefix, LoggerInterface $logger = null)
61+
{
62+
$this->engine = $engine;
63+
$this->prefix = $prefix;
64+
if ($logger !== null) {
65+
$this->logger = $logger;
66+
} else {
67+
$this->logger = new NullLogger();
68+
}
69+
$this->configurationIndexName = sprintf(static::INDEX_NAME_CONFIG, $this->prefix);
70+
}
71+
72+
/**
73+
* Get the primary index name for this configuration.
74+
*
75+
* @return string
76+
* @throws \ElasticsearchRotator\Exceptions\MissingPrimaryException
77+
*/
78+
public function getPrimaryIndex()
79+
{
80+
if (!$this->engine->indices()->exists(['index' => $this->configurationIndexName])) {
81+
throw new Exception\MissingPrimaryIndex('Configuration index not available.');
82+
}
83+
$primaryPayload = [
84+
'index' => $this->configurationIndexName,
85+
'type' => static::TYPE_CONFIGURATION,
86+
'id' => static::PRIMARY_ID
87+
];
88+
try {
89+
$primary = $this->engine->get($primaryPayload);
90+
} catch (\Elasticsearch\Common\Exceptions\Missing404Exception $e) {
91+
throw new Exception\MissingPrimaryIndex('Configuration index not available.');
92+
}
93+
return $primary['_source']['name'];
94+
}
95+
96+
/**
97+
* Sets the primary index for searches using this configuration.
98+
*
99+
* @param string $name Index name for the primary index to use.
100+
* @return void
101+
*/
102+
public function setPrimaryIndex($name)
103+
{
104+
if (!$this->engine->indices()->exists(['index' => $this->configurationIndexName])) {
105+
$this->createCurrentIndexConfiguration();
106+
}
107+
$this->engine->index([
108+
'index' => $this->configurationIndexName,
109+
'type' => static::TYPE_CONFIGURATION,
110+
'id' => static::PRIMARY_ID,
111+
'body' => [
112+
'name' => $name,
113+
'timestamp' => time()
114+
]
115+
]);
116+
$this->logger->debug('Primary index set.', compact('name'));
117+
}
118+
119+
/**
120+
* Copy the primary index to a secondary index.
121+
*
122+
* @param integer $retryCount Recursive retry count for retrying the operation of this method.
123+
* @return string ID of the newly created secondary entry.
124+
* @throws \Zumba\ElasticsearchRotator\Exception\PrimaryIndexCopyFailure
125+
*/
126+
public function copyPrimaryIndexToSecondary($retryCount = 0)
127+
{
128+
if (!$this->engine->indices()->exists(['index' => $this->configurationIndexName])) {
129+
$this->createCurrentIndexConfiguration();
130+
}
131+
try {
132+
$primaryName = $this->getPrimaryIndex();
133+
} catch (\Elasticsearch\Common\Exceptions\ServerErrorResponseException $e) {
134+
$this->logger->debug('Unable to get primary index.', json_decode($e->getMessage(), true));
135+
usleep(static::RETRY_TIME_COPY);
136+
if ($retryCount > static::MAX_RETRY_COUNT) {
137+
throw new Exception\PrimaryIndexCopyFailure('Unable to copy primary to secondary index.');
138+
}
139+
return $this->copyPrimaryIndexToSecondary($retryCount++);
140+
}
141+
$id = $this->engine->index([
142+
'index' => $this->configurationIndexName,
143+
'type' => static::TYPE_CONFIGURATION,
144+
'body' => [
145+
'name' => $primaryName,
146+
'timestamp' => time()
147+
]
148+
])['_id'];
149+
$this->logger->debug('Secondary entry created.', compact('id'));
150+
return $id;
151+
}
152+
153+
/**
154+
* Retrieve a list of all secondary indexes (rotated from) that are older than provided date (or ES date math)
155+
*
156+
* Note, if date is not provided, it will find all secondary indexes.
157+
*
158+
* @param string $olderThan
159+
* @return array
160+
*/
161+
public function getSecondaryIndexes(\DateTime $olderThan = null)
162+
{
163+
if ($olderThan === null) {
164+
$olderThan = new \DateTime();
165+
}
166+
$params = [
167+
'index' => $this->configurationIndexName,
168+
'type' => static::TYPE_CONFIGURATION,
169+
'body' => [
170+
'query' => [
171+
'bool' => [
172+
'must_not' => [
173+
'term' => [
174+
'_id' => static::PRIMARY_ID
175+
]
176+
],
177+
'filter' => [
178+
'range' => [
179+
'timestamp' => [
180+
'lt' => $olderThan->format('U')
181+
]
182+
]
183+
]
184+
]
185+
],
186+
'sort' => ['_doc' => 'asc']
187+
]
188+
];
189+
// This section is to support deprecated feature set for ES 1.x.
190+
// It may be removed in future versions of this library when ES 1.x is sufficiently unsupported.
191+
if (!$this->doesSupportCombinedQueryFilter()) {
192+
unset($params['body']['query']['bool']['filter']);
193+
$params['body']['filter']['range']['timestamp']['lt'] = $olderThan->format('U');
194+
}
195+
$results = $this->engine->search($params);
196+
if ($results['hits']['total'] == 0) {
197+
return [];
198+
}
199+
return array_map(function($entry) {
200+
return $entry['_source']['name'];
201+
}, $results['hits']['hits']);
202+
return $results['hits']['total'] > 0 ? array_column($results['hits']['hits'], '_source') : [];
203+
}
204+
205+
/**
206+
* Remove any secondary index older that provided date.
207+
*
208+
* If no date is provided, will remove all secondary indices.
209+
*
210+
* @param \DateTime $olderThan
211+
* @return array Results of the bulk operation.
212+
*/
213+
public function deleteSecondaryIndexes(\DateTime $olderThan = null)
214+
{
215+
$results = [];
216+
foreach ($this->getSecondaryIndexes($olderThan) as $indexToDelete) {
217+
if ($this->engine->indices()->exists(['index' => $indexToDelete])) {
218+
$results[$indexToDelete] = $this->engine->indices()->delete(['index' => $indexToDelete]);
219+
$this->logger->debug('Deleted secondary index.', compact('indexToDelete'));
220+
} else {
221+
$this->logger->debug('Index not found to delete.', compact('indexToDelete'));
222+
}
223+
}
224+
return $results;
225+
}
226+
227+
/**
228+
* Create the index needed to store the primary index name.
229+
*
230+
* @return void
231+
*/
232+
private function createCurrentIndexConfiguration()
233+
{
234+
$this->engine->indices()->create([
235+
'index' => $this->configurationIndexName,
236+
'body' => static::$elasticSearchConfigurationMapping
237+
]);
238+
$this->logger->debug('Configuration index created.', [
239+
'index' => $this->configurationIndexName
240+
]);
241+
}
242+
243+
/**
244+
* Determines if the combined filter in query DSL is supported.
245+
*
246+
* @return boolean
247+
*/
248+
private function doesSupportCombinedQueryFilter()
249+
{
250+
return version_compare($this->engine->info()['version']['number'], '2.0.0', '>=');
251+
}
252+
}

tests/IndexRotatorTest.php

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
<?php
2+
3+
use \Zumba\ElasticsearchRotator\IndexRotator;
4+
use \Zumba\PHPUnit\Extensions\ElasticSearch\Client\Connector;
5+
use \Zumba\PHPUnit\Extensions\ElasticSearch\DataSet\DataSet;
6+
7+
class IndexRotatorTest extends \PHPUnit_Framework_TestCase
8+
{
9+
use \Zumba\PHPUnit\Extensions\ElasticSearch\TestTrait;
10+
11+
public function setUp() {
12+
$this->indexRotator = new IndexRotator($this->getElasticSearchConnector()->getConnection(), 'config_test');
13+
parent::setUp();
14+
}
15+
16+
public function getElasticSearchConnector()
17+
{
18+
if (empty($this->connection)) {
19+
$clientBuilder = \Elasticsearch\ClientBuilder::create();
20+
if (getenv('ES_TEST_HOST')) {
21+
$clientBuilder->setHosts([getenv('ES_TEST_HOST')]);
22+
}
23+
$this->connection = new Connector($clientBuilder->build());
24+
}
25+
return $this->connection;
26+
}
27+
28+
public function getElasticSearchDataSet()
29+
{
30+
$dataSet = new DataSet($this->getElasticSearchConnector());
31+
$dataSet->setFixture([
32+
'.config_test_configuration' => [
33+
'configuration' => [
34+
[
35+
'id' => 'primary',
36+
'name' => 'some_index_1',
37+
'timestamp' => time()
38+
],
39+
[
40+
'id' => 'somesecondary2',
41+
'name' => 'some_index_3',
42+
'timestamp' => (new \DateTime('2015-02-01'))->format('U')
43+
],
44+
[
45+
'id' => 'somesecondary1',
46+
'name' => 'some_index_2',
47+
'timestamp' => (new \DateTime('2015-01-15'))->format('U')
48+
],
49+
]
50+
],
51+
'some_index_1' => [],
52+
'some_index_2' => [],
53+
'some_index_3' => [],
54+
]);
55+
$dataSet->setMappings([
56+
'.config_test_configuration' => IndexRotator::$elasticSearchConfigurationMapping['mappings']
57+
]);
58+
return $dataSet;
59+
}
60+
61+
public function testGetPrimaryIndex()
62+
{
63+
$this->assertEquals('some_index_1', $this->indexRotator->getPrimaryIndex());
64+
}
65+
66+
/**
67+
* @expectedException Zumba\ElasticsearchRotator\Exception\MissingPrimaryIndex
68+
*/
69+
public function testFailingToRetreivePrimaryIndex() {
70+
// Remove the fixtured primary index.
71+
$this->elasticSearchTearDown();
72+
$this->indexRotator->getPrimaryIndex();
73+
}
74+
75+
public function testSetPrimaryIndex()
76+
{
77+
$this->indexRotator->setPrimaryIndex('some_index_2');
78+
$this->assertEquals('some_index_2', $this->indexRotator->getPrimaryIndex());
79+
}
80+
81+
public function testCopyPrimaryIndexToSecondary()
82+
{
83+
$id = $this->indexRotator->copyPrimaryIndexToSecondary();
84+
$this->assertNotEquals('primary', $id);
85+
$results = $this->getElasticSearchConnector()->getConnection()->get([
86+
'index' => '.config_test_configuration',
87+
'type' => 'configuration',
88+
'id' => $id
89+
]);
90+
$this->assertEquals('some_index_1', $results['_source']['name']);
91+
}
92+
93+
/**
94+
* @dataProvider secondaryIndexConditionProvider
95+
*/
96+
public function testGetSecondaryIndices($olderThan, $expectedIndices) {
97+
$results = $this->indexRotator->getSecondaryIndexes($olderThan);
98+
$this->assertEmpty(array_diff($results, $expectedIndices));
99+
}
100+
101+
/**
102+
* @dataProvider secondaryIndexConditionProvider
103+
*/
104+
public function testDeleteSecondaryIndices($olderThan, $expectedToDelete) {
105+
$results = $this->indexRotator->deleteSecondaryIndexes($olderThan);
106+
$this->assertEmpty(array_diff(array_keys($results), $expectedToDelete));
107+
foreach ($results as $result) {
108+
$this->assertEquals(['acknowledged' => true], $result);
109+
}
110+
}
111+
112+
public function secondaryIndexConditionProvider() {
113+
return [
114+
'all' => [null, ['some_index_3', 'some_index_2']],
115+
'older than 2015-02-01' => [new \DateTime('2015-02-01'), ['some_index_2']],
116+
'older than 2015-01-01' => [new \DateTime('2015-01-01'), []]
117+
];
118+
}
119+
}

0 commit comments

Comments
 (0)