fix: fix query thread quitting issue
This commit is contained in:
parent
89c13efba8
commit
f60441e2dd
|
@ -94,6 +94,8 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
|
|||
|
||||
int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SDeleteRes *pRes);
|
||||
|
||||
void qWorkerStopAllTasks(void *qWorkerMgmt);
|
||||
|
||||
void qWorkerDestroy(void **qWorkerMgmt);
|
||||
|
||||
int32_t qWorkerGetStat(SReadHandle *handle, void *qWorkerMgmt, SQWorkerStat *pStat);
|
||||
|
|
|
@ -385,6 +385,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR TAOS_DEF_ERROR_CODE(0, 0x072D)
|
||||
#define TSDB_CODE_QRY_JSON_IN_GROUP_ERROR TAOS_DEF_ERROR_CODE(0, 0x072E)
|
||||
#define TSDB_CODE_QRY_JOB_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x072F)
|
||||
#define TSDB_CODE_QRY_QWORKER_QUIT TAOS_DEF_ERROR_CODE(0, 0x0730)
|
||||
|
||||
// grant
|
||||
#define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800)
|
||||
|
|
|
@ -28,7 +28,7 @@ int vnodeQueryOpen(SVnode *pVnode) {
|
|||
return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), (void **)&pVnode->pQuery, &pVnode->msgCb);
|
||||
}
|
||||
|
||||
void vnodeQueryPreClose(SVnode *pVnode) { qworkerStopAllTasks((void *)pVnode->pQuery); }
|
||||
void vnodeQueryPreClose(SVnode *pVnode) { qWorkerStopAllTasks((void *)pVnode->pQuery); }
|
||||
|
||||
void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); }
|
||||
|
||||
|
|
|
@ -1058,6 +1058,7 @@ int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResul
|
|||
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);
|
||||
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
|
||||
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
|
||||
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo *pInfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -688,7 +688,7 @@ void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo *pInfo) {
|
|||
}
|
||||
taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
return;
|
||||
}
|
||||
|
||||
void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
|
||||
|
|
|
@ -246,7 +246,7 @@ typedef struct SQWorkerMgmt {
|
|||
|
||||
#define QW_ERR_RET(c) \
|
||||
do { \
|
||||
int32_t _code = c; \
|
||||
int32_t _code = (c); \
|
||||
if (_code != TSDB_CODE_SUCCESS) { \
|
||||
terrno = _code; \
|
||||
return _code; \
|
||||
|
@ -254,7 +254,7 @@ typedef struct SQWorkerMgmt {
|
|||
} while (0)
|
||||
#define QW_RET(c) \
|
||||
do { \
|
||||
int32_t _code = c; \
|
||||
int32_t _code = (c); \
|
||||
if (_code != TSDB_CODE_SUCCESS) { \
|
||||
terrno = _code; \
|
||||
} \
|
||||
|
@ -262,7 +262,7 @@ typedef struct SQWorkerMgmt {
|
|||
} while (0)
|
||||
#define QW_ERR_JRET(c) \
|
||||
do { \
|
||||
code = c; \
|
||||
code = (c); \
|
||||
if (code != TSDB_CODE_SUCCESS) { \
|
||||
terrno = code; \
|
||||
goto _return; \
|
||||
|
|
|
@ -683,6 +683,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
|||
bool queryStop = false;
|
||||
|
||||
do {
|
||||
ctx = NULL;
|
||||
|
||||
QW_ERR_JRET(qwHandlePrePhaseEvents(QW_FPARAMS(), QW_PHASE_PRE_CQUERY, &input, NULL));
|
||||
|
||||
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
|
||||
|
@ -1162,11 +1164,13 @@ _return:
|
|||
QW_RET(code);
|
||||
}
|
||||
|
||||
void qworkerStopAllTasks(void *qWorkerMgmt) {
|
||||
void qWorkerStopAllTasks(void *qWorkerMgmt) {
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash));
|
||||
|
||||
|
||||
uint64_t qId, tId;
|
||||
int32_t eId;
|
||||
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
|
||||
while (pIter) {
|
||||
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
|
||||
|
@ -1187,8 +1191,6 @@ void qworkerStopAllTasks(void *qWorkerMgmt) {
|
|||
|
||||
if (QW_QUERY_RUNNING(ctx)) {
|
||||
qwKillTaskHandle(ctx);
|
||||
} else {
|
||||
qwDropTask(QW_FPARAMS());
|
||||
}
|
||||
|
||||
QW_UNLOCK(QW_WRITE, &ctx->lock);
|
||||
|
|
|
@ -377,6 +377,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_IN_ERROR, "Json not support in i
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_NOT_SUPPORT_ERROR, "Json not support in this place")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JSON_IN_GROUP_ERROR, "Json not support in group/partition by")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_NOT_EXIST, "Job not exist")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_QWORKER_QUIT, "Vnode/Qnode is quitting")
|
||||
|
||||
// grant
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, "License expired")
|
||||
|
|
Loading…
Reference in New Issue