feat: add tag condition
This commit is contained in:
parent
131e923718
commit
eea32f5f0a
|
@ -103,9 +103,7 @@ tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STab
|
||||||
void *pMemRef);
|
void *pMemRef);
|
||||||
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo);
|
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo);
|
||||||
bool isTsdbCacheLastRow(tsdbReaderT *pReader);
|
bool isTsdbCacheLastRow(tsdbReaderT *pReader);
|
||||||
int32_t tsdbQuerySTableByTagCond(void *pMeta, uint64_t uid, TSKEY skey, const char *pTagCond, size_t len,
|
int32_t tsdbQueryAllTable(void* pMeta, uint64_t uid, STableGroupInfo* pGroupInfo, SNode* pTagCond);
|
||||||
int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupInfo,
|
|
||||||
SColIndex *pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId);
|
|
||||||
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
|
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
|
||||||
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
|
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
|
||||||
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
|
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
#include "index.h"
|
||||||
|
|
||||||
#define EXTRA_BYTES 2
|
#define EXTRA_BYTES 2
|
||||||
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
||||||
|
@ -3875,74 +3876,25 @@ SArray* createTableGroup(SArray* pTableList, SSchemaWrapper* pTagSchema, SColInd
|
||||||
// return TSDB_CODE_SUCCESS;
|
// return TSDB_CODE_SUCCESS;
|
||||||
//}
|
//}
|
||||||
|
|
||||||
int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
|
int32_t tsdbQueryAllTable(void* pMeta, uint64_t uid, STableGroupInfo* pGroupInfo, SNode* pTagCond) {
|
||||||
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
|
|
||||||
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) {
|
|
||||||
SMetaReader mr = {0};
|
|
||||||
|
|
||||||
metaReaderInit(&mr, (SMeta*)pMeta, 0);
|
|
||||||
|
|
||||||
if (metaGetTableEntryByUid(&mr, uid) < 0) {
|
|
||||||
tsdbError("%p failed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId);
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
|
|
||||||
goto _error;
|
|
||||||
} else {
|
|
||||||
tsdbDebug("%p succeed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mr.me.type != TSDB_SUPER_TABLE) {
|
|
||||||
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId,
|
|
||||||
reqId);
|
|
||||||
terrno = TSDB_CODE_OPS_NOT_SUPPORT; // basically, this error is caused by invalid sql issued by client
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
metaReaderClear(&mr);
|
|
||||||
|
|
||||||
// NOTE: not add ref count for super table
|
// NOTE: not add ref count for super table
|
||||||
SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
|
SArray* res = taosArrayInit(8, sizeof(STableKeyInfo));
|
||||||
SSchemaWrapper* pTagSchema = metaGetTableSchema(pMeta, uid, 1, true);
|
|
||||||
|
|
||||||
// no tags and tbname condition, all child tables of this stable are involved
|
|
||||||
if (tbnameCond == NULL && (pTagCond == NULL || len == 0)) {
|
|
||||||
int32_t ret = getAllTableList(pMeta, uid, res);
|
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
|
||||||
goto _error;
|
|
||||||
}
|
|
||||||
|
|
||||||
pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res);
|
|
||||||
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
|
|
||||||
|
|
||||||
tsdbDebug("%p no table name/tag condition, all tables qualified, numOfTables:%u, group:%zu, TID:0x%" PRIx64
|
|
||||||
" QID:0x%" PRIx64,
|
|
||||||
pMeta, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList), taskId, reqId);
|
|
||||||
|
|
||||||
taosArrayDestroy(res);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t ret = TSDB_CODE_SUCCESS;
|
int32_t ret = TSDB_CODE_SUCCESS;
|
||||||
|
if(pTagCond){
|
||||||
SFilterInfo* filterInfo = NULL;
|
ret = doFilterTag(pTagCond, res);
|
||||||
ret = filterInitFromNode((SNode*)pTagCond, &filterInfo, 0);
|
}else{
|
||||||
|
ret = getAllTableList(pMeta, uid, res);
|
||||||
|
}
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
terrno = ret;
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
ret = tsdbQueryTableList(pMeta, res, filterInfo);
|
|
||||||
pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res);
|
pGroupInfo->numOfTables = (uint32_t)taosArrayGetSize(res);
|
||||||
pGroupInfo->pGroupList = createTableGroup(res, pTagSchema, pColIndex, numOfCols, skey);
|
pGroupInfo->pGroupList = taosArrayInit(1, POINTER_BYTES);
|
||||||
|
taosArrayPush(pGroupInfo->pGroupList, &res);
|
||||||
|
|
||||||
// tsdbDebug("%p stable tid:%d, uid:%" PRIu64 " query, numOfTables:%u, belong to %" PRIzu " groups", tsdb,
|
|
||||||
// pTable->tableId, pTable->uid, pGroupInfo->numOfTables, taosArrayGetSize(pGroupInfo->pGroupList));
|
|
||||||
|
|
||||||
taosArrayDestroy(res);
|
|
||||||
return ret;
|
return ret;
|
||||||
|
|
||||||
_error:
|
|
||||||
return terrno;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbQueryTableList(void* pMeta, SArray* pRes, void* filterInfo) {
|
int32_t tsdbQueryTableList(void* pMeta, SArray* pRes, void* filterInfo) {
|
||||||
|
|
|
@ -4438,10 +4438,10 @@ static SExecTaskInfo* createExecTaskInfo(uint64_t queryId, uint64_t taskId, EOPT
|
||||||
}
|
}
|
||||||
|
|
||||||
static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
static tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
||||||
STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId);
|
STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagNode);
|
||||||
|
|
||||||
static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo,
|
static int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo,
|
||||||
uint64_t queryId, uint64_t taskId);
|
uint64_t queryId, uint64_t taskId, SNode* pTagCond);
|
||||||
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
|
static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo);
|
||||||
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
static SArray* extractColumnInfo(SNodeList* pNodeList);
|
||||||
|
|
||||||
|
@ -4471,14 +4471,14 @@ void extractTableSchemaVersion(SReadHandle* pHandle, uint64_t uid, SExecTaskInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle,
|
||||||
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) {
|
uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo, SNode* pTagCond) {
|
||||||
int32_t type = nodeType(pPhyNode);
|
int32_t type = nodeType(pPhyNode);
|
||||||
|
|
||||||
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
|
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
|
||||||
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
|
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
|
||||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
tsdbReaderT pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId, pTagCond);
|
||||||
if (pDataReader == NULL && terrno != 0) {
|
if (pDataReader == NULL && terrno != 0) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -4502,9 +4502,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
|
|
||||||
tsdbReaderT pDataReader = NULL;
|
tsdbReaderT pDataReader = NULL;
|
||||||
if (pHandle->vnode) {
|
if (pHandle->vnode) {
|
||||||
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId);
|
pDataReader = doCreateDataReader(pTableScanNode, pHandle, pTableGroupInfo, (uint64_t)queryId, taskId, pTagCond);
|
||||||
} else {
|
} else {
|
||||||
doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId);
|
doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo, queryId, taskId, pTagCond);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDataReader == NULL && terrno != 0) {
|
if (pDataReader == NULL && terrno != 0) {
|
||||||
|
@ -4551,7 +4551,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
SSDataBlock* pResBlock = createResDataBlock(pDescNode);
|
||||||
|
|
||||||
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
|
int32_t code = doCreateTableGroup(pHandle->meta, pScanPhyNode->tableType, pScanPhyNode->uid, pTableGroupInfo,
|
||||||
queryId, taskId);
|
queryId, taskId, pTagCond);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -4577,7 +4577,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
|
SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES);
|
||||||
for (int32_t i = 0; i < size; ++i) {
|
for (int32_t i = 0; i < size; ++i) {
|
||||||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
|
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
|
||||||
ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo, pTagCond);
|
||||||
if (ops[i] == NULL) {
|
if (ops[i] == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -4893,10 +4893,10 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo,
|
int32_t doCreateTableGroup(void* metaHandle, int32_t tableType, uint64_t tableUid, STableGroupInfo* pGroupInfo,
|
||||||
uint64_t queryId, uint64_t taskId) {
|
uint64_t queryId, uint64_t taskId, SNode* pTagCond) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if (tableType == TSDB_SUPER_TABLE) {
|
if (tableType == TSDB_SUPER_TABLE) {
|
||||||
code = tsdbQuerySTableByTagCond(metaHandle, tableUid, 0, NULL, 0, 0, NULL, pGroupInfo, NULL, 0, queryId, taskId);
|
code = tsdbQueryAllTable(metaHandle, tableUid, pGroupInfo, pTagCond);
|
||||||
} else { // Create one table group.
|
} else { // Create one table group.
|
||||||
code = tsdbGetOneTableGroup(metaHandle, tableUid, 0, pGroupInfo);
|
code = tsdbGetOneTableGroup(metaHandle, tableUid, 0, pGroupInfo);
|
||||||
}
|
}
|
||||||
|
@ -4923,10 +4923,10 @@ SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
tsdbReaderT doCreateDataReader(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle,
|
||||||
STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId) {
|
STableGroupInfo* pTableGroupInfo, uint64_t queryId, uint64_t taskId, SNode* pTagNode) {
|
||||||
uint64_t uid = pTableScanNode->scan.uid;
|
uint64_t uid = pTableScanNode->scan.uid;
|
||||||
int32_t code =
|
int32_t code =
|
||||||
doCreateTableGroup(pHandle->meta, pTableScanNode->scan.tableType, uid, pTableGroupInfo, queryId, taskId);
|
doCreateTableGroup(pHandle->meta, pTableScanNode->scan.tableType, uid, pTableGroupInfo, queryId, taskId, pTagNode);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
@ -4962,7 +4962,7 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead
|
||||||
}
|
}
|
||||||
|
|
||||||
(*pTaskInfo)->pRoot =
|
(*pTaskInfo)->pRoot =
|
||||||
createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoGroupInfo);
|
createOperatorTree(pPlan->pNode, *pTaskInfo, pHandle, queryId, taskId, &(*pTaskInfo)->tableqinfoGroupInfo, pPlan->pTagCond);
|
||||||
if (NULL == (*pTaskInfo)->pRoot) {
|
if (NULL == (*pTaskInfo)->pRoot) {
|
||||||
code = terrno;
|
code = terrno;
|
||||||
goto _complete;
|
goto _complete;
|
||||||
|
|
Loading…
Reference in New Issue