diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index b0ddefce79..7235a56691 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -346,9 +346,9 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) { rsp.status = streamTaskCheckStatus(pTask); streamMetaReleaseTask(pSnode->pMeta, pTask); - qDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d", - pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, - streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status); + const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); + qDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d", + pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { rsp.status = 0; qDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ccdf0c88a5..db9ef7805f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1041,9 +1041,9 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { rsp.status = streamTaskCheckStatus(pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); - tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d", - pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, - streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status); + const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); + tqDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d", + pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { rsp.status = 0; tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d", diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index f4d82e456e..c3e7d03e43 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -251,19 +251,18 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { int32_t status = pTask->status.taskStatus; // non-source or fill-history tasks don't need to response the WAL scan action. - if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) { + if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) { streamMetaReleaseTask(pStreamMeta, pTask); continue; } - if (status != TASK_STATUS__NORMAL/* && status != TASK_STATUS__SCAN_HISTORY_WAL*/) { + if (status != TASK_STATUS__NORMAL) { tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status)); streamMetaReleaseTask(pStreamMeta, pTask); continue; } if ((pTask->info.fillHistory == 1) && pTask->status.transferState) { -// ASSERT(status == TASK_STATUS__SCAN_HISTORY_WAL); ASSERT(status == TASK_STATUS__NORMAL); // the maximum version of data in the WAL has reached already, the step2 is done tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr, diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index b85305b32d..5a99c1ea9a 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -341,6 +341,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v return NULL; } + qResetStreamInfoTimeWindow(pTaskInfo); return pTaskInfo; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ad95297384..7434db61db 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1550,10 +1550,86 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock } } -static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) { +static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) { + if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) { + bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); + bool hasUnqualified = false; + + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex); + + if (pWindow->skey != INT64_MIN) { + qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey); + + ASSERT(pCol->pData != NULL); + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t* ts = (int64_t*)colDataGetData(pCol, i); + p[i] = (*ts >= pWindow->skey); + + if (!p[i]) { + hasUnqualified = true; + } + } + } else if (pWindow->ekey != INT64_MAX) { + qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->ekey); + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t* ts = (int64_t*)colDataGetData(pCol, i); + p[i] = (*ts <= pWindow->ekey); + + if (!p[i]) { + hasUnqualified = true; + } + } + } + + if (hasUnqualified) { + trimDataBlock(pBlock, pBlock->info.rows, p); + } + + taosMemoryFree(p); + } +} + +// re-build the delete block, ONLY according to the split timestamp +static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) { + if (skey == INT64_MIN) { + return; + } + + int32_t numOfRows = pBlock->info.rows; + + bool* p = taosMemoryCalloc(numOfRows, sizeof(bool)); + bool hasUnqualified = false; + + SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); + uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData; + SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); + uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData; + + for (int32_t i = 0; i < numOfRows; i++) { + if (tsStartCol[i] < skey) { + tsStartCol[i] = skey; + } + + if (tsEndCol[i] >= skey) { + p[i] = true; + } else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX] + hasUnqualified = true; + } + } + + if (hasUnqualified) { + trimDataBlock(pBlock, pBlock->info.rows, p); + } + + qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows); + taosMemoryFree(p); +} + +static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STimeWindow* pTimeWindow, bool filter) { SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; SOperatorInfo* pOperator = pInfo->pStreamScanOp; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + const char* id = GET_TASKID(pTaskInfo); blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows); @@ -1593,7 +1669,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock // currently only the tbname pseudo column if (pInfo->numOfPseudoExpr > 0) { int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes, - pBlockInfo->rows, GET_TASKID(pTaskInfo), &pTableScanInfo->base.metaCache); + pBlockInfo->rows, id, &pTableScanInfo->base.metaCache); // ignore the table not exists error, since this table may have been dropped during the scan procedure. if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) { blockDataFreeRes((SSDataBlock*)pBlock); @@ -1608,8 +1684,14 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); } + // filter the block extracted from WAL files, according to the time window apply additional time window filter + doBlockDataWindowFilter(pInfo->pRes, pInfo->primaryTsIndex, pTimeWindow, id); pInfo->pRes->info.dataLoad = 1; + blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); + if (pInfo->pRes->info.rows == 0) { + return 0; + } calBlockTbName(pInfo, pInfo->pRes); return 0; @@ -1666,7 +1748,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows, pTaskInfo->streamInfo.currentOffset.version); blockDataCleanup(pInfo->pRes); - setBlockIntoRes(pInfo, pRes, true); + STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX}; + setBlockIntoRes(pInfo, pRes, &defaultWindow, true); if (pInfo->pRes->info.rows > 0) { return pInfo->pRes; } @@ -1775,80 +1858,6 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo) } } -static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) { - if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) { - bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool)); - bool hasUnqualified = false; - - SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex); - - if (pWindow->skey != INT64_MIN) { - qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey); - - for (int32_t i = 0; i < pBlock->info.rows; ++i) { - int64_t* ts = (int64_t*)colDataGetData(pCol, i); - p[i] = (*ts >= pWindow->skey); - - if (!p[i]) { - hasUnqualified = true; - } - } - } else if (pWindow->ekey != INT64_MAX) { - qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->ekey); - for (int32_t i = 0; i < pBlock->info.rows; ++i) { - int64_t* ts = (int64_t*)colDataGetData(pCol, i); - p[i] = (*ts <= pWindow->ekey); - - if (!p[i]) { - hasUnqualified = true; - } - } - } - - if (hasUnqualified) { - trimDataBlock(pBlock, pBlock->info.rows, p); - } - - taosMemoryFree(p); - } -} - -// re-build the delete block, ONLY according to the split timestamp -static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) { - if (skey == INT64_MIN) { - return; - } - - int32_t numOfRows = pBlock->info.rows; - - bool* p = taosMemoryCalloc(numOfRows, sizeof(bool)); - bool hasUnqualified = false; - - SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX); - uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData; - SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX); - uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData; - - for (int32_t i = 0; i < numOfRows; i++) { - if (tsStartCol[i] < skey) { - tsStartCol[i] = skey; - } - - if (tsEndCol[i] >= skey) { - p[i] = true; - } else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX] - hasUnqualified = true; - } - } - - if (hasUnqualified) { - trimDataBlock(pBlock, pBlock->info.rows, p); - } - - qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows); - taosMemoryFree(p); -} - static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -2158,15 +2167,11 @@ FETCH_NEXT_BLOCK: continue; } - // filter the block extracted from WAL files, according to the time window - // apply additional time window filter - doBlockDataWindowFilter(pRes, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id); - blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex); - if (pRes->info.rows == 0) { + setBlockIntoRes(pInfo, pRes, &pStreamInfo->fillHistoryWindow, false); + if (pInfo->pRes->info.rows == 0) { continue; } - setBlockIntoRes(pInfo, pRes, false); if (pInfo->pCreateTbRes->info.rows > 0) { pInfo->scanMode = STREAM_SCAN_FROM_RES; qDebug("create table res exists, rows:%"PRId64" return from stream scan, %s", pInfo->pCreateTbRes->info.rows, id); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index ba8e358f68..f85ade591c 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -379,7 +379,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { return -1; } - qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); + qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem); if (code != TSDB_CODE_SUCCESS) { destroyStreamDataBlock((SStreamDataBlock*) pItem);