From 743ed82daa9a99e36a348d8dac1d012cac8209b5 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 11 Apr 2023 11:55:47 +0000 Subject: [PATCH 01/18] fix invalid free --- source/libs/stream/src/streamStateRocksdb.c | 23 +++++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index e700e8b97a..136e54fd94 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -297,6 +297,8 @@ const char* compareFuncKeyName(void* name); const char* compareParKeyName(void* name); const char* comparePartagKeyName(void* name); +void destroyFunc(void* stata) { return; } + typedef struct { const char* key; int32_t len; @@ -312,14 +314,19 @@ typedef struct { SCfInit ginitDict[] = { {"default", strlen("default"), 0, defaultKeyComp, defaultKeyEncode, defaultKeyDecode, defaultKeyToString, - compareDefaultName}, - {"state", strlen("state"), 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName}, - {"fill", strlen("fill"), 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName}, + compareDefaultName, destroyFunc}, + {"state", strlen("state"), 1, stateKeyDBComp, stateKeyEncode, stateKeyDecode, stateKeyToString, compareStateName, + destroyFunc}, + {"fill", strlen("fill"), 2, winKeyDBComp, winKeyEncode, winKeyDecode, winKeyToString, compareWinKeyName, + destroyFunc}, {"sess", strlen("sess"), 3, stateSessionKeyDBComp, stateSessionKeyEncode, stateSessionKeyDecode, - stateSessionKeyToString, compareSessionKeyName}, - {"func", strlen("func"), 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName}, - {"parname", strlen("parname"), 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName}, - {"partag", strlen("partag"), 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName}, + stateSessionKeyToString, compareSessionKeyName, destroyFunc}, + {"func", strlen("func"), 4, tupleKeyDBComp, tupleKeyEncode, tupleKeyDecode, tupleKeyToString, compareFuncKeyName, + destroyFunc}, + {"parname", strlen("parname"), 5, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, compareParKeyName, + destroyFunc}, + {"partag", strlen("partag"), 6, parKeyDBComp, parKeyEncode, parKeyDecode, parKeyToString, comparePartagKeyName, + destroyFunc}, }; const char* compareDefaultName(void* name) { return ginitDict[0].key; } @@ -330,8 +337,6 @@ const char* compareFuncKeyName(void* name) { return ginitDict[4].key; } const char* compareParKeyName(void* name) { return ginitDict[5].key; } const char* comparePartagKeyName(void* name) { return ginitDict[6].key; } -void destroyFunc(void* stata) { return; } - int streamInitBackend(SStreamState* pState, char* path) { rocksdb_env_t* env = rocksdb_create_default_env(); // rocksdb_envoptions_create(); rocksdb_env_set_low_priority_background_threads(env, 15); From a3836b2363958598b0373403c4c1514fc3b9e704 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 11 Apr 2023 12:52:03 +0000 Subject: [PATCH 02/18] fix invalid free --- include/libs/stream/streamState.h | 2 ++ source/libs/stream/src/streamStateRocksdb.c | 18 +++++++++++++++++- 2 files changed, 19 insertions(+), 1 deletion(-) diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 6103262e17..7c4aab2491 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -38,6 +38,8 @@ typedef struct STdbState { rocksdb_comparator_t** pCompare; rocksdb_options_t* dbOpt; struct SStreamTask* pOwner; + void* param; + void* env; TDB* db; TTB* pStateDb; diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 136e54fd94..5e8c406697 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#include "rocksdb/c.h" #include "streamBackendRocksdb.h" #include "tcommon.h" #include "tlog.h" @@ -280,6 +281,10 @@ int parKeyToString(void* k, char* buf) { return n; } +typedef struct { + void* tableOpt; + void* lru; // global or not +} rocksdbCfParam; const char* cfName[] = {"default", "state", "fill", "sess", "func", "parname", "partag"}; typedef int (*EncodeFunc)(void* key, char* buf); @@ -354,6 +359,7 @@ int streamInitBackend(SStreamState* pState, char* path) { char* err = NULL; int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); + rocksdbCfParam* param = taosMemoryCalloc(cfLen, sizeof(rocksdbCfParam)); const rocksdb_options_t** cfOpt = taosMemoryCalloc(cfLen, sizeof(rocksdb_options_t*)); for (int i = 0; i < cfLen; i++) { cfOpt[i] = rocksdb_options_create_copy(opts); @@ -367,6 +373,8 @@ int streamInitBackend(SStreamState* pState, char* path) { rocksdb_options_set_block_based_table_factory((rocksdb_options_t*)cfOpt[i], tableOpt); + param[i].tableOpt = tableOpt; + param[i].lru = cache; // rocksdb_slicetransform_t* trans = rocksdb_slicetransform_create_fixed_prefix(8); // rocksdb_options_set_prefix_extractor((rocksdb_options_t*)cfOpt[i], trans); }; @@ -391,6 +399,8 @@ int streamInitBackend(SStreamState* pState, char* path) { pState->pTdbState->cfOpts = (rocksdb_options_t**)cfOpt; pState->pTdbState->pCompare = pCompare; pState->pTdbState->dbOpt = opts; + pState->pTdbState->param = param; + pState->pTdbState->env = env; return 0; } void streamCleanBackend(SStreamState* pState) { @@ -398,12 +408,17 @@ void streamCleanBackend(SStreamState* pState) { qInfo("rocksdb already free"); return; } - int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); + int cfLen = sizeof(ginitDict) / sizeof(ginitDict[0]); + rocksdbCfParam* param = pState->pTdbState->param; for (int i = 0; i < cfLen; i++) { rocksdb_column_family_handle_destroy(pState->pTdbState->pHandle[i]); rocksdb_options_destroy(pState->pTdbState->cfOpts[i]); rocksdb_comparator_destroy(pState->pTdbState->pCompare[i]); + + rocksdb_cache_destroy(param[i].lru); + rocksdb_block_based_options_destroy(param[i].tableOpt); } + taosMemoryFree(pState->pTdbState->param); rocksdb_options_destroy(pState->pTdbState->dbOpt); taosMemoryFreeClear(pState->pTdbState->pHandle); @@ -417,6 +432,7 @@ void streamCleanBackend(SStreamState* pState) { pState->pTdbState->readOpts = NULL; rocksdb_close(pState->pTdbState->rocksdb); + rocksdb_env_destroy(pState->pTdbState->env); pState->pTdbState->rocksdb = NULL; } From 29d80b1c76641d1552c1b43fedacca8c88b237c2 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 11 Apr 2023 14:04:39 +0000 Subject: [PATCH 03/18] fix invalid free --- source/libs/executor/src/timewindowoperator.c | 74 ++++++++-------- source/libs/stream/src/streamStateRocksdb.c | 1 - source/libs/stream/src/tstreamFileState.c | 86 +++++++++++++++---- 3 files changed, 104 insertions(+), 57 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index a5e453378d..e7734f5a47 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ #include "executorimpl.h" -#include "tglobal.h" #include "filter.h" #include "function.h" #include "functionMgt.h" @@ -21,6 +20,7 @@ #include "tcompare.h" #include "tdatablock.h" #include "tfill.h" +#include "tglobal.h" #include "tlog.h" #include "ttime.h" @@ -1367,12 +1367,12 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa } uint64_t winGpId = pGpDatas[i]; SWinKey winRes = {.ts = win.skey, .groupId = winGpId}; - void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); + void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); if (chIds) { getNextTimeWindow(pInterval, pInterval->precision, TSDB_ORDER_ASC, &win); continue; } - bool res = doDeleteWindow(pOperator, win.skey, winGpId); + bool res = doDeleteWindow(pOperator, win.skey, winGpId); if (pUpWins && res) { taosArrayPush(pUpWins, &winRes); } @@ -2096,12 +2096,11 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3 } } -bool hasIntervalWindow(SStreamState* pState, SWinKey* pKey) { - return streamStateCheck(pState, pKey); -} +bool hasIntervalWindow(SStreamState* pState, SWinKey* pKey) { return streamStateCheck(pState, pKey); } int32_t setIntervalOutputBuf(SStreamState* pState, STimeWindow* win, SRowBuffPos** pResult, int64_t groupId, - SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) { + SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, + SAggSupporter* pAggSup) { SWinKey key = { .ts = win->skey, .groupId = groupId, @@ -2115,7 +2114,7 @@ int32_t setIntervalOutputBuf(SStreamState* pState, STimeWindow* win, SRowBuffPos *pResult = (SRowBuffPos*)value; SResultRow* res = (SResultRow*)((*pResult)->pRowBuff); // set time window for current result - res-> win = (*win); + res->win = (*win); setResultRowInitCtx(res, pCtx, numOfOutput, rowEntryInfoOffset); return TSDB_CODE_SUCCESS; } @@ -2148,21 +2147,20 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, S continue; } if (num == 0) { - int32_t code = setIntervalOutputBuf(pInfo->pState, &parentWin, &pCurResPos, pWinRes->groupId, pSup->pCtx, numOfOutput, - pSup->rowEntryInfoOffset, &pInfo->aggSup); + int32_t code = setIntervalOutputBuf(pInfo->pState, &parentWin, &pCurResPos, pWinRes->groupId, pSup->pCtx, + numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup); ASSERT(pCurResPos != NULL); - pCurResult = (SResultRow*) pCurResPos->pRowBuff; + pCurResult = (SResultRow*)pCurResPos->pRowBuff; if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } - } num++; SRowBuffPos* pChResPos = NULL; SResultRow* pChResult = NULL; - setIntervalOutputBuf(pChInfo->pState, &parentWin, &pChResPos, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs, - pChildSup->rowEntryInfoOffset, &pChInfo->aggSup); - pChResult = (SResultRow*) pChResPos->pRowBuff; + setIntervalOutputBuf(pChInfo->pState, &parentWin, &pChResPos, pWinRes->groupId, pChildSup->pCtx, + pChildSup->numOfExprs, pChildSup->rowEntryInfoOffset, &pChInfo->aggSup); + pChResult = (SResultRow*)pChResPos->pRowBuff; updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin, true); compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); } @@ -2412,7 +2410,7 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN return startPos; } -static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version, int64_t ckId) { +static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version, int64_t ckId) { pTaskInfo->streamInfo.dataVersion = version; pTaskInfo->streamInfo.checkPointId = ckId; } @@ -2493,8 +2491,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p } int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResPos, groupId, pSup->pCtx, numOfOutput, - pSup->rowEntryInfoOffset, &pInfo->aggSup); - pResult = (SResultRow*) pResPos->pRowBuff; + pSup->rowEntryInfoOffset, &pInfo->aggSup); + pResult = (SResultRow*)pResPos->pRowBuff; if (code != TSDB_CODE_SUCCESS || pResult == NULL) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); } @@ -2506,8 +2504,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p } SWinKey key = { - .ts = pResult->win.skey, - .groupId = groupId, + .ts = pResult->win.skey, + .groupId = groupId, }; if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) { saveWinResult(&key, pResPos, pUpdatedMap); @@ -2554,8 +2552,8 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p static inline int winPosCmprImpl(const void* pKey1, const void* pKey2) { SRowBuffPos* pos1 = *(SRowBuffPos**)pKey1; SRowBuffPos* pos2 = *(SRowBuffPos**)pKey2; - SWinKey* pWin1 = (SWinKey*)pos1->pKey; - SWinKey* pWin2 = (SWinKey*)pos2->pKey; + SWinKey* pWin1 = (SWinKey*)pos1->pKey; + SWinKey* pWin2 = (SWinKey*)pos2->pKey; if (pWin1->groupId > pWin2->groupId) { return 1; @@ -2757,7 +2755,7 @@ int64_t getDeleteMark(SIntervalPhysiNode* pIntervalPhyNode) { } TSKEY compareTs(void* pKey) { - SWinKey* pWinKey = (SWinKey*) pKey; + SWinKey* pWinKey = (SWinKey*)pKey; return pWinKey->ts; } @@ -2786,7 +2784,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, .deleteMarkSaved = 0, .calTriggerSaved = 0, .checkPointTs = 0, - .checkPointInterval = convertTimePrecision(tsCheckpointInterval, TSDB_TIME_PRECISION_MILLI, pInfo->interval.precision), + .checkPointInterval = + convertTimePrecision(tsCheckpointInterval, TSDB_TIME_PRECISION_MILLI, pInfo->interval.precision), }; ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay"); pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; @@ -2869,8 +2868,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; - pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, compareTs, - pInfo->pState, pInfo->twAggSup.deleteMark); + pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, + compareTs, pInfo->pState, pInfo->twAggSup.deleteMark); pInfo->dataVersion = 0; pOperator->operatorType = pPhyNode->type; @@ -4969,23 +4968,24 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); - pInfo->interval = (SInterval) { - .interval = pIntervalPhyNode->interval, - .sliding = pIntervalPhyNode->sliding, - .intervalUnit = pIntervalPhyNode->intervalUnit, - .slidingUnit = pIntervalPhyNode->slidingUnit, - .offset = pIntervalPhyNode->offset, - .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision, + pInfo->interval = (SInterval){ + .interval = pIntervalPhyNode->interval, + .sliding = pIntervalPhyNode->sliding, + .intervalUnit = pIntervalPhyNode->intervalUnit, + .slidingUnit = pIntervalPhyNode->slidingUnit, + .offset = pIntervalPhyNode->offset, + .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision, }; - pInfo->twAggSup = (STimeWindowAggSupp) { + pInfo->twAggSup = (STimeWindowAggSupp){ .waterMark = pIntervalPhyNode->window.watermark, .calTrigger = pIntervalPhyNode->window.triggerType, .maxTs = INT64_MIN, .minTs = INT64_MAX, .deleteMark = getDeleteMark(pIntervalPhyNode), .checkPointTs = 0, - .checkPointInterval = convertTimePrecision(tsCheckpointInterval, TSDB_TIME_PRECISION_MILLI, pInfo->interval.precision), + .checkPointInterval = + convertTimePrecision(tsCheckpointInterval, TSDB_TIME_PRECISION_MILLI, pInfo->interval.precision), }; ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay"); @@ -5042,8 +5042,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; - pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, compareTs, - pInfo->pState, pInfo->twAggSup.deleteMark); + pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, + compareTs, pInfo->pState, pInfo->twAggSup.deleteMark); setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 5e8c406697..02d3778baa 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -16,7 +16,6 @@ #include "rocksdb/c.h" #include "streamBackendRocksdb.h" #include "tcommon.h" -#include "tlog.h" int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { // diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index dedf2b114f..33097fec11 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -313,15 +313,23 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) { return pFileState->usedBuffs; } -void streamFileStateDecode(SStreamFileState* pFileState, void* pBuff, int32_t len) { - pBuff = taosDecodeFixedI64(pBuff, &pFileState->flushMark); -} +// void streamFileStateDecode(SStreamFileState* pFileState, void* pBuff, int32_t len) { +// pBuff = taosDecodeFixedI64(pBuff, &pFileState->flushMark); +// } -void streamFileStateEncode(SStreamFileState* pFileState, void** pVal, int32_t* pLen) { +// void streamFileStateEncode(SStreamFileState* pFileState, void** pVal, int32_t* pLen) { +// *pLen = sizeof(TSKEY); +// (*pVal) = taosMemoryCalloc(1, *pLen); +// void* buff = *pVal; +// taosEncodeFixedI64(&buff, pFileState->flushMark); +// } +void streamFileStateDecode(TSKEY* key, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, key); } + +void streamFileStateEncode(TSKEY* key, void** pVal, int32_t* pLen) { *pLen = sizeof(TSKEY); (*pVal) = taosMemoryCalloc(1, *pLen); void* buff = *pVal; - taosEncodeFixedI64(&buff, pFileState->flushMark); + taosEncodeFixedI64(&buff, *key); } int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState) { @@ -349,12 +357,26 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, } if (flushState) { - int32_t len = 0; - void* buff = NULL; - streamFileStateEncode(pFileState, &buff, &len); - SWinKey key = {.ts = -1, .groupId = 0}; // dengyihao - streamStatePut_rocksdb(pFileState->pFileStore, &key, buff, len); - taosMemoryFree(buff); + const char* taskKey = "streamFileState"; + { + char keyBuf[128] = {0}; + void* valBuf = NULL; + int32_t len = 0; + sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId); + streamFileStateEncode(&pFileState->flushMark, &valBuf, &len); + streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len); + taosMemoryFree(valBuf); + } + { + char keyBuf[128] = {0}; + char valBuf[64] = {0}; + int32_t len = 0; + sprintf(keyBuf, "%s:%" PRId64 "", taskKey, INT64_MIN); + sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId); + + streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, strlen(valBuf)); + } + streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); } streamStateDestroyBatch(batch); @@ -362,16 +384,42 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, } int32_t recoverSnapshot(SStreamFileState* pFileState) { - int32_t code = TSDB_CODE_SUCCESS; - SWinKey stkey = {.ts = -1, .groupId = 0}; // dengyihao + int32_t code = TSDB_CODE_SUCCESS; + const char* taskKey = "streamFileState"; + int64_t maxCheckPointId = 0; + { + char buf[128] = {0}; + void* val = NULL; + int32_t len = 0; + sprintf(buf, "%s:%" PRId64 "", taskKey, INT64_MIN); + code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len); + if (code != 0) { + return TSDB_CODE_FAILED; + } + sscanf(val, "%" PRId64 "", &maxCheckPointId); + taosMemoryFree(val); + } + for (int64_t i = maxCheckPointId; i > 0; i++) { + char buf[128] = {0}; + void* val = 0; + int32_t len = 0; + sprintf(buf, "%s:%" PRId64 "", taskKey, i); + code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len); + if (code != 0) { + return TSDB_CODE_FAILED; + } + TSKEY ts; + sscanf(val, "%" PRId64 "", &ts); + taosMemoryFree(val); + if (ts < pFileState->flushMark) { + // forceRemoveCheckPoint(pFileState->pFileStore, i); + } else { + break; + } + } + void* pStVal = NULL; int32_t len = 0; - code = streamStateGet_rocksdb(pFileState->pFileStore, &stkey, &pStVal, &len); - if (code == TSDB_CODE_SUCCESS) { - streamFileStateDecode(pFileState, pStVal, len); - } else { - return TSDB_CODE_FAILED; - } SWinKey key = {.groupId = 0, .ts = 0}; SStreamStateCur* pCur = streamStateGetCur_rocksdb(pFileState->pFileStore, &key); From a8ffb82370e51144712ee62fe815d1cb0a59b0a6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 11 Apr 2023 14:13:35 +0000 Subject: [PATCH 04/18] fix invalid free --- source/libs/stream/src/tstreamFileState.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 33097fec11..9602f1eb1c 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -399,7 +399,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) { sscanf(val, "%" PRId64 "", &maxCheckPointId); taosMemoryFree(val); } - for (int64_t i = maxCheckPointId; i > 0; i++) { + for (int64_t i = maxCheckPointId; i > 0; i--) { char buf[128] = {0}; void* val = 0; int32_t len = 0; From 7db12bb31732817ea6e2920817f0dc6b4856c7fa Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 11 Apr 2023 14:21:16 +0000 Subject: [PATCH 05/18] fix invalid free --- source/libs/stream/src/tstreamFileState.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 9602f1eb1c..08c8c16079 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -413,8 +413,8 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) { taosMemoryFree(val); if (ts < pFileState->flushMark) { // forceRemoveCheckPoint(pFileState->pFileStore, i); - } else { break; + } else { } } From ed4aaade8ce4845064f5a96f75ba5998236decce Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 12 Apr 2023 03:03:51 +0000 Subject: [PATCH 06/18] add backend --- include/libs/stream/tstreamFileState.h | 23 +++++++----- source/libs/stream/inc/streamBackendRocksdb.h | 1 + source/libs/stream/src/streamStateRocksdb.c | 36 +++++++++++++++++-- source/libs/stream/src/tstreamFileState.c | 17 ++++++--- 4 files changed, 62 insertions(+), 15 deletions(-) diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 1962e41b6c..4f2618f8dc 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -18,6 +18,7 @@ #include "os.h" +#include "tarray.h" #include "tdef.h" #include "tlist.h" @@ -27,29 +28,33 @@ extern "C" { typedef struct SStreamFileState SStreamFileState; typedef struct SRowBuffPos { - void* pRowBuff; - void* pKey; - bool beFlushed; - bool beUsed; + void* pRowBuff; + void* pKey; + bool beFlushed; + bool beUsed; } SRowBuffPos; typedef SList SStreamSnapshot; typedef TSKEY (*GetTsFun)(void*); -SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, TSKEY delMark); +SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, + TSKEY delMark); void streamFileStateDestroy(SStreamFileState* pFileState); void streamFileStateClear(SStreamFileState* pFileState); int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); int32_t deleteRowBuff(SStreamFileState* pFileState, const void* pKey, int32_t keyLen); int32_t getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos, void** pVal); -void releaseRowBuffPos(SRowBuffPos* pBuff); -bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen); +void releaseRowBuffPos(SRowBuffPos* pBuff); +bool hasRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen); SStreamSnapshot* getSnapshot(SStreamFileState* pFileState); -int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState); -int32_t recoverSnapshot(SStreamFileState* pFileState); +int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, bool flushState); +int32_t recoverSnapshot(SStreamFileState* pFileState); + +int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list); +int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId); #ifdef __cplusplus } diff --git a/source/libs/stream/inc/streamBackendRocksdb.h b/source/libs/stream/inc/streamBackendRocksdb.h index a06aef365f..a33a0a577b 100644 --- a/source/libs/stream/inc/streamBackendRocksdb.h +++ b/source/libs/stream/inc/streamBackendRocksdb.h @@ -89,4 +89,5 @@ int32_t streamStatePutBatch_rocksdb(SStreamState* pState, void* pBatch); int32_t streamDefaultPut_rocksdb(SStreamState* pState, const void* key, void* pVal, int32_t pVLen); int32_t streamDefaultGet_rocksdb(SStreamState* pState, const void* key, void** pVal, int32_t* pVLen); int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key); +int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result); #endif \ No newline at end of file diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index 02d3778baa..fb5ad34d0d 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -458,9 +458,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa rocksdb_readoptions_t** readOpt) { int idx = streamGetInit(cfName); - //*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb); if (snapshot != NULL) { - *snapshot = NULL; + *snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb); } rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create(); @@ -638,6 +637,39 @@ int32_t streamDefaultDel_rocksdb(SStreamState* pState, const void* key) { return code; } +int32_t streamDefaultIter_rocksdb(SStreamState* pState, const void* start, const void* end, SArray* result) { + int code = 0; + char* err = NULL; + + rocksdb_snapshot_t* snapshot = NULL; + rocksdb_readoptions_t* readopts = NULL; + rocksdb_iterator_t* pIter = streamStateIterCreate(pState, "default", &snapshot, &readopts); + if (pIter == NULL) { + return -1; + } + + rocksdb_iter_seek(pIter, start, strlen(start)); + while (rocksdb_iter_valid(pIter)) { + const char* key = rocksdb_iter_key(pIter, NULL); + if (end != NULL && strcmp(key, end) > 0) { + break; + } + if (strncmp(key, start, strlen(start)) == 0 && strlen(key) >= strlen(start) + 1) { + int64_t checkPoint = 0; + if (sscanf(key + strlen(key), ":%" PRId64 "", &checkPoint) == 1) { + taosArrayPush(result, &checkPoint); + } + } else { + break; + } + rocksdb_iter_next(pIter); + } + rocksdb_release_snapshot(pState->pTdbState->rocksdb, snapshot); + rocksdb_readoptions_destroy(readopts); + rocksdb_iter_destroy(pIter); + return code; +} + int32_t streamStateGet_rocksdb(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { int code = 0; SStateKey sKey = {.key = *key, .opNum = pState->number}; diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 08c8c16079..ddd8e148ff 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -371,7 +371,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, char keyBuf[128] = {0}; char valBuf[64] = {0}; int32_t len = 0; - sprintf(keyBuf, "%s:%" PRId64 "", taskKey, INT64_MIN); + memcpy(keyBuf, taskKey, strlen(taskKey)); sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId); streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, strlen(valBuf)); @@ -382,7 +382,16 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, streamStateDestroyBatch(batch); return code; } - +int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) { + const char* taskKey = "streamFileState"; + char keyBuf[128] = {0}; + sprintf(keyBuf, "%s:%" PRId64 "", taskKey, checkpointId); + return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf); +} +int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) { + const char* taskKey = "streamFileState"; + return streamDefaultIter_rocksdb(pFileState->pFileStore, taskKey, NULL, list); +} int32_t recoverSnapshot(SStreamFileState* pFileState) { int32_t code = TSDB_CODE_SUCCESS; const char* taskKey = "streamFileState"; @@ -391,7 +400,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) { char buf[128] = {0}; void* val = NULL; int32_t len = 0; - sprintf(buf, "%s:%" PRId64 "", taskKey, INT64_MIN); + memcpy(buf, taskKey, strlen(taskKey)); code = streamDefaultGet_rocksdb(pFileState->pFileStore, buf, &val, &len); if (code != 0) { return TSDB_CODE_FAILED; @@ -412,7 +421,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) { sscanf(val, "%" PRId64 "", &ts); taosMemoryFree(val); if (ts < pFileState->flushMark) { - // forceRemoveCheckPoint(pFileState->pFileStore, i); + forceRemoveCheckpoint(pFileState, i); break; } else { } From 6a98d55c2fb5f31249084af3c4bbd10500b99e0b Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 12 Apr 2023 11:04:17 +0800 Subject: [PATCH 07/18] add ci test --- include/common/tcommon.h | 12 +-- source/libs/executor/src/timewindowoperator.c | 44 +------- tests/parallel_test/cases.task | 100 ++++++++++-------- tests/script/win-test-file | 49 +++++---- 4 files changed, 97 insertions(+), 108 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 61adccf5c0..51a714c792 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -51,18 +51,18 @@ static inline int winKeyCmprImpl(const void* pKey1, const void* pKey2) { SWinKey* pWin1 = (SWinKey*)pKey1; SWinKey* pWin2 = (SWinKey*)pKey2; - if (pWin1->ts > pWin2->ts) { - return 1; - } else if (pWin1->ts < pWin2->ts) { - return -1; - } - if (pWin1->groupId > pWin2->groupId) { return 1; } else if (pWin1->groupId < pWin2->groupId) { return -1; } + if (pWin1->ts > pWin2->ts) { + return 1; + } else if (pWin1->ts < pWin2->ts) { + return -1; + } + return 0; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index e7734f5a47..c1b3ed647d 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1454,42 +1454,8 @@ STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) { return w; } -static void deleteIntervalDiscBuf(SStreamState* pState, SHashObj* pPullDataMap, TSKEY mark, SInterval* pInterval, - SWinKey* key) { - STimeWindow tw = getFinalTimeWindow(key->ts, pInterval); - SWinKey next = {0}; - while (tw.ekey < mark) { - SStreamStateCur* pCur = streamStateSeekKeyNext(pState, key); - int32_t code = streamStateGetKVByCur(pCur, &next, NULL, 0); - streamStateFreeCur(pCur); - - void* chIds = taosHashGet(pPullDataMap, key, sizeof(SWinKey)); - if (chIds && pPullDataMap) { - SArray* chAy = *(SArray**)chIds; - int32_t size = taosArrayGetSize(chAy); - qDebug("===stream===window %" PRId64 " wait child size:%d", key->ts, size); - for (int32_t i = 0; i < size; i++) { - qDebug("===stream===window %" PRId64 " wait child id:%d", key->ts, *(int32_t*)taosArrayGet(chAy, i)); - } - break; - } - qDebug("===stream===delete window %" PRId64, key->ts); - int32_t codeDel = streamStateDel(pState, key); - if (codeDel != TSDB_CODE_SUCCESS) { - code = streamStateGetFirst(pState, key); - if (code != TSDB_CODE_SUCCESS) { - qDebug("===stream===stream state first key: empty-empty"); - return; - } - continue; - } - if (code == TSDB_CODE_SUCCESS) { - *key = next; - tw = getFinalTimeWindow(key->ts, pInterval); - } else { - break; - } - } +static void deleteIntervalDiscBuf(SStreamState* pState, TSKEY mark) { + //todo(liuyao) delete expired check point } static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren, TSKEY maxTs) { @@ -2609,8 +2575,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); qDebug("===stream===clear semi operator"); } else { - deleteIntervalDiscBuf(pInfo->pState, pInfo->pPullDataMap, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, - &pInfo->interval, &pInfo->delKey); + deleteIntervalDiscBuf(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); if (pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { streamStateCommit(pInfo->pState); pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs; @@ -4848,8 +4813,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { printDataBlock(pInfo->binfo.pRes, "single interval"); return pInfo->binfo.pRes; } - deleteIntervalDiscBuf(pInfo->pState, NULL, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark, &pInfo->interval, - &pInfo->delKey); + deleteIntervalDiscBuf(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); setOperatorCompleted(pOperator); if (pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { streamStateCommit(pInfo->pState); diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index f7910e1ab2..a387df4da6 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -223,40 +223,47 @@ ,,n,script,./test.sh -f tsim/stream/basic0.sim -g ,,y,script,./test.sh -f tsim/stream/basic1.sim ,,y,script,./test.sh -f tsim/stream/basic2.sim +,,y,script,./test.sh -f tsim/stream/basic3.sim +,,y,script,./test.sh -f tsim/stream/basic4.sim +,,y,script,./test.sh -f tsim/stream/checkStreamSTable1.sim +,,y,script,./test.sh -f tsim/stream/checkStreamSTable.sim +,,y,script,./test.sh -f tsim/stream/deleteInterval.sim +,,y,script,./test.sh -f tsim/stream/deleteSession.sim +,,y,script,./test.sh -f tsim/stream/deleteState.sim +,,y,script,./test.sh -f tsim/stream/distributeInterval0.sim +,,y,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim +,,y,script,./test.sh -f tsim/stream/distributeSession0.sim ,,y,script,./test.sh -f tsim/stream/drop_stream.sim ,,y,script,./test.sh -f tsim/stream/fillHistoryBasic1.sim ,,y,script,./test.sh -f tsim/stream/fillHistoryBasic2.sim ,,y,script,./test.sh -f tsim/stream/fillHistoryBasic3.sim -,,y,script,./test.sh -f tsim/stream/distributeInterval0.sim -,,y,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim -,,y,script,./test.sh -f tsim/stream/distributeSession0.sim -,,y,script,./test.sh -f tsim/stream/session0.sim -,,y,script,./test.sh -f tsim/stream/session1.sim -,,y,script,./test.sh -f tsim/stream/state0.sim -,,y,script,./test.sh -f tsim/stream/triggerInterval0.sim -,,y,script,./test.sh -f tsim/stream/triggerSession0.sim -,,y,script,./test.sh -f tsim/stream/partitionby.sim -,,y,script,./test.sh -f tsim/stream/partitionby1.sim -,,y,script,./test.sh -f tsim/stream/schedSnode.sim -,,y,script,./test.sh -f tsim/stream/windowClose.sim -,,y,script,./test.sh -f tsim/stream/ignoreExpiredData.sim -,,y,script,./test.sh -f tsim/stream/sliding.sim -,,y,script,./test.sh -f tsim/stream/partitionbyColumnInterval.sim -,,y,script,./test.sh -f tsim/stream/partitionbyColumnSession.sim -,,y,script,./test.sh -f tsim/stream/partitionbyColumnState.sim -,,y,script,./test.sh -f tsim/stream/deleteInterval.sim -,,y,script,./test.sh -f tsim/stream/deleteSession.sim -,,y,script,./test.sh -f tsim/stream/deleteState.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalDelete0.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalDelete1.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalLinear.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalPartitionBy.sim +,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext1.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext.sim +,,y,script,./test.sh -f tsim/stream/fillIntervalRange.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalValue.sim +,,y,script,./test.sh -f tsim/stream/ignoreCheckUpdate.sim +,,y,script,./test.sh -f tsim/stream/ignoreExpiredData.sim +,,y,script,./test.sh -f tsim/stream/partitionby1.sim +,,y,script,./test.sh -f tsim/stream/partitionbyColumnInterval.sim +,,y,script,./test.sh -f tsim/stream/partitionbyColumnSession.sim +,,y,script,./test.sh -f tsim/stream/partitionbyColumnState.sim +,,y,script,./test.sh -f tsim/stream/partitionby.sim +,,y,script,./test.sh -f tsim/stream/schedSnode.sim +,,y,script,./test.sh -f tsim/stream/session0.sim +,,y,script,./test.sh -f tsim/stream/session1.sim +,,y,script,./test.sh -f tsim/stream/sliding.sim +,,y,script,./test.sh -f tsim/stream/state0.sim +,,y,script,./test.sh -f tsim/stream/state1.sim +,,y,script,./test.sh -f tsim/stream/triggerInterval0.sim +,,y,script,./test.sh -f tsim/stream/triggerSession0.sim ,,y,script,./test.sh -f tsim/stream/udTableAndTag0.sim ,,y,script,./test.sh -f tsim/stream/udTableAndTag1.sim -,,y,script,./test.sh -f tsim/stream/checkStreamSTable.sim -,,y,script,./test.sh -f tsim/stream/checkStreamSTable1.sim +,,y,script,./test.sh -f tsim/stream/udTableAndTag2.sim +,,y,script,./test.sh -f tsim/stream/windowClose.sim ,,y,script,./test.sh -f tsim/trans/lossdata1.sim ,,y,script,./test.sh -f tsim/trans/create_db.sim ,,y,script,./test.sh -f tsim/tmq/basic1.sim @@ -1334,38 +1341,47 @@ ,,n,script,./test.sh -f tsim/stream/basic0.sim -g ,,y,script,./test.sh -f tsim/stream/basic1.sim ,,y,script,./test.sh -f tsim/stream/basic2.sim +,,y,script,./test.sh -f tsim/stream/basic3.sim +,,y,script,./test.sh -f tsim/stream/basic4.sim +,,y,script,./test.sh -f tsim/stream/checkStreamSTable1.sim +,,y,script,./test.sh -f tsim/stream/checkStreamSTable.sim +,,y,script,./test.sh -f tsim/stream/deleteInterval.sim +,,y,script,./test.sh -f tsim/stream/deleteSession.sim +,,y,script,./test.sh -f tsim/stream/deleteState.sim +,,y,script,./test.sh -f tsim/stream/distributeInterval0.sim +,,y,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim +,,y,script,./test.sh -f tsim/stream/distributeSession0.sim ,,y,script,./test.sh -f tsim/stream/drop_stream.sim ,,y,script,./test.sh -f tsim/stream/fillHistoryBasic1.sim ,,y,script,./test.sh -f tsim/stream/fillHistoryBasic2.sim ,,y,script,./test.sh -f tsim/stream/fillHistoryBasic3.sim -,,y,script,./test.sh -f tsim/stream/distributeInterval0.sim -,,y,script,./test.sh -f tsim/stream/distributeIntervalRetrive0.sim -,,y,script,./test.sh -f tsim/stream/distributeSession0.sim -,,y,script,./test.sh -f tsim/stream/session0.sim -,,y,script,./test.sh -f tsim/stream/session1.sim -,,y,script,./test.sh -f tsim/stream/state0.sim -,,y,script,./test.sh -f tsim/stream/triggerInterval0.sim -,,y,script,./test.sh -f tsim/stream/triggerSession0.sim -,,y,script,./test.sh -f tsim/stream/partitionby.sim -,,y,script,./test.sh -f tsim/stream/partitionby1.sim -,,y,script,./test.sh -f tsim/stream/schedSnode.sim -,,y,script,./test.sh -f tsim/stream/windowClose.sim -,,y,script,./test.sh -f tsim/stream/ignoreExpiredData.sim -,,y,script,./test.sh -f tsim/stream/sliding.sim -,,y,script,./test.sh -f tsim/stream/partitionbyColumnInterval.sim -,,y,script,./test.sh -f tsim/stream/partitionbyColumnSession.sim -,,y,script,./test.sh -f tsim/stream/partitionbyColumnState.sim -,,y,script,./test.sh -f tsim/stream/deleteInterval.sim -,,y,script,./test.sh -f tsim/stream/deleteSession.sim -,,y,script,./test.sh -f tsim/stream/deleteState.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalDelete0.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalDelete1.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalLinear.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalPartitionBy.sim +,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext1.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalPrevNext.sim +,,y,script,./test.sh -f tsim/stream/fillIntervalRange.sim ,,y,script,./test.sh -f tsim/stream/fillIntervalValue.sim +,,y,script,./test.sh -f tsim/stream/ignoreCheckUpdate.sim +,,y,script,./test.sh -f tsim/stream/ignoreExpiredData.sim +,,y,script,./test.sh -f tsim/stream/partitionby1.sim +,,y,script,./test.sh -f tsim/stream/partitionbyColumnInterval.sim +,,y,script,./test.sh -f tsim/stream/partitionbyColumnSession.sim +,,y,script,./test.sh -f tsim/stream/partitionbyColumnState.sim +,,y,script,./test.sh -f tsim/stream/partitionby.sim +,,y,script,./test.sh -f tsim/stream/schedSnode.sim +,,y,script,./test.sh -f tsim/stream/session0.sim +,,y,script,./test.sh -f tsim/stream/session1.sim +,,y,script,./test.sh -f tsim/stream/sliding.sim +,,y,script,./test.sh -f tsim/stream/state0.sim +,,y,script,./test.sh -f tsim/stream/state1.sim +,,y,script,./test.sh -f tsim/stream/triggerInterval0.sim +,,y,script,./test.sh -f tsim/stream/triggerSession0.sim ,,y,script,./test.sh -f tsim/stream/udTableAndTag0.sim ,,y,script,./test.sh -f tsim/stream/udTableAndTag1.sim +,,y,script,./test.sh -f tsim/stream/udTableAndTag2.sim +,,y,script,./test.sh -f tsim/stream/windowClose.sim ,,y,script,./test.sh -f tsim/trans/lossdata1.sim ,,y,script,./test.sh -f tsim/tmq/basic1.sim ,,y,script,./test.sh -f tsim/tmq/basic2.sim diff --git a/tests/script/win-test-file b/tests/script/win-test-file index 2d5a1b3108..e8e71f8fdc 100644 --- a/tests/script/win-test-file +++ b/tests/script/win-test-file @@ -213,38 +213,47 @@ rem ./test.sh -f tsim/db/alter_replica_13.sim ./test.sh -f tsim/stream/basic0.sim -g ./test.sh -f tsim/stream/basic1.sim ./test.sh -f tsim/stream/basic2.sim +./test.sh -f tsim/stream/basic3.sim +./test.sh -f tsim/stream/basic4.sim +./test.sh -f tsim/stream/checkStreamSTable1.sim +./test.sh -f tsim/stream/checkStreamSTable.sim +./test.sh -f tsim/stream/deleteInterval.sim +./test.sh -f tsim/stream/deleteSession.sim +./test.sh -f tsim/stream/deleteState.sim +./test.sh -f tsim/stream/distributeInterval0.sim +./test.sh -f tsim/stream/distributeIntervalRetrive0.sim +./test.sh -f tsim/stream/distributeSession0.sim ./test.sh -f tsim/stream/drop_stream.sim ./test.sh -f tsim/stream/fillHistoryBasic1.sim ./test.sh -f tsim/stream/fillHistoryBasic2.sim ./test.sh -f tsim/stream/fillHistoryBasic3.sim -./test.sh -f tsim/stream/distributeInterval0.sim -./test.sh -f tsim/stream/distributeIntervalRetrive0.sim -./test.sh -f tsim/stream/distributeSession0.sim -./test.sh -f tsim/stream/session0.sim -./test.sh -f tsim/stream/session1.sim -./test.sh -f tsim/stream/state0.sim -./test.sh -f tsim/stream/triggerInterval0.sim -./test.sh -f tsim/stream/triggerSession0.sim -./test.sh -f tsim/stream/partitionby.sim -./test.sh -f tsim/stream/partitionby1.sim -./test.sh -f tsim/stream/schedSnode.sim -./test.sh -f tsim/stream/windowClose.sim -./test.sh -f tsim/stream/ignoreExpiredData.sim -./test.sh -f tsim/stream/sliding.sim -./test.sh -f tsim/stream/partitionbyColumnInterval.sim -./test.sh -f tsim/stream/partitionbyColumnSession.sim -./test.sh -f tsim/stream/partitionbyColumnState.sim -./test.sh -f tsim/stream/deleteInterval.sim -./test.sh -f tsim/stream/deleteSession.sim -./test.sh -f tsim/stream/deleteState.sim ./test.sh -f tsim/stream/fillIntervalDelete0.sim ./test.sh -f tsim/stream/fillIntervalDelete1.sim ./test.sh -f tsim/stream/fillIntervalLinear.sim ./test.sh -f tsim/stream/fillIntervalPartitionBy.sim +./test.sh -f tsim/stream/fillIntervalPrevNext1.sim ./test.sh -f tsim/stream/fillIntervalPrevNext.sim +./test.sh -f tsim/stream/fillIntervalRange.sim ./test.sh -f tsim/stream/fillIntervalValue.sim +./test.sh -f tsim/stream/ignoreCheckUpdate.sim +./test.sh -f tsim/stream/ignoreExpiredData.sim +./test.sh -f tsim/stream/partitionby1.sim +./test.sh -f tsim/stream/partitionbyColumnInterval.sim +./test.sh -f tsim/stream/partitionbyColumnSession.sim +./test.sh -f tsim/stream/partitionbyColumnState.sim +./test.sh -f tsim/stream/partitionby.sim +./test.sh -f tsim/stream/schedSnode.sim +./test.sh -f tsim/stream/session0.sim +./test.sh -f tsim/stream/session1.sim +./test.sh -f tsim/stream/sliding.sim +./test.sh -f tsim/stream/state0.sim +./test.sh -f tsim/stream/state1.sim +./test.sh -f tsim/stream/triggerInterval0.sim +./test.sh -f tsim/stream/triggerSession0.sim ./test.sh -f tsim/stream/udTableAndTag0.sim ./test.sh -f tsim/stream/udTableAndTag1.sim +./test.sh -f tsim/stream/udTableAndTag2.sim +./test.sh -f tsim/stream/windowClose.sim ./test.sh -f tsim/trans/lossdata1.sim ./test.sh -f tsim/trans/create_db.sim ./test.sh -f tsim/tmq/basic1.sim From 20f7838c8f0c09c56e1d17a8294c9fb6c147d6c2 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 12 Apr 2023 13:32:49 +0800 Subject: [PATCH 08/18] opt interval final window --- source/libs/executor/src/timewindowoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index c1b3ed647d..eb7c605d69 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2140,7 +2140,7 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, S bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup) { if (pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) { SWinKey key = {.ts = pWin->skey, .groupId = groupId}; - if (streamStateCheck(pState, &key)) { + if (!hasIntervalWindow(pState, &key)) { return true; } return false; From 819154e1fcce9d87549a47f33edb0644000d055d Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 12 Apr 2023 13:47:54 +0800 Subject: [PATCH 09/18] use delete_mark to drop check point --- source/libs/stream/src/tstreamFileState.c | 12 +----------- 1 file changed, 1 insertion(+), 11 deletions(-) diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index ddd8e148ff..9e8fe9b5de 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -313,16 +313,6 @@ SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) { return pFileState->usedBuffs; } -// void streamFileStateDecode(SStreamFileState* pFileState, void* pBuff, int32_t len) { -// pBuff = taosDecodeFixedI64(pBuff, &pFileState->flushMark); -// } - -// void streamFileStateEncode(SStreamFileState* pFileState, void** pVal, int32_t* pLen) { -// *pLen = sizeof(TSKEY); -// (*pVal) = taosMemoryCalloc(1, *pLen); -// void* buff = *pVal; -// taosEncodeFixedI64(&buff, pFileState->flushMark); -// } void streamFileStateDecode(TSKEY* key, void* pBuff, int32_t len) { pBuff = taosDecodeFixedI64(pBuff, key); } void streamFileStateEncode(TSKEY* key, void** pVal, int32_t* pLen) { @@ -420,7 +410,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) { TSKEY ts; sscanf(val, "%" PRId64 "", &ts); taosMemoryFree(val); - if (ts < pFileState->flushMark) { + if (ts < pFileState->deleteMark) { forceRemoveCheckpoint(pFileState, i); break; } else { From 7e4eefcde18c7936fbbf6a569f2e0a462708e121 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 12 Apr 2023 05:50:01 +0000 Subject: [PATCH 10/18] fix write crash --- source/libs/stream/src/tstreamFileState.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index ddd8e148ff..c3b9d9f7dd 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -355,6 +355,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, if (streamStateGetBatchSize(batch) > 0) { code = streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); } + streamStateClearBatch(batch); if (flushState) { const char* taskKey = "streamFileState"; From 4625dd98f02a6fc206a74ceb39ca374195d12a83 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 12 Apr 2023 07:39:54 +0000 Subject: [PATCH 11/18] fix write crash --- source/libs/stream/src/streamStateRocksdb.c | 13 +++++++++++-- source/libs/stream/src/tstreamFileState.c | 9 ++++----- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/streamStateRocksdb.c b/source/libs/stream/src/streamStateRocksdb.c index fb5ad34d0d..9977732a9a 100644 --- a/source/libs/stream/src/streamStateRocksdb.c +++ b/source/libs/stream/src/streamStateRocksdb.c @@ -18,8 +18,17 @@ #include "tcommon.h" int defaultKeyComp(void* state, const char* aBuf, size_t aLen, const char* bBuf, size_t bLen) { - // - return memcmp(aBuf, bBuf, aLen); + int ret = memcmp(aBuf, bBuf, aLen); + if (ret == 0) { + if (aLen < bLen) + return -1; + else if (aLen > bLen) + return 1; + else + return 0; + } else { + return ret; + } } int defaultKeyEncode(void* k, char* buf) { int len = strlen((char*)k); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 4c6c1e1afa..eab95d1214 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -355,7 +355,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, int32_t len = 0; sprintf(keyBuf, "%s:%" PRId64 "", taskKey, ((SStreamState*)pFileState->pFileStore)->checkPointId); streamFileStateEncode(&pFileState->flushMark, &valBuf, &len); - streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len); + code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len); taosMemoryFree(valBuf); } { @@ -363,14 +363,13 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, char valBuf[64] = {0}; int32_t len = 0; memcpy(keyBuf, taskKey, strlen(taskKey)); - sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId); - - streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, strlen(valBuf)); + len = sprintf(valBuf, "%" PRId64 "", ((SStreamState*)pFileState->pFileStore)->checkPointId); + code = streamStatePutBatch(pFileState->pFileStore, "default", batch, keyBuf, valBuf, len); } streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); } - streamStateDestroyBatch(batch); + return code; } int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) { From 9be89ce00a4d4e24d5e7c7b2f99afe1100f372a0 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 12 Apr 2023 17:33:58 +0800 Subject: [PATCH 12/18] delete expired check point --- include/libs/stream/streamState.h | 1 + include/libs/stream/tstreamFileState.h | 2 +- source/libs/executor/src/scanoperator.c | 2 +- source/libs/executor/src/timewindowoperator.c | 17 +++++++++-------- source/libs/stream/src/streamState.c | 8 ++++++++ source/libs/stream/src/tstreamFileState.c | 12 ++++++++++-- 6 files changed, 30 insertions(+), 12 deletions(-) diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 7c4aab2491..ae84299c1c 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -65,6 +65,7 @@ void streamStateClose(SStreamState* pState); int32_t streamStateBegin(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState); void streamStateDestroy(SStreamState* pState); +int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark); typedef struct { rocksdb_iterator_t* iter; diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 4f2618f8dc..25267be354 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -54,7 +54,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pS int32_t recoverSnapshot(SStreamFileState* pFileState); int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list); -int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId); +int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark); #ifdef __cplusplus } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0ad2423ca3..86f78989d1 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2151,7 +2151,7 @@ FETCH_NEXT_BLOCK: // record the scan action. pInfo->numOfExec++; pOperator->resultInfo.totalRows += pBlockInfo->rows; - // printDataBlock(pInfo->pRes, "stream scan"); + printDataBlock(pInfo->pRes, "stream scan"); qDebug("scan rows: %" PRId64, pBlockInfo->rows); if (pBlockInfo->rows > 0) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index eb7c605d69..811194031b 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1454,10 +1454,6 @@ STimeWindow getFinalTimeWindow(int64_t ts, SInterval* pInterval) { return w; } -static void deleteIntervalDiscBuf(SStreamState* pState, TSKEY mark) { - //todo(liuyao) delete expired check point -} - static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren, TSKEY maxTs) { int32_t size = taosArrayGetSize(pChildren); for (int32_t i = 0; i < size; i++) { @@ -2404,6 +2400,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p STimeWindow nextWin = {0}; if (IS_FINAL_OP(pInfo)) { nextWin = getFinalTimeWindow(ts, &pInfo->interval); + qDebug("===stream===final ts:%" PRId64 ", getFinalTimeWindow:%" PRId64,ts, nextWin.skey); } else { nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC); } @@ -2448,14 +2445,17 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p } if (ignore) { - startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); + // startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); + int32_t prevEndPos = startPos; + startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos); if (startPos < 0) { break; } continue; } } - + + qDebug("===stream===final setIntervalOutputBuf:%" PRId64, nextWin.skey); int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResPos, groupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup); pResult = (SResultRow*)pResPos->pRowBuff; @@ -2474,6 +2474,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p .groupId = groupId, }; if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) { + qDebug("===stream===is final%d saveWinResult:%" PRId64, IS_FINAL_OP(pInfo), key.ts); saveWinResult(&key, pResPos, pUpdatedMap); } @@ -2575,9 +2576,9 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); qDebug("===stream===clear semi operator"); } else { - deleteIntervalDiscBuf(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); if (pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { streamStateCommit(pInfo->pState); + streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs; } qDebug("===stream===interval final close"); @@ -4813,10 +4814,10 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { printDataBlock(pInfo->binfo.pRes, "single interval"); return pInfo->binfo.pRes; } - deleteIntervalDiscBuf(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); setOperatorCompleted(pOperator); if (pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { streamStateCommit(pInfo->pState); + streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs; } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index cee72002da..85b176c684 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -1072,6 +1072,14 @@ void streamStateDestroy(SStreamState* pState) { taosMemoryFreeClear(pState); } +int32_t streamStateDeleteCheckPoint(SStreamState* pState, TSKEY mark) { +#ifdef USE_ROCKSDB + return deleteExpiredCheckPoint(pState->pFileState, mark); +#else + return 0; +#endif +} + #if 0 char* streamStateSessionDump(SStreamState* pState) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index eab95d1214..88fdfb6b3c 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -372,17 +372,20 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot, return code; } + int32_t forceRemoveCheckpoint(SStreamFileState* pFileState, int64_t checkpointId) { const char* taskKey = "streamFileState"; char keyBuf[128] = {0}; sprintf(keyBuf, "%s:%" PRId64 "", taskKey, checkpointId); return streamDefaultDel_rocksdb(pFileState->pFileStore, keyBuf); } + int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list) { const char* taskKey = "streamFileState"; return streamDefaultIter_rocksdb(pFileState->pFileStore, taskKey, NULL, list); } -int32_t recoverSnapshot(SStreamFileState* pFileState) { + +int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { int32_t code = TSDB_CODE_SUCCESS; const char* taskKey = "streamFileState"; int64_t maxCheckPointId = 0; @@ -410,13 +413,18 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) { TSKEY ts; sscanf(val, "%" PRId64 "", &ts); taosMemoryFree(val); - if (ts < pFileState->deleteMark) { + if (ts < mark) { forceRemoveCheckpoint(pFileState, i); break; } else { } } + return code; +} +int32_t recoverSnapshot(SStreamFileState* pFileState) { + int32_t code = TSDB_CODE_SUCCESS; + deleteExpiredCheckPoint(pFileState, pFileState->maxTs - pFileState->deleteMark); void* pStVal = NULL; int32_t len = 0; From e42caca9c2a6530df919a988f8c053bfa98f9dcc Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Wed, 12 Apr 2023 17:39:44 +0800 Subject: [PATCH 13/18] delete some log --- source/libs/executor/src/scanoperator.c | 2 +- source/libs/executor/src/timewindowoperator.c | 10 ++-------- 2 files changed, 3 insertions(+), 9 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 86f78989d1..0ad2423ca3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2151,7 +2151,7 @@ FETCH_NEXT_BLOCK: // record the scan action. pInfo->numOfExec++; pOperator->resultInfo.totalRows += pBlockInfo->rows; - printDataBlock(pInfo->pRes, "stream scan"); + // printDataBlock(pInfo->pRes, "stream scan"); qDebug("scan rows: %" PRId64, pBlockInfo->rows); if (pBlockInfo->rows > 0) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 811194031b..c2d2e92c79 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2400,7 +2400,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p STimeWindow nextWin = {0}; if (IS_FINAL_OP(pInfo)) { nextWin = getFinalTimeWindow(ts, &pInfo->interval); - qDebug("===stream===final ts:%" PRId64 ", getFinalTimeWindow:%" PRId64,ts, nextWin.skey); } else { nextWin = getActiveTimeWindow(pInfo->aggSup.pResultBuf, pResultRowInfo, ts, &pInfo->interval, TSDB_ORDER_ASC); } @@ -2428,7 +2427,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) { int32_t size = taosArrayGetSize(pInfo->pChildren); addPullWindow(pInfo->pPullDataMap, &winRes, size); - qDebug("===stream===prepare retrive %" PRId64 ", size:%d", winRes.ts, size); } } else { int32_t index = -1; @@ -2445,17 +2443,14 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p } if (ignore) { - // startPos = getNexWindowPos(&pInfo->interval, &pSDataBlock->info, tsCols, startPos, nextWin.ekey, &nextWin); - int32_t prevEndPos = startPos; - startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos); + startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, startPos); if (startPos < 0) { break; } continue; } } - - qDebug("===stream===final setIntervalOutputBuf:%" PRId64, nextWin.skey); + int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResPos, groupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup); pResult = (SResultRow*)pResPos->pRowBuff; @@ -2474,7 +2469,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p .groupId = groupId, }; if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) { - qDebug("===stream===is final%d saveWinResult:%" PRId64, IS_FINAL_OP(pInfo), key.ts); saveWinResult(&key, pResPos, pUpdatedMap); } From 8992721e20ee61513fc01ecf2979ee21a0ba7ecd Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 13 Apr 2023 09:15:50 +0800 Subject: [PATCH 14/18] refact child cache --- source/libs/executor/inc/executorimpl.h | 1 + source/libs/executor/src/timewindowoperator.c | 95 ++----------------- 2 files changed, 9 insertions(+), 87 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 5fe64fef64..0adabed626 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -579,6 +579,7 @@ typedef struct SStreamIntervalOperatorInfo { SSDataBlock* pPullDataRes; bool isFinal; SArray* pChildren; + int32_t numOfChild; SStreamState* pState; SWinKey delKey; uint64_t numOfDatapack; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index c2d2e92c79..89bce58cce 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1541,14 +1541,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { streamFileStateDestroy(pInfo->pState->pFileState); taosMemoryFreeClear(pInfo->pState); - if (pInfo->pChildren) { - int32_t size = taosArrayGetSize(pInfo->pChildren); - for (int32_t i = 0; i < size; i++) { - SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, i); - destroyOperatorInfo(pChildOp); - } - taosArrayDestroy(pInfo->pChildren); - } nodesDestroyNode((SNode*)pInfo->pPhyNode); colDataDestroy(&pInfo->twAggSup.timeWindowData); pInfo->groupResInfo.pRows = taosArrayDestroy(pInfo->groupResInfo.pRows); @@ -2081,58 +2073,6 @@ int32_t setIntervalOutputBuf(SStreamState* pState, STimeWindow* win, SRowBuffPos return TSDB_CODE_SUCCESS; } -static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, SSHashObj* pUpdatedMap) { - SStreamIntervalOperatorInfo* pInfo = pOperator->info; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - int32_t size = taosArrayGetSize(pWinArray); - int32_t numOfOutput = pOperator->exprSupp.numOfExprs; - SExprSupp* pSup = &pOperator->exprSupp; - if (!pInfo->pChildren) { - return; - } - for (int32_t i = 0; i < size; i++) { - SWinKey* pWinRes = taosArrayGet(pWinArray, i); - SRowBuffPos* pCurResPos = NULL; - SResultRow* pCurResult = NULL; - STimeWindow parentWin = getFinalTimeWindow(pWinRes->ts, &pInfo->interval); - if (isDeletedStreamWindow(&parentWin, pWinRes->groupId, pInfo->pState, &pInfo->twAggSup)) { - continue; - } - - int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); - int32_t num = 0; - for (int32_t j = 0; j < numOfChildren; j++) { - SOperatorInfo* pChildOp = taosArrayGetP(pInfo->pChildren, j); - SStreamIntervalOperatorInfo* pChInfo = pChildOp->info; - SExprSupp* pChildSup = &pChildOp->exprSupp; - if (!hasIntervalWindow(pChInfo->pState, pWinRes)) { - continue; - } - if (num == 0) { - int32_t code = setIntervalOutputBuf(pInfo->pState, &parentWin, &pCurResPos, pWinRes->groupId, pSup->pCtx, - numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup); - ASSERT(pCurResPos != NULL); - pCurResult = (SResultRow*)pCurResPos->pRowBuff; - if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) { - T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); - } - } - num++; - SRowBuffPos* pChResPos = NULL; - SResultRow* pChResult = NULL; - setIntervalOutputBuf(pChInfo->pState, &parentWin, &pChResPos, pWinRes->groupId, pChildSup->pCtx, - pChildSup->numOfExprs, pChildSup->rowEntryInfoOffset, &pChInfo->aggSup); - pChResult = (SResultRow*)pChResPos->pRowBuff; - updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin, true); - compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); - } - if (num > 0 && pUpdatedMap) { - saveWinResultInfo(pCurResult->win.skey, pWinRes->groupId, pCurResPos, pUpdatedMap); - saveOutputBuf(pInfo->pState, pWinRes, pCurResult, pInfo->aggSup.resultRowSize); - } - } -} - bool isDeletedStreamWindow(STimeWindow* pWin, uint64_t groupId, SStreamState* pState, STimeWindowAggSupp* pTwSup) { if (pTwSup->maxTs != INT64_MIN && pWin->ekey < pTwSup->maxTs - pTwSup->deleteMark) { SWinKey key = {.ts = pWin->skey, .groupId = groupId}; @@ -2250,9 +2190,8 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo) { .window = nextWin, .groupId = winKey->groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; // add pull data request if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) { - int32_t size1 = taosArrayGetSize(pInfo->pChildren); - addPullWindow(pInfo->pPullDataMap, winKey, size1); - qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, size1); + addPullWindow(pInfo->pPullDataMap, winKey, pInfo->numOfChild); + qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, pInfo->numOfChild); } } } @@ -2413,7 +2352,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p continue; } - if (IS_FINAL_OP(pInfo) && pInfo->pChildren) { + if (IS_FINAL_OP(pInfo) && pInfo->numOfChild > 0) { bool ignore = true; SWinKey winRes = { .ts = nextWin.skey, @@ -2425,8 +2364,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p .window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; // add pull data request if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) { - int32_t size = taosArrayGetSize(pInfo->pChildren); - addPullWindow(pInfo->pPullDataMap, &winRes, size); + addPullWindow(pInfo->pPullDataMap, &winRes, pInfo->numOfChild); } } else { int32_t index = -1; @@ -2780,24 +2718,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, streamStateSetNumber(pInfo->pState, -1); initResultRowInfo(&pInfo->binfo.resultRowInfo); - pInfo->pChildren = NULL; - if (numOfChild > 0) { - pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*)); - if (!pInfo->pChildren) { - goto _error; - } - for (int32_t i = 0; i < numOfChild; i++) { - SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pPhyNode, pTaskInfo, 0); - if (pChildOp) { - SStreamIntervalOperatorInfo* pChInfo = pChildOp->info; - pChInfo->twAggSup.calTrigger = STREAM_TRIGGER_AT_ONCE; - taosArrayPush(pInfo->pChildren, &pChildOp); - streamStateSetNumber(pChInfo->pState, i); - continue; - } - goto _error; - } - } + pInfo->numOfChild = numOfChild; pInfo->pPhyNode = (SPhysiNode*)nodesCloneNode((SNode*)pPhyNode); @@ -3337,13 +3258,13 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; int32_t numOfOutput = pSup->numOfExprs; - int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren); + int32_t numOfChild = taosArrayGetSize(pInfo->pChildren); for (int32_t i = 0; i < size; i++) { SSessionKey* pWinKey = taosArrayGet(pWinArray, i); int32_t num = 0; SResultWindowInfo parentWin = {0}; - for (int32_t j = 0; j < numOfChildren; j++) { + for (int32_t j = 0; j < numOfChild; j++) { SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j); SStreamSessionAggOperatorInfo* pChInfo = pChild->info; SStreamAggSupporter* pChAggSup = &pChInfo->streamAggSup; @@ -4995,7 +4916,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pullIndex = 0; pInfo->pPullDataRes = NULL; pInfo->isFinal = false; - pInfo->pChildren = NULL; + pInfo->numOfChild = 0; pInfo->delKey.ts = INT64_MAX; pInfo->delKey.groupId = 0; pInfo->numOfDatapack = 0; From 259815fd83aab5d41f843cd1574637a5bb1cec59 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 13 Apr 2023 10:46:38 +0800 Subject: [PATCH 15/18] scan multiple range --- source/libs/executor/src/scanoperator.c | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 0ad2423ca3..8a3ac9d6a3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1183,14 +1183,19 @@ static STimeWindow getSlidingWindow(TSKEY* startTsCol, TSKEY* endTsCol, uint64_t static SSDataBlock* doRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { qInfo("do stream range scan. windows index:%d", *pRowIndex); + bool prepareRes = true; while (1) { SSDataBlock* pResult = NULL; pResult = doTableScan(pInfo->pTableScanOp); - if (!pResult && prepareRangeScan(pInfo, pSDB, pRowIndex)) { + if (!pResult) { + prepareRes = prepareRangeScan(pInfo, pSDB, pRowIndex); // scan next window data pResult = doTableScan(pInfo->pTableScanOp); } if (!pResult) { + if (prepareRes) { + continue; + } blockDataCleanup(pSDB); *pRowIndex = 0; pInfo->updateWin = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; @@ -2057,7 +2062,7 @@ FETCH_NEXT_BLOCK: updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version); pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); - // printDataBlock(pSDB, "stream scan update"); + printDataBlock(pSDB, "stream scan update"); calBlockTbName(pInfo, pSDB); return pSDB; } From 34bf638f192cf33d6eaab2069fac25b51e906696 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Thu, 13 Apr 2023 16:21:57 +0800 Subject: [PATCH 16/18] fix bug --- include/libs/stream/tstreamUpdate.h | 6 +--- source/libs/executor/src/scanoperator.c | 21 +++++--------- source/libs/stream/src/streamState.c | 1 + source/libs/stream/src/streamUpdate.c | 38 ++----------------------- 4 files changed, 12 insertions(+), 54 deletions(-) diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h index ab328c6ad5..4678aa0bd9 100644 --- a/include/libs/stream/tstreamUpdate.h +++ b/include/libs/stream/tstreamUpdate.h @@ -40,9 +40,7 @@ typedef struct SUpdateInfo { TSKEY minTS; SScalableBf *pCloseWinSBF; SHashObj *pMap; - STimeWindow scanWindow; - uint64_t scanGroupId; - uint64_t maxVersion; + uint64_t maxDataVersion; } SUpdateInfo; SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark); @@ -50,8 +48,6 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol); bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid); -void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version); -bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version); void updateInfoDestroy(SUpdateInfo *pInfo); void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo); void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo); diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 8a3ac9d6a3..4b4cae1a55 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1022,8 +1022,9 @@ static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t grou pInfo->groupId = groupCol[rowIndex]; } -void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin) { +void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint64_t version) { pTableScanInfo->base.cond.twindows = *pWin; + pTableScanInfo->base.cond.endVersion = version; pTableScanInfo->scanTimes = 0; pTableScanInfo->currentGroupId = -1; tsdbReaderClose(pTableScanInfo->base.dataReader); @@ -1142,7 +1143,7 @@ static bool prepareRangeScan(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_ break; } - resetTableScanInfo(pInfo->pTableScanOp->info, &win); + resetTableScanInfo(pInfo->pTableScanOp->info, &win, pInfo->pUpdateInfo->maxDataVersion); pInfo->pTableScanOp->status = OP_OPENED; return true; } @@ -1784,6 +1785,7 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) { if (pInfo->pUpdateInfo) { + pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version); checkUpdateData(pInfo, true, pBlock, true); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey); if (pInfo->pUpdateDataRes->info.rows > 0) { @@ -1845,7 +1847,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2; } - /*resetTableScanInfo(pTSInfo, pWin);*/ tsdbReaderClose(pTSInfo->base.dataReader); qDebug("4"); @@ -1891,8 +1892,6 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); if (pSDB) { STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader); - updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version); pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); printDataBlock(pSDB, "scan recover update"); @@ -1961,6 +1960,9 @@ FETCH_NEXT_BLOCK: pBlock->info.calWin.skey = INT64_MIN; pBlock->info.calWin.ekey = INT64_MAX; pBlock->info.dataLoad = 1; + if (pInfo->pUpdateInfo) { + pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version); + } blockDataUpdateTsWindow(pBlock, 0); switch (pBlock->info.type) { case STREAM_NORMAL: @@ -2058,8 +2060,6 @@ FETCH_NEXT_BLOCK: SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex); if (pSDB) { STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; - uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader); - updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version); pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA; checkUpdateData(pInfo, true, pSDB, false); printDataBlock(pSDB, "stream scan update"); @@ -2125,13 +2125,6 @@ FETCH_NEXT_BLOCK: setBlockIntoRes(pInfo, &block, false); - if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.id.groupId, - pInfo->pRes->info.version)) { - printDataBlock(pInfo->pRes, "stream scan ignore"); - blockDataCleanup(pInfo->pRes); - continue; - } - if (pInfo->pCreateTbRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_RES; return pInfo->pCreateTbRes; diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 85b176c684..4ead4c49b5 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -1066,6 +1066,7 @@ void streamStateDestroy(SStreamState* pState) { #ifdef USE_ROCKSDB streamFileStateDestroy(pState->pFileState); streamStateDestroy_rocksdb(pState); + taosMemoryFreeClear(pState->parNameMap); // do nothong #endif taosMemoryFreeClear(pState->pTdbState); diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index be12c72d00..70a1c543f6 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -128,9 +128,7 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma pInfo->pCloseWinSBF = NULL; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT); pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK); - pInfo->maxVersion = 0; - pInfo->scanGroupId = 0; - pInfo->scanWindow = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX}; + pInfo->maxDataVersion = 0; return pInfo; } @@ -242,29 +240,6 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) { return true; } -void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) { - qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId, - pWin->skey, pWin->ekey, version); - pInfo->scanWindow = *pWin; - pInfo->scanGroupId = groupId; - pInfo->maxVersion = version; -} - -bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow *pWin, uint64_t groupId, uint64_t version) { - if (!pInfo) { - return false; - } - qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId, - pWin->skey, pWin->ekey, version); - if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey && pWin->ekey <= pInfo->scanWindow.ekey && - version <= pInfo->maxVersion) { - qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRIu64 ", endTs:%" PRIu64 ", version:%" PRIu64, groupId, - pWin->skey, pWin->ekey, version); - return true; - } - return false; -} - void updateInfoDestroy(SUpdateInfo *pInfo) { if (pInfo == NULL) { return; @@ -337,10 +312,7 @@ int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) if (tEncodeI64(&encoder, *(TSKEY *)pIte) < 0) return -1; } - if (tEncodeI64(&encoder, pInfo->scanWindow.skey) < 0) return -1; - if (tEncodeI64(&encoder, pInfo->scanWindow.ekey) < 0) return -1; - if (tEncodeU64(&encoder, pInfo->scanGroupId) < 0) return -1; - if (tEncodeU64(&encoder, pInfo->maxVersion) < 0) return -1; + if (tEncodeU64(&encoder, pInfo->maxDataVersion) < 0) return -1; tEndEncode(&encoder); @@ -393,11 +365,7 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { taosHashPut(pInfo->pMap, &uid, sizeof(uint64_t), &ts, sizeof(TSKEY)); } ASSERT(mapSize == taosHashGetSize(pInfo->pMap)); - - if (tDecodeI64(&decoder, &pInfo->scanWindow.skey) < 0) return -1; - if (tDecodeI64(&decoder, &pInfo->scanWindow.ekey) < 0) return -1; - if (tDecodeU64(&decoder, &pInfo->scanGroupId) < 0) return -1; - if (tDecodeU64(&decoder, &pInfo->maxVersion) < 0) return -1; + if (tDecodeU64(&decoder, &pInfo->maxDataVersion) < 0) return -1; tEndDecode(&decoder); From cb0b4ffbd3bca71a6e92830bfdaac06e62204f22 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Fri, 14 Apr 2023 09:38:35 +0800 Subject: [PATCH 17/18] use interval state --- source/libs/executor/inc/executorimpl.h | 7 ++-- source/libs/executor/src/scanoperator.c | 34 ++----------------- source/libs/executor/src/timewindowoperator.c | 22 ++++++------ 3 files changed, 18 insertions(+), 45 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 0adabed626..f503f9370b 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -483,9 +483,10 @@ typedef struct SStreamScanInfo { int32_t blockRecoverTotCnt; SSDataBlock* pRecoverRes; - SSDataBlock* pCreateTbRes; - int8_t igCheckUpdate; - int8_t igExpired; + SSDataBlock* pCreateTbRes; + int8_t igCheckUpdate; + int8_t igExpired; + SStreamState* pState; } SStreamScanInfo; typedef struct { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 4b4cae1a55..fbc835beea 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1447,39 +1447,8 @@ static int32_t generateScanRange(SStreamScanInfo* pInfo, SSDataBlock* pSrcBlock, return code; } -#if 0 -void calBlockTag(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { - SExprSupp* pTagCalSup = &pInfo->tagCalSup; - SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState; - if (pTagCalSup == NULL || pTagCalSup->numOfExprs == 0) return; - if (pBlock == NULL || pBlock->info.rows == 0) return; - - void* tag = NULL; - int32_t tagLen = 0; - if (streamStateGetParTag(pState, pBlock->info.id.groupId, &tag, &tagLen) == 0) { - pBlock->info.tagLen = tagLen; - void* pTag = taosMemoryRealloc(pBlock->info.pTag, tagLen); - if (pTag == NULL) { - tdbFree(tag); - taosMemoryFree(pBlock->info.pTag); - pBlock->info.pTag = NULL; - pBlock->info.tagLen = 0; - return; - } - pBlock->info.pTag = pTag; - memcpy(pBlock->info.pTag, tag, tagLen); - tdbFree(tag); - return; - } else { - pBlock->info.pTag = NULL; - } - tdbFree(tag); -} -#endif - static void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup; - SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState; blockDataCleanup(pInfo->pCreateTbRes); if (pInfo->tbnameCalSup.numOfExprs == 0 && pInfo->tagCalSup.numOfExprs == 0) { pBlock->info.parTbName[0] = 0; @@ -1535,7 +1504,7 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.id.uid, tsCol[rowId]); bool closedWin = isClosed && isSignleIntervalWindow(pInfo) && isDeletedStreamWindow(&win, pBlock->info.id.groupId, - pInfo->pTableScanOp->pTaskInfo->streamInfo.pState, &pInfo->twAggSup); + pInfo->pState, &pInfo->twAggSup); if ((update || closedWin) && out) { qDebug("stream update check not pass, update %d, closedWin %d", update, closedWin); uint64_t gpId = 0; @@ -2534,6 +2503,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate; pInfo->igExpired = pTableScanNode->igExpired; pInfo->twAggSup.maxTs = INT64_MIN; + pInfo->pState = NULL; // todo(liuyao) get buff from rocks db; void* buff = NULL; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 89bce58cce..8034b8f871 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -25,7 +25,9 @@ #include "ttime.h" #define IS_FINAL_OP(op) ((op)->isFinal) -#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); +// #define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); + +#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL); typedef struct SSessionAggOperatorInfo { SOptrBasicInfo binfo; @@ -1612,20 +1614,20 @@ static bool timeWindowinterpNeeded(SqlFunctionCtx* pCtx, int32_t numOfCols, SInt return needed; } -void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SAggSupporter* pSup, SInterval* pInterval, - STimeWindowAggSupp* pTwSup) { +void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamIntervalOperatorInfo* pInfo) { if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - initIntervalDownStream(downstream->pDownstream[0], type, pSup, pInterval, pTwSup); + initIntervalDownStream(downstream->pDownstream[0], type, pInfo); return; } SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->windowSup.parentType = type; - pScanInfo->windowSup.pIntervalAggSup = pSup; + pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup; if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) { - pScanInfo->pUpdateInfo = updateInfoInitP(pInterval, pTwSup->waterMark); + pScanInfo->pUpdateInfo = updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark); } - pScanInfo->interval = *pInterval; - pScanInfo->twAggSup = *pTwSup; + pScanInfo->interval = pInfo->interval; + pScanInfo->twAggSup = pInfo->twAggSup; + pScanInfo->pState = pInfo->pState; } void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) { @@ -2761,7 +2763,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL); if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { - initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup); + initIntervalDownStream(downstream, pPhyNode->type, pInfo); } code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -4930,7 +4932,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL); - initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup, &pInfo->interval, &pInfo->twAggSup); + initIntervalDownStream(downstream, pPhyNode->type, pInfo); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; From 4906855a8b3ba77e004c9e251c35f9d1eeb0199e Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Fri, 14 Apr 2023 09:47:56 +0800 Subject: [PATCH 18/18] delete mark --- source/libs/executor/src/timewindowoperator.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 8034b8f871..f85ec1fb2f 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -25,9 +25,8 @@ #include "ttime.h" #define IS_FINAL_OP(op) ((op)->isFinal) -// #define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); +#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); -#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL); typedef struct SSessionAggOperatorInfo { SOptrBasicInfo binfo;