Merge pull request #24824 from taosdata/fix/TD-28739
delete invalid result
This commit is contained in:
commit
d59399da5b
|
@ -115,8 +115,16 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs, int32_t start, int32_t rows, int32_t maxRows,
|
static void removeCountResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessionKey* pKey) {
|
||||||
SSHashObj* pStDeleted, bool* pRebuild) {
|
SSessionKey key = {0};
|
||||||
|
getSessionHashKey(pKey, &key);
|
||||||
|
tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
|
||||||
|
tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowInfo* pWinInfo, TSKEY* pTs,
|
||||||
|
int32_t start, int32_t rows, int32_t maxRows, SSHashObj* pStUpdated,
|
||||||
|
SSHashObj* pStDeleted, bool* pRebuild) {
|
||||||
SSessionKey sWinKey = pWinInfo->winInfo.sessionWin;
|
SSessionKey sWinKey = pWinInfo->winInfo.sessionWin;
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
for (int32_t i = start; i < rows; i++) {
|
for (int32_t i = start; i < rows; i++) {
|
||||||
|
@ -148,6 +156,7 @@ static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowI
|
||||||
|
|
||||||
if (needDelState) {
|
if (needDelState) {
|
||||||
memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey));
|
memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey));
|
||||||
|
removeCountResult(pStUpdated, pAggSup->pResultRows, &sWinKey);
|
||||||
if (pWinInfo->winInfo.pStatePos->needFree) {
|
if (pWinInfo->winInfo.pStatePos->needFree) {
|
||||||
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &sWinKey);
|
pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &sWinKey);
|
||||||
}
|
}
|
||||||
|
@ -242,7 +251,8 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
|
||||||
setSessionWinOutputInfo(pStUpdated, &curWin.winInfo);
|
setSessionWinOutputInfo(pStUpdated, &curWin.winInfo);
|
||||||
slidingRows = *curWin.pWindowCount;
|
slidingRows = *curWin.pWindowCount;
|
||||||
if (!buffInfo.rebuildWindow) {
|
if (!buffInfo.rebuildWindow) {
|
||||||
winRows = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStDeleted, &buffInfo.rebuildWindow);
|
winRows = updateCountWindowInfo(pAggSup, &curWin, startTsCols, i, rows, pAggSup->windowCount, pStUpdated,
|
||||||
|
pStDeleted, &buffInfo.rebuildWindow);
|
||||||
}
|
}
|
||||||
if (buffInfo.rebuildWindow) {
|
if (buffInfo.rebuildWindow) {
|
||||||
SSessionKey range = {0};
|
SSessionKey range = {0};
|
||||||
|
|
|
@ -736,6 +736,7 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
|
||||||
void* pRockVal = NULL;
|
void* pRockVal = NULL;
|
||||||
SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId);
|
SStreamStateCur* pCur = streamStateSessionSeekToLast_rocksdb(pFileStore, pKey->groupId);
|
||||||
code = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &pRockVal, pVLen);
|
code = streamStateSessionGetKVByCur_rocksdb(pCur, pWinKey, &pRockVal, pVLen);
|
||||||
|
streamStateFreeCur(pCur);
|
||||||
if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
|
if (code == TSDB_CODE_SUCCESS || isFlushedState(pFileState, endTs, 0)) {
|
||||||
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code);
|
qDebug("===stream===0 get state win:%" PRId64 ",%" PRId64 " from disc, res %d", pWinKey->win.skey, pWinKey->win.ekey, code);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
@ -743,7 +744,6 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
|
||||||
COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)( (char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)) );
|
COUNT_TYPE* pWinStateCout = (COUNT_TYPE*)( (char*)(pRockVal) + (valSize - sizeof(COUNT_TYPE)) );
|
||||||
if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) {
|
if (inSessionWindow(pWinKey, startTs, gap) || (*pWinStateCout) < winCount) {
|
||||||
(*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
|
(*pVal) = createSessionWinBuff(pFileState, pWinKey, pRockVal, pVLen);
|
||||||
streamStateFreeCur(pCur);
|
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -751,7 +751,6 @@ int32_t getCountWinResultBuff(SStreamFileState* pFileState, SSessionKey* pKey, C
|
||||||
pWinKey->win.ekey = endTs;
|
pWinKey->win.ekey = endTs;
|
||||||
(*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL);
|
(*pVal) = createSessionWinBuff(pFileState, pWinKey, NULL, NULL);
|
||||||
taosMemoryFree(pRockVal);
|
taosMemoryFree(pRockVal);
|
||||||
streamStateFreeCur(pCur);
|
|
||||||
} else {
|
} else {
|
||||||
(*pVal) = addNewSessionWindow(pFileState, pWinStates, pWinKey);
|
(*pVal) = addNewSessionWindow(pFileState, pWinStates, pWinKey);
|
||||||
code = TSDB_CODE_FAILED;
|
code = TSDB_CODE_FAILED;
|
||||||
|
|
Loading…
Reference in New Issue