fix:[TD-32184] fix heap use after free.
This commit is contained in:
parent
193220aa88
commit
cf3db4f1a6
|
@ -88,22 +88,20 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
|||
rowSize += pCtx[i].resDataInfo.interBufSize;
|
||||
}
|
||||
|
||||
rowSize += (numOfOutput * sizeof(bool));
|
||||
// expand rowSize to mark if col is null for top/bottom result(saveTupleData)
|
||||
return rowSize;
|
||||
}
|
||||
|
||||
// Convert buf read from rocksdb to result row
|
||||
int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize) {
|
||||
if (inBuf == NULL || pSup == NULL) {
|
||||
qError("invalid input parameters, inBuf:%p, pSup:%p", inBuf, pSup);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
SqlFunctionCtx *pCtx = pSup->pCtx;
|
||||
int32_t *offset = pSup->rowEntryInfoOffset;
|
||||
SResultRow *pResultRow = NULL;
|
||||
size_t processedSize = 0;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (inBuf == NULL) {
|
||||
qError("invalid input buffer, inBuf:%p", inBuf);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
// calculate the size of output buffer
|
||||
*outBufSize = getResultRowSize(pCtx, pSup->numOfExprs);
|
||||
|
@ -116,6 +114,7 @@ int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize
|
|||
(void)memcpy(pResultRow, inBuf, sizeof(SResultRow));
|
||||
inBuf += sizeof(SResultRow);
|
||||
processedSize += sizeof(SResultRow);
|
||||
|
||||
for (int32_t i = 0; i < pSup->numOfExprs; ++i) {
|
||||
int32_t len = *(int32_t*)inBuf;
|
||||
inBuf += sizeof(int32_t);
|
||||
|
@ -132,12 +131,6 @@ int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize
|
|||
inBuf += len;
|
||||
processedSize += len;
|
||||
}
|
||||
void *pos = getResultEntryInfo(pResultRow, pSup->numOfExprs - 1, offset) +
|
||||
sizeof(SResultRowEntryInfo) +
|
||||
pCtx[pSup->numOfExprs - 1].resDataInfo.interBufSize;
|
||||
(void)memcpy(pos, inBuf, pSup->numOfExprs * sizeof(bool));
|
||||
inBuf += pSup->numOfExprs * sizeof(bool);
|
||||
processedSize += pSup->numOfExprs * sizeof(bool);
|
||||
|
||||
if (processedSize < inBufSize) {
|
||||
// stream stores extra data after result row
|
||||
|
@ -147,7 +140,7 @@ int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize
|
|||
qError("failed to reallocate memory for output buffer, size:%zu", *outBufSize + leftLen);
|
||||
return terrno;
|
||||
}
|
||||
(void)memcpy(*outBuf + processedSize, inBuf, leftLen);
|
||||
(void)memcpy(*outBuf + *outBufSize, inBuf, leftLen);
|
||||
inBuf += leftLen;
|
||||
processedSize += leftLen;
|
||||
*outBufSize += leftLen;
|
||||
|
@ -157,15 +150,16 @@ int32_t getResultRowFromBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize
|
|||
|
||||
// Convert result row to buf for rocksdb
|
||||
int32_t putResultRowToBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize, char **outBuf, size_t *outBufSize) {
|
||||
if (pSup == NULL || inBuf == NULL || outBuf == NULL || outBufSize == NULL) {
|
||||
qError("invalid input parameters, inBuf:%p, pSup:%p, outBufSize:%p, outBuf:%p", inBuf, pSup, outBufSize, outBuf);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
SqlFunctionCtx *pCtx = pSup->pCtx;
|
||||
int32_t *offset = pSup->rowEntryInfoOffset;
|
||||
SResultRow *pResultRow = (SResultRow*)inBuf;
|
||||
size_t rowSize = getResultRowSize(pCtx, pSup->numOfExprs);
|
||||
|
||||
if (inBuf == NULL) {
|
||||
qError("invalid input buffer, inBuf:%p", inBuf);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
if (rowSize > inBufSize) {
|
||||
qError("invalid input buffer size, rowSize:%zu, inBufSize:%zu", rowSize, inBufSize);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
|
@ -195,13 +189,6 @@ int32_t putResultRowToBuf(SExprSupp *pSup, const char* inBuf, size_t inBufSize,
|
|||
pBuf += len;
|
||||
}
|
||||
|
||||
// mark if col is null for top/bottom result(saveTupleData)
|
||||
void *pos = getResultEntryInfo(pResultRow, pSup->numOfExprs - 1, offset) +
|
||||
sizeof(SResultRowEntryInfo) +
|
||||
pCtx[pSup->numOfExprs - 1].resDataInfo.interBufSize;
|
||||
|
||||
(void)memcpy(pBuf, pos, pSup->numOfExprs * sizeof(bool));
|
||||
|
||||
if (rowSize < inBufSize) {
|
||||
// stream stores extra data after result row
|
||||
size_t leftLen = inBufSize - rowSize;
|
||||
|
|
|
@ -2006,6 +2006,12 @@ int32_t createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiN
|
|||
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
|
||||
int32_t funResSize = getMaxFunResSize(&pOperator->exprSupp, numOfCols);
|
||||
pInfo->pState->pFileState = NULL;
|
||||
|
||||
// used for backward compatibility of function's result info
|
||||
pInfo->pState->pResultRowStore.resultRowGet = getResultRowFromBuf;
|
||||
pInfo->pState->pResultRowStore.resultRowPut = putResultRowToBuf;
|
||||
pInfo->pState->pExprSupp = &pOperator->exprSupp;
|
||||
|
||||
code =
|
||||
pAPI->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize,
|
||||
compareTs, pInfo->pState, pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo),
|
||||
|
@ -2223,6 +2229,12 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
|||
pSup->stateStore.streamStateSetNumber(pSup->pState, -1, tsIndex);
|
||||
int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput);
|
||||
pSup->pState->pFileState = NULL;
|
||||
|
||||
// used for backward compatibility of function's result info
|
||||
pSup->pState->pResultRowStore.resultRowGet = getResultRowFromBuf;
|
||||
pSup->pState->pResultRowStore.resultRowPut = putResultRowToBuf;
|
||||
pSup->pState->pExprSupp = pExpSup;
|
||||
|
||||
code = pSup->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize,
|
||||
sesionTs, pSup->pState, pTwAggSup->deleteMark, taskIdStr,
|
||||
pHandle->checkpointId, STREAM_STATE_BUFF_SORT, &pSup->pState->pFileState);
|
||||
|
@ -2232,11 +2244,6 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
|
|||
pSup->pResultRows = tSimpleHashInit(32, hashFn);
|
||||
QUERY_CHECK_NULL(pSup->pResultRows, code, lino, _end, terrno);
|
||||
|
||||
// used for backward compatibility of function's result info
|
||||
pSup->pState->pResultRowStore.resultRowGet = getResultRowFromBuf;
|
||||
pSup->pState->pResultRowStore.resultRowPut = putResultRowToBuf;
|
||||
pSup->pState->pExprSupp = pExpSup;
|
||||
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
pExpSup->pCtx[i].saveHandle.pState = pSup->pState;
|
||||
}
|
||||
|
@ -5390,17 +5397,18 @@ int32_t createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
|||
|
||||
pInfo->stateStore = pTaskInfo->storageAPI.stateStore;
|
||||
pInfo->pState->pFileState = NULL;
|
||||
code = pTaskInfo->storageAPI.stateStore.streamFileStateInit(
|
||||
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
|
||||
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH,
|
||||
&pInfo->pState->pFileState);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
// used for backward compatibility of function's result info
|
||||
pInfo->pState->pResultRowStore.resultRowGet = getResultRowFromBuf;
|
||||
pInfo->pState->pResultRowStore.resultRowPut = putResultRowToBuf;
|
||||
pInfo->pState->pExprSupp = &pOperator->exprSupp;
|
||||
|
||||
code = pTaskInfo->storageAPI.stateStore.streamFileStateInit(
|
||||
tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState,
|
||||
pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId, STREAM_STATE_BUFF_HASH,
|
||||
&pInfo->pState->pFileState);
|
||||
QUERY_CHECK_CODE(code, lino, _error);
|
||||
|
||||
pInfo->pOperator = pOperator;
|
||||
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
|
||||
pInfo, pTaskInfo);
|
||||
|
|
|
@ -169,7 +169,7 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key);
|
|||
int32_t streamStateGetGroupKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
|
||||
int32_t streamStateAddIfNotExist_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
|
||||
void streamStateCurPrev_rocksdb(SStreamStateCur* pCur);
|
||||
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
|
||||
int32_t streamStateGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen);
|
||||
SStreamStateCur* streamStateGetAndCheckCur_rocksdb(SStreamState* pState, SWinKey* key);
|
||||
SStreamStateCur* streamStateSeekKeyNext_rocksdb(SStreamState* pState, const SWinKey* key);
|
||||
SStreamStateCur* streamStateSeekToLast_rocksdb(SStreamState* pState);
|
||||
|
|
|
@ -3319,7 +3319,9 @@ int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void**
|
|||
*pVLen = tValLen;
|
||||
return code;
|
||||
}
|
||||
code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen);
|
||||
size_t pValLen = 0;
|
||||
code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, &pValLen);
|
||||
*pVLen = (int32_t)pValLen;
|
||||
taosMemoryFree(tVal);
|
||||
return code;
|
||||
}
|
||||
|
@ -3376,7 +3378,7 @@ int32_t streamStateGetFirst_rocksdb(SStreamState* pState, SWinKey* key) {
|
|||
}
|
||||
|
||||
SStreamStateCur* pCur = streamStateSeekKeyNext_rocksdb(pState, &tmp);
|
||||
code = streamStateGetKVByCur_rocksdb(pCur, key, NULL, 0);
|
||||
code = streamStateGetKVByCur_rocksdb(pState, pCur, key, NULL, 0);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
|
@ -3420,7 +3422,8 @@ void streamStateCurPrev_rocksdb(SStreamStateCur* pCur) {
|
|||
rocksdb_iter_prev(pCur->iter);
|
||||
}
|
||||
}
|
||||
int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
|
||||
int32_t streamStateGetKVByCur_rocksdb(SStreamState* pState, SStreamStateCur* pCur, SWinKey* pKey, const void** pVal,
|
||||
int32_t* pVLen) {
|
||||
if (!pCur) return -1;
|
||||
SStateKey tkey;
|
||||
SStateKey* pKtmp = &tkey;
|
||||
|
@ -3436,7 +3439,35 @@ int32_t streamStateGetKVByCur_rocksdb(SStreamStateCur* pCur, SWinKey* pKey, cons
|
|||
if (pVLen != NULL) {
|
||||
size_t vlen = 0;
|
||||
const char* valStr = rocksdb_iter_value(pCur->iter, &vlen);
|
||||
*pVLen = valueDecode((void*)valStr, vlen, NULL, (char**)pVal);
|
||||
char* val = NULL;
|
||||
int32_t len = valueDecode((void*)valStr, vlen, NULL, (char**)val);
|
||||
if (len <= 0) {
|
||||
taosMemoryFree(val);
|
||||
return -1;
|
||||
}
|
||||
|
||||
char* tVal = val;
|
||||
size_t tVlen = len;
|
||||
|
||||
if (pVal != NULL) {
|
||||
if (pState != NULL && pState->pResultRowStore.resultRowGet != NULL && pState->pExprSupp != NULL) {
|
||||
int code =
|
||||
(pState->pResultRowStore.resultRowGet)(pState->pExprSupp, val, len, (char**)&tVal, (size_t*)&tVlen);
|
||||
if (code != 0) {
|
||||
taosMemoryFree(val);
|
||||
return code;
|
||||
}
|
||||
taosMemoryFree(val);
|
||||
*pVal = (char*)tVal;
|
||||
} else {
|
||||
stInfo("streamStateGetKVByCur_rocksdb, pState = %p, pResultRowStore = %p, pExprSupp = %p", pState,
|
||||
pState->pResultRowStore.resultRowGet, pState->pExprSupp);
|
||||
*pVal = (char*)tVal;
|
||||
}
|
||||
} else {
|
||||
taosMemoryFree(val);
|
||||
}
|
||||
*pVLen = (int32_t)tVlen;
|
||||
}
|
||||
|
||||
*pKey = pKtmp->key;
|
||||
|
@ -3598,7 +3629,9 @@ int32_t streamStateFuncGet_rocksdb(SStreamState* pState, const STupleKey* key, v
|
|||
return code;
|
||||
}
|
||||
|
||||
code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, (size_t*)pVLen);
|
||||
size_t pValLen = 0;
|
||||
code = (pState->pResultRowStore.resultRowGet)(pState->pExprSupp, tVal, tValLen, (char**)pVal, &pValLen);
|
||||
*pVLen = (int32_t)pValLen;
|
||||
|
||||
taosMemoryFree(tVal);
|
||||
return code;
|
||||
|
@ -3933,7 +3966,7 @@ int32_t streamStateSessionGetKVByCur_rocksdb(SStreamState* pState, SStreamStateC
|
|||
taosMemoryFree(val);
|
||||
}
|
||||
|
||||
if (pVLen != NULL) *pVLen = tVlen;
|
||||
if (pVLen != NULL) *pVLen = (int32_t)tVlen;
|
||||
|
||||
*pKey = pKTmp->key;
|
||||
return 0;
|
||||
|
@ -4450,6 +4483,7 @@ void streamStateClearBatch(void* pBatch) { rocksdb_writebatch_clear((rocksdb_
|
|||
void streamStateDestroyBatch(void* pBatch) { rocksdb_writebatch_destroy((rocksdb_writebatch_t*)pBatch); }
|
||||
int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb_writebatch_t* pBatch, void* key,
|
||||
void* val, int32_t vlen, int64_t ttl) {
|
||||
int32_t code = 0;
|
||||
STaskDbWrapper* wrapper = pState->pTdbState->pOwner->pBackend;
|
||||
TAOS_UNUSED(atomic_add_fetch_64(&wrapper->dataWritten, 1));
|
||||
|
||||
|
@ -4487,7 +4521,6 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
|
|||
if (pState->pResultRowStore.resultRowPut == NULL || pState->pExprSupp == NULL) {
|
||||
dst = val;
|
||||
size = vlen;
|
||||
return -1;
|
||||
} else {
|
||||
code = (pState->pResultRowStore.resultRowPut)(pState->pExprSupp, val, vlen, &dst, &size);
|
||||
if (code != 0) {
|
||||
|
|
|
@ -576,7 +576,7 @@ static void transformCursor(SStreamFileState* pFileState, SStreamStateCur* pCur)
|
|||
static void checkAndTransformCursor(SStreamFileState* pFileState, const uint64_t groupId, SArray* pWinStates,
|
||||
SStreamStateCur** ppCur) {
|
||||
SSessionKey key = {.groupId = groupId};
|
||||
int32_t code = streamStateSessionGetKVByCur_rocksdb(NULL, *ppCur, &key, NULL, NULL);
|
||||
int32_t code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), *ppCur, &key, NULL, NULL);
|
||||
if (taosArrayGetSize(pWinStates) > 0 &&
|
||||
(code == TSDB_CODE_FAILED || sessionStateKeyCompare(&key, pWinStates, 0) >= 0)) {
|
||||
if (!(*ppCur)) {
|
||||
|
@ -654,7 +654,7 @@ SStreamStateCur* countWinStateSeekKeyPrev(SStreamFileState* pFileState, const SS
|
|||
SSessionKey key = {0};
|
||||
void* pVal = NULL;
|
||||
int len = 0;
|
||||
int32_t code = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, &key, &pVal, &len);
|
||||
int32_t code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &key, &pVal, &len);
|
||||
if (code == TSDB_CODE_FAILED) {
|
||||
streamStateFreeCur(pCur);
|
||||
return pBuffCur;
|
||||
|
@ -711,7 +711,7 @@ int32_t sessionWinStateGetKVByCur(SStreamStateCur* pCur, SSessionKey* pKey, void
|
|||
*pKey = *(SSessionKey*)(pPos->pKey);
|
||||
} else {
|
||||
void* pData = NULL;
|
||||
code = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, pKey, &pData, pVLen);
|
||||
code = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pCur->pStreamFileState), pCur, pKey, &pData, pVLen);
|
||||
if (taosArrayGetSize(pWinStates) > 0 &&
|
||||
(code == TSDB_CODE_FAILED || sessionStateRangeKeyCompare(pKey, pWinStates, 0) >= 0)) {
|
||||
transformCursor(pCur->pStreamFileState, pCur);
|
||||
|
|
|
@ -302,7 +302,7 @@ SStreamStateCur* streamStateGetAndCheckCur(SStreamState* pState, SWinKey* key) {
|
|||
}
|
||||
|
||||
int32_t streamStateGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
|
||||
return streamStateGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
|
||||
return streamStateGetKVByCur_rocksdb(getStateFileStore(pCur->pStreamFileState), pCur, pKey, pVal, pVLen);
|
||||
}
|
||||
|
||||
int32_t streamStateFillGetKVByCur(SStreamStateCur* pCur, SWinKey* pKey, const void** pVal, int32_t* pVLen) {
|
||||
|
|
|
@ -849,7 +849,7 @@ int32_t recoverSesssion(SStreamFileState* pFileState, int64_t ckId) {
|
|||
void* pVal = NULL;
|
||||
int32_t vlen = 0;
|
||||
SSessionKey key = {0};
|
||||
winRes = streamStateSessionGetKVByCur_rocksdb(NULL, pCur, &key, &pVal, &vlen);
|
||||
winRes = streamStateSessionGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, &key, &pVal, &vlen);
|
||||
if (winRes != TSDB_CODE_SUCCESS) {
|
||||
break;
|
||||
}
|
||||
|
@ -903,7 +903,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
|
|||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
||||
winCode = streamStateGetKVByCur_rocksdb(pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
|
||||
winCode = streamStateGetKVByCur_rocksdb(getStateFileStore(pFileState), pCur, pNewPos->pKey, (const void**)&pVal, &vlen);
|
||||
if (winCode != TSDB_CODE_SUCCESS || pFileState->getTs(pNewPos->pKey) < pFileState->flushMark) {
|
||||
destroyRowBuffPos(pNewPos);
|
||||
SListNode* pNode = tdListPopTail(pFileState->usedBuffs);
|
||||
|
@ -912,6 +912,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState, int64_t ckId) {
|
|||
break;
|
||||
}
|
||||
if (vlen != pFileState->rowSize) {
|
||||
qError("row size mismatch, expect:%d, actual:%d", pFileState->rowSize, vlen);
|
||||
code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
|
||||
QUERY_CHECK_CODE(code, lino, _end);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue