Merge pull request #22243 from taosdata/fix/3_liaohj
fix(stream): initialize the filter window initial range.
This commit is contained in:
commit
80ad5a0abb
|
@ -346,9 +346,9 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
rsp.status = streamTaskCheckStatus(pTask);
|
rsp.status = streamTaskCheckStatus(pTask);
|
||||||
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
||||||
|
|
||||||
qDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d",
|
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
|
||||||
pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId,
|
qDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
|
||||||
streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status);
|
pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||||
} else {
|
} else {
|
||||||
rsp.status = 0;
|
rsp.status = 0;
|
||||||
qDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
|
qDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64
|
||||||
|
|
|
@ -1041,9 +1041,9 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
rsp.status = streamTaskCheckStatus(pTask);
|
rsp.status = streamTaskCheckStatus(pTask);
|
||||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||||
|
|
||||||
tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), status:%s, rsp status %d",
|
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
|
||||||
pTask->id.idStr, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId,
|
tqDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
|
||||||
streamGetTaskStatusStr(pTask->status.taskStatus), rsp.status);
|
pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
|
||||||
} else {
|
} else {
|
||||||
rsp.status = 0;
|
rsp.status = 0;
|
||||||
tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
|
tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
|
||||||
|
|
|
@ -251,19 +251,18 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
||||||
int32_t status = pTask->status.taskStatus;
|
int32_t status = pTask->status.taskStatus;
|
||||||
|
|
||||||
// non-source or fill-history tasks don't need to response the WAL scan action.
|
// non-source or fill-history tasks don't need to response the WAL scan action.
|
||||||
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
|
if ((pTask->info.taskLevel != TASK_LEVEL__SOURCE) || (pTask->status.downstreamReady == 0)) {
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (status != TASK_STATUS__NORMAL/* && status != TASK_STATUS__SCAN_HISTORY_WAL*/) {
|
if (status != TASK_STATUS__NORMAL) {
|
||||||
tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
|
tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, streamGetTaskStatusStr(status));
|
||||||
streamMetaReleaseTask(pStreamMeta, pTask);
|
streamMetaReleaseTask(pStreamMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pTask->info.fillHistory == 1) && pTask->status.transferState) {
|
if ((pTask->info.fillHistory == 1) && pTask->status.transferState) {
|
||||||
// ASSERT(status == TASK_STATUS__SCAN_HISTORY_WAL);
|
|
||||||
ASSERT(status == TASK_STATUS__NORMAL);
|
ASSERT(status == TASK_STATUS__NORMAL);
|
||||||
// the maximum version of data in the WAL has reached already, the step2 is done
|
// the maximum version of data in the WAL has reached already, the step2 is done
|
||||||
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
|
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
|
||||||
|
|
|
@ -341,6 +341,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
qResetStreamInfoTimeWindow(pTaskInfo);
|
||||||
return pTaskInfo;
|
return pTaskInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1550,10 +1550,86 @@ static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, bool filter) {
|
static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) {
|
||||||
|
if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) {
|
||||||
|
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
|
||||||
|
bool hasUnqualified = false;
|
||||||
|
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
|
||||||
|
|
||||||
|
if (pWindow->skey != INT64_MIN) {
|
||||||
|
qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey);
|
||||||
|
|
||||||
|
ASSERT(pCol->pData != NULL);
|
||||||
|
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||||
|
int64_t* ts = (int64_t*)colDataGetData(pCol, i);
|
||||||
|
p[i] = (*ts >= pWindow->skey);
|
||||||
|
|
||||||
|
if (!p[i]) {
|
||||||
|
hasUnqualified = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (pWindow->ekey != INT64_MAX) {
|
||||||
|
qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->ekey);
|
||||||
|
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||||
|
int64_t* ts = (int64_t*)colDataGetData(pCol, i);
|
||||||
|
p[i] = (*ts <= pWindow->ekey);
|
||||||
|
|
||||||
|
if (!p[i]) {
|
||||||
|
hasUnqualified = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hasUnqualified) {
|
||||||
|
trimDataBlock(pBlock, pBlock->info.rows, p);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// re-build the delete block, ONLY according to the split timestamp
|
||||||
|
static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) {
|
||||||
|
if (skey == INT64_MIN) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfRows = pBlock->info.rows;
|
||||||
|
|
||||||
|
bool* p = taosMemoryCalloc(numOfRows, sizeof(bool));
|
||||||
|
bool hasUnqualified = false;
|
||||||
|
|
||||||
|
SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||||
|
uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData;
|
||||||
|
SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||||
|
uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfRows; i++) {
|
||||||
|
if (tsStartCol[i] < skey) {
|
||||||
|
tsStartCol[i] = skey;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tsEndCol[i] >= skey) {
|
||||||
|
p[i] = true;
|
||||||
|
} else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX]
|
||||||
|
hasUnqualified = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (hasUnqualified) {
|
||||||
|
trimDataBlock(pBlock, pBlock->info.rows, p);
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows);
|
||||||
|
taosMemoryFree(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock, STimeWindow* pTimeWindow, bool filter) {
|
||||||
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
SDataBlockInfo* pBlockInfo = &pInfo->pRes->info;
|
||||||
SOperatorInfo* pOperator = pInfo->pStreamScanOp;
|
SOperatorInfo* pOperator = pInfo->pStreamScanOp;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
const char* id = GET_TASKID(pTaskInfo);
|
||||||
|
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
|
blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
|
||||||
|
|
||||||
|
@ -1593,7 +1669,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
// currently only the tbname pseudo column
|
// currently only the tbname pseudo column
|
||||||
if (pInfo->numOfPseudoExpr > 0) {
|
if (pInfo->numOfPseudoExpr > 0) {
|
||||||
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
|
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
|
||||||
pBlockInfo->rows, GET_TASKID(pTaskInfo), &pTableScanInfo->base.metaCache);
|
pBlockInfo->rows, id, &pTableScanInfo->base.metaCache);
|
||||||
// ignore the table not exists error, since this table may have been dropped during the scan procedure.
|
// ignore the table not exists error, since this table may have been dropped during the scan procedure.
|
||||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||||
blockDataFreeRes((SSDataBlock*)pBlock);
|
blockDataFreeRes((SSDataBlock*)pBlock);
|
||||||
|
@ -1608,8 +1684,14 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// filter the block extracted from WAL files, according to the time window apply additional time window filter
|
||||||
|
doBlockDataWindowFilter(pInfo->pRes, pInfo->primaryTsIndex, pTimeWindow, id);
|
||||||
pInfo->pRes->info.dataLoad = 1;
|
pInfo->pRes->info.dataLoad = 1;
|
||||||
|
|
||||||
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
||||||
|
if (pInfo->pRes->info.rows == 0) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
calBlockTbName(pInfo, pInfo->pRes);
|
calBlockTbName(pInfo, pInfo->pRes);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -1666,7 +1748,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
|
||||||
qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
|
qDebug("doQueueScan get data from log %" PRId64 " rows, version:%" PRId64, pRes->info.rows,
|
||||||
pTaskInfo->streamInfo.currentOffset.version);
|
pTaskInfo->streamInfo.currentOffset.version);
|
||||||
blockDataCleanup(pInfo->pRes);
|
blockDataCleanup(pInfo->pRes);
|
||||||
setBlockIntoRes(pInfo, pRes, true);
|
STimeWindow defaultWindow = {.skey = INT64_MIN, .ekey = INT64_MAX};
|
||||||
|
setBlockIntoRes(pInfo, pRes, &defaultWindow, true);
|
||||||
if (pInfo->pRes->info.rows > 0) {
|
if (pInfo->pRes->info.rows > 0) {
|
||||||
return pInfo->pRes;
|
return pInfo->pRes;
|
||||||
}
|
}
|
||||||
|
@ -1775,80 +1858,6 @@ void streamScanOperatorDecode(void* pBuff, int32_t len, SStreamScanInfo* pInfo)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doBlockDataWindowFilter(SSDataBlock* pBlock, int32_t tsIndex, STimeWindow* pWindow, const char* id) {
|
|
||||||
if (pWindow->skey != INT64_MIN || pWindow->ekey != INT64_MAX) {
|
|
||||||
bool* p = taosMemoryCalloc(pBlock->info.rows, sizeof(bool));
|
|
||||||
bool hasUnqualified = false;
|
|
||||||
|
|
||||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, tsIndex);
|
|
||||||
|
|
||||||
if (pWindow->skey != INT64_MIN) {
|
|
||||||
qDebug("%s filter for additional history window, skey:%" PRId64, id, pWindow->skey);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
|
||||||
int64_t* ts = (int64_t*)colDataGetData(pCol, i);
|
|
||||||
p[i] = (*ts >= pWindow->skey);
|
|
||||||
|
|
||||||
if (!p[i]) {
|
|
||||||
hasUnqualified = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if (pWindow->ekey != INT64_MAX) {
|
|
||||||
qDebug("%s filter for additional history window, ekey:%" PRId64, id, pWindow->ekey);
|
|
||||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
|
||||||
int64_t* ts = (int64_t*)colDataGetData(pCol, i);
|
|
||||||
p[i] = (*ts <= pWindow->ekey);
|
|
||||||
|
|
||||||
if (!p[i]) {
|
|
||||||
hasUnqualified = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (hasUnqualified) {
|
|
||||||
trimDataBlock(pBlock, pBlock->info.rows, p);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosMemoryFree(p);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// re-build the delete block, ONLY according to the split timestamp
|
|
||||||
static void rebuildDeleteBlockData(SSDataBlock* pBlock, int64_t skey, const char* id) {
|
|
||||||
if (skey == INT64_MIN) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t numOfRows = pBlock->info.rows;
|
|
||||||
|
|
||||||
bool* p = taosMemoryCalloc(numOfRows, sizeof(bool));
|
|
||||||
bool hasUnqualified = false;
|
|
||||||
|
|
||||||
SColumnInfoData* pSrcStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
|
||||||
uint64_t* tsStartCol = (uint64_t*)pSrcStartCol->pData;
|
|
||||||
SColumnInfoData* pSrcEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
|
||||||
uint64_t* tsEndCol = (uint64_t*)pSrcEndCol->pData;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfRows; i++) {
|
|
||||||
if (tsStartCol[i] < skey) {
|
|
||||||
tsStartCol[i] = skey;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tsEndCol[i] >= skey) {
|
|
||||||
p[i] = true;
|
|
||||||
} else { // this row should be removed, since it is not in this query time window, which is [skey, INT64_MAX]
|
|
||||||
hasUnqualified = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (hasUnqualified) {
|
|
||||||
trimDataBlock(pBlock, pBlock->info.rows, p);
|
|
||||||
}
|
|
||||||
|
|
||||||
qDebug("%s re-build delete datablock, start key revised to:%"PRId64", rows:%"PRId64, id, skey, pBlock->info.rows);
|
|
||||||
taosMemoryFree(p);
|
|
||||||
}
|
|
||||||
|
|
||||||
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
||||||
// NOTE: this operator does never check if current status is done or not
|
// NOTE: this operator does never check if current status is done or not
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
@ -2158,15 +2167,11 @@ FETCH_NEXT_BLOCK:
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// filter the block extracted from WAL files, according to the time window
|
setBlockIntoRes(pInfo, pRes, &pStreamInfo->fillHistoryWindow, false);
|
||||||
// apply additional time window filter
|
if (pInfo->pRes->info.rows == 0) {
|
||||||
doBlockDataWindowFilter(pRes, pInfo->primaryTsIndex, &pStreamInfo->fillHistoryWindow, id);
|
|
||||||
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
|
||||||
if (pRes->info.rows == 0) {
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
setBlockIntoRes(pInfo, pRes, false);
|
|
||||||
if (pInfo->pCreateTbRes->info.rows > 0) {
|
if (pInfo->pCreateTbRes->info.rows > 0) {
|
||||||
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
||||||
qDebug("create table res exists, rows:%"PRId64" return from stream scan, %s", pInfo->pCreateTbRes->info.rows, id);
|
qDebug("create table res exists, rows:%"PRId64" return from stream scan, %s", pInfo->pCreateTbRes->info.rows, id);
|
||||||
|
|
|
@ -379,7 +379,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
|
qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
|
||||||
int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem);
|
int32_t code = taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
destroyStreamDataBlock((SStreamDataBlock*) pItem);
|
destroyStreamDataBlock((SStreamDataBlock*) pItem);
|
||||||
|
|
Loading…
Reference in New Issue