diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4fb7d765d8..21c2ab13ab 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -627,7 +627,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } // sink - /*pTask->ahandle = pTq->pVnode;*/ if (pTask->outputType == TASK_OUTPUT__SMA) { pTask->smaSink.vnode = pTq->pVnode; pTask->smaSink.smaSink = smaHandleRes; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 534f5f0597..8b95025c0d 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -194,8 +194,12 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* return -1; } - taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES); - taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); + void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); + if (p == NULL) { + taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); + } + + taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, POINTER_BYTES); return 0; } @@ -333,15 +337,18 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { return -1; } - if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { + void* p = taosHashGet(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId)); + if (p == NULL) { + taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); + } + + if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(pTask->id.taskId), &pTask, sizeof(void*)) < 0) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); return -1; } - taosArrayPush(pMeta->pTaskList, &pTask->id.taskId); - if (pTask->fillHistory) { pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; streamTaskCheckDownstream(pTask, ver);