diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h
index 6e4a8d62d0..a4728e6382 100644
--- a/include/libs/stream/tstreamUpdate.h
+++ b/include/libs/stream/tstreamUpdate.h
@@ -34,11 +34,16 @@ typedef struct SUpdateInfo {
   TSKEY minTS;
   SScalableBf* pCloseWinSBF;
   SHashObj* pMap;
+  STimeWindow scanWindow;  // window stored by updateInfoSetScanRange()
+  uint64_t scanGroupId;    // group id stored by updateInfoSetScanRange()
+  uint64_t maxVersion;     // version stored by updateInfoSetScanRange()
 } SUpdateInfo;
 
 SUpdateInfo *updateInfoInitP(SInterval* pInterval, int64_t watermark);
 SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark);
 bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts);
+void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version);
+bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version);
 void updateInfoDestroy(SUpdateInfo *pInfo);
 void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo);
 void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo);
diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c
index f8e64a3409..7cfc1c0b1d 100644
--- a/source/common/src/tdatablock.c
+++ b/source/common/src/tdatablock.c
@@ -1231,9 +1231,7 @@ int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
     colDataAssign(pDst, pSrc, src->info.rows, &src->info);
   }
 
-  dst->info.rows = src->info.rows;
-  dst->info.window = src->info.window;
-  dst->info.type = src->info.type;
+  dst->info = src->info;
   return TSDB_CODE_SUCCESS;
 }
 
@@ -1708,9 +1706,9 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
   int32_t colNum = taosArrayGetSize(pDataBlock->pDataBlock);
   int32_t rows = pDataBlock->info.rows;
   int32_t len = 0;
