Merge branch 'szhou/tag-scan-opt' of github.com:taosdata/TDengine into szhou/tag-scan-opt
This commit is contained in:
commit
42a4f364c1
|
@ -98,6 +98,16 @@ typedef struct SMTbCursor {
|
||||||
int8_t paused;
|
int8_t paused;
|
||||||
} SMTbCursor;
|
} SMTbCursor;
|
||||||
|
|
||||||
|
typedef struct SMCtbCursor {
|
||||||
|
SMeta *pMeta;
|
||||||
|
void *pCur;
|
||||||
|
tb_uid_t suid;
|
||||||
|
void *pKey;
|
||||||
|
void *pVal;
|
||||||
|
int kLen;
|
||||||
|
int vLen;
|
||||||
|
} SMCtbCursor;
|
||||||
|
|
||||||
typedef struct SRowBuffPos {
|
typedef struct SRowBuffPos {
|
||||||
void* pRowBuff;
|
void* pRowBuff;
|
||||||
void* pKey;
|
void* pKey;
|
||||||
|
@ -278,13 +288,15 @@ typedef struct SStoreMeta {
|
||||||
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables,
|
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables,
|
||||||
int64_t* numOfNormalTables); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) &
|
int64_t* numOfNormalTables); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) &
|
||||||
// metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta);
|
// metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta);
|
||||||
|
|
||||||
int64_t (*getNumOfRowsInMem)(void* pVnode);
|
int64_t (*getNumOfRowsInMem)(void* pVnode);
|
||||||
/**
|
/**
|
||||||
int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list);
|
int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list);
|
||||||
int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg);
|
int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg);
|
||||||
int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list);
|
int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list);
|
||||||
*/
|
*/
|
||||||
|
SMCtbCursor* (*openCtbCursor)(void *pVnode, tb_uid_t uid, int lock);
|
||||||
|
void (*closeCtbCursor)(SMCtbCursor *pCtbCur, int lock);
|
||||||
|
tb_uid_t (*ctbCursorNext)(SMCtbCursor* pCur);
|
||||||
} SStoreMeta;
|
} SStoreMeta;
|
||||||
|
|
||||||
typedef struct SStoreMetaReader {
|
typedef struct SStoreMetaReader {
|
||||||
|
|
|
@ -334,7 +334,19 @@ typedef struct SScanPhysiNode {
|
||||||
bool groupOrderScan;
|
bool groupOrderScan;
|
||||||
} SScanPhysiNode;
|
} SScanPhysiNode;
|
||||||
|
|
||||||
typedef SScanPhysiNode STagScanPhysiNode;
|
typedef struct STagScanPhysiNode {
|
||||||
|
// SScanPhysiNode scan; //TODO?
|
||||||
|
SPhysiNode node;
|
||||||
|
SNodeList* pScanCols;
|
||||||
|
SNodeList* pScanPseudoCols;
|
||||||
|
uint64_t uid; // unique id of the table
|
||||||
|
uint64_t suid;
|
||||||
|
int8_t tableType;
|
||||||
|
SName tableName;
|
||||||
|
bool groupOrderScan;
|
||||||
|
bool onlyMetaCtbIdx; //no tbname, tag index not used.
|
||||||
|
} STagScanPhysiNode;
|
||||||
|
|
||||||
typedef SScanPhysiNode SBlockDistScanPhysiNode;
|
typedef SScanPhysiNode SBlockDistScanPhysiNode;
|
||||||
|
|
||||||
typedef struct SLastRowScanPhysiNode {
|
typedef struct SLastRowScanPhysiNode {
|
||||||
|
|
|
@ -167,7 +167,7 @@ int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
||||||
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
|
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
|
||||||
|
|
||||||
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
|
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
|
||||||
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid, int lock);
|
SMCtbCursor* metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock);
|
||||||
void metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock);
|
void metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock);
|
||||||
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
|
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
|
||||||
SMStbCursor* metaOpenStbCursor(SMeta* pMeta, tb_uid_t uid);
|
SMStbCursor* metaOpenStbCursor(SMeta* pMeta, tb_uid_t uid);
|
||||||
|
|
|
@ -408,17 +408,9 @@ _err:
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct SMCtbCursor {
|
|
||||||
SMeta *pMeta;
|
|
||||||
TBC *pCur;
|
|
||||||
tb_uid_t suid;
|
|
||||||
void *pKey;
|
|
||||||
void *pVal;
|
|
||||||
int kLen;
|
|
||||||
int vLen;
|
|
||||||
};
|
|
||||||
|
|
||||||
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid, int lock) {
|
SMCtbCursor *metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock) {
|
||||||
|
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
|
||||||
SMCtbCursor *pCtbCur = NULL;
|
SMCtbCursor *pCtbCur = NULL;
|
||||||
SCtbIdxKey ctbIdxKey;
|
SCtbIdxKey ctbIdxKey;
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
|
|
|
@ -96,6 +96,10 @@ void initMetadataAPI(SStoreMeta* pMeta) {
|
||||||
|
|
||||||
pMeta->metaGetCachedTbGroup = metaGetCachedTbGroup;
|
pMeta->metaGetCachedTbGroup = metaGetCachedTbGroup;
|
||||||
pMeta->metaPutTbGroupToCache = metaPutTbGroupToCache;
|
pMeta->metaPutTbGroupToCache = metaPutTbGroupToCache;
|
||||||
|
|
||||||
|
pMeta->openCtbCursor = metaOpenCtbCursor;
|
||||||
|
pMeta->closeCtbCursor = metaCloseCtbCursor;
|
||||||
|
pMeta->ctbCursorNext = metaCtbCursorNext;
|
||||||
}
|
}
|
||||||
|
|
||||||
void initTqAPI(SStoreTqReader* pTq) {
|
void initTqAPI(SStoreTqReader* pTq) {
|
||||||
|
|
|
@ -190,4 +190,6 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag);
|
||||||
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
|
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
|
||||||
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
|
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
|
||||||
|
|
||||||
|
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
|
||||||
|
SStorageAPI* pStorageAPI);
|
||||||
#endif // TDENGINE_EXECUTIL_H
|
#endif // TDENGINE_EXECUTIL_H
|
||||||
|
|
|
@ -259,6 +259,11 @@ typedef struct STagScanInfo {
|
||||||
SLimitNode* pSlimit;
|
SLimitNode* pSlimit;
|
||||||
SReadHandle readHandle;
|
SReadHandle readHandle;
|
||||||
STableListInfo* pTableListInfo;
|
STableListInfo* pTableListInfo;
|
||||||
|
uint64_t suid;
|
||||||
|
void* pCtbCursor;
|
||||||
|
SNode* pTagCond;
|
||||||
|
SNode* pTagIndexCond;
|
||||||
|
SStorageAPI* pStorageAPI;
|
||||||
} STagScanInfo;
|
} STagScanInfo;
|
||||||
|
|
||||||
typedef enum EStreamScanMode {
|
typedef enum EStreamScanMode {
|
||||||
|
|
|
@ -81,7 +81,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
|
||||||
|
|
||||||
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SNode* pTagCond, SNode*pTagIndexCond, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
|
|
|
@ -47,8 +47,6 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* p
|
||||||
|
|
||||||
static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
|
static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
|
||||||
STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI);
|
STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI);
|
||||||
static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
|
|
||||||
SStorageAPI* pStorageAPI);
|
|
||||||
|
|
||||||
static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; }
|
static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; }
|
||||||
static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; }
|
static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; }
|
||||||
|
@ -846,7 +844,7 @@ static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, S
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
|
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
|
||||||
SStorageAPI* pStorageAPI) {
|
SStorageAPI* pStorageAPI) {
|
||||||
SSDataBlock* pResBlock = createDataBlock();
|
SSDataBlock* pResBlock = createDataBlock();
|
||||||
if (pResBlock == NULL) {
|
if (pResBlock == NULL) {
|
||||||
|
|
|
@ -370,17 +370,18 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
|
||||||
STableCountScanPhysiNode* pTblCountScanNode = (STableCountScanPhysiNode*)pPhyNode;
|
STableCountScanPhysiNode* pTblCountScanNode = (STableCountScanPhysiNode*)pPhyNode;
|
||||||
pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo);
|
pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
|
||||||
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
|
STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode;
|
||||||
STableListInfo* pTableListInfo = tableListCreate();
|
STableListInfo* pTableListInfo = tableListCreate();
|
||||||
int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
|
if (!pTagScanPhyNode->onlyMetaCtbIdx) {
|
||||||
|
int32_t code = createScanTableListInfo(pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
|
||||||
pTagIndexCond, pTaskInfo);
|
pTagIndexCond, pTaskInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pTaskInfo->code = code;
|
pTaskInfo->code = code;
|
||||||
qError("failed to getTableList, code: %s", tstrerror(code));
|
qError("failed to getTableList, code: %s", tstrerror(code));
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
|
pOperator = createTagScanOperatorInfo(pHandle, pTagScanPhyNode, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
|
||||||
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
|
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
|
||||||
STableListInfo* pTableListInfo = tableListCreate();
|
STableListInfo* pTableListInfo = tableListCreate();
|
||||||
|
|
|
@ -2688,6 +2688,277 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void tagScanFreeUidTag(void* p) {
|
||||||
|
STUidTagInfo* pInfo = p;
|
||||||
|
if (pInfo->pTagVal != NULL) {
|
||||||
|
taosMemoryFree(pInfo->pTagVal);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tagScanCreateResultData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam) {
|
||||||
|
SColumnInfoData* pColumnData = taosMemoryCalloc(1, sizeof(SColumnInfoData));
|
||||||
|
if (pColumnData == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
pColumnData->info.type = pType->type;
|
||||||
|
pColumnData->info.bytes = pType->bytes;
|
||||||
|
pColumnData->info.scale = pType->scale;
|
||||||
|
pColumnData->info.precision = pType->precision;
|
||||||
|
|
||||||
|
int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows, true);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
terrno = code;
|
||||||
|
taosMemoryFree(pColumnData);
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
pParam->columnData = pColumnData;
|
||||||
|
pParam->colAlloced = true;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
typedef struct STagScanFilterContext {
|
||||||
|
SHashObj* colHash;
|
||||||
|
int32_t index;
|
||||||
|
SArray* cInfoList;
|
||||||
|
} STagScanFilterContext;
|
||||||
|
|
||||||
|
static EDealRes tagScanRewriteTagColumn(SNode** pNode, void* pContext) {
|
||||||
|
SColumnNode* pSColumnNode = NULL;
|
||||||
|
if (QUERY_NODE_COLUMN == nodeType((*pNode))) {
|
||||||
|
pSColumnNode = *(SColumnNode**)pNode;
|
||||||
|
} else if (QUERY_NODE_FUNCTION == nodeType((*pNode))) {
|
||||||
|
SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode);
|
||||||
|
if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) {
|
||||||
|
pSColumnNode = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
|
||||||
|
if (NULL == pSColumnNode) {
|
||||||
|
return DEAL_RES_ERROR;
|
||||||
|
}
|
||||||
|
pSColumnNode->colId = -1;
|
||||||
|
pSColumnNode->colType = COLUMN_TYPE_TBNAME;
|
||||||
|
pSColumnNode->node.resType.type = TSDB_DATA_TYPE_VARCHAR;
|
||||||
|
pSColumnNode->node.resType.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
|
||||||
|
nodesDestroyNode(*pNode);
|
||||||
|
*pNode = (SNode*)pSColumnNode;
|
||||||
|
} else {
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
STagScanFilterContext* pCtx = (STagScanFilterContext*)pContext;
|
||||||
|
void* data = taosHashGet(pCtx->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId));
|
||||||
|
if (!data) {
|
||||||
|
taosHashPut(pCtx->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId), pNode, sizeof((*pNode)));
|
||||||
|
pSColumnNode->slotId = pCtx->index++;
|
||||||
|
SColumnInfo cInfo = {.colId = pSColumnNode->colId,
|
||||||
|
.type = pSColumnNode->node.resType.type,
|
||||||
|
.bytes = pSColumnNode->node.resType.bytes};
|
||||||
|
taosArrayPush(pCtx->cInfoList, &cInfo);
|
||||||
|
} else {
|
||||||
|
SColumnNode* col = *(SColumnNode**)data;
|
||||||
|
pSColumnNode->slotId = col->slotId;
|
||||||
|
}
|
||||||
|
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aFilterIdxs, void* pVnode, SStorageAPI* pAPI) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t numOfTables = taosArrayGetSize(aUidTags);
|
||||||
|
|
||||||
|
STagScanFilterContext ctx = {0};
|
||||||
|
ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
|
||||||
|
ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
|
||||||
|
|
||||||
|
nodesRewriteExprPostOrder(&pTagCond, tagScanRewriteTagColumn, (void*)&ctx);
|
||||||
|
|
||||||
|
SSDataBlock* pResBlock = createTagValBlockForFilter(ctx.cInfoList, numOfTables, aUidTags, pVnode, pAPI);
|
||||||
|
|
||||||
|
SArray* pBlockList = taosArrayInit(1, POINTER_BYTES);
|
||||||
|
taosArrayPush(pBlockList, &pResBlock);
|
||||||
|
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
|
||||||
|
|
||||||
|
SScalarParam output = {0};
|
||||||
|
tagScanCreateResultData(&type, numOfTables, &output);
|
||||||
|
|
||||||
|
scalarCalculate(pTagCond, pBlockList, &output);
|
||||||
|
|
||||||
|
bool* result = (bool*)output.columnData->pData;
|
||||||
|
for (int32_t i = 0 ; i < numOfTables; ++i) {
|
||||||
|
if (result[i]) {
|
||||||
|
taosArrayPush(aFilterIdxs, &i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
colDataDestroy(output.columnData);
|
||||||
|
taosMemoryFreeClear(output.columnData);
|
||||||
|
|
||||||
|
blockDataDestroy(pResBlock);
|
||||||
|
taosArrayDestroy(pBlockList);
|
||||||
|
|
||||||
|
taosHashCleanup(ctx.colHash);
|
||||||
|
taosArrayDestroy(ctx.cInfoList);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void tagScanFillOneCellWithTag(const STUidTagInfo* pUidTagInfo, SExprInfo* pExprInfo, SColumnInfoData* pColInfo, int rowIndex, const SStorageAPI* pAPI, void* pVnode) {
|
||||||
|
if (fmIsScanPseudoColumnFunc(pExprInfo->pExpr->_function.functionId)) { // tbname
|
||||||
|
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
|
||||||
|
// if (pUidTagInfo->name != NULL) {
|
||||||
|
// STR_TO_VARSTR(str, pUidTagInfo->name);
|
||||||
|
// } else { // name is not retrieved during filter
|
||||||
|
// pAPI->metaFn.getTableNameByUid(pVnode, pUidTagInfo->uid, str);
|
||||||
|
// }
|
||||||
|
STR_TO_VARSTR(str, "zsl");
|
||||||
|
|
||||||
|
colDataSetVal(pColInfo, rowIndex, str, false);
|
||||||
|
} else {
|
||||||
|
STagVal tagVal = {0};
|
||||||
|
tagVal.cid = pExprInfo->base.pParam[0].pCol->colId;
|
||||||
|
if (pUidTagInfo->pTagVal == NULL) {
|
||||||
|
colDataSetNULL(pColInfo, rowIndex);
|
||||||
|
} else {
|
||||||
|
const char* p = pAPI->metaFn.extractTagVal(pUidTagInfo->pTagVal, pColInfo->info.type, &tagVal);
|
||||||
|
|
||||||
|
if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) {
|
||||||
|
colDataSetNULL(pColInfo, rowIndex);
|
||||||
|
} else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
colDataSetVal(pColInfo, rowIndex, p, false);
|
||||||
|
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
|
||||||
|
char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1);
|
||||||
|
varDataSetLen(tmp, tagVal.nData);
|
||||||
|
memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
|
||||||
|
colDataSetVal(pColInfo, rowIndex, tmp, false);
|
||||||
|
taosMemoryFree(tmp);
|
||||||
|
} else {
|
||||||
|
colDataSetVal(pColInfo, rowIndex, (const char*)&tagVal.i64, false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRes, SArray* aUidTags, SArray* aFilterIdxs,
|
||||||
|
SStorageAPI* pAPI) {
|
||||||
|
STagScanInfo* pInfo = pOperator->info;
|
||||||
|
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
|
||||||
|
|
||||||
|
size_t szTables = taosArrayGetSize(aFilterIdxs);
|
||||||
|
for (int i = 0; i < szTables; ++i) {
|
||||||
|
int32_t idx = *(int32_t*)taosArrayGet(aFilterIdxs, i);
|
||||||
|
STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, idx);
|
||||||
|
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
|
||||||
|
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
|
||||||
|
tagScanFillOneCellWithTag(pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRes, SArray* aUidTags,
|
||||||
|
SStorageAPI* pAPI) {
|
||||||
|
STagScanInfo* pInfo = pOperator->info;
|
||||||
|
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
|
||||||
|
|
||||||
|
int32_t nTbls = taosArrayGetSize(aUidTags);
|
||||||
|
for (int i = 0; i < nTbls; ++i) {
|
||||||
|
STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, i);
|
||||||
|
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
|
||||||
|
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
|
||||||
|
|
||||||
|
// refactor later
|
||||||
|
if (fmIsScanPseudoColumnFunc(pExprInfo[j].pExpr->_function.functionId)) {
|
||||||
|
char str[512];
|
||||||
|
|
||||||
|
STR_TO_VARSTR(str, "zsl");
|
||||||
|
colDataSetVal(pDst, (i), str, false);
|
||||||
|
} else { // it is a tag value
|
||||||
|
STagVal val = {0};
|
||||||
|
val.cid = pExprInfo[j].base.pParam[0].pCol->colId;
|
||||||
|
const char* p = pAPI->metaFn.extractTagVal(pUidTagInfo->pTagVal, pDst->info.type, &val);
|
||||||
|
|
||||||
|
char* data = NULL;
|
||||||
|
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL) {
|
||||||
|
data = tTagValToData((const STagVal*)p, false);
|
||||||
|
} else {
|
||||||
|
data = (char*)p;
|
||||||
|
}
|
||||||
|
colDataSetVal(pDst, i, data,
|
||||||
|
(data == NULL) || (pDst->info.type == TSDB_DATA_TYPE_JSON && tTagIsJsonNull(data)));
|
||||||
|
|
||||||
|
if (pDst->info.type != TSDB_DATA_TYPE_JSON && p != NULL && IS_VAR_DATA_TYPE(((const STagVal*)p)->type) &&
|
||||||
|
data != NULL) {
|
||||||
|
taosMemoryFree(data);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
|
||||||
|
|
||||||
|
STagScanInfo* pInfo = pOperator->info;
|
||||||
|
SSDataBlock* pRes = pInfo->pRes;
|
||||||
|
blockDataCleanup(pRes);
|
||||||
|
int32_t count = 0;
|
||||||
|
|
||||||
|
if (pInfo->pCtbCursor == NULL) {
|
||||||
|
pInfo->pCtbCursor = pAPI->metaFn.openCtbCursor(pInfo->readHandle.vnode, pInfo->suid, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
SArray* aUidTags = taosArrayInit(pOperator->resultInfo.capacity, sizeof(STUidTagInfo));
|
||||||
|
SArray* aFilterIdxs = taosArrayInit(pOperator->resultInfo.capacity, sizeof(int32_t));
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
while (count < pOperator->resultInfo.capacity) {
|
||||||
|
SMCtbCursor* pCur = pInfo->pCtbCursor;
|
||||||
|
tb_uid_t uid = pAPI->metaFn.ctbCursorNext(pInfo->pCtbCursor);
|
||||||
|
if (uid == 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal};
|
||||||
|
info.pTagVal = taosMemoryMalloc(pCur->vLen);
|
||||||
|
memcpy(info.pTagVal, pCur->pVal, pCur->vLen);
|
||||||
|
taosArrayPush(aUidTags, &info);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numTables = taosArrayGetSize(aUidTags);
|
||||||
|
if (numTables == 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, pInfo->readHandle.vnode, aFilterIdxs, pAPI);
|
||||||
|
|
||||||
|
tagScanFillResultBlock(pOperator, pRes, aUidTags, aFilterIdxs, pAPI);
|
||||||
|
count = taosArrayGetSize(aFilterIdxs);
|
||||||
|
|
||||||
|
if (count != 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayClearEx(aUidTags, tagScanFreeUidTag);
|
||||||
|
taosArrayClear(aFilterIdxs);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(aFilterIdxs);
|
||||||
|
taosArrayDestroyEx(aUidTags, tagScanFreeUidTag);
|
||||||
|
|
||||||
|
pOperator->resultInfo.totalRows += count;
|
||||||
|
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
|
||||||
|
}
|
||||||
|
|
||||||
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -2746,6 +3017,9 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
static void destroyTagScanOperatorInfo(void* param) {
|
static void destroyTagScanOperatorInfo(void* param) {
|
||||||
STagScanInfo* pInfo = (STagScanInfo*)param;
|
STagScanInfo* pInfo = (STagScanInfo*)param;
|
||||||
|
if (pInfo->pCtbCursor != NULL) {
|
||||||
|
pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor, 1);
|
||||||
|
}
|
||||||
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
pInfo->pRes = blockDataDestroy(pInfo->pRes);
|
||||||
taosArrayDestroy(pInfo->matchInfo.pList);
|
taosArrayDestroy(pInfo->matchInfo.pList);
|
||||||
pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo);
|
pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo);
|
||||||
|
@ -2753,7 +3027,7 @@ static void destroyTagScanOperatorInfo(void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
|
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
|
||||||
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
|
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, SExecTaskInfo* pTaskInfo) {
|
||||||
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
|
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
|
||||||
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
|
||||||
if (pInfo == NULL || pOperator == NULL) {
|
if (pInfo == NULL || pOperator == NULL) {
|
||||||
|
@ -2775,6 +3049,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pInfo->pTagCond = pTagCond;
|
||||||
|
pInfo->pTagIndexCond = pTagIndexCond;
|
||||||
|
pInfo->suid = pPhyNode->suid;
|
||||||
|
pInfo->pStorageAPI = &pTaskInfo->storageAPI;
|
||||||
|
|
||||||
pInfo->pTableListInfo = pTableListInfo;
|
pInfo->pTableListInfo = pTableListInfo;
|
||||||
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
|
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
|
||||||
pInfo->readHandle = *pReadHandle;
|
pInfo->readHandle = *pReadHandle;
|
||||||
|
@ -2786,8 +3065,9 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
|
||||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
|
__optr_fn_t tagScanNextFn = (pPhyNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdx : doTagScan;
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
|
createOperatorFpSet(optrDummyOpenFn, tagScanNextFn, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
|
||||||
|
|
||||||
return pOperator;
|
return pOperator;
|
||||||
|
|
||||||
|
|
|
@ -564,7 +564,16 @@ static int32_t physiScanCopy(const SScanPhysiNode* pSrc, SScanPhysiNode* pDst) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t physiTagScanCopy(const STagScanPhysiNode* pSrc, STagScanPhysiNode* pDst) {
|
static int32_t physiTagScanCopy(const STagScanPhysiNode* pSrc, STagScanPhysiNode* pDst) {
|
||||||
return physiScanCopy(pSrc, pDst);
|
COPY_BASE_OBJECT_FIELD(node, physiNodeCopy);
|
||||||
|
CLONE_NODE_LIST_FIELD(pScanCols);
|
||||||
|
CLONE_NODE_LIST_FIELD(pScanPseudoCols);
|
||||||
|
COPY_SCALAR_FIELD(uid);
|
||||||
|
COPY_SCALAR_FIELD(suid);
|
||||||
|
COPY_SCALAR_FIELD(tableType);
|
||||||
|
COPY_OBJECT_FIELD(tableName, sizeof(SName));
|
||||||
|
COPY_SCALAR_FIELD(groupOrderScan);
|
||||||
|
COPY_SCALAR_FIELD(onlyMetaCtbIdx);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhysiNode* pDst) {
|
static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhysiNode* pDst) {
|
||||||
|
|
|
@ -1562,7 +1562,7 @@ static const char* jkScanPhysiPlanTableName = "TableName";
|
||||||
static const char* jkScanPhysiPlanGroupOrderScan = "GroupOrderScan";
|
static const char* jkScanPhysiPlanGroupOrderScan = "GroupOrderScan";
|
||||||
|
|
||||||
static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj;
|
const SScanPhysiNode* pNode = (const SScanPhysiNode*)pObj;
|
||||||
|
|
||||||
int32_t code = physicPlanNodeToJson(pObj, pJson);
|
int32_t code = physicPlanNodeToJson(pObj, pJson);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -1591,7 +1591,7 @@ static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) {
|
static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) {
|
||||||
STagScanPhysiNode* pNode = (STagScanPhysiNode*)pObj;
|
SScanPhysiNode* pNode = (SScanPhysiNode*)pObj;
|
||||||
|
|
||||||
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
|
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -1619,6 +1619,70 @@ static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static const char* jkTagScanPhysiOnlyMetaCtbIdx = "OnlyMetaCtbIdx";
|
||||||
|
|
||||||
|
static int32_t physiTagScanNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
|
const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj;
|
||||||
|
|
||||||
|
int32_t code = physicPlanNodeToJson(pObj, pJson);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodeListToJson(pJson, jkScanPhysiPlanScanCols, pNode->pScanCols);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodeListToJson(pJson, jkScanPhysiPlanScanPseudoCols, pNode->pScanPseudoCols);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableId, pNode->uid);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanSTableId, pNode->suid);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableType, pNode->tableType);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddObject(pJson, jkScanPhysiPlanTableName, nameToJson, &pNode->tableName);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddBoolToObject(pJson, jkScanPhysiPlanGroupOrderScan, pNode->groupOrderScan);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddBoolToObject(pJson, jkTagScanPhysiOnlyMetaCtbIdx, pNode->onlyMetaCtbIdx);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) {
|
||||||
|
STagScanPhysiNode* pNode = (STagScanPhysiNode*)pObj;
|
||||||
|
|
||||||
|
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = jsonToNodeList(pJson, jkScanPhysiPlanScanCols, &pNode->pScanCols);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = jsonToNodeList(pJson, jkScanPhysiPlanScanPseudoCols, &pNode->pScanPseudoCols);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetUBigIntValue(pJson, jkScanPhysiPlanTableId, &pNode->uid);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetUBigIntValue(pJson, jkScanPhysiPlanSTableId, &pNode->suid);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetTinyIntValue(pJson, jkScanPhysiPlanTableType, &pNode->tableType);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonToObject(pJson, jkScanPhysiPlanTableName, jsonToName, &pNode->tableName);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetBoolValue(pJson, jkScanPhysiPlanGroupOrderScan, &pNode->groupOrderScan);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetBoolValue(pJson, jkTagScanPhysiOnlyMetaCtbIdx, &pNode->onlyMetaCtbIdx);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static const char* jkLastRowScanPhysiPlanGroupTags = "GroupTags";
|
static const char* jkLastRowScanPhysiPlanGroupTags = "GroupTags";
|
||||||
static const char* jkLastRowScanPhysiPlanGroupSort = "GroupSort";
|
static const char* jkLastRowScanPhysiPlanGroupSort = "GroupSort";
|
||||||
|
|
||||||
|
@ -6590,6 +6654,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
case QUERY_NODE_LOGIC_PLAN:
|
case QUERY_NODE_LOGIC_PLAN:
|
||||||
return logicPlanToJson(pObj, pJson);
|
return logicPlanToJson(pObj, pJson);
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
||||||
|
return physiTableScanNodeToJson(pObj, pJson);
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
|
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
|
||||||
return physiScanNodeToJson(pObj, pJson);
|
return physiScanNodeToJson(pObj, pJson);
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
|
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
|
||||||
|
@ -6908,6 +6973,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
|
||||||
case QUERY_NODE_LOGIC_PLAN:
|
case QUERY_NODE_LOGIC_PLAN:
|
||||||
return jsonToLogicPlan(pJson, pObj);
|
return jsonToLogicPlan(pJson, pObj);
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
||||||
|
return jsonToPhysiTagScanNode(pJson, pObj);
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
|
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
|
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
|
||||||
return jsonToPhysiScanNode(pJson, pObj);
|
return jsonToPhysiScanNode(pJson, pObj);
|
||||||
|
|
|
@ -2003,6 +2003,91 @@ static int32_t msgToPhysiScanNode(STlvDecoder* pDecoder, void* pObj) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum {
|
||||||
|
PHY_TAG_SCAN_CODE_BASE_NODE = 1,
|
||||||
|
PHY_TAG_SCAN_CODE_SCAN_COLS,
|
||||||
|
PHY_TAG_SCAN_CODE_SCAN_PSEUDO_COLS,
|
||||||
|
PHY_TAG_SCAN_CODE_BASE_UID,
|
||||||
|
PHY_TAG_SCAN_CODE_BASE_SUID,
|
||||||
|
PHY_TAG_SCAN_CODE_BASE_TABLE_TYPE,
|
||||||
|
PHY_TAG_SCAN_CODE_BASE_TABLE_NAME,
|
||||||
|
PHY_TAG_SCAN_CODE_BASE_GROUP_ORDER_SCAN,
|
||||||
|
PHY_TAG_SCAN_CODE_ONLY_META_CTB_IDX
|
||||||
|
};
|
||||||
|
|
||||||
|
static int32_t physiTagScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||||
|
const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj;
|
||||||
|
|
||||||
|
int32_t code = tlvEncodeObj(pEncoder, PHY_TAG_SCAN_CODE_BASE_NODE, physiNodeToMsg, &pNode->node);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeObj(pEncoder, PHY_TAG_SCAN_CODE_SCAN_COLS, nodeListToMsg, pNode->pScanCols);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeObj(pEncoder, PHY_TAG_SCAN_CODE_SCAN_PSEUDO_COLS, nodeListToMsg, pNode->pScanPseudoCols);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeU64(pEncoder, PHY_TAG_SCAN_CODE_BASE_UID, pNode->uid);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeU64(pEncoder, PHY_TAG_SCAN_CODE_BASE_SUID, pNode->suid);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeI8(pEncoder, PHY_TAG_SCAN_CODE_BASE_TABLE_TYPE, pNode->tableType);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeObj(pEncoder, PHY_TAG_SCAN_CODE_BASE_TABLE_NAME, nameToMsg, &pNode->tableName);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeBool(pEncoder, PHY_TAG_SCAN_CODE_BASE_GROUP_ORDER_SCAN, pNode->groupOrderScan);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeBool(pEncoder, PHY_TAG_SCAN_CODE_ONLY_META_CTB_IDX, pNode->onlyMetaCtbIdx);
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t msgToPhysiTagScanNode(STlvDecoder* pDecoder, void* pObj) {
|
||||||
|
STagScanPhysiNode* pNode = (STagScanPhysiNode*)pObj;
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
STlv* pTlv = NULL;
|
||||||
|
tlvForEach(pDecoder, pTlv, code) {
|
||||||
|
switch (pTlv->type) {
|
||||||
|
case PHY_TAG_SCAN_CODE_BASE_NODE:
|
||||||
|
code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node);
|
||||||
|
break;
|
||||||
|
case PHY_TAG_SCAN_CODE_SCAN_COLS:
|
||||||
|
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pScanCols);
|
||||||
|
break;
|
||||||
|
case PHY_TAG_SCAN_CODE_SCAN_PSEUDO_COLS:
|
||||||
|
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pScanPseudoCols);
|
||||||
|
break;
|
||||||
|
case PHY_TAG_SCAN_CODE_BASE_UID:
|
||||||
|
code = tlvDecodeU64(pTlv, &pNode->uid);
|
||||||
|
break;
|
||||||
|
case PHY_TAG_SCAN_CODE_BASE_SUID:
|
||||||
|
code = tlvDecodeU64(pTlv, &pNode->suid);
|
||||||
|
break;
|
||||||
|
case PHY_TAG_SCAN_CODE_BASE_TABLE_TYPE:
|
||||||
|
code = tlvDecodeI8(pTlv, &pNode->tableType);
|
||||||
|
break;
|
||||||
|
case PHY_TAG_SCAN_CODE_BASE_TABLE_NAME:
|
||||||
|
code = tlvDecodeObjFromTlv(pTlv, msgToName, &pNode->tableName);
|
||||||
|
break;
|
||||||
|
case PHY_TAG_SCAN_CODE_BASE_GROUP_ORDER_SCAN:
|
||||||
|
code = tlvDecodeBool(pTlv, &pNode->groupOrderScan);
|
||||||
|
break;
|
||||||
|
case PHY_TAG_SCAN_CODE_ONLY_META_CTB_IDX:
|
||||||
|
code = tlvDecodeBool(pTlv, &pNode->onlyMetaCtbIdx);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
PHY_LAST_ROW_SCAN_CODE_SCAN = 1,
|
PHY_LAST_ROW_SCAN_CODE_SCAN = 1,
|
||||||
PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS,
|
PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS,
|
||||||
|
@ -3726,6 +3811,8 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||||
code = caseWhenNodeToMsg(pObj, pEncoder);
|
code = caseWhenNodeToMsg(pObj, pEncoder);
|
||||||
break;
|
break;
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
||||||
|
code = physiTagScanNodeToMsg(pObj, pEncoder);
|
||||||
|
break;
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
|
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
|
||||||
code = physiScanNodeToMsg(pObj, pEncoder);
|
code = physiScanNodeToMsg(pObj, pEncoder);
|
||||||
break;
|
break;
|
||||||
|
@ -3869,6 +3956,8 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
|
||||||
code = msgToCaseWhenNode(pDecoder, pObj);
|
code = msgToCaseWhenNode(pDecoder, pObj);
|
||||||
break;
|
break;
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
|
||||||
|
code = msgToPhysiTagScanNode(pDecoder, pObj);
|
||||||
|
break;
|
||||||
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
|
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
|
||||||
code = msgToPhysiScanNode(pDecoder, pObj);
|
code = msgToPhysiScanNode(pDecoder, pObj);
|
||||||
break;
|
break;
|
||||||
|
|
Loading…
Reference in New Issue