fix(stream): set the time window filter before generating create table result.
This commit is contained in:
parent
6c9fa4d7c8
commit
92d1dbd9ad
|
@ -2120,8 +2120,7 @@ FETCH_NEXT_BLOCK:
|
||||||
return pInfo->pUpdateRes;
|
return pInfo->pUpdateRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pBlock = pInfo->pRes;
|
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
||||||
SDataBlockInfo* pBlockInfo = &pBlock->info;
|
|
||||||
int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists);
|
int32_t totalBlocks = taosArrayGetSize(pInfo->pBlockLists);
|
||||||
|
|
||||||
NEXT_SUBMIT_BLK:
|
NEXT_SUBMIT_BLK:
|
||||||
|
@ -2145,35 +2144,36 @@ FETCH_NEXT_BLOCK:
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataCleanup(pBlock);
|
blockDataCleanup(pInfo->pRes);
|
||||||
|
|
||||||
while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, id)) {
|
while (pAPI->tqReaderFn.tqNextBlockImpl(pInfo->tqReader, id)) {
|
||||||
SSDataBlock* pRes = NULL;
|
SSDataBlock* pRes = NULL;
|
||||||
|
|
||||||
int32_t code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id);
|
int32_t code = pAPI->tqReaderFn.tqRetrieveBlock(pInfo->tqReader, &pRes, id);
|
||||||
qDebug("retrieve data from submit completed code:%s, rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows,
|
qDebug("retrieve data from submit completed code:%s rows:%" PRId64 " %s", tstrerror(code), pRes->info.rows, id);
|
||||||
id);
|
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) {
|
if (code != TSDB_CODE_SUCCESS || pRes->info.rows == 0) {
|
||||||
qDebug("retrieve data failed, try next block in submit block, %s", id);
|
qDebug("retrieve data failed, try next block in submit block, %s", id);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
setBlockIntoRes(pInfo, pRes, false);
|
// filter the block extracted from WAL files, according to the time window
|
||||||
|
// apply additional time window filter
|
||||||
|
doBlockDataWindowFilter(pRes, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id);
|
||||||
|
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
||||||
|
if (pRes->info.rows == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
setBlockIntoRes(pInfo, pRes, false);
|
||||||
if (pInfo->pCreateTbRes->info.rows > 0) {
|
if (pInfo->pCreateTbRes->info.rows > 0) {
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
||||||
qDebug("create table res exists, rows:%"PRId64" return from stream scan, %s", pInfo->pCreateTbRes->info.rows, id);
|
qDebug("create table res exists, rows:%"PRId64" return from stream scan, %s", pInfo->pCreateTbRes->info.rows, id);
|
||||||
return pInfo->pCreateTbRes;
|
return pInfo->pCreateTbRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
// apply additional time window filter
|
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
|
||||||
doBlockDataWindowFilter(pBlock, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id);
|
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||||
pBlock->info.dataLoad = 1;
|
|
||||||
blockDataUpdateTsWindow(pBlock, pInfo->primaryTsIndex);
|
|
||||||
|
|
||||||
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pBlock);
|
|
||||||
doFilter(pBlock, pOperator->exprSupp.pFilterInfo, NULL);
|
|
||||||
|
|
||||||
int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows;
|
int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows;
|
||||||
qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes);
|
qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes);
|
||||||
|
@ -2195,7 +2195,7 @@ FETCH_NEXT_BLOCK:
|
||||||
|
|
||||||
qDebug("stream scan completed, and return source rows:%" PRId64", %s", pBlockInfo->rows, id);
|
qDebug("stream scan completed, and return source rows:%" PRId64", %s", pBlockInfo->rows, id);
|
||||||
if (pBlockInfo->rows > 0) {
|
if (pBlockInfo->rows > 0) {
|
||||||
return pBlock;
|
return pInfo->pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->pUpdateDataRes->info.rows > 0) {
|
if (pInfo->pUpdateDataRes->info.rows > 0) {
|
||||||
|
|
|
@ -80,6 +80,7 @@ int32_t streamTaskLaunchScanHistory(SStreamTask* pTask) {
|
||||||
walReaderGetCurrentVer(pTask->exec.pWalReader));
|
walReaderGetCurrentVer(pTask->exec.pWalReader));
|
||||||
}
|
}
|
||||||
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
|
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
|
||||||
|
streamTaskEnablePause(pTask);
|
||||||
streamSetParamForScanHistory(pTask);
|
streamSetParamForScanHistory(pTask);
|
||||||
streamTaskScanHistoryPrepare(pTask);
|
streamTaskScanHistoryPrepare(pTask);
|
||||||
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
} else if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
|
||||||
|
|
Loading…
Reference in New Issue