diff --git a/src/Helper/Vectors.php b/src/Helper/Vectors.php new file mode 100644 index 000000000..225191324 --- /dev/null +++ b/src/Helper/Vectors.php @@ -0,0 +1,28 @@ + $vector + */ + public static function packDenseVector(array $vector): string + { + return base64_encode(pack('G*', ...$vector)); + } +} diff --git a/src/Utility.php b/src/Utility.php index ed46b1b11..1d3182e78 100644 --- a/src/Utility.php +++ b/src/Utility.php @@ -40,4 +40,4 @@ public static function formatVariableName(string $var): string } return preg_replace('/[^a-zA-Z0-9_]/', '', $var); } -} \ No newline at end of file +} diff --git a/tests/Integration/BulkTest.php b/tests/Integration/BulkTest.php index f09e95763..ea6e23388 100644 --- a/tests/Integration/BulkTest.php +++ b/tests/Integration/BulkTest.php @@ -15,6 +15,7 @@ namespace Elastic\Elasticsearch\Tests\Integration; use Elastic\Elasticsearch\Client; +use Elastic\Elasticsearch\Helper\Vectors; use Elastic\Elasticsearch\Tests\Utility; use PHPUnit\Framework\TestCase; @@ -86,4 +87,50 @@ public function testBulkIndexWithoutId() $this->assertEquals(200, $response->getStatusCode()); $this->assertCount(2, $response['items']); } -} \ No newline at end of file + + public function testBulkIndexWithBase64Vector() + { + $response = $this->client->indices()->create([ + 'index' => self::TEST_INDEX, + 'body' => [ + 'mappings' => [ + 'properties' => [ + 'title' => ['type' => 'text'], + 'emb' => ['type' => 'dense_vector'], + ], + ], + ], + ]); + $this->assertEquals(200, $response->getStatusCode()); + + $response = $this->client->indices()->refresh([ + 'index' => self::TEST_INDEX, + ]); + $this->assertEquals(200, $response->getStatusCode()); + + $response = $this->client->bulk([ + 'body' => [ + [ + "index" => [ + "_index" => self::TEST_INDEX + ], + ], + [ + "text" => "text one", + "emb" => Vectors::packDenseVector([1.0, 2.0]) + ], + [ + "index" => [ + "_index" => self::TEST_INDEX + ], + ], + [ + "text" => "text two", + "emb" => Vectors::packDenseVector([3.4, 
5.6]) + ], + ] + ]); + $this->assertEquals(200, $response->getStatusCode()); + $this->assertCount(2, $response['items']); + } +} diff --git a/util/dense_vector_benchmark.php b/util/dense_vector_benchmark.php new file mode 100644 index 000000000..660b24f3d --- /dev/null +++ b/util/dense_vector_benchmark.php @@ -0,0 +1,183 @@ +indices()->exists(['index' => $index])->getStatusCode() != 404) { + $client->indices()->delete(['index' => $index]); + } + $client->indices()->create([ + 'index' => $index, + 'body' => [ + 'mappings' => [ + 'properties' => [ + 'docid' => [ + 'type' => 'keyword', + ], + 'title' => [ + 'type' => 'text', + ], + 'text' => [ + 'type' => 'text', + ], + 'emb' => [ + 'type' => 'dense_vector', + 'index_options' => [ + 'type' => 'flat', + ], + ], + ], + ], + ], + ]); + $client->indices()->refresh(['index' => $index]); + + // run the bulk upload + $len = sizeof($dataset); + $params = ['body' => []]; + $start = microtime(true); + for ($i = 1; $i <= $len * $repetitions; $i++) { + $doc = $dataset[($i - 1) % $len]; + $params['body'][] = ['index' => ['_index' => $index]]; + $params['body'][] = [ + 'docid' => $doc['docid'], + 'title' => $doc['title'], + 'text' => $doc['text'], + 'emb' => $packed ? Vectors::packDenseVector($doc['emb']) : $doc['emb'], + ]; + if ($i % $chunk_size == 0) { + $response = $client->bulk($params); + if ($response['errors']) { + echo 'Error during bulk upload. Exiting'; + exit(1); + } + $params = ['body' => []]; + unset($response); + } + } + if (!empty($params['body'])) { + $response = $client->bulk($params); + if ($response['errors']) { + echo 'Error during bulk upload. Exiting'; + exit(1); + } + unset($params); + unset($response); + } + return microtime(true) - $start; +} + +$opts = getopt('s:r:', array('url:', 'json', 'runs:', 'help'), $rest_index); +if (array_key_exists('help', $opts)) { + echo 'Usage: ' . $argv[0] . 
" [-s CHUNK_SIZES] [-r REPETITIONS] [--url URL] [--json] [--runs RUNS] DATASET_FILE\n";
    // double quotes required: single-quoted '\n' prints a literal backslash-n
    echo " -s CHUNK_SIZES List of chunk sizes to use, separated by commas (default: 100,250,500,1000)\n";
    echo " -r REPETITIONS Number of times the dataset is repeated (default: 20)\n";
    echo " --url URL The Elasticsearch connection URL\n";
    echo " --json Output benchmark results in JSON format\n";
    echo " --runs Number of runs that are averaged for each chunk size (default: 3)\n";
    exit(0);
}
if (!array_key_exists('url', $opts)) {
    echo "Error: --url argument is required.\n";
    exit(1);
}
else {
    $ELASTICSEARCH_URL = $opts['url'];
}
if (array_key_exists('s', $opts)) {
    $chunk_sizes = array_map(fn($v) => intval($v), explode(',', $opts['s']));
}
if (array_key_exists('r', $opts)) {
    $repetitions = intval($opts['r']);
}
if (array_key_exists('json', $opts)) {
    $json_output = TRUE;
}
if (array_key_exists('runs', $opts)) {
    $runs = intval($opts['runs']);
}
if (!$argv[$rest_index]) {
    echo "Error: the DATASET_FILE argument is required.\n";
    exit(1);
}
else {
    $dataset_file = $argv[$rest_index];
}

