feat: optimize partition node, replace with sort node
This commit is contained in:
parent
c6c4895f6c
commit
2c4e0fee07
|
@ -365,6 +365,7 @@
|
||||||
#define TK_NK_BIN 605 // bin format data 0b111
|
#define TK_NK_BIN 605 // bin format data 0b111
|
||||||
#define TK_BATCH_SCAN 606
|
#define TK_BATCH_SCAN 606
|
||||||
#define TK_NO_BATCH_SCAN 607
|
#define TK_NO_BATCH_SCAN 607
|
||||||
|
#define TK_SORT_FOR_GROUP 608
|
||||||
|
|
||||||
|
|
||||||
#define TK_NK_NIL 65535
|
#define TK_NK_NIL 65535
|
||||||
|
|
|
@ -279,8 +279,8 @@ typedef struct SSortLogicNode {
|
||||||
SLogicNode node;
|
SLogicNode node;
|
||||||
SNodeList* pSortKeys;
|
SNodeList* pSortKeys;
|
||||||
bool groupSort;
|
bool groupSort;
|
||||||
int64_t maxRows;
|
|
||||||
bool skipPKSortOpt;
|
bool skipPKSortOpt;
|
||||||
|
bool calcGroupId;
|
||||||
} SSortLogicNode;
|
} SSortLogicNode;
|
||||||
|
|
||||||
typedef struct SPartitionLogicNode {
|
typedef struct SPartitionLogicNode {
|
||||||
|
@ -603,6 +603,7 @@ typedef struct SSortPhysiNode {
|
||||||
SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function
|
SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function
|
||||||
SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode
|
SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode
|
||||||
SNodeList* pTargets;
|
SNodeList* pTargets;
|
||||||
|
bool calcGroupId;
|
||||||
} SSortPhysiNode;
|
} SSortPhysiNode;
|
||||||
|
|
||||||
typedef SSortPhysiNode SGroupSortPhysiNode;
|
typedef SSortPhysiNode SGroupSortPhysiNode;
|
||||||
|
|
|
@ -119,6 +119,7 @@ typedef struct SLeftValueNode {
|
||||||
typedef enum EHintOption {
|
typedef enum EHintOption {
|
||||||
HINT_NO_BATCH_SCAN = 1,
|
HINT_NO_BATCH_SCAN = 1,
|
||||||
HINT_BATCH_SCAN,
|
HINT_BATCH_SCAN,
|
||||||
|
HINT_SORT_FOR_GROUP,
|
||||||
} EHintOption;
|
} EHintOption;
|
||||||
|
|
||||||
typedef struct SHintNode {
|
typedef struct SHintNode {
|
||||||
|
|
|
@ -196,4 +196,24 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t
|
||||||
|
|
||||||
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
|
SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
|
||||||
SStorageAPI* pStorageAPI);
|
SStorageAPI* pStorageAPI);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief extract col data according to sort/group cols
|
||||||
|
 * @param pSortGroupCols sort keys or group keys, array of SColumn
|
||||||
|
* @param [out] pColVals col vals extracted, array of SGroupKeys
|
||||||
|
*/
|
||||||
|
void extractCols(SArray* pSortGroupCols, SArray* pColVals, SSDataBlock* pBlock, int32_t rowIndex);
|
||||||
|
/**
|
||||||
|
 * @brief build keys buffer with col values
|
||||||
|
* @retval key length
|
||||||
|
* @param [out] buf buffer to store result key
|
||||||
|
*/
|
||||||
|
int32_t buildKeys(char* buf, SArray* pColVals);
|
||||||
|
|
||||||
|
uint64_t calcGroupId(char *pData, int32_t len);
|
||||||
|
|
||||||
|
SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys);
|
||||||
|
|
||||||
|
int32_t extractSortGroupKeysInfo(SArray** pColVals, int32_t* keyLen, char** keyBuf, const SArray* pColList);
|
||||||
|
|
||||||
#endif // TDENGINE_EXECUTIL_H
|
#endif // TDENGINE_EXECUTIL_H
|
||||||
|
|
|
@ -194,6 +194,8 @@ void tsortSetClosed(SSortHandle* pHandle);
|
||||||
void tsortSetSingleTableMerge(SSortHandle* pHandle);
|
void tsortSetSingleTableMerge(SSortHandle* pHandle);
|
||||||
void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), void* param);
|
void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), void* param);
|
||||||
|
|
||||||
|
int32_t tsortBuildKeys(SArray* pSortGroupCols, SArray* pColVals, STupleHandle* pTuple, char* keyBuf);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -303,6 +303,7 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc
|
||||||
|
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
|
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
|
||||||
|
downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_SORT ||
|
||||||
(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
|
(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
|
||||||
((STableScanInfo*)downstream->info)->hasGroupByTag == true)) {
|
((STableScanInfo*)downstream->info)->hasGroupByTag == true)) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -2261,3 +2261,132 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t
|
||||||
ts[3] = pWin->skey; // window start key
|
ts[3] = pWin->skey; // window start key
|
||||||
ts[4] = pWin->ekey + delta; // window end key
|
ts[4] = pWin->ekey + delta; // window end key
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Copy the values of the sort/group columns of one row into the per-column
 * key holders.
 *
 * @param pSortGroupCols  array of SColumn describing the key columns (slotId is used to
 *                        locate the column inside pBlock)
 * @param pColVals        [out] array of SGroupKeys, one entry per key column; isNull and
 *                        pData of each entry are overwritten
 * @param pBlock          data block holding the row
 * @param rowIndex        index of the row inside pBlock
 *
 * On a JSON column whose value is a whole JSON object, terrno is set to
 * TSDB_CODE_QRY_JSON_IN_GROUP_ERROR and the function returns early; entries after that
 * column are left untouched.
 */
void extractCols(SArray* pSortGroupCols, SArray* pColVals, SSDataBlock* pBlock, int32_t rowIndex) {
  SColumnDataAgg* pColAgg = NULL;

  size_t numOfGroupCols = taosArrayGetSize(pSortGroupCols);
  size_t numOfBlockCols = taosArrayGetSize(pBlock->pDataBlock);

  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn* pCol = (SColumn*)taosArrayGet(pSortGroupCols, i);

    // Valid range check. It must happen BEFORE the column is fetched from the block, and
    // slotId == numOfBlockCols is already out of range. todo: return error code.
    if (pCol->slotId < 0 || (size_t)pCol->slotId >= numOfBlockCols) {
      continue;
    }

    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);

    if (pBlock->pBlockAgg != NULL) {
      pColAgg = pBlock->pBlockAgg[pCol->slotId];  // TODO is agg data matched?
    }

    SGroupKeys* pkey = taosArrayGet(pColVals, i);
    if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
      pkey->isNull = true;
      continue;
    }

    pkey->isNull = false;
    char* val = colDataGetData(pColInfoData, rowIndex);
    if (pkey->type == TSDB_DATA_TYPE_JSON) {
      if (tTagIsJson(val)) {
        terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
        return;
      }
      int32_t dataLen = getJsonValueLen(val);
      memcpy(pkey->pData, val, dataLen);
    } else if (IS_VAR_DATA_TYPE(pkey->type)) {
      // verify the value fits into the key holder before copying, not afterwards
      ASSERT(varDataTLen(val) <= pkey->bytes);
      memcpy(pkey->pData, val, varDataTLen(val));
    } else {
      memcpy(pkey->pData, val, pkey->bytes);
    }
  }
}
|
||||||
|
|
||||||
|
/**
 * Serialize the column values into one contiguous key.
 *
 * Layout written to buf: one null-flag byte per column, followed by the value bytes of
 * every non-null column in column order.
 *
 * @param buf       [out] destination buffer; the caller provides the storage
 * @param pColVals  array of SGroupKeys to serialize
 * @return number of bytes written to buf
 */
int32_t buildKeys(char* buf, SArray* pColVals) {
  size_t colCount = taosArrayGetSize(pColVals);

  char* nullFlags = buf;
  char* cursor = buf + sizeof(int8_t) * colCount;  // values start right after the flags

  for (int32_t idx = 0; idx < colCount; ++idx) {
    SGroupKeys* pVal = taosArrayGet(pColVals, idx);

    if (pVal->isNull) {
      // a null column contributes only its flag, no value bytes
      nullFlags[idx] = 1;
    } else {
      nullFlags[idx] = 0;

      if (TSDB_DATA_TYPE_JSON == pVal->type) {
        int32_t jsonLen = getJsonValueLen(pVal->pData);
        memcpy(cursor, (pVal->pData), jsonLen);
        cursor += jsonLen;
      } else if (IS_VAR_DATA_TYPE(pVal->type)) {
        varDataCopy(cursor, pVal->pData);
        cursor += varDataTLen(pVal->pData);
        ASSERT(varDataTLen(pVal->pData) <= pVal->bytes);
      } else {
        memcpy(cursor, pVal->pData, pVal->bytes);
        cursor += pVal->bytes;
      }
    }
  }

  return (int32_t)(cursor - (char*)buf);
}
|
||||||
|
|
||||||
|
uint64_t calcGroupId(char* pData, int32_t len) {
|
||||||
|
T_MD5_CTX context;
|
||||||
|
tMD5Init(&context);
|
||||||
|
tMD5Update(&context, (uint8_t*)pData, len);
|
||||||
|
tMD5Final(&context);
|
||||||
|
|
||||||
|
// NOTE: only extract the initial 8 bytes of the final MD5 digest
|
||||||
|
uint64_t id = 0;
|
||||||
|
memcpy(&id, context.digest, sizeof(uint64_t));
|
||||||
|
if (0 == id) memcpy(&id, context.digest + 8, sizeof(uint64_t));
|
||||||
|
return id;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Collect the expression of every sort key into a new node list.
 *
 * The returned list only references the pExpr nodes of pSortKeys; it does not copy them.
 *
 * @param pSortKeys  list whose elements are SOrderByExprNode
 * @return the new list, or NULL when pSortKeys yields no element or when appending an
 *         element fails (a partial list is never returned)
 */
SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) {
  SNode*     node;
  SNodeList* ret = NULL;
  bool       failed = false;

  FOREACH(node, pSortKeys) {
    SOrderByExprNode* pSortKey = (SOrderByExprNode*)node;
    // once one append failed, the remaining keys are skipped
    if (!failed && TSDB_CODE_SUCCESS != nodesListMakeAppend(&ret, pSortKey->pExpr)) {
      failed = true;
    }
  }

  if (failed) {
    // A truncated key list would produce wrong group ids, so report failure instead.
    // Only the list itself is released; the expressions still belong to pSortKeys.
    if (NULL != ret) {
      nodesClearList(ret);
    }
    return NULL;
  }

  return ret;
}
|
||||||
|
|
||||||
|
/**
 * Allocate the per-column value holders and the key buffer for a set of key columns.
 *
 * @param pColVals  [out] new array of SGroupKeys, one per column, each with a zeroed
 *                  pData buffer of the column's byte width
 * @param keyLen    [out] size of the key buffer: sum of the column widths plus one
 *                  null-flag byte per column
 * @param keyBuf    [out] zeroed buffer of *keyLen bytes
 * @param pColList  array of SColumn describing the key columns
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY. On failure everything allocated
 *         here is released and *pColVals / *keyBuf are set to NULL, so the caller has
 *         nothing to clean up.
 */
