feat: add front end for ordered blocks in mem sort

This commit is contained in:
shenglian zhou 2024-02-01 09:38:35 +08:00
parent af422e77c6
commit 7dcb477933
13 changed files with 62 additions and 4 deletions

View File

@ -374,6 +374,7 @@
#define TK_NO_BATCH_SCAN 607
#define TK_SORT_FOR_GROUP 608
#define TK_PARTITION_FIRST 609
#define TK_SEQ_BLOCKS_SORT 610
#define TK_NK_NIL 65535

View File

@ -121,6 +121,7 @@ typedef struct SScanLogicNode {
bool filesetDelimited; // returned blocks delimited by fileset
bool isCountByTag; // true if selectstmt hasCountFunc & part by tag/tbname
SArray* pFuncTypes; // for last, last_row
bool seqBlocksSort; // for table merge scan
} SScanLogicNode;
typedef struct SJoinLogicNode {
@ -439,6 +440,7 @@ typedef struct STableScanPhysiNode {
int8_t igCheckUpdate;
bool filesetDelimited;
bool needCountEmptyTable;
bool seqBlocksSort;
} STableScanPhysiNode;
typedef STableScanPhysiNode STableSeqScanPhysiNode;

View File

@ -128,6 +128,7 @@ typedef enum EHintOption {
HINT_BATCH_SCAN,
HINT_SORT_FOR_GROUP,
HINT_PARTITION_FIRST,
HINT_SEQ_BLOCKS_SORT
} EHintOption;
typedef struct SHintNode {

View File

@ -3716,7 +3716,7 @@ static int32_t stopSubTablesTableMergeScan(STableMergeScanInfo* pInfo) {
return TSDB_CODE_SUCCESS;
}
SSDataBlock* doTableMergeScanSubTables(SOperatorInfo* pOperator) {
SSDataBlock* doTableMergeScanSeqBlocksInMem(SOperatorInfo* pOperator) {
if (pOperator->status == OP_EXEC_DONE) {
return NULL;
}
@ -4304,8 +4304,14 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = numOfCols;
pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScanSubTables, NULL, destroyTableMergeScanOperatorInfo,
optrDefaultBufFn, getTableMergeScanExplainExecInfo, optrDefaultGetNextExtFn, NULL);
pOperator->fpSet =
createOperatorFpSet(optrDummyOpenFn,
pTableScanNode->seqBlocksSort ? doTableMergeScanSeqBlocksInMem : doTableMergeScan,
NULL,
destroyTableMergeScanOperatorInfo,
optrDefaultBufFn,
getTableMergeScanExplainExecInfo,
optrDefaultGetNextExtFn, NULL);
pOperator->cost.openCost = 0;
return pOperator;

View File

@ -449,6 +449,7 @@ static int32_t logicScanCopy(const SScanLogicNode* pSrc, SScanLogicNode* pDst) {
COPY_SCALAR_FIELD(filesetDelimited);
COPY_SCALAR_FIELD(isCountByTag);
CLONE_OBJECT_FIELD(pFuncTypes, functParamClone);
COPY_SCALAR_FIELD(seqBlocksSort);
return TSDB_CODE_SUCCESS;
}
@ -679,6 +680,7 @@ static int32_t physiTableScanCopy(const STableScanPhysiNode* pSrc, STableScanPhy
COPY_SCALAR_FIELD(igExpired);
COPY_SCALAR_FIELD(filesetDelimited);
COPY_SCALAR_FIELD(needCountEmptyTable);
COPY_SCALAR_FIELD(seqBlocksSort);
return TSDB_CODE_SUCCESS;
}

View File

@ -682,6 +682,7 @@ static const char* jkScanLogicPlanTagCond = "TagCond";
static const char* jkScanLogicPlanGroupTags = "GroupTags";
static const char* jkScanLogicPlanOnlyMetaCtbIdx = "OnlyMetaCtbIdx";
static const char* jkScanLogicPlanFilesetDelimited = "FilesetDelimited";
static const char* jkScanLogicPlanSeqBlocksSort = "SeqBlocksSort";
static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
const SScanLogicNode* pNode = (const SScanLogicNode*)pObj;
@ -729,6 +730,9 @@ static int32_t logicScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->filesetDelimited);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkScanLogicPlanFilesetDelimited, pNode->seqBlocksSort);
}
return code;
}
@ -779,6 +783,9 @@ static int32_t jsonToLogicScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->filesetDelimited);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkScanLogicPlanFilesetDelimited, &pNode->seqBlocksSort);
}
return code;
}
@ -1872,6 +1879,7 @@ static const char* jkTableScanPhysiPlanAssignBlockUid = "AssignBlockUid";
static const char* jkTableScanPhysiPlanIgnoreUpdate = "IgnoreUpdate";
static const char* jkTableScanPhysiPlanFilesetDelimited = "FilesetDelimited";
static const char* jkTableScanPhysiPlanNeedCountEmptyTable = "NeedCountEmptyTable";
static const char* jkTableScanPhysiPlanSeqBlocksSort = "SeqBlocksSort";
static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
const STableScanPhysiNode* pNode = (const STableScanPhysiNode*)pObj;
@ -1946,6 +1954,9 @@ static int32_t physiTableScanNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, pNode->needCountEmptyTable);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkTableScanPhysiPlanSeqBlocksSort, pNode->seqBlocksSort);
}
return code;
}
@ -2022,6 +2033,9 @@ static int32_t jsonToPhysiTableScanNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanNeedCountEmptyTable, &pNode->needCountEmptyTable);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkTableScanPhysiPlanSeqBlocksSort, &pNode->seqBlocksSort);
}
return code;
}

View File

@ -2185,6 +2185,9 @@ static int32_t physiTableScanNodeInlineToMsg(const void* pObj, STlvEncoder* pEnc
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->needCountEmptyTable);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->seqBlocksSort);
}
return code;
}
@ -2269,6 +2272,9 @@ static int32_t msgToPhysiTableScanNodeInline(STlvDecoder* pDecoder, void* pObj)
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueBool(pDecoder, &pNode->needCountEmptyTable);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueBool(pDecoder, &pNode->seqBlocksSort);
}
return code;
}

View File

@ -401,6 +401,9 @@ bool addHintNodeToList(SAstCreateContext* pCxt, SNodeList** ppHintList, EHintOpt
case HINT_PARTITION_FIRST:
if (paramNum > 0 || hasHint(*ppHintList, HINT_SORT_FOR_GROUP)) return true;
break;
case HINT_SEQ_BLOCKS_SORT:
if (paramNum > 0 || hasHint(*ppHintList, HINT_SEQ_BLOCKS_SORT)) return true;
break;
default:
return true;
}
@ -479,6 +482,14 @@ SNodeList* createHintNodeList(SAstCreateContext* pCxt, const SToken* pLiteral) {
}
opt = HINT_PARTITION_FIRST;
break;
case TK_SEQ_BLOCKS_SORT:
lastComma = false;
if (0 != opt || inParamList) {
quit = true;
break;
}
opt = HINT_SEQ_BLOCKS_SORT;
break;
case TK_NK_LP:
lastComma = false;
if (0 == opt || inParamList) {

View File

@ -199,6 +199,7 @@ static SKeyword keywordTable[] = {
{"SCHEMALESS", TK_SCHEMALESS},
{"SCORES", TK_SCORES},
{"SELECT", TK_SELECT},
{"SEQ_BLOCKS_SORT", TK_SEQ_BLOCKS_SORT},
{"SERVER_STATUS", TK_SERVER_STATUS},
{"SERVER_VERSION", TK_SERVER_VERSION},
{"SESSION", TK_SESSION},

View File

@ -47,6 +47,7 @@ int32_t validateQueryPlan(SPlanContext* pCxt, SQueryPlan* pPlan);
bool getBatchScanOptionFromHint(SNodeList* pList);
bool getSortForGroupOptHint(SNodeList* pList);
bool getSeqBlocksSortHint(SNodeList* pList);
bool getOptHint(SNodeList* pList, EHintOption hint);
SLogicNode* getLogicNodeRootNode(SLogicNode* pCurr);
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes);

View File

@ -501,7 +501,7 @@ static int32_t createScanLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
} else {
nodesDestroyNode((SNode*)pScan);
}
pScan->seqBlocksSort = getSeqBlocksSortHint(pSelect->pHint);
pCxt->hasScan = true;
return code;

View File

@ -651,6 +651,7 @@ static int32_t createTableScanPhysiNode(SPhysiPlanContext* pCxt, SSubplan* pSubp
pTableScan->assignBlockUid = pCxt->pPlanCxt->rSmaQuery ? true : false;
pTableScan->filesetDelimited = pScanLogicNode->filesetDelimited;
pTableScan->needCountEmptyTable = pScanLogicNode->isCountByTag;
pTableScan->seqBlocksSort = pScanLogicNode->seqBlocksSort;
int32_t code = createScanPhysiNodeFinalize(pCxt, pSubplan, pScanLogicNode, (SScanPhysiNode*)pTableScan, pPhyNode);
if (TSDB_CODE_SUCCESS == code) {

View File

@ -455,6 +455,18 @@ bool getOptHint(SNodeList* pList, EHintOption hint) {
return false;
}
bool getSeqBlocksSortOptHint(SNodeList* pList) {
if (!pList) return false;
SNode* pNode;
FOREACH(pNode, pList) {
SHintNode* pHint = (SHintNode*)pNode;
if (pHint->option == HINT_SEQ_BLOCKS_SORT) {
return true;
}
}
return false;
}
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes) {
int32_t code = TSDB_CODE_SUCCESS;
SLogicNode* pCurr = (SLogicNode*)pNode;