fix: fix query thread quit issue
This commit is contained in:
parent
efbe9ecbdc
commit
89c13efba8
|
@ -77,6 +77,7 @@ void vnodeBufPoolReset(SVBufPool* pPool);
|
|||
|
||||
// vnodeQuery.c
|
||||
int32_t vnodeQueryOpen(SVnode* pVnode);
|
||||
void vnodeQueryPreClose(SVnode *pVnode);
|
||||
void vnodeQueryClose(SVnode* pVnode);
|
||||
int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
|
||||
int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
|
||||
|
|
|
@ -242,7 +242,10 @@ _err:
|
|||
return NULL;
|
||||
}
|
||||
|
||||
// Pre-close hook for a vnode.
//
// Runs the two pre-close steps in a fixed order:
//   1. vnodeQueryPreClose() - stops the tasks tracked by the vnode's query worker
//   2. vnodeSyncPreClose()  - pre-closes the sync side of the vnode
//
// pVnode is dereferenced by vnodeQueryPreClose(), so it must not be NULL.
void vnodePreClose(SVnode *pVnode) {
  vnodeQueryPreClose(pVnode);
  vnodeSyncPreClose(pVnode);
}
|
||||
|
||||
void vnodeClose(SVnode *pVnode) {
|
||||
if (pVnode) {
|
||||
|
|
|
@ -28,6 +28,8 @@ int vnodeQueryOpen(SVnode *pVnode) {
|
|||
return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), (void **)&pVnode->pQuery, &pVnode->msgCb);
|
||||
}
|
||||
|
||||
// Ask the vnode's query worker (pVnode->pQuery) to stop all of its tasks.
void vnodeQueryPreClose(SVnode *pVnode) {
  void *pQWorker = (void *)pVnode->pQuery;
  qworkerStopAllTasks(pQWorker);
}
|
||||
|
||||
// Destroy the vnode's query worker. The address of the pQuery slot is handed
// to qWorkerDestroy() rather than the handle value itself.
void vnodeQueryClose(SVnode *pVnode) {
  void **ppQWorker = (void **)&pVnode->pQuery;
  qWorkerDestroy(ppQWorker);
}
|
||||
|
||||
int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
|
||||
|
|
|
@ -153,6 +153,16 @@ typedef struct {
|
|||
SSchemaWrapper* qsw;
|
||||
} SSchemaInfo;
|
||||
|
||||
// Stop-registration record for one operator of an exec task.
// Field order matters: the record is built with a positional initializer.
typedef struct {
  int32_t operatorType;  // operator type tag (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE for exchange operators)
  int64_t refId;         // reference id; looked up in exchangeObjRefPool when the task is stopped
} SExchangeOpStopInfo;
|
||||
|
||||
typedef struct {
|
||||
SRWLatch lock;
|
||||
SArray* pStopInfo;
|
||||
} STaskStopInfo;
|
||||
|
||||
typedef struct SExecTaskInfo {
|
||||
STaskIdInfo id;
|
||||
uint32_t status;
|
||||
|
@ -171,6 +181,7 @@ typedef struct SExecTaskInfo {
|
|||
SSubplan* pSubplan;
|
||||
struct SOperatorInfo* pRoot;
|
||||
SLocalFetch localFetch;
|
||||
STaskStopInfo stopInfo;
|
||||
} SExecTaskInfo;
|
||||
|
||||
enum {
|
||||
|
|
|
@ -659,6 +659,54 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
|
|||
return pTaskInfo->code;
|
||||
}
|
||||
|
||||
// Register an operator stop record on a task.
//
// *pInfo is pushed onto pTaskInfo->stopInfo.pStopInfo while the stopInfo write
// latch is held.
//
// Returns TSDB_CODE_SUCCESS when the record was stored, or
// TSDB_CODE_OUT_OF_MEMORY when the push did not succeed.
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);
  void* pStored = taosArrayPush(pTaskInfo->stopInfo.pStopInfo, pInfo);
  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);

  // assumes taosArrayPush() yields NULL when it cannot store the element - TODO confirm
  if (NULL == pStored) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}
|
||||
|
||||
int32_t stopInfoComp(void const* lp, void const* rp) {
|
||||
SExchangeOpStopInfo* key = (SExchangeOpStopInfo*)lp;
|
||||
SExchangeOpStopInfo* pInfo = (SExchangeOpStopInfo*)rp;
|
||||
|
||||
if (key->refId < pInfo->refId) {
|
||||
return -1;
|
||||
} else if (key->refId > pInfo->refId) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// Unregister an operator stop record from a task.
//
// Searches pTaskInfo->stopInfo.pStopInfo for an entry whose refId equals
// pInfo->refId (via stopInfoComp with TD_EQ) and removes it when found; the
// whole lookup-and-remove runs under the stopInfo write latch. A record that
// is not present is silently ignored.
//
// NOTE(review): if taosArraySearchIdx() is a binary search, the array must
// stay ordered by refId - confirm against the array implementation.
void qRemoveTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  int32_t idx = taosArraySearchIdx(pTaskInfo->stopInfo.pStopInfo, pInfo, stopInfoComp, TD_EQ);
  if (idx >= 0) {
    taosArrayRemove(pTaskInfo->stopInfo.pStopInfo, idx);
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
  // The function is void, so there is no status to hand back.
}
|
||||
|
||||
// Wake up every registered operator of a task.
//
// Walks pTaskInfo->stopInfo.pStopInfo under the stopInfo write latch. For each
// record the refId is resolved through exchangeObjRefPool; when the lookup
// yields an SExchangeInfo, its `ready` semaphore is posted and the reference
// is released again. Records whose reference can no longer be acquired are
// skipped.
void qStopTaskOperators(SExecTaskInfo* pTaskInfo) {
  taosWLockLatch(&pTaskInfo->stopInfo.lock);

  int32_t total = taosArrayGetSize(pTaskInfo->stopInfo.pStopInfo);
  for (int32_t pos = 0; pos < total; ++pos) {
    SExchangeOpStopInfo* pStop = taosArrayGet(pTaskInfo->stopInfo.pStopInfo, pos);

    SExchangeInfo* pExchangeInfo = taosAcquireRef(exchangeObjRefPool, pStop->refId);
    if (NULL == pExchangeInfo) {
      continue;
    }

    // Unblocks a tsem_wait() on this exchange operator's `ready` semaphore.
    tsem_post(&pExchangeInfo->ready);
    taosReleaseRef(exchangeObjRefPool, pStop->refId);
  }

  taosWUnLockLatch(&pTaskInfo->stopInfo.lock);
}
|
||||
|
||||
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo;
|
||||
|
||||
|
@ -667,7 +715,11 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
|
|||
}
|
||||
|
||||
qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
|
||||
|
||||
setTaskKilled(pTaskInfo);
|
||||
|
||||
qStopTaskOperators(pTaskInfo);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -1653,7 +1653,7 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
|
|||
}
|
||||
|
||||
// Small context record attached to a fetch response handler.
typedef struct SFetchRspHandleWrapper {
  // 64-bit exchange id. NOTE(review): presumably carries the exchange
  // operator's int64_t reference id - confirm at the places that fill it in.
  int64_t exchangeId;
  // NOTE(review): presumably the index of the data source this response
  // belongs to - confirm against the fetch callback.
  int32_t sourceIndex;
} SFetchRspHandleWrapper;
|
||||
|
||||
|
@ -1873,6 +1873,9 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
|
|||
|
||||
while (1) {
|
||||
tsem_wait(&pExchangeInfo->ready);
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < totalSources; ++i) {
|
||||
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
|
||||
|
@ -1983,6 +1986,10 @@ static int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) {
|
|||
pOperator->cost.openCost = taosGetTimestampUs() - startTs;
|
||||
|
||||
tsem_wait(&pExchangeInfo->ready);
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||
}
|
||||
|
||||
tsem_post(&pExchangeInfo->ready);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -2002,6 +2009,9 @@ static int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
|
|||
|
||||
doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current);
|
||||
tsem_wait(&pExchangeInfo->ready);
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||
}
|
||||
|
||||
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, pExchangeInfo->current);
|
||||
SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current);
|
||||
|
@ -2217,6 +2227,9 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
|
|||
pInfo->pDummyBlock = createResDataBlock(pExNode->node.pOutputDataBlockDesc);
|
||||
pInfo->pResultBlockList = taosArrayInit(1, POINTER_BYTES);
|
||||
|
||||
SExchangeOpStopInfo stopInfo = {QUERY_NODE_PHYSICAL_PLAN_EXCHANGE, pInfo->self};
|
||||
qAppendTaskStopInfo(pTaskInfo, &stopInfo);
|
||||
|
||||
pInfo->seqLoadData = false;
|
||||
pInfo->pTransporter = pTransporter;
|
||||
|
||||
|
@ -3219,6 +3232,7 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
|
|||
pTaskInfo->id.queryId = queryId;
|
||||
pTaskInfo->execModel = model;
|
||||
pTaskInfo->pTableInfoList = tableListCreate();
|
||||
pTaskInfo->stopInfo.pStopInfo = taosArrayInit(4, sizeof(SExchangeOpStopInfo));
|
||||
|
||||
char* p = taosMemoryCalloc(1, 128);
|
||||
snprintf(p, 128, "TID:0x%" PRIx64 " QID:0x%" PRIx64, taskId, queryId);
|
||||
|
@ -3832,6 +3846,7 @@ void doDestroyTask(SExecTaskInfo* pTaskInfo) {
|
|||
nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pTaskInfo->stopInfo.pStopInfo);
|
||||
taosMemoryFreeClear(pTaskInfo->sql);
|
||||
taosMemoryFreeClear(pTaskInfo->id.str);
|
||||
taosMemoryFreeClear(pTaskInfo);
|
||||
|
|
|
@ -91,11 +91,53 @@ _return:
|
|||
|
||||
// Debug dump of one scheduler's status.
//
// Under the scheduler's tasksLock (read mode) this logs one summary line
// (index i, hbBrokenTs, number of tasks) followed by one line per entry of
// sch->tasksHash with the task's refId, code and status.
//
//   mgmt - query worker; not referenced by name in the visible statements,
//          presumably consumed by the logging macros - TODO confirm
//   sch  - scheduler status to dump
//   i    - ordinal of the scheduler, only used in the summary line
void qwDbgDumpSchInfo(SQWorker *mgmt, SQWSchStatus *sch, int32_t i) {
  QW_LOCK(QW_READ, &sch->tasksLock);

  int32_t taskNum = taosHashGetSize(sch->tasksHash);
  QW_DLOG("***The %dth scheduler status, hbBrokenTs:%" PRId64 ",taskNum:%d", i, sch->hbBrokenTs, taskNum);

  // Filled from each hash key by QW_GET_QTID; presumably read by QW_TASK_DLOG.
  uint64_t qId, tId;
  int32_t  eId;

  void *pIter = taosHashIterate(sch->tasksHash, NULL);
  while (pIter) {
    SQWTaskStatus *pTask = (SQWTaskStatus *)pIter;
    void          *key = taosHashGetKey(pIter, NULL);
    QW_GET_QTID(key, qId, tId, eId);

    QW_TASK_DLOG("job refId:%" PRIx64 ", code:%x, task status:%d", pTask->refId, pTask->code, pTask->status);

    pIter = taosHashIterate(sch->tasksHash, pIter);
  }

  QW_UNLOCK(QW_READ, &sch->tasksLock);
}
|
||||
|
||||
// Debug dump of every task context still held by the query worker.
//
// Logs the number of entries in mgmt->ctxHash and then one line per context
// with its fields (phase, types, query flags, response code, handles, table
// info and the five event slots).
//
// NOTE(review): no lock is taken around the iteration in this function -
// confirm that callers make this safe.
void qwDbgDumpTasksInfo(SQWorker *mgmt) {
  QW_DUMP("***Total remain ctx num %d", taosHashGetSize(mgmt->ctxHash));

  // Filled from each hash key by QW_GET_QTID; presumably read by QW_TASK_DLOG.
  uint64_t qId, tId;
  int32_t  eId;

  void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
  while (pIter) {
    SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
    void       *key = taosHashGetKey(pIter, NULL);
    QW_GET_QTID(key, qId, tId, eId);

    QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, msgType:%d, fetchType:%d, "
                 "execId:%x, level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, "
                 "rspCode:%x, affectedRows:%" PRId64 ", taskHandle:%p, sinkHandle:%p, tbFName:%s, sver:%d, tver:%d, events:%d,%d,%d,%d,%d",
                 ctx, ctx->lock, ctx->phase, ctx->taskType, ctx->explain, ctx->needFetch, ctx->localExec, ctx->msgType,
                 ctx->fetchType, ctx->execId, ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd, ctx->queryContinue,
                 ctx->queryInQueue, ctx->rspCode, ctx->affectedRows, ctx->taskHandle, ctx->sinkHandle, ctx->tbInfo.tbFName,
                 ctx->tbInfo.sversion, ctx->tbInfo.tversion, ctx->events[QW_EVENT_CANCEL], ctx->events[QW_EVENT_READY],
                 ctx->events[QW_EVENT_FETCH], ctx->events[QW_EVENT_DROP], ctx->events[QW_EVENT_CQUERY]);

    pIter = taosHashIterate(mgmt->ctxHash, pIter);
  }
}
|
||||
|
||||
void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
|
||||
if (!gQWDebug.dumpEnable) {
|
||||
return;
|
||||
|
@ -120,7 +162,7 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) {
|
|||
|
||||
QW_UNLOCK(QW_READ, &mgmt->schLock);
|
||||
|
||||
QW_DUMP("total remain ctx num %d", taosHashGetSize(mgmt->ctxHash));
|
||||
qwDbgDumpTasksInfo(mgmt);
|
||||
}
|
||||
|
||||
int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet) {
|
||||
|
|
|
@ -281,9 +281,11 @@ void qwFreeTaskHandle(qTaskInfo_t *taskHandle) {
|
|||
|
||||
int32_t qwKillTaskHandle(SQWTaskCtx *ctx) {
|
||||
int32_t code = 0;
|
||||
|
||||
// Note: free/kill may in RC
|
||||
qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle);
|
||||
if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) {
|
||||
qDebug("start to kill task");
|
||||
code = qAsyncKillTask(taskHandle);
|
||||
atomic_store_ptr(&ctx->taskHandle, taskHandle);
|
||||
}
|
||||
|
|
|
@ -1162,6 +1162,41 @@ _return:
|
|||
QW_RET(code);
|
||||
}
|
||||
|
||||
void qworkerStopAllTasks(void *qWorkerMgmt) {
|
||||
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
|
||||
|
||||
QW_DLOG("start to stop all tasks, taskNum:%d", taosHashGetSize(mgmt->ctxHash));
|
||||
|
||||
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
|
||||
while (pIter) {
|
||||
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
|
||||
void *key = taosHashGetKey(pIter, NULL);
|
||||
QW_GET_QTID(key, qId, tId, eId);
|
||||
|
||||
QW_LOCK(QW_WRITE, &ctx->lock);
|
||||
|
||||
QW_TASK_DLOG_E("start to force stop task");
|
||||
|
||||
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
|
||||
QW_TASK_WLOG_E("task already dropping");
|
||||
QW_UNLOCK(QW_WRITE, &ctx->lock);
|
||||
|
||||
pIter = taosHashIterate(mgmt->ctxHash, pIter);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (QW_QUERY_RUNNING(ctx)) {
|
||||
qwKillTaskHandle(ctx);
|
||||
} else {
|
||||
qwDropTask(QW_FPARAMS());
|
||||
}
|
||||
|
||||
QW_UNLOCK(QW_WRITE, &ctx->lock);
|
||||
|
||||
pIter = taosHashIterate(mgmt->ctxHash, pIter);
|
||||
}
|
||||
}
|
||||
|
||||
void qWorkerDestroy(void **qWorkerMgmt) {
|
||||
if (NULL == qWorkerMgmt || NULL == *qWorkerMgmt) {
|
||||
return;
|
||||
|
|
Loading…
Reference in New Issue