fix: make kill query work for sysscanoperator
This commit is contained in:
parent
b658608541
commit
5c276fa547
|
@ -99,7 +99,7 @@ struct SExecTaskInfo {
|
||||||
void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst);
|
void buildTaskId(uint64_t taskId, uint64_t queryId, char* dst);
|
||||||
SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI);
|
SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOPTR_EXEC_MODEL model, SStorageAPI* pAPI);
|
||||||
void doDestroyTask(SExecTaskInfo* pTaskInfo);
|
void doDestroyTask(SExecTaskInfo* pTaskInfo);
|
||||||
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
|
bool isTaskKilled(void* pTaskInfo);
|
||||||
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
|
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode);
|
||||||
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
|
void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status);
|
||||||
int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId,
|
||||||
|
|
|
@ -191,7 +191,8 @@ int32_t getProperSortPageSize(size_t rowSize, uint32_t numOfCols);
|
||||||
bool tsortIsClosed(SSortHandle* pHandle);
|
bool tsortIsClosed(SSortHandle* pHandle);
|
||||||
void tsortSetClosed(SSortHandle* pHandle);
|
void tsortSetClosed(SSortHandle* pHandle);
|
||||||
|
|
||||||
void setSingleTableMerge(SSortHandle* pHandle);
|
void tsortSetSingleTableMerge(SSortHandle* pHandle);
|
||||||
|
void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), void* param);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -59,7 +59,7 @@ SExecTaskInfo* doCreateTask(uint64_t queryId, uint64_t taskId, int32_t vgId, EOP
|
||||||
return pTaskInfo;
|
return pTaskInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isTaskKilled(SExecTaskInfo* pTaskInfo) { return (0 != pTaskInfo->code); }
|
bool isTaskKilled(void* pTaskInfo) { return (0 != ((SExecTaskInfo*)pTaskInfo)->code); }
|
||||||
|
|
||||||
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) {
|
void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) {
|
||||||
pTaskInfo->code = rspCode;
|
pTaskInfo->code = rspCode;
|
||||||
|
|
|
@ -2928,8 +2928,9 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
|
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_BLOCK_TS_MERGE, pInfo->bufPageSize, numOfBufPage,
|
||||||
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
|
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
|
||||||
|
|
||||||
tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit);
|
tsortSetMergeLimit(pInfo->pSortHandle, mergeLimit);
|
||||||
|
tsortSetAbortCheckFn(pInfo->pSortHandle, isTaskKilled, pOperator->pTaskInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL);
|
tsortSetFetchRawDataFp(pInfo->pSortHandle, getBlockForTableMergeScan, NULL, NULL);
|
||||||
|
@ -2949,7 +2950,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (numOfTable == 1) {
|
if (numOfTable == 1) {
|
||||||
setSingleTableMerge(pInfo->pSortHandle);
|
tsortSetSingleTableMerge(pInfo->pSortHandle);
|
||||||
} else {
|
} else {
|
||||||
code = tsortOpen(pInfo->pSortHandle);
|
code = tsortOpen(pInfo->pSortHandle);
|
||||||
}
|
}
|
||||||
|
|
|
@ -1601,6 +1601,11 @@ static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
SSysTableScanInfo* pInfo = pOperator->info;
|
SSysTableScanInfo* pInfo = pOperator->info;
|
||||||
char dbName[TSDB_DB_NAME_LEN] = {0};
|
char dbName[TSDB_DB_NAME_LEN] = {0};
|
||||||
|
|
||||||
|
if (isTaskKilled(pOperator->pTaskInfo)) {
|
||||||
|
setOperatorCompleted(pOperator);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
blockDataCleanup(pInfo->pRes);
|
blockDataCleanup(pInfo->pRes);
|
||||||
|
|
||||||
const char* name = tNameGetTableName(&pInfo->name);
|
const char* name = tNameGetTableName(&pInfo->name);
|
||||||
|
|
|
@ -71,12 +71,20 @@ struct SSortHandle {
|
||||||
SMultiwayMergeTreeInfo* pMergeTree;
|
SMultiwayMergeTreeInfo* pMergeTree;
|
||||||
|
|
||||||
bool singleTableMerge;
|
bool singleTableMerge;
|
||||||
|
|
||||||
|
bool (*abortCheckFn)(void* param);
|
||||||
|
void* abortCheckParam;
|
||||||
};
|
};
|
||||||
|
|
||||||
void setSingleTableMerge(SSortHandle* pHandle) {
|
void tsortSetSingleTableMerge(SSortHandle* pHandle) {
|
||||||
pHandle->singleTableMerge = true;
|
pHandle->singleTableMerge = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tsortSetAbortCheckFn(SSortHandle *pHandle, bool (*checkFn)(void *), void* param) {
|
||||||
|
pHandle->abortCheckFn = checkFn;
|
||||||
|
pHandle->abortCheckParam = param;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t msortComparFn(const void* pLeft, const void* pRight, void* param);
|
static int32_t msortComparFn(const void* pLeft, const void* pRight, void* param);
|
||||||
|
|
||||||
// | offset[0] | offset[1] |....| nullbitmap | data |...|
|
// | offset[0] | offset[1] |....| nullbitmap | data |...|
|
||||||
|
@ -726,11 +734,10 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
||||||
|
|
||||||
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
|
SArray* pPageIdList = taosArrayInit(4, sizeof(int32_t));
|
||||||
while (1) {
|
while (1) {
|
||||||
if (tsortIsClosed(pHandle)) {
|
if (tsortIsClosed(pHandle) || (pHandle->abortCheckFn && pHandle->abortCheckFn(pHandle->abortCheckParam))) {
|
||||||
code = terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
|
code = terrno = TSDB_CODE_TSC_QUERY_CANCELLED;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
|
SSDataBlock* pDataBlock = getSortedBlockDataInner(pHandle, &pHandle->cmpParam, numOfRows);
|
||||||
if (pDataBlock == NULL) {
|
if (pDataBlock == NULL) {
|
||||||
break;
|
break;
|
||||||
|
|
Loading…
Reference in New Issue