From cf3db4f1a6b911710240d6a59c5790a8a1cc35ae Mon Sep 17 00:00:00 2001 From: Jing Sima Date: Fri, 11 Oct 2024 08:43:05 +0800 Subject: [PATCH] fix:[TD-32184] fix heap use after free. --- source/libs/executor/src/executil.c | 35 +++++--------- .../executor/src/streamtimewindowoperator.c | 28 +++++++---- source/libs/stream/inc/streamBackendRocksdb.h | 2 +- source/libs/stream/src/streamBackendRocksdb.c | 47 ++++++++++++++++--- source/libs/stream/src/streamSessionState.c | 6 +-- source/libs/stream/src/streamState.c | 2 +- source/libs/stream/src/tstreamFileState.c | 5 +- 7 files changed, 77 insertions(+), 48 deletions(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 94b89ab64c..fd67468731 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -88,22 +88,20 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) { rowSize += pCtx[i].resDataInfo.interBufSize; } - rowSize += (numOfOutput * sizeof(bool)); - // expand rowSize to mark if col is null for top/bottom result(saveTupleData) return rowSize; } // Convert buf read from rocksdb to result row int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize) { + if (inBuf == NULL || pSup == NULL) { + qError("invalid input parameters, inBuf:%p, pSup:%p", inBuf, pSup); + return TSDB_CODE_INVALID_PARA; + } SqlFunctionCtx *pCtx = pSup->pCtx; int32_t *offset = pSup->rowEntryInfoOffset; SResultRow *pResultRow = NULL; size_t processedSize = 0; int32_t code = TSDB_CODE_SUCCESS; - if (inBuf == NULL) { - qError("invalid input buffer, inBuf:%p", inBuf); - return TSDB_CODE_INVALID_PARA; - } // calculate the size of output buffer *outBufSize = getResultRowSize(pCtx, pSup->numOfExprs); @@ -116,6 +114,7 @@ int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize (void)memcpy(pResultRow, inBuf, sizeof(SResultRow)); inBuf += sizeof(SResultRow); processedSize += sizeof(SResultRow); + for (int32_t i = 0; i < pSup->numOfExprs; ++i) { int32_t len = *(int32_t*)inBuf; inBuf += sizeof(int32_t); @@ -132,12 +131,6 @@ int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize inBuf += len; processedSize += len; } - void *pos = getResultEntryInfo(pResultRow, pSup->numOfExprs - 1, offset) + - sizeof(SResultRowEntryInfo) + - pCtx[pSup->numOfExprs - 1].resDataInfo.interBufSize; - (void)memcpy(pos, inBuf, pSup->numOfExprs * sizeof(bool)); - inBuf += pSup->numOfExprs * sizeof(bool); - processedSize += pSup->numOfExprs * sizeof(bool); if (processedSize < inBufSize) { // stream stores extra data after result row @@ -147,7 +140,7 @@ int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize qError("failed to reallocate memory for output buffer, size:%zu", *outBufSize + leftLen); return terrno; } - (void)memcpy(*outBuf + processedSize, inBuf, leftLen); + (void)memcpy(*outBuf + *outBufSize, inBuf, leftLen); inBuf += leftLen; processedSize += leftLen; *outBufSize += leftLen; @@ -157,15 +150,16 @@ int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize // Convert result row to buf for rocksdb int32_t putResultRowToBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize) { + if (pSup == NULL || inBuf == NULL || outBuf == NULL || outBufSize == NULL) { + qError("invalid input parameters, inBuf:%p, pSup:%p, outBufSize:%p, outBuf:%p", inBuf, pSup, outBufSize, outBuf); + return TSDB_CODE_INVALID_PARA; + } + SqlFunctionCtx *pCtx = pSup->pCtx; int32_t *offset = pSup->rowEntryInfoOffset; SResultRow *pResultRow = (SResultRow*)inBuf; size_t rowSize = getResultRowSize(pCtx, pSup->numOfExprs); - if (inBuf == NULL) { - qError("invalid input buffer, inBuf:%p", inBuf); - return TSDB_CODE_INVALID_PARA; - } if (rowSize > inBufSize) { qError("invalid input buffer size, rowSize:%zu, inBufSize:%zu", rowSize, inBufSize); return TSDB_CODE_INVALID_PARA; @@ -195,13 +189,6 @@ int32_t putResultRowToBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, pBuf += len; } - // mark if col is null for top/bottom result(saveTupleData) - void *pos = getResultEntryInfo(pResultRow, pSup->numOfExprs - 1, offset) + - sizeof(SResultRowEntryInfo) + - pCtx[pSup->numOfExprs - 1].resDataInfo.interBufSize; - - (void)memcpy(pBuf, pos, pSup->numOfExprs * sizeof(bool)); - if (rowSize < inBufSize) { // stream stores extra data after result row size_t leftLen = inBufSize - rowSize; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index fcd58fbe56..fc919dfe5f 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -2006,6 +2006,12 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN pInfo->stateStore = pTaskInfo->storageAPI.stateStore; int32_t funResSize = getMaxFunResSize(&pOperator->exprSupp, numOfCols); pInfo->pState->pFileState = NULL; + + // used for backward compatibility of function's result info + pInfo->pState->pResultRowStore.resultRowGet = getResultRowFromBuf; + pInfo->pState->pResultRowStore.resultRowPut = putResultRowToBuf; + pInfo->pState->pExprSupp = &pOperator->exprSupp; + code = pAPI->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), @@ -2223,6 +2229,12 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex); int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput); pSup->pState->pFileState = NULL; + + // used for backward compatibility of function's result info + pSup->pState->pResultRowStore.resultRowGet = getResultRowFromBuf; + pSup->pState->pResultRowStore.resultRowPut = putResultRowToBuf; + pSup->pState->pExprSupp = pExpSup; + code = pSup->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState, pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, STREAM_STATE_BUFF_SORT, &pSup->pState->pFileState); @@ -2232,11 +2244,6 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in pSup->pResultRows = tSimpleHashInit(32, hashFn); QUERY_CHECK_NULL(pSup->pResultRows, code, lino, _end, terrno); - // used for backward compatibility of function's result info - pSup->pState->pResultRowStore.resultRowGet = getResultRowFromBuf; - pSup->pState->pResultRowStore.resultRowPut = putResultRowToBuf; - pSup->pState->pExprSupp = pExpSup; - for (int32_t i = 0; i < numOfOutput; ++i) { pExpSup->pCtx[i].saveHandle.pState = pSup->pState; } @@ -5390,17 +5397,18 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->pState->pFileState = NULL; - code = pTaskInfo->storageAPI.stateStore.streamFileStateInit( - tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, - pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH, - &pInfo->pState->pFileState); - QUERY_CHECK_CODE(code, lino, _error); // used for backward compatibility of function's result info pInfo->pState->pResultRowStore.resultRowGet = getResultRowFromBuf; pInfo->pState->pResultRowStore.resultRowPut = putResultRowToBuf; pInfo->pState->pExprSupp = &pOperator->exprSupp; + code = pTaskInfo->storageAPI.stateStore.streamFileStateInit( + tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, + pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH, + &pInfo->pState->pFileState); + QUERY_CHECK_CODE(code, lino, _error); + pInfo->pOperator = pOperator; setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index c4cf6a47cd..1e0801fb6b 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -169,7 +169,7 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key); int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); void streamStateCurPrev_rocksdb(SStreamStateCur* pCur); -int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); +int32_t streamStateGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen); SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key); SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key); SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index f91c26638a..c0bde10774 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -3319,7 +3319,9 @@ int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** *pVLen = tValLen; return code; } - code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); + size_t pValLen = 0; + code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, &pValLen); + *pVLen = (int32_t)pValLen; taosMemoryFree(tVal); return code; } @@ -3376,7 +3378,7 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) { } SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp); - code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0); + code = streamStateGetKVByCur_rocksdb(pState, pCur, key, NULL, 0); if (code != 0) { return code; } @@ -3420,7 +3422,8 @@ void streamStateCurPrev_rocksdb(SStreamStateCur* pCur) { rocksdb_iter_prev(pCur->iter); } } -int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { +int32_t streamStateGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, + int32_t* pVLen) { if (!pCur) return -1; SStateKey tkey; SStateKey* pKtmp = &tkey; @@ -3436,7 +3439,35 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons if (pVLen != NULL) { size_t vlen = 0; const char* valStr = rocksdb_iter_value(pCur->iter, &vlen); - *pVLen = valueDecode((void*)valStr, vlen, NULL, (char**)pVal); + char* val = NULL; + int32_t len = valueDecode((void*)valStr, vlen, NULL, (char**)val); + if (len <= 0) { + taosMemoryFree(val); + return -1; + } + + char* tVal = val; + size_t tVlen = len; + + if (pVal != NULL) { + if (pState != NULL && pState->pResultRowStore.resultRowGet != NULL && pState->pExprSupp != NULL) { + int code = + (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, val, len, (char**)&tVal, (size_t*)&tVlen); + if (code != 0) { + taosMemoryFree(val); + return code; + } + taosMemoryFree(val); + *pVal = (char*)tVal; + } else { + stInfo("streamStateGetKVByCur_rocksdb, pState = %p, pResultRowStore = %p, pExprSupp = %p", pState, + pState->pResultRowStore.resultRowGet, pState->pExprSupp); + *pVal = (char*)tVal; + } + } else { + taosMemoryFree(val); + } + *pVLen = (int32_t)tVlen; } *pKey = pKtmp->key; @@ -3598,7 +3629,9 @@ int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, v return code; } - code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen); + size_t pValLen = 0; + code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, &pValLen); + *pVLen = (int32_t)pValLen; taosMemoryFree(tVal); return code; @@ -3933,7 +3966,7 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamState* pState, SStreamStateC taosMemoryFree(val); } - if (pVLen != NULL) *pVLen = tVlen; + if (pVLen != NULL) *pVLen = (int32_t)tVlen; *pKey = pKTmp->key; return 0; @@ -4450,6 +4483,7 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_ void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); } int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key, void* val, int32_t vlen, int64_t ttl) { + int32_t code = 0; STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend; TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1)); @@ -4487,7 +4521,6 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) { dst = val; size = vlen; - return -1; } else { code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, val, vlen, &dst, &size); if (code != 0) { diff --git a/source/libs/stream/src/streamSessionState.c b/source/libs/stream/src/streamSessionState.c index bb8ea6c03c..536636533f 100644 --- a/source/libs/stream/src/streamSessionState.c +++ b/source/libs/stream/src/streamSessionState.c @@ -576,7 +576,7 @@ static void transformCursor(SStreamFileState* pFileState, SStreamStateCur* pCur) static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates, SStreamStateCur** ppCur) { SSessionKey key = {.groupId = groupId}; - int32_t code = streamStateSessionGetKVByCur_rocksdb(NULL, *ppCur, &key, NULL, NULL); + int32_t code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), *ppCur, &key, NULL, NULL); if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) { if (!(*ppCur)) { @@ -654,7 +654,7 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS SSessionKey key = {0}; void* pVal = NULL; int len = 0; - int32_t code = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, &key, &pVal, &len); + int32_t code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &key, &pVal, &len); if (code == TSDB_CODE_FAILED) { streamStateFreeCur(pCur); return pBuffCur; @@ -711,7 +711,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void *pKey = *(SSessionKey*)(pPos->pKey); } else { void* pData = NULL; - code = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, pKey, &pData, pVLen); + code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pCur->pStreamFileState), pCur, pKey, &pData, pVLen); if (taosArrayGetSize(pWinStates) > 0 && (code == TSDB_CODE_FAILED || sessionStateRangeKeyCompare(pKey, pWinStates, 0) >= 0)) { transformCursor(pCur->pStreamFileState, pCur); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 4c83f1b109..2e6a724912 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -302,7 +302,7 @@ SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) { } int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { - return streamStateGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen); + return streamStateGetKVByCur_rocksdb(getStateFileStore(pCur->pStreamFileState), pCur, pKey, pVal, pVLen); } int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) { diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 6a102743cd..424845e4f2 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -849,7 +849,7 @@ int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId) { void* pVal = NULL; int32_t vlen = 0; SSessionKey key = {0}; - winRes = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, &key, &pVal, &vlen); + winRes = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &key, &pVal, &vlen); if (winRes != TSDB_CODE_SUCCESS) { break; } @@ -903,7 +903,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { QUERY_CHECK_CODE(code, lino, _end); } - winCode = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen); + winCode = streamStateGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, pNewPos->pKey, (const void**)&pVal, &vlen); if (winCode != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) { destroyRowBuffPos(pNewPos); SListNode* pNode = tdListPopTail(pFileState->usedBuffs); @@ -912,6 +912,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) { break; } if (vlen != pFileState->rowSize) { + qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen); code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR; QUERY_CHECK_CODE(code, lino, _end); }