int32_t extractSortGroupKeysInfo(SArray** pColVals, int32_t* keyLen, char** keyBuf, const SArray* pColList) {
  *keyLen = 0;
  *keyBuf = NULL;

  *pColVals = taosArrayInit(4, sizeof(SGroupKeys));
  if ((*pColVals) == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  int32_t numOfGroupCols = taosArrayGetSize(pColList);
  for (int32_t i = 0; i < numOfGroupCols; ++i) {
    SColumn* pCol = (SColumn*)taosArrayGet(pColList, i);
    (*keyLen) += pCol->bytes;  // actual data only; the null flags are added below

    SGroupKeys key = {0};
    key.bytes = pCol->bytes;
    key.type = pCol->type;
    key.isNull = false;
    key.pData = taosMemoryCalloc(1, pCol->bytes);
    if (key.pData == NULL) {
      goto _error;
    }

    if (taosArrayPush((*pColVals), &key) == NULL) {
      // the entry never made it into the array, so it has to be released here
      taosMemoryFree(key.pData);
      goto _error;
    }
  }

  int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
  (*keyLen) += nullFlagSize;

  (*keyBuf) = taosMemoryCalloc(1, (*keyLen));
  if ((*keyBuf) == NULL) {
    goto _error;
  }

  return TSDB_CODE_SUCCESS;

_error:
  // release every value buffer stored so far, then the array itself
  for (int32_t i = 0; i < taosArrayGetSize(*pColVals); ++i) {
    SGroupKeys* pKey = taosArrayGet(*pColVals, i);
    taosMemoryFree(pKey->pData);
  }
  taosArrayDestroy(*pColVals);
  *pColVals = NULL;
  *keyLen = 0;
  return TSDB_CODE_OUT_OF_MEMORY;
}
|
||||||
|
|
|
@ -635,20 +635,6 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
|
||||||
return pPage;
|
return pPage;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t calcGroupId(char* pData, int32_t len) {
|
|
||||||
T_MD5_CTX context;
|
|
||||||
tMD5Init(&context);
|
|
||||||
tMD5Update(&context, (uint8_t*)pData, len);
|
|
||||||
tMD5Final(&context);
|
|
||||||
|
|
||||||
// NOTE: only extract the initial 8 bytes of the final MD5 digest
|
|
||||||
uint64_t id = 0;
|
|
||||||
memcpy(&id, context.digest, sizeof(uint64_t));
|
|
||||||
if (0 == id)
|
|
||||||
memcpy(&id, context.digest + 8, sizeof(uint64_t));
|
|
||||||
return id;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
|
int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
|
||||||
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
|
||||||
int32_t* offset = taosMemoryCalloc(numOfCols, sizeof(int32_t));
|
int32_t* offset = taosMemoryCalloc(numOfCols, sizeof(int32_t));
|
||||||
|
|
|
@ -19,18 +19,29 @@
|
||||||
#include "querytask.h"
|
#include "querytask.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
|
|
||||||
|
typedef struct SSortOpGroupIdCalc {  // state the sort operator keeps to attach a group id to its output blocks
|
||||||
|
STupleHandle* pSavedTuple;  // first tuple of the next group, fetched while the current block still had rows
|
||||||
|
SArray* pSortColsArr;  // key columns built from the sort keys, array of SColumn
|
||||||
|
SArray* pSortColVals;  // scratch holders for the key column values of one tuple, array of SGroupKeys
|
||||||
|
char* keyBuf;  // serialized key of the tuple being examined
|
||||||
|
char* lastKeyBuf;  // serialized key of the group currently being output
|
||||||
|
int32_t lastKeysLen;  // number of valid bytes in lastKeyBuf; 0 until the first group is seen
|
||||||
|
uint64_t lastGroupId;  // group id computed for lastKeyBuf
|
||||||
|
} SSortOpGroupIdCalc;
|
||||||
|
|
||||||
typedef struct SSortOperatorInfo {
|
typedef struct SSortOperatorInfo {
|
||||||
SOptrBasicInfo binfo;
|
SOptrBasicInfo binfo;
|
||||||
uint32_t sortBufSize; // max buffer size for in-memory sort
|
uint32_t sortBufSize; // max buffer size for in-memory sort
|
||||||
SArray* pSortInfo;
|
SArray* pSortInfo;
|
||||||
SSortHandle* pSortHandle;
|
SSortHandle* pSortHandle;
|
||||||
SColMatchInfo matchInfo;
|
SColMatchInfo matchInfo;
|
||||||
int32_t bufPageSize;
|
int32_t bufPageSize;
|
||||||
int64_t startTs; // sort start time
|
int64_t startTs; // sort start time
|
||||||
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
|
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
|
||||||
SLimitInfo limitInfo;
|
SLimitInfo limitInfo;
|
||||||
uint64_t maxTupleLength;
|
uint64_t maxTupleLength;
|
||||||
int64_t maxRows;
|
int64_t maxRows;
|
||||||
|
SSortOpGroupIdCalc* pGroupIdCalc;
|
||||||
} SSortOperatorInfo;
|
} SSortOperatorInfo;
|
||||||
|
|
||||||
static SSDataBlock* doSort(SOperatorInfo* pOperator);
|
static SSDataBlock* doSort(SOperatorInfo* pOperator);
|
||||||
|
@ -40,6 +51,8 @@ static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain
|
||||||
static void destroySortOperatorInfo(void* param);
|
static void destroySortOperatorInfo(void* param);
|
||||||
static int32_t calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys);
|
static int32_t calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys);
|
||||||
|
|
||||||
|
static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc);
|
||||||
|
|
||||||
// todo add limit/offset impl
|
// todo add limit/offset impl
|
||||||
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) {
|
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) {
|
||||||
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
|
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));
|
||||||
|
@ -78,6 +91,35 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
|
||||||
|
|
||||||
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
|
pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
|
||||||
pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
|
pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
|
||||||
|
|
||||||
|
if (pSortNode->calcGroupId) {
|
||||||
|
SSortOpGroupIdCalc* pGroupIdCalc = taosMemoryCalloc(1, sizeof(SSortOpGroupIdCalc));
|
||||||
|
if (!pGroupIdCalc) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
SNodeList* pSortColsNodeArr = makeColsNodeArrFromSortKeys(pSortNode->pSortKeys);
|
||||||
|
if (!pSortColsNodeArr) code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
pGroupIdCalc->pSortColsArr = makeColumnArrayFromList(pSortColsNodeArr);
|
||||||
|
if (!pGroupIdCalc->pSortColsArr) code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
nodesClearList(pSortColsNodeArr);
|
||||||
|
}
|
||||||
|
int32_t keyLen;
|
||||||
|
if (TSDB_CODE_SUCCESS == code)
|
||||||
|
code = extractSortGroupKeysInfo(&pGroupIdCalc->pSortColVals, &keyLen, &pGroupIdCalc->keyBuf,
|
||||||
|
pGroupIdCalc->pSortColsArr);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
pGroupIdCalc->lastKeysLen = 0;
|
||||||
|
pGroupIdCalc->lastKeyBuf = taosMemoryCalloc(1, keyLen);
|
||||||
|
if (!pGroupIdCalc->lastKeyBuf) code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
pInfo->pGroupIdCalc = pGroupIdCalc;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (code != TSDB_CODE_SUCCESS) goto _error;
|
||||||
|
|
||||||
pInfo->binfo.inputTsOrder = pSortNode->node.inputTsOrder;
|
pInfo->binfo.inputTsOrder = pSortNode->node.inputTsOrder;
|
||||||
pInfo->binfo.outputTsOrder = pSortNode->node.outputTsOrder;
|
pInfo->binfo.outputTsOrder = pSortNode->node.outputTsOrder;
|
||||||
initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);
|
initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);
|
||||||
|
@ -129,6 +171,47 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
|
||||||
pBlock->info.rows += 1;
|
pBlock->info.rows += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief get next tuple with group id attached, all tuples fetched from tsortNextTuple are sorted by group keys
|
||||||
|
* @param pBlock the output block, the group id will be saved in it
|
||||||
|
* @retval NULL if next group tuple arrived, the pre fetched tuple will be saved in pInfo.pSavedTuple
|
||||||
|
*/
|
||||||
|
static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
||||||
|
STupleHandle *ret = pInfo->pGroupIdCalc->pSavedTuple;
|
||||||
|
pInfo->pGroupIdCalc->pSavedTuple = NULL;
|
||||||
|
if (!ret) {
|
||||||
|
ret = tsortNextTuple(pHandle);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ret) {
|
||||||
|
int32_t len = tsortBuildKeys(pInfo->pGroupIdCalc->pSortColsArr, pInfo->pGroupIdCalc->pSortColVals, ret,
|
||||||
|
pInfo->pGroupIdCalc->keyBuf);
|
||||||
|
bool newGroup = len != pInfo->pGroupIdCalc->lastKeysLen
|
||||||
|
? true
|
||||||
|
: memcmp(pInfo->pGroupIdCalc->lastKeyBuf, pInfo->pGroupIdCalc->keyBuf, len) != 0;
|
||||||
|
bool emptyBlock = pBlock->info.rows == 0;
|
||||||
|
if (newGroup && !emptyBlock) {
|
||||||
|
// new group arrived, and we have already copied some tuples for cur group, save the new group tuple, return NULL
|
||||||
|
pInfo->pGroupIdCalc->pSavedTuple = ret;
|
||||||
|
ret = NULL;
|
||||||
|
} else {
|
||||||
|
if (newGroup) {
|
||||||
|
ASSERT(emptyBlock);
|
||||||
|
pInfo->pGroupIdCalc->lastKeysLen = len;
|
||||||
|
pInfo->pGroupIdCalc->lastGroupId = pBlock->info.id.groupId = calcGroupId(pInfo->pGroupIdCalc->keyBuf, len);
|
||||||
|
TSWAP(pInfo->pGroupIdCalc->keyBuf, pInfo->pGroupIdCalc->lastKeyBuf);
|
||||||
|
} else if (emptyBlock) {
|
||||||
|
// new block but not new group, assign last group id to it
|
||||||
|
pBlock->info.id.groupId = pInfo->pGroupIdCalc->lastGroupId;
|
||||||
|
} else {
|
||||||
|
// not new group and not empty block and ret NOT NULL, just return the tuple
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
|
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
|
||||||
SSortOperatorInfo* pInfo) {
|
SSortOperatorInfo* pInfo) {
|
||||||
blockDataCleanup(pDataBlock);
|
blockDataCleanup(pDataBlock);
|
||||||
|
@ -140,8 +223,13 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
|
||||||
|
|
||||||
blockDataEnsureCapacity(p, capacity);
|
blockDataEnsureCapacity(p, capacity);
|
||||||
|
|
||||||
|
STupleHandle* pTupleHandle;
|
||||||
while (1) {
|
while (1) {
|
||||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
if (pInfo->pGroupIdCalc) {
|
||||||
|
pTupleHandle = nextTupleWithGroupId(pHandle, pInfo, p);
|
||||||
|
} else {
|
||||||
|
pTupleHandle = tsortNextTuple(pHandle);
|
||||||
|
}
|
||||||
if (pTupleHandle == NULL) {
|
if (pTupleHandle == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -168,6 +256,7 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
|
||||||
pDataBlock->info.dataLoad = 1;
|
pDataBlock->info.dataLoad = 1;
|
||||||
pDataBlock->info.rows = p->info.rows;
|
pDataBlock->info.rows = p->info.rows;
|
||||||
pDataBlock->info.scanFlag = p->info.scanFlag;
|
pDataBlock->info.scanFlag = p->info.scanFlag;
|
||||||
|
pDataBlock->info.id.groupId = p->info.id.groupId;
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDataDestroy(p);
|
blockDataDestroy(p);
|
||||||
|
@ -281,6 +370,7 @@ void destroySortOperatorInfo(void* param) {
|
||||||
tsortDestroySortHandle(pInfo->pSortHandle);
|
tsortDestroySortHandle(pInfo->pSortHandle);
|
||||||
taosArrayDestroy(pInfo->pSortInfo);
|
taosArrayDestroy(pInfo->pSortInfo);
|
||||||
taosArrayDestroy(pInfo->matchInfo.pList);
|
taosArrayDestroy(pInfo->matchInfo.pList);
|
||||||
|
destroySortOpGroupIdCalc(pInfo->pGroupIdCalc);
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -309,6 +399,20 @@ static int32_t calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNod
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Release a group id calculator together with everything it owns: the value buffer of
 * every column holder, both arrays, both key buffers and the struct itself.
 *
 * @param pCalc  calculator to destroy; NULL is accepted and ignored
 */
static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc) {
  if (NULL == pCalc) {
    return;
  }

  // the value buffers are owned by the array entries and must go first
  for (int idx = 0; idx < taosArrayGetSize(pCalc->pSortColVals); idx++) {
    SGroupKeys* pHolder = (SGroupKeys*)taosArrayGet(pCalc->pSortColVals, idx);
    taosMemoryFree(pHolder->pData);
  }

  taosArrayDestroy(pCalc->pSortColVals);
  taosArrayDestroy(pCalc->pSortColsArr);
  taosMemoryFree(pCalc->keyBuf);
  taosMemoryFree(pCalc->lastKeyBuf);
  taosMemoryFree(pCalc);
}
|
||||||
|
|
||||||
//=====================================================================================
|
//=====================================================================================
|
||||||
// Group Sort Operator
|
// Group Sort Operator
|
||||||
typedef enum EChildOperatorStatus { CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED } EChildOperatorStatus;
|
typedef enum EChildOperatorStatus { CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED } EChildOperatorStatus;
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
#include "tsort.h"
|
#include "tsort.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "tsimplehash.h"
|
#include "tsimplehash.h"
|
||||||
|
#include "executil.h"
|
||||||
|
|
||||||
struct STupleHandle {
|
struct STupleHandle {
|
||||||
SSDataBlock* pBlock;
|
SSDataBlock* pBlock;
|
||||||
|
@ -1553,3 +1554,8 @@ SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
|
||||||
|
|
||||||
return info;
|
return info;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Build the serialized key of the row a tuple handle points at.
 *
 * @param pSortGroupCols  key columns, passed through to extractCols
 * @param pColVals        scratch column value holders, filled by extractCols
 * @param pTuple          tuple whose block and row index identify the row
 * @param keyBuf          [out] buffer receiving the key
 * @return the key length reported by buildKeys
 */
int32_t tsortBuildKeys(SArray* pSortGroupCols, SArray* pColVals, STupleHandle* pTuple, char* keyBuf) {
  // step 1: pull the key column values of the tuple's row into pColVals
  extractCols(pSortGroupCols, pColVals, pTuple->pBlock, pTuple->rowIndex);

  // step 2: flatten those values into keyBuf
  int32_t keyLen = buildKeys(keyBuf, pColVals);
  return keyLen;
}
|
||||||
|
|
|
@ -531,6 +531,7 @@ static int32_t logicSortCopy(const SSortLogicNode* pSrc, SSortLogicNode* pDst) {
|
||||||
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
|
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
|
||||||
CLONE_NODE_LIST_FIELD(pSortKeys);
|
CLONE_NODE_LIST_FIELD(pSortKeys);
|
||||||
COPY_SCALAR_FIELD(groupSort);
|
COPY_SCALAR_FIELD(groupSort);
|
||||||
|
COPY_SCALAR_FIELD(calcGroupId);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2327,7 +2327,7 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) {
|
||||||
static const char* jkSortPhysiPlanExprs = "Exprs";
|
static const char* jkSortPhysiPlanExprs = "Exprs";
|
||||||
static const char* jkSortPhysiPlanSortKeys = "SortKeys";
|
static const char* jkSortPhysiPlanSortKeys = "SortKeys";
|
||||||
static const char* jkSortPhysiPlanTargets = "Targets";
|
static const char* jkSortPhysiPlanTargets = "Targets";
|
||||||
static const char* jkSortPhysiPlanMaxRows = "MaxRows";
|
static const char* jkSortPhysiPlanCalcGroupIds = "CalcGroupIds";
|
||||||
|
|
||||||
static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj;
|
const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj;
|
||||||
|
@ -2342,6 +2342,9 @@ static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = nodeListToJson(pJson, jkSortPhysiPlanTargets, pNode->pTargets);
|
code = nodeListToJson(pJson, jkSortPhysiPlanTargets, pNode->pTargets);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddBoolToObject(pJson, jkSortPhysiPlanCalcGroupIds, pNode->calcGroupId);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2359,6 +2362,9 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = jsonToNodeList(pJson, jkSortPhysiPlanTargets, &pNode->pTargets);
|
code = jsonToNodeList(pJson, jkSortPhysiPlanTargets, &pNode->pTargets);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetBoolValue(pJson, jkSortPhysiPlanCalcGroupIds, &pNode->calcGroupId);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2746,7 +2746,7 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS };
|
enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS, PHY_SORT_CODE_CALC_GROUPID };
|
||||||
|
|
||||||
static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||||
const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj;
|
const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj;
|
||||||
|
@ -2761,6 +2761,9 @@ static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tlvEncodeObj(pEncoder, PHY_SORT_CODE_TARGETS, nodeListToMsg, pNode->pTargets);
|
code = tlvEncodeObj(pEncoder, PHY_SORT_CODE_TARGETS, nodeListToMsg, pNode->pTargets);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeBool(pEncoder, PHY_SORT_CODE_CALC_GROUPID, pNode->calcGroupId);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2784,6 +2787,9 @@ static int32_t msgToPhysiSortNode(STlvDecoder* pDecoder, void* pObj) {
|
||||||
case PHY_SORT_CODE_TARGETS:
|
case PHY_SORT_CODE_TARGETS:
|
||||||
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets);
|
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets);
|
||||||
break;
|
break;
|
||||||
|
case PHY_SORT_CODE_CALC_GROUPID:
|
||||||
|
code = tlvDecodeBool(pTlv, &pNode->calcGroupId);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -359,6 +359,9 @@ bool addHintNodeToList(SAstCreateContext* pCxt, SNodeList** ppHintList, EHintOpt
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case HINT_SORT_FOR_GROUP:
|
||||||
|
if (paramNum > 0) return true;
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -421,6 +424,14 @@ SNodeList* createHintNodeList(SAstCreateContext* pCxt, const SToken* pLiteral) {
|
||||||
}
|
}
|
||||||
opt = HINT_NO_BATCH_SCAN;
|
opt = HINT_NO_BATCH_SCAN;
|
||||||
break;
|
break;
|
||||||
|
case TK_SORT_FOR_GROUP:
|
||||||
|
lastComma = false;
|
||||||
|
if (0 != opt || inParamList) {
|
||||||
|
quit = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
opt = HINT_SORT_FOR_GROUP;
|
||||||
|
break;
|
||||||
case TK_NK_LP:
|
case TK_NK_LP:
|
||||||
lastComma = false;
|
lastComma = false;
|
||||||
if (0 == opt || inParamList) {
|
if (0 == opt || inParamList) {
|
||||||
|
|
|
@ -206,6 +206,7 @@ static SKeyword keywordTable[] = {
|
||||||
{"SMALLINT", TK_SMALLINT},
|
{"SMALLINT", TK_SMALLINT},
|
||||||
{"SNODE", TK_SNODE},
|
{"SNODE", TK_SNODE},
|
||||||
{"SNODES", TK_SNODES},
|
{"SNODES", TK_SNODES},
|
||||||
|
{"SORT_FOR_GROUP", TK_SORT_FOR_GROUP},
|
||||||
{"SOFFSET", TK_SOFFSET},
|
{"SOFFSET", TK_SOFFSET},
|
||||||
{"SPLIT", TK_SPLIT},
|
{"SPLIT", TK_SPLIT},
|
||||||
{"STABLE", TK_STABLE},
|
{"STABLE", TK_STABLE},
|
||||||
|
|
|
@ -44,12 +44,13 @@ int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan);
|
||||||
int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan);
|
int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan);
|
||||||
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList);
|
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList);
|
||||||
|
|
||||||
bool getBatchScanOptionFromHint(SNodeList* pList);
|
bool getBatchScanOptionFromHint(SNodeList* pList);
|
||||||
|
bool getSortForGroupOptHint(SNodeList* pList);
|
||||||
SLogicNode* getLogicNodeRootNode(SLogicNode* pCurr);
|
SLogicNode* getLogicNodeRootNode(SLogicNode* pCurr);
|
||||||
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes);
|
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes);
|
||||||
bool isPartTableAgg(SAggLogicNode* pAgg);
|
bool isPartTableAgg(SAggLogicNode* pAgg);
|
||||||
bool isPartTagAgg(SAggLogicNode* pAgg);
|
bool isPartTagAgg(SAggLogicNode* pAgg);
|
||||||
bool isPartTableWinodw(SWindowLogicNode* pWindow);
|
bool isPartTableWinodw(SWindowLogicNode* pWindow);
|
||||||
|
|
||||||
#define CLONE_LIMIT 1
|
#define CLONE_LIMIT 1
|
||||||
#define CLONE_SLIMIT 1 << 1
|
#define CLONE_SLIMIT 1 << 1
|
||||||
|
|
|
@ -990,7 +990,6 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele
|
||||||
if (NULL == pSelect->pWindow) {
|
if (NULL == pSelect->pWindow) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
switch (nodeType(pSelect->pWindow)) {
|
switch (nodeType(pSelect->pWindow)) {
|
||||||
case QUERY_NODE_STATE_WINDOW:
|
case QUERY_NODE_STATE_WINDOW:
|
||||||
return createWindowLogicNodeByState(pCxt, (SStateWindowNode*)pSelect->pWindow, pSelect, pLogicNode);
|
return createWindowLogicNodeByState(pCxt, (SStateWindowNode*)pSelect->pWindow, pSelect, pLogicNode);
|
||||||
|
|
|
@ -2042,6 +2042,7 @@ static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan*
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pProjectNode, pChild);
|
int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pProjectNode, pChild);
|
||||||
|
TSWAP(pProjectNode->node.pHint, pChild->pHint);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
NODES_CLEAR_LIST(pProjectNode->node.pChildren);
|
NODES_CLEAR_LIST(pProjectNode->node.pChildren);
|
||||||
nodesDestroyNode((SNode*)pProjectNode);
|
nodesDestroyNode((SNode*)pProjectNode);
|
||||||
|
@ -3587,6 +3588,67 @@ static int32_t stableJoinOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicS
|
||||||
return stbJoinOptRewriteStableJoin(pCxt, pNode, pLogicSubplan);
|
return stbJoinOptRewriteStableJoin(pCxt, pNode, pLogicSubplan);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Tell whether a logic node is a partition node this optimization applies to.
 *
 * @return true only for a partition node whose parent (if any) is not a window node and
 *         for which planOptNodeListHasCol accepts the partition keys
 */
static bool partColOptShouldBeOptimized(SLogicNode* pNode) {
  if (QUERY_NODE_LOGIC_PLAN_PARTITION != nodeType(pNode)) {
    return false;
  }

  SPartitionLogicNode* pPart = (SPartitionLogicNode*)pNode;

  // partitions feeding a window node are left alone
  if (NULL != pPart->node.pParent && QUERY_NODE_LOGIC_PLAN_WINDOW == nodeType(pPart->node.pParent)) {
    return false;
  }

  return planOptNodeListHasCol(pPart->pPartitionKeys) ? true : false;
}
|
||||||
|
|
||||||
|
/**
 * Build the sort logic node that stands in for a partition node.
 *
 * The new node gets one ascending SOrderByExprNode per partition key (the key
 * expression is cloned), a clone of the first child's target list, and, once
 * every allocation has succeeded, the partition node's child list.
 *
 * @param pPartition  partition node to derive the sort node from
 * @return the new sort node, or NULL on failure. On failure pPartition is left
 *         unchanged: its children are only moved after all fallible steps.
 */
static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) {
  SSortLogicNode* pSort = (SSortLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SORT);
  if (NULL == pSort) {
    // nothing was allocated and nothing was moved, so there is nothing to undo
    return NULL;
  }
  pSort->groupSort = false;

  int32_t code = TSDB_CODE_SUCCESS;
  SNode*  node;
  FOREACH(node, pPartition->pPartitionKeys) {
    SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
    if (NULL == pOrder) {
      code = TSDB_CODE_OUT_OF_MEMORY;
      break;
    }
    pOrder->order = ORDER_ASC;
    pOrder->pExpr = nodesCloneNode(node);
    if (NULL == pOrder->pExpr) {
      nodesDestroyNode((SNode*)pOrder);
      code = TSDB_CODE_OUT_OF_MEMORY;
      break;
    }
    // NOTE(review): assumes nodesListMakeAppend() returns a status code and does
    // not take ownership of the node when it fails -- confirm against nodes API.
    code = nodesListMakeAppend(&pSort->pSortKeys, (SNode*)pOrder);
    if (TSDB_CODE_SUCCESS != code) {
      nodesDestroyNode((SNode*)pOrder);
      break;
    }
  }

  if (TSDB_CODE_SUCCESS == code) {
    // the children still belong to pPartition at this point
    SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pPartition->node.pChildren, 0);
    if (NULL != pChild) {
      pSort->node.pTargets = nodesCloneList(pChild->pTargets);
    }
    if (NULL == pSort->node.pTargets) {
      code = TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  if (TSDB_CODE_SUCCESS != code) {
    // pSort owns no children yet, so destroying it cannot damage the plan tree
    nodesDestroyNode((SNode*)pSort);
    return NULL;
  }

  // hand the children over only after everything that can fail has succeeded
  TSWAP(pSort->node.pChildren, pPartition->node.pChildren);
  return pSort;
}
|
||||||
|
|
||||||
|
/**
 * Optimizer rule: replace a "partition by <column>" node with a sort node that
 * also calculates the group id, when the statement carries the sort_for_group
 * hint.
 *
 * @param pCxt           optimize context; pCxt->optimized is set on a rewrite
 * @param pLogicSubplan  subplan whose tree is searched and rewritten
 * @return TSDB_CODE_SUCCESS when nothing had to be done or the rewrite
 *         succeeded, TSDB_CODE_OUT_OF_MEMORY when the sort node could not be
 *         built, otherwise the error returned by replaceLogicNode().
 */
static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
  SPartitionLogicNode* pNode =
      (SPartitionLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, partColOptShouldBeOptimized);
  if (NULL == pNode) {
    return TSDB_CODE_SUCCESS;
  }

  // the rewrite is opt-in: it only happens when the root node carries the hint
  SLogicNode* pRootNode = getLogicNodeRootNode((SLogicNode*)pNode);
  if (!pRootNode->pHint || !getSortForGroupOptHint(pRootNode->pHint)) {
    return TSDB_CODE_SUCCESS;
  }

  // replace with sort node
  SSortLogicNode* pSort = partColOptCreateSort(pNode);
  if (NULL == pSort) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }
  pSort->calcGroupId = true;

  int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pNode, (SLogicNode*)pSort);
  if (TSDB_CODE_SUCCESS != code) {
    // NOTE(review): assumes replaceLogicNode() leaves the tree untouched when it
    // fails -- confirm. Give the children back to the partition node so the
    // plan stays intact, then drop the unused sort node.
    TSWAP(pSort->node.pChildren, pNode->node.pChildren);
    nodesDestroyNode((SNode*)pSort);
    return code;
  }

  // the moved children must no longer point at the node that is about to go away
  SNode* pChild;
  FOREACH(pChild, pSort->node.pChildren) {
    ((SLogicNode*)pChild)->pParent = (SLogicNode*)pSort;
  }

  // keep any hint list alive on the replacement node, as the project
  // elimination rule does before destroying the node it removes
  TSWAP(pNode->node.pHint, pSort->node.pHint);

  // the partition node is out of the tree and owns no children any more
  nodesDestroyNode((SNode*)pNode);
  pCxt->optimized = true;
  return TSDB_CODE_SUCCESS;
}
|
||||||
|
|
||||||
// clang-format off
|
// clang-format off
|
||||||
static const SOptimizeRule optimizeRuleSet[] = {
|
static const SOptimizeRule optimizeRuleSet[] = {
|
||||||
|
@ -3606,6 +3668,7 @@ static const SOptimizeRule optimizeRuleSet[] = {
|
||||||
{.pName = "TableCountScan", .optimizeFunc = tableCountScanOptimize},
|
{.pName = "TableCountScan", .optimizeFunc = tableCountScanOptimize},
|
||||||
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize},
|
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize},
|
||||||
{.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize},
|
{.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize},
|
||||||
|
{.pName = "PartitionCols", .optimizeFunc = partitionColsOpt},
|
||||||
};
|
};
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
|
|
|
@ -1749,6 +1749,7 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
|
||||||
SNodeList* pPrecalcExprs = NULL;
|
SNodeList* pPrecalcExprs = NULL;
|
||||||
SNodeList* pSortKeys = NULL;
|
SNodeList* pSortKeys = NULL;
|
||||||
int32_t code = rewritePrecalcExprs(pCxt, pSortLogicNode->pSortKeys, &pPrecalcExprs, &pSortKeys);
|
int32_t code = rewritePrecalcExprs(pCxt, pSortLogicNode->pSortKeys, &pPrecalcExprs, &pSortKeys);
|
||||||
|
pSort->calcGroupId = pSortLogicNode->calcGroupId;
|
||||||
|
|
||||||
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
|
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
|
||||||
// push down expression to pOutputDataBlockDesc of child node
|
// push down expression to pOutputDataBlockDesc of child node
|
||||||
|
|
|
@ -244,7 +244,12 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
|
||||||
}
|
}
|
||||||
pChild = nodesListGetNode(((SLogicNode*)pChild)->pChildren, 0);
|
pChild = nodesListGetNode(((SLogicNode*)pChild)->pChildren, 0);
|
||||||
}
|
}
|
||||||
return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild));
|
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild)) {
|
||||||
|
return true;
|
||||||
|
} else if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) {
|
||||||
|
return stbSplHasMultiTbScan(streamQuery, (SLogicNode*)pChild);
|
||||||
|
}
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool stbSplIsMultiTbScanChild(bool streamQuery, SLogicNode* pNode) {
|
static bool stbSplIsMultiTbScanChild(bool streamQuery, SLogicNode* pNode) {
|
||||||
|
|
|
@ -430,6 +430,17 @@ bool getBatchScanOptionFromHint(SNodeList* pList) {
|
||||||
return batchScan;
|
return batchScan;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Report whether a hint list contains the sort_for_group hint.
 *
 * @param pList  list whose elements are read as SHintNode
 * @return true if any element has option HINT_SORT_FOR_GROUP, false otherwise
 */
bool getSortForGroupOptHint(SNodeList* pList) {
  bool   found = false;
  SNode* pItem;
  FOREACH(pItem, pList) {
    if (HINT_SORT_FOR_GROUP == ((SHintNode*)pItem)->option) {
      found = true;
      break;
    }
  }
  return found;
}
|
||||||
|
|
||||||
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes) {
|
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SLogicNode* pCurr = (SLogicNode*)pNode;
|
SLogicNode* pCurr = (SLogicNode*)pNode;
|
||||||
|
|
|
@ -34,6 +34,7 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4
|
||||||
|
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py -Q 4
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py
|
||||||
|
|
|
@ -0,0 +1,227 @@
|
||||||
|
import taos
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import socket
|
||||||
|
import os
|
||||||
|
import threading
|
||||||
|
import math
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from util.log import *
|
||||||
|
from util.sql import *
|
||||||
|
from util.cases import *
|
||||||
|
from util.dnodes import *
|
||||||
|
from util.common import *
|
||||||
|
# from tmqCommon import *
|
||||||
|
|
||||||
|
class TDTestCase:
    """System test for `partition by <column>` with the sort_for_group() hint.

    The case checks two things: that the hint changes the explain output from
    a "Partition on" plan to a "Sort" plan (except under an interval window),
    and that hinted and un-hinted queries return the same rows.
    """

    def __init__(self):
        self.vgroups = 4
        self.ctbNum = 10
        self.rowsPerTbl = 10000
        # attribute name spelling kept as-is; it is read in prepareTestEnv()
        self.duraion = '1h'

    def init(self, conn, logSql, replicaVar=1):
        """Store the replica count and bind tdSql to a cursor of `conn`."""
        self.replicaVar = int(replicaVar)
        tdLog.debug(f"start to excute {__file__}")
        tdSql.init(conn.cursor(), False)

    def create_database(self, tsql, dbName, dropFlag=1, vgroups=2, replica=1, duration: str = '1d'):
        """Create database `dbName`, dropping it first when dropFlag == 1."""
        if dropFlag == 1:
            tsql.execute("drop database if exists %s"%(dbName))

        tsql.execute("create database if not exists %s vgroups %d replica %d duration %s"%(dbName, vgroups, replica, duration))
        tdLog.debug("complete to create database %s"%(dbName))
        return

    def create_stable(self, tsql, paraDict):
        """Create the super table described by paraDict (dbName, stbName, schemas)."""
        colString = tdCom.gen_column_type_str(colname_prefix=paraDict["colPrefix"], column_elm_list=paraDict["colSchema"])
        tagString = tdCom.gen_tag_type_str(tagname_prefix=paraDict["tagPrefix"], tag_elm_list=paraDict["tagSchema"])
        sqlString = "create table if not exists %s.%s (%s) tags (%s)"%(paraDict["dbName"], paraDict["stbName"], colString, tagString)
        tdLog.debug("%s"%(sqlString))
        tsql.execute(sqlString)
        return

    def create_ctable(self, tsql=None, dbName='dbx', stbName='stb', ctbPrefix='ctb', ctbNum=1, ctbStartIdx=0):
        """Create `ctbNum` child tables named <ctbPrefix><index> under the super table."""
        for i in range(ctbNum):
            sqlString = "create table %s.%s%d using %s.%s tags(%d, 'tb%d', 'tb%d', %d, %d, %d)" % \
                    (dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,(i+ctbStartIdx) % 5,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx)
            tsql.execute(sqlString)

        tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName))
        return

    def insert_data(self, tsql, dbName, ctbPrefix, ctbNum, rowsPerTbl, batchNum, startTs, tsStep):
        """Insert `rowsPerTbl` rows into each child table, at most `batchNum` rows per statement.

        The first half of the tables gets fully populated rows; the second half
        gets NULL in the third and fifth value positions.
        """
        tdLog.debug("start to insert data ............")
        tsql.execute("use %s" %dbName)
        pre_insert = "insert into "
        sql = pre_insert

        for i in range(ctbNum):
            rowsBatched = 0
            sql += " %s%d values "%(ctbPrefix,i)
            for j in range(rowsPerTbl):
                if (i < ctbNum/2):
                    sql += "(%d, %d, %d, %d,%d,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10, j%10, j%10)
                else:
                    sql += "(%d, %d, NULL, %d,NULL,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10)
                rowsBatched += 1
                # flush when the batch is full or the table's last row was added
                if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
                    tsql.execute(sql)
                    rowsBatched = 0
                    if j < rowsPerTbl - 1:
                        sql = "insert into %s%d values " %(ctbPrefix,i)
                    else:
                        sql = "insert into "
        # flush anything still pending after the loops
        if sql != pre_insert:
            tsql.execute(sql)
        tdLog.debug("insert data ............ [OK]")
        return

    def prepareTestEnv(self):
        """Create the database, super table and child tables, then load the data."""
        tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
        paraDict = {'dbName':     'test',
                    'dropFlag':   1,
                    'vgroups':    2,
                    'stbName':    'meters',
                    'colPrefix':  'c',
                    'tagPrefix':  't',
                    'colSchema':   [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'FLOAT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'smallint', 'count':1},{'type': 'tinyint', 'count':1},{'type': 'bool', 'count':1},{'type': 'binary', 'len':10, 'count':1},{'type': 'nchar', 'len':10, 'count':1}],
                    'tagSchema':   [{'type': 'INT', 'count':1},{'type': 'nchar', 'len':20, 'count':1},{'type': 'binary', 'len':20, 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'smallint', 'count':1},{'type': 'DOUBLE', 'count':1}],
                    'ctbPrefix':  't',
                    'ctbStartIdx': 0,
                    'ctbNum':     100,
                    'rowsPerTbl': 10000,
                    'batchNum':   3000,
                    'startTs':    1537146000000,
                    'tsStep':     600000}

        # the instance settings override the defaults in the dictionary above
        paraDict['vgroups'] = self.vgroups
        paraDict['ctbNum'] = self.ctbNum
        paraDict['rowsPerTbl'] = self.rowsPerTbl

        tdLog.info("create database")
        self.create_database(tsql=tdSql, dbName=paraDict["dbName"], dropFlag=paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar, duration=self.duraion)

        tdLog.info("create stb")
        self.create_stable(tsql=tdSql, paraDict=paraDict)

        tdLog.info("create child tables")
        self.create_ctable(tsql=tdSql, dbName=paraDict["dbName"], \
                stbName=paraDict["stbName"],ctbPrefix=paraDict["ctbPrefix"],\
                ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict["ctbStartIdx"])
        self.insert_data(tsql=tdSql, dbName=paraDict["dbName"],\
                ctbPrefix=paraDict["ctbPrefix"],ctbNum=paraDict["ctbNum"],\
                rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],\
                startTs=paraDict["startTs"],tsStep=paraDict["tsStep"])
        return

    def check_explain_res_has_row(self, plan_str_expect: str, rows):
        """Exit the test unless some row's string form contains `plan_str_expect`."""
        for row in rows:
            if plan_str_expect in str(row):
                tdLog.debug("plan: [%s] found in: [%s]" % (plan_str_expect, str(row)))
                return
        tdLog.exit("plan: %s not found in res: [%s]" % (plan_str_expect, str(rows)))

    def test_sort_for_partition_hint(self):
        """Check the explain output with and without the sort_for_group() hint."""
        sql = 'explain select count(*), c1 from meters partition by c1'
        sql_hint = 'explain select /*+ sort_for_group() */count(*), c1 from meters partition by c1'
        tdSql.query(sql)
        self.check_explain_res_has_row("Partition on", tdSql.queryResult)
        tdSql.query(sql_hint)
        self.check_explain_res_has_row("Sort", tdSql.queryResult)

        sql = 'explain select count(*), c1, tbname from meters partition by tbname, c1'
        sql_hint = 'explain select /*+ sort_for_group() */ count(*), c1, tbname from meters partition by tbname, c1'
        tdSql.query(sql)
        self.check_explain_res_has_row("Partition on", tdSql.queryResult)
        tdSql.query(sql_hint)
        self.check_explain_res_has_row("Sort", tdSql.queryResult)

        # with an interval window the hinted plan is still expected to partition
        sql_interval = 'explain select count(*), c1 from meters partition by c1 interval(1s)'
        sql_interval_hint = 'explain select /*+ sort_for_group() */ count(*), c1 from meters partition by c1 interval(1s)'
        tdSql.query(sql_interval)
        self.check_explain_res_has_row("Partition on", tdSql.queryResult)
        tdSql.query(sql_interval_hint)
        self.check_explain_res_has_row("Partition on", tdSql.queryResult)

    def add_order_by(self, sql: str, order_by: str, select_list: str = "*") -> str:
        """Wrap `sql` as a sub-query and order the outer query by `order_by`."""
        return "select %s from (%s)t order by %s" % (select_list, sql, order_by)

    def add_hint(self, sql: str) -> str:
        """Insert the sort_for_group() hint after the leading keyword.

        The first six characters of `sql` are dropped and replaced, so `sql`
        is expected to start with "select".
        """
        return "select /*+ sort_for_group() */ %s" % sql[6:]

    def query_with_time(self, sql):
        """Run `sql` through tdSql and return the elapsed time in milliseconds."""
        # perf_counter is monotonic, unlike a difference of wall-clock timestamps
        from time import perf_counter
        start = perf_counter()
        tdSql.query(sql)
        return (perf_counter() - start) * 1000

    def query_and_compare_res(self, sql1, sql2):
        """Run both statements and exit the test unless their results match row by row."""
        dur = self.query_with_time(sql1)
        tdLog.debug("sql1 query with time: [%f]" % dur)
        res1 = tdSql.queryResult
        dur = self.query_with_time(sql2)
        tdLog.debug("sql2 query with time: [%f]" % dur)
        res2 = tdSql.queryResult
        if res1 is None or res2 is None:
            tdLog.exit("res1 or res2 is None")
        if len(res1) != len(res2):
            tdLog.exit("query and copare failed cause different rows, sql1: [%s], rows: [%d], sql2: [%s], rows: [%d]" % (sql1, len(res1), sql2, len(res2)))
        for i in range(0, len(res1)):
            if res1[i] != res2[i]:
                tdLog.exit("compare failed for row: [%d], sqls: [%s] res1: [%s], sql2 : [%s] res2: [%s]" % (i, sql1, res1[i], sql2, res2[i]))
        tdLog.debug("sql: [%s] and sql: [%s] have same results, rows: [%d]" % (sql1, sql2, len(res1)))

    def prepare_and_query(self, sqls: list, order_by: str, select_list: str = "*"):
        """Compare each statement against its hinted form, both wrapped in the same ORDER BY."""
        for sql in sqls:
            sql_hint = self.add_order_by(self.add_hint(sql), order_by, select_list)
            sql = self.add_order_by(sql, order_by, select_list)
            self.query_and_compare_res(sql, sql_hint)

    def test_sort_for_partition_res(self):
        """Check that hinted and un-hinted partition queries return identical rows."""
        sqls_par_c1_agg = [
                "select count(*), c1 from meters partition by c1",
                "select count(*), min(c2), max(c3), c1 from meters partition by c1",
                ]
        sqls_par_c1 = [
                "select * from meters partition by c1"
                ]
        sqls_par_c1_c2_agg = [
                "select count(*), c1, c2 from meters partition by c1, c2",
                "select count(*), c1, c2, min(c4), max(c5), sum(c6) from meters partition by c1, c2",
                ]
        sqls_par_c1_c2 = [
                "select * from meters partition by c1, c2"
                ]

        sqls_par_tbname_c1 = [
                "select count(*), c1 , tbname as tb from meters partition by tbname, c1"
                ]
        sqls_par_tag_c1 = [
                "select count(*), c1, t1 from meters partition by t1, c1"
                ]
        self.prepare_and_query(sqls_par_c1_agg, "c1")
        self.prepare_and_query(sqls_par_c1, "c1, ts, c2", "c1, ts, c2")
        self.prepare_and_query(sqls_par_c1_c2_agg, "c1, c2")
        self.prepare_and_query(sqls_par_c1_c2, "c1, c2, ts, c3", "c1, c2, ts, c3")
        self.prepare_and_query(sqls_par_tbname_c1, "tb, c1")
        self.prepare_and_query(sqls_par_tag_c1, "t1, c1")

    def run(self):
        """Entry point: prepare the environment, then run both checks."""
        self.prepareTestEnv()
        self.test_sort_for_partition_hint()
        self.test_sort_for_partition_res()

    def stop(self):
        """Close the tdSql connection and report success."""
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")
|
||||||
|
|
||||||
|
event = threading.Event()
|
||||||
|
|
||||||
|
tdCases.addLinux(__file__, TDTestCase())
|
||||||
|
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -18,6 +18,7 @@ python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4
|
||||||
python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
|
python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
|
||||||
python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4
|
python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4
|
||||||
python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4
|
python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4
|
||||||
|
python3 ./test.py -f 2-query/partition_by_col.py -Q 4
|
||||||
python3 ./test.py -f 7-tmq/tmqShow.py
|
python3 ./test.py -f 7-tmq/tmqShow.py
|
||||||
python3 ./test.py -f 7-tmq/tmqDropStb.py
|
python3 ./test.py -f 7-tmq/tmqDropStb.py
|
||||||
python3 ./test.py -f 7-tmq/subscribeStb0.py
|
python3 ./test.py -f 7-tmq/subscribeStb0.py
|
||||||
|
|
Loading…
Reference in New Issue