diff --git a/include/common/tcommon.h b/include/common/tcommon.h index bdfb1d32b4..5b14fa4e49 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -169,6 +169,7 @@ typedef enum EStreamType { STREAM_PULL_OVER, STREAM_FILL_OVER, STREAM_CREATE_CHILD_TABLE, + STREAM_CHECKPOINT, } EStreamType; #pragma pack(push, 1) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index bc4037c642..0a9b016505 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -183,7 +183,6 @@ extern int32_t tsRpcRetryInterval; extern bool tsDisableStream; extern int64_t tsStreamBufferSize; -extern int64_t tsCheckpointInterval; extern bool tsFilterScalarMode; extern int32_t tsMaxStreamBackendCache; extern int32_t tsPQSortMemThreshold; diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index e263c9d236..2d20562a6c 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -363,13 +363,13 @@ typedef struct SStateStore { state_key_cmpr_fn fn, void** pVal, int32_t* pVLen); int32_t (*streamStateSessionGetKeyByRange)(SStreamState* pState, const SSessionKey* range, SSessionKey* curKey); - SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark); + SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp); TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol); bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts); bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid); void (*updateInfoDestroy)(SUpdateInfo* pInfo); - SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark); + SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark, bool igUp); void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo); void (*updateInfoDestoryColseWinSBF)(SUpdateInfo* pInfo); int32_t (*updateInfoSerialize)(void* buf, int32_t bufLen, const SUpdateInfo* pInfo); diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h index bd5a3be8de..53aa9f8645 100644 --- a/include/libs/stream/tstreamUpdate.h +++ b/include/libs/stream/tstreamUpdate.h @@ -43,8 +43,8 @@ typedef struct SUpdateKey { // uint64_t maxDataVersion; //} SUpdateInfo; -SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark); -SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark); +SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark, bool igUp); +SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp); TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol); bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts); bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid); diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 2964a53e79..2f2f88fad1 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -215,7 +215,6 @@ char tsUdfdResFuncs[512] = ""; // udfd resident funcs that teardown when udf char tsUdfdLdLibPath[512] = ""; bool tsDisableStream = false; int64_t tsStreamBufferSize = 128 * 1024 * 1024; -int64_t tsCheckpointInterval = 3 * 60 * 60 * 1000; bool tsFilterScalarMode = false; #ifndef _STORAGE @@ -532,7 +531,6 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "disableStream", tsDisableStream, 0) != 0) return -1; if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, 0) != 0) return -1; - if (cfgAddInt64(pCfg, "checkpointInterval", tsCheckpointInterval, 0, INT64_MAX, 0) != 0) return -1; if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, 0) != 0) return -1; @@ -918,7 +916,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsDisableStream = cfgGetItem(pCfg, "disableStream")->bval; tsStreamBufferSize = cfgGetItem(pCfg, "streamBufferSize")->i64; - tsCheckpointInterval = cfgGetItem(pCfg, "checkpointInterval")->i64; tsFilterScalarMode = cfgGetItem(pCfg, "filterScalarMode")->bval; tsMaxStreamBackendCache = cfgGetItem(pCfg, "maxStreamBackendCache")->i32; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 0ba9aae133..400f7e5320 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -317,8 +317,6 @@ typedef struct STimeWindowAggSupp { int64_t waterMark; TSKEY maxTs; TSKEY minTs; - TSKEY checkPointTs; - TSKEY checkPointInterval; SColumnInfoData timeWindowData; // query time window info for scalar function execution. } STimeWindowAggSupp; @@ -353,8 +351,6 @@ typedef struct SStreamScanInfo { SExprSupp* pPartScalarSup; bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA. int32_t scanWinIndex; // for state operator - int32_t pullDataResIndex; - SSDataBlock* pPullDataRes; // pull data SSDataBlock SSDataBlock* pDeleteDataRes; // delete data SSDataBlock int32_t deleteDataIndex; STimeWindow updateWin; @@ -429,6 +425,11 @@ typedef struct SMergeAlignedIntervalAggOperatorInfo { SResultRow* pResultRow; } SMergeAlignedIntervalAggOperatorInfo; +typedef struct SOpCheckPointInfo { + uint16_t checkPointId; + SHashObj* children; // key:child id +} SOpCheckPointInfo; + typedef struct SStreamIntervalOperatorInfo { SOptrBasicInfo binfo; // basic info SAggSupporter aggSup; // aggregate supporter @@ -457,9 +458,12 @@ typedef struct SStreamIntervalOperatorInfo { SArray* pUpdated; SSHashObj* pUpdatedMap; int64_t dataVersion; - SStateStore statestore; + SStateStore stateStore; bool recvGetAll; SHashObj* pFinalPullDataMap; + SOpCheckPointInfo checkPointInfo; + bool reCkBlock; + SSDataBlock* pCheckpointRes; } SStreamIntervalOperatorInfo; typedef struct SDataGroupInfo { @@ -493,7 +497,6 @@ typedef struct SStreamSessionAggOperatorInfo { STimeWindowAggSupp twAggSup; SSDataBlock* pWinBlock; // window result SSDataBlock* pDelRes; // delete result - SSDataBlock* pUpdateRes; // update window bool returnUpdate; SSHashObj* pStDeleted; void* pDelIterator; @@ -507,6 +510,8 @@ typedef struct SStreamSessionAggOperatorInfo { int64_t dataVersion; SArray* historyWins; bool isHistoryOp; + bool reCkBlock; + SSDataBlock* pCheckpointRes; } SStreamSessionAggOperatorInfo; typedef struct SStreamStateAggOperatorInfo { @@ -528,6 +533,8 @@ typedef struct SStreamStateAggOperatorInfo { int64_t dataVersion; bool isHistoryOp; SArray* historyWins; + bool reCkBlock; + SSDataBlock* pCheckpointRes; } SStreamStateAggOperatorInfo; typedef struct SStreamPartitionOperatorInfo { @@ -687,6 +694,9 @@ uint64_t calcGroupId(char* pData, int32_t len); void streamOpReleaseState(struct SOperatorInfo* pOperator); void streamOpReloadState(struct SOperatorInfo* pOperator); +int32_t encodeSTimeWindowAggSupp(void **buf, STimeWindowAggSupp* pTwAggSup); +void* decodeSTimeWindowAggSupp(void *buf, STimeWindowAggSupp* pTwAggSup); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 7798ded61b..98534cb231 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -1343,6 +1343,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN); pInfo->srcRowIndex = -1; } break; + case STREAM_CHECKPOINT: case STREAM_CREATE_CHILD_TABLE: { return pBlock; } break; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 9228c923a6..0eb20ad64e 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1128,9 +1128,13 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { printDataBlock(pInfo->pDelRes, "stream partitionby delete"); return pInfo->pDelRes; } break; - default: - ASSERTS(pBlock->info.type == STREAM_CREATE_CHILD_TABLE || pBlock->info.type == STREAM_RETRIEVE, "invalid SSDataBlock type"); + case STREAM_CREATE_CHILD_TABLE: + case STREAM_RETRIEVE: + case STREAM_CHECKPOINT: { return pBlock; + } + default: + ASSERTS(0, "invalid SSDataBlock type"); } // there is an scalar expression that needs to be calculated right before apply the group aggregation. @@ -1183,8 +1187,8 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->partitionSup = *pParSup; pScanInfo->pPartScalarSup = pExpr; - if (!pScanInfo->igCheckUpdate && !pScanInfo->pUpdateInfo) { - pScanInfo->pUpdateInfo = pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0); + if (!pScanInfo->pUpdateInfo) { + pScanInfo->pUpdateInfo = pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScanInfo->igCheckUpdate); } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 74210ee06e..220740096d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -40,11 +40,12 @@ int32_t scanDebug = 0; -#define MULTI_READER_MAX_TABLE_NUM 5000 -#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) -#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) -#define STREAM_SCAN_OP_NAME "StreamScanOperator" -#define STREAM_SCAN_OP_STATE_NAME "StreamScanFillHistoryState" +#define MULTI_READER_MAX_TABLE_NUM 5000 +#define SET_REVERSE_SCAN_FLAG(_info) ((_info)->scanFlag = REVERSE_SCAN) +#define SWITCH_ORDER(n) (((n) = ((n) == TSDB_ORDER_ASC) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC)) +#define STREAM_SCAN_OP_NAME "StreamScanOperator" +#define STREAM_SCAN_OP_STATE_NAME "StreamScanFillHistoryState" +#define STREAM_SCAN_OP_CHECKPOINT_NAME "StreamScanOperator_Checkpoint" typedef struct STableMergeScanExecInfo { SFileBlockLoadRecorder blockRecorder; @@ -1756,21 +1757,33 @@ static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBl } } -//int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) { -// int32_t len = updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo); -// *pBuff = taosMemoryCalloc(1, len); -// updateInfoSerialize(*pBuff, len, pInfo->pUpdateInfo); -// return len; -//} +int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) { + int32_t len = pInfo->stateStore.updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo); + len += encodeSTimeWindowAggSupp(NULL, &pInfo->twAggSup); + *pBuff = taosMemoryCalloc(1, len); + void* buf = *pBuff; + encodeSTimeWindowAggSupp(&buf, &pInfo->twAggSup); + pInfo->stateStore.updateInfoSerialize(buf, len, pInfo->pUpdateInfo); + return len; +} + +void streamScanOperatorSaveCheckpoint(SStreamScanInfo* pInfo) { + void* pBuf = NULL; + int32_t len = streamScanOperatorEncode(pInfo, pBuf); + pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len); +} // other properties are recovered from the execution plan void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) { if (!pBuff || len == 0) { return; } + void* buf = pBuff; + buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup); + int32_t tlen = len - (pBuff - buf); - void* pUpInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); - int32_t code = pInfo->stateStore.updateInfoDeserialize(pBuff, len, pUpInfo); + void* pUpInfo = pInfo->stateStore.updateInfoInit(0, TSDB_TIME_PRECISION_MILLI, 0, pInfo->igCheckUpdate); + int32_t code = pInfo->stateStore.updateInfoDeserialize(buf, tlen, pUpInfo); if (code == TSDB_CODE_SUCCESS) { pInfo->pUpdateInfo = pUpInfo; } @@ -1985,6 +1998,11 @@ FETCH_NEXT_BLOCK: } } } break; + case STREAM_CHECKPOINT: { + streamScanOperatorSaveCheckpoint(pInfo); + pAPI->stateStore.streamStateCommit(pInfo->pState); + pAPI->stateStore.streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); + } break; default: break; } @@ -2317,7 +2335,6 @@ static void destroyStreamScanOperatorInfo(void* param) { pStreamScan->stateStore.updateInfoDestroy(pStreamScan->pUpdateInfo); blockDataDestroy(pStreamScan->pRes); blockDataDestroy(pStreamScan->pUpdateRes); - blockDataDestroy(pStreamScan->pPullDataRes); blockDataDestroy(pStreamScan->pDeleteDataRes); blockDataDestroy(pStreamScan->pUpdateDataRes); blockDataDestroy(pStreamScan->pCreateTbRes); @@ -2526,7 +2543,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN}; pInfo->groupId = 0; - pInfo->pPullDataRes = createSpecialDataBlock(STREAM_RETRIEVE); pInfo->pStreamScanOp = pOperator; pInfo->deleteDataIndex = 0; pInfo->pDeleteDataRes = createSpecialDataBlock(STREAM_DELETE_DATA); @@ -2545,9 +2561,11 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys if (pTaskInfo->streamInfo.pState) { void* buff = NULL; int32_t len = 0; - pAPI->stateStore.streamStateGetInfo(pTaskInfo->streamInfo.pState, STREAM_SCAN_OP_NAME, strlen(STREAM_SCAN_OP_NAME), &buff, &len); - streamScanOperatorDecode(buff, len, pInfo); - taosMemoryFree(buff); + int32_t res = pAPI->stateStore.streamStateGetInfo(pTaskInfo->streamInfo.pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), &buff, &len); + if (res == TSDB_CODE_SUCCESS) { + streamScanOperatorDecode(buff, len, pInfo); + taosMemoryFree(buff); + } } setOperatorInfo(pOperator, STREAM_SCAN_OP_NAME, QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN, false, OP_NOT_OPENED, pInfo, diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index c4111ded92..7c56acbeff 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -26,11 +26,14 @@ #include "tlog.h" #include "ttime.h" -#define IS_FINAL_OP(op) ((op)->isFinal) -#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); -#define STREAM_INTERVAL_OP_STATE_NAME "StreamIntervalHistoryState" -#define STREAM_SESSION_OP_STATE_NAME "StreamSessionHistoryState" -#define STREAM_STATE_OP_STATE_NAME "StreamStateHistoryState" +#define IS_FINAL_OP(op) ((op)->isFinal) +#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); +#define STREAM_INTERVAL_OP_STATE_NAME "StreamIntervalHistoryState" +#define STREAM_SESSION_OP_STATE_NAME "StreamSessionHistoryState" +#define STREAM_STATE_OP_STATE_NAME "StreamStateHistoryState" +#define STREAM_INTERVAL_OP_CHECKPOINT_NAME "StreamIntervalOperator_Checkpoint" +#define STREAM_SESSION_OP_CHECKPOINT_NAME "StreamSessionOperator_Checkpoint" +#define STREAM_STATE_OP_CHECKPOINT_NAME "StreamStateOperator_Checkpoint" typedef struct SStateWindowInfo { SResultWindowInfo winInfo; @@ -1450,7 +1453,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin for (int32_t i = *index; i < size; i++) { SWinKey* pWin = taosArrayGet(pWins, i); void* tbname = NULL; - pInfo->statestore.streamStateGetParName(pInfo->pState, pWin->groupId, &tbname); + pInfo->stateStore.streamStateGetParName(pInfo->pState, pWin->groupId, &tbname); if (tbname == NULL) { appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, NULL); } else { @@ -1458,7 +1461,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName); } - pInfo->statestore.streamStateFreeVal(tbname); + pInfo->stateStore.streamStateFreeVal(tbname); (*index)++; } } @@ -1513,13 +1516,14 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { blockDataDestroy(pInfo->pPullDataRes); taosArrayDestroy(pInfo->pDelWins); blockDataDestroy(pInfo->pDelRes); - pInfo->statestore.streamFileStateDestroy(pInfo->pState->pFileState); + pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState); taosMemoryFreeClear(pInfo->pState); nodesDestroyNode((SNode*)pInfo->pPhyNode); colDataDestroy(&pInfo->twAggSup.timeWindowData); pInfo->groupResInfo.pRows = taosArrayDestroy(pInfo->groupResInfo.pRows); cleanupExprSupp(&pInfo->scalarSupp); + blockDataDestroy(pInfo->pCheckpointRes); taosMemoryFreeClear(param); } @@ -1599,7 +1603,7 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt pScanInfo->windowSup.parentType = type; pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup; if (!pScanInfo->pUpdateInfo) { - pScanInfo->pUpdateInfo = pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark); + pScanInfo->pUpdateInfo = pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate); } pScanInfo->interval = pInfo->interval; @@ -1607,12 +1611,6 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt pScanInfo->pState = pInfo->pState; } -void initStreamFunciton(SqlFunctionCtx* pCtx, int32_t numOfExpr) { - for (int32_t i = 0; i < numOfExpr; i++) { - // pCtx[i].isStream = true; - } -} - SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { SIntervalAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SIntervalAggOperatorInfo)); @@ -2089,7 +2087,7 @@ static void clearStreamIntervalOperator(SStreamIntervalOperatorInfo* pInfo) { clearDiskbasedBuf(pInfo->aggSup.pResultBuf); initResultRowInfo(&pInfo->binfo.resultRowInfo); pInfo->aggSup.currentPageId = -1; - pInfo->statestore.streamStateClear(pInfo->pState); + pInfo->stateStore.streamStateClear(pInfo->pState); } static void clearSpecialDataBlock(SSDataBlock* pBlock) { @@ -2368,7 +2366,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p .groupId = groupId, }; void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); - if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->statestore) && isClosed && !chIds) { + if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->stateStore) && isClosed && !chIds) { SPullWindowInfo pull = { .window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; // add pull data request @@ -2399,7 +2397,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p } int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResPos, groupId, pSup->pCtx, numOfOutput, - pSup->rowEntryInfoOffset, &pInfo->aggSup, &pInfo->statestore); + pSup->rowEntryInfoOffset, &pInfo->aggSup, &pInfo->stateStore); pResult = (SResultRow*)pResPos->pRowBuff; if (code != TSDB_CODE_SUCCESS || pResult == NULL) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); @@ -2487,6 +2485,196 @@ static void resetUnCloseWinInfo(SSHashObj* winMap) { } } +int32_t encodeSWinKey(void **buf, SWinKey* key) { + int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, key->ts); + tlen += taosEncodeFixedU64(buf, key->groupId); + return tlen; +} + +void* decodeSWinKey(void *buf, SWinKey* key) { + buf = taosDecodeFixedI64(buf, &key->ts); + buf = taosDecodeFixedU64(buf, &key->groupId); + return buf; +} + +int32_t encodeSRowBuffPos(void **buf, SRowBuffPos* pos) { + int32_t tlen = 0; + tlen += encodeSWinKey(buf, pos->pKey); + return tlen; +} + +void* decodeSRowBuffPos(void *buf, SRowBuffPos* pos) { + buf = decodeSWinKey(buf, pos->pKey); + return buf; +} + +int32_t encodeSTimeWindowAggSupp(void **buf, STimeWindowAggSupp* pTwAggSup) { + int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, pTwAggSup->minTs); + tlen += taosEncodeFixedI64(buf, pTwAggSup->maxTs); + return tlen; +} + +void* decodeSTimeWindowAggSupp(void *buf, STimeWindowAggSupp* pTwAggSup) { + buf = taosDecodeFixedI64(buf, &pTwAggSup->minTs); + buf = taosDecodeFixedI64(buf, &pTwAggSup->maxTs); + return buf; +} + +int32_t encodeSTimeWindow(void **buf, STimeWindow* pWin) { + int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, pWin->skey); + tlen += taosEncodeFixedI64(buf, pWin->ekey); + return tlen; +} + +void* decodeSTimeWindow(void *buf, STimeWindow* pWin) { + buf = taosDecodeFixedI64(buf, &pWin->skey); + buf = taosDecodeFixedI64(buf, &pWin->ekey); + return buf; +} + +int32_t encodeSPullWindowInfo(void **buf, SPullWindowInfo* pPullInfo) { + int32_t tlen = 0; + tlen += encodeSTimeWindow(buf, &pPullInfo->calWin); + tlen += taosEncodeFixedU64(buf, pPullInfo->groupId); + tlen += encodeSTimeWindow(buf, &pPullInfo->window); + return tlen; +} + +void* decodeSPullWindowInfo(void *buf, SPullWindowInfo* pPullInfo) { + buf = decodeSTimeWindow(buf, &pPullInfo->calWin); + buf = taosDecodeFixedU64(buf, &pPullInfo->groupId); + buf = decodeSTimeWindow(buf, &pPullInfo->window); + return buf; +} + +int32_t encodeSPullWindowInfoArray(void **buf, SArray* pPullInfos) { + int32_t tlen = 0; + int32_t size = taosArrayGetSize(pPullInfos); + tlen += taosEncodeFixedI32(buf, size); + for (int32_t i = 0; i < size; i++) { + void* pItem = taosArrayGet(pPullInfos, i); + tlen += encodeSPullWindowInfo(buf, pItem); + } + return tlen; +} + +void* decodeSPullWindowInfoArray(void *buf, SArray* pPullInfos) { + int32_t size = 0; + buf = taosDecodeFixedI32(buf, &size); + for (int32_t i = 0; i < size; i++) { + SPullWindowInfo item = {0}; + buf = decodeSPullWindowInfo(buf, &item); + taosArrayPush(pPullInfos, &item); + } + return buf; +} + +int32_t doStreamIntervalEncodeOpState(void **buf, SOperatorInfo* pOperator) { + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + if (!pInfo) { + return 0; + } + + // 1.pResultRowHashTable + int32_t tlen = 0; + int32_t mapSize = tSimpleHashGetSize(pInfo->aggSup.pResultRowHashTable); + tlen += taosEncodeFixedI32(buf, mapSize); + void *pIte = NULL; + size_t keyLen = 0; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pInfo->aggSup.pResultRowHashTable, pIte, &iter)) != NULL) { + void *key = taosHashGetKey(pIte, &keyLen); + tlen += encodeSWinKey(buf, key); + SRowBuffPos* pPos = *(void**)pIte; + tlen += encodeSRowBuffPos(buf, pPos); + } + + // 2.twAggSup + tlen += encodeSTimeWindowAggSupp(buf, &pInfo->twAggSup); + + // 3.pPullDataMap + int32_t size = taosHashGetSize(pInfo->pPullDataMap); + tlen += taosEncodeFixedI32(buf, size); + pIte = NULL; + keyLen = 0; + while ((pIte = taosHashIterate(pInfo->pPullDataMap, pIte)) != NULL) { + void *key = taosHashGetKey(pIte, &keyLen); + tlen += encodeSWinKey(buf, key); + SArray* pArray = (SArray*)pIte; + int32_t chSize = taosArrayGetSize(pArray); + tlen += taosEncodeFixedI32(buf, chSize); + for (int32_t i = 0; i < chSize; i++) { + void* pChItem = taosArrayGet(pArray, i); + tlen += taosEncodeFixedI32(buf, *(int32_t*)pChItem); + } + } + + // 4.pPullWins + tlen += encodeSPullWindowInfoArray(buf, pInfo->pPullWins); + + // 5.dataVersion + tlen += taosEncodeFixedI32(buf, pInfo->dataVersion); + + return tlen; +} + +void doStreamIntervalDecodeOpState(void* buf, SOperatorInfo* pOperator) { + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + if (!pInfo) { + return ; + } + + // 1.pResultRowHashTable + int32_t mapSize = 0; + buf = taosDecodeFixedI32(buf, &mapSize); + for (int32_t i = 0; i < mapSize; i++) { + SWinKey key = {0}; + SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos)); + buf = decodeSWinKey(buf, &key); + buf = decodeSRowBuffPos(buf, pPos); + tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pPos, POINTER_BYTES); + } + + // 2.twAggSup + buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup); + + // 3.pPullDataMap + int32_t size = 0; + buf = taosDecodeFixedI32(buf, &size); + for (int32_t i = 0; i < size; i++) { + SWinKey key = {0}; + SArray* pArray = taosArrayInit(0, sizeof(int32_t)); + buf = decodeSWinKey(buf, &key); + int32_t chSize = 0; + buf = taosDecodeFixedI32(buf, &chSize); + for (int32_t i = 0; i < chSize; i++) { + int32_t chId = 0; + buf = taosDecodeFixedI32(buf, &chId); + taosArrayPush(pArray, &chId); + } + taosHashPut(pInfo->pPullDataMap, &key, sizeof(SWinKey), &pArray, POINTER_BYTES); + } + + // 4.pPullWins + buf = decodeSPullWindowInfoArray(buf, pInfo->pPullWins); + + // 5.dataVersion + buf = taosDecodeFixedI64(buf, &pInfo->dataVersion); +} + +void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) { + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + int32_t len = doStreamIntervalEncodeOpState(NULL, pOperator); + void* buf = taosMemoryCalloc(1, len); + void* pBuf = buf; + len = doStreamIntervalEncodeOpState(&pBuf, pOperator); + pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, + strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), buf, len); +} + static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -2525,21 +2713,18 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { resetUnCloseWinInfo(pInfo->aggSup.pResultRowHashTable); } + if (pInfo->reCkBlock) { + pInfo->reCkBlock = false; + printDataBlock(pInfo->pCheckpointRes, IS_FINAL_OP(pInfo) ? "interval final ck" : "interval semi ck"); + return pInfo->pCheckpointRes; + } + setOperatorCompleted(pOperator); if (!IS_FINAL_OP(pInfo)) { clearFunctionContext(&pOperator->exprSupp); // semi interval operator clear disk buffer clearStreamIntervalOperator(pInfo); - setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); qDebug("===stream===clear semi operator"); - } else { - if (pInfo->twAggSup.maxTs > 0 && - pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { - pAPI->stateStore.streamStateCommit(pInfo->pState); - pAPI->stateStore.streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); - pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs; - } - qDebug("===stream===interval final close"); } return NULL; } else { @@ -2629,10 +2814,21 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } continue; } else if (pBlock->info.type == STREAM_PULL_OVER && IS_FINAL_OP(pInfo)) { - processPullOver(pBlock, pInfo->pPullDataMap, pInfo->pFinalPullDataMap, &pInfo->interval, pInfo->pPullWins, pInfo->numOfChild, pOperator); + processPullOver(pBlock, pInfo->pPullDataMap, pInfo->pFinalPullDataMap, &pInfo->interval, pInfo->pPullWins, + pInfo->numOfChild, pOperator); continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { return pBlock; + } else if (pBlock->info.type == STREAM_CHECKPOINT) { + doStreamIntervalSaveCheckpoint(pOperator); + pAPI->stateStore.streamStateCommit(pInfo->pState); + setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); + copyDataBlock(pInfo->pCheckpointRes, pBlock); + pOperator->status = OP_RES_TO_RETURN; + qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, + IS_FINAL_OP(pInfo) ? "interval final" : "interval semi", pInfo->numOfDatapack); + pInfo->numOfDatapack = 0; + break; } else { ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); } @@ -2689,6 +2885,12 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { return pInfo->binfo.pRes; } + if (pInfo->reCkBlock) { + pInfo->reCkBlock = false; + printDataBlock(pInfo->binfo.pRes, IS_FINAL_OP(pInfo) ? "interval final ck" : "interval semi ck"); + return pInfo->pCheckpointRes; + } + return NULL; } @@ -2733,7 +2935,7 @@ void streamIntervalReleaseState(SOperatorInfo* pOperator) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; int32_t resSize = sizeof(TSKEY); - pInfo->statestore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, strlen(STREAM_INTERVAL_OP_STATE_NAME), &pInfo->twAggSup.maxTs, resSize); + pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, strlen(STREAM_INTERVAL_OP_STATE_NAME), &pInfo->twAggSup.maxTs, resSize); } SStreamIntervalOperatorInfo* pInfo = pOperator->info; SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; @@ -2749,11 +2951,11 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; int32_t size = 0; void* pBuf = NULL; - int32_t code = pInfo->statestore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, + int32_t code = pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, strlen(STREAM_INTERVAL_OP_STATE_NAME), &pBuf, &size); TSKEY ts = *(TSKEY*)pBuf; taosMemoryFree(pBuf); - pInfo->statestore.streamStateReloadInfo(pInfo->pState, ts); + pInfo->stateStore.streamStateReloadInfo(pInfo->pState, ts); } SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.reloadStreamStateFn) { @@ -2787,9 +2989,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, .deleteMark = getDeleteMark(pIntervalPhyNode), .deleteMarkSaved = 0, .calTriggerSaved = 0, - .checkPointTs = 0, - .checkPointInterval = - convertTimePrecision(tsCheckpointInterval, TSDB_TIME_PRECISION_MILLI, pInfo->interval.precision), }; ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay"); pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; @@ -2819,7 +3018,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, goto _error; } - initStreamFunciton(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initResultRowInfo(&pInfo->binfo.resultRowInfo); @@ -2860,8 +3058,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo)); pInfo->dataVersion = 0; - pInfo->statestore = pTaskInfo->storageAPI.stateStore; + pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; + pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pOperator->operatorType = pPhyNode->type; pOperator->blocking = true; @@ -2879,6 +3078,15 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, goto _error; } + // for stream + void* buff = NULL; + int32_t len = 0; + int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len); + if (res == TSDB_CODE_SUCCESS) { + doStreamIntervalDecodeOpState(buff, pOperator); + taosMemoryFree(buff); + } + return pOperator; _error: @@ -2913,7 +3121,6 @@ void destroyStreamSessionAggOperatorInfo(void* param) { colDataDestroy(&pInfo->twAggSup.timeWindowData); blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pWinBlock); - blockDataDestroy(pInfo->pUpdateRes); tSimpleHashCleanup(pInfo->pStDeleted); taosArrayDestroy(pInfo->historyWins); @@ -2928,7 +3135,6 @@ int32_t initBasicInfoEx(SOptrBasicInfo* pBasicInfo, SExprSupp* pSup, SExprInfo* return code; } - initStreamFunciton(pSup->pCtx, pSup->numOfExprs); for (int32_t i = 0; i < numOfCols; ++i) { pSup->pCtx[i].saveHandle.pBuf = NULL; } @@ -2960,7 +3166,7 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type}; pScanInfo->pState = pAggSup->pState; if (!pScanInfo->pUpdateInfo) { - pScanInfo->pUpdateInfo = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark); + pScanInfo->pUpdateInfo = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark, pScanInfo->igCheckUpdate); } pScanInfo->twAggSup = *pTwSup; } @@ -3566,6 +3772,114 @@ void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) { } } +int32_t encodeSSessionKey(void **buf, SSessionKey* key) { + int32_t tlen = 0; + tlen += encodeSTimeWindow(buf, &key->win); + tlen += taosEncodeFixedU64(buf, key->groupId); + return tlen; +} + +void* decodeSSessionKey(void *buf, SSessionKey* key) { + buf = decodeSTimeWindow(buf, &key->win); + buf = taosDecodeFixedU64(buf, &key->groupId); + return buf; +} + +int32_t encodeSResultWindowInfo(void **buf, SResultWindowInfo* key, int32_t outLen) { + int32_t tlen = 0; + tlen += taosEncodeFixedBool(buf, key->isOutput); + tlen += taosEncodeBinary(buf, key->pOutputBuf, outLen); + tlen += encodeSSessionKey(buf, &key->sessionWin); + return tlen; +} + +void* decodeSResultWindowInfo(void *buf, SResultWindowInfo* key, int32_t outLen) { + buf = taosDecodeFixedBool(buf, &key->isOutput); + buf = taosDecodeBinary(buf, &key->pOutputBuf, outLen); + buf = decodeSSessionKey(buf, &key->sessionWin); + return buf; +} + +int32_t doStreamSessionEncodeOpState(void **buf, SOperatorInfo* pOperator) { + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + if (!pInfo) { + return 0; + } + + // 1.streamAggSup.pResultRows + int32_t tlen = 0; + int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows); + tlen += taosEncodeFixedI32(buf, mapSize); + void *pIte = NULL; + size_t keyLen = 0; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) { + void *key = taosHashGetKey(pIte, &keyLen); + tlen += encodeSSessionKey(buf, key); + tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize); + } + + // 2.twAggSup + tlen += encodeSTimeWindowAggSupp(buf, &pInfo->twAggSup); + + // 3.pChildren + int32_t size = taosArrayGetSize(pInfo->pChildren); + tlen += taosEncodeFixedI32(buf, size); + for (int32_t i = 0; i < size; i++) { + SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i); + tlen += doStreamSessionEncodeOpState(buf, pChOp); + } + + // 4.dataVersion + tlen += taosEncodeFixedI32(buf, pInfo->dataVersion); + + return tlen; +} + +void* doStreamSessionDecodeOpState(void* buf, SOperatorInfo* pOperator) { + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + if (!pInfo) { + return buf; + } + + // 1.streamAggSup.pResultRows + int32_t mapSize = 0; + buf = taosDecodeFixedI32(buf, &mapSize); + for (int32_t i = 0; i < mapSize; i++) { + SSessionKey key = {0}; + SResultWindowInfo winfo = {0}; + buf = decodeSSessionKey(buf, &key); + buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize); + tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo)); + } + + // 2.twAggSup + buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup); + + // 3.pChildren + int32_t size = 0; + buf = taosDecodeFixedI32(buf, &size); + ASSERT(size <= taosArrayGetSize(pInfo->pChildren)); + for (int32_t i = 0; i < size; i++) { + SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i); + buf = doStreamSessionDecodeOpState(buf, pChOp); + } + + // 4.dataVersion + buf = taosDecodeFixedI64(buf, &pInfo->dataVersion); + return buf; +} + +void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) { + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + int32_t len = doStreamSessionEncodeOpState(NULL, pOperator); + void* buf = taosMemoryCalloc(1, len); + void* pBuf = buf; + len = doStreamSessionEncodeOpState(&pBuf, pOperator); + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, + strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len); +} + static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { SExprSupp* pSup = &pOperator->exprSupp; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; @@ -3585,6 +3899,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { return pBInfo->pRes; } + if (pInfo->reCkBlock) { + pInfo->reCkBlock = false; + printDataBlock(pInfo->pCheckpointRes, IS_FINAL_OP(pInfo) ? "final session ck" : "single session ck"); + return pInfo->pCheckpointRes; + } + setOperatorCompleted(pOperator); return NULL; } @@ -3626,6 +3946,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { return pBlock; + } else if (pBlock->info.type == STREAM_CHECKPOINT) { + doStreamSessionSaveCheckpoint(pOperator); + pAggSup->stateStore.streamStateCommit(pAggSup->pState); + setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pAggSup->pState->checkPointId); + copyDataBlock(pInfo->pCheckpointRes, pBlock); + break; } else { ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); } @@ -3690,6 +4016,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { return pBInfo->pRes; } + if (pInfo->reCkBlock) { + pInfo->reCkBlock = false; + printDataBlock(pInfo->pCheckpointRes, IS_FINAL_OP(pInfo) ? "final session ck" : "single session ck"); + return pInfo->pCheckpointRes; + } + setOperatorCompleted(pOperator); return NULL; } @@ -3809,6 +4141,17 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pInfo->isHistoryOp = pHandle->fillHistory; } + pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); + + // for stream + void* buff = NULL; + int32_t len = 0; + int32_t res = pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len); + if (res == TSDB_CODE_SUCCESS) { + doStreamSessionDecodeOpState(buff, pOperator); + taosMemoryFree(buff); + } + setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo, @@ -3860,6 +4203,12 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { return pInfo->pDelRes; } + if (pInfo->reCkBlock) { + pInfo->reCkBlock = false; + printDataBlock(pInfo->pCheckpointRes, "semi session ck"); + return pInfo->pCheckpointRes; + } + if (pOperator->status == OP_RES_TO_RETURN) { clearFunctionContext(&pOperator->exprSupp); // semi interval operator clear disk buffer @@ -3880,7 +4229,6 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { - clearSpecialDataBlock(pInfo->pUpdateRes); pOperator->status = OP_RES_TO_RETURN; break; } @@ -3900,6 +4248,12 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { return pBlock; + } else if (pBlock->info.type == STREAM_CHECKPOINT) { + doStreamSessionSaveCheckpoint(pOperator); + pAggSup->stateStore.streamStateCommit(pAggSup->pState); + setStreamDataVersion(pOperator->pTaskInfo, pInfo->dataVersion, pAggSup->pState->checkPointId); + pOperator->status = OP_RES_TO_RETURN; + break; } else { ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); } @@ -3943,6 +4297,12 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { return pInfo->pDelRes; } + if (pInfo->reCkBlock) { + pInfo->reCkBlock = false; + printDataBlock(pInfo->pCheckpointRes, "semi session ck"); + return pInfo->pCheckpointRes; + } + clearFunctionContext(&pOperator->exprSupp); // semi interval operator clear disk buffer clearStreamSessionOperator(pInfo); @@ -3965,8 +4325,6 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream char* name = (pInfo->isFinal) ? "StreamSessionFinalAggOperator" : "StreamSessionSemiAggOperator"; if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { - pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); - blockDataEnsureCapacity(pInfo->pUpdateRes, 128); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL, destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL); } @@ -4190,6 +4548,76 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } } +int32_t doStreamStateEncodeOpState(void **buf, SOperatorInfo* pOperator) { + SStreamStateAggOperatorInfo* pInfo = pOperator->info; + if (!pInfo) { + return 0; + } + + // 1.streamAggSup.pResultRows + int32_t tlen = 0; + int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows); + tlen += taosEncodeFixedI32(buf, mapSize); + void *pIte = NULL; + size_t keyLen = 0; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) { + void *key = taosHashGetKey(pIte, &keyLen); + tlen += encodeSSessionKey(buf, key); + tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize); + } + + // 2.twAggSup + tlen += encodeSTimeWindowAggSupp(buf, &pInfo->twAggSup); + + // 3.pChildren + int32_t size = taosArrayGetSize(pInfo->pChildren); + tlen += taosEncodeFixedI32(buf, size); + for (int32_t i = 0; i < size; i++) { + SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i); + tlen += doStreamSessionEncodeOpState(buf, pChOp); + } + + // 4.dataVersion + tlen += taosEncodeFixedI32(buf, pInfo->dataVersion); + + return tlen; +} + +void* doStreamStateDecodeOpState(void* buf, SOperatorInfo* pOperator) { + SStreamStateAggOperatorInfo* pInfo = pOperator->info; + if (!pInfo) { + return buf; + } + + // 1.streamAggSup.pResultRows + int32_t mapSize = 0; + buf = taosDecodeFixedI32(buf, &mapSize); + for (int32_t i = 0; i < mapSize; i++) { + SSessionKey key = {0}; + SResultWindowInfo winfo = {0}; + buf = decodeSSessionKey(buf, &key); + buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize); + tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo)); + } + + // 2.twAggSup + buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup); + + // 3.pChildren + int32_t size = 0; + buf = taosDecodeFixedI32(buf, &size); + ASSERT(size <= taosArrayGetSize(pInfo->pChildren)); + for (int32_t i = 0; i < size; i++) { + SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i); + buf = doStreamStateDecodeOpState(buf, pChOp); + } + + // 4.dataVersion + buf = taosDecodeFixedI64(buf, &pInfo->dataVersion); + return buf; +} + static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -4428,6 +4856,17 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->isHistoryOp = pHandle->fillHistory; } + pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); + + // for stream + void* buff = NULL; + int32_t len = 0; + int32_t res = pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME, strlen(STREAM_STATE_OP_CHECKPOINT_NAME), &buff, &len); + if (res == TSDB_CODE_SUCCESS) { + doStreamStateDecodeOpState(buff, pOperator); + taosMemoryFree(buff); + } + setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, @@ -5048,14 +5487,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { resetUnCloseWinInfo(pInfo->aggSup.pResultRowHashTable); } - setOperatorCompleted(pOperator); - if (pInfo->twAggSup.maxTs > 0 && - pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { - pAPI->stateStore.streamStateCommit(pInfo->pState); - pAPI->stateStore.streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); - setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); - pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs; + if (pInfo->reCkBlock) { + pInfo->reCkBlock = false; + printDataBlock(pInfo->pCheckpointRes, "single interval ck"); + return pInfo->pCheckpointRes; } + + setOperatorCompleted(pOperator); return NULL; } @@ -5093,6 +5531,15 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { printDataBlock(pBlock, "single interval"); return pBlock; + } else if (pBlock->info.type == STREAM_CHECKPOINT) { + doStreamIntervalSaveCheckpoint(pOperator); + pAPI->stateStore.streamStateCommit(pInfo->pState); + setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); + pInfo->reCkBlock = true; + copyDataBlock(pInfo->pCheckpointRes, pBlock); + qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack); + pInfo->numOfDatapack = 0; + break; } else { ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); } @@ -5155,6 +5602,12 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { return pInfo->binfo.pRes; } + if (pInfo->reCkBlock) { + pInfo->reCkBlock = false; + printDataBlock(pInfo->pCheckpointRes, "single interval ck"); + return pInfo->pCheckpointRes; + } + return NULL; } @@ -5186,10 +5639,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys .calTrigger = pIntervalPhyNode->window.triggerType, .maxTs = INT64_MIN, .minTs = INT64_MAX, - .deleteMark = getDeleteMark(pIntervalPhyNode), - .checkPointTs = 0, - .checkPointInterval = - convertTimePrecision(tsCheckpointInterval, TSDB_TIME_PRECISION_MILLI, pInfo->interval.precision), + .deleteMark = getDeleteMark(pIntervalPhyNode) }; ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay"); @@ -5203,7 +5653,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys SExprSupp* pSup = &pOperator->exprSupp; initBasicInfo(&pInfo->binfo, pResBlock); - initStreamFunciton(pSup->pCtx, pSup->numOfExprs); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; @@ -5261,8 +5710,18 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL); setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState); - pInfo->statestore = pTaskInfo->storageAPI.stateStore; + pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; + pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); + + // for stream + void* buff = NULL; + int32_t len = 0; + int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len); + if (res == TSDB_CODE_SUCCESS) { + doStreamIntervalDecodeOpState(buff, pOperator); + taosMemoryFree(buff); + } initIntervalDownStream(downstream, pPhyNode->type, pInfo); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 83976523c7..8e216f21c4 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "streamInc.h" +#include "streamInt.h" int32_t tEncodeSStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq) { if (tStartEncode(pEncoder) < 0) return -1; diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 85be120dbd..8250ea8c57 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -89,11 +89,11 @@ static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t w return watermark; } -SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark) { - return updateInfoInit(pInterval->interval, pInterval->precision, watermark); +SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark, bool igUp) { + return updateInfoInit(pInterval->interval, pInterval->precision, watermark, igUp); } -SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark) { +SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp) { SUpdateInfo *pInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); if (pInfo == NULL) { return NULL; @@ -104,30 +104,33 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma pInfo->interval = adjustInterval(interval, precision); pInfo->watermark = adjustWatermark(pInfo->interval, interval, watermark); - uint64_t bfSize = (uint64_t)(pInfo->watermark / pInfo->interval); + uint64_t bfSize = 0; + if (!igUp) { + bfSize = (uint64_t)(pInfo->watermark / pInfo->interval); - pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(void *)); - if (pInfo->pTsSBFs == NULL) { - updateInfoDestroy(pInfo); - return NULL; + pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(void *)); + if (pInfo->pTsSBFs == NULL) { + updateInfoDestroy(pInfo); + return NULL; + } + windowSBfAdd(pInfo, bfSize); + + pInfo->pTsBuckets = taosArrayInit(DEFAULT_BUCKET_SIZE, sizeof(TSKEY)); + if (pInfo->pTsBuckets == NULL) { + updateInfoDestroy(pInfo); + return NULL; + } + + TSKEY dumy = 0; + for (uint64_t i = 0; i < DEFAULT_BUCKET_SIZE; ++i) { + taosArrayPush(pInfo->pTsBuckets, &dumy); + } + pInfo->numBuckets = DEFAULT_BUCKET_SIZE; + pInfo->pCloseWinSBF = NULL; + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT); + pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK); } pInfo->numSBFs = bfSize; - windowSBfAdd(pInfo, bfSize); - - pInfo->pTsBuckets = taosArrayInit(DEFAULT_BUCKET_SIZE, sizeof(TSKEY)); - if (pInfo->pTsBuckets == NULL) { - updateInfoDestroy(pInfo); - return NULL; - } - - TSKEY dumy = 0; - for (uint64_t i = 0; i < DEFAULT_BUCKET_SIZE; ++i) { - taosArrayPush(pInfo->pTsBuckets, &dumy); - } - pInfo->numBuckets = DEFAULT_BUCKET_SIZE; - pInfo->pCloseWinSBF = NULL; - _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT); - pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK); pInfo->maxDataVersion = 0; return pInfo; } diff --git a/tests/script/tsim/stream/checkpoint0.sim b/tests/script/tsim/stream/checkpoint0.sim new file mode 100644 index 0000000000..3f2457347a --- /dev/null +++ b/tests/script/tsim/stream/checkpoint0.sim @@ -0,0 +1,127 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 -v debugFlag 135 +system sh/exec.sh -n dnode1 -s start +sleep 50 +sql connect + +print =============== create database +sql create database test vgroups 1; +sql select * from information_schema.ins_databases +if $rows != 3 then + return -1 +endi + +print $data00 $data01 $data02 + +sql use test; + + +sql create table t1(ts timestamp, a int, b int , c int, d double); +sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 interval(10s); +sql insert into t1 values(1648791213000,1,2,3,1.0); +sql insert into t1 values(1648791213001,2,2,3,1.1); + +$loop_count = 0 + +loop0: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows expect 1 + goto loop0 +endi + +# row 0 +if $data01 != 2 then + print =====data01=$data01 + goto loop0 +endi + +if $data02 != 3 then + print =====data02=$data02 + goto loop0 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT + +system sh/exec.sh -n dnode1 -s start + +sql insert into t1 values(1648791213002,3,2,3,1.1); + +$loop_count = 0 + +loop1: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows expect 1 + goto loop1 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop1 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop1 +endi + +sql insert into t1 values(1648791223002,4,2,3,1.1); + +$loop_count = 0 + +loop2: +sleep 1000 + +sql select * from streamt; + +$loop_count = $loop_count + 1 +if $loop_count == 20 then + return -1 +endi + +if $rows != 2 then + print =====rows=$rows expect 2 + goto loop2 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop2 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop2 +endi + +# row 1 +if $data11 != 1 then + print =====data01=$data01 + goto loop2 +endi + +if $data12 != 4 then + print =====data02=$data02 + goto loop2 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT \ No newline at end of file