From 4d6b9d4a1548f80a74f3cd89131cdff6cf56b031 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 26 Jul 2023 09:08:17 +0800 Subject: [PATCH] fix(stream): split delete msg for real-time scan from wal. --- source/libs/executor/src/scanoperator.c | 99 +++++++++++++++++-------- 1 file changed, 70 insertions(+), 29 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 2059ed18e5..73cc09cfca 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1774,6 +1774,67 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) } } +static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) { + if (pWindow->skey != INT64_MIN) { + qDebug("%s filter for additional history window, skey:%"PRId64, id, pWindow->skey); + + bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); + bool hasUnqualified = false; + + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex); + for(int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t* ts = (int64_t*) colDataGetData(pCol, i); + p[i] = (*ts >= pWindow->skey); + + if (!p[i]) { + hasUnqualified = true; + } + } + + if (hasUnqualified) { + trimDataBlock(pBlock, pBlock->info.rows, p); + } + + taosMemoryFree(p); + } +} + +// re-build the delete block, ONLY according to the split timestamp +static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) { + if (skey == INT64_MIN) { + return; + } + + int32_t numOfRows = pBlock->info.rows; + + bool* p = taosMemoryCalloc(numOfRows, sizeof(bool)); + bool hasUnqualified = false; + + SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData; + SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData; + + for (int32_t i = 0; i < numOfRows; i++) { + if (tsStartCol[i] < skey) { + tsStartCol[i] = skey; + } + + if (tsEndCol[i] >= skey) { + p[i] = true; + } else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX] + hasUnqualified = true; + } + } + + if (hasUnqualified) { + trimDataBlock(pBlock, pBlock->info.rows, p); + } + + qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows); + taosMemoryFree(p); +} + static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -1922,6 +1983,7 @@ FETCH_NEXT_BLOCK: if (pInfo->pUpdateInfo) { pInfo->pUpdateInfo->maxDataVersion = TMAX(pInfo->pUpdateInfo->maxDataVersion, pBlock->info.version); } + blockDataUpdateTsWindow(pBlock, 0); switch (pBlock->info.type) { case STREAM_NORMAL: @@ -1944,7 +2006,9 @@ FETCH_NEXT_BLOCK: } else { pDelBlock = pBlock; } + setBlockGroupIdByUid(pInfo, pDelBlock); + rebuildDeleteBlockData(pDelBlock, pStreamInfo->fillHistoryWindow.skey, id); printDataBlock(pDelBlock, "stream scan delete recv filtered"); if (pDelBlock->info.rows == 0) { if (pInfo->tqReader) { @@ -1952,6 +2016,7 @@ FETCH_NEXT_BLOCK: } goto FETCH_NEXT_BLOCK; } + if (!isIntervalWindow(pInfo) && !isSessionWindow(pInfo) && !isStateWindow(pInfo)) { generateDeleteResultBlock(pInfo, pDelBlock, pInfo->pDeleteDataRes); pInfo->pDeleteDataRes->info.type = STREAM_DELETE_RESULT; @@ -2093,39 +2158,15 @@ FETCH_NEXT_BLOCK: doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock); doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL); - { // do additional time window filter - STimeWindow* pWindow = &pStreamInfo->fillHistoryWindow; - - if (pWindow->skey != INT64_MIN) { - qDebug("%s filter for additional history window, skey:%"PRId64, id, pWindow->skey); - - bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); - bool hasUnqualified = false; - - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); - for(int32_t i = 0; i < pBlock->info.rows; ++i) { - int64_t* ts = (int64_t*) colDataGetData(pCol, i); - p[i] = (*ts >= pWindow->skey); - - if (!p[i]) { - hasUnqualified = true; - } - } - - if (hasUnqualified) { - trimDataBlock(pBlock, pBlock->info.rows, p); - } - - taosMemoryFree(p); - } - } + // apply additional time window filter + doBlockDataWindowFilter(pBlock, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id); pBlock->info.dataLoad = 1; blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex); - qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, - pInfo->pUpdateDataRes->info.rows); - if (pBlockInfo->rows > 0 || pInfo->pUpdateDataRes->info.rows > 0) { + int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows; + qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes); + if (pBlockInfo->rows > 0 || numOfUpdateRes > 0) { break; } }