enh: set scan limit
This commit is contained in:
parent
b2cec0f1e9
commit
39eb62d80d
|
@ -213,6 +213,7 @@ int32_t qStreamSourceRecoverStep1(qTaskInfo_t tinfo, int64_t ver);
|
|||
int32_t qStreamSourceRecoverStep2(qTaskInfo_t tinfo, int64_t ver);
|
||||
int32_t qStreamRecoverFinish(qTaskInfo_t tinfo);
|
||||
int32_t qStreamRestoreParam(qTaskInfo_t tinfo);
|
||||
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -136,6 +136,7 @@ typedef struct {
|
|||
SSchemaWrapper* schema;
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
int8_t recoverStep;
|
||||
int8_t recoverScanFinished;
|
||||
SQueryTableDataCond tableCond;
|
||||
int64_t fillHistoryVer1;
|
||||
int64_t fillHistoryVer2;
|
||||
|
@ -182,7 +183,7 @@ struct SExecTaskInfo {
|
|||
SSubplan* pSubplan;
|
||||
struct SOperatorInfo* pRoot;
|
||||
SLocalFetch localFetch;
|
||||
SArray* pResultBlockList;// result block list
|
||||
SArray* pResultBlockList; // result block list
|
||||
STaskStopInfo stopInfo;
|
||||
};
|
||||
|
||||
|
@ -263,7 +264,7 @@ typedef struct SExchangeInfo {
|
|||
// SArray<SSDataBlock*>, result block list, used to keep the multi-block that
|
||||
// passed by downstream operator
|
||||
SArray* pResultBlockList;
|
||||
SArray* pRecycledBlocks;// build a pool for small data block to avoid to repeatly create and then destroy.
|
||||
SArray* pRecycledBlocks; // build a pool for small data block to avoid to repeatly create and then destroy.
|
||||
SSDataBlock* pDummyBlock; // dummy block, not keep data
|
||||
bool seqLoadData; // sequential load data or not, false by default
|
||||
int32_t current;
|
||||
|
@ -468,6 +469,11 @@ typedef struct SStreamScanInfo {
|
|||
SNodeList* pGroupTags;
|
||||
SNode* pTagCond;
|
||||
SNode* pTagIndexCond;
|
||||
|
||||
// recover
|
||||
int32_t blockRecoverContiCnt;
|
||||
int32_t blockRecoverTotCnt;
|
||||
|
||||
} SStreamScanInfo;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -936,6 +936,10 @@ int32_t qStreamRestoreParam(qTaskInfo_t tinfo) {
|
|||
}
|
||||
return 0;
|
||||
}
|
||||
bool qStreamRecoverScanFinished(qTaskInfo_t tinfo) {
|
||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||
return pTaskInfo->streamInfo.recoverScanFinished;
|
||||
}
|
||||
|
||||
void* qExtractReaderFromStreamScanner(void* scanner) {
|
||||
SStreamScanInfo* pInfo = scanner;
|
||||
|
|
|
@ -768,8 +768,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
tableListGetGroupList(pTaskInfo->pTableInfoList, pInfo->currentGroupId, &pList, &num);
|
||||
ASSERT(pInfo->base.dataReader == NULL);
|
||||
|
||||
int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num,
|
||||
pInfo->pResBlock, (STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo));
|
||||
int32_t code = tsdbReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
|
||||
(STsdbReader**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
@ -986,8 +986,8 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
|
|||
|
||||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||
STsdbReader* pReader = NULL;
|
||||
int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock, (STsdbReader**)&pReader,
|
||||
GET_TASKID(pTaskInfo));
|
||||
int32_t code = tsdbReaderOpen(pTableScanInfo->base.readHandle.vnode, &cond, &tblInfo, 1, pBlock,
|
||||
(STsdbReader**)&pReader, GET_TASKID(pTaskInfo));
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
|
@ -995,7 +995,7 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU
|
|||
}
|
||||
|
||||
if (tsdbNextDataBlock(pReader)) {
|
||||
/*SSDataBlock* p = */tsdbRetrieveDataBlock(pReader, NULL);
|
||||
/*SSDataBlock* p = */ tsdbRetrieveDataBlock(pReader, NULL);
|
||||
doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows);
|
||||
pBlock->info.id.groupId = getTableGroupId(pTaskInfo->pTableInfoList, pBlock->info.id.uid);
|
||||
}
|
||||
|
@ -1753,11 +1753,18 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
pTSInfo->scanTimes = 0;
|
||||
pTSInfo->currentGroupId = -1;
|
||||
pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN;
|
||||
pTaskInfo->streamInfo.recoverScanFinished = false;
|
||||
}
|
||||
|
||||
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) {
|
||||
if (pInfo->blockRecoverContiCnt > 100) {
|
||||
pInfo->blockRecoverTotCnt += pInfo->blockRecoverContiCnt;
|
||||
pInfo->blockRecoverContiCnt = 0;
|
||||
return NULL;
|
||||
}
|
||||
SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp);
|
||||
if (pBlock != NULL) {
|
||||
pInfo->blockRecoverContiCnt++;
|
||||
calBlockTbName(pInfo, pBlock);
|
||||
if (pInfo->pUpdateInfo) {
|
||||
TSKEY maxTs = updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
|
||||
|
@ -1775,6 +1782,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
pTSInfo->base.cond.startVersion = -1;
|
||||
pTSInfo->base.cond.endVersion = -1;
|
||||
|
||||
pTaskInfo->streamInfo.recoverScanFinished = true;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -2285,7 +2293,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
if (pHandle->initTableReader) {
|
||||
pTSInfo->scanMode = TABLE_SCAN__TABLE_ORDER;
|
||||
pTSInfo->base.dataReader = NULL;
|
||||
code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, pTSInfo->pResBlock, &pTSInfo->base.dataReader, NULL);
|
||||
code = tsdbReaderOpen(pHandle->vnode, &pTSInfo->base.cond, pList, num, pTSInfo->pResBlock,
|
||||
&pTSInfo->base.dataReader, NULL);
|
||||
if (code != 0) {
|
||||
terrno = code;
|
||||
destroyTableScanOperatorInfo(pTableScanOp);
|
||||
|
@ -2355,7 +2364,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
|
||||
|
||||
__optr_fn_t nextFn = pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM ? doStreamScan : doQueueScan;
|
||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL);
|
||||
|
||||
return pOperator;
|
||||
|
||||
|
@ -2492,7 +2502,8 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
|||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||
|
||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
|
||||
|
||||
return pOperator;
|
||||
|
||||
|
@ -2517,7 +2528,8 @@ static SSDataBlock* getTableDataBlockImpl(void* param) {
|
|||
void* p = tableListGetInfo(pTaskInfo->pTableInfoList, readIdx + pInfo->tableStartIndex);
|
||||
SReadHandle* pHandle = &pInfo->base.readHandle;
|
||||
|
||||
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo));
|
||||
int32_t code =
|
||||
tsdbReaderOpen(pHandle->vnode, pQueryCond, p, 1, pBlock, &pInfo->base.dataReader, GET_TASKID(pTaskInfo));
|
||||
if (code != 0) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
@ -3041,8 +3053,8 @@ SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableC
|
|||
|
||||
setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED,
|
||||
pInfo, pTaskInfo);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, optrDefaultBufFn, NULL);
|
||||
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator,
|
||||
optrDefaultBufFn, NULL);
|
||||
return pOperator;
|
||||
|
||||
_error:
|
||||
|
|
|
@ -116,7 +116,11 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
|||
ASSERT(0);
|
||||
}
|
||||
if (output == NULL) {
|
||||
if (qStreamRecoverScanFinished(exec)) {
|
||||
finished = true;
|
||||
} else {
|
||||
qSetStreamOpOpen(exec);
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue