enhance: add fileset delimited front end

This commit is contained in:
shenglian zhou 2023-12-06 10:45:12 +08:00
parent a2e7c78d27
commit ac531c6d7f
10 changed files with 37 additions and 14 deletions

View File

@ -433,6 +433,7 @@ typedef struct STableScanPhysiNode {
int8_t igExpired;
bool assignBlockUid;
int8_t igCheckUpdate;
bool filesetDelimited;
} STableScanPhysiNode;
typedef STableScanPhysiNode STableSeqScanPhysiNode;

View File

@ -270,6 +270,7 @@ typedef struct STableScanInfo {
bool hasGroupByTag;
bool countOnly;
// TsdReader readerAPI;
bool filesetDelimited;
} STableScanInfo;
typedef struct STableMergeScanInfo {
@ -299,6 +300,7 @@ typedef struct STableMergeScanInfo {
SSortExecInfo sortExecInfo;
bool bNewFileset;
bool bOnlyRetrieveBlock;
bool filesetDelimited;
} STableMergeScanInfo;
typedef struct STagScanFilterContext {

View File

@ -894,7 +894,9 @@ static SSDataBlock* groupSeqTableScan(SOperatorInfo* pOperator) {
if (code != TSDB_CODE_SUCCESS) {
T_LONG_JMP(pTaskInfo->env, code);
}
if (pInfo->filesetDelimited) {
pAPI->tsdReader.tsdSetFilesetDelimited(pInfo->base.dataReader);
}
if (pInfo->pResBlock->info.capacity > pOperator->resultInfo.capacity) {
pOperator->resultInfo.capacity = pInfo->pResBlock->info.capacity;
}
@ -1086,6 +1088,8 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode,
pInfo->countOnly = true;
}
pInfo->filesetDelimited = pTableScanNode->filesetDelimited;
taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false);
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo,
optrDefaultBufFn, getTableScannerExecInfo, optrDefaultGetNextExtFn, NULL);
@ -3424,17 +3428,8 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
STableKeyInfo* startKeyInfo = tableListGetInfo(pInfo->base.pTableListInfo, tableStartIdx);
pAPI->tsdReader.tsdReaderOpen(pHandle->vnode, &pInfo->base.cond, startKeyInfo, numOfTable, pInfo->pReaderBlock,
(void**)&pInfo->base.dataReader, GET_TASKID(pTaskInfo), false, &pInfo->mSkipTables);
int r = 1;
FILE* f = fopen("/tmp/duration", "r");
if (f) {
fscanf(f, "%d", &r);
fclose(f);
}
if (r == 1) {
uInfo("zsl: DURATION ORDER");
if (pInfo->filesetDelimited) {
pAPI->tsdReader.tsdSetFilesetDelimited(pInfo->base.dataReader);
} else {
uInfo("zsl: NO DURATION");
}
pAPI->tsdReader.tsdSetSetNotifyCb(pInfo->base.dataReader, tableMergeScanTsdbNotifyCb, pInfo);
@ -3544,7 +3539,6 @@ SSDataBlock* doTableMergeScan(SOperatorInfo* pOperator) {
if (pInfo->bNewFileset) {
stopDurationForGroupTableMergeScan(pOperator);
startDurationForGroupTableMergeScan(pOperator);
} else {
// Data of this group are all dumped, let's try the next group
stopGroupTableMergeScan(pOperator);
@ -3683,6 +3677,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
uint32_t nCols = taosArrayGetSize(pInfo->pResBlock->pDataBlock);
pInfo->bufPageSize = getProperSortPageSize(rowSize, nCols);
pInfo->filesetDelimited = pTableScanNode->filesetDelimited;
setOperatorInfo(pOperator, "TableMergeScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = numOfCols;

View File

@ -423,6 +423,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_SCALAR_FIELD(igLastNull);
COPY_SCALAR_FIELD(groupOrderScan);
COPY_SCALAR_FIELD(onlyMetaCtbIdx);
COPY_SCALAR_FIELD(filesetDelimited);
return TSDB_CODE_SUCCESS;
}
@ -650,6 +651,7 @@ static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhy
COPY_SCALAR_FIELD(triggerType);
COPY_SCALAR_FIELD(watermark);
COPY_SCALAR_FIELD(igExpired);
COPY_SCALAR_FIELD(filesetDelimited);
return TSDB_CODE_SUCCESS;
}

View File

@ -677,6 +677,7 @@ static const char* jkScanLogicPlanDataRequired = "DataRequired";
static const char* jkScanLogicPlanTagCond = "TagCond";
static const char* jkScanLogicPlanGroupTags = "GroupTags";
static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx";
static const char* jkScanLogicPlanFilesetDelimited = "FilesetDelimited";
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
@ -721,6 +722,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanOnlyMetaCtbIdx, pNode->onlyMetaCtbIdx);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->filesetDelimited);
}
return code;
}
@ -768,7 +772,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkScanLogicPlanOnlyMetaCtbIdx, &pNode->onlyMetaCtbIdx);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->filesetDelimited);
}
return code;
}
@ -1830,6 +1836,7 @@ static const char* jkTableScanPhysiPlanTags = "Tags";
static const char* jkTableScanPhysiPlanSubtable = "Subtable";
static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid";
static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate";
static const char* jkTableScanPhysiPlanFilesetDelimited = "FilesetDelimited";
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
@ -1898,6 +1905,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkTableScanPhysiPlanIgnoreUpdate, pNode->igCheckUpdate);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanFilesetDelimited, pNode->filesetDelimited);
}
return code;
}
@ -1969,6 +1979,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetTinyIntValue(pJson, jkTableScanPhysiPlanIgnoreUpdate, &pNode->igCheckUpdate);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanFilesetDelimited, &pNode->filesetDelimited);
}
return code;
}

View File

@ -2167,7 +2167,9 @@ static int32_t physiTableScanNodeInlineToMsg(const void* pObj, STlvEncoder* pEnc
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI8(pEncoder, pNode->igCheckUpdate);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->filesetDelimited);
}
return code;
}
@ -2246,6 +2248,9 @@ static int32_t msgToPhysiTableScanNodeInline(STlvDecoder* pDecoder, void* pObj)
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueI8(pDecoder, &pNode->igCheckUpdate);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueBool(pDecoder, &pNode->filesetDelimited);
}
return code;
}

View File

@ -1400,6 +1400,7 @@ static int32_t sortPriKeyOptApply(SOptimizeContext* pCxt, SLogicSubplan* pLogicS
pScan->node.outputTsOrder = order;
if (TSDB_SUPER_TABLE == pScan->tableType) {
pScan->scanType = SCAN_TYPE_TABLE_MERGE;
pScan->filesetDelimited = true;
pScan->node.resultDataOrder = DATA_ORDER_LEVEL_GLOBAL;
pScan->node.requireDataOrder = DATA_ORDER_LEVEL_GLOBAL;
}

View File

@ -622,6 +622,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
pTableScan->igExpired = pScanLogicNode->igExpired;
pTableScan->igCheckUpdate = pScanLogicNode->igCheckUpdate;
pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false;
pTableScan->filesetDelimited = pScanLogicNode->filesetDelimited;
int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
if (TSDB_CODE_SUCCESS == code) {

View File

@ -692,6 +692,7 @@ static void stbSplSetTableMergeScan(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
SScanLogicNode* pScan = (SScanLogicNode*)pNode;
pScan->scanType = SCAN_TYPE_TABLE_MERGE;
pScan->filesetDelimited = true;
if (NULL != pScan->pGroupTags) {
pScan->groupSort = true;
}
@ -1241,6 +1242,7 @@ static int32_t stbSplCreateMergeScanNode(SScanLogicNode* pScan, SLogicNode** pOu
SNodeList* pMergeKeys = NULL;
if (TSDB_CODE_SUCCESS == code) {
pMergeScan->scanType = SCAN_TYPE_TABLE_MERGE;
pMergeScan->filesetDelimited = true;
pMergeScan->node.pChildren = pChildren;
splSetParent((SLogicNode*)pMergeScan);
code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pMergeScan),

View File

@ -164,6 +164,7 @@ static int32_t adjustScanDataRequirement(SScanLogicNode* pScan, EDataOrderLevel
pScan->scanType = SCAN_TYPE_TABLE;
} else if (TSDB_SUPER_TABLE == pScan->tableType) {
pScan->scanType = SCAN_TYPE_TABLE_MERGE;
pScan->filesetDelimited = true;
}
if (TSDB_NORMAL_TABLE != pScan->tableType && TSDB_CHILD_TABLE != pScan->tableType) {