From 0f2a175e1b21c3fa653a8d89c37166474cb6dca0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 6 Jan 2022 17:30:27 +0800 Subject: [PATCH] [td-11818] fix a race condition. --- source/libs/catalog/src/catalog.c | 2 +- source/libs/scheduler/src/scheduler.c | 21 +++++++++++++++------ 2 files changed, 16 insertions(+), 7 deletions(-) diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index d84d126b67..b4d51e50a4 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -310,7 +310,7 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName } if (NULL == vgInfo) { - ctgError("no hash range found for hashvalue [%u], numOfVgId:%d", hashValue, taosHashGetSize(dbInfo->vgInfo)); + ctgError("no hash range found for hash value [%u], numOfVgId:%d", hashValue, taosHashGetSize(dbInfo->vgInfo)); void *pIter1 = taosHashIterate(dbInfo->vgInfo, NULL); while (pIter1) { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 38a823de2c..491d799e07 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -274,12 +274,14 @@ int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) { } -int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) { - if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { - qError("taosHashPut failed"); +int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { + if (0 != taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) { + qError("failed to add new task, taskId:%"PRId64", reqId:0x"PRIx64", out of memory", pJob->queryId); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + qDebug("add one task, taskId:%"PRId64", numOfTasks:%d, reqId:0x%"PRIx64, pTask->taskId, taosHashGetSize(pJob->execTasks), + pJob->queryId); return TSDB_CODE_SUCCESS; } @@ -583,8 +585,14 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } + int32_t s = taosHashGetSize((*job)->execTasks); + assert(s != 0); + SSchTask **task = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId)); if (NULL == task || NULL == (*task)) { + void* f1 = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId)); + + assert(0); qError("taosHashGet taskId:%"PRIx64" not exist", pParam->taskId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } @@ -819,12 +827,13 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) { SCH_ERR_RET(schSetTaskCandidateAddrs(job, task)); if (NULL == task->candidateAddrs || taosArrayGetSize(task->candidateAddrs) <= 0) { - SCH_TASK_ERR_LOG("no valid condidate node for task:%"PRIx64, task->taskId); + SCH_TASK_ERR_LOG("no valid candidate node for task:%"PRIx64, task->taskId); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType)); + // NOTE: race condition: the task should be put into the hash table before send msg to server SCH_ERR_RET(schPushTaskToExecList(job, task)); + SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType)); task->status = JOB_TASK_STATUS_EXECUTING; return TSDB_CODE_SUCCESS; @@ -975,7 +984,7 @@ _return: } int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes) { - if (NULL == transport || /* NULL == nodeList || */ NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { + if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); }