1717use EventEngine \DocumentStore \OrderBy \OrderBy ;
1818use EventEngine \DocumentStore \PartialSelect ;
1919use EventEngine \DocumentStore \Postgres \Exception \RuntimeException ;
20- use EventEngine \DocumentStore \Postgres \Filter \PostgresFilterProcessor ;
2120use EventEngine \DocumentStore \Postgres \Filter \FilterProcessor ;
21+ use EventEngine \DocumentStore \Postgres \Filter \PostgresFilterProcessor ;
22+ use EventEngine \DocumentStore \Postgres \OrderBy \OrderByClause ;
23+ use EventEngine \DocumentStore \Postgres \OrderBy \OrderByProcessor ;
24+ use EventEngine \DocumentStore \Postgres \OrderBy \PostgresOrderByProcessor ;
2225use EventEngine \Util \VariableType ;
2326
2427use function implode ;
@@ -43,6 +46,11 @@ final class PostgresDocumentStore implements DocumentStore\DocumentStore
4346 */
4447 private $ filterProcessor ;
4548
49+ /**
50+ * @var OrderByProcessor
51+ */
52+ private $ orderByProcessor ;
53+
4654 private $ tablePrefix = 'em_ds_ ' ;
4755
4856 private $ docIdSchema = 'UUID NOT NULL ' ;
@@ -57,7 +65,8 @@ public function __construct(
5765 string $ docIdSchema = null ,
5866 bool $ transactional = true ,
5967 bool $ useMetadataColumns = false ,
60- FilterProcessor $ filterProcessor = null
68+ FilterProcessor $ filterProcessor = null ,
69+ OrderByProcessor $ orderByProcessor = null
6170 ) {
6271 $ this ->connection = $ connection ;
6372 $ this ->connection ->setAttribute (\PDO ::ATTR_ERRMODE , \PDO ::ERRMODE_EXCEPTION );
@@ -67,6 +76,11 @@ public function __construct(
6776 }
6877 $ this ->filterProcessor = $ filterProcessor ;
6978
79+ if (null === $ orderByProcessor ) {
80+ $ orderByProcessor = new PostgresOrderByProcessor ($ useMetadataColumns );
81+ }
82+ $ this ->orderByProcessor = $ orderByProcessor ;
83+
7084 if (null !== $ tablePrefix ) {
7185 $ this ->tablePrefix = $ tablePrefix ;
7286 }
@@ -441,7 +455,7 @@ public function upsertDoc(string $collectionName, string $docId, array $docOrSub
441455 {
442456 $ doc = $ this ->getDoc ($ collectionName , $ docId );
443457
444- if ($ doc !== null ) {
458+ if ($ doc !== null ) {
445459 $ this ->updateDoc ($ collectionName , $ docId , $ docOrSubset );
446460 } else {
447461 $ this ->addDoc ($ collectionName , $ docId , $ docOrSubset );
@@ -625,12 +639,16 @@ public function filterDocs(string $collectionName, Filter $filter, int $skip = n
625639 $ filterStr = $ filterClause ->clause ();
626640 $ args = $ filterClause ->args ();
627641
642+ $ orderByClause = $ orderBy ? $ this ->orderByProcessor ->process ($ orderBy ) : new OrderByClause (null , []);
643+ $ orderByStr = $ orderByClause ->clause ();
644+ $ orderByArgs = $ orderByClause ->args ();
645+
628646 $ where = $ filterStr ? "WHERE $ filterStr " : '' ;
629647
630648 $ offset = $ skip !== null ? "OFFSET $ skip " : '' ;
631649 $ limit = $ limit !== null ? "LIMIT $ limit " : '' ;
632650
633- $ orderBy = $ orderBy ? "ORDER BY " . implode ( ' , ' , $ this -> orderByToSort ( $ orderBy )) : '' ;
651+ $ orderBy = $ orderByStr ? "ORDER BY $ orderByStr " : '' ;
634652
635653 $ query = <<<EOT
636654SELECT doc
@@ -642,7 +660,7 @@ public function filterDocs(string $collectionName, Filter $filter, int $skip = n
642660EOT ;
643661 $ stmt = $ this ->connection ->prepare ($ query );
644662
645- $ stmt ->execute ($ args );
663+ $ stmt ->execute (array_merge ( $ args, $ orderByArgs ) );
646664
647665 while ($ row = $ stmt ->fetch (\PDO ::FETCH_ASSOC )) {
648666 yield json_decode ($ row ['doc ' ], true );
@@ -658,12 +676,16 @@ public function findDocs(string $collectionName, Filter $filter, int $skip = nul
658676 $ filterStr = $ filterClause ->clause ();
659677 $ args = $ filterClause ->args ();
660678
679+ $ orderByClause = $ orderBy ? $ this ->orderByProcessor ->process ($ orderBy ) : new OrderByClause (null , []);
680+ $ orderByStr = $ orderByClause ->clause ();
681+ $ orderByArgs = $ orderByClause ->args ();
682+
661683 $ where = $ filterStr ? "WHERE $ filterStr " : '' ;
662684
663685 $ offset = $ skip !== null ? "OFFSET $ skip " : '' ;
664686 $ limit = $ limit !== null ? "LIMIT $ limit " : '' ;
665687
666- $ orderBy = $ orderBy ? "ORDER BY " . implode ( ' , ' , $ this -> orderByToSort ( $ orderBy )) : '' ;
688+ $ orderBy = $ orderByStr ? "ORDER BY $ orderByStr " : '' ;
667689
668690 $ query = <<<EOT
669691SELECT id, doc
@@ -675,7 +697,7 @@ public function findDocs(string $collectionName, Filter $filter, int $skip = nul
675697EOT ;
676698 $ stmt = $ this ->connection ->prepare ($ query );
677699
678- $ stmt ->execute ($ args );
700+ $ stmt ->execute (array_merge ( $ args, $ orderByArgs ) );
679701
680702 while ($ row = $ stmt ->fetch (\PDO ::FETCH_ASSOC )) {
681703 yield $ row ['id ' ] => json_decode ($ row ['doc ' ], true );
@@ -688,14 +710,18 @@ public function findPartialDocs(string $collectionName, PartialSelect $partialSe
688710 $ filterStr = $ filterClause ->clause ();
689711 $ args = $ filterClause ->args ();
690712
713+ $ orderByClause = $ orderBy ? $ this ->orderByProcessor ->process ($ orderBy ) : new OrderByClause (null , []);
714+ $ orderByStr = $ orderByClause ->clause ();
715+ $ orderByArgs = $ orderByClause ->args ();
716+
691717 $ select = $ this ->makeSelect ($ partialSelect );
692718
693719 $ where = $ filterStr ? "WHERE $ filterStr " : '' ;
694720
695721 $ offset = $ skip !== null ? "OFFSET $ skip " : '' ;
696722 $ limit = $ limit !== null ? "LIMIT $ limit " : '' ;
697723
698- $ orderBy = $ orderBy ? "ORDER BY " . implode ( ' , ' , $ this -> orderByToSort ( $ orderBy )) : '' ;
724+ $ orderBy = $ orderByStr ? "ORDER BY $ orderByStr " : '' ;
699725
700726 $ query = <<<EOT
701727SELECT $ select
@@ -708,7 +734,7 @@ public function findPartialDocs(string $collectionName, PartialSelect $partialSe
708734
709735 $ stmt = $ this ->connection ->prepare ($ query );
710736
711- $ stmt ->execute ($ args );
737+ $ stmt ->execute (array_merge ( $ args, $ orderByArgs ) );
712738
713739 while ($ row = $ stmt ->fetch (\PDO ::FETCH_ASSOC )) {
714740 yield $ row [self ::PARTIAL_SELECT_DOC_ID ] => $ this ->transformPartialDoc ($ partialSelect , $ row );
@@ -870,28 +896,6 @@ private function transformPartialDoc(PartialSelect $partialSelect, array $select
870896 return $ partialDoc ;
871897 }
872898
873- private function orderByToSort (DocumentStore \OrderBy \OrderBy $ orderBy ): array
874- {
875- $ sort = [];
876-
877- if ($ orderBy instanceof DocumentStore \OrderBy \AndOrder) {
878- /** @var DocumentStore\OrderBy\Asc|DocumentStore\OrderBy\Desc $orderByA */
879- $ orderByA = $ orderBy ->a ();
880- $ direction = $ orderByA instanceof DocumentStore \OrderBy \Asc ? 'ASC ' : 'DESC ' ;
881- $ prop = $ this ->propToJsonPath ($ orderByA ->prop ());
882- $ sort [] = "{$ prop } $ direction " ;
883-
884- $ sortB = $ this ->orderByToSort ($ orderBy ->b ());
885-
886- return array_merge ($ sort , $ sortB );
887- }
888-
889- /** @var DocumentStore\OrderBy\Asc|DocumentStore\OrderBy\Desc $orderBy */
890- $ direction = $ orderBy instanceof DocumentStore \OrderBy \Asc ? 'ASC ' : 'DESC ' ;
891- $ prop = $ this ->propToJsonPath ($ orderBy ->prop ());
892- return ["{$ prop } $ direction " ];
893- }
894-
895899 private function indexToSqlCmd (Index $ index , string $ collectionName ): string
896900 {
897901 if ($ index instanceof DocumentStore \FieldIndex) {
0 commit comments