diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bdf333b635..041b22c76e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2737,7 +2737,7 @@ int32_t tDeserializeSResFetchReq(void* buf, int32_t bufLen, SResFetchReq* pReq); typedef struct { SMsgHead header; - uint64_t sId; + uint64_t clientId; } SSchTasksStatusReq; typedef struct { @@ -2767,7 +2767,7 @@ typedef struct SQueryNodeEpId { typedef struct { SMsgHead header; - uint64_t sId; + uint64_t clientId; SQueryNodeEpId epId; SArray* taskAction; // SArray } SSchedulerHbReq; diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 48852e5552..2d0a461875 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -626,7 +626,7 @@ typedef struct SDownstreamSourceNode { SQueryNodeAddr addr; uint64_t clientId; uint64_t taskId; - uint64_t schedId; + uint64_t sId; int32_t execId; int32_t fetchMsgType; bool localExec; diff --git a/include/libs/qworker/qworker.h b/include/libs/qworker/qworker.h index 46ab87a408..6e73e94b01 100644 --- a/include/libs/qworker/qworker.h +++ b/include/libs/qworker/qworker.h @@ -113,7 +113,7 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64 int32_t qWorkerDbgEnableDebug(char *option); -void qWorkerRetireJob(uint64_t jobId, int32_t errCode); +void qWorkerRetireJob(uint64_t jobId, uint64_t clientId, int32_t errCode); void qWorkerRetireJobs(int64_t retireSize, int32_t errCode); diff --git a/include/os/osMemPool.h b/include/os/osMemPool.h index c47fa29a4d..9b78f9de37 100644 --- a/include/os/osMemPool.h +++ b/include/os/osMemPool.h @@ -33,6 +33,7 @@ typedef enum MemPoolEvictPolicy { typedef struct SMemPoolJob { uint64_t jobId; + uint64_t clientId; int64_t allocMemSize; int64_t maxAllocMemSize; } SMemPoolJob; @@ -79,7 +80,7 @@ typedef void (*mpDecConcSessionNum)(void); typedef void (*mpIncConcSessionNum)(void); typedef void (*mpSetConcSessionNum)(int32_t); typedef void (*mpReserveFailFp)(int64_t, int32_t); -typedef void (*mpReserveReachFp)(uint64_t, int32_t); +typedef void (*mpReserveReachFp)(uint64_t, uint64_t, int32_t); typedef void (*mpCfgUpdate)(void*, void*); typedef struct SMemPoolCallBack { @@ -122,9 +123,9 @@ void *taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignme void taosMemPoolClose(void* poolHandle); void taosMemPoolModDestroy(void); void taosAutoMemoryFree(void *ptr); -int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob); -void taosMemPoolDestroySession(void* poolHandle, void* session); -int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob); +int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob, char *sessionId); +void taosMemPoolDestroySession(void* poolHandle, void* session, int32_t* remainSessions); +int32_t taosMemPoolCallocJob(uint64_t jobId, uint64_t cId, void** ppJob); void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg); void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName); void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 89d54cf6ad..8e1612067c 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -9276,7 +9276,7 @@ int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pR tEncoderInit(&encoder, buf, bufLen); TAOS_CHECK_EXIT(tStartEncode(&encoder)); - TAOS_CHECK_EXIT(tEncodeU64(&encoder, pReq->sId)); + TAOS_CHECK_EXIT(tEncodeU64(&encoder, pReq->clientId)); TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->epId.nodeId)); TAOS_CHECK_EXIT(tEncodeU16(&encoder, pReq->epId.ep.port)); TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->epId.ep.fqdn)); @@ -9323,7 +9323,7 @@ int32_t tDeserializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq * tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen); TAOS_CHECK_EXIT(tStartDecode(&decoder)); - TAOS_CHECK_EXIT(tDecodeU64(&decoder, &pReq->sId)); + TAOS_CHECK_EXIT(tDecodeU64(&decoder, &pReq->clientId)); TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->epId.nodeId)); TAOS_CHECK_EXIT(tDecodeU16(&decoder, &pReq->epId.ep.port)); TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->epId.ep.fqdn)); diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index 218baada4c..619cd45818 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -640,7 +640,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas if (pSource->localExec) { SDataBuf pBuf = {0}; - int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId, + int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId, pTaskInfo->id.queryId, pSource->clientId, pSource->taskId, 0, pSource->execId, &pBuf.pData, pTaskInfo->localFetch.explainRes); code = loadRemoteDataCallback(pWrapper, &pBuf, code); @@ -649,7 +649,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas } else { SResFetchReq req = {0}; req.header.vgId = pSource->addr.nodeId; - req.sId = pSource->schedId; + req.sId = pSource->sId; req.clientId = pSource->clientId; req.taskId = pSource->taskId; req.queryId = pTaskInfo->id.queryId; diff --git a/source/libs/executor/test/queryPlanTests.cpp b/source/libs/executor/test/queryPlanTests.cpp index 6710435aba..940a5f4d35 100755 --- a/source/libs/executor/test/queryPlanTests.cpp +++ b/source/libs/executor/test/queryPlanTests.cpp @@ -1647,7 +1647,7 @@ SNode* qptMakeDownstreamSrcNode(SNode** ppNode) { pDs->addr.nodeId = qptCtx.param.vnode.vgId; memcpy(&pDs->addr.epSet, &qptCtx.param.vnode.epSet, sizeof(pDs->addr.epSet)); pDs->taskId = (QPT_CORRECT_HIGH_PROB() && qptCtx.buildCtx.pCurrTask) ? qptCtx.buildCtx.pCurrTask->id.taskId : taosRand(); - pDs->schedId = QPT_CORRECT_HIGH_PROB() ? qptCtx.param.schedulerId : taosRand(); + pDs->sId = QPT_CORRECT_HIGH_PROB() ? 0 : taosRand(); pDs->execId = taosRand(); pDs->fetchMsgType = QPT_CORRECT_HIGH_PROB() ? (QPT_RAND_BOOL_V ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH) : taosRand(); pDs->localExec = QPT_RAND_BOOL_V; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index ba87912670..8c314a14b8 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -853,7 +853,7 @@ static int32_t downstreamSourceCopy(const SDownstreamSourceNode* pSrc, SDownstre COPY_OBJECT_FIELD(addr, sizeof(SQueryNodeAddr)); COPY_SCALAR_FIELD(clientId); COPY_SCALAR_FIELD(taskId); - COPY_SCALAR_FIELD(schedId); + COPY_SCALAR_FIELD(sId); COPY_SCALAR_FIELD(execId); COPY_SCALAR_FIELD(fetchMsgType); COPY_SCALAR_FIELD(localExec); diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index f7f858db78..01455d4109 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -5261,7 +5261,7 @@ static int32_t jsonToColumnDefNode(const SJson* pJson, void* pObj) { static const char* jkDownstreamSourceAddr = "Addr"; static const char* jkDownstreamSourceClientId = "ClientId"; static const char* jkDownstreamSourceTaskId = "TaskId"; -static const char* jkDownstreamSourceSchedId = "SchedId"; +static const char* jkDownstreamSourceSeriousId = "SeriousId"; static const char* jkDownstreamSourceExecId = "ExecId"; static const char* jkDownstreamSourceFetchMsgType = "FetchMsgType"; @@ -5276,7 +5276,7 @@ static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) { code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceTaskId, pNode->taskId); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceSchedId, pNode->schedId); + code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceSeriousId, pNode->sId); } if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceExecId, pNode->execId); @@ -5299,7 +5299,7 @@ static int32_t jsonToDownstreamSourceNode(const SJson* pJson, void* pObj) { code = tjsonGetUBigIntValue(pJson, jkDownstreamSourceTaskId, &pNode->taskId); } if (TSDB_CODE_SUCCESS == code) { - code = tjsonGetUBigIntValue(pJson, jkDownstreamSourceSchedId, &pNode->schedId); + code = tjsonGetUBigIntValue(pJson, jkDownstreamSourceSeriousId, &pNode->sId); } if (TSDB_CODE_SUCCESS == code) { code = tjsonGetIntValue(pJson, jkDownstreamSourceExecId, &pNode->execId); diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index bf3ea66e47..0f9e118619 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -1761,7 +1761,7 @@ static int32_t downstreamSourceNodeInlineToMsg(const void* pObj, STlvEncoder* pE code = tlvEncodeValueU64(pEncoder, pNode->taskId); } if (TSDB_CODE_SUCCESS == code) { - code = tlvEncodeValueU64(pEncoder, pNode->schedId); + code = tlvEncodeValueU64(pEncoder, pNode->sId); } if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeValueI32(pEncoder, pNode->execId); @@ -1788,7 +1788,7 @@ static int32_t msgToDownstreamSourceNodeInlineToMsg(STlvDecoder* pDecoder, void* code = tlvDecodeValueU64(pDecoder, &pNode->taskId); } if (TSDB_CODE_SUCCESS == code) { - code = tlvDecodeValueU64(pDecoder, &pNode->schedId); + code = tlvDecodeValueU64(pDecoder, &pNode->sId); } if (TSDB_CODE_SUCCESS == code) { code = tlvDecodeValueI32(pDecoder, &pNode->execId); diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 160e6bbcc8..4f92398b75 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -127,15 +127,27 @@ typedef struct SQWTaskStatus { int8_t status; } SQWTaskStatus; +typedef struct SQWSessionInfo { + void *mgmt; + uint64_t sId; + uint64_t qId; + uint64_t cId; + uint64_t tId; + int64_t rId; + int32_t eId; + void *sessionMp; +} SQWSessionInfo; typedef struct SQWJobInfo { int8_t retired; int32_t errCode; SMemPoolJob* memInfo; + + SRWLatch lock; + int8_t destroyed; SHashObj* pSessions; } SQWJobInfo; - typedef struct SQWTaskCtx { SRWLatch lock; int8_t phase; @@ -306,6 +318,19 @@ extern SQueryMgmt gQueryMgmt; #define QW_FETCH_RUNNING(ctx) ((ctx)->inFetch) #define QW_QUERY_NOT_STARTED(ctx) (QW_GET_PHASE(ctx) == -1) +#define QW_SET_QCID(id, qId, cId) \ + do { \ + *(uint64_t *)(id) = (qId); \ + *(uint64_t *)((char *)(id) + sizeof(qId)) = (cId); \ + } while (0) + +#define QW_GET_QCID(id, qId, cId) \ + do { \ + (qId) = *(uint64_t *)(id); \ + (cId) = *(uint64_t *)((char *)(id) + sizeof(qId)); \ + } while (0) + + #define QW_SET_QTID(id, qId, cId, tId, eId) \ do { \ *(uint64_t *)(id) = (qId); \ @@ -370,33 +395,33 @@ extern SQueryMgmt gQueryMgmt; } \ } while (0) -#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__) -#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__) +#define QW_SCH_ELOG(param, ...) qError("QW:%p clientId:%" PRIx64 " " param, mgmt, clientId, __VA_ARGS__) +#define QW_SCH_DLOG(param, ...) qDebug("QW:%p clientId:%" PRIx64 " " param, mgmt, clientId, __VA_ARGS__) #define QW_TASK_ELOG(param, ...) \ - qError("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__) + qError("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__) #define QW_TASK_WLOG(param, ...) \ - qWarn("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__) + qWarn("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__) #define QW_TASK_DLOG(param, ...) \ - qDebug("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__) + qDebug("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__) #define QW_TASK_DLOGL(param, ...) \ - qDebugL("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__) + qDebugL("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__) #define QW_TASK_ELOG_E(param) \ - qError("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId) + qError("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId) #define QW_TASK_WLOG_E(param) \ - qWarn("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId) + qWarn("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId) #define QW_TASK_DLOG_E(param) \ - qDebug("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId) + qDebug("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId) #define QW_SCH_TASK_ELOG(param, ...) \ - qError("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \ + qError("QW:%p SID:%" PRId64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \ qId, cId, tId, eId, __VA_ARGS__) #define QW_SCH_TASK_WLOG(param, ...) \ - qWarn("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, \ + qWarn("QW:%p SID:%" PRId64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, \ cId, tId, eId, __VA_ARGS__) #define QW_SCH_TASK_DLOG(param, ...) \ - qDebug("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \ + qDebug("QW:%p SID:%" PRId64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \ qId, cId, tId, eId, __VA_ARGS__) #define QW_LOCK_DEBUG(...) \ @@ -475,7 +500,7 @@ static FORCE_INLINE int32_t qwRelease(int64_t refId) { return taosReleaseRef(gQw char *qwPhaseStr(int32_t phase); char *qwBufStatusStr(int32_t bufStatus); -int32_t qwAcquireAddScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch); +int32_t qwAcquireAddScheduler(SQWorker *mgmt, uint64_t clientId, int32_t rwType, SQWSchStatus **sch); void qwReleaseScheduler(int32_t rwType, SQWorker *mgmt); int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status); int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx); @@ -491,8 +516,8 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam); int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type); int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type); void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch); -int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch); -void qwFreeTaskCtx(SQWTaskCtx *ctx); +int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t clientId, int32_t rwType, SQWSchStatus **sch); +void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx); int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx); void qwDbgDumpMgmtInfo(SQWorker *mgmt); @@ -505,8 +530,9 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped); int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx); int32_t qwInitQueryPool(void); void qwDestroyJobInfo(SQWJobInfo* pJob); +void qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx); void qwRetireJob(SQWJobInfo* pJob); - +void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session); int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession); #ifdef __cplusplus diff --git a/source/libs/qworker/inc/qwMsg.h b/source/libs/qworker/inc/qwMsg.h index 6d57a3df46..d640bae822 100644 --- a/source/libs/qworker/inc/qwMsg.h +++ b/source/libs/qworker/inc/qwMsg.h @@ -47,7 +47,7 @@ void qwFreeFetchRsp(void *msg); int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp); int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *rsp, int32_t code); int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); -int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn); +int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t clientId, SRpcHandleInfo *pConn); int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn); #ifdef __cplusplus diff --git a/source/libs/qworker/src/qwDbg.c b/source/libs/qworker/src/qwDbg.c index 897080df3e..95e0f0ddba 100644 --- a/source/libs/qworker/src/qwDbg.c +++ b/source/libs/qworker/src/qwDbg.c @@ -96,7 +96,7 @@ void qwDbgDumpSchInfo(SQWorker *mgmt, SQWSchStatus *sch, int32_t i) { int32_t taskNum = taosHashGetSize(sch->tasksHash); QW_DLOG("***The %dth scheduler status, hbBrokenTs:%" PRId64 ",taskNum:%d", i, sch->hbBrokenTs, taskNum); - uint64_t qId, cId, tId; + uint64_t qId, cId, tId, sId = 0; int32_t eId; SQWTaskStatus *pTask = NULL; void *pIter = taosHashIterate(sch->tasksHash, NULL); @@ -118,19 +118,20 @@ void qwDbgDumpTasksInfo(SQWorker *mgmt) { int32_t i = 0; SQWTaskCtx *ctx = NULL; - uint64_t qId, cId, tId; + uint64_t qId, cId, tId, sId; int32_t eId; void *pIter = taosHashIterate(mgmt->ctxHash, NULL); while (pIter) { ctx = (SQWTaskCtx *)pIter; void *key = taosHashGetKey(pIter, NULL); QW_GET_QTID(key, qId, cId, tId, eId); + sId = ctx->sId; QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, queryMsgType:%d, " - "sId:%" PRId64 ", level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, " + "level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, " "rspCode:%x, affectedRows:%" PRId64 ", taskHandle:%p, sinkHandle:%p, tbNum:%d, events:%d,%d,%d,%d,%d", ctx, ctx->lock, ctx->phase, ctx->taskType, ctx->explain, ctx->needFetch, ctx->localExec, ctx->queryMsgType, - ctx->sId, ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd, ctx->queryContinue, + ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd, ctx->queryContinue, ctx->queryInQueue, ctx->rspCode, ctx->affectedRows, ctx->taskHandle, ctx->sinkHandle, (int32_t)taosArrayGetSize(ctx->tbInfo), ctx->events[QW_EVENT_CANCEL], ctx->events[QW_EVENT_READY], ctx->events[QW_EVENT_FETCH], ctx->events[QW_EVENT_DROP], ctx->events[QW_EVENT_CQUERY]); diff --git a/source/libs/qworker/src/qwMem.c b/source/libs/qworker/src/qwMem.c index e508e63e67..59577f380b 100644 --- a/source/libs/qworker/src/qwMem.c +++ b/source/libs/qworker/src/qwMem.c @@ -34,14 +34,14 @@ void qwIncConcurrentTaskNumCb(void) { //TODO } -int32_t qwInitJobInfo(uint64_t qId, SQWJobInfo* pJob) { +int32_t qwInitJobInfo(QW_FPARAMS_DEF, SQWJobInfo* pJob) { pJob->pSessions= taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); if (NULL == pJob->pSessions) { - qError("fail to init session hash, code: 0x%x", terrno); + QW_TASK_ELOG("fail to init session hash, code: 0x%x", terrno); return terrno; } - int32_t code = taosMemPoolCallocJob(qId, (void**)&pJob->memInfo); + int32_t code = taosMemPoolCallocJob(qId, cId, (void**)&pJob->memInfo); if (TSDB_CODE_SUCCESS != code) { taosHashCleanup(pJob->pSessions); pJob->pSessions = NULL; @@ -72,24 +72,53 @@ int32_t qwInitJobHash(void) { } -int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession) { +void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session) { + char id[sizeof(tId) + sizeof(eId) + 1] = {0}; + QW_SET_TEID(id, tId, eId); + int32_t remainSessions = 0; + + (void)taosHashRemove(pJobInfo->pSessions, id, sizeof(id)); + + taosMemPoolDestroySession(gMemPoolHandle, session, &remainSessions); + + if (0 == remainSessions) { + QW_LOCK(QW_WRITE, &pJobInfo->lock); + if (0 == taosHashGetSize(pJobInfo->pSessions)) { + atomic_store_8(&pJobInfo->destroyed, 1); + qwDestroyJobInfo(pJobInfo); + QW_UNLOCK(QW_WRITE, &pJobInfo->lock); + + char id2[sizeof(qId) + sizeof(cId) + 1] = {0}; + QW_SET_QCID(id2, qId, cId); + (void)taosHashRemove(gQueryMgmt.pJobInfo, id2, sizeof(id2)); + QW_TASK_DLOG_E("the whole query job removed"); + } else { + QW_UNLOCK(QW_WRITE, &pJobInfo->lock); + } + } +} + +int32_t qwRetrieveJobInfo(QW_FPARAMS_DEF, SQWJobInfo** ppJob) { int32_t code = TSDB_CODE_SUCCESS; SQWJobInfo* pJob = NULL; + char id[sizeof(qId) + sizeof(cId) + 1] = {0}; if (NULL == gQueryMgmt.pJobInfo) { QW_ERR_RET(qwInitJobHash()); } + + QW_SET_QCID(id, qId, cId); while (true) { - pJob = (SQWJobInfo*)taosHashAcquire(gQueryMgmt.pJobInfo, &qId, sizeof(qId)); + pJob = (SQWJobInfo*)taosHashAcquire(gQueryMgmt.pJobInfo, id, sizeof(id)); if (NULL == pJob) { SQWJobInfo jobInfo = {0}; - code = qwInitJobInfo(qId, &jobInfo); + code = qwInitJobInfo(QW_FPARAMS(), &jobInfo); if (TSDB_CODE_SUCCESS != code) { return code; } - code = taosHashPut(gQueryMgmt.pJobInfo, &qId, sizeof(qId), &jobInfo, sizeof(jobInfo)); + code = taosHashPut(gQueryMgmt.pJobInfo, id, sizeof(id), &jobInfo, sizeof(jobInfo)); if (TSDB_CODE_SUCCESS != code) { qwDestroyJobInfo(&jobInfo); if (TSDB_CODE_DUP_KEY == code) { @@ -97,31 +126,68 @@ int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession) { continue; } + QW_TASK_ELOG("fail to put job to job hash, error: %s", tstrerror(code)); return code; } - pJob = (SQWJobInfo*)taosHashAcquire(gQueryMgmt.pJobInfo, &qId, sizeof(qId)); + pJob = (SQWJobInfo*)taosHashAcquire(gQueryMgmt.pJobInfo, id, sizeof(id)); if (NULL == pJob) { - qError("QID:0x%" PRIx64 " not in joj hash, may be dropped", qId); + QW_TASK_ELOG_E("job not in job hash, may be dropped"); return TSDB_CODE_QRY_JOB_NOT_EXIST; } } + if (atomic_load_8(&pJob->destroyed)) { + continue; + } + break; } - ctx->pJobInfo = pJob; + *ppJob = pJob; - char id[sizeof(tId) + sizeof(eId)] = {0}; - QW_SET_TEID(id, tId, eId); + return code; +} - QW_ERR_JRET(taosMemPoolInitSession(gMemPoolHandle, ppSession, pJob->memInfo)); +int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession) { + int32_t code = TSDB_CODE_SUCCESS; + SQWJobInfo* pJob = NULL; + SQWSessionInfo session = {.mgmt = mgmt, + .sId = sId, + .qId = qId, + .cId = cId, + .tId = tId, + .rId = rId, + .eId = eId + }; - code = taosHashPut(pJob->pSessions, id, sizeof(id), ppSession, POINTER_BYTES); - if (TSDB_CODE_SUCCESS != code) { - qError("fail to put session into query session hash, code: 0x%x", code); - QW_ERR_JRET(code); - } + do { + QW_ERR_JRET(qwRetrieveJobInfo(QW_FPARAMS(), &pJob)); + + ctx->pJobInfo = pJob; + + char id[sizeof(tId) + sizeof(eId) + 1] = {0}; + QW_SET_TEID(id, tId, eId); + + QW_ERR_JRET(taosMemPoolInitSession(gMemPoolHandle, ppSession, pJob->memInfo, id)); + session.sessionMp = *ppSession; + + QW_LOCK(QW_READ, &pJob->lock); + if (atomic_load_8(&pJob->destroyed)) { + QW_UNLOCK(QW_READ, &pJob->lock); + continue; + } + + code = taosHashPut(pJob->pSessions, id, sizeof(id), &session, sizeof(session)); + if (TSDB_CODE_SUCCESS != code) { + QW_UNLOCK(QW_READ, &pJob->lock); + QW_TASK_ELOG("fail to put session into query session hash, code: 0x%x", code); + QW_ERR_JRET(code); + } + QW_UNLOCK(QW_READ, &pJob->lock); + + break; + } while (true); _return: diff --git a/source/libs/qworker/src/qwMsg.c b/source/libs/qworker/src/qwMsg.c index 7dbad90cc0..69965cd78c 100644 --- a/source/libs/qworker/src/qwMsg.c +++ b/source/libs/qworker/src/qwMsg.c @@ -350,10 +350,10 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) { return TSDB_CODE_SUCCESS; } -int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn) { +int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t clientId, SRpcHandleInfo *pConn) { SSchedulerHbReq req = {0}; req.header.vgId = mgmt->nodeId; - req.sId = sId; + req.clientId = clientId; int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req); if (msgSize < 0) { @@ -720,6 +720,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ int32_t code = 0; SSchedulerHbReq req = {0}; SQWorker *mgmt = (SQWorker *)qWorkerMgmt; + uint64_t clientId = 0; QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE)); QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1); @@ -735,7 +736,8 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_ QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - uint64_t sId = req.sId; + clientId = req.clientId; + SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info}; if (TSDB_CODE_RPC_BROKEN_LINK == pMsg->code) { QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code)); diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 93af03af62..99a0c3a3d3 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -70,7 +70,7 @@ int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status, bool return TSDB_CODE_SUCCESS; } -int32_t qwAddSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType) { +int32_t qwAddSchedulerImpl(SQWorker *mgmt, uint64_t clientId, int32_t rwType) { SQWSchStatus newSch = {0}; newSch.tasksHash = taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); @@ -82,7 +82,7 @@ int32_t qwAddSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType) { } QW_LOCK(QW_WRITE, &mgmt->schLock); - int32_t code = taosHashPut(mgmt->schHash, &sId, sizeof(sId), &newSch, sizeof(newSch)); + int32_t code = taosHashPut(mgmt->schHash, &clientId, sizeof(clientId), &newSch, sizeof(newSch)); if (0 != code) { if (!HASH_NODE_EXIST(code)) { QW_UNLOCK(QW_WRITE, &mgmt->schLock); @@ -99,15 +99,15 @@ int32_t qwAddSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType) { return TSDB_CODE_SUCCESS; } -int32_t qwAcquireSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) { +int32_t qwAcquireSchedulerImpl(SQWorker *mgmt, uint64_t clientId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) { while (true) { QW_LOCK(rwType, &mgmt->schLock); - *sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId)); + *sch = taosHashGet(mgmt->schHash, &clientId, sizeof(clientId)); if (NULL == (*sch)) { QW_UNLOCK(rwType, &mgmt->schLock); if (QW_NOT_EXIST_ADD == nOpt) { - QW_ERR_RET(qwAddSchedulerImpl(mgmt, sId, rwType)); + QW_ERR_RET(qwAddSchedulerImpl(mgmt, clientId, rwType)); nOpt = QW_NOT_EXIST_RET_ERR; @@ -126,12 +126,12 @@ int32_t qwAcquireSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQW return TSDB_CODE_SUCCESS; } -int32_t qwAcquireAddScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) { - return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_ADD); +int32_t qwAcquireAddScheduler(SQWorker *mgmt, uint64_t clientId, int32_t rwType, SQWSchStatus **sch) { + return qwAcquireSchedulerImpl(mgmt, clientId, rwType, sch, QW_NOT_EXIST_ADD); } -int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) { - return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_RET_ERR); +int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t clientId, int32_t rwType, SQWSchStatus **sch) { + return qwAcquireSchedulerImpl(mgmt, clientId, rwType, sch, QW_NOT_EXIST_RET_ERR); } void qwReleaseScheduler(int32_t rwType, SQWorker *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); } @@ -190,7 +190,7 @@ int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, i int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status) { SQWSchStatus *tsch = NULL; int32_t code = 0; - QW_ERR_RET(qwAcquireAddScheduler(mgmt, sId, QW_READ, &tsch)); + QW_ERR_RET(qwAcquireAddScheduler(mgmt, cId, QW_READ, &tsch)); QW_ERR_JRET(qwAddTaskStatusImpl(QW_FPARAMS(), tsch, 0, status, NULL)); @@ -298,7 +298,7 @@ int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) { QW_RET(code); } -void qwFreeTaskCtx(SQWTaskCtx *ctx) { +void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { if (ctx->ctrlConnInfo.handle) { tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER); } @@ -322,7 +322,7 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) { taosArrayDestroy(ctx->tbInfo); if (gMemPoolHandle && ctx->memPoolSession) { - taosMemPoolDestroySession(gMemPoolHandle, ctx->memPoolSession); + qwDestroySession(QW_FPARAMS(), ctx->pJobInfo, ctx->memPoolSession); } } @@ -407,7 +407,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) { QW_ERR_RET(QW_CTX_NOT_EXISTS_ERR_CODE(mgmt)); } - qwFreeTaskCtx(&octx); + qwFreeTaskCtx(QW_FPARAMS(), &octx); ctx->tbInfo = NULL; QW_TASK_DLOG_E("task ctx dropped"); @@ -423,7 +423,7 @@ int32_t qwDropTaskStatus(QW_FPARAMS_DEF) { char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0}; QW_SET_QTID(id, qId, cId, tId, eId); - if (qwAcquireScheduler(mgmt, sId, QW_WRITE, &sch)) { + if (qwAcquireScheduler(mgmt, cId, QW_WRITE, &sch)) { QW_TASK_WLOG_E("scheduler does not exist"); return TSDB_CODE_SUCCESS; } @@ -457,7 +457,7 @@ int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status, bool dynamicTask) { SQWTaskStatus *task = NULL; int32_t code = 0; - QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch)); + QW_ERR_RET(qwAcquireScheduler(mgmt, cId, QW_READ, &sch)); QW_ERR_JRET(qwAcquireTaskStatus(QW_FPARAMS(), QW_READ, sch, &task)); QW_ERR_JRET(qwSetTaskStatus(QW_FPARAMS(), task, status, dynamicTask)); @@ -597,16 +597,18 @@ void qwDestroyImpl(void *pMgmt) { mgmt->hbTimer = NULL; taosTmrCleanUp(mgmt->timer); - uint64_t qId, cId, tId; + uint64_t qId, cId, tId, sId; int32_t eId; + int64_t rId = 0; void *pIter = taosHashIterate(mgmt->ctxHash, NULL); while (pIter) { SQWTaskCtx *ctx = (SQWTaskCtx *)pIter; void *key = taosHashGetKey(pIter, NULL); QW_GET_QTID(key, qId, cId, tId, eId); - - qwFreeTaskCtx(ctx); + sId = ctx->sId; + + qwFreeTaskCtx(QW_FPARAMS(), ctx); QW_TASK_DLOG_E("task ctx freed"); pIter = taosHashIterate(mgmt->ctxHash, pIter); taskCount++; @@ -694,23 +696,23 @@ void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) { int32_t code = TSDB_CODE_SUCCESS; int32_t num = taosArrayGetSize(pExpiredSch); for (int32_t i = 0; i < num; ++i) { - uint64_t *sId = taosArrayGet(pExpiredSch, i); + uint64_t *clientId = taosArrayGet(pExpiredSch, i); SQWSchStatus *pSch = NULL; - if (NULL == sId) { - qError("get the %dth sch failed, code:%x", i, terrno); + if (NULL == clientId) { + qError("get the %dth client failed, code:%x", i, terrno); break; } - code = qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch); + code = qwAcquireScheduler(mgmt, *clientId, QW_WRITE, &pSch); if (TSDB_CODE_SUCCESS != code) { - qError("acquire sch %" PRIx64 " failed, code:%x", *sId, code); + qError("acquire client %" PRIx64 " failed, code:%x", *clientId, code); continue; } if (taosHashGetSize(pSch->tasksHash) <= 0) { qwDestroySchStatus(pSch); - code = taosHashRemove(mgmt->schHash, sId, sizeof(*sId)); - qDebug("sch %" PRIx64 " destroy result code:%x", *sId, code); + code = taosHashRemove(mgmt->schHash, clientId, sizeof(*clientId)); + qDebug("client %" PRIx64 " destroy result code:%x", *clientId, code); } qwReleaseScheduler(QW_WRITE, mgmt); @@ -718,10 +720,77 @@ void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) { } void qwDestroyJobInfo(SQWJobInfo* pJob) { - //TODO + if (NULL == pJob) { + return; + } + + taosMemoryFreeClear(pJob->memInfo); + taosHashCleanup(pJob->pSessions); + pJob->pSessions = NULL; +} + +void qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { + int32_t code = TSDB_CODE_SUCCESS; + + QW_LOCK(QW_WRITE, &ctx->lock); + + QW_TASK_DLOG_E("start to force stop task"); + + if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { + QW_TASK_WLOG_E("task already dropping"); + QW_UNLOCK(QW_WRITE, &ctx->lock); + + return; + } + + if (QW_QUERY_RUNNING(ctx)) { + code = qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED); + if (TSDB_CODE_SUCCESS != code) { + QW_TASK_ELOG("task running, async kill failed, error: %x", code); + } else { + QW_TASK_DLOG_E("task running, async killed"); + } + } else if (QW_FETCH_RUNNING(ctx)) { + QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED); + QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); + QW_TASK_DLOG_E("task fetching, update drop received"); + } else { + code = qwDropTask(QW_FPARAMS()); + if (TSDB_CODE_SUCCESS != code) { + QW_TASK_ELOG("task drop failed, error: %x", code); + } else { + QW_TASK_DLOG_E("task dropped"); + } + } + + QW_UNLOCK(QW_WRITE, &ctx->lock); +} + +void qwRetireTask(QW_FPARAMS_DEF) { + SQWTaskCtx *ctx = NULL; + + int32_t code = qwAcquireTaskCtx(QW_FPARAMS(), &ctx); + if (TSDB_CODE_SUCCESS != code) { + return; + } + + qwStopTask(QW_FPARAMS(), ctx); + + qwReleaseTaskCtx(mgmt, ctx); } void qwRetireJob(SQWJobInfo* pJob) { - //TODO + if (NULL == pJob) { + return; + } + + void* pIter = taosHashIterate(pJob->pSessions, NULL); + while (pIter) { + SQWSessionInfo* pSession = (SQWSessionInfo*)pIter; + + qwRetireTask((SQWorker *)pSession->mgmt, pSession->sId, pSession->qId, pSession->cId, pSession->tId, pSession->rId, pSession->eId); + + pIter = taosHashIterate(pJob->pSessions, pIter); + } } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 3f67b337f3..a100a9afe8 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -26,7 +26,6 @@ void qwStopAllTasks(SQWorker *mgmt) { uint64_t qId, cId, tId, sId; int32_t eId; int64_t rId = 0; - int32_t code = TSDB_CODE_SUCCESS; void *pIter = taosHashIterate(mgmt->ctxHash, NULL); while (pIter) { @@ -34,41 +33,9 @@ void qwStopAllTasks(SQWorker *mgmt) { void *key = taosHashGetKey(pIter, NULL); QW_GET_QTID(key, qId, cId, tId, eId); - QW_LOCK(QW_WRITE, &ctx->lock); - sId = ctx->sId; - QW_TASK_DLOG_E("start to force stop task"); - - if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { - QW_TASK_WLOG_E("task already dropping"); - QW_UNLOCK(QW_WRITE, &ctx->lock); - - pIter = taosHashIterate(mgmt->ctxHash, pIter); - continue; - } - - if (QW_QUERY_RUNNING(ctx)) { - code = qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED); - if (TSDB_CODE_SUCCESS != code) { - QW_TASK_ELOG("task running, async kill failed, error: %x", code); - } else { - QW_TASK_DLOG_E("task running, async killed"); - } - } else if (QW_FETCH_RUNNING(ctx)) { - QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED); - QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); - QW_TASK_DLOG_E("task fetching, update drop received"); - } else { - code = qwDropTask(QW_FPARAMS()); - if (TSDB_CODE_SUCCESS != code) { - QW_TASK_ELOG("task drop failed, error: %x", code); - } else { - QW_TASK_DLOG_E("task dropped"); - } - } - - QW_UNLOCK(QW_WRITE, &ctx->lock); + qwStopTask(QW_FPARAMS(), ctx); pIter = taosHashIterate(mgmt->ctxHash, pIter); } @@ -79,7 +46,7 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re SSchedulerHbRsp rsp = {0}; SQWSchStatus *sch = NULL; - QW_ERR_RET(qwAcquireScheduler(mgmt, req->sId, QW_READ, &sch)); + QW_ERR_RET(qwAcquireScheduler(mgmt, req->clientId, QW_READ, &sch)); QW_LOCK(QW_WRITE, &sch->hbConnLock); @@ -1167,8 +1134,8 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { QW_RET(qwProcessHbLinkBroken(mgmt, qwMsg, req)); } - QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch)); - QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo)); + QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->clientId, QW_READ, &sch)); + QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->clientId, &qwMsg->connInfo)); sch->hbBrokenTs = 0; @@ -1184,7 +1151,7 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) { QW_UNLOCK(QW_WRITE, &sch->hbConnLock); - QW_DLOG("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", req->sId, + QW_DLOG("hb connection updated, clientId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", req->clientId, req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle); qwReleaseScheduler(QW_READ, mgmt); @@ -1261,13 +1228,13 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) { while (pIter) { SQWSchStatus *sch1 = (SQWSchStatus *)pIter; if (NULL == sch1->hbConnInfo.handle) { - uint64_t *sId = taosHashGetKey(pIter, NULL); - QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId); + uint64_t *clientId = taosHashGetKey(pIter, NULL); + QW_TLOG("cancel send hb to client %" PRIx64 " cause of no connection handle", *clientId); if (sch1->hbBrokenTs > 0 && ((currentMs - sch1->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) && taosHashGetSize(sch1->tasksHash) <= 0) { - if (NULL == taosArrayPush(pExpiredSch, sId)) { - QW_ELOG("add sId 0x%" PRIx64 " to expiredSch failed, code:%x", *sId, terrno); + if (NULL == taosArrayPush(pExpiredSch, clientId)) { + QW_ELOG("add clientId 0x%" PRIx64 " to expiredSch failed, code:%x", *clientId, terrno); taosHashCancelIterate(mgmt->schHash, pIter); break; } @@ -1356,7 +1323,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) { _return: - qwFreeTaskCtx(&ctx); + qwFreeTaskCtx(QW_FPARAMS(), &ctx); QW_RET(TSDB_CODE_SUCCESS); } @@ -1632,15 +1599,19 @@ _return: } -void qWorkerRetireJob(uint64_t jobId, int32_t errCode) { - SQWJobInfo* pJob = (SQWJobInfo*)taosHashGet(gQueryMgmt.pJobInfo, &jobId, sizeof(jobId)); +void qWorkerRetireJob(uint64_t jobId, uint64_t clientId, int32_t errCode) { + char id[sizeof(jobId) + sizeof(clientId) + 1] = {0}; + QW_SET_QCID(id, jobId, clientId); + + SQWJobInfo* pJob = (SQWJobInfo*)taosHashGet(gQueryMgmt.pJobInfo, id, sizeof(id)); if (NULL == pJob) { - qError("QID:0x%" PRIx64 " fail to get job from job hash", jobId); + qError("QID:0x%" PRIx64 " CID:0x%" PRIx64 " fail to get job from job hash", jobId, clientId); return; } if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { - qInfo("QID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, jobId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize)); + qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, + jobId, clientId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize)); qwRetireJob(pJob); } else { @@ -1650,7 +1621,6 @@ void qWorkerRetireJob(uint64_t jobId, int32_t errCode) { void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) { SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL); - uint64_t jobId = 0; int64_t retiredSize = 0; while (retiredSize < retireSize && NULL != pJob) { if (atomic_load_8(&pJob->retired)) { @@ -1660,14 +1630,13 @@ void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) { if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize); - jobId = pJob->memInfo->jobId; qwRetireJob(pJob); retiredSize += aSize; - qDebug("QID:0x%" PRIx64 " job retired cause of mid level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64, - jobId, aSize, retireSize); + qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " job retired in batch, usedSize:%" PRId64 ", retireSize:%" PRId64, + pJob->memInfo->jobId, pJob->memInfo->clientId, aSize, retireSize); } pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob); diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 6a910453f0..2da0e63149 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -144,7 +144,7 @@ typedef struct SSchedulerCfg { typedef struct SSchedulerMgmt { uint64_t clientId; // unique clientId uint64_t taskId; // sequential taksId - uint64_t sId; // schedulerId + uint64_t seriousId; // sequential seriousId SSchedulerCfg cfg; bool exit; int32_t jobRef; @@ -274,6 +274,7 @@ typedef struct { typedef struct SSchJob { int64_t refId; uint64_t queryId; + uint64_t seriousId; SSchJobAttr attr; int32_t levelNum; int32_t taskNum; @@ -405,8 +406,13 @@ extern SSchedulerMgmt schMgmt; (SCH_NETWORK_ERR(_code) && ((_task)->level->level > (_job)->levelIdx)) #define SCH_TASK_RETRY_NETWORK_ERR(_task, _code) (SCH_NETWORK_ERR(_code) && (_task)->redirectCtx.inRedirect) +#if 0 #define SCH_JOB_NEED_RETRY(_job, _task, _msgType, _code) \ (SCH_REDIRECT_MSGTYPE(_msgType) && SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code)) +#else +#define SCH_JOB_NEED_RETRY(_job, _task, _msgType, _code) \ + (SCH_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_NETWORK_ERR(_code))) +#endif #define SCH_TASKSET_NEED_RETRY(_job, _task, _msgType, _code) \ (SCH_REDIRECT_MSGTYPE(_msgType) && \ (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_LOW_LEVEL_NETWORK_ERR((_job), (_task), (_code)) || \ @@ -450,23 +456,23 @@ extern SSchedulerMgmt schMgmt; (_task)->profile.endTs = us; \ } while (0) -#define SCH_JOB_ELOG(param, ...) qError("qid:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) -#define SCH_JOB_DLOG(param, ...) qDebug("qid:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) +#define SCH_JOB_ELOG(param, ...) qError("qid:0x%" PRIx64 ",SID:%" PRId64 " " param, pJob->queryId, pJob->seriousId, __VA_ARGS__) +#define SCH_JOB_DLOG(param, ...) qDebug("qid:0x%" PRIx64 ",SID:%" PRId64 " " param, pJob->queryId, pJob->seriousId, __VA_ARGS__) #define SCH_TASK_ELOG(param, ...) \ - qError("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \ + qError("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, pJob->seriousId, SCH_CLIENT_ID(pTask), \ SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__) #define SCH_TASK_DLOG(param, ...) \ - qDebug("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \ + qDebug("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, pJob->seriousId, SCH_CLIENT_ID(pTask), \ SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__) #define SCH_TASK_TLOG(param, ...) \ - qTrace("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \ + qTrace("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, pJob->seriousId, SCH_CLIENT_ID(pTask), \ SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__) #define SCH_TASK_DLOGL(param, ...) \ - qDebugL("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \ + qDebugL("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, pJob->seriousId, SCH_CLIENT_ID(pTask), \ SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__) #define SCH_TASK_WLOG(param, ...) \ - qWarn("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \ + qWarn("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, pJob->seriousId, SCH_CLIENT_ID(pTask), \ SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__) #define SCH_SET_ERRNO(_err) \ diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 375a316185..9c3ce047a9 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -345,6 +345,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) { pJob->levelNum = levelNum; SCH_RESET_JOB_LEVEL_IDX(pJob); + atomic_add_fetch_64(&pJob->seriousId, 1); SSchLevel level = {0}; SNodeListNode *plans = NULL; @@ -1038,6 +1039,7 @@ int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode, bool *inRetry) { } SCH_RESET_JOB_LEVEL_IDX(pJob); + atomic_add_fetch_64(&pJob->seriousId, 1); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index a2b0af8414..037f86429f 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -456,11 +456,17 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa SCH_ERR_JRET(schValidateRspMsgType(pJob, pTask, msgType)); int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1); +#if 0 if (SCH_JOB_NEED_RETRY(pJob, pTask, reqType, rspCode)) { SCH_RET(schHandleJobRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode)); } else if (SCH_TASKSET_NEED_RETRY(pJob, pTask, reqType, rspCode)) { SCH_RET(schHandleTaskSetRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode)); } +#else + if (SCH_JOB_NEED_RETRY(pJob, pTask, reqType, rspCode)) { + SCH_RET(schHandleJobRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode)); + } +#endif pTask->redirectCtx.inRedirect = false; @@ -1042,7 +1048,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray *taskAction) { int32_t msgType = TDMT_SCH_QUERY_HEARTBEAT; req.header.vgId = nodeEpId->nodeId; - req.sId = schMgmt.sId; + req.clientId = schMgmt.clientId; TAOS_MEMCPY(&req.epId, nodeEpId, sizeof(SQueryNodeEpId)); SCH_LOCK(SCH_READ, &schMgmt.hbLock); @@ -1137,7 +1143,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, case TDMT_VND_DELETE: { SVDeleteReq req = {0}; req.header.vgId = addr->nodeId; - req.sId = schMgmt.sId; + req.sId = pJob->seriousId; req.queryId = pJob->queryId; req.clientId = pTask->clientId; req.taskId = pTask->taskId; @@ -1171,7 +1177,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SSubQueryMsg qMsg; qMsg.header.vgId = addr->nodeId; qMsg.header.contLen = 0; - qMsg.sId = schMgmt.sId; + qMsg.sId = pJob->seriousId; qMsg.queryId = pJob->queryId; qMsg.clientId = pTask->clientId; qMsg.taskId = pTask->taskId; @@ -1227,7 +1233,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, case TDMT_SCH_MERGE_FETCH: { SResFetchReq req = {0}; req.header.vgId = addr->nodeId; - req.sId = schMgmt.sId; + req.sId = pJob->seriousId; req.queryId = pJob->queryId; req.clientId = pTask->clientId; req.taskId = pTask->taskId; @@ -1255,7 +1261,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, STaskDropReq qMsg; qMsg.header.vgId = addr->nodeId; qMsg.header.contLen = 0; - qMsg.sId = schMgmt.sId; + qMsg.sId = pJob->seriousId; qMsg.queryId = pJob->queryId; qMsg.clientId = pTask->clientId; qMsg.taskId = pTask->taskId; @@ -1285,7 +1291,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &rpcCtx)); SSchedulerHbReq req = {0}; - req.sId = schMgmt.sId; + req.clientId = schMgmt.clientId; req.header.vgId = addr->nodeId; req.epId.nodeId = addr->nodeId; TAOS_MEMCPY(&req.epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp)); @@ -1313,7 +1319,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, STaskNotifyReq qMsg; qMsg.header.vgId = addr->nodeId; qMsg.header.contLen = 0; - qMsg.sId = schMgmt.sId; + qMsg.sId = pJob->seriousId; qMsg.queryId = pJob->queryId; qMsg.clientId = pTask->clientId; qMsg.taskId = pTask->taskId; diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 9be0e3fc40..8494e60f70 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -308,7 +308,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { .type = QUERY_NODE_DOWNSTREAM_SOURCE, .clientId = pTask->clientId, .taskId = pTask->taskId, - .schedId = schMgmt.sId, + .sId = pJob->seriousId, .execId = pTask->execId, .addr = pTask->succeedAddr, .fetchMsgType = SCH_FETCH_TYPE(pTask), @@ -565,6 +565,7 @@ int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, i SCH_ERR_JRET(schResetTaskSetLevelInfo(pJob, pTask)); SCH_RESET_JOB_LEVEL_IDX(pJob); + atomic_add_fetch_64(&pJob->seriousId, 1); code = schDoTaskRedirect(pJob, pTask, pData, rspCode); @@ -1150,7 +1151,7 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) { } } - SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->clientId, pTask->taskId, + SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, pJob->seriousId, pJob->queryId, pTask->clientId, pTask->taskId, pJob->refId, pTask->execId, &qwMsg, explainRes)); if (SCH_IS_EXPLAIN_JOB(pJob)) { @@ -1410,7 +1411,7 @@ int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) { } } - SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->clientId, pTask->taskId, + SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, pJob->seriousId, pJob->queryId, pTask->clientId, pTask->taskId, pJob->refId, pTask->execId, &pRsp, explainRes)); if (SCH_IS_EXPLAIN_JOB(pJob)) { diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index ac34099417..29f0df4974 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -276,7 +276,7 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) { SCH_UNLOCK(SCH_WRITE, &hb->lock); SCH_UNLOCK(SCH_READ, &schMgmt.hbLock); - qDebug("hb connection updated, sId:0x%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", schMgmt.sId, + qDebug("hb connection updated, nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->pTrans, trans->pHandle); return TSDB_CODE_SUCCESS; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index db9ecd6025..21aaa6cef5 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -56,12 +56,7 @@ int32_t schedulerInit() { SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } - if (taosGetSystemUUIDU64(&schMgmt.sId)) { - qError("generate schedulerId failed, errno:%d", errno); - SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR); - } - - qInfo("scheduler 0x%" PRIx64 " initialized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum); + qInfo("scheduler 0x%" PRIx64 " initialized, maxJob:%u", getClientId(), schMgmt.cfg.maxJobNum); return TSDB_CODE_SUCCESS; } diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 7498c4693a..8b27d397c9 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -317,7 +317,7 @@ int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) if (quota > 0 && cAllocSize / 1048576UL > quota) { code = TSDB_CODE_QRY_REACH_QMEM_THRESHOLD; uWarn("job 0x%" PRIx64 " allocSize %" PRId64 " is over than quota %" PRId64, pJob->job.jobId, cAllocSize, quota); - pPool->cfg.cb.reachFp(pJob->job.jobId, code); + pPool->cfg.cb.reachFp(pJob->job.jobId, pJob->job.clientId, code); (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size); MP_RET(code); } @@ -326,7 +326,7 @@ int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size) code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED; uWarn("%s pool sysAvailMemSize %" PRId64 " can't alloc %" PRId64" while keeping reserveSize %dMB", pPool->name, atomic_load_64(&tsCurrentAvailMemorySize), size, *pPool->cfg.reserveSize); - pPool->cfg.cb.reachFp(pJob->job.jobId, code); + pPool->cfg.cb.reachFp(pJob->job.jobId, pJob->job.clientId, code); (void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size); MP_RET(code); } @@ -1174,7 +1174,7 @@ void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg) { (void)mpUpdateCfg(pPool); } -void taosMemPoolDestroySession(void* poolHandle, void* session) { +void taosMemPoolDestroySession(void* poolHandle, void* session, int32_t* remainSessions) { SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; if (NULL == poolHandle || NULL == pSession) { @@ -1182,7 +1182,9 @@ void taosMemPoolDestroySession(void* poolHandle, void* session) { return; } - (void)atomic_sub_fetch_32(&pSession->pJob->remainSession, 1); + if (remainSessions) { + *remainSessions = atomic_sub_fetch_32(&pSession->pJob->remainSession, 1); + } //TODO; @@ -1191,19 +1193,25 @@ void taosMemPoolDestroySession(void* poolHandle, void* session) { mpCheckStatDetail(pPool, pSession, "DestroySession"); mpDestroyPosStat(&pSession->stat.posStat); + taosMemFreeClear(pSession->sessionId); TAOS_MEMSET(pSession, 0, sizeof(*pSession)); mpPushIdleNode(pPool, &pPool->sessionCache, (SMPListNode*)pSession); } -int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob) { +int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob, char* sessionId) { int32_t code = TSDB_CODE_SUCCESS; SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = NULL; MP_ERR_JRET(mpPopIdleNode(pPool, &pPool->sessionCache, (void**)&pSession)); + pSession->sessionId = taosStrdup(sessionId); + if (NULL == pSession->sessionId) { + MP_ERR_JRET(terrno); + } + TAOS_MEMCPY(&pSession->ctrl, &pPool->ctrl, sizeof(pSession->ctrl)); if (gMPFps[gMPMgmt.strategy].initSessionFp) { @@ -1218,7 +1226,7 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob) { _return: if (TSDB_CODE_SUCCESS != code) { - taosMemPoolDestroySession(poolHandle, pSession); + taosMemPoolDestroySession(poolHandle, pSession, NULL); pSession = NULL; (void)atomic_add_fetch_64(&pPool->stat.statSession.initFail, 1); } else { @@ -1503,7 +1511,7 @@ int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fil return code; } -int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob) { +int32_t taosMemPoolCallocJob(uint64_t jobId, uint64_t cId, void** ppJob) { int32_t code = TSDB_CODE_SUCCESS; *ppJob = taosMemoryCalloc(1, sizeof(SMPJob)); if (NULL == *ppJob) { @@ -1513,6 +1521,7 @@ int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob) { SMPJob* pJob = (SMPJob*)*ppJob; pJob->job.jobId = jobId; + pJob->job.clientId = cId; return code; } diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index f3718702ee..a982253606 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -64,6 +64,12 @@ threadlocal void* mptThreadPoolSession = NULL; *(uint32_t *)((char *)(id) + sizeof(tId)) = (eId); \ } while (0) +#define MPT_SET_QCID(id, qId, cId) \ + do { \ + *(uint64_t *)(id) = (qId); \ + *(uint64_t *)((char *)(id) + sizeof(qId)) = (cId); \ + } while (0) + #define mptEnableMemoryPoolUsage(_pool, _session) do { mptThreadPoolHandle = _pool; mptThreadPoolSession = _session; } while (0) #define mptDisableMemoryPoolUsage() (mptThreadPoolHandle = NULL, mptThreadPoolSession = NULL) @@ -294,7 +300,7 @@ int32_t mptInitJobInfo(uint64_t qId, SMPTJobInfo* pJob) { return terrno; } - int32_t code = taosMemPoolCallocJob(qId, (void**)&pJob->memInfo); + int32_t code = taosMemPoolCallocJob(qId, 0, (void**)&pJob->memInfo); if (TSDB_CODE_SUCCESS != code) { taosHashCleanup(pJob->pSessions); pJob->pSessions = NULL; @@ -349,11 +355,11 @@ int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, SMPTestJobCtx* p pJobCtx->pJob = pJob; pJob->pCtx = pJobCtx; - assert(0 == taosMemPoolInitSession(gMemPoolHandle, ppSession, pJob->memInfo)); - - char id[sizeof(tId) + sizeof(eId)] = {0}; + char id[sizeof(tId) + sizeof(eId) + 0] = {0}; MPT_SET_TEID(id, tId, eId); + assert(0 == taosMemPoolInitSession(gMemPoolHandle, ppSession, pJob->memInfo, id)); + assert(0 == taosHashPut(pJob->pSessions, id, sizeof(id), ppSession, POINTER_BYTES)); _return: @@ -411,7 +417,7 @@ int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) { assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[i].stat, sizeof(*pStat))); mptDestroyTaskCtx(&pJobCtx->taskCtxs[i], pJobCtx->pSessions[i]); - taosMemPoolDestroySession(gMemPoolHandle, pJobCtx->pSessions[i]); + taosMemPoolDestroySession(gMemPoolHandle, pJobCtx->pSessions[i], NULL); } uDebug("JOB:0x%x idx:%d destroyed, code:0x%x", pJobCtx->jobId, pJobCtx->jobIdx, pJobCtx->pJob->errCode); @@ -455,7 +461,12 @@ void mptRetireJob(SMPTJobInfo* pJob) { mptCheckCompareJobInfo(pCtx); - mptResetJob(pCtx); + int32_t taskRunning = atomic_load_32(&pCtx->taskRunningNum); + if (0 == taskRunning) { + mptDestroyJob(pCtx, false); + } else { + uDebug("JOB:0x%x retired but will not destroy cause of task running, num:%d", pCtx->jobId, taskRunning); + } } int32_t mptGetMemPoolMaxMemSize(void* pHandle, int64_t* maxSize) { @@ -516,17 +527,20 @@ void mptRetireJobsCb(int64_t retireSize, int32_t errCode) { } -void mptRetireJobCb(uint64_t jobId, int32_t errCode) { - SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashGet(mptCtx.pJobs, &jobId, sizeof(jobId)); +void mptRetireJobCb(uint64_t jobId, uint64_t clientId, int32_t errCode) { + char id[sizeof(jobId) + sizeof(clientId) + 1] = {0}; + MPT_SET_QCID(id, jobId, clientId); + + SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashGet(mptCtx.pJobs, id, sizeof(id)); if (NULL == pJob) { - uError("QID:0x%" PRIx64 " fail to get job from job hash", jobId); + uError("QID:0x%" PRIx64 " CID:0x%" PRIx64 " fail to get job from job hash", jobId, clientId); return; } if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { - uInfo("QID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, jobId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize)); + uInfo("QID:0x%" PRIx64 " CID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, jobId, clientId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize)); } else { - uDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize)); + uDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, jobId, clientId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize)); } } @@ -546,11 +560,13 @@ void mptWriteMem(void* pStart, int32_t size) { void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { int32_t actId = 0; bool actDone = false; - int32_t size = taosRand() % mptCtrl.maxSingleAllocSize; + int32_t size = 0; int32_t osize = 0, nsize = 0; while (!actDone) { actId = taosRand() % 10; + size = (taosRand() % 10) ? (taosRand() % (mptCtrl.maxSingleAllocSize / 100)) : (taosRand() % mptCtrl.maxSingleAllocSize); + switch (actId) { case 0: { // malloc if (pTask->memIdx >= MPT_MAX_MEM_ACT_TIMES) { @@ -925,8 +941,13 @@ void mptCheckPoolUsedSize(int32_t jobNum) { int64_t poolUsedSize = 0; taosMemPoolGetUsedSizeBegin(gMemPoolHandle, &usedSize, &needEnd); + for (int32_t i = 0; i < jobNum; ++i) { SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i]; + + while (taosRTryLockLatch(&pJobCtx->jobExecLock)) { + } + int64_t jobUsedSize = 0; for (int32_t m = 0; m < pJobCtx->taskNum; ++m) { SMPStatDetail* pStat = NULL; @@ -942,6 +963,8 @@ void mptCheckPoolUsedSize(int32_t jobNum) { assert(pJobCtx->pJob->memInfo->allocMemSize == jobUsedSize); + taosRUnLockLatch(&pJobCtx->jobExecLock); + poolUsedSize += jobUsedSize; }