-  len += snprintf(dumpBuf + len, size - len, "===stream===%s |block type %d|child id %d|group id:%" PRIu64 "|uid:%ld|rows:%d\n", flag,
+  len += snprintf(dumpBuf + len, size - len, "===stream===%s|block type %d|child id %d|group id:%" PRIu64 "|uid:%" PRIu64 "|rows:%d|version:%" PRIu64 "\n", flag,
                   (int32_t)pDataBlock->info.type, pDataBlock->info.childId, pDataBlock->info.groupId,
-                  pDataBlock->info.uid, pDataBlock->info.rows);
+                  pDataBlock->info.uid, pDataBlock->info.rows, pDataBlock->info.version);
   if (len >= size - 1) return dumpBuf;
 
   for (int32_t j = 0; j < rows; j++) {
diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h
index dcf374f4c4..46519dce14 100644
--- a/source/dnode/vnode/inc/vnode.h
+++ b/source/dnode/vnode/inc/vnode.h
@@ -137,6 +137,7 @@ int32_t tsdbGetFileBlocksDistInfo(STsdbReader *pReader, STableBlockDistInfo *pTa
 int64_t tsdbGetNumOfRowsInMemTable(STsdbReader *pHandle);
 void *tsdbGetIdx(SMeta *pMeta);
 void *tsdbGetIvtIdx(SMeta *pMeta);
+uint64_t getReaderMaxVersion(STsdbReader *pReader);
 
 int32_t tsdbLastRowReaderOpen(void *pVnode, int32_t type, SArray *pTableIdList, int32_t numOfCols, void **pReader);
 int32_t tsdbRetrieveLastRow(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, SArray *pTableUids);
diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c
index ed2558d344..e924b29fd4 100644
--- a/source/dnode/vnode/src/tsdb/tsdbRead.c
+++ b/source/dnode/vnode/src/tsdb/tsdbRead.c
@@ -2502,6 +2502,11 @@ void* tsdbGetIvtIdx(SMeta* pMeta) {
   return metaGetIvtIdx(pMeta);
 }
 
+// Returns pReader->verRange.maxVer; pReader must not be NULL.
+uint64_t getReaderMaxVersion(STsdbReader *pReader) {
+  return pReader->verRange.maxVer;
+}
+
 /**
  * @brief Get all suids since suid
  *
diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h
index d587e201ef..577f9772be 100644
--- a/source/libs/executor/inc/executorimpl.h
+++ b/source/libs/executor/inc/executorimpl.h
@@ -437,6 +437,7 @@ typedef struct SessionWindowSupporter {
   SStreamAggSupporter* pStreamAggSup;
   int64_t gap;
   uint8_t parentType;
+  SAggSupporter* pIntervalAggSup;  // set by initIntervalDownStream()
 } SessionWindowSupporter;
 
 typedef struct STimeWindowSupp {
@@ -1009,6 +1010,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pStartTs,
     TSKEY* pEndTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted);
 bool functionNeedToExecute(SqlFunctionCtx* pCtx);
 bool isCloseWindow(STimeWindow* pWin, STimeWindowAggSupp* pSup);
+bool isDeletedWindow(STimeWindow* pWin, uint64_t groupId, SAggSupporter* pSup);
 void appendOneRow(SSDataBlock* pBlock, TSKEY* pStartTs, TSKEY* pEndTs, uint64_t* pUid);
 void printDataBlock(SSDataBlock* pBlock, const char* flag);
 
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index e38034f4aa..8821dbd5a1 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -1131,7 +1131,8 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
     STimeWindow win = getActiveTimeWindow(NULL, &dumyInfo, tsCol[rowId], &pInfo->interval, TSDB_ORDER_ASC);
     // must check update info first.
     bool update = updateInfoIsUpdated(pInfo->pUpdateInfo, pBlock->info.uid, tsCol[rowId]);
-    if ((update || (isSignleIntervalWindow(pInfo) && isCloseWindow(&win, &pInfo->twAggSup))) && out) {
+    if ((update || (isSignleIntervalWindow(pInfo) && isCloseWindow(&win, &pInfo->twAggSup) &&
+        isDeletedWindow(&win, pBlock->info.groupId, pInfo->sessionSup.pIntervalAggSup))) && out) {
       appendOneRow(pInfo->pUpdateDataRes, tsCol + rowId, tsCol + rowId, &pBlock->info.uid);
     }
   }
@@ -1337,6 +1338,9 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
     case STREAM_SCAN_FROM_DATAREADER_RETRIEVE: {
       SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
       if (pSDB) {
+        STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
+        uint64_t version = getReaderMaxVersion(pTableScanInfo->dataReader);
+        updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->cond.twindows, pInfo->groupId, version);
         pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
         checkUpdateData(pInfo, true, pSDB, false);
         return pSDB;
@@ -1390,6 +1394,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
 
       setBlockIntoRes(pInfo, &block);
 
+      if (updateInfoIgnore(pInfo->pUpdateInfo, &pInfo->pRes->info.window, pInfo->pRes->info.groupId, pInfo->pRes->info.version)) {
+        printDataBlock(pInfo->pRes, "stream scan ignore");
+        blockDataCleanup(pInfo->pRes);
+        continue;
+      }
+
       if (pBlockInfo->rows > 0) {
         break;
       }
@@ -1406,6 +1416,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
     // record the scan action.
     pInfo->numOfExec++;
     pOperator->resultInfo.totalRows += pBlockInfo->rows;
+    printDataBlock(pInfo->pRes, "stream scan");
 
     if (pBlockInfo->rows == 0) {
       updateInfoDestoryColseWinSBF(pInfo->pUpdateInfo);
diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c
index 63143875a3..802e1f2306 100644
--- a/source/libs/executor/src/timewindowoperator.c
+++ b/source/libs/executor/src/timewindowoperator.c
@@ -1456,6 +1456,7 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
 
 static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
                                    SHashObj* pPullDataMap, SArray* closeWins, SArray* pRecyPages, SDiskbasedBuf* pDiscBuf) {
+  qDebug("===stream===close interval window");
   void* pIte = NULL;
   size_t keyLen = 0;
   while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
@@ -1772,10 +1773,11 @@ SSDataBlock* createDeleteBlock() {
   return pBlock;
 }
 
-void initIntervalDownStream(SOperatorInfo* downstream, uint8_t type) {
+void initIntervalDownStream(SOperatorInfo* downstream, uint8_t type, SAggSupporter* pSup) {
   ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN);
   SStreamScanInfo* pScanInfo = downstream->info;
   pScanInfo->sessionSup.parentType = type;
+  pScanInfo->sessionSup.pIntervalAggSup = pSup;
 }
 
 SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
@@ -1851,7 +1853,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
                                          destroyIntervalOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
 
   if (nodeType(pPhyNode) == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL) {
-    initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL);
+    initIntervalDownStream(downstream, QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, &pInfo->aggSup);
   }
 
   code = appendDownstream(pOperator, &downstream, 1);
@@ -3111,7 +3113,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
       createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, NULL, destroyStreamFinalIntervalOperatorInfo,
                           aggEncodeResultRow, aggDecodeResultRow, NULL);
   if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) {
-    initIntervalDownStream(downstream, pPhyNode->type);
+    initIntervalDownStream(downstream, pPhyNode->type, &pInfo->aggSup);
   }
   code = appendDownstream(pOperator, &downstream, 1);
   if (code != TSDB_CODE_SUCCESS) {
diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c
index f2a5ba0ab5..ff1ef7b4b9 100644
--- a/source/libs/stream/src/streamUpdate.c
+++ b/source/libs/stream/src/streamUpdate.c
@@ -125,6 +125,9 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma
   pInfo->pCloseWinSBF = NULL;
   _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
   pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK);
+  pInfo->maxVersion = 0;
+  pInfo->scanGroupId = 0;
+  pInfo->scanWindow = (STimeWindow){.skey = INT64_MIN, .ekey = INT64_MAX};
   return pInfo;
 }
 
@@ -185,15 +188,44 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts) {
   }
 
   if (ts < pInfo->minTS) {
+    qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64 , tableId, maxTs, *pMapMaxTs, ts);
     return true;
   } else if (res == TSDB_CODE_SUCCESS) {
     return false;
   }
-  qDebug("===stream===bucket:%d, tableId:%" PRIu64 ", maxTs:" PRIu64 ", maxMapTs:" PRIu64 ", ts:%" PRIu64, index, tableId, maxTs, *pMapMaxTs, ts);
+  qDebug("===stream===Update. tableId:%" PRIu64 ", maxTs:%" PRIu64 ", mapMaxTs:%" PRIu64 ", ts:%" PRIu64 , tableId, maxTs, *pMapMaxTs, ts);
   // check from tsdb api
   return true;
 }
 
+// Store the caller-supplied window, group id and version in pInfo so that a later
+// updateInfoIgnore() call can compare blocks against them. Does nothing when pInfo is NULL.
+void updateInfoSetScanRange(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version) {
+  if (!pInfo) {
+    return;
+  }
+  qDebug("===stream===groupId:%" PRIu64 ", startTs:%" PRId64 ", endTs:%" PRId64 ", version:%" PRIu64, groupId, pWin->skey, pWin->ekey, version);
+  pInfo->scanWindow = *pWin;
+  pInfo->scanGroupId = groupId;
+  pInfo->maxVersion = version;
+}
+
+// Return true when groupId equals the stored scan group id, [pWin->skey, pWin->ekey] lies
+// inside the stored scan window, and version is not greater than the stored version.
+// Returns false otherwise, and also when pInfo is NULL.
+bool updateInfoIgnore(SUpdateInfo *pInfo, STimeWindow* pWin, uint64_t groupId, uint64_t version) {
+  if (!pInfo) {
+    return false;
+  }
+  qDebug("===stream===check groupId:%" PRIu64 ", startTs:%" PRId64 ", endTs:%" PRId64 ", version:%" PRIu64, groupId, pWin->skey, pWin->ekey, version);
+  if (pInfo->scanGroupId == groupId && pInfo->scanWindow.skey <= pWin->skey &&
+      pWin->ekey <= pInfo->scanWindow.ekey && version <= pInfo->maxVersion) {
+    qDebug("===stream===ignore groupId:%" PRIu64 ", startTs:%" PRId64 ", endTs:%" PRId64 ", version:%" PRIu64, groupId, pWin->skey, pWin->ekey, version);
+    return true;
+  }
+  return false;
+}
+
 void updateInfoDestroy(SUpdateInfo *pInfo) {
   if (pInfo == NULL) {
     return;