Refuses a DROP TABLE while an active Pipeline still references the resource (as either source or sink), so dropping the underlying Kafka topic / Venice store / MySQL table can't silently orphan a downstream pipeline.

Validator framework, made Connection-aware:
- Validated.validate(Issues, Connection) (was: validate(Issues))
- ValidatorProvider.validators(T, Connection) (was: validators(T))
- ValidationService.validate(T, Issues, Connection)
- ValidationService.validateOrThrow(T, Connection)
- ValidationService.validateOrThrow(Collection<T>, Connection)
- ValidationService.validators(T, Connection)

PendingDelete<T> wrapper (hoptimator-api):
- Explicit "this is being deleted" signal so unrelated callers of validateOrThrow(source, connection) don't accidentally trigger pre-delete checks.
- Carries an optional selfOwnerUid so cascade-deleted children can be excluded from the dependent set.

K8s indexed lookup:
- PipelineDependencyLabels stamps `depends-on-<slug>` labels on every Pipeline CRD at create time, naming each source/sink. The slug is a 16-char SHA-256 prefix of `<database>_<dot-joined-path>`; an annotation lists the full identifiers so a slug collision can be detected at check time.
- PipelineDependencyChecker uses a server-indexed label-selector list + annotation cross-check + selfOwnerUid filter.
- K8sPipelineDeployer threads sources/sink through and calls PipelineDependencyLabels.labelsFor / annotationFor at toK8sObject(). K8sPipelineBundle and K8sMaterializedViewDeployer pass the data through.

Dispatch:
- K8sValidatorProvider returns a K8sPipelineDependencyValidator for PendingDelete<Source>; registered via META-INF/services/com.linkedin.hoptimator.ValidatorProvider.
- K8sPipelineDependencyValidator wraps PipelineDependencyChecker as a Validator.

DROP TABLE wiring:
- HoptimatorDdlExecutor calls ValidationService.validateOrThrow(new PendingDelete<>(source), connection) before DeploymentService.delete in the table branch. HoptimatorDdlUtils.removeTableFromSchema() is the symmetric inverse of registerTemporaryTableInSchema() for cleanup.

Implementor side-effects (no behavior change):
- KafkaDeployer / VeniceDeployer / MySqlDeployer no longer need a declarative DependencyGuarded marker — the guard fires from the validator framework before delete() is reached.
- All existing Validated implementors (DefaultValidator, CompatibilityValidatorBase, AvroTableValidator, K8sViewTable) and ValidatorProvider implementors (DefaultValidatorProvider, CompatibilityValidatorProvider, AvroValidatorProvider) updated to the new signatures.

Tests: PipelineDependencyLabelsTest, PipelineDependencyCheckerTest, K8sPipelineDeployerTest assertions for stamping, validator-framework test updates throughout.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
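For illustration, a minimal sketch of the slug rule described above: a 16-character SHA-256 hex prefix of `<database>_<dot-joined-path>`, used to build the `depends-on-<slug>` label key. The class and method names here are hypothetical, not the actual PipelineDependencyLabels API.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;

// Hypothetical helper mirroring the slug rule above; the real logic lives in
// PipelineDependencyLabels and may differ in naming and label-key layout.
final class DependencySlugSketch {

  // 16-char SHA-256 hex prefix of "<database>_<dot-joined-path>"
  static String slug(String database, List<String> path) {
    return sha256Hex(database + "_" + String.join(".", path)).substring(0, 16);
  }

  // Label key stamped on the Pipeline CRD for one source/sink.
  static String dependsOnLabel(String database, List<String> path) {
    return "depends-on-" + slug(database, path);
  }

  private static String sha256Hex(String s) {
    try {
      MessageDigest digest = MessageDigest.getInstance("SHA-256");
      StringBuilder hex = new StringBuilder();
      for (byte b : digest.digest(s.getBytes(StandardCharsets.UTF_8))) {
        hex.append(String.format("%02x", b));
      }
      return hex.toString();
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("SHA-256 unavailable", e);
    }
  }
}
```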
LogicalTableDeployer.delete() previously threw SQLFeatureNotSupported. Now implemented end-to-end as a per-tier sequence that mirrors what running DROP TABLE on each tier independently would do, plus the LogicalTable CRD removal at the top.

Flow:
1. Per-tier pre-flight via the validator framework: ValidationService.validateOrThrow(new PendingDelete<>(tierSource, logicalTableUid), connection) — refuses the drop if any active external pipeline still references a tier resource. The selfOwnerUid is the LogicalTable CRD's UID so the implicit inter-tier pipelines (owned by the CRD, cascade-deleted with it) don't self-block.
2. Delete the LogicalTable CRD. K8s owner-ref cascade removes its owned Pipeline and TableTrigger CRDs.
3. Best-effort physical cleanup of each tier resource (Kafka topic, Venice store, ...). A failed tier delete logs a warning but does not abort — a stranded tier is recoverable; aborting mid-DROP isn't.
4. Per-tier schema cleanup: deregister the TemporaryTable entry in each tier schema only when its physical delete succeeded.

Tests:
- LogicalTableDeployerTest deleteRemovesCrdAndCleansUpTierResources, deletePropagatesCrdDeletionFailure, deleteSwallowsTierCleanupFailures.
- logical-ddl.id integration test: DROP TABLE LOGICAL.testevent now succeeds and cascades the implicit nearline-to-online pipeline.
- logical-offline-ddl.id companion update.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
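A rough outline of that four-step flow, under stated assumptions: tierSources(), deleteLogicalTableCrd(), deletePhysicalResource(), deregisterFromTierSchema(), logicalTableUid and the log field are hypothetical stand-ins for the deployer's actual members; only the validateOrThrow call and the step ordering come from the commit message above.

```java
// Illustrative sketch only; helper names are assumptions, not the real API.
public void delete() throws SQLException {
  // 1. Per-tier pre-flight: refuse the drop if any active external pipeline
  //    still references a tier resource. Passing the LogicalTable CRD's UID
  //    as selfOwnerUid keeps its own implicit inter-tier pipelines (which
  //    cascade-delete with the CRD) from blocking the drop.
  for (Source tierSource : tierSources()) {
    ValidationService.validateOrThrow(new PendingDelete<>(tierSource, logicalTableUid), connection);
  }

  // 2. Delete the LogicalTable CRD; the K8s owner-ref cascade removes the
  //    Pipeline and TableTrigger CRDs it owns.
  deleteLogicalTableCrd();

  // 3 + 4. Best-effort physical cleanup per tier, then schema cleanup only
  //        for tiers whose physical delete actually succeeded.
  for (Source tierSource : tierSources()) {
    try {
      deletePhysicalResource(tierSource);     // Kafka topic, Venice store, ...
      deregisterFromTierSchema(tierSource);   // drop the TemporaryTable entry
    } catch (Exception e) {
      // A stranded tier is recoverable; aborting mid-DROP isn't.
      log.warn("Best-effort cleanup of tier resource {} failed", tierSource, e);
    }
  }
}
```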
kafka-ddl-create-table.id: cross-driver dependency-guard scenarios exercising the new pre-delete check end-to-end through the kafka driver — drop-table-while-pipeline-depends-on-it (source side and partial-view sink side).

The bulk of the file count is mechanical noise reduction across existing test files: dropped unused imports, tightened generics on @SuppressWarnings, etc. — fallout from the warning_cleanup pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Problem
DROP TABLE on a Kafka topic / Venice store / MySQL table currently succeeds even when an active Pipeline still reads from or writes to the resource. The downstream pipeline is silently orphaned: it keeps trying to consume a topic that no longer exists.
LogicalTableDeployer.delete() was unimplemented (threw SQLFeatureNotSupported) for the same reason: there was no safe way to verify a logical table's tier resources weren't still in use.
Approach
The fix is a pre-delete dependency check that runs in the SQL DDL path, before any deployer-level state change.
The mechanism is a thin layer on Hoptimator's existing Validator/ValidatorProvider/ValidationService framework, extended to thread a Connection so validators can do external lookups.
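To make the signature change concrete, here is a hedged sketch of the Connection-aware shapes listed below; Issues and Validator are placeholders, and the real declarations in hoptimator-api may differ in detail.

```java
import java.sql.Connection;
import java.util.Collection;

// Placeholder types: the real Issues and Validator declarations live in
// hoptimator-api and may look different.
interface Issues { void error(String message); }
interface Validator { /* wraps a single check, e.g. K8sPipelineDependencyValidator */ }

// The Connection-aware shapes described in this PR.
interface Validated {
  // was: validate(Issues)
  void validate(Issues issues, Connection connection);
}

interface ValidatorProvider {
  // was: validators(T)
  <T> Collection<Validator> validators(T obj, Connection connection);
}
```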
What's in the diff (3 commits)
feat: pre-delete dependency guard for DROP TABLE
The core feature, end-to-end:
- Validated.validate(Issues, Connection), ValidatorProvider.validators(T, Connection), ValidationService.validateOrThrow(T, Connection) and validateOrThrow(Collection<T>, Connection): made Connection-aware in a single breaking signature change (similar to how deployers take a connection).
- PendingDelete<T> wrapper in hoptimator-api (sketched below). Carries an optional selfOwnerUid so cascade-deleted children can be excluded from the dependent set.
- PipelineDependencyLabels (slug + identifier + label/annotation builders), PipelineDependencyChecker (label-selector query + annotation collision guard + self-owner filter).
- K8sPipelineDeployer.toK8sObject() stamps labels and the annotation; K8sPipelineBundle and K8sMaterializedViewDeployer thread sources/sink through.
- K8sValidatorProvider returns a K8sPipelineDependencyValidator for PendingDelete<Source>; registered via META-INF/services/com.linkedin.hoptimator.ValidatorProvider.
- HoptimatorDdlExecutor.execute(SqlDropObject) calls validateOrThrow(new PendingDelete<>(source), connection) before DeploymentService.delete in the table branch.
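For reference, a minimal sketch of what the PendingDelete<T> wrapper could look like; only the shape (a wrapped value plus an optional selfOwnerUid) comes from the description above, and the field and accessor names are assumptions.

```java
import java.util.Optional;

// Hypothetical sketch of PendingDelete<T>; names other than selfOwnerUid are assumed.
final class PendingDelete<T> {
  private final T source;
  private final String selfOwnerUid;  // may be null

  PendingDelete(T source) {
    this(source, null);
  }

  PendingDelete(T source, String selfOwnerUid) {
    this.source = source;
    this.selfOwnerUid = selfOwnerUid;
  }

  T source() {
    return source;
  }

  // Children owned by this UID are cascade-deleted with it, so the
  // dependency checker excludes them from the dependent set.
  Optional<String> selfOwnerUid() {
    return Optional.ofNullable(selfOwnerUid);
  }
}
```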
feat: support DROP TABLE for logical tables

LogicalTableDeployer.delete() is now implemented as N tier-level DROPs + 1 CRD delete (see the flow in the commit message above).
test: integration scenarios + cleanup test warnings
Test plan
Known caveats
Pipeline CRDs created before this change carry no depends-on labels, so they're invisible to the check and won't block a DROP. To make this a hard correctness invariant, an upgrade-time backfill job needs to re-stamp labels on every existing Pipeline CRD by re-deriving sources/sink from its spec.sql. Out of scope here.
Only active Pipeline CRDs are checked; a View CRD whose SQL still references the dropped table (audience2 defined over audience, say) won't block the drop. Operationally that's fine: audience2's Flink job never references audience at runtime, so dropping audience doesn't orphan anything. The dangling SQL on audience2's View CRD is self-healing on the next CREATE OR REPLACE.