Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import com.linkedin.hoptimator.k8s.models.V1alpha1PipelineList;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplate;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTemplateList;
import com.linkedin.hoptimator.k8s.models.V1alpha1Subscription;
import com.linkedin.hoptimator.k8s.models.V1alpha1SubscriptionList;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTrigger;
import com.linkedin.hoptimator.k8s.models.V1alpha1TableTriggerList;
import com.linkedin.hoptimator.k8s.models.V1alpha1View;
Expand Down Expand Up @@ -57,6 +59,9 @@ public final class K8sApiEndpoints {
public static final K8sApiEndpoint<V1alpha1JobTemplate, V1alpha1JobTemplateList> JOB_TEMPLATES =
new K8sApiEndpoint<>("JobTemplate", "hoptimator.linkedin.com", "v1alpha1", "jobtemplates", false,
V1alpha1JobTemplate.class, V1alpha1JobTemplateList.class);
// Endpoint for the Subscription custom resource (group hoptimator.linkedin.com,
// version v1alpha1, plural "subscriptions"). The boolean argument mirrors the
// sibling endpoints above — presumably a namespaced/cluster-scoped flag; confirm
// against the K8sApiEndpoint constructor.
public static final K8sApiEndpoint<V1alpha1Subscription, V1alpha1SubscriptionList> SUBSCRIPTIONS =
new K8sApiEndpoint<>("Subscription", "hoptimator.linkedin.com", "v1alpha1", "subscriptions", false,
V1alpha1Subscription.class, V1alpha1SubscriptionList.class);
public static final K8sApiEndpoint<V1alpha1TableTrigger, V1alpha1TableTriggerList> TABLE_TRIGGERS =
new K8sApiEndpoint<>("TableTrigger", "hoptimator.linkedin.com", "v1alpha1", "tabletriggers", false,
V1alpha1TableTrigger.class, V1alpha1TableTriggerList.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplate;
import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplateList;
import com.linkedin.hoptimator.k8s.models.V1alpha1JobTemplateSpec;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -25,19 +24,14 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;


@ExtendWith(MockitoExtension.class)
@SuppressFBWarnings(value = {"RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT"},
justification = "Mockito doReturn().when() stubs — framework captures the return value")
class K8sJobDeployerTest {

@Mock
Expand Down Expand Up @@ -98,8 +92,6 @@ K8sSnapshot createSnapshot(K8sContext context) {

@Test
void specifyWithNoTemplatesReturnsEmpty() throws SQLException {
doReturn(new Properties()).when(connection).connectionProperties();

Sink sink = new Sink("sinkdb", Arrays.asList("schema", "sink_table"),
Collections.emptyMap());
Job job = createTestJob(sink);
Expand All @@ -114,8 +106,6 @@ void specifyWithNoTemplatesReturnsEmpty() throws SQLException {

@Test
void specifyRendersMatchingTemplate() throws SQLException {
doReturn(new Properties()).when(connection).connectionProperties();

templates.add(new V1alpha1JobTemplate()
.metadata(new V1ObjectMeta().name("template1"))
.spec(new V1alpha1JobTemplateSpec()
Expand All @@ -136,8 +126,6 @@ void specifyRendersMatchingTemplate() throws SQLException {

@Test
void specifyFiltersOutNonMatchingDatabases() throws SQLException {
doReturn(new Properties()).when(connection).connectionProperties();

templates.add(new V1alpha1JobTemplate()
.metadata(new V1ObjectMeta().name("template1"))
.spec(new V1alpha1JobTemplateSpec()
Expand All @@ -157,8 +145,6 @@ void specifyFiltersOutNonMatchingDatabases() throws SQLException {

@Test
void specifyWithNullDatabasesMatchesAll() throws SQLException {
doReturn(new Properties()).when(connection).connectionProperties();

templates.add(new V1alpha1JobTemplate()
.metadata(new V1ObjectMeta().name("template1"))
.spec(new V1alpha1JobTemplateSpec()
Expand All @@ -178,8 +164,6 @@ void specifyWithNullDatabasesMatchesAll() throws SQLException {

@Test
void specifyRendersTemplateVariables() throws SQLException {
doReturn(new Properties()).when(connection).connectionProperties();

templates.add(new V1alpha1JobTemplate()
.metadata(new V1ObjectMeta().name("template1"))
.spec(new V1alpha1JobTemplateSpec()
Expand All @@ -203,8 +187,6 @@ void specifyRendersTemplateVariables() throws SQLException {
@Test
void specifyLambdasReturnNonEmptyValues() throws SQLException {
// Verify each key field is non-empty.
doReturn(new Properties()).when(connection).connectionProperties();

templates.add(new V1alpha1JobTemplate()
.metadata(new V1ObjectMeta().name("template1"))
.spec(new V1alpha1JobTemplateSpec()
Expand Down Expand Up @@ -237,10 +219,6 @@ void specifyLambdasReturnNonEmptyValues() throws SQLException {
@Test
void specifyWithFlinkConfigPropertiesIncludesThem() throws SQLException {
// Verify that sink options ARE merged into the environment
Properties connProps = new Properties();
connProps.setProperty("flinkConfig1", "value1");
doReturn(connProps).when(connection).connectionProperties();

Map<String, String> sinkOptions = new HashMap<>();
sinkOptions.put("sinkOption", "sinkVal");
Sink sink = new Sink("sinkdb", Arrays.asList("schema", "sink_table"), sinkOptions);
Expand All @@ -264,8 +242,6 @@ void specifyWithFlinkConfigPropertiesIncludesThem() throws SQLException {
@Test
void specifyConditionalRenderedTemplateNotNull() throws SQLException {
// Verify null templates are skipped
doReturn(new Properties()).when(connection).connectionProperties();

templates.add(new V1alpha1JobTemplate()
.metadata(new V1ObjectMeta().name("template1"))
.spec(new V1alpha1JobTemplateSpec()
Expand Down
5 changes: 1 addition & 4 deletions hoptimator-operator/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@ plugins {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect there are more files that are now unused in the hoptimator-operator package, namely "Operator.java" but maybe more that we can remove.

dependencies {
implementation project(':hoptimator-api')
implementation project(':hoptimator-avro') // <-- marked for deletion
implementation project(':hoptimator-planner') // <-- marked for deletion
implementation project(':hoptimator-catalog') // <-- marked for deletion
implementation project(':hoptimator-jdbc')
implementation project(':hoptimator-util')
implementation project(':hoptimator-k8s')
Expand All @@ -18,7 +15,7 @@ dependencies {
implementation libs.kubernetes.extended.client
implementation libs.slf4j.api
implementation libs.commons.cli
implementation libs.avro
implementation libs.calcite.avatica
implementation libs.cron.utils

testImplementation(testFixtures(project(':hoptimator-k8s')))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.hoptimator.k8s.K8sApiEndpoints;
import com.linkedin.hoptimator.k8s.K8sContext;
import com.linkedin.hoptimator.operator.pipeline.PipelineReconciler;
import com.linkedin.hoptimator.operator.subscription.SubscriptionReconciler;
import com.linkedin.hoptimator.operator.trigger.TableTriggerReconciler;
import com.linkedin.hoptimator.operator.trigger.ViewReconciler;
import io.kubernetes.client.extended.controller.Controller;
Expand All @@ -18,6 +19,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -59,25 +62,42 @@ public static void main(String[] args) throws Exception {
String watchNamespaceInput = cmd.getOptionValue("watch", "");
Properties connectionProperties = new Properties();
connectionProperties.put("k8s.watch.namespace", watchNamespaceInput);
K8sContext context = K8sContext.create(new HoptimatorConnection(null, connectionProperties));
new PipelineOperatorApp(context).run();

// Create a JDBC connection for SQL planning (used by SubscriptionReconciler)
Connection jdbcConnection = DriverManager.getConnection("jdbc:hoptimator://", connectionProperties);
K8sContext context = K8sContext.create(jdbcConnection);

SubscriptionReconciler.Planner planner = SubscriptionReconciler.jdbcPlanner(jdbcConnection);
new PipelineOperatorApp(context).run(planner);
Comment on lines +70 to +71
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems like there is a lot of logic just to allow SubscriptionReconciler to support drop-in planners, though in practice it'll always use the jdbcPlanner. Do you think we need this?

}

/**
 * Runs the operator with no subscription planner and no additional controllers.
 * Delegates to {@link #run(List)} with an empty controller list; since no planner
 * is supplied, the SubscriptionReconciler controller is not registered.
 */
public void run() {
run(Collections.emptyList());
}

/**
 * Runs the operator with the given subscription planner and no additional controllers.
 * Delegates to the full overload with an empty initial-controller list.
 *
 * @param planner planner used by the SubscriptionReconciler controller; may be null,
 *                in which case that controller is skipped
 */
public void run(SubscriptionReconciler.Planner planner) {
run(planner, Collections.emptyList());
}

/**
 * Runs the operator with the given extra controllers but no subscription planner.
 * Delegates to the full overload with a null planner, so the SubscriptionReconciler
 * controller is not registered.
 *
 * @param initialControllers controllers to register in addition to the built-in ones
 */
public void run(List<Controller> initialControllers) {
run(null, initialControllers);
}

public void run(SubscriptionReconciler.Planner planner, List<Controller> initialControllers) {
// register informers
context.registerInformer(K8sApiEndpoints.PIPELINES, Duration.ofMinutes(5));
context.registerInformer(K8sApiEndpoints.TABLE_TRIGGERS, Duration.ofMinutes(5));
context.registerInformer(K8sApiEndpoints.VIEWS, Duration.ofMinutes(5));
context.registerInformer(K8sApiEndpoints.SUBSCRIPTIONS, Duration.ofMinutes(5));

List<Controller> controllers = new ArrayList<>(initialControllers);
controllers.addAll(ControllerService.controllers(context));
controllers.add(PipelineReconciler.controller(context));
controllers.add(TableTriggerReconciler.controller(context));
controllers.add(ViewReconciler.controller(context));
if (planner != null) {
controllers.add(SubscriptionReconciler.controller(context, planner));
}

ControllerManager controllerManager =
new ControllerManager(context.informerFactory(), controllers.toArray(new Controller[0]));
Expand Down

This file was deleted.

Loading
Loading