From 7a459a26a851021a894161704d766699883cbacd Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 27 May 2022 17:57:27 +0800 Subject: [PATCH] enhance:modify table scan operator that can accept several scan ranges --- include/common/tcommon.h | 4 +- source/dnode/vnode/inc/vnode.h | 2 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 26 ++++---- source/libs/executor/inc/executorimpl.h | 2 + source/libs/executor/src/executorimpl.c | 38 +++++++++-- source/libs/executor/src/scanoperator.c | 84 +++++++++++++------------ 6 files changed, 98 insertions(+), 58 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 45745403f3..88fa0e728f 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -105,12 +105,14 @@ typedef struct SColumnInfoData { } SColumnInfoData; typedef struct SQueryTableDataCond { - STimeWindow twindow; + //STimeWindow twindow; int32_t order; // desc|asc order to iterate the data block int32_t numOfCols; SColumnInfo *colList; bool loadExternalRows; // load external rows or not int32_t type; // data block load type: + int32_t numOfTWindows; + STimeWindow *twindows; } SQueryTableDataCond; void* blockDataDestroy(SSDataBlock* pBlock); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 2b713ff980..1973bedb0c 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -111,7 +111,7 @@ bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle); void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg ***pBlockStatis, bool *allHave); SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList); -void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond); +void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond, int32_t tWinIdx); void tsdbCleanupReadHandle(tsdbReaderT queryHandle); // tq diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index c0b97f7536..6abb23e5c1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -317,28 +317,28 @@ static int64_t getEarliestValidTimestamp(STsdb* pTsdb) { return now - (tsTickPerMin[pCfg->precision] * pCfg->keep2) + 1; // needs to add one tick } -static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableDataCond* pCond) { - pTsdbReadHandle->window = pCond->twindow; +static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableDataCond* pCond, int32_t tWinIdx) { + pTsdbReadHandle->window = pCond->twindows[tWinIdx]; bool updateTs = false; int64_t startTs = getEarliestValidTimestamp(pTsdbReadHandle->pTsdb); if (ASCENDING_TRAVERSE(pTsdbReadHandle->order)) { if (startTs > pTsdbReadHandle->window.skey) { pTsdbReadHandle->window.skey = startTs; - pCond->twindow.skey = startTs; + pCond->twindows[tWinIdx].skey = startTs; updateTs = true; } } else { if (startTs > pTsdbReadHandle->window.ekey) { pTsdbReadHandle->window.ekey = startTs; - pCond->twindow.ekey = startTs; + pCond->twindows[tWinIdx].ekey = startTs; updateTs = true; } } if (updateTs) { tsdbDebug("%p update the query time window, old:%" PRId64 " - %" PRId64 ", new:%" PRId64 " - %" PRId64 ", %s", - pTsdbReadHandle, pCond->twindow.skey, pCond->twindow.ekey, pTsdbReadHandle->window.skey, + pTsdbReadHandle, pCond->twindows[tWinIdx].skey, pCond->twindows[tWinIdx].ekey, pTsdbReadHandle->window.skey, pTsdbReadHandle->window.ekey, pTsdbReadHandle->idStr); } } @@ -382,7 +382,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* goto _end; } - STsdb* pTsdb = getTsdbByRetentions(pVnode, pReadHandle, pCond->twindow.skey, pVnode->config.tsdbCfg.retentions); + STsdb* pTsdb = getTsdbByRetentions(pVnode, pReadHandle, pCond->twindows[0].skey, pVnode->config.tsdbCfg.retentions); pReadHandle->order = pCond->order; pReadHandle->pTsdb = pTsdb; @@ -408,7 +408,7 @@ static STsdbReadHandle* tsdbQueryTablesImpl(SVnode* pVnode, SQueryTableDataCond* } assert(pCond != NULL); - setQueryTimewindow(pReadHandle, pCond); + setQueryTimewindow(pReadHandle, pCond, 0); if (pCond->numOfCols > 0) { int32_t rowLen = 0; @@ -520,7 +520,7 @@ tsdbReaderT* tsdbQueryTables(SVnode* pVnode, SQueryTableDataCond* pCond, STableL return (tsdbReaderT)pTsdbReadHandle; } -void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond) { +void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, int32_t tWinIdx) { STsdbReadHandle* pTsdbReadHandle = queryHandle; if (emptyQueryTimewindow(pTsdbReadHandle)) { @@ -533,7 +533,7 @@ void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond) { } pTsdbReadHandle->order = pCond->order; - pTsdbReadHandle->window = pCond->twindow; + setQueryTimewindow(pTsdbReadHandle, pCond, tWinIdx); pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL; pTsdbReadHandle->cur.fid = -1; pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER; @@ -558,11 +558,11 @@ void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond) { resetCheckInfo(pTsdbReadHandle); } -void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList) { +void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, STableListInfo* tableList, int32_t tWinIdx) { STsdbReadHandle* pTsdbReadHandle = queryHandle; pTsdbReadHandle->order = pCond->order; - pTsdbReadHandle->window = pCond->twindow; + pTsdbReadHandle->window = pCond->twindows[tWinIdx]; pTsdbReadHandle->type = TSDB_QUERY_TYPE_ALL; pTsdbReadHandle->cur.fid = -1; pTsdbReadHandle->cur.win = TSWINDOW_INITIALIZER; @@ -602,7 +602,7 @@ void tsdbResetQueryHandleForNewTable(tsdbReaderT queryHandle, SQueryTableDataCon tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableListInfo* pList, uint64_t qId, uint64_t taskId) { - pCond->twindow = updateLastrowForEachGroup(pList); + pCond->twindows[0] = updateLastrowForEachGroup(pList); // no qualified table if (taosArrayGetSize(pList->pTableList) == 0) { @@ -620,7 +620,7 @@ tsdbReaderT tsdbQueryLastRow(SVnode* pVnode, SQueryTableDataCond* pCond, STableL return NULL; } - assert(pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey <= pCond->twindow.ekey); + assert(pCond->order == TSDB_ORDER_ASC && pCond->twindows[0].skey <= pCond->twindows[0].ekey); if (pTsdbReadHandle->cachelastrow) { pTsdbReadHandle->type = TSDB_QUERY_TYPE_LAST; } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index dc613ddd86..4c749997c1 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -802,6 +802,8 @@ SResultWindowInfo* getSessionTimeWindow(SArray* pWinInfos, TSKEY ts, int64_t gap int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t rows, int32_t start, int64_t gap, SHashObj* pStDeleted); bool functionNeedToExecute(SqlFunctionCtx* pCtx); + +int32_t compareTimeWindow(const void* p1, const void* p2, const void* param); #ifdef __cplusplus } #endif diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 684c657d17..2208c94e9c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4706,6 +4706,18 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return pOptr; } +int32_t compareTimeWindow(const void* p1, const void* p2, const void* param) { + const SQueryTableDataCond *pCond = param; + const STimeWindow *pWin1 = p1; + const STimeWindow *pWin2 = p2; + if (pCond->order == TSDB_ORDER_ASC) { + return pWin1->skey - pWin2->skey; + } else if (pCond->order == TSDB_ORDER_DESC) { + return pWin2->skey - pWin1->skey; + } + return 0; +} + int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysiNode* pTableScanNode) { pCond->loadExternalRows = false; @@ -4717,16 +4729,34 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi return terrno; } - pCond->twindow = pTableScanNode->scanRange; + //pCond->twindow = pTableScanNode->scanRange; + //TODO: get it from stable scan node + pCond->numOfTWindows = 1; + pCond->twindows = taosMemoryCalloc(pCond->numOfTWindows, sizeof(STimeWindow)); + pCond->twindows[0] = pTableScanNode->scanRange; #if 1 // todo work around a problem, remove it later - if ((pCond->order == TSDB_ORDER_ASC && pCond->twindow.skey > pCond->twindow.ekey) || - (pCond->order == TSDB_ORDER_DESC && pCond->twindow.skey < pCond->twindow.ekey)) { - TSWAP(pCond->twindow.skey, pCond->twindow.ekey); + for (int32_t i = 0; i < pCond->numOfTWindows; ++i) { + if ((pCond->order == TSDB_ORDER_ASC && pCond->twindows[i].skey > pCond->twindows[i].ekey) || + (pCond->order == TSDB_ORDER_DESC && pCond->twindows[i].skey < pCond->twindows[i].ekey)) { + TSWAP(pCond->twindows[i].skey, pCond->twindows[i].ekey); + } } #endif + for (int32_t i = 0; i < pCond->numOfTWindows; ++i) { + if ((pCond->order == TSDB_ORDER_ASC && pCond->twindows[i].skey > pCond->twindows[i].ekey) || + (pCond->order == TSDB_ORDER_DESC && pCond->twindows[i].skey < pCond->twindows[i].ekey)) { + TSWAP(pCond->twindows[i].skey, pCond->twindows[i].ekey); + } + } + taosqsort(pCond->twindows, + pCond->numOfTWindows, + sizeof(STimeWindow), + pCond, + compareTimeWindow); + pCond->type = BLOCK_LOAD_OFFSET_SEQ_ORDER; // pCond->type = pTableScanNode->scanFlag; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 32187c81a7..1316a749f3 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -274,9 +274,17 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction switchCtxOrder(pCtx, numOfOutput); // setupQueryRangeForReverseScan(pTableScanInfo); - STimeWindow* pTWindow = &pTableScanInfo->cond.twindow; - TSWAP(pTWindow->skey, pTWindow->ekey); pTableScanInfo->cond.order = TSDB_ORDER_DESC; + for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) { + STimeWindow* pTWindow = &pTableScanInfo->cond.twindows[i]; + TSWAP(pTWindow->skey, pTWindow->ekey); + } + SQueryTableDataCond *pCond = &pTableScanInfo->cond; + taosqsort(pCond->twindows, + pCond->numOfTWindows, + sizeof(STimeWindow), + pCond, + compareTimeWindow); } void addTagPseudoColumnData(STableScanInfo* pTableScanInfo, SSDataBlock* pBlock) { @@ -354,33 +362,35 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { SSDataBlock* pBlock = pTableScanInfo->pResBlock; int64_t st = taosGetTimestampUs(); + for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) + { + tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, i); + while (tsdbNextDataBlock(pTableScanInfo->dataReader)) { + if (isTaskKilled(pOperator->pTaskInfo)) { + longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } - while (tsdbNextDataBlock(pTableScanInfo->dataReader)) { - if (isTaskKilled(pOperator->pTaskInfo)) { - longjmp(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info); + + uint32_t status = 0; + int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status); + // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); + if (code != TSDB_CODE_SUCCESS) { + longjmp(pOperator->pTaskInfo->env, code); + } + + // current block is filter out according to filter condition, continue load the next block + if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { + continue; + } + + pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; + pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; + + pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime; + return pBlock; } - - tsdbRetrieveDataBlockInfo(pTableScanInfo->dataReader, &pBlock->info); - - uint32_t status = 0; - int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status); - // int32_t code = loadDataBlockOnDemand(pOperator->pRuntimeEnv, pTableScanInfo, pBlock, &status); - if (code != TSDB_CODE_SUCCESS) { - longjmp(pOperator->pTaskInfo->env, code); - } - - // current block is filter out according to filter condition, continue load the next block - if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { - continue; - } - - pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; - pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; - - pOperator->cost.totalCost = pTableScanInfo->readRecorder.elapsedTime; - return pBlock; } - return NULL; } @@ -405,14 +415,11 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { if (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { setTaskStatus(pTaskInfo, TASK_NOT_COMPLETED); pTableScanInfo->scanFlag = REPEAT_SCAN; - - STimeWindow* pWin = &pTableScanInfo->cond.twindow; - qDebug("%s start to repeat ascending order scan data blocks due to query func required, qrange:%" PRId64 - "-%" PRId64, - GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); - - // do prepare for the next round table scan operation - tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond); + qDebug("%s start to repeat ascending order scan data blocks due to query func required", GET_TASKID(pTaskInfo)); + for (int32_t i = 0; i < pTableScanInfo->cond.numOfTWindows; ++i) { + STimeWindow* pWin = &pTableScanInfo->cond.twindows[i]; + qDebug("%s\t qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); + } } } @@ -420,10 +427,9 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { if (pTableScanInfo->scanTimes < total) { if (pTableScanInfo->cond.order == TSDB_ORDER_ASC) { prepareForDescendingScan(pTableScanInfo, pTableScanInfo->pCtx, pTableScanInfo->numOfOutput); - tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond); } - STimeWindow* pWin = &pTableScanInfo->cond.twindow; + STimeWindow* pWin = &pTableScanInfo->cond.twindows[0]; qDebug("%s start to descending order scan data blocks due to query func required, qrange:%" PRId64 "-%" PRId64, GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); @@ -444,7 +450,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { GET_TASKID(pTaskInfo), pTaskInfo->window.skey, pTaskInfo->window.ekey); // do prepare for the next round table scan operation - tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond); + tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); } } } @@ -678,8 +684,8 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo) { binarySearchForKey, NULL, TSDB_ORDER_ASC); } STableScanInfo* pTableScanInfo = pInfo->pOperatorDumy->info; - pTableScanInfo->cond.twindow = win; - tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond); + pTableScanInfo->cond.twindows[0] = win; + tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); pTableScanInfo->scanTimes = 0; return true; } else {