Merge remote-tracking branch 'origin/enh/triggerCheckPoint2' into enh/triggerCheckPoint2

# Conflicts:
#	source/common/src/tglobal.c
#	source/libs/executor/src/timewindowoperator.c
This commit is contained in:
Haojun Liao 2023-07-24 15:25:43 +08:00
commit 01f11bf5a6
3 changed files with 33 additions and 32 deletions

View File

@ -538,8 +538,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddString(pCfg, "udfdResFuncs", tsUdfdResFuncs, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddString(pCfg, "udfdLdLibPath", tsUdfdLdLibPath, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddBool(pCfg, "disableStream", tsDisableStream, 0) != 0) return -1; if (cfgAddBool(pCfg, "disableStream", tsDisableStream, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, 0) != 0) return -1; if (cfgAddInt64(pCfg, "streamBufferSize", tsStreamBufferSize, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt64(pCfg, "checkpointInterval", tsCheckpointInterval, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt64(pCfg, "checkpointInterval", tsCheckpointInterval, 0, INT64_MAX, CFG_SCOPE_SERVER) != 0) return -1;
if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) return -1; if (cfgAddInt32(pCfg, "cacheLazyLoadThreshold", tsCacheLazyLoadThreshold, 0, 100000, CFG_SCOPE_SERVER) != 0) return -1;

View File

@ -239,30 +239,31 @@ int32_t vnodeSnapRead(SVSnapReader *pReader, uint8_t **ppData, uint32_t *nData)
} }
} }
} }
if (!pReader->streamStateDone) { // if (!pReader->streamStateDone) {
if (pReader->pStreamStateReader == NULL) { // if (pReader->pStreamStateReader == NULL) {
code = // code =
streamStateSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver, &pReader->pStreamStateReader); // streamStateSnapReaderOpen(pReader->pVnode->pTq, pReader->sver, pReader->sver,
if (code) { // &pReader->pStreamStateReader);
pReader->streamStateDone = 1; // if (code) {
pReader->pStreamStateReader = NULL; // pReader->streamStateDone = 1;
goto _err; // pReader->pStreamStateReader = NULL;
} // goto _err;
} // }
code = streamStateSnapRead(pReader->pStreamStateReader, ppData); // }
if (code) { // code = streamStateSnapRead(pReader->pStreamStateReader, ppData);
goto _err; // if (code) {
} else { // goto _err;
if (*ppData) { // } else {
goto _exit; // if (*ppData) {
} else { // goto _exit;
pReader->streamStateDone = 1; // } else {
code = streamStateSnapReaderClose(pReader->pStreamStateReader); // pReader->streamStateDone = 1;
if (code) goto _err; // code = streamStateSnapReaderClose(pReader->pStreamStateReader);
pReader->pStreamStateReader = NULL; // if (code) goto _err;
} // pReader->pStreamStateReader = NULL;
} // }
} // }
// }
// RSMA ============== // RSMA ==============
if (VND_IS_RSMA(pReader->pVnode) && !pReader->rsmaDone) { if (VND_IS_RSMA(pReader->pVnode) && !pReader->rsmaDone) {
@ -430,7 +431,7 @@ _exit:
} }
static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { static int32_t vnodeSnapWriteInfo(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
int32_t code = 0; int32_t code = 0;
SVnode *pVnode = pWriter->pVnode; SVnode *pVnode = pWriter->pVnode;
SSnapDataHdr *pHdr = (SSnapDataHdr *)pData; SSnapDataHdr *pHdr = (SSnapDataHdr *)pData;

View File

@ -3144,7 +3144,6 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
colDataDestroy(&pInfo->twAggSup.timeWindowData); colDataDestroy(&pInfo->twAggSup.timeWindowData);
blockDataDestroy(pInfo->pDelRes); blockDataDestroy(pInfo->pDelRes);
blockDataDestroy(pInfo->pWinBlock); blockDataDestroy(pInfo->pWinBlock);
blockDataDestroy(pInfo->pUpdateRes);
tSimpleHashCleanup(pInfo->pStUpdated); tSimpleHashCleanup(pInfo->pStUpdated);
tSimpleHashCleanup(pInfo->pStDeleted); tSimpleHashCleanup(pInfo->pStDeleted);
@ -3431,7 +3430,7 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC
SSHashObj* pStDeleted, bool addGap) { SSHashObj* pStDeleted, bool addGap) {
SExprSupp* pSup = &pOperator->exprSupp; SExprSupp* pSup = &pOperator->exprSupp;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
SStreamSessionAggOperatorInfo* pInfo = pOperator->info; SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
SResultRow* pCurResult = NULL; SResultRow* pCurResult = NULL;
@ -3469,7 +3468,8 @@ static void compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo* pC
} }
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) { int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize, &pAggSup->stateStore); saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize,
&pAggSup->stateStore);
pWinInfo->pOutputBuf = NULL; pWinInfo->pOutputBuf = NULL;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -4883,8 +4883,8 @@ static void compactStateWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCur
} }
void streamStateReloadState(SOperatorInfo* pOperator) { void streamStateReloadState(SOperatorInfo* pOperator) {
SStreamStateAggOperatorInfo* pInfo = pOperator->info; SStreamStateAggOperatorInfo* pInfo = pOperator->info;
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
resetWinRange(&pAggSup->winRange); resetWinRange(&pAggSup->winRange);
SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0}; SSessionKey seKey = {.win.skey = INT64_MIN, .win.ekey = INT64_MIN, .groupId = 0};