From 0bb263076e18239066626a1698b0ffd71ee36fb6 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Jul 2024 10:13:02 +0800 Subject: [PATCH 1/2] fix(stream): check return value --- source/dnode/mnode/impl/src/mndStream.c | 22 ++++++++++----------- source/dnode/mnode/impl/src/mndStreamHb.c | 5 +++-- source/dnode/mnode/impl/src/mndStreamUtil.c | 2 +- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 4dcd716f28..e20529f4b6 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1020,10 +1020,10 @@ static int32_t doSetCheckpointAction(SMnode *pMnode, STrans *pTrans, SStreamTask static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId, int8_t mndTrigger, bool lock) { - int32_t code = -1; + int32_t code = TSDB_CODE_SUCCESS; int64_t ts = taosGetTimestampMs(); if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) { - return TSDB_CODE_SUCCESS; + return code; } bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock); @@ -1087,13 +1087,11 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre goto _ERR; } - if ((code = mndTransPrepare(pMnode, pTrans)) != TSDB_CODE_SUCCESS) { - code = terrno; + code = mndTransPrepare(pMnode, pTrans); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("failed to prepare checkpoint trans since %s", terrstr()); - goto _ERR; } - code = 0; _ERR: mndTransDrop(pTrans); return code; @@ -1458,7 +1456,8 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return -1; } - if (mndTransPrepare(pMnode, pTrans) != 0) { + code = mndTransPrepare(pMnode, pTrans); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -2179,7 +2178,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { taosWUnLockLatch(&pStream->lock); code = mndTransPrepare(pMnode, pTrans); - if (code) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -2273,7 +2272,8 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { } taosWUnLockLatch(&pStream->lock); - if (mndTransPrepare(pMnode, pTrans) != 0) { + code = mndTransPrepare(pMnode, pTrans); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("trans:%d, failed to prepare pause stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -2434,7 +2434,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange } code = mndTransPrepare(pMnode, pTrans); - if (code) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -3129,7 +3129,7 @@ int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, } code = mndTransPrepare(pMnode, pTrans); - if (code) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("trans:%d, failed to prepare update checkpoint-info meta trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); diff --git a/source/dnode/mnode/impl/src/mndStreamHb.c b/source/dnode/mnode/impl/src/mndStreamHb.c index 1ca46f128f..507cafabe5 100644 --- a/source/dnode/mnode/impl/src/mndStreamHb.c +++ b/source/dnode/mnode/impl/src/mndStreamHb.c @@ -94,7 +94,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) { } code = mndTransPrepare(pMnode, pTrans); - if (code != 0) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("trans:%d, failed to prepare update stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); @@ -197,7 +197,8 @@ int32_t mndDropOrphanTasks(SMnode *pMnode, SArray *pList) { return code; } - if ((code = mndTransPrepare(pMnode, pTrans)) != 0) { + code = mndTransPrepare(pMnode, pTrans); + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); mndTransDrop(pTrans); return code; diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index 5a17d659cd..548eb118c7 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -1132,7 +1132,7 @@ int32_t mndCreateSetConsensusChkptIdTrans(SMnode *pMnode, SStreamObj *pStream, i } code = mndTransPrepare(pMnode, pTrans); - if (code) { + if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_ACTION_IN_PROGRESS) { mError("trans:%d, failed to prepare set consensus-chkptId trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); From 41617e42a5fee7bec7134e92e8e31ed3cba7be67 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 30 Jul 2024 15:44:29 +0800 Subject: [PATCH 2/2] fix(query): check the return value. --- source/libs/executor/src/tlinearhash.c | 38 ++++++++++++++++---------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/source/libs/executor/src/tlinearhash.c b/source/libs/executor/src/tlinearhash.c index ec897b06a9..0c70428e78 100644 --- a/source/libs/executor/src/tlinearhash.c +++ b/source/libs/executor/src/tlinearhash.c @@ -101,7 +101,10 @@ static int32_t doAddToBucket(SLHashObj* pHashObj, SLHashBucket* pBucket, int32_t return terrno; } - taosArrayPush(pBucket->pPageIdList, &newPageId); + void* px = taosArrayPush(pBucket->pPageIdList, &newPageId); + if (px == NULL) { + return terrno; + } doCopyObject(pNewPage->data, key, keyLen, data, size); pNewPage->num = sizeof(SFilePage) + nodeSize; @@ -127,7 +130,7 @@ static void doRemoveFromBucket(SFilePage* pPage, SLHashNode* pNode, SLHashBucket char* p = (char*)pNode + len; char* pEnd = (char*)pPage + pPage->num; - memmove(pNode, p, (pEnd - p)); + (void) memmove(pNode, p, (pEnd - p)); pPage->num -= len; if (pPage->num == 0) { @@ -189,7 +192,7 @@ static void doTrimBucketPages(SLHashObj* pHashObj, SLHashBucket* pBucket) { nodeSize = GET_LHASH_NODE_LEN(pStart); } else { // move to the front of pLast page if (pStart != pLast->data) { - memmove(pLast->data, pStart, (((char*)pLast) + pLast->num - pStart)); + (void) memmove(pLast->data, pStart, (((char*)pLast) + pLast->num - pStart)); setBufPageDirty(pLast, true); } @@ -235,7 +238,10 @@ static int32_t doAddNewBucket(SLHashObj* pHashObj) { setBufPageDirty(p, true); releaseBufPage(pHashObj->pBuf, p); - taosArrayPush(pBucket->pPageIdList, &pageId); + void* px = taosArrayPush(pBucket->pPageIdList, &pageId); + if (px == NULL) { + return terrno; + } pHashObj->numOfBuckets += 1; // printf("---------------add new bucket, id:0x%x, total:%d\n", pHashObj->numOfBuckets - 1, pHashObj->numOfBuckets); @@ -251,7 +257,7 @@ SLHashObj* tHashInit(int32_t inMemPages, int32_t pageSize, _hash_fn_t fn, int32_ if (!osTempSpaceAvailable()) { terrno = TSDB_CODE_NO_DISKSPACE; - printf("tHash Init failed since %s, tempDir:%s", terrstr(), tsTempDir); + (void) printf("tHash Init failed since %s, tempDir:%s", terrstr(), tsTempDir); taosMemoryFree(pHashObj); return NULL; } @@ -301,9 +307,10 @@ void* tHashCleanup(SLHashObj* pHashObj) { } int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data, size_t size) { + int32_t code = 0; if (pHashObj->bits == 0) { SLHashBucket* pBucket = pHashObj->pBucket[0]; - doAddToBucket(pHashObj, pBucket, 0, key, keyLen, data, size); + code = doAddToBucket(pHashObj, pBucket, 0, key, keyLen, data, size); } else { int32_t hashVal = pHashObj->hashFn(key, keyLen); int32_t v = doGetBucketIdFromHashVal(hashVal, pHashObj->bits); @@ -315,10 +322,11 @@ int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data } SLHashBucket* pBucket = pHashObj->pBucket[v]; - int32_t code = doAddToBucket(pHashObj, pBucket, v, key, keyLen, data, size); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + code = doAddToBucket(pHashObj, pBucket, v, key, keyLen, data, size); + } + + if (code) { + return code; } pHashObj->size += 1; @@ -327,7 +335,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data if ((pHashObj->numOfBuckets * LHASH_CAP_RATIO * pHashObj->tuplesPerPage) < pHashObj->size) { int32_t newBucketId = pHashObj->numOfBuckets; - int32_t code = doAddNewBucket(pHashObj); + code = doAddNewBucket(pHashObj); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -362,7 +370,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data ASSERT(v1 == newBucketId); // printf("move key:%d to 0x%x bucket, remain items:%d\n", *(int32_t*)k, v1, pBucket->size - 1); SLHashBucket* pNewBucket = pHashObj->pBucket[newBucketId]; - doAddToBucket(pHashObj, pNewBucket, newBucketId, (void*)GET_LHASH_NODE_KEY(pNode), pNode->keyLen, + code = doAddToBucket(pHashObj, pNewBucket, newBucketId, (void*)GET_LHASH_NODE_KEY(pNode), pNode->keyLen, GET_LHASH_NODE_KEY(pNode), pNode->dataLen); doRemoveFromBucket(p, pNode, pBucket); } else { @@ -377,7 +385,7 @@ int32_t tHashPut(SLHashObj* pHashObj, const void* key, size_t keyLen, void* data doTrimBucketPages(pHashObj, pBucket); } - return TSDB_CODE_SUCCESS; + return code; } char* tHashGet(SLHashObj* pHashObj, const void* key, size_t keyLen) { @@ -420,8 +428,8 @@ int32_t tHashRemove(SLHashObj* pHashObj, const void* key, size_t keyLen) { } void tHashPrint(const SLHashObj* pHashObj, int32_t type) { - printf("==================== linear hash ====================\n"); - printf("total bucket:%d, size:%" PRId64 ", ratio:%.2f\n", pHashObj->numOfBuckets, pHashObj->size, LHASH_CAP_RATIO); + (void) printf("==================== linear hash ====================\n"); + (void) printf("total bucket:%d, size:%" PRId64 ", ratio:%.2f\n", pHashObj->numOfBuckets, pHashObj->size, LHASH_CAP_RATIO); dBufSetPrintInfo(pHashObj->pBuf);