// read the dataset: one JSON document per line (JSON Lines), not CSV
$f = fopen($dataset_file, 'rt');
while (!feof($f)) {
    $line = fgets($f);
    if ($line !== FALSE) {
        $dataset[] = json_decode($line, true);
    }
}
fclose($f);

// initialize client
$client = Elastic\Elasticsearch\ClientBuilder::create()
    ->setHosts([$ELASTICSEARCH_URL])
    ->build();

// run the benchmark
$results = [];
foreach ($chunk_sizes as $chunk_size) {
    if (!$json_output) {
        echo 'Uploading ' . $dataset_file . ' with chunk size ' . $chunk_size .
"...\n"; + } + $normal_runs = []; + $packed_runs = []; + for ($run = 0; $run < $runs; $run++) { + $normal_runs[] = upload($client, $index, $dataset, $chunk_size, $repetitions, FALSE); + $packed_runs[] = upload($client, $index, $dataset, $chunk_size, $repetitions, TRUE); + } + $t = array_sum($normal_runs) / $runs; + $pt = array_sum($packed_runs) / $runs; + $result = [ + 'dataset_size' => sizeof($dataset) * $repetitions, + 'chunk_size' => $chunk_size, + 'float32' => [ + 'duration' => intval($t * 1000 + 0.5), + ], + 'base64' => [ + 'duration' => intval($pt * 1000 + 0.5), + ], + ]; + $results[] = $result; + if (!$json_output) { + echo 'Size: ' . $result['dataset_size'] . "\n"; + echo 'float duration: ' . number_format($t, 2) . 's (' . number_format($result['dataset_size'] / $t, 2) . " docs/s)\n"; + echo 'base64 duration: ' . number_format($pt, 2) . 's (' . number_format($result['dataset_size'] / $pt, 2) . " docs/s)\n"; + echo 'Speed up: ' . number_format($t / $pt, 2) . "x\n"; + } +} + +if ($json_output) { + echo json_encode($results, JSON_PRETTY_PRINT); +}