From db50c9230add75c663bcf1c88e546faca7b994b6 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao@163.com> Date: Tue, 24 Sep 2024 16:58:46 +0800 Subject: [PATCH] fix issue --- include/libs/stream/tstreamFileState.h | 1 + source/libs/executor/src/scanoperator.c | 2 +- .../executor/src/streamtimesliceoperator.c | 35 ++++++------------- source/libs/stream/src/streamBackendRocksdb.c | 1 - source/libs/stream/src/streamState.c | 8 ++++- source/libs/stream/src/tstreamFileState.c | 6 ++++ 6 files changed, 25 insertions(+), 28 deletions(-) diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index 1497fb4afa..83463e2adb 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -145,6 +145,7 @@ void deleteHashSortRowBuff(SStreamFileState* pFileState, const SWinKey* pKey) int32_t streamFileStateGroupPut(SStreamFileState* pFileState, int64_t groupId, void* value, int32_t vLen); void streamFileStateGroupCurNext(SStreamStateCur* pCur); int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen); +SSHashObj* getGroupIdCache(SStreamFileState* pFileState); #ifdef __cplusplus } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5f6144634f..502d88b26d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -3632,7 +3632,7 @@ FETCH_NEXT_BLOCK: pInfo->updateResIndex = 0; code = copyGetResultBlock(pInfo->pUpdateRes, pBlock); QUERY_CHECK_CODE(code, lino, _end); - pInfo->pUpdateInfo->maxDataVersion = pBlock->info.version; + pInfo->pUpdateInfo->maxDataVersion = -1; prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex, NULL); pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE; } break; diff --git a/source/libs/executor/src/streamtimesliceoperator.c b/source/libs/executor/src/streamtimesliceoperator.c index d6cc178a7f..1c19894dff 100644 --- a/source/libs/executor/src/streamtimesliceoperator.c +++ b/source/libs/executor/src/streamtimesliceoperator.c @@ -942,14 +942,6 @@ static void copyCalcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFi } static void setForceWindowCloseFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) { - TSKEY prevWKey = INT64_MIN; - TSKEY nextWKey = INT64_MIN; - if (hasPrevWindow(pFillSup)) { - prevWKey = pFillSup->prev.key; - } - if (hasNextWindow(pFillSup)) { - nextWKey = pFillSup->next.key; - } TSKEY endTs = adustEndTsKey(ts, pFillSup->cur.key, &pFillSup->interval); TSKEY startTs = adustPrevTsKey(ts, pFillSup->cur.key, &pFillSup->interval); @@ -960,39 +952,32 @@ static void setForceWindowCloseFillRule(SStreamFillSupporter* pFillSup, SStreamF case TSDB_FILL_NULL_F: case TSDB_FILL_SET_VALUE: case TSDB_FILL_SET_VALUE_F: { - if (ts != pFillSup->cur.key) { + if (ts == pFillSup->cur.key) { + pFillInfo->pos = FILL_POS_START; + pFillInfo->needFill = false; + } else { pFillInfo->pos = FILL_POS_INVALID; setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo); - } else { - pFillInfo->needFill = false; - pFillInfo->pos = FILL_POS_START; - goto _end; + copyNonFillValueInfo(pFillSup, pFillInfo); } - copyNonFillValueInfo(pFillSup, pFillInfo); } break; case TSDB_FILL_PREV: { - if (ts != pFillSup->cur.key) { - pFillInfo->pos = FILL_POS_INVALID; - setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo); + if (ts == pFillSup->cur.key) { + pFillInfo->pos = FILL_POS_START; + pFillInfo->needFill = false; } else if (hasPrevWindow(pFillSup)) { pFillInfo->pos = FILL_POS_INVALID; setFillKeyInfo(ts, ts + 1, &pFillSup->interval, pFillInfo); + pFillInfo->pResRow = &pFillSup->prev; } else { pFillInfo->needFill = false; - pFillInfo->pos = FILL_POS_START; - goto _end; + pFillInfo->pos = FILL_POS_INVALID; } - pFillInfo->pResRow = &pFillSup->prev; } break; default: qError("%s failed at line %d since invalid fill type", __func__, __LINE__); break; } - -_end: - if (ts != pFillSup->cur.key) { - pFillInfo->pos = FILL_POS_INVALID; - } } static void setTimeSliceFillRule(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, TSKEY ts) { diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index dd72ba54e4..278b504e09 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -4306,7 +4306,6 @@ void streamStateParTagSeekKeyNext_rocksdb(SStreamState* pState, const int64_t gr char buf[128] = {0}; int32_t klen = ginitDict[i].enFunc((void*)&groupId, buf); if (!streamStateIterSeekAndValid(pCur->iter, buf, klen)) { - streamStateFreeCur(pCur); return ; } // skip ttl expired data diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index fd625958c4..484877e37b 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -561,6 +561,12 @@ SStreamStateCur* streamStateGroupGetCur(SStreamState* pState) { SStreamStateCur* pCur = createStateCursor(pState->pFileState); pCur->hashIter = 0; pCur->pHashData = NULL; + SSHashObj* pMap = getGroupIdCache(pState->pFileState); + pCur->pHashData = tSimpleHashIterate(pMap, pCur->pHashData, &pCur->hashIter); + if (pCur->pHashData == NULL) { + pCur->hashIter = -1; + streamStateParTagSeekKeyNext_rocksdb(pState, INT64_MIN, pCur); + } return pCur; } @@ -569,7 +575,7 @@ void streamStateGroupCurNext(SStreamStateCur* pCur) { } int32_t streamStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, void** pVal, int32_t* pVLen) { - if (pVal == NULL) { + if (pVal != NULL) { return -1; } return streamFileStateGroupGetKVByCur(pCur, pKey, pVal, pVLen); diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index 855a42a5f8..a88ab661e6 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -1144,6 +1144,7 @@ void streamFileStateGroupCurNext(SStreamStateCur* pCur) { SStreamFileState* pFileState = (SStreamFileState*)pCur->pStreamFileState; if (pCur->hashIter == -1) { streamStateCurNext(pFileState->pFileStore, pCur); + return; } SSHashObj* pHash = pFileState->pGroupIdMap; @@ -1151,6 +1152,7 @@ void streamFileStateGroupCurNext(SStreamStateCur* pCur) { if (!pCur->pHashData) { pCur->hashIter = -1; streamStateParTagSeekKeyNext_rocksdb(pFileState->pFileStore, pCur->minGpId, pCur); + return; } int64_t gpId = *(int64_t*)tSimpleHashGetKey(pCur->pHashData, NULL); pCur->minGpId = TMIN(pCur->minGpId, gpId); @@ -1164,3 +1166,7 @@ int32_t streamFileStateGroupGetKVByCur(SStreamStateCur* pCur, int64_t* pKey, voi } return streamStateParTagGetKVByCur_rocksdb(pCur, pKey, NULL, NULL); } + +SSHashObj* getGroupIdCache(SStreamFileState* pFileState) { + return pFileState->pGroupIdMap; +}