fix: duplicated clientId issue

This commit is contained in:
dapan1121 2024-11-13 19:19:00 +08:00
parent 7cd98a1902
commit 34cfca745b
25 changed files with 360 additions and 184 deletions

View File

@ -2737,7 +2737,7 @@ int32_t tDeserializeSResFetchReq(void* buf, int32_t bufLen, SResFetchReq* pReq);
typedef struct {
SMsgHead header;
uint64_t sId;
uint64_t clientId;
} SSchTasksStatusReq;
typedef struct {
@ -2767,7 +2767,7 @@ typedef struct SQueryNodeEpId {
typedef struct {
SMsgHead header;
uint64_t sId;
uint64_t clientId;
SQueryNodeEpId epId;
SArray* taskAction; // SArray<STaskAction>
} SSchedulerHbReq;

View File

@ -626,7 +626,7 @@ typedef struct SDownstreamSourceNode {
SQueryNodeAddr addr;
uint64_t clientId;
uint64_t taskId;
uint64_t schedId;
uint64_t sId;
int32_t execId;
int32_t fetchMsgType;
bool localExec;

View File

@ -113,7 +113,7 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64
int32_t qWorkerDbgEnableDebug(char *option);
void qWorkerRetireJob(uint64_t jobId, int32_t errCode);
void qWorkerRetireJob(uint64_t jobId, uint64_t clientId, int32_t errCode);
void qWorkerRetireJobs(int64_t retireSize, int32_t errCode);

View File

@ -33,6 +33,7 @@ typedef enum MemPoolEvictPolicy {
typedef struct SMemPoolJob {
uint64_t jobId;
uint64_t clientId;
int64_t allocMemSize;
int64_t maxAllocMemSize;
} SMemPoolJob;
@ -79,7 +80,7 @@ typedef void (*mpDecConcSessionNum)(void);
typedef void (*mpIncConcSessionNum)(void);
typedef void (*mpSetConcSessionNum)(int32_t);
typedef void (*mpReserveFailFp)(int64_t, int32_t);
typedef void (*mpReserveReachFp)(uint64_t, int32_t);
typedef void (*mpReserveReachFp)(uint64_t, uint64_t, int32_t);
typedef void (*mpCfgUpdate)(void*, void*);
typedef struct SMemPoolCallBack {
@ -122,9 +123,9 @@ void *taosMemPoolMallocAlign(void* poolHandle, void* session, uint32_t alignme
void taosMemPoolClose(void* poolHandle);
void taosMemPoolModDestroy(void);
void taosAutoMemoryFree(void *ptr);
int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob);
void taosMemPoolDestroySession(void* poolHandle, void* session);
int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob);
int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob, char *sessionId);
void taosMemPoolDestroySession(void* poolHandle, void* session, int32_t* remainSessions);
int32_t taosMemPoolCallocJob(uint64_t jobId, uint64_t cId, void** ppJob);
void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg);
void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName);
void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd);

View File

@ -9276,7 +9276,7 @@ int32_t tSerializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *pR
tEncoderInit(&encoder, buf, bufLen);
TAOS_CHECK_EXIT(tStartEncode(&encoder));
TAOS_CHECK_EXIT(tEncodeU64(&encoder, pReq->sId));
TAOS_CHECK_EXIT(tEncodeU64(&encoder, pReq->clientId));
TAOS_CHECK_EXIT(tEncodeI32(&encoder, pReq->epId.nodeId));
TAOS_CHECK_EXIT(tEncodeU16(&encoder, pReq->epId.ep.port));
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->epId.ep.fqdn));
@ -9323,7 +9323,7 @@ int32_t tDeserializeSSchedulerHbReq(void *buf, int32_t bufLen, SSchedulerHbReq *
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
TAOS_CHECK_EXIT(tStartDecode(&decoder));
TAOS_CHECK_EXIT(tDecodeU64(&decoder, &pReq->sId));
TAOS_CHECK_EXIT(tDecodeU64(&decoder, &pReq->clientId));
TAOS_CHECK_EXIT(tDecodeI32(&decoder, &pReq->epId.nodeId));
TAOS_CHECK_EXIT(tDecodeU16(&decoder, &pReq->epId.ep.port));
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->epId.ep.fqdn));

View File

@ -640,7 +640,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
if (pSource->localExec) {
SDataBuf pBuf = {0};
int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->schedId, pTaskInfo->id.queryId,
int32_t code = (*pTaskInfo->localFetch.fp)(pTaskInfo->localFetch.handle, pSource->sId, pTaskInfo->id.queryId,
pSource->clientId, pSource->taskId, 0, pSource->execId, &pBuf.pData,
pTaskInfo->localFetch.explainRes);
code = loadRemoteDataCallback(pWrapper, &pBuf, code);
@ -649,7 +649,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
} else {
SResFetchReq req = {0};
req.header.vgId = pSource->addr.nodeId;
req.sId = pSource->schedId;
req.sId = pSource->sId;
req.clientId = pSource->clientId;
req.taskId = pSource->taskId;
req.queryId = pTaskInfo->id.queryId;

View File

@ -1647,7 +1647,7 @@ SNode* qptMakeDownstreamSrcNode(SNode** ppNode) {
pDs->addr.nodeId = qptCtx.param.vnode.vgId;
memcpy(&pDs->addr.epSet, &qptCtx.param.vnode.epSet, sizeof(pDs->addr.epSet));
pDs->taskId = (QPT_CORRECT_HIGH_PROB() && qptCtx.buildCtx.pCurrTask) ? qptCtx.buildCtx.pCurrTask->id.taskId : taosRand();
pDs->schedId = QPT_CORRECT_HIGH_PROB() ? qptCtx.param.schedulerId : taosRand();
pDs->sId = QPT_CORRECT_HIGH_PROB() ? 0 : taosRand();
pDs->execId = taosRand();
pDs->fetchMsgType = QPT_CORRECT_HIGH_PROB() ? (QPT_RAND_BOOL_V ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH) : taosRand();
pDs->localExec = QPT_RAND_BOOL_V;

View File

@ -853,7 +853,7 @@ static int32_t downstreamSourceCopy(const SDownstreamSourceNode* pSrc, SDownstre
COPY_OBJECT_FIELD(addr, sizeof(SQueryNodeAddr));
COPY_SCALAR_FIELD(clientId);
COPY_SCALAR_FIELD(taskId);
COPY_SCALAR_FIELD(schedId);
COPY_SCALAR_FIELD(sId);
COPY_SCALAR_FIELD(execId);
COPY_SCALAR_FIELD(fetchMsgType);
COPY_SCALAR_FIELD(localExec);

View File

@ -5261,7 +5261,7 @@ static int32_t jsonToColumnDefNode(const SJson* pJson, void* pObj) {
static const char* jkDownstreamSourceAddr = "Addr";
static const char* jkDownstreamSourceClientId = "ClientId";
static const char* jkDownstreamSourceTaskId = "TaskId";
static const char* jkDownstreamSourceSchedId = "SchedId";
static const char* jkDownstreamSourceSeriousId = "SeriousId";
static const char* jkDownstreamSourceExecId = "ExecId";
static const char* jkDownstreamSourceFetchMsgType = "FetchMsgType";
@ -5276,7 +5276,7 @@ static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) {
code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceTaskId, pNode->taskId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceSchedId, pNode->schedId);
code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceSeriousId, pNode->sId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceExecId, pNode->execId);
@ -5299,7 +5299,7 @@ static int32_t jsonToDownstreamSourceNode(const SJson* pJson, void* pObj) {
code = tjsonGetUBigIntValue(pJson, jkDownstreamSourceTaskId, &pNode->taskId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUBigIntValue(pJson, jkDownstreamSourceSchedId, &pNode->schedId);
code = tjsonGetUBigIntValue(pJson, jkDownstreamSourceSeriousId, &pNode->sId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkDownstreamSourceExecId, &pNode->execId);

View File

@ -1761,7 +1761,7 @@ static int32_t downstreamSourceNodeInlineToMsg(const void* pObj, STlvEncoder* pE
code = tlvEncodeValueU64(pEncoder, pNode->taskId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueU64(pEncoder, pNode->schedId);
code = tlvEncodeValueU64(pEncoder, pNode->sId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI32(pEncoder, pNode->execId);
@ -1788,7 +1788,7 @@ static int32_t msgToDownstreamSourceNodeInlineToMsg(STlvDecoder* pDecoder, void*
code = tlvDecodeValueU64(pDecoder, &pNode->taskId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueU64(pDecoder, &pNode->schedId);
code = tlvDecodeValueU64(pDecoder, &pNode->sId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueI32(pDecoder, &pNode->execId);

View File

@ -127,15 +127,27 @@ typedef struct SQWTaskStatus {
int8_t status;
} SQWTaskStatus;
typedef struct SQWSessionInfo {
void *mgmt;
uint64_t sId;
uint64_t qId;
uint64_t cId;
uint64_t tId;
int64_t rId;
int32_t eId;
void *sessionMp;
} SQWSessionInfo;
typedef struct SQWJobInfo {
int8_t retired;
int32_t errCode;
SMemPoolJob* memInfo;
SRWLatch lock;
int8_t destroyed;
SHashObj* pSessions;
} SQWJobInfo;
typedef struct SQWTaskCtx {
SRWLatch lock;
int8_t phase;
@ -306,6 +318,19 @@ extern SQueryMgmt gQueryMgmt;
#define QW_FETCH_RUNNING(ctx) ((ctx)->inFetch)
#define QW_QUERY_NOT_STARTED(ctx) (QW_GET_PHASE(ctx) == -1)
#define QW_SET_QCID(id, qId, cId) \
do { \
*(uint64_t *)(id) = (qId); \
*(uint64_t *)((char *)(id) + sizeof(qId)) = (cId); \
} while (0)
#define QW_GET_QCID(id, qId, cId) \
do { \
(qId) = *(uint64_t *)(id); \
(cId) = *(uint64_t *)((char *)(id) + sizeof(qId)); \
} while (0)
#define QW_SET_QTID(id, qId, cId, tId, eId) \
do { \
*(uint64_t *)(id) = (qId); \
@ -370,33 +395,33 @@ extern SQueryMgmt gQueryMgmt;
} \
} while (0)
#define QW_SCH_ELOG(param, ...) qError("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p SID:%" PRIx64 " " param, mgmt, sId, __VA_ARGS__)
#define QW_SCH_ELOG(param, ...) qError("QW:%p clientId:%" PRIx64 " " param, mgmt, clientId, __VA_ARGS__)
#define QW_SCH_DLOG(param, ...) qDebug("QW:%p clientId:%" PRIx64 " " param, mgmt, clientId, __VA_ARGS__)
#define QW_TASK_ELOG(param, ...) \
qError("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
qError("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__)
#define QW_TASK_WLOG(param, ...) \
qWarn("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
qWarn("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__)
#define QW_TASK_DLOG(param, ...) \
qDebug("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
qDebug("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__)
#define QW_TASK_DLOGL(param, ...) \
qDebugL("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId, __VA_ARGS__)
qDebugL("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId, __VA_ARGS__)
#define QW_TASK_ELOG_E(param) \
qError("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId)
qError("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId)
#define QW_TASK_WLOG_E(param) \
qWarn("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId)
qWarn("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId)
#define QW_TASK_DLOG_E(param) \
qDebug("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, cId, tId, eId)
qDebug("qid:0x%" PRIx64 "SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, qId, sId, cId, tId, eId)
#define QW_SCH_TASK_ELOG(param, ...) \
qError("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \
qError("QW:%p SID:%" PRId64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \
qId, cId, tId, eId, __VA_ARGS__)
#define QW_SCH_TASK_WLOG(param, ...) \
qWarn("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, \
qWarn("QW:%p SID:%" PRId64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, qId, \
cId, tId, eId, __VA_ARGS__)
#define QW_SCH_TASK_DLOG(param, ...) \
qDebug("QW:%p SID:0x%" PRIx64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \
qDebug("QW:%p SID:%" PRId64 ",qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, mgmt, sId, \
qId, cId, tId, eId, __VA_ARGS__)
#define QW_LOCK_DEBUG(...) \
@ -475,7 +500,7 @@ static FORCE_INLINE int32_t qwRelease(int64_t refId) { return taosReleaseRef(gQw
char *qwPhaseStr(int32_t phase);
char *qwBufStatusStr(int32_t bufStatus);
int32_t qwAcquireAddScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch);
int32_t qwAcquireAddScheduler(SQWorker *mgmt, uint64_t clientId, int32_t rwType, SQWSchStatus **sch);
void qwReleaseScheduler(int32_t rwType, SQWorker *mgmt);
int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status);
int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx);
@ -491,8 +516,8 @@ void qwSetHbParam(int64_t refId, SQWHbParam **pParam);
int32_t qwUpdateTimeInQueue(SQWorker *mgmt, int64_t ts, EQueueType type);
int64_t qwGetTimeInQueue(SQWorker *mgmt, EQueueType type);
void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch);
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch);
void qwFreeTaskCtx(SQWTaskCtx *ctx);
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t clientId, int32_t rwType, SQWSchStatus **sch);
void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
void qwDbgDumpMgmtInfo(SQWorker *mgmt);
@ -505,8 +530,9 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped);
int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
int32_t qwInitQueryPool(void);
void qwDestroyJobInfo(SQWJobInfo* pJob);
void qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx);
void qwRetireJob(SQWJobInfo* pJob);
void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session);
int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession);
#ifdef __cplusplus

View File

@ -47,7 +47,7 @@ void qwFreeFetchRsp(void *msg);
int32_t qwMallocFetchRsp(int8_t rpcMalloc, int32_t length, SRetrieveTableRsp **rsp);
int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *rsp, int32_t code);
int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn);
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t clientId, SRpcHandleInfo *pConn);
int32_t qwBuildAndSendDropMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
#ifdef __cplusplus

View File

@ -96,7 +96,7 @@ void qwDbgDumpSchInfo(SQWorker *mgmt, SQWSchStatus *sch, int32_t i) {
int32_t taskNum = taosHashGetSize(sch->tasksHash);
QW_DLOG("***The %dth scheduler status, hbBrokenTs:%" PRId64 ",taskNum:%d", i, sch->hbBrokenTs, taskNum);
uint64_t qId, cId, tId;
uint64_t qId, cId, tId, sId = 0;
int32_t eId;
SQWTaskStatus *pTask = NULL;
void *pIter = taosHashIterate(sch->tasksHash, NULL);
@ -118,19 +118,20 @@ void qwDbgDumpTasksInfo(SQWorker *mgmt) {
int32_t i = 0;
SQWTaskCtx *ctx = NULL;
uint64_t qId, cId, tId;
uint64_t qId, cId, tId, sId;
int32_t eId;
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
while (pIter) {
ctx = (SQWTaskCtx *)pIter;
void *key = taosHashGetKey(pIter, NULL);
QW_GET_QTID(key, qId, cId, tId, eId);
sId = ctx->sId;
QW_TASK_DLOG("%p lock:%x, phase:%d, type:%d, explain:%d, needFetch:%d, localExec:%d, queryMsgType:%d, "
"sId:%" PRId64 ", level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, "
"level:%d, queryGotData:%d, queryRsped:%d, queryEnd:%d, queryContinue:%d, queryInQueue:%d, "
"rspCode:%x, affectedRows:%" PRId64 ", taskHandle:%p, sinkHandle:%p, tbNum:%d, events:%d,%d,%d,%d,%d",
ctx, ctx->lock, ctx->phase, ctx->taskType, ctx->explain, ctx->needFetch, ctx->localExec, ctx->queryMsgType,
ctx->sId, ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd, ctx->queryContinue,
ctx->level, ctx->queryGotData, ctx->queryRsped, ctx->queryEnd, ctx->queryContinue,
ctx->queryInQueue, ctx->rspCode, ctx->affectedRows, ctx->taskHandle, ctx->sinkHandle, (int32_t)taosArrayGetSize(ctx->tbInfo),
ctx->events[QW_EVENT_CANCEL], ctx->events[QW_EVENT_READY],
ctx->events[QW_EVENT_FETCH], ctx->events[QW_EVENT_DROP], ctx->events[QW_EVENT_CQUERY]);

View File

@ -34,14 +34,14 @@ void qwIncConcurrentTaskNumCb(void) {
//TODO
}
int32_t qwInitJobInfo(uint64_t qId, SQWJobInfo* pJob) {
int32_t qwInitJobInfo(QW_FPARAMS_DEF, SQWJobInfo* pJob) {
pJob->pSessions= taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
if (NULL == pJob->pSessions) {
qError("fail to init session hash, code: 0x%x", terrno);
QW_TASK_ELOG("fail to init session hash, code: 0x%x", terrno);
return terrno;
}
int32_t code = taosMemPoolCallocJob(qId, (void**)&pJob->memInfo);
int32_t code = taosMemPoolCallocJob(qId, cId, (void**)&pJob->memInfo);
if (TSDB_CODE_SUCCESS != code) {
taosHashCleanup(pJob->pSessions);
pJob->pSessions = NULL;
@ -72,24 +72,53 @@ int32_t qwInitJobHash(void) {
}
int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession) {
void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session) {
char id[sizeof(tId) + sizeof(eId) + 1] = {0};
QW_SET_TEID(id, tId, eId);
int32_t remainSessions = 0;
(void)taosHashRemove(pJobInfo->pSessions, id, sizeof(id));
taosMemPoolDestroySession(gMemPoolHandle, session, &remainSessions);
if (0 == remainSessions) {
QW_LOCK(QW_WRITE, &pJobInfo->lock);
if (0 == taosHashGetSize(pJobInfo->pSessions)) {
atomic_store_8(&pJobInfo->destroyed, 1);
qwDestroyJobInfo(pJobInfo);
QW_UNLOCK(QW_WRITE, &pJobInfo->lock);
char id2[sizeof(qId) + sizeof(cId) + 1] = {0};
QW_SET_QCID(id2, qId, cId);
(void)taosHashRemove(gQueryMgmt.pJobInfo, id2, sizeof(id2));
QW_TASK_DLOG_E("the whole query job removed");
} else {
QW_UNLOCK(QW_WRITE, &pJobInfo->lock);
}
}
}
int32_t qwRetrieveJobInfo(QW_FPARAMS_DEF, SQWJobInfo** ppJob) {
int32_t code = TSDB_CODE_SUCCESS;
SQWJobInfo* pJob = NULL;
char id[sizeof(qId) + sizeof(cId) + 1] = {0};
if (NULL == gQueryMgmt.pJobInfo) {
QW_ERR_RET(qwInitJobHash());
}
QW_SET_QCID(id, qId, cId);
while (true) {
pJob = (SQWJobInfo*)taosHashAcquire(gQueryMgmt.pJobInfo, &qId, sizeof(qId));
pJob = (SQWJobInfo*)taosHashAcquire(gQueryMgmt.pJobInfo, id, sizeof(id));
if (NULL == pJob) {
SQWJobInfo jobInfo = {0};
code = qwInitJobInfo(qId, &jobInfo);
code = qwInitJobInfo(QW_FPARAMS(), &jobInfo);
if (TSDB_CODE_SUCCESS != code) {
return code;
}
code = taosHashPut(gQueryMgmt.pJobInfo, &qId, sizeof(qId), &jobInfo, sizeof(jobInfo));
code = taosHashPut(gQueryMgmt.pJobInfo, id, sizeof(id), &jobInfo, sizeof(jobInfo));
if (TSDB_CODE_SUCCESS != code) {
qwDestroyJobInfo(&jobInfo);
if (TSDB_CODE_DUP_KEY == code) {
@ -97,31 +126,68 @@ int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession) {
continue;
}
QW_TASK_ELOG("fail to put job to job hash, error: %s", tstrerror(code));
return code;
}
pJob = (SQWJobInfo*)taosHashAcquire(gQueryMgmt.pJobInfo, &qId, sizeof(qId));
pJob = (SQWJobInfo*)taosHashAcquire(gQueryMgmt.pJobInfo, id, sizeof(id));
if (NULL == pJob) {
qError("QID:0x%" PRIx64 " not in joj hash, may be dropped", qId);
QW_TASK_ELOG_E("job not in job hash, may be dropped");
return TSDB_CODE_QRY_JOB_NOT_EXIST;
}
}
if (atomic_load_8(&pJob->destroyed)) {
continue;
}
break;
}
ctx->pJobInfo = pJob;
*ppJob = pJob;
char id[sizeof(tId) + sizeof(eId)] = {0};
QW_SET_TEID(id, tId, eId);
return code;
}
QW_ERR_JRET(taosMemPoolInitSession(gMemPoolHandle, ppSession, pJob->memInfo));
int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession) {
int32_t code = TSDB_CODE_SUCCESS;
SQWJobInfo* pJob = NULL;
SQWSessionInfo session = {.mgmt = mgmt,
.sId = sId,
.qId = qId,
.cId = cId,
.tId = tId,
.rId = rId,
.eId = eId
};
code = taosHashPut(pJob->pSessions, id, sizeof(id), ppSession, POINTER_BYTES);
if (TSDB_CODE_SUCCESS != code) {
qError("fail to put session into query session hash, code: 0x%x", code);
QW_ERR_JRET(code);
}
do {
QW_ERR_JRET(qwRetrieveJobInfo(QW_FPARAMS(), &pJob));
ctx->pJobInfo = pJob;
char id[sizeof(tId) + sizeof(eId) + 1] = {0};
QW_SET_TEID(id, tId, eId);
QW_ERR_JRET(taosMemPoolInitSession(gMemPoolHandle, ppSession, pJob->memInfo, id));
session.sessionMp = *ppSession;
QW_LOCK(QW_READ, &pJob->lock);
if (atomic_load_8(&pJob->destroyed)) {
QW_UNLOCK(QW_READ, &pJob->lock);
continue;
}
code = taosHashPut(pJob->pSessions, id, sizeof(id), &session, sizeof(session));
if (TSDB_CODE_SUCCESS != code) {
QW_UNLOCK(QW_READ, &pJob->lock);
QW_TASK_ELOG("fail to put session into query session hash, code: 0x%x", code);
QW_ERR_JRET(code);
}
QW_UNLOCK(QW_READ, &pJob->lock);
break;
} while (true);
_return:

View File

@ -350,10 +350,10 @@ int32_t qwRegisterQueryBrokenLinkArg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn) {
return TSDB_CODE_SUCCESS;
}
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t sId, SRpcHandleInfo *pConn) {
int32_t qwRegisterHbBrokenLinkArg(SQWorker *mgmt, uint64_t clientId, SRpcHandleInfo *pConn) {
SSchedulerHbReq req = {0};
req.header.vgId = mgmt->nodeId;
req.sId = sId;
req.clientId = clientId;
int32_t msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
if (msgSize < 0) {
@ -720,6 +720,7 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
int32_t code = 0;
SSchedulerHbReq req = {0};
SQWorker *mgmt = (SQWorker *)qWorkerMgmt;
uint64_t clientId = 0;
QW_ERR_RET(qwUpdateTimeInQueue(mgmt, ts, FETCH_QUEUE));
QW_STAT_INC(mgmt->stat.msgStat.hbProcessed, 1);
@ -735,7 +736,8 @@ int32_t qWorkerProcessHbMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int64_
QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
}
uint64_t sId = req.sId;
clientId = req.clientId;
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .code = pMsg->code, .connInfo = pMsg->info};
if (TSDB_CODE_RPC_BROKEN_LINK == pMsg->code) {
QW_SCH_DLOG("receive Hb msg due to network broken, error:%s", tstrerror(pMsg->code));

View File

@ -70,7 +70,7 @@ int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status, bool
return TSDB_CODE_SUCCESS;
}
int32_t qwAddSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType) {
int32_t qwAddSchedulerImpl(SQWorker *mgmt, uint64_t clientId, int32_t rwType) {
SQWSchStatus newSch = {0};
newSch.tasksHash =
taosHashInit(mgmt->cfg.maxSchTaskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
@ -82,7 +82,7 @@ int32_t qwAddSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType) {
}
QW_LOCK(QW_WRITE, &mgmt->schLock);
int32_t code = taosHashPut(mgmt->schHash, &sId, sizeof(sId), &newSch, sizeof(newSch));
int32_t code = taosHashPut(mgmt->schHash, &clientId, sizeof(clientId), &newSch, sizeof(newSch));
if (0 != code) {
if (!HASH_NODE_EXIST(code)) {
QW_UNLOCK(QW_WRITE, &mgmt->schLock);
@ -99,15 +99,15 @@ int32_t qwAddSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType) {
return TSDB_CODE_SUCCESS;
}
int32_t qwAcquireSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) {
int32_t qwAcquireSchedulerImpl(SQWorker *mgmt, uint64_t clientId, int32_t rwType, SQWSchStatus **sch, int32_t nOpt) {
while (true) {
QW_LOCK(rwType, &mgmt->schLock);
*sch = taosHashGet(mgmt->schHash, &sId, sizeof(sId));
*sch = taosHashGet(mgmt->schHash, &clientId, sizeof(clientId));
if (NULL == (*sch)) {
QW_UNLOCK(rwType, &mgmt->schLock);
if (QW_NOT_EXIST_ADD == nOpt) {
QW_ERR_RET(qwAddSchedulerImpl(mgmt, sId, rwType));
QW_ERR_RET(qwAddSchedulerImpl(mgmt, clientId, rwType));
nOpt = QW_NOT_EXIST_RET_ERR;
@ -126,12 +126,12 @@ int32_t qwAcquireSchedulerImpl(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQW
return TSDB_CODE_SUCCESS;
}
int32_t qwAcquireAddScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_ADD);
int32_t qwAcquireAddScheduler(SQWorker *mgmt, uint64_t clientId, int32_t rwType, SQWSchStatus **sch) {
return qwAcquireSchedulerImpl(mgmt, clientId, rwType, sch, QW_NOT_EXIST_ADD);
}
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t sId, int32_t rwType, SQWSchStatus **sch) {
return qwAcquireSchedulerImpl(mgmt, sId, rwType, sch, QW_NOT_EXIST_RET_ERR);
int32_t qwAcquireScheduler(SQWorker *mgmt, uint64_t clientId, int32_t rwType, SQWSchStatus **sch) {
return qwAcquireSchedulerImpl(mgmt, clientId, rwType, sch, QW_NOT_EXIST_RET_ERR);
}
void qwReleaseScheduler(int32_t rwType, SQWorker *mgmt) { QW_UNLOCK(rwType, &mgmt->schLock); }
@ -190,7 +190,7 @@ int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, i
int32_t qwAddTaskStatus(QW_FPARAMS_DEF, int32_t status) {
SQWSchStatus *tsch = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireAddScheduler(mgmt, sId, QW_READ, &tsch));
QW_ERR_RET(qwAcquireAddScheduler(mgmt, cId, QW_READ, &tsch));
QW_ERR_JRET(qwAddTaskStatusImpl(QW_FPARAMS(), tsch, 0, status, NULL));
@ -298,7 +298,7 @@ int32_t qwKillTaskHandle(SQWTaskCtx *ctx, int32_t rspCode) {
QW_RET(code);
}
void qwFreeTaskCtx(SQWTaskCtx *ctx) {
void qwFreeTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
if (ctx->ctrlConnInfo.handle) {
tmsgReleaseHandle(&ctx->ctrlConnInfo, TAOS_CONN_SERVER);
}
@ -322,7 +322,7 @@ void qwFreeTaskCtx(SQWTaskCtx *ctx) {
taosArrayDestroy(ctx->tbInfo);
if (gMemPoolHandle && ctx->memPoolSession) {
taosMemPoolDestroySession(gMemPoolHandle, ctx->memPoolSession);
qwDestroySession(QW_FPARAMS(), ctx->pJobInfo, ctx->memPoolSession);
}
}
@ -407,7 +407,7 @@ int32_t qwDropTaskCtx(QW_FPARAMS_DEF) {
QW_ERR_RET(QW_CTX_NOT_EXISTS_ERR_CODE(mgmt));
}
qwFreeTaskCtx(&octx);
qwFreeTaskCtx(QW_FPARAMS(), &octx);
ctx->tbInfo = NULL;
QW_TASK_DLOG_E("task ctx dropped");
@ -423,7 +423,7 @@ int32_t qwDropTaskStatus(QW_FPARAMS_DEF) {
char id[sizeof(qId) + sizeof(cId) + sizeof(tId) + sizeof(eId)] = {0};
QW_SET_QTID(id, qId, cId, tId, eId);
if (qwAcquireScheduler(mgmt, sId, QW_WRITE, &sch)) {
if (qwAcquireScheduler(mgmt, cId, QW_WRITE, &sch)) {
QW_TASK_WLOG_E("scheduler does not exist");
return TSDB_CODE_SUCCESS;
}
@ -457,7 +457,7 @@ int32_t qwUpdateTaskStatus(QW_FPARAMS_DEF, int8_t status, bool dynamicTask) {
SQWTaskStatus *task = NULL;
int32_t code = 0;
QW_ERR_RET(qwAcquireScheduler(mgmt, sId, QW_READ, &sch));
QW_ERR_RET(qwAcquireScheduler(mgmt, cId, QW_READ, &sch));
QW_ERR_JRET(qwAcquireTaskStatus(QW_FPARAMS(), QW_READ, sch, &task));
QW_ERR_JRET(qwSetTaskStatus(QW_FPARAMS(), task, status, dynamicTask));
@ -597,16 +597,18 @@ void qwDestroyImpl(void *pMgmt) {
mgmt->hbTimer = NULL;
taosTmrCleanUp(mgmt->timer);
uint64_t qId, cId, tId;
uint64_t qId, cId, tId, sId;
int32_t eId;
int64_t rId = 0;
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
while (pIter) {
SQWTaskCtx *ctx = (SQWTaskCtx *)pIter;
void *key = taosHashGetKey(pIter, NULL);
QW_GET_QTID(key, qId, cId, tId, eId);
qwFreeTaskCtx(ctx);
sId = ctx->sId;
qwFreeTaskCtx(QW_FPARAMS(), ctx);
QW_TASK_DLOG_E("task ctx freed");
pIter = taosHashIterate(mgmt->ctxHash, pIter);
taskCount++;
@ -694,23 +696,23 @@ void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t num = taosArrayGetSize(pExpiredSch);
for (int32_t i = 0; i < num; ++i) {
uint64_t *sId = taosArrayGet(pExpiredSch, i);
uint64_t *clientId = taosArrayGet(pExpiredSch, i);
SQWSchStatus *pSch = NULL;
if (NULL == sId) {
qError("get the %dth sch failed, code:%x", i, terrno);
if (NULL == clientId) {
qError("get the %dth client failed, code:%x", i, terrno);
break;
}
code = qwAcquireScheduler(mgmt, *sId, QW_WRITE, &pSch);
code = qwAcquireScheduler(mgmt, *clientId, QW_WRITE, &pSch);
if (TSDB_CODE_SUCCESS != code) {
qError("acquire sch %" PRIx64 " failed, code:%x", *sId, code);
qError("acquire client %" PRIx64 " failed, code:%x", *clientId, code);
continue;
}
if (taosHashGetSize(pSch->tasksHash) <= 0) {
qwDestroySchStatus(pSch);
code = taosHashRemove(mgmt->schHash, sId, sizeof(*sId));
qDebug("sch %" PRIx64 " destroy result code:%x", *sId, code);
code = taosHashRemove(mgmt->schHash, clientId, sizeof(*clientId));
qDebug("client %" PRIx64 " destroy result code:%x", *clientId, code);
}
qwReleaseScheduler(QW_WRITE, mgmt);
@ -718,10 +720,77 @@ void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) {
}
void qwDestroyJobInfo(SQWJobInfo* pJob) {
//TODO
if (NULL == pJob) {
return;
}
taosMemoryFreeClear(pJob->memInfo);
taosHashCleanup(pJob->pSessions);
pJob->pSessions = NULL;
}
void qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) {
int32_t code = TSDB_CODE_SUCCESS;
QW_LOCK(QW_WRITE, &ctx->lock);
QW_TASK_DLOG_E("start to force stop task");
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG_E("task already dropping");
QW_UNLOCK(QW_WRITE, &ctx->lock);
return;
}
if (QW_QUERY_RUNNING(ctx)) {
code = qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task running, async kill failed, error: %x", code);
} else {
QW_TASK_DLOG_E("task running, async killed");
}
} else if (QW_FETCH_RUNNING(ctx)) {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
QW_TASK_DLOG_E("task fetching, update drop received");
} else {
code = qwDropTask(QW_FPARAMS());
if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task drop failed, error: %x", code);
} else {
QW_TASK_DLOG_E("task dropped");
}
}
QW_UNLOCK(QW_WRITE, &ctx->lock);
}
void qwRetireTask(QW_FPARAMS_DEF) {
SQWTaskCtx *ctx = NULL;
int32_t code = qwAcquireTaskCtx(QW_FPARAMS(), &ctx);
if (TSDB_CODE_SUCCESS != code) {
return;
}
qwStopTask(QW_FPARAMS(), ctx);
qwReleaseTaskCtx(mgmt, ctx);
}
void qwRetireJob(SQWJobInfo* pJob) {
//TODO
if (NULL == pJob) {
return;
}
void* pIter = taosHashIterate(pJob->pSessions, NULL);
while (pIter) {
SQWSessionInfo* pSession = (SQWSessionInfo*)pIter;
qwRetireTask((SQWorker *)pSession->mgmt, pSession->sId, pSession->qId, pSession->cId, pSession->tId, pSession->rId, pSession->eId);
pIter = taosHashIterate(pJob->pSessions, pIter);
}
}

View File

@ -26,7 +26,6 @@ void qwStopAllTasks(SQWorker *mgmt) {
uint64_t qId, cId, tId, sId;
int32_t eId;
int64_t rId = 0;
int32_t code = TSDB_CODE_SUCCESS;
void *pIter = taosHashIterate(mgmt->ctxHash, NULL);
while (pIter) {
@ -34,41 +33,9 @@ void qwStopAllTasks(SQWorker *mgmt) {
void *key = taosHashGetKey(pIter, NULL);
QW_GET_QTID(key, qId, cId, tId, eId);
QW_LOCK(QW_WRITE, &ctx->lock);
sId = ctx->sId;
QW_TASK_DLOG_E("start to force stop task");
if (QW_EVENT_RECEIVED(ctx, QW_EVENT_DROP) || QW_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) {
QW_TASK_WLOG_E("task already dropping");
QW_UNLOCK(QW_WRITE, &ctx->lock);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
continue;
}
if (QW_QUERY_RUNNING(ctx)) {
code = qwKillTaskHandle(ctx, TSDB_CODE_VND_STOPPED);
if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task running, async kill failed, error: %x", code);
} else {
QW_TASK_DLOG_E("task running, async killed");
}
} else if (QW_FETCH_RUNNING(ctx)) {
QW_UPDATE_RSP_CODE(ctx, TSDB_CODE_VND_STOPPED);
QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP);
QW_TASK_DLOG_E("task fetching, update drop received");
} else {
code = qwDropTask(QW_FPARAMS());
if (TSDB_CODE_SUCCESS != code) {
QW_TASK_ELOG("task drop failed, error: %x", code);
} else {
QW_TASK_DLOG_E("task dropped");
}
}
QW_UNLOCK(QW_WRITE, &ctx->lock);
qwStopTask(QW_FPARAMS(), ctx);
pIter = taosHashIterate(mgmt->ctxHash, pIter);
}
@ -79,7 +46,7 @@ int32_t qwProcessHbLinkBroken(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *re
SSchedulerHbRsp rsp = {0};
SQWSchStatus *sch = NULL;
QW_ERR_RET(qwAcquireScheduler(mgmt, req->sId, QW_READ, &sch));
QW_ERR_RET(qwAcquireScheduler(mgmt, req->clientId, QW_READ, &sch));
QW_LOCK(QW_WRITE, &sch->hbConnLock);
@ -1167,8 +1134,8 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
QW_RET(qwProcessHbLinkBroken(mgmt, qwMsg, req));
}
QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->sId, QW_READ, &sch));
QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->sId, &qwMsg->connInfo));
QW_ERR_JRET(qwAcquireAddScheduler(mgmt, req->clientId, QW_READ, &sch));
QW_ERR_JRET(qwRegisterHbBrokenLinkArg(mgmt, req->clientId, &qwMsg->connInfo));
sch->hbBrokenTs = 0;
@ -1184,7 +1151,7 @@ int32_t qwProcessHb(SQWorker *mgmt, SQWMsg *qwMsg, SSchedulerHbReq *req) {
QW_UNLOCK(QW_WRITE, &sch->hbConnLock);
QW_DLOG("hb connection updated, sId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", req->sId,
QW_DLOG("hb connection updated, clientId:%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, handle:%p, ahandle:%p", req->clientId,
req->epId.nodeId, req->epId.ep.fqdn, req->epId.ep.port, qwMsg->connInfo.handle, qwMsg->connInfo.ahandle);
qwReleaseScheduler(QW_READ, mgmt);
@ -1261,13 +1228,13 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
while (pIter) {
SQWSchStatus *sch1 = (SQWSchStatus *)pIter;
if (NULL == sch1->hbConnInfo.handle) {
uint64_t *sId = taosHashGetKey(pIter, NULL);
QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
uint64_t *clientId = taosHashGetKey(pIter, NULL);
QW_TLOG("cancel send hb to client %" PRIx64 " cause of no connection handle", *clientId);
if (sch1->hbBrokenTs > 0 && ((currentMs - sch1->hbBrokenTs) > QW_SCH_TIMEOUT_MSEC) &&
taosHashGetSize(sch1->tasksHash) <= 0) {
if (NULL == taosArrayPush(pExpiredSch, sId)) {
QW_ELOG("add sId 0x%" PRIx64 " to expiredSch failed, code:%x", *sId, terrno);
if (NULL == taosArrayPush(pExpiredSch, clientId)) {
QW_ELOG("add clientId 0x%" PRIx64 " to expiredSch failed, code:%x", *clientId, terrno);
taosHashCancelIterate(mgmt->schHash, pIter);
break;
}
@ -1356,7 +1323,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes) {
_return:
qwFreeTaskCtx(&ctx);
qwFreeTaskCtx(QW_FPARAMS(), &ctx);
QW_RET(TSDB_CODE_SUCCESS);
}
@ -1632,15 +1599,19 @@ _return:
}
void qWorkerRetireJob(uint64_t jobId, int32_t errCode) {
SQWJobInfo* pJob = (SQWJobInfo*)taosHashGet(gQueryMgmt.pJobInfo, &jobId, sizeof(jobId));
void qWorkerRetireJob(uint64_t jobId, uint64_t clientId, int32_t errCode) {
char id[sizeof(jobId) + sizeof(clientId) + 1] = {0};
QW_SET_QCID(id, jobId, clientId);
SQWJobInfo* pJob = (SQWJobInfo*)taosHashGet(gQueryMgmt.pJobInfo, id, sizeof(id));
if (NULL == pJob) {
qError("QID:0x%" PRIx64 " fail to get job from job hash", jobId);
qError("QID:0x%" PRIx64 " CID:0x%" PRIx64 " fail to get job from job hash", jobId, clientId);
return;
}
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
qInfo("QID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, jobId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize));
qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64,
jobId, clientId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize));
qwRetireJob(pJob);
} else {
@ -1650,7 +1621,6 @@ void qWorkerRetireJob(uint64_t jobId, int32_t errCode) {
void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) {
SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL);
uint64_t jobId = 0;
int64_t retiredSize = 0;
while (retiredSize < retireSize && NULL != pJob) {
if (atomic_load_8(&pJob->retired)) {
@ -1660,14 +1630,13 @@ void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) {
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize);
jobId = pJob->memInfo->jobId;
qwRetireJob(pJob);
retiredSize += aSize;
qDebug("QID:0x%" PRIx64 " job retired cause of mid level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64,
jobId, aSize, retireSize);
qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " job retired in batch, usedSize:%" PRId64 ", retireSize:%" PRId64,
pJob->memInfo->jobId, pJob->memInfo->clientId, aSize, retireSize);
}
pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob);

View File

@ -144,7 +144,7 @@ typedef struct SSchedulerCfg {
typedef struct SSchedulerMgmt {
uint64_t clientId; // unique clientId
uint64_t taskId; // sequential taksId
uint64_t sId; // schedulerId
uint64_t seriousId; // sequential seriousId
SSchedulerCfg cfg;
bool exit;
int32_t jobRef;
@ -274,6 +274,7 @@ typedef struct {
typedef struct SSchJob {
int64_t refId;
uint64_t queryId;
uint64_t seriousId;
SSchJobAttr attr;
int32_t levelNum;
int32_t taskNum;
@ -405,8 +406,13 @@ extern SSchedulerMgmt schMgmt;
(SCH_NETWORK_ERR(_code) && ((_task)->level->level > (_job)->levelIdx))
#define SCH_TASK_RETRY_NETWORK_ERR(_task, _code) (SCH_NETWORK_ERR(_code) && (_task)->redirectCtx.inRedirect)
#if 0
#define SCH_JOB_NEED_RETRY(_job, _task, _msgType, _code) \
(SCH_REDIRECT_MSGTYPE(_msgType) && SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code))
#else
#define SCH_JOB_NEED_RETRY(_job, _task, _msgType, _code) \
(SCH_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_NETWORK_ERR(_code)))
#endif
#define SCH_TASKSET_NEED_RETRY(_job, _task, _msgType, _code) \
(SCH_REDIRECT_MSGTYPE(_msgType) && \
(NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_LOW_LEVEL_NETWORK_ERR((_job), (_task), (_code)) || \
@ -450,23 +456,23 @@ extern SSchedulerMgmt schMgmt;
(_task)->profile.endTs = us; \
} while (0)
#define SCH_JOB_ELOG(param, ...) qError("qid:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("qid:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__)
#define SCH_JOB_ELOG(param, ...) qError("qid:0x%" PRIx64 ",SID:%" PRId64 " " param, pJob->queryId, pJob->seriousId, __VA_ARGS__)
#define SCH_JOB_DLOG(param, ...) qDebug("qid:0x%" PRIx64 ",SID:%" PRId64 " " param, pJob->queryId, pJob->seriousId, __VA_ARGS__)
#define SCH_TASK_ELOG(param, ...) \
qError("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
qError("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, pJob->seriousId, SCH_CLIENT_ID(pTask), \
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
#define SCH_TASK_DLOG(param, ...) \
qDebug("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
qDebug("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, pJob->seriousId, SCH_CLIENT_ID(pTask), \
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
#define SCH_TASK_TLOG(param, ...) \
qTrace("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
qTrace("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, pJob->seriousId, SCH_CLIENT_ID(pTask), \
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
#define SCH_TASK_DLOGL(param, ...) \
qDebugL("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
qDebugL("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, pJob->seriousId, SCH_CLIENT_ID(pTask), \
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
#define SCH_TASK_WLOG(param, ...) \
qWarn("qid:0x%" PRIx64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, SCH_CLIENT_ID(pTask), \
qWarn("qid:0x%" PRIx64 ",SID:%" PRId64 ",CID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d " param, pJob->queryId, pJob->seriousId, SCH_CLIENT_ID(pTask), \
SCH_TASK_ID(pTask), SCH_TASK_EID(pTask), __VA_ARGS__)
#define SCH_SET_ERRNO(_err) \

View File

@ -345,6 +345,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
pJob->levelNum = levelNum;
SCH_RESET_JOB_LEVEL_IDX(pJob);
atomic_add_fetch_64(&pJob->seriousId, 1);
SSchLevel level = {0};
SNodeListNode *plans = NULL;
@ -1038,6 +1039,7 @@ int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode, bool *inRetry) {
}
SCH_RESET_JOB_LEVEL_IDX(pJob);
atomic_add_fetch_64(&pJob->seriousId, 1);
return TSDB_CODE_SUCCESS;
}

View File

@ -456,11 +456,17 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
SCH_ERR_JRET(schValidateRspMsgType(pJob, pTask, msgType));
int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1);
#if 0
if (SCH_JOB_NEED_RETRY(pJob, pTask, reqType, rspCode)) {
SCH_RET(schHandleJobRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode));
} else if (SCH_TASKSET_NEED_RETRY(pJob, pTask, reqType, rspCode)) {
SCH_RET(schHandleTaskSetRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode));
}
#else
if (SCH_JOB_NEED_RETRY(pJob, pTask, reqType, rspCode)) {
SCH_RET(schHandleJobRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode));
}
#endif
pTask->redirectCtx.inRedirect = false;
@ -1042,7 +1048,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray *taskAction) {
int32_t msgType = TDMT_SCH_QUERY_HEARTBEAT;
req.header.vgId = nodeEpId->nodeId;
req.sId = schMgmt.sId;
req.clientId = schMgmt.clientId;
TAOS_MEMCPY(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
@ -1137,7 +1143,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
case TDMT_VND_DELETE: {
SVDeleteReq req = {0};
req.header.vgId = addr->nodeId;
req.sId = schMgmt.sId;
req.sId = pJob->seriousId;
req.queryId = pJob->queryId;
req.clientId = pTask->clientId;
req.taskId = pTask->taskId;
@ -1171,7 +1177,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SSubQueryMsg qMsg;
qMsg.header.vgId = addr->nodeId;
qMsg.header.contLen = 0;
qMsg.sId = schMgmt.sId;
qMsg.sId = pJob->seriousId;
qMsg.queryId = pJob->queryId;
qMsg.clientId = pTask->clientId;
qMsg.taskId = pTask->taskId;
@ -1227,7 +1233,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
case TDMT_SCH_MERGE_FETCH: {
SResFetchReq req = {0};
req.header.vgId = addr->nodeId;
req.sId = schMgmt.sId;
req.sId = pJob->seriousId;
req.queryId = pJob->queryId;
req.clientId = pTask->clientId;
req.taskId = pTask->taskId;
@ -1255,7 +1261,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
STaskDropReq qMsg;
qMsg.header.vgId = addr->nodeId;
qMsg.header.contLen = 0;
qMsg.sId = schMgmt.sId;
qMsg.sId = pJob->seriousId;
qMsg.queryId = pJob->queryId;
qMsg.clientId = pTask->clientId;
qMsg.taskId = pTask->taskId;
@ -1285,7 +1291,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SCH_ERR_RET(schMakeHbRpcCtx(pJob, pTask, &rpcCtx));
SSchedulerHbReq req = {0};
req.sId = schMgmt.sId;
req.clientId = schMgmt.clientId;
req.header.vgId = addr->nodeId;
req.epId.nodeId = addr->nodeId;
TAOS_MEMCPY(&req.epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
@ -1313,7 +1319,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
STaskNotifyReq qMsg;
qMsg.header.vgId = addr->nodeId;
qMsg.header.contLen = 0;
qMsg.sId = schMgmt.sId;
qMsg.sId = pJob->seriousId;
qMsg.queryId = pJob->queryId;
qMsg.clientId = pTask->clientId;
qMsg.taskId = pTask->taskId;

View File

@ -308,7 +308,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
.type = QUERY_NODE_DOWNSTREAM_SOURCE,
.clientId = pTask->clientId,
.taskId = pTask->taskId,
.schedId = schMgmt.sId,
.sId = pJob->seriousId,
.execId = pTask->execId,
.addr = pTask->succeedAddr,
.fetchMsgType = SCH_FETCH_TYPE(pTask),
@ -565,6 +565,7 @@ int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, i
SCH_ERR_JRET(schResetTaskSetLevelInfo(pJob, pTask));
SCH_RESET_JOB_LEVEL_IDX(pJob);
atomic_add_fetch_64(&pJob->seriousId, 1);
code = schDoTaskRedirect(pJob, pTask, pData, rspCode);
@ -1150,7 +1151,7 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
}
}
SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->clientId, pTask->taskId,
SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, pJob->seriousId, pJob->queryId, pTask->clientId, pTask->taskId,
pJob->refId, pTask->execId, &qwMsg, explainRes));
if (SCH_IS_EXPLAIN_JOB(pJob)) {
@ -1410,7 +1411,7 @@ int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
}
}
SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->clientId, pTask->taskId,
SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, pJob->seriousId, pJob->queryId, pTask->clientId, pTask->taskId,
pJob->refId, pTask->execId, &pRsp, explainRes));
if (SCH_IS_EXPLAIN_JOB(pJob)) {

View File

@ -276,7 +276,7 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
SCH_UNLOCK(SCH_WRITE, &hb->lock);
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
qDebug("hb connection updated, sId:0x%" PRIx64 ", nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p", schMgmt.sId,
qDebug("hb connection updated, nodeId:%d, fqdn:%s, port:%d, pTrans:%p, pHandle:%p",
epId->nodeId, epId->ep.fqdn, epId->ep.port, trans->pTrans, trans->pHandle);
return TSDB_CODE_SUCCESS;

View File

@ -56,12 +56,7 @@ int32_t schedulerInit() {
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
if (taosGetSystemUUIDU64(&schMgmt.sId)) {
qError("generate schedulerId failed, errno:%d", errno);
SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR);
}
qInfo("scheduler 0x%" PRIx64 " initialized, maxJob:%u", schMgmt.sId, schMgmt.cfg.maxJobNum);
qInfo("scheduler 0x%" PRIx64 " initialized, maxJob:%u", getClientId(), schMgmt.cfg.maxJobNum);
return TSDB_CODE_SUCCESS;
}

View File

@ -317,7 +317,7 @@ int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size)
if (quota > 0 && cAllocSize / 1048576UL > quota) {
code = TSDB_CODE_QRY_REACH_QMEM_THRESHOLD;
uWarn("job 0x%" PRIx64 " allocSize %" PRId64 " is over than quota %" PRId64, pJob->job.jobId, cAllocSize, quota);
pPool->cfg.cb.reachFp(pJob->job.jobId, code);
pPool->cfg.cb.reachFp(pJob->job.jobId, pJob->job.clientId, code);
(void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size);
MP_RET(code);
}
@ -326,7 +326,7 @@ int32_t mpChkQuotaOverflow(SMemPool* pPool, SMPSession* pSession, int64_t size)
code = TSDB_CODE_QRY_QUERY_MEM_EXHAUSTED;
uWarn("%s pool sysAvailMemSize %" PRId64 " can't alloc %" PRId64" while keeping reserveSize %dMB",
pPool->name, atomic_load_64(&tsCurrentAvailMemorySize), size, *pPool->cfg.reserveSize);
pPool->cfg.cb.reachFp(pJob->job.jobId, code);
pPool->cfg.cb.reachFp(pJob->job.jobId, pJob->job.clientId, code);
(void)atomic_sub_fetch_64(&pJob->job.allocMemSize, size);
MP_RET(code);
}
@ -1174,7 +1174,7 @@ void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg) {
(void)mpUpdateCfg(pPool);
}
void taosMemPoolDestroySession(void* poolHandle, void* session) {
void taosMemPoolDestroySession(void* poolHandle, void* session, int32_t* remainSessions) {
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session;
if (NULL == poolHandle || NULL == pSession) {
@ -1182,7 +1182,9 @@ void taosMemPoolDestroySession(void* poolHandle, void* session) {
return;
}
(void)atomic_sub_fetch_32(&pSession->pJob->remainSession, 1);
if (remainSessions) {
*remainSessions = atomic_sub_fetch_32(&pSession->pJob->remainSession, 1);
}
//TODO;
@ -1191,19 +1193,25 @@ void taosMemPoolDestroySession(void* poolHandle, void* session) {
mpCheckStatDetail(pPool, pSession, "DestroySession");
mpDestroyPosStat(&pSession->stat.posStat);
taosMemFreeClear(pSession->sessionId);
TAOS_MEMSET(pSession, 0, sizeof(*pSession));
mpPushIdleNode(pPool, &pPool->sessionCache, (SMPListNode*)pSession);
}
int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob) {
int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob, char* sessionId) {
int32_t code = TSDB_CODE_SUCCESS;
SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = NULL;
MP_ERR_JRET(mpPopIdleNode(pPool, &pPool->sessionCache, (void**)&pSession));
pSession->sessionId = taosStrdup(sessionId);
if (NULL == pSession->sessionId) {
MP_ERR_JRET(terrno);
}
TAOS_MEMCPY(&pSession->ctrl, &pPool->ctrl, sizeof(pSession->ctrl));
if (gMPFps[gMPMgmt.strategy].initSessionFp) {
@ -1218,7 +1226,7 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob) {
_return:
if (TSDB_CODE_SUCCESS != code) {
taosMemPoolDestroySession(poolHandle, pSession);
taosMemPoolDestroySession(poolHandle, pSession, NULL);
pSession = NULL;
(void)atomic_add_fetch_64(&pPool->stat.statSession.initFail, 1);
} else {
@ -1503,7 +1511,7 @@ int32_t taosMemPoolTrim(void* poolHandle, void* session, int32_t size, char* fil
return code;
}
int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob) {
int32_t taosMemPoolCallocJob(uint64_t jobId, uint64_t cId, void** ppJob) {
int32_t code = TSDB_CODE_SUCCESS;
*ppJob = taosMemoryCalloc(1, sizeof(SMPJob));
if (NULL == *ppJob) {
@ -1513,6 +1521,7 @@ int32_t taosMemPoolCallocJob(uint64_t jobId, void** ppJob) {
SMPJob* pJob = (SMPJob*)*ppJob;
pJob->job.jobId = jobId;
pJob->job.clientId = cId;
return code;
}

View File

@ -64,6 +64,12 @@ threadlocal void* mptThreadPoolSession = NULL;
*(uint32_t *)((char *)(id) + sizeof(tId)) = (eId); \
} while (0)
#define MPT_SET_QCID(id, qId, cId) \
do { \
*(uint64_t *)(id) = (qId); \
*(uint64_t *)((char *)(id) + sizeof(qId)) = (cId); \
} while (0)
#define mptEnableMemoryPoolUsage(_pool, _session) do { mptThreadPoolHandle = _pool; mptThreadPoolSession = _session; } while (0)
#define mptDisableMemoryPoolUsage() (mptThreadPoolHandle = NULL, mptThreadPoolSession = NULL)
@ -294,7 +300,7 @@ int32_t mptInitJobInfo(uint64_t qId, SMPTJobInfo* pJob) {
return terrno;
}
int32_t code = taosMemPoolCallocJob(qId, (void**)&pJob->memInfo);
int32_t code = taosMemPoolCallocJob(qId, 0, (void**)&pJob->memInfo);
if (TSDB_CODE_SUCCESS != code) {
taosHashCleanup(pJob->pSessions);
pJob->pSessions = NULL;
@ -349,11 +355,11 @@ int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, SMPTestJobCtx* p
pJobCtx->pJob = pJob;
pJob->pCtx = pJobCtx;
assert(0 == taosMemPoolInitSession(gMemPoolHandle, ppSession, pJob->memInfo));
char id[sizeof(tId) + sizeof(eId)] = {0};
char id[sizeof(tId) + sizeof(eId) + 0] = {0};
MPT_SET_TEID(id, tId, eId);
assert(0 == taosMemPoolInitSession(gMemPoolHandle, ppSession, pJob->memInfo, id));
assert(0 == taosHashPut(pJob->pSessions, id, sizeof(id), ppSession, POINTER_BYTES));
_return:
@ -411,7 +417,7 @@ int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) {
assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[i].stat, sizeof(*pStat)));
mptDestroyTaskCtx(&pJobCtx->taskCtxs[i], pJobCtx->pSessions[i]);
taosMemPoolDestroySession(gMemPoolHandle, pJobCtx->pSessions[i]);
taosMemPoolDestroySession(gMemPoolHandle, pJobCtx->pSessions[i], NULL);
}
uDebug("JOB:0x%x idx:%d destroyed, code:0x%x", pJobCtx->jobId, pJobCtx->jobIdx, pJobCtx->pJob->errCode);
@ -455,7 +461,12 @@ void mptRetireJob(SMPTJobInfo* pJob) {
mptCheckCompareJobInfo(pCtx);
mptResetJob(pCtx);
int32_t taskRunning = atomic_load_32(&pCtx->taskRunningNum);
if (0 == taskRunning) {
mptDestroyJob(pCtx, false);
} else {
uDebug("JOB:0x%x retired but will not destroy cause of task running, num:%d", pCtx->jobId, taskRunning);
}
}
int32_t mptGetMemPoolMaxMemSize(void* pHandle, int64_t* maxSize) {
@ -516,17 +527,20 @@ void mptRetireJobsCb(int64_t retireSize, int32_t errCode) {
}
void mptRetireJobCb(uint64_t jobId, int32_t errCode) {
SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashGet(mptCtx.pJobs, &jobId, sizeof(jobId));
void mptRetireJobCb(uint64_t jobId, uint64_t clientId, int32_t errCode) {
char id[sizeof(jobId) + sizeof(clientId) + 1] = {0};
MPT_SET_QCID(id, jobId, clientId);
SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashGet(mptCtx.pJobs, id, sizeof(id));
if (NULL == pJob) {
uError("QID:0x%" PRIx64 " fail to get job from job hash", jobId);
uError("QID:0x%" PRIx64 " CID:0x%" PRIx64 " fail to get job from job hash", jobId, clientId);
return;
}
if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) {
uInfo("QID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, jobId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize));
uInfo("QID:0x%" PRIx64 " CID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, jobId, clientId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize));
} else {
uDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize));
uDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, jobId, clientId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize));
}
}
@ -546,11 +560,13 @@ void mptWriteMem(void* pStart, int32_t size) {
void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
int32_t actId = 0;
bool actDone = false;
int32_t size = taosRand() % mptCtrl.maxSingleAllocSize;
int32_t size = 0;
int32_t osize = 0, nsize = 0;
while (!actDone) {
actId = taosRand() % 10;
size = (taosRand() % 10) ? (taosRand() % (mptCtrl.maxSingleAllocSize / 100)) : (taosRand() % mptCtrl.maxSingleAllocSize);
switch (actId) {
case 0: { // malloc
if (pTask->memIdx >= MPT_MAX_MEM_ACT_TIMES) {
@ -925,8 +941,13 @@ void mptCheckPoolUsedSize(int32_t jobNum) {
int64_t poolUsedSize = 0;
taosMemPoolGetUsedSizeBegin(gMemPoolHandle, &usedSize, &needEnd);
for (int32_t i = 0; i < jobNum; ++i) {
SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i];
while (taosRTryLockLatch(&pJobCtx->jobExecLock)) {
}
int64_t jobUsedSize = 0;
for (int32_t m = 0; m < pJobCtx->taskNum; ++m) {
SMPStatDetail* pStat = NULL;
@ -942,6 +963,8 @@ void mptCheckPoolUsedSize(int32_t jobNum) {
assert(pJobCtx->pJob->memInfo->allocMemSize == jobUsedSize);
taosRUnLockLatch(&pJobCtx->jobExecLock);
poolUsedSize += jobUsedSize;
}