From 29d80b1c76641d1552c1b43fedacca8c88b237c2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 11 Apr 2023 14:04:39 +0000 Subject: [PATCH] fix invalid free --- source/libs/executor/src/timewindowoperator.c | 74 ++++++++-------- source/libs/stream/src/streamStateRocksdb.c | 1 - source/libs/stream/src/tstreamFileState.c | 86 +++++++++++++++---- 3 files changed, 104 insertions(+), 57 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index a5e453378d..e7734f5a47 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ #include "executorimpl.h" -#include "tglobal.h" #include "filter.h" #include "function.h" #include "functionMgt.h" @@ -21,6 +20,7 @@ #include "tcompare.h" #include "tdatablock.h" #include "tfill.h" +#include "tglobal.h" #include "tlog.h" #include "ttime.h" @@ -1367,12 +1367,12 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa } uint64_t winGpId = pGpDatas[i]; SWinKey winRes = {.ts = win.skey, .groupId = winGpId}; - void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); + void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); if (chIds) { getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win); continue; } - bool res = doDeleteWindow(pOperator, win.skey, winGpId); + bool res = doDeleteWindow(pOperator, win.skey, winGpId); if (pUpWins && res) { taosArrayPush(pUpWins, &winRes); } @@ -2096,12 +2096,11 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3 } } -bool hasIntervalWindow(SStreamState* pState, SWinKey* pKey) { - return streamStateCheck(pState, pKey); -} +bool hasIntervalWindow(SStreamState* pState, SWinKey* pKey) { return streamStateCheck(pState, pKey); } int32_t setIntervalOutputBuf(SStreamState* pState, STimeWindow* win, SRowBuffPos** pResult, int64_t groupId, - SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) { + SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, + SAggSupporter* pAggSup) { SWinKey key = { .ts = win->skey, .groupId = groupId, @@ -2115,7 +2114,7 @@ int32_t setIntervalOutputBuf(SStreamState* pState, STimeWindow* win, SRowBuffPos *pResult = (SRowBuffPos*)value; SResultRow* res = (SResultRow*)((*pResult)->pRowBuff); // set time window for current result - res-> win = (*win); + res->win = (*win); setResultRowInitCtx(res, pCtx, numOfOutput, rowEntryInfoOffset); return TSDB_CODE_SUCCESS; } @@ -2148,21 +2147,20 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, S continue; } if (num == 0) { - int32_t code = setIntervalOutputBuf(pInfo->pState, &parentWin, &pCurResPos, pWinRes->groupId, pSup->pCtx, numOfOutput, - pSup->rowEntryInfoOffset, &pInfo->aggSup); + int32_t code = setIntervalOutputBuf(pInfo->pState, &parentWin, &pCurResPos, pWinRes->groupId, pSup->pCtx, + numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup); ASSERT(pCurResPos != NULL); - pCurResult = (SResultRow*) pCurResPos->pRowBuff; + pCurResult = (SResultRow*)pCurResPos->pRowBuff; if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } - } num++; SRowBuffPos* pChResPos = NULL; SResultRow* pChResult = NULL; - setIntervalOutputBuf(pChInfo->pState, &parentWin, &pChResPos, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs, - pChildSup->rowEntryInfoOffset, &pChInfo->aggSup); - pChResult = (SResultRow*) pChResPos->pRowBuff; + setIntervalOutputBuf(pChInfo->pState, &parentWin, &pChResPos, pWinRes->groupId, pChildSup->pCtx, + pChildSup->numOfExprs, pChildSup->rowEntryInfoOffset, &pChInfo->aggSup); + pChResult = (SResultRow*)pChResPos->pRowBuff; updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin, true); compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); } @@ -2412,7 +2410,7 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN return startPos; } -static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version, int64_t ckId) { +static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version, int64_t ckId) { pTaskInfo->streamInfo.dataVersion = version; pTaskInfo->streamInfo.checkPointId = ckId; } @@ -2493,8 +2491,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p } int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResPos, groupId, pSup->pCtx, numOfOutput, - pSup->rowEntryInfoOffset, &pInfo->aggSup); - pResult = (SResultRow*) pResPos->pRowBuff; + pSup->rowEntryInfoOffset, &pInfo->aggSup); + pResult = (SResultRow*)pResPos->pRowBuff; if (code != TSDB_CODE_SUCCESS || pResult == NULL) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } @@ -2506,8 +2504,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p } SWinKey key = { - .ts = pResult->win.skey, - .groupId = groupId, + .ts = pResult->win.skey, + .groupId = groupId, }; if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) { saveWinResult(&key, pResPos, pUpdatedMap); @@ -2554,8 +2552,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p static inline int winPosCmprImpl(const void* pKey1, const void* pKey2) { SRowBuffPos* pos1 = *(SRowBuffPos**)pKey1; SRowBuffPos* pos2 = *(SRowBuffPos**)pKey2; - SWinKey* pWin1 = (SWinKey*)pos1->pKey; - SWinKey* pWin2 = (SWinKey*)pos2->pKey; + SWinKey* pWin1 = (SWinKey*)pos1->pKey; + SWinKey* pWin2 = (SWinKey*)pos2->pKey; if (pWin1->groupId > pWin2->groupId) { return 1; @@ -2757,7 +2755,7 @@ int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) { } TSKEY compareTs(void* pKey) { - SWinKey* pWinKey = (SWinKey*) pKey; + SWinKey* pWinKey = (SWinKey*)pKey; return pWinKey->ts; } @@ -2786,7 +2784,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, .deleteMarkSaved = 0, .calTriggerSaved = 0, .checkPointTs = 0, - .checkPointInterval = convertTimePrecision(tsCheckpointInterval, TSDB_TIME_PRECISION_MILLI, pInfo->interval.precision), + .checkPointInterval = + convertTimePrecision(tsCheckpointInterval, TSDB_TIME_PRECISION_MILLI, pInfo->interval.precision), }; ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay"); pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; @@ -2869,8 +2868,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; - pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, compareTs, - pInfo->pState, pInfo->twAggSup.deleteMark); + pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, + compareTs, pInfo->pState, pInfo->twAggSup.deleteMark); pInfo->dataVersion = 0; pOperator->operatorType = pPhyNode->type; @@ -4969,23 +4968,24 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - pInfo->interval = (SInterval) { - .interval = pIntervalPhyNode->interval, - .sliding = pIntervalPhyNode->sliding, - .intervalUnit = pIntervalPhyNode->intervalUnit, - .slidingUnit = pIntervalPhyNode->slidingUnit, - .offset = pIntervalPhyNode->offset, - .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision, + pInfo->interval = (SInterval){ + .interval = pIntervalPhyNode->interval, + .sliding = pIntervalPhyNode->sliding, + .intervalUnit = pIntervalPhyNode->intervalUnit, + .slidingUnit = pIntervalPhyNode->slidingUnit, + .offset = pIntervalPhyNode->offset, + .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision, }; - pInfo->twAggSup = (STimeWindowAggSupp) { + pInfo->twAggSup = (STimeWindowAggSupp){ .waterMark = pIntervalPhyNode->window.watermark, .calTrigger = pIntervalPhyNode->window.triggerType, .maxTs = INT64_MIN, .minTs = INT64_MAX, .deleteMark = getDeleteMark(pIntervalPhyNode), .checkPointTs = 0, - .checkPointInterval = convertTimePrecision(tsCheckpointInterval, TSDB_TIME_PRECISION_MILLI, pInfo->interval.precision), + .checkPointInterval = + convertTimePrecision(tsCheckpointInterval, TSDB_TIME_PRECISION_MILLI, pInfo->interval.precision), }; ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay"); @@ -5042,8 +5042,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; - pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, compareTs, - pInfo->pState, pInfo->twAggSup.deleteMark); + pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, + compareTs, pInfo->pState, pInfo->twAggSup.deleteMark); setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 5e8c406697..02d3778baa 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -16,7 +16,6 @@ #include "rocksdb/c.h" #include "streamBackendRocksdb.h" #include "tcommon.h" -#include "tlog.h" int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { // diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index dedf2b114f..33097fec11 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -313,15 +313,23 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) { return pFileState->usedBuffs; } -void streamFileStateDecode(SStreamFileState* pFileState, void* pBuff, int32_t len) { - pBuff = taosDecodeFixedI64(pBuff, &pFileState->flushMark); -} +// void streamFileStateDecode(SStreamFileState* pFileState, void* pBuff, int32_t len) { +// pBuff = taosDecodeFixedI64(pBuff, &pFileState->flushMark); +// } -void streamFileStateEncode(SStreamFileState* pFileState, void** pVal, int32_t* pLen) { +// void streamFileStateEncode(SStreamFileState* pFileState, void** pVal, int32_t* pLen) { +// *pLen = sizeof(TSKEY); +// (*pVal) = taosMemoryCalloc(1, *pLen); +// void* buff = *pVal; +// taosEncodeFixedI64(&buff, pFileState->flushMark); +// } +void streamFileStateDecode(TSKEY* key, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, key); } + +void streamFileStateEncode(TSKEY* key, void** pVal, int32_t* pLen) { *pLen = sizeof(TSKEY); (*pVal) = taosMemoryCalloc(1, *pLen); void* buff = *pVal; - taosEncodeFixedI64(&buff, pFileState->flushMark); + taosEncodeFixedI64(&buff, *key); } int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) { @@ -349,12 +357,26 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, } if (flushState) { - int32_t len = 0; - void* buff = NULL; - streamFileStateEncode(pFileState, &buff, &len); - SWinKey key = {.ts = -1, .groupId = 0}; // dengyihao - streamStatePut_rocksdb(pFileState->pFileStore, &key, buff, len); - taosMemoryFree(buff); + const char* taskKey = "streamFileState"; + { + char keyBuf[128] = {0}; + void* valBuf = NULL; + int32_t len = 0; + sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId); + streamFileStateEncode(&pFileState->flushMark, &valBuf, &len); + streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len); + taosMemoryFree(valBuf); + } + { + char keyBuf[128] = {0}; + char valBuf[64] = {0}; + int32_t len = 0; + sprintf(keyBuf, "%s:%" PRId64 "", taskKey, INT64_MIN); + sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId); + + streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, strlen(valBuf)); + } + streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); } streamStateDestroyBatch(batch); @@ -362,16 +384,42 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, } int32_t recoverSnapshot(SStreamFileState* pFileState) { - int32_t code = TSDB_CODE_SUCCESS; - SWinKey stkey = {.ts = -1, .groupId = 0}; // dengyihao + int32_t code = TSDB_CODE_SUCCESS; + const char* taskKey = "streamFileState"; + int64_t maxCheckPointId = 0; + { + char buf[128] = {0}; + void* val = NULL; + int32_t len = 0; + sprintf(buf, "%s:%" PRId64 "", taskKey, INT64_MIN); + code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len); + if (code != 0) { + return TSDB_CODE_FAILED; + } + sscanf(val, "%" PRId64 "", &maxCheckPointId); + taosMemoryFree(val); + } + for (int64_t i = maxCheckPointId; i > 0; i++) { + char buf[128] = {0}; + void* val = 0; + int32_t len = 0; + sprintf(buf, "%s:%" PRId64 "", taskKey, i); + code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len); + if (code != 0) { + return TSDB_CODE_FAILED; + } + TSKEY ts; + sscanf(val, "%" PRId64 "", &ts); + taosMemoryFree(val); + if (ts < pFileState->flushMark) { + // forceRemoveCheckPoint(pFileState->pFileStore, i); + } else { + break; + } + } + void* pStVal = NULL; int32_t len = 0; - code = streamStateGet_rocksdb(pFileState->pFileStore, &stkey, &pStVal, &len); - if (code == TSDB_CODE_SUCCESS) { - streamFileStateDecode(pFileState, pStVal, len); - } else { - return TSDB_CODE_FAILED; - } SWinKey key = {.groupId = 0, .ts = 0}; SStreamStateCur* pCur = streamStateGetCur_rocksdb(pFileState->pFileStore, &key);