other:merge 3.0
This commit is contained in:
parent
5b7855eb87
commit
d1097b7c2d
|
@ -340,23 +340,23 @@ typedef struct STableScanInfo {
|
||||||
} STableScanInfo;
|
} STableScanInfo;
|
||||||
|
|
||||||
typedef struct STableMergeScanInfo {
|
typedef struct STableMergeScanInfo {
|
||||||
STableListInfo* tableListInfo;
|
STableListInfo* tableListInfo;
|
||||||
int32_t tableStartIndex;
|
int32_t tableStartIndex;
|
||||||
int32_t tableEndIndex;
|
int32_t tableEndIndex;
|
||||||
bool hasGroupId;
|
bool hasGroupId;
|
||||||
uint64_t groupId;
|
uint64_t groupId;
|
||||||
SArray* dataReaders; // array of tsdbReaderT*
|
SArray* dataReaders; // array of tsdbReaderT*
|
||||||
SArray* queryConds; // array of queryTableDataCond
|
SReadHandle readHandle;
|
||||||
STsdbReader* pReader;
|
int32_t bufPageSize;
|
||||||
SReadHandle readHandle;
|
uint32_t sortBufSize; // max buffer size for in-memory sort
|
||||||
int32_t bufPageSize;
|
SArray* pSortInfo;
|
||||||
uint32_t sortBufSize; // max buffer size for in-memory sort
|
SSortHandle* pSortHandle;
|
||||||
SArray* pSortInfo;
|
SSDataBlock* pSortInputBlock;
|
||||||
SSortHandle* pSortHandle;
|
int64_t startTs; // sort start time
|
||||||
SSDataBlock* pSortInputBlock;
|
SArray* sortSourceParams;
|
||||||
int64_t startTs; // sort start time
|
SLimitInfo limitInfo;
|
||||||
SArray* sortSourceParams;
|
|
||||||
SLimitInfo limitInfo;
|
|
||||||
SFileBlockLoadRecorder readRecorder;
|
SFileBlockLoadRecorder readRecorder;
|
||||||
int64_t numOfRows;
|
int64_t numOfRows;
|
||||||
SScanInfo scanInfo;
|
SScanInfo scanInfo;
|
||||||
|
|
|
@ -4204,114 +4204,6 @@ int32_t createMultipleDataReaders2(SQueryTableDataCond* pQueryCond, SReadHandle*
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t loadDataBlockFromOneTable2(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
|
|
||||||
SSDataBlock* pBlock, uint32_t* status) {
|
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
|
||||||
STableMergeScanInfo* pInfo = pOperator->info;
|
|
||||||
|
|
||||||
uint64_t uid = pBlock->info.uid;
|
|
||||||
|
|
||||||
SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
|
|
||||||
|
|
||||||
pCost->totalBlocks += 1;
|
|
||||||
pCost->totalRows += pBlock->info.rows;
|
|
||||||
|
|
||||||
*status = pInfo->dataBlockLoadFlag;
|
|
||||||
if (pTableScanInfo->pFilterNode != NULL ||
|
|
||||||
overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) {
|
|
||||||
(*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
|
|
||||||
}
|
|
||||||
|
|
||||||
SDataBlockInfo* pBlockInfo = &pBlock->info;
|
|
||||||
taosMemoryFreeClear(pBlock->pBlockAgg);
|
|
||||||
|
|
||||||
if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
|
|
||||||
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
|
||||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
|
||||||
pCost->filterOutBlocks += 1;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
|
|
||||||
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
|
||||||
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
|
||||||
pCost->skipBlocks += 1;
|
|
||||||
|
|
||||||
// clear all data in pBlock that are set when handing the previous block
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
|
|
||||||
SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i);
|
|
||||||
pcol->pData = NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
|
||||||
pCost->loadBlockStatis += 1;
|
|
||||||
|
|
||||||
bool allColumnsHaveAgg = true;
|
|
||||||
SColumnDataAgg** pColAgg = NULL;
|
|
||||||
STsdbReader* reader = pTableScanInfo->pReader;
|
|
||||||
tsdbRetrieveDatablockSMA(reader, &pColAgg, &allColumnsHaveAgg);
|
|
||||||
|
|
||||||
if (allColumnsHaveAgg == true) {
|
|
||||||
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
|
||||||
|
|
||||||
// todo create this buffer during creating operator
|
|
||||||
if (pBlock->pBlockAgg == NULL) {
|
|
||||||
pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
|
||||||
SColMatchItem* pColMatchInfo = taosArrayGet(pTableScanInfo->matchInfo.pList, i);
|
|
||||||
if (!pColMatchInfo->needOutput) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
pBlock->pBlockAgg[pColMatchInfo->dstSlotId] = pColAgg[i];
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
} else { // failed to load the block sma data, data block statistics does not exist, load data block instead
|
|
||||||
*status = FUNC_DATA_REQUIRED_DATA_LOAD;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);
|
|
||||||
|
|
||||||
pCost->totalCheckedRows += pBlock->info.rows;
|
|
||||||
pCost->loadBlocks += 1;
|
|
||||||
|
|
||||||
STsdbReader* reader = pTableScanInfo->pReader;
|
|
||||||
SArray* pCols = tsdbRetrieveDataBlock(reader, NULL);
|
|
||||||
if (pCols == NULL) {
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
|
|
||||||
|
|
||||||
// currently only the tbname pseudo column
|
|
||||||
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
|
||||||
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
|
|
||||||
pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo));
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
T_LONG_JMP(pTaskInfo->env, code);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTableScanInfo->pFilterNode != NULL) {
|
|
||||||
int64_t st = taosGetTimestampMs();
|
|
||||||
doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, NULL);
|
|
||||||
|
|
||||||
double el = (taosGetTimestampUs() - st) / 1000.0;
|
|
||||||
pTableScanInfo->readRecorder.filterTime += el;
|
|
||||||
|
|
||||||
if (pBlock->info.rows == 0) {
|
|
||||||
pCost->filterOutBlocks += 1;
|
|
||||||
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
|
|
||||||
GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
|
|
||||||
} else {
|
|
||||||
qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
// todo refactor
|
// todo refactor
|
||||||
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
|
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
|
||||||
int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
|
int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
|
||||||
|
|
Loading…
Reference in New Issue