Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package com.linkedin.hoptimator;

import java.util.List;
import java.util.Map;


/** Represents a CREATE JOB request for deploying a SqlJob to Kubernetes. */
/**
 * Represents a CREATE JOB request for deploying a SqlJob to Kubernetes.
 *
 * <p>Immutable value object: the SQL statement list and the options map are
 * defensively snapshotted at construction time, so later mutation of the
 * caller's collections cannot affect this instance, and the accessors can
 * safely hand out the internal references.
 */
public class SqlJobDeployable implements Deployable {

  private final String name;
  private final String dialect;
  private final String executionMode;
  private final List<String> sql;
  private final Map<String, String> options;

  /**
   * @param name job name; must not be null
   * @param dialect SQL dialect, e.g. "Flink" or "FlinkBeam"; may be null for the default
   * @param executionMode execution mode, e.g. "Streaming" or "Batch"; may be null for the default
   * @param sql individual SQL statements making up the job; must not be null or contain nulls
   * @param options extra deployment options; must not be null or contain null keys/values
   */
  public SqlJobDeployable(String name, String dialect, String executionMode,
      List<String> sql, Map<String, String> options) {
    this.name = name;
    this.dialect = dialect;
    this.executionMode = executionMode;
    // List.copyOf / Map.copyOf produce unmodifiable snapshots and reject nulls,
    // preventing callers from mutating this object's state after construction.
    this.sql = List.copyOf(sql);
    this.options = Map.copyOf(options);
  }

  public String name() {
    return name;
  }

  /** Dialect, e.g. "Flink", "FlinkBeam". May be null for default. */
  public String dialect() {
    return dialect;
  }

  /** Execution mode, e.g. "Streaming", "Batch". May be null for default. */
  public String executionMode() {
    return executionMode;
  }

  /** The job's SQL statements, as an unmodifiable list. */
  public List<String> sql() {
    return sql;
  }

  /** Deployment options, as an unmodifiable map. */
  public Map<String, String> options() {
    return options;
  }

  @Override
  public String toString() {
    return "SqlJob[" + name + "]";
  }
}
10 changes: 10 additions & 0 deletions hoptimator-jdbc/src/main/codegen/config.fmpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ data: {
"org.apache.calcite.sql.ddl.SqlDdlNodes"
"com.linkedin.hoptimator.jdbc.ddl.SqlCreateDatabase"
"com.linkedin.hoptimator.jdbc.ddl.SqlCreateFunction"
"com.linkedin.hoptimator.jdbc.ddl.SqlCreateJob"
"com.linkedin.hoptimator.jdbc.ddl.SqlCreateMaterializedView"
"com.linkedin.hoptimator.jdbc.ddl.SqlCreateTable"
"com.linkedin.hoptimator.jdbc.ddl.SqlCreateTrigger"
Expand Down Expand Up @@ -67,6 +68,10 @@ data: {
"REFRESH"
"PAUSE"
"RESUME"
"JOB"
"FLINK"
"STREAMING"
"BATCH"
]

# List of non-reserved keywords to add;
Expand All @@ -86,12 +91,17 @@ data: {
"REFRESH"
"PAUSE"
"RESUME"
"JOB"
"FLINK"
"STREAMING"
"BATCH"
]

# List of methods for parsing extensions to "CREATE [OR REPLACE]" calls.
# Each must accept arguments "(SqlParserPos pos, boolean replace)".
# Example: "SqlCreateForeignSchema".
createStatementParserMethods: [
"SqlCreateJob"
"SqlCreateDatabase"
"SqlCreateMaterializedView"
"SqlCreateTrigger"
Expand Down
29 changes: 29 additions & 0 deletions hoptimator-jdbc/src/main/codegen/includes/parserImpls.ftl
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,35 @@ SqlCreate SqlCreateTrigger(Span s, boolean replace) :
}
}

// Parses: CREATE [OR REPLACE] [FLINK] [STREAMING | BATCH] JOB [IF NOT EXISTS] name
//         AS 'sql body' [ options ]
// NOTE(review): only the FLINK token is recognized as a dialect here, although
// SqlJobDeployable's javadoc also mentions a "FlinkBeam" dialect — confirm
// whether an additional token/branch is needed for that dialect.
SqlCreate SqlCreateJob(Span s, boolean replace) :
{
final boolean ifNotExists;
final SqlIdentifier id;
final SqlNode sqlBody;
String dialect = null; // null => engine-default dialect
String executionMode = null; // null => engine-default execution mode
SqlNodeList optionList = null; // null => no options clause supplied
}
{
[ <FLINK> { dialect = "Flink"; } ]
(
<STREAMING> { executionMode = "Streaming"; }
|
<BATCH> { executionMode = "Batch"; }
|
// Empty alternative: neither STREAMING nor BATCH present, keep null default.
{ }
)
<JOB> ifNotExists = IfNotExistsOpt()
id = CompoundIdentifier()
<AS>
// The job body is a single quoted string literal; it is split into
// individual statements later (see HoptimatorDdlUtils.processCreateJob).
sqlBody = StringLiteral()
[ optionList = Options() ]
{
return new SqlCreateJob(s.end(this), replace, ifNotExists, id, sqlBody,
dialect, executionMode, optionList);
}
}

SqlCreate SqlCreateDatabase(Span s, boolean replace) :
{
final boolean ifNotExists;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.linkedin.hoptimator.View;
import com.linkedin.hoptimator.jdbc.ddl.HoptimatorDdlParserImpl;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateDatabase;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateJob;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateMaterializedView;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTable;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTrigger;
Expand Down Expand Up @@ -282,6 +283,19 @@ public void execute(SqlCreateDatabase create, CalcitePrepare.Context context) {
logger.info("CREATE DATABASE {} completed", create.name);
}

/** Executes a {@code CREATE JOB} command. */
public void execute(SqlCreateJob create, CalcitePrepare.Context context) {
  // OR REPLACE means update an existing job; a plain CREATE deploys a new one.
  final HoptimatorDdlUtils.DdlMode mode;
  if (create.getReplace()) {
    mode = HoptimatorDdlUtils.DdlMode.UPDATE;
  } else {
    mode = HoptimatorDdlUtils.DdlMode.CREATE;
  }
  try {
    HoptimatorDdlUtils.processCreateJob(connection, create, mode);
  } catch (SQLException | RuntimeException e) {
    logger.info("Failed to deploy job {}", create.name);
    throw new DdlException(create, e.getMessage(), e);
  }
  logger.info("CREATE JOB {} completed", create.name);
}

/** Executes a {@code PAUSE TRIGGER} command. */
public void execute(SqlPauseTrigger pause, CalcitePrepare.Context context) {
updateTriggerPausedState(pause, pause.name, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import com.linkedin.hoptimator.DatabaseDeployable;
import com.linkedin.hoptimator.Deployer;
import com.linkedin.hoptimator.MaterializedView;
import com.linkedin.hoptimator.SqlJobDeployable;
import com.linkedin.hoptimator.Pipeline;
import com.linkedin.hoptimator.Source;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateDatabase;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateJob;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateMaterializedView;
import com.linkedin.hoptimator.jdbc.ddl.SqlCreateTable;
import com.linkedin.hoptimator.util.DeploymentService;
Expand Down Expand Up @@ -672,6 +674,69 @@ static SpecifyResult processCreateDatabase(HoptimatorConnection conn,
}
}

/**
 * Shared implementation of the {@code CREATE JOB} pipeline for both real deployment
 * and dry-run (SPECIFY) modes.
 *
 * <p>Validates the DDL node, splits the quoted SQL body into individual statements,
 * wraps them in a {@link SqlJobDeployable}, and either deploys it (CREATE/UPDATE)
 * or renders the would-be specs (SPECIFY). On failure or in SPECIFY mode, any
 * deployed resources are rolled back via {@code DeploymentService.restore}.
 *
 * @param conn the JDBC connection
 * @param create the parsed DDL node
 * @param mode whether to CREATE, UPDATE, or SPECIFY
 * @return a SpecifyResult (specs are empty for CREATE/UPDATE, YAML for SPECIFY)
 * @throws SQLException on validation or deployment errors
 */
static SpecifyResult processCreateJob(HoptimatorConnection conn,
SqlCreateJob create, DdlMode mode) throws SQLException {
HoptimatorConnection.HoptimatorConnectionDualLogger logger = conn.getLogger(HoptimatorDdlUtils.class);

logger.info("Validating statement: {}", create);
ValidationService.validateOrThrow(create);

// Job names map to single Kubernetes resource names, so compound
// (schema-qualified) identifiers are rejected up front.
if (create.name.names.size() > 1) {
throw new SQLException("Job names cannot be compound identifiers.");
}
String name = create.name.names.get(0);
// The parser produced the body via StringLiteral(), so this cast is safe.
String sqlBody = ((SqlLiteral) create.sqlBody).getValueAs(String.class);

// Split SQL body on semicolons into individual statements
// NOTE(review): this naive split breaks statements that contain a literal
// ';' inside a quoted string — confirm job bodies never embed semicolons,
// or switch to a quote-aware splitter.
List<String> sqlStatements = new ArrayList<>();
for (String stmt : sqlBody.split(";")) {
String trimmed = stmt.trim();
if (!trimmed.isEmpty()) {
sqlStatements.add(trimmed);
}
}
if (sqlStatements.isEmpty()) {
throw new SQLException("Job " + name + " has no SQL statements.");
}

Map<String, String> jobOptions = options(create.options);
// dialect / executionMode may be null here; the deployers apply defaults.
SqlJobDeployable job = new SqlJobDeployable(name, create.dialect, create.executionMode,
sqlStatements, jobOptions);

Collection<Deployer> deployers = null;
try {
logger.info("Validating job {}", name);
ValidationService.validateOrThrow(job);
deployers = DeploymentService.deployers(job, conn);
ValidationService.validateOrThrow(deployers);

List<String> specs = mode.executeDeployers(deployers, conn);
if (mode.mutable()) {
logger.info("Deployed job {}", name);
} else {
// SPECIFY is a dry run: undo anything executeDeployers touched.
DeploymentService.restore(deployers);
}
return new SpecifyResult(specs, null, Collections.singletonList(name));
} catch (SQLException | RuntimeException e) {
logger.info("Failed to deploy job {}", name);
// Roll back partial deployments; deployers is null only if the failure
// happened before DeploymentService.deployers() returned.
if (deployers != null) {
DeploymentService.restore(deployers);
logger.info("Restored deployable resources for job {}", name);
}
throw e;
}
}

/**
* Returns the YAML specs that would be created for any supported SQL statement —
* {@code CREATE TABLE}, {@code CREATE MATERIALIZED VIEW}, or {@code INSERT INTO}.
Expand All @@ -698,6 +763,10 @@ public static SpecifyResult specifyFromSql(String sql, HoptimatorConnection conn
return processCreateDatabase(conn, (SqlCreateDatabase) sqlNode, DdlMode.SPECIFY);
}

if (sqlNode instanceof SqlCreateJob) {
return processCreateJob(conn, (SqlCreateJob) sqlNode, DdlMode.SPECIFY);
}

if (sqlNode instanceof SqlCreateTable) {
return processCreateTable(conn.createPrepareContext(), conn, (SqlCreateTable) sqlNode, DdlMode.SPECIFY);
}
Expand Down
Loading
Loading