diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 19a0fbd629..af21ba83b7 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -701,6 +701,8 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma #endif for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) { SSDataBlock *output = taosArrayGetP(pResList, i); + smaError("uid:%ld, groupid:%ld", output->info.uid, output->info.groupId); + STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]); SSubmitReq *pReq = NULL; @@ -713,13 +715,13 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) { taosMemoryFreeClear(pReq); - smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s", - SMA_VID(pSma), suid, pItem->level, terrstr()); + smaError("vgId:%d, process submit req for rsma suid:%"PRIu64", uid:%" PRIu64 " level %" PRIi8 " failed since %s", + SMA_VID(pSma), suid, output->info.groupId, pItem->level, terrstr()); goto _err; } - smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " ver %" PRIi64 " len %" PRIu32, - SMA_VID(pSma), suid, pItem->level, output->info.version, htonl(pReq->header.contLen)); + smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%"PRIu64", level %" PRIi8 " ver %" PRIi64 " len %" PRIu32, + SMA_VID(pSma), suid, output->info.groupId, pItem->level, output->info.version, htonl(pReq->header.contLen)); taosMemoryFreeClear(pReq); } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 975f8bb162..fc9524e828 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -371,6 +371,7 @@ typedef struct STableMergeScanInfo { SQueryTableDataCond cond; int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan int32_t dataBlockLoadFlag; + STsdbReader* pReader; // if the upstream is an interval operator, the interval info is also kept here to get the time // window to check if current data block needs to be loaded. diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index e8a67cdc35..fec9217bdb 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -621,12 +621,14 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { binfo.capacity = binfo.rows; blockDataEnsureCapacity(pBlock, binfo.rows); pBlock->info = binfo; - ASSERT(binfo.uid != 0); - uint64_t* groupId = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); - if (groupId) { - pBlock->info.groupId = *groupId; - } + ASSERT(binfo.uid != 0); + pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid); + ASSERT(pBlock->info.groupId != 0); +// uint64_t* groupId = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); +// if (groupId) { +// pBlock->info.groupId = *groupId; +// } uint32_t status = 0; int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status); @@ -765,6 +767,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { SSDataBlock* result = doTableScanGroup(pOperator); if (result != NULL) { + ASSERT(result->info.uid != 0); return result; } @@ -4189,6 +4192,130 @@ int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* return TSDB_CODE_SUCCESS; } +int32_t createMultipleDataReaders2(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, + STableListInfo* pTableListInfo, int32_t tableStartIdx, int32_t tableEndIdx, + STsdbReader** ppReader, const char* idstr) { + STsdbReader* pReader = NULL; + void* pStart = taosArrayGet(pTableListInfo->pTableList, tableStartIdx); + int32_t num = tableEndIdx - tableStartIdx + 1; + + int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, pStart, num, &pReader, idstr); + if (code != 0) { + return code; + } + + *ppReader = pReader; + return TSDB_CODE_SUCCESS; +} + +static int32_t loadDataBlockFromOneTable2(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, + SSDataBlock* pBlock, uint32_t* status) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + STableMergeScanInfo* pInfo = pOperator->info; + + uint64_t uid = pBlock->info.uid; + + SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder; + + pCost->totalBlocks += 1; + pCost->totalRows += pBlock->info.rows; + + *status = pInfo->dataBlockLoadFlag; + if (pTableScanInfo->pFilterNode != NULL || + overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) { + (*status) = FUNC_DATA_REQUIRED_DATA_LOAD; + } + + SDataBlockInfo* pBlockInfo = &pBlock->info; + taosMemoryFreeClear(pBlock->pBlockAgg); + + if (*status == FUNC_DATA_REQUIRED_FILTEROUT) { + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + pCost->filterOutBlocks += 1; + return TSDB_CODE_SUCCESS; + } else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) { + qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo), + pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows); + pCost->skipBlocks += 1; + + // clear all data in pBlock that are set when handing the previous block + for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) { + SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i); + pcol->pData = NULL; + } + + return TSDB_CODE_SUCCESS; + } else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) { + pCost->loadBlockStatis += 1; + + bool allColumnsHaveAgg = true; + SColumnDataAgg** pColAgg = NULL; + STsdbReader* reader = pTableScanInfo->pReader; + tsdbRetrieveDatablockSMA(reader, &pColAgg, &allColumnsHaveAgg); + + if (allColumnsHaveAgg == true) { + int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + + // todo create this buffer during creating operator + if (pBlock->pBlockAgg == NULL) { + pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES); + } + + for (int32_t i = 0; i < numOfCols; ++i) { + SColMatchItem* pColMatchInfo = taosArrayGet(pTableScanInfo->matchInfo.pList, i); + if (!pColMatchInfo->needOutput) { + continue; + } + pBlock->pBlockAgg[pColMatchInfo->dstSlotId] = pColAgg[i]; + } + + return TSDB_CODE_SUCCESS; + } else { // failed to load the block sma data, data block statistics does not exist, load data block instead + *status = FUNC_DATA_REQUIRED_DATA_LOAD; + } + } + + ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD); + + pCost->totalCheckedRows += pBlock->info.rows; + pCost->loadBlocks += 1; + + STsdbReader* reader = pTableScanInfo->pReader; + SArray* pCols = tsdbRetrieveDataBlock(reader, NULL); + if (pCols == NULL) { + return terrno; + } + + relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true); + + // currently only the tbname pseudo column + if (pTableScanInfo->pseudoSup.numOfExprs > 0) { + int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo, + pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo)); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); + } + } + + if (pTableScanInfo->pFilterNode != NULL) { + int64_t st = taosGetTimestampMs(); + doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, NULL); + + double el = (taosGetTimestampUs() - st) / 1000.0; + pTableScanInfo->readRecorder.filterTime += el; + + if (pBlock->info.rows == 0) { + pCost->filterOutBlocks += 1; + qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms", + GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el); + } else { + qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el); + } + } + return TSDB_CODE_SUCCESS; +} + // todo refactor static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo, int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) { @@ -4314,6 +4441,138 @@ typedef struct STableMergeScanSortSourceParam { SSDataBlock* inputBlock; } STableMergeScanSortSourceParam; +static SSDataBlock* getTableDataBlockTemp(void* param) { + STableMergeScanSortSourceParam* source = param; + SOperatorInfo* pOperator = source->pOperator; + STableMergeScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + int32_t readIdx = source->readerIdx; + SSDataBlock* pBlock = source->inputBlock; + STableMergeScanInfo* pTableScanInfo = pOperator->info; + + SQueryTableDataCond* pQueryCond = taosArrayGet(pTableScanInfo->queryConds, readIdx); + + blockDataCleanup(pBlock); + + int64_t st = taosGetTimestampUs(); + + void* pStart = taosArrayGet(pInfo->tableListInfo->pTableList, readIdx + pInfo->tableStartIndex); + + SReadHandle* pHandle = &pInfo->readHandle; + tsdbReaderOpen(pHandle->vnode, pQueryCond, pStart, 1, &pInfo->pReader, GET_TASKID(pTaskInfo)); + + STsdbReader* reader = pInfo->pReader; + while (tsdbNextDataBlock(reader)) { + if (isTaskKilled(pOperator->pTaskInfo)) { + T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } + + // process this data block based on the probabilities + bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample); + if (!processThisBlock) { + continue; + } + + blockDataCleanup(pBlock); + SDataBlockInfo binfo = pBlock->info; + tsdbRetrieveDataBlockInfo(reader, &binfo); + + blockDataEnsureCapacity(pBlock, binfo.rows); + pBlock->info.type = binfo.type; + pBlock->info.uid = binfo.uid; + pBlock->info.window = binfo.window; + pBlock->info.rows = binfo.rows; + + if (tsdbIsAscendingOrder(pInfo->pReader)) { + pQueryCond->twindows.skey = pBlock->info.window.ekey + 1; + } else { + pQueryCond->twindows.ekey = pBlock->info.window.skey - 1; + } + + uint32_t status = 0; + int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readIdx, pBlock, &status); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pOperator->pTaskInfo->env, code); + } + + // current block is filter out according to filter condition, continue load the next block + if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { + continue; + } + + uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); + if (groupId) { + pBlock->info.groupId = *groupId; + } + + pOperator->resultInfo.totalRows += pBlock->info.rows; // pTableScanInfo->readRecorder.totalRows; + pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; + + tsdbReaderClose(pInfo->pReader); + pInfo->pReader = NULL; + return pBlock; + } + tsdbReaderClose(pInfo->pReader); + pInfo->pReader = NULL; + return NULL; +} +static SSDataBlock* getTableDataBlock2(void* param) { + STableMergeScanSortSourceParam* source = param; + SOperatorInfo* pOperator = source->pOperator; + int64_t uid = source->uid; + SSDataBlock* pBlock = source->inputBlock; + STableMergeScanInfo* pTableScanInfo = pOperator->info; + + int64_t st = taosGetTimestampUs(); + + blockDataCleanup(pBlock); + + STsdbReader* reader = pTableScanInfo->pReader; + while (tsdbTableNextDataBlock(reader, uid)) { + if (isTaskKilled(pOperator->pTaskInfo)) { + T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED); + } + + // process this data block based on the probabilities + bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample); + if (!processThisBlock) { + continue; + } + + blockDataCleanup(pBlock); + SDataBlockInfo binfo = pBlock->info; + tsdbRetrieveDataBlockInfo(reader, &binfo); + + blockDataEnsureCapacity(pBlock, binfo.rows); + pBlock->info.type = binfo.type; + pBlock->info.uid = binfo.uid; + pBlock->info.window = binfo.window; + pBlock->info.rows = binfo.rows; + + uint32_t status = 0; + int32_t code = loadDataBlockFromOneTable2(pOperator, pTableScanInfo, pBlock, &status); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pOperator->pTaskInfo->env, code); + } + + // current block is filter out according to filter condition, continue load the next block + if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) { + continue; + } + + uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t)); + if (groupId) { + pBlock->info.groupId = *groupId; + } + + pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows; + pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0; + + return pBlock; + } + return NULL; +} + static SSDataBlock* getTableDataBlock(void* param) { STableMergeScanSortSourceParam* source = param; SOperatorInfo* pOperator = source->pOperator;