From d32731500ce62f8f971f008d703e99c79530e56f Mon Sep 17 00:00:00 2001 From: Nikolai Amelichev Date: Fri, 19 Sep 2025 22:41:31 +0200 Subject: [PATCH] #77: Reimplement projections via listeners instead of TransactionLocal + Allow custom tables for projections --- .../InMemoryRepositoryTransaction.java | 6 +- .../test/inmemory/InMemoryTable.java | 24 +- .../yoj/repository/test/RepositoryTest.java | 54 ++++- .../repository/test/entity/TestEntities.java | 2 + .../repository/test/sample/model/Book.java | 69 +++--- .../sample/model/BookProjectionsOldStyle.java | 68 ++++++ .../ydb/YdbRepositoryTransaction.java | 6 +- .../yoj/repository/ydb/table/YdbTable.java | 36 ++- .../tech/ydb/yoj/repository/db/Entity.java | 7 +- .../tech/ydb/yoj/repository/db/TxOptions.java | 16 ++ .../repository/db/cache/TransactionLocal.java | 10 +- .../db/listener/EntityEventListener.java | 62 +++++ .../RepositoryTransactionListener.java | 52 ++++ .../db/projection/EntityWithProjections.java | 24 ++ .../repository/db/projection/Projection.java | 54 +++++ .../db/projection/ProjectionCache.java | 3 +- .../db/projection/ProjectionCollection.java | 227 ++++++++++++++++++ .../db/projection/ProjectionListener.java | 154 ++++++++++++ .../db/projection/RoProjectionCache.java | 32 --- .../db/projection/RwProjectionCache.java | 139 ----------- 20 files changed, 795 insertions(+), 250 deletions(-) create mode 100644 repository-test/src/main/java/tech/ydb/yoj/repository/test/sample/model/BookProjectionsOldStyle.java create mode 100644 repository/src/main/java/tech/ydb/yoj/repository/db/listener/EntityEventListener.java create mode 100644 repository/src/main/java/tech/ydb/yoj/repository/db/listener/RepositoryTransactionListener.java create mode 100644 repository/src/main/java/tech/ydb/yoj/repository/db/projection/EntityWithProjections.java create mode 100644 repository/src/main/java/tech/ydb/yoj/repository/db/projection/Projection.java create mode 100644 repository/src/main/java/tech/ydb/yoj/repository/db/projection/ProjectionCollection.java create mode 100644 repository/src/main/java/tech/ydb/yoj/repository/db/projection/ProjectionListener.java delete mode 100644 repository/src/main/java/tech/ydb/yoj/repository/db/projection/RoProjectionCache.java delete mode 100644 repository/src/main/java/tech/ydb/yoj/repository/db/projection/RwProjectionCache.java diff --git a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryRepositoryTransaction.java b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryRepositoryTransaction.java index 8aa48337..41d06a15 100644 --- a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryRepositoryTransaction.java +++ b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryRepositoryTransaction.java @@ -82,11 +82,12 @@ public void commit() { throw new IllegalStateException("Transaction was invalidated. Commit isn't possible"); } endTransaction("commit()", this::commitImpl); + options.getRepositoryTransactionListeners().forEach(l -> l.onCommit(this)); } private void commitImpl() { try { - transactionLocal.projectionCache().applyProjectionChanges(this); + options.getRepositoryTransactionListeners().forEach(l -> l.onBeforeFlushWrites(this)); for (Runnable pendingWrite : pendingWrites) { pendingWrite.run(); @@ -102,6 +103,7 @@ private void commitImpl() { @Override public void rollback() { endTransaction("rollback()", this::rollbackImpl); + options.getRepositoryTransactionListeners().forEach(l -> l.onRollback(this)); } private void rollbackImpl() { @@ -149,7 +151,7 @@ final > void doInWriteTransaction( }); if (options.isImmediateWrites()) { query.run(); - transactionLocal.projectionCache().applyProjectionChanges(this); + options.getRepositoryTransactionListeners().forEach(l -> l.onImmediateWrite(this)); } else { pendingWrites.add(query); } diff --git a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTable.java b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTable.java index 3953a331..b8198ca6 100644 --- a/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTable.java +++ b/repository-inmemory/src/main/java/tech/ydb/yoj/repository/test/inmemory/InMemoryTable.java @@ -93,7 +93,8 @@ public void update(Entity.Id id, Changeset changeset) { transaction.getWatcher().markRowRead(tableDescriptor, id); transaction.doInWriteTransaction("update(" + id + ", " + changeset + ")", tableDescriptor, shard -> shard.update(id, patch)); - transaction.getTransactionLocal().firstLevelCache(tableDescriptor).remove(id); + getFirstLevelCache().remove(id); + transaction.getOptions().getEntityEventListeners().forEach(l -> l.onUpdate(tableDescriptor, id, changeset)); } @Override @@ -178,7 +179,7 @@ public T find(Entity.Id id) { if (id.isPartial()) { throw new IllegalArgumentException("Cannot use partial id in find method"); } - return transaction.getTransactionLocal().firstLevelCache(tableDescriptor).get(id, __ -> { + return getFirstLevelCache().get(id, __ -> { markKeyRead(id); T entity = transaction.doInTransaction("find(" + id + ")", tableDescriptor, shard -> shard.find(id)); return postLoad(entity); @@ -196,7 +197,7 @@ public V find(Class viewType, Entity.Id id) { throw new IllegalArgumentException("Cannot use partial id in find method"); } - FirstLevelCache cache = transaction.getTransactionLocal().firstLevelCache(tableDescriptor); + FirstLevelCache cache = getFirstLevelCache(); if (cache.containsKey(id)) { return cache.peek(id) .map(entity -> toView(viewType, schema, entity)) @@ -428,8 +429,8 @@ public T insert(T tt) { T t = tt.preSave(); transaction.getWatcher().markRowRead(tableDescriptor, t.getId()); transaction.doInWriteTransaction("insert(" + t + ")", tableDescriptor, shard -> shard.insert(t)); - transaction.getTransactionLocal().firstLevelCache(tableDescriptor).put(t); - transaction.getTransactionLocal().projectionCache().save(t); + getFirstLevelCache().put(t); + transaction.getOptions().getEntityEventListeners().forEach(l -> l.onSave(tableDescriptor, t)); return t; } @@ -437,16 +438,16 @@ public T insert(T tt) { public T save(T tt) { T t = tt.preSave(); transaction.doInWriteTransaction("save(" + t + ")", tableDescriptor, shard -> shard.save(t)); - transaction.getTransactionLocal().firstLevelCache(tableDescriptor).put(t); - transaction.getTransactionLocal().projectionCache().save(t); + getFirstLevelCache().put(t); + transaction.getOptions().getEntityEventListeners().forEach(l -> l.onSave(tableDescriptor, t)); return t; } @Override public void delete(Entity.Id id) { transaction.doInWriteTransaction("delete(" + id + ")", tableDescriptor, shard -> shard.delete(id)); - transaction.getTransactionLocal().firstLevelCache(tableDescriptor).putEmpty(id); - transaction.getTransactionLocal().projectionCache().delete(id); + getFirstLevelCache().putEmpty(id); + transaction.getOptions().getEntityEventListeners().forEach(l -> l.onDelete(tableDescriptor, id)); } @Override @@ -575,8 +576,9 @@ public T postLoad(T entity) { return null; } T t = entity.postLoad(); - transaction.getTransactionLocal().firstLevelCache(tableDescriptor).put(t); - transaction.getTransactionLocal().projectionCache().load(t); + getFirstLevelCache().put(t); + transaction.getOptions().getEntityEventListeners().forEach(l -> l.onLoad(tableDescriptor, t)); + return t; } diff --git a/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java b/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java index 3f552e8a..b3c98253 100644 --- a/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java +++ b/repository-test/src/main/java/tech/ydb/yoj/repository/test/RepositoryTest.java @@ -46,6 +46,7 @@ import tech.ydb.yoj.repository.test.sample.model.BadlyWrappedEntity; import tech.ydb.yoj.repository.test.sample.model.BadlyWrappedEntity.BadStringValueWrapper; import tech.ydb.yoj.repository.test.sample.model.Book; +import tech.ydb.yoj.repository.test.sample.model.BookProjectionsOldStyle; import tech.ydb.yoj.repository.test.sample.model.Bubble; import tech.ydb.yoj.repository.test.sample.model.BytePkEntity; import tech.ydb.yoj.repository.test.sample.model.Complex; @@ -686,7 +687,7 @@ public void viewStreamAll() { } db.tx(() -> db.table(Book.class) .streamAll(Book.TitleViewId.class, 100) - .forEach(titleView -> assertThat(titleView.getTitle()).isNotBlank())); + .forEach(titleView -> assertThat(titleView.title()).isNotBlank())); assertThat(db.tx(() -> db.table(Book.class).streamAll(Book.TitleViewId.class, 2) .limit(1).collect(toList()))) @@ -2188,6 +2189,57 @@ public void projections() { .isEqualTo(0L); } + @Test + public void projectionsOldStyle() { + db.tx(() -> { + db.table(BookProjectionsOldStyle.class).save(new BookProjectionsOldStyle(new BookProjectionsOldStyle.Id("1"), 1, "title1", List.of("author1"))); + db.table(BookProjectionsOldStyle.class).save(new BookProjectionsOldStyle(new BookProjectionsOldStyle.Id("2"), 1, "title2", List.of("author2"))); + db.table(BookProjectionsOldStyle.class).save(new BookProjectionsOldStyle(new BookProjectionsOldStyle.Id("3"), 1, null, List.of("author1", "author2"))); + db.table(BookProjectionsOldStyle.class).save(new BookProjectionsOldStyle(new BookProjectionsOldStyle.Id("4"), 1, "title1", List.of())); + }); + + assertThat(db.tx(() -> db.table(BookProjectionsOldStyle.ByTitle.class).countAll())) + .isEqualTo(3L); + assertThat(db.tx(() -> db.table(BookProjectionsOldStyle.ByTitle.class).find(Range.create(new BookProjectionsOldStyle.ByTitle.Id("title1", null))))) + .hasSize(2); + assertThat(db.tx(() -> db.table(BookProjectionsOldStyle.ByTitle.class).find(Range.create(new BookProjectionsOldStyle.ByTitle.Id("title2", null))))) + .hasSize(1); + + assertThat(db.tx(() -> db.table(BookProjectionsOldStyle.ByAuthor.class).countAll())) + .isEqualTo(4L); + assertThat(db.tx(() -> db.table(BookProjectionsOldStyle.ByAuthor.class).find(Range.create(new BookProjectionsOldStyle.ByAuthor.Id("author1", null))))) + .hasSize(2); + assertThat(db.tx(() -> db.table(BookProjectionsOldStyle.ByAuthor.class).find(Range.create(new BookProjectionsOldStyle.ByAuthor.Id("author2", null))))) + .hasSize(2); + + db.tx(() -> { + db.table(BookProjectionsOldStyle.class).modifyIfPresent(new BookProjectionsOldStyle.Id("1"), b -> b.updateTitle("title2")); + db.table(BookProjectionsOldStyle.class).modifyIfPresent(new BookProjectionsOldStyle.Id("2"), b -> b.updateTitle(null)); + db.table(BookProjectionsOldStyle.class).modifyIfPresent(new BookProjectionsOldStyle.Id("3"), b -> b.withAuthors(List.of("author2"))); + db.table(BookProjectionsOldStyle.class).modifyIfPresent(new BookProjectionsOldStyle.Id("4"), b -> b.withAuthors(List.of("author1", "author2"))); + }); + + assertThat(db.tx(() -> db.table(BookProjectionsOldStyle.ByTitle.class).countAll())) + .isEqualTo(2L); + assertThat(db.tx(() -> db.table(BookProjectionsOldStyle.ByTitle.class).find(Range.create(new BookProjectionsOldStyle.ByTitle.Id("title1", null))))) + .hasSize(1); + assertThat(db.tx(() -> db.table(BookProjectionsOldStyle.ByTitle.class).find(Range.create(new BookProjectionsOldStyle.ByTitle.Id("title2", null))))) + .hasSize(1); + + assertThat(db.tx(() -> db.table(BookProjectionsOldStyle.ByAuthor.class).countAll())) + .isEqualTo(5L); + assertThat(db.tx(() -> db.table(BookProjectionsOldStyle.ByAuthor.class).find(Range.create(new BookProjectionsOldStyle.ByAuthor.Id("author1", null))))) + .hasSize(2); + assertThat(db.tx(() -> db.table(BookProjectionsOldStyle.ByAuthor.class).find(Range.create(new BookProjectionsOldStyle.ByAuthor.Id("author2", null))))) + .hasSize(3); + + db.tx(() -> db.table(BookProjectionsOldStyle.class).findAll().forEach(b -> db.table(BookProjectionsOldStyle.class).delete(b.getId()))); + assertThat(db.tx(() -> db.table(BookProjectionsOldStyle.ByTitle.class).countAll())) + .isEqualTo(0L); + assertThat(db.tx(() -> db.table(BookProjectionsOldStyle.ByAuthor.class).countAll())) + .isEqualTo(0L); + } + /** * {@link #parallelTx(boolean, boolean, Consumer)} make two tx. * In first - read from table (see consumers - findAll, findId, findRange) diff --git a/repository-test/src/main/java/tech/ydb/yoj/repository/test/entity/TestEntities.java b/repository-test/src/main/java/tech/ydb/yoj/repository/test/entity/TestEntities.java index e4a75502..51615028 100644 --- a/repository-test/src/main/java/tech/ydb/yoj/repository/test/entity/TestEntities.java +++ b/repository-test/src/main/java/tech/ydb/yoj/repository/test/entity/TestEntities.java @@ -6,6 +6,7 @@ import tech.ydb.yoj.repository.db.TableDescriptor; import tech.ydb.yoj.repository.test.sample.model.BadlyWrappedEntity; import tech.ydb.yoj.repository.test.sample.model.Book; +import tech.ydb.yoj.repository.test.sample.model.BookProjectionsOldStyle; import tech.ydb.yoj.repository.test.sample.model.Bubble; import tech.ydb.yoj.repository.test.sample.model.BytePkEntity; import tech.ydb.yoj.repository.test.sample.model.Complex; @@ -47,6 +48,7 @@ private TestEntities() { public static final List> ALL = List.of( Project.class, UniqueProject.class, TypeFreak.class, Complex.class, Referring.class, Primitive.class, Book.class, Book.ByAuthor.class, Book.ByTitle.class, + BookProjectionsOldStyle.class, BookProjectionsOldStyle.ByAuthor.class, BookProjectionsOldStyle.ByTitle.class, LogEntry.class, Team.class, BytePkEntity.class, EntityWithValidation.class, diff --git a/repository-test/src/main/java/tech/ydb/yoj/repository/test/sample/model/Book.java b/repository-test/src/main/java/tech/ydb/yoj/repository/test/sample/model/Book.java index d265da34..e1d64487 100644 --- a/repository-test/src/main/java/tech/ydb/yoj/repository/test/sample/model/Book.java +++ b/repository-test/src/main/java/tech/ydb/yoj/repository/test/sample/model/Book.java @@ -1,65 +1,58 @@ package tech.ydb.yoj.repository.test.sample.model; -import lombok.Value; +import lombok.NonNull; import lombok.With; import tech.ydb.yoj.repository.db.Entity; +import tech.ydb.yoj.repository.db.RecordEntity; import tech.ydb.yoj.repository.db.Table; +import tech.ydb.yoj.repository.db.projection.EntityWithProjections; +import tech.ydb.yoj.repository.db.projection.ProjectionCollection; import java.util.List; -import java.util.Optional; -import java.util.stream.Collectors; -import java.util.stream.Stream; -@Value @With -public class Book implements Entity { - Id id; - int version; - String title; - List authors; - +public record Book( + Id id, + int version, + String title, + List authors +) implements RecordEntity, EntityWithProjections { public Book updateTitle(String title) { return withTitle(title).withVersion(version + 1); } @Override - public List> createProjections() { - return Stream.concat( - Optional.ofNullable(title).map(t -> new ByTitle(new ByTitle.Id(t, id))).stream(), - authors.stream().map(a -> new ByAuthor(new ByAuthor.Id(a, id))) - ).collect(Collectors.toList()); + public ProjectionCollection collectProjections() { + return ProjectionCollection.builder() + .addEntityIfNotNull(title, t -> new ByTitle(new ByTitle.Id(t, id))) + .addAllEntities(authors.stream().map(a -> new ByAuthor(new ByAuthor.Id(a, id)))) + .build(); } - @Value - public static class Id implements Entity.Id { - String id; + public record Id(String id) implements Entity.Id { } - @Value - public static class ByTitle implements Entity { - Id id; + public record ByTitle(Id id) implements RecordEntity { + public record Id( + @NonNull + String title, - @Value - public static class Id implements Entity.Id { - String title; - Book.Id id; + Book.Id id + ) implements Entity.Id { } } - @Value - public static class ByAuthor implements Entity { - Id id; - - @Value - public static class Id implements Entity.Id { - String author; - Book.Id id; + public record ByAuthor(Id id) implements RecordEntity { + public record Id( + String author, + Book.Id id + ) implements Entity.Id { } } - @Value - public static class TitleViewId implements Table.ViewId { - Id id; - String title; + public record TitleViewId( + Id id, + String title + ) implements Table.RecordViewId { } } diff --git a/repository-test/src/main/java/tech/ydb/yoj/repository/test/sample/model/BookProjectionsOldStyle.java b/repository-test/src/main/java/tech/ydb/yoj/repository/test/sample/model/BookProjectionsOldStyle.java new file mode 100644 index 00000000..223c0f0f --- /dev/null +++ b/repository-test/src/main/java/tech/ydb/yoj/repository/test/sample/model/BookProjectionsOldStyle.java @@ -0,0 +1,68 @@ +package tech.ydb.yoj.repository.test.sample.model; + +import lombok.NonNull; +import lombok.Value; +import lombok.With; +import tech.ydb.yoj.repository.db.Entity; +import tech.ydb.yoj.repository.db.Table; + +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Value +@With +public class BookProjectionsOldStyle implements Entity { + Id id; + int version; + String title; + List authors; + + public BookProjectionsOldStyle updateTitle(String title) { + return withTitle(title).withVersion(version + 1); + } + + @Override + public List> createProjections() { + return Stream.concat( + Optional.ofNullable(title).map(t -> new ByTitle(new ByTitle.Id(t, id))).stream(), + authors.stream().map(a -> new ByAuthor(new ByAuthor.Id(a, id))) + ).collect(Collectors.toList()); + } + + @Value + public static class Id implements Entity.Id { + String id; + } + + @Value + public static class ByTitle implements Entity { + Id id; + + @Value + public static class Id implements Entity.Id { + @NonNull + String title; + + BookProjectionsOldStyle.Id id; + } + } + + @Value + public static class ByAuthor implements Entity { + Id id; + + @Value + public static class Id implements Entity.Id { + String author; + BookProjectionsOldStyle.Id id; + } + } + + @Value + public static class TitleViewId implements Table.ViewId { + Id id; + String title; + } +} diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java index b3e86561..01c4b6b0 100644 --- a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/YdbRepositoryTransaction.java @@ -155,6 +155,7 @@ public void commit() { throw t; } endTransaction("commit", this::doCommit); + options.getRepositoryTransactionListeners().forEach(l -> l.onCommit(this)); } @Override @@ -169,6 +170,7 @@ public void rollback() { log.info("Failed to rollback the transaction", t); } }); + options.getRepositoryTransactionListeners().forEach(l -> l.onRollback(this)); } private void doCommit() { @@ -282,7 +284,7 @@ private static Params getSdkParams(Statement statement, PARA } private void flushPendingWrites() { - transactionLocal.projectionCache().applyProjectionChanges(this); + options.getRepositoryTransactionListeners().forEach(l -> l.onBeforeFlushWrites(this)); QueriesMerger.create(cache) .merge(pendingWrites) .forEach(this::execute); @@ -480,7 +482,7 @@ public void pendingExecute(Statement statement, PARAMS value YdbRepository.Query query = new YdbRepository.Query<>(statement, value); if (options.isImmediateWrites()) { execute(query); - transactionLocal.projectionCache().applyProjectionChanges(this); + options.getRepositoryTransactionListeners().forEach(l -> l.onImmediateWrite(this)); } else { pendingWrites.add(query); } diff --git a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java index 494f1908..3ace6d23 100644 --- a/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java +++ b/repository-ydb-v2/src/main/java/tech/ydb/yoj/repository/ydb/table/YdbTable.java @@ -16,6 +16,7 @@ import tech.ydb.yoj.repository.db.TableDescriptor; import tech.ydb.yoj.repository.db.TableQueryImpl; import tech.ydb.yoj.repository.db.Tx; +import tech.ydb.yoj.repository.db.TxOptions; import tech.ydb.yoj.repository.db.ViewSchema; import tech.ydb.yoj.repository.db.bulk.BulkParams; import tech.ydb.yoj.repository.db.cache.FirstLevelCache; @@ -264,7 +265,7 @@ public T find(Entity.Id id) { if (id.isPartial()) { throw new IllegalArgumentException("Cannot use partial id in find method"); } - return executor.getTransactionLocal().firstLevelCache(tableDescriptor).get(id, __ -> { + return getFirstLevelCache().get(id, __ -> { var statement = new FindYqlStatement<>(tableDescriptor, schema, schema); List res = postLoad(executor.execute(statement, id)); return res.isEmpty() ? null : res.get(0); @@ -376,7 +377,7 @@ public > List find( List found = postLoad(findUncached(ids, filter, orderBy, limit)); if (!isPartialIdMode && ids.size() > found.size()) { Set> foundIds = found.stream().map(Entity::getId).collect(toSet()); - FirstLevelCache cache = executor.getTransactionLocal().firstLevelCache(tableDescriptor); + FirstLevelCache cache = getFirstLevelCache(); Sets.difference(ids, foundIds).forEach(cache::putEmpty); } return found; @@ -525,15 +526,16 @@ public > List findIds(Set partialIds) { public void update(Entity.Id id, Changeset changeset) { UpdateModel.ById> model = new UpdateModel.ById<>(id, changeset.toMap()); executor.pendingExecute(new UpdateByIdStatement<>(tableDescriptor, schema, model), model); - executor.getTransactionLocal().firstLevelCache(tableDescriptor).remove(id); + getFirstLevelCache().remove(id); + executor.getOptions().getEntityEventListeners().forEach(l -> l.onUpdate(tableDescriptor, id, changeset)); } @Override public T insert(T t) { T entityToSave = t.preSave(); executor.pendingExecute(new InsertYqlStatement<>(tableDescriptor, schema), entityToSave); - executor.getTransactionLocal().firstLevelCache(tableDescriptor).put(entityToSave); - executor.getTransactionLocal().projectionCache().save(entityToSave); + getFirstLevelCache().put(entityToSave); + executor.getOptions().getEntityEventListeners().forEach(l -> l.onSave(tableDescriptor, entityToSave)); return t; } @@ -541,16 +543,16 @@ public T insert(T t) { public T save(T t) { T entityToSave = t.preSave(); executor.pendingExecute(new UpsertYqlStatement<>(tableDescriptor, schema), entityToSave); - executor.getTransactionLocal().firstLevelCache(tableDescriptor).put(entityToSave); - executor.getTransactionLocal().projectionCache().save(entityToSave); + getFirstLevelCache().put(entityToSave); + executor.getOptions().getEntityEventListeners().forEach(l -> l.onSave(tableDescriptor, entityToSave)); return t; } @Override public void delete(Entity.Id id) { executor.pendingExecute(new DeleteByIdStatement<>(tableDescriptor, schema), id); - executor.getTransactionLocal().firstLevelCache(tableDescriptor).putEmpty(id); - executor.getTransactionLocal().projectionCache().delete(id); + getFirstLevelCache().putEmpty(id); + executor.getOptions().getEntityEventListeners().forEach(l -> l.onDelete(tableDescriptor, id)); } /** @@ -578,19 +580,19 @@ public > void migrate(ID id) { T rawEntity = foundRaw.get(0); T entityToSave = rawEntity.postLoad().preSave(); executor.pendingExecute(new UpsertYqlStatement<>(tableDescriptor, schema), entityToSave); - executor.getTransactionLocal().projectionCache().save(entityToSave); + executor.getOptions().getEntityEventListeners().forEach(l -> l.onSave(tableDescriptor, entityToSave)); } public FirstLevelCache getFirstLevelCache() { return executor.getTransactionLocal().firstLevelCache(tableDescriptor); } - @Override @NonNull + @Override public T postLoad(T e) { T e1 = e.postLoad(); - executor.getTransactionLocal().firstLevelCache(tableDescriptor).put(e1); - executor.getTransactionLocal().projectionCache().load(e1); + getFirstLevelCache().put(e1); + executor.getOptions().getEntityEventListeners().forEach(l -> l.onLoad(tableDescriptor, e1)); return e1; } @@ -608,6 +610,8 @@ default void bulkUpsert(BulkMapper mapper, List input, BulkParams p Stream readTable(ReadTableMapper mapper, ReadTableParams params); TransactionLocal getTransactionLocal(); + + TxOptions getOptions(); } public static class CheckingQueryExecutor implements QueryExecutor { @@ -657,6 +661,12 @@ public TransactionLocal getTransactionLocal() { check(); return delegate.getTransactionLocal(); } + + @Override + public TxOptions getOptions() { + check(); + return delegate.getOptions(); + } } /** diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/Entity.java b/repository/src/main/java/tech/ydb/yoj/repository/db/Entity.java index 8ec1348b..0a5b9bf1 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/Entity.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/Entity.java @@ -23,9 +23,10 @@ default E preSave() { } /** - * Warning! Projections will be moved to a separate YOJ module in YOJ 3.0.0. - * The {@code Entity.createProjections()} method will be moved to a subinterface of {@code Entity} - * in a separate library. + * Warning! Projections will be moved to a separate YOJ module. + * Please use {@link tech.ydb.yoj.repository.db.projection.EntityWithProjections EntityWithProjections} for entities that can have projections, + * and implement the {@link tech.ydb.yoj.repository.db.projection.EntityWithProjections#collectProjections() collectProjections()} method + * instead of {@code Entity.createProjections()}. * @see #77 */ default List> createProjections() { diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/TxOptions.java b/repository/src/main/java/tech/ydb/yoj/repository/db/TxOptions.java index 889b3597..df1e14b3 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/TxOptions.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/TxOptions.java @@ -7,8 +7,12 @@ import lombok.With; import tech.ydb.yoj.ExperimentalApi; import tech.ydb.yoj.repository.db.cache.TransactionLog; +import tech.ydb.yoj.repository.db.listener.EntityEventListener; +import tech.ydb.yoj.repository.db.listener.RepositoryTransactionListener; +import tech.ydb.yoj.repository.db.projection.ProjectionListener; import java.time.Duration; +import java.util.List; /** * Transaction options: isolation level, caching and logging settings. @@ -45,7 +49,17 @@ public class TxOptions { @ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/162") QueryTracingFilter tracingFilter; + @NonNull + @ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/77") + List entityEventListeners; + + @NonNull + @ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/77") + List repositoryTransactionListeners; + public static TxOptions create(@NonNull IsolationLevel isolationLevel) { + var projectionListener = new ProjectionListener(); + return builder() .isolationLevel(isolationLevel) // FIXME First-level cache is enabled by default (for backwards compatibility) @@ -53,6 +67,8 @@ public static TxOptions create(@NonNull IsolationLevel isolationLevel) { .firstLevelCache(true) .logLevel(TransactionLog.Level.DEBUG) .logStatementOnSuccess(true) + .entityEventListeners(isolationLevel.isReadOnly() ? List.of() : List.of(projectionListener)) + .repositoryTransactionListeners(isolationLevel.isReadOnly() ? List.of() : List.of(projectionListener)) .build(); } diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/cache/TransactionLocal.java b/repository/src/main/java/tech/ydb/yoj/repository/db/cache/TransactionLocal.java index ac628a4b..7a5d34cb 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/cache/TransactionLocal.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/cache/TransactionLocal.java @@ -7,8 +7,6 @@ import tech.ydb.yoj.repository.db.Tx; import tech.ydb.yoj.repository.db.TxOptions; import tech.ydb.yoj.repository.db.projection.ProjectionCache; -import tech.ydb.yoj.repository.db.projection.RoProjectionCache; -import tech.ydb.yoj.repository.db.projection.RwProjectionCache; import java.util.IdentityHashMap; import java.util.Map; @@ -18,14 +16,12 @@ public final class TransactionLocal { private final Map, Object> singletons = new IdentityHashMap<>(); private final Supplier cacheProviderSupplier; - private final Supplier projectionCacheSupplier; private final Supplier logSupplier; public TransactionLocal(@NonNull TxOptions options) { this.cacheProviderSupplier = () -> new FirstLevelCacheProvider( options.isFirstLevelCache() ? FirstLevelCache::create : FirstLevelCache::empty ); - this.projectionCacheSupplier = options.isMutable() ? RwProjectionCache::new : RoProjectionCache::new; this.logSupplier = () -> new TransactionLog(options.getLogLevel()); } @@ -47,14 +43,14 @@ public X instance(@NonNull Supplier supplier) { * Warning: Unlike {@link #log()}, this method is not intended to be used by end-users, * only by the YOJ implementation itself. * - * @deprecated Projections will be moved from the core YOJ API in 3.0.0 to an optional module. - * The {@code projectionCache()} method is an implementation detail, and will be removed. + * @deprecated This method is no longer used by YOJ itself, and has no practical use to the library users, + * so it now throws an {@link UnsupportedOperationException}. It will be removed permanently in YOJ 2.7.0. * @see #77 */ @InternalApi @Deprecated(forRemoval = true) public ProjectionCache projectionCache() { - return instance(projectionCacheSupplier); + throw new UnsupportedOperationException("Programmatic access to ProjectionCache is not supported"); } /** diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/listener/EntityEventListener.java b/repository/src/main/java/tech/ydb/yoj/repository/db/listener/EntityEventListener.java new file mode 100644 index 00000000..c170f32a --- /dev/null +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/listener/EntityEventListener.java @@ -0,0 +1,62 @@ +package tech.ydb.yoj.repository.db.listener; + +import lombok.NonNull; +import tech.ydb.yoj.ExperimentalApi; +import tech.ydb.yoj.repository.db.Entity; +import tech.ydb.yoj.repository.db.TableDescriptor; +import tech.ydb.yoj.repository.db.statement.Changeset; + +/** + * Listener for events with an entity inside a YOJ transaction. + */ +@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/77") +public interface EntityEventListener { + /** + * Entity has been loaded from the database (by any read query), and {@link Entity#postLoad()} has been successfully called on it. + * + * @param tableDescriptor table descriptor for the loaded entity + * @param entity loaded entity + * @param entity type + */ + default > void onLoad(@NonNull TableDescriptor tableDescriptor, @NonNull E entity) { + } + + /** + * {@link tech.ydb.yoj.repository.db.Table#save(Entity) Table.save()}, {@link tech.ydb.yoj.repository.db.Table#insert(Entity) Table.insert()} + * or a similar method has been called, then {@link Entity#preSave()} has been successfully called, and the entity has been saved. + *

