fix issue

This commit is contained in:
liuyao 2023-10-07 19:11:46 +08:00
parent 19bbf10579
commit 1e323c0408
3 changed files with 8 additions and 4 deletions

View File

@ -1599,6 +1599,11 @@ void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uin
pScanInfo->twAggSup = *pTwSup; pScanInfo->twAggSup = *pTwSup;
} }
static TSKEY sesionTs(void* pKey) {
SSessionKey* pWinKey = (SSessionKey*)pKey;
return pWinKey->win.skey;
}
int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap, int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, int32_t numOfOutput, int64_t gap,
SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore,
SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr,
@ -1621,7 +1626,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, in
pSup->stateStore.streamStateSetNumber(pSup->pState, -1); pSup->stateStore.streamStateSetNumber(pSup->pState, -1);
int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput); int32_t funResSize = getMaxFunResSize(pExpSup, numOfOutput);
pSup->pState->pFileState = pSup->stateStore.streamFileStateInit( pSup->pState->pFileState = pSup->stateStore.streamFileStateInit(
tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, compareTs, pSup->pState, tsStreamBufferSize, sizeof(SSessionKey), pSup->resultRowSize, funResSize, sesionTs, pSup->pState,
pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, STREAM_STATE_BUFF_SORT); pTwAggSup->deleteMark, taskIdStr, pHandle->checkpointId, STREAM_STATE_BUFF_SORT);
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);

View File

@ -264,7 +264,6 @@ int32_t deleteSessionWinStateBuffByPosFn(SStreamFileState* pFileState, SRowBuffP
if (index >= 0) { if (index >= 0) {
SRowBuffPos* pItemPos = taosArrayGetP(pWinStates, index); SRowBuffPos* pItemPos = taosArrayGetP(pWinStates, index);
if (pItemPos == pPos) { if (pItemPos == pPos) {
pItemPos->beFlushed = true;
taosArrayRemove(pWinStates, index); taosArrayRemove(pWinStates, index);
} }
} }

View File

@ -67,7 +67,7 @@ int32_t stateHashBuffRemoveFn(void* pBuff, const void *pKey, size_t keyLen) {
} }
int32_t stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) { int32_t stateHashBuffRemoveByPosFn(SStreamFileState* pFileState, SRowBuffPos* pPos) {
size_t keyLen = pFileState->rowSize; size_t keyLen = pFileState->keyLen;
SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pPos->pKey, keyLen); SRowBuffPos** ppPos = tSimpleHashGet(pFileState->rowStateBuff, pPos->pKey, keyLen);
if (ppPos) { if (ppPos) {
if ((*ppPos) == pPos) { if ((*ppPos) == pPos) {
@ -539,7 +539,7 @@ int32_t flushSnapshot(SStreamFileState* pFileState, SStreamSnapshot* pSnapshot,
} }
pPos->beFlushed = true; pPos->beFlushed = true;
qDebug("===stream===flushed start:%" PRId64 ", end:%" PRId64 , ((SSessionKey*)pPos->pKey)->win.skey, ((SSessionKey*)pPos->pKey)->win.ekey); qDebug("===stream===flushed start:%" PRId64, pFileState->getTs(pPos->pKey));
if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) { if (streamStateGetBatchSize(batch) >= BATCH_LIMIT) {
streamStatePutBatch_rocksdb(pFileState->pFileStore, batch); streamStatePutBatch_rocksdb(pFileState->pFileStore, batch);
streamStateClearBatch(batch); streamStateClearBatch(batch);