fix(stream): avoid duplicated task id in task id list.

This commit is contained in:
Haojun Liao 2023-05-11 11:47:57 +08:00
parent 28c43f0151
commit 2b736ffd60
2 changed files with 12 additions and 6 deletions

View File

@ -627,7 +627,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
}
// sink
/*pTask->ahandle = pTq->pVnode;*/
if (pTask->outputType == TASK_OUTPUT__SMA) {
pTask->smaSink.vnode = pTq->pVnode;
pTask->smaSink.smaSink = smaHandleRes;

View File

@ -194,8 +194,12 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask*
return -1;
}
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES);
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
if (p == NULL) {
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
}
taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES);
return 0;
}
@ -333,15 +337,18 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) {
return -1;
}
if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) {
void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId));
if (p == NULL) {
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
}
if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, sizeof(void*)) < 0) {
tdbFree(pKey);
tdbFree(pVal);
tdbTbcClose(pCur);
return -1;
}
taosArrayPush(pMeta->pTaskList, &pTask->id.taskId);
if (pTask->fillHistory) {
pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM;
streamTaskCheckDownstream(pTask, ver);