ehance: table scan operator accepts many time ranges
This commit is contained in:
parent
f4b37f906d
commit
860f7270fb
|
@ -334,6 +334,8 @@ typedef struct STableScanInfo {
|
||||||
int32_t dataBlockLoadFlag;
|
int32_t dataBlockLoadFlag;
|
||||||
double sampleRatio; // data block sample ratio, 1 by default
|
double sampleRatio; // data block sample ratio, 1 by default
|
||||||
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
|
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time window to check if current data block needs to be loaded.
|
||||||
|
|
||||||
|
int32_t curTWinIdx;
|
||||||
} STableScanInfo;
|
} STableScanInfo;
|
||||||
|
|
||||||
typedef struct STagScanInfo {
|
typedef struct STagScanInfo {
|
||||||
|
|
|
@ -362,34 +362,31 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i)
|
|
||||||
{
|
|
||||||
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, i);
|
|
||||||
while (tsdbNextDataBlock(pTableScanInfo->dataReader)) {
|
|
||||||
if (isTaskKilled(pOperator->pTaskInfo)) {
|
|
||||||
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info);
|
while (tsdbNextDataBlock(pTableScanInfo->dataReader)) {
|
||||||
|
if (isTaskKilled(pOperator->pTaskInfo)) {
|
||||||
uint32_t status = 0;
|
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
|
|
||||||
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
longjmp(pOperator->pTaskInfo->env, code);
|
|
||||||
}
|
|
||||||
|
|
||||||
// current block is filter out according to filter condition, continue load the next block
|
|
||||||
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
|
|
||||||
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
|
||||||
|
|
||||||
pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime;
|
|
||||||
return pBlock;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info);
|
||||||
|
|
||||||
|
uint32_t status = 0;
|
||||||
|
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
|
||||||
|
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
longjmp(pOperator->pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
// current block is filter out according to filter condition, continue load the next block
|
||||||
|
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
|
||||||
|
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
|
||||||
|
pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime;
|
||||||
|
return pBlock;
|
||||||
}
|
}
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -405,9 +402,15 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
// do the ascending order traverse in the first place.
|
// do the ascending order traverse in the first place.
|
||||||
while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
||||||
SSDataBlock* p = doTableScanImpl(pOperator);
|
while (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
|
||||||
if (p != NULL) {
|
SSDataBlock* p = doTableScanImpl(pOperator);
|
||||||
return p;
|
if (p != NULL) {
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
pTableScanInfo->curTWinIdx += 1;
|
||||||
|
if (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
|
||||||
|
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, pTableScanInfo->curTWinIdx);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableScanInfo->scanTimes += 1;
|
pTableScanInfo->scanTimes += 1;
|
||||||
|
@ -419,6 +422,9 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
|
for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
|
||||||
STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
|
STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
|
||||||
qDebug("%s\t qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
qDebug("%s\t qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
||||||
|
// do prepare for the next round table scan operation
|
||||||
|
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
|
||||||
|
pTableScanInfo->curTWinIdx = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -427,30 +433,40 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
if (pTableScanInfo->scanTimes < total) {
|
if (pTableScanInfo->scanTimes < total) {
|
||||||
if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) {
|
if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) {
|
||||||
prepareForDescendingScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput);
|
prepareForDescendingScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput);
|
||||||
|
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
|
||||||
|
pTableScanInfo->curTWinIdx = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
STimeWindow* pWin = &pTableScanInfo->cond.twindows[0];
|
qDebug("%s start to descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
|
||||||
qDebug("%s start to descending order scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
|
for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
|
||||||
GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
|
||||||
|
qDebug("%s\t qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
||||||
|
}
|
||||||
while (pTableScanInfo->scanTimes < total) {
|
while (pTableScanInfo->scanTimes < total) {
|
||||||
SSDataBlock* p = doTableScanImpl(pOperator);
|
while (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
|
||||||
if (p != NULL) {
|
SSDataBlock* p = doTableScanImpl(pOperator);
|
||||||
return p;
|
if (p != NULL) {
|
||||||
|
return p;
|
||||||
|
}
|
||||||
|
pTableScanInfo->curTWinIdx += 1;
|
||||||
|
if (pTableScanInfo->curTWinIdx < pTableScanInfo->cond.numOfTWindows) {
|
||||||
|
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, pTableScanInfo->curTWinIdx);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pTableScanInfo->scanTimes += 1;
|
pTableScanInfo->scanTimes += 1;
|
||||||
|
|
||||||
if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
|
if (pTableScanInfo->scanTimes < total) {
|
||||||
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
|
||||||
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
pTableScanInfo->scanFlag = REPEAT_SCAN;
|
||||||
|
|
||||||
qDebug("%s start to repeat descending order scan data blocks due to query func required, qrange:%" PRId64
|
qDebug("%s start to repeat descending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
|
||||||
"-%" PRId64,
|
for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
|
||||||
GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
|
STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
|
||||||
|
qDebug("%s\t qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
||||||
// do prepare for the next round table scan operation
|
}
|
||||||
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
|
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
|
||||||
|
pTableScanInfo->curTWinIdx = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -530,6 +546,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
||||||
pInfo->dataReader = pDataReader;
|
pInfo->dataReader = pDataReader;
|
||||||
pInfo->scanFlag = MAIN_SCAN;
|
pInfo->scanFlag = MAIN_SCAN;
|
||||||
pInfo->pColMatchInfo = pColList;
|
pInfo->pColMatchInfo = pColList;
|
||||||
|
pInfo->curTWinIdx = 0;
|
||||||
|
|
||||||
pOperator->name = "TableScanOperator"; // for debug purpose
|
pOperator->name = "TableScanOperator"; // for debug purpose
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN;
|
||||||
|
|
Loading…
Reference in New Issue