From 429b5cd6462421e5868af2c9f6a7eab9213c27d1 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Mon, 3 Apr 2023 14:31:37 +0800 Subject: [PATCH] feat:add stream file state --- include/common/tglobal.h | 3 +- include/libs/stream/streamState.h | 6 +- include/libs/stream/tstreamFileState.h | 54 ++++ include/util/tlist.h | 2 + source/common/src/tglobal.c | 4 +- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executorimpl.c | 2 +- source/libs/executor/src/timewindowoperator.c | 13 +- source/libs/stream/src/streamState.c | 38 +-- source/libs/stream/src/tstreamFileState.c | 235 ++++++++++++++++++ source/util/src/tlist.c | 18 ++ tests/parallel_test/cases.task | 2 + 12 files changed, 334 insertions(+), 45 deletions(-) create mode 100644 include/libs/stream/tstreamFileState.h create mode 100644 source/libs/stream/src/tstreamFileState.c diff --git a/include/common/tglobal.h b/include/common/tglobal.h index ac75b84762..a891b1db67 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -165,7 +165,8 @@ extern int32_t tsUptimeInterval; extern int32_t tsRpcRetryLimit; extern int32_t tsRpcRetryInterval; -extern bool tsDisableStream; +extern bool tsDisableStream; +extern int64_t tsStreamBufferSize; // #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 1824b9e340..5caba81fde 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -17,6 +17,7 @@ #include "rocksdb/c.h" #include "tdbInt.h" +#include "tstreamFileState.h" #ifdef __cplusplus extern "C" { @@ -54,6 +55,7 @@ typedef struct STdbState { // incremental state storage typedef struct { STdbState* pTdbState; + SStreamFileState* pFileState; int32_t number; } SStreamState; @@ -61,7 +63,6 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int void streamStateClose(SStreamState* pState); int32_t streamStateBegin(SStreamState* pState); int32_t streamStateCommit(SStreamState* pState); -int32_t streamStateAbort(SStreamState* pState); void streamStateDestroy(SStreamState* pState); typedef struct { @@ -126,9 +127,6 @@ int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* tbname); int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal); -int32_t streamStatePutParTag(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen); -int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen); - /***compare func **/ // todo refactor diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h new file mode 100644 index 0000000000..1320dcd4ec --- /dev/null +++ b/include/libs/stream/tstreamFileState.h @@ -0,0 +1,54 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _STREAM_FILE_STATE_H_ +#define _STREAM_FILE_STATE_H_ + +#include "os.h" + +#include "tdef.h" +#include "tlist.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SStreamFileState SStreamFileState; +typedef struct SRowBuffPos { + void* pRowBuff; + void* pKey; + bool beFlushed; + bool beUsed; +} SRowBuffPos; + +typedef SList SStreamSnapshot; + +typedef bool (*ExpiredFun)(void*, TSKEY); + +SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, ExpiredFun fp, void* pFile); +void destroyStreamFileState(SStreamFileState* pFileState); + +int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen); +void* getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos); + +SStreamSnapshot* getSnapshot(SStreamFileState* pFileState); +int32_t flushSnapshot(void* pFile, SStreamSnapshot* pSnapshot, int32_t rowSize); +int32_t recoverSnapshot(SStreamFileState* pFileState); + +#ifdef __cplusplus +} +#endif + +#endif // _STREAM_FILE_STATE_H_ diff --git a/include/util/tlist.h b/include/util/tlist.h index 3dbdb72f9e..e9a81d350e 100644 --- a/include/util/tlist.h +++ b/include/util/tlist.h @@ -17,6 +17,7 @@ #define _TD_UTIL_LIST_H_ #include "os.h" +#include "talgo.h" #ifdef __cplusplus extern "C" { @@ -222,6 +223,7 @@ void tdListInit(SList *list, int32_t eleSize); void tdListEmpty(SList *list); SList *tdListNew(int32_t eleSize); void *tdListFree(SList *list); +void *tdListFreeP(SList *list, FDelete fp); void tdListPrependNode(SList *list, SListNode *node); void tdListAppendNode(SList *list, SListNode *node); int32_t tdListPrepend(SList *list, void *data); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 8058f9fddd..4917ebe3d3 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -196,6 +196,7 @@ int32_t tsUptimeInterval = 300; // seconds char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udfd exits char tsUdfdLdLibPath[512] = ""; bool tsDisableStream = false; +int64_t tsStreamBufferSize = 128 * 1024 * 1024; #ifndef _STORAGE int32_t taosSetTfsCfg(SConfig *pCfg) { @@ -496,6 +497,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, 0) != 0) return -1; if (cfgAddBool(pCfg, "disableStream", tsDisableStream, 0) != 0) return -1; + if (cfgAddBool(pCfg, "streamBufferSize", tsStreamBufferSize, 0) != 0) return -1; GRANT_CFG_ADD; return 0; @@ -824,7 +826,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsQueryBufferSizeBytes = tsQueryBufferSize * 1048576UL; } - tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval; + tsDisableStream = cfgGetItem(pCfg, "disableStream")->i64; GRANT_CFG_GET; return 0; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 55bf8b37d0..6d861e2b03 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -865,7 +865,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size); int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo); -int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, +int32_t setIntervalOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup); int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult); int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index a28de52fab..4df52982ad 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2723,7 +2723,7 @@ int32_t streamStateAddIfNotExist2(SStreamState* pState, const SWinKey* key, void return 0; } -int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, +int32_t setIntervalOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset, SAggSupporter* pAggSup) { SWinKey key = { .ts = win->skey, diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 0fea8cf28d..8ba152ef1b 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ #include "executorimpl.h" +#include "tglobal.h" #include "filter.h" #include "function.h" #include "functionMgt.h" @@ -2151,7 +2152,7 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, S continue; } if (num == 0) { - int32_t code = setOutputBuf(pInfo->pState, &parentWin, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput, + int32_t code = setIntervalOutputBuf(pInfo->pState, &parentWin, &pCurResult, pWinRes->groupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup); ASSERT(pCurResult != NULL); if (code != TSDB_CODE_SUCCESS || pCurResult == NULL) { @@ -2160,7 +2161,7 @@ static void rebuildIntervalWindow(SOperatorInfo* pOperator, SArray* pWinArray, S } num++; SResultRow* pChResult = NULL; - setOutputBuf(pChInfo->pState, &parentWin, &pChResult, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs, + setIntervalOutputBuf(pChInfo->pState, &parentWin, &pChResult, pWinRes->groupId, pChildSup->pCtx, pChildSup->numOfExprs, pChildSup->rowEntryInfoOffset, &pChInfo->aggSup); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &parentWin, true); compactFunctions(pSup->pCtx, pChildSup->pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData); @@ -2412,7 +2413,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p } } - int32_t code = setOutputBuf(pInfo->pState, &nextWin, &pResult, groupId, pSup->pCtx, numOfOutput, + int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResult, groupId, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset, &pInfo->aggSup); if (code != TSDB_CODE_SUCCESS || pResult == NULL) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); @@ -4853,6 +4854,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { return NULL; } +bool compareTs(void* pKey, TSKEY mark) { + SWinKey* pWinKey = (SWinKey*) pKey; + return pWinKey->ts < mark; +} + SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo)); @@ -4939,6 +4945,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; + pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, pInfo->aggSup.resultRowSize, compareTs, pInfo->pState); setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index 1ab7014357..557c000a87 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -263,22 +263,6 @@ int32_t streamStateCommit(SStreamState* pState) { #endif } -int32_t streamStateAbort(SStreamState* pState) { -#ifdef USE_ROCKSDB - return 0; -#else - if (tdbAbort(pState->pTdbState->db, pState->pTdbState->txn) < 0) { - return -1; - } - - if (tdbBegin(pState->pTdbState->db, &pState->pTdbState->txn, NULL, NULL, NULL, - TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { - return -1; - } - return 0; -#endif -} - int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { #ifdef USE_ROCKSDB return streamStateFuncPut_rocksdb(pState, key, value, vLen); @@ -305,7 +289,8 @@ int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) { // todo refactor int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { #ifdef USE_ROCKSDB - return streamStatePut_rocksdb(pState, key, value, vLen); + return 0; + // return streamStatePut_rocksdb(pState, key, value, vLen); #else SStateKey sKey = {.key = *key, .opNum = pState->number}; return tdbTbUpsert(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), value, vLen, pState->pTdbState->txn); @@ -315,7 +300,8 @@ int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* val // todo refactor int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen) { #ifdef USE_ROCKSDB - return streamStateGet_rocksdb(pState, key, pVal, pVLen); + return getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), pVal, pVLen); + // return streamStateGet_rocksdb(pState, key, pVal, pVLen); #else SStateKey sKey = {.key = *key, .opNum = pState->number}; return tdbTbGet(pState->pTdbState->pStateDb, &sKey, sizeof(SStateKey), pVal, pVLen); @@ -1033,22 +1019,6 @@ _end: #endif } -int32_t streamStatePutParTag(SStreamState* pState, int64_t groupId, const void* tag, int32_t tagLen) { -#ifdef USE_ROCKSDB - return streamStatePutParTag_rocksdb(pState, groupId, tag, tagLen); -#else - return tdbTbUpsert(pState->pTdbState->pParTagDb, &groupId, sizeof(int64_t), tag, tagLen, pState->pTdbState->txn); -#endif -} - -int32_t streamStateGetParTag(SStreamState* pState, int64_t groupId, void** tagVal, int32_t* tagLen) { -#ifdef USE_ROCKSDB - return streamStateGetParTag_rocksdb(pState, groupId, tagVal, tagLen); -#else - return tdbTbGet(pState->pTdbState->pParTagDb, &groupId, sizeof(int64_t), tagVal, tagLen); -#endif -} - int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { qWarn("try to write to cf parname"); #ifdef USE_ROCKSDB diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c new file mode 100644 index 0000000000..81dc2f78c3 --- /dev/null +++ b/source/libs/stream/src/tstreamFileState.c @@ -0,0 +1,235 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tstreamFileState.h" + +#include "taos.h" +#include "thash.h" +#include "tsimplehash.h" +#include "streamBackendRocksdb.h" + + +#define FLUSH_RATIO 0.2 +#define DEFAULT_MAX_STREAM_BUFFER_SIZE (128 * 1024 * 1024); + +struct SStreamFileState { + SList* usedBuffs; + SList* freeBuffs; + SSHashObj* rowBuffMap; + void* pFileStore; + int32_t rowSize; + int32_t keyLen; + uint64_t preCheckPointVersion; + uint64_t checkPointVersion; + TSKEY maxTs; + TSKEY deleteMark; + uint64_t maxRowCount; + uint64_t curRowCount; + ExpiredFun expFunc; +}; + +typedef SRowBuffPos SRowBuffInfo; + +SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t rowSize, ExpiredFun fp, void* pFile) { + if (memSize <= 0) { + memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE; + } + if (rowSize == 0) { + goto _error; + } + + SStreamFileState* pFileState = taosMemoryCalloc(1, sizeof(SStreamFileState)); + if (!pFileState) { + goto _error; + } + pFileState->usedBuffs = tdListNew(POINTER_BYTES); + pFileState->freeBuffs = tdListNew(POINTER_BYTES); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); + pFileState->rowBuffMap = tSimpleHashInit(1024, hashFn); + if (!pFileState->usedBuffs || !pFileState->freeBuffs || !pFileState->rowBuffMap) { + goto _error; + } + pFileState->rowSize = rowSize; + pFileState->preCheckPointVersion = 0; + pFileState->checkPointVersion = 1; + pFileState->pFileStore = pFile; + pFileState->expFunc = fp; + pFileState->maxRowCount = memSize / rowSize; + pFileState->curRowCount = 0; + return pFileState; + +_error: + destroyStreamFileState(pFileState); + return NULL; +} + +void destroyRowBuffPos(SRowBuffPos* pPos) { + taosMemoryFreeClear(pPos->pRowBuff); + taosMemoryFree(pPos); +} + +void destroyRowBuffPosPtr(void* ptr) { + if (!ptr) { + return; + } + void* tmp = *(void**)ptr; + SRowBuffPos* pPos = (SRowBuffPos*)tmp; + destroyRowBuffPos(pPos); +} + +void destroyStreamFileState(SStreamFileState* pFileState) { + tdListFreeP(pFileState->usedBuffs, destroyRowBuffPosPtr); + tdListFreeP(pFileState->freeBuffs, taosMemoryFree); + tSimpleHashCleanup(pFileState->rowBuffMap); +} + +void clearExpiredRowBuff(SStreamFileState* pFileState, TSKEY ts) { + SListIter iter = {0}; + tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD); + + SListNode* pNode = NULL; + while ((pNode = tdListNext(&iter)) != NULL) { + SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; + if (pFileState->expFunc(pPos->pKey, ts)) { + tdListAppend(pFileState->freeBuffs, &pPos->pRowBuff); + pPos->pRowBuff = NULL; + destroyRowBuffPos(pPos); + } + } +} + +int32_t flushRowBuff(SStreamFileState* pFileState) { + SStreamSnapshot* pFlushList = tdListNew(POINTER_BYTES); + if (!pFlushList) { + return TSDB_CODE_OUT_OF_MEMORY; + } + uint64_t num = (uint64_t)(pFileState->curRowCount * FLUSH_RATIO); + uint64_t i = 0; + SListIter iter = {0}; + tdListInitIter(pFileState->usedBuffs, &iter, TD_LIST_FORWARD); + + SListNode* pNode = NULL; + while ((pNode = tdListNext(&iter)) != NULL && i < num) { + SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; + if (!pPos->beUsed) { + tdListAppend(pFlushList, &pPos); + i++; + } + } + flushSnapshot(pFileState->pFileStore, pFlushList, pFileState->rowSize); + return TSDB_CODE_SUCCESS; +} + +int32_t clearRowBuff(SStreamFileState* pFileState) { + clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark); + if (isListEmpty(pFileState->freeBuffs)) { + return flushRowBuff(pFileState); + } + return TSDB_CODE_SUCCESS; +} + +void* getFreeBuff(SList* lists) { + SListNode* pNode = tdListPopHead(lists); + if (!pNode) { + return NULL; + } + void* ptr = *(void**)pNode->data; + taosMemoryFree(pNode); + return ptr; +} + +SRowBuffPos* getNewRowPos(SStreamFileState* pFileState) { + SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos)); + tdListAppend(pFileState->usedBuffs, &pPos); + void* pBuff = getFreeBuff(pFileState->freeBuffs); + if (pBuff) { + pPos->pRowBuff = pBuff; + return pPos; + } + + if (pFileState->curRowCount < pFileState->maxRowCount) { + pBuff = taosMemoryCalloc(1, pFileState->rowSize); + if (pBuff) { + pPos->pRowBuff = pBuff; + pFileState->curRowCount++; + return pPos; + } + } + + int32_t code = clearRowBuff(pFileState); + ASSERT(code == 0); + pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs); + return pPos; +} + +int32_t getRowBuff(SStreamFileState* pFileState, void* pKey, int32_t keyLen, void** pVal, int32_t* pVLen) { + SRowBuffPos** pos = tSimpleHashGet(pFileState->rowBuffMap, pKey, keyLen); + if (pos) { + *pVLen = pFileState->rowSize; + *pVal = *pos; + return TSDB_CODE_SUCCESS; + } + SRowBuffPos* pNewPos = getNewRowPos(pFileState); + ASSERT(pNewPos);// todo(liuyao) delete + tSimpleHashPut(pFileState->rowBuffMap, pKey, keyLen, &pNewPos, POINTER_BYTES); + *pVLen = pFileState->rowSize; + *pVal = pNewPos; + return TSDB_CODE_SUCCESS; +} + +void* getRowBuffByPos(SStreamFileState* pFileState, SRowBuffPos* pPos) { + if (pPos->pRowBuff) { + return pPos->pRowBuff; + } + + int32_t code = clearRowBuff(pFileState); + ASSERT(code == 0); + pPos->pRowBuff = getFreeBuff(pFileState->freeBuffs); + + void* pVal = NULL; + int32_t len = 0; + streamStateGet_rocksdb(pFileState->pFileStore, pPos->pKey, &pVal, &len); + memcpy(pPos->pRowBuff, pVal, len); + taosMemoryFree(pVal); + return pPos->pRowBuff; +} + +void releaseRowBuffPos(SRowBuffPos* pBuff) { + pBuff->beUsed = false; +} + +SStreamSnapshot* getSnapshot(SStreamFileState* pFileState) { + clearExpiredRowBuff(pFileState, pFileState->maxTs - pFileState->deleteMark); + return pFileState->usedBuffs; +} + +int32_t flushSnapshot(void* pFile, SStreamSnapshot* pSnapshot, int32_t rowSize) { + int32_t code = TSDB_CODE_SUCCESS; + SListIter iter = {0}; + tdListInitIter(pSnapshot, &iter, TD_LIST_FORWARD); + + SListNode* pNode = NULL; + while ((pNode = tdListNext(&iter)) != NULL && code == TSDB_CODE_SUCCESS) { + SRowBuffPos* pPos = *(SRowBuffPos**)pNode->data; + code = streamStatePut_rocksdb(pFile, pPos->pKey, pPos->pRowBuff, rowSize); + } + return code; +} + +int32_t recoverSnapshot(SStreamFileState* pFileState) { + // 设置一个时间戳标记,小于这个时间戳的,如果缓存里没有,需要从rocks db里读取状态,大于这个时间戳的,不需要 + // 这个还需要考虑一下,如果rocks db中也没有,说明真的是新的,那么这次读取是冗余的。 + return TSDB_CODE_SUCCESS; +} \ No newline at end of file diff --git a/source/util/src/tlist.c b/source/util/src/tlist.c index 1b12ea0cdd..a89105bb42 100644 --- a/source/util/src/tlist.c +++ b/source/util/src/tlist.c @@ -46,6 +46,24 @@ void *tdListFree(SList *list) { return NULL; } +void tdListEmptyP(SList *list, FDelete fp) { + SListNode *node; + while ((node = TD_DLIST_HEAD(list)) != NULL) { + TD_DLIST_POP(list, node); + fp(node->data); + taosMemoryFree(node); + } +} + +void *tdListFreeP(SList *list, FDelete fp) { + if (list) { + tdListEmptyP(list, fp); + taosMemoryFree(list); + } + + return NULL; +} + void tdListPrependNode(SList *list, SListNode *node) { TD_DLIST_PREPEND(list, node); } void tdListAppendNode(SList *list, SListNode *node) { TD_DLIST_APPEND(list, node); } diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index ec192dc6a6..141ff2ef8e 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -255,6 +255,8 @@ ,,y,script,./test.sh -f tsim/stream/fillIntervalValue.sim ,,y,script,./test.sh -f tsim/stream/udTableAndTag0.sim ,,y,script,./test.sh -f tsim/stream/udTableAndTag1.sim +,,y,script,./test.sh -f tsim/stream/checkStreamSTable.sim +,,y,script,./test.sh -f tsim/stream/checkStreamSTable1.sim ,,y,script,./test.sh -f tsim/trans/lossdata1.sim ,,y,script,./test.sh -f tsim/trans/create_db.sim ,,y,script,./test.sh -f tsim/tmq/basic1.sim