|
|
|
@ -1965,7 +1965,10 @@ _error:
|
|
|
|
|
|
|
|
|
|
typedef struct STableMergeScanInfo {
|
|
|
|
|
STableListInfo* tableListInfo;
|
|
|
|
|
int32_t currentGroupId;
|
|
|
|
|
int32_t tableStartIndex;
|
|
|
|
|
int32_t tableEndIndex;
|
|
|
|
|
bool hasGroupId;
|
|
|
|
|
uint64_t groupId;
|
|
|
|
|
|
|
|
|
|
SArray* dataReaders; // array of tsdbReaderT*
|
|
|
|
|
SReadHandle readHandle;
|
|
|
|
@ -2006,7 +2009,7 @@ typedef struct STableMergeScanInfo {
|
|
|
|
|
int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan
|
|
|
|
|
int32_t dataBlockLoadFlag;
|
|
|
|
|
SInterval interval; // if the upstream is an interval operator, the interval info is also kept here to get the time
|
|
|
|
|
// window to check if current data block needs to be loaded.
|
|
|
|
|
// window to check if current data block needs to be loaded.
|
|
|
|
|
|
|
|
|
|
SSampleExecInfo sample; // sample execution info
|
|
|
|
|
} STableMergeScanInfo;
|
|
|
|
@ -2030,6 +2033,22 @@ int32_t createScanTableListInfo(STableScanPhysiNode* pTableScanNode, SReadHandle
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, STableListInfo* pTableListInfo,
|
|
|
|
|
int32_t tableStartIdx, int32_t tableEndIdx, SArray* arrayReader, uint64_t queryId,
|
|
|
|
|
uint64_t taskId) {
|
|
|
|
|
for (int32_t i = tableStartIdx; i <= tableEndIdx; ++i) {
|
|
|
|
|
SArray* subTableList = taosArrayInit(1, sizeof(STableKeyInfo));
|
|
|
|
|
taosArrayPush(subTableList, taosArrayGet(pTableListInfo->pTableList, i));
|
|
|
|
|
|
|
|
|
|
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, pQueryCond, subTableList, queryId, taskId);
|
|
|
|
|
taosArrayPush(arrayReader, &pReader);
|
|
|
|
|
|
|
|
|
|
taosArrayDestroy(subTableList);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// todo refactor
|
|
|
|
|
static int32_t loadDataBlockFromOneTable(SOperatorInfo* pOperator, STableMergeScanInfo* pTableScanInfo,
|
|
|
|
|
int32_t readerIdx, SSDataBlock* pBlock, uint32_t* status) {
|
|
|
|
@ -2216,34 +2235,32 @@ SArray* generateSortByTsInfo(int32_t order) {
|
|
|
|
|
return pList;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t createMultipleDataReaders(SQueryTableDataCond* pQueryCond, SReadHandle* pHandle, SArray* tableList, SArray* arrayReader, uint64_t queryId,
|
|
|
|
|
uint64_t taskId) {
|
|
|
|
|
for (int32_t i = 0; i < taosArrayGetSize(tableList); ++i) {
|
|
|
|
|
SArray* tmp = taosArrayInit(1, sizeof(STableKeyInfo));
|
|
|
|
|
taosArrayPush(tmp, taosArrayGet(tableList, i));
|
|
|
|
|
|
|
|
|
|
tsdbReaderT* pReader = tsdbReaderOpen(pHandle->vnode, pQueryCond, tmp, queryId, taskId);
|
|
|
|
|
taosArrayPush(arrayReader, &pReader);
|
|
|
|
|
|
|
|
|
|
taosArrayDestroy(tmp);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
|
|
|
|
|
STableMergeScanInfo* pInfo = pOperator->info;
|
|
|
|
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
|
|
|
|
|
|
|
|
|
SArray* tableList = taosArrayGetP(pInfo->tableListInfo->pGroupList, pInfo->currentGroupId);
|
|
|
|
|
{
|
|
|
|
|
size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
|
|
|
|
|
int32_t i = pInfo->tableStartIndex + 1;
|
|
|
|
|
for (; i < tableListSize; ++i) {
|
|
|
|
|
STableKeyInfo* tableKeyInfo = taosArrayGet(pInfo->tableListInfo->pTableList, i);
|
|
|
|
|
if (tableKeyInfo->groupId != pInfo->groupId) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
pInfo->tableEndIndex = i - 1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableList,
|
|
|
|
|
int32_t tableStartIdx = pInfo->tableStartIndex;
|
|
|
|
|
int32_t tableEndIdx = pInfo->tableEndIndex;
|
|
|
|
|
|
|
|
|
|
STableListInfo* tableListInfo = pInfo->tableListInfo;
|
|
|
|
|
createMultipleDataReaders(&pInfo->cond, &pInfo->readHandle, tableListInfo, tableStartIdx, tableEndIdx,
|
|
|
|
|
pInfo->dataReaders, pInfo->queryId, pInfo->taskId);
|
|
|
|
|
|
|
|
|
|
// todo the total available buffer should be determined by total capacity of buffer of this task.
|
|
|
|
|
// the additional one is reserved for merge result
|
|
|
|
|
int32_t tableLen = taosArrayGetSize(tableList);
|
|
|
|
|
pInfo->sortBufSize = pInfo->bufPageSize * ((tableLen==0?1:tableLen) + 1);
|
|
|
|
|
pInfo->sortBufSize = pInfo->bufPageSize * (tableEndIdx - tableStartIdx + 1 + 1);
|
|
|
|
|
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
|
|
|
|
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
|
|
|
|
|
pInfo->pSortInputBlock, pTaskInfo->id.str);
|
|
|
|
@ -2330,43 +2347,38 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS) {
|
|
|
|
|
longjmp(pTaskInfo->env, code);
|
|
|
|
|
}
|
|
|
|
|
size_t tableListSize = taosArrayGetSize(pInfo->tableListInfo->pTableList);
|
|
|
|
|
if (!pInfo->hasGroupId) {
|
|
|
|
|
pInfo->hasGroupId = true;
|
|
|
|
|
|
|
|
|
|
if (pInfo->currentGroupId == -1) {
|
|
|
|
|
pInfo->currentGroupId++;
|
|
|
|
|
if (pInfo->currentGroupId >= taosArrayGetSize(pInfo->tableListInfo->pGroupList)) {
|
|
|
|
|
if (tableListSize == 0) {
|
|
|
|
|
doSetOperatorCompleted(pOperator);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
pInfo->tableStartIndex = 0;
|
|
|
|
|
pInfo->groupId = ((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId;
|
|
|
|
|
startGroupTableMergeScan(pOperator);
|
|
|
|
|
}
|
|
|
|
|
SSDataBlock* pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
|
|
|
|
|
if (pBlock != NULL) {
|
|
|
|
|
uint64_t* groupId = taosHashGet(pInfo->tableListInfo->map, &(pBlock->info.uid), sizeof(uint64_t));
|
|
|
|
|
if(groupId) pBlock->info.groupId = *groupId;
|
|
|
|
|
|
|
|
|
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
|
|
|
|
return pBlock;
|
|
|
|
|
SSDataBlock* pBlock = NULL;
|
|
|
|
|
while (pInfo->tableStartIndex < tableListSize) {
|
|
|
|
|
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
|
|
|
|
|
if (pBlock != NULL) {
|
|
|
|
|
pBlock->info.groupId = pInfo->groupId;
|
|
|
|
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
|
|
|
|
return pBlock;
|
|
|
|
|
} else {
|
|
|
|
|
stopGroupTableMergeScan(pOperator);
|
|
|
|
|
if (pInfo->tableEndIndex >= tableListSize - 1) {
|
|
|
|
|
doSetOperatorCompleted(pOperator);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
|
|
|
|
|
pInfo->groupId =
|
|
|
|
|
((STableKeyInfo*)taosArrayGet(pInfo->tableListInfo->pTableList, pInfo->tableStartIndex))->groupId;
|
|
|
|
|
startGroupTableMergeScan(pOperator);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
stopGroupTableMergeScan(pOperator);
|
|
|
|
|
pInfo->currentGroupId++;
|
|
|
|
|
if (pInfo->currentGroupId >= taosArrayGetSize(pInfo->tableListInfo->pGroupList)) {
|
|
|
|
|
doSetOperatorCompleted(pOperator);
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
startGroupTableMergeScan(pOperator);
|
|
|
|
|
|
|
|
|
|
pBlock = getSortedTableMergeScanBlockData(pInfo->pSortHandle, pOperator->resultInfo.capacity, pOperator);
|
|
|
|
|
if (pBlock != NULL) {
|
|
|
|
|
uint64_t* groupId = taosHashGet(pInfo->tableListInfo->map, &(pBlock->info.uid), sizeof(uint64_t));
|
|
|
|
|
if(groupId) pBlock->info.groupId = *groupId;
|
|
|
|
|
|
|
|
|
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
|
|
|
|
return pBlock;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
doSetOperatorCompleted(pOperator);
|
|
|
|
|
|
|
|
|
|
return pBlock;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -2403,6 +2415,12 @@ int32_t getTableMergeScanExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExpla
|
|
|
|
|
return TSDB_CODE_SUCCESS;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t compareTableKeyInfoByGid(const void* p1, const void* p2) {
|
|
|
|
|
const STableKeyInfo* info1 = p1;
|
|
|
|
|
const STableKeyInfo* info2 = p2;
|
|
|
|
|
return info1->groupId - info2->groupId;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, STableListInfo* pTableListInfo,
|
|
|
|
|
SReadHandle* readHandle, SExecTaskInfo* pTaskInfo, uint64_t queryId,
|
|
|
|
|
uint64_t taskId) {
|
|
|
|
@ -2411,6 +2429,9 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|
|
|
|
if (pInfo == NULL || pOperator == NULL) {
|
|
|
|
|
goto _error;
|
|
|
|
|
}
|
|
|
|
|
if (pTableScanNode->pPartitionTags) {
|
|
|
|
|
taosArraySort(pTableListInfo->pTableList, compareTableKeyInfoByGid);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SDataBlockDescNode* pDescNode = pTableScanNode->scan.node.pOutputDataBlockDesc;
|
|
|
|
|
|
|
|
|
@ -2443,7 +2464,6 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
|
|
|
|
|
pInfo->dataReaders = taosArrayInit(64, POINTER_BYTES);
|
|
|
|
|
pInfo->queryId = queryId;
|
|
|
|
|
pInfo->taskId = taskId;
|
|
|
|
|
pInfo->currentGroupId = -1;
|
|
|
|
|
|
|
|
|
|
pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam));
|
|
|
|
|
|
|
|
|
|