|
15 | 15 | use EventEngine\DocumentStore\Filter\Filter;
|
16 | 16 | use EventEngine\DocumentStore\Index;
|
17 | 17 | use EventEngine\DocumentStore\OrderBy\OrderBy;
|
| 18 | +use EventEngine\DocumentStore\PartialSelect; |
18 | 19 | use EventEngine\DocumentStore\Postgres\Exception\InvalidArgumentException;
|
19 | 20 | use EventEngine\DocumentStore\Postgres\Exception\RuntimeException;
|
20 | 21 | use EventEngine\Util\VariableType;
|
| 22 | +use function implode; |
| 23 | +use function is_string; |
| 24 | +use function json_decode; |
| 25 | +use function mb_strlen; |
| 26 | +use function mb_substr; |
| 27 | +use function sprintf; |
21 | 28 |
|
22 | 29 | final class PostgresDocumentStore implements DocumentStore\DocumentStore
|
23 | 30 | {
|
| 31 | + private const PARTIAL_SELECT_DOC_ID = '__partial_sel_doc_id__'; |
| 32 | + private const PARTIAL_SELECT_MERGE = '__partial_sel_merge__'; |
| 33 | + |
24 | 34 | /**
|
25 | 35 | * @var \PDO
|
26 | 36 | */
|
@@ -489,12 +499,7 @@ public function getDoc(string $collectionName, string $docId): ?array
|
489 | 499 | }
|
490 | 500 |
|
491 | 501 | /**
|
492 |
| - * @param string $collectionName |
493 |
| - * @param Filter $filter |
494 |
| - * @param int|null $skip |
495 |
| - * @param int|null $limit |
496 |
| - * @param OrderBy|null $orderBy |
497 |
| - * @return \Traversable list of docs |
| 502 | + * @inheritDoc |
498 | 503 | */
|
499 | 504 | public function filterDocs(string $collectionName, Filter $filter, int $skip = null, int $limit = null, OrderBy $orderBy = null): \Traversable
|
500 | 505 | {
|
@@ -524,6 +529,68 @@ public function filterDocs(string $collectionName, Filter $filter, int $skip = n
|
524 | 529 | }
|
525 | 530 | }
|
526 | 531 |
|
| 532 | + /** |
| 533 | + * @inheritDoc |
| 534 | + */ |
| 535 | + public function findDocs(string $collectionName, Filter $filter, int $skip = null, int $limit = null, OrderBy $orderBy = null): \Traversable |
| 536 | + { |
| 537 | + [$filterStr, $args] = $this->filterToWhereClause($filter); |
| 538 | + |
| 539 | + $where = $filterStr ? "WHERE $filterStr" : ''; |
| 540 | + |
| 541 | + $offset = $skip !== null ? "OFFSET $skip" : ''; |
| 542 | + $limit = $limit !== null ? "LIMIT $limit" : ''; |
| 543 | + |
| 544 | + $orderBy = $orderBy ? "ORDER BY " . implode(', ', $this->orderByToSort($orderBy)) : ''; |
| 545 | + |
| 546 | + $query = <<<EOT |
| 547 | +SELECT id, doc |
| 548 | +FROM {$this->schemaName($collectionName)}.{$this->tableName($collectionName)} |
| 549 | +$where |
| 550 | +$orderBy |
| 551 | +$limit |
| 552 | +$offset; |
| 553 | +EOT; |
| 554 | + $stmt = $this->connection->prepare($query); |
| 555 | + |
| 556 | + $stmt->execute($args); |
| 557 | + |
| 558 | + while($row = $stmt->fetch(\PDO::FETCH_ASSOC)) { |
| 559 | + yield $row['id'] => json_decode($row['doc'], true); |
| 560 | + } |
| 561 | + } |
| 562 | + |
| 563 | + public function findPartialDocs(string $collectionName, PartialSelect $partialSelect, Filter $filter, int $skip = null, int $limit = null, OrderBy $orderBy = null): \Traversable |
| 564 | + { |
| 565 | + [$filterStr, $args] = $this->filterToWhereClause($filter); |
| 566 | + |
| 567 | + $select = $this->makeSelect($partialSelect); |
| 568 | + |
| 569 | + $where = $filterStr ? "WHERE $filterStr" : ''; |
| 570 | + |
| 571 | + $offset = $skip !== null ? "OFFSET $skip" : ''; |
| 572 | + $limit = $limit !== null ? "LIMIT $limit" : ''; |
| 573 | + |
| 574 | + $orderBy = $orderBy ? "ORDER BY " . implode(', ', $this->orderByToSort($orderBy)) : ''; |
| 575 | + |
| 576 | + $query = <<<EOT |
| 577 | +SELECT $select |
| 578 | +FROM {$this->schemaName($collectionName)}.{$this->tableName($collectionName)} |
| 579 | +$where |
| 580 | +$orderBy |
| 581 | +$limit |
| 582 | +$offset; |
| 583 | +EOT; |
| 584 | + |
| 585 | + $stmt = $this->connection->prepare($query); |
| 586 | + |
| 587 | + $stmt->execute($args); |
| 588 | + |
| 589 | + while($row = $stmt->fetch(\PDO::FETCH_ASSOC)) { |
| 590 | + yield $row[self::PARTIAL_SELECT_DOC_ID] => $this->transformPartialDoc($partialSelect, $row); |
| 591 | + } |
| 592 | + } |
| 593 | + |
527 | 594 | /**
|
528 | 595 | * @param string $collectionName
|
529 | 596 | * @param Filter $filter
|
@@ -714,6 +781,83 @@ private function makeInClause(string $prop, array $valList, int $argsCount, bool
|
714 | 781 | return ["$prop IN($params)", $argList, $argsCount];
|
715 | 782 | }
|
716 | 783 |
|
| 784 | + private function makeSelect(PartialSelect $partialSelect): string |
| 785 | + { |
| 786 | + $select = 'id as "'.self::PARTIAL_SELECT_DOC_ID.'", '; |
| 787 | + |
| 788 | + foreach ($partialSelect->fieldAliasMap() as $mapItem) { |
| 789 | + |
| 790 | + if($mapItem['alias'] === self::PARTIAL_SELECT_DOC_ID) { |
| 791 | + throw new RuntimeException(sprintf( |
| 792 | + "Invalid select alias. You cannot use %s as alias, because it is reserved for internal use", |
| 793 | + self::PARTIAL_SELECT_DOC_ID |
| 794 | + )); |
| 795 | + } |
| 796 | + |
| 797 | + if($mapItem['alias'] === self::PARTIAL_SELECT_MERGE) { |
| 798 | + throw new RuntimeException(sprintf( |
| 799 | + "Invalid select alias. You cannot use %s as alias, because it is reserved for internal use", |
| 800 | + self::PARTIAL_SELECT_MERGE |
| 801 | + )); |
| 802 | + } |
| 803 | + |
| 804 | + if($mapItem['alias'] === PartialSelect::MERGE_ALIAS) { |
| 805 | + $mapItem['alias'] = self::PARTIAL_SELECT_MERGE; |
| 806 | + } |
| 807 | + |
| 808 | + $select.= $this->propToJsonPath($mapItem['field']) . ' as "' . $mapItem['alias'] . '", '; |
| 809 | + } |
| 810 | + |
| 811 | + $select = mb_substr($select, 0, mb_strlen($select) - 2); |
| 812 | + |
| 813 | + return $select; |
| 814 | + } |
| 815 | + |
| 816 | + private function transformPartialDoc(PartialSelect $partialSelect, array $selectedDoc): array |
| 817 | + { |
| 818 | + $partialDoc = []; |
| 819 | + |
| 820 | + foreach ($partialSelect->fieldAliasMap() as ['field' => $field, 'alias' => $alias]) { |
| 821 | + if($alias === PartialSelect::MERGE_ALIAS) { |
| 822 | + if(null === $selectedDoc[self::PARTIAL_SELECT_MERGE] ?? null) { |
| 823 | + continue; |
| 824 | + } |
| 825 | + |
| 826 | + $value = json_decode($selectedDoc[self::PARTIAL_SELECT_MERGE], true); |
| 827 | + |
| 828 | + if(!is_array($value)) { |
| 829 | + throw new RuntimeException('Merge not possible. $merge alias was specified for field: ' . $field . ' but field value is not an array: ' . json_encode($value)); |
| 830 | + } |
| 831 | + |
| 832 | + foreach ($value as $k => $v) { |
| 833 | + $partialDoc[$k] = $v; |
| 834 | + } |
| 835 | + |
| 836 | + continue; |
| 837 | + } |
| 838 | + |
| 839 | + $value = $selectedDoc[$alias] ?? null; |
| 840 | + |
| 841 | + if(is_string($value)) { |
| 842 | + $value = json_decode($value, true); |
| 843 | + } |
| 844 | + |
| 845 | + $keys = explode('.', $alias); |
| 846 | + |
| 847 | + $ref = &$partialDoc; |
| 848 | + foreach ($keys as $i => $key) { |
| 849 | + if(!array_key_exists($key, $ref)) { |
| 850 | + $ref[$key] = []; |
| 851 | + } |
| 852 | + $ref = &$ref[$key]; |
| 853 | + } |
| 854 | + $ref = $value; |
| 855 | + unset($ref); |
| 856 | + } |
| 857 | + |
| 858 | + return $partialDoc; |
| 859 | + } |
| 860 | + |
717 | 861 | private function orderByToSort(DocumentStore\OrderBy\OrderBy $orderBy): array
|
718 | 862 | {
|
719 | 863 | $sort = [];
|
|
0 commit comments