enhance:modify table scan operator that can accept several scan ranges

This commit is contained in:
slzhou 2022-05-27 17:57:27 +08:00
parent 2939e17ad1
commit 7a459a26a8
6 changed files with 98 additions and 58 deletions

View File

@ -105,12 +105,14 @@ typedef struct SColumnInfoData {
} SColumnInfoData; } SColumnInfoData;
typedef struct SQueryTableDataCond { typedef struct SQueryTableDataCond {
STimeWindow twindow; //STimeWindow twindow;
int32_t order; // desc|asc order to iterate the data block int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols; int32_t numOfCols;
SColumnInfo *colList; SColumnInfo *colList;
bool loadExternalRows; // load external rows or not bool loadExternalRows; // load external rows or not
int32_t type; // data block load type: int32_t type; // data block load type:
int32_t numOfTWindows;
STimeWindow *twindows;
} SQueryTableDataCond; } SQueryTableDataCond;
void* blockDataDestroy(SSDataBlock* pBlock); void* blockDataDestroy(SSDataBlock* pBlock);

View File

@ -111,7 +111,7 @@ bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool *allHave); int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool *allHave);
SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList); SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList);
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond); void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond, int32_t tWinIdx);
void tsdbCleanupReadHandle(tsdbReaderT queryHandle); void tsdbCleanupReadHandle(tsdbReaderT queryHandle);
// tq // tq

View File

@ -317,28 +317,28 @@ static int64_t getEarliestValidTimestamp(STsdb* pTsdb) {
return now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick return now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick
} }
static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableDataCond* pCond) { static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableDataCond* pCond, int32_t tWinIdx) {
pTsdbReadHandle->window = pCond->twindow; pTsdbReadHandle->window = pCond->twindows[tWinIdx];
bool updateTs = false; bool updateTs = false;
int64_t startTs = getEarliestValidTimestamp(pTsdbReadHandle->pTsdb); int64_t startTs = getEarliestValidTimestamp(pTsdbReadHandle->pTsdb);
if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) {
if (startTs > pTsdbReadHandle->window.skey) { if (startTs > pTsdbReadHandle->window.skey) {
pTsdbReadHandle->window.skey = startTs; pTsdbReadHandle->window.skey = startTs;
pCond->twindow.skey = startTs; pCond->twindows[tWinIdx].skey = startTs;
updateTs = true; updateTs = true;
} }
} else { } else {
if (startTs > pTsdbReadHandle->window.ekey) { if (startTs > pTsdbReadHandle->window.ekey) {
pTsdbReadHandle->window.ekey = startTs; pTsdbReadHandle->window.ekey = startTs;
pCond->twindow.ekey = startTs; pCond->twindows[tWinIdx].ekey = startTs;
updateTs = true; updateTs = true;
} }
} }
if (updateTs) { if (updateTs) {
tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 ", %s", tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 ", %s",
pTsdbReadHandle, pCond->twindow.skey, pCond->twindow.ekey, pTsdbReadHandle->window.skey, pTsdbReadHandle, pCond->twindows[tWinIdx].skey, pCond->twindows[tWinIdx].ekey, pTsdbReadHandle->window.skey,
pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr); pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr);
} }
} }
@ -382,7 +382,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
goto _end; goto _end;
} }
STsdb* pTsdb = getTsdbByRetentions(pVnode, pReadHandle, pCond->twindow.skey, pVnode->config.tsdbCfg.retentions); STsdb* pTsdb = getTsdbByRetentions(pVnode, pReadHandle, pCond->twindows[0].skey, pVnode->config.tsdbCfg.retentions);
pReadHandle->order = pCond->order; pReadHandle->order = pCond->order;
pReadHandle->pTsdb = pTsdb; pReadHandle->pTsdb = pTsdb;
@ -408,7 +408,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond*
} }
assert(pCond != NULL); assert(pCond != NULL);
setQueryTimewindow(pReadHandle, pCond); setQueryTimewindow(pReadHandle, pCond, 0);
if (pCond->numOfCols > 0) { if (pCond->numOfCols > 0) {
int32_t rowLen = 0; int32_t rowLen = 0;
@ -520,7 +520,7 @@ tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableL
return (tsdbReaderT)pTsdbReadHandle; return (tsdbReaderT)pTsdbReadHandle;
} }
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond) { void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, int32_t tWinIdx) {
STsdbReadHandle* pTsdbReadHandle = queryHandle; STsdbReadHandle* pTsdbReadHandle = queryHandle;
if (emptyQueryTimewindow(pTsdbReadHandle)) { if (emptyQueryTimewindow(pTsdbReadHandle)) {
@ -533,7 +533,7 @@ void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond) {
} }
pTsdbReadHandle->order = pCond->order; pTsdbReadHandle->order = pCond->order;
pTsdbReadHandle->window = pCond->twindow; setQueryTimewindow(pTsdbReadHandle, pCond, tWinIdx);
pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL; pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
pTsdbReadHandle->cur.fid = -1; pTsdbReadHandle->cur.fid = -1;
pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER; pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
@ -558,11 +558,11 @@ void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond) {
resetCheckInfo(pTsdbReadHandle); resetCheckInfo(pTsdbReadHandle);
} }
void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList) { void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList, int32_t tWinIdx) {
STsdbReadHandle* pTsdbReadHandle = queryHandle; STsdbReadHandle* pTsdbReadHandle = queryHandle;
pTsdbReadHandle->order = pCond->order; pTsdbReadHandle->order = pCond->order;
pTsdbReadHandle->window = pCond->twindow; pTsdbReadHandle->window = pCond->twindows[tWinIdx];
pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL; pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL;
pTsdbReadHandle->cur.fid = -1; pTsdbReadHandle->cur.fid = -1;
pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER; pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER;
@ -602,7 +602,7 @@ void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCon
tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* pList, uint64_t qId, tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* pList, uint64_t qId,
uint64_t taskId) { uint64_t taskId) {
pCond->twindow = updateLastrowForEachGroup(pList); pCond->twindows[0] = updateLastrowForEachGroup(pList);
// no qualified table // no qualified table
if (taosArrayGetSize(pList->pTableList) == 0) { if (taosArrayGetSize(pList->pTableList) == 0) {
@ -620,7 +620,7 @@ tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableL
return NULL; return NULL;
} }
assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey); assert(pCond->order == TSDB_ORDER_ASC && pCond->twindows[0].skey <= pCond->twindows[0].ekey);
if (pTsdbReadHandle->cachelastrow) { if (pTsdbReadHandle->cachelastrow) {
pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST; pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST;
} }

