From ac531c6d7f4369213f04f37833db6de68aebfa0c Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Wed, 6 Dec 2023 10:45:12 +0800 Subject: [PATCH] enhance: add fileset delimited front end --- include/libs/nodes/plannodes.h | 1 + source/libs/executor/inc/executorInt.h | 2 ++ source/libs/executor/src/scanoperator.c | 19 +++++++------------ source/libs/nodes/src/nodesCloneFuncs.c | 2 ++ source/libs/nodes/src/nodesCodeFuncs.c | 15 ++++++++++++++- source/libs/nodes/src/nodesMsgFuncs.c | 7 ++++++- source/libs/planner/src/planOptimizer.c | 1 + source/libs/planner/src/planPhysiCreater.c | 1 + source/libs/planner/src/planSpliter.c | 2 ++ source/libs/planner/src/planUtil.c | 1 + 10 files changed, 37 insertions(+), 14 deletions(-) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index ce6ea32e2e..b99a97a194 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -433,6 +433,7 @@ typedef struct STableScanPhysiNode { int8_t igExpired; bool assignBlockUid; int8_t igCheckUpdate; + bool filesetDelimited; } STableScanPhysiNode; typedef STableScanPhysiNode STableSeqScanPhysiNode; diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c91701788d..cba26c46b5 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -270,6 +270,7 @@ typedef struct STableScanInfo { bool hasGroupByTag; bool countOnly; // TsdReader readerAPI; + bool filesetDelimited; } STableScanInfo; typedef struct STableMergeScanInfo { @@ -299,6 +300,7 @@ typedef struct STableMergeScanInfo { SSortExecInfo sortExecInfo; bool bNewFileset; bool bOnlyRetrieveBlock; + bool filesetDelimited; } STableMergeScanInfo; typedef struct STagScanFilterContext { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9911366166..7d06f5dab6 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -894,7 +894,9 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) { if (code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, code); } - + if (pInfo->filesetDelimited) { + pAPI->tsdReader.tsdSetFilesetDelimited(pInfo->base.dataReader); + } if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) { pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity; } @@ -1086,6 +1088,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->countOnly = true; } + pInfo->filesetDelimited = pTableScanNode->filesetDelimited; + taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo, optrDefaultBufFn, getTableScannerExecInfo, optrDefaultGetNextExtFn, NULL); @@ -3424,17 +3428,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx); pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock, (void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, &pInfo->mSkipTables); - int r = 1; - FILE* f = fopen("/tmp/duration", "r"); - if (f) { - fscanf(f, "%d", &r); - fclose(f); - } - if (r == 1) { - uInfo("zsl: DURATION ORDER"); + if (pInfo->filesetDelimited) { pAPI->tsdReader.tsdSetFilesetDelimited(pInfo->base.dataReader); - } else { - uInfo("zsl: NO DURATION"); } pAPI->tsdReader.tsdSetSetNotifyCb(pInfo->base.dataReader, tableMergeScanTsdbNotifyCb, pInfo); @@ -3544,7 +3539,6 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) { if (pInfo->bNewFileset) { stopDurationForGroupTableMergeScan(pOperator); startDurationForGroupTableMergeScan(pOperator); - } else { // Data of this group are all dumped, let's try the next group stopGroupTableMergeScan(pOperator); @@ -3683,6 +3677,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock); pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols); + pInfo->filesetDelimited = pTableScanNode->filesetDelimited; setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = numOfCols; diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index ce23928268..97438b84a6 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -423,6 +423,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) { COPY_SCALAR_FIELD(igLastNull); COPY_SCALAR_FIELD(groupOrderScan); COPY_SCALAR_FIELD(onlyMetaCtbIdx); + COPY_SCALAR_FIELD(filesetDelimited); return TSDB_CODE_SUCCESS; } @@ -650,6 +651,7 @@ static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhy COPY_SCALAR_FIELD(triggerType); COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(igExpired); + COPY_SCALAR_FIELD(filesetDelimited); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index f3087dd5d4..d26cdcf401 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -677,6 +677,7 @@ static const char* jkScanLogicPlanDataRequired = "DataRequired"; static const char* jkScanLogicPlanTagCond = "TagCond"; static const char* jkScanLogicPlanGroupTags = "GroupTags"; static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx"; +static const char* jkScanLogicPlanFilesetDelimited = "FilesetDelimited"; static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { const SScanLogicNode* pNode = (const SScanLogicNode*)pObj; @@ -721,6 +722,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddBoolToObject(pJson, jkScanLogicPlanOnlyMetaCtbIdx, pNode->onlyMetaCtbIdx); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->filesetDelimited); + } return code; } @@ -768,7 +772,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetBoolValue(pJson, jkScanLogicPlanOnlyMetaCtbIdx, &pNode->onlyMetaCtbIdx); } - + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->filesetDelimited); + } return code; } @@ -1830,6 +1836,7 @@ static const char* jkTableScanPhysiPlanTags = "Tags"; static const char* jkTableScanPhysiPlanSubtable = "Subtable"; static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid"; static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate"; +static const char* jkTableScanPhysiPlanFilesetDelimited = "FilesetDelimited"; static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj; @@ -1898,6 +1905,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanIgnoreUpdate, pNode->igCheckUpdate); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanFilesetDelimited, pNode->filesetDelimited); + } return code; } @@ -1969,6 +1979,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = tjsonGetTinyIntValue(pJson, jkTableScanPhysiPlanIgnoreUpdate, &pNode->igCheckUpdate); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanFilesetDelimited, &pNode->filesetDelimited); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 9804f2075b..d6eb3360aa 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2167,7 +2167,9 @@ static int32_t physiTableScanNodeInlineToMsg(const void* pObj, STlvEncoder* pEnc if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeValueI8(pEncoder, pNode->igCheckUpdate); } - + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeValueBool(pEncoder, pNode->filesetDelimited); + } return code; } @@ -2246,6 +2248,9 @@ static int32_t msgToPhysiTableScanNodeInline(STlvDecoder* pDecoder, void* pObj) if (TSDB_CODE_SUCCESS == code) { code = tlvDecodeValueI8(pDecoder, &pNode->igCheckUpdate); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvDecodeValueBool(pDecoder, &pNode->filesetDelimited); + } return code; } diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index e71e18d37d..aa3181e166 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -1400,6 +1400,7 @@ static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicS pScan->node.outputTsOrder = order; if (TSDB_SUPER_TABLE == pScan->tableType) { pScan->scanType = SCAN_TYPE_TABLE_MERGE; + pScan->filesetDelimited = true; pScan->node.resultDataOrder = DATA_ORDER_LEVEL_GLOBAL; pScan->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL; } diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 598bce3133..d1fbd0681d 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -622,6 +622,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp pTableScan->igExpired = pScanLogicNode->igExpired; pTableScan->igCheckUpdate = pScanLogicNode->igCheckUpdate; pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false; + pTableScan->filesetDelimited = pScanLogicNode->filesetDelimited; int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode); if (TSDB_CODE_SUCCESS == code) { diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 43bd8a5589..9b9f701e2b 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -692,6 +692,7 @@ static void stbSplSetTableMergeScan(SLogicNode* pNode) { if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) { SScanLogicNode* pScan = (SScanLogicNode*)pNode; pScan->scanType = SCAN_TYPE_TABLE_MERGE; + pScan->filesetDelimited = true; if (NULL != pScan->pGroupTags) { pScan->groupSort = true; } @@ -1241,6 +1242,7 @@ static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOu SNodeList* pMergeKeys = NULL; if (TSDB_CODE_SUCCESS == code) { pMergeScan->scanType = SCAN_TYPE_TABLE_MERGE; + pMergeScan->filesetDelimited = true; pMergeScan->node.pChildren = pChildren; splSetParent((SLogicNode*)pMergeScan); code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pMergeScan), diff --git a/source/libs/planner/src/planUtil.c b/source/libs/planner/src/planUtil.c index 1c7c937b7f..2da270e42d 100644 --- a/source/libs/planner/src/planUtil.c +++ b/source/libs/planner/src/planUtil.c @@ -164,6 +164,7 @@ static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel pScan->scanType = SCAN_TYPE_TABLE; } else if (TSDB_SUPER_TABLE == pScan->tableType) { pScan->scanType = SCAN_TYPE_TABLE_MERGE; + pScan->filesetDelimited = true; } if (TSDB_NORMAL_TABLE != pScan->tableType && TSDB_CHILD_TABLE != pScan->tableType) {