This commit is contained in:
Bob Liu 2023-12-29 01:40:17 +08:00
parent 9f8f69a791
commit e2b61a55fc
3 changed files with 52 additions and 95 deletions

View File

@ -268,14 +268,12 @@ typedef struct STableScanInfo {
int32_t scanTimes; int32_t scanTimes;
SSDataBlock* pResBlock; SSDataBlock* pResBlock;
SHashObj* pIgnoreTables; SHashObj* pIgnoreTables;
SHashObj* pRemainTables; // remain table to process
SSampleExecInfo sample; // sample execution info SSampleExecInfo sample; // sample execution info
int32_t currentGroupId; int32_t tableStartIndex; // current group scan start
int32_t currentTable; int32_t tableEndIndex; // current group scan end
int8_t scanMode; int8_t scanMode;
int8_t assignBlockUid; int8_t assignBlockUid;
uint8_t countState; // empty table count state uint8_t countState; // empty table count state
bool isSameGroup; // whether all tables are in the same group this scan
bool hasGroupByTag; bool hasGroupByTag;
bool countOnly; bool countOnly;
bool filesetDelimited; bool filesetDelimited;

View File

@ -1209,7 +1209,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
STableKeyInfo* pTableInfo = tableListGetInfo(pTableListInfo, 0); STableKeyInfo* pTableInfo = tableListGetInfo(pTableListInfo, 0);
uid = pTableInfo->uid; uid = pTableInfo->uid;
ts = INT64_MIN; ts = INT64_MIN;
pScanInfo->currentTable = 0; pScanInfo->tableEndIndex = 0;
} else { } else {
taosRUnLockLatch(&pTaskInfo->lock); taosRUnLockLatch(&pTaskInfo->lock);
qError("no table in table list, %s", id); qError("no table in table list, %s", id);
@ -1223,16 +1223,16 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
pInfo->pTableScanOp->resultInfo.totalRows = 0; pInfo->pTableScanOp->resultInfo.totalRows = 0;
// start from current accessed position // start from current accessed position
// we cannot start from the pScanInfo->currentTable, since the commit offset may cause the rollback of the start // we cannot start from the pScanInfo->tableEndIndex, since the commit offset may cause the rollback of the start
// position, let's find it from the beginning. // position, let's find it from the beginning.
index = tableListFind(pTableListInfo, uid, 0); index = tableListFind(pTableListInfo, uid, 0);
taosRUnLockLatch(&pTaskInfo->lock); taosRUnLockLatch(&pTaskInfo->lock);
if (index >= 0) { if (index >= 0) {
pScanInfo->currentTable = index; pScanInfo->tableEndIndex = index;
} else { } else {
qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid, qError("vgId:%d uid:%" PRIu64 " not found in table list, total:%d, index:%d %s", pTaskInfo->id.vgId, uid,
numOfTables, pScanInfo->currentTable, id); numOfTables, pScanInfo->tableEndIndex, id);
terrno = TSDB_CODE_PAR_INTERNAL_ERROR; terrno = TSDB_CODE_PAR_INTERNAL_ERROR;
return -1; return -1;
} }
@ -1255,12 +1255,12 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT
} }
qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s", qDebug("tsdb reader created with offset(snapshot) uid:%" PRId64 " ts:%" PRId64 " table index:%d, total:%d, %s",
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id); uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->tableEndIndex, numOfTables, id);
} else { } else {
pTaskInfo->storageAPI.tsdReader.tsdSetQueryTableList(pScanBaseInfo->dataReader, &keyInfo, 1); pTaskInfo->storageAPI.tsdReader.tsdSetQueryTableList(pScanBaseInfo->dataReader, &keyInfo, 1);
pTaskInfo->storageAPI.tsdReader.tsdReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond); pTaskInfo->storageAPI.tsdReader.tsdReaderResetStatus(pScanBaseInfo->dataReader, &pScanBaseInfo->cond);
qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s", qDebug("tsdb reader offset seek snapshot to uid:%" PRId64 " ts %" PRId64 " table index:%d numOfTable:%d, %s",
uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->currentTable, numOfTables, id); uid, pScanBaseInfo->cond.twindows.skey, pScanInfo->tableEndIndex, numOfTables, id);
} }
// restore the key value // restore the key value

View File

@ -655,45 +655,29 @@ void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData,
colDataDestroy(&infoData); colDataDestroy(&infoData);
} }
static int32_t initTableCountEnv(STableScanInfo* pTableScanInfo, const STableKeyInfo* pList, int32_t num) {
if (!pTableScanInfo->needCountEmptyTable) {
pTableScanInfo->countState = TABLE_COUNT_STATE_END;
return TSDB_CODE_SUCCESS;
}
pTableScanInfo->isSameGroup = true;
if (NULL == pTableScanInfo->pRemainTables) {
int32_t tableNum = taosArrayGetSize(pTableScanInfo->base.pTableListInfo->pTableList);
pTableScanInfo->pRemainTables =
taosHashInit(tableNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (NULL == pTableScanInfo->pRemainTables) {
pTableScanInfo->countState = TABLE_COUNT_STATE_END;
return TSDB_CODE_OUT_OF_MEMORY;
}
}
uint64_t groupId = pList->groupId;
for (int32_t i = 0; i < num; i++) {
const STableKeyInfo* pInfo = pList + i;
if (pTableScanInfo->isSameGroup && groupId != pInfo->groupId) {
pTableScanInfo->isSameGroup = false;
}
taosHashPut(pTableScanInfo->pRemainTables, &(pInfo->uid), sizeof(pInfo->uid), &(pInfo->groupId), sizeof(pInfo->groupId));
}
pTableScanInfo->countState = TABLE_COUNT_STATE_SCAN;
return TSDB_CODE_SUCCESS;
}
static void markTableProcessed(STableScanInfo* pTableScanInfo, uint64_t uid) { static void initNextGroupScan(STableScanInfo* pInfo, STableKeyInfo** pKeyInfo, int32_t* size) {
// case0 group scanning, mark pInfo->tableStartIndex = pInfo->tableEndIndex + 1;
// case1 stream scan: no need to mark
if (pTableScanInfo->countState > TABLE_COUNT_STATE_SCAN) { int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
return; STableKeyInfo* pStart = (STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableStartIndex);
int32_t i = pInfo->tableStartIndex + 1;
for (; i < numOfTables; ++i) {
STableKeyInfo* pCur = tableListGetInfo(pInfo->base.pTableListInfo, i);
if (pCur->groupId != pStart->groupId) {
break;
}
} }
// case2 if all table in same group, process only once
if (pTableScanInfo->isSameGroup) { pInfo->tableEndIndex = i - 1;
pTableScanInfo->countState = TABLE_COUNT_STATE_END; if (!pInfo->needCountEmptyTable) {
return; pInfo->countState = TABLE_COUNT_STATE_END;
} else {
pInfo->countState = TABLE_COUNT_STATE_SCAN;
} }
taosHashRemove(pTableScanInfo->pRemainTables, &uid, sizeof(uid));
*pKeyInfo = pStart;
*size = i - pInfo->tableStartIndex;
} }
static SSDataBlock* getOneRowResultBlock(SExecTaskInfo* pTaskInfo, STableScanBase* pBase, SSDataBlock* pBlock, static SSDataBlock* getOneRowResultBlock(SExecTaskInfo* pTaskInfo, STableScanBase* pBase, SSDataBlock* pBlock,
@ -791,7 +775,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
return NULL; return NULL;
} }
static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator, const STableKeyInfo* pList, int32_t num) { static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator) {
STableScanInfo* pTableScanInfo = pOperator->info; STableScanInfo* pTableScanInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStorageAPI* pAPI = &pTaskInfo->storageAPI;
@ -801,15 +785,11 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator, const STableKey
return NULL; return NULL;
} }
if (TABLE_COUNT_STATE_NONE == pTableScanInfo->countState) {
initTableCountEnv(pTableScanInfo, pList, num);
}
// do the ascending order traverse in the first place. // do the ascending order traverse in the first place.
while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) { while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
SSDataBlock* p = doTableScanImpl(pOperator); SSDataBlock* p = doTableScanImpl(pOperator);
if (p != NULL) { if (p != NULL) {
markTableProcessed(pTableScanInfo, p->info.id.uid); pTableScanInfo->countState = TABLE_COUNT_STATE_END;
return p; return p;
} }
@ -838,7 +818,7 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator, const STableKey
while (pTableScanInfo->scanTimes < total) { while (pTableScanInfo->scanTimes < total) {
SSDataBlock* p = doTableScanImpl(pOperator); SSDataBlock* p = doTableScanImpl(pOperator);
if (p != NULL) { if (p != NULL) {
markTableProcessed(pTableScanInfo, p->info.id.uid); pTableScanInfo->countState = TABLE_COUNT_STATE_END;
return p; return p;
} }
@ -856,29 +836,13 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator, const STableKey
} }
if (pTableScanInfo->countState < TABLE_COUNT_STATE_END) { if (pTableScanInfo->countState < TABLE_COUNT_STATE_END) {
int32_t tb_cnt = taosHashGetSize(pTableScanInfo->pRemainTables); // output once for this group
if (tb_cnt) {
if (!pTableScanInfo->isSameGroup) {
// get first empty table uid, mark processed & rm from hash
void *pIte = taosHashIterate(pTableScanInfo->pRemainTables, NULL);
if (pIte != NULL) {
size_t keySize = 0;
uint64_t* pUid = taosHashGetKey(pIte, &keySize);
STableKeyInfo info = {.uid = *pUid, .groupId = *(uint64_t*)pIte};
taosHashCancelIterate(pTableScanInfo->pRemainTables, pIte);
markTableProcessed(pTableScanInfo, *pUid);
return getBlockForEmptyTable(pOperator, &info);
}
} else {
// output one table for this group
pTableScanInfo->countState = TABLE_COUNT_STATE_END;
return getBlockForEmptyTable(pOperator, pList);
}
}
pTableScanInfo->countState = TABLE_COUNT_STATE_END; pTableScanInfo->countState = TABLE_COUNT_STATE_END;
STableKeyInfo* pStart =
(STableKeyInfo*)tableListGetInfo(pTableScanInfo->base.pTableListInfo, pTableScanInfo->tableStartIndex);
return getBlockForEmptyTable(pOperator, pStart);
} }
taosHashClear(pTableScanInfo->pRemainTables);
return NULL; return NULL;
} }
@ -938,8 +902,8 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) {
STableScanInfo* pInfo = pOperator->info; STableScanInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI; SStorageAPI* pAPI = &pTaskInfo->storageAPI;
int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo)) { if (pInfo->tableEndIndex + 1 >= numOfTables) {
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
if (pOperator->dynamicTask) { if (pOperator->dynamicTask) {
taosArrayClear(pInfo->base.pTableListInfo->pTableList); taosArrayClear(pInfo->base.pTableListInfo->pTableList);
@ -954,14 +918,13 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) {
int32_t num = 0; int32_t num = 0;
STableKeyInfo* pList = NULL; STableKeyInfo* pList = NULL;
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num); initNextGroupScan(pInfo, &pList, &num);
pInfo->countState = TABLE_COUNT_STATE_NONE;
pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, pList, num); pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, pList, num);
pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond); pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
pInfo->scanTimes = 0; pInfo->scanTimes = 0;
SSDataBlock* result = doGroupedTableScan(pOperator, pList, num); SSDataBlock* result = doGroupedTableScan(pOperator);
if (result != NULL) { if (result != NULL) {
if (pOperator->dynamicTask) { if (pOperator->dynamicTask) {
result->info.id.groupId = result->info.id.uid; result->info.id.groupId = result->info.id.uid;
@ -979,15 +942,14 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
int32_t num = 0; int32_t num = 0;
STableKeyInfo* pList = NULL; STableKeyInfo* pList = NULL;
if (pInfo->currentGroupId == -1) { if (pInfo->tableEndIndex == -1) {
int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo); int32_t numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
if ((++pInfo->currentGroupId) >= tableListGetOutputGroups(pInfo->base.pTableListInfo) || numOfTables == 0) { if (pInfo->tableEndIndex + 1 == numOfTables) {
setOperatorCompleted(pOperator); setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num); initNextGroupScan(pInfo, &pList, &num);
pInfo->countState = TABLE_COUNT_STATE_NONE;
ASSERT(pInfo->base.dataReader == NULL); ASSERT(pInfo->base.dataReader == NULL);
int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock, int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
@ -1001,11 +963,9 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) { if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity; pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
} }
} else {
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
} }
SSDataBlock* result = doGroupedTableScan(pOperator, pList, num); SSDataBlock* result = doGroupedTableScan(pOperator);
if (result != NULL) { if (result != NULL) {
if (pOperator->dynamicTask) { if (pOperator->dynamicTask) {
result->info.id.groupId = result->info.id.uid; result->info.id.groupId = result->info.id.uid;
@ -1038,7 +998,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
T_LONG_JMP(pTaskInfo->env, code); T_LONG_JMP(pTaskInfo->env, code);
} }
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
pInfo->currentGroupId = -1; pInfo->tableEndIndex = -1;
pOperator->status = OP_OPENED; pOperator->status = OP_OPENED;
SSDataBlock* result = NULL; SSDataBlock* result = NULL;
while (true) { while (true) {
@ -1057,29 +1017,29 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
pInfo->countState = TABLE_COUNT_STATE_END; pInfo->countState = TABLE_COUNT_STATE_END;
while (1) { while (1) {
SSDataBlock* result = doGroupedTableScan(pOperator, NULL, 0); SSDataBlock* result = doGroupedTableScan(pOperator);
if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) { if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) {
return result; return result;
} }
// if no data, switch to next table and continue scan // if no data, switch to next table and continue scan
pInfo->currentTable++; pInfo->tableEndIndex++;
taosRLockLatch(&pTaskInfo->lock); taosRLockLatch(&pTaskInfo->lock);
numOfTables = tableListGetSize(pInfo->base.pTableListInfo); numOfTables = tableListGetSize(pInfo->base.pTableListInfo);
if (pInfo->currentTable >= numOfTables) { if (pInfo->tableEndIndex >= numOfTables) {
qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo)); qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
taosRUnLockLatch(&pTaskInfo->lock); taosRUnLockLatch(&pTaskInfo->lock);
return NULL; return NULL;
} }
tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->currentTable); tInfo = *(STableKeyInfo*)tableListGetInfo(pInfo->base.pTableListInfo, pInfo->tableEndIndex);
taosRUnLockLatch(&pTaskInfo->lock); taosRUnLockLatch(&pTaskInfo->lock);
pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, &tInfo, 1); pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, &tInfo, 1);
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables, qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables,
pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo)); pInfo->tableEndIndex, numOfTables, GET_TASKID(pTaskInfo));
pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond); pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
pInfo->scanTimes = 0; pInfo->scanTimes = 0;
@ -1117,7 +1077,6 @@ static void destroyTableScanOperatorInfo(void* param) {
STableScanInfo* pTableScanInfo = (STableScanInfo*)param; STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
blockDataDestroy(pTableScanInfo->pResBlock); blockDataDestroy(pTableScanInfo->pResBlock);
taosHashCleanup(pTableScanInfo->pIgnoreTables); taosHashCleanup(pTableScanInfo->pIgnoreTables);
taosHashCleanup(pTableScanInfo->pRemainTables);
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI); destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
@ -1173,7 +1132,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
goto _error; goto _error;
} }
pInfo->currentGroupId = -1; pInfo->tableEndIndex = -1;
pInfo->assignBlockUid = pTableScanNode->assignBlockUid; pInfo->assignBlockUid = pTableScanNode->assignBlockUid;
pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false; pInfo->hasGroupByTag = pTableScanNode->pGroupTags ? true : false;
@ -1268,7 +1227,7 @@ void resetTableScanInfo(STableScanInfo* pTableScanInfo, STimeWindow* pWin, uint6
pTableScanInfo->base.cond.startVersion = 0; pTableScanInfo->base.cond.startVersion = 0;
pTableScanInfo->base.cond.endVersion = ver; pTableScanInfo->base.cond.endVersion = ver;
pTableScanInfo->scanTimes = 0; pTableScanInfo->scanTimes = 0;
pTableScanInfo->currentGroupId = -1; pTableScanInfo->tableEndIndex = -1;
pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader); pTableScanInfo->base.readerAPI.tsdReaderClose(pTableScanInfo->base.dataReader);
pTableScanInfo->base.dataReader = NULL; pTableScanInfo->base.dataReader = NULL;
} }
@ -2170,7 +2129,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
pInfo->pTableScanOp->status = OP_OPENED; pInfo->pTableScanOp->status = OP_OPENED;
pTSInfo->scanTimes = 0; pTSInfo->scanTimes = 0;
pTSInfo->currentGroupId = -1; pTSInfo->tableEndIndex = -1;
} }
if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) { if (pStreamInfo->recoverStep == STREAM_RECOVER_STEP__SCAN1) {