diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java index aabaf01e6..5ae73c5ef 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/ArrowFlightJdbcFlightStreamResultSet.java @@ -28,6 +28,7 @@ import org.apache.arrow.driver.jdbc.utils.FlightEndpointDataQueue; import org.apache.arrow.driver.jdbc.utils.VectorSchemaRootTransformer; import org.apache.arrow.flight.FlightInfo; +import org.apache.arrow.flight.FlightRuntimeException; import org.apache.arrow.flight.FlightStream; import org.apache.arrow.util.AutoCloseables; import org.apache.arrow.vector.VectorSchemaRoot; @@ -218,6 +219,12 @@ public boolean next() throws SQLException { @Override protected void cancel() { super.cancel(); + try { + connection.getClientHandler().cancelFlightInfo(flightInfo); + } catch (final FlightRuntimeException e) { + + } + final CloseableEndpointStreamPair currentEndpoint = this.currentEndpointData; if (currentEndpoint != null) { currentEndpoint.getStream().cancel("Cancel", null); diff --git a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java index a3f690037..dda1a3d66 100644 --- a/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java +++ b/flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java @@ -35,22 +35,7 @@ import org.apache.arrow.driver.jdbc.client.utils.ClientAuthenticationUtils; import org.apache.arrow.driver.jdbc.client.utils.FlightClientCache; import org.apache.arrow.driver.jdbc.client.utils.FlightLocationQueue; -import org.apache.arrow.flight.CallOption; -import org.apache.arrow.flight.CallStatus; -import org.apache.arrow.flight.CloseSessionRequest; -import org.apache.arrow.flight.FlightClient; -import org.apache.arrow.flight.FlightClientMiddleware; -import org.apache.arrow.flight.FlightEndpoint; -import org.apache.arrow.flight.FlightGrpcUtils; -import org.apache.arrow.flight.FlightInfo; -import org.apache.arrow.flight.FlightRuntimeException; -import org.apache.arrow.flight.FlightStatusCode; -import org.apache.arrow.flight.Location; -import org.apache.arrow.flight.LocationSchemes; -import org.apache.arrow.flight.SessionOptionValue; -import org.apache.arrow.flight.SessionOptionValueFactory; -import org.apache.arrow.flight.SetSessionOptionsRequest; -import org.apache.arrow.flight.SetSessionOptionsResult; +import org.apache.arrow.flight.*; import org.apache.arrow.flight.auth2.BearerCredentialWriter; import org.apache.arrow.flight.auth2.ClientBearerHeaderHandler; import org.apache.arrow.flight.auth2.ClientIncomingAuthHeaderMiddleware; @@ -490,6 +475,15 @@ public FlightInfo getTables( catalog, schemaPattern, tableNamePattern, types, includeSchema, getOptions()); } + /** + * Cancel execution of a distributed query. + * + * @return The server response. + */ + public CancelFlightInfoResult cancelFlightInfo(FlightInfo info) { + return sqlClient.cancelFlightInfo(new CancelFlightInfoRequest(info), getOptions()); + } + /** * Gets SQL info. *