fix: group table scan issue
This commit is contained in:
parent
1364a2416b
commit
8872e017af
|
@ -823,6 +823,15 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) {
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
|
||||
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
|
||||
setOperatorCompleted(pOperator);
|
||||
if (pOperator->dynamicTask) {
|
||||
taosArrayClear(pInfo->base.pTableListInfo->pTableList);
|
||||
taosHashClear(pInfo->base.pTableListInfo->map);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// reset value for the next group data output
|
||||
pOperator->status = OP_OPENED;
|
||||
resetLimitInfoForNextGroup(&pInfo->base.limitInfo);
|
||||
|
@ -844,10 +853,55 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) {
|
|||
return result;
|
||||
}
|
||||
|
||||
setOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
|
||||
STableScanInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||
|
||||
if (pInfo->currentGroupId == -1) {
|
||||
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
|
||||
setOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t num = 0;
|
||||
STableKeyInfo* pList = NULL;
|
||||
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
|
||||
ASSERT(pInfo->base.dataReader == NULL);
|
||||
|
||||
int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
|
||||
(void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly, &pInfo->pIgnoreTables);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
|
||||
pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
|
||||
}
|
||||
}
|
||||
|
||||
SSDataBlock* result = doGroupedTableScan(pOperator);
|
||||
if (result != NULL) {
|
||||
if (pInfo->base.pTableListInfo->oneTableForEachGroup) {
|
||||
STableKeyInfo* pKeyInfo = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentGroupId);
|
||||
result->info.id.groupId = pKeyInfo->uid;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
while (true) {
|
||||
result = startNextGroupScan(pOperator);
|
||||
if (result || pOperator->status == OP_EXEC_DONE) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||
STableScanInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
@ -862,8 +916,15 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
pInfo->currentGroupId = 0;
|
||||
return startNextGroupScan(pOperator);
|
||||
pInfo->currentGroupId = -1;
|
||||
pOperator->status = OP_OPENED;
|
||||
SSDataBlock* result = NULL;
|
||||
while (true) {
|
||||
result = startNextGroupScan(pOperator);
|
||||
if (result || pOperator->status == OP_EXEC_DONE) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -901,47 +962,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
pInfo->scanTimes = 0;
|
||||
}
|
||||
} else { // scan table group by group sequentially
|
||||
if (pInfo->currentGroupId == -1) {
|
||||
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
|
||||
setOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t num = 0;
|
||||
STableKeyInfo* pList = NULL;
|
||||
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
|
||||
ASSERT(pInfo->base.dataReader == NULL);
|
||||
|
||||
int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
|
||||
(void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), pInfo->countOnly, &pInfo->pIgnoreTables);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
T_LONG_JMP(pTaskInfo->env, code);
|
||||
}
|
||||
|
||||
if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
|
||||
pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
|
||||
}
|
||||
}
|
||||
|
||||
SSDataBlock* result = doGroupedTableScan(pOperator);
|
||||
if (result != NULL) {
|
||||
if (pInfo->base.pTableListInfo->oneTableForEachGroup) {
|
||||
STableKeyInfo* pKeyInfo = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentGroupId);
|
||||
result->info.id.groupId = pKeyInfo->uid;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) {
|
||||
setOperatorCompleted(pOperator);
|
||||
if (pOperator->dynamicTask) {
|
||||
taosArrayClear(pInfo->base.pTableListInfo->pTableList);
|
||||
taosHashClear(pInfo->base.pTableListInfo->map);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
return startNextGroupScan(pOperator);
|
||||
return groupSeqTableScan(pOperator);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue