count state

This commit is contained in:
Bob Liu 2023-12-27 21:37:00 +08:00
parent f6e55d358b
commit d795798648
2 changed files with 63 additions and 42 deletions

View File

@ -217,6 +217,12 @@ enum {
TABLE_SCAN__BLOCK_ORDER = 2,
};
typedef enum ETableCountState {
TABLE_COUNT_STATE_NONE = 0, // before start scan
TABLE_COUNT_STATE_SCAN = 1, // scanning
TABLE_COUNT_STATE_END = 2, // finish or noneed to process
} ETableCountState;
typedef struct SAggSupporter {
SSHashObj* pResultRowHashTable; // quick locate the window object for each result
char* keyBuf; // window key buffer
@ -262,17 +268,18 @@ typedef struct STableScanInfo {
int32_t scanTimes;
SSDataBlock* pResBlock;
SHashObj* pIgnoreTables;
SHashObj* pValuedTables; // non empty table uids
SHashObj* pRemainTables; // remain table to process
SSampleExecInfo sample; // sample execution info
int32_t currentGroupId;
int32_t currentTable;
int8_t scanMode;
int8_t assignBlockUid;
uint8_t countState; // empty table count state
bool isOneGroup; // whether or not only one group in this scan
bool hasGroupByTag;
bool countOnly;
bool filesetDelimited;
bool needCountEmptyTable;
bool processingEmptyTable;
} STableScanInfo;
typedef struct STableMergeScanInfo {

View File

@ -655,26 +655,44 @@ void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData,
colDataDestroy(&infoData);
}
// record processed (non empty) table
static int32_t markTableProcessed(STableScanInfo* pTableScanInfo, uint64_t uid) {
static int32_t initRemainTable(STableScanInfo* pTableScanInfo, const STableKeyInfo* pList, int32_t num) {
if (!pTableScanInfo->needCountEmptyTable) {
return TSDB_CODE_SUCCESS;
}
if (NULL == pTableScanInfo->pValuedTables) {
pTableScanInfo->isOneGroup = true;
if (NULL == pTableScanInfo->pRemainTables) {
int32_t tableNum = taosArrayGetSize(pTableScanInfo->base.pTableListInfo->pTableList);
pTableScanInfo->pValuedTables =
pTableScanInfo->pRemainTables =
taosHashInit(tableNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (NULL == pTableScanInfo->pValuedTables) {
if (NULL == pTableScanInfo->pRemainTables) {
pTableScanInfo->countState = TABLE_COUNT_STATE_END;
return TSDB_CODE_OUT_OF_MEMORY;
}
}
taosHashPut(pTableScanInfo->pValuedTables, &uid, sizeof(uid), &pTableScanInfo->scanTimes,
sizeof(pTableScanInfo->scanTimes));
uint64_t groupId = 0;
for (int32_t i = 0; i < num; i++) {
const STableKeyInfo* pInfo = pList + i;
if (pTableScanInfo->isOneGroup) {
if (i == 0) {
groupId = pInfo->groupId;
} else if (groupId != pInfo->groupId) {
pTableScanInfo->isOneGroup = false;
}
}
taosHashPut(pTableScanInfo->pRemainTables, &(pInfo->uid), sizeof(pInfo->uid), &(pInfo->groupId), sizeof(pInfo->groupId));
}
return TSDB_CODE_SUCCESS;
}
static void markTableProcessed(STableScanInfo* pTableScanInfo, uint64_t uid) {
// case0 group scanning, mark
// case1 stream scan: no need to mark
if (pTableScanInfo->countState > TABLE_COUNT_STATE_SCAN) {
return;
}
taosHashRemove(pTableScanInfo->pRemainTables, &uid, sizeof(uid));
}
static SSDataBlock* getBlockForEmptyTable(SOperatorInfo* pOperator, const STableKeyInfo* tbInfo) {
STableScanInfo* pTableScanInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
@ -770,14 +788,17 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator, const STableKey
STableScanInfo* pTableScanInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
// Only when all tables are scanned can you determine how many groups the tag has
bool outputAll = true;
// The read handle is not initialized yet, since no qualified tables exists
if (pTableScanInfo->base.dataReader == NULL || pOperator->status == OP_EXEC_DONE) {
return NULL;
}
if (TABLE_COUNT_STATE_NONE == pTableScanInfo->countState) {
initRemainTable(pTableScanInfo, pList, num);
pTableScanInfo->countState = TABLE_COUNT_STATE_SCAN;
}
// do the ascending order traverse in the first place.
while (pTableScanInfo->scanTimes < pTableScanInfo->scanInfo.numOfAsc) {
SSDataBlock* p = doTableScanImpl(pOperator);
@ -829,37 +850,28 @@ static SSDataBlock* doGroupedTableScan(SOperatorInfo* pOperator, const STableKey
}
if (pTableScanInfo->needCountEmptyTable) {
// pList is NULL in mode TABLE_SCAN__TABLE_ORDER for streamscan, no need to process
// pList not NULL, group by group, num >= 1
int32_t tb_cnt = taosHashGetSize(pTableScanInfo->pValuedTables);
if (pList && num > tb_cnt) {
if (!pTableScanInfo->processingEmptyTable) {
pTableScanInfo->processingEmptyTable = true;
pTableScanInfo->currentTable = 0;
}
if (pTableScanInfo->currentTable < num) {
if (outputAll) {
// loop: get empty table uid & process
while (pTableScanInfo->currentTable < num) {
const STableKeyInfo* info = pList + pTableScanInfo->currentTable++;
if (pTableScanInfo->pValuedTables &&
NULL != taosHashGet(pTableScanInfo->pValuedTables, &info->uid, sizeof(info->uid))) {
} else {
return getBlockForEmptyTable(pOperator, info);
}
}
} else if (tb_cnt == 0) {
// only need one & all empty table in this group
// output first one
pTableScanInfo->currentTable = num;
return getBlockForEmptyTable(pOperator, pList);
int32_t tb_cnt = taosHashGetSize(pTableScanInfo->pRemainTables);
if (tb_cnt) {
if (!pTableScanInfo->isOneGroup) {
// get first empty table uid, mark processed & rm from hash
void *pIte = taosHashIterate(pTableScanInfo->pRemainTables, NULL);
if (pIte != NULL) {
size_t keySize = 0;
uint64_t* pUid = taosHashGetKey(pIte, &keySize);
STableKeyInfo info = {.uid = *pUid, .groupId = *(uint64_t*)pIte};
taosHashCancelIterate(pTableScanInfo->pRemainTables, pIte);
markTableProcessed(pTableScanInfo, *pUid);
return getBlockForEmptyTable(pOperator, &info);
}
} else {
// output one table for this group
taosHashClear(pTableScanInfo->pRemainTables);
return getBlockForEmptyTable(pOperator, pList);
}
}
pTableScanInfo->processingEmptyTable = false;
pTableScanInfo->countState = TABLE_COUNT_STATE_END;
}
taosHashClear(pTableScanInfo->pValuedTables);
return NULL;
}
@ -937,7 +949,8 @@ static SSDataBlock* startNextGroupScan(SOperatorInfo* pOperator) {
int32_t num = 0;
STableKeyInfo* pList = NULL;
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
pInfo->countState = TABLE_COUNT_STATE_NONE;
pAPI->tsdReader.tsdSetQueryTableList(pInfo->base.dataReader, pList, num);
pAPI->tsdReader.tsdReaderResetStatus(pInfo->base.dataReader, &pInfo->base.cond);
pInfo->scanTimes = 0;
@ -968,6 +981,7 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
}
tableListGetGroupList(pInfo->base.pTableListInfo, pInfo->currentGroupId, &pList, &num);
pInfo->countState = TABLE_COUNT_STATE_NONE;
ASSERT(pInfo->base.dataReader == NULL);
int32_t code = pAPI->tsdReader.tsdReaderOpen(pInfo->base.readHandle.vnode, &pInfo->base.cond, pList, num, pInfo->pResBlock,
@ -1034,6 +1048,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
int32_t numOfTables = 0; // tableListGetSize(pTaskInfo->pTableListInfo);
STableKeyInfo tInfo = {0};
pInfo->countState = TABLE_COUNT_STATE_END;
while (1) {
SSDataBlock* result = doGroupedTableScan(pOperator, NULL, 0);
@ -1096,7 +1111,7 @@ static void destroyTableScanOperatorInfo(void* param) {
STableScanInfo* pTableScanInfo = (STableScanInfo*)param;
blockDataDestroy(pTableScanInfo->pResBlock);
taosHashCleanup(pTableScanInfo->pIgnoreTables);
taosHashCleanup(pTableScanInfo->pValuedTables);
taosHashCleanup(pTableScanInfo->pRemainTables);
destroyTableScanBase(&pTableScanInfo->base, &pTableScanInfo->base.readerAPI);
taosMemoryFreeClear(param);
}
@ -1161,7 +1176,6 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pOperator->exprSupp.numOfExprs = numOfCols;
pInfo->needCountEmptyTable = tsCountAlwaysReturnValue && pTableScanNode->needCountEmptyTable;
pInfo->processingEmptyTable = false;
pInfo->base.pTableListInfo = pTableListInfo;
pInfo->base.metaCache.pTableMetaEntryCache = taosLRUCacheInit(1024 * 128, -1, .5);