Merge pull request #22410 from taosdata/szhou/tag-scan-opt

enhance: tag scan optimization
This commit is contained in:
dapan1121 2023-08-16 08:58:17 +08:00 committed by GitHub
commit df13c6fa3e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 426 additions and 51 deletions

View File

@ -98,6 +98,16 @@ typedef struct SMTbCursor {
int8_t paused;
} SMTbCursor;
typedef struct SMCtbCursor {
SMeta *pMeta;
void *pCur;
tb_uid_t suid;
void *pKey;
void *pVal;
int kLen;
int vLen;
} SMCtbCursor;
typedef struct SRowBuffPos {
void* pRowBuff;
void* pKey;
@ -278,13 +288,15 @@ typedef struct SStoreMeta {
void (*getBasicInfo)(void* pVnode, const char** dbname, int32_t* vgId, int64_t* numOfTables,
int64_t* numOfNormalTables); // vnodeGetInfo(void *pVnode, const char **dbname, int32_t *vgId) &
// metaGetTbNum(SMeta *pMeta) & metaGetNtbNum(SMeta *pMeta);
int64_t (*getNumOfRowsInMem)(void* pVnode);
/**
int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list);
int32_t vnodeGetCtbIdListByFilter(void *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg);
int32_t vnodeGetStbIdList(void *pVnode, int64_t suid, SArray *list);
*/
SMCtbCursor* (*openCtbCursor)(void *pVnode, tb_uid_t uid, int lock);
void (*closeCtbCursor)(SMCtbCursor *pCtbCur, int lock);
tb_uid_t (*ctbCursorNext)(SMCtbCursor* pCur);
} SStoreMeta;
typedef struct SStoreMetaReader {

View File

@ -107,6 +107,7 @@ typedef struct SScanLogicNode {
bool sortPrimaryKey;
bool igLastNull;
bool groupOrderScan;
bool onlyMetaCtbIdx; // for tag scan with no tbname
} SScanLogicNode;
typedef struct SJoinLogicNode {
@ -334,7 +335,11 @@ typedef struct SScanPhysiNode {
bool groupOrderScan;
} SScanPhysiNode;
typedef SScanPhysiNode STagScanPhysiNode;
typedef struct STagScanPhysiNode {
SScanPhysiNode scan;
bool onlyMetaCtbIdx; //no tbname, tag index not used.
} STagScanPhysiNode;
typedef SScanPhysiNode SBlockDistScanPhysiNode;
typedef struct SLastRowScanPhysiNode {

View File

@ -167,7 +167,7 @@ int metaAddIndexToSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
int metaDropIndexFromSTable(SMeta* pMeta, int64_t version, SDropIndexReq* pReq);
int64_t metaGetTimeSeriesNum(SMeta* pMeta);
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid, int lock);
SMCtbCursor* metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock);
void metaCloseCtbCursor(SMCtbCursor* pCtbCur, int lock);
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
SMStbCursor* metaOpenStbCursor(SMeta* pMeta, tb_uid_t uid);

View File

@ -408,17 +408,9 @@ _err:
return NULL;
}
struct SMCtbCursor {
SMeta *pMeta;
TBC *pCur;
tb_uid_t suid;
void *pKey;
void *pVal;
int kLen;
int vLen;
};
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid, int lock) {
SMCtbCursor *metaOpenCtbCursor(void* pVnode, tb_uid_t uid, int lock) {
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
SMCtbCursor *pCtbCur = NULL;
SCtbIdxKey ctbIdxKey;
int ret = 0;
@ -435,7 +427,7 @@ SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid, int lock) {
metaRLock(pMeta);
}
ret = tdbTbcOpen(pMeta->pCtbIdx, &pCtbCur->pCur, NULL);
ret = tdbTbcOpen(pMeta->pCtbIdx, (TBC**)&pCtbCur->pCur, NULL);
if (ret < 0) {
metaULock(pMeta);
taosMemoryFree(pCtbCur);
@ -1373,7 +1365,7 @@ int32_t metaGetTableTagsByUids(void *pVnode, int64_t suid, SArray *uidList) {
}
int32_t metaGetTableTags(void *pVnode, uint64_t suid, SArray *pUidTagInfo) {
SMCtbCursor *pCur = metaOpenCtbCursor(((SVnode *)pVnode)->pMeta, suid, 1);
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode, suid, 1);
// If len > 0 means there already have uids, and we only want the
// tags of the specified tables, of which uid in the uid list. Otherwise, all table tags are retrieved and kept

View File

@ -96,6 +96,10 @@ void initMetadataAPI(SStoreMeta* pMeta) {
pMeta->metaGetCachedTbGroup = metaGetCachedTbGroup;
pMeta->metaPutTbGroupToCache = metaPutTbGroupToCache;
pMeta->openCtbCursor = metaOpenCtbCursor;
pMeta->closeCtbCursor = metaCloseCtbCursor;
pMeta->ctbCursorNext = metaCtbCursorNext;
}
void initTqAPI(SStoreTqReader* pTq) {

View File

@ -440,7 +440,7 @@ int32_t vnodeGetTableList(void* pVnode, int8_t type, SArray* pList) {
}
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list) {
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, uid, 1);
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode, uid, 1);
while (1) {
tb_uid_t id = metaCtbCursorNext(pCur);
@ -462,7 +462,7 @@ int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bo
int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list) {
SVnode *pVnodeObj = pVnode;
SMCtbCursor *pCur = metaOpenCtbCursor(pVnodeObj->pMeta, suid, 1);
SMCtbCursor *pCur = metaOpenCtbCursor(pVnodeObj, suid, 1);
while (1) {
tb_uid_t id = metaCtbCursorNext(pCur);
@ -521,7 +521,7 @@ int32_t vnodeGetStbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bo
}
int32_t vnodeGetCtbNum(SVnode *pVnode, int64_t suid, int64_t *num) {
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode->pMeta, suid, 0);
SMCtbCursor *pCur = metaOpenCtbCursor(pVnode, suid, 0);
if (!pCur) {
return TSDB_CODE_FAILED;
}

View File

@ -291,17 +291,17 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
switch (pNode->type) {
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: {
STagScanPhysiNode *pTagScanNode = (STagScanPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_TAG_SCAN_FORMAT, pTagScanNode->tableName.tname);
EXPLAIN_ROW_NEW(level, EXPLAIN_TAG_SCAN_FORMAT, pTagScanNode->scan.tableName.tname);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
if (pTagScanNode->pScanPseudoCols) {
EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pTagScanNode->pScanPseudoCols->length);
if (pTagScanNode->scan.pScanPseudoCols) {
EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pTagScanNode->scan.pScanPseudoCols->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->scan.node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
@ -309,11 +309,11 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pTagScanNode->node.pOutputDataBlockDesc->pSlots));
nodesGetOutputNumFromSlotList(pTagScanNode->scan.node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_APPEND_LIMIT(pTagScanNode->node.pLimit);
EXPLAIN_ROW_APPEND_SLIMIT(pTagScanNode->node.pSlimit);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->scan.node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_APPEND_LIMIT(pTagScanNode->scan.node.pLimit);
EXPLAIN_ROW_APPEND_SLIMIT(pTagScanNode->scan.node.pSlimit);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));

View File

@ -190,4 +190,6 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag);
void getNextTimeWindow(const SInterval* pInterval, STimeWindow* tw, int32_t order);
void getInitialStartTimeWindow(SInterval* pInterval, TSKEY ts, STimeWindow* w, bool ascQuery);
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
SStorageAPI* pStorageAPI);
#endif // TDENGINE_EXECUTIL_H

View File

@ -251,6 +251,12 @@ typedef struct STableMergeScanInfo {
SSortExecInfo sortExecInfo;
} STableMergeScanInfo;
typedef struct STagScanFilterContext {
SHashObj* colHash;
int32_t index;
SArray* cInfoList;
} STagScanFilterContext;
typedef struct STagScanInfo {
SColumnInfo* pCols;
SSDataBlock* pRes;
@ -259,6 +265,14 @@ typedef struct STagScanInfo {
SLimitNode* pSlimit;
SReadHandle readHandle;
STableListInfo* pTableListInfo;
uint64_t suid;
void* pCtbCursor;
SNode* pTagCond;
SNode* pTagIndexCond;
STagScanFilterContext filterCtx;
SArray* aUidTags; // SArray<STUidTagInfo>
SArray* aFilterIdxs; // SArray<int32_t>
SStorageAPI* pStorageAPI;
} STagScanInfo;
typedef enum EStreamScanMode {

View File

@ -81,7 +81,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, STableListInfo* pTableListInfo, SNode* pTagCond, SNode*pTagIndexCond, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo);

View File

@ -47,8 +47,6 @@ static int32_t optimizeTbnameInCondImpl(void* metaHandle, SArray* list, SNode* p
static int32_t getTableList(void* pVnode, SScanPhysiNode* pScanNode, SNode* pTagCond, SNode* pTagIndexCond,
STableListInfo* pListInfo, uint8_t* digest, const char* idstr, SStorageAPI* pStorageAPI);
static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
SStorageAPI* pStorageAPI);
static int64_t getLimit(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->limit; }
static int64_t getOffset(const SNode* pLimit) { return NULL == pLimit ? -1 : ((SLimitNode*)pLimit)->offset; }
@ -846,7 +844,7 @@ static int32_t optimizeTbnameInCondImpl(void* pVnode, SArray* pExistedUidList, S
return -1;
}
static SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
SStorageAPI* pStorageAPI) {
SSDataBlock* pResBlock = createDataBlock();
if (pResBlock == NULL) {

View File

@ -370,17 +370,18 @@ SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SR
STableCountScanPhysiNode* pTblCountScanNode = (STableCountScanPhysiNode*)pPhyNode;
pOperator = createTableCountScanOperatorInfo(pHandle, pTblCountScanNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
STagScanPhysiNode* pTagScanPhyNode = (STagScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();
int32_t code = createScanTableListInfo(pScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
pTagIndexCond, pTaskInfo);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("failed to getTableList, code: %s", tstrerror(code));
return NULL;
if (!pTagScanPhyNode->onlyMetaCtbIdx) {
int32_t code = createScanTableListInfo((SScanPhysiNode*)pTagScanPhyNode, NULL, false, pHandle, pTableListInfo, pTagCond,
pTagIndexCond, pTaskInfo);
if (code != TSDB_CODE_SUCCESS) {
pTaskInfo->code = code;
qError("failed to getTableList, code: %s", tstrerror(code));
return NULL;
}
}
pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
pOperator = createTagScanOperatorInfo(pHandle, pTagScanPhyNode, pTableListInfo, pTagCond, pTagIndexCond, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
STableListInfo* pTableListInfo = tableListCreate();

View File

@ -2688,7 +2688,236 @@ static void doTagScanOneTable(SOperatorInfo* pOperator, const SSDataBlock* pRes,
}
}
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
static void tagScanFreeUidTag(void* p) {
STUidTagInfo* pInfo = p;
if (pInfo->pTagVal != NULL) {
taosMemoryFree(pInfo->pTagVal);
}
}
static int32_t tagScanCreateResultData(SDataType* pType, int32_t numOfRows, SScalarParam* pParam) {
SColumnInfoData* pColumnData = taosMemoryCalloc(1, sizeof(SColumnInfoData));
if (pColumnData == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
pColumnData->info.type = pType->type;
pColumnData->info.bytes = pType->bytes;
pColumnData->info.scale = pType->scale;
pColumnData->info.precision = pType->precision;
int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows, true);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
taosMemoryFree(pColumnData);
return terrno;
}
pParam->columnData = pColumnData;
pParam->colAlloced = true;
return TSDB_CODE_SUCCESS;
}
static EDealRes tagScanRewriteTagColumn(SNode** pNode, void* pContext) {
SColumnNode* pSColumnNode = NULL;
if (QUERY_NODE_COLUMN == nodeType((*pNode))) {
pSColumnNode = *(SColumnNode**)pNode;
} else if (QUERY_NODE_FUNCTION == nodeType((*pNode))) {
SFunctionNode* pFuncNode = *(SFunctionNode**)(pNode);
if (pFuncNode->funcType == FUNCTION_TYPE_TBNAME) {
pSColumnNode = (SColumnNode*)nodesMakeNode(QUERY_NODE_COLUMN);
if (NULL == pSColumnNode) {
return DEAL_RES_ERROR;
}
pSColumnNode->colId = -1;
pSColumnNode->colType = COLUMN_TYPE_TBNAME;
pSColumnNode->node.resType.type = TSDB_DATA_TYPE_VARCHAR;
pSColumnNode->node.resType.bytes = TSDB_TABLE_FNAME_LEN - 1 + VARSTR_HEADER_SIZE;
nodesDestroyNode(*pNode);
*pNode = (SNode*)pSColumnNode;
} else {
return DEAL_RES_CONTINUE;
}
} else {
return DEAL_RES_CONTINUE;
}
STagScanFilterContext* pCtx = (STagScanFilterContext*)pContext;
void* data = taosHashGet(pCtx->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId));
if (!data) {
taosHashPut(pCtx->colHash, &pSColumnNode->colId, sizeof(pSColumnNode->colId), pNode, sizeof((*pNode)));
pSColumnNode->slotId = pCtx->index++;
SColumnInfo cInfo = {.colId = pSColumnNode->colId,
.type = pSColumnNode->node.resType.type,
.bytes = pSColumnNode->node.resType.bytes};
taosArrayPush(pCtx->cInfoList, &cInfo);
} else {
SColumnNode* col = *(SColumnNode**)data;
pSColumnNode->slotId = col->slotId;
}
return DEAL_RES_CONTINUE;
}
static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aFilterIdxs, void* pVnode, SStorageAPI* pAPI, STagScanInfo* pInfo) {
int32_t code = 0;
int32_t numOfTables = taosArrayGetSize(aUidTags);
SSDataBlock* pResBlock = createTagValBlockForFilter(pInfo->filterCtx.cInfoList, numOfTables, aUidTags, pVnode, pAPI);
SArray* pBlockList = taosArrayInit(1, POINTER_BYTES);
taosArrayPush(pBlockList, &pResBlock);
SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)};
SScalarParam output = {0};
tagScanCreateResultData(&type, numOfTables, &output);
scalarCalculate(pTagCond, pBlockList, &output);
bool* result = (bool*)output.columnData->pData;
for (int32_t i = 0 ; i < numOfTables; ++i) {
if (result[i]) {
taosArrayPush(aFilterIdxs, &i);
}
}
colDataDestroy(output.columnData);
taosMemoryFreeClear(output.columnData);
blockDataDestroy(pResBlock);
taosArrayDestroy(pBlockList);
}
static void tagScanFillOneCellWithTag(const STUidTagInfo* pUidTagInfo, SExprInfo* pExprInfo, SColumnInfoData* pColInfo, int rowIndex, const SStorageAPI* pAPI, void* pVnode) {
if (fmIsScanPseudoColumnFunc(pExprInfo->pExpr->_function.functionId)) { // tbname
char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0};
// if (pUidTagInfo->name != NULL) {
// STR_TO_VARSTR(str, pUidTagInfo->name);
// } else { // name is not retrieved during filter
// pAPI->metaFn.getTableNameByUid(pVnode, pUidTagInfo->uid, str);
// }
STR_TO_VARSTR(str, "ctbidx");
colDataSetVal(pColInfo, rowIndex, str, false);
} else {
STagVal tagVal = {0};
tagVal.cid = pExprInfo->base.pParam[0].pCol->colId;
if (pUidTagInfo->pTagVal == NULL) {
colDataSetNULL(pColInfo, rowIndex);
} else {
const char* p = pAPI->metaFn.extractTagVal(pUidTagInfo->pTagVal, pColInfo->info.type, &tagVal);
if (p == NULL || (pColInfo->info.type == TSDB_DATA_TYPE_JSON && ((STag*)p)->nTag == 0)) {
colDataSetNULL(pColInfo, rowIndex);
} else if (pColInfo->info.type == TSDB_DATA_TYPE_JSON) {
colDataSetVal(pColInfo, rowIndex, p, false);
} else if (IS_VAR_DATA_TYPE(pColInfo->info.type)) {
char* tmp = taosMemoryMalloc(tagVal.nData + VARSTR_HEADER_SIZE + 1);
varDataSetLen(tmp, tagVal.nData);
memcpy(tmp + VARSTR_HEADER_SIZE, tagVal.pData, tagVal.nData);
colDataSetVal(pColInfo, rowIndex, tmp, false);
taosMemoryFree(tmp);
} else {
colDataSetVal(pColInfo, rowIndex, (const char*)&tagVal.i64, false);
}
}
}
}
static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRes, SArray* aUidTags, SArray* aFilterIdxs, bool ignoreFilterIdx,
SStorageAPI* pAPI) {
STagScanInfo* pInfo = pOperator->info;
SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0];
if (!ignoreFilterIdx) {
size_t szTables = taosArrayGetSize(aFilterIdxs);
for (int i = 0; i < szTables; ++i) {
int32_t idx = *(int32_t*)taosArrayGet(aFilterIdxs, i);
STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, idx);
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
tagScanFillOneCellWithTag(pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode);
}
}
} else {
size_t szTables = taosArrayGetSize(aUidTags);
for (int i = 0; i < szTables; ++i) {
STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, i);
for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) {
SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId);
tagScanFillOneCellWithTag(pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode);
}
}
}
return 0;
}
static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
STagScanInfo* pInfo = pOperator->info;
SSDataBlock* pRes = pInfo->pRes;
blockDataCleanup(pRes);
if (pInfo->pCtbCursor == NULL) {
pInfo->pCtbCursor = pAPI->metaFn.openCtbCursor(pInfo->readHandle.vnode, pInfo->suid, 1);
}
SArray* aUidTags = pInfo->aUidTags;
SArray* aFilterIdxs = pInfo->aFilterIdxs;
int32_t count = 0;
while (1) {
taosArrayClearEx(aUidTags, tagScanFreeUidTag);
taosArrayClear(aFilterIdxs);
int32_t numTables = 0;
while (numTables < pOperator->resultInfo.capacity) {
SMCtbCursor* pCur = pInfo->pCtbCursor;
tb_uid_t uid = pAPI->metaFn.ctbCursorNext(pInfo->pCtbCursor);
if (uid == 0) {
break;
}
STUidTagInfo info = {.uid = uid, .pTagVal = pCur->pVal};
info.pTagVal = taosMemoryMalloc(pCur->vLen);
memcpy(info.pTagVal, pCur->pVal, pCur->vLen);
taosArrayPush(aUidTags, &info);
++numTables;
}
if (numTables == 0) {
break;
}
bool ignoreFilterIdx = true;
if (pInfo->pTagCond != NULL) {
ignoreFilterIdx = false;
tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, aFilterIdxs, pInfo->readHandle.vnode, pAPI, pInfo);
} else {
ignoreFilterIdx = true;
}
tagScanFillResultBlock(pOperator, pRes, aUidTags, aFilterIdxs, ignoreFilterIdx, pAPI);
count = ignoreFilterIdx ? taosArrayGetSize(aUidTags): taosArrayGetSize(aFilterIdxs);
if (count != 0) {
break;
}
}
pRes->info.rows = count;
pOperator->resultInfo.totalRows += count;
return (pRes->info.rows == 0) ? NULL : pInfo->pRes;
}
static SSDataBlock* doTagScanFromMetaEntry(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
@ -2746,14 +2975,23 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
static void destroyTagScanOperatorInfo(void* param) {
STagScanInfo* pInfo = (STagScanInfo*)param;
if (pInfo->pCtbCursor != NULL) {
pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor, 1);
}
taosHashCleanup(pInfo->filterCtx.colHash);
taosArrayDestroy(pInfo->filterCtx.cInfoList);
taosArrayDestroy(pInfo->aFilterIdxs);
taosArrayDestroyEx(pInfo->aUidTags, tagScanFreeUidTag);
pInfo->pRes = blockDataDestroy(pInfo->pRes);
taosArrayDestroy(pInfo->matchInfo.pList);
pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo);
taosMemoryFreeClear(param);
}
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) {
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pTagScanNode,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, SExecTaskInfo* pTaskInfo) {
SScanPhysiNode* pPhyNode = (SScanPhysiNode*)pTagScanNode;
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
@ -2775,6 +3013,11 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
goto _error;
}
pInfo->pTagCond = pTagCond;
pInfo->pTagIndexCond = pTagIndexCond;
pInfo->suid = pPhyNode->suid;
pInfo->pStorageAPI = &pTaskInfo->storageAPI;
pInfo->pTableListInfo = pTableListInfo;
pInfo->pRes = createDataBlockFromDescNode(pDescNode);
pInfo->readHandle = *pReadHandle;
@ -2786,8 +3029,18 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
if (pTagScanNode->onlyMetaCtbIdx) {
pInfo->aUidTags = taosArrayInit(pOperator->resultInfo.capacity, sizeof(STUidTagInfo));
pInfo->aFilterIdxs = taosArrayInit(pOperator->resultInfo.capacity, sizeof(int32_t));
pInfo->filterCtx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
pInfo->filterCtx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
if (pInfo->pTagCond != NULL) {
nodesRewriteExprPostOrder(&pTagCond, tagScanRewriteTagColumn, (void*)&pInfo->filterCtx);
}
}
__optr_fn_t tagScanNextFn = (pTagScanNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdx : doTagScanFromMetaEntry;
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
createOperatorFpSet(optrDummyOpenFn, tagScanNextFn, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);
return pOperator;

View File

@ -564,7 +564,9 @@ static int32_t physiScanCopy(const SScanPhysiNode* pSrc, SScanPhysiNode* pDst) {
}
static int32_t physiTagScanCopy(const STagScanPhysiNode* pSrc, STagScanPhysiNode* pDst) {
return physiScanCopy(pSrc, pDst);
COPY_BASE_OBJECT_FIELD(scan, physiScanCopy);
COPY_SCALAR_FIELD(onlyMetaCtbIdx);
return TSDB_CODE_SUCCESS;
}
static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhysiNode* pDst) {

View File

@ -662,6 +662,7 @@ static const char* jkScanLogicPlanDynamicScanFuncs = "DynamicScanFuncs";
static const char* jkScanLogicPlanDataRequired = "DataRequired";
static const char* jkScanLogicPlanTagCond = "TagCond";
static const char* jkScanLogicPlanGroupTags = "GroupTags";
static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx";
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
@ -703,7 +704,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkScanLogicPlanGroupTags, pNode->pGroupTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanOnlyMetaCtbIdx, pNode->onlyMetaCtbIdx);
}
return code;
}
@ -748,7 +751,10 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkScanLogicPlanGroupTags, &pNode->pGroupTags);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkScanLogicPlanOnlyMetaCtbIdx, &pNode->onlyMetaCtbIdx);
}
return code;
}
@ -1564,7 +1570,7 @@ static const char* jkScanPhysiPlanTableName = "TableName";
static const char* jkScanPhysiPlanGroupOrderScan = "GroupOrderScan";
static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj;
const SScanPhysiNode* pNode = (const SScanPhysiNode*)pObj;
int32_t code = physicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
@ -1593,7 +1599,7 @@ static int32_t physiScanNodeToJson(const void* pObj, SJson* pJson) {
}
static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) {
STagScanPhysiNode* pNode = (STagScanPhysiNode*)pObj;
SScanPhysiNode* pNode = (SScanPhysiNode*)pObj;
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
@ -1621,6 +1627,30 @@ static int32_t jsonToPhysiScanNode(const SJson* pJson, void* pObj) {
return code;
}
static const char* jkTagScanPhysiOnlyMetaCtbIdx = "OnlyMetaCtbIdx";
static int32_t physiTagScanNodeToJson(const void* pObj, SJson* pJson) {
const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj;
int32_t code = physiScanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkTagScanPhysiOnlyMetaCtbIdx, pNode->onlyMetaCtbIdx);
}
return code;
}
static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) {
STagScanPhysiNode* pNode = (STagScanPhysiNode*)pObj;
int32_t code = jsonToPhysiScanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkTagScanPhysiOnlyMetaCtbIdx, &pNode->onlyMetaCtbIdx);
}
return code;
}
static const char* jkLastRowScanPhysiPlanGroupTags = "GroupTags";
static const char* jkLastRowScanPhysiPlanGroupSort = "GroupSort";
@ -6592,6 +6622,7 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) {
case QUERY_NODE_LOGIC_PLAN:
return logicPlanToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
return physiTagScanNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
return physiScanNodeToJson(pObj, pJson);
case QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN:
@ -6910,6 +6941,7 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) {
case QUERY_NODE_LOGIC_PLAN:
return jsonToLogicPlan(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
return jsonToPhysiTagScanNode(pJson, pObj);
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN:
return jsonToPhysiScanNode(pJson, pObj);

View File

@ -2003,6 +2003,43 @@ static int32_t msgToPhysiScanNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum {
PHY_TAG_SCAN_CODE_SCAN = 1,
PHY_TAG_SCAN_CODE_ONLY_META_CTB_IDX
};
static int32_t physiTagScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, PHY_TAG_SCAN_CODE_SCAN, physiScanNodeToMsg, &pNode->scan);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_TAG_SCAN_CODE_ONLY_META_CTB_IDX, pNode->onlyMetaCtbIdx);
}
return code;
}
static int32_t msgToPhysiTagScanNode(STlvDecoder* pDecoder, void* pObj) {
STagScanPhysiNode* pNode = (STagScanPhysiNode*)pObj;
int32_t code = TSDB_CODE_SUCCESS;
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case PHY_TAG_SCAN_CODE_SCAN:
code = tlvDecodeObjFromTlv(pTlv, msgToPhysiScanNode, &pNode->scan);
break;
case PHY_TAG_SCAN_CODE_ONLY_META_CTB_IDX:
code = tlvDecodeBool(pTlv, &pNode->onlyMetaCtbIdx);
break;
default:
break;
}
}
return code;
}
enum {
PHY_LAST_ROW_SCAN_CODE_SCAN = 1,
PHY_LAST_ROW_SCAN_CODE_GROUP_TAGS,
@ -3726,6 +3763,8 @@ static int32_t specificNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
code = caseWhenNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
code = physiTagScanNodeToMsg(pObj, pEncoder);
break;
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
code = physiScanNodeToMsg(pObj, pEncoder);
break;
@ -3869,6 +3908,8 @@ static int32_t msgToSpecificNode(STlvDecoder* pDecoder, void* pObj) {
code = msgToCaseWhenNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN:
code = msgToPhysiTagScanNode(pDecoder, pObj);
break;
case QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN:
code = msgToPhysiScanNode(pDecoder, pObj);
break;

View File

@ -199,9 +199,10 @@ TEST_F(NodesCloneTest, physiScan) {
ASSERT_EQ(nodeType(pSrc), nodeType(pDst));
STagScanPhysiNode* pSrcNode = (STagScanPhysiNode*)pSrc;
STagScanPhysiNode* pDstNode = (STagScanPhysiNode*)pDst;
ASSERT_EQ(pSrcNode->uid, pDstNode->uid);
ASSERT_EQ(pSrcNode->suid, pDstNode->suid);
ASSERT_EQ(pSrcNode->tableType, pDstNode->tableType);
ASSERT_EQ(pSrcNode->scan.uid, pDstNode->scan.uid);
ASSERT_EQ(pSrcNode->scan.suid, pDstNode->scan.suid);
ASSERT_EQ(pSrcNode->scan.tableType, pDstNode->scan.tableType);
ASSERT_EQ(pSrcNode->onlyMetaCtbIdx, pDstNode->onlyMetaCtbIdx);
});
std::unique_ptr<SNode, void (*)(SNode*)> srcNode(nullptr, nodesDestroyNode);

View File

@ -2679,6 +2679,9 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
}
nodesDestroyNode((SNode*)pAgg);
tagScanOptCloneAncestorSlimit((SLogicNode*)pScanNode);
pScanNode->onlyMetaCtbIdx = false;
pCxt->optimized = true;
return TSDB_CODE_SUCCESS;
}

View File

@ -511,6 +511,20 @@ static int32_t createSimpleScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSub
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, pScan, pPhyNode);
}
static int32_t createTagScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
SPhysiNode** pPhyNode) {
STagScanPhysiNode* pScan =
(STagScanPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pScanLogicNode, QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN);
if (NULL == pScan) {
return TSDB_CODE_OUT_OF_MEMORY;
}
vgroupInfoToNodeAddr(pScanLogicNode->pVgroupList->vgroups, &pSubplan->execNode);
pScan->onlyMetaCtbIdx = pScanLogicNode->onlyMetaCtbIdx;
return createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pScan, pPhyNode);
}
static int32_t createLastRowScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan, SScanLogicNode* pScanLogicNode,
SPhysiNode** pPhyNode) {
SLastRowScanPhysiNode* pScan =
@ -646,6 +660,7 @@ static int32_t createScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubplan,
pCxt->hasScan = true;
switch (pScanLogicNode->scanType) {
case SCAN_TYPE_TAG:
return createTagScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
case SCAN_TYPE_BLOCK_INFO:
return createSimpleScanPhysiNode(pCxt, pSubplan, pScanLogicNode, pPhyNode);
case SCAN_TYPE_TABLE_COUNT: