Merge pull request #22531 from taosdata/fix/TD-25893
reload semi session state
This commit is contained in:
commit
04a13d10fe
|
@ -3240,6 +3240,31 @@ static int32_t compactSessionWindow(SOperatorInfo* pOperator, SResultWindowInfo*
|
||||||
return winNum;
|
return winNum;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void compactSessionSemiWindow(SOperatorInfo* pOperator, SResultWindowInfo* pCurWin) {
|
||||||
|
SExprSupp* pSup = &pOperator->exprSupp;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
|
||||||
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
SResultRow* pCurResult = NULL;
|
||||||
|
int32_t numOfOutput = pOperator->exprSupp.numOfExprs;
|
||||||
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
|
// Just look for the window behind StartIndex
|
||||||
|
while (1) {
|
||||||
|
SResultWindowInfo winInfo = {0};
|
||||||
|
SStreamStateCur* pCur = getNextSessionWinInfo(pAggSup, NULL, pCurWin, &winInfo);
|
||||||
|
if (!IS_VALID_SESSION_WIN(winInfo) || !isInWindow(pCurWin, winInfo.sessionWin.win.skey, pAggSup->gap) ||
|
||||||
|
!inWinRange(&pAggSup->winRange, &winInfo.sessionWin.win)) {
|
||||||
|
taosMemoryFree(winInfo.pOutputBuf);
|
||||||
|
pAPI->stateStore.streamStateFreeCur(pCur);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
pCurWin->sessionWin.win.ekey = TMAX(pCurWin->sessionWin.win.ekey, winInfo.sessionWin.win.ekey);
|
||||||
|
doDeleteSessionWindow(pAggSup, &winInfo.sessionWin);
|
||||||
|
pAPI->stateStore.streamStateFreeCur(pCur);
|
||||||
|
taosMemoryFree(winInfo.pOutputBuf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
|
int32_t saveSessionOutputBuf(SStreamAggSupporter* pAggSup, SResultWindowInfo* pWinInfo) {
|
||||||
saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize, &pAggSup->stateStore);
|
saveSessionDiscBuf(pAggSup->pState, &pWinInfo->sessionWin, pWinInfo->pOutputBuf, pAggSup->resultRowSize, &pAggSup->stateStore);
|
||||||
pWinInfo->pOutputBuf = NULL;
|
pWinInfo->pOutputBuf = NULL;
|
||||||
|
@ -3446,6 +3471,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
|
||||||
int32_t code = getSessionWinBuf(pChAggSup, pCur, &childWin);
|
int32_t code = getSessionWinBuf(pChAggSup, pCur, &childWin);
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &childWin.sessionWin.win)) {
|
if (code == TSDB_CODE_SUCCESS && !inWinRange(&pAggSup->winRange, &childWin.sessionWin.win)) {
|
||||||
|
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3454,6 +3480,7 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
|
||||||
setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin);
|
setSessionOutputBuf(pAggSup, pWinKey->win.skey, pWinKey->win.ekey, pWinKey->groupId, &parentWin);
|
||||||
code = initSessionOutputBuf(&parentWin, &pResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
|
code = initSessionOutputBuf(&parentWin, &pResult, pSup->pCtx, numOfOutput, pSup->rowEntryInfoOffset);
|
||||||
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
||||||
|
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3464,7 +3491,9 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
|
||||||
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
|
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo, &pInfo->twAggSup.timeWindowData);
|
||||||
compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL, true);
|
compactSessionWindow(pOperator, &parentWin, pStUpdated, NULL, true);
|
||||||
saveResult(parentWin, pStUpdated);
|
saveResult(parentWin, pStUpdated);
|
||||||
|
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore);
|
||||||
} else {
|
} else {
|
||||||
|
releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)childWin.pOutputBuf, &pAggSup->stateStore);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -3703,11 +3732,11 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamSessionReleaseState(SOperatorInfo* pOperator) {
|
void streamSessionReleaseState(SOperatorInfo* pOperator) {
|
||||||
if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION) {
|
|
||||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
|
int32_t resSize = taosArrayGetSize(pInfo->historyWins) * sizeof(SSessionKey);
|
||||||
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME, strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData, resSize);
|
pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_SESSION_OP_STATE_NAME,
|
||||||
}
|
strlen(STREAM_SESSION_OP_STATE_NAME), pInfo->historyWins->pData,
|
||||||
|
resSize);
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
if (downstream->fpSet.releaseStreamStateFn) {
|
if (downstream->fpSet.releaseStreamStateFn) {
|
||||||
downstream->fpSet.releaseStreamStateFn(downstream);
|
downstream->fpSet.releaseStreamStateFn(downstream);
|
||||||
|
@ -3719,6 +3748,33 @@ void resetWinRange(STimeWindow* winRange) {
|
||||||
winRange->ekey = INT64_MAX;
|
winRange->ekey = INT64_MAX;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void streamSessionSemiReloadState(SOperatorInfo* pOperator) {
|
||||||
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
|
resetWinRange(&pAggSup->winRange);
|
||||||
|
|
||||||
|
SResultWindowInfo winInfo = {0};
|
||||||
|
int32_t size = 0;
|
||||||
|
void* pBuf = NULL;
|
||||||
|
int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME,
|
||||||
|
strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size);
|
||||||
|
int32_t num = size / sizeof(SSessionKey);
|
||||||
|
SSessionKey* pSeKeyBuf = (SSessionKey*) pBuf;
|
||||||
|
ASSERT(size == num * sizeof(SSessionKey));
|
||||||
|
for (int32_t i = 0; i < num; i++) {
|
||||||
|
SResultWindowInfo winInfo = {0};
|
||||||
|
setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo);
|
||||||
|
compactSessionSemiWindow(pOperator, &winInfo);
|
||||||
|
saveSessionOutputBuf(pAggSup, &winInfo);
|
||||||
|
}
|
||||||
|
taosMemoryFree(pBuf);
|
||||||
|
|
||||||
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
if (downstream->fpSet.reloadStreamStateFn) {
|
||||||
|
downstream->fpSet.reloadStreamStateFn(downstream);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void streamSessionReloadState(SOperatorInfo* pOperator) {
|
void streamSessionReloadState(SOperatorInfo* pOperator) {
|
||||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||||
|
@ -3948,6 +4004,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated);
|
removeSessionResults(pInfo->pStDeleted, pInfo->pUpdated);
|
||||||
tSimpleHashCleanup(pInfo->pStUpdated);
|
tSimpleHashCleanup(pInfo->pStUpdated);
|
||||||
pInfo->pStUpdated = NULL;
|
pInfo->pStUpdated = NULL;
|
||||||
|
|
||||||
|
if(pInfo->isHistoryOp) {
|
||||||
|
getMaxTsWins(pInfo->pUpdated, pInfo->historyWins);
|
||||||
|
}
|
||||||
|
|
||||||
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
||||||
pInfo->pUpdated = NULL;
|
pInfo->pUpdated = NULL;
|
||||||
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
|
@ -3996,8 +4057,8 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
|
||||||
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
|
blockDataEnsureCapacity(pInfo->pUpdateRes, 128);
|
||||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL,
|
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL,
|
||||||
destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL);
|
destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionSemiReloadState);
|
||||||
}
|
}
|
||||||
setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState);
|
|
||||||
setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
|
|
||||||
pOperator->operatorType = pPhyNode->type;
|
pOperator->operatorType = pPhyNode->type;
|
||||||
|
|
Loading…
Reference in New Issue