Merge pull request #20344 from taosdata/fix/ly_stream
fix:fill history check update
This commit is contained in:
commit
24e039efad
|
@ -2044,8 +2044,8 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf)
|
|||
if (len >= size - 1) return dumpBuf;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
// len += snprintf(dumpBuf + len, size - len, " %15lf |", *(double*)var);
|
||||
// if (len >= size - 1) return dumpBuf;
|
||||
len += snprintf(dumpBuf + len, size - len, " %15f |", *(double*)var);
|
||||
if (len >= size - 1) return dumpBuf;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_BOOL:
|
||||
len += snprintf(dumpBuf + len, size - len, " %15d |", *(bool*)var);
|
||||
|
|
|
@ -121,7 +121,8 @@ enum {
|
|||
STREAM_RECOVER_STEP__NONE = 0,
|
||||
STREAM_RECOVER_STEP__PREPARE1,
|
||||
STREAM_RECOVER_STEP__PREPARE2,
|
||||
STREAM_RECOVER_STEP__SCAN,
|
||||
STREAM_RECOVER_STEP__SCAN1,
|
||||
STREAM_RECOVER_STEP__SCAN2,
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -1724,9 +1724,9 @@ static void setBlockGroupIdByUid(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
|
|||
}
|
||||
}
|
||||
|
||||
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey) {
|
||||
static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBlock) {
|
||||
if (pInfo->pUpdateInfo) {
|
||||
checkUpdateData(pInfo, true, pInfo->pRes, true);
|
||||
checkUpdateData(pInfo, true, pBlock, true);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, endKey);
|
||||
if (pInfo->pUpdateDataRes->info.rows > 0) {
|
||||
pInfo->updateResIndex = 0;
|
||||
|
@ -1758,11 +1758,13 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer1;
|
||||
qDebug("stream recover step 1, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
|
||||
pTSInfo->base.cond.endVersion);
|
||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN1;
|
||||
} else {
|
||||
pTSInfo->base.cond.startVersion = pTaskInfo->streamInfo.fillHistoryVer1 + 1;
|
||||
pTSInfo->base.cond.endVersion = pTaskInfo->streamInfo.fillHistoryVer2;
|
||||
qDebug("stream recover step 2, from %" PRId64 " to %" PRId64, pTSInfo->base.cond.startVersion,
|
||||
pTSInfo->base.cond.endVersion);
|
||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN2;
|
||||
}
|
||||
|
||||
/*resetTableScanInfo(pTSInfo, pWin);*/
|
||||
|
@ -1772,11 +1774,11 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
|
||||
pTSInfo->scanTimes = 0;
|
||||
pTSInfo->currentGroupId = -1;
|
||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN;
|
||||
pTaskInfo->streamInfo.recoverScanFinished = false;
|
||||
}
|
||||
|
||||
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) {
|
||||
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1 ||
|
||||
pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN2) {
|
||||
if (pInfo->blockRecoverContiCnt > 100) {
|
||||
pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
|
||||
pInfo->blockRecoverContiCnt = 0;
|
||||
|
@ -1789,6 +1791,27 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
printDataBlock(pInfo->pRecoverRes, "scan recover");
|
||||
return pInfo->pRecoverRes;
|
||||
} break;
|
||||
case STREAM_SCAN_FROM_UPDATERES: {
|
||||
generateScanRange(pInfo, pInfo->pUpdateDataRes, pInfo->pUpdateRes);
|
||||
prepareRangeScan(pInfo, pInfo->pUpdateRes, &pInfo->updateResIndex);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RANGE;
|
||||
return pInfo->pUpdateRes;
|
||||
} break;
|
||||
case STREAM_SCAN_FROM_DATAREADER_RANGE: {
|
||||
SSDataBlock* pSDB = doRangeScan(pInfo, pInfo->pUpdateRes, pInfo->primaryTsIndex, &pInfo->updateResIndex);
|
||||
if (pSDB) {
|
||||
STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
|
||||
uint64_t version = getReaderMaxVersion(pTableScanInfo->base.dataReader);
|
||||
updateInfoSetScanRange(pInfo->pUpdateInfo, &pTableScanInfo->base.cond.twindows, pInfo->groupId, version);
|
||||
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
||||
checkUpdateData(pInfo, true, pSDB, false);
|
||||
// printDataBlock(pSDB, "stream scan update");
|
||||
calBlockTbName(pInfo, pSDB);
|
||||
return pSDB;
|
||||
}
|
||||
blockDataCleanup(pInfo->pUpdateDataRes);
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
} break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -1798,8 +1821,12 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
pInfo->blockRecoverContiCnt++;
|
||||
calBlockTbName(pInfo, pInfo->pRecoverRes);
|
||||
if (pInfo->pUpdateInfo) {
|
||||
TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN1) {
|
||||
TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pInfo->pRecoverRes, pInfo->primaryTsIndex);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, maxTs);
|
||||
} else {
|
||||
doCheckUpdate(pInfo, pInfo->pRecoverRes->info.window.ekey, pInfo->pRecoverRes);
|
||||
}
|
||||
}
|
||||
if (pInfo->pCreateTbRes->info.rows > 0) {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_RES;
|
||||
|
@ -1910,7 +1937,7 @@ FETCH_NEXT_BLOCK:
|
|||
switch (pInfo->scanMode) {
|
||||
case STREAM_SCAN_FROM_RES: {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey);
|
||||
doCheckUpdate(pInfo, pInfo->pRes->info.window.ekey, pInfo->pRes);
|
||||
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
pInfo->pRes->info.dataLoad = 1;
|
||||
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
||||
|
@ -2011,7 +2038,7 @@ FETCH_NEXT_BLOCK:
|
|||
return pInfo->pCreateTbRes;
|
||||
}
|
||||
|
||||
doCheckUpdate(pInfo, pBlockInfo->window.ekey);
|
||||
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
|
||||
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
|
||||
pInfo->pRes->info.dataLoad = 1;
|
||||
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
||||
|
|
|
@ -4805,10 +4805,9 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
setInverFunction(pSup->pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.type);
|
||||
}
|
||||
|
||||
doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap);
|
||||
pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, pBlock->info.window.ekey);
|
||||
pInfo->twAggSup.minTs = TMIN(pInfo->twAggSup.minTs, pBlock->info.window.skey);
|
||||
|
||||
doStreamIntervalAggImpl(pOperator, pBlock, pBlock->info.id.groupId, pInfo->pUpdatedMap);
|
||||
}
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
removeDeleteResults(pInfo->pUpdatedMap, pInfo->pDelWins);
|
||||
|
|
Loading…
Reference in New Issue