View File

@ -802,6 +802,8 @@ SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap
int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows, int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows,
int32_t start, int64_t gap, SHashObj* pStDeleted); int32_t start, int64_t gap, SHashObj* pStDeleted);
bool functionNeedToExecute(SqlFunctionCtx* pCtx); bool functionNeedToExecute(SqlFunctionCtx* pCtx);
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -4706,6 +4706,18 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
return pOptr; return pOptr;
} }
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param) {
const SQueryTableDataCond *pCond = param;
const STimeWindow *pWin1 = p1;
const STimeWindow *pWin2 = p2;
if (pCond->order == TSDB_ORDER_ASC) {
return pWin1->skey - pWin2->skey;
} else if (pCond->order == TSDB_ORDER_DESC) {
return pWin2->skey - pWin1->skey;
}
return 0;
}
int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) { int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) {
pCond->loadExternalRows = false; pCond->loadExternalRows = false;
@ -4717,16 +4729,34 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
return terrno; return terrno;
} }
pCond->twindow = pTableScanNode->scanRange; //pCond->twindow = pTableScanNode->scanRange;
//TODO: get it from stable scan node
pCond->numOfTWindows = 1;
pCond->twindows = taosMemoryCalloc(pCond->numOfTWindows, sizeof(STimeWindow));
pCond->twindows[0] = pTableScanNode->scanRange;
#if 1 #if 1
// todo work around a problem, remove it later // todo work around a problem, remove it later
if ((pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey > pCond->twindow.ekey) || for (int32_t i = 0; i < pCond->numOfTWindows; ++i) {
(pCond->order == TSDB_ORDER_DESC && pCond->twindow.skey < pCond->twindow.ekey)) { if ((pCond->order == TSDB_ORDER_ASC && pCond->twindows[i].skey > pCond->twindows[i].ekey) ||
TSWAP(pCond->twindow.skey, pCond->twindow.ekey); (pCond->order == TSDB_ORDER_DESC && pCond->twindows[i].skey < pCond->twindows[i].ekey)) {
TSWAP(pCond->twindows[i].skey, pCond->twindows[i].ekey);
}
} }
#endif #endif
for (int32_t i = 0; i < pCond->numOfTWindows; ++i) {
if ((pCond->order == TSDB_ORDER_ASC && pCond->twindows[i].skey > pCond->twindows[i].ekey) ||
(pCond->order == TSDB_ORDER_DESC && pCond->twindows[i].skey < pCond->twindows[i].ekey)) {
TSWAP(pCond->twindows[i].skey, pCond->twindows[i].ekey);
}
}
taosqsort(pCond->twindows,
pCond->numOfTWindows,
sizeof(STimeWindow),
pCond,
compareTimeWindow);
pCond->type = BLOCK_LOAD_OFFSET_SEQ_ORDER; pCond->type = BLOCK_LOAD_OFFSET_SEQ_ORDER;
// pCond->type = pTableScanNode->scanFlag; // pCond->type = pTableScanNode->scanFlag;

