From d7121249211f756ce62383f3369ea825881af160 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Fri, 28 Jul 2023 17:25:56 +0800 Subject: [PATCH 1/5] init scan state --- source/libs/executor/src/scanoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 6eee9df528..6252bff002 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2587,7 +2587,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->igCheckUpdate = pTableScanNode->igCheckUpdate; pInfo->igExpired = pTableScanNode->igExpired; pInfo->twAggSup.maxTs = INT64_MIN; - pInfo->pState = NULL; + pInfo->pState = pTaskInfo->streamInfo.pState; pInfo->stateStore = pTaskInfo->storageAPI.stateStore; pInfo->readerFn = pTaskInfo->storageAPI.tqReaderFn; From 73ed6fe64909322b4fc9d8b5923cb25f1566433d Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Fri, 28 Jul 2023 19:30:26 +0800 Subject: [PATCH 2/5] save compace window info --- source/libs/executor/src/timewindowoperator.c | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 16be9f6ca2..93a84c6f04 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3736,7 +3736,6 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { setSessionOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].win.ekey, pSeKeyBuf[i].groupId, &winInfo); int32_t winNum = compactSessionWindow(pOperator, &winInfo, pInfo->pStUpdated, pInfo->pStDeleted, true); if (winNum > 0) { - saveSessionOutputBuf(pAggSup, &winInfo); if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { saveResult(winInfo, pInfo->pStUpdated); } else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { @@ -3747,9 +3746,8 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { getSessionHashKey(&winInfo.sessionWin, &key); tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo)); } - } else { - releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)winInfo.pOutputBuf, &pAggSup->stateStore); } + saveSessionOutputBuf(pAggSup, &winInfo); } taosMemoryFree(pBuf); @@ -4398,7 +4396,6 @@ void streamStateReloadState(SOperatorInfo* pOperator) { setStateOutputBuf(pAggSup, pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, NULL, &curInfo, &nextInfo); if (compareStateKey(curInfo.pStateKey,nextInfo.pStateKey)) { compactStateWindow(pOperator, &curInfo.winInfo, &nextInfo.winInfo, pInfo->pSeUpdated, pInfo->pSeUpdated); - saveSessionOutputBuf(pAggSup, &curInfo.winInfo); if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE) { saveResult(curInfo.winInfo, pInfo->pSeUpdated); } else if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) { @@ -4412,7 +4409,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) { } if (IS_VALID_SESSION_WIN(curInfo.winInfo)) { - releaseOutputBuf(pAggSup->pState, NULL, (SResultRow*)curInfo.winInfo.pOutputBuf, &pAggSup->pSessionAPI->stateStore); + saveSessionOutputBuf(pAggSup, &curInfo.winInfo); } if (IS_VALID_SESSION_WIN(nextInfo.winInfo)) { From 0bd2548937fa160cbd4f187d079ea280ef0b170d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 29 Jul 2023 16:41:01 +0800 Subject: [PATCH 3/5] fix(stream): initialize the filter window initial range. --- source/dnode/snode/src/snode.c | 6 +- source/dnode/vnode/src/tq/tq.c | 6 +- source/dnode/vnode/src/tq/tqRestore.c | 5 +- source/libs/executor/src/executor.c | 1 + source/libs/executor/src/scanoperator.c | 175 ++++++++++++------------ source/libs/stream/src/stream.c | 2 +- 6 files changed, 101 insertions(+), 94 deletions(-) diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index b0ddefce79..7235a56691 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -346,9 +346,9 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) { rsp.status = streamTaskCheckStatus(pTask); streamMetaReleaseTask(pSnode->pMeta, pTask); - qDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d", - pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, - streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status); + const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); + qDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d", + pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { rsp.status = 0; qDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ccdf0c88a5..db9ef7805f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1041,9 +1041,9 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { rsp.status = streamTaskCheckStatus(pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); - tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d", - pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, - streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status); + const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); + tqDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d", + pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { rsp.status = 0; tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d", diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index f4d82e456e..c3e7d03e43 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -251,19 +251,18 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { int32_t status = pTask->status.taskStatus; // non-source or fill-history tasks don't need to response the WAL scan action. - if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { + if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) { streamMetaReleaseTask(pStreamMeta, pTask); continue; } - if (status != TASK_STATUS__NORMAL/* && status != TASK_STATUS__SCAN_HISTORY_WAL*/) { + if (status != TASK_STATUS__NORMAL) { tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); streamMetaReleaseTask(pStreamMeta, pTask); continue; } if ((pTask->info.fillHistory == 1) && pTask->status.transferState) { -// ASSERT(status == TASK_STATUS__SCAN_HISTORY_WAL); ASSERT(status == TASK_STATUS__NORMAL); // the maximum version of data in the WAL has reached already, the step2 is done tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr, diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b85305b32d..5a99c1ea9a 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -341,6 +341,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v return NULL; } + qResetStreamInfoTimeWindow(pTaskInfo); return pTaskInfo; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 3d5e4a7d5f..b30c2d32ee 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1550,10 +1550,88 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock } } -static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) { +static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) { + if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) { + bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); + bool hasUnqualified = false; + + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex); + + if (pWindow->skey != INT64_MIN) { + qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey); + + ASSERT(pCol->pData != NULL); + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t* ts = (int64_t*)colDataGetData(pCol, i); + printf("-------------------%"PRId64"\n", *ts); + + p[i] = (*ts >= pWindow->skey); + + if (!p[i]) { + hasUnqualified = true; + } + } + } else if (pWindow->ekey != INT64_MAX) { + qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->ekey); + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t* ts = (int64_t*)colDataGetData(pCol, i); + p[i] = (*ts <= pWindow->ekey); + + if (!p[i]) { + hasUnqualified = true; + } + } + } + + if (hasUnqualified) { + trimDataBlock(pBlock, pBlock->info.rows, p); + } + + taosMemoryFree(p); + } +} + +// re-build the delete block, ONLY according to the split timestamp +static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) { + if (skey == INT64_MIN) { + return; + } + + int32_t numOfRows = pBlock->info.rows; + + bool* p = taosMemoryCalloc(numOfRows, sizeof(bool)); + bool hasUnqualified = false; + + SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData; + SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData; + + for (int32_t i = 0; i < numOfRows; i++) { + if (tsStartCol[i] < skey) { + tsStartCol[i] = skey; + } + + if (tsEndCol[i] >= skey) { + p[i] = true; + } else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX] + hasUnqualified = true; + } + } + + if (hasUnqualified) { + trimDataBlock(pBlock, pBlock->info.rows, p); + } + + qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows); + taosMemoryFree(p); +} + +static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STimeWindow* pTimeWindow, bool filter) { SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SOperatorInfo* pOperator = pInfo->pStreamScanOp; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + const char* id = GET_TASKID(pTaskInfo); blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows); @@ -1593,7 +1671,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock // currently only the tbname pseudo column if (pInfo->numOfPseudoExpr > 0) { int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, - pBlockInfo->rows, GET_TASKID(pTaskInfo), &pTableScanInfo->base.metaCache); + pBlockInfo->rows, id, &pTableScanInfo->base.metaCache); // ignore the table not exists error, since this table may have been dropped during the scan procedure. if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) { blockDataFreeRes((SSDataBlock*)pBlock); @@ -1608,8 +1686,14 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); } - pInfo->pRes->info.dataLoad = 1; + // filter the block extracted from WAL files, according to the time window apply additional time window filter + doBlockDataWindowFilter(pInfo->pRes, pInfo->primaryTsIndex, pTimeWindow, id); blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); + pInfo->pRes->info.dataLoad = 1; + + if (pInfo->pRes->info.rows == 0) { + return 0; + } calBlockTbName(pInfo, pInfo->pRes); return 0; @@ -1666,7 +1750,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version); blockDataCleanup(pInfo->pRes); - setBlockIntoRes(pInfo, pRes, true); + STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX}; + setBlockIntoRes(pInfo, pRes, &defaultWindow, true); if (pInfo->pRes->info.rows > 0) { return pInfo->pRes; } @@ -1775,80 +1860,6 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) } } -static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) { - if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) { - bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); - bool hasUnqualified = false; - - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex); - - if (pWindow->skey != INT64_MIN) { - qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey); - - for (int32_t i = 0; i < pBlock->info.rows; ++i) { - int64_t* ts = (int64_t*)colDataGetData(pCol, i); - p[i] = (*ts >= pWindow->skey); - - if (!p[i]) { - hasUnqualified = true; - } - } - } else if (pWindow->ekey != INT64_MAX) { - qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->ekey); - for (int32_t i = 0; i < pBlock->info.rows; ++i) { - int64_t* ts = (int64_t*)colDataGetData(pCol, i); - p[i] = (*ts <= pWindow->ekey); - - if (!p[i]) { - hasUnqualified = true; - } - } - } - - if (hasUnqualified) { - trimDataBlock(pBlock, pBlock->info.rows, p); - } - - taosMemoryFree(p); - } -} - -// re-build the delete block, ONLY according to the split timestamp -static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) { - if (skey == INT64_MIN) { - return; - } - - int32_t numOfRows = pBlock->info.rows; - - bool* p = taosMemoryCalloc(numOfRows, sizeof(bool)); - bool hasUnqualified = false; - - SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); - uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData; - SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); - uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData; - - for (int32_t i = 0; i < numOfRows; i++) { - if (tsStartCol[i] < skey) { - tsStartCol[i] = skey; - } - - if (tsEndCol[i] >= skey) { - p[i] = true; - } else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX] - hasUnqualified = true; - } - } - - if (hasUnqualified) { - trimDataBlock(pBlock, pBlock->info.rows, p); - } - - qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows); - taosMemoryFree(p); -} - static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -2158,15 +2169,11 @@ FETCH_NEXT_BLOCK: continue; } - // filter the block extracted from WAL files, according to the time window - // apply additional time window filter - doBlockDataWindowFilter(pRes, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id); - blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); - if (pRes->info.rows == 0) { + setBlockIntoRes(pInfo, pRes, &pStreamInfo->fillHistoryWindow, false); + if (pInfo->pRes->info.rows == 0) { continue; } - setBlockIntoRes(pInfo, pRes, false); if (pInfo->pCreateTbRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_RES; qDebug("create table res exists, rows:%"PRId64" return from stream scan, %s", pInfo->pCreateTbRes->info.rows, id); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index ba8e358f68..f85ade591c 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -379,7 +379,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { return -1; } - qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); + qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); if (code != TSDB_CODE_SUCCESS) { destroyStreamDataBlock((SStreamDataBlock*) pItem); From 94551e6e464f64e8ba33085b5d5c3b7f7dadd605 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 29 Jul 2023 18:48:22 +0800 Subject: [PATCH 4/5] fix(stream): update the load block position. --- source/libs/executor/src/scanoperator.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index b30c2d32ee..067de55001 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1688,9 +1688,9 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock // filter the block extracted from WAL files, according to the time window apply additional time window filter doBlockDataWindowFilter(pInfo->pRes, pInfo->primaryTsIndex, pTimeWindow, id); - blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); pInfo->pRes->info.dataLoad = 1; + blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); if (pInfo->pRes->info.rows == 0) { return 0; } From 399088e2bd589cbffb1acb792bb0a700f1a13c2f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 29 Jul 2023 19:23:35 +0800 Subject: [PATCH 5/5] fix(stream): remove invalid code. --- source/libs/executor/src/scanoperator.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 067de55001..887aa51665 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1563,8 +1563,6 @@ static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeW ASSERT(pCol->pData != NULL); for (int32_t i = 0; i < pBlock->info.rows; ++i) { int64_t* ts = (int64_t*)colDataGetData(pCol, i); - printf("-------------------%"PRId64"\n", *ts); - p[i] = (*ts >= pWindow->skey); if (!p[i]) {