From 7c39bc989083ce501dd6df5bd980b4edaab07057 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Fri, 11 Aug 2023 13:50:41 +0800 Subject: [PATCH] fix: some minor modifications --- source/libs/executor/inc/executorInt.h | 1 + source/libs/executor/src/scanoperator.c | 77 ++++++++++++++----------- 2 files changed, 44 insertions(+), 34 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index cadf367481..2b25feabb3 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -263,6 +263,7 @@ typedef struct STagScanInfo { void* pCtbCursor; SNode* pTagCond; SNode* pTagIndexCond; + SStorageAPI* pStorageAPI; } STagScanInfo; typedef enum EStreamScanMode { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 42c488edbe..5e0eb71c13 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2767,9 +2767,10 @@ static EDealRes tagScanRewriteTagColumn(SNode** pNode, void* pContext) { } -static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aUidTagIdxs, void* pVnode, SStorageAPI* pAPI) { +static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aFilterIdxs, void* pVnode, SStorageAPI* pAPI) { int32_t code = 0; int32_t numOfTables = taosArrayGetSize(aUidTags); + STagScanFilterContext ctx = {0}; ctx.colHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_SMALLINT), false, HASH_NO_LOCK); ctx.cInfoList = taosArrayInit(4, sizeof(SColumnInfo)); @@ -2777,48 +2778,42 @@ static void tagScanFilterByTagCond(SArray* aUidTags, SNode* pTagCond, SArray* aU nodesRewriteExprPostOrder(&pTagCond, tagScanRewriteTagColumn, (void*)&ctx); SSDataBlock* pResBlock = createTagValBlockForFilter(ctx.cInfoList, numOfTables, aUidTags, pVnode, pAPI); - if (pResBlock == NULL) { - - } SArray* pBlockList = taosArrayInit(1, POINTER_BYTES); taosArrayPush(pBlockList, &pResBlock); SDataType type = {.type = TSDB_DATA_TYPE_BOOL, .bytes = sizeof(bool)}; SScalarParam output = {0}; - code = tagScanCreateResultData(&type, numOfTables, &output); - if (code != TSDB_CODE_SUCCESS) { + tagScanCreateResultData(&type, numOfTables, &output); - } - - code = scalarCalculate(pTagCond, pBlockList, &output); - if (code != TSDB_CODE_SUCCESS) { - } + scalarCalculate(pTagCond, pBlockList, &output); bool* result = (bool*)output.columnData->pData; for (int32_t i = 0 ; i < numOfTables; ++i) { if (result[i]) { - taosArrayPush(aUidTagIdxs, &i); + taosArrayPush(aFilterIdxs, &i); } } - taosHashCleanup(ctx.colHash); - taosArrayDestroy(ctx.cInfoList); - blockDataDestroy(pResBlock); - taosArrayDestroy(pBlockList); colDataDestroy(output.columnData); taosMemoryFreeClear(output.columnData); + + blockDataDestroy(pResBlock); + taosArrayDestroy(pBlockList); + + taosHashCleanup(ctx.colHash); + taosArrayDestroy(ctx.cInfoList); } static void tagScanFillOneCellWithTag(const STUidTagInfo* pUidTagInfo, SExprInfo* pExprInfo, SColumnInfoData* pColInfo, int rowIndex, const SStorageAPI* pAPI, void* pVnode) { if (fmIsScanPseudoColumnFunc(pExprInfo->pExpr->_function.functionId)) { // tbname char str[TSDB_TABLE_FNAME_LEN + VARSTR_HEADER_SIZE] = {0}; +// if (pUidTagInfo->name != NULL) { +// STR_TO_VARSTR(str, pUidTagInfo->name); +// } else { // name is not retrieved during filter +// pAPI->metaFn.getTableNameByUid(pVnode, pUidTagInfo->uid, str); +// } STR_TO_VARSTR(str, "zsl"); - // if (pUidTagInfo->name != NULL) { - // STR_TO_VARSTR(str, pUidTagInfo->name); - // } else { // name is not retrieved during filter - // pAPI->metaFn.getTableNameByUid(pVnode, pUidTagInfo->uid, str); - // } colDataSetVal(pColInfo, rowIndex, str, false); } else { @@ -2846,13 +2841,15 @@ static void tagScanFillOneCellWithTag(const STUidTagInfo* pUidTagInfo, SExprInfo } } -static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRes, SArray* aUidTags, SArray* aUidTagIdxs, +static int32_t tagScanFillResultBlock(SOperatorInfo* pOperator, SSDataBlock* pRes, SArray* aUidTags, SArray* aFilterIdxs, SStorageAPI* pAPI) { STagScanInfo* pInfo = pOperator->info; SExprInfo* pExprInfo = &pOperator->exprSupp.pExprInfo[0]; - for (int i = 0; i < taosArrayGetSize(aUidTagIdxs); ++i) { - STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, *(int32_t*)taosArrayGet(aUidTagIdxs, i)); + size_t szTables = taosArrayGetSize(aFilterIdxs); + for (int i = 0; i < szTables; ++i) { + int32_t idx = *(int32_t*)taosArrayGet(aFilterIdxs, i); + STUidTagInfo* pUidTagInfo = taosArrayGet(aUidTags, idx); for (int32_t j = 0; j < pOperator->exprSupp.numOfExprs; ++j) { SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, pExprInfo[j].base.resSchema.slotId); tagScanFillOneCellWithTag(pUidTagInfo, &pExprInfo[j], pDst, i, pAPI, pInfo->readHandle.vnode); @@ -2920,8 +2917,10 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { if (pInfo->pCtbCursor == NULL) { pInfo->pCtbCursor = pAPI->metaFn.openCtbCursor(pInfo->readHandle.vnode, pInfo->suid, 1); } + SArray* aUidTags = taosArrayInit(pOperator->resultInfo.capacity, sizeof(STUidTagInfo)); - SArray* aUidTagIdxs = taosArrayInit(pOperator->resultInfo.capacity, sizeof(int32_t)); + SArray* aFilterIdxs = taosArrayInit(pOperator->resultInfo.capacity, sizeof(int32_t)); + while (1) { while (count < pOperator->resultInfo.capacity) { SMCtbCursor* pCur = pInfo->pCtbCursor; @@ -2936,20 +2935,26 @@ static SSDataBlock* doTagScanFromCtbIdx(SOperatorInfo* pOperator) { } int32_t numTables = taosArrayGetSize(aUidTags); - if (numTables != 0 && pInfo->pTagCond != NULL) { - tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, pInfo->readHandle.vnode, aUidTagIdxs, pAPI); - } - tagScanFillResultBlock(pOperator, pRes, aUidTags, aUidTagIdxs, pAPI); - count = taosArrayGetSize(aUidTagIdxs); - - if (taosArrayGetSize(aUidTagIdxs) != 0) { + if (numTables == 0) { break; } + + tagScanFilterByTagCond(aUidTags, pInfo->pTagCond, pInfo->readHandle.vnode, aFilterIdxs, pAPI); + + tagScanFillResultBlock(pOperator, pRes, aUidTags, aFilterIdxs, pAPI); + count = taosArrayGetSize(aFilterIdxs); + + if (count != 0) { + break; + } + taosArrayClearEx(aUidTags, tagScanFreeUidTag); - taosArrayClear(aUidTagIdxs); + taosArrayClear(aFilterIdxs); } - taosArrayDestroy(aUidTagIdxs); + + taosArrayDestroy(aFilterIdxs); taosArrayDestroyEx(aUidTags, tagScanFreeUidTag); + pOperator->resultInfo.totalRows += count; return (pRes->info.rows == 0) ? NULL : pInfo->pRes; } @@ -3012,6 +3017,9 @@ static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { static void destroyTagScanOperatorInfo(void* param) { STagScanInfo* pInfo = (STagScanInfo*)param; + if (pInfo->pCtbCursor != NULL) { + pInfo->pStorageAPI->metaFn.closeCtbCursor(pInfo->pCtbCursor, 1); + } pInfo->pRes = blockDataDestroy(pInfo->pRes); taosArrayDestroy(pInfo->matchInfo.pList); pInfo->pTableListInfo = tableListDestroy(pInfo->pTableListInfo); @@ -3043,6 +3051,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi pInfo->pTagCond = pTagCond; pInfo->pTagIndexCond = pTagIndexCond; pInfo->pTableListInfo = pTableListInfo; + pInfo->pStorageAPI = &pTaskInfo->storageAPI; pInfo->pRes = createDataBlockFromDescNode(pDescNode); pInfo->readHandle = *pReadHandle; pInfo->curPos = 0;