From f1e37cde780b182fce99a22be55c617fae7eb403 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 15 Jul 2022 10:21:47 +0800 Subject: [PATCH 01/10] fix: fix msg disorder issue --- source/libs/qworker/inc/qwInt.h | 2 +- source/libs/qworker/src/qwDbg.c | 2 +- source/libs/qworker/src/qwMsg.c | 3 +-- source/libs/qworker/src/qworker.c | 38 +++++++++++-------------------- 4 files changed, 16 insertions(+), 29 deletions(-) diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 539643c390..6c9871425b 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -135,7 +135,6 @@ typedef struct SQWTaskCtx { int32_t execId; bool queryRsped; - bool queryFetched; bool queryEnd; bool queryContinue; bool queryInQueue; @@ -228,6 +227,7 @@ typedef struct SQWorkerMgmt { #define QW_SET_EVENT_PROCESSED(ctx, event) atomic_store_8(&(ctx)->events[event], QW_EVENT_PROCESSED) #define QW_GET_PHASE(ctx) atomic_load_8(&(ctx)->phase) +#define QW_SET_PHASE(ctx, _value) do { if ((_value) != QW_PHASE_PRE_FETCH && (_value) != QW_PHASE_POST_FETCH) { atomic_store_8(&(ctx)->phase, _value); } } while (0) #define QW_SET_RSP_CODE(ctx, code) atomic_store_32(&(ctx)->rspCode, code) #define QW_UPDATE_RSP_CODE(ctx, code) atomic_val_compare_exchange_32(&(ctx)->rspCode, 0, code) diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 869eedf8f6..66a89cf57a 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -166,7 +166,7 @@ int32_t qwDbgResponseREdirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) { } if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType) { - ctx->phase = QW_PHASE_POST_QUERY; + QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY); qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL); gQWDebug.tmp = false; return TSDB_CODE_SUCCESS; diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 73110472f7..e1f16b9719 100644 --- 
a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -374,8 +374,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int qwMsg.msgInfo.needFetch = msg->needFetch; char * sql = strndup(msg->msg, msg->sqlLen); - QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, sql:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql); - + QW_SCH_TASK_DLOG("processQuery start, node:%p, type:%s, handle:%p, SQL:%s", node, TMSG_INFO(pMsg->msgType), pMsg->info.handle, sql); QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, sql)); QW_SCH_TASK_DLOG("processQuery end, node:%p", node); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 3e8ced318c..fb8ce09615 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -293,11 +293,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu QW_LOCK(QW_WRITE, &ctx->lock); - if (QW_PHASE_PRE_FETCH == phase) { - atomic_store_8((int8_t *)&ctx->queryFetched, true); - } else { - atomic_store_8(&ctx->phase, phase); - } + QW_SET_PHASE(ctx, phase); if (atomic_load_8((int8_t *)&ctx->queryEnd)) { QW_TASK_ELOG_E("query already end"); @@ -370,6 +366,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu } _return: + if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); @@ -390,7 +387,6 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp int32_t code = 0; SQWTaskCtx *ctx = NULL; SRpcHandleInfo connInfo = {0}; - SRpcHandleInfo *rspConnection = NULL; QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase)); @@ -403,13 +399,6 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } - if (QW_PHASE_POST_QUERY == phase) { - connInfo = ctx->ctrlConnInfo; - rspConnection = &connInfo; - - ctx->queryRsped = true; - } - if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { if 
(QW_PHASE_POST_FETCH == phase) { QW_TASK_WLOG("drop received at wrong phase %s", qwPhaseStr(phase)); @@ -437,17 +426,16 @@ _return: qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC); } - if (rspConnection) { - qwBuildAndSendQueryRsp(input->msgType + 1, rspConnection, code, ctx); - QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", rspConnection->handle, code, tstrerror(code)); + if (QW_PHASE_POST_QUERY == phase) { + ctx->queryRsped = true; + qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx); + QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); } if (ctx) { QW_UPDATE_RSP_CODE(ctx, code); - if (QW_PHASE_POST_FETCH != phase) { - atomic_store_8(&ctx->phase, phase); - } + QW_SET_PHASE(ctx, phase); QW_UNLOCK(QW_WRITE, &ctx->lock); qwReleaseTaskCtx(mgmt, ctx); @@ -634,8 +622,8 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_LOCK(QW_WRITE, &ctx->lock); if (queryEnd || code || 0 == atomic_load_8((int8_t *)&ctx->queryContinue)) { - // Note: if necessary, fetch need to put cquery to queue again - atomic_store_8(&ctx->phase, 0); + // Note: query is not running anymore + QW_SET_PHASE(ctx, 0); QW_UNLOCK(QW_WRITE, &ctx->lock); break; } @@ -722,7 +710,7 @@ _return: int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { int32_t code = 0; - bool rsped = false; + bool dropped = false; SQWTaskCtx *ctx = NULL; bool locked = false; @@ -740,14 +728,14 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (QW_QUERY_RUNNING(ctx)) { QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP); - } else if (ctx->phase > 0) { + } else if (QW_GET_PHASE(ctx) > 0) { QW_ERR_JRET(qwDropTask(QW_FPARAMS())); - rsped = true; + dropped = true; } else { // task not started } - if (!rsped) { + if (!dropped) { ctx->ctrlConnInfo = qwMsg->connInfo; QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); From c67fa0c1ce604ab4d6b738a5282e2c93f897ac45 Mon Sep 17 
00:00:00 2001 From: dapan1121 Date: Fri, 15 Jul 2022 14:17:00 +0800 Subject: [PATCH 02/10] enh: add taosd query exit processing --- include/libs/transport/trpc.h | 2 +- source/client/src/clientHb.c | 4 +++- source/libs/qworker/inc/qwInt.h | 4 ++-- source/libs/qworker/src/qwUtil.c | 26 +++++++++++++++++--------- source/libs/qworker/src/qworker.c | 4 ++-- source/libs/scheduler/src/schRemote.c | 1 + source/libs/transport/src/trans.c | 2 +- 7 files changed, 27 insertions(+), 16 deletions(-) diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index 7441b38321..2ae1f7b854 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -135,7 +135,7 @@ void rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg int32_t rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo); void rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); void rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); -int64_t rpcAllocHandle(); +void* rpcAllocHandle(); #ifdef __cplusplus } diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index d7c2c26d23..f16937c836 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -171,6 +171,7 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) { pTscObj->pAppInfo->totalDnodes = pRsp->query->totalDnodes; pTscObj->pAppInfo->onlineDnodes = pRsp->query->onlineDnodes; pTscObj->connId = pRsp->query->connId; + tscTrace("conn %p hb rsp, dnodes %d/%d", pTscObj->connId, pTscObj->pAppInfo->onlineDnodes, pTscObj->pAppInfo->totalDnodes); if (pRsp->query->killRid) { tscDebug("request rid %" PRIx64 " need to be killed now", pRsp->query->killRid); @@ -286,7 +287,8 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { taosMemoryFreeClear(param); if (code != 0) { - (*pInst)->onlineDnodes = 0; + (*pInst)->onlineDnodes = ((*pInst)->totalDnodes ? 
0 : -1); + tscDebug("hb rsp error %s, update server status %d/%d", tstrerror(code), (*pInst)->onlineDnodes, (*pInst)->totalDnodes); } if (rspNum) { diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 6c9871425b..2cc2993601 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -362,7 +362,7 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx); -int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx); +int32_t qwKillTaskHandle(SQWTaskCtx *ctx); int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status); int32_t qwDropTask(QW_FPARAMS_DEF); void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx); @@ -372,7 +372,7 @@ int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type); int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type); void qwClearExpiredSch(SQWorker *mgmt, SArray* pExpiredSch); int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch); -void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx); +void qwFreeTaskCtx(SQWTaskCtx *ctx); void qwDbgDumpMgmtInfo(SQWorker *mgmt); int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore); diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 9c49cbcb1f..1d5d9a989a 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -270,7 +270,7 @@ int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTask void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); } -void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) { +void qwFreeTaskHandle(qTaskInfo_t *taskHandle) { // Note: free/kill may in RC qTaskInfo_t otaskHandle = atomic_load_ptr(taskHandle); if (otaskHandle && 
atomic_val_compare_exchange_ptr(taskHandle, otaskHandle, NULL)) { @@ -278,7 +278,7 @@ void qwFreeTaskHandle(QW_FPARAMS_DEF, qTaskInfo_t *taskHandle) { } } -int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { +int32_t qwKillTaskHandle(SQWTaskCtx *ctx) { int32_t code = 0; // Note: free/kill may in RC qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle); @@ -290,7 +290,7 @@ int32_t qwKillTaskHandle(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { QW_RET(code); } -void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { +void qwFreeTaskCtx(SQWTaskCtx *ctx) { if (ctx->ctrlConnInfo.handle) { tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER); } @@ -300,7 +300,7 @@ void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { // NO need to release dataConnInfo - qwFreeTaskHandle(QW_FPARAMS(), &ctx->taskHandle); + qwFreeTaskHandle(&ctx->taskHandle); if (ctx->sinkHandle) { dsDestroyDataSinker(ctx->sinkHandle); @@ -336,7 +336,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); } - qwFreeTaskCtx(QW_FPARAMS(), &octx); + qwFreeTaskCtx(&octx); QW_TASK_DLOG_E("task ctx dropped"); @@ -463,13 +463,21 @@ void qwDestroyImpl(void *pMgmt) { mgmt->hbTimer = NULL; taosTmrCleanUp(mgmt->timer); - // TODO STOP ALL QUERY - - // TODO FREE ALL + uint64_t qId, tId; + int32_t eId; + void *pIter = taosHashIterate(mgmt->ctxHash, NULL); + while (pIter) { + SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; + void *key = taosHashGetKey(pIter, NULL); + QW_GET_QTID(key, qId, tId, eId); + qwFreeTaskCtx(ctx); + QW_TASK_DLOG("task ctx freed"); + pIter = taosHashIterate(mgmt->ctxHash, pIter); + } taosHashCleanup(mgmt->ctxHash); - void *pIter = taosHashIterate(mgmt->schHash, NULL); + pIter = taosHashIterate(mgmt->schHash, NULL); while (pIter) { SQWSchStatus *sch = (SQWSchStatus *)pIter; qwDestroySchStatus(sch); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index fb8ce09615..b15a3bd293 100644 --- a/source/libs/qworker/src/qworker.c +++ 
b/source/libs/qworker/src/qworker.c @@ -726,7 +726,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } if (QW_QUERY_RUNNING(ctx)) { - QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); + QW_ERR_JRET(qwKillTaskHandle(ctx)); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP); } else if (QW_GET_PHASE(ctx) > 0) { QW_ERR_JRET(qwDropTask(QW_FPARAMS())); @@ -940,7 +940,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) { _return: - qwFreeTaskCtx(QW_FPARAMS(), &ctx); + qwFreeTaskCtx(&ctx); QW_RET(TSDB_CODE_SUCCESS); } diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index d195c22c37..41d9f46a87 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1010,6 +1010,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen); persistHandle = true; + SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle()); break; } case TDMT_SCH_FETCH: diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 8a16b20a6f..725f3b32cf 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -170,7 +170,7 @@ void rpcSetDefaultAddr(void* thandle, const char* ip, const char* fqdn) { transSetDefaultAddr(thandle, ip, fqdn); } -int64_t rpcAllocHandle() { return transAllocHandle(); } +void* rpcAllocHandle() { return (void*)transAllocHandle(); } int32_t rpcInit() { transInit(); From c5675bc8212bf2c4942fae29f69eb67f2bc5d18c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 15 Jul 2022 14:40:42 +0800 Subject: [PATCH 03/10] fix: fix msg disorder issue --- source/libs/qworker/src/qwUtil.c | 2 +- source/libs/transport/src/transCli.c | 8 +++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 1d5d9a989a..b56cb29628 100644 --- a/source/libs/qworker/src/qwUtil.c +++ 
b/source/libs/qworker/src/qwUtil.c @@ -472,7 +472,7 @@ void qwDestroyImpl(void *pMgmt) { QW_GET_QTID(key, qId, tId, eId); qwFreeTaskCtx(ctx); - QW_TASK_DLOG("task ctx freed"); + QW_TASK_DLOG_E("task ctx freed"); pIter = taosHashIterate(mgmt->ctxHash, pIter); } taosHashCleanup(mgmt->ctxHash); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 3805787ce2..851824e439 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1206,7 +1206,13 @@ SCliThrd* transGetWorkThrd(STrans* trans, int64_t handle, bool* validHandle) { if (idx < 0) return NULL; return ((SCliObj*)trans->tcphandle)->pThreadObj[idx]; } - return transGetWorkThrdFromHandle(handle, validHandle); + SCliThrd* pThrd = transGetWorkThrdFromHandle(handle, validHandle); + if (*validHandle == true && pThrd == NULL) { + int idx = cliRBChoseIdx(trans); + if (idx < 0) return NULL; + pThrd = ((SCliObj*)trans->tcphandle)->pThreadObj[idx]; + } + return pThrd; } void transReleaseCliHandle(void* handle) { int idx = -1; From c8758058745550dc26f7907e748b266abea7cde6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 15 Jul 2022 15:49:30 +0800 Subject: [PATCH 04/10] refactor code --- source/libs/transport/src/transCli.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 851824e439..3956add423 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1,4 +1,5 @@ /** Copyright (c) 2019 TAOS Data, Inc. 
+ * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 @@ -809,7 +810,7 @@ SCliConn* cliGetConn(SCliMsg* pMsg, SCliThrd* pThrd, bool* ignore) { conn = exh->handle; if (conn == NULL) { conn = getConnFromPool(pThrd->pool, EPSET_GET_INUSE_IP(&pCtx->epSet), EPSET_GET_INUSE_PORT(&pCtx->epSet)); - *ignore = (conn && 0 == specifyConnRef(conn, true, refId)) ? false : true; + if (conn != NULL) specifyConnRef(conn, true, refId); } transReleaseExHandle(transGetRefMgt(), refId); } @@ -849,14 +850,20 @@ void cliHandleReq(SCliMsg* pMsg, SCliThrd* pThrd) { bool ignore = false; SCliConn* conn = cliGetConn(pMsg, pThrd, &ignore); if (ignore == true) { + tError("ignore msg"); return; } + if (conn != NULL) { transCtxMerge(&conn->ctx, &pCtx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); cliSend(conn); } else { conn = cliCreateConn(pThrd); + + int64_t refId = (int64_t)pMsg->msg.info.handle; + if (refId != 0) specifyConnRef(conn, true, refId); + transCtxMerge(&conn->ctx, &pCtx->appCtx); transQueuePush(&conn->cliMsgs, pMsg); From 02af3581f33095b62ff2946dcb8be1c037434a64 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 15 Jul 2022 16:01:42 +0800 Subject: [PATCH 05/10] enh: add debug info --- source/libs/qworker/inc/qwInt.h | 2 ++ source/libs/qworker/inc/qwMsg.h | 1 + source/libs/qworker/src/qwDbg.c | 42 +++++++++++++++++++++++---- source/libs/qworker/src/qwMsg.c | 14 +++++++++ source/libs/scheduler/src/schRemote.c | 2 +- 5 files changed, 55 insertions(+), 6 deletions(-) diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index d3347f4d1f..3c6c02ede6 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -75,6 +75,8 @@ typedef struct SQWDebug { bool lockEnable; bool statusEnable; bool dumpEnable; + bool sleepSimulate; + bool deadSimulate; bool tmp; } SQWDebug; diff --git a/source/libs/qworker/inc/qwMsg.h 
b/source/libs/qworker/inc/qwMsg.h index acb7004a51..16fe6b21c2 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -40,6 +40,7 @@ void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComple int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx); int32_t qwBuildAndSendExplainRsp(SRpcHandleInfo *pConn, SExplainExecInfo *execInfo, int32_t num); +int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code); void qwFreeFetchRsp(void *msg); int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 893f31acd1..cd54c5e5f9 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -9,7 +9,7 @@ #include "tmsg.h" #include "tname.h" -SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .tmp = false}; +SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true, .tmp = false}; int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { if (!gQWDebug.statusEnable) { @@ -175,29 +175,61 @@ int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) { return TSDB_CODE_SUCCESS; } +void qwDbgSimulateSleep() { + if (!gQWDebug.sleepSimulate) { + return; + } + + taosSsleep(taosRand() % 10); +} + +void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t msgType) { + if (!gQWDebug.deadSimulate) { + return; + } + + //FETCH OR QUERY USE DIFFERENT CONNINFO + qwBuildAndSendErrorRsp(msgType + 1, ctx->dataConnInfo, TSDB_CODE_RPC_BROKEN_LINK); + + qwDropTask(QW_FPARAMS()); +} + + int32_t qwDbgEnableDebug(char *option) { if (0 == strcasecmp(option, "lock")) { gQWDebug.lockEnable = true; - qDebug("qw lock debug enabled"); + qError("qw lock 
debug enabled"); return TSDB_CODE_SUCCESS; } if (0 == strcasecmp(option, "status")) { gQWDebug.statusEnable = true; - qDebug("qw status debug enabled"); + qError("qw status debug enabled"); return TSDB_CODE_SUCCESS; } if (0 == strcasecmp(option, "dump")) { gQWDebug.dumpEnable = true; - qDebug("qw dump debug enabled"); + qError("qw dump debug enabled"); + return TSDB_CODE_SUCCESS; + } + + if (0 == strcasecmp(option, "sleep")) { + gQWDebug.sleepSimulate = true; + qError("qw sleep debug enabled"); + return TSDB_CODE_SUCCESS; + } + + if (0 == strcasecmp(option, "dead")) { + gQWDebug.deadSimulate = true; + qError("qw dead debug enabled"); return TSDB_CODE_SUCCESS; } if (0 == strcasecmp(option, "tmp")) { gQWDebug.tmp = true; - qDebug("qw tmp debug enabled"); + qError("qw tmp debug enabled"); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 147ce76fc2..8bbd03f735 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -43,6 +43,20 @@ void qwFreeFetchRsp(void *msg) { } } +int32_t qwBuildAndSendErrorRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code) { + SRpcMsg rpcRsp = { + .msgType = rspType, + .pCont = NULL, + .contLen = 0, + .code = code, + .info = *pConn, + }; + + tmsgSendRsp(&rpcRsp); + + return TSDB_CODE_SUCCESS; +} + int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SQWTaskCtx *ctx) { STbVerInfo* tbInfo = ctx ? &ctx->tbInfo : NULL; int64_t affectedRows = ctx ? 
ctx->affectedRows : 0; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 41d9f46a87..ec07ee85fd 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1010,7 +1010,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen); persistHandle = true; - SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle()); + //SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle()); break; } case TDMT_SCH_FETCH: From f91d6dca9e1370d9ae6a7dd2a41e90ac7e17832e Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Fri, 15 Jul 2022 18:07:45 +0800 Subject: [PATCH 06/10] add more debug --- source/client/src/clientImpl.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 5e620d1060..ba75e3b85f 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1276,7 +1276,12 @@ int32_t doProcessMsgFromServer(void* param) { assert(pMsg->info.ahandle != NULL); STscObj* pTscObj = NULL; - tscDebug("processMsgFromServer message: %s, code: %s", TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code)); + STraceId* trace = &pMsg->info.traceId; + char tbuf[40] = {0}; + TRACE_TO_STR(trace, tbuf); + + tscDebug("processMsgFromServer message: %s, code: %s, gtid: %s", TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code), + tbuf); if (pSendInfo->requestObjRefId != 0) { SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId); From 0048bd8cb634b027d1c504196dcd901791cc350b Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 15 Jul 2022 18:09:27 +0800 Subject: [PATCH 07/10] fix: fix redirect issue --- source/client/src/clientImpl.c | 2 +- source/libs/qworker/src/qwDbg.c | 4 ++-- source/libs/qworker/src/qworker.c | 6 ++---- source/libs/scheduler/inc/schInt.h | 3 ++- source/libs/scheduler/src/schRemote.c | 2 +- 
source/libs/scheduler/src/schTask.c | 6 ++++-- 6 files changed, 12 insertions(+), 11 deletions(-) diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 5e620d1060..90d16d17a5 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -1276,7 +1276,7 @@ int32_t doProcessMsgFromServer(void* param) { assert(pMsg->info.ahandle != NULL); STscObj* pTscObj = NULL; - tscDebug("processMsgFromServer message: %s, code: %s", TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code)); + tscDebug("processMsgFromServer handle %p, message: %s, code: %s", pMsg->info.handle, TMSG_INFO(pMsg->msgType), tstrerror(pMsg->code)); if (pSendInfo->requestObjRefId != 0) { SRequestObj* pRequest = (SRequestObj*)taosAcquireRef(clientReqRefPool, pSendInfo->requestObjRefId); diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index cd54c5e5f9..7906617a3f 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -188,8 +188,8 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t msgType) { return; } - //FETCH OR QUERY USE DIFFERENT CONNINFO - qwBuildAndSendErrorRsp(msgType + 1, ctx->dataConnInfo, TSDB_CODE_RPC_BROKEN_LINK); + SRpcHandleInfo *pConn = ((msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) ? 
&ctx->dataConnInfo : &ctx->ctrlConnInfo); + qwBuildAndSendErrorRsp(msgType + 1, pConn, TSDB_CODE_RPC_BROKEN_LINK); qwDropTask(QW_FPARAMS()); } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index a671585014..c763645a07 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -426,7 +426,7 @@ _return: qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_PART_SUCC); } - if (QW_PHASE_POST_QUERY == phase) { + if (QW_PHASE_POST_QUERY == phase && ctx) { ctx->queryRsped = true; qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx); QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); @@ -730,11 +730,9 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (QW_QUERY_RUNNING(ctx)) { QW_ERR_JRET(qwKillTaskHandle(ctx)); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP); - } else if (QW_GET_PHASE(ctx) > 0) { + } else { QW_ERR_JRET(qwDropTask(QW_FPARAMS())); dropped = true; - } else { - // task not started } if (!dropped) { diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index bc0270635d..4979a41f17 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -58,7 +58,7 @@ typedef enum { #define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000 #define SCH_MAX_TASK_TIMEOUT_USEC 60000000 -#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA +#define SCH_MAX_CANDIDATE_EP_NUM (TSDB_MAX_REPLICA + 100) @@ -211,6 +211,7 @@ typedef struct SSchTask { int32_t maxExecTimes; // task max exec times int32_t maxRetryTimes; // task max retry times int32_t retryTimes; // task retry times + bool waitRetry; // wait for retry int32_t execId; // task current execute index SSchLevel *level; // level SRWLatch planLock; // task update plan lock diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index ec07ee85fd..41d9f46a87 100644 --- 
a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1010,7 +1010,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, memcpy(pMsg->msg + len, pTask->msg, pTask->msgLen); persistHandle = true; - //SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle()); + SCH_SET_TASK_HANDLE(pTask, rpcAllocHandle()); break; } case TDMT_SCH_FETCH: diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 9483ecd6eb..a46b293965 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -125,8 +125,8 @@ int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_ SCH_TASK_DLOG("execId %d removed from execNodeList", execId); } - if (execId != pTask->execId) { // ignore it - SCH_TASK_DLOG("execId %d is not current execId %d", execId, pTask->execId); + if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it + SCH_TASK_DLOG("execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry); SCH_ERR_RET(TSDB_CODE_SCH_IGNORE_ERROR); } @@ -335,6 +335,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 return TSDB_CODE_SUCCESS; } + pTask->waitRetry = true; schDropTaskOnExecNode(pJob, pTask); taosHashClear(pTask->execNodes); SCH_ERR_JRET(schRemoveTaskFromExecList(pJob, pTask)); @@ -790,6 +791,7 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); pTask->execId++; pTask->retryTimes++; + pTask->waitRetry = false; SCH_TASK_DLOG("start to launch task, execId %d, retry %d", pTask->execId, pTask->retryTimes); From dce0f06e31de54564c04a4eb0ad2af93772e1980 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 15 Jul 2022 20:40:48 +0800 Subject: [PATCH 08/10] fix: fix redirect issue --- source/dnode/mnode/impl/src/mndMain.c | 2 ++ source/libs/qworker/inc/qwInt.h | 4 ++-- source/libs/qworker/src/qwDbg.c | 11 ++++++++++- 
source/libs/qworker/src/qworker.c | 21 +++++++++++++++------ source/libs/scheduler/src/schTask.c | 10 ++++++++++ 5 files changed, 39 insertions(+), 9 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 29e68ce4e8..7151c4a026 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -42,6 +42,7 @@ #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" +#include "qworker.h" static void *mndBuildTimerMsg(int32_t *pContLen) { SMTimerReq timerReq = {0}; @@ -375,6 +376,7 @@ void mndPreClose(SMnode *pMnode) { void mndClose(SMnode *pMnode) { if (pMnode != NULL) { mDebug("start to close mnode"); + qWorkerDestroy((void **)&pMnode->pQuery); mndCleanupSteps(pMnode, -1); taosMemoryFreeClear(pMnode->path); taosMemoryFreeClear(pMnode); diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 3c6c02ede6..56af7ee1e0 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -132,7 +132,7 @@ typedef struct SQWTaskCtx { int8_t taskType; int8_t explain; int8_t needFetch; - int32_t queryType; + int32_t msgType; int32_t fetchType; int32_t execId; @@ -380,7 +380,7 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt); int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore); int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet); int32_t qwAddTaskCtx(QW_FPARAMS_DEF); -int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx); +int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped); #ifdef __cplusplus diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 7906617a3f..4e174018f4 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -147,7 +147,7 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int return TSDB_CODE_SUCCESS; } -int32_t 
qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) { +int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) { if (gQWDebug.tmp) { if (TDMT_SCH_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) { SEpSet epSet = {0}; @@ -162,16 +162,25 @@ int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx) { ctx->phase = QW_PHASE_POST_QUERY; qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet); + *rsped = true; return TSDB_CODE_SUCCESS; } if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) { QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY); qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL); + *rsped = true; + return TSDB_CODE_SUCCESS; + } + + if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 3)) { + qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL); + *rsped = true; return TSDB_CODE_SUCCESS; } } + *rsped = false; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index c763645a07..7e12a3a424 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -428,7 +428,14 @@ _return: if (QW_PHASE_POST_QUERY == phase && ctx) { ctx->queryRsped = true; - qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx); + + bool rsped = false; + SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo}; + qwDbgResponseRedirect(&qwMsg, ctx, &rsped); + if (!rsped) { + qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx); + } + QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); } @@ -476,8 +483,6 @@ int32_t qwPreprocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_INIT)); - qwDbgResponseRedirect(qwMsg, ctx); - _return: if (ctx) { @@ -505,7 +510,7 @@ int32_t 
qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, const char* sql) { ctx->taskType = qwMsg->msgInfo.taskType; ctx->explain = qwMsg->msgInfo.explain; ctx->needFetch = qwMsg->msgInfo.needFetch; - ctx->queryType = qwMsg->msgType; + ctx->msgType = qwMsg->msgType; //QW_TASK_DLOGL("subplan json string, len:%d, %s", qwMsg->msgLen, qwMsg->msg); @@ -650,7 +655,7 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); - ctx->queryType = qwMsg->msgType; + ctx->msgType = qwMsg->msgType; SOutputData sOutput = {0}; QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); @@ -702,7 +707,11 @@ _return: } if (code || rsp) { - qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); + bool rsped = false; + qwDbgResponseRedirect(qwMsg, ctx, &rsped); + if (!rsped) { + qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); + } QW_TASK_DLOG("%s send, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), qwMsg->connInfo.handle, code, tstrerror(code), dataLen); } diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index a46b293965..bfba81b118 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -138,7 +138,17 @@ int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int3 return TSDB_CODE_SUCCESS; } + if ((execId != pTask->execId) || pTask->waitRetry) { // ignore it + SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry); + return TSDB_CODE_SUCCESS; + } + SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId)); + if (NULL == nodeInfo) { // ignore it + SCH_TASK_DLOG("handle not updated since execId %d already not exist, current execId %d, waitRetry %d", execId, pTask->execId, pTask->waitRetry); + return TSDB_CODE_SUCCESS; + } + nodeInfo->handle = 
handle; SCH_TASK_DLOG("handle updated to %p for execId %d", handle, execId); From e0b56ea593a0542fa50de65923576797b091e03c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 16 Jul 2022 15:38:46 +0800 Subject: [PATCH 09/10] fix: fix query retry issue --- source/dnode/mgmt/mgmt_qnode/src/qmWorker.c | 1 + source/libs/catalog/src/ctgAsync.c | 6 +-- source/libs/executor/src/executorimpl.c | 1 + source/libs/qworker/inc/qwInt.h | 6 ++- source/libs/qworker/inc/qwMsg.h | 1 + source/libs/qworker/src/qwDbg.c | 58 ++++++++++++++------- source/libs/qworker/src/qwMsg.c | 1 - source/libs/qworker/src/qworker.c | 23 ++++---- source/libs/scheduler/inc/schInt.h | 17 +++--- source/libs/scheduler/src/schJob.c | 46 +++++++++++----- source/libs/scheduler/src/schRemote.c | 12 ++--- source/libs/scheduler/src/schTask.c | 36 ++++++++----- 12 files changed, 132 insertions(+), 76 deletions(-) diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index 6814643b59..1c7edbe6be 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -81,6 +81,7 @@ int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { taosWriteQitem(pMgmt->queryWorker.queue, pMsg); return 0; case READ_QUEUE: + case FETCH_QUEUE: dTrace("msg:%p, is created and will put into qnode-fetch queue", pMsg); taosWriteQitem(pMgmt->fetchWorker.queue, pMsg); return 0; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index e77df8f7f2..920acbac2e 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -163,7 +163,7 @@ int32_t ctgInitGetQnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %d task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, 
ctgTaskTypeStr(task.type)); return TSDB_CODE_SUCCESS; } @@ -178,7 +178,7 @@ int32_t ctgInitGetDnodeTask(SCtgJob *pJob, int32_t taskIdx, void* param) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " the %d task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); return TSDB_CODE_SUCCESS; } @@ -264,7 +264,7 @@ int32_t ctgInitGetSvrVerTask(SCtgJob *pJob, int32_t taskIdx, void* param) { taosArrayPush(pJob->pTasks, &task); - qDebug("QID:0x%" PRIx64 " [%dth] task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); + qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized", pJob->queryId, taskIdx, ctgTaskTypeStr(task.type)); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 89542571ea..72279d63b2 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1978,6 +1978,7 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { qDebug("%s fetch rsp received, index:%d, rows:%d", pSourceDataInfo->taskId, index, pRsp->numOfRows); } else { pSourceDataInfo->code = code; + qDebug("%s fetch rsp received, index:%d, error:%d", pSourceDataInfo->taskId, index, tstrerror(code)); } pSourceDataInfo->status = EX_SOURCE_DATA_READY; diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 56af7ee1e0..c8e5204e91 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -77,7 +77,7 @@ typedef struct SQWDebug { bool dumpEnable; bool sleepSimulate; bool deadSimulate; - bool tmp; + bool redirectSimulate; } SQWDebug; extern SQWDebug gQWDebug; @@ -380,7 +380,9 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt); int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore); int32_t 
qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet); int32_t qwAddTaskCtx(QW_FPARAMS_DEF); -int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped); +void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped); +void qwDbgSimulateSleep(void); +void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped); #ifdef __cplusplus diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index 16fe6b21c2..5378934343 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -46,6 +46,7 @@ int32_t qwMallocFetchRsp(int32_t length, SRetrieveTableRsp **rsp); int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn); +int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); #ifdef __cplusplus } diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 4e174018f4..98d7825b2c 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -9,7 +9,7 @@ #include "tmsg.h" #include "tname.h" -SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = true, .tmp = false}; +SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .redirectSimulate = false, .deadSimulate = false, .sleepSimulate = false}; int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { if (!gQWDebug.statusEnable) { @@ -147,8 +147,17 @@ int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int return TSDB_CODE_SUCCESS; } -int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) { - if (gQWDebug.tmp) { +void qwDbgSimulateRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) { + static int32_t ignoreTime = 0; + if (*rsped) { + return; + } + + if 
(gQWDebug.redirectSimulate) { + if (++ignoreTime <= 10) { + return; + } + if (TDMT_SCH_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) { SEpSet epSet = {0}; epSet.inUse = 1; @@ -163,44 +172,55 @@ int32_t qwDbgResponseRedirect(SQWMsg *qwMsg, SQWTaskCtx *ctx, bool *rsped) { ctx->phase = QW_PHASE_POST_QUERY; qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, &epSet); *rsped = true; - return TSDB_CODE_SUCCESS; + return; } if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType && (0 == taosRand() % 3)) { QW_SET_PHASE(ctx, QW_PHASE_POST_QUERY); qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL); *rsped = true; - return TSDB_CODE_SUCCESS; + return; } - if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 3)) { + if ((TDMT_SCH_FETCH == qwMsg->msgType) && (0 == taosRand() % 9)) { qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL); *rsped = true; - return TSDB_CODE_SUCCESS; + return; } } - - *rsped = false; - return TSDB_CODE_SUCCESS; } -void qwDbgSimulateSleep() { +void qwDbgSimulateSleep(void) { if (!gQWDebug.sleepSimulate) { return; } - taosSsleep(taosRand() % 10); + static int32_t ignoreTime = 0; + if (++ignoreTime > 10) { + taosSsleep(taosRand() % 20); + } } -void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t msgType) { +void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped) { if (!gQWDebug.deadSimulate) { return; } - SRpcHandleInfo *pConn = ((msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) ? &ctx->dataConnInfo : &ctx->ctrlConnInfo); - qwBuildAndSendErrorRsp(msgType + 1, pConn, TSDB_CODE_RPC_BROKEN_LINK); + if (*rsped) { + return; + } - qwDropTask(QW_FPARAMS()); + static int32_t ignoreTime = 0; + + if (++ignoreTime > 10 && 0 == taosRand() % 9) { + SRpcHandleInfo *pConn = ((ctx->msgType == TDMT_SCH_FETCH || ctx->msgType == TDMT_SCH_MERGE_FETCH) ? 
&ctx->dataConnInfo : &ctx->ctrlConnInfo); + qwBuildAndSendErrorRsp(ctx->msgType + 1, pConn, TSDB_CODE_RPC_BROKEN_LINK); + + qwBuildAndSendDropMsg(QW_FPARAMS(), pConn); + *rsped = true; + + return; + } } @@ -236,9 +256,9 @@ int32_t qwDbgEnableDebug(char *option) { return TSDB_CODE_SUCCESS; } - if (0 == strcasecmp(option, "tmp")) { - gQWDebug.tmp = true; - qError("qw tmp debug enabled"); + if (0 == strcasecmp(option, "redirect")) { + gQWDebug.redirectSimulate = true; + qError("qw redirect debug enabled"); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 8bbd03f735..b8d9957c30 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -198,7 +198,6 @@ int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { int32_t code = tmsgPutToQueue(&mgmt->msgCb, FETCH_QUEUE, &pNewMsg); if (TSDB_CODE_SUCCESS != code) { QW_SCH_TASK_ELOG("put drop task msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code)); - rpcFreeCont(req); QW_ERR_RET(code); } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 7e12a3a424..e99695e962 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -83,6 +83,7 @@ int32_t qwExecTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *queryEnd) { // if *taskHandle is NULL, it's killed right now if (taskHandle) { + qwDbgSimulateSleep(); code = qExecTask(taskHandle, &pRes, &useconds); if (code) { if (code != TSDB_CODE_OPS_NOT_SUPPORT) { @@ -431,12 +432,12 @@ _return: bool rsped = false; SQWMsg qwMsg = {.msgType = ctx->msgType, .connInfo = ctx->ctrlConnInfo}; - qwDbgResponseRedirect(&qwMsg, ctx, &rsped); + qwDbgSimulateRedirect(&qwMsg, ctx, &rsped); + qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); if (!rsped) { qwBuildAndSendQueryRsp(input->msgType + 1, &ctx->ctrlConnInfo, code, ctx); - } - - QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, 
tstrerror(code)); + QW_TASK_DLOG("query msg rsped, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); + } } if (ctx) { @@ -656,13 +657,12 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); ctx->msgType = qwMsg->msgType; + ctx->dataConnInfo = qwMsg->connInfo; SOutputData sOutput = {0}; QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput)); if (NULL == rsp) { - ctx->dataConnInfo = qwMsg->connInfo; - QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_FETCH); } else { bool qComplete = (DS_BUF_EMPTY == sOutput.bufStatus && sOutput.queryEnd); @@ -708,12 +708,15 @@ _return: if (code || rsp) { bool rsped = false; - qwDbgResponseRedirect(qwMsg, ctx, &rsped); + if (ctx) { + qwDbgSimulateRedirect(qwMsg, ctx, &rsped); + qwDbgSimulateDead(QW_FPARAMS(), ctx, &rsped); + } if (!rsped) { qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code); + QW_TASK_DLOG("%s send, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), qwMsg->connInfo.handle, code, tstrerror(code), + dataLen); } - QW_TASK_DLOG("%s send, handle:%p, code:%x - %s, dataLen:%d", TMSG_INFO(qwMsg->msgType + 1), qwMsg->connInfo.handle, code, tstrerror(code), - dataLen); } QW_RET(TSDB_CODE_SUCCESS); @@ -745,8 +748,6 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } if (!dropped) { - ctx->ctrlConnInfo = qwMsg->connInfo; - QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); } diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 4979a41f17..65b45cc612 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -55,13 +55,11 @@ typedef enum { #define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000 #define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200 // unit is TSDB_TABLE_NUM_UNIT #define SCHEDULE_DEFAULT_POLICY SCH_LOAD_SEQ +#define SCHEDULE_DEFAULT_MAX_NODE_NUM 20 #define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000 #define SCH_MAX_TASK_TIMEOUT_USEC 
60000000 -#define SCH_MAX_CANDIDATE_EP_NUM (TSDB_MAX_REPLICA + 100) - - - +#define SCH_DEFAULT_MAX_RETRY_NUM 6 typedef struct SSchDebug { bool lockEnable; @@ -275,7 +273,8 @@ typedef struct SSchJob { int32_t errCode; SRWLatch resLock; SExecResult execRes; - void *resData; //TODO free it or not + void *fetchRes; //TODO free it or not + bool fetched; int32_t resNumOfRows; SSchResInfo userRes; const char *sql; @@ -327,7 +326,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode) #define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) #define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) (SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task)))) -#define SCH_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH) +#define SCH_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_LINK_BROKEN || (_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH) #define SCH_TASK_NEED_REDIRECT(_task, _msgType, _code, _rspLen) (SCH_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_MERGE_TASK_NETWORK_ERR((_task), (_code), (_rspLen)))) #define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR) @@ -369,6 +368,8 @@ extern SSchedulerMgmt schMgmt; qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__) #define SCH_TASK_DLOG(param, ...) \ qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__) +#define SCH_TASK_TLOG(param, ...) 
\ + qTrace("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__) #define SCH_TASK_DLOGL(param, ...) \ qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__) #define SCH_TASK_WLOG(param, ...) \ @@ -442,7 +443,7 @@ void schFreeRpcCtx(SRpcCtx *pCtx); int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp); bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus); int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask); -int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp); +int32_t schSaveJobExecRes(SSchJob *pJob, SQueryTableRsp *rsp); int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRsp *pRsp); void schProcessOnDataFetched(SSchJob *job); int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask); @@ -493,7 +494,7 @@ int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask); void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode); int32_t schHandleJobFailure(SSchJob *pJob, int32_t errCode); int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode); -bool schChkCurrentOp(SSchJob *pJob, int32_t op, bool sync); +bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync); extern SSchDebug gSCHDebug; diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 1b1268baf1..13a369fac9 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -110,7 +110,7 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { break; case JOB_TASK_STATUS_PART_SUCC: if (newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_SUCC && - newStatus != JOB_TASK_STATUS_DROP) { + newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC) { SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } @@ -389,13 +389,18 @@ int32_t schDumpJobExecRes(SSchJob* pJob, SExecResult* pRes) { int32_t 
schDumpJobFetchRes(SSchJob* pJob, void** pData) { int32_t code = 0; - if (pJob->resData && ((SRetrieveTableRsp *)pJob->resData)->completed) { - SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_SUCC, NULL)); + + SCH_LOCK(SCH_WRITE, &pJob->resLock); + + pJob->fetched = true; + + if (pJob->fetchRes && ((SRetrieveTableRsp *)pJob->fetchRes)->completed) { + SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_SUCC, NULL)); } while (true) { - *pData = atomic_load_ptr(&pJob->resData); - if (*pData != atomic_val_compare_exchange_ptr(&pJob->resData, *pData, NULL)) { + *pData = atomic_load_ptr(&pJob->fetchRes); + if (*pData != atomic_val_compare_exchange_ptr(&pJob->fetchRes, *pData, NULL)) { continue; } @@ -414,7 +419,11 @@ int32_t schDumpJobFetchRes(SSchJob* pJob, void** pData) { SCH_JOB_DLOG("fetch done, totalRows:%d", pJob->resNumOfRows); - return TSDB_CODE_SUCCESS; +_return: + + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); + + return code; } int32_t schNotifyUserExecRes(SSchJob* pJob) { @@ -512,8 +521,12 @@ int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode) { } -int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { - schPostJobRes(pJob, SCH_OP_EXEC); +int32_t schProcessOnJobPartialSuccess(SSchJob *pJob) { + if (schChkCurrentOp(pJob, SCH_OP_FETCH, -1)) { + SCH_ERR_RET(schLaunchFetchTask(pJob)); + } else { + schPostJobRes(pJob, 0); + } return TSDB_CODE_SUCCESS; } @@ -526,7 +539,7 @@ int32_t schProcessOnExplainDone(SSchJob *pJob, SSchTask *pTask, SRetrieveTableRs SCH_TASK_DLOG("got explain rsp, rows:%d, complete:%d", htonl(pRsp->numOfRows), pRsp->completed); atomic_store_32(&pJob->resNumOfRows, htonl(pRsp->numOfRows)); - atomic_store_ptr(&pJob->resData, pRsp); + atomic_store_ptr(&pJob->fetchRes, pRsp); SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC); @@ -561,7 +574,7 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t schSaveJobQueryRes(SSchJob *pJob, SQueryTableRsp *rsp) { +int32_t schSaveJobExecRes(SSchJob *pJob, 
SQueryTableRsp *rsp) { if (rsp->tbFName[0]) { SCH_LOCK(SCH_WRITE, &pJob->resLock); @@ -600,7 +613,7 @@ int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask) { int32_t schLaunchJob(SSchJob *pJob) { if (EXPLAIN_MODE_STATIC == pJob->attr.explainMode) { - SCH_ERR_RET(qExecStaticExplain(pJob->pDag, (SRetrieveTableRsp **)&pJob->resData)); + SCH_ERR_RET(qExecStaticExplain(pJob->pDag, (SRetrieveTableRsp **)&pJob->fetchRes)); SCH_ERR_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL)); } else { SSchLevel *level = taosArrayGet(pJob->levels, pJob->levelIdx); @@ -661,7 +674,7 @@ void schFreeJobImpl(void *job) { qDestroyQueryPlan(pJob->pDag); taosMemoryFreeClear(pJob->userRes.execRes); - taosMemoryFreeClear(pJob->resData); + taosMemoryFreeClear(pJob->fetchRes); taosMemoryFree(pJob); int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1); @@ -795,9 +808,14 @@ void schDirectPostJobRes(SSchedulerReq* pReq, int32_t errCode) { } } -bool schChkCurrentOp(SSchJob *pJob, int32_t op, bool sync) { +bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) { + bool r = false; SCH_LOCK(SCH_READ, &pJob->opStatus.lock); - bool r = (pJob->opStatus.op == op) && (pJob->opStatus.syncReq == sync); + if (sync >= 0) { + r = (pJob->opStatus.op == op) && (pJob->opStatus.syncReq == sync); + } else { + r = (pJob->opStatus.op == op); + } SCH_UNLOCK(SCH_READ, &pJob->opStatus.lock); return r; diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 41d9f46a87..6983bbf013 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -256,7 +256,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa SCH_ERR_JRET(rsp->code); - SCH_ERR_JRET(schSaveJobQueryRes(pJob, rsp)); + SCH_ERR_JRET(schSaveJobExecRes(pJob, rsp)); atomic_add_fetch_32(&pJob->resNumOfRows, rsp->affectedRows); @@ -277,8 +277,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t 
execId, SDa SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } - if (pJob->resData) { - SCH_TASK_ELOG("explain result is already generated, res:%p", pJob->resData); + if (pJob->fetchRes) { + SCH_TASK_ELOG("explain result is already generated, res:%p", pJob->fetchRes); SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); } @@ -325,13 +325,13 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa return TSDB_CODE_SUCCESS; } - if (pJob->resData) { - SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->resData); + if (pJob->fetchRes) { + SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->fetchRes); taosMemoryFreeClear(rsp); SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); } - atomic_store_ptr(&pJob->resData, rsp); + atomic_store_ptr(&pJob->fetchRes, rsp); atomic_add_fetch_32(&pJob->resNumOfRows, htonl(rsp->numOfRows)); if (rsp->completed) { diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index bfba81b118..282e81bb5d 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -47,10 +47,10 @@ void schFreeTask(SSchJob *pJob, SSchTask *pTask) { void schInitTaskRetryTimes(SSchJob *pJob, SSchTask *pTask, SSchLevel *pLevel) { if (SCH_IS_DATA_BIND_TASK(pTask) || (!SCH_IS_QUERY_JOB(pJob)) || (SCH_ALL != schMgmt.cfg.schPolicy)) { - pTask->maxRetryTimes = SCH_MAX_CANDIDATE_EP_NUM; + pTask->maxRetryTimes = SCH_DEFAULT_MAX_RETRY_NUM; } else { int32_t nodeNum = taosArrayGetSize(pJob->nodeList); - pTask->maxRetryTimes = TMAX(nodeNum, SCH_MAX_CANDIDATE_EP_NUM); + pTask->maxRetryTimes = TMAX(nodeNum, SCH_DEFAULT_MAX_RETRY_NUM); } pTask->maxExecTimes = pTask->maxRetryTimes * (pLevel->level + 1); @@ -64,11 +64,11 @@ int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel * pTask->execId = -1; pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC; pTask->taskId = schGenTaskId(); - pTask->execNodes = - taosHashInit(SCH_MAX_CANDIDATE_EP_NUM, 
taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); schInitTaskRetryTimes(pJob, pTask, pLevel); + pTask->execNodes = + taosHashInit(pTask->maxExecTimes, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); pTask->profile.execTime = taosArrayInit(pTask->maxExecTimes, sizeof(int64_t)); if (NULL == pTask->execNodes || NULL == pTask->profile.execTime) { SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -405,6 +405,18 @@ _return: int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { int32_t code = 0; + if (JOB_TASK_STATUS_PART_SUCC == pJob->status) { + SCH_LOCK(SCH_WRITE, &pJob->resLock); + if (pJob->fetched) { + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); + SCH_TASK_ELOG("already fetched while got error %s", tstrerror(rspCode)); + SCH_ERR_RET(rspCode); + } + SCH_UNLOCK(SCH_WRITE, &pJob->resLock); + + schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC); + } + if (SCH_IS_DATA_BIND_TASK(pTask)) { if (NULL == pData->pEpSet) { SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode)); @@ -602,7 +614,7 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) { if (pJob->nodeList) { nodeNum = taosArrayGetSize(pJob->nodeList); - for (int32_t i = 0; i < nodeNum && addNum < SCH_MAX_CANDIDATE_EP_NUM; ++i) { + for (int32_t i = 0; i < nodeNum; ++i) { SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i); SQueryNodeAddr *naddr = &nload->addr; @@ -611,8 +623,8 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_TASK_DLOG("set %dth candidate addr, id %d, fqdn:%s, port:%d", i, naddr->nodeId, SCH_GET_CUR_EP(naddr)->fqdn, - SCH_GET_CUR_EP(naddr)->port); + SCH_TASK_TLOG("set %dth candidate addr, id %d, inUse:%d/%d, fqdn:%s, port:%d", i, naddr->nodeId, naddr->epSet.inUse, naddr->epSet.numOfEps, + SCH_GET_CUR_EP(naddr)->fqdn, SCH_GET_CUR_EP(naddr)->port); ++addNum; } @@ -632,9 +644,9 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, 
SSchTask *pTask) { } pTask->candidateIdx = 0; - pTask->candidateAddrs = taosArrayInit(SCH_MAX_CANDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); + pTask->candidateAddrs = taosArrayInit(SCHEDULE_DEFAULT_MAX_NODE_NUM, sizeof(SQueryNodeAddr)); if (NULL == pTask->candidateAddrs) { - SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCH_MAX_CANDIDATE_EP_NUM); + SCH_TASK_ELOG("taosArrayInit %d condidate addrs failed", SCHEDULE_DEFAULT_MAX_NODE_NUM); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -897,9 +909,9 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { int32_t schLaunchFetchTask(SSchJob *pJob) { int32_t code = 0; - void *resData = atomic_load_ptr(&pJob->resData); - if (resData) { - SCH_JOB_DLOG("res already fetched, res:%p", resData); + void *fetchRes = atomic_load_ptr(&pJob->fetchRes); + if (fetchRes) { + SCH_JOB_DLOG("res already fetched, res:%p", fetchRes); return TSDB_CODE_SUCCESS; } From 81c25a1a3014057f9e45f05fca40644f725da952 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 16 Jul 2022 17:32:10 +0800 Subject: [PATCH 10/10] fix: fix mnode close issue --- source/dnode/mnode/impl/src/mndMain.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 120b98d85a..041fc2a2d1 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -42,7 +42,6 @@ #include "mndTrans.h" #include "mndUser.h" #include "mndVgroup.h" -#include "qworker.h" static void *mndBuildTimerMsg(int32_t *pContLen) { SMTimerReq timerReq = {0}; @@ -387,7 +386,6 @@ void mndPreClose(SMnode *pMnode) { void mndClose(SMnode *pMnode) { if (pMnode != NULL) { mDebug("start to close mnode"); - qWorkerDestroy((void **)&pMnode->pQuery); mndCleanupSteps(pMnode, -1); taosMemoryFreeClear(pMnode->path); taosMemoryFreeClear(pMnode);