Changes from all commits · 16 commits
1 change: 1 addition & 0 deletions Makefile
@@ -58,6 +58,7 @@ deploy-flink: deploy
helm upgrade --install --atomic --set webhook.create=false,image.pullPolicy=Never,image.repository=docker.io/library/hoptimator-flink-operator,image.tag=latest --set-json='watchNamespaces=["default","flink"]' flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator
kubectl apply -f ./deploy/dev/flink-session-cluster.yaml
kubectl apply -f ./deploy/dev/flink-sql-gateway.yaml
kubectl apply -f ./deploy/dev/flink-sqljob-rbac.yaml
kubectl apply -f ./deploy/samples/flink-template.yaml
kubectl apply -f ./deploy/samples/flink-beam-template.yaml

2 changes: 2 additions & 0 deletions deploy/dev/flink-session-cluster.yaml
@@ -9,6 +9,8 @@ spec:
flinkVersion: v1_18
flinkConfiguration:
taskmanager.numberOfTaskSlots: "6"
python.executable: /usr/bin/python3
python.client.executable: /usr/bin/python3
serviceAccount: flink
jobManager:
resource:
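(python.executable and python.client.executable are standard PyFlink configuration options selecting the interpreter for Python UDF workers and the client process, respectively; pointing both at /usr/bin/python3 presumably prepares the session cluster to run Python files shipped with a SqlJob.)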
24 changes: 24 additions & 0 deletions deploy/dev/flink-sqljob-rbac.yaml
@@ -0,0 +1,24 @@
## Grants the Flink service account read access to SqlJob CRs,
## so FlinkRunner can fetch SQL and files at runtime.

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: sqljob-reader
rules:
- apiGroups: ["hoptimator.linkedin.com"]
resources: ["sqljobs"]
verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: flink-sqljob-reader
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: sqljob-reader
subjects:
- kind: ServiceAccount
name: flink
namespace: flink
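Since the ClusterRole grants only get on sqljobs, the runtime fetch this enables might look like the following sketch, using the official Kubernetes Java client's dynamic API. Illustrative only, not the PR's actual FlinkRunner code; the namespace/name values are placeholders.

import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.util.ClientBuilder;
import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesApi;
import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesObject;

public class SqlJobFetchSketch {
  public static void main(String[] args) throws Exception {
    // In-cluster config: authenticates as the pod's service account ("flink"),
    // which the sqljob-reader binding above authorizes to get SqlJobs.
    ApiClient client = ClientBuilder.cluster().build();
    DynamicKubernetesApi sqlJobs =
        new DynamicKubernetesApi("hoptimator.linkedin.com", "v1alpha1", "sqljobs", client);
    DynamicKubernetesObject job =
        sqlJobs.get("default", "my-sqljob").throwsApiException().getObject();
    // spec.sql holds the SQL statements; spec.files the attached files.
    System.out.println(job.getRaw().getAsJsonObject("spec").getAsJsonArray("sql"));
  }
}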
21 changes: 9 additions & 12 deletions deploy/samples/flink-template.yaml
@@ -6,18 +6,15 @@ metadata:
name: flink-template
spec:
yaml: |
apiVersion: flink.apache.org/v1beta1
kind: FlinkSessionJob
apiVersion: hoptimator.linkedin.com/v1alpha1
kind: SqlJob
metadata:
name: {{name}}
spec:
deploymentName: basic-session-deployment
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- {{flinksql}}
jarURI: file:///opt/{{flink.app.name}}.jar
parallelism: {{flink.parallelism:1}}
upgradeMode: stateless
state: running
{{flink.app.type==SQL}}
dialect: Flink
executionMode: Streaming
sql:
- {{flinksql}}
files:
{{files}}
{{flink.app.type==SQL}}
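(The paired {{flink.app.type==SQL}} lines presumably delimit a conditional block, so the template renders only for SQL-type Flink apps; {{files}} expands to the job's attached files.)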
12 changes: 12 additions & 0 deletions hoptimator-api/src/main/java/com/linkedin/hoptimator/Job.java
@@ -1,5 +1,6 @@
package com.linkedin.hoptimator;

import java.util.Collections;
import java.util.Map;
import java.util.Set;

@@ -16,6 +17,7 @@ public class Job implements Deployable {
private final String name;
private final Set<Source> sources;
private final Sink sink;
private final Map<String, String> files;

/**
* Lazy-evaluated template functions that generate various outputs for the job.
@@ -30,10 +32,16 @@ public class Job implements Deployable {
private final Map<String, ThrowingFunction<SqlDialect, String>> lazyEvals;

public Job(String name, Set<Source> sources, Sink sink, Map<String, ThrowingFunction<SqlDialect, String>> lazyEvals) {
this(name, sources, sink, lazyEvals, Collections.emptyMap());
}

public Job(String name, Set<Source> sources, Sink sink, Map<String, ThrowingFunction<SqlDialect, String>> lazyEvals,
Map<String, String> files) {
this.name = name;
this.sources = sources;
this.sink = sink;
this.lazyEvals = lazyEvals;
this.files = files != null ? files : Collections.emptyMap();
}

public String name() {
@@ -60,6 +68,10 @@ public ThrowingFunction<SqlDialect, String> fieldMap() {
return eval("fieldMap");
}

public Map<String, String> files() {
return files;
}

/**
* Retrieves a lazy-evaluated template function by key.
*
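A hypothetical caller of the new five-argument constructor might attach auxiliary files (say, a Python UDF) alongside the lazily evaluated SQL. Everything below is illustrative and assumes this project's Source, Sink, SqlDialect, and ThrowingFunction types; the file name and contents are invented:

import java.util.Map;
import java.util.Set;

class JobFilesSketch {
  // Illustrative only: attach a Python UDF source file via the new overload.
  // job.files() then exposes the attachment, presumably feeding the {{files}}
  // variable in the flink-template above.
  static Job jobWithFiles(Set<Source> sources, Sink sink,
      Map<String, ThrowingFunction<SqlDialect, String>> lazyEvals) {
    Map<String, String> files =
        Map.of("scale.py", "def scale(x):\n    return x * 2\n");
    return new Job("my-pipeline", sources, sink, lazyEvals, files);
  }
}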
41 changes: 41 additions & 0 deletions hoptimator-api/src/main/java/com/linkedin/hoptimator/UserFunction.java
@@ -0,0 +1,41 @@
package com.linkedin.hoptimator;

import java.util.Collections;
import java.util.Map;


public class UserFunction implements Deployable {

private final String name;
private final String as;
private final String namespace;
private final Map<String, String> options;

public UserFunction(String name, String as, String namespace, Map<String, String> options) {
this.name = name;
this.as = as;
this.namespace = namespace;
[Inline review thread on this line]
Collaborator: What is namespace used for? The only UserFunction caller seems to set it as null.
Collaborator (Author): Good catch. I think that's a holdover from IN <namespace> that got copied from create trigger. Will fix.

this.options = options != null ? options : Collections.emptyMap();
}

public String name() {
return name;
}

public String as() {
return as;
}

public String namespace() {
return namespace;
}

public Map<String, String> options() {
return options;
}

@Override
public String toString() {
return "UserFunction[" + name + "]";
}
}
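A hypothetical instantiation, for illustration (all values invented; note the review thread above, where the only real caller currently passes namespace as null):

class UserFunctionSketch {
  public static void main(String[] args) {
    UserFunction fn = new UserFunction(
        "SCALE",                  // SQL-visible function name
        "com.example.udf.Scale",  // implementation the name maps to ("as")
        null,                     // namespace: unused holdover per the review thread
        java.util.Map.of("language", "java"));
    System.out.println(fn);       // prints UserFunction[SCALE]
  }
}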
5 changes: 3 additions & 2 deletions hoptimator-flink-adapter/build.gradle
@@ -8,8 +8,9 @@ dependencies {
implementation project(':hoptimator-catalog')
implementation project(':hoptimator-k8s')
implementation project(':hoptimator-operator')
implementation libs.kubernetesClient
implementation libs.kubernetesExtendedClient
implementation project(':hoptimator-util')
implementation libs.kubernetes.client
implementation libs.kubernetes.extended.client

testImplementation libs.assertj
}
3 changes: 1 addition & 2 deletions FlinkStreamingSqlJob.java
@@ -5,10 +5,9 @@

public class FlinkStreamingSqlJob extends Resource {

public FlinkStreamingSqlJob(String namespace, String name, String sql) {
public FlinkStreamingSqlJob(String namespace, String name) {
super("FlinkStreamingSqlJob");
export("namespace", namespace);
export("name", name);
export("sql", sql);
}
}
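Rendering this Resource expands the FlinkSessionJob template at the end of this diff with the exported namespace and name. A minimal sketch, mirroring the reconciler below (the namespace/name values are placeholders):

import com.linkedin.hoptimator.catalog.Resource;
import com.linkedin.hoptimator.catalog.flink.FlinkStreamingSqlJob;

class RenderSketch {
  public static void main(String[] args) throws Exception {
    // Only namespace/name are exported now; the SQL itself lives in the
    // SqlJob CR and is fetched by the runner at runtime.
    Resource sqlJob = new FlinkStreamingSqlJob("flink", "my-sqljob");
    Resource.TemplateFactory factory =
        new Resource.SimpleTemplateFactory(Resource.Environment.EMPTY);
    for (String yaml : sqlJob.render(factory)) {
      System.out.println(yaml);  // a FlinkSessionJob manifest for flink/my-sqljob
    }
  }
}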
FlinkControllerProvider.java
@@ -1,30 +1,33 @@
package com.linkedin.hoptimator.operator.flink;

import com.linkedin.hoptimator.k8s.K8sApiEndpoint;
import com.linkedin.hoptimator.k8s.K8sContext;
import com.linkedin.hoptimator.k8s.models.V1alpha1SqlJob;
import com.linkedin.hoptimator.k8s.models.V1alpha1SqlJobList;
import com.linkedin.hoptimator.operator.ControllerProvider;
import com.linkedin.hoptimator.operator.Operator;

import io.kubernetes.client.extended.controller.Controller;
import io.kubernetes.client.extended.controller.builder.ControllerBuilder;
import io.kubernetes.client.extended.controller.reconciler.Reconciler;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;


/** A bridge to flink-kubernetes-operator */
public class FlinkControllerProvider implements ControllerProvider {

@Override
public Collection<Controller> controllers(Operator operator) {
operator.registerApi("FlinkDeployment", "flinkdeployment", "flinkdeployments", "flink.apache.org", "v1beta1");
public static final K8sApiEndpoint<V1alpha1SqlJob, V1alpha1SqlJobList> SQL_JOBS =
new K8sApiEndpoint<>("SqlJob", "hoptimator.linkedin.com", "v1alpha1", "sqljobs", false,
V1alpha1SqlJob.class, V1alpha1SqlJobList.class);

operator.registerApi("SqlJob", "sqljob", "sqljobs", "hoptimator.linkedin.com", "v1alpha1", V1alpha1SqlJob.class,
V1alpha1SqlJobList.class);
@Override
public Collection<Controller> controllers(K8sContext context) {
context.registerInformer(SQL_JOBS, Duration.ofMinutes(5));

Reconciler reconciler = new FlinkStreamingSqlJobReconciler(operator);
Controller controller = ControllerBuilder.defaultBuilder(operator.informerFactory())
Reconciler reconciler = new FlinkStreamingSqlJobReconciler(context);
Controller controller = ControllerBuilder.defaultBuilder(context.informerFactory())
.withReconciler(reconciler)
.withName("flink-streaming-sql-job-controller")
.withWorkerCount(1)
FlinkStreamingSqlJobReconciler.java
@@ -2,50 +2,62 @@

import com.linkedin.hoptimator.catalog.Resource;
import com.linkedin.hoptimator.catalog.flink.FlinkStreamingSqlJob;
import com.linkedin.hoptimator.k8s.K8sApi;
import com.linkedin.hoptimator.k8s.K8sContext;
import com.linkedin.hoptimator.k8s.K8sYamlApi;
import com.linkedin.hoptimator.k8s.models.V1alpha1SqlJob;
import com.linkedin.hoptimator.k8s.models.V1alpha1SqlJobList;
import com.linkedin.hoptimator.k8s.models.V1alpha1SqlJobSpec.DialectEnum;
import com.linkedin.hoptimator.k8s.models.V1alpha1SqlJobSpec.ExecutionModeEnum;
import com.linkedin.hoptimator.k8s.models.V1alpha1SqlJobStatus;
import com.linkedin.hoptimator.operator.Operator;

import io.kubernetes.client.extended.controller.reconciler.Reconciler;
import io.kubernetes.client.extended.controller.reconciler.Request;
import io.kubernetes.client.extended.controller.reconciler.Result;
import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesObject;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.stream.Collectors;
import java.sql.SQLException;
import java.time.Duration;


/**
* Manifests streaming SqlJobs as Flink jobs.
* Manifests streaming SqlJobs as Flink session jobs.
*
* <p>The reconciler creates a FlinkSessionJob that references the SqlJob by name.
* The FlinkRunner fetches SQL and files directly from the SqlJob CR at runtime.
*/
public class FlinkStreamingSqlJobReconciler implements Reconciler {
private static final Logger log = LoggerFactory.getLogger(FlinkStreamingSqlJobReconciler.class);
private static final String SQLJOB = "hoptimator.linkedin.com/v1alpha1/SqlJob";

private final Operator operator;
private final K8sContext context;
private final K8sApi<V1alpha1SqlJob, V1alpha1SqlJobList> sqlJobApi;
private final K8sYamlApi yamlApi;

public FlinkStreamingSqlJobReconciler(Operator operator) {
this.operator = operator;
public FlinkStreamingSqlJobReconciler(K8sContext context) {
this.context = context;
this.sqlJobApi = new K8sApi<>(context, FlinkControllerProvider.SQL_JOBS);
this.yamlApi = new K8sYamlApi(context);
}

@Override
public Result reconcile(Request request) {
log.info("Reconciling request {}", request);
String name = request.getName();
String namespace = request.getNamespace();
Result result = new Result(true, operator.pendingRetryDuration());

try {
V1alpha1SqlJob object = operator.<V1alpha1SqlJob>fetch(SQLJOB, namespace, name);

if (object == null) {
log.info("Object {}/{} deleted. Skipping.");
return new Result(false);
V1alpha1SqlJob object;
try {
object = sqlJobApi.get(namespace, name);
} catch (SQLException e) {
if (e.getErrorCode() == 404) {
log.info("Object {} deleted. Skipping.", name);
return new Result(false);
}
throw e;
}

V1alpha1SqlJobStatus status = object.getStatus();
@@ -54,9 +66,6 @@ public Result reconcile(Request request) {
object.setStatus(status);
}

List<String> sql = object.getSpec().getSql();
String script = sql.stream().collect(Collectors.joining(";\n"));

DialectEnum dialect = object.getSpec().getDialect();
if (!DialectEnum.FLINK.equals(dialect)) {
log.info("Not Flink SQL. Skipping.");
@@ -70,41 +79,21 @@
}

Resource.TemplateFactory templateFactory = new Resource.SimpleTemplateFactory(Resource.Environment.EMPTY);
Resource sqlJob = new FlinkStreamingSqlJob(namespace, name, script);
boolean allReady = true;
boolean anyFailed = false;
for (String yaml : sqlJob.render(templateFactory)) {
operator.apply(yaml, object);
if (!operator.isReady(yaml)) {
allReady = false;
}
if (operator.isFailed(yaml)) {
anyFailed = true;
allReady = false;
}
}

object.getStatus().setReady(allReady);
object.getStatus().setFailed(anyFailed);
Resource sqlJob = new FlinkStreamingSqlJob(namespace, name);

if (allReady) {
object.getStatus().setMessage("Ready.");
result = new Result(false); // done
}
if (anyFailed) {
object.getStatus().setMessage("Failed.");
result = new Result(false); // done
for (String yaml : sqlJob.render(templateFactory)) {
DynamicKubernetesObject obj = yamlApi.objFromYaml(yaml);
context.own(obj);
yamlApi.update(obj);
}

operator.apiFor(SQLJOB)
.updateStatus(object, x -> object.getStatus())
.onFailure((x, y) -> log.error("Failed to update status of SqlJob {}: {}.", name, y.getMessage()))
.throwsApiException();
object.getStatus().setReady(true);
object.getStatus().setMessage("Ready.");
sqlJobApi.updateStatus(object, object.getStatus());
} catch (Exception e) {
log.error("Encountered exception while reconciling Flink streaming SqlJob {}/{}", namespace, name, e);
return new Result(true, operator.failureRetryDuration());
return new Result(true, Duration.ofMinutes(5));
}
return result;
return new Result(false);
}
}
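(context.own(obj) presumably sets the SqlJob as the owner of the rendered FlinkSessionJob so Kubernetes garbage-collects the session job when the SqlJob is deleted; on error the reconciler now retries after a flat five minutes rather than the old operator.failureRetryDuration().)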

FlinkStreamingSqlJob template YAML
@@ -1,29 +1,14 @@
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
kind: FlinkSessionJob
metadata:
namespace: {{namespace}}
name: {{name}}-flink-job
name: {{name}}
spec:
image: docker.io/library/hoptimator-flink-runner
imagePullPolicy: Never
flinkVersion: v1_18
flinkConfiguration:
taskmanager.numberOfTaskSlots: "1"
serviceAccount: flink
jobManager:
resource:
memory: "2048m"
cpu: 0.1
taskManager:
resource:
memory: "2048m"
cpu: 0.1
deploymentName: basic-session-deployment
job:
entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner
args:
- {{sql}}
jarURI: local:///opt/hoptimator-flink-runner.jar
- --sqljob={{namespace}}/{{name}}
jarURI: file:///opt/hoptimator-flink-runner.jar
parallelism: 1
upgradeMode: stateless
state: running
