diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApiEndpoints.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApiEndpoints.java index 83abc424..5d37f661 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApiEndpoints.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sApiEndpoints.java @@ -10,6 +10,8 @@ import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList; import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplate; import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplateList; +import com.linkedin.hoptimator.k8s.models.V1alpha1Subscription; +import com.linkedin.hoptimator.k8s.models.V1alpha1SubscriptionList; import com.linkedin.hoptimator.k8s.models.V1alpha1TableTrigger; import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerList; import com.linkedin.hoptimator.k8s.models.V1alpha1View; @@ -57,6 +59,9 @@ public final class K8sApiEndpoints { public static final K8sApiEndpoint JOB_TEMPLATES = new K8sApiEndpoint<>("JobTemplate", "hoptimator.linkedin.com", "v1alpha1", "jobtemplates", false, V1alpha1JobTemplate.class, V1alpha1JobTemplateList.class); + public static final K8sApiEndpoint SUBSCRIPTIONS = + new K8sApiEndpoint<>("Subscription", "hoptimator.linkedin.com", "v1alpha1", "subscriptions", false, + V1alpha1Subscription.class, V1alpha1SubscriptionList.class); public static final K8sApiEndpoint TABLE_TRIGGERS = new K8sApiEndpoint<>("TableTrigger", "hoptimator.linkedin.com", "v1alpha1", "tabletriggers", false, V1alpha1TableTrigger.class, V1alpha1TableTriggerList.class); diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sJobDeployerTest.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sJobDeployerTest.java index 11c97412..ec2c0107 100644 --- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sJobDeployerTest.java +++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/K8sJobDeployerTest.java @@ -9,7 +9,6 @@ import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplate; import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplateList; import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplateSpec; -import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.kubernetes.client.openapi.models.V1ObjectMeta; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -25,19 +24,14 @@ import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Properties; - import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) -@SuppressFBWarnings(value = {"RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"}, - justification = "Mockito doReturn().when() stubs — framework captures the return value") class K8sJobDeployerTest { @Mock @@ -98,8 +92,6 @@ K8sSnapshot createSnapshot(K8sContext context) { @Test void specifyWithNoTemplatesReturnsEmpty() throws SQLException { - doReturn(new Properties()).when(connection).connectionProperties(); - Sink sink = new Sink("sinkdb", Arrays.asList("schema", "sink_table"), Collections.emptyMap()); Job job = createTestJob(sink); @@ -114,8 +106,6 @@ void specifyWithNoTemplatesReturnsEmpty() throws SQLException { @Test void specifyRendersMatchingTemplate() throws SQLException { - doReturn(new Properties()).when(connection).connectionProperties(); - templates.add(new V1alpha1JobTemplate() .metadata(new V1ObjectMeta().name("template1")) .spec(new V1alpha1JobTemplateSpec() @@ -136,8 +126,6 @@ void specifyRendersMatchingTemplate() throws SQLException { @Test void specifyFiltersOutNonMatchingDatabases() throws SQLException { - doReturn(new Properties()).when(connection).connectionProperties(); - templates.add(new V1alpha1JobTemplate() .metadata(new V1ObjectMeta().name("template1")) .spec(new V1alpha1JobTemplateSpec() @@ -157,8 +145,6 @@ void specifyFiltersOutNonMatchingDatabases() throws SQLException { @Test void specifyWithNullDatabasesMatchesAll() throws SQLException { - doReturn(new Properties()).when(connection).connectionProperties(); - templates.add(new V1alpha1JobTemplate() .metadata(new V1ObjectMeta().name("template1")) .spec(new V1alpha1JobTemplateSpec() @@ -178,8 +164,6 @@ void specifyWithNullDatabasesMatchesAll() throws SQLException { @Test void specifyRendersTemplateVariables() throws SQLException { - doReturn(new Properties()).when(connection).connectionProperties(); - templates.add(new V1alpha1JobTemplate() .metadata(new V1ObjectMeta().name("template1")) .spec(new V1alpha1JobTemplateSpec() @@ -203,8 +187,6 @@ void specifyRendersTemplateVariables() throws SQLException { @Test void specifyLambdasReturnNonEmptyValues() throws SQLException { // Verify each key field is non-empty. - doReturn(new Properties()).when(connection).connectionProperties(); - templates.add(new V1alpha1JobTemplate() .metadata(new V1ObjectMeta().name("template1")) .spec(new V1alpha1JobTemplateSpec() @@ -237,10 +219,6 @@ void specifyLambdasReturnNonEmptyValues() throws SQLException { @Test void specifyWithFlinkConfigPropertiesIncludesThem() throws SQLException { // Verify that sink options ARE merged into the environment - Properties connProps = new Properties(); - connProps.setProperty("flinkConfig1", "value1"); - doReturn(connProps).when(connection).connectionProperties(); - Map sinkOptions = new HashMap<>(); sinkOptions.put("sinkOption", "sinkVal"); Sink sink = new Sink("sinkdb", Arrays.asList("schema", "sink_table"), sinkOptions); @@ -264,8 +242,6 @@ void specifyWithFlinkConfigPropertiesIncludesThem() throws SQLException { @Test void specifyConditionalRenderedTemplateNotNull() throws SQLException { // Verify null templates are skipped - doReturn(new Properties()).when(connection).connectionProperties(); - templates.add(new V1alpha1JobTemplate() .metadata(new V1ObjectMeta().name("template1")) .spec(new V1alpha1JobTemplateSpec() diff --git a/hoptimator-operator/build.gradle b/hoptimator-operator/build.gradle index b5ce8284..4314b1d1 100644 --- a/hoptimator-operator/build.gradle +++ b/hoptimator-operator/build.gradle @@ -6,9 +6,6 @@ plugins { dependencies { implementation project(':hoptimator-api') - implementation project(':hoptimator-avro') // <-- marked for deletion - implementation project(':hoptimator-planner') // <-- marked for deletion - implementation project(':hoptimator-catalog') // <-- marked for deletion implementation project(':hoptimator-jdbc') implementation project(':hoptimator-util') implementation project(':hoptimator-k8s') @@ -18,7 +15,7 @@ dependencies { implementation libs.kubernetes.extended.client implementation libs.slf4j.api implementation libs.commons.cli - implementation libs.avro + implementation libs.calcite.avatica implementation libs.cron.utils testImplementation(testFixtures(project(':hoptimator-k8s'))) diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/PipelineOperatorApp.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/PipelineOperatorApp.java index b3cf4e71..61ee8779 100644 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/PipelineOperatorApp.java +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/PipelineOperatorApp.java @@ -4,6 +4,7 @@ import com.linkedin.hoptimator.k8s.K8sApiEndpoints; import com.linkedin.hoptimator.k8s.K8sContext; import com.linkedin.hoptimator.operator.pipeline.PipelineReconciler; +import com.linkedin.hoptimator.operator.subscription.SubscriptionReconciler; import com.linkedin.hoptimator.operator.trigger.TableTriggerReconciler; import com.linkedin.hoptimator.operator.trigger.ViewReconciler; import io.kubernetes.client.extended.controller.Controller; @@ -18,6 +19,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.Connection; +import java.sql.DriverManager; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -59,25 +62,42 @@ public static void main(String[] args) throws Exception { String watchNamespaceInput = cmd.getOptionValue("watch", ""); Properties connectionProperties = new Properties(); connectionProperties.put("k8s.watch.namespace", watchNamespaceInput); - K8sContext context = K8sContext.create(new HoptimatorConnection(null, connectionProperties)); - new PipelineOperatorApp(context).run(); + + // Create a JDBC connection for SQL planning (used by SubscriptionReconciler) + Connection jdbcConnection = DriverManager.getConnection("jdbc:hoptimator://", connectionProperties); + K8sContext context = K8sContext.create(jdbcConnection); + + SubscriptionReconciler.Planner planner = SubscriptionReconciler.jdbcPlanner(jdbcConnection); + new PipelineOperatorApp(context).run(planner); } public void run() { run(Collections.emptyList()); } + public void run(SubscriptionReconciler.Planner planner) { + run(planner, Collections.emptyList()); + } + public void run(List initialControllers) { + run(null, initialControllers); + } + + public void run(SubscriptionReconciler.Planner planner, List initialControllers) { // register informers context.registerInformer(K8sApiEndpoints.PIPELINES, Duration.ofMinutes(5)); context.registerInformer(K8sApiEndpoints.TABLE_TRIGGERS, Duration.ofMinutes(5)); context.registerInformer(K8sApiEndpoints.VIEWS, Duration.ofMinutes(5)); + context.registerInformer(K8sApiEndpoints.SUBSCRIPTIONS, Duration.ofMinutes(5)); List controllers = new ArrayList<>(initialControllers); controllers.addAll(ControllerService.controllers(context)); controllers.add(PipelineReconciler.controller(context)); controllers.add(TableTriggerReconciler.controller(context)); controllers.add(ViewReconciler.controller(context)); + if (planner != null) { + controllers.add(SubscriptionReconciler.controller(context, planner)); + } ControllerManager controllerManager = new ControllerManager(context.informerFactory(), controllers.toArray(new Controller[0])); diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionEnvironment.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionEnvironment.java deleted file mode 100644 index 4fe57773..00000000 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionEnvironment.java +++ /dev/null @@ -1,35 +0,0 @@ -package com.linkedin.hoptimator.operator.subscription; - -import com.linkedin.hoptimator.avro.AvroConverter; -import com.linkedin.hoptimator.catalog.Resource; -import com.linkedin.hoptimator.planner.Pipeline; - - -/** - * Exposes Subscription variables to resource templates. - *

- * Variables have a `pipeline.` prefix (even though they come from the - * Subscription object), because the planner is unaware of Subscriptions. - * For example, the CLI constructs pipelines without any corresponding - * Subscription object. In the future, we may have additional K8s objects - * that result in pipelines. - *

- * The exported variables include: - *

- * - `pipeline.namespace`, the K8s namespace where the pipeline should be - * deployed. This is a recommendation -- templates may elect to ignore it. - * - `pipeline.name`, a unique name for the pipeline. Templates can use this - * as a basis for deriving K8s object names, Kafka topic names, etc. The - * name is guaranteed to be a valid K8s object name, e.g. `my-subscription`. - * - `pipeline.avroSchema`, an Avro schema for the pipeline's output type. - */ -@Deprecated -public class SubscriptionEnvironment extends Resource.SimpleEnvironment { - - public SubscriptionEnvironment(String namespace, String name, Pipeline pipeline) { - export("pipeline.namespace", namespace); - export("pipeline.name", name); - export("pipeline.avroSchema", - AvroConverter.avro("com.linkedin.hoptimator", "OutputRecord", pipeline.outputType()).toString(false)); - } -} diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java index a1a38359..86f907d8 100644 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java @@ -1,53 +1,94 @@ package com.linkedin.hoptimator.operator.subscription; -import com.linkedin.hoptimator.catalog.HopTable; -import com.linkedin.hoptimator.catalog.Resource; -import com.linkedin.hoptimator.k8s.models.V1alpha1Subscription; -import com.linkedin.hoptimator.k8s.models.V1alpha1SubscriptionSpec; -import com.linkedin.hoptimator.k8s.models.V1alpha1SubscriptionStatus; -import com.linkedin.hoptimator.operator.Operator; -import com.linkedin.hoptimator.planner.HoptimatorPlanner; -import com.linkedin.hoptimator.planner.Pipeline; -import com.linkedin.hoptimator.planner.PipelineRel; -import io.kubernetes.client.common.KubernetesObject; -import io.kubernetes.client.extended.controller.Controller; -import io.kubernetes.client.extended.controller.builder.ControllerBuilder; -import io.kubernetes.client.extended.controller.reconciler.Reconciler; -import io.kubernetes.client.extended.controller.reconciler.Request; -import io.kubernetes.client.extended.controller.reconciler.Result; -import io.kubernetes.client.util.generic.KubernetesApiResponse; -import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesObject; -import io.kubernetes.client.util.generic.dynamic.Dynamics; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import java.sql.SQLException; +import java.time.Duration; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.function.Predicate; -import java.util.stream.Collectors; -import java.util.stream.Stream; +import java.util.Properties; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import io.kubernetes.client.extended.controller.Controller; +import io.kubernetes.client.extended.controller.builder.ControllerBuilder; +import io.kubernetes.client.extended.controller.reconciler.Reconciler; +import io.kubernetes.client.extended.controller.reconciler.Request; +import io.kubernetes.client.extended.controller.reconciler.Result; +import io.kubernetes.client.openapi.models.V1ObjectMeta; + +import com.linkedin.hoptimator.Pipeline; +import com.linkedin.hoptimator.Source; +import com.linkedin.hoptimator.SqlDialect; +import com.linkedin.hoptimator.k8s.K8sApi; +import com.linkedin.hoptimator.k8s.K8sApiEndpoints; +import com.linkedin.hoptimator.k8s.K8sContext; +import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline; +import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList; +import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineSpec; +import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineStatus; +import com.linkedin.hoptimator.k8s.models.V1alpha1Subscription; +import com.linkedin.hoptimator.k8s.models.V1alpha1SubscriptionList; +import com.linkedin.hoptimator.k8s.models.V1alpha1SubscriptionSpec; +import com.linkedin.hoptimator.k8s.models.V1alpha1SubscriptionStatus; +import com.linkedin.hoptimator.util.DeploymentService; +import com.linkedin.hoptimator.util.planner.PipelineRel; + + +/** + * Reconciles Subscription CRs by planning SQL pipelines and deploying them + * as Pipeline CRs with associated K8s resources. + * + *

This uses the new planner infrastructure (DeploymentService/PipelineRel) + * rather than the legacy HoptimatorPlanner, providing a migration path for + * existing Subscription-based workloads. + */ public final class SubscriptionReconciler implements Reconciler { private static final Logger log = LoggerFactory.getLogger(SubscriptionReconciler.class); - private static final String SUBSCRIPTION = "hoptimator.linkedin.com/v1alpha1/Subscription"; - - private final Operator operator; - private final HoptimatorPlanner.Factory plannerFactory; - private final Resource.Environment environment; - private final Predicate filter; - - SubscriptionReconciler(Operator operator, HoptimatorPlanner.Factory plannerFactory, - Resource.Environment environment, Predicate filter) { - this.operator = operator; - this.plannerFactory = plannerFactory; - this.environment = environment; - this.filter = filter; + + private final K8sContext context; + private final K8sApi subscriptionApi; + private final K8sApi pipelineApi; + private final Planner planner; + + /** + * Encapsulates the SQL planning step so it can be mocked in tests. + */ + public interface Planner { + PlanResult plan(String sql, String database, String name, Map hints) throws Exception; + } + + /** + * The result of planning a Subscription's SQL. + */ + public static class PlanResult { + final String pipelineSql; + final List pipelineSpecs; + + PlanResult(String pipelineSql, List pipelineSpecs) { + this.pipelineSql = pipelineSql; + this.pipelineSpecs = pipelineSpecs; + } + } + + private SubscriptionReconciler(K8sContext context, Planner planner) { + this(context, + new K8sApi<>(context, K8sApiEndpoints.SUBSCRIPTIONS), + new K8sApi<>(context, K8sApiEndpoints.PIPELINES), + planner); + } + + SubscriptionReconciler(K8sContext context, + K8sApi subscriptionApi, + K8sApi pipelineApi, + Planner planner) { + this.context = context; + this.subscriptionApi = subscriptionApi; + this.pipelineApi = pipelineApi; + this.planner = planner; } @Override @@ -56,24 +97,19 @@ public Result reconcile(Request request) { String name = request.getName(); String namespace = request.getNamespace(); - Result result = new Result(true, operator.pendingRetryDuration()); + Result result = new Result(true, pendingRetryDuration()); try { - V1alpha1Subscription object = operator.fetch(SUBSCRIPTION, namespace, name); - - if (object == null) { - log.info("Object {}/{} deleted. Skipping.", namespace, name); - return new Result(false); - } - - if (filter != null && !filter.test(object)) { - log.info("Object {}/{} filtered. Skipping.", namespace, name); - return new Result(false); + V1alpha1Subscription object; + try { + object = subscriptionApi.get(namespace, name); + } catch (SQLException e) { + if (e.getErrorCode() == 404) { + log.info("Object {}/{} deleted. Skipping.", namespace, name); + return new Result(false); + } + throw e; } - String kind = object.getKind(); - - Objects.requireNonNull(object.getMetadata()).setNamespace(namespace); - V1alpha1SubscriptionStatus status = object.getStatus(); if (status == null) { status = new V1alpha1SubscriptionStatus(); @@ -84,263 +120,179 @@ public Result reconcile(Request request) { object.getSpec().setHints(new HashMap<>()); } - if (status.getJobResources() == null) { - status.setJobResources(Collections.emptyList()); - } - - if (status.getDownstreamResources() == null) { - status.setDownstreamResources(Collections.emptyList()); - } - - // We deploy in three phases: - // 1. Plan a pipeline, and write the plan to Status. - // 2. Deploy the pipeline per plan. - // 3. Verify readiness of the entire pipeline. - // Each phase should be a separate reconciliation loop to avoid races. - // TODO: We should disown orphaned resources when the pipeline changes. if (diverged(object.getSpec(), status)) { - // Phase 1 - log.info("Planning a new pipeline for {}/{} with SQL `{}`...", kind, name, object.getSpec().getSql()); + // Phase 1: Plan the pipeline + log.info("Planning a new pipeline for Subscription/{} with SQL `{}`...", name, object.getSpec().getSql()); try { - Pipeline pipeline = pipeline(object); - Resource.Environment subEnv = new SubscriptionEnvironment(namespace, name, pipeline).orElse(environment); - Resource.TemplateFactory templateFactory = new Resource.SimpleTemplateFactory(subEnv); - - // For sink resources, also expose hints. - Resource.TemplateFactory sinkTemplateFactory = new Resource.SimpleTemplateFactory( - subEnv.orElse(new Resource.SimpleEnvironment(map(object.getSpec().getHints())))); - - // Render resources related to all source tables. - List upstreamResources = pipeline.upstreamResources() - .stream() - .flatMap(x -> x.render(templateFactory).stream()) - .collect(Collectors.toList()); - - // Render the SQL job - Collection sqlJob = pipeline.sqlJob().render(templateFactory); - - // Render resources related to the sink table. For these resources, we pass along any - // "hints" as part of the environment. - List downstreamResources = pipeline.downstreamResources() - .stream() - .flatMap(x -> x.render(sinkTemplateFactory).stream()) - .collect(Collectors.toList()); - - List combined = new ArrayList<>(); - combined.addAll(upstreamResources); - combined.addAll(sqlJob); - combined.addAll(downstreamResources); - - status.setResources(combined); - status.setJobResources(new ArrayList<>(sqlJob)); - status.setDownstreamResources(downstreamResources); + PlanResult plan = planner.plan( + object.getSpec().getSql(), + object.getSpec().getDatabase(), + name, + object.getSpec().getHints()); status.setSql(object.getSpec().getSql()); status.setHints(object.getSpec().getHints()); - status.setReady(null); // null indicates that pipeline needs to be deployed - status.setFailed(null); + status.setResources(plan.pipelineSpecs); + status.setReady(false); + status.setFailed(false); status.setMessage("Planned."); } catch (Exception e) { - // SqlValidatorException and ValidationException indicate bad user SQL (client error). - // Log at WARN level to avoid false server-error alerts. - if (isSqlValidationError(e)) { - log.warn("SQL validation error when planning a pipeline for {}/{} with SQL `{}`: {}", kind, name, - object.getSpec().getSql(), e.getMessage()); - } else { - log.error("Encountered error when planning a pipeline for {}/{} with SQL `{}`.", kind, name, - object.getSpec().getSql(), e); - } - - // Mark the Subscription as failed. + log.error("Encountered error when planning a pipeline for Subscription/{} with SQL `{}`.", name, + object.getSpec().getSql(), e); status.setFailed(true); status.setMessage("Error: " + e.getMessage()); - result = new Result(true, operator.failureRetryDuration()); + result = new Result(true, failureRetryDuration()); } - } else if (status.getReady() == null && status.getResources() != null) { - // Phase 2 - log.info("Deploying pipeline for {}/{}...", kind, name); - - boolean deployed = status.getResources().stream().allMatch(x -> apply(x, object)); - - if (deployed) { - status.setReady(false); - status.setFailed(false); - status.setMessage("Deployed."); + } else if (!Boolean.TRUE.equals(status.getReady())) { + // Phase 2/3: Deploy and monitor the Pipeline + log.info("Checking status of pipeline for Subscription/{}...", name); + + V1alpha1Pipeline pipeline = pipelineApi.getIfExists(namespace, name); + if (pipeline == null) { + // Phase 2: Deploy the Pipeline CR + log.info("Deploying pipeline for Subscription/{}...", name); + try { + deployPipeline(object); + status.setReady(false); + status.setFailed(false); + status.setMessage("Deployed."); + } catch (Exception e) { + log.error("Encountered error deploying pipeline for Subscription/{}.", name, e); + status.setFailed(true); + status.setMessage("Error: " + e.getMessage()); + result = new Result(true, failureRetryDuration()); + } } else { - return new Result(true, operator.failureRetryDuration()); + // Phase 3: Check Pipeline readiness + V1alpha1PipelineStatus pipelineStatus = pipeline.getStatus(); + if (pipelineStatus != null && Boolean.TRUE.equals(pipelineStatus.getReady())) { + status.setReady(true); + status.setFailed(false); + status.setMessage("Ready."); + log.info("Subscription/{} is ready.", name); + result = new Result(false); + } else if (pipelineStatus != null && Boolean.TRUE.equals(pipelineStatus.getFailed())) { + status.setReady(false); + status.setFailed(true); + status.setMessage("Pipeline failed: " + pipelineStatus.getMessage()); + log.info("Pipeline for Subscription/{} failed.", name); + } else { + status.setReady(false); + status.setFailed(false); + status.setMessage("Deployed."); + log.info("Pipeline for Subscription/{} is NOT ready.", name); + } } } else { - log.info("Checking status of pipeline for {}/{}...", kind, name); - - boolean ready = Objects.requireNonNull(status.getResources()).stream().allMatch(operator::isReady); - - if (ready) { - status.setReady(true); - status.setFailed(false); - status.setMessage("Ready."); - log.info("{}/{} is ready.", kind, name); - result = new Result(false); - } else { - status.setReady(false); - status.setFailed(false); - status.setMessage("Deployed."); - log.info("Pipeline for {}/{} is NOT ready.", kind, name); - } + // Already ready — no-op + result = new Result(false); } - status.setAttributes(Stream.concat(status.getJobResources().stream(), status.getDownstreamResources().stream()) - .map(this::fetchAttributes) - .flatMap(x -> x.entrySet().stream()) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (x, y) -> y))); - - operator.apiFor(SUBSCRIPTION) - .updateStatus(object, x -> object.getStatus()) - .onFailure((x, y) -> log.error("Failed to update status of {}/{}: {}.", kind, name, y.getMessage())); + subscriptionApi.updateStatus(object, object.getStatus()); } catch (Exception e) { log.error("Encountered exception while reconciling Subscription {}/{}", namespace, name, e); - return new Result(true, operator.failureRetryDuration()); + return new Result(true, failureRetryDuration()); } return result; } - private boolean apply(String yaml, KubernetesObject owner) { - try { - operator.apply(yaml, owner); - } catch (Exception e) { - log.error("Failed to apply {}.", yaml, e); - return false; + private void deployPipeline(V1alpha1Subscription object) throws SQLException { + String name = Objects.requireNonNull(object.getMetadata()).getName(); + String namespace = object.getMetadata().getNamespace(); + List specs = object.getStatus().getResources(); + if (specs == null || specs.isEmpty()) { + throw new SQLException("No pipeline specs to deploy for Subscription/" + name); } - return true; - } - private Pipeline pipeline(V1alpha1Subscription object) throws Exception { - String name = Objects.requireNonNull(object.getMetadata()).getName(); - String sql = Objects.requireNonNull(object.getSpec()).getSql(); - String database = object.getSpec().getDatabase(); - HoptimatorPlanner planner = plannerFactory.makePlanner(); - PipelineRel plan = planner.pipeline(sql); - PipelineRel.Implementor impl = new PipelineRel.Implementor(plan); - - // Create an output/sink table using the subscription name, and add it to the pipeline. - HopTable sink = planner.database(database).makeTable(name, impl.rowType()); - log.info("Implementing sink table {}.{} with {} resources.", database, name, sink.writeResources().size()); - impl.implement(sink); - - return impl.pipeline(sink); + String sql = object.getStatus().getSql(); + + V1alpha1Pipeline pipeline = new V1alpha1Pipeline() + .kind(K8sApiEndpoints.PIPELINES.kind()) + .apiVersion(K8sApiEndpoints.PIPELINES.apiVersion()) + .metadata(new V1ObjectMeta().name(name).namespace(namespace)) + .spec(new V1alpha1PipelineSpec() + .sql(sql) + .yaml(String.join("\n---\n", specs))); + + pipelineApi.update(pipeline); } - // Whether status has diverged from spec (i.e. we need to re-plan the pipeline) private static boolean diverged(V1alpha1SubscriptionSpec spec, V1alpha1SubscriptionStatus status) { - return status.getSql() == null || !status.getSql().equals(spec.getSql()) || status.getHints() == null - || !status.getHints().equals(spec.getHints()); + return status.getSql() == null || !status.getSql().equals(spec.getSql()) + || status.getHints() == null || !status.getHints().equals(spec.getHints()); } - // Fetch attributes from downstream controllers - private Map fetchAttributes(String yaml) { - DynamicKubernetesObject obj; - try { - obj = Dynamics.newFromYaml(yaml); - } catch (Exception e) { - log.warn("Failed to parse YAML in fetchAttributes: {}", e.getMessage()); - return Collections.emptyMap(); - } - if (obj.getMetadata() == null) { - log.warn("Failed to fetch attributes: parsed YAML has null metadata."); - return Collections.emptyMap(); - } - String namespace = obj.getMetadata().getNamespace(); - String name = obj.getMetadata().getName(); - String kind = obj.getKind(); - try { - KubernetesApiResponse existing = operator.apiFor(obj).get(namespace, name); - existing.onFailure((code, status) -> log.info("Failed to fetch {}/{}: {}.", kind, name, status.getMessage())); - if (!existing.isSuccess()) { - return Collections.emptyMap(); - } else { - return guessAttributes(existing.getObject()); - } - } catch (Exception e) { - return Collections.emptyMap(); - } + // TODO load from configuration + Duration failureRetryDuration() { + return Duration.ofMinutes(5); } - private static Map guessAttributes(DynamicKubernetesObject obj) { - // We make a best effort to guess the attributes of the dynamic object. - if (obj == null || obj.getRaw() == null) { - return Collections.emptyMap(); - } - try { - return obj.getRaw() - .get("status") - .getAsJsonObject() - .get("attributes") - .getAsJsonObject() - .entrySet() - .stream() - .filter(x -> x.getValue().isJsonPrimitive()) - .collect(Collectors.toMap(Map.Entry::getKey, x -> x.getValue().getAsString())); - } catch (Exception e) { - log.debug("Exception looking for .status.attributes. Swallowing.", e); - } - try { - return obj.getRaw() - .get("status") - .getAsJsonObject() - .get("jobStatus") - .getAsJsonObject() - .entrySet() - .stream() - .filter(x -> x.getValue().isJsonPrimitive()) - .collect(Collectors.toMap(Map.Entry::getKey, x -> x.getValue().getAsString())); - } catch (Exception e) { - log.debug("Exception looking for .status.jobStatus. Swallowing.", e); - } - try { - return obj.getRaw() - .get("status") - .getAsJsonObject() - .entrySet() - .stream() - .filter(x -> x.getValue().isJsonPrimitive()) - .collect(Collectors.toMap(Map.Entry::getKey, x -> x.getValue().getAsString())); - } catch (Exception e) { - log.debug("Exception looking for .status. Swallowing.", e); - } - return Collections.emptyMap(); + // TODO load from configuration + Duration pendingRetryDuration() { + return Duration.ofMinutes(1); } - public static Controller controller(Operator operator, HoptimatorPlanner.Factory plannerFactory, - Resource.Environment environment, Predicate filter) { - Reconciler reconciler = new SubscriptionReconciler(operator, plannerFactory, environment, filter); - return ControllerBuilder.defaultBuilder(operator.informerFactory()) + /** + * Creates a Planner that uses DeploymentService and a JDBC connection for SQL planning. + */ + public static Planner jdbcPlanner(java.sql.Connection connection) { + return (sql, database, name, hints) -> { + com.linkedin.hoptimator.jdbc.HoptimatorConnection hoptConn = + (com.linkedin.hoptimator.jdbc.HoptimatorConnection) connection; + + // Set up connection properties with hints + Properties props = new Properties(); + props.putAll(hoptConn.connectionProperties()); + if (hints != null && !hints.isEmpty()) { + String hintsStr = hints.entrySet().stream() + .map(e -> java.net.URLEncoder.encode(e.getKey(), java.nio.charset.StandardCharsets.UTF_8) + + "=" + java.net.URLEncoder.encode(e.getValue(), java.nio.charset.StandardCharsets.UTF_8)) + .collect(java.util.stream.Collectors.joining(",")); + props.put("hints", hintsStr); + } + String pipelineName = database + "-" + name; + props.setProperty(DeploymentService.PIPELINE_OPTION, pipelineName); + + // Plan the SQL query + org.apache.calcite.jdbc.CalcitePrepare.Context calciteContext = hoptConn.createPrepareContext(); + org.apache.calcite.rel.RelRoot root = + new com.linkedin.hoptimator.jdbc.HoptimatorDriver.Prepare(hoptConn).convert(calciteContext, sql).root; + PipelineRel.Implementor plan = DeploymentService.plan(root, hoptConn.materializations(), props); + + // Set the sink based on the database and subscription name + org.apache.calcite.jdbc.CalciteSchema rootSchema = calciteContext.getRootSchema(); + org.apache.calcite.jdbc.CalciteSchema dbSchema = rootSchema.getSubSchema(database, true); + if (dbSchema == null || !(dbSchema.schema instanceof com.linkedin.hoptimator.Database)) { + throw new SQLException(database + " is not a physical database."); + } + String dbName = ((com.linkedin.hoptimator.Database) dbSchema.schema).databaseName(); + List sinkPath = new ArrayList<>(dbSchema.path(null)); + sinkPath.add(name); + plan.setSink(dbName, sinkPath, root.rel.getRowType(), Collections.emptyMap()); + + // Build Pipeline and collect specs + Pipeline pipeline = plan.pipeline(name, connection); + + List specs = new ArrayList<>(); + for (Source source : pipeline.sources()) { + specs.addAll(DeploymentService.specify(source, connection)); + } + specs.addAll(DeploymentService.specify(pipeline.sink(), connection)); + specs.addAll(DeploymentService.specify(pipeline.job(), connection)); + + String pipelineSql = pipeline.job().sql().apply(SqlDialect.ANSI); + return new PlanResult(pipelineSql, specs); + }; + } + + public static Controller controller(K8sContext context, Planner planner) { + Reconciler reconciler = new SubscriptionReconciler(context, planner); + return ControllerBuilder.defaultBuilder(context.informerFactory()) .withReconciler(reconciler) .withName("subscription-controller") .withWorkerCount(1) .watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1Subscription.class, x).build()) .build(); } - - private static Map map(Map m) { - if (m == null) { - return Collections.emptyMap(); - } else { - return m; - } - } - - // Checks whether an exception represents a SQL validation error (i.e., bad user SQL), - // as opposed to a server-side infrastructure failure. - static boolean isSqlValidationError(Throwable e) { - Throwable x = e; - while (x != null) { - String className = x.getClass().getName(); - if (className.contains("SqlValidatorException") || className.contains("ValidationException")) { - return true; - } - x = x.getCause(); - } - return false; - } } - diff --git a/hoptimator-operator/src/test/java/com/linkedin/hoptimator/operator/subscription/SubscriptionEnvironmentTest.java b/hoptimator-operator/src/test/java/com/linkedin/hoptimator/operator/subscription/SubscriptionEnvironmentTest.java deleted file mode 100644 index 5537f370..00000000 --- a/hoptimator-operator/src/test/java/com/linkedin/hoptimator/operator/subscription/SubscriptionEnvironmentTest.java +++ /dev/null @@ -1,70 +0,0 @@ -package com.linkedin.hoptimator.operator.subscription; - -import com.linkedin.hoptimator.planner.Pipeline; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.sql.type.SqlTypeFactoryImpl; -import org.apache.calcite.sql.type.SqlTypeName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.mockito.Mockito.when; - - -@ExtendWith(MockitoExtension.class) -class SubscriptionEnvironmentTest { - - @Mock - private Pipeline mockPipeline; - - private RelDataType buildSimpleRowType() { - SqlTypeFactoryImpl typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); - return typeFactory.builder().add("col", SqlTypeName.VARCHAR).build(); - } - - @Test - void exportsPipelineNamespace() { - when(mockPipeline.outputType()).thenReturn(buildSimpleRowType()); - SubscriptionEnvironment env = new SubscriptionEnvironment("test-ns", "my-sub", mockPipeline); - assertEquals("test-ns", env.getOrDefault("pipeline.namespace", () -> "missing")); - } - - @Test - void exportsPipelineName() { - when(mockPipeline.outputType()).thenReturn(buildSimpleRowType()); - SubscriptionEnvironment env = new SubscriptionEnvironment("ns", "my-subscription", mockPipeline); - assertEquals("my-subscription", env.getOrDefault("pipeline.name", () -> "missing")); - } - - @Test - void exportsPipelineAvroSchema() { - when(mockPipeline.outputType()).thenReturn(buildSimpleRowType()); - SubscriptionEnvironment env = new SubscriptionEnvironment("ns", "my-sub", mockPipeline); - String avroSchema = env.getOrDefault("pipeline.avroSchema", () -> "missing"); - assertNotNull(avroSchema); - // Avro schema should be a JSON string containing the record name - assertEquals(true, avroSchema.contains("OutputRecord") || avroSchema.contains("string")); - } - - @Test - void avroSchemaContainsFieldName() { - when(mockPipeline.outputType()).thenReturn(buildSimpleRowType()); - SubscriptionEnvironment env = new SubscriptionEnvironment("ns", "my-sub", mockPipeline); - String avroSchema = env.getOrDefault("pipeline.avroSchema", () -> "missing"); - // The field 'col' should appear in the Avro schema JSON - assertEquals(true, avroSchema.contains("col")); - } - - @Test - void allThreePropertiesPresent() { - when(mockPipeline.outputType()).thenReturn(buildSimpleRowType()); - SubscriptionEnvironment env = new SubscriptionEnvironment("prod-ns", "sub-name", mockPipeline); - assertEquals("prod-ns", env.getOrDefault("pipeline.namespace", () -> "missing")); - assertEquals("sub-name", env.getOrDefault("pipeline.name", () -> "missing")); - assertNotNull(env.getOrDefault("pipeline.avroSchema", () -> null)); - } -} diff --git a/hoptimator-operator/src/test/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconcilerTest.java b/hoptimator-operator/src/test/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconcilerTest.java index edafbf25..df129503 100644 --- a/hoptimator-operator/src/test/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconcilerTest.java +++ b/hoptimator-operator/src/test/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconcilerTest.java @@ -1,102 +1,40 @@ package com.linkedin.hoptimator.operator.subscription; -import com.google.gson.JsonObject; -import com.google.gson.JsonPrimitive; -import com.linkedin.hoptimator.catalog.Database; -import com.linkedin.hoptimator.catalog.HopTable; -import com.linkedin.hoptimator.catalog.Resource; +import com.linkedin.hoptimator.k8s.FakeK8sApi; +import com.linkedin.hoptimator.k8s.models.V1alpha1Pipeline; +import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList; +import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineSpec; +import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineStatus; import com.linkedin.hoptimator.k8s.models.V1alpha1Subscription; +import com.linkedin.hoptimator.k8s.models.V1alpha1SubscriptionList; import com.linkedin.hoptimator.k8s.models.V1alpha1SubscriptionSpec; import com.linkedin.hoptimator.k8s.models.V1alpha1SubscriptionStatus; -import com.linkedin.hoptimator.operator.Operator; -import com.linkedin.hoptimator.planner.HoptimatorPlanner; -import com.linkedin.hoptimator.planner.PipelineRel; -import io.kubernetes.client.extended.controller.Controller; import io.kubernetes.client.extended.controller.reconciler.Request; import io.kubernetes.client.extended.controller.reconciler.Result; -import io.kubernetes.client.informer.SharedIndexInformer; -import io.kubernetes.client.informer.SharedInformerFactory; -import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.models.V1ObjectMeta; -import io.kubernetes.client.util.generic.GenericKubernetesApi; -import io.kubernetes.client.util.generic.KubernetesApiResponse; -import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesApi; -import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesObject; -import org.apache.calcite.rel.type.RelDataType; -import org.apache.calcite.rel.type.RelDataTypeSystem; -import org.apache.calcite.sql.type.SqlTypeFactoryImpl; -import org.apache.calcite.sql.type.SqlTypeName; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.extension.ExtendWith; -import org.mockito.Mock; -import org.mockito.junit.jupiter.MockitoExtension; import java.time.Duration; +import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.function.Predicate; +import java.util.List; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.doThrow; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.lenient; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; -@ExtendWith(MockitoExtension.class) class SubscriptionReconcilerTest { - @Mock - private Operator operator; - - @Mock - private HoptimatorPlanner.Factory plannerFactory; - - @Mock - private HoptimatorPlanner mockPlanner; - - @Mock - private PipelineRel mockPlan; - - @Mock - private Database mockDatabase; - - @Mock - private GenericKubernetesApi mockSubscriptionApi; - - @Mock - private KubernetesApiResponse mockUpdateStatusResponse; - + private List subscriptions; + private List pipelines; private SubscriptionReconciler reconciler; - - // YAML that triggers ClassCastException in snakeyaml Composer - private static final String CLASS_CAST_EXCEPTION_YAML = "!!java.util.Date 2021-01-01"; - // YAML that may trigger ScannerException or parse as object with null metadata - private static final String SCANNER_EXCEPTION_YAML = "key: value: with: colons: everywhere:"; - // Valid YAML with no metadata — triggers null-metadata guard in fetchAttributes - private static final String NO_METADATA_YAML = "apiVersion: v1\nkind: ConfigMap"; - - private static SubscriptionReconciler createReconciler(Operator operator, - HoptimatorPlanner.Factory plannerFactory, - Resource.Environment environment, - Predicate filter) { - return new SubscriptionReconciler(operator, plannerFactory, environment, filter); - } - - private RelDataType buildSimpleRowType() { - SqlTypeFactoryImpl typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); - return typeFactory.builder().add("col", SqlTypeName.VARCHAR).build(); - } + private SubscriptionReconciler.Planner successPlanner; + private SubscriptionReconciler.Planner failPlanner; private V1alpha1Subscription buildSubscription(String ns, String name, String sql) { V1alpha1Subscription sub = new V1alpha1Subscription(); @@ -107,816 +45,290 @@ private V1alpha1Subscription buildSubscription(String ns, String name, String sq @BeforeEach void setUp() { - lenient().when(operator.failureRetryDuration()).thenReturn(Duration.ofMinutes(5)); - lenient().when(operator.pendingRetryDuration()).thenReturn(Duration.ofMinutes(1)); - reconciler = createReconciler(operator, plannerFactory, Resource.Environment.EMPTY, null); - } + subscriptions = new ArrayList<>(); + pipelines = new ArrayList<>(); + + successPlanner = (sql, database, name, hints) -> + new SubscriptionReconciler.PlanResult("INSERT INTO ...", List.of( + "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: " + name + "\n namespace: ns")); - // ── Helper: stub the apiFor(SUBSCRIPTION).updateStatus(...).onFailure(...) chain - @SuppressWarnings("unchecked") - private void stubUpdateStatus() throws ApiException { - when(operator.apiFor(anyString())).thenReturn((GenericKubernetesApi) mockSubscriptionApi); - when(mockSubscriptionApi.updateStatus(any(), any())).thenReturn(mockUpdateStatusResponse); - when(mockUpdateStatusResponse.onFailure(any())).thenReturn(mockUpdateStatusResponse); + failPlanner = (sql, database, name, hints) -> { + throw new RuntimeException("planner unavailable"); + }; + + reconciler = new SubscriptionReconciler(null, + new FakeK8sApi<>(subscriptions), + new FakeK8sApi<>(pipelines), + successPlanner); } // Deleted object @Test void deletedObjectDoesNotRequeue() { - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(null); Result result = reconciler.reconcile(new Request("ns", "missing-sub")); assertFalse(result.isRequeue()); } - // Filtered object + // Phase 1 – planning succeeds, status updated @Test - void filteredObjectDoesNotRequeue() { + void phase1PlansSuccessfully() { V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - SubscriptionReconciler filteredReconciler = - createReconciler(operator, plannerFactory, Resource.Environment.EMPTY, s -> false); - Result result = filteredReconciler.reconcile(new Request("ns", "my-sub")); - assertFalse(result.isRequeue()); + subscriptions.add(sub); + + reconciler.reconcile(new Request("ns", "my-sub")); + + V1alpha1SubscriptionStatus status = sub.getStatus(); + assertNotNull(status); + assertEquals("SELECT 1", status.getSql()); + assertFalse(status.getReady()); + assertFalse(status.getFailed()); + assertEquals("Planned.", status.getMessage()); + assertNotNull(status.getResources()); + assertFalse(status.getResources().isEmpty()); } // Phase 1 – planning error marks subscription failed @Test - void planningErrorMarksFailed() throws Exception { + void planningErrorMarksFailed() { V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - when(plannerFactory.makePlanner()).thenThrow(new RuntimeException("planner unavailable")); - stubUpdateStatus(); + subscriptions.add(sub); - Result result = reconciler.reconcile(new Request("ns", "my-sub")); + SubscriptionReconciler failReconciler = new SubscriptionReconciler(null, + new FakeK8sApi<>(subscriptions), + new FakeK8sApi<>(pipelines), + failPlanner); + + Result result = failReconciler.reconcile(new Request("ns", "my-sub")); assertTrue(result.isRequeue()); assertTrue(sub.getStatus().getFailed()); + assertTrue(sub.getStatus().getMessage().startsWith("Error:")); } - // Phase 1 – diverged: plannerFactory.makePlanner() is called when sql==null + // Phase 2 – deploy Pipeline CR when it doesn't exist yet @Test - void phase1AttemptsPlanningWhenDiverged() throws Exception { - V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - - // Planner returns a plan; Implementor will attempt SQL generation and fail - // (mock RelNode lacks a Calcite cluster). The catch block captures this. - when(plannerFactory.makePlanner()).thenReturn(mockPlanner); - when(mockPlanner.pipeline(anyString())).thenReturn(mockPlan); - when(mockPlan.getInputs()).thenReturn(Collections.emptyList()); - RelDataType rowType = buildSimpleRowType(); - when(mockPlan.getRowType()).thenReturn(rowType); - when(mockPlanner.database(anyString())).thenReturn(mockDatabase); - HopTable mockSink = new HopTable("TESTDB", "my-sub", rowType, - Collections.emptyList(), Collections.emptyList(), new HashMap<>()); - when(mockDatabase.makeTable(anyString(), any())).thenReturn(mockSink); - - stubUpdateStatus(); - - reconciler.reconcile(new Request("ns", "my-sub")); - - // plannerFactory.makePlanner() must have been called (phase 1 was entered) - verify(plannerFactory).makePlanner(); - // status.failed is set when planning throws (SQL generation needs a real cluster) - assertNotNull(sub.getStatus().getFailed()); - } - - // Phase 2 – deploy: ready==null and resources present → apply resources - @Test - void phase2DeploysResourcesWhenReadyIsNull() throws Exception { + void phase2DeploysPipelineWhenNotExists() { V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); V1alpha1SubscriptionStatus status = new V1alpha1SubscriptionStatus(); status.setSql("SELECT 1"); status.setHints(new HashMap<>()); - status.setReady(null); - status.setResources(Collections.singletonList( - "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: cm1\n namespace: ns\n")); - status.setJobResources(Collections.emptyList()); - status.setDownstreamResources(Collections.emptyList()); + status.setReady(false); + status.setFailed(false); + status.setResources(List.of("apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: cm1\n namespace: ns")); sub.setStatus(status); + subscriptions.add(sub); - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - stubUpdateStatus(); - + // No pipeline exists → should deploy reconciler.reconcile(new Request("ns", "my-sub")); assertFalse(sub.getStatus().getReady()); assertFalse(sub.getStatus().getFailed()); + assertEquals("Deployed.", sub.getStatus().getMessage()); } - // Phase 3 – all resources ready → ready=true, no requeue + // Phase 3 – Pipeline is ready → Subscription becomes ready @Test - void phase3ReadyWhenAllResourcesReady() throws Exception { + void phase3ReadyWhenPipelineReady() { V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); V1alpha1SubscriptionStatus status = new V1alpha1SubscriptionStatus(); status.setSql("SELECT 1"); status.setHints(new HashMap<>()); status.setReady(false); - String yaml = "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: cm1\n namespace: ns\n"; - status.setResources(Collections.singletonList(yaml)); - status.setJobResources(Collections.emptyList()); - status.setDownstreamResources(Collections.emptyList()); + status.setResources(List.of("kind: ConfigMap")); sub.setStatus(status); + subscriptions.add(sub); - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - when(operator.isReady(anyString())).thenReturn(true); - stubUpdateStatus(); + V1alpha1Pipeline pipeline = new V1alpha1Pipeline() + .metadata(new V1ObjectMeta().name("my-sub").namespace("ns")) + .spec(new V1alpha1PipelineSpec().yaml("kind: Job\nmetadata:\n name: job1")) + .status(new V1alpha1PipelineStatus().ready(true).failed(false).message("Ready.")); + pipelines.add(pipeline); Result result = reconciler.reconcile(new Request("ns", "my-sub")); assertFalse(result.isRequeue()); assertTrue(sub.getStatus().getReady()); + assertFalse(sub.getStatus().getFailed()); + assertEquals("Ready.", sub.getStatus().getMessage()); } - // Phase 3 – resource not ready → requeue + // Phase 3 – Pipeline failed → Subscription fails @Test - void phase3RequeuedWhenResourceNotReady() throws Exception { + void phase3FailedWhenPipelineFailed() { V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); V1alpha1SubscriptionStatus status = new V1alpha1SubscriptionStatus(); status.setSql("SELECT 1"); status.setHints(new HashMap<>()); status.setReady(false); - String yaml = "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: cm1\n namespace: ns\n"; - status.setResources(Collections.singletonList(yaml)); - status.setJobResources(Collections.emptyList()); - status.setDownstreamResources(Collections.emptyList()); + status.setResources(List.of("kind: ConfigMap")); sub.setStatus(status); + subscriptions.add(sub); - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - when(operator.isReady(anyString())).thenReturn(false); - stubUpdateStatus(); + V1alpha1Pipeline pipeline = new V1alpha1Pipeline() + .metadata(new V1ObjectMeta().name("my-sub").namespace("ns")) + .spec(new V1alpha1PipelineSpec().yaml("kind: Job\nmetadata:\n name: job1")) + .status(new V1alpha1PipelineStatus().ready(false).failed(true).message("Job crashed.")); + pipelines.add(pipeline); Result result = reconciler.reconcile(new Request("ns", "my-sub")); assertTrue(result.isRequeue()); assertFalse(sub.getStatus().getReady()); - } - - // diverged() – sql null in status triggers phase 1 - @Test - void divergedWhenStatusSqlIsNull() throws Exception { - V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - when(plannerFactory.makePlanner()).thenThrow(new RuntimeException("expected diverged path")); - stubUpdateStatus(); - - reconciler.reconcile(new Request("ns", "my-sub")); - assertTrue(sub.getStatus().getFailed()); + assertTrue(sub.getStatus().getMessage().contains("Job crashed.")); } - // diverged() – sql and hints match → not diverged - @Test - void notDivergedWhenSqlAndHintsMatch() throws Exception { - V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); - V1alpha1SubscriptionStatus status = new V1alpha1SubscriptionStatus(); - status.setSql("SELECT 1"); - status.setHints(new HashMap<>()); - status.setReady(true); - String yaml = "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: cm1\n namespace: ns\n"; - status.setResources(Collections.singletonList(yaml)); - status.setJobResources(Collections.emptyList()); - status.setDownstreamResources(Collections.emptyList()); - sub.setStatus(status); - sub.getSpec().setHints(new HashMap<>()); - - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - when(operator.isReady(anyString())).thenReturn(true); - stubUpdateStatus(); - - Result result = reconciler.reconcile(new Request("ns", "my-sub")); - - assertFalse(result.isRequeue()); - } - - // hints differ → diverged + // Phase 3 – Pipeline not ready → requeue @Test - void divergedWhenHintsDiffer() throws Exception { - V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); - sub.getSpec().setHints(Collections.singletonMap("key", "value")); - V1alpha1SubscriptionStatus status = new V1alpha1SubscriptionStatus(); - status.setSql("SELECT 1"); - status.setHints(new HashMap<>()); - status.setReady(true); - status.setResources(Collections.emptyList()); - status.setJobResources(Collections.emptyList()); - status.setDownstreamResources(Collections.emptyList()); - sub.setStatus(status); - - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - when(plannerFactory.makePlanner()).thenThrow(new RuntimeException("diverged path hit")); - stubUpdateStatus(); - - reconciler.reconcile(new Request("ns", "my-sub")); - - assertTrue(sub.getStatus().getFailed()); - } - - // .status.attributes field - @Test - @SuppressWarnings("unchecked") - void fetchAttributesFromStatusAttributesField() throws Exception { - V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); - V1alpha1SubscriptionStatus status = new V1alpha1SubscriptionStatus(); - status.setSql("SELECT 1"); - status.setHints(new HashMap<>()); - status.setReady(false); - - JsonObject attributes = new JsonObject(); - attributes.add("topicName", new JsonPrimitive("my-topic")); - JsonObject statusObj = new JsonObject(); - statusObj.add("attributes", attributes); - statusObj.add("ready", new JsonPrimitive(false)); - JsonObject raw = new JsonObject(); - raw.add("status", statusObj); - raw.add("apiVersion", new JsonPrimitive("v1")); - raw.add("kind", new JsonPrimitive("ConfigMap")); - JsonObject metadata = new JsonObject(); - metadata.add("name", new JsonPrimitive("cm1")); - metadata.add("namespace", new JsonPrimitive("ns")); - raw.add("metadata", metadata); - - String yaml = "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: cm1\n namespace: ns\nstatus:\n attributes:\n topicName: my-topic\n ready: false\n"; - status.setResources(Collections.singletonList(yaml)); - status.setJobResources(Collections.singletonList(yaml)); - status.setDownstreamResources(Collections.emptyList()); - sub.setStatus(status); - - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - when(operator.isReady(anyString())).thenReturn(false); - - DynamicKubernetesApi mockDynApi = mock(DynamicKubernetesApi.class); - when(operator.apiFor(any(DynamicKubernetesObject.class))).thenReturn(mockDynApi); - KubernetesApiResponse mockResp = mock(KubernetesApiResponse.class); - when(mockDynApi.get(anyString(), anyString())).thenReturn(mockResp); - when(mockResp.onFailure(any())).thenReturn(mockResp); - when(mockResp.isSuccess()).thenReturn(true); - DynamicKubernetesObject dynObj = new DynamicKubernetesObject(raw); - dynObj.setMetadata(new V1ObjectMeta().name("cm1").namespace("ns")); - when(mockResp.getObject()).thenReturn(dynObj); - - stubUpdateStatus(); - - reconciler.reconcile(new Request("ns", "my-sub")); - - assertNotNull(sub.getStatus().getAttributes()); - assertTrue(sub.getStatus().getAttributes().containsKey("topicName")); - } - - // .status.jobStatus field - @Test - @SuppressWarnings("unchecked") - void fetchAttributesFromStatusJobStatusField() throws Exception { + void phase3RequeuesWhenPipelineNotReady() { V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); V1alpha1SubscriptionStatus status = new V1alpha1SubscriptionStatus(); status.setSql("SELECT 1"); status.setHints(new HashMap<>()); status.setReady(false); - - JsonObject jobStatus = new JsonObject(); - jobStatus.add("jobId", new JsonPrimitive("job-123")); - JsonObject statusObj = new JsonObject(); - statusObj.add("jobStatus", jobStatus); - JsonObject raw = new JsonObject(); - raw.add("status", statusObj); - raw.add("apiVersion", new JsonPrimitive("v1")); - raw.add("kind", new JsonPrimitive("FlinkDeployment")); - JsonObject metadata = new JsonObject(); - metadata.add("name", new JsonPrimitive("flink-job")); - metadata.add("namespace", new JsonPrimitive("ns")); - raw.add("metadata", metadata); - - String yaml = "apiVersion: v1\nkind: FlinkDeployment\nmetadata:\n name: flink-job\n namespace: ns\n"; - status.setResources(Collections.singletonList(yaml)); - status.setJobResources(Collections.singletonList(yaml)); - status.setDownstreamResources(Collections.emptyList()); + status.setResources(List.of("kind: ConfigMap")); sub.setStatus(status); + subscriptions.add(sub); - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - when(operator.isReady(anyString())).thenReturn(false); - - DynamicKubernetesApi mockDynApi = mock(DynamicKubernetesApi.class); - when(operator.apiFor(any(DynamicKubernetesObject.class))).thenReturn(mockDynApi); - KubernetesApiResponse mockResp = mock(KubernetesApiResponse.class); - when(mockDynApi.get(anyString(), anyString())).thenReturn(mockResp); - when(mockResp.onFailure(any())).thenReturn(mockResp); - when(mockResp.isSuccess()).thenReturn(true); - DynamicKubernetesObject dynObj = new DynamicKubernetesObject(raw); - dynObj.setMetadata(new V1ObjectMeta().name("flink-job").namespace("ns")); - when(mockResp.getObject()).thenReturn(dynObj); - - stubUpdateStatus(); - - reconciler.reconcile(new Request("ns", "my-sub")); - - assertNotNull(sub.getStatus().getAttributes()); - assertTrue(sub.getStatus().getAttributes().containsKey("jobId")); - } - - // API call fails → empty attributes - @Test - @SuppressWarnings("unchecked") - void fetchAttributesReturnsEmptyWhenApiFails() throws Exception { - V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); - V1alpha1SubscriptionStatus status = new V1alpha1SubscriptionStatus(); - status.setSql("SELECT 1"); - status.setHints(new HashMap<>()); - status.setReady(false); - - String yaml = "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: cm1\n namespace: ns\n"; - status.setResources(Collections.singletonList(yaml)); - status.setJobResources(Collections.singletonList(yaml)); - status.setDownstreamResources(Collections.emptyList()); - sub.setStatus(status); - - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - when(operator.isReady(anyString())).thenReturn(false); - - DynamicKubernetesApi mockDynApi = mock(DynamicKubernetesApi.class); - when(operator.apiFor(any(DynamicKubernetesObject.class))).thenReturn(mockDynApi); - KubernetesApiResponse mockResp = mock(KubernetesApiResponse.class); - when(mockDynApi.get(anyString(), anyString())).thenReturn(mockResp); - when(mockResp.onFailure(any())).thenReturn(mockResp); - when(mockResp.isSuccess()).thenReturn(false); - - stubUpdateStatus(); - - reconciler.reconcile(new Request("ns", "my-sub")); - - assertTrue(sub.getStatus().getAttributes() == null - || sub.getStatus().getAttributes().isEmpty()); - } - - // Returns non-null Controller - @Test - @SuppressWarnings("unchecked") - void controllerReturnsNonNull() { - SharedInformerFactory mockInformerFactory = mock(SharedInformerFactory.class); - SharedIndexInformer mockInformer = mock(SharedIndexInformer.class); - when(mockInformerFactory.getExistingSharedIndexInformer(V1alpha1Subscription.class)) - .thenReturn(mockInformer); - when(operator.informerFactory()).thenReturn(mockInformerFactory); - - Controller controller = SubscriptionReconciler.controller(operator, plannerFactory, - Resource.Environment.EMPTY, null); - assertNotNull(controller); - } - - // Deploy failure (apply throws) → requeue - @Test - void phase2RequeuedWhenApplyThrows() throws Exception { - V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); - V1alpha1SubscriptionStatus status = new V1alpha1SubscriptionStatus(); - status.setSql("SELECT 1"); - status.setHints(new HashMap<>()); - status.setReady(null); - status.setResources(Collections.singletonList( - "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: cm1\n namespace: ns\n")); - status.setJobResources(Collections.emptyList()); - status.setDownstreamResources(Collections.emptyList()); - sub.setStatus(status); - - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - doThrow(new ApiException("apply failed")).when(operator).apply(anyString(), any()); + V1alpha1Pipeline pipeline = new V1alpha1Pipeline() + .metadata(new V1ObjectMeta().name("my-sub").namespace("ns")) + .spec(new V1alpha1PipelineSpec().yaml("kind: Job\nmetadata:\n name: job1")) + .status(new V1alpha1PipelineStatus().ready(false).failed(false).message("Deployed.")); + pipelines.add(pipeline); Result result = reconciler.reconcile(new Request("ns", "my-sub")); assertTrue(result.isRequeue()); - } - - // Outer exception handler – fetch throws → requeue - @Test - void outerExceptionHandlerRequeuedWhenFetchThrows() { - when(operator.fetch(anyString(), anyString(), anyString())) - .thenThrow(new RuntimeException("unexpected error")); - - Result result = reconciler.reconcile(new Request("ns", "boom")); - - assertTrue(result.isRequeue()); - } - - // .status direct fields (no attributes/jobStatus) - @Test - @SuppressWarnings("unchecked") - void fetchAttributesFromStatusDirectFields() throws Exception { - V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); - V1alpha1SubscriptionStatus status = new V1alpha1SubscriptionStatus(); - status.setSql("SELECT 1"); - status.setHints(new HashMap<>()); - status.setReady(false); - - // Build yaml where .status has primitive fields (no attributes/jobStatus keys) - JsonObject statusObj = new JsonObject(); - statusObj.add("topicUrl", new JsonPrimitive("kafka://my-topic")); - statusObj.add("ready", new JsonPrimitive(false)); - JsonObject raw = new JsonObject(); - raw.add("status", statusObj); - raw.add("apiVersion", new JsonPrimitive("v1alpha1")); - raw.add("kind", new JsonPrimitive("KafkaTopic")); - JsonObject metadata = new JsonObject(); - metadata.add("name", new JsonPrimitive("topic1")); - metadata.add("namespace", new JsonPrimitive("ns")); - raw.add("metadata", metadata); - - String yaml = "apiVersion: v1alpha1\nkind: KafkaTopic\nmetadata:\n name: topic1\n namespace: ns\n"; - status.setResources(Collections.singletonList(yaml)); - status.setJobResources(Collections.singletonList(yaml)); - status.setDownstreamResources(Collections.emptyList()); - sub.setStatus(status); - - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - when(operator.isReady(anyString())).thenReturn(false); - - DynamicKubernetesApi mockDynApi = mock(DynamicKubernetesApi.class); - when(operator.apiFor(any(DynamicKubernetesObject.class))).thenReturn(mockDynApi); - KubernetesApiResponse mockResp = mock(KubernetesApiResponse.class); - when(mockDynApi.get(anyString(), anyString())).thenReturn(mockResp); - when(mockResp.onFailure(any())).thenReturn(mockResp); - when(mockResp.isSuccess()).thenReturn(true); - DynamicKubernetesObject dynObj = new DynamicKubernetesObject(raw); - dynObj.setMetadata(new V1ObjectMeta().name("topic1").namespace("ns")); - when(mockResp.getObject()).thenReturn(dynObj); - - stubUpdateStatus(); - - reconciler.reconcile(new Request("ns", "my-sub")); - - // .status has topicUrl and ready as primitives → fetched as attributes - assertNotNull(sub.getStatus().getAttributes()); - assertTrue(sub.getStatus().getAttributes().containsKey("topicUrl")); - } - - // operator.apiFor(obj) throws → returns empty - @Test - void fetchAttributesReturnsEmptyWhenApiForThrows() throws Exception { - V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); - V1alpha1SubscriptionStatus status = new V1alpha1SubscriptionStatus(); - status.setSql("SELECT 1"); - status.setHints(new HashMap<>()); - status.setReady(false); - - String yaml = "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: cm1\n namespace: ns\n"; - status.setResources(Collections.singletonList(yaml)); - status.setJobResources(Collections.singletonList(yaml)); - status.setDownstreamResources(Collections.emptyList()); - sub.setStatus(status); - - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - when(operator.isReady(anyString())).thenReturn(false); - when(operator.apiFor(any(DynamicKubernetesObject.class))) - .thenThrow(new RuntimeException("no api registered")); - - stubUpdateStatus(); - - // Should not throw; attributes will be empty - reconciler.reconcile(new Request("ns", "my-sub")); - - assertTrue(sub.getStatus().getAttributes() == null - || sub.getStatus().getAttributes().isEmpty()); - } - - // Verify specific status field values set by setters - @Test - void phase1SetsAllStatusFieldsOnPlanningSuccess() throws Exception { - V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); - sub.getSpec().setHints(Collections.singletonMap("k", "v")); - - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - - // Let planning succeed fully by providing a complete planner/plan setup - when(plannerFactory.makePlanner()).thenReturn(mockPlanner); - when(mockPlanner.pipeline(anyString())).thenReturn(mockPlan); - when(mockPlan.getInputs()).thenReturn(Collections.emptyList()); - RelDataType rowType = buildSimpleRowType(); - when(mockPlan.getRowType()).thenReturn(rowType); - when(mockPlanner.database(anyString())).thenReturn(mockDatabase); - HopTable mockSink = new HopTable("TESTDB", "my-sub", rowType, - Collections.emptyList(), Collections.emptyList(), new HashMap<>()); - when(mockDatabase.makeTable(anyString(), any())).thenReturn(mockSink); - stubUpdateStatus(); - - reconciler.reconcile(new Request("ns", "my-sub")); - - // Status was either written (planning succeeded) or failed (planning threw). - // Either way status must be non-null and have been assigned. - V1alpha1SubscriptionStatus status = sub.getStatus(); - assertNotNull(status); - // If planning succeeded the SQL is stamped; if it threw, failed=true. - // The important coverage is that the setters are reached. - if (Boolean.TRUE.equals(status.getFailed())) { - // Planning threw an internal exception — failed path assertions - assertTrue(status.getFailed()); - assertNotNull(status.getMessage()); - } else { - // Planning fully succeeded — assert all phase-1 setters fired - assertEquals("SELECT 1", status.getSql()); - assertEquals(Collections.singletonMap("k", "v"), status.getHints()); - assertNotNull(status.getResources()); - assertNull(status.getReady()); - assertNull(status.getFailed()); - assertEquals("Planned.", status.getMessage()); - } - } - - // Phase 1 planning error – verify setFailed(true) and setMessage - @Test - void phase1PlanningErrorSetsFailedAndMessage() throws Exception { - V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - when(plannerFactory.makePlanner()).thenThrow(new RuntimeException("boom")); - stubUpdateStatus(); - - reconciler.reconcile(new Request("ns", "my-sub")); - - V1alpha1SubscriptionStatus status = sub.getStatus(); - assertNotNull(status); - assertTrue(status.getFailed()); - assertNotNull(status.getMessage()); - assertTrue(status.getMessage().startsWith("Error:")); - } - - // Phase 2 deployed successfully – verify setReady(false), setFailed(false), setMessage("Deployed.") - @Test - void phase2DeployedSuccessSetsStatusFields() throws Exception { - V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); - V1alpha1SubscriptionStatus status = new V1alpha1SubscriptionStatus(); - status.setSql("SELECT 1"); - status.setHints(new HashMap<>()); - status.setReady(null); - status.setResources(Collections.singletonList( - "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: cm1\n namespace: ns\n")); - status.setJobResources(Collections.emptyList()); - status.setDownstreamResources(Collections.emptyList()); - sub.setStatus(status); - - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - stubUpdateStatus(); - - reconciler.reconcile(new Request("ns", "my-sub")); - assertFalse(sub.getStatus().getReady()); assertFalse(sub.getStatus().getFailed()); assertEquals("Deployed.", sub.getStatus().getMessage()); } - // Phase 3 ready – verify setReady(true), setFailed(false), setMessage("Ready.") + // Already ready → no requeue @Test - void phase3ReadySetsStatusFields() throws Exception { + void readySubscriptionDoesNotRequeue() { V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); V1alpha1SubscriptionStatus status = new V1alpha1SubscriptionStatus(); status.setSql("SELECT 1"); status.setHints(new HashMap<>()); - status.setReady(false); - String yaml = "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: cm1\n namespace: ns\n"; - status.setResources(Collections.singletonList(yaml)); - status.setJobResources(Collections.emptyList()); - status.setDownstreamResources(Collections.emptyList()); - sub.setStatus(status); - - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - when(operator.isReady(anyString())).thenReturn(true); - stubUpdateStatus(); - - reconciler.reconcile(new Request("ns", "my-sub")); - - assertTrue(sub.getStatus().getReady()); - assertFalse(sub.getStatus().getFailed()); - assertEquals("Ready.", sub.getStatus().getMessage()); - } - - // Phase 3 not-ready – verify setReady(false), setFailed(false), message - @Test - void phase3NotReadySetsStatusFields() throws Exception { - V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); - V1alpha1SubscriptionStatus status = new V1alpha1SubscriptionStatus(); - status.setSql("SELECT 1"); - status.setHints(new HashMap<>()); - status.setReady(false); - String yaml = "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: cm1\n namespace: ns\n"; - status.setResources(Collections.singletonList(yaml)); - status.setJobResources(Collections.emptyList()); - status.setDownstreamResources(Collections.emptyList()); + status.setReady(true); + status.setResources(List.of("kind: ConfigMap")); sub.setStatus(status); + subscriptions.add(sub); - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - when(operator.isReady(anyString())).thenReturn(false); - stubUpdateStatus(); - - reconciler.reconcile(new Request("ns", "my-sub")); + Result result = reconciler.reconcile(new Request("ns", "my-sub")); - assertFalse(sub.getStatus().getReady()); - assertFalse(sub.getStatus().getFailed()); - assertEquals("Deployed.", sub.getStatus().getMessage()); + assertFalse(result.isRequeue()); } - // updateStatus is called after reconciliation + // diverged() – sql null in status triggers phase 1 @Test - void updateStatusIsCalledAfterPhase3() throws Exception { + void divergedWhenStatusSqlIsNull() { V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); - V1alpha1SubscriptionStatus status = new V1alpha1SubscriptionStatus(); - status.setSql("SELECT 1"); - status.setHints(new HashMap<>()); - status.setReady(false); - String yaml = "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: cm1\n namespace: ns\n"; - status.setResources(Collections.singletonList(yaml)); - status.setJobResources(Collections.emptyList()); - status.setDownstreamResources(Collections.emptyList()); - sub.setStatus(status); - - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - when(operator.isReady(anyString())).thenReturn(true); - stubUpdateStatus(); + subscriptions.add(sub); reconciler.reconcile(new Request("ns", "my-sub")); - // Verify that apiFor was called and updateStatus was invoked on the returned api - verify(mockSubscriptionApi).updateStatus(any(), any()); + // Phase 1 was entered; status should have SQL + assertEquals("SELECT 1", sub.getStatus().getSql()); } - // apply() returns false when operator.apply() + // diverged() – sql and hints match → not diverged @Test - void applyReturnsFalseWhenOperatorApplyThrows() throws Exception { + void notDivergedWhenSqlAndHintsMatch() { V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); + sub.getSpec().setHints(new HashMap<>()); V1alpha1SubscriptionStatus status = new V1alpha1SubscriptionStatus(); status.setSql("SELECT 1"); status.setHints(new HashMap<>()); - status.setReady(null); - String resource = "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: cm1\n namespace: ns\n"; - status.setResources(Collections.singletonList(resource)); - status.setJobResources(Collections.emptyList()); - status.setDownstreamResources(Collections.emptyList()); + status.setReady(true); + status.setResources(List.of("kind: ConfigMap")); sub.setStatus(status); - - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - doThrow(new ApiException("apply failed")).when(operator).apply(anyString(), any()); + subscriptions.add(sub); Result result = reconciler.reconcile(new Request("ns", "my-sub")); - // When apply() returns false (because operator.apply threw), Phase 2 returns - // a failure-retry result — so the status should NOT have been set to deployed. - assertTrue(result.isRequeue()); - // ready is still null (never moved past Phase 2 partially) - assertNull(sub.getStatus().getReady()); + // Not diverged, already ready → no requeue + assertFalse(result.isRequeue()); } - // When only hints change (SQL same), re-plan is triggered + // hints differ → diverged → re-plan @Test - void divergedWhenOnlyHintsChange() throws Exception { + void divergedWhenHintsDiffer() { V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); - sub.getSpec().setHints(Collections.singletonMap("newKey", "newVal")); - + sub.getSpec().setHints(Collections.singletonMap("key", "value")); V1alpha1SubscriptionStatus status = new V1alpha1SubscriptionStatus(); - status.setSql("SELECT 1"); // SQL matches - // hints are EMPTY in status but spec has {newKey=newVal} → diverged + status.setSql("SELECT 1"); status.setHints(new HashMap<>()); status.setReady(true); - status.setResources(Collections.emptyList()); - status.setJobResources(Collections.emptyList()); - status.setDownstreamResources(Collections.emptyList()); + status.setResources(List.of("kind: ConfigMap")); sub.setStatus(status); - - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - when(plannerFactory.makePlanner()).thenThrow(new RuntimeException("diverged path hit")); - stubUpdateStatus(); + subscriptions.add(sub); reconciler.reconcile(new Request("ns", "my-sub")); - // Phase 1 was entered because hints diverged; planner threw → failed=true - assertTrue(sub.getStatus().getFailed()); + // Re-planned with new hints + assertEquals(Collections.singletonMap("key", "value"), sub.getStatus().getHints()); + assertEquals("Planned.", sub.getStatus().getMessage()); } - // when SQL and hints both match, phase 1 is NOT triggered. Confirms the second equals check also matters + // failureRetryDuration() returns exactly 5 minutes @Test - void notDivergedWhenSqlMatchesAndHintsMatch() throws Exception { - V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); - sub.getSpec().setHints(Collections.singletonMap("key", "val")); - - V1alpha1SubscriptionStatus status = new V1alpha1SubscriptionStatus(); - status.setSql("SELECT 1"); - status.setHints(Collections.singletonMap("key", "val")); - status.setReady(true); - String yaml = "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: cm1\n namespace: ns\n"; - status.setResources(Collections.singletonList(yaml)); - status.setJobResources(Collections.emptyList()); - status.setDownstreamResources(Collections.emptyList()); - sub.setStatus(status); - - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - when(operator.isReady(anyString())).thenReturn(true); - stubUpdateStatus(); - - Result result = reconciler.reconcile(new Request("ns", "my-sub")); + void failureRetryDurationIsFiveMinutes() { + assertEquals(Duration.ofMinutes(5), reconciler.failureRetryDuration()); + } - // Not diverged → Phase 3 → not requeued - assertFalse(result.isRequeue()); - // plannerFactory should NOT have been called - verify(plannerFactory, never()).makePlanner(); + // pendingRetryDuration() returns exactly 1 minute + @Test + void pendingRetryDurationIsOneMinute() { + assertEquals(Duration.ofMinutes(1), reconciler.pendingRetryDuration()); } - // guessAttributes() — non-primitive JSON value is NOT included + // Phase 1 – hints are passed through @Test - @SuppressWarnings("unchecked") - void guessAttributesExcludesNonPrimitiveValues() throws Exception { + void phase1PassesHintsToPlanner() { V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); - V1alpha1SubscriptionStatus status = new V1alpha1SubscriptionStatus(); - status.setSql("SELECT 1"); - status.setHints(new HashMap<>()); - status.setReady(false); - - // Build a raw JSON where .status.attributes has one primitive and one nested object - JsonObject nestedObj = new JsonObject(); - nestedObj.addProperty("inner", "value"); - - JsonObject attributes = new JsonObject(); - attributes.add("primKey", new JsonPrimitive("primVal")); - attributes.add("objKey", nestedObj); // non-primitive — must be excluded - - JsonObject statusObj = new JsonObject(); - statusObj.add("attributes", attributes); - - JsonObject raw = new JsonObject(); - raw.add("status", statusObj); - raw.add("apiVersion", new JsonPrimitive("v1")); - raw.add("kind", new JsonPrimitive("ConfigMap")); - JsonObject metadata = new JsonObject(); - metadata.add("name", new JsonPrimitive("cm1")); - metadata.add("namespace", new JsonPrimitive("ns")); - raw.add("metadata", metadata); - - String yaml = "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: cm1\n namespace: ns\n"; - status.setResources(Collections.singletonList(yaml)); - status.setJobResources(Collections.singletonList(yaml)); - status.setDownstreamResources(Collections.emptyList()); - sub.setStatus(status); - - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - when(operator.isReady(anyString())).thenReturn(false); + sub.getSpec().setHints(Collections.singletonMap("topicPrefix", "test")); + subscriptions.add(sub); - DynamicKubernetesApi mockDynApi = mock(DynamicKubernetesApi.class); - when(operator.apiFor(any(DynamicKubernetesObject.class))).thenReturn(mockDynApi); - KubernetesApiResponse mockResp = mock(KubernetesApiResponse.class); - when(mockDynApi.get(anyString(), anyString())).thenReturn(mockResp); - when(mockResp.onFailure(any())).thenReturn(mockResp); - when(mockResp.isSuccess()).thenReturn(true); - DynamicKubernetesObject dynObj = new DynamicKubernetesObject(raw); - dynObj.setMetadata(new V1ObjectMeta().name("cm1").namespace("ns")); - when(mockResp.getObject()).thenReturn(dynObj); + final boolean[] hintsSeen = {false}; + SubscriptionReconciler.Planner spyPlanner = (sql, database, name, hints) -> { + hintsSeen[0] = hints != null && "test".equals(hints.get("topicPrefix")); + return new SubscriptionReconciler.PlanResult("INSERT INTO ...", List.of("kind: ConfigMap")); + }; - stubUpdateStatus(); + SubscriptionReconciler spyReconciler = new SubscriptionReconciler(null, + new FakeK8sApi<>(subscriptions), + new FakeK8sApi<>(pipelines), + spyPlanner); - reconciler.reconcile(new Request("ns", "my-sub")); + spyReconciler.reconcile(new Request("ns", "my-sub")); - // primKey should be present; objKey (nested object) must NOT be present - assertNotNull(sub.getStatus().getAttributes()); - assertTrue(sub.getStatus().getAttributes().containsKey("primKey")); - assertFalse(sub.getStatus().getAttributes().containsKey("objKey")); - } - - // --- fetchAttributes YAML error-handling tests (merged from SubscriptionReconcilerFetchAttributesTest) --- - // These test that malformed YAML in jobResources doesn't propagate out of reconcile(). - // Exercises the try/catch blocks in fetchAttributes() without reflection. - - private V1alpha1Subscription buildPhase3Subscription(String jobResourceYaml) { - V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); - V1alpha1SubscriptionStatus status = new V1alpha1SubscriptionStatus(); - status.setSql("SELECT 1"); - status.setHints(new HashMap<>()); - status.setReady(false); - String validYaml = "apiVersion: v1\nkind: ConfigMap\nmetadata:\n name: cm1\n namespace: ns\n"; - status.setResources(Collections.singletonList(validYaml)); - status.setJobResources(Collections.singletonList(jobResourceYaml)); - status.setDownstreamResources(Collections.emptyList()); - sub.setStatus(status); - return sub; + assertTrue(hintsSeen[0], "Planner should receive hints from subscription spec"); } + // Outer exception handler – reconciliation exception → requeue @Test - void fetchAttributesHandlesClassCastExceptionYaml() throws Exception { - V1alpha1Subscription sub = buildPhase3Subscription(CLASS_CAST_EXCEPTION_YAML); - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - when(operator.isReady(anyString())).thenReturn(false); - stubUpdateStatus(); - - assertDoesNotThrow(() -> reconciler.reconcile(new Request("ns", "my-sub"))); - } + void outerExceptionHandlerRequeuesOnFailure() { + // Subscription exists but subscriptionApi.updateStatus will throw + V1alpha1Subscription sub = buildSubscription("ns", "my-sub", "SELECT 1"); + sub.getStatus(); - @Test - void fetchAttributesHandlesScannerExceptionYaml() throws Exception { - V1alpha1Subscription sub = buildPhase3Subscription(SCANNER_EXCEPTION_YAML); - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - when(operator.isReady(anyString())).thenReturn(false); - stubUpdateStatus(); + FakeK8sApi throwingApi = + new FakeK8sApi<>(List.of(sub)) { + @Override + public void updateStatus(V1alpha1Subscription obj, Object status) throws java.sql.SQLException { + throw new java.sql.SQLException("status update failed"); + } + }; - assertDoesNotThrow(() -> reconciler.reconcile(new Request("ns", "my-sub"))); - } + SubscriptionReconciler throwingReconciler = new SubscriptionReconciler(null, + throwingApi, + new FakeK8sApi<>(pipelines), + successPlanner); - @Test - void fetchAttributesHandlesYamlWithNullMetadata() throws Exception { - V1alpha1Subscription sub = buildPhase3Subscription(NO_METADATA_YAML); - when(operator.fetch(anyString(), anyString(), anyString())).thenReturn(sub); - when(operator.isReady(anyString())).thenReturn(false); - stubUpdateStatus(); + Result result = throwingReconciler.reconcile(new Request("ns", "my-sub")); - assertDoesNotThrow(() -> reconciler.reconcile(new Request("ns", "my-sub"))); + assertTrue(result.isRequeue()); } }