diff --git a/Makefile b/Makefile index 6feebc58..c3d047c7 100644 --- a/Makefile +++ b/Makefile @@ -58,6 +58,7 @@ deploy-flink: deploy helm upgrade --install --atomic --set webhook.create=false,image.pullPolicy=Never,image.repository=docker.io/library/hoptimator-flink-operator,image.tag=latest --set-json='watchNamespaces=["default","flink"]' flink-kubernetes-operator flink-operator-repo/flink-kubernetes-operator kubectl apply -f ./deploy/dev/flink-session-cluster.yaml kubectl apply -f ./deploy/dev/flink-sql-gateway.yaml + kubectl apply -f ./deploy/dev/flink-sqljob-rbac.yaml kubectl apply -f ./deploy/samples/flink-template.yaml kubectl apply -f ./deploy/samples/flink-beam-template.yaml diff --git a/deploy/dev/flink-session-cluster.yaml b/deploy/dev/flink-session-cluster.yaml index 40f46dbf..72efc0e3 100644 --- a/deploy/dev/flink-session-cluster.yaml +++ b/deploy/dev/flink-session-cluster.yaml @@ -9,6 +9,8 @@ spec: flinkVersion: v1_18 flinkConfiguration: taskmanager.numberOfTaskSlots: "6" + python.executable: /usr/bin/python3 + python.client.executable: /usr/bin/python3 serviceAccount: flink jobManager: resource: diff --git a/deploy/dev/flink-sqljob-rbac.yaml b/deploy/dev/flink-sqljob-rbac.yaml new file mode 100644 index 00000000..57105801 --- /dev/null +++ b/deploy/dev/flink-sqljob-rbac.yaml @@ -0,0 +1,24 @@ +## Grants the Flink service account read access to SqlJob CRs, +## so FlinkRunner can fetch SQL and files at runtime. 
+ +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: sqljob-reader +rules: +- apiGroups: ["hoptimator.linkedin.com"] + resources: ["sqljobs"] + verbs: ["get"] +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: flink-sqljob-reader +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: sqljob-reader +subjects: +- kind: ServiceAccount + name: flink + namespace: flink diff --git a/deploy/samples/flink-template.yaml b/deploy/samples/flink-template.yaml index 1d43a867..d44783eb 100644 --- a/deploy/samples/flink-template.yaml +++ b/deploy/samples/flink-template.yaml @@ -6,18 +6,15 @@ metadata: name: flink-template spec: yaml: | - apiVersion: flink.apache.org/v1beta1 - kind: FlinkSessionJob + apiVersion: hoptimator.linkedin.com/v1alpha1 + kind: SqlJob metadata: name: {{name}} spec: - deploymentName: basic-session-deployment - job: - entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner - args: - - {{flinksql}} - jarURI: file:///opt/{{flink.app.name}}.jar - parallelism: {{flink.parallelism:1}} - upgradeMode: stateless - state: running - {{flink.app.type==SQL}} + dialect: Flink + executionMode: Streaming + sql: + - {{flinksql}} + files: + {{files}} + {{flink.app.type==SQL}} diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Job.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Job.java index bdc2849e..4e87735a 100644 --- a/hoptimator-api/src/main/java/com/linkedin/hoptimator/Job.java +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/Job.java @@ -1,5 +1,6 @@ package com.linkedin.hoptimator; +import java.util.Collections; import java.util.Map; import java.util.Set; @@ -16,6 +17,7 @@ public class Job implements Deployable { private final String name; private final Set sources; private final Sink sink; + private final Map files; /** * Lazy-evaluated template functions that generate various outputs for the job. 
@@ -30,10 +32,16 @@ public class Job implements Deployable { private final Map> lazyEvals; public Job(String name, Set sources, Sink sink, Map> lazyEvals) { + this(name, sources, sink, lazyEvals, Collections.emptyMap()); + } + + public Job(String name, Set sources, Sink sink, Map> lazyEvals, + Map files) { this.name = name; this.sources = sources; this.sink = sink; this.lazyEvals = lazyEvals; + this.files = files != null ? files : Collections.emptyMap(); } public String name() { @@ -60,6 +68,10 @@ public ThrowingFunction fieldMap() { return eval("fieldMap"); } + public Map files() { + return files; + } + /** * Retrieves a lazy-evaluated template function by key. * diff --git a/hoptimator-api/src/main/java/com/linkedin/hoptimator/UserFunction.java b/hoptimator-api/src/main/java/com/linkedin/hoptimator/UserFunction.java new file mode 100644 index 00000000..9a51c330 --- /dev/null +++ b/hoptimator-api/src/main/java/com/linkedin/hoptimator/UserFunction.java @@ -0,0 +1,41 @@ +package com.linkedin.hoptimator; + +import java.util.Collections; +import java.util.Map; + + +public class UserFunction implements Deployable { + + private final String name; + private final String as; + private final String namespace; + private final Map options; + + public UserFunction(String name, String as, String namespace, Map options) { + this.name = name; + this.as = as; + this.namespace = namespace; + this.options = options != null ? 
options : Collections.emptyMap(); + } + + public String name() { + return name; + } + + public String as() { + return as; + } + + public String namespace() { + return namespace; + } + + public Map options() { + return options; + } + + @Override + public String toString() { + return "UserFunction[" + name + "]"; + } +} diff --git a/hoptimator-flink-adapter/build.gradle b/hoptimator-flink-adapter/build.gradle index 1e7a7ddb..a51a921d 100644 --- a/hoptimator-flink-adapter/build.gradle +++ b/hoptimator-flink-adapter/build.gradle @@ -8,8 +8,9 @@ dependencies { implementation project(':hoptimator-catalog') implementation project(':hoptimator-k8s') implementation project(':hoptimator-operator') - implementation libs.kubernetesClient - implementation libs.kubernetesExtendedClient + implementation project(':hoptimator-util') + implementation libs.kubernetes.client + implementation libs.kubernetes.extended.client testImplementation libs.assertj } diff --git a/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/catalog/flink/FlinkStreamingSqlJob.java b/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/catalog/flink/FlinkStreamingSqlJob.java index e697631e..73116c64 100644 --- a/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/catalog/flink/FlinkStreamingSqlJob.java +++ b/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/catalog/flink/FlinkStreamingSqlJob.java @@ -5,10 +5,9 @@ public class FlinkStreamingSqlJob extends Resource { - public FlinkStreamingSqlJob(String namespace, String name, String sql) { + public FlinkStreamingSqlJob(String namespace, String name) { super("FlinkStreamingSqlJob"); export("namespace", namespace); export("name", name); - export("sql", sql); } } diff --git a/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/operator/flink/FlinkControllerProvider.java b/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/operator/flink/FlinkControllerProvider.java index b7f12aca..daf0d38e 
100644 --- a/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/operator/flink/FlinkControllerProvider.java +++ b/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/operator/flink/FlinkControllerProvider.java @@ -1,14 +1,16 @@ package com.linkedin.hoptimator.operator.flink; +import com.linkedin.hoptimator.k8s.K8sApiEndpoint; +import com.linkedin.hoptimator.k8s.K8sContext; import com.linkedin.hoptimator.k8s.models.V1alpha1SqlJob; import com.linkedin.hoptimator.k8s.models.V1alpha1SqlJobList; import com.linkedin.hoptimator.operator.ControllerProvider; -import com.linkedin.hoptimator.operator.Operator; import io.kubernetes.client.extended.controller.Controller; import io.kubernetes.client.extended.controller.builder.ControllerBuilder; import io.kubernetes.client.extended.controller.reconciler.Reconciler; +import java.time.Duration; import java.util.Collection; import java.util.Collections; @@ -16,15 +18,16 @@ /** A bridge to flink-kubernetes-operator */ public class FlinkControllerProvider implements ControllerProvider { - @Override - public Collection controllers(Operator operator) { - operator.registerApi("FlinkDeployment", "flinkdeployment", "flinkdeployments", "flink.apache.org", "v1beta1"); + public static final K8sApiEndpoint SQL_JOBS = + new K8sApiEndpoint<>("SqlJob", "hoptimator.linkedin.com", "v1alpha1", "sqljobs", false, + V1alpha1SqlJob.class, V1alpha1SqlJobList.class); - operator.registerApi("SqlJob", "sqljob", "sqljobs", "hoptimator.linkedin.com", "v1alpha1", V1alpha1SqlJob.class, - V1alpha1SqlJobList.class); + @Override + public Collection controllers(K8sContext context) { + context.registerInformer(SQL_JOBS, Duration.ofMinutes(5)); - Reconciler reconciler = new FlinkStreamingSqlJobReconciler(operator); - Controller controller = ControllerBuilder.defaultBuilder(operator.informerFactory()) + Reconciler reconciler = new FlinkStreamingSqlJobReconciler(context); + Controller controller = 
ControllerBuilder.defaultBuilder(context.informerFactory()) .withReconciler(reconciler) .withName("flink-streaming-sql-job-controller") .withWorkerCount(1) diff --git a/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/operator/flink/FlinkStreamingSqlJobReconciler.java b/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/operator/flink/FlinkStreamingSqlJobReconciler.java index 7cf4f250..b5a20f4b 100644 --- a/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/operator/flink/FlinkStreamingSqlJobReconciler.java +++ b/hoptimator-flink-adapter/src/main/java/com/linkedin/hoptimator/operator/flink/FlinkStreamingSqlJobReconciler.java @@ -2,35 +2,44 @@ import com.linkedin.hoptimator.catalog.Resource; import com.linkedin.hoptimator.catalog.flink.FlinkStreamingSqlJob; +import com.linkedin.hoptimator.k8s.K8sApi; +import com.linkedin.hoptimator.k8s.K8sContext; +import com.linkedin.hoptimator.k8s.K8sYamlApi; import com.linkedin.hoptimator.k8s.models.V1alpha1SqlJob; +import com.linkedin.hoptimator.k8s.models.V1alpha1SqlJobList; import com.linkedin.hoptimator.k8s.models.V1alpha1SqlJobSpec.DialectEnum; import com.linkedin.hoptimator.k8s.models.V1alpha1SqlJobSpec.ExecutionModeEnum; import com.linkedin.hoptimator.k8s.models.V1alpha1SqlJobStatus; -import com.linkedin.hoptimator.operator.Operator; import io.kubernetes.client.extended.controller.reconciler.Reconciler; import io.kubernetes.client.extended.controller.reconciler.Request; import io.kubernetes.client.extended.controller.reconciler.Result; +import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; -import java.util.stream.Collectors; +import java.sql.SQLException; +import java.time.Duration; /** - * Manifests streaming SqlJobs as Flink jobs. + * Manifests streaming SqlJobs as Flink session jobs. * + *

The reconciler creates a FlinkSessionJob that references the SqlJob by name. + * The FlinkRunner fetches SQL and files directly from the SqlJob CR at runtime. */ public class FlinkStreamingSqlJobReconciler implements Reconciler { private static final Logger log = LoggerFactory.getLogger(FlinkStreamingSqlJobReconciler.class); - private static final String SQLJOB = "hoptimator.linkedin.com/v1alpha1/SqlJob"; - private final Operator operator; + private final K8sContext context; + private final K8sApi sqlJobApi; + private final K8sYamlApi yamlApi; - public FlinkStreamingSqlJobReconciler(Operator operator) { - this.operator = operator; + public FlinkStreamingSqlJobReconciler(K8sContext context) { + this.context = context; + this.sqlJobApi = new K8sApi<>(context, FlinkControllerProvider.SQL_JOBS); + this.yamlApi = new K8sYamlApi(context); } @Override @@ -38,14 +47,17 @@ public Result reconcile(Request request) { log.info("Reconciling request {}", request); String name = request.getName(); String namespace = request.getNamespace(); - Result result = new Result(true, operator.pendingRetryDuration()); try { - V1alpha1SqlJob object = operator.fetch(SQLJOB, namespace, name); - - if (object == null) { - log.info("Object {}/{} deleted. Skipping."); - return new Result(false); + V1alpha1SqlJob object; + try { + object = sqlJobApi.get(namespace, name); + } catch (SQLException e) { + if (e.getErrorCode() == 404) { + log.info("Object {} deleted. Skipping.", name); + return new Result(false); + } + throw e; } V1alpha1SqlJobStatus status = object.getStatus(); @@ -54,9 +66,6 @@ public Result reconcile(Request request) { object.setStatus(status); } - List sql = object.getSpec().getSql(); - String script = sql.stream().collect(Collectors.joining(";\n")); - DialectEnum dialect = object.getSpec().getDialect(); if (!DialectEnum.FLINK.equals(dialect)) { log.info("Not Flink SQL. 
Skipping."); @@ -70,41 +79,21 @@ public Result reconcile(Request request) { } Resource.TemplateFactory templateFactory = new Resource.SimpleTemplateFactory(Resource.Environment.EMPTY); - Resource sqlJob = new FlinkStreamingSqlJob(namespace, name, script); - boolean allReady = true; - boolean anyFailed = false; - for (String yaml : sqlJob.render(templateFactory)) { - operator.apply(yaml, object); - if (!operator.isReady(yaml)) { - allReady = false; - } - if (operator.isFailed(yaml)) { - anyFailed = true; - allReady = false; - } - } - - object.getStatus().setReady(allReady); - object.getStatus().setFailed(anyFailed); + Resource sqlJob = new FlinkStreamingSqlJob(namespace, name); - if (allReady) { - object.getStatus().setMessage("Ready."); - result = new Result(false); // done - } - if (anyFailed) { - object.getStatus().setMessage("Failed."); - result = new Result(false); // done + for (String yaml : sqlJob.render(templateFactory)) { + DynamicKubernetesObject obj = yamlApi.objFromYaml(yaml); + context.own(obj); + yamlApi.update(obj); } - operator.apiFor(SQLJOB) - .updateStatus(object, x -> object.getStatus()) - .onFailure((x, y) -> log.error("Failed to update status of SqlJob {}: {}.", name, y.getMessage())) - .throwsApiException(); + object.getStatus().setReady(true); + object.getStatus().setMessage("Ready."); + sqlJobApi.updateStatus(object, object.getStatus()); } catch (Exception e) { log.error("Encountered exception while reconciling Flink streaming SqlJob {}/{}", namespace, name, e); - return new Result(true, operator.failureRetryDuration()); + return new Result(true, Duration.ofMinutes(5)); } - return result; + return new Result(false); } } - diff --git a/hoptimator-flink-adapter/src/main/resources/FlinkStreamingSqlJob.yaml.template b/hoptimator-flink-adapter/src/main/resources/FlinkStreamingSqlJob.yaml.template index 57956b0c..d16674ba 100644 --- a/hoptimator-flink-adapter/src/main/resources/FlinkStreamingSqlJob.yaml.template +++ 
b/hoptimator-flink-adapter/src/main/resources/FlinkStreamingSqlJob.yaml.template @@ -1,29 +1,14 @@ apiVersion: flink.apache.org/v1beta1 -kind: FlinkDeployment +kind: FlinkSessionJob metadata: - namespace: {{namespace}} - name: {{name}}-flink-job + name: {{name}} spec: - image: docker.io/library/hoptimator-flink-runner - imagePullPolicy: Never - flinkVersion: v1_18 - flinkConfiguration: - taskmanager.numberOfTaskSlots: "1" - serviceAccount: flink - jobManager: - resource: - memory: "2048m" - cpu: 0.1 - taskManager: - resource: - memory: "2048m" - cpu: 0.1 + deploymentName: basic-session-deployment job: entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner args: - - {{sql}} - jarURI: local:///opt/hoptimator-flink-runner.jar + - --sqljob={{namespace}}/{{name}} + jarURI: file:///opt/hoptimator-flink-runner.jar parallelism: 1 upgradeMode: stateless state: running - diff --git a/hoptimator-flink-runner/Dockerfile-flink-runner b/hoptimator-flink-runner/Dockerfile-flink-runner index 0f59e377..93ba6348 100644 --- a/hoptimator-flink-runner/Dockerfile-flink-runner +++ b/hoptimator-flink-runner/Dockerfile-flink-runner @@ -1,2 +1,14 @@ FROM flink:1.18.1 + +# Install Python and PyFlink for Python UDF support +RUN apt-get update && \ + apt-get install -y python3 python3-pip && \ + pip3 install apache-flink==1.18.1 && \ + rm -rf /var/lib/apt/lists/* + +# Copy Python UDF modules +COPY python/ /opt/python-udfs/ +ENV PYTHONPATH=/opt/python-udfs + +# Copy runner JAR (includes Java UDFs) COPY ./build/libs/hoptimator-flink-runner-all.jar /opt/hoptimator-flink-runner.jar diff --git a/hoptimator-flink-runner/build.gradle b/hoptimator-flink-runner/build.gradle index c2840a2d..db4d0e0f 100644 --- a/hoptimator-flink-runner/build.gradle +++ b/hoptimator-flink-runner/build.gradle @@ -10,13 +10,16 @@ dependencies { implementation libs.flink.connector.kafka implementation libs.flink.connector.mysql.cdc implementation libs.flink.csv + implementation libs.kubernetes.client //Flink depends 
should be provided by runtime + compileOnly libs.flink.table.common compileOnly libs.flink.table.planner compileOnly libs.flink.clients compileOnly libs.flink.table.api.java.bridge testImplementation libs.assertj + testImplementation libs.flink.table.common } shadowJar { diff --git a/hoptimator-flink-runner/python/demo_udfs.py b/hoptimator-flink-runner/python/demo_udfs.py new file mode 100644 index 00000000..26b9ac0d --- /dev/null +++ b/hoptimator-flink-runner/python/demo_udfs.py @@ -0,0 +1,8 @@ +from pyflink.table.udf import udf +from pyflink.table import DataTypes + + +@udf(result_type=DataTypes.STRING()) +def reverse_string(s): + """A simple Python UDF that reverses a string.""" + return s[::-1] if s else None diff --git a/hoptimator-flink-runner/src/main/java/com/linkedin/hoptimator/flink/runner/FlinkRunner.java b/hoptimator-flink-runner/src/main/java/com/linkedin/hoptimator/flink/runner/FlinkRunner.java index 985a9100..4868c294 100644 --- a/hoptimator-flink-runner/src/main/java/com/linkedin/hoptimator/flink/runner/FlinkRunner.java +++ b/hoptimator-flink-runner/src/main/java/com/linkedin/hoptimator/flink/runner/FlinkRunner.java @@ -6,30 +6,128 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.kubernetes.client.openapi.ApiClient; +import io.kubernetes.client.util.Config; +import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesApi; +import io.kubernetes.client.util.generic.dynamic.DynamicKubernetesObject; -/** Runs SQL from command-line args. */ +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + + +/** Runs SQL from a SqlJob CR or from command-line args. 
*/ public final class FlinkRunner { private static final Logger logger = LoggerFactory.getLogger(FlinkRunner.class); + private static final String SQLJOB_PREFIX = "--sqljob="; + private static final String UDF_DIR_PROPERTY = "hoptimator.udf.dir"; + private static final String UDF_DIR_DEFAULT = "/opt/python-udfs"; private FlinkRunner() { } - public static void main(String[] args) { + public static void main(String[] args) throws Exception { + List sqlStatements; + + // Check for --sqljob argument to fetch SQL and files from a SqlJob CR + String sqlJobRef = findSqlJobRef(args); + if (sqlJobRef != null) { + logger.info("Fetching SqlJob: {}", sqlJobRef); + sqlStatements = loadFromSqlJob(sqlJobRef); + } else { + // Backward compatibility: SQL from command-line args + logger.info("No --sqljob argument found. Using SQL from command-line args."); + sqlStatements = new ArrayList<>(); + for (String arg : args) { + String stmt = arg.replaceAll("\\n", "").trim(); + if (!stmt.isEmpty() && !stmt.startsWith("--")) { + sqlStatements.add(stmt); + } + } + } + + // Execute SQL statements EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings); - for (int i = 0; i < args.length; i++) { - String stmt = args[i].replaceAll("\\n", "").trim(); - if (!stmt.isEmpty() && !stmt.startsWith("--")) { - logger.info("Executing statement #{}: {}", i, stmt); - try { - tEnv.executeSql(stmt); - } catch (Exception e) { - logger.error("Error executing SQL statement #{}: `{}`", i, stmt, e); - throw e; - } + for (int i = 0; i < sqlStatements.size(); i++) { + String stmt = sqlStatements.get(i); + logger.info("Executing statement #{}: {}", i, stmt); + try { + tEnv.executeSql(stmt); + } catch (Exception e) { + logger.error("Error executing SQL statement #{}: `{}`", i, stmt, e); + throw e; + } + } + } + + /** Finds 
the --sqljob=namespace/name argument, or null if not present. */ + static String findSqlJobRef(String[] args) { + for (String arg : args) { + if (arg.startsWith(SQLJOB_PREFIX)) { + return arg.substring(SQLJOB_PREFIX.length()); + } + } + return null; + } + + /** Fetches a SqlJob CR and returns the SQL statements, writing any files to the UDF directory. */ + @SuppressWarnings("unchecked") + static List loadFromSqlJob(String ref) throws Exception { + String[] parts = ref.split("/", 2); + if (parts.length != 2) { + throw new IllegalArgumentException("SqlJob reference must be namespace/name, got: " + ref); + } + String namespace = parts[0]; + String name = parts[1]; + + ApiClient client = Config.defaultClient(); + DynamicKubernetesApi api = new DynamicKubernetesApi("hoptimator.linkedin.com", "v1alpha1", "sqljobs", client); + DynamicKubernetesObject obj = api.get(namespace, name).throwsApiException().getObject(); + + Map spec = (Map) obj.getRaw().get("spec"); + if (spec == null) { + throw new IllegalStateException("SqlJob " + ref + " has no spec"); + } + + // Extract and execute files + Map files = (Map) spec.get("files"); + if (files != null && !files.isEmpty()) { + Path udfDir = Paths.get(System.getProperty(UDF_DIR_PROPERTY, UDF_DIR_DEFAULT)); + for (Map.Entry entry : files.entrySet()) { + writeFile(entry.getKey(), entry.getValue(), udfDir); } } + + // Extract SQL statements + List sql = (List) spec.get("sql"); + if (sql == null) { + return Collections.emptyList(); + } + + List statements = new ArrayList<>(); + for (String stmt : sql) { + String trimmed = stmt.replaceAll("\\n", "").trim(); + if (!trimmed.isEmpty()) { + statements.add(trimmed); + } + } + return statements; + } + + /** Writes a file to the given directory. 
*/ + static void writeFile(String filename, String content, Path targetDir) throws IOException { + Files.createDirectories(targetDir); + Path filePath = targetDir.resolve(filename); + Files.write(filePath, content.getBytes(StandardCharsets.UTF_8)); + logger.info("Wrote UDF file: {}", filePath); } } diff --git a/hoptimator-flink-runner/src/main/java/com/linkedin/hoptimator/flink/runner/functions/Greet.java b/hoptimator-flink-runner/src/main/java/com/linkedin/hoptimator/flink/runner/functions/Greet.java new file mode 100644 index 00000000..23f9983c --- /dev/null +++ b/hoptimator-flink-runner/src/main/java/com/linkedin/hoptimator/flink/runner/functions/Greet.java @@ -0,0 +1,15 @@ +package com.linkedin.hoptimator.flink.runner.functions; + +import org.apache.flink.table.functions.ScalarFunction; + + +/** A simple scalar UDF that greets a name. */ +public class Greet extends ScalarFunction { + + public String eval(String name) { + if (name == null) { + return null; + } + return "Hello, " + name + "!"; + } +} diff --git a/hoptimator-flink-runner/src/main/java/com/linkedin/hoptimator/flink/runner/functions/StringLength.java b/hoptimator-flink-runner/src/main/java/com/linkedin/hoptimator/flink/runner/functions/StringLength.java new file mode 100644 index 00000000..5fa16a95 --- /dev/null +++ b/hoptimator-flink-runner/src/main/java/com/linkedin/hoptimator/flink/runner/functions/StringLength.java @@ -0,0 +1,15 @@ +package com.linkedin.hoptimator.flink.runner.functions; + +import org.apache.flink.table.functions.ScalarFunction; + + +/** A simple scalar UDF that returns the length of a string. 
*/ +public class StringLength extends ScalarFunction { + + public Integer eval(String s) { + if (s == null) { + return null; + } + return s.length(); + } +} diff --git a/hoptimator-flink-runner/src/test/java/com/linkedin/hoptimator/flink/runner/FlinkRunnerTest.java b/hoptimator-flink-runner/src/test/java/com/linkedin/hoptimator/flink/runner/FlinkRunnerTest.java new file mode 100644 index 00000000..2ce352d0 --- /dev/null +++ b/hoptimator-flink-runner/src/test/java/com/linkedin/hoptimator/flink/runner/FlinkRunnerTest.java @@ -0,0 +1,45 @@ +package com.linkedin.hoptimator.flink.runner; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; + + +class FlinkRunnerTest { + + @Test + void findSqlJobRefPresent() { + String[] args = {"--sqljob=default/my-job", "CREATE TABLE t (x INT)"}; + assertThat(FlinkRunner.findSqlJobRef(args)).isEqualTo("default/my-job"); + } + + @Test + void findSqlJobRefAbsent() { + String[] args = {"CREATE TABLE t (x INT)", "INSERT INTO t SELECT 1"}; + assertThat(FlinkRunner.findSqlJobRef(args)).isNull(); + } + + @Test + void writeFileCreatesFileOnDisk(@TempDir Path tempDir) throws Exception { + FlinkRunner.writeFile("test.py", "print('hello')", tempDir); + + Path written = tempDir.resolve("test.py"); + assertThat(written).exists(); + assertThat(Files.readString(written, StandardCharsets.UTF_8)).isEqualTo("print('hello')"); + } + + @Test + void writeFileHandlesMultilineContent(@TempDir Path tempDir) throws Exception { + String content = "from pyflink.table.udf import udf\n\n@udf(result_type=DataTypes.STRING())\ndef reverse(s):\n return s[::-1]\n"; + FlinkRunner.writeFile("my_udfs.py", content, tempDir); + + Path written = tempDir.resolve("my_udfs.py"); + assertThat(written).exists(); + assertThat(Files.readString(written, 
StandardCharsets.UTF_8)).isEqualTo(content); + } +} diff --git a/hoptimator-flink-runner/src/test/java/com/linkedin/hoptimator/flink/runner/functions/GreetTest.java b/hoptimator-flink-runner/src/test/java/com/linkedin/hoptimator/flink/runner/functions/GreetTest.java new file mode 100644 index 00000000..db0c4430 --- /dev/null +++ b/hoptimator-flink-runner/src/test/java/com/linkedin/hoptimator/flink/runner/functions/GreetTest.java @@ -0,0 +1,26 @@ +package com.linkedin.hoptimator.flink.runner.functions; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; + + +class GreetTest { + + private final Greet greet = new Greet(); + + @Test + void greetName() { + assertThat(greet.eval("Alice")).isEqualTo("Hello, Alice!"); + } + + @Test + void greetEmptyString() { + assertThat(greet.eval("")).isEqualTo("Hello, !"); + } + + @Test + void greetNull() { + assertThat(greet.eval(null)).isNull(); + } +} diff --git a/hoptimator-flink-runner/src/test/java/com/linkedin/hoptimator/flink/runner/functions/StringLengthTest.java b/hoptimator-flink-runner/src/test/java/com/linkedin/hoptimator/flink/runner/functions/StringLengthTest.java new file mode 100644 index 00000000..be88b6b7 --- /dev/null +++ b/hoptimator-flink-runner/src/test/java/com/linkedin/hoptimator/flink/runner/functions/StringLengthTest.java @@ -0,0 +1,26 @@ +package com.linkedin.hoptimator.flink.runner.functions; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.jupiter.api.Test; + + +class StringLengthTest { + + private final StringLength stringLength = new StringLength(); + + @Test + void lengthOfString() { + assertThat(stringLength.eval("hello")).isEqualTo(5); + } + + @Test + void lengthOfEmptyString() { + assertThat(stringLength.eval("")).isEqualTo(0); + } + + @Test + void lengthOfNull() { + assertThat(stringLength.eval(null)).isNull(); + } +} diff --git a/hoptimator-jdbc/src/main/codegen/includes/parserImpls.ftl 
b/hoptimator-jdbc/src/main/codegen/includes/parserImpls.ftl index a3a92bd2..11b3e161 100644 --- a/hoptimator-jdbc/src/main/codegen/includes/parserImpls.ftl +++ b/hoptimator-jdbc/src/main/codegen/includes/parserImpls.ftl @@ -340,17 +340,21 @@ SqlCreate SqlCreateFunction(Span s, boolean replace) : final SqlIdentifier id; final SqlNode template; SqlNode namespace = null; + SqlDataTypeSpec returnType = null; + SqlIdentifier language = null; SqlNodeList optionList = null; } { ifNotExists = IfNotExistsOpt() id = CompoundIdentifier() + [ returnType = DataType() ] + [ language = SimpleIdentifier() ] template = StringLiteral() [ namespace = StringLiteral() ] [ optionList = Options() ] { return new SqlCreateFunction(s.end(this), replace, ifNotExists, id, template, - namespace, optionList); + namespace, returnType, language, optionList); } } diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/DollarQuoting.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/DollarQuoting.java new file mode 100644 index 00000000..90dfb83b --- /dev/null +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/DollarQuoting.java @@ -0,0 +1,69 @@ +package com.linkedin.hoptimator.jdbc; + +import java.io.IOException; +import java.io.Reader; +import java.io.StringReader; +import java.io.UncheckedIOException; + + +/** + * Preprocesses SQL to convert $$-delimited strings to standard single-quoted strings. + * + *

This enables PostgreSQL-style dollar-quoting for inline code blocks, e.g.: + *

{@code
+ * CREATE FUNCTION foo LANGUAGE PYTHON AS 'foo' IN $$
+ * def foo(s):
+ *     return s[::-1]
+ * $$;
+ * }
+ * + *

The preprocessor converts {@code $$...$$} to {@code '...'}, escaping any + * internal single quotes by doubling them ({@code '} → {@code ''}). + */ +final class DollarQuoting { + + private DollarQuoting() { + } + + /** Preprocesses a Reader, converting $$-delimited strings to single-quoted SQL strings. */ + static Reader preprocess(Reader reader) { + try { + StringBuilder sb = new StringBuilder(); + char[] buf = new char[4096]; + int n; + while ((n = reader.read(buf)) != -1) { + sb.append(buf, 0, n); + } + return new StringReader(preprocess(sb.toString())); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + /** Converts $$-delimited strings to single-quoted SQL strings. */ + static String preprocess(String sql) { + int idx = sql.indexOf("$$"); + if (idx < 0) { + return sql; + } + StringBuilder result = new StringBuilder(); + int i = 0; + while (i < sql.length()) { + if (i + 1 < sql.length() && sql.charAt(i) == '$' && sql.charAt(i + 1) == '$') { + int end = sql.indexOf("$$", i + 2); + if (end < 0) { + throw new IllegalArgumentException("Unclosed $$ delimiter in SQL"); + } + String content = sql.substring(i + 2, end); + result.append('\''); + result.append(content.replace("'", "''")); + result.append('\''); + i = end + 2; + } else { + result.append(sql.charAt(i)); + i++; + } + } + return result.toString(); + } +} diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorConnection.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorConnection.java index 3cb26b4c..1d8472f9 100644 --- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorConnection.java +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorConnection.java @@ -3,6 +3,7 @@ import com.linkedin.hoptimator.Database; import com.linkedin.hoptimator.Sink; import com.linkedin.hoptimator.Source; +import com.linkedin.hoptimator.UserFunction; import com.linkedin.hoptimator.avro.AvroConverter; import 
com.linkedin.hoptimator.util.ConnectionService; import com.linkedin.hoptimator.util.DelegatingConnection; @@ -32,6 +33,7 @@ public class HoptimatorConnection extends DelegatingConnection { private final CalciteConnection connection; private final Properties connectionProperties; private final List materializations = new ArrayList<>(); + private final List functions = new ArrayList<>(); private final List> logHooks = new ArrayList<>(); @@ -96,6 +98,18 @@ public HoptimatorConnection withProperties(Properties properties) { return new HoptimatorConnection(connection, properties); } + public void registerFunction(UserFunction func) { + functions.add(func); + } + + public void removeFunction(String name) { + functions.removeIf(f -> f.name().equals(name)); + } + + public List functions() { + return functions; + } + public void registerMaterialization(List viewPath, String querySql) { String tableSql = "SELECT * FROM " + viewPath.stream().map(x -> "\"" + x + "\"").collect(Collectors.joining(".")); RelNode tableRel = HoptimatorDriver.convert(this, tableSql).root.rel; diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java index 3025cbab..639cd284 100644 --- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/HoptimatorDdlExecutor.java @@ -26,9 +26,11 @@ import com.linkedin.hoptimator.Pipeline; import com.linkedin.hoptimator.Source; import com.linkedin.hoptimator.Trigger; +import com.linkedin.hoptimator.UserFunction; import com.linkedin.hoptimator.UserJob; import com.linkedin.hoptimator.View; import com.linkedin.hoptimator.jdbc.ddl.HoptimatorDdlParserImpl; +import com.linkedin.hoptimator.jdbc.ddl.SqlCreateFunction; import com.linkedin.hoptimator.jdbc.ddl.SqlCreateMaterializedView; import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTable; import 
com.linkedin.hoptimator.jdbc.ddl.SqlCreateTrigger; @@ -113,7 +115,8 @@ public HoptimatorDdlExecutor(HoptimatorConnection connection) { public static final SqlParserImplFactory PARSER_FACTORY = new SqlParserImplFactory() { @Override public SqlAbstractParserImpl getParser(Reader stream) { - SqlAbstractParserImpl parser = HoptimatorDdlParserImpl.FACTORY.getParser(stream); + Reader preprocessed = DollarQuoting.preprocess(stream); + SqlAbstractParserImpl parser = HoptimatorDdlParserImpl.FACTORY.getParser(preprocessed); parser.setConformance(SqlConformanceEnum.BABEL); return parser; } @@ -264,6 +267,10 @@ public void execute(SqlCreateMaterializedView create, CalcitePrepare.Context con // Plan a pipeline to materialize the view. RelRoot root = new HoptimatorDriver.Prepare(connection).convert(context, sql).root; PipelineRel.Implementor plan = DeploymentService.plan(root, connection.materializations(), connectionProperties); + // Pass registered functions to the implementor + for (UserFunction func : connection.functions()) { + plan.addFunction(func); + } schemaSnapshot = HoptimatorDdlUtils.snapshotAndSetSinkSchema(context, new HoptimatorDriver.Prepare(connection), plan, sql, pair); logger.info("Added materialized view {} to schema {}", viewName, schemaPlus.getName()); Pipeline pipeline = plan.pipeline(viewName, connection); @@ -366,6 +373,80 @@ public void execute(SqlCreateTrigger create, CalcitePrepare.Context context) { } } + /** Executes a {@code CREATE FUNCTION} command. */ + public void execute(SqlCreateFunction create, CalcitePrepare.Context context) { + logger.info("Validating statement: {}", create); + try { + ValidationService.validateOrThrow(create); + } catch (SQLException e) { + throw new DdlException(create, e.getMessage(), e); + } + + String name = create.name.names.get(create.name.names.size() - 1); + String as = ((SqlLiteral) create.job).getValueAs(String.class); + String inClause = create.namespace != null + ? 
((SqlLiteral) create.namespace).getValueAs(String.class) : null; + Map options = HoptimatorDdlUtils.options(create.options); + + // Extract LANGUAGE from the dedicated clause + if (create.language != null) { + options.put("LANGUAGE", create.language.getSimple()); + } + + // The IN clause provides the source for the function. Its interpretation depends on context: + // - JAR archive (.jar extension or URL): stored as JAR option for USING JAR emission + // - Inline code (contains whitespace, e.g. from $$...$$): stored as CODE option + // - File path or URL (contains '/'): read content and stored as CODE option + // - Module reference (simple name, e.g. 'demo_udfs'): combined with AS as module.function + if (inClause != null) { + if (inClause.endsWith(".jar")) { + // JAR archive: AS is the class, IN is the JAR location + options.put("JAR", inClause); + } else if (inClause.contains(" ") || inClause.contains("\n")) { + // Inline code: derive module.function reference from function name + String funcName = as; + String moduleName = name.toLowerCase(java.util.Locale.ROOT); + as = moduleName + "." + funcName; + options.put("CODE", inClause); + } else if (inClause.contains("/") || inClause.contains(":\\")) { + // File path or URL: read content and treat as inline code + String code = readSource(inClause); + String funcName = as; + String moduleName = name.toLowerCase(java.util.Locale.ROOT); + as = moduleName + "." + funcName; + options.put("CODE", code); + } else { + // Module reference: IN names the module, AS names the function within it + as = inClause + "." + as; + } + } + + UserFunction userFunction = new UserFunction(name, as, null, options); + + // Determine return type from RETURNS clause (default: VARCHAR) + String returnsName = create.returnType != null + ? 
create.returnType.getTypeName().getSimple() : "VARCHAR"; + org.apache.calcite.sql.type.SqlTypeName returnType; + try { + returnType = org.apache.calcite.sql.type.SqlTypeName.valueOf(returnsName.toUpperCase(java.util.Locale.ROOT)); + } catch (IllegalArgumentException e) { + throw new DdlException(create, "Unknown return type: " + returnsName); + } + + // Register opaque function overloads in Calcite schema so queries pass validation. + // Each overload accepts ANY-typed parameters and returns the specified type. + final Pair pair = HoptimatorDdlUtils.schema(context, true, create.name); + SchemaPlus schemaPlus = pair.left != null ? pair.left.plus() : context.getRootSchema().plus(); + for (OpaqueFunction fn : OpaqueFunction.overloads(returnType)) { + schemaPlus.add(name, fn); + } + + // Register in session for pipeline DDL emission + connection.registerFunction(userFunction); + + logger.info("CREATE FUNCTION {} completed", name); + } + // N.B. originally copy-pasted from Apache Calcite /** Executes a {@code CREATE TABLE} command. */ @@ -629,6 +710,18 @@ public void execute(SqlDropObject drop, CalcitePrepare.Context context) { throw new DdlException(drop, e.getMessage(), e); } + // Handle DROP FUNCTION + if (drop.getKind().equals(SqlKind.DROP_FUNCTION)) { + String name = drop.name.names.get(drop.name.names.size() - 1); + final Pair fnPair = HoptimatorDdlUtils.schema(context, false, drop.name); + if (fnPair.left != null) { + fnPair.left.removeFunction(name); + } + connection.removeFunction(name); + logger.info("DROP FUNCTION {} completed", name); + return; + } + // The logic below is only applicable for DROP VIEW and DROP MATERIALIZED VIEW. if (!drop.getKind().equals(SqlKind.DROP_MATERIALIZED_VIEW) && !drop.getKind().equals(SqlKind.DROP_VIEW) @@ -784,4 +877,20 @@ String databaseName() { return databaseName; } } + + /** Reads source code from a file path or URL. 
*/ + private static String readSource(String location) { + try { + java.net.URI uri; + if (location.contains("://")) { + uri = new java.net.URI(location); + } else { + uri = java.nio.file.Paths.get(location).toUri(); + } + return new String(java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(uri)), + java.nio.charset.StandardCharsets.UTF_8); + } catch (Exception e) { + throw new RuntimeException("Failed to read source from " + location, e); + } + } } diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/OpaqueFunction.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/OpaqueFunction.java new file mode 100644 index 00000000..dc672423 --- /dev/null +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/OpaqueFunction.java @@ -0,0 +1,128 @@ +package com.linkedin.hoptimator.jdbc; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.calcite.adapter.enumerable.CallImplementor; +import org.apache.calcite.linq4j.tree.Expressions; +import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.schema.FunctionParameter; +import org.apache.calcite.schema.ImplementableFunction; +import org.apache.calcite.schema.ScalarFunction; +import org.apache.calcite.sql.type.SqlTypeName; + + +/** + * A permissive opaque function used for Calcite schema registration. + * + *

When users register UDFs via CREATE FUNCTION, this class provides a + * placeholder so that queries referencing the UDF pass Calcite validation. + * The actual function implementation is provided by the runtime (e.g., Flink). + * + *

Each overload accepts a fixed number of ANY-typed parameters and returns + * a configurable type (defaulting to VARCHAR). ANY parameters allow any column + * type to be passed without casting. The return type can be specified via the + * RETURNS clause (e.g. CREATE FUNCTION name RETURNS INTEGER AS ...). + + *

At execution time (e.g., in test query previews), the function returns null. + */ +public final class OpaqueFunction implements ScalarFunction, ImplementableFunction { + + private static final int MAX_ARITY = 10; + private final int arity; + private final SqlTypeName returnType; + + private OpaqueFunction(int arity, SqlTypeName returnType) { + this.arity = arity; + this.returnType = returnType; + } + + /** + * Creates a list of opaque function overloads covering arities 0 through {@link #MAX_ARITY} + * with the given return type. + */ + public static List overloads(SqlTypeName returnType) { + return overloads(MAX_ARITY, returnType); + } + + /** + * Creates a list of opaque function overloads covering arities 0 through maxArity. + */ + public static List overloads(int maxArity, SqlTypeName returnType) { + List functions = new ArrayList<>(); + for (int i = 0; i <= maxArity; i++) { + functions.add(new OpaqueFunction(i, returnType)); + } + return Collections.unmodifiableList(functions); + } + + @Override + public RelDataType getReturnType(RelDataTypeFactory typeFactory) { + return typeFactory.createTypeWithNullability( + typeFactory.createSqlType(returnType), true); + } + + @Override + public List getParameters() { + List params = new ArrayList<>(); + for (int i = 0; i < arity; i++) { + final int ordinal = i; + params.add(new FunctionParameter() { + @Override + public int getOrdinal() { + return ordinal; + } + + @Override + public String getName() { + return "param" + ordinal; + } + + @Override + public RelDataType getType(RelDataTypeFactory typeFactory) { + return typeFactory.createTypeWithNullability( + typeFactory.createSqlType(SqlTypeName.ANY), true); + } + + @Override + public boolean isOptional() { + return false; + } + }); + } + return params; + } + + @Override + public CallImplementor getImplementor() { + // Returns null for any invocation. UDFs are opaque placeholders in the + // JDBC driver; they only truly execute in the data plane (e.g., Flink). 
+ return (translator, call, nullAs) -> Expressions.constant(null, javaType(returnType)); + } + + private static Class javaType(SqlTypeName typeName) { + switch (typeName) { + case BOOLEAN: + return Boolean.class; + case TINYINT: + return Byte.class; + case SMALLINT: + return Short.class; + case INTEGER: + return Integer.class; + case BIGINT: + return Long.class; + case FLOAT: + case REAL: + return Float.class; + case DOUBLE: + return Double.class; + case DECIMAL: + return java.math.BigDecimal.class; + default: + return String.class; + } + } +} diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/HoptimatorDdlParserImpl.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/HoptimatorDdlParserImpl.java index 7588ab26..5febeb4d 100644 --- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/HoptimatorDdlParserImpl.java +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/HoptimatorDdlParserImpl.java @@ -6735,10 +6735,22 @@ final public SqlCreate SqlCreateFunction(Span s, boolean replace) throws ParseEx final SqlIdentifier id; final SqlNode template; SqlNode namespace = null; + SqlDataTypeSpec returnType = null; + SqlIdentifier language = null; SqlNodeList optionList = null; jj_consume_token(FUNCTION); ifNotExists = IfNotExistsOpt(); id = CompoundIdentifier(); + // optional: RETURNS + if (jj_ntk == RETURNS || (jj_ntk == -1 && jj_ntk() == RETURNS)) { + jj_consume_token(RETURNS); + returnType = DataType(); + } + // optional: LANGUAGE (before AS) + if (jj_ntk == LANGUAGE || (jj_ntk == -1 && jj_ntk() == LANGUAGE)) { + jj_consume_token(LANGUAGE); + language = SimpleIdentifier(); + } jj_consume_token(AS); template = StringLiteral(); switch ((jj_ntk==-1)?jj_ntk():jj_ntk) { @@ -6759,7 +6771,7 @@ final public SqlCreate SqlCreateFunction(Span s, boolean replace) throws ParseEx ; } {if (true) return new SqlCreateFunction(s.end(this), replace, ifNotExists, id, template, - namespace, optionList);} + namespace, 
returnType, language, optionList);} throw new Error("Missing return statement in function"); } diff --git a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/SqlCreateFunction.java b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/SqlCreateFunction.java index 2386ebe3..37827f79 100644 --- a/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/SqlCreateFunction.java +++ b/hoptimator-jdbc/src/main/java/com/linkedin/hoptimator/jdbc/ddl/SqlCreateFunction.java @@ -19,6 +19,7 @@ package com.linkedin.hoptimator.jdbc.ddl; import org.apache.calcite.sql.SqlCreate; +import org.apache.calcite.sql.SqlDataTypeSpec; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; @@ -35,30 +36,46 @@ /** - * Parse tree for `CREATE FUNCTION` statement. + * Parse tree for {@code CREATE FUNCTION} statement. + * + *

{@code
+ * CREATE FUNCTION name [RETURNS type] [LANGUAGE lang] AS 'class' [IN 'namespace'] [WITH (...)]
+ * }
*/ public class SqlCreateFunction extends SqlCreate { public final SqlIdentifier name; public final SqlNode job; public final SqlNode namespace; + public final SqlDataTypeSpec returnType; + public final SqlIdentifier language; public final SqlNodeList options; private static final SqlOperator OPERATOR = new SqlSpecialOperator("CREATE FUNCTION", SqlKind.CREATE_FUNCTION); + /** Constructor with returnType and language. */ public SqlCreateFunction(SqlParserPos pos, boolean replace, boolean ifNotExists, - SqlIdentifier name, SqlNode job, SqlNode namespace, SqlNodeList options) { + SqlIdentifier name, SqlNode job, SqlNode namespace, SqlDataTypeSpec returnType, + SqlIdentifier language, SqlNodeList options) { super(OPERATOR, pos, replace, ifNotExists); this.name = requireNonNull(name, "name"); this.job = requireNonNull(job, "job"); - this.namespace = namespace; + this.namespace = namespace; + this.returnType = returnType; + this.language = language; this.options = options; } + /** Backward-compatible constructor without returnType and language. 
*/ + public SqlCreateFunction(SqlParserPos pos, boolean replace, boolean ifNotExists, + SqlIdentifier name, SqlNode job, SqlNode namespace, SqlNodeList options) { + this(pos, replace, ifNotExists, name, job, namespace, null, null, options); + } + @SuppressWarnings("nullness") @Override public List getOperandList() { - return ImmutableNullableList.of(name, job, namespace, options); + return ImmutableNullableList.of(name, job, namespace, returnType, language, options); } @Override public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { @@ -68,12 +85,20 @@ public SqlCreateFunction(SqlParserPos pos, boolean replace, boolean ifNotExists, writer.keyword("IF NOT EXISTS"); } name.unparse(writer, leftPrec, rightPrec); + if (returnType != null) { + writer.keyword("RETURNS"); + returnType.unparse(writer, 0, 0); + } writer.keyword("AS"); job.unparse(writer, 0, 0); if (namespace != null) { writer.keyword("IN"); namespace.unparse(writer, 0, 0); } + if (language != null) { + writer.keyword("LANGUAGE"); + language.unparse(writer, 0, 0); + } if (options != null) { writer.keyword("WITH"); options.unparse(writer, 0, 0); diff --git a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/TestSqlScripts.java b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/TestSqlScripts.java index 1c74f5ff..6015e6b8 100644 --- a/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/TestSqlScripts.java +++ b/hoptimator-jdbc/src/test/java/com/linkedin/hoptimator/jdbc/TestSqlScripts.java @@ -23,6 +23,11 @@ public void basicDdlScript() throws Exception { run("basic-ddl.id"); } + @Test + public void createFunctionDdlScript() throws Exception { + run("create-function-ddl.id"); + } + @Test public void createViewWithAValidatorRejectingCreateViewThrowsException() throws Exception { // Runs the test in a separate thread to isolate the context class loader changes. 
diff --git a/hoptimator-jdbc/src/test/resources/create-function-ddl.id b/hoptimator-jdbc/src/test/resources/create-function-ddl.id new file mode 100644 index 00000000..413cbfc6 --- /dev/null +++ b/hoptimator-jdbc/src/test/resources/create-function-ddl.id @@ -0,0 +1,116 @@ +!set outputformat mysql +!use util + +# Test CREATE FUNCTION with default return type (VARCHAR) +create function my_udf as 'com.example.MyUdf'; +(0 rows modified) + +!update + +# VARCHAR UDF validates against inline data; returns null (opaque placeholder) +select my_udf(x) as val from (values ('hello'), ('world')) as t(x); ++-----+ +| VAL | ++-----+ +| | +| | ++-----+ +(2 rows) + +!ok + +# Test CREATE FUNCTION with explicit INTEGER return type +create function int_udf returns integer as 'com.example.IntUdf'; +(0 rows modified) + +!update + +# Integer UDF validates against integer input +select int_udf(x) as val from (values (10), (20), (30)) as t(x); ++-----+ +| VAL | ++-----+ +| | +| | +| | ++-----+ +(3 rows) + +!ok + +# Test CREATE FUNCTION with BOOLEAN return type +create function bool_udf returns boolean as 'com.example.BoolUdf'; +(0 rows modified) + +!update + +# Boolean UDF validates against varchar input +select bool_udf(x) as val from (values ('a')) as t(x); ++-----+ +| VAL | ++-----+ +| | ++-----+ +(1 row) + +!ok + +# Test CREATE FUNCTION with LANGUAGE clause (LANGUAGE before AS) +create function py_udf returns varchar language python as 'my_func' in 'my_module'; +(0 rows modified) + +!update + +# Multi-arg UDF validates with two arguments +select py_udf(x, y) as val from (values ('a', 'b'), ('c', 'd')) as t(x, y); ++-----+ +| VAL | ++-----+ +| | +| | ++-----+ +(2 rows) + +!ok + +# Test CREATE FUNCTION with inline code via dollar-quoting +create function inline_udf returns varchar language python as 'inline_udf' in $$x = 1$$; +(0 rows modified) + +!update + +select inline_udf(x) as val from (values ('a')) as t(x); ++-----+ +| VAL | ++-----+ +| | ++-----+ +(1 row) + +!ok + +# Clean up 
+drop function my_udf; +(0 rows modified) + +!update + +drop function int_udf; +(0 rows modified) + +!update + +drop function bool_udf; +(0 rows modified) + +!update + +drop function py_udf; +(0 rows modified) + +!update + +drop function inline_udf; +(0 rows modified) + +!update diff --git a/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java b/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java index 4df59b71..c3f8f13f 100644 --- a/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java +++ b/hoptimator-jdbc/src/testFixtures/java/com/linkedin/hoptimator/jdbc/QuidemTestBase.java @@ -41,6 +41,7 @@ import com.linkedin.hoptimator.Pipeline; import com.linkedin.hoptimator.Source; +import com.linkedin.hoptimator.UserFunction; import com.linkedin.hoptimator.util.DeploymentService; @@ -264,6 +265,10 @@ public void execute(Context context, boolean execute) throws Exception { } PipelineRel.Implementor plan = DeploymentService.plan(root, Collections.emptyList(), connectionProperties); + // Pass registered functions to the implementor + for (UserFunction func : conn.functions()) { + plan.addFunction(func); + } if (create != null) { HoptimatorDdlUtils.snapshotAndSetSinkSchema(conn.createPrepareContext(), new HoptimatorDriver.Prepare(conn), plan, create, querySql); diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java index 491c1a4b..781d71ea 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/K8sJobDeployer.java @@ -55,6 +55,7 @@ public List specify() throws SQLException { .with("flinksql", () -> sql.apply(SqlDialect.FLINK)) .with("flinkconfigs", properties) .with("fieldMap", () -> "'" + fieldMap.apply(SqlDialect.ANSI) + "'") + .with("files", job.files()) 
.with(properties); List templates = jobTemplateApi.list() .stream() diff --git a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SqlJobSpec.java b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SqlJobSpec.java index bda08a83..cad50681 100644 --- a/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SqlJobSpec.java +++ b/hoptimator-k8s/src/main/java/com/linkedin/hoptimator/k8s/models/V1alpha1SqlJobSpec.java @@ -140,6 +140,10 @@ public ExecutionModeEnum read(final JsonReader jsonReader) throws IOException { @SerializedName(SERIALIZED_NAME_EXECUTION_MODE) private ExecutionModeEnum executionMode; + public static final String SERIALIZED_NAME_FILES = "files"; + @SerializedName(SERIALIZED_NAME_FILES) + private Map files = null; + public static final String SERIALIZED_NAME_SQL = "sql"; @SerializedName(SERIALIZED_NAME_SQL) private List sql = new ArrayList<>(); @@ -222,6 +226,37 @@ public void setExecutionMode(ExecutionModeEnum executionMode) { } + public V1alpha1SqlJobSpec files(Map files) { + + this.files = files; + return this; + } + + public V1alpha1SqlJobSpec putFilesItem(String key, String filesItem) { + if (this.files == null) { + this.files = new HashMap<>(); + } + this.files.put(key, filesItem); + return this; + } + + /** + * Inline file content to make available to the SQL job. 
+ * @return files + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "Inline file content to make available to the SQL job.") + + public Map getFiles() { + return files; + } + + + public void setFiles(Map files) { + this.files = files; + } + + public V1alpha1SqlJobSpec sql(List sql) { this.sql = sql; @@ -261,12 +296,13 @@ public boolean equals(Object o) { return Objects.equals(this.configs, v1alpha1SqlJobSpec.configs) && Objects.equals(this.dialect, v1alpha1SqlJobSpec.dialect) && Objects.equals(this.executionMode, v1alpha1SqlJobSpec.executionMode) && + Objects.equals(this.files, v1alpha1SqlJobSpec.files) && Objects.equals(this.sql, v1alpha1SqlJobSpec.sql); } @Override public int hashCode() { - return Objects.hash(configs, dialect, executionMode, sql); + return Objects.hash(configs, dialect, executionMode, files, sql); } @@ -277,6 +313,7 @@ public String toString() { sb.append(" configs: ").append(toIndentedString(configs)).append("\n"); sb.append(" dialect: ").append(toIndentedString(dialect)).append("\n"); sb.append(" executionMode: ").append(toIndentedString(executionMode)).append("\n"); + sb.append(" files: ").append(toIndentedString(files)).append("\n"); sb.append(" sql: ").append(toIndentedString(sql)).append("\n"); sb.append("}"); return sb.toString(); diff --git a/hoptimator-k8s/src/main/resources/sqljobs.crd.yaml b/hoptimator-k8s/src/main/resources/sqljobs.crd.yaml index c3442bbf..7ee1a862 100644 --- a/hoptimator-k8s/src/main/resources/sqljobs.crd.yaml +++ b/hoptimator-k8s/src/main/resources/sqljobs.crd.yaml @@ -57,6 +57,11 @@ spec: type: object additionalProperties: type: string + files: + description: Inline file content to make available to the SQL job (e.g., Python UDF code). 
+ type: object + additionalProperties: + type: string required: - sql status: diff --git a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java index 70a49ca2..3b025688 100644 --- a/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java +++ b/hoptimator-k8s/src/test/java/com/linkedin/hoptimator/k8s/TestSqlScripts.java @@ -19,6 +19,21 @@ public void k8sDdlScriptFunction() throws Exception { run("k8s-ddl-function.id", "fun=mysql"); } + @Test + public void k8sDdlScriptUdf() throws Exception { + run("k8s-ddl-udf.id"); + } + + @Test + public void k8sDdlScriptUdfDemo() throws Exception { + run("k8s-ddl-udf-demo.id"); + } + + @Test + public void k8sDdlScriptUdfFiles() throws Exception { + run("k8s-ddl-udf-files.id"); + } + @Test public void k8sValidationScript() throws Exception { run("k8s-validation.id"); diff --git a/hoptimator-k8s/src/test/resources/k8s-ddl-udf-demo.id b/hoptimator-k8s/src/test/resources/k8s-ddl-udf-demo.id new file mode 100644 index 00000000..fa2f53d1 --- /dev/null +++ b/hoptimator-k8s/src/test/resources/k8s-ddl-udf-demo.id @@ -0,0 +1,82 @@ +!set outputformat mysql +!use k8s + +# Register demo Java UDFs baked into the Flink runner image +create function greet as 'com.linkedin.hoptimator.flink.runner.functions.Greet'; +(0 rows modified) + +!update + +create function str_len returns integer as 'com.linkedin.hoptimator.flink.runner.functions.StringLength'; +(0 rows modified) + +!update + +# Register demo Python UDF baked into the Flink runner image +create function reverse_str language python as 'reverse_string' in 'demo_udfs'; +(0 rows modified) + +!update + +# UDFs validate against real data (opaque placeholders return null at JDBC layer) +select greet("FIRST_NAME") as greeting from profile.members; ++----------+ +| GREETING | ++----------+ +| | +| | +| | ++----------+ +(3 rows) + +!ok + +select str_len("FIRST_NAME") as len from 
profile.members; ++-----+ +| LEN | ++-----+ +| | +| | +| | ++-----+ +(3 rows) + +!ok + +# Pipeline with Java UDF generates correct SqlJob spec +insert into ads.page_views select greet("FIRST_NAME") as page_urn, "MEMBER_URN" as member_urn from profile.members; +apiVersion: hoptimator.linkedin.com/v1alpha1 +kind: SqlJob +metadata: + name: ads-database-pageviews +spec: + dialect: Flink + executionMode: Streaming + sql: + - CREATE FUNCTION IF NOT EXISTS `GREET` AS 'com.linkedin.hoptimator.flink.runner.functions.Greet' + - CREATE FUNCTION IF NOT EXISTS `STR_LEN` AS 'com.linkedin.hoptimator.flink.runner.functions.StringLength' + - CREATE FUNCTION IF NOT EXISTS `REVERSE_STR` AS 'demo_udfs.reverse_string' LANGUAGE PYTHON + - CREATE DATABASE IF NOT EXISTS `PROFILE` WITH () + - CREATE TABLE IF NOT EXISTS `PROFILE`.`MEMBERS` (`FIRST_NAME` VARCHAR, `LAST_NAME` VARCHAR, `MEMBER_URN` VARCHAR, `COMPANY_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10') + - CREATE DATABASE IF NOT EXISTS `ADS` WITH () + - CREATE TABLE IF NOT EXISTS `ADS`.`PAGE_VIEWS` (`PAGE_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole') + - INSERT INTO `ADS`.`PAGE_VIEWS` (`PAGE_URN`, `MEMBER_URN`) SELECT `GREET`(`FIRST_NAME`) AS `PAGE_URN`, `MEMBER_URN` FROM `PROFILE`.`MEMBERS` + files: + {} +!specify PAGE_VIEWS + +# Clean up +drop function greet; +(0 rows modified) + +!update + +drop function str_len; +(0 rows modified) + +!update + +drop function reverse_str; +(0 rows modified) + +!update diff --git a/hoptimator-k8s/src/test/resources/k8s-ddl-udf-files.id b/hoptimator-k8s/src/test/resources/k8s-ddl-udf-files.id new file mode 100644 index 00000000..becb3d64 --- /dev/null +++ b/hoptimator-k8s/src/test/resources/k8s-ddl-udf-files.id @@ -0,0 +1,47 @@ +!set outputformat mysql +!use k8s + +# Register a Python UDF with inline code using dollar-quoting +create function my_py_udf returns varchar language python as 'my_py_udf' in $$x = 1$$; +(0 rows modified) + +!update + +# UDF validates 
against real data (opaque placeholder returns null) +select my_py_udf("FIRST_NAME") as val from profile.members; ++-----+ +| VAL | ++-----+ +| | +| | +| | ++-----+ +(3 rows) + +!ok + +# Pipeline generates SqlJob with files containing the inline code +insert into ads.page_views select my_py_udf("FIRST_NAME") as page_urn, "MEMBER_URN" as member_urn from profile.members; +apiVersion: hoptimator.linkedin.com/v1alpha1 +kind: SqlJob +metadata: + name: ads-database-pageviews +spec: + dialect: Flink + executionMode: Streaming + sql: + - CREATE FUNCTION IF NOT EXISTS `MY_PY_UDF` AS 'my_py_udf.my_py_udf' LANGUAGE PYTHON + - CREATE DATABASE IF NOT EXISTS `PROFILE` WITH () + - CREATE TABLE IF NOT EXISTS `PROFILE`.`MEMBERS` (`FIRST_NAME` VARCHAR, `LAST_NAME` VARCHAR, `MEMBER_URN` VARCHAR, `COMPANY_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10') + - CREATE DATABASE IF NOT EXISTS `ADS` WITH () + - CREATE TABLE IF NOT EXISTS `ADS`.`PAGE_VIEWS` (`PAGE_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole') + - INSERT INTO `ADS`.`PAGE_VIEWS` (`PAGE_URN`, `MEMBER_URN`) SELECT `MY_PY_UDF`(`FIRST_NAME`) AS `PAGE_URN`, `MEMBER_URN` FROM `PROFILE`.`MEMBERS` + files: + {my_py_udf.py: x = 1} +!specify PAGE_VIEWS + +# Clean up +drop function my_py_udf; +(0 rows modified) + +!update diff --git a/hoptimator-k8s/src/test/resources/k8s-ddl-udf.id b/hoptimator-k8s/src/test/resources/k8s-ddl-udf.id new file mode 100644 index 00000000..1f5ea198 --- /dev/null +++ b/hoptimator-k8s/src/test/resources/k8s-ddl-udf.id @@ -0,0 +1,71 @@ +!set outputformat mysql +!use k8s + +# Register a VARCHAR UDF (default return type) +create function my_udf as 'com.example.MyUdf'; +(0 rows modified) + +!update + +# UDF validates against real data +select my_udf("FIRST_NAME") as val from profile.members; ++-----+ +| VAL | ++-----+ +| | +| | +| | ++-----+ +(3 rows) + +!ok + +# Register an INTEGER UDF with explicit return type +create function int_udf returns integer as 'com.example.IntUdf'; 
+(0 rows modified) + +!update + +# Register a Python UDF with LANGUAGE clause +create function py_udf returns varchar language python as 'my_func' in 'my_module'; +(0 rows modified) + +!update + +# Verify pipeline SQL includes CREATE FUNCTION for all registered UDFs before connectors +insert into ads.page_views select my_udf("FIRST_NAME") as page_urn, "MEMBER_URN" as member_urn from profile.members; +apiVersion: hoptimator.linkedin.com/v1alpha1 +kind: SqlJob +metadata: + name: ads-database-pageviews +spec: + dialect: Flink + executionMode: Streaming + sql: + - CREATE FUNCTION IF NOT EXISTS `MY_UDF` AS 'com.example.MyUdf' + - CREATE FUNCTION IF NOT EXISTS `INT_UDF` AS 'com.example.IntUdf' + - CREATE FUNCTION IF NOT EXISTS `PY_UDF` AS 'my_module.my_func' LANGUAGE PYTHON + - CREATE DATABASE IF NOT EXISTS `PROFILE` WITH () + - CREATE TABLE IF NOT EXISTS `PROFILE`.`MEMBERS` (`FIRST_NAME` VARCHAR, `LAST_NAME` VARCHAR, `MEMBER_URN` VARCHAR, `COMPANY_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10') + - CREATE DATABASE IF NOT EXISTS `ADS` WITH () + - CREATE TABLE IF NOT EXISTS `ADS`.`PAGE_VIEWS` (`PAGE_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole') + - INSERT INTO `ADS`.`PAGE_VIEWS` (`PAGE_URN`, `MEMBER_URN`) SELECT `MY_UDF`(`FIRST_NAME`) AS `PAGE_URN`, `MEMBER_URN` FROM `PROFILE`.`MEMBERS` + files: + {} +!specify PAGE_VIEWS + +# Clean up +drop function my_udf; +(0 rows modified) + +!update + +drop function int_udf; +(0 rows modified) + +!update + +drop function py_udf; +(0 rows modified) + +!update diff --git a/hoptimator-k8s/src/test/resources/k8s-ddl.id b/hoptimator-k8s/src/test/resources/k8s-ddl.id index 251a8bb5..2fb5277d 100644 --- a/hoptimator-k8s/src/test/resources/k8s-ddl.id +++ b/hoptimator-k8s/src/test/resources/k8s-ddl.id @@ -163,106 +163,91 @@ drop view ads.audience; !update insert into ads.page_views select first_name as page_urn, last_name as member_urn from profile.members where first_name = 'Alice'; -apiVersion: 
flink.apache.org/v1beta1 -kind: FlinkSessionJob +apiVersion: hoptimator.linkedin.com/v1alpha1 +kind: SqlJob metadata: name: ads-database-pageviews spec: - deploymentName: basic-session-deployment - job: - entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner - args: - - CREATE DATABASE IF NOT EXISTS `PROFILE` WITH () - - CREATE TABLE IF NOT EXISTS `PROFILE`.`MEMBERS` (`FIRST_NAME` VARCHAR, `LAST_NAME` VARCHAR, `MEMBER_URN` VARCHAR, `COMPANY_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10') - - CREATE DATABASE IF NOT EXISTS `ADS` WITH () - - CREATE TABLE IF NOT EXISTS `ADS`.`PAGE_VIEWS` (`PAGE_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole') - - INSERT INTO `ADS`.`PAGE_VIEWS` (`PAGE_URN`, `MEMBER_URN`) SELECT `FIRST_NAME` AS `PAGE_URN`, `LAST_NAME` AS `MEMBER_URN` FROM `PROFILE`.`MEMBERS` WHERE `FIRST_NAME` = 'Alice' - jarURI: file:///opt/hoptimator-flink-runner.jar - parallelism: 1 - upgradeMode: stateless - state: running + dialect: Flink + executionMode: Streaming + sql: + - CREATE DATABASE IF NOT EXISTS `PROFILE` WITH () + - CREATE TABLE IF NOT EXISTS `PROFILE`.`MEMBERS` (`FIRST_NAME` VARCHAR, `LAST_NAME` VARCHAR, `MEMBER_URN` VARCHAR, `COMPANY_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10') + - CREATE DATABASE IF NOT EXISTS `ADS` WITH () + - CREATE TABLE IF NOT EXISTS `ADS`.`PAGE_VIEWS` (`PAGE_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole') + - INSERT INTO `ADS`.`PAGE_VIEWS` (`PAGE_URN`, `MEMBER_URN`) SELECT `FIRST_NAME` AS `PAGE_URN`, `LAST_NAME` AS `MEMBER_URN` FROM `PROFILE`.`MEMBERS` WHERE `FIRST_NAME` = 'Alice' + files: + {} !specify PAGE_VIEWS insert into ads.ad_clicks select * from ads.ad_clicks; -apiVersion: flink.apache.org/v1beta1 -kind: FlinkSessionJob +apiVersion: hoptimator.linkedin.com/v1alpha1 +kind: SqlJob metadata: name: ads-database-adclicks spec: - deploymentName: basic-session-deployment - job: - entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner - args: 
- - CREATE DATABASE IF NOT EXISTS `ADS` WITH () - - CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS_source` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10') - - CREATE DATABASE IF NOT EXISTS `ADS` WITH () - - CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS_sink` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole') - - INSERT INTO `ADS`.`AD_CLICKS_sink` (`CAMPAIGN_URN`, `MEMBER_URN`) SELECT `CAMPAIGN_URN`, `MEMBER_URN` FROM `ADS`.`AD_CLICKS_source` - jarURI: file:///opt/hoptimator-flink-runner.jar - parallelism: 1 - upgradeMode: stateless - state: running + dialect: Flink + executionMode: Streaming + sql: + - CREATE DATABASE IF NOT EXISTS `ADS` WITH () + - CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS_source` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10') + - CREATE DATABASE IF NOT EXISTS `ADS` WITH () + - CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS_sink` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole') + - INSERT INTO `ADS`.`AD_CLICKS_sink` (`CAMPAIGN_URN`, `MEMBER_URN`) SELECT `CAMPAIGN_URN`, `MEMBER_URN` FROM `ADS`.`AD_CLICKS_source` + files: + {} !specify AD_CLICKS create or replace materialized view ads."PAGE_VIEWS$forward-field-order" as select CAMPAIGN_URN as PAGE_URN, MEMBER_URN as MEMBER_URN from ads.ad_clicks; -apiVersion: flink.apache.org/v1beta1 -kind: FlinkSessionJob +apiVersion: hoptimator.linkedin.com/v1alpha1 +kind: SqlJob metadata: name: ads-database-pageviews-forward-field-order spec: - deploymentName: basic-session-deployment - job: - entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner - args: - - CREATE DATABASE IF NOT EXISTS `ADS` WITH () - - CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10') - - CREATE DATABASE IF NOT EXISTS `ADS` WITH () - - CREATE TABLE IF NOT EXISTS `ADS`.`PAGE_VIEWS` (`PAGE_URN` 
VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole') - - INSERT INTO `ADS`.`PAGE_VIEWS` (`PAGE_URN`, `MEMBER_URN`) SELECT `CAMPAIGN_URN` AS `PAGE_URN`, `MEMBER_URN` FROM `ADS`.`AD_CLICKS` - jarURI: file:///opt/hoptimator-flink-runner.jar - parallelism: 1 - upgradeMode: stateless - state: running + dialect: Flink + executionMode: Streaming + sql: + - CREATE DATABASE IF NOT EXISTS `ADS` WITH () + - CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10') + - CREATE DATABASE IF NOT EXISTS `ADS` WITH () + - CREATE TABLE IF NOT EXISTS `ADS`.`PAGE_VIEWS` (`PAGE_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole') + - INSERT INTO `ADS`.`PAGE_VIEWS` (`PAGE_URN`, `MEMBER_URN`) SELECT `CAMPAIGN_URN` AS `PAGE_URN`, `MEMBER_URN` FROM `ADS`.`AD_CLICKS` + files: + {} !specify PAGE_VIEWS create or replace materialized view ads."PAGE_VIEWS$reverse-field-order" as select MEMBER_URN as MEMBER_URN, CAMPAIGN_URN as PAGE_URN from ads.ad_clicks; -apiVersion: flink.apache.org/v1beta1 -kind: FlinkSessionJob +apiVersion: hoptimator.linkedin.com/v1alpha1 +kind: SqlJob metadata: name: ads-database-pageviews-reverse-field-order spec: - deploymentName: basic-session-deployment - job: - entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner - args: - - CREATE DATABASE IF NOT EXISTS `ADS` WITH () - - CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10') - - CREATE DATABASE IF NOT EXISTS `ADS` WITH () - - CREATE TABLE IF NOT EXISTS `ADS`.`PAGE_VIEWS` (`PAGE_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole') - - INSERT INTO `ADS`.`PAGE_VIEWS` (`MEMBER_URN`, `PAGE_URN`) SELECT `MEMBER_URN`, `CAMPAIGN_URN` AS `PAGE_URN` FROM `ADS`.`AD_CLICKS` - jarURI: file:///opt/hoptimator-flink-runner.jar - parallelism: 1 - upgradeMode: stateless - state: running + dialect: Flink + 
executionMode: Streaming + sql: + - CREATE DATABASE IF NOT EXISTS `ADS` WITH () + - CREATE TABLE IF NOT EXISTS `ADS`.`AD_CLICKS` (`CAMPAIGN_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10') + - CREATE DATABASE IF NOT EXISTS `ADS` WITH () + - CREATE TABLE IF NOT EXISTS `ADS`.`PAGE_VIEWS` (`PAGE_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='blackhole') + - INSERT INTO `ADS`.`PAGE_VIEWS` (`MEMBER_URN`, `PAGE_URN`) SELECT `MEMBER_URN`, `CAMPAIGN_URN` AS `PAGE_URN` FROM `ADS`.`AD_CLICKS` + files: + {} !specify PAGE_VIEWS create or replace materialized view PROFILE."MEMBERS$test" AS SELECT "PAGE_URN" AS "COMPANY_URN", "MEMBER_URN" FROM ADS.PAGE_VIEWS; -apiVersion: flink.apache.org/v1beta1 -kind: FlinkSessionJob +apiVersion: hoptimator.linkedin.com/v1alpha1 +kind: SqlJob metadata: name: profile-database-members-test spec: - deploymentName: basic-session-deployment - job: - entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner - args: - - CREATE DATABASE IF NOT EXISTS `ADS` WITH () - - CREATE TABLE IF NOT EXISTS `ADS`.`PAGE_VIEWS` (`PAGE_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10') - - CREATE DATABASE IF NOT EXISTS `PROFILE` WITH () - - CREATE TABLE IF NOT EXISTS `PROFILE`.`MEMBERS` (`FIRST_NAME` VARCHAR, `LAST_NAME` VARCHAR, `MEMBER_URN` VARCHAR, `COMPANY_URN` VARCHAR) WITH ('connector'='blackhole') - - INSERT INTO `PROFILE`.`MEMBERS` (`COMPANY_URN`, `MEMBER_URN`) SELECT `PAGE_URN` AS `COMPANY_URN`, `MEMBER_URN` FROM `ADS`.`PAGE_VIEWS` - jarURI: file:///opt/hoptimator-flink-runner.jar - parallelism: 1 - upgradeMode: stateless - state: running + dialect: Flink + executionMode: Streaming + sql: + - CREATE DATABASE IF NOT EXISTS `ADS` WITH () + - CREATE TABLE IF NOT EXISTS `ADS`.`PAGE_VIEWS` (`PAGE_URN` VARCHAR, `MEMBER_URN` VARCHAR) WITH ('connector'='datagen', 'number-of-rows'='10') + - CREATE DATABASE IF NOT EXISTS `PROFILE` WITH () + - CREATE TABLE IF NOT EXISTS 
`PROFILE`.`MEMBERS` (`FIRST_NAME` VARCHAR, `LAST_NAME` VARCHAR, `MEMBER_URN` VARCHAR, `COMPANY_URN` VARCHAR) WITH ('connector'='blackhole') + - INSERT INTO `PROFILE`.`MEMBERS` (`COMPANY_URN`, `MEMBER_URN`) SELECT `PAGE_URN` AS `COMPANY_URN`, `MEMBER_URN` FROM `ADS`.`PAGE_VIEWS` + files: + {} !specify MEMBERS diff --git a/hoptimator-k8s/src/test/resources/k8s-metadata.id b/hoptimator-k8s/src/test/resources/k8s-metadata.id index ef1d492c..a9fb0f98 100644 --- a/hoptimator-k8s/src/test/resources/k8s-metadata.id +++ b/hoptimator-k8s/src/test/resources/k8s-metadata.id @@ -64,8 +64,8 @@ select name, failed from "k8s".pipeline_elements order by name; +---------------------------------------------+--------+ | NAME | FAILED | +---------------------------------------------+--------+ -| FlinkSessionJob/ads-database-audience2 | false | -| FlinkSessionJob/ads-database-pages | false | +| SqlJob/ads-database-audience2 | false | +| SqlJob/ads-database-pages | false | | TableTrigger/ads-database-pageviews-trigger | false | +---------------------------------------------+--------+ (3 rows) @@ -76,8 +76,8 @@ select * from "k8s".pipeline_element_map order by element_name, pipeline_name; +---------------------------------------------+---------------+ | ELEMENT_NAME | PIPELINE_NAME | +---------------------------------------------+---------------+ -| FlinkSessionJob/ads-database-audience2 | ads-audience2 | -| FlinkSessionJob/ads-database-pages | ads-pages | +| SqlJob/ads-database-audience2 | ads-audience2 | +| SqlJob/ads-database-pages | ads-pages | | TableTrigger/ads-database-pageviews-trigger | ads-audience2 | | TableTrigger/ads-database-pageviews-trigger | ads-pages | +---------------------------------------------+---------------+ @@ -90,9 +90,9 @@ name = t2.element_name) pe on pl.name = pe.pipeline_name order by pipeline_name, +---------------+---------------------------------------------+----------------+ | PIPELINE_NAME | ELEMENT_NAME | ELEMENT_FAILED | 
+---------------+---------------------------------------------+----------------+ -| ads-audience2 | FlinkSessionJob/ads-database-audience2 | false | +| ads-audience2 | SqlJob/ads-database-audience2 | false | | ads-audience2 | TableTrigger/ads-database-pageviews-trigger | false | -| ads-pages | FlinkSessionJob/ads-database-pages | false | +| ads-pages | SqlJob/ads-database-pages | false | | ads-pages | TableTrigger/ads-database-pageviews-trigger | false | +---------------+---------------------------------------------+----------------+ (4 rows) diff --git a/hoptimator-kafka/src/test/resources/kafka-ddl-create-table.id b/hoptimator-kafka/src/test/resources/kafka-ddl-create-table.id index 7c11d61d..e4c7ff62 100644 --- a/hoptimator-kafka/src/test/resources/kafka-ddl-create-table.id +++ b/hoptimator-kafka/src/test/resources/kafka-ddl-create-table.id @@ -19,24 +19,21 @@ create or replace table "KAFKA"."create-table-test" ("KEY" VARCHAR, "VALUE" BINA !describe "KAFKA"."create-table-test" insert into KAFKA."create-table-test" select * from KAFKA."existing-topic-2"; -apiVersion: flink.apache.org/v1beta1 -kind: FlinkSessionJob +apiVersion: hoptimator.linkedin.com/v1alpha1 +kind: SqlJob metadata: name: kafka-database-create-table-test spec: - deploymentName: basic-session-deployment - job: - entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner - args: - - CREATE DATABASE IF NOT EXISTS `KAFKA` WITH () - - CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'key.fields'='KEY', 'key.format'='raw', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-2', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json') - - CREATE DATABASE IF NOT EXISTS `KAFKA` WITH () - - CREATE TABLE IF NOT EXISTS `KAFKA`.`create-table-test` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'key.fields'='KEY', 'key.format'='raw', 
'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='create-table-test', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json') - - INSERT INTO `KAFKA`.`create-table-test` (`KEY`, `VALUE`) SELECT `KEY`, `VALUE` FROM `KAFKA`.`existing-topic-2` - jarURI: file:///opt/hoptimator-flink-runner.jar - parallelism: 1 - upgradeMode: stateless - state: running + dialect: Flink + executionMode: Streaming + sql: + - CREATE DATABASE IF NOT EXISTS `KAFKA` WITH () + - CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'key.fields'='KEY', 'key.format'='raw', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-2', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json') + - CREATE DATABASE IF NOT EXISTS `KAFKA` WITH () + - CREATE TABLE IF NOT EXISTS `KAFKA`.`create-table-test` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'key.fields'='KEY', 'key.format'='raw', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='create-table-test', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json') + - INSERT INTO `KAFKA`.`create-table-test` (`KEY`, `VALUE`) SELECT `KEY`, `VALUE` FROM `KAFKA`.`existing-topic-2` + files: + {} --- apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic diff --git a/hoptimator-kafka/src/test/resources/kafka-ddl.id b/hoptimator-kafka/src/test/resources/kafka-ddl.id index 32b76d33..3c84c004 100644 --- a/hoptimator-kafka/src/test/resources/kafka-ddl.id +++ b/hoptimator-kafka/src/test/resources/kafka-ddl.id @@ -22,24 +22,21 @@ drop view kafka."new-topic$kafka-test"; !update insert into kafka."existing-topic-1" select * from kafka."existing-topic-2"; -apiVersion: flink.apache.org/v1beta1 -kind: FlinkSessionJob +apiVersion: 
hoptimator.linkedin.com/v1alpha1 +kind: SqlJob metadata: name: kafka-database-existing-topic-1 spec: - deploymentName: basic-session-deployment - job: - entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner - args: - - CREATE DATABASE IF NOT EXISTS `KAFKA` WITH () - - CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'k1'='v1', 'key.fields'='KEY', 'key.format'='raw', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-2', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json') - - CREATE DATABASE IF NOT EXISTS `KAFKA` WITH () - - CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'k2'='v2', 'key.fields'='KEY', 'key.format'='raw', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-1', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json') - - INSERT INTO `KAFKA`.`existing-topic-1` (`KEY`, `VALUE`) SELECT `KEY`, `VALUE` FROM `KAFKA`.`existing-topic-2` - jarURI: file:///opt/hoptimator-flink-runner.jar - parallelism: 2 - upgradeMode: stateless - state: running + dialect: Flink + executionMode: Streaming + sql: + - CREATE DATABASE IF NOT EXISTS `KAFKA` WITH () + - CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-2` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'k1'='v1', 'key.fields'='KEY', 'key.format'='raw', 'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-2', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json') + - CREATE DATABASE IF NOT EXISTS `KAFKA` WITH () + - CREATE TABLE IF NOT EXISTS `KAFKA`.`existing-topic-1` (`KEY` VARCHAR, `VALUE` BINARY) WITH ('connector'='kafka', 'k2'='v2', 'key.fields'='KEY', 'key.format'='raw', 
'properties.bootstrap.servers'='one-kafka-bootstrap.kafka.svc.cluster.local:9094', 'scan.startup.mode'='earliest-offset', 'topic'='existing-topic-1', 'value.fields-include'='EXCEPT_KEY', 'value.format'='json') + - INSERT INTO `KAFKA`.`existing-topic-1` (`KEY`, `VALUE`) SELECT `KEY`, `VALUE` FROM `KAFKA`.`existing-topic-2` + files: + {} --- apiVersion: kafka.strimzi.io/v1beta2 kind: KafkaTopic diff --git a/hoptimator-mysql/src/test/resources/mysql-ddl.id b/hoptimator-mysql/src/test/resources/mysql-ddl.id index 7dfecfd8..29f8f646 100644 --- a/hoptimator-mysql/src/test/resources/mysql-ddl.id +++ b/hoptimator-mysql/src/test/resources/mysql-ddl.id @@ -21,24 +21,21 @@ drop materialized view "MYSQL"."testdb"."users$partial"; !update insert into mysql."testdb"."users" ("user_id", "username", "email") select "user_id", 'a' as "username", 'b' as "email" from mysql."testdb"."orders"; -apiVersion: flink.apache.org/v1beta1 -kind: FlinkSessionJob +apiVersion: hoptimator.linkedin.com/v1alpha1 +kind: SqlJob metadata: name: mysql-users spec: - deploymentName: basic-session-deployment - job: - entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner - args: - - CREATE CATALOG IF NOT EXISTS `MYSQL` WITH () - - CREATE DATABASE IF NOT EXISTS `MYSQL`.`testdb` WITH () - - CREATE TABLE IF NOT EXISTS `MYSQL`.`testdb`.`orders` (`order_id` INTEGER, `user_id` INTEGER, `product_name` VARCHAR, `quantity` INTEGER, `price` DECIMAL, `order_date` TIMESTAMP, `status` VARCHAR) WITH ('connector'='mysql', 'hostname'='localhost', 'password'='hoptimatorpassword', 'port'='3306', 'tables'='orders', 'username'='hoptimator') - - CREATE CATALOG IF NOT EXISTS `MYSQL` WITH () - - CREATE DATABASE IF NOT EXISTS `MYSQL`.`testdb` WITH () - - CREATE TABLE IF NOT EXISTS `MYSQL`.`testdb`.`users` (`user_id` INTEGER, `username` VARCHAR, `email` VARCHAR, `created_at` TIMESTAMP, `is_active` BOOLEAN) WITH ('connector'='mysql', 'hostname'='localhost', 'password'='hoptimatorpassword', 'port'='3306', 'tables'='users', 
'username'='hoptimator') - - INSERT INTO `MYSQL`.`testdb`.`users` (`user_id`, `username`, `email`) SELECT `user_id`, 'a' AS `username`, 'b' AS `email` FROM `MYSQL`.`testdb`.`orders` - jarURI: file:///opt/hoptimator-flink-runner.jar - parallelism: 1 - upgradeMode: stateless - state: running + dialect: Flink + executionMode: Streaming + sql: + - CREATE CATALOG IF NOT EXISTS `MYSQL` WITH () + - CREATE DATABASE IF NOT EXISTS `MYSQL`.`testdb` WITH () + - CREATE TABLE IF NOT EXISTS `MYSQL`.`testdb`.`orders` (`order_id` INTEGER, `user_id` INTEGER, `product_name` VARCHAR, `quantity` INTEGER, `price` DECIMAL, `order_date` TIMESTAMP, `status` VARCHAR) WITH ('connector'='mysql', 'hostname'='localhost', 'password'='hoptimatorpassword', 'port'='3306', 'tables'='orders', 'username'='hoptimator') + - CREATE CATALOG IF NOT EXISTS `MYSQL` WITH () + - CREATE DATABASE IF NOT EXISTS `MYSQL`.`testdb` WITH () + - CREATE TABLE IF NOT EXISTS `MYSQL`.`testdb`.`users` (`user_id` INTEGER, `username` VARCHAR, `email` VARCHAR, `created_at` TIMESTAMP, `is_active` BOOLEAN) WITH ('connector'='mysql', 'hostname'='localhost', 'password'='hoptimatorpassword', 'port'='3306', 'tables'='users', 'username'='hoptimator') + - INSERT INTO `MYSQL`.`testdb`.`users` (`user_id`, `username`, `email`) SELECT `user_id`, 'a' AS `username`, 'b' AS `email` FROM `MYSQL`.`testdb`.`orders` + files: + {} !specify users diff --git a/hoptimator-operator-integration/build.gradle b/hoptimator-operator-integration/build.gradle index 92211ab6..8fb8c952 100644 --- a/hoptimator-operator-integration/build.gradle +++ b/hoptimator-operator-integration/build.gradle @@ -6,6 +6,7 @@ plugins { dependencies { implementation project(':hoptimator-operator') + runtimeOnly project(':hoptimator-flink-adapter') implementation libs.slf4j.simple testImplementation libs.assertj diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java index 
57e00c81..433fa49a 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/Template.java @@ -121,7 +121,7 @@ public String getOrDefault(String key, ThrowingSupplier f) throws SQLExc } private String formatMapAsString(Map configMap) { - return new Yaml().dump(configMap); + return new Yaml().dump(configMap).trim(); } private String formatPropertiesAsString(Properties props) { diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java index f48e4c0c..ac3263ba 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/PipelineRel.java @@ -2,8 +2,10 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.linkedin.hoptimator.ThrowingFunction; +import com.linkedin.hoptimator.UserFunction; import java.sql.Connection; import java.sql.SQLException; +import java.util.ArrayList; import java.sql.SQLNonTransientException; import java.util.HashMap; import java.util.LinkedHashMap; @@ -51,6 +53,7 @@ public interface PipelineRel extends RelNode { /** Implements a deployable Pipeline. 
*/ class Implementor { private final Map sources = new LinkedHashMap<>(); + private final List functions = new ArrayList<>(); private final ImmutablePairList targetFields; private final Map hints; private RelNode query; @@ -98,6 +101,10 @@ public void setSink(String database, List path, RelDataType rowType, Map this.sink = new Sink(database, path, newOptions); } + public void addFunction(UserFunction func) { + functions.add(func); + } + public void setQuery(RelNode query) { this.query = query; } @@ -109,12 +116,35 @@ public Pipeline pipeline(String name, Connection connection) throws SQLException templateEvals.put("query", query(connection)); templateEvals.put("fieldMap", fieldMap()); - Job job = new Job(name, sources.keySet(), sink, templateEvals); + // Collect inline file content and JAR references from functions + Map files = new HashMap<>(); + for (UserFunction func : functions) { + String code = func.options().get("CODE"); + if (code != null) { + // Derive filename from AS clause (e.g., 'my_module.my_func' -> 'my_module.py') + String module = func.as(); + int dotIdx = module.indexOf('.'); + String filename = (dotIdx >= 0 ? module.substring(0, dotIdx) : module) + ".py"; + files.put(filename, code); + } + String jar = func.options().get("JAR"); + if (jar != null) { + // Store JAR reference as URL value (data plane fetches it) + String jarName = jar.contains("/") ? 
jar.substring(jar.lastIndexOf('/') + 1) : jar; + files.put(jarName, jar); + } + } + + Job job = new Job(name, sources.keySet(), sink, templateEvals, files); return new Pipeline(sources.keySet(), sink, job); } private ScriptImplementor script(Connection connection) throws SQLException { ScriptImplementor script = ScriptImplementor.empty(); + // Emit function definitions first + for (UserFunction func : functions) { + script = script.function(func.name(), func.as(), func.options()); + } // Check if we need to add suffixes to avoid table name collisions boolean needsSuffixes = hasTableNameCollision(); diff --git a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java index 33ecd031..0ff5f451 100644 --- a/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java +++ b/hoptimator-util/src/main/java/com/linkedin/hoptimator/util/planner/ScriptImplementor.java @@ -116,6 +116,11 @@ default ScriptImplementor database(@Nullable String catalog, String database) { return with(new DatabaseImplementor(catalog, database)); } + /** Append a function definition, e.g. `CREATE FUNCTION IF NOT EXISTS ... AS ...` */ + default ScriptImplementor function(String name, String as, Map options) { + return with(new FunctionImplementor(name, as, options)); + } + /** Append an insert statement, e.g. `INSERT INTO ... 
SELECT ...` */ default ScriptImplementor insert(@Nullable String catalog, String schema, String table, RelNode relNode) { return insert(catalog, schema, table, null, relNode, null, Collections.emptyMap()); @@ -522,6 +527,38 @@ public void implement(SqlWriter w) { } } + /** Implements a CREATE FUNCTION IF NOT EXISTS statement */ + class FunctionImplementor implements ScriptImplementor { + private final String name; + private final String as; + private final Map options; + + public FunctionImplementor(String name, String as, Map options) { + this.name = name; + this.as = as; + this.options = options != null ? options : Collections.emptyMap(); + } + + @Override + public void implement(SqlWriter w) { + w.keyword("CREATE FUNCTION IF NOT EXISTS"); + w.identifier(name, true); + w.keyword("AS"); + w.literal("'" + as + "'"); + String language = options.get("LANGUAGE"); + if (language != null) { + w.keyword("LANGUAGE"); + w.literal(language); + } + String jar = options.get("JAR"); + if (jar != null) { + w.keyword("USING JAR"); + w.literal("'" + jar + "'"); + } + w.literal(";"); + } + } + /** Implements an identifier like TRACKING.'PageViewEvent' */ class CompoundIdentifierImplementor implements ScriptImplementor { private final String catalog; diff --git a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/ScriptImplementorTest.java b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/ScriptImplementorTest.java index 9d7e1f5b..12ecceb9 100644 --- a/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/ScriptImplementorTest.java +++ b/hoptimator-util/src/test/java/com/linkedin/hoptimator/util/planner/ScriptImplementorTest.java @@ -16,10 +16,12 @@ import java.util.AbstractMap; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static 
org.junit.jupiter.api.Assertions.assertTrue; /** @@ -174,6 +176,68 @@ public void testFullPipelineWithCollision() { "Should insert into sink table. Got: " + sql); } + @Test + public void testFunctionWithoutLanguage() { + String sql = ScriptImplementor.empty() + .function("my_udf", "com.example.MyUdf", Collections.emptyMap()) + .sql(); + + assertTrue(sql.contains("CREATE FUNCTION IF NOT EXISTS"), + "Should contain CREATE FUNCTION IF NOT EXISTS. Got: " + sql); + assertTrue(sql.contains("my_udf") || sql.contains("`my_udf`"), + "Should contain function name. Got: " + sql); + assertTrue(sql.contains("'com.example.MyUdf'"), + "Should contain AS clause. Got: " + sql); + assertFalse(sql.contains("LANGUAGE"), + "Should not contain LANGUAGE clause when not specified. Got: " + sql); + } + + @Test + public void testFunctionWithLanguage() { + Map options = new HashMap<>(); + options.put("LANGUAGE", "PYTHON"); + + String sql = ScriptImplementor.empty() + .function("my_udf", "my_module.my_func", options) + .sql(); + + assertTrue(sql.contains("CREATE FUNCTION IF NOT EXISTS"), + "Should contain CREATE FUNCTION IF NOT EXISTS. Got: " + sql); + assertTrue(sql.contains("'my_module.my_func'"), + "Should contain AS clause. Got: " + sql); + assertTrue(sql.contains("LANGUAGE"), + "Should contain LANGUAGE clause. Got: " + sql); + assertTrue(sql.contains("PYTHON"), + "Should contain PYTHON language. 
Got: " + sql); + } + + @Test + public void testFunctionBeforeConnector() { + RelDataTypeFactory typeFactory = new SqlTypeFactoryImpl(RelDataTypeSystem.DEFAULT); + RelDataType rowType = typeFactory.builder() + .add("ID", typeFactory.createSqlType(SqlTypeName.INTEGER)) + .build(); + + Map config = new HashMap<>(); + config.put("connector", "datagen"); + + Map funcOptions = new HashMap<>(); + funcOptions.put("LANGUAGE", "PYTHON"); + + String sql = ScriptImplementor.empty() + .function("my_udf", "my_module.my_func", funcOptions) + .database(null, "TEST") + .connector(null, "TEST", "MY_TABLE", rowType, config) + .sql(); + + int funcIdx = sql.indexOf("CREATE FUNCTION"); + int tableIdx = sql.indexOf("CREATE TABLE"); + assertTrue(funcIdx >= 0, "Should contain CREATE FUNCTION. Got: " + sql); + assertTrue(tableIdx >= 0, "Should contain CREATE TABLE. Got: " + sql); + assertTrue(funcIdx < tableIdx, + "CREATE FUNCTION should appear before CREATE TABLE. Got: " + sql); + } + @Test public void testExplicitColumnEnumeration() { // Test for Flink 1.20 regression where INSERT with SELECT * fails diff --git a/hoptimator-venice/src/test/resources/venice-ddl-create-table.id b/hoptimator-venice/src/test/resources/venice-ddl-create-table.id index c6f243dc..86d77320 100644 --- a/hoptimator-venice/src/test/resources/venice-ddl-create-table.id +++ b/hoptimator-venice/src/test/resources/venice-ddl-create-table.id @@ -66,24 +66,21 @@ Key schema evolution is not supported in Venice. 
Store integration-test-store ha # Test insert operation with existing store insert into "VENICE"."integration-test-store" ("KEY_id", "i") select "KEY", "intField" AS "i" from "VENICE"."test-store-primitive"; -apiVersion: flink.apache.org/v1beta1 -kind: FlinkSessionJob +apiVersion: hoptimator.linkedin.com/v1alpha1 +kind: SqlJob metadata: name: venice-integration-test-store spec: - deploymentName: basic-session-deployment - job: - entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner - args: - - CREATE DATABASE IF NOT EXISTS `VENICE` WITH () - - CREATE TABLE IF NOT EXISTS `VENICE`.`test-store-primitive` (`KEY` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY', 'key.fields-prefix'='', 'key.type'='PRIMITIVE', 'partial-update-mode'='true', 'storeName'='test-store-primitive', 'value.fields-include'='EXCEPT_KEY') - - CREATE DATABASE IF NOT EXISTS `VENICE` WITH () - - CREATE TABLE IF NOT EXISTS `VENICE`.`integration-test-store` (`KEY_id` INTEGER, `i` INTEGER, `s` VARCHAR, `new_field` DOUBLE) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='integration-test-store', 'value.fields-include'='EXCEPT_KEY') - - INSERT INTO `VENICE`.`integration-test-store` (`KEY_id`, `i`) SELECT `KEY` AS `KEY_id`, `intField` AS `i` FROM `VENICE`.`test-store-primitive` - jarURI: file:///opt/hoptimator-flink-runner.jar - parallelism: 1 - upgradeMode: stateless - state: running + dialect: Flink + executionMode: Streaming + sql: + - CREATE DATABASE IF NOT EXISTS `VENICE` WITH () + - CREATE TABLE IF NOT EXISTS `VENICE`.`test-store-primitive` (`KEY` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY', 'key.fields-prefix'='', 'key.type'='PRIMITIVE', 'partial-update-mode'='true', 'storeName'='test-store-primitive', 'value.fields-include'='EXCEPT_KEY') + - CREATE DATABASE IF NOT EXISTS `VENICE` WITH () + - 
CREATE TABLE IF NOT EXISTS `VENICE`.`integration-test-store` (`KEY_id` INTEGER, `i` INTEGER, `s` VARCHAR, `new_field` DOUBLE) WITH ('connector'='venice', 'key.fields'='KEY_id', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='integration-test-store', 'value.fields-include'='EXCEPT_KEY') + - INSERT INTO `VENICE`.`integration-test-store` (`KEY_id`, `i`) SELECT `KEY` AS `KEY_id`, `intField` AS `i` FROM `VENICE`.`test-store-primitive` + files: + {} !specify integration-test-store # Clean up - drop tables diff --git a/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id b/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id index f4d4c05c..76a9fd74 100644 --- a/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id +++ b/hoptimator-venice/src/test/resources/venice-ddl-insert-partial.id @@ -23,22 +23,19 @@ drop materialized view "VENICE"."test-store$insert-partial"; !update insert into "VENICE"."test-store" ("KEY_name", "KEY_age", "intField") select "stringField" AS "KEY_name", "KEY" AS "KEY_age", "intField" from "VENICE"."test-store-primitive"; -apiVersion: flink.apache.org/v1beta1 -kind: FlinkSessionJob +apiVersion: hoptimator.linkedin.com/v1alpha1 +kind: SqlJob metadata: name: venice-test-store spec: - deploymentName: basic-session-deployment - job: - entryClass: com.linkedin.hoptimator.flink.runner.FlinkRunner - args: - - CREATE DATABASE IF NOT EXISTS `VENICE` WITH () - - CREATE TABLE IF NOT EXISTS `VENICE`.`test-store-primitive` (`KEY` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY', 'key.fields-prefix'='', 'key.type'='PRIMITIVE', 'partial-update-mode'='true', 'storeName'='test-store-primitive', 'value.fields-include'='EXCEPT_KEY') - - CREATE DATABASE IF NOT EXISTS `VENICE` WITH () - - CREATE TABLE IF NOT EXISTS `VENICE`.`test-store` (`KEY_name` VARCHAR, `KEY_age` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH 
('connector'='venice', 'key.fields'='KEY_name;KEY_age', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY') - - INSERT INTO `VENICE`.`test-store` (`KEY_name`, `KEY_age`, `intField`) SELECT `stringField` AS `KEY_name`, `KEY` AS `KEY_age`, `intField` FROM `VENICE`.`test-store-primitive` - jarURI: file:///opt/hoptimator-flink-runner.jar - parallelism: 1 - upgradeMode: stateless - state: running + dialect: Flink + executionMode: Streaming + sql: + - CREATE DATABASE IF NOT EXISTS `VENICE` WITH () + - CREATE TABLE IF NOT EXISTS `VENICE`.`test-store-primitive` (`KEY` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY', 'key.fields-prefix'='', 'key.type'='PRIMITIVE', 'partial-update-mode'='true', 'storeName'='test-store-primitive', 'value.fields-include'='EXCEPT_KEY') + - CREATE DATABASE IF NOT EXISTS `VENICE` WITH () + - CREATE TABLE IF NOT EXISTS `VENICE`.`test-store` (`KEY_name` VARCHAR, `KEY_age` INTEGER, `intField` INTEGER, `stringField` VARCHAR) WITH ('connector'='venice', 'key.fields'='KEY_name;KEY_age', 'key.fields-prefix'='KEY_', 'key.type'='RECORD', 'partial-update-mode'='true', 'storeName'='test-store', 'value.fields-include'='EXCEPT_KEY') + - INSERT INTO `VENICE`.`test-store` (`KEY_name`, `KEY_age`, `intField`) SELECT `stringField` AS `KEY_name`, `KEY` AS `KEY_age`, `intField` FROM `VENICE`.`test-store-primitive` + files: + {} !specify test-store diff --git a/settings.gradle b/settings.gradle index 326a82ef..883d4a18 100644 --- a/settings.gradle +++ b/settings.gradle @@ -6,6 +6,7 @@ include 'hoptimator-avro' include 'hoptimator-catalog' // <-- marked for deletion include 'hoptimator-cli' include 'hoptimator-demodb' +include 'hoptimator-flink-adapter' include 'hoptimator-flink-runner' include 'hoptimator-jdbc' include 'hoptimator-jdbc-driver'