From 51f3f29d5bf323d34dbe70d9e471e729e638a6e5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 27 Oct 2024 21:52:46 +0800 Subject: [PATCH] fix(stream): fix syntax check failure. --- source/dnode/vnode/src/tq/tqRead.c | 5 ++++- source/libs/stream/src/streamMeta.c | 20 +++++++++++++++----- source/libs/stream/src/streamTask.c | 8 ++++++-- 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 9d9e7c431a..bb8491108f 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1126,7 +1126,10 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { tqError("vgId:%d, s-task:0x%x update qualified table error for stream task", vgId, taskId); } } - taosReleaseRef(streamTaskRefPool, refId); + int32_t ret = taosReleaseRef(streamTaskRefPool, refId); + if (ret) { + tqError("vgId:%d release task refId failed, refId:%" PRId64, vgId, refId); + } } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 05aaa66049..d414b02d69 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -135,7 +135,7 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) { void metaRefMgtRemove(int64_t* pRefId) { streamMutexLock(&gMetaRefMgt.mutex); - taosHashRemove(gMetaRefMgt.pTable, &pRefId, sizeof(pRefId)); + int32_t code = taosHashRemove(gMetaRefMgt.pTable, &pRefId, sizeof(pRefId)); taosMemoryFree(pRefId); streamMutexUnlock(&gMetaRefMgt.mutex); } @@ -534,7 +534,10 @@ void streamMetaClear(SStreamMeta* pMeta) { p->info.delaySchedParam = 0; } - taosRemoveRef(streamTaskRefPool, refId); + int32_t code = taosRemoveRef(streamTaskRefPool, refId); + if (code) { + stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId); + } } if (pMeta->streamBackendRid != 0) { @@ -722,12 +725,19 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa } if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) { - taosRemoveRef(streamTaskRefPool, refId); + int32_t ret = taosRemoveRef(streamTaskRefPool, refId); + if (ret) { + stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId); + } return code; } if ((code = streamMetaCommit(pMeta)) != 0) { - taosRemoveRef(streamTaskRefPool, refId); + int32_t ret = taosRemoveRef(streamTaskRefPool, refId); + if (ret) { + stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId); + } + return code; } @@ -1215,7 +1225,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { streamMetaReleaseTask(pMeta, pTask); ret = taosRemoveRef(streamTaskRefPool, refId); if (ret) { - stError("vgId:%d failed to remove task:0x%x, refId:%"PRId64, pMeta->vgId, pTaskId->taskId, refId); + stError("vgId:%d failed to remove task:0x%x, refId:%" PRId64, pMeta->vgId, pTaskId->taskId, refId); } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index e00984ea9b..67d76ba2ef 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1285,8 +1285,12 @@ int32_t streamTaskAllocRefId(SStreamTask* pTask, int64_t** pRefId) { *pRefId = taosMemoryMalloc(sizeof(int64_t)); if (*pRefId != NULL) { **pRefId = pTask->id.refId; - metaRefMgtAdd(pTask->pMeta->vgId, *pRefId); - return 0; + int32_t code = metaRefMgtAdd(pTask->pMeta->vgId, *pRefId); + if (code != 0) { + stError("s-task:%s failed to add refId:%" PRId64 " into refId-mgmt, code:%s", pTask->id.idStr, pTask->id.refId, + tstrerror(code)); + } + return code; } else { stError("s-task:%s failed to alloc new ref id, code:%s", pTask->id.idStr, tstrerror(terrno)); return terrno;