diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c5b0b89311..2876105748 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1494,6 +1494,7 @@ typedef struct SSubQueryMsg { uint64_t queryId; uint64_t taskId; int64_t refId; + int32_t execId; int8_t taskType; int8_t explain; uint32_t sqlLen; // the query sql, @@ -1513,6 +1514,7 @@ typedef struct { uint64_t sId; uint64_t queryId; uint64_t taskId; + int32_t execId; } SQueryContinueReq; typedef struct { @@ -1534,6 +1536,7 @@ typedef struct { uint64_t sId; uint64_t queryId; uint64_t taskId; + int32_t execId; } SResFetchReq; typedef struct { @@ -1545,6 +1548,7 @@ typedef struct { uint64_t queryId; uint64_t taskId; int64_t refId; + int32_t execId; int8_t status; } STaskStatus; @@ -1602,6 +1606,7 @@ typedef struct { uint64_t queryId; uint64_t taskId; int64_t refId; + int32_t execId; } STaskDropReq; typedef struct { diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 4671c8b81e..8031b16d93 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -346,6 +346,7 @@ typedef struct SDownstreamSourceNode { SQueryNodeAddr addr; uint64_t taskId; uint64_t schedId; + int32_t execId; } SDownstreamSourceNode; typedef struct SExchangePhysiNode { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e9b5c67d76..23b33674e4 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4312,6 +4312,7 @@ int32_t tSerializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp *pR if (tEncodeU64(&encoder, status->queryId) < 0) return -1; if (tEncodeU64(&encoder, status->taskId) < 0) return -1; if (tEncodeI64(&encoder, status->refId) < 0) return -1; + if (tEncodeI32(&encoder, status->execId) < 0) return -1; if (tEncodeI8(&encoder, status->status) < 0) return -1; } } else { @@ -4342,6 +4343,7 @@ int32_t tDeserializeSSchedulerHbRsp(void *buf, int32_t bufLen, SSchedulerHbRsp * if (tDecodeU64(&decoder, &status.queryId) < 0) return -1; if (tDecodeU64(&decoder, &status.taskId) < 0) return -1; if (tDecodeI64(&decoder, &status.refId) < 0) return -1; + if (tDecodeI32(&decoder, &status.execId) < 0) return -1; if (tDecodeI8(&decoder, &status.status) < 0) return -1; taosArrayPush(pRsp->taskStatus, &status); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c6f1096bfe..535272bc42 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2010,13 +2010,14 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY); - qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu, GET_TASKID(pTaskInfo), - pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, sourceIndex, totalSources); + qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", execId:%d, %d/%" PRIzu, GET_TASKID(pTaskInfo), + pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, pSource->execId, sourceIndex, totalSources); pMsg->header.vgId = htonl(pSource->addr.nodeId); pMsg->sId = htobe64(pSource->schedId); pMsg->taskId = htobe64(pSource->taskId); pMsg->queryId = htobe64(pTaskInfo->id.queryId); + pMsg->execId = htonl(pSource->execId); // send the fetch remote task result reques SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); @@ -2145,9 +2146,9 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx SSDataBlock* pRes = pExchangeInfo->pResult; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { - qDebug("%s vgId:%d, taskId:0x%" PRIx64 " index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 + qDebug("%s vgId:%d, taskId:0x%" PRIx64 " execId:%d index:%d completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", completed:%d try next %d/%" PRIzu, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pDataInfo->totalRows, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pDataInfo->totalRows, pExchangeInfo->loadInfo.totalRows, completed + 1, i + 1, totalSources); pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; completed += 1; @@ -2165,17 +2166,17 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx } if (pRsp->completed == 1) { - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d" " index:%d completed, numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 ", completed:%d try next %d/%" PRIzu, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i, pRes->info.rows, pDataInfo->totalRows, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRes->info.rows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, completed + 1, i + 1, totalSources); completed += 1; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; } else { - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows, pLoadInfo->totalRows, pLoadInfo->totalSize); } @@ -2249,8 +2250,8 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, pExchangeInfo->current); if (pDataInfo->code != TSDB_CODE_SUCCESS) { - qError("%s vgId:%d, taskID:0x%" PRIx64 " error happens, code:%s", GET_TASKID(pTaskInfo), pSource->addr.nodeId, - pSource->taskId, tstrerror(pDataInfo->code)); + qError("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d error happens, code:%s", GET_TASKID(pTaskInfo), pSource->addr.nodeId, + pSource->taskId, pSource->execId, tstrerror(pDataInfo->code)); pOperator->pTaskInfo->code = pDataInfo->code; return NULL; } @@ -2258,9 +2259,9 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { SRetrieveTableRsp* pRsp = pDataInfo->pRsp; SLoadRemoteDataInfo* pLoadInfo = &pExchangeInfo->loadInfo; if (pRsp->numOfRows == 0) { - qDebug("%s vgId:%d, taskID:0x%" PRIx64 " %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 + qDebug("%s vgId:%d, taskID:0x%" PRIx64 " execId:%d %d of total completed, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " try next", - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows); pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; @@ -2276,17 +2277,17 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) { pTableRsp->compLen, pTableRsp->numOfCols, startTs, &pDataInfo->totalRows, NULL); if (pRsp->completed == 1) { - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, rowsOfSource:%" PRIu64 + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pDataInfo->totalRows, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, totalSources); pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pExchangeInfo->current += 1; } else { - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows, + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRes->info.rows, pLoadInfo->totalRows, pLoadInfo->totalSize); } @@ -2378,7 +2379,7 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* } for (int32_t i = 0; i < numOfSources; ++i) { - SNodeListNode* pNode = (SNodeListNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i); + SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i); taosArrayPush(pInfo->pSources, pNode); } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index b372bf75fc..97b8baec58 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -582,6 +582,7 @@ static int32_t downstreamSourceCopy(const SDownstreamSourceNode* pSrc, SDownstre COPY_OBJECT_FIELD(addr, sizeof(SQueryNodeAddr)); COPY_SCALAR_FIELD(taskId); COPY_SCALAR_FIELD(schedId); + COPY_SCALAR_FIELD(execId); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index df7429bd88..4375a7b04c 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -3441,6 +3441,9 @@ static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceSchedId, pNode->schedId); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceSchedId, pNode->execId); + } return code; } @@ -3455,6 +3458,9 @@ static int32_t jsonToDownstreamSourceNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetUBigIntValue(pJson, jkDownstreamSourceSchedId, &pNode->schedId); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetIntValue(pJson, jkDownstreamSourceSchedId, &pNode->execId); + } return code; } diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 4edd7a8a6e..4fa2615470 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -75,6 +75,7 @@ typedef struct SQWDebug { bool lockEnable; bool statusEnable; bool dumpEnable; + bool tmp; } SQWDebug; extern SQWDebug gQWDebug; @@ -122,6 +123,7 @@ typedef struct SQWTaskCtx { int8_t taskType; int8_t explain; int32_t queryType; + int32_t execId; bool queryFetched; bool queryEnd; @@ -200,8 +202,8 @@ typedef struct SQWorkerMgmt { int32_t paramIdx; } SQWorkerMgmt; -#define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId -#define QW_IDS() sId, qId, tId, rId +#define QW_FPARAMS_DEF SQWorker *mgmt, uint64_t sId, uint64_t qId, uint64_t tId, int64_t rId, int32_t eId +#define QW_IDS() sId, qId, tId, rId, eId #define QW_FPARAMS() mgmt, QW_IDS() #define QW_STAT_INC(_item, _n) atomic_add_fetch_64(&(_item), _n) @@ -226,15 +228,18 @@ typedef struct SQWorkerMgmt { #define QW_TASK_READY(status) \ (status == JOB_TASK_STATUS_SUCCEED || status == JOB_TASK_STATUS_FAILED || status == JOB_TASK_STATUS_CANCELLED || \ status == JOB_TASK_STATUS_PARTIAL_SUCCEED) -#define QW_SET_QTID(id, qId, tId) \ - do { \ - *(uint64_t *)(id) = (qId); \ - *(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); \ +#define QW_SET_QTID(id, qId, tId, eId) \ + do { \ + *(uint64_t *)(id) = (qId); \ + *(uint64_t *)((char *)(id) + sizeof(qId)) = (tId); \ + *(int32_t *)((char *)(id) + sizeof(qId) + sizeof(tId)) = (eId); \ } while (0) -#define QW_GET_QTID(id, qId, tId) \ - do { \ - (qId) = *(uint64_t *)(id); \ - (tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); \ + +#define QW_GET_QTID(id, qId, tId, eId) \ + do { \ + (qId) = *(uint64_t *)(id); \ + (tId) = *(uint64_t *)((char *)(id) + sizeof(qId)); \ + (eId) = *(int32_t *)((char *)(id) + sizeof(qId) + sizeof(tId)); \ } while (0) #define QW_ERR_RET(c) \ @@ -365,6 +370,8 @@ void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx); void qwDbgDumpMgmtInfo(SQWorker *mgmt); int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore); +int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet); +int32_t qwAddTaskCtx(QW_FPARAMS_DEF); #ifdef __cplusplus diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index add9700a3a..0fa01a304c 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -9,7 +9,7 @@ #include "tmsg.h" #include "tname.h" -SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false}; +SQWDebug gQWDebug = {.statusEnable = true, .dumpEnable = false, .tmp = true}; int32_t qwDbgValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus, bool *ignore) { if (!gQWDebug.statusEnable) { @@ -121,3 +121,60 @@ void qwDbgDumpMgmtInfo(SQWorker *mgmt) { } +int32_t qwDbgBuildAndSendRedirectRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, SEpSet *pEpSet) { + int32_t contLen = 0; + char* rsp = NULL; + + if (pEpSet) { + contLen = tSerializeSEpSet(NULL, 0, pEpSet); + rsp = rpcMallocCont(contLen); + tSerializeSEpSet(rsp, contLen, pEpSet); + } + + SRpcMsg rpcRsp = { + .msgType = rspType, + .pCont = rsp, + .contLen = contLen, + .code = code, + .info = *pConn, + }; + + tmsgSendRsp(&rpcRsp); + + qDebug("response %s msg, code: %s", TMSG_INFO(rspType), tstrerror(code)); + + return TSDB_CODE_SUCCESS; +} + + +int32_t qwDbgEnableDebug(char *option) { + if (0 == strcasecmp(option, "lock")) { + gQWDebug.lockEnable = true; + qDebug("qw lock debug enabled"); + return TSDB_CODE_SUCCESS; + } + + if (0 == strcasecmp(option, "status")) { + gQWDebug.statusEnable = true; + qDebug("qw status debug enabled"); + return TSDB_CODE_SUCCESS; + } + + if (0 == strcasecmp(option, "dump")) { + gQWDebug.dumpEnable = true; + qDebug("qw dump debug enabled"); + return TSDB_CODE_SUCCESS; + } + + if (0 == strcasecmp(option, "tmp")) { + gQWDebug.tmp = true; + qDebug("qw tmp debug enabled"); + return TSDB_CODE_SUCCESS; + } + + qError("invalid qw debug option:%s", option); + + return TSDB_CODE_APP_ERROR; +} + + diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 30772ff1ac..cc642caa70 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -156,6 +156,41 @@ int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code) { return TSDB_CODE_SUCCESS; } +int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { + STaskDropReq *req = (STaskDropReq *)rpcMallocCont(sizeof(STaskDropReq)); + if (NULL == req) { + QW_SCH_TASK_ELOG("rpcMallocCont %d failed", (int32_t)sizeof(STaskDropReq)); + QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + req->header.vgId = mgmt->nodeId; + req->sId = sId; + req->queryId = qId; + req->taskId = tId; + req->refId = rId; + req->execId = eId; + + SRpcMsg pNewMsg = { + .msgType = TDMT_SCH_DROP_TASK, + .pCont = req, + .contLen = sizeof(STaskDropReq), + .code = 0, + .info = *pConn, + }; + + int32_t code = tmsgPutToQueue(&mgmt->msgCb, FETCH_QUEUE, &pNewMsg); + if (TSDB_CODE_SUCCESS != code) { + QW_SCH_TASK_ELOG("put drop task msg to queue failed, vgId:%d, code:%s", mgmt->nodeId, tstrerror(code)); + rpcFreeCont(req); + QW_ERR_RET(code); + } + + QW_SCH_TASK_DLOG("drop task msg put to queue, vgId:%d", mgmt->nodeId); + + return TSDB_CODE_SUCCESS; +} + + int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { SQueryContinueReq *req = (SQueryContinueReq *)rpcMallocCont(sizeof(SQueryContinueReq)); if (NULL == req) { @@ -167,6 +202,7 @@ int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { req->sId = sId; req->queryId = qId; req->taskId = tId; + req->execId = eId; SRpcMsg pNewMsg = { .msgType = TDMT_SCH_QUERY_CONTINUE, @@ -266,6 +302,7 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { msg->queryId = be64toh(msg->queryId); msg->taskId = be64toh(msg->taskId); msg->refId = be64toh(msg->refId); + msg->execId = ntohl(msg->execId); msg->phyLen = ntohl(msg->phyLen); msg->sqlLen = ntohl(msg->sqlLen); @@ -273,6 +310,7 @@ int32_t qWorkerPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { uint64_t qId = msg->queryId; uint64_t tId = msg->taskId; int64_t rId = msg->refId; + int32_t eId = msg->execId; SQWMsg qwMsg = {.msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info}; @@ -295,6 +333,7 @@ int32_t qWorkerAbortPreprocessQueryMsg(void *qWorkerMgmt, SRpcMsg *pMsg) { uint64_t qId = msg->queryId; uint64_t tId = msg->taskId; int64_t rId = msg->refId; + int32_t eId = msg->execId; QW_SCH_TASK_DLOG("Abort prerocessQuery start, handle:%p", pMsg->info.handle); qwAbortPrerocessQuery(QW_FPARAMS()); @@ -324,6 +363,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int uint64_t qId = msg->queryId; uint64_t tId = msg->taskId; int64_t rId = msg->refId; + int32_t eId = msg->execId; SQWMsg qwMsg = {.node = node, .msg = msg->msg + msg->sqlLen, .msgLen = msg->phyLen, .connInfo = pMsg->info, .msgType = pMsg->msgType}; char * sql = strndup(msg->msg, msg->sqlLen); @@ -356,6 +396,7 @@ int32_t qWorkerProcessCQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, in uint64_t qId = msg->queryId; uint64_t tId = msg->taskId; int64_t rId = 0; + int32_t eId = msg->execId; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info}; @@ -387,11 +428,13 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int msg->sId = be64toh(msg->sId); msg->queryId = be64toh(msg->queryId); msg->taskId = be64toh(msg->taskId); + msg->execId = ntohl(msg->execId); uint64_t sId = msg->sId; uint64_t qId = msg->queryId; uint64_t tId = msg->taskId; int64_t rId = 0; + int32_t eId = msg->execId; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info}; @@ -476,11 +519,13 @@ int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int6 msg->queryId = be64toh(msg->queryId); msg->taskId = be64toh(msg->taskId); msg->refId = be64toh(msg->refId); + msg->execId = ntohl(msg->execId); uint64_t sId = msg->sId; uint64_t qId = msg->queryId; uint64_t tId = msg->taskId; int64_t rId = msg->refId; + int32_t eId = msg->execId; SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info}; @@ -553,6 +598,7 @@ int32_t qWorkerProcessDeleteMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, SR uint64_t qId = req.queryId; uint64_t tId = req.taskId; int64_t rId = 0; + int32_t eId = 0; SQWMsg qwMsg = {.node = node, .msg = req.msg, .msgLen = req.phyLen, .connInfo = pMsg->info}; QW_SCH_TASK_DLOG("processDelete start, node:%p, handle:%p, sql:%s", node, pMsg->info.handle, req.sql); diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 0759cf360a..1fb0a34314 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -135,8 +135,8 @@ int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchS void qwReleaseScheduler(int32_t rwType, SQWorker *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); } int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, SQWTaskStatus **task) { - char id[sizeof(qId) + sizeof(tId)] = {0}; - QW_SET_QTID(id, qId, tId); + char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; + QW_SET_QTID(id, qId, tId, eId); QW_LOCK(rwType, &sch->tasksLock); *task = taosHashGet(sch->tasksHash, id, sizeof(id)); @@ -151,8 +151,8 @@ int32_t qwAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch, S int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, int32_t status, SQWTaskStatus **task) { int32_t code = 0; - char id[sizeof(qId) + sizeof(tId)] = {0}; - QW_SET_QTID(id, qId, tId); + char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; + QW_SET_QTID(id, qId, tId, eId); SQWTaskStatus ntask = {0}; ntask.status = status; @@ -207,8 +207,8 @@ int32_t qwAddAcquireTaskStatus(QW_FPARAMS_DEF, int32_t rwType, SQWSchStatus *sch void qwReleaseTaskStatus(int32_t rwType, SQWSchStatus *sch) { QW_UNLOCK(rwType, &sch->tasksLock); } int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { - char id[sizeof(qId) + sizeof(tId)] = {0}; - QW_SET_QTID(id, qId, tId); + char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; + QW_SET_QTID(id, qId, tId, eId); *ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id)); if (NULL == (*ctx)) { @@ -220,8 +220,8 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { } int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { - char id[sizeof(qId) + sizeof(tId)] = {0}; - QW_SET_QTID(id, qId, tId); + char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; + QW_SET_QTID(id, qId, tId, eId); *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); if (NULL == (*ctx)) { @@ -233,8 +233,8 @@ int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { } int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, SQWTaskCtx **ctx) { - char id[sizeof(qId) + sizeof(tId)] = {0}; - QW_SET_QTID(id, qId, tId); + char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; + QW_SET_QTID(id, qId, tId, eId); SQWTaskCtx nctx = {0}; @@ -314,8 +314,8 @@ void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { } int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { - char id[sizeof(qId) + sizeof(tId)] = {0}; - QW_SET_QTID(id, qId, tId); + char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; + QW_SET_QTID(id, qId, tId, eId); SQWTaskCtx octx; SQWTaskCtx *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); @@ -348,8 +348,8 @@ int32_t qwDropTaskStatus(QW_FPARAMS_DEF) { SQWTaskStatus *task = NULL; int32_t code = 0; - char id[sizeof(qId) + sizeof(tId)] = {0}; - QW_SET_QTID(id, qId, tId); + char id[sizeof(qId) + sizeof(tId) + sizeof(eId)] = {0}; + QW_SET_QTID(id, qId, tId, eId); if (qwAcquireScheduler(mgmt, sId, QW_WRITE, &sch)) { QW_TASK_WLOG_E("scheduler does not exist"); diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 8ab293d0ad..2b23f7a27f 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -168,7 +168,7 @@ int32_t qwGenerateSchHbRsp(SQWorker *mgmt, SQWSchStatus *sch, SQWHbInfo *hbInfo) // TODO GET EXECUTOR API TO GET MORE INFO - QW_GET_QTID(key, status.queryId, status.taskId); + QW_GET_QTID(key, status.queryId, status.taskId, status.execId); status.status = taskStatus->status; status.refId = taskStatus->refId; @@ -493,7 +493,9 @@ int32_t qwPrerocessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwRegisterQueryBrokenLinkArg(QW_FPARAMS(), &qwMsg->connInfo)); - QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx)); + QW_ERR_JRET(qwAddTaskCtx(QW_FPARAMS())); + + QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); ctx->ctrlConnInfo = qwMsg->connInfo; @@ -562,6 +564,33 @@ int32_t qwProcessQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg, int8_t taskType, int8_t ex QW_ERR_JRET(qwExecTask(QW_FPARAMS(), ctx, NULL)); } + + if (gQWDebug.tmp) { +#if 0 + SEpSet epSet = {0}; + epSet.inUse = 1; + epSet.numOfEps = 3; + strcpy(epSet.eps[0].fqdn, "localhost"); + epSet.eps[0].port = 7100; + strcpy(epSet.eps[1].fqdn, "localhost"); + epSet.eps[1].port = 7200; + strcpy(epSet.eps[2].fqdn, "localhost"); + epSet.eps[2].port = 7300; + + qwDbgBuildAndSendRedirectRsp(pMsg->msgType + 1, &pMsg->info, TSDB_CODE_RPC_REDIRECT, &epSet); + gQWDebug.tmp = false; + return TSDB_CODE_SUCCESS; +#else + if (TDMT_SCH_MERGE_QUERY == qwMsg->msgType) { + ctx->phase = QW_PHASE_POST_QUERY; + qwDbgBuildAndSendRedirectRsp(qwMsg->msgType + 1, &qwMsg->connInfo, TSDB_CODE_RPC_REDIRECT, NULL); + gQWDebug.tmp = false; + return TSDB_CODE_SUCCESS; + } +#endif + } + + _return: input.code = code; @@ -734,8 +763,6 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { SQWTaskCtx *ctx = NULL; bool locked = false; - // TODO : TASK ALREADY REMOVED AND A NEW DROP MSG RECEIVED - QW_ERR_JRET(qwAcquireTaskCtx(QW_FPARAMS(), &ctx)); QW_LOCK(QW_WRITE, &ctx->lock); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 8dfc703dd9..8595e41640 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -125,7 +125,7 @@ typedef struct SSchTaskCallbackParam { uint64_t queryId; int64_t refId; uint64_t taskId; - int32_t execIdx; + int32_t execId; void *pTrans; } SSchTaskCallbackParam; @@ -171,7 +171,7 @@ typedef struct SSchTask { uint64_t taskId; // task id SRWLatch lock; // task lock int32_t maxExecTimes; // task may exec times - int32_t execIdx; // task current execute try index + int32_t execId; // task current execute try index SSchLevel *level; // level SRWLatch planLock; // task update plan lock SSubplan *plan; // subplan @@ -243,9 +243,9 @@ extern SSchedulerMgmt schMgmt; #define SCH_LOG_TASK_START_TS(_task) \ do { \ int64_t us = taosGetTimestampUs(); \ - int32_t idx = (_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES; \ + int32_t idx = (_task)->execId % SCH_TASK_MAX_EXEC_TIMES; \ (_task)->profile.execUseTime[idx] = us; \ - if (0 == (_task)->execIdx) { \ + if (0 == (_task)->execId) { \ (_task)->profile.startTs = us; \ } \ } while (0) @@ -253,7 +253,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_LOG_TASK_WAIT_TS(_task) \ do { \ int64_t us = taosGetTimestampUs(); \ - int32_t idx = (_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES; \ + int32_t idx = (_task)->execId % SCH_TASK_MAX_EXEC_TIMES; \ (_task)->profile.waitTime += us - (_task)->profile.execUseTime[idx]; \ } while (0) @@ -261,12 +261,12 @@ extern SSchedulerMgmt schMgmt; #define SCH_LOG_TASK_END_TS(_task) \ do { \ int64_t us = taosGetTimestampUs(); \ - int32_t idx = (_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES; \ + int32_t idx = (_task)->execId % SCH_TASK_MAX_EXEC_TIMES; \ (_task)->profile.execUseTime[idx] = us - (_task)->profile.execUseTime[idx]; \ (_task)->profile.endTs = us; \ } while (0) -#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.execUseTime[(_task)->execIdx % SCH_TASK_MAX_EXEC_TIMES]) > (_task)->timeoutUsec) +#define SCH_TASK_TIMEOUT(_task) ((taosGetTimestampUs() - (_task)->profile.execUseTime[(_task)->execId % SCH_TASK_MAX_EXEC_TIMES]) > (_task)->timeoutUsec) #define SCH_TASK_READY_FOR_LAUNCH(readyNum, task) ((readyNum) >= taosArrayGetSize((task)->children)) @@ -274,6 +274,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_UNLOCK_TASK(_task) SCH_UNLOCK(SCH_WRITE, &(_task)->lock) #define SCH_TASK_ID(_task) ((_task) ? (_task)->taskId : -1) +#define SCH_TASK_EID(_task) ((_task) ? (_task)->execId : -1) #define SCH_SET_TASK_LASTMSG_TYPE(_task, _type) do { if(_task) { atomic_store_32(&(_task)->lastMsgType, _type); } } while (0) #define SCH_GET_TASK_LASTMSG_TYPE(_task) ((_task) ? atomic_load_32(&(_task)->lastMsgType) : -1) @@ -318,13 +319,13 @@ extern SSchedulerMgmt schMgmt; #define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) #define SCH_TASK_ELOG(param, ...) \ - qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__) + qError("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__) #define SCH_TASK_DLOG(param, ...) \ - qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__) + qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__) #define SCH_TASK_DLOGL(param, ...) \ - qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__) + qDebugL("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__) #define SCH_TASK_WLOG(param, ...) \ - qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " " param, pJob->queryId, SCH_TASK_ID(pTask), __VA_ARGS__) + qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_TASK_ID(pTask), SCH_TASK_EID(pTask),__VA_ARGS__) #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; return _code; } } while (0) #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { terrno = _code; } return _code; } while (0) @@ -366,7 +367,7 @@ void schProcessOnDataFetched(SSchJob *job); int32_t schGetTaskInJob(SSchJob *pJob, uint64_t taskId, SSchTask **pTask); void schFreeRpcCtxVal(const void *arg); int32_t schMakeBrokenLinkVal(SSchJob *pJob, SSchTask *pTask, SRpcBrokenlinkVal *brokenVal, bool isHb); -int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execIdx); +int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId); int32_t schExecStaticExplainJob(SSchedulerReq *pReq, int64_t *job, bool sync); int32_t schExecJobImpl(SSchedulerReq *pReq, SSchJob *pJob, bool sync); int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus); @@ -378,7 +379,7 @@ int32_t schExecJob(SSchedulerReq *pReq, int64_t *pJob, SQueryResult *pRes); int32_t schAsyncExecJob(SSchedulerReq *pReq, int64_t *pJob); int32_t schFetchRows(SSchJob *pJob); int32_t schAsyncFetchRows(SSchJob *pJob); -int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execIdx); +int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId); int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList); void schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo); char* schGetOpStr(SCH_OP_TYPE type); diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 643594f4e0..26824738e9 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -28,7 +28,7 @@ FORCE_INLINE int32_t schReleaseJob(int64_t refId) { qDebug("sch release jobId:0x int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel) { pTask->plan = pPlan; pTask->level = pLevel; - pTask->execIdx = -1; + pTask->execId = -1; pTask->maxExecTimes = SCH_TASK_MAX_EXEC_TIMES; pTask->timeoutUsec = SCH_DEFAULT_TASK_TIMEOUT_USEC; SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START); @@ -428,59 +428,59 @@ int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execIdx) { +int32_t schAppendTaskExecNode(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, int32_t execId) { SSchNodeInfo nodeInfo = {.addr = *addr, .handle = NULL}; - if (taosHashPut(pTask->execNodes, &execIdx, sizeof(execIdx), &nodeInfo, sizeof(nodeInfo))) { + if (taosHashPut(pTask->execNodes, &execId, sizeof(execId), &nodeInfo, sizeof(nodeInfo))) { SCH_TASK_ELOG("taosHashPut nodeInfo to execNodes failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_TASK_DLOG("task execNode added, execIdx:%d", execIdx); + SCH_TASK_DLOG("task execNode added, execId:%d", execId); return TSDB_CODE_SUCCESS; } -int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execIdx) { +int32_t schDropTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) { if (NULL == pTask->execNodes) { return TSDB_CODE_SUCCESS; } - if (taosHashRemove(pTask->execNodes, &execIdx, sizeof(execIdx))) { - SCH_TASK_ELOG("fail to remove execIdx %d from execNodeList", execIdx); + if (taosHashRemove(pTask->execNodes, &execId, sizeof(execId))) { + SCH_TASK_ELOG("fail to remove execId %d from execNodeList", execId); } else { - SCH_TASK_DLOG("execIdx %d removed from execNodeList", execIdx); + SCH_TASK_DLOG("execId %d removed from execNodeList", execId); } - if (execIdx != pTask->execIdx) { // ignore it - SCH_TASK_DLOG("execIdx %d is not current execIdx %d", execIdx, pTask->execIdx); + if (execId != pTask->execId) { // ignore it + SCH_TASK_DLOG("execId %d is not current execId %d", execId, pTask->execId); SCH_RET(TSDB_CODE_SCH_IGNORE_ERROR); } return TSDB_CODE_SUCCESS; } -int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execIdx) { +int32_t schUpdateTaskExecNode(SSchJob *pJob, SSchTask *pTask, void *handle, int32_t execId) { if (taosHashGetSize(pTask->execNodes) <= 0) { return TSDB_CODE_SUCCESS; } - SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execIdx, sizeof(execIdx)); + SSchNodeInfo *nodeInfo = taosHashGet(pTask->execNodes, &execId, sizeof(execId)); nodeInfo->handle = handle; - SCH_TASK_DLOG("handle updated to %p for execIdx %d", handle, execIdx); + SCH_TASK_DLOG("handle updated to %p for execId %d", handle, execId); return TSDB_CODE_SUCCESS; } -int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execIdx) { +int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId) { if (dropExecNode) { - SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execIdx)); + SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId)); } SCH_SET_TASK_HANDLE(pTask, handle); - schUpdateTaskExecNode(pJob, pTask, handle, execIdx); + schUpdateTaskExecNode(pJob, pTask, handle, execId); return TSDB_CODE_SUCCESS; } @@ -828,9 +828,9 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo } } - if ((pTask->execIdx + 1) >= pTask->maxExecTimes) { + if ((pTask->execId + 1) >= pTask->maxExecTimes) { *needRetry = false; - SCH_TASK_DLOG("task no more retry since reach max try times, execIdx:%d", pTask->execIdx); + SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId); return TSDB_CODE_SUCCESS; } @@ -841,9 +841,9 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo } if (SCH_IS_DATA_SRC_TASK(pTask)) { - if ((pTask->execIdx + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) { + if ((pTask->execId + 1) >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) { *needRetry = false; - SCH_TASK_DLOG("task no more retry since all ep tried, execIdx:%d, epNum:%d", pTask->execIdx, + SCH_TASK_DLOG("task no more retry since all ep tried, execId:%d, epNum:%d", pTask->execId, SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)); return TSDB_CODE_SUCCESS; } @@ -859,7 +859,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo } *needRetry = true; - SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execIdx + 1, errCode, tstrerror(errCode)); + SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->execId + 1, errCode, tstrerror(errCode)); return TSDB_CODE_SUCCESS; } @@ -1171,6 +1171,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { SDownstreamSourceNode source = {.type = QUERY_NODE_DOWNSTREAM_SOURCE, .taskId = pTask->taskId, .schedId = schMgmt.sId, + .execId = pTask->execId, .addr = pTask->succeedAddr}; qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source); SCH_UNLOCK(SCH_WRITE, &parent->lock); @@ -1256,7 +1257,7 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { SCH_LOCK_TASK(pTask); if (SCH_TASK_TIMEOUT(pTask) && JOB_TASK_STATUS_EXECUTING == pTask->status && pJob->fetchTask != pTask && taosArrayGetSize(pTask->candidateAddrs) > 1) { - SCH_TASK_DLOG("task execIdx %d will be rescheduled now", pTask->execIdx); + SCH_TASK_DLOG("task execId %d will be rescheduled now", pTask->execId); schDropTaskOnExecNode(pJob, pTask); taosHashClear(pTask->execNodes); schProcessOnTaskFailure(pJob, pTask, TSDB_CODE_SCH_TIMEOUT_ERROR); @@ -1283,7 +1284,7 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) { continue; } - SCH_JOB_DLOG("TID:0x%" PRIx64 " task status in server: %s", taskStatus->taskId, jobTaskStatusStr(taskStatus->status)); + SCH_JOB_DLOG("TID:0x%" PRIx64 "EID:%d task status in server: %s", taskStatus->taskId, taskStatus->execId, jobTaskStatusStr(taskStatus->status)); pTask = NULL; schGetTaskInJob(pJob, taskStatus->taskId, &pTask); @@ -1292,6 +1293,13 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList) { schReleaseJob(taskStatus->refId); continue; } + + if (taskStatus->execId != pTask->execId) { + // TODO DROP TASK FROM SERVER!!!! + SCH_TASK_DLOG("EID %d in hb rsp mis-match", taskStatus->execId); + schReleaseJob(taskStatus->refId); + continue; + } if (taskStatus->status == JOB_TASK_STATUS_FAILED) { // RECORD AND HANDLE ERROR!!!! @@ -1362,9 +1370,9 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { int32_t code = 0; atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); - pTask->execIdx++; + pTask->execId++; - SCH_TASK_DLOG("start to launch task's %dth exec", pTask->execIdx); + SCH_TASK_DLOG("start to launch task's %dth exec", pTask->execId); SCH_LOG_TASK_START_TS(pTask); @@ -1677,10 +1685,14 @@ _return: int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, int32_t rspCode) { int32_t code = 0; + int8_t status = 0; + if (schJobNeedToStop(pJob, &status)) { + SCH_TASK_ELOG("redirect will no continue cause of job status %s", jobTaskStatusStr(status)); + SCH_RET(atomic_load_32(&pJob->errCode)); + } - if ((pTask->execIdx + 1) >= pTask->maxExecTimes) { - SCH_TASK_DLOG("task no more retry since reach max try times, execIdx:%d", pTask->execIdx); - SCH_UNLOCK_TASK(pTask); + if ((pTask->execId + 1) >= pTask->maxExecTimes) { + SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId); schProcessOnJobFailure(pJob, rspCode); return TSDB_CODE_SUCCESS; } @@ -1703,34 +1715,36 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, int32_t rspCode) { SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask)); } } - } else { - pTask->childReady = 0; + + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START); - int32_t childrenNum = taosArrayGetSize(pTask->children); - for (int32_t i = 0; i < childrenNum; ++i) { - SSchTask* pChild = taosArrayGetP(pTask->children, i); - SCH_LOCK_TASK(pChild); - code = schDoTaskRedirect(pJob, pChild, rspCode); - SCH_UNLOCK_TASK(pChild); - SCH_ERR_JRET(code); - } - - qClearSubplanExecutionNode(pTask->plan); + SCH_ERR_JRET(schLaunchTask(pJob, pTask)); + + return TSDB_CODE_SUCCESS; } + + // merge plan + + pTask->childReady = 0; + + qClearSubplanExecutionNode(pTask->plan); + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START); - SCH_ERR_JRET(schLaunchTask(pJob, pTask)); - - SCH_UNLOCK_TASK(pTask); + int32_t childrenNum = taosArrayGetSize(pTask->children); + for (int32_t i = 0; i < childrenNum; ++i) { + SSchTask* pChild = taosArrayGetP(pTask->children, i); + SCH_LOCK_TASK(pChild); + schDoTaskRedirect(pJob, pChild, rspCode); + SCH_UNLOCK_TASK(pChild); + } return TSDB_CODE_SUCCESS; _return: code = schProcessOnTaskFailure(pJob, pTask, code); - - SCH_UNLOCK_TASK(pTask); SCH_RET(code); } @@ -1747,7 +1761,7 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32 SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet)); } - schDoTaskRedirect(pJob, pTask, rspCode); + SCH_RET(schDoTaskRedirect(pJob, pTask, rspCode)); _return: diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 3688cb0240..3d547ffbf8 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -379,13 +379,13 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { SCH_TASK_DLOG("rsp msg received, type:%s, handle:%p, code:%s", TMSG_INFO(msgType), pMsg->handle, tstrerror(rspCode)); - if (pParam->execIdx != pTask->execIdx) { - SCH_TASK_DLOG("execIdx %d mis-match current execIdx %d", pParam->execIdx, pTask->execIdx); + if (pParam->execId != pTask->execId) { + SCH_TASK_DLOG("execId %d mis-match current execId %d", pParam->execId, pTask->execId); goto _return; } bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode)); - SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, pParam->execIdx)); + SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, pParam->execId)); int8_t status = 0; if (schJobNeedToStop(pJob, &status)) { @@ -401,7 +401,7 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { goto _return; } - SCH_ERR_JRET(schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode)); + code = schHandleResponseMsg(pJob, pTask, msgType, pMsg->pData, pMsg->len, rspCode); pMsg->pData = NULL; _return: @@ -458,7 +458,7 @@ int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bo param->refId = pJob->refId; param->taskId = SCH_TASK_ID(pTask); param->pTrans = pJob->conn.pTrans; - param->execIdx = pTask->execIdx; + param->execId = pTask->execId; *pParam = param; return TSDB_CODE_SUCCESS; @@ -1015,6 +1015,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, pMsg->queryId = htobe64(pJob->queryId); pMsg->taskId = htobe64(pTask->taskId); pMsg->refId = htobe64(pJob->refId); + pMsg->execId = htonl(pTask->execId); pMsg->taskType = TASK_TYPE_TEMP; pMsg->explain = SCH_IS_EXPLAIN_JOB(pJob); pMsg->phyLen = htonl(pTask->msgLen); @@ -1041,6 +1042,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(pJob->queryId); pMsg->taskId = htobe64(pTask->taskId); + pMsg->execId = htonl(pTask->execId); break; } @@ -1060,6 +1062,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, pMsg->queryId = htobe64(pJob->queryId); pMsg->taskId = htobe64(pTask->taskId); pMsg->refId = htobe64(pJob->refId); + pMsg->execId = htobe64(pTask->execId); break; } case TDMT_SCH_QUERY_HEARTBEAT: { @@ -1102,7 +1105,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, (rpcCtx.args ? &rpcCtx : NULL))); if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY) { - SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execIdx)); + SCH_ERR_RET(schAppendTaskExecNode(pJob, pTask, addr, pTask->execId)); } return TSDB_CODE_SUCCESS;