From 3005fd642c5f7af6259cdce00b668e96839b176a Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Fri, 18 Aug 2023 15:33:13 +0800 Subject: [PATCH] merge 3.0 --- source/libs/executor/src/scanoperator.c | 2 +- .../executor/src/streamtimewindowoperator.c | 639 ++++++++++++++++-- 2 files changed, 574 insertions(+), 67 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5e9d047b47..741e8ff43a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2226,7 +2226,7 @@ FETCH_NEXT_BLOCK: if (pBlock->info.type == STREAM_CHECKPOINT) { streamScanOperatorSaveCheckpoint(pInfo); } - printDataBlock(pBlock, "stream scan ck"); + // printDataBlock(pBlock, "stream scan ck"); return pInfo->pCheckpointRes; } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index a718373f60..4757aa872f 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -18,6 +18,7 @@ #include "functionMgt.h" #include "operator.h" #include "querytask.h" +#include "tchecksum.h" #include "tcommon.h" #include "tcompare.h" #include "tdatablock.h" @@ -26,12 +27,15 @@ #include "tlog.h" #include "ttime.h" -#define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) -#define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) -#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); -#define STREAM_INTERVAL_OP_STATE_NAME "StreamIntervalHistoryState" -#define STREAM_SESSION_OP_STATE_NAME "StreamSessionHistoryState" -#define STREAM_STATE_OP_STATE_NAME "StreamStateHistoryState" +#define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) +#define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) +#define DEAULT_DELETE_MARK (1000LL * 60LL * 60LL * 24LL * 365LL * 10LL); +#define STREAM_INTERVAL_OP_STATE_NAME "StreamIntervalHistoryState" +#define STREAM_SESSION_OP_STATE_NAME "StreamSessionHistoryState" +#define STREAM_STATE_OP_STATE_NAME "StreamStateHistoryState" +#define STREAM_INTERVAL_OP_CHECKPOINT_NAME "StreamIntervalOperator_Checkpoint" +#define STREAM_SESSION_OP_CHECKPOINT_NAME "StreamSessionOperator_Checkpoint" +#define STREAM_STATE_OP_CHECKPOINT_NAME "StreamStateOperator_Checkpoint" typedef struct SStateWindowInfo { SResultWindowInfo winInfo; @@ -353,7 +357,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin for (int32_t i = *index; i < size; i++) { SWinKey* pWin = taosArrayGet(pWins, i); void* tbname = NULL; - pInfo->statestore.streamStateGetParName(pInfo->pState, pWin->groupId, &tbname); + pInfo->stateStore.streamStateGetParName(pInfo->pState, pWin->groupId, &tbname); if (tbname == NULL) { appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, NULL); } else { @@ -361,7 +365,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName); } - pInfo->statestore.streamStateFreeVal(tbname); + pInfo->stateStore.streamStateFreeVal(tbname); (*index)++; } } @@ -381,7 +385,7 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { blockDataDestroy(pInfo->pPullDataRes); taosArrayDestroy(pInfo->pDelWins); blockDataDestroy(pInfo->pDelRes); - pInfo->statestore.streamFileStateDestroy(pInfo->pState->pFileState); + pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState); taosMemoryFreeClear(pInfo->pState); nodesDestroyNode((SNode*)pInfo->pPhyNode); @@ -392,6 +396,8 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { pInfo->pUpdatedMap = NULL; pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); + blockDataDestroy(pInfo->pCheckpointRes); + taosMemoryFreeClear(param); } @@ -416,7 +422,8 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt pScanInfo->windowSup.parentType = type; pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup; if (!pScanInfo->pUpdateInfo) { - pScanInfo->pUpdateInfo = pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark); + pScanInfo->pUpdateInfo = + pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate); } pScanInfo->interval = pInfo->interval; @@ -513,7 +520,7 @@ static void clearStreamIntervalOperator(SStreamIntervalOperatorInfo* pInfo) { clearDiskbasedBuf(pInfo->aggSup.pResultBuf); initResultRowInfo(&pInfo->binfo.resultRowInfo); pInfo->aggSup.currentPageId = -1; - pInfo->statestore.streamStateClear(pInfo->pState); + pInfo->stateStore.streamStateClear(pInfo->pState); } static void clearSpecialDataBlock(SSDataBlock* pBlock) { @@ -745,11 +752,6 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN return startPos; } -static void setStreamDataVersion(SExecTaskInfo* pTaskInfo, int64_t version, int64_t ckId) { - pTaskInfo->streamInfo.dataVersion = version; - pTaskInfo->streamInfo.checkPointId = ckId; -} - static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId, SSHashObj* pUpdatedMap) { SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info; @@ -794,7 +796,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat .groupId = groupId, }; void* chIds = taosHashGet(pInfo->pPullDataMap, &winRes, sizeof(SWinKey)); - if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->statestore) && isClosed && + if (isDeletedStreamWindow(&nextWin, groupId, pInfo->pState, &pInfo->twAggSup, &pInfo->stateStore) && isClosed && !chIds) { SPullWindowInfo pull = { .window = nextWin, .groupId = groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; @@ -826,7 +828,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat } int32_t code = setIntervalOutputBuf(pInfo->pState, &nextWin, &pResPos, groupId, pSup->pCtx, numOfOutput, - pSup->rowEntryInfoOffset, &pInfo->aggSup, &pInfo->statestore); + pSup->rowEntryInfoOffset, &pInfo->aggSup, &pInfo->stateStore); pResult = (SResultRow*)pResPos->pRowBuff; if (code != TSDB_CODE_SUCCESS || pResult == NULL) { T_LONG_JMP(pTaskInfo->env, TSDB_CODE_OUT_OF_MEMORY); @@ -914,6 +916,214 @@ static void resetUnCloseWinInfo(SSHashObj* winMap) { } } +int32_t encodeSWinKey(void** buf, SWinKey* key) { + int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, key->ts); + tlen += taosEncodeFixedU64(buf, key->groupId); + return tlen; +} + +void* decodeSWinKey(void* buf, SWinKey* key) { + buf = taosDecodeFixedI64(buf, &key->ts); + buf = taosDecodeFixedU64(buf, &key->groupId); + return buf; +} + +int32_t encodeSRowBuffPos(void** buf, SRowBuffPos* pos) { + int32_t tlen = 0; + tlen += encodeSWinKey(buf, pos->pKey); + return tlen; +} + +void* decodeSRowBuffPos(void* buf, SRowBuffPos* pos) { + buf = decodeSWinKey(buf, pos->pKey); + return buf; +} + +int32_t encodeSTimeWindowAggSupp(void** buf, STimeWindowAggSupp* pTwAggSup) { + int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, pTwAggSup->minTs); + tlen += taosEncodeFixedI64(buf, pTwAggSup->maxTs); + return tlen; +} + +void* decodeSTimeWindowAggSupp(void* buf, STimeWindowAggSupp* pTwAggSup) { + buf = taosDecodeFixedI64(buf, &pTwAggSup->minTs); + buf = taosDecodeFixedI64(buf, &pTwAggSup->maxTs); + return buf; +} + +int32_t encodeSTimeWindow(void** buf, STimeWindow* pWin) { + int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, pWin->skey); + tlen += taosEncodeFixedI64(buf, pWin->ekey); + return tlen; +} + +void* decodeSTimeWindow(void* buf, STimeWindow* pWin) { + buf = taosDecodeFixedI64(buf, &pWin->skey); + buf = taosDecodeFixedI64(buf, &pWin->ekey); + return buf; +} + +int32_t encodeSPullWindowInfo(void** buf, SPullWindowInfo* pPullInfo) { + int32_t tlen = 0; + tlen += encodeSTimeWindow(buf, &pPullInfo->calWin); + tlen += taosEncodeFixedU64(buf, pPullInfo->groupId); + tlen += encodeSTimeWindow(buf, &pPullInfo->window); + return tlen; +} + +void* decodeSPullWindowInfo(void* buf, SPullWindowInfo* pPullInfo) { + buf = decodeSTimeWindow(buf, &pPullInfo->calWin); + buf = taosDecodeFixedU64(buf, &pPullInfo->groupId); + buf = decodeSTimeWindow(buf, &pPullInfo->window); + return buf; +} + +int32_t encodeSPullWindowInfoArray(void** buf, SArray* pPullInfos) { + int32_t tlen = 0; + int32_t size = taosArrayGetSize(pPullInfos); + tlen += taosEncodeFixedI32(buf, size); + for (int32_t i = 0; i < size; i++) { + void* pItem = taosArrayGet(pPullInfos, i); + tlen += encodeSPullWindowInfo(buf, pItem); + } + return tlen; +} + +void* decodeSPullWindowInfoArray(void* buf, SArray* pPullInfos) { + int32_t size = 0; + buf = taosDecodeFixedI32(buf, &size); + for (int32_t i = 0; i < size; i++) { + SPullWindowInfo item = {0}; + buf = decodeSPullWindowInfo(buf, &item); + taosArrayPush(pPullInfos, &item); + } + return buf; +} + +int32_t doStreamIntervalEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator) { + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + if (!pInfo) { + return 0; + } + + void* pData = (buf == NULL) ? NULL : *buf; + + // 1.pResultRowHashTable + int32_t tlen = 0; + int32_t mapSize = tSimpleHashGetSize(pInfo->aggSup.pResultRowHashTable); + tlen += taosEncodeFixedI32(buf, mapSize); + void* pIte = NULL; + size_t keyLen = 0; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pInfo->aggSup.pResultRowHashTable, pIte, &iter)) != NULL) { + void* key = taosHashGetKey(pIte, &keyLen); + tlen += encodeSWinKey(buf, key); + } + + // 2.twAggSup + tlen += encodeSTimeWindowAggSupp(buf, &pInfo->twAggSup); + + // 3.pPullDataMap + int32_t size = taosHashGetSize(pInfo->pPullDataMap); + tlen += taosEncodeFixedI32(buf, size); + pIte = NULL; + keyLen = 0; + while ((pIte = taosHashIterate(pInfo->pPullDataMap, pIte)) != NULL) { + void* key = taosHashGetKey(pIte, &keyLen); + tlen += encodeSWinKey(buf, key); + SArray* pArray = (SArray*)pIte; + int32_t chSize = taosArrayGetSize(pArray); + tlen += taosEncodeFixedI32(buf, chSize); + for (int32_t i = 0; i < chSize; i++) { + void* pChItem = taosArrayGet(pArray, i); + tlen += taosEncodeFixedI32(buf, *(int32_t*)pChItem); + } + } + + // 4.pPullWins + tlen += encodeSPullWindowInfoArray(buf, pInfo->pPullWins); + + // 5.dataVersion + tlen += taosEncodeFixedI64(buf, pInfo->dataVersion); + + // 6.checksum + if (buf) { + uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t)); + tlen += taosEncodeFixedU32(buf, cksum); + } else { + tlen += sizeof(uint32_t); + } + + return tlen; +} + +void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator) { + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + if (!pInfo) { + return; + } + + // 6.checksum + int32_t dataLen = len - sizeof(uint32_t); + void* pCksum = POINTER_SHIFT(buf, dataLen); + if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) { + ASSERT(0); // debug + qError("stream interval state is invalid"); + return; + } + + // 1.pResultRowHashTable + int32_t mapSize = 0; + buf = taosDecodeFixedI32(buf, &mapSize); + for (int32_t i = 0; i < mapSize; i++) { + SWinKey key = {0}; + buf = decodeSWinKey(buf, &key); + SRowBuffPos* pPos = NULL; + int32_t resSize = pInfo->aggSup.resultRowSize; + pInfo->stateStore.streamStateAddIfNotExist(pInfo->pState, &key, (void**)&pPos, &resSize); + tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pPos, POINTER_BYTES); + } + + // 2.twAggSup + buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup); + + // 3.pPullDataMap + int32_t size = 0; + buf = taosDecodeFixedI32(buf, &size); + for (int32_t i = 0; i < size; i++) { + SWinKey key = {0}; + SArray* pArray = taosArrayInit(0, sizeof(int32_t)); + buf = decodeSWinKey(buf, &key); + int32_t chSize = 0; + buf = taosDecodeFixedI32(buf, &chSize); + for (int32_t i = 0; i < chSize; i++) { + int32_t chId = 0; + buf = taosDecodeFixedI32(buf, &chId); + taosArrayPush(pArray, &chId); + } + taosHashPut(pInfo->pPullDataMap, &key, sizeof(SWinKey), &pArray, POINTER_BYTES); + } + + // 4.pPullWins + buf = decodeSPullWindowInfoArray(buf, pInfo->pPullWins); + + // 5.dataVersion + buf = taosDecodeFixedI64(buf, &pInfo->dataVersion); +} + +void doStreamIntervalSaveCheckpoint(SOperatorInfo* pOperator) { + SStreamIntervalOperatorInfo* pInfo = pOperator->info; + int32_t len = doStreamIntervalEncodeOpState(NULL, 0, pOperator); + void* buf = taosMemoryCalloc(1, len); + void* pBuf = buf; + len = doStreamIntervalEncodeOpState(&pBuf, len, pOperator); + pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, + strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), buf, len); + taosMemoryFree(buf); +} static SSDataBlock* buildIntervalResult(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; @@ -966,21 +1176,18 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { resetUnCloseWinInfo(pInfo->aggSup.pResultRowHashTable); } + if (pInfo->reCkBlock) { + pInfo->reCkBlock = false; + printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); + return pInfo->pCheckpointRes; + } + setOperatorCompleted(pOperator); if (!IS_FINAL_INTERVAL_OP(pOperator)) { clearFunctionContext(&pOperator->exprSupp); // semi interval operator clear disk buffer clearStreamIntervalOperator(pInfo); - setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); - qDebug("stask:%s ===stream===%s clear", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType)); - } else { - if (pInfo->twAggSup.maxTs > 0 && - pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { - pAPI->stateStore.streamStateCommit(pInfo->pState); - pAPI->stateStore.streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); - pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs; - } - qDebug("stask:%s ===stream===%s close", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType)); + qDebug("===stream===clear semi operator"); } return NULL; } else { @@ -1075,6 +1282,11 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { return pBlock; + } else if (pBlock->info.type == STREAM_CHECKPOINT) { + pAPI->stateStore.streamStateCommit(pInfo->pState); + doStreamIntervalSaveCheckpoint(pOperator); + copyDataBlock(pInfo->pCheckpointRes, pBlock); + continue; } else { ASSERTS(pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); } @@ -1155,7 +1367,7 @@ static void streamIntervalReleaseState(SOperatorInfo* pOperator) { if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; int32_t resSize = sizeof(TSKEY); - pInfo->statestore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, + pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, strlen(STREAM_INTERVAL_OP_STATE_NAME), &pInfo->twAggSup.maxTs, resSize); } SStreamIntervalOperatorInfo* pInfo = pOperator->info; @@ -1172,12 +1384,12 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; int32_t size = 0; void* pBuf = NULL; - int32_t code = pInfo->statestore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, + int32_t code = pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, strlen(STREAM_INTERVAL_OP_STATE_NAME), &pBuf, &size); TSKEY ts = *(TSKEY*)pBuf; taosMemoryFree(pBuf); pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts); - pInfo->statestore.streamStateReloadInfo(pInfo->pState, ts); + pInfo->stateStore.streamStateReloadInfo(pInfo->pState, ts); } SOperatorInfo* downstream = pOperator->pDownstream[0]; if (downstream->fpSet.reloadStreamStateFn) { @@ -1186,7 +1398,8 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) { } SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo, int32_t numOfChild) { + SExecTaskInfo* pTaskInfo, int32_t numOfChild, + SReadHandle* pHandle) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -1211,9 +1424,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, .deleteMark = getDeleteMark(pIntervalPhyNode), .deleteMarkSaved = 0, .calTriggerSaved = 0, - .checkPointTs = 0, - .checkPointInterval = - convertTimePrecision(tsCheckpointInterval, TSDB_TIME_PRECISION_MILLI, pInfo->interval.precision), }; ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay"); pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; @@ -1266,12 +1476,13 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; int32_t funResSize = getMaxFunResSize(&pOperator->exprSupp, numOfCols); - pInfo->pState->pFileState = - pAPI->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, - compareTs, pInfo->pState, pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo)); + pInfo->pState->pFileState = pAPI->stateStore.streamFileStateInit( + tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, + pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId); pInfo->dataVersion = 0; - pInfo->statestore = pTaskInfo->storageAPI.stateStore; + pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; + pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pOperator->operatorType = pPhyNode->type; if (!IS_FINAL_INTERVAL_OP(pOperator) || numOfChild == 0) { @@ -1293,6 +1504,16 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, goto _error; } + // for stream + void* buff = NULL; + int32_t len = 0; + int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, + strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len); + if (res == TSDB_CODE_SUCCESS) { + doStreamIntervalDecodeOpState(buff, len, pOperator); + taosMemoryFree(buff); + } + return pOperator; _error: @@ -1327,11 +1548,12 @@ void destroyStreamSessionAggOperatorInfo(void* param) { colDataDestroy(&pInfo->twAggSup.timeWindowData); blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pWinBlock); - blockDataDestroy(pInfo->pUpdateRes); tSimpleHashCleanup(pInfo->pStUpdated); tSimpleHashCleanup(pInfo->pStDeleted); taosArrayDestroy(pInfo->historyWins); + blockDataDestroy(pInfo->pCheckpointRes); + taosMemoryFreeClear(param); } @@ -1374,7 +1596,8 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type}; pScanInfo->pState = pAggSup->pState; if (!pScanInfo->pUpdateInfo) { - pScanInfo->pUpdateInfo = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark); + pScanInfo->pUpdateInfo = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark, + pScanInfo->igCheckUpdate); } pScanInfo->twAggSup = *pTwSup; } @@ -2003,6 +2226,137 @@ void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins) { } } +int32_t encodeSSessionKey(void** buf, SSessionKey* key) { + int32_t tlen = 0; + tlen += encodeSTimeWindow(buf, &key->win); + tlen += taosEncodeFixedU64(buf, key->groupId); + return tlen; +} + +void* decodeSSessionKey(void* buf, SSessionKey* key) { + buf = decodeSTimeWindow(buf, &key->win); + buf = taosDecodeFixedU64(buf, &key->groupId); + return buf; +} + +int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outLen) { + int32_t tlen = 0; + tlen += taosEncodeFixedBool(buf, key->isOutput); + tlen += encodeSSessionKey(buf, &key->sessionWin); + return tlen; +} + +void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen) { + buf = taosDecodeFixedBool(buf, &key->isOutput); + key->pOutputBuf = NULL; + buf = decodeSSessionKey(buf, &key->sessionWin); + return buf; +} + +int32_t doStreamSessionEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator, bool isParent) { + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + if (!pInfo) { + return 0; + } + + void* pData = (buf == NULL) ? NULL : *buf; + + // 1.streamAggSup.pResultRows + int32_t tlen = 0; + int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows); + tlen += taosEncodeFixedI32(buf, mapSize); + void* pIte = NULL; + size_t keyLen = 0; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) { + void* key = taosHashGetKey(pIte, &keyLen); + tlen += encodeSSessionKey(buf, key); + tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize); + } + + // 2.twAggSup + tlen += encodeSTimeWindowAggSupp(buf, &pInfo->twAggSup); + + // 3.pChildren + int32_t size = taosArrayGetSize(pInfo->pChildren); + tlen += taosEncodeFixedI32(buf, size); + for (int32_t i = 0; i < size; i++) { + SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i); + tlen += doStreamSessionEncodeOpState(buf, 0, pChOp, false); + } + + // 4.dataVersion + tlen += taosEncodeFixedI32(buf, pInfo->dataVersion); + + // 5.checksum + if (isParent) { + if (buf) { + uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t)); + tlen += taosEncodeFixedU32(buf, cksum); + } else { + tlen += sizeof(uint32_t); + } + } + + return tlen; +} + +void* doStreamSessionDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator, bool isParent) { + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + if (!pInfo) { + return buf; + } + + // 5.checksum + if (isParent) { + int32_t dataLen = len - sizeof(uint32_t); + void* pCksum = POINTER_SHIFT(buf, dataLen); + if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) { + ASSERT(0); // debug + qError("stream interval state is invalid"); + return buf; + } + } + + // 1.streamAggSup.pResultRows + int32_t mapSize = 0; + buf = taosDecodeFixedI32(buf, &mapSize); + for (int32_t i = 0; i < mapSize; i++) { + SSessionKey key = {0}; + SResultWindowInfo winfo = {0}; + buf = decodeSSessionKey(buf, &key); + buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize); + tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo)); + } + + // 2.twAggSup + buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup); + + // 3.pChildren + int32_t size = 0; + buf = taosDecodeFixedI32(buf, &size); + ASSERT(size <= taosArrayGetSize(pInfo->pChildren)); + for (int32_t i = 0; i < size; i++) { + SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i); + buf = doStreamSessionDecodeOpState(buf, 0, pChOp, false); + } + + // 4.dataVersion + buf = taosDecodeFixedI64(buf, &pInfo->dataVersion); + return buf; +} + +void doStreamSessionSaveCheckpoint(SOperatorInfo* pOperator) { + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; + int32_t len = doStreamSessionEncodeOpState(NULL, 0, pOperator, true); + void* buf = taosMemoryCalloc(1, len); + void* pBuf = buf; + len = doStreamSessionEncodeOpState(&pBuf, len, pOperator, true); + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, + strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), buf, len); + taosMemoryFree(buf); +} + static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { SExprSupp* pSup = &pOperator->exprSupp; SStreamSessionAggOperatorInfo* pInfo = pOperator->info; @@ -2058,6 +2412,11 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { return pBlock; + } else if (pBlock->info.type == STREAM_CHECKPOINT) { + pAggSup->stateStore.streamStateCommit(pAggSup->pState); + doStreamSessionSaveCheckpoint(pOperator); + copyDataBlock(pInfo->pCheckpointRes, pBlock); + continue; } else { ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); } @@ -2249,7 +2608,19 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh if (pHandle) { pInfo->isHistoryOp = pHandle->fillHistory; } + + pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; + // for stream + void* buff = NULL; + int32_t len = 0; + int32_t res = + pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_CHECKPOINT_NAME, + strlen(STREAM_SESSION_OP_CHECKPOINT_NAME), &buff, &len); + if (res == TSDB_CODE_SUCCESS) { + doStreamSessionDecodeOpState(buff, len, pOperator, true); + taosMemoryFree(buff); + } setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo, @@ -2316,7 +2687,6 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { while (1) { SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); if (pBlock == NULL) { - clearSpecialDataBlock(pInfo->pUpdateRes); pOperator->status = OP_RES_TO_RETURN; break; } @@ -2336,6 +2706,10 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { return pBlock; + } else if (pBlock->info.type == STREAM_CHECKPOINT) { + pAggSup->stateStore.streamStateCommit(pAggSup->pState); + doStreamSessionSaveCheckpoint(pOperator); + continue; } else { ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); } @@ -2387,12 +2761,11 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream pOperator->operatorType = pPhyNode->type; if (pPhyNode->type != QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { - pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); - blockDataEnsureCapacity(pInfo->pUpdateRes, 128); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL, destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL); } - setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo); + setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), pPhyNode->type, false, OP_NOT_OPENED, pInfo, + pTaskInfo); if (numOfChild > 0) { pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*)); @@ -2441,6 +2814,8 @@ void destroyStreamStateOperatorInfo(void* param) { taosArrayDestroy(pInfo->historyWins); tSimpleHashCleanup(pInfo->pSeUpdated); tSimpleHashCleanup(pInfo->pSeDeleted); + blockDataDestroy(pInfo->pCheckpointRes); + taosMemoryFreeClear(param); } @@ -2648,6 +3023,109 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } } +int32_t doStreamStateEncodeOpState(void** buf, int32_t len, SOperatorInfo* pOperator, bool isParent) { + SStreamStateAggOperatorInfo* pInfo = pOperator->info; + if (!pInfo) { + return 0; + } + + void* pData = (buf == NULL) ? NULL : *buf; + + // 1.streamAggSup.pResultRows + int32_t tlen = 0; + int32_t mapSize = tSimpleHashGetSize(pInfo->streamAggSup.pResultRows); + tlen += taosEncodeFixedI32(buf, mapSize); + void* pIte = NULL; + size_t keyLen = 0; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(pInfo->streamAggSup.pResultRows, pIte, &iter)) != NULL) { + void* key = taosHashGetKey(pIte, &keyLen); + tlen += encodeSSessionKey(buf, key); + tlen += encodeSResultWindowInfo(buf, pIte, pInfo->streamAggSup.resultRowSize); + } + + // 2.twAggSup + tlen += encodeSTimeWindowAggSupp(buf, &pInfo->twAggSup); + + // 3.pChildren + int32_t size = taosArrayGetSize(pInfo->pChildren); + tlen += taosEncodeFixedI32(buf, size); + for (int32_t i = 0; i < size; i++) { + SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i); + tlen += doStreamStateEncodeOpState(buf, 0, pChOp, false); + } + + // 4.dataVersion + tlen += taosEncodeFixedI32(buf, pInfo->dataVersion); + + // 5.checksum + if (isParent) { + if (buf) { + uint32_t cksum = taosCalcChecksum(0, pData, len - sizeof(uint32_t)); + tlen += taosEncodeFixedU32(buf, cksum); + } else { + tlen += sizeof(uint32_t); + } + } + + return tlen; +} + +void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperator, bool isParent) { + SStreamStateAggOperatorInfo* pInfo = pOperator->info; + if (!pInfo) { + return buf; + } + + // 5.checksum + if (isParent) { + int32_t dataLen = len - sizeof(uint32_t); + void* pCksum = POINTER_SHIFT(buf, dataLen); + if (taosCheckChecksum(buf, dataLen, *(uint32_t*)pCksum) != TSDB_CODE_SUCCESS) { + ASSERT(0); // debug + qError("stream interval state is invalid"); + return buf; + } + } + + // 1.streamAggSup.pResultRows + int32_t mapSize = 0; + buf = taosDecodeFixedI32(buf, &mapSize); + for (int32_t i = 0; i < mapSize; i++) { + SSessionKey key = {0}; + SResultWindowInfo winfo = {0}; + buf = decodeSSessionKey(buf, &key); + buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize); + tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo)); + } + + // 2.twAggSup + buf = decodeSTimeWindowAggSupp(buf, &pInfo->twAggSup); + + // 3.pChildren + int32_t size = 0; + buf = taosDecodeFixedI32(buf, &size); + ASSERT(size <= taosArrayGetSize(pInfo->pChildren)); + for (int32_t i = 0; i < size; i++) { + SOperatorInfo* pChOp = taosArrayGetP(pInfo->pChildren, i); + buf = doStreamStateDecodeOpState(buf, 0, pChOp, false); + } + + // 4.dataVersion + buf = taosDecodeFixedI64(buf, &pInfo->dataVersion); + return buf; +} + +void doStreamStateSaveCheckpoint(SOperatorInfo* pOperator) { + SStreamStateAggOperatorInfo* pInfo = pOperator->info; + int32_t len = doStreamStateEncodeOpState(NULL, 0, pOperator, true); + void* buf = taosMemoryCalloc(1, len); + void* pBuf = buf; + len = doStreamStateEncodeOpState(&pBuf, len, pOperator, true); + pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME, + strlen(STREAM_STATE_OP_CHECKPOINT_NAME), buf, len); +} + static SSDataBlock* buildStateResult(SOperatorInfo* pOperator) { SStreamStateAggOperatorInfo* pInfo = pOperator->info; SOptrBasicInfo* pBInfo = &pInfo->binfo; @@ -2700,7 +3178,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { if (pBlock == NULL) { break; } - printDataBlock(pBlock, "single state recv", GET_TASKID(pTaskInfo)); + printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo)); if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { @@ -2715,6 +3193,11 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { continue; } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { return pBlock; + } else if (pBlock->info.type == STREAM_CHECKPOINT) { + pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState); + doStreamSessionSaveCheckpoint(pOperator); + copyDataBlock(pInfo->pCheckpointRes, pBlock); + continue; } else { ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); } @@ -2926,6 +3409,19 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->isHistoryOp = pHandle->fillHistory; } + pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); + + // for stream + void* buff = NULL; + int32_t len = 0; + int32_t res = + pInfo->streamAggSup.stateStore.streamStateGetInfo(pInfo->streamAggSup.pState, STREAM_STATE_OP_CHECKPOINT_NAME, + strlen(STREAM_STATE_OP_CHECKPOINT_NAME), &buff, &len); + if (res == TSDB_CODE_SUCCESS) { + doStreamStateDecodeOpState(buff, len, pOperator, true); + taosMemoryFree(buff); + } + setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, @@ -2984,14 +3480,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { resetUnCloseWinInfo(pInfo->aggSup.pResultRowHashTable); } - setOperatorCompleted(pOperator); - if (pInfo->twAggSup.maxTs > 0 && - pInfo->twAggSup.maxTs - pInfo->twAggSup.checkPointInterval > pInfo->twAggSup.checkPointTs) { - pAPI->stateStore.streamStateCommit(pInfo->pState); - pAPI->stateStore.streamStateDeleteCheckPoint(pInfo->pState, pInfo->twAggSup.maxTs - pInfo->twAggSup.deleteMark); - setStreamDataVersion(pTaskInfo, pInfo->dataVersion, pInfo->pState->checkPointId); - pInfo->twAggSup.checkPointTs = pInfo->twAggSup.maxTs; + if (pInfo->reCkBlock) { + pInfo->reCkBlock = false; + // printDataBlock(pInfo->pCheckpointRes, "single interval ck"); + return pInfo->pCheckpointRes; } + + setOperatorCompleted(pOperator); return NULL; } @@ -3030,6 +3525,12 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } else if (pBlock->info.type == STREAM_CREATE_CHILD_TABLE) { printDataBlock(pBlock, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); return pBlock; + } else if (pBlock->info.type == STREAM_CHECKPOINT) { + pAPI->stateStore.streamStateCommit(pInfo->pState); + doStreamIntervalSaveCheckpoint(pOperator); + pInfo->reCkBlock = true; + copyDataBlock(pInfo->pCheckpointRes, pBlock); + continue; } else { ASSERTS(pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_INVALID, "invalid SSDataBlock type"); } @@ -3078,7 +3579,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo) { + SExecTaskInfo* pTaskInfo, SReadHandle* pHandle) { SStreamIntervalOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamIntervalOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -3100,16 +3601,11 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision, }; - pInfo->twAggSup = (STimeWindowAggSupp){ - .waterMark = pIntervalPhyNode->window.watermark, - .calTrigger = pIntervalPhyNode->window.triggerType, - .maxTs = INT64_MIN, - .minTs = INT64_MAX, - .deleteMark = getDeleteMark(pIntervalPhyNode), - .checkPointTs = 0, - .checkPointInterval = - convertTimePrecision(tsCheckpointInterval, TSDB_TIME_PRECISION_MILLI, pInfo->interval.precision), - }; + pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark, + .calTrigger = pIntervalPhyNode->window.triggerType, + .maxTs = INT64_MIN, + .minTs = INT64_MAX, + .deleteMark = getDeleteMark(pIntervalPhyNode)}; ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay"); @@ -3168,7 +3664,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pState->pFileState = pTaskInfo->storageAPI.stateStore.streamFileStateInit( tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, - pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo)); + pInfo->twAggSup.deleteMark, GET_TASKID(pTaskInfo), pHandle->checkpointId); setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -3176,8 +3672,19 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL); setOperatorStreamStateFn(pOperator, streamIntervalReleaseState, streamIntervalReloadState); - pInfo->statestore = pTaskInfo->storageAPI.stateStore; + pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; + pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); + + // for stream + void* buff = NULL; + int32_t len = 0; + int32_t res = pAPI->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_CHECKPOINT_NAME, + strlen(STREAM_INTERVAL_OP_CHECKPOINT_NAME), &buff, &len); + if (res == TSDB_CODE_SUCCESS) { + doStreamIntervalDecodeOpState(buff, len, pOperator); + taosMemoryFree(buff); + } initIntervalDownStream(downstream, pPhyNode->type, pInfo); code = appendDownstream(pOperator, &downstream, 1);