33import java .time .Instant ;
44import java .util .ArrayList ;
55import java .util .List ;
6- import java .util .concurrent .CompletableFuture ;
76import java .util .concurrent .ThreadLocalRandom ;
87
98import tech .ydb .common .transaction .TxMode ;
10- import tech .ydb .core .Result ;
119import tech .ydb .query .tools .QueryReader ;
1210import tech .ydb .query .tools .SessionRetryContext ;
1311import tech .ydb .table .query .Params ;
2018 * @author Kirill Kurdyukov
2119 */
2220public class IssueYdbRepository {
23- // Контекст для автоматических повторных попыток выполнения запросов
24- private final SessionRetryContext retryCtx ;
21+
22+ private final QueryServiceHelper queryServiceHelper ;
2523
2624 public IssueYdbRepository (SessionRetryContext retryCtx ) {
27- this .retryCtx = retryCtx ;
25+ this .queryServiceHelper = new QueryServiceHelper ( retryCtx ) ;
2826 }
2927
3028 /**
@@ -33,29 +31,27 @@ public IssueYdbRepository(SessionRetryContext retryCtx) {
3331 * выполняются за один запрос к YDB.
3432 */
3533 public List <IssueLinkCount > linkTicketsNoInteractive (long idT1 , long idT2 ) {
36- var valueReader = retryCtx .supplyResult (
37- session -> QueryReader .readFrom (session .createQuery (
38- """
39- DECLARE $t1 AS Int64;
40- DECLARE $t2 AS Int64;
41-
42- -- Обновляем счетчики связей
43- UPDATE issues
44- SET link_count = COALESCE(link_count, 0) + 1
45- WHERE id IN ($t1, $t2);
46-
47- -- Добавляем записи о связях между тикетами
48- INSERT INTO links (source, destination)
49- VALUES ($t1, $t2), ($t2, $t1);
50-
51- -- Читаем обновленные данные
52- SELECT id, link_count FROM issues
53- WHERE id IN ($t1, $t2)
54- """ ,
55- TxMode .SERIALIZABLE_RW ,
56- Params .of ("$t1" , PrimitiveValue .newInt64 (idT1 ), "$t2" , PrimitiveValue .newInt64 (idT2 ))
57- ))
58- ).join ().getValue ();
34+ var valueReader = queryServiceHelper .executeQuery (
35+ """
36+ DECLARE $t1 AS Int64;
37+ DECLARE $t2 AS Int64;
38+
39+ -- Обновляем счетчики связей
40+ UPDATE issues
41+ SET link_count = COALESCE(link_count, 0) + 1
42+ WHERE id IN ($t1, $t2);
43+
44+ -- Добавляем записи о связях между тикетами
45+ INSERT INTO links (source, destination)
46+ VALUES ($t1, $t2), ($t2, $t1);
47+
48+ -- Читаем обновленные данные
49+ SELECT id, link_count FROM issues
50+ WHERE id IN ($t1, $t2)
51+ """ ,
52+ TxMode .SERIALIZABLE_RW ,
53+ Params .of ("$t1" , PrimitiveValue .newInt64 (idT1 ), "$t2" , PrimitiveValue .newInt64 (idT2 ))
54+ );
5955
6056 return getLinkTicketPairs (valueReader );
6157 }
@@ -71,13 +67,11 @@ WHERE id IN ($t1, $t2)
7167 * для определения стоит ли продолжать транзакцию и какой запрос выполнить следующим.
7268 */
7369 public List <IssueLinkCount > linkTicketsInteractive (long idT1 , long idT2 ) {
74- return retryCtx .supplyResult (
75- session -> {
76- // Транзакция будет изменять данные, поэтому используем режим SERIALIZABLE_RW
77- var tx = session .createNewTransaction (TxMode .SERIALIZABLE_RW );
78-
70+ return queryServiceHelper .executeInTx (
71+ TxMode .SERIALIZABLE_RW , // Транзакция будет изменять данные, поэтому используем режим SERIALIZABLE_RW
72+ tx -> {
7973 // Обновляем счетчики связей
80- tx .createQuery ("""
74+ tx .executeQuery ("""
8175 DECLARE $t1 AS Int64;
8276 DECLARE $t2 AS Int64;
8377
@@ -86,36 +80,33 @@ public List<IssueLinkCount> linkTicketsInteractive(long idT1, long idT2) {
8680 WHERE id IN ($t1, $t2);
8781 """ ,
8882 Params .of ("$t1" , PrimitiveValue .newInt64 (idT1 ), "$t2" , PrimitiveValue .newInt64 (idT2 ))
89- ). execute (). join (). getStatus (). expectSuccess () ;
83+ );
9084
9185 // Добавляем записи о связях между тикетами
92- tx .createQuery ("""
86+ tx .executeQuery ("""
9387 DECLARE $t1 AS Int64;
9488 DECLARE $t2 AS Int64;
9589
9690 INSERT INTO links (source, destination)
9791 VALUES ($t1, $t2), ($t2, $t1);
9892 """ ,
9993 Params .of ("$t1" , PrimitiveValue .newInt64 (idT1 ), "$t2" , PrimitiveValue .newInt64 (idT2 ))
100- ). execute (). join (). getStatus (). expectSuccess () ;
94+ );
10195
10296 // Читаем обновленные данные и фиксируем транзакцию
103- var valueReader = QueryReader .readFrom (
104- tx .createQueryWithCommit ("""
105- DECLARE $t1 AS Int64;
106- DECLARE $t2 AS Int64;
107-
108- SELECT id, link_count FROM issues
109- WHERE id IN ($t1, $t2)
110- """ ,
111- Params .of ("$t1" , PrimitiveValue .newInt64 (idT1 ), "$t2" , PrimitiveValue .newInt64 (idT2 )))
112- ).join ().getValue ();
113-
114- var linkTicketPairs = getLinkTicketPairs (valueReader );
115-
116- return CompletableFuture .completedFuture (Result .success (linkTicketPairs ));
97+ var valueReader = tx .executeQueryWithCommit ("""
98+ DECLARE $t1 AS Int64;
99+ DECLARE $t2 AS Int64;
100+
101+ SELECT id, link_count FROM issues
102+ WHERE id IN ($t1, $t2)
103+ """ ,
104+ Params .of ("$t1" , PrimitiveValue .newInt64 (idT1 ), "$t2" , PrimitiveValue .newInt64 (idT2 ))
105+ );
106+
107+ return getLinkTicketPairs (valueReader );
117108 }
118- ). join (). getValue () ;
109+ );
119110 }
120111
121112 /**
@@ -128,25 +119,22 @@ public void addIssue(String title, String author) {
128119 var id = ThreadLocalRandom .current ().nextLong ();
129120 var now = Instant .now ();
130121
131- retryCtx .supplyResult (
132- session -> session .createQuery (
133- """
134- DECLARE $id AS Int64;
135- DECLARE $title AS Text;
136- DECLARE $created_at AS Timestamp;
137- DECLARE $author AS Text;
138- UPSERT INTO issues (id, title, created_at, author)
139- VALUES ($id, $title, $created_at, $author);
140- """ ,
141- TxMode .SERIALIZABLE_RW ,
142- Params .of (
143- "$id" , PrimitiveValue .newInt64 (id ),
144- "$title" , PrimitiveValue .newText (title ),
145- "$created_at" , PrimitiveValue .newTimestamp (now ),
146- "$author" , PrimitiveValue .newText (author )
147- )
148- ).execute ()
149- ).join ().getStatus ().expectSuccess ("Failed upsert title" );
122+ queryServiceHelper .executeQuery ("""
123+ DECLARE $id AS Int64;
124+ DECLARE $title AS Text;
125+ DECLARE $created_at AS Timestamp;
126+ DECLARE $author AS Text;
127+ UPSERT INTO issues (id, title, created_at, author)
128+ VALUES ($id, $title, $created_at, $author);
129+ """ ,
130+ TxMode .SERIALIZABLE_RW ,
131+ Params .of (
132+ "$id" , PrimitiveValue .newInt64 (id ),
133+ "$title" , PrimitiveValue .newText (title ),
134+ "$created_at" , PrimitiveValue .newTimestamp (now ),
135+ "$author" , PrimitiveValue .newText (author )
136+ )
137+ );
150138 }
151139
152140 /**
@@ -156,11 +144,11 @@ UPSERT INTO issues (id, title, created_at, author)
156144 */
157145 public List <Issue > findAll () {
158146 var titles = new ArrayList <Issue >();
159- var resultSet = retryCtx . supplyResult (
160- session -> QueryReader . readFrom (
161- session . createQuery ( "SELECT id, title, created_at, author, COALESCE(link_count, 0) FROM issues;" , TxMode .SNAPSHOT_RO )
162- )
163- ). join (). getValue () ;
147+ var resultSet = queryServiceHelper . executeQuery (
148+ "SELECT id, title, created_at, author, COALESCE(link_count, 0) FROM issues;" ,
149+ TxMode .SNAPSHOT_RO ,
150+ Params . empty ( )
151+ );
164152
165153 var resultSetReader = resultSet .getResultSet (0 );
166154
0 commit comments