View File

@ -274,9 +274,17 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
switchCtxOrder(pCtx, numOfOutput); switchCtxOrder(pCtx, numOfOutput);
// setupQueryRangeForReverseScan(pTableScanInfo); // setupQueryRangeForReverseScan(pTableScanInfo);
STimeWindow* pTWindow = &pTableScanInfo->cond.twindow;
TSWAP(pTWindow->skey, pTWindow->ekey);
pTableScanInfo->cond.order = TSDB_ORDER_DESC; pTableScanInfo->cond.order = TSDB_ORDER_DESC;
for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
STimeWindow* pTWindow = &pTableScanInfo->cond.twindows[i];
TSWAP(pTWindow->skey, pTWindow->ekey);
}
SQueryTableDataCond *pCond = &pTableScanInfo->cond;
taosqsort(pCond->twindows,
pCond->numOfTWindows,
sizeof(STimeWindow),
pCond,
compareTimeWindow);
} }
void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) { void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) {
@ -354,33 +362,35 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
SSDataBlock* pBlock = pTableScanInfo->pResBlock; SSDataBlock* pBlock = pTableScanInfo->pResBlock;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i)
{
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, i);
while (tsdbNextDataBlock(pTableScanInfo->dataReader)) {
if (isTaskKilled(pOperator->pTaskInfo)) {
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
}
while (tsdbNextDataBlock(pTableScanInfo->dataReader)) { tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info);
if (isTaskKilled(pOperator->pTaskInfo)) {
longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); uint32_t status = 0;
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pOperator->pTaskInfo->env, code);
}
// current block is filter out according to filter condition, continue load the next block
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
continue;
}
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime;
return pBlock;
} }
tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info);
uint32_t status = 0;
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
// int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status);
if (code != TSDB_CODE_SUCCESS) {
longjmp(pOperator->pTaskInfo->env, code);
}
// current block is filter out according to filter condition, continue load the next block
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
continue;
}
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime;
return pBlock;
} }
return NULL; return NULL;
} }
@ -405,14 +415,11 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED);
pTableScanInfo->scanFlag = REPEAT_SCAN; pTableScanInfo->scanFlag = REPEAT_SCAN;
qDebug("%s start to repeat ascending order scan data blocks due to query func required", GET_TASKID(pTaskInfo));
STimeWindow* pWin = &pTableScanInfo->cond.twindow; for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) {
qDebug("%s start to repeat ascending order scan data blocks due to query func required, qrange:%" PRId64 STimeWindow* pWin = &pTableScanInfo->cond.twindows[i];
"-%" PRId64, qDebug("%s\t qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); }
// do prepare for the next round table scan operation
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
} }
} }
@ -420,10 +427,9 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
if (pTableScanInfo->scanTimes < total) { if (pTableScanInfo->scanTimes < total) {
if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) { if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) {
prepareForDescendingScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput); prepareForDescendingScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput);
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond);
} }
STimeWindow* pWin = &pTableScanInfo->cond.twindow; STimeWindow* pWin = &pTableScanInfo->cond.twindows[0];
qDebug("%s start to descending order scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, qDebug("%s start to descending order scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64,
GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
@ -444,7 +450,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey);
// do prepare for the next round table scan operation // do prepare for the next round table scan operation
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond); tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
} }
} }
} }
@ -678,8 +684,8 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) {
binarySearchForKey, NULL, TSDB_ORDER_ASC); binarySearchForKey, NULL, TSDB_ORDER_ASC);
} }
STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info; STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info;
pTableScanInfo->cond.twindow = win; pTableScanInfo->cond.twindows[0] = win;
tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond); tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0);
pTableScanInfo->scanTimes = 0; pTableScanInfo->scanTimes = 0;
return true; return true;
} else { } else {