[td-11818] fix a race condition.
This commit is contained in:
parent
284e794315
commit
0f2a175e1b
|
@ -310,7 +310,7 @@ int32_t ctgGetVgInfoFromHashValue(SDBVgroupInfo *dbInfo, const SName *pTableName
|
|||
}
|
||||
|
||||
if (NULL == vgInfo) {
|
||||
ctgError("no hash range found for hashvalue [%u], numOfVgId:%d", hashValue, taosHashGetSize(dbInfo->vgInfo));
|
||||
ctgError("no hash range found for hash value [%u], numOfVgId:%d", hashValue, taosHashGetSize(dbInfo->vgInfo));
|
||||
|
||||
void *pIter1 = taosHashIterate(dbInfo->vgInfo, NULL);
|
||||
while (pIter1) {
|
||||
|
|
|
@ -274,12 +274,14 @@ int32_t schSetTaskCandidateAddrs(SSchJob *job, SSchTask *task) {
|
|||
}
|
||||
|
||||
|
||||
int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) {
|
||||
if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) {
|
||||
qError("taosHashPut failed");
|
||||
int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
|
||||
if (0 != taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES)) {
|
||||
qError("failed to add new task, taskId:%"PRId64", reqId:0x"PRIx64", out of memory", pJob->queryId);
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
qDebug("add one task, taskId:%"PRId64", numOfTasks:%d, reqId:0x%"PRIx64, pTask->taskId, taosHashGetSize(pJob->execTasks),
|
||||
pJob->queryId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -583,8 +585,14 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in
|
|||
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
int32_t s = taosHashGetSize((*job)->execTasks);
|
||||
assert(s != 0);
|
||||
|
||||
SSchTask **task = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId));
|
||||
if (NULL == task || NULL == (*task)) {
|
||||
void* f1 = taosHashGet((*job)->execTasks, &pParam->taskId, sizeof(pParam->taskId));
|
||||
|
||||
assert(0);
|
||||
qError("taosHashGet taskId:%"PRIx64" not exist", pParam->taskId);
|
||||
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
@ -819,12 +827,13 @@ int32_t schLaunchTask(SSchJob *job, SSchTask *task) {
|
|||
SCH_ERR_RET(schSetTaskCandidateAddrs(job, task));
|
||||
|
||||
if (NULL == task->candidateAddrs || taosArrayGetSize(task->candidateAddrs) <= 0) {
|
||||
SCH_TASK_ERR_LOG("no valid condidate node for task:%"PRIx64, task->taskId);
|
||||
SCH_TASK_ERR_LOG("no valid candidate node for task:%"PRIx64, task->taskId);
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||
}
|
||||
|
||||
SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType));
|
||||
// NOTE: race condition: the task should be put into the hash table before send msg to server
|
||||
SCH_ERR_RET(schPushTaskToExecList(job, task));
|
||||
SCH_ERR_RET(schBuildAndSendMsg(job, task, plan->msgType));
|
||||
|
||||
task->status = JOB_TASK_STATUS_EXECUTING;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -975,7 +984,7 @@ _return:
|
|||
}
|
||||
|
||||
int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, void** pJob, SQueryResult *pRes) {
|
||||
if (NULL == transport || /* NULL == nodeList || */ NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
|
||||
if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob || NULL == pRes) {
|
||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue