From 19a51d82eb523c1db4ec1acdce58d690e338ec41 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 18 Sep 2024 10:58:27 +0800 Subject: [PATCH 1/6] fix(stream):fix mem leak and malloc res --- source/common/src/tdatablock.c | 12 ++++++--- source/libs/executor/src/executil.c | 12 +++++---- source/libs/executor/src/executorInt.c | 2 ++ source/libs/stream/src/streamUpdate.c | 4 +++ source/libs/stream/src/tstreamFileState.c | 30 +++++++++++++++++------ source/util/src/tbloomfilter.c | 2 ++ 6 files changed, 46 insertions(+), 16 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 7e957357a9..d8a66f82bf 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2620,11 +2620,15 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf } len += snprintf(dumpBuf + len, size - len, "%s |end\n", flag); - *pDataBuf = dumpBuf; - dumpBuf = NULL; _exit: - if (dumpBuf) { - taosMemoryFree(dumpBuf); + if (code == TSDB_CODE_SUCCESS) { + *pDataBuf = dumpBuf; + dumpBuf = NULL; + } else { + uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + if (dumpBuf) { + taosMemoryFree(dumpBuf); + } } return code; } diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 05ed5a9d1e..01104dfe21 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -2833,11 +2833,13 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr qDebug("%s===stream===%s: Block is Empty. block type %d", taskIdStr, flag, pBlock->info.type); return; } - char* pBuf = NULL; - int32_t code = dumpBlockData(pBlock, flag, &pBuf, taskIdStr); - if (code == 0) { - qDebug("%s", pBuf); - taosMemoryFree(pBuf); + if (qDebugFlag & DEBUG_DEBUG) { + char* pBuf = NULL; + int32_t code = dumpBlockData(pBlock, flag, &pBuf, taskIdStr); + if (code == 0) { + qDebug("%s", pBuf); + taosMemoryFree(pBuf); + } } } diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index 1804f0ce26..4fef157984 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -314,6 +314,8 @@ static int32_t doCreateConstantValColumnInfo(SInputColumnInfoData* pInput, SFunc } } else if (type == TSDB_DATA_TYPE_VARCHAR || type == TSDB_DATA_TYPE_GEOMETRY) { char* tmp = taosMemoryMalloc(pFuncParam->param.nLen + VARSTR_HEADER_SIZE); + QUERY_CHECK_NULL(tmp, code, lino, _end, terrno); + STR_WITH_SIZE_TO_VARSTR(tmp, pFuncParam->param.pz, pFuncParam->param.nLen); for (int32_t i = 0; i < numOfRows; ++i) { code = colDataSetVal(pColInfo, i, tmp, false); diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index cc80e27467..2acb0f88af 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -631,7 +631,11 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) { if (tDecodeI8(&decoder, &pInfo->pkColType) < 0) return -1; pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pInfo->pkColLen); + QUERY_CHECK_NULL(pInfo->pKeyBuff, code, lino, _error, errno); + pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pInfo->pkColLen); + QUERY_CHECK_NULL(pInfo->pValueBuff, code, lino, _error, errno); + if (pInfo->pkColLen != 0) { pInfo->comparePkRowFn = compareKeyTsAndPk; pInfo->comparePkCol = getKeyComparFunc(pInfo->pkColType, TSDB_ORDER_ASC); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index abb796b0b7..b3a868d625 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -96,6 +96,10 @@ int32_t intervalFileGetFn(SStreamFileState* pFileState, void* pKey, void* data, void* intervalCreateStateKey(SRowBuffPos* pPos, int64_t num) { SStateKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateKey)); + if (pStateKey == NULL) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + return NULL; + } SWinKey* pWinKey = pPos->pKey; pStateKey->key = *pWinKey; pStateKey->opNum = num; @@ -112,6 +116,10 @@ int32_t sessionFileGetFn(SStreamFileState* pFileState, void* pKey, void* data, i void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) { SStateSessionKey* pStateKey = taosMemoryCalloc(1, sizeof(SStateSessionKey)); + if (pStateKey == NULL) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + return NULL; + } SSessionKey* pWinKey = pPos->pKey; pStateKey->key = *pWinKey; pStateKey->opNum = num; @@ -120,11 +128,16 @@ void* sessionCreateStateKey(SRowBuffPos* pPos, int64_t num) { static void streamFileStateDecode(TSKEY* pKey, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, pKey); } -static void streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) { +static int32_t streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) { *pLen = sizeof(TSKEY); (*pVal) = taosMemoryCalloc(1, *pLen); + if ((*pVal) == NULL) { + qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno)); + return terrno; + } void* buff = *pVal; int32_t tmp = taosEncodeFixedI64(&buff, *pKey); + return tmp; } int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp, @@ -480,11 +493,10 @@ SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) { if (pFileState->curRowCount < pFileState->maxRowCount) { pBuff = taosMemoryCalloc(1, pFileState->rowSize); - if (pBuff) { - pPos->pRowBuff = pBuff; - pFileState->curRowCount++; - goto _end; - } + QUERY_CHECK_NULL(pBuff, code, lino, _error, errno); + pPos->pRowBuff = pBuff; + pFileState->curRowCount++; + goto _end; } code = clearRowBuff(pFileState); @@ -712,6 +724,8 @@ void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, boo } void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number); + QUERY_CHECK_NULL(pSKey, code, lino, _end, errno); + code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize, 0, buf); taosMemoryFreeClear(pSKey); @@ -738,7 +752,9 @@ void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, boo if (flushState) { void* valBuf = NULL; int32_t len = 0; - streamFileStateEncode(&pFileState->flushMark, &valBuf, &len); + code = streamFileStateEncode(&pFileState->flushMark, &valBuf, &len); + QUERY_CHECK_CODE(code, lino, _end); + qDebug("===stream===flushMark write:%" PRId64, pFileState->flushMark); code = streamStatePutBatch(pFileState->pFileStore, "default", batch, STREAM_STATE_INFO_NAME, valBuf, len, 0); taosMemoryFree(valBuf); diff --git a/source/util/src/tbloomfilter.c b/source/util/src/tbloomfilter.c index 841657e628..5bd358c202 100644 --- a/source/util/src/tbloomfilter.c +++ b/source/util/src/tbloomfilter.c @@ -177,6 +177,8 @@ int32_t tBloomFilterDecode(SDecoder* pDecoder, SBloomFilter** ppBF) { QUERY_CHECK_CODE(code, lino, _error); } pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t)); + QUERY_CHECK_NULL(pBF->buffer, code, lino, _error, errno); + for (int32_t i = 0; i < pBF->numUnits; i++) { uint64_t* pUnits = (uint64_t*)pBF->buffer; if (tDecodeU64(pDecoder, pUnits + i) < 0) { From 6a1cde28fb5959374cdc006184589db7589acfd1 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 18 Sep 2024 14:22:24 +0800 Subject: [PATCH 2/6] fix(stream):adjust file state res --- source/libs/stream/src/tstreamFileState.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index b3a868d625..89062aa6f6 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -137,7 +137,7 @@ static int32_t streamFileStateEncode(TSKEY* pKey, void** pVal, int32_t* pLen) { } void* buff = *pVal; int32_t tmp = taosEncodeFixedI64(&buff, *pKey); - return tmp; + return TSDB_CODE_SUCCESS; } int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp, From bb394021743b5648ffe193e73d4570e0c8d8e8de Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 18 Sep 2024 16:22:14 +0800 Subject: [PATCH 3/6] fix(stream):check null pointer --- source/libs/stream/src/streamSessionState.c | 9 +++++++++ source/libs/stream/src/streamState.c | 3 +++ 2 files changed, 12 insertions(+) diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index 5eb55467b1..a192f03947 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -81,6 +81,9 @@ bool inSessionWindow(SSessionKey* pKey, TSKEY ts, int64_t gap) { SStreamStateCur* createSessionStateCursor(SStreamFileState* pFileState) { SStreamStateCur* pCur = createStreamStateCursor(); + if (pCur == NULL) { + return NULL; + } pCur->pStreamFileState = pFileState; return pCur; } @@ -533,6 +536,9 @@ static SStreamStateCur* seekKeyCurrentPrev_buff(SStreamFileState* pFileState, co if (index >= 0) { pCur = createSessionStateCursor(pFileState); + if (pCur == NULL) { + return NULL; + } pCur->buffIndex = index; if (pIndex) { *pIndex = index; @@ -634,6 +640,9 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS pBuffCur->buffIndex = 0; } else if (taosArrayGetSize(pWinStates) > 0) { pBuffCur = createSessionStateCursor(pFileState); + if (pBuffCur == NULL) { + return NULL; + } pBuffCur->buffIndex = 0; } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 0e2d31cc8f..e1edebc861 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -530,6 +530,9 @@ void streamStateCopyBackend(SStreamState* src, SStreamState* dst) { } SStreamStateCur* createStreamStateCursor() { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } pCur->buffIndex = -1; return pCur; } From 8ae4e54d65b1da26a8953ca0fb97e6c8f9aab1c0 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Wed, 18 Sep 2024 16:24:28 +0800 Subject: [PATCH 4/6] fix(stream):check null pointer --- source/libs/stream/src/tstreamFileState.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 89062aa6f6..e1c7294185 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -190,6 +190,7 @@ int32_t streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, pFileState->stateFunctionGetFn = getSessionRowBuff; } QUERY_CHECK_NULL(pFileState->rowStateBuff, code, lino, _error, terrno); + QUERY_CHECK_NULL(pFileState->cfName, code, lino, _error, terrno); pFileState->keyLen = keySize; pFileState->rowSize = rowSize; From 4321d7eb43ced3d5b947a252e3005988c3811cdf Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 19 Sep 2024 09:52:16 +0800 Subject: [PATCH 5/6] use terrno --- source/util/src/tbloomfilter.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/util/src/tbloomfilter.c b/source/util/src/tbloomfilter.c index 5bd358c202..0889017cde 100644 --- a/source/util/src/tbloomfilter.c +++ b/source/util/src/tbloomfilter.c @@ -177,7 +177,7 @@ int32_t tBloomFilterDecode(SDecoder* pDecoder, SBloomFilter** ppBF) { QUERY_CHECK_CODE(code, lino, _error); } pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t)); - QUERY_CHECK_NULL(pBF->buffer, code, lino, _error, errno); + QUERY_CHECK_NULL(pBF->buffer, code, lino, _error, terrno); for (int32_t i = 0; i < pBF->numUnits; i++) { uint64_t* pUnits = (uint64_t*)pBF->buffer; From 6225f7fa2ff8fffdee9e0f3bbb7d036ffee92c1f Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Thu, 19 Sep 2024 13:47:20 +0800 Subject: [PATCH 6/6] fix(stream):use terrno --- source/libs/stream/src/streamUpdate.c | 4 ++-- source/libs/stream/src/tstreamFileState.c | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 2acb0f88af..d5adbcee77 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -631,10 +631,10 @@ int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) { if (tDecodeI8(&decoder, &pInfo->pkColType) < 0) return -1; pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pInfo->pkColLen); - QUERY_CHECK_NULL(pInfo->pKeyBuff, code, lino, _error, errno); + QUERY_CHECK_NULL(pInfo->pKeyBuff, code, lino, _error, terrno); pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pInfo->pkColLen); - QUERY_CHECK_NULL(pInfo->pValueBuff, code, lino, _error, errno); + QUERY_CHECK_NULL(pInfo->pValueBuff, code, lino, _error, terrno); if (pInfo->pkColLen != 0) { pInfo->comparePkRowFn = compareKeyTsAndPk; diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index e1c7294185..5626aa29da 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -494,7 +494,7 @@ SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) { if (pFileState->curRowCount < pFileState->maxRowCount) { pBuff = taosMemoryCalloc(1, pFileState->rowSize); - QUERY_CHECK_NULL(pBuff, code, lino, _error, errno); + QUERY_CHECK_NULL(pBuff, code, lino, _error, terrno); pPos->pRowBuff = pBuff; pFileState->curRowCount++; goto _end; @@ -725,7 +725,7 @@ void flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, boo } void* pSKey = pFileState->stateBuffCreateStateKeyFn(pPos, ((SStreamState*)pFileState->pFileStore)->number); - QUERY_CHECK_NULL(pSKey, code, lino, _end, errno); + QUERY_CHECK_NULL(pSKey, code, lino, _end, terrno); code = streamStatePutBatchOptimize(pFileState->pFileStore, idx, batch, pSKey, pPos->pRowBuff, pFileState->rowSize, 0, buf);