Skip to content

Commit f97ceae

Browse files
committed
[KYUUBI #7248] Ensure jdbc engine statements are canceled when receive cancel operation
1 parent 5887e76 commit f97ceae

File tree

5 files changed

+69
-6
lines changed

5 files changed

+69
-6
lines changed

externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/dialect/JdbcDialect.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,18 @@ abstract class JdbcDialect extends SupportServiceLoader with Logging {
8181
def getTRowSetGenerator(): JdbcTRowSetGenerator
8282

8383
def getSchemaHelper(): SchemaHelper
84+
85+
def cancelStatement(jdbcStatement: Statement): Unit = {
86+
if (jdbcStatement != null) {
87+
jdbcStatement.cancel()
88+
}
89+
}
90+
91+
def closeStatement(jdbcStatement: Statement): Unit = {
92+
if (jdbcStatement != null && !jdbcStatement.isClosed) {
93+
jdbcStatement.close()
94+
}
95+
}
8496
}
8597

8698
object JdbcDialects extends Logging {

externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/ExecuteStatement.scala

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,12 +120,26 @@ class ExecuteStatement(
120120
super.validateFetchOrientation(order)
121121
}
122122

123+
override def cancel(): Unit = withLockRequired {
124+
if (!isTerminalState(state)) {
125+
setState(OperationState.CANCELED)
126+
if (jdbcStatement != null) {
127+
dialect.cancelStatement(jdbcStatement)
128+
jdbcStatement = null
129+
}
130+
}
131+
}
132+
123133
override def cleanup(targetState: OperationState): Unit = withLockRequired {
124134
try {
125135
super.cleanup(targetState)
126136
} finally {
127-
if (jdbcStatement != null && !jdbcStatement.isClosed) {
128-
jdbcStatement.close()
137+
if (jdbcStatement != null) {
138+
if (targetState == OperationState.CANCELED) {
139+
dialect.cancelStatement(jdbcStatement)
140+
} else {
141+
dialect.closeStatement(jdbcStatement)
142+
}
129143
jdbcStatement = null
130144
}
131145
}

externals/kyuubi-jdbc-engine/src/main/scala/org/apache/kyuubi/engine/jdbc/operation/JdbcOperation.scala

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,6 @@ abstract class JdbcOperation(session: Session) extends AbstractOperation(session
6262
resp
6363
}
6464

65-
override def cancel(): Unit = {
66-
cleanup(OperationState.CANCELED)
67-
}
68-
6965
override def close(): Unit = {
7066
cleanup(OperationState.CLOSED)
7167
}

externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/mysql/OperationWithEngineSuite.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,23 @@ class OperationWithEngineSuite extends MySQLOperationSuite with HiveJDBCTestHelp
7575
assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
7676
}
7777
}
78+
79+
test("Mysql - JDBC ExecuteStatement operation cancel should kill SQL statement") {
80+
withSessionHandle { (client, handle) =>
81+
val tExecuteStatementReq = new TExecuteStatementReq()
82+
tExecuteStatementReq.setSessionHandle(handle)
83+
tExecuteStatementReq.setStatement("SELECT sleep(1200)")
84+
tExecuteStatementReq.setRunAsync(true)
85+
val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq)
86+
assert(tExecuteStatementResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
87+
88+
Thread.sleep(1000) // wait for statement to start executing
89+
90+
val tCancelOperationReq = new TCancelOperationReq()
91+
tCancelOperationReq.setOperationHandle(tExecuteStatementResp.getOperationHandle)
92+
val TCancelOperationReq = client.CancelOperation(tCancelOperationReq)
93+
assert(TCancelOperationReq.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
94+
// If the statement is not cancelled successfully, will block here until 1200s
95+
}
96+
}
7897
}

externals/kyuubi-jdbc-engine/src/test/scala/org/apache/kyuubi/engine/jdbc/starrocks/StarRocksOperationWithEngineSuite.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,4 +75,26 @@ class StarRocksOperationWithEngineSuite extends StarRocksOperationSuite with Hiv
7575
assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
7676
}
7777
}
78+
79+
test("starrocks - JDBC ExecuteStatement operation cancel should kill SQL statement") {
80+
withSessionHandle { (client, handle) =>
81+
val tExecuteStatementReq = new TExecuteStatementReq()
82+
tExecuteStatementReq.setSessionHandle(handle)
83+
tExecuteStatementReq.setStatement("SELECT sleep(1200)")
84+
tExecuteStatementReq.setRunAsync(true)
85+
val tExecuteStatementResp = client.ExecuteStatement(tExecuteStatementReq)
86+
87+
// todo:we need to improvement the cancel mechanism in starrocks server side,
88+
// once a query submitted, we should be able to cancel it immediately.
89+
Thread.sleep(1000) // wait for statement to start executing
90+
91+
val tCancelOperationReq = new TCancelOperationReq()
92+
tCancelOperationReq.setOperationHandle(tExecuteStatementResp.getOperationHandle)
93+
94+
val tFetchResultsResp = client.CancelOperation(tCancelOperationReq)
95+
assert(tFetchResultsResp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
96+
// If the statement is not cancelled successfully, will block here until 1200s
97+
}
98+
99+
}
78100
}

0 commit comments

Comments
 (0)