From 586826c41fcfb907f72df2e28f12e70ef7a4be06 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 18 Sep 2024 15:47:15 +0800 Subject: [PATCH 1/6] refactor: check return value. --- source/dnode/mnode/impl/src/mndStream.c | 5 +-- source/libs/stream/inc/streamBackendRocksdb.h | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 36 ++++++++++--------- source/libs/stream/src/streamMeta.c | 23 ++++++++---- source/libs/stream/src/streamSnapshot.c | 23 ++++++++++++ source/libs/stream/src/streamTask.c | 5 +++ 6 files changed, 68 insertions(+), 26 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 1fb398d070..511cc8f984 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2195,8 +2195,9 @@ static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeLi SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1}; epsetAssign(&entry.epset, &pTask->info.epSet); - if (taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry)) != 0) { - mError("failed to put entry into hash map, nodeId:%d", entry.nodeId); + int32_t ret = taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry)); + if (ret != 0 && ret != TSDB_CODE_DUP_KEY) { + mError("failed to put entry into hash map, nodeId:%d, code:%s", entry.nodeId, tstrerror(code)); } } diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index 3a5d72576b..567d9de949 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -135,7 +135,7 @@ typedef struct { #define META_ON_S3_FORMATE "%s_%" PRId64 "\n%s_%" PRId64 "\n%s_%" PRId64 "" bool streamBackendDataIsExist(const char* path, int64_t chkpId); -void* streamBackendInit(const char* path, int64_t chkpId, int32_t vgId); +int32_t streamBackendInit(const char* path, int64_t chkpId, int32_t vgId, SBackendWrapper** pBackend); void streamBackendCleanup(void* arg); void streamBackendHandleCleanup(void* arg); int32_t streamBackendLoadCheckpointInfo(void* pMeta); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index d9b6671d9b..57f75743ac 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -812,29 +812,32 @@ bool streamBackendDataIsExist(const char* path, int64_t chkpId) { return exist; } -void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) { +int32_t streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId, SBackendWrapper** pBackend) { char* backendPath = NULL; - int32_t code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath); + int32_t code = 0; + int32_t lino = 0; + char* err = NULL; + size_t nCf = 0; + + *pBackend = NULL; + + code = rebuildDirFromCheckpoint(streamPath, chkpId, &backendPath); + TSDB_CHECK_CODE(code, lino, _EXIT); stDebug("start to init stream backend:%s, checkpointId:%" PRId64 " vgId:%d", backendPath, chkpId, vgId); uint32_t dbMemLimit = nextPow2(tsMaxStreamBackendCache) << 20; SBackendWrapper* pHandle = taosMemoryCalloc(1, sizeof(SBackendWrapper)); - if (pHandle == NULL) { - goto _EXIT; - } + TSDB_CHECK_NULL(pHandle, code, lino, _EXIT, terrno); pHandle->list = tdListNew(sizeof(SCfComparator)); - if (pHandle->list == NULL) { - goto _EXIT; - } + TSDB_CHECK_NULL(pHandle->list, code, lino, _EXIT, terrno); (void)taosThreadMutexInit(&pHandle->mutex, NULL); (void)taosThreadMutexInit(&pHandle->cfMutex, NULL); + pHandle->cfInst = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - if (pHandle->cfInst == NULL) { - goto _EXIT; - } + TSDB_CHECK_NULL(pHandle->cfInst, code, lino, _EXIT, terrno); rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); @@ -863,9 +866,6 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) { NULL, destroyCompactFilteFactory, compactFilteFactoryCreateFilter, compactFilteFactoryName); rocksdb_options_set_compaction_filter_factory(pHandle->dbOpt, pHandle->filterFactory); - char* err = NULL; - size_t nCf = 0; - char** cfs = rocksdb_list_column_families(opts, backendPath, &nCf, &err); if (nCf == 0 || nCf == 1 || err != NULL) { taosMemoryFreeClear(err); @@ -894,7 +894,9 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) { stDebug("init stream backend at %s, backend:%p, vgId:%d", backendPath, pHandle, vgId); taosMemoryFreeClear(backendPath); - return (void*)pHandle; + *pBackend = pHandle; + return code; + _EXIT: rocksdb_options_destroy(opts); rocksdb_cache_destroy(cache); @@ -904,9 +906,9 @@ _EXIT: taosHashCleanup(pHandle->cfInst); pHandle->list = tdListFree(pHandle->list); taosMemoryFree(pHandle); - stDebug("failed to init stream backend at %s", backendPath); + stDebug("failed to init stream backend at %s, vgId:%d line:%d code:%s", backendPath, vgId, lino, tstrerror(code)); taosMemoryFree(backendPath); - return NULL; + return code; } void streamBackendCleanup(void* arg) { SBackendWrapper* pHandle = (SBackendWrapper*)arg; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f4202667ff..699ad9d372 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -231,8 +231,10 @@ int32_t streamMetaCheckBackendCompatible(SStreamMeta* pMeta) { } int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) { - int32_t code = 0; - int64_t chkpId = streamMetaGetLatestCheckpointId(pMeta); + int32_t code = 0; + SBackendWrapper* pBackend = NULL; + int64_t chkpId = streamMetaGetLatestCheckpointId(pMeta); + terrno = 0; bool exist = streamBackendDataIsExist(pMeta->path, chkpId); if (exist == false) { @@ -240,7 +242,10 @@ int32_t streamMetaCvtDbFormat(SStreamMeta* pMeta) { return code; } - SBackendWrapper* pBackend = streamBackendInit(pMeta->path, chkpId, pMeta->vgId); + code = streamBackendInit(pMeta->path, chkpId, pMeta->vgId, &pBackend); + if (code) { + return code; + } void* pIter = taosHashIterate(pBackend->cfInst, NULL); while (pIter) { @@ -259,9 +264,13 @@ _EXIT: if (code == 0) { char* state = taosMemoryCalloc(1, strlen(pMeta->path) + 32); - sprintf(state, "%s%s%s", pMeta->path, TD_DIRSEP, "state"); - taosRemoveDir(state); - taosMemoryFree(state); + if (state != NULL) { + sprintf(state, "%s%s%s", pMeta->path, TD_DIRSEP, "state"); + taosRemoveDir(state); + taosMemoryFree(state); + } else { + stError("vgId:%s, failed to remove file dir:%s, since:%s", pMeta->vgId, pMeta->path, tstrerror(code)); + } } return code; @@ -451,6 +460,8 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn, TSDB_CHECK_CODE(code, lino, _err); int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); + TSDB_CHECK_NULL(pRid, code, lino, _err, terrno); + memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid)); code = metaRefMgtAdd(pMeta->vgId, pRid); TSDB_CHECK_CODE(code, lino, _err); diff --git a/source/libs/stream/src/streamSnapshot.c b/source/libs/stream/src/streamSnapshot.c index 511445858a..2742798a04 100644 --- a/source/libs/stream/src/streamSnapshot.c +++ b/source/libs/stream/src/streamSnapshot.c @@ -130,6 +130,11 @@ int32_t streamGetFileSize(char* path, char* name, int64_t* sz) { int32_t ret = 0; char* fullname = taosMemoryCalloc(1, strlen(path) + 32); + if (fullname == NULL) { + stError("failed to get file:%s size, code: out of memory", name); + return terrno; + } + sprintf(fullname, "%s%s%s", path, TD_DIRSEP, name); ret = taosStatFile(fullname, sz, NULL, NULL); @@ -555,6 +560,11 @@ _NEXT: (int32_t)taosArrayGetSize(pHandle->pDbSnapSet), pHandle->currIdx); uint8_t* buf = taosMemoryCalloc(1, sizeof(SStreamSnapBlockHdr) + kBlockSize); + if (buf == NULL) { + stError("%s failed to prepare the block header, code:Out of memory", item->name); + return terrno; + } + int64_t nread = taosPReadFile(pSnapFile->fd, buf + sizeof(SStreamSnapBlockHdr), kBlockSize, pSnapFile->offset); if (nread < 0) { taosMemoryFree(buf); @@ -764,6 +774,11 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa sprintf(idstr, "0x%" PRIx64 "-0x%x", snapInfo.streamId, (int32_t)(snapInfo.taskId)); char* path = taosMemoryCalloc(1, strlen(pHandle->metaPath) + 256); + if (path == NULL) { + stError("s-task:0x%x failed to prepare meta header buffer, code:Out of memory", (int32_t) snapInfo.taskId); + return terrno; + } + sprintf(path, "%s%s%s%s%s%s%s%" PRId64 "", pHandle->metaPath, TD_DIRSEP, idstr, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", snapInfo.chkpId); if (!taosIsDir(path)) { @@ -778,11 +793,19 @@ int32_t streamSnapWrite(SStreamSnapWriter* pWriter, uint8_t* pData, uint32_t nDa pDbSnapFile->path = path; pDbSnapFile->snapInfo = snapInfo; pDbSnapFile->pFileList = taosArrayInit(64, sizeof(SBackendFileItem)); + if (pDbSnapFile->pFileList == NULL) { + return terrno; + } + pDbSnapFile->currFileIdx = 0; pDbSnapFile->offset = 0; SBackendFileItem item = {0}; item.name = taosStrdup((char*)ROCKSDB_CURRENT); + if (item.name == NULL) { + return terrno; + } + item.type = ROCKSDB_CURRENT_TYPE; void* p = taosArrayPush(pDbSnapFile->pFileList, &item); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 365710f8a7..3319c7c74f 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -133,6 +133,11 @@ int32_t tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool sprintf(buf, "0x%" PRIx64 "-0x%x", pTask->id.streamId, pTask->id.taskId); pTask->id.idStr = taosStrdup(buf); + if (pTask->id.idStr == NULL) { + stError("s-task:0x%x failed to build task id, code: out of memory", pTask->id.taskId); + return terrno; + } + pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; pTask->status.taskStatus = fillHistory ? TASK_STATUS__SCAN_HISTORY : TASK_STATUS__READY; pTask->inputq.status = TASK_INPUT_STATUS__NORMAL; From 720d1c2486b94db6f320cfdb4563354fdce4fbe8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 18 Sep 2024 16:43:18 +0800 Subject: [PATCH 2/6] fix(stream): fix memory leak. --- source/dnode/mnode/impl/src/mndStreamUtil.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index bad44a8687..5d8ba02781 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -628,6 +628,7 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha code = tEncodeStreamTaskUpdateMsg(&encoder, &req); if (code == -1) { tEncoderClear(&encoder); + taosMemoryFree(buf); taosArrayDestroy(req.pNodeList); return code; } @@ -648,21 +649,25 @@ static int32_t doBuildStreamTaskUpdateMsg(void **pBuf, int32_t *pLen, SVgroupCha static int32_t doSetUpdateTaskAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask, SVgroupChangeInfo *pInfo) { void *pBuf = NULL; int32_t len = 0; + SEpSet epset = {0}; + bool hasEpset = false; + bool unusedRet = streamTaskUpdateEpsetInfo(pTask, pInfo->pUpdateNodeList); int32_t code = doBuildStreamTaskUpdateMsg(&pBuf, &len, pInfo, pTask->info.nodeId, &pTask->id, pTrans->id); if (code) { + mError("failed to build stream task epset update msg, code:%s", tstrerror(code)); return code; } - SEpSet epset = {0}; - bool hasEpset = false; code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); if (code != TSDB_CODE_SUCCESS || !hasEpset) { + mError("failed to extract epset during create update epset, code:%s", tstrerror(code)); return code; } code = setTransAction(pTrans, pBuf, len, TDMT_VND_STREAM_TASK_UPDATE, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID); if (code != TSDB_CODE_SUCCESS) { + mError("failed to create update task epset trans, code:%s", tstrerror(code)); taosMemoryFree(pBuf); } From ddce0e5386a46957714fc2e23f8349d23f27cf2f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 18 Sep 2024 22:28:28 +0800 Subject: [PATCH 3/6] fix(stream): fix syntax error. --- source/libs/stream/src/streamMeta.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 699ad9d372..a8c698c408 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -269,7 +269,7 @@ _EXIT: taosRemoveDir(state); taosMemoryFree(state); } else { - stError("vgId:%s, failed to remove file dir:%s, since:%s", pMeta->vgId, pMeta->path, tstrerror(code)); + stError("vgId:%d, failed to remove file dir:%s, since:%s", pMeta->vgId, pMeta->path, tstrerror(code)); } } From 47a2a8528c4aec69a0511aa1c08cac829554683b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 19 Sep 2024 09:04:29 +0800 Subject: [PATCH 4/6] fix(stream): fix syntax error in unit test. --- source/libs/stream/test/backendTest.cpp | 6 ++++-- source/libs/stream/test/tstreamUpdateTest.cpp | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index 2b21510e45..391ca66c53 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -478,11 +478,13 @@ TEST_F(BackendEnv, oldBackendInit) { ASSERT(code == 0); { - SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(path, 10, 10); + SBackendWrapper *p = NULL; + int32_t code = (SBackendWrapper *)streamBackendInit(path, 10, 10, &p); streamBackendCleanup((void *)p); } { - SBackendWrapper *p = (SBackendWrapper *)streamBackendInit(path, 10, 10); + SBackendWrapper *p = NULL; + int32_t code = (SBackendWrapper *)streamBackendInit(path, 10, 10, &p); streamBackendCleanup((void *)p); } diff --git a/source/libs/stream/test/tstreamUpdateTest.cpp b/source/libs/stream/test/tstreamUpdateTest.cpp index 4360fc7d54..10002469d3 100644 --- a/source/libs/stream/test/tstreamUpdateTest.cpp +++ b/source/libs/stream/test/tstreamUpdateTest.cpp @@ -12,7 +12,7 @@ class StreamStateEnv : public ::testing::Test { protected: virtual void SetUp() { streamMetaInit(); - backend = streamBackendInit(path, 0, 0); + int32_t code = streamBackendInit(path, 0, 0, &backend); } virtual void TearDown() { streamMetaCleanup(); } From 839590348fbfbf3db609a7f28ce6cc416d9f508e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 19 Sep 2024 09:43:35 +0800 Subject: [PATCH 5/6] fix(test): fix syntax error in the unit tests. --- source/libs/stream/test/backendTest.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/test/backendTest.cpp b/source/libs/stream/test/backendTest.cpp index 391ca66c53..e7e7149882 100644 --- a/source/libs/stream/test/backendTest.cpp +++ b/source/libs/stream/test/backendTest.cpp @@ -479,12 +479,12 @@ TEST_F(BackendEnv, oldBackendInit) { { SBackendWrapper *p = NULL; - int32_t code = (SBackendWrapper *)streamBackendInit(path, 10, 10, &p); + int32_t code = streamBackendInit(path, 10, 10, &p); streamBackendCleanup((void *)p); } { SBackendWrapper *p = NULL; - int32_t code = (SBackendWrapper *)streamBackendInit(path, 10, 10, &p); + int32_t code = streamBackendInit(path, 10, 10, &p); streamBackendCleanup((void *)p); } From d698e4107cf65ee74c2aa56404c13742c01c2288 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 19 Sep 2024 10:21:34 +0800 Subject: [PATCH 6/6] fix(stream): fix syntax error. --- source/libs/stream/test/tstreamUpdateTest.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/test/tstreamUpdateTest.cpp b/source/libs/stream/test/tstreamUpdateTest.cpp index 10002469d3..f3992aaa13 100644 --- a/source/libs/stream/test/tstreamUpdateTest.cpp +++ b/source/libs/stream/test/tstreamUpdateTest.cpp @@ -12,7 +12,7 @@ class StreamStateEnv : public ::testing::Test { protected: virtual void SetUp() { streamMetaInit(); - int32_t code = streamBackendInit(path, 0, 0, &backend); + int32_t code = streamBackendInit(path, 0, 0, (SBackendWrapper**)&backend); } virtual void TearDown() { streamMetaCleanup(); }