fix(stream): update the table list api.
This commit is contained in:
parent
eb7f510ccb
commit
4ed26bbc19
|
@ -108,7 +108,7 @@ uint64_t tableListGetSize(const STableListInfo* pTableList);
|
||||||
uint64_t tableListGetSuid(const STableListInfo* pTableList);
|
uint64_t tableListGetSuid(const STableListInfo* pTableList);
|
||||||
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index);
|
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index);
|
||||||
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex);
|
int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t startIndex);
|
||||||
void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, int32_t* type);
|
void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, uint64_t* uid, int32_t* type);
|
||||||
|
|
||||||
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
void initResultRowInfo(SResultRowInfo* pResultRowInfo);
|
void initResultRowInfo(SResultRowInfo* pResultRowInfo);
|
||||||
|
|
|
@ -27,19 +27,21 @@
|
||||||
#include "executorimpl.h"
|
#include "executorimpl.h"
|
||||||
#include "tcompression.h"
|
#include "tcompression.h"
|
||||||
|
|
||||||
|
typedef struct STableListIdInfo {
|
||||||
|
uint64_t suid;
|
||||||
|
uint64_t uid;
|
||||||
|
int32_t tableType;
|
||||||
|
} STableListIdInfo;
|
||||||
|
|
||||||
// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly
|
// If the numOfOutputGroups is 1, the data blocks that belongs to different groups will be provided randomly
|
||||||
// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups
|
// The numOfOutputGroups is specified by physical plan. and will not be affect by numOfGroups
|
||||||
struct STableListInfo {
|
struct STableListInfo {
|
||||||
bool oneTableForEachGroup;
|
bool oneTableForEachGroup;
|
||||||
int32_t numOfOuputGroups; // the data block will be generated one by one
|
int32_t numOfOuputGroups; // the data block will be generated one by one
|
||||||
int32_t* groupOffset; // keep the offset value for each group in the tableList
|
int32_t* groupOffset; // keep the offset value for each group in the tableList
|
||||||
SArray* pTableList;
|
SArray* pTableList;
|
||||||
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
|
SHashObj* map; // speedup acquire the tableQueryInfo by table uid
|
||||||
union {
|
STableListIdInfo idInfo; // this maybe the super table or ordinary table
|
||||||
uint64_t suid;
|
|
||||||
uint64_t uid;
|
|
||||||
}; // this maybe the super table or ordinary table
|
|
||||||
int32_t tableType; // queried table type
|
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct tagFilterAssist {
|
typedef struct tagFilterAssist {
|
||||||
|
@ -474,7 +476,7 @@ int32_t getColInfoResultForGroupby(void* metaHandle, SNodeList* group, STableLis
|
||||||
}
|
}
|
||||||
|
|
||||||
// int64_t stt = taosGetTimestampUs();
|
// int64_t stt = taosGetTimestampUs();
|
||||||
code = metaGetTableTags(metaHandle, pTableListInfo->suid, pUidTagList);
|
code = metaGetTableTags(metaHandle, pTableListInfo->idInfo.suid, pUidTagList);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
@ -957,7 +959,7 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN
|
||||||
|
|
||||||
FilterCondType condType = checkTagCond(pTagCond);
|
FilterCondType condType = checkTagCond(pTagCond);
|
||||||
|
|
||||||
int32_t filter = optimizeTbnameInCond(metaHandle, pListInfo->suid, pUidTagList, pTagCond);
|
int32_t filter = optimizeTbnameInCond(metaHandle, pListInfo->idInfo.suid, pUidTagList, pTagCond);
|
||||||
if (filter == 0) { // tbname in filter is activated, do nothing and return
|
if (filter == 0) { // tbname in filter is activated, do nothing and return
|
||||||
taosArrayClear(pUidList);
|
taosArrayClear(pUidList);
|
||||||
|
|
||||||
|
@ -970,12 +972,12 @@ static int32_t doFilterByTagCond(STableListInfo* pListInfo, SArray* pUidList, SN
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
} else {
|
} else {
|
||||||
if ((condType == FILTER_NO_LOGIC || condType == FILTER_AND) && status != SFLT_NOT_INDEX) {
|
if ((condType == FILTER_NO_LOGIC || condType == FILTER_AND) && status != SFLT_NOT_INDEX) {
|
||||||
code = metaGetTableTagsByUids(metaHandle, pListInfo->suid, pUidTagList);
|
code = metaGetTableTagsByUids(metaHandle, pListInfo->idInfo.suid, pUidTagList);
|
||||||
} else {
|
} else {
|
||||||
code = metaGetTableTags(metaHandle, pListInfo->suid, pUidTagList);
|
code = metaGetTableTags(metaHandle, pListInfo->idInfo.suid, pUidTagList);
|
||||||
}
|
}
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
qError("failed to get table tags from meta, reason:%s, suid:%" PRIu64, tstrerror(code), pListInfo->suid);
|
qError("failed to get table tags from meta, reason:%s, suid:%" PRIu64, tstrerror(code), pListInfo->idInfo.suid);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
@ -1029,14 +1031,14 @@ int32_t getTableList(void* metaHandle, void* pVnode, SScanPhysiNode* pScanNode,
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
size_t numOfTables = 0;
|
size_t numOfTables = 0;
|
||||||
|
|
||||||
pListInfo->suid = pScanNode->suid;
|
pListInfo->idInfo.suid = pScanNode->suid;
|
||||||
pListInfo->tableType = pScanNode->tableType;
|
pListInfo->idInfo.tableType = pScanNode->tableType;
|
||||||
|
|
||||||
SArray* pUidList = taosArrayInit(8, sizeof(uint64_t));
|
SArray* pUidList = taosArrayInit(8, sizeof(uint64_t));
|
||||||
|
|
||||||
SIdxFltStatus status = SFLT_NOT_INDEX;
|
SIdxFltStatus status = SFLT_NOT_INDEX;
|
||||||
if (pScanNode->tableType != TSDB_SUPER_TABLE) {
|
if (pScanNode->tableType != TSDB_SUPER_TABLE) {
|
||||||
pListInfo->uid = pScanNode->uid;
|
pListInfo->idInfo.uid = pScanNode->uid;
|
||||||
if (metaIsTableExist(metaHandle, pScanNode->uid)) {
|
if (metaIsTableExist(metaHandle, pScanNode->uid)) {
|
||||||
taosArrayPush(pUidList, &pScanNode->uid);
|
taosArrayPush(pUidList, &pScanNode->uid);
|
||||||
}
|
}
|
||||||
|
@ -1801,11 +1803,7 @@ uint64_t tableListGetSize(const STableListInfo* pTableList) {
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t tableListGetSuid(const STableListInfo* pTableList) {
|
uint64_t tableListGetSuid(const STableListInfo* pTableList) {
|
||||||
if (pTableList->tableType == TSDB_SUPER_TABLE) {
|
return pTableList->idInfo.suid;
|
||||||
return pTableList->suid;
|
|
||||||
} else { // query normal table, no suid exists.
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) {
|
STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index) {
|
||||||
|
@ -1831,9 +1829,10 @@ int32_t tableListFind(const STableListInfo* pTableList, uint64_t uid, int32_t st
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, int32_t* type) {
|
void tableListGetSourceTableInfo(const STableListInfo* pTableList, uint64_t* psuid, uint64_t* uid, int32_t* type) {
|
||||||
*psuid = pTableList->suid;
|
*psuid = pTableList->idInfo.suid;
|
||||||
*type = pTableList->tableType;
|
*uid = pTableList->idInfo.uid;
|
||||||
|
*type = pTableList->idInfo.tableType;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
|
uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid) {
|
||||||
|
|
|
@ -330,9 +330,10 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
|
||||||
|
|
||||||
STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
|
STableScanInfo* pTableScanInfo = pScanInfo->pTableScanOp->info;
|
||||||
|
|
||||||
|
uint64_t suid = 0;
|
||||||
uint64_t uid = 0;
|
uint64_t uid = 0;
|
||||||
int32_t type = 0;
|
int32_t type = 0;
|
||||||
tableListGetSourceTableInfo(pTableScanInfo->base.pTableListInfo, &uid, &type);
|
tableListGetSourceTableInfo(pTableScanInfo->base.pTableListInfo, &suid, &uid, &type);
|
||||||
|
|
||||||
// let's discard the tables those are not created according to the queried super table.
|
// let's discard the tables those are not created according to the queried super table.
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
|
@ -353,7 +354,7 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S
|
||||||
} else {
|
} else {
|
||||||
if (type == TSDB_SUPER_TABLE) {
|
if (type == TSDB_SUPER_TABLE) {
|
||||||
// this new created child table does not belong to the scanned super table.
|
// this new created child table does not belong to the scanned super table.
|
||||||
if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != uid) {
|
if (mr.me.type != TSDB_CHILD_TABLE || mr.me.ctbEntry.suid != suid) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
} else { // ordinary table
|
} else { // ordinary table
|
||||||
|
|
Loading…
Reference in New Issue