Skip to content

Commit bc4e087

Browse files
Merge pull request #8 from ydb-platform/lesson-8
feat: lesson 8
2 parents 157239b + f11e149 commit bc4e087

File tree

10 files changed

+523
-62
lines changed

10 files changed

+523
-62
lines changed

java/lesson-8/pom.xml

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
2+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
3+
<modelVersion>4.0.0</modelVersion>
4+
<groupId>tech.ydb.app</groupId>
5+
<artifactId>lesson-8</artifactId>
6+
<version>1.0-SNAPSHOT</version>
7+
<name>Lesson-8</name>
8+
9+
<properties>
10+
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
11+
<maven.compiler.source>21</maven.compiler.source>
12+
<maven.compiler.target>21</maven.compiler.target>
13+
</properties>
14+
15+
<dependencyManagement>
16+
<dependencies>
17+
<dependency>
18+
<groupId>tech.ydb</groupId>
19+
<artifactId>ydb-sdk-bom</artifactId>
20+
<version>2.3.8</version>
21+
<type>pom</type>
22+
<scope>import</scope>
23+
</dependency>
24+
</dependencies>
25+
</dependencyManagement>
26+
27+
<dependencies>
28+
<dependency>
29+
<groupId>tech.ydb</groupId>
30+
<artifactId>ydb-sdk-query</artifactId>
31+
</dependency>
32+
<dependency>
33+
<groupId>tech.ydb</groupId>
34+
<artifactId>ydb-sdk-topic</artifactId>
35+
</dependency>
36+
</dependencies>
37+
38+
<build>
39+
<pluginManagement>
40+
<plugins>
41+
<plugin>
42+
<groupId>org.apache.maven.plugins</groupId>
43+
<artifactId>maven-compiler-plugin</artifactId>
44+
<version>3.12.1</version>
45+
<configuration>
46+
<source>21</source>
47+
</configuration>
48+
<executions>
49+
<execution>
50+
<id>attach-javadocs</id>
51+
<goals>
52+
<goal>jar</goal>
53+
</goals>
54+
</execution>
55+
</executions>
56+
</plugin>
57+
</plugins>
58+
</pluginManagement>
59+
</build>
60+
</project>
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package tech.ydb.app;
2+
3+
import java.util.UUID;
4+
import tech.ydb.core.grpc.GrpcTransport;
5+
import tech.ydb.query.QueryClient;
6+
import tech.ydb.query.tools.SessionRetryContext;
7+
import tech.ydb.topic.TopicClient;
8+
9+
/**
10+
* @author Kirill Kurdyukov
11+
*/
12+
public class Application {
13+
14+
private static final String CONNECTION_STRING = "grpc://localhost:2136/local";
15+
16+
public static void main(String[] args) throws InterruptedException {
17+
try (GrpcTransport grpcTransport = GrpcTransport.forConnectionString(CONNECTION_STRING).build();
18+
QueryClient queryClient = QueryClient.newClient(grpcTransport).build();
19+
TopicClient topicClient = TopicClient.newClient(grpcTransport).build()) {
20+
var retryCtx = SessionRetryContext.create(queryClient).build();
21+
22+
var schemaYdbRepository = new SchemaYdbRepository(retryCtx);
23+
var issueYdbRepository = new IssueYdbRepository(retryCtx);
24+
25+
schemaYdbRepository.createSchema();
26+
27+
var first = issueYdbRepository.addIssue("Ticket 1", "Author 1");
28+
var second = issueYdbRepository.addIssue("Ticket 2", "Author 2");
29+
issueYdbRepository.updateStatus(first.id(), "future");
30+
issueYdbRepository.delete(second.id());
31+
issueYdbRepository.delete(UUID.randomUUID());
32+
33+
var readerWorker = new ReaderChangefeedWorker(topicClient);
34+
readerWorker.run();
35+
Thread.sleep(10_000);
36+
37+
readerWorker.shutdown();
38+
39+
System.out.println("Print all tickets: ");
40+
for (var ticket : issueYdbRepository.findAll()) {
41+
printIssue(ticket);
42+
}
43+
44+
schemaYdbRepository.dropSchema();
45+
}
46+
}
47+
48+
private static void printIssue(Issue ticket) {
49+
System.out.println("Ticket: {id: " + ticket.id() + ", title: " + ticket.title() + ", timestamp: "
50+
+ ticket.now() + ", author: " + ticket.author() + ", link_count: "
51+
+ ticket.linkCounts() + ", status: " + ticket.status() + "}");
52+
}
53+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
package tech.ydb.app;
2+
3+
import java.time.Instant;
4+
import java.util.UUID;
5+
6+
/**
7+
* @author Kirill Kurdyukov
8+
*/
9+
public record Issue(UUID id, String title, Instant now, String author, long linkCounts, String status) {
10+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package tech.ydb.app;
2+
3+
import java.util.UUID;
4+
5+
/**
6+
* @author Kirill Kurdyukov
7+
*/
8+
public record IssueLinkCount(UUID id, long linkCount) {
9+
}
Lines changed: 213 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,213 @@
1+
package tech.ydb.app;
2+
3+
import java.time.Instant;
4+
import java.util.ArrayList;
5+
import java.util.List;
6+
import java.util.UUID;
7+
import java.util.concurrent.CompletableFuture;
8+
import tech.ydb.common.transaction.TxMode;
9+
import tech.ydb.core.Result;
10+
import tech.ydb.query.tools.QueryReader;
11+
import tech.ydb.query.tools.SessionRetryContext;
12+
import tech.ydb.table.query.Params;
13+
import tech.ydb.table.values.PrimitiveValue;
14+
15+
/**
16+
* @author Kirill Kurdyukov
17+
*/
18+
public class IssueYdbRepository {
19+
private final SessionRetryContext retryCtx;
20+
21+
public IssueYdbRepository(SessionRetryContext retryCtx) {
22+
this.retryCtx = retryCtx;
23+
}
24+
25+
public void updateStatus(UUID id, String status) {
26+
retryCtx.supplyResult(
27+
session -> session.createQuery(
28+
"""
29+
DECLARE $id AS UUID;
30+
DECLARE $new_status AS Text;
31+
32+
UPDATE issues SET status = $new_status WHERE id = $id;
33+
""",
34+
TxMode.SERIALIZABLE_RW,
35+
Params.of("$id", PrimitiveValue.newUuid(id),
36+
"$new_status", PrimitiveValue.newText(status))
37+
).execute()
38+
).join().getStatus().expectSuccess();
39+
}
40+
41+
public List<IssueLinkCount> linkTicketsNoInteractive(UUID idT1, UUID idT2) {
42+
var valueReader = retryCtx.supplyResult(
43+
session -> QueryReader.readFrom(session.createQuery(
44+
"""
45+
DECLARE $t1 AS UUID;
46+
DECLARE $t2 AS UUID;
47+
48+
UPDATE issues
49+
SET link_count = COALESCE(link_count, 0) + 1
50+
WHERE id IN ($t1, $t2);
51+
52+
INSERT INTO links (source, destination)
53+
VALUES ($t1, $t2), ($t2, $t1);
54+
55+
SELECT id, link_count FROM issues
56+
WHERE id IN ($t1, $t2)
57+
""",
58+
TxMode.SERIALIZABLE_RW,
59+
Params.of("$t1", PrimitiveValue.newUuid(idT1), "$t2", PrimitiveValue.newUuid(idT2))
60+
))
61+
).join().getValue();
62+
63+
return getLinkTicketPairs(valueReader);
64+
}
65+
66+
public List<IssueLinkCount> linkTicketsInteractive(UUID idT1, UUID idT2) {
67+
return retryCtx.supplyResult(
68+
session -> {
69+
var tx = session.createNewTransaction(TxMode.SERIALIZABLE_RW);
70+
71+
tx.createQuery("""
72+
DECLARE $t1 AS UUID;
73+
DECLARE $t2 AS UUID;
74+
75+
UPDATE issues
76+
SET link_count = COALESCE(link_count, 0) + 1
77+
WHERE id IN ($t1, $t2);
78+
""",
79+
Params.of("$t1", PrimitiveValue.newUuid(idT1), "$t2", PrimitiveValue.newUuid(idT2))
80+
).execute().join().getStatus().expectSuccess();
81+
82+
tx.createQuery("""
83+
DECLARE $t1 AS UUID;
84+
DECLARE $t2 AS UUID;
85+
86+
INSERT INTO links (source, destination)
87+
VALUES ($t1, $t2), ($t2, $t1);
88+
""",
89+
Params.of("$t1", PrimitiveValue.newUuid(idT1), "$t2", PrimitiveValue.newUuid(idT2))
90+
).execute().join().getStatus().expectSuccess();
91+
92+
var valueReader = QueryReader.readFrom(
93+
tx.createQueryWithCommit("""
94+
DECLARE $t1 AS UUID;
95+
DECLARE $t2 AS UUID;
96+
97+
SELECT id, link_count FROM issues
98+
WHERE id IN ($t1, $t2)
99+
""",
100+
Params.of("$t1", PrimitiveValue.newUuid(idT1), "$t2", PrimitiveValue.newUuid(idT2)))
101+
).join().getValue();
102+
103+
var linkTicketPairs = getLinkTicketPairs(valueReader);
104+
105+
return CompletableFuture.completedFuture(Result.success(linkTicketPairs));
106+
}
107+
).join().getValue();
108+
}
109+
110+
public Issue addIssue(String title, String author) {
111+
var id = UUID.randomUUID();
112+
var now = Instant.now();
113+
114+
retryCtx.supplyResult(
115+
session -> session.createQuery(
116+
"""
117+
DECLARE $id AS UUID;
118+
DECLARE $title AS Text;
119+
DECLARE $created_at AS Timestamp;
120+
DECLARE $author AS Text;
121+
UPSERT INTO issues (id, title, created_at, author)
122+
VALUES ($id, $title, $created_at, $author);
123+
""",
124+
TxMode.SERIALIZABLE_RW,
125+
Params.of(
126+
"$id", PrimitiveValue.newUuid(id),
127+
"$title", PrimitiveValue.newText(title),
128+
"$created_at", PrimitiveValue.newTimestamp(now),
129+
"$author", PrimitiveValue.newText(author)
130+
)
131+
).execute()
132+
).join().getStatus().expectSuccess("Failed upsert issue");
133+
134+
return new Issue(id, title, now, author, 0, null);
135+
}
136+
137+
public void delete(UUID id) {
138+
retryCtx.supplyResult(
139+
session -> session.createQuery(
140+
"""
141+
DECLARE $id AS UUID;
142+
DELETE FROM issues WHERE id=$id;
143+
""",
144+
TxMode.SERIALIZABLE_RW,
145+
Params.of(
146+
"$id", PrimitiveValue.newUuid(id)
147+
)
148+
).execute()
149+
).join().getStatus().expectSuccess("Failed delete issue");
150+
}
151+
152+
public List<Issue> findAll() {
153+
var titles = new ArrayList<Issue>();
154+
var resultSet = retryCtx.supplyResult(
155+
session -> QueryReader.readFrom(
156+
session.createQuery("SELECT id, title, created_at, author, COALESCE(link_count, 0), status FROM issues;", TxMode.SNAPSHOT_RO)
157+
)
158+
).join().getValue();
159+
160+
var resultSetReader = resultSet.getResultSet(0);
161+
162+
while (resultSetReader.next()) {
163+
titles.add(new Issue(
164+
resultSetReader.getColumn(0).getUuid(),
165+
resultSetReader.getColumn(1).getText(),
166+
resultSetReader.getColumn(2).getTimestamp(),
167+
resultSetReader.getColumn(3).getText(),
168+
resultSetReader.getColumn(4).getInt64(),
169+
resultSetReader.getColumn(5).getText()
170+
));
171+
}
172+
173+
return titles;
174+
}
175+
176+
public Issue findByAuthor(String author) {
177+
var resultSet = retryCtx.supplyResult(
178+
session -> QueryReader.readFrom(
179+
session.createQuery(
180+
"""
181+
DECLARE $author AS Text;
182+
SELECT id, title, created_at, author, COALESCE(link_count, 0), status FROM issues VIEW authorIndex
183+
WHERE author = $author;
184+
""",
185+
TxMode.SNAPSHOT_RO,
186+
Params.of("$author", PrimitiveValue.newText(author))
187+
)
188+
)
189+
).join().getValue();
190+
191+
var resultSetReader = resultSet.getResultSet(0);
192+
resultSetReader.next();
193+
194+
return new Issue(
195+
resultSetReader.getColumn(0).getUuid(),
196+
resultSetReader.getColumn(1).getText(),
197+
resultSetReader.getColumn(2).getTimestamp(),
198+
resultSetReader.getColumn(3).getText(),
199+
resultSetReader.getColumn(4).getInt64(),
200+
resultSetReader.getColumn(5).getText()
201+
);
202+
}
203+
204+
private static List<IssueLinkCount> getLinkTicketPairs(QueryReader valueReader) {
205+
var linkTicketPairs = new ArrayList<IssueLinkCount>();
206+
var resultSet = valueReader.getResultSet(0);
207+
208+
while (resultSet.next()) {
209+
linkTicketPairs.add(new IssueLinkCount(resultSet.getColumn(0).getUuid(), resultSet.getColumn(1).getInt64()));
210+
}
211+
return linkTicketPairs;
212+
}
213+
}

java/lesson-9/src/main/java/tech/ydb/app/ReaderWorker.java renamed to java/lesson-8/src/main/java/tech/ydb/app/ReaderChangefeedWorker.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,17 +12,19 @@
1212
/**
1313
* @author Kirill Kurdyukov
1414
*/
15-
public class ReaderWorker {
15+
public class ReaderChangefeedWorker {
1616
private final SyncReader reader;
1717
private final AtomicBoolean stoppedProcess = new AtomicBoolean();
1818

1919
private volatile CompletableFuture<Void> readerJob;
2020

21-
public ReaderWorker(TopicClient topicClient) {
22-
this.reader = topicClient.createSyncReader(
21+
public ReaderChangefeedWorker(TopicClient topicClient) {
22+
this.reader = topicClient.createSyncReader(
2323
ReaderSettings.newBuilder()
24-
.setConsumerName("email")
25-
.setTopics(List.of(TopicReadSettings.newBuilder().setPath("task_status").build()))
24+
.setConsumerName("test")
25+
.setTopics(
26+
List.of(TopicReadSettings.newBuilder().setPath("issues/updates").build())
27+
)
2628
.build()
2729
);
2830

0 commit comments

Comments
 (0)