Merge pull request #26015 from taosdata/fix/TD-3036

adj stream count state buff
This commit is contained in:
Haojun Liao 2024-06-03 19:03:24 +08:00 committed by GitHub
commit 0c8e9046f6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 43 additions and 24 deletions

View File

@ -677,6 +677,14 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
goto _error; goto _error;
} }
pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pCountNode->window.watermark,
.calTrigger = pCountNode->window.triggerType,
.maxTs = INT64_MIN,
.minTs = INT64_MAX,
.deleteMark = getDeleteMark(&pCountNode->window, 0),
};
pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId; pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId;
code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0,
pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle,
@ -687,13 +695,6 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys
pInfo->streamAggSup.windowCount = pCountNode->windowCount; pInfo->streamAggSup.windowCount = pCountNode->windowCount;
pInfo->streamAggSup.windowSliding = pCountNode->windowSliding; pInfo->streamAggSup.windowSliding = pCountNode->windowSliding;
pInfo->twAggSup = (STimeWindowAggSupp){
.waterMark = pCountNode->window.watermark,
.calTrigger = pCountNode->window.triggerType,
.maxTs = INT64_MIN,
.minTs = INT64_MAX,
};
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
pInfo->binfo.pRes = pResBlock; pInfo->binfo.pRes = pResBlock;

View File

@ -734,6 +734,21 @@ _end:
return code; return code;
} }
int32_t getCountWinStateFromDisc(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen) {
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext_rocksdb(pState, pKey);
int32_t code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
streamStateFreeCur(pCur);
if (code == TSDB_CODE_SUCCESS) {
return code;
} else {
pCur = streamStateSessionSeekKeyPrev_rocksdb(pState, pKey);
}
code = streamStateSessionGetKVByCur_rocksdb(pCur, pKey, pVal, pVLen);
streamStateFreeCur(pCur);
return code;
}
int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen) { int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, COUNT_TYPE winCount, void** pVal, int32_t* pVLen) {
SSessionKey* pWinKey = pKey; SSessionKey* pWinKey = pKey;
const TSKEY gap = 0; const TSKEY gap = 0;
@ -755,14 +770,13 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
if (size == 0) { if (size == 0) {
void* pFileStore = getStateFileStore(pFileState); void* pFileStore = getStateFileStore(pFileState);
void* pRockVal = NULL; void* pRockVal = NULL;
SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId); code = getCountWinStateFromDisc(pFileStore, pWinKey, &pRockVal, pVLen);
code = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &pRockVal, pVLen);
streamStateFreeCur(pCur);
if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) { if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code); qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
pWinKey->win.ekey, code);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
int32_t valSize = *pVLen; int32_t valSize = *pVLen;
COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)( (char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)) ); COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)((char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)));
if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) { if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) {
(*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen); (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
goto _end; goto _end;
@ -798,20 +812,24 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
} }
if (index == -1) { if (index == -1) {
if (!isDeteled(pFileState, endTs)) { if (!isDeteled(pFileState, endTs) && isFlushedState(pFileState, endTs, 0)) {
void* p = NULL; SSessionKey tmpKey = *pWinKey;
void* pFileStore = getStateFileStore(pFileState); void* pRockVal = NULL;
SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId); void* pFileStore = getStateFileStore(pFileState);
int32_t code_file = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &p, pVLen); int32_t code_file = getCountWinStateFromDisc(pFileStore, &tmpKey, &pRockVal, pVLen);
if (code_file == TSDB_CODE_SUCCESS) { if (code_file == TSDB_CODE_SUCCESS) {
(*pVal) = createSessionWinBuff(pFileState, pWinKey, p, pVLen); SRowBuffPos* pFirstPos = taosArrayGetP(pWinStates, 0);
code = code_file; SSessionKey* pFirstWinKey = (SSessionKey*)pFirstPos->pKey;
qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code_file); if (tmpKey.win.ekey < pFirstWinKey->win.skey) {
streamStateFreeCur(pCur); *pWinKey = tmpKey;
goto _end; (*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
code = code_file;
qDebug("===stream===1 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey,
pWinKey->win.ekey, code_file);
goto _end;
}
} }
taosMemoryFree(p); taosMemoryFree(pRockVal);
streamStateFreeCur(pCur);
} }
} }