Note that in the default delayed writes mode, the actual write to the database will happen only when we attempt to commit + * current transaction. + * + * @param tableDescriptor table descriptor for the saved entity + * @param entity saved entity + * @param entity type + */ + default > void onSave(@NonNull TableDescriptor tableDescriptor, @NonNull E entity) { + } + + /** + * Entity has been {@link tech.ydb.yoj.repository.db.Table#update(Entity.Id, Changeset) patched by Table.update()}. + *

Note that in the default delayed writes mode, the actual write to the database will happen only when we attempt to commit + * current transaction. + * + * @param tableDescriptor table descriptor for the saved entity + * @param entityId ID of the updated entity + * @param changeset set of partial field updates applied to the entity + * @param entity type + */ + default > void onUpdate(@NonNull TableDescriptor tableDescriptor, + @NonNull Entity.Id entityId, @NonNull Changeset changeset) { + } + + /** + * Entity has been deleted by {@link tech.ydb.yoj.repository.db.Table#delete(Entity.Id) Table.delete()} or a similar method. + *

Note that in the default delayed writes mode, the actual write to the database will happen only when we attempt to commit + * current transaction. + * + * @param tableDescriptor table descriptor for the deleted entity + * @param entityId ID of the deleted entity + * @param entity type + */ + default > void onDelete(@NonNull TableDescriptor tableDescriptor, @NonNull Entity.Id entityId) { + } +} diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/listener/RepositoryTransactionListener.java b/repository/src/main/java/tech/ydb/yoj/repository/db/listener/RepositoryTransactionListener.java new file mode 100644 index 00000000..a3b531c7 --- /dev/null +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/listener/RepositoryTransactionListener.java @@ -0,0 +1,52 @@ +package tech.ydb.yoj.repository.db.listener; + +import lombok.NonNull; +import tech.ydb.yoj.ExperimentalApi; +import tech.ydb.yoj.repository.db.RepositoryTransaction; + +/** + * Listener for transaction events. + */ +@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/77") +public interface RepositoryTransactionListener { + /** + * This event happens right before we attempt to perform delayed writes (if any) and commit the transaction. + * This gives the last opportunity to perform more writes in the current transaction. + *

Note: Currently, this event happens in both delayed writes transactions (where all writes are queued until commit, + * and can be merged together) and in immediate writes transactions (where all writes are executed immediately). + * This is an implementation detail that might change in the future. + * + * @see #onImmediateWrite(RepositoryTransaction) + * + * @param transaction repository transaction + */ + default void onBeforeFlushWrites(@NonNull RepositoryTransaction transaction) { + } + + /** + * A write query has been successfully executed in this immediate writes transaction. This event does not happen + * in a delayed writes transaction. + * + * @param transaction repository transaction + */ + default void onImmediateWrite(@NonNull RepositoryTransaction transaction) { + } + + /** + * Transaction has been committed successfully. + * + * @param transaction repository transaction + * @see RepositoryTransaction#commit() + */ + default void onCommit(@NonNull RepositoryTransaction transaction) { + } + + /** + * Transaction has been rolled back successfully. + * + * @param transaction repository transaction + * @see RepositoryTransaction#rollback() + */ + default void onRollback(@NonNull RepositoryTransaction transaction) { + } +} diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/projection/EntityWithProjections.java b/repository/src/main/java/tech/ydb/yoj/repository/db/projection/EntityWithProjections.java new file mode 100644 index 00000000..ccdf37e8 --- /dev/null +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/projection/EntityWithProjections.java @@ -0,0 +1,24 @@ +package tech.ydb.yoj.repository.db.projection; + +import tech.ydb.yoj.ExperimentalApi; +import tech.ydb.yoj.repository.db.Entity; + +/** + * Base interface for {@link Entity entities} that can have projections. + * + * @param self type + */ +@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/77") +public interface EntityWithProjections> extends Entity { + /** + * Creates a collections of projections (=computed index entities) for this entity. The returned collection may be empty. + *

The default implementation uses the result of {@link Entity#createProjections()}, for backwards compatibility. + * It is highly recommended to construct a {@link ProjectionCollection} explicitly, instead. + * + * @return projections for this entity + */ + @ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/77") + default ProjectionCollection collectProjections() { + return ProjectionCollection.of(Entity.super.createProjections()); + } +} diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/projection/Projection.java b/repository/src/main/java/tech/ydb/yoj/repository/db/projection/Projection.java new file mode 100644 index 00000000..e5eac87e --- /dev/null +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/projection/Projection.java @@ -0,0 +1,54 @@ +package tech.ydb.yoj.repository.db.projection; + +import com.google.common.base.Preconditions; +import lombok.NonNull; +import tech.ydb.yoj.repository.db.Entity; +import tech.ydb.yoj.repository.db.EntitySchema; +import tech.ydb.yoj.repository.db.TableDescriptor; + +/** + * Represents an entity that is a projection of another entity, that is, a programmatically defined index entity. + * Because an {@link Entity} does not carry information about the exact table it must be saved to, a {@code Projection} takes an + * explicit {@link TableDescriptor} for that purpose. + * + * @param tableDescriptor table descriptor to use when saving and loading the entity + * @param entity projection entity + * @param projection entity type + * @see ProjectionCollection + * @see EntityWithProjections#collectProjections() + */ +public record Projection>( + @NonNull TableDescriptor tableDescriptor, + @NonNull E entity +) { + @SuppressWarnings("unchecked") + public Projection(@NonNull E entity) { + this(TableDescriptor.from(EntitySchema.of(entity.getClass())), entity); + } + + public Projection { + Preconditions.checkArgument(!(entity instanceof EntityWithProjections), + "A projection entity must not itself implement EntityWithProjections, but this one does: <%s>", entity.getClass().getCanonicalName()); + + var projectionProjections = entity.createProjections(); + Preconditions.checkArgument(projectionProjections.isEmpty(), + "A projection entity must not return any projections from Entity.createProjections(), but for %s we got: %s", + entity, projectionProjections); + } + + @NonNull + public Entity.Id entityId() { + return entity.getId(); + } + + @NonNull + public Key key() { + return new Key<>(tableDescriptor, entity.getId()); + } + + public record Key>( + @NonNull TableDescriptor tableDescriptor, + @NonNull Entity.Id entityId + ) { + } +} diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/projection/ProjectionCache.java b/repository/src/main/java/tech/ydb/yoj/repository/db/projection/ProjectionCache.java index dba2106e..111c7f3e 100644 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/projection/ProjectionCache.java +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/projection/ProjectionCache.java @@ -5,8 +5,7 @@ import tech.ydb.yoj.repository.db.RepositoryTransaction; /** - * @deprecated Projections will be moved from the core YOJ API in 3.0.0 to an optional module. - * The {@code ProjectionCache} interface is an implementation detail, and will be removed or moved to an internal package. + * @deprecated The {@code ProjectionCache} interface is an implementation detail which is not used anymore, and it will be removed in YOJ 2.7.0. * @see #77 */ @InternalApi diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/projection/ProjectionCollection.java b/repository/src/main/java/tech/ydb/yoj/repository/db/projection/ProjectionCollection.java new file mode 100644 index 00000000..51cde54d --- /dev/null +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/projection/ProjectionCollection.java @@ -0,0 +1,227 @@ +package tech.ydb.yoj.repository.db.projection; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; +import lombok.NonNull; +import org.jetbrains.annotations.Contract; +import tech.ydb.yoj.ExperimentalApi; +import tech.ydb.yoj.repository.db.Entity; +import tech.ydb.yoj.repository.db.TableDescriptor; + +import javax.annotation.Nullable; +import java.util.AbstractCollection; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Spliterator; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toList; + +/** + * A collection of projections (programmatically defined index entities). + * + * @see #of() + * @see #of(Entity[]) + * @see #of(Projection[]) + * @see #copyOf(Iterable) + * @see #builder() + * @see Projection + */ +@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/77") +public final class ProjectionCollection extends AbstractCollection> { + private static final ProjectionCollection EMPTY = new ProjectionCollection(List.of()); + + private final List> projections; + + private ProjectionCollection(@NonNull List> projections) { + this.projections = List.copyOf(projections); + } + + @NonNull + public static ProjectionCollection of() { + return EMPTY; + } + + @NonNull + public static ProjectionCollection of(@NonNull Iterable> projectionEntities) { + return Iterables.isEmpty(projectionEntities) ? EMPTY : builder().addAllEntities(projectionEntities).build(); + } + + @NonNull + public static ProjectionCollection of(@NonNull Entity... projectionEntities) { + return projectionEntities.length == 0 ? EMPTY : builder().addAllEntities(projectionEntities).build(); + } + + @NonNull + public static ProjectionCollection of(@NonNull Projection... projections) { + return projections.length == 0 ? EMPTY : builder().addAll(projections).build(); + } + + @NonNull + public static ProjectionCollection copyOf(@NonNull Iterable> projections) { + return Iterables.isEmpty(projections) ? EMPTY : builder().addAll(projections).build(); + } + + @NonNull + public static Builder builder() { + return new Builder(); + } + + @Override + public int size() { + return projections.size(); + } + + @Override + public boolean isEmpty() { + return projections.isEmpty(); + } + + @NonNull + @Override + public Stream> stream() { + return projections.stream(); + } + + @NonNull + @Override + public Iterator> iterator() { + return projections.iterator(); + } + + @Override + public void forEach(Consumer> action) { + projections.forEach(action); + } + + @NonNull + @Override + public Spliterator> spliterator() { + return projections.spliterator(); + } + + public static final class Builder { + private final Map, Entity> projections = new LinkedHashMap<>(); + + private Builder() { + } + + public > Builder addEntity(@NonNull E projectionEntity) { + return add(new Projection<>(projectionEntity)); + } + + @Contract(value = "false, _ -> this; true, !null -> this; true, null -> fail") + public > Builder addEntityIf(boolean condition, @Nullable E projectionEntity) { + return condition + ? addEntity(Objects.requireNonNull(projectionEntity, "projectionEntity")) + : this; + } + + public > Builder addEntityIfNotNull(@Nullable E optionalEntity) { + return addEntityIf(optionalEntity != null, optionalEntity); + } + + @Contract(value = "null, _ -> this; !null, _ -> this") + public > Builder addEntityIfNotNull(@Nullable X value, + @NonNull Function<@NonNull X, @NonNull E> projectionCtor) { + return value != null ? addEntity(projectionCtor.apply(value)) : this; + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + public Builder addAllEntities(@NonNull Entity... entities) { + for (Entity e : entities) { + this.addEntity((Entity) e); + } + return this; + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + public Builder addAllEntities(@NonNull Iterable<@NonNull ? extends Entity> entities) { + for (Entity e : entities) { + this.addEntity((Entity) e); + } + return this; + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + public Builder addAllEntities(@NonNull Stream<@NonNull ? extends Entity> entities) { + entities.forEach(e -> this.addEntity((Entity) e)); + return this; + } + + public Builder addAllEntitiesIf(boolean condition, @NonNull Stream<@NonNull ? extends Entity> entities) { + return condition ? addAllEntities(entities) : this; + } + + @NonNull + public > Builder add(@NonNull TableDescriptor tableDescriptor, @NonNull E projection) { + return add(new Projection<>(tableDescriptor, projection)); + } + + @NonNull + public > Builder add(@NonNull Projection projection) { + Preconditions.checkArgument(!projections.containsKey(projection.key()), "Duplicate projection: %s", projection.key()); + projections.put(projection.key(), projection.entity()); + return this; + } + + public > Builder addIfNotNull(@Nullable Projection optionalProjection) { + return addIf(optionalProjection != null, optionalProjection); + } + + @Contract(value = "false, _ -> this; true, !null -> this; true, null -> fail") + public > Builder addIf(boolean condition, @Nullable Projection projection) { + return condition + ? add(Objects.requireNonNull(projection, "projection")) + : this; + } + + public > Builder addIfNotNull(@Nullable X value, + @NonNull Function<@NonNull X, @NonNull Projection> projectionCtor) { + return value != null ? add(projectionCtor.apply(value)) : this; + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + public Builder addAll(@NonNull Projection... projections) { + for (Projection p : projections) { + this.add((Projection) p); + } + return this; + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + public Builder addAll(@NonNull Iterable<@NonNull ? extends Projection> projections) { + for (Projection p : projections) { + this.add((Projection) p); + } + return this; + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + public Builder addAll(@NonNull Stream<@NonNull ? extends Projection> projections) { + projections.forEach(p -> this.add((Projection) p)); + return this; + } + + public Builder addAllIf(boolean condition, @NonNull Stream<@NonNull ? extends Projection> projections) { + return condition ? addAll(projections) : this; + } + + @NonNull + public ProjectionCollection build() { + return new ProjectionCollection(projections.entrySet().stream().map(Builder::projection).collect(toList())); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private static Projection projection(@NonNull Map.Entry, ?> e) { + TableDescriptor tableDescriptor = e.getKey().tableDescriptor(); + Entity entity = (Entity) e.getValue(); + return new Projection<>(tableDescriptor, entity); + } + } +} diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/projection/ProjectionListener.java b/repository/src/main/java/tech/ydb/yoj/repository/db/projection/ProjectionListener.java new file mode 100644 index 00000000..c1b58788 --- /dev/null +++ b/repository/src/main/java/tech/ydb/yoj/repository/db/projection/ProjectionListener.java @@ -0,0 +1,154 @@ +package tech.ydb.yoj.repository.db.projection; + +import lombok.NonNull; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import tech.ydb.yoj.ExperimentalApi; +import tech.ydb.yoj.InternalApi; +import tech.ydb.yoj.repository.db.Entity; +import tech.ydb.yoj.repository.db.RepositoryTransaction; +import tech.ydb.yoj.repository.db.TableDescriptor; +import tech.ydb.yoj.repository.db.listener.EntityEventListener; +import tech.ydb.yoj.repository.db.listener.RepositoryTransactionListener; + +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.stream.Stream; + +import static java.util.stream.Collectors.toMap; + +@InternalApi +@ExperimentalApi(issue = "https://github.com/ydb-platform/yoj-project/issues/77") +public final class ProjectionListener implements EntityEventListener, RepositoryTransactionListener { + private static final Logger log = LoggerFactory.getLogger(ProjectionListener.class); + + private final Map, Row> rows = new LinkedHashMap<>(); + + @Override + public > void onLoad(@NonNull TableDescriptor tableDescriptor, @NonNull E entity) { + row(tableDescriptor, entity).load(entity); + } + + @Override + public > void onSave(@NonNull TableDescriptor tableDescriptor, @NonNull E entity) { + row(tableDescriptor, entity).save(entity); + } + + @Override + public > void onDelete(@NonNull TableDescriptor tableDescriptor, @NonNull Entity.Id entityId) { + row(tableDescriptor, entityId).delete(); + } + + private > Row row(TableDescriptor tableDescriptor, E entity) { + return row(tableDescriptor, entity.getId()); + } + + @SuppressWarnings("unchecked") + private > Row row(TableDescriptor tableDescriptor, Entity.Id entityId) { + return (Row) rows.computeIfAbsent(new Projection.Key<>(tableDescriptor, entityId), __ -> new Row<>()); + } + + @Override + public void onBeforeFlushWrites(@NonNull RepositoryTransaction transaction) { + applyProjectionChanges(transaction); + } + + @Override + public void onImmediateWrite(@NonNull RepositoryTransaction transaction) { + applyProjectionChanges(transaction); + } + + private void applyProjectionChanges(@NonNull RepositoryTransaction transaction) { + Map, Projection> oldProjections = rows.values().stream() + .flatMap(Row::projectionsBefore) + .collect(toMap(Projection::key, e -> e, this::mergeOldProjections)); + + Map, Projection> newProjections = rows.values().stream() + .flatMap(Row::projectionsAfter) + .collect(toMap(Projection::key, e -> e, this::mergeNewProjections)); + + for (Row row : rows.values()) { + row.flush(); + } + + oldProjections.values().stream() + .filter(e -> !newProjections.containsKey(e.key())) + .forEach(e -> deleteEntity(transaction, e)); + newProjections.values().stream() + .filter(e -> !e.equals(oldProjections.get(e.key()))) + .forEach(e -> saveEntity(transaction, e)); + } + + private > void deleteEntity(RepositoryTransaction transaction, Projection projection) { + transaction.table(projection.tableDescriptor()).delete(projection.entityId()); + } + + private > void saveEntity(RepositoryTransaction transaction, Projection projection) { + transaction.table(projection.tableDescriptor()).save(projection.entity()); + } + + private Projection mergeOldProjections(Projection p1, Projection p2) { + if (p1 == p2 || p1.equals(p2)) { + log.error(""" + FIX THIS ASAP! Got two equal projections with the same ID: {}. NO exception is thrown so that \ + you can just fix and migrate the entities to fix the projections""", p1); + return p1; + } + throw new IllegalStateException("Got two unequal projections with the same ID and table descriptor: p1=" + p1 + "; p2=" + p2); + } + + private Projection mergeNewProjections(Projection p1, Projection p2) { + throw new IllegalStateException("Got two projections with the same ID and table descriptor: p1=" + p1 + "; p2=" + p2); + } + + private static final class Row> { + private Entity loaded; + private Entity saved; + private boolean writable; + + public void load(Entity entity) { + if (loaded == null) { + loaded = entity; + } + } + + public void save(Entity entity) { + saved = entity; + writable = true; + } + + public void delete() { + saved = null; + writable = true; + } + + public Stream> projectionsBefore() { + return streamProjections(loaded); + } + + public Stream> projectionsAfter() { + return streamProjections(saved); + } + + @NonNull + private Stream> streamProjections(Entity entity) { + if (writable && entity != null) { + if (entity instanceof EntityWithProjections ewp) { + return ewp.collectProjections().stream(); + } else { + return ProjectionCollection.of(entity.createProjections()).stream(); + } + } else { + return Stream.empty(); + } + } + + public void flush() { + if (writable) { + loaded = saved; + } + saved = null; + writable = false; + } + } +} diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/projection/RoProjectionCache.java b/repository/src/main/java/tech/ydb/yoj/repository/db/projection/RoProjectionCache.java deleted file mode 100644 index ef004029..00000000 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/projection/RoProjectionCache.java +++ /dev/null @@ -1,32 +0,0 @@ -package tech.ydb.yoj.repository.db.projection; - -import tech.ydb.yoj.InternalApi; -import tech.ydb.yoj.repository.db.Entity; -import tech.ydb.yoj.repository.db.RepositoryTransaction; - -/** - * @deprecated Projections will be moved from the core YOJ API in 3.0.0 to an optional module. - * The {@code RoProjectionCache} class is an implementation detail, and will be removed or moved to an internal package. - * @see #77 - */ -@InternalApi -@Deprecated(forRemoval = true) -public class RoProjectionCache implements ProjectionCache { - @Override - public void load(Entity entity) { - } - - @Override - public void save(Entity entity) { - throw new UnsupportedOperationException("Should not be invoked in RO"); - } - - @Override - public void delete(Entity.Id id) { - throw new UnsupportedOperationException("Should not be invoked in RO"); - } - - @Override - public void applyProjectionChanges(RepositoryTransaction transaction) { - } -} diff --git a/repository/src/main/java/tech/ydb/yoj/repository/db/projection/RwProjectionCache.java b/repository/src/main/java/tech/ydb/yoj/repository/db/projection/RwProjectionCache.java deleted file mode 100644 index d0ed688f..00000000 --- a/repository/src/main/java/tech/ydb/yoj/repository/db/projection/RwProjectionCache.java +++ /dev/null @@ -1,139 +0,0 @@ -package tech.ydb.yoj.repository.db.projection; - -import lombok.NonNull; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import tech.ydb.yoj.InternalApi; -import tech.ydb.yoj.repository.db.Entity; -import tech.ydb.yoj.repository.db.RepositoryTransaction; - -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.stream.Stream; - -import static java.util.stream.Collectors.toMap; - -/** - * @deprecated Projections will be moved from the core YOJ API in 3.0.0 to an optional module. - * The {@code RwProjectionCache} class is an implementation detail, and will be removed or moved to an internal package. - * - * @see #77 - */ -@InternalApi -@Deprecated(forRemoval = true) -public class RwProjectionCache implements ProjectionCache { - private static final Logger log = LoggerFactory.getLogger(RwProjectionCache.class); - - private final Map, Row> rows = new LinkedHashMap<>(); - - @Override - public void load(Entity entity) { - row(entity.getId()).load(entity); - } - - @Override - public void save(Entity entity) { - row(entity.getId()).save(entity); - } - - @Override - public void delete(Entity.Id id) { - row(id).delete(); - } - - private Row row(Entity.Id id) { - return rows.computeIfAbsent(id, __ -> new Row()); - } - - @Override - public void applyProjectionChanges(RepositoryTransaction transaction) { - Map, Entity> oldProjections = rows.values().stream() - .flatMap(Row::projectionsBefore) - .collect(toMap(Entity::getId, e -> e, this::mergeOldProjections)); - Map, Entity> newProjections = rows.values().stream() - .flatMap(Row::projectionsAfter) - .collect(toMap(Entity::getId, e -> e, this::mergeNewProjections)); - - for (Row row : rows.values()) { - row.flush(); - } - - oldProjections.values().stream() - .filter(e -> !newProjections.containsKey(e.getId())) - .forEach(e -> deleteEntity(transaction, e.getId())); - newProjections.values().stream() - .filter(e -> !e.equals(oldProjections.get(e.getId()))) - .forEach(e -> saveEntity(transaction, e)); - } - - private > void deleteEntity(RepositoryTransaction transaction, Entity.Id entityId) { - transaction.table(entityId.getType()).delete(entityId); - } - - private > void saveEntity(RepositoryTransaction transaction, Entity entity) { - @SuppressWarnings("unchecked") - T castedEntity = (T) entity; - - transaction.table(entity.getId().getType()).save(castedEntity); - } - - private Entity mergeOldProjections(Entity p1, Entity p2) { - if (p1 == p2 || p1.equals(p2)) { - log.error("FIX THIS ASAP! Got two equal projections with the same ID: {}. NO exception is thrown so that " - + "you can just fix and migrate the entities to fix the projections", p1); - return p1; - } - throw new IllegalStateException("Got two unequal projections with the same ID: p1=" + p1 + "; p2=" + p2); - } - - private Entity mergeNewProjections(Entity p1, Entity p2) { - throw new IllegalStateException("Got two projections with the same ID: p1=" + p1 + "; p2=" + p2); - } - - private static class Row { - Entity loaded; - Entity saved; - boolean writable; - - void load(Entity entity) { - if (loaded == null) { - loaded = entity; - } - } - - void save(Entity entity) { - saved = entity; - writable = true; - } - - void delete() { - saved = null; - writable = true; - } - - Stream> projectionsBefore() { - return projectionStream(loaded); - } - - Stream> projectionsAfter() { - return projectionStream(saved); - } - - @NonNull - private Stream> projectionStream(Entity entity) { - if (writable && entity != null) { - return entity.createProjections().stream(); - } else { - return Stream.empty(); - } - } - - void flush() { - if (writable) { - loaded = saved; - } - saved = null; - writable = false; - } - } -}