From d397c9a9b75e0f9cc0c3d46653952ee2ca98dcb5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 22 Nov 2024 19:22:37 +0800 Subject: [PATCH] fix(stream): remove failed task in hash table and array list. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 2 +- source/libs/stream/src/streamMeta.c | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 326e8d4ada..1ea524dc78 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -652,7 +652,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve streamMetaWUnLock(pMeta); if (code < 0) { - tqError("failed to add s-task:0x%x into vgId:%d meta, existed:%d, code:%s", vgId, taskId, numOfTasks, + tqError("vgId:%d failed to register s-task:0x%x into meta, existed tasks:%d, code:%s", vgId, taskId, numOfTasks, tstrerror(code)); return code; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 86f305df60..a6f87711bf 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -723,8 +723,9 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa pTask->id.refId = refId = taosAddRef(streamTaskRefPool, pTask); code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t)); - if (code) { // todo remove it from task list + if (code) { stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId); + void* pUnused = taosArrayPop(pMeta->pTaskList); int32_t ret = taosRemoveRef(streamTaskRefPool, refId); if (ret != 0) { @@ -734,6 +735,9 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) { + int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); + void* pUnused = taosArrayPop(pMeta->pTaskList); + int32_t ret = taosRemoveRef(streamTaskRefPool, refId); if (ret) { stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId); @@ -742,6 +746,9 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } if ((code = streamMetaCommit(pMeta)) != 0) { + int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id)); + void* pUnused = taosArrayPop(pMeta->pTaskList); + int32_t ret = taosRemoveRef(streamTaskRefPool, refId); if (ret) { stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);