other:merge 3.0
This commit is contained in:
parent
8c3b4db944
commit
9ccf7addb3
|
@ -701,6 +701,8 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
|
||||||
#endif
|
#endif
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pResList); ++i) {
|
||||||
SSDataBlock *output = taosArrayGetP(pResList, i);
|
SSDataBlock *output = taosArrayGetP(pResList, i);
|
||||||
|
smaError("uid:%ld, groupid:%ld", output->info.uid, output->info.groupId);
|
||||||
|
|
||||||
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
|
STsdb *sinkTsdb = (pItem->level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb[0] : pSma->pRSmaTsdb[1]);
|
||||||
SSubmitReq *pReq = NULL;
|
SSubmitReq *pReq = NULL;
|
||||||
|
|
||||||
|
@ -713,13 +715,13 @@ static int32_t tdRSmaExecAndSubmitResult(SSma *pSma, qTaskInfo_t taskInfo, SRSma
|
||||||
|
|
||||||
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
|
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
|
||||||
taosMemoryFreeClear(pReq);
|
taosMemoryFreeClear(pReq);
|
||||||
smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s",
|
smaError("vgId:%d, process submit req for rsma suid:%"PRIu64", uid:%" PRIu64 " level %" PRIi8 " failed since %s",
|
||||||
SMA_VID(pSma), suid, pItem->level, terrstr());
|
SMA_VID(pSma), suid, output->info.groupId, pItem->level, terrstr());
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
smaDebug("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " ver %" PRIi64 " len %" PRIu32,
|
smaDebug("vgId:%d, process submit req for rsma suid:%" PRIu64 ",uid:%"PRIu64", level %" PRIi8 " ver %" PRIi64 " len %" PRIu32,
|
||||||
SMA_VID(pSma), suid, pItem->level, output->info.version, htonl(pReq->header.contLen));
|
SMA_VID(pSma), suid, output->info.groupId, pItem->level, output->info.version, htonl(pReq->header.contLen));
|
||||||
|
|
||||||
taosMemoryFreeClear(pReq);
|
taosMemoryFreeClear(pReq);
|
||||||
}
|
}
|
||||||
|
|
|
@ -371,6 +371,7 @@ typedef struct STableMergeScanInfo {
|
||||||
SQueryTableDataCond cond;
|
SQueryTableDataCond cond;
|
||||||
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
||||||
int32_t dataBlockLoadFlag;
|
int32_t dataBlockLoadFlag;
|
||||||
|
STsdbReader* pReader;
|
||||||
|
|
||||||
// if the upstream is an interval operator, the interval info is also kept here to get the time
|
// if the upstream is an interval operator, the interval info is also kept here to get the time
|
||||||
// window to check if current data block needs to be loaded.
|
// window to check if current data block needs to be loaded.
|
||||||
|
|
|
@ -621,12 +621,14 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||||
binfo.capacity = binfo.rows;
|
binfo.capacity = binfo.rows;
|
||||||
blockDataEnsureCapacity(pBlock, binfo.rows);
|
blockDataEnsureCapacity(pBlock, binfo.rows);
|
||||||
pBlock->info = binfo;
|
pBlock->info = binfo;
|
||||||
ASSERT(binfo.uid != 0);
|
|
||||||
|
|
||||||
uint64_t* groupId = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
ASSERT(binfo.uid != 0);
|
||||||
if (groupId) {
|
pBlock->info.groupId = getTableGroupId(&pTaskInfo->tableqinfoList, pBlock->info.uid);
|
||||||
pBlock->info.groupId = *groupId;
|
ASSERT(pBlock->info.groupId != 0);
|
||||||
}
|
// uint64_t* groupId = taosHashGet(pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
||||||
|
// if (groupId) {
|
||||||
|
// pBlock->info.groupId = *groupId;
|
||||||
|
// }
|
||||||
|
|
||||||
uint32_t status = 0;
|
uint32_t status = 0;
|
||||||
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
|
int32_t code = loadDataBlock(pOperator, pTableScanInfo, pBlock, &status);
|
||||||
|
@ -765,6 +767,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
SSDataBlock* result = doTableScanGroup(pOperator);
|
SSDataBlock* result = doTableScanGroup(pOperator);
|
||||||
if (result != NULL) {
|
if (result != NULL) {
|
||||||
|
ASSERT(result->info.uid != 0);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4189,6 +4192,130 @@ int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle*
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t createMultipleDataReaders2(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle,
|
||||||
|
STableListInfo* pTableListInfo, int32_t tableStartIdx, int32_t tableEndIdx,
|
||||||
|
STsdbReader** ppReader, const char* idstr) {
|
||||||
|
STsdbReader* pReader = NULL;
|
||||||
|
void* pStart = taosArrayGet(pTableListInfo->pTableList, tableStartIdx);
|
||||||
|
int32_t num = tableEndIdx - tableStartIdx + 1;
|
||||||
|
|
||||||
|
int32_t code = tsdbReaderOpen(pHandle->vnode, pQueryCond, pStart, num, &pReader, idstr);
|
||||||
|
if (code != 0) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
*ppReader = pReader;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t loadDataBlockFromOneTable2(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
|
||||||
|
SSDataBlock* pBlock, uint32_t* status) {
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
|
uint64_t uid = pBlock->info.uid;
|
||||||
|
|
||||||
|
SFileBlockLoadRecorder* pCost = &pTableScanInfo->readRecorder;
|
||||||
|
|
||||||
|
pCost->totalBlocks += 1;
|
||||||
|
pCost->totalRows += pBlock->info.rows;
|
||||||
|
|
||||||
|
*status = pInfo->dataBlockLoadFlag;
|
||||||
|
if (pTableScanInfo->pFilterNode != NULL ||
|
||||||
|
overlapWithTimeWindow(&pTableScanInfo->interval, &pBlock->info, pTableScanInfo->cond.order)) {
|
||||||
|
(*status) = FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDataBlockInfo* pBlockInfo = &pBlock->info;
|
||||||
|
taosMemoryFreeClear(pBlock->pBlockAgg);
|
||||||
|
|
||||||
|
if (*status == FUNC_DATA_REQUIRED_FILTEROUT) {
|
||||||
|
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
||||||
|
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||||
|
pCost->filterOutBlocks += 1;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else if (*status == FUNC_DATA_REQUIRED_NOT_LOAD) {
|
||||||
|
qDebug("%s data block skipped, brange:%" PRId64 "-%" PRId64 ", rows:%d", GET_TASKID(pTaskInfo),
|
||||||
|
pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows);
|
||||||
|
pCost->skipBlocks += 1;
|
||||||
|
|
||||||
|
// clear all data in pBlock that are set when handing the previous block
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); ++i) {
|
||||||
|
SColumnInfoData* pcol = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
pcol->pData = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else if (*status == FUNC_DATA_REQUIRED_STATIS_LOAD) {
|
||||||
|
pCost->loadBlockStatis += 1;
|
||||||
|
|
||||||
|
bool allColumnsHaveAgg = true;
|
||||||
|
SColumnDataAgg** pColAgg = NULL;
|
||||||
|
STsdbReader* reader = pTableScanInfo->pReader;
|
||||||
|
tsdbRetrieveDatablockSMA(reader, &pColAgg, &allColumnsHaveAgg);
|
||||||
|
|
||||||
|
if (allColumnsHaveAgg == true) {
|
||||||
|
int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||||
|
|
||||||
|
// todo create this buffer during creating operator
|
||||||
|
if (pBlock->pBlockAgg == NULL) {
|
||||||
|
pBlock->pBlockAgg = taosMemoryCalloc(numOfCols, POINTER_BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColMatchItem* pColMatchInfo = taosArrayGet(pTableScanInfo->matchInfo.pList, i);
|
||||||
|
if (!pColMatchInfo->needOutput) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
pBlock->pBlockAgg[pColMatchInfo->dstSlotId] = pColAgg[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else { // failed to load the block sma data, data block statistics does not exist, load data block instead
|
||||||
|
*status = FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(*status == FUNC_DATA_REQUIRED_DATA_LOAD);
|
||||||
|
|
||||||
|
pCost->totalCheckedRows += pBlock->info.rows;
|
||||||
|
pCost->loadBlocks += 1;
|
||||||
|
|
||||||
|
STsdbReader* reader = pTableScanInfo->pReader;
|
||||||
|
SArray* pCols = tsdbRetrieveDataBlock(reader, NULL);
|
||||||
|
if (pCols == NULL) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
relocateColumnData(pBlock, pTableScanInfo->matchInfo.pList, pCols, true);
|
||||||
|
|
||||||
|
// currently only the tbname pseudo column
|
||||||
|
if (pTableScanInfo->pseudoSup.numOfExprs > 0) {
|
||||||
|
int32_t code = addTagPseudoColumnData(&pTableScanInfo->readHandle, pTableScanInfo->pseudoSup.pExprInfo,
|
||||||
|
pTableScanInfo->pseudoSup.numOfExprs, pBlock, GET_TASKID(pTaskInfo));
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
T_LONG_JMP(pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTableScanInfo->pFilterNode != NULL) {
|
||||||
|
int64_t st = taosGetTimestampMs();
|
||||||
|
doFilter(pTableScanInfo->pFilterNode, pBlock, &pTableScanInfo->matchInfo, NULL);
|
||||||
|
|
||||||
|
double el = (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
pTableScanInfo->readRecorder.filterTime += el;
|
||||||
|
|
||||||
|
if (pBlock->info.rows == 0) {
|
||||||
|
pCost->filterOutBlocks += 1;
|
||||||
|
qDebug("%s data block filter out, brange:%" PRId64 "-%" PRId64 ", rows:%d, elapsed time:%.2f ms",
|
||||||
|
GET_TASKID(pTaskInfo), pBlockInfo->window.skey, pBlockInfo->window.ekey, pBlockInfo->rows, el);
|
||||||
|
} else {
|
||||||
|
qDebug("%s data block filter applied, elapsed time:%.2f ms", GET_TASKID(pTaskInfo), el);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
// todo refactor
|
// todo refactor
|
||||||
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
|
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
|
||||||
int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
|
int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
|
||||||
|
@ -4314,6 +4441,138 @@ typedef struct STableMergeScanSortSourceParam {
|
||||||
SSDataBlock* inputBlock;
|
SSDataBlock* inputBlock;
|
||||||
} STableMergeScanSortSourceParam;
|
} STableMergeScanSortSourceParam;
|
||||||
|
|
||||||
|
static SSDataBlock* getTableDataBlockTemp(void* param) {
|
||||||
|
STableMergeScanSortSourceParam* source = param;
|
||||||
|
SOperatorInfo* pOperator = source->pOperator;
|
||||||
|
STableMergeScanInfo* pInfo = pOperator->info;
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
int32_t readIdx = source->readerIdx;
|
||||||
|
SSDataBlock* pBlock = source->inputBlock;
|
||||||
|
STableMergeScanInfo* pTableScanInfo = pOperator->info;
|
||||||
|
|
||||||
|
SQueryTableDataCond* pQueryCond = taosArrayGet(pTableScanInfo->queryConds, readIdx);
|
||||||
|
|
||||||
|
blockDataCleanup(pBlock);
|
||||||
|
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
|
void* pStart = taosArrayGet(pInfo->tableListInfo->pTableList, readIdx + pInfo->tableStartIndex);
|
||||||
|
|
||||||
|
SReadHandle* pHandle = &pInfo->readHandle;
|
||||||
|
tsdbReaderOpen(pHandle->vnode, pQueryCond, pStart, 1, &pInfo->pReader, GET_TASKID(pTaskInfo));
|
||||||
|
|
||||||
|
STsdbReader* reader = pInfo->pReader;
|
||||||
|
while (tsdbNextDataBlock(reader)) {
|
||||||
|
if (isTaskKilled(pOperator->pTaskInfo)) {
|
||||||
|
T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
|
}
|
||||||
|
|
||||||
|
// process this data block based on the probabilities
|
||||||
|
bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
|
||||||
|
if (!processThisBlock) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDataCleanup(pBlock);
|
||||||
|
SDataBlockInfo binfo = pBlock->info;
|
||||||
|
tsdbRetrieveDataBlockInfo(reader, &binfo);
|
||||||
|
|
||||||
|
blockDataEnsureCapacity(pBlock, binfo.rows);
|
||||||
|
pBlock->info.type = binfo.type;
|
||||||
|
pBlock->info.uid = binfo.uid;
|
||||||
|
pBlock->info.window = binfo.window;
|
||||||
|
pBlock->info.rows = binfo.rows;
|
||||||
|
|
||||||
|
if (tsdbIsAscendingOrder(pInfo->pReader)) {
|
||||||
|
pQueryCond->twindows.skey = pBlock->info.window.ekey + 1;
|
||||||
|
} else {
|
||||||
|
pQueryCond->twindows.ekey = pBlock->info.window.skey - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint32_t status = 0;
|
||||||
|
int32_t code = loadDataBlockFromOneTable(pOperator, pTableScanInfo, readIdx, pBlock, &status);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
T_LONG_JMP(pOperator->pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
// current block is filter out according to filter condition, continue load the next block
|
||||||
|
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
||||||
|
if (groupId) {
|
||||||
|
pBlock->info.groupId = *groupId;
|
||||||
|
}
|
||||||
|
|
||||||
|
pOperator->resultInfo.totalRows += pBlock->info.rows; // pTableScanInfo->readRecorder.totalRows;
|
||||||
|
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
|
||||||
|
tsdbReaderClose(pInfo->pReader);
|
||||||
|
pInfo->pReader = NULL;
|
||||||
|
return pBlock;
|
||||||
|
}
|
||||||
|
tsdbReaderClose(pInfo->pReader);
|
||||||
|
pInfo->pReader = NULL;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
static SSDataBlock* getTableDataBlock2(void* param) {
|
||||||
|
STableMergeScanSortSourceParam* source = param;
|
||||||
|
SOperatorInfo* pOperator = source->pOperator;
|
||||||
|
int64_t uid = source->uid;
|
||||||
|
SSDataBlock* pBlock = source->inputBlock;
|
||||||
|
STableMergeScanInfo* pTableScanInfo = pOperator->info;
|
||||||
|
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
|
|
||||||
|
blockDataCleanup(pBlock);
|
||||||
|
|
||||||
|
STsdbReader* reader = pTableScanInfo->pReader;
|
||||||
|
while (tsdbTableNextDataBlock(reader, uid)) {
|
||||||
|
if (isTaskKilled(pOperator->pTaskInfo)) {
|
||||||
|
T_LONG_JMP(pOperator->pTaskInfo->env, TSDB_CODE_TSC_QUERY_CANCELLED);
|
||||||
|
}
|
||||||
|
|
||||||
|
// process this data block based on the probabilities
|
||||||
|
bool processThisBlock = processBlockWithProbability(&pTableScanInfo->sample);
|
||||||
|
if (!processThisBlock) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDataCleanup(pBlock);
|
||||||
|
SDataBlockInfo binfo = pBlock->info;
|
||||||
|
tsdbRetrieveDataBlockInfo(reader, &binfo);
|
||||||
|
|
||||||
|
blockDataEnsureCapacity(pBlock, binfo.rows);
|
||||||
|
pBlock->info.type = binfo.type;
|
||||||
|
pBlock->info.uid = binfo.uid;
|
||||||
|
pBlock->info.window = binfo.window;
|
||||||
|
pBlock->info.rows = binfo.rows;
|
||||||
|
|
||||||
|
uint32_t status = 0;
|
||||||
|
int32_t code = loadDataBlockFromOneTable2(pOperator, pTableScanInfo, pBlock, &status);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
T_LONG_JMP(pOperator->pTaskInfo->env, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
// current block is filter out according to filter condition, continue load the next block
|
||||||
|
if (status == FUNC_DATA_REQUIRED_FILTEROUT || pBlock->info.rows == 0) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t* groupId = taosHashGet(pOperator->pTaskInfo->tableqinfoList.map, &pBlock->info.uid, sizeof(int64_t));
|
||||||
|
if (groupId) {
|
||||||
|
pBlock->info.groupId = *groupId;
|
||||||
|
}
|
||||||
|
|
||||||
|
pOperator->resultInfo.totalRows = pTableScanInfo->readRecorder.totalRows;
|
||||||
|
pTableScanInfo->readRecorder.elapsedTime += (taosGetTimestampUs() - st) / 1000.0;
|
||||||
|
|
||||||
|
return pBlock;
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
static SSDataBlock* getTableDataBlock(void* param) {
|
static SSDataBlock* getTableDataBlock(void* param) {
|
||||||
STableMergeScanSortSourceParam* source = param;
|
STableMergeScanSortSourceParam* source = param;
|
||||||
SOperatorInfo* pOperator = source->pOperator;
|
SOperatorInfo* pOperator = source->pOperator;
|
||||||
|
|
Loading…
Reference in New Issue