diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index d210004760..412b4b4cf6 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -152,7 +152,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo); * @param tinfo qhandle * @return */ -int32_t qAsyncKillTask(qTaskInfo_t tinfo); +int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode); /** * destroy query info structure diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index a97368ffee..6319281212 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -260,7 +260,7 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \ NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code)) -#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR) +#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR || (_code) == TSDB_CODE_VND_STOPPED) #define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR) #define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) (false) // used later diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 6f9fa7623d..a6155f6611 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -343,6 +343,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_COL_NOT_EXISTS TAOS_DEF_ERROR_CODE(0, 0x0526) #define TSDB_CODE_VND_COL_SUBSCRIBED TAOS_DEF_ERROR_CODE(0, 0x0527) #define TSDB_CODE_VND_NO_AVAIL_BUFPOOL TAOS_DEF_ERROR_CODE(0, 0x0528) +#define TSDB_CODE_VND_STOPPED TAOS_DEF_ERROR_CODE(0, 0x0529) // tsdb #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) diff --git a/source/dnode/mgmt/node_mgmt/src/dmTransport.c b/source/dnode/mgmt/node_mgmt/src/dmTransport.c index 1bd7846d1c..d037cc3375 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmTransport.c +++ b/source/dnode/mgmt/node_mgmt/src/dmTransport.c @@ -241,7 +241,8 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) { rpcRe static bool rpcRfp(int32_t code, tmsg_t msgType) { if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED || - code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) { + code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK || + code == TSDB_CODE_VND_STOPPED) { if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) { return false; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 7100be58e3..3dbcd8a07e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -781,7 +781,7 @@ void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t checkForQueryBuf(size_t numOfTables); bool isTaskKilled(SExecTaskInfo* pTaskInfo); -void setTaskKilled(SExecTaskInfo* pTaskInfo); +void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode); void doDestroyTask(SExecTaskInfo* pTaskInfo); void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 963a273290..53660d88e1 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -70,7 +70,7 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn tsem_wait(&pExchangeInfo->ready); if (isTaskKilled(pTaskInfo)) { - longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + longjmp(pTaskInfo->env, pTaskInfo->code); } for (int32_t i = 0; i < totalSources; ++i) { @@ -573,7 +573,7 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { tsem_wait(&pExchangeInfo->ready); if (isTaskKilled(pTaskInfo)) { - longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + longjmp(pTaskInfo->env, pTaskInfo->code); } tsem_post(&pExchangeInfo->ready); @@ -621,7 +621,7 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { doSendFetchDataRequest(pExchangeInfo, pTaskInfo, pExchangeInfo->current); tsem_wait(&pExchangeInfo->ready); if (isTaskKilled(pTaskInfo)) { - longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + longjmp(pTaskInfo->env, pTaskInfo->code); } SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 10ceb9ccee..ebd1afa855 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -688,7 +688,7 @@ void qStopTaskOperators(SExecTaskInfo* pTaskInfo) { taosWUnLockLatch(&pTaskInfo->stopInfo.lock); } -int32_t qAsyncKillTask(qTaskInfo_t qinfo) { +int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qinfo; if (pTaskInfo == NULL) { @@ -697,7 +697,7 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo) { qDebug("%s execTask async killed", GET_TASKID(pTaskInfo)); - setTaskKilled(pTaskInfo); + setTaskKilled(pTaskInfo, rspCode); qStopTaskOperators(pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index d0d8b42442..dd527058ce 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -610,21 +610,10 @@ void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExprInfo, SSDataBlock* pB } bool isTaskKilled(SExecTaskInfo* pTaskInfo) { - // query has been executed more than tsShellActivityTimer, and the retrieve has not arrived - // abort current query execution. - if (pTaskInfo->owner != 0 && - ((taosGetTimestampSec() - pTaskInfo->cost.start / 1000) > 10 * getMaximumIdleDurationSec()) - /*(!needBuildResAfterQueryComplete(pTaskInfo))*/) { - assert(pTaskInfo->cost.start != 0); - // qDebug("QInfo:%" PRIu64 " retrieve not arrive beyond %d ms, abort current query execution, start:%" PRId64 - // ", current:%d", pQInfo->qId, 1, pQInfo->startExecTs, taosGetTimestampSec()); - // return true; - } - - return false; + return (0 != pTaskInfo->code) ? true : false; } -void setTaskKilled(SExecTaskInfo* pTaskInfo) { pTaskInfo->code = TSDB_CODE_TSC_QUERY_CANCELLED; } +void setTaskKilled(SExecTaskInfo* pTaskInfo, int32_t rspCode) { pTaskInfo->code = rspCode; } ///////////////////////////////////////////////////////////////////////////////////////////// STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int64_t key) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7f0dec1959..cf0e7b532f 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -629,7 +629,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { while (tsdbNextDataBlock(pTableScanInfo->base.dataReader)) { if (isTaskKilled(pTaskInfo)) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } // process this data block based on the probabilities @@ -2032,7 +2032,7 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { if (pInfo->dataReader && tsdbNextDataBlock(pInfo->dataReader)) { if (isTaskKilled(pTaskInfo)) { - longjmp(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + longjmp(pTaskInfo->env, pTaskInfo->code); } int32_t rows = 0; @@ -2529,7 +2529,7 @@ static SSDataBlock* getTableDataBlockImpl(void* param) { STsdbReader* reader = pInfo->base.dataReader; while (tsdbNextDataBlock(reader)) { if (isTaskKilled(pTaskInfo)) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } // process this data block based on the probabilities diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index a0e04b6a19..af361323a7 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -363,7 +363,7 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx); -int32_t qwKillTaskHandle(SQWTaskCtx *ctx); +int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode); int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status); int32_t qwDropTask(QW_FPARAMS_DEF); void qwSaveTbVersionInfo(qTaskInfo_t pTaskInfo, SQWTaskCtx *ctx); diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 2c0a4072ae..86fd1d533c 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -279,14 +279,14 @@ void qwFreeTaskHandle(qTaskInfo_t *taskHandle) { } } -int32_t qwKillTaskHandle(SQWTaskCtx *ctx) { +int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) { int32_t code = 0; // Note: free/kill may in RC qTaskInfo_t taskHandle = atomic_load_ptr(&ctx->taskHandle); if (taskHandle && atomic_val_compare_exchange_ptr(&ctx->taskHandle, taskHandle, NULL)) { qDebug("start to kill task"); - code = qAsyncKillTask(taskHandle); + code = qAsyncKillTask(taskHandle, rspCode); atomic_store_ptr(&ctx->taskHandle, taskHandle); } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 0890d10b65..9a318df324 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -411,7 +411,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); - QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); + QW_ERR_JRET(ctx->rspCode); } QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXEC)); @@ -420,7 +420,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu case QW_PHASE_PRE_FETCH: { if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP) || QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG("task dropping or already dropped, phase:%s", qwPhaseStr(phase)); - QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); + QW_ERR_JRET(ctx->rspCode); } if (QW_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { @@ -442,7 +442,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu case QW_PHASE_PRE_CQUERY: { if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); - QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); + QW_ERR_JRET(ctx->rspCode); } if (ctx->rspCode) { @@ -456,7 +456,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu // qwBuildAndSendDropRsp(&ctx->ctrlConnInfo, code); // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); - QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); + QW_ERR_JRET(ctx->rspCode); } break; @@ -502,7 +502,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp if (QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); - QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); + QW_ERR_JRET(ctx->rspCode); } if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { @@ -515,7 +515,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp // QW_TASK_DLOG("drop rsp send, handle:%p, code:%x - %s", ctx->ctrlConnInfo.handle, code, tstrerror(code)); QW_ERR_JRET(qwDropTask(QW_FPARAMS())); - QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); + QW_ERR_JRET(ctx->rspCode); } if (ctx->rspCode) { @@ -861,7 +861,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } if (QW_QUERY_RUNNING(ctx)) { - QW_ERR_JRET(qwKillTaskHandle(ctx)); + QW_ERR_JRET(qwKillTaskHandle(ctx, TSDB_CODE_TSC_QUERY_CANCELLED)); qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROP); } else { QW_ERR_JRET(qwDropTask(QW_FPARAMS())); @@ -869,6 +869,7 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { } if (!dropped) { + QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_TSC_QUERY_CANCELLED); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); } @@ -1195,8 +1196,9 @@ void qWorkerStopAllTasks(void *qWorkerMgmt) { } if (QW_QUERY_RUNNING(ctx)) { - qwKillTaskHandle(ctx); + qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED); } else if (!QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { + QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED); QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); } diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 8a48977c77..02b341e28c 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -302,7 +302,7 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) { return 0; } -int32_t qwtKillTask(qTaskInfo_t qinfo) { return 0; } +int32_t qwtKillTask(qTaskInfo_t qinfo, int32_t rspCode) { return 0; } void qwtDestroyTask(qTaskInfo_t qHandle) {} diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 9a7f3332b3..c8b8db367f 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -391,6 +391,8 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) int64_t leftTime = tsMaxRetryWaitTime - lastTime; pTask->delayExecMs = leftTime < pCtx->periodMs ? leftTime : pCtx->periodMs; + pCtx->roundTimes = 0; + goto _return; } @@ -407,7 +409,7 @@ _return: int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { int32_t code = 0; - SCH_TASK_DLOG("task will be redirected now, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); + SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode)); if (NULL == pData) { pTask->retryTimes = 0; @@ -430,15 +432,15 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 if (SCH_IS_DATA_BIND_TASK(pTask)) { if (pData && pData->pEpSet) { SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet)); - } else if (SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(rspCode)) { + } else if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) { + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); + SEp *pEp = &addr->epSet.eps[addr->epSet.inUse]; + SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", addr->nodeId, addr->epSet.inUse, + addr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode)); + } else { SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); SCH_SWITCH_EPSET(addr); SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps); - } else { - SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); - SEp *pEp = &addr->epSet.eps[addr->epSet.inUse]; - SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d", addr->nodeId, addr->epSet.inUse, - addr->epSet.numOfEps, pEp->fqdn, pEp->port); } if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index e92b44f8c2..a9afbd7ba8 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1519,7 +1519,7 @@ bool cliGenRetryRule(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { transFreeMsg(pResp->pCont); transUnrefCliHandle(pConn); } else if (code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_SYN_INTERNAL_ERROR || - code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_RPC_REDIRECT) { + code == TSDB_CODE_SYN_PROPOSE_NOT_READY || code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_VND_STOPPED) { tTrace("code str %s, contlen:%d 1", tstrerror(code), pResp->contLen); noDelay = cliResetEpset(pCtx, pResp, true); transFreeMsg(pResp->pCont); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 3e8c016a74..f5d11e3e96 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -321,6 +321,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_ALREADY_EXISTS, "Table column already TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_NOT_EXISTS, "Table column not exists") TAOS_DEFINE_ERROR(TSDB_CODE_VND_COL_SUBSCRIBED, "Table column is subscribed") TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_AVAIL_BUFPOOL, "No availabe buffer pool") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_STOPPED, "Vnode stopped") // tsdb TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")