enhance: tag cond col list only once and tag scan derive from scan

This commit is contained in:
shenglian zhou 2023-08-15 16:10:54 +08:00
parent 012248b681
commit 84e472ad03
7 changed files with 41 additions and 143 deletions

View File

@ -336,15 +336,7 @@ typedef struct SScanPhysiNode {
} SScanPhysiNode;
typedef struct STagScanPhysiNode {
// SScanPhysiNode scan; //TODO?
SPhysiNode node;
SNodeList* pScanCols;
SNodeList* pScanPseudoCols;
uint64_t uid; // unique id of the table
uint64_t suid;
int8_t tableType;
SName tableName;
bool groupOrderScan;
SScanPhysiNode scan;
bool onlyMetaCtbIdx; //no tbname, tag index not used.
} STagScanPhysiNode;

View File

@ -291,17 +291,17 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
switch (pNode->type) {
case QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN: {
STagScanPhysiNode *pTagScanNode = (STagScanPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_TAG_SCAN_FORMAT, pTagScanNode->tableName.tname);
EXPLAIN_ROW_NEW(level, EXPLAIN_TAG_SCAN_FORMAT, pTagScanNode->scan.tableName.tname);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
if (pTagScanNode->pScanPseudoCols) {
EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pTagScanNode->pScanPseudoCols->length);
if (pTagScanNode->scan.pScanPseudoCols) {
EXPLAIN_ROW_APPEND(EXPLAIN_PSEUDO_COLUMNS_FORMAT, pTagScanNode->scan.pScanPseudoCols->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
}
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->scan.node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
@ -309,11 +309,11 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
if (verbose) {
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
nodesGetOutputNumFromSlotList(pTagScanNode->node.pOutputDataBlockDesc->pSlots));
nodesGetOutputNumFromSlotList(pTagScanNode->scan.node.pOutputDataBlockDesc->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_APPEND_LIMIT(pTagScanNode->node.pLimit);
EXPLAIN_ROW_APPEND_SLIMIT(pTagScanNode->node.pSlimit);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pTagScanNode->scan.node.pOutputDataBlockDesc->outputRowSize);
EXPLAIN_ROW_APPEND_LIMIT(pTagScanNode->scan.node.pLimit);
EXPLAIN_ROW_APPEND_SLIMIT(pTagScanNode->scan.node.pSlimit);
EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));

View File

@ -251,6 +251,12 @@ typedef struct STableMergeScanInfo {
SSortExecInfo sortExecInfo;
} STableMergeScanInfo;
typedef struct STagScanFilterContext {
SHashObj* colHash;
int32_t index;
SArray* cInfoList;
} STagScanFilterContext;
typedef struct STagScanInfo {
SColumnInfo* pCols;
SSDataBlock* pRes;
@ -263,6 +269,7 @@ typedef struct STagScanInfo {
void* pCtbCursor;
SNode* pTagCond;
SNode* pTagIndexCond;
STagScanFilterContext filterCtx;
SArray* aUidTags; // SArray<STUidTagInfo>
SArray* aFilterIdxs; // SArray<int32_t>
SStorageAPI* pStorageAPI;

View File

@ -2719,12 +2719,6 @@ static int32_t tagScanCreateResultData(SDataType* pType, int32_t numOfRows, SSca
return TSDB_CODE_SUCCESS;
}
typedef struct STagScanFilterContext {
SHashObj* colHash;
int32_t index;
SArray* cInfoList;
} STagScanFilterContext;
static EDealRes tagScanRewriteTagColumn(SNode** pNode, void* pContext) {
SColumnNode* pSColumnNode = NULL;
if (QUERY_NODE_COLUMN == nodeType((*pNode))) {
@ -2767,17 +2761,11 @@ static EDealRes tagScanRewriteTagColumn(SNode** pNode, void* pContext) {
}
static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aFilterIdxs, void* pVnode, SStorageAPI* pAPI) {
static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aFilterIdxs, void* pVnode, SStorageAPI* pAPI, STagScanInfo* pInfo) {
int32_t code = 0;
int32_t numOfTables = taosArrayGetSize(aUidTags);
STagScanFilterContext ctx = {0};
ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
nodesRewriteExprPostOrder(&pTagCond, tagScanRewriteTagColumn, (void*)&ctx);
SSDataBlock* pResBlock = createTagValBlockForFilter(ctx.cInfoList, numOfTables, aUidTags, pVnode, pAPI);
SSDataBlock* pResBlock = createTagValBlockForFilter(pInfo->filterCtx.cInfoList, numOfTables, aUidTags, pVnode, pAPI);
SArray* pBlockList = taosArrayInit(1, POINTER_BYTES);
taosArrayPush(pBlockList, &pResBlock);
@ -2801,8 +2789,7 @@ static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aF
blockDataDestroy(pResBlock);
taosArrayDestroy(pBlockList);
taosHashCleanup(ctx.colHash);
taosArrayDestroy(ctx.cInfoList);
}
static void tagScanFillOneCellWithTag(const STUidTagInfo* pUidTagInfo, SExprInfo* pExprInfo, SColumnInfoData* pColInfo, int rowIndex, const SStorageAPI* pAPI, void* pVnode) {
@ -2911,7 +2898,7 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) {
bool ignoreFilterIdx = true;
if (pInfo->pTagCond != NULL) {
ignoreFilterIdx = false;
tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, aFilterIdxs, pInfo->readHandle.vnode, pAPI);
tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, aFilterIdxs, pInfo->readHandle.vnode, pAPI, pInfo);
} else {
ignoreFilterIdx = true;
}
@ -2991,7 +2978,8 @@ static void destroyTagScanOperatorInfo(void* param) {
if (pInfo->pCtbCursor != NULL) {
pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor, 1);
}
taosHashCleanup(pInfo->filterCtx.colHash);
taosArrayDestroy(pInfo->filterCtx.cInfoList);
taosArrayDestroy(pInfo->aFilterIdxs);
taosArrayDestroyEx(pInfo->aUidTags, tagScanFreeUidTag);
@ -3001,8 +2989,9 @@ static void destroyTagScanOperatorInfo(void* param) {
taosMemoryFreeClear(param);
}
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode,
SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pTagScanNode,
STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, SExecTaskInfo* pTaskInfo) {
SScanPhysiNode* pPhyNode = (STagScanPhysiNode*)pTagScanNode;
STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo));
SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
if (pInfo == NULL || pOperator == NULL) {
@ -3040,11 +3029,16 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi
initResultSizeInfo(&pOperator->resultInfo, 4096);
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
if (pPhyNode->onlyMetaCtbIdx) {
if (pTagScanNode->onlyMetaCtbIdx) {
pInfo->aUidTags = taosArrayInit(pOperator->resultInfo.capacity, sizeof(STUidTagInfo));
pInfo->aFilterIdxs = taosArrayInit(pOperator->resultInfo.capacity, sizeof(int32_t));
pInfo->filterCtx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK);
pInfo->filterCtx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo));
if (pInfo->pTagCond != NULL) {
nodesRewriteExprPostOrder(&pTagCond, tagScanRewriteTagColumn, (void*)&pInfo->filterCtx);
}
__optr_fn_t tagScanNextFn = (pPhyNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdx : doTagScanFromMetaEntry;
}
__optr_fn_t tagScanNextFn = (pTagScanNode->onlyMetaCtbIdx) ? doTagScanFromCtbIdx : doTagScanFromMetaEntry;
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn, tagScanNextFn, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL);

View File

@ -564,14 +564,7 @@ static int32_t physiScanCopy(const SScanPhysiNode* pSrc, SScanPhysiNode* pDst) {
}
static int32_t physiTagScanCopy(const STagScanPhysiNode* pSrc, STagScanPhysiNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, physiNodeCopy);
CLONE_NODE_LIST_FIELD(pScanCols);
CLONE_NODE_LIST_FIELD(pScanPseudoCols);
COPY_SCALAR_FIELD(uid);
COPY_SCALAR_FIELD(suid);
COPY_SCALAR_FIELD(tableType);
COPY_OBJECT_FIELD(tableName, sizeof(SName));
COPY_SCALAR_FIELD(groupOrderScan);
COPY_BASE_OBJECT_FIELD(scan, physiScanCopy);
COPY_SCALAR_FIELD(onlyMetaCtbIdx);
return TSDB_CODE_SUCCESS;
}

View File

@ -1630,28 +1630,8 @@ static const char* jkTagScanPhysiOnlyMetaCtbIdx = "OnlyMetaCtbIdx";
static int32_t physiTagScanNodeToJson(const void* pObj, SJson* pJson) {
const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj;
int32_t code = physicPlanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkScanPhysiPlanScanCols, pNode->pScanCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkScanPhysiPlanScanPseudoCols, pNode->pScanPseudoCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableId, pNode->uid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanSTableId, pNode->suid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkScanPhysiPlanTableType, pNode->tableType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddObject(pJson, jkScanPhysiPlanTableName, nameToJson, &pNode->tableName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkScanPhysiPlanGroupOrderScan, pNode->groupOrderScan);
}
int32_t code = physiScanNodeToJson(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkTagScanPhysiOnlyMetaCtbIdx, pNode->onlyMetaCtbIdx);
}
@ -1661,28 +1641,8 @@ static int32_t physiTagScanNodeToJson(const void* pObj, SJson* pJson) {
static int32_t jsonToPhysiTagScanNode(const SJson* pJson, void* pObj) {
STagScanPhysiNode* pNode = (STagScanPhysiNode*)pObj;
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkScanPhysiPlanScanCols, &pNode->pScanCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkScanPhysiPlanScanPseudoCols, &pNode->pScanPseudoCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUBigIntValue(pJson, jkScanPhysiPlanTableId, &pNode->uid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetUBigIntValue(pJson, jkScanPhysiPlanSTableId, &pNode->suid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkScanPhysiPlanTableType, &pNode->tableType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonToObject(pJson, jkScanPhysiPlanTableName, jsonToName, &pNode->tableName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkScanPhysiPlanGroupOrderScan, &pNode->groupOrderScan);
}
int32_t code = jsonToPhysiScanNode(pObj, pJson);
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkTagScanPhysiOnlyMetaCtbIdx, &pNode->onlyMetaCtbIdx);
}

View File

@ -2004,42 +2004,15 @@ static int32_t msgToPhysiScanNode(STlvDecoder* pDecoder, void* pObj) {
}
enum {
PHY_TAG_SCAN_CODE_BASE_NODE = 1,
PHY_TAG_SCAN_CODE_SCAN_COLS,
PHY_TAG_SCAN_CODE_SCAN_PSEUDO_COLS,
PHY_TAG_SCAN_CODE_BASE_UID,
PHY_TAG_SCAN_CODE_BASE_SUID,
PHY_TAG_SCAN_CODE_BASE_TABLE_TYPE,
PHY_TAG_SCAN_CODE_BASE_TABLE_NAME,
PHY_TAG_SCAN_CODE_BASE_GROUP_ORDER_SCAN,
PHY_TAG_SCAN_CODE_SCAN = 1,
PHY_TAG_SCAN_CODE_ONLY_META_CTB_IDX
};
static int32_t physiTagScanNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const STagScanPhysiNode* pNode = (const STagScanPhysiNode*)pObj;
int32_t code = tlvEncodeObj(pEncoder, PHY_TAG_SCAN_CODE_BASE_NODE, physiNodeToMsg, &pNode->node);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_TAG_SCAN_CODE_SCAN_COLS, nodeListToMsg, pNode->pScanCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_TAG_SCAN_CODE_SCAN_PSEUDO_COLS, nodeListToMsg, pNode->pScanPseudoCols);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeU64(pEncoder, PHY_TAG_SCAN_CODE_BASE_UID, pNode->uid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeU64(pEncoder, PHY_TAG_SCAN_CODE_BASE_SUID, pNode->suid);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI8(pEncoder, PHY_TAG_SCAN_CODE_BASE_TABLE_TYPE, pNode->tableType);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_TAG_SCAN_CODE_BASE_TABLE_NAME, nameToMsg, &pNode->tableName);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_TAG_SCAN_CODE_BASE_GROUP_ORDER_SCAN, pNode->groupOrderScan);
}
int32_t code = tlvEncodeObj(pEncoder, PHY_TAG_SCAN_CODE_SCAN, physiScanNodeToMsg, &pNode->scan);
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeBool(pEncoder, PHY_TAG_SCAN_CODE_ONLY_META_CTB_IDX, pNode->onlyMetaCtbIdx);
}
@ -2053,29 +2026,8 @@ static int32_t msgToPhysiTagScanNode(STlvDecoder* pDecoder, void* pObj) {
STlv* pTlv = NULL;
tlvForEach(pDecoder, pTlv, code) {
switch (pTlv->type) {
case PHY_TAG_SCAN_CODE_BASE_NODE:
code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node);
break;
case PHY_TAG_SCAN_CODE_SCAN_COLS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pScanCols);
break;
case PHY_TAG_SCAN_CODE_SCAN_PSEUDO_COLS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pScanPseudoCols);
break;
case PHY_TAG_SCAN_CODE_BASE_UID:
code = tlvDecodeU64(pTlv, &pNode->uid);
break;
case PHY_TAG_SCAN_CODE_BASE_SUID:
code = tlvDecodeU64(pTlv, &pNode->suid);
break;
case PHY_TAG_SCAN_CODE_BASE_TABLE_TYPE:
code = tlvDecodeI8(pTlv, &pNode->tableType);
break;
case PHY_TAG_SCAN_CODE_BASE_TABLE_NAME:
code = tlvDecodeObjFromTlv(pTlv, msgToName, &pNode->tableName);
break;
case PHY_TAG_SCAN_CODE_BASE_GROUP_ORDER_SCAN:
code = tlvDecodeBool(pTlv, &pNode->groupOrderScan);
case PHY_TAG_SCAN_CODE_SCAN:
code = tlvDecodeObjFromTlv(pTlv, msgToPhysiScanNode, &pNode->scan);
break;
case PHY_TAG_SCAN_CODE_ONLY_META_CTB_IDX:
code = tlvDecodeBool(pTlv, &pNode->onlyMetaCtbIdx);