Merge pull request #22629 from taosdata/feat/TD-25620
feat: optimize partition node, replace with sort node
commit 8ef01584ee
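At a high level, this change adds a SORT_FOR_GROUP hint and a new "PartitionCols" optimizer rule: when the hint is given and a PARTITION BY uses ordinary (non-tag) columns, the planner replaces the partition node with a sort node that computes group ids from the sort keys, and the sort/merge executors learn to carry those group ids. Assuming TDengine's usual hint-comment syntax, such a query would presumably be written as select /*+ sort_for_group() */ ... partition by <normal column> ...; the parser change below rejects any parameters for this hint.
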
@@ -28,6 +28,7 @@ typedef struct SBlockOrderInfo {
bool nullFirst;
int32_t order;
int32_t slotId;
void* compFn;
SColumnInfoData* pColData;
} SBlockOrderInfo;

@@ -82,6 +83,15 @@ static FORCE_INLINE bool colDataIsNull_s(const SColumnInfoData* pColumnInfoData,
}
}

static FORCE_INLINE bool colDataIsNull_t(const SColumnInfoData* pColumnInfoData, uint32_t row, bool isVarType) {
if (!pColumnInfoData->hasNull) return false;
if (isVarType) {
return colDataIsNull_var(pColumnInfoData, row);
} else {
return pColumnInfoData->nullbitmap ? colDataIsNull_f(pColumnInfoData->nullbitmap, row) : false;
}
}

static FORCE_INLINE bool colDataIsNull(const SColumnInfoData* pColumnInfoData, uint32_t totalRows, uint32_t row,
SColumnDataAgg* pColAgg) {
if (!pColumnInfoData->hasNull) {

@@ -210,6 +220,10 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
size_t blockDataGetSerialMetaSize(uint32_t numOfCols);

int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
/**
* @brief find how many rows already in order start from first row
*/
int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo);

int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload);
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);

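A minimal sketch (not part of the commit) of how the two helpers declared above could be exercised together; pBlock is assumed to be an existing SSDataBlock*, and the ORDER_ASC constant and field layout follow the SBlockOrderInfo hunk above.

// Ask how many leading rows are already ordered by slot 0 ascending; if not all
// of them, fall back to an in-block sort. compFn/pColData are filled in by the callee.
SBlockOrderInfo info = {.nullFirst = false, .order = ORDER_ASC, .slotId = 0, .compFn = NULL, .pColData = NULL};
SArray*         pOrderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo));
taosArrayPush(pOrderInfo, &info);

int32_t sortedRows = blockDataGetSortedRows(pBlock, pOrderInfo);
if (sortedRows < pBlock->info.rows) {
  blockDataSort(pBlock, pOrderInfo);
}
taosArrayDestroy(pOrderInfo);
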
@@ -366,6 +366,7 @@
#define TK_NK_BIN 605 // bin format data 0b111
#define TK_BATCH_SCAN 606
#define TK_NO_BATCH_SCAN 607
#define TK_SORT_FOR_GROUP 608

#define TK_NK_NIL 65535

@@ -136,6 +136,8 @@ typedef struct SAggLogicNode {
bool hasTimeLineFunc;
bool onlyHasKeepOrderFunc;
bool hasGroupKeyOptimized;
bool isGroupTb;
bool isPartTb; // true if partition keys has tbname
} SAggLogicNode;

typedef struct SProjectLogicNode {

@@ -221,6 +223,7 @@ typedef struct SMergeLogicNode {
int32_t srcGroupId;
bool groupSort;
bool ignoreGroupId;
bool inputWithGroupId;
} SMergeLogicNode;

typedef enum EWindowType {

@@ -263,6 +266,7 @@ typedef struct SWindowLogicNode {
int8_t igExpired;
int8_t igCheckUpdate;
EWindowAlgorithm windowAlgo;
bool isPartTb;
} SWindowLogicNode;

typedef struct SFillLogicNode {

@@ -279,8 +283,9 @@ typedef struct SSortLogicNode {
SLogicNode node;
SNodeList* pSortKeys;
bool groupSort;
int64_t maxRows;
bool skipPKSortOpt;
bool calcGroupId;
bool excludePkCol; // exclude PK ts col when calc group id
} SSortLogicNode;

typedef struct SPartitionLogicNode {

@@ -288,6 +293,10 @@ typedef struct SPartitionLogicNode {
SNodeList* pPartitionKeys;
SNodeList* pTags;
SNode* pSubtable;

bool needBlockOutputTsOrder; // if true, partition output block will have ts order maintained
int32_t pkTsColId;
uint64_t pkTsColTbId;
} SPartitionLogicNode;

typedef enum ESubplanType {

@@ -527,6 +536,7 @@ typedef struct SMergePhysiNode {
int32_t srcGroupId;
bool groupSort;
bool ignoreGroupId;
bool inputWithGroupId;
} SMergePhysiNode;

typedef struct SWindowPhysiNode {

@@ -603,6 +613,8 @@ typedef struct SSortPhysiNode {
SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function
SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode
SNodeList* pTargets;
bool calcGroupId;
bool excludePkCol;
} SSortPhysiNode;

typedef SSortPhysiNode SGroupSortPhysiNode;

@@ -612,6 +624,9 @@ typedef struct SPartitionPhysiNode {
SNodeList* pExprs; // these are expression list of partition_by_clause
SNodeList* pPartitionKeys;
SNodeList* pTargets;

bool needBlockOutputTsOrder;
int32_t tsSlotId;
} SPartitionPhysiNode;

typedef struct SStreamPartitionPhysiNode {

@@ -119,6 +119,7 @@ typedef struct SLeftValueNode {
typedef enum EHintOption {
HINT_NO_BATCH_SCAN = 1,
HINT_BATCH_SCAN,
HINT_SORT_FOR_GROUP,
} EHintOption;

typedef struct SHintNode {

@@ -933,7 +933,13 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
return 0;
}
}
__compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);

__compar_fn_t fn;
if (pOrder->compFn) {
fn = pOrder->compFn;
} else {
fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
}

int ret = fn(left1, right1);
if (ret == 0) {

@@ -1099,6 +1105,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) {
struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i);
pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
pInfo->compFn = getKeyComparFunc(pInfo->pColData->info.type, pInfo->order);
}

terrno = 0;

@@ -2509,3 +2516,20 @@ void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList
int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);
}

int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
if (!pDataBlock || !pOrderInfo) return 0;
for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) {
SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, i);
pOrder->pColData = taosArrayGet(pDataBlock->pDataBlock, pOrder->slotId);
pOrder->compFn = getKeyComparFunc(pOrder->pColData->info.type, pOrder->order);
}
SSDataBlockSortHelper sortHelper = {.orderInfo = pOrderInfo, .pDataBlock = pDataBlock};
int32_t rowIdx = 0, nextRowIdx = 1;
for (; rowIdx < pDataBlock->info.rows && nextRowIdx < pDataBlock->info.rows; ++rowIdx, ++nextRowIdx) {
if (dataBlockCompar(&nextRowIdx, &rowIdx, &sortHelper) < 0) {
break;
}
}
return nextRowIdx;
}

@@ -196,4 +196,22 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t

SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, SArray* pUidTagList, void* pVnode,
SStorageAPI* pStorageAPI);

/**
* @brief build a tuple into keyBuf
* @param [out] keyBuf the output buf
* @param [in] pSortGroupCols the cols to build
* @param [in] pBlock block the tuple in
*/
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex);

int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pDataBlock,
int32_t rowIndex);

uint64_t calcGroupId(char *pData, int32_t len);

SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys);

int32_t extractKeysLen(const SArray* keys);

#endif // TDENGINE_EXECUTIL_H

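A small sketch (mine, under the assumption that pSortGroupCols is an SArray of SColumn and pBlock an existing SSDataBlock*) showing how these helpers appear intended to combine when deriving a group id for one row:

int32_t  bufLen  = extractKeysLen(pSortGroupCols);               // null flags + key column bytes
char*    keyBuf  = taosMemoryCalloc(1, bufLen);
int32_t  keysLen = buildKeys(keyBuf, pSortGroupCols, pBlock, rowIndex);
uint64_t groupId = calcGroupId(keyBuf, keysLen);                 // first 8 bytes of the MD5 of the keys

// For the next row, an unchanged key buffer means the group id can be reused.
if (compKeys(pSortGroupCols, keyBuf, keysLen, pBlock, rowIndex + 1) != 0) {
  keysLen = buildKeys(keyBuf, pSortGroupCols, pBlock, rowIndex + 1);
  groupId = calcGroupId(keyBuf, keysLen);
}
taosMemoryFree(keyBuf);
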
@@ -194,6 +194,16 @@ void tsortSetClosed(SSortHandle* pHandle);
void tsortSetSingleTableMerge(SSortHandle* pHandle);
void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), void* param);

/**
* @brief comp the tuple with keyBuf, if not equal, new keys will be built in keyBuf, newLen will be stored in keyLen
* @param [in] pSortCols cols to comp and build
* @param [in, out] pass in the old keys, if comp not equal, new keys will be built in it.
* @param [in, out] keyLen the old keysLen, if comp not equal, new keysLen will be stored in it.
* @param [in] the tuple to comp with
* @retval 0 if comp equal, 1 if not
*/
int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* keyLen, const STupleHandle* pTuple);

#ifdef __cplusplus
}
#endif

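A sketch of the boundary-detection pattern the comment above describes (assumed usage, not from the commit): while draining a sort handle, a key change marks the start of a new group.

STupleHandle* pTuple = NULL;
while ((pTuple = tsortNextTuple(pHandle)) != NULL) {
  // returns 1 and rewrites keyBuf/keyLen when this tuple's keys differ from the previous ones
  if (tsortCompAndBuildKeys(pSortCols, keyBuf, &keyLen, pTuple) != 0) {
    uint64_t groupId = calcGroupId(keyBuf, keyLen);  // id for the group that starts here
    // flush the block built so far, then start a new one tagged with groupId
  }
  // append pTuple's row to the current output block
}
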
@@ -303,6 +303,7 @@ static int32_t createDataBlockForEmptyInput(SOperatorInfo* pOperator, SSDataBloc

SOperatorInfo* downstream = pOperator->pDownstream[0];
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_PARTITION ||
downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_SORT ||
(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
((STableScanInfo*)downstream->info)->hasGroupByTag == true)) {
return TSDB_CODE_SUCCESS;

@@ -2261,3 +2261,104 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t
ts[3] = pWin->skey; // window start key
ts[4] = pWin->ekey + delta; // window end key
}

int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock, int32_t rowIndex) {
SColumnDataAgg* pColAgg = NULL;
const char* isNull = oldkeyBuf;
const char* p = oldkeyBuf + sizeof(int8_t) * pSortGroupCols->size;

for (int32_t i = 0; i < pSortGroupCols->size; ++i) {
const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId];

if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
if (isNull[i] != 1) return 1;
} else {
if (isNull[i] != 0) return 1;
const char* val = colDataGetData(pColInfoData, rowIndex);
if (pCol->type == TSDB_DATA_TYPE_JSON) {
int32_t len = getJsonValueLen(val);
if (memcmp(p, val, len) != 0) return 1;
p += len;
} else if (IS_VAR_DATA_TYPE(pCol->type)) {
if (memcmp(p, val, varDataTLen(val)) != 0) return 1;
p += varDataTLen(val);
} else {
if (0 != memcmp(p, val, pCol->bytes)) return 1;
p += pCol->bytes;
}
}
}
if ((int32_t)(p - oldkeyBuf) != oldKeysLen) return 1;
return 0;
}

int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock,
int32_t rowIndex) {
uint32_t colNum = pSortGroupCols->size;
SColumnDataAgg* pColAgg = NULL;
char* isNull = keyBuf;
char* p = keyBuf + sizeof(int8_t) * colNum;

for (int32_t i = 0; i < colNum; ++i) {
const SColumn* pCol = (SColumn*)TARRAY_GET_ELEM(pSortGroupCols, i);
const SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, pCol->slotId);
if (pCol->slotId > pBlock->pDataBlock->size) continue;

if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId];

if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
isNull[i] = 1;
} else {
isNull[i] = 0;
const char* val = colDataGetData(pColInfoData, rowIndex);
if (pCol->type == TSDB_DATA_TYPE_JSON) {
int32_t len = getJsonValueLen(val);
memcpy(p, val, len);
p += len;
} else if (IS_VAR_DATA_TYPE(pCol->type)) {
varDataCopy(p, val);
p += varDataTLen(val);
} else {
memcpy(p, val, pCol->bytes);
p += pCol->bytes;
}
}
}
return (int32_t)(p - keyBuf);
}

uint64_t calcGroupId(char* pData, int32_t len) {
T_MD5_CTX context;
tMD5Init(&context);
tMD5Update(&context, (uint8_t*)pData, len);
tMD5Final(&context);

// NOTE: only extract the initial 8 bytes of the final MD5 digest
uint64_t id = 0;
memcpy(&id, context.digest, sizeof(uint64_t));
if (0 == id) memcpy(&id, context.digest + 8, sizeof(uint64_t));
return id;
}

SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) {
SNode* node;
SNodeList* ret = NULL;
FOREACH(node, pSortKeys) {
SOrderByExprNode* pSortKey = (SOrderByExprNode*)node;
nodesListMakeAppend(&ret, pSortKey->pExpr);
}
return ret;
}

int32_t extractKeysLen(const SArray* keys) {
int32_t len = 0;
int32_t keyNum = taosArrayGetSize(keys);
for (int32_t i = 0; i < keyNum; ++i) {
SColumn* pCol = (SColumn*)taosArrayGet(keys, i);
len += pCol->bytes;
}
len += sizeof(int8_t) * keyNum; //null flag
return len;
}

@@ -56,6 +56,10 @@ typedef struct SPartitionOperatorInfo {
int32_t groupIndex; // group index
int32_t pageIndex; // page index of current group
SExprSupp scalarSup;

int32_t remainRows;
int32_t orderedRows;
SArray* pOrderInfoArr;
} SPartitionOperatorInfo;

static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);

@@ -635,20 +639,6 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
return pPage;
}

uint64_t calcGroupId(char* pData, int32_t len) {
T_MD5_CTX context;
tMD5Init(&context);
tMD5Update(&context, (uint8_t*)pData, len);
tMD5Final(&context);

// NOTE: only extract the initial 8 bytes of the final MD5 digest
uint64_t id = 0;
memcpy(&id, context.digest, sizeof(uint64_t));
if (0 == id)
memcpy(&id, context.digest + 8, sizeof(uint64_t));
return id;
}

int32_t* setupColumnOffset(const SSDataBlock* pBlock, int32_t rowCapacity) {
size_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
int32_t* offset = taosMemoryCalloc(numOfCols, sizeof(int32_t));

@@ -699,37 +689,49 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
SPartitionOperatorInfo* pInfo = pOperator->info;
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

SDataGroupInfo* pGroupInfo =
(pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL;
if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) {
// try next group data
++pInfo->groupIndex;
if (pInfo->groupIndex >= taosArrayGetSize(pInfo->sortedGroupArray)) {
setOperatorCompleted(pOperator);
clearPartitionOperator(pInfo);
return NULL;
if (pInfo->remainRows == 0) {
blockDataCleanup(pInfo->binfo.pRes);
SDataGroupInfo* pGroupInfo =
(pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL;
if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) {
// try next group data
++pInfo->groupIndex;
if (pInfo->groupIndex >= taosArrayGetSize(pInfo->sortedGroupArray)) {
setOperatorCompleted(pOperator);
clearPartitionOperator(pInfo);
return NULL;
}

pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex);
pInfo->pageIndex = 0;
}

pGroupInfo = taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex);
pInfo->pageIndex = 0;
int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex);
void* page = getBufPage(pInfo->pBuf, *pageId);
if (page == NULL) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, terrno);
}

blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity);
blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity);

pInfo->pageIndex += 1;
releaseBufPage(pInfo->pBuf, page);
pInfo->binfo.pRes->info.id.groupId = pGroupInfo->groupId;
pInfo->binfo.pRes->info.dataLoad = 1;
pInfo->orderedRows = 0;
}

int32_t* pageId = taosArrayGet(pGroupInfo->pPageList, pInfo->pageIndex);
void* page = getBufPage(pInfo->pBuf, *pageId);
if (page == NULL) {
qError("failed to get buffer, code:%s, %s", tstrerror(terrno), GET_TASKID(pTaskInfo));
T_LONG_JMP(pTaskInfo->env, terrno);
if (pInfo->pOrderInfoArr) {
pInfo->binfo.pRes->info.rows += pInfo->remainRows;
blockDataTrimFirstRows(pInfo->binfo.pRes, pInfo->orderedRows);
pInfo->orderedRows = blockDataGetSortedRows(pInfo->binfo.pRes, pInfo->pOrderInfoArr);
pInfo->remainRows = pInfo->binfo.pRes->info.rows - pInfo->orderedRows;
pInfo->binfo.pRes->info.rows = pInfo->orderedRows;
}

blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->rowCapacity);
blockDataFromBuf1(pInfo->binfo.pRes, page, pInfo->rowCapacity);

pInfo->pageIndex += 1;
releaseBufPage(pInfo->pBuf, page);

pInfo->binfo.pRes->info.dataLoad = 1;
blockDataUpdateTsWindow(pInfo->binfo.pRes, 0);
pInfo->binfo.pRes->info.id.groupId = pGroupInfo->groupId;

pOperator->resultInfo.totalRows += pInfo->binfo.pRes->info.rows;
return pInfo->binfo.pRes;

@@ -746,7 +748,6 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
SSDataBlock* pRes = pInfo->binfo.pRes;

if (pOperator->status == OP_RES_TO_RETURN) {
blockDataCleanup(pRes);
return buildPartitionResult(pOperator);
}

@@ -829,6 +830,7 @@ static void destroyPartitionOperatorInfo(void* param) {

cleanupExprSupp(&pInfo->scalarSup);
destroyDiskbasedBuf(pInfo->pBuf);
taosArrayDestroy(pInfo->pOrderInfoArr);
taosMemoryFreeClear(param);
}

@@ -846,6 +848,17 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);
pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys);

if (pPartNode->needBlockOutputTsOrder) {
SBlockOrderInfo order = {.order = ORDER_ASC, .pColData = NULL, .nullFirst = false, .slotId = pPartNode->tsSlotId};
pInfo->pOrderInfoArr = taosArrayInit(1, sizeof(SBlockOrderInfo));
if (!pInfo->pOrderInfoArr) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
pTaskInfo->code = terrno;
goto _error;
}
taosArrayPush(pInfo->pOrderInfoArr, &order);
}

if (pPartNode->pExprs != NULL) {
int32_t num = 0;
SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num);

@@ -19,18 +19,28 @@
#include "querytask.h"
#include "tdatablock.h"

typedef struct SSortOpGroupIdCalc {
STupleHandle* pSavedTuple;
SArray* pSortColsArr;
char* keyBuf;
int32_t lastKeysLen; // default to be 0
uint64_t lastGroupId;
bool excludePKCol;
} SSortOpGroupIdCalc;

typedef struct SSortOperatorInfo {
SOptrBasicInfo binfo;
uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo;
SSortHandle* pSortHandle;
SColMatchInfo matchInfo;
int32_t bufPageSize;
int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
SLimitInfo limitInfo;
uint64_t maxTupleLength;
int64_t maxRows;
SOptrBasicInfo binfo;
uint32_t sortBufSize; // max buffer size for in-memory sort
SArray* pSortInfo;
SSortHandle* pSortHandle;
SColMatchInfo matchInfo;
int32_t bufPageSize;
int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
SLimitInfo limitInfo;
uint64_t maxTupleLength;
int64_t maxRows;
SSortOpGroupIdCalc* pGroupIdCalc;
} SSortOperatorInfo;

static SSDataBlock* doSort(SOperatorInfo* pOperator);

@@ -40,6 +50,8 @@ static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain
static void destroySortOperatorInfo(void* param);
static int32_t calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys);

static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc);

// todo add limit/offset impl
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) {
SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo));

@@ -78,6 +90,34 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*

pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode);
pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);

if (pSortNode->calcGroupId) {
int32_t keyLen;
SSortOpGroupIdCalc* pGroupIdCalc = pInfo->pGroupIdCalc = taosMemoryCalloc(1, sizeof(SSortOpGroupIdCalc));
if (!pGroupIdCalc) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
SNodeList* pSortColsNodeArr = makeColsNodeArrFromSortKeys(pSortNode->pSortKeys);
if (!pSortColsNodeArr) code = TSDB_CODE_OUT_OF_MEMORY;
if (TSDB_CODE_SUCCESS == code) {
pGroupIdCalc->pSortColsArr = makeColumnArrayFromList(pSortColsNodeArr);
if (!pGroupIdCalc->pSortColsArr) code = TSDB_CODE_OUT_OF_MEMORY;
nodesClearList(pSortColsNodeArr);
}
if (TSDB_CODE_SUCCESS == code) {
// PK ts col should always at last, see partColOptCreateSort
if (pSortNode->excludePkCol) taosArrayPop(pGroupIdCalc->pSortColsArr);
keyLen = extractKeysLen(pGroupIdCalc->pSortColsArr);
}
if (TSDB_CODE_SUCCESS == code) {
pGroupIdCalc->lastKeysLen = 0;
pGroupIdCalc->keyBuf = taosMemoryCalloc(1, keyLen);
if (!pGroupIdCalc->keyBuf) code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (code != TSDB_CODE_SUCCESS) goto _error;

pInfo->binfo.inputTsOrder = pSortNode->node.inputTsOrder;
pInfo->binfo.outputTsOrder = pSortNode->node.outputTsOrder;
initLimitInfo(pSortNode->node.pLimit, pSortNode->node.pSlimit, &pInfo->limitInfo);

@@ -129,6 +169,52 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
pBlock->info.rows += 1;
}

/**
* @brief get next tuple with group id attached, here assume that all tuples are sorted by group keys
* @param [in, out] pBlock the output block, the group id will be saved in it
* @retval NULL if next group tuple arrived and this new group tuple will be saved in pInfo.pSavedTuple
* @retval NULL if no more tuples
*/
static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock) {
STupleHandle* retTuple = pInfo->pGroupIdCalc->pSavedTuple;
if (!retTuple) {
retTuple = tsortNextTuple(pHandle);
}

if (retTuple) {
int32_t newGroup;
if (pInfo->pGroupIdCalc->pSavedTuple) {
newGroup = true;
pInfo->pGroupIdCalc->pSavedTuple = NULL;
} else {
newGroup = tsortCompAndBuildKeys(pInfo->pGroupIdCalc->pSortColsArr, pInfo->pGroupIdCalc->keyBuf,
&pInfo->pGroupIdCalc->lastKeysLen, retTuple);
}
bool emptyBlock = pBlock->info.rows == 0;
if (newGroup) {
if (!emptyBlock) {
// new group arrived, and we have already copied some tuples for cur group, save the new group tuple, return
// NULL. Note that the keyBuf and lastKeysLen has been updated to new value
pInfo->pGroupIdCalc->pSavedTuple = retTuple;
retTuple = NULL;
} else {
// new group with empty block
pInfo->pGroupIdCalc->lastGroupId = pBlock->info.id.groupId =
calcGroupId(pInfo->pGroupIdCalc->keyBuf, pInfo->pGroupIdCalc->lastKeysLen);
}
} else {
if (emptyBlock) {
// new block but not new group, assign last group id to it
pBlock->info.id.groupId = pInfo->pGroupIdCalc->lastGroupId;
} else {
// not new group and not empty block and ret NOT NULL, just return the tuple
}
}
}

return retTuple;
}

SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
SSortOperatorInfo* pInfo) {
blockDataCleanup(pDataBlock);

@@ -140,8 +226,13 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i

blockDataEnsureCapacity(p, capacity);

STupleHandle* pTupleHandle;
while (1) {
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
if (pInfo->pGroupIdCalc) {
pTupleHandle = nextTupleWithGroupId(pHandle, pInfo, p);
} else {
pTupleHandle = tsortNextTuple(pHandle);
}
if (pTupleHandle == NULL) {
break;
}

@@ -168,6 +259,7 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
pDataBlock->info.dataLoad = 1;
pDataBlock->info.rows = p->info.rows;
pDataBlock->info.scanFlag = p->info.scanFlag;
pDataBlock->info.id.groupId = p->info.id.groupId;
}

blockDataDestroy(p);

@@ -281,6 +373,7 @@ void destroySortOperatorInfo(void* param) {
tsortDestroySortHandle(pInfo->pSortHandle);
taosArrayDestroy(pInfo->pSortInfo);
taosArrayDestroy(pInfo->matchInfo.pList);
destroySortOpGroupIdCalc(pInfo->pGroupIdCalc);
taosMemoryFreeClear(param);
}

@@ -309,6 +402,14 @@ static int32_t calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNod
return TSDB_CODE_SUCCESS;
}

static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc) {
if (pCalc) {
taosArrayDestroy(pCalc->pSortColsArr);
taosMemoryFree(pCalc->keyBuf);
taosMemoryFree(pCalc);
}
}

//=====================================================================================
// Group Sort Operator
typedef enum EChildOperatorStatus { CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED } EChildOperatorStatus;

@@ -591,6 +692,7 @@ typedef struct SMultiwayMergeOperatorInfo {
bool ignoreGroupId;
uint64_t groupId;
STupleHandle* prefetchedTuple;
bool inputWithGroupId;
} SMultiwayMergeOperatorInfo;

int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {

@@ -641,7 +743,7 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle*

while (1) {
STupleHandle* pTupleHandle = NULL;
if (pInfo->groupSort) {
if (pInfo->groupSort || pInfo->inputWithGroupId) {
if (pInfo->prefetchedTuple == NULL) {
pTupleHandle = tsortNextTuple(pHandle);
} else {

@@ -662,7 +764,7 @@ static void doGetSortedBlockData(SMultiwayMergeOperatorInfo* pInfo, SSortHandle*
break;
}

if (pInfo->groupSort) {
if (pInfo->groupSort || pInfo->inputWithGroupId) {
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
if (pInfo->groupId == 0 || pInfo->groupId == tupleGroupId) {
appendOneRowToDataBlock(p, pTupleHandle);

@@ -842,6 +944,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); // one additional is reserved for merged result.
pInfo->binfo.inputTsOrder = pMergePhyNode->node.inputTsOrder;
pInfo->binfo.outputTsOrder = pMergePhyNode->node.outputTsOrder;
pInfo->inputWithGroupId = pMergePhyNode->inputWithGroupId;

setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo);
pOperator->fpSet = createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL,

@@ -25,6 +25,7 @@
#include "tsort.h"
#include "tutil.h"
#include "tsimplehash.h"
#include "executil.h"

struct STupleHandle {
SSDataBlock* pBlock;

@@ -615,48 +616,62 @@ int32_t msortComparFn(const void* pLeft, const void* pRight, void* param) {
int ret = pParam->cmpFn(left1, right1);
return ret;
} else {
bool isVarType;
for (int32_t i = 0; i < pInfo->size; ++i) {
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i);
SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId);
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
isVarType = IS_VAR_DATA_TYPE(pLeftColInfoData->info.type);

bool leftNull = false;
if (pLeftColInfoData->hasNull) {
if (pLeftBlock->pBlockAgg == NULL) {
leftNull = colDataIsNull_s(pLeftColInfoData, pLeftSource->src.rowIndex);
} else {
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex,
pLeftBlock->pBlockAgg[i]);
if (pLeftColInfoData->hasNull || pRightColInfoData->hasNull) {
bool leftNull = false;
if (pLeftColInfoData->hasNull) {
if (pLeftBlock->pBlockAgg == NULL) {
leftNull = colDataIsNull_t(pLeftColInfoData, pLeftSource->src.rowIndex, isVarType);
} else {
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex,
pLeftBlock->pBlockAgg[i]);
}
}

bool rightNull = false;
if (pRightColInfoData->hasNull) {
if (pRightBlock->pBlockAgg == NULL) {
rightNull = colDataIsNull_t(pRightColInfoData, pRightSource->src.rowIndex, isVarType);
} else {
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex,
pRightBlock->pBlockAgg[i]);
}
}

if (leftNull && rightNull) {
continue; // continue to next slot
}

if (rightNull) {
return pOrder->nullFirst ? 1 : -1;
}

if (leftNull) {
return pOrder->nullFirst ? -1 : 1;
}
}

bool rightNull = false;
if (pRightColInfoData->hasNull) {
if (pRightBlock->pBlockAgg == NULL) {
rightNull = colDataIsNull_s(pRightColInfoData, pRightSource->src.rowIndex);
} else {
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex,
pRightBlock->pBlockAgg[i]);
}
void* left1, *right1;
if (isVarType) {
left1 = colDataGetVarData(pLeftColInfoData, pLeftSource->src.rowIndex);
right1 = colDataGetVarData(pRightColInfoData, pRightSource->src.rowIndex);
} else {
left1 = colDataGetNumData(pLeftColInfoData, pLeftSource->src.rowIndex);
right1 = colDataGetNumData(pRightColInfoData, pRightSource->src.rowIndex);
}

if (leftNull && rightNull) {
continue; // continue to next slot
__compar_fn_t fn = pOrder->compFn;
if (!fn) {
fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);
pOrder->compFn = fn;
}

if (rightNull) {
return pOrder->nullFirst ? 1 : -1;
}

if (leftNull) {
return pOrder->nullFirst ? -1 : 1;
}

void* left1 = colDataGetData(pLeftColInfoData, pLeftSource->src.rowIndex);
void* right1 = colDataGetData(pRightColInfoData, pRightSource->src.rowIndex);

__compar_fn_t fn = getKeyComparFunc(pLeftColInfoData->info.type, pOrder->order);

int ret = fn(left1, right1);
if (ret == 0) {
continue;

@@ -1567,3 +1582,15 @@ SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {

return info;
}

int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* keyLen,
const STupleHandle* pTuple) {
int32_t ret;
if (0 == compKeys(pSortCols, keyBuf, *keyLen, pTuple->pBlock, pTuple->rowIndex)) {
ret = 0;
} else {
*keyLen = buildKeys(keyBuf, pSortCols, pTuple->pBlock, pTuple->rowIndex);
ret = 1;
}
return ret;
}

@@ -444,6 +444,7 @@ static int32_t logicAggCopy(const SAggLogicNode* pSrc, SAggLogicNode* pDst) {
CLONE_NODE_LIST_FIELD(pGroupKeys);
CLONE_NODE_LIST_FIELD(pAggFuncs);
COPY_SCALAR_FIELD(hasGroupKeyOptimized);
COPY_SCALAR_FIELD(isPartTb);
return TSDB_CODE_SUCCESS;
}

@@ -489,6 +490,7 @@ static int32_t logicMergeCopy(const SMergeLogicNode* pSrc, SMergeLogicNode* pDst
COPY_SCALAR_FIELD(srcGroupId);
COPY_SCALAR_FIELD(groupSort);
COPY_SCALAR_FIELD(ignoreGroupId);
COPY_SCALAR_FIELD(inputWithGroupId);
return TSDB_CODE_SUCCESS;
}

@@ -531,6 +533,8 @@ static int32_t logicSortCopy(const SSortLogicNode* pSrc, SSortLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
CLONE_NODE_LIST_FIELD(pSortKeys);
COPY_SCALAR_FIELD(groupSort);
COPY_SCALAR_FIELD(calcGroupId);
COPY_SCALAR_FIELD(excludePkCol);
return TSDB_CODE_SUCCESS;
}

@@ -539,6 +543,9 @@ static int32_t logicPartitionCopy(const SPartitionLogicNode* pSrc, SPartitionLog
CLONE_NODE_LIST_FIELD(pPartitionKeys);
CLONE_NODE_LIST_FIELD(pTags);
CLONE_NODE_FIELD(pSubtable);
COPY_SCALAR_FIELD(needBlockOutputTsOrder);
COPY_SCALAR_FIELD(pkTsColId);
COPY_SCALAR_FIELD(pkTsColTbId);
return TSDB_CODE_SUCCESS;
}

@@ -678,6 +685,8 @@ static int32_t physiPartitionCopy(const SPartitionPhysiNode* pSrc, SPartitionPhy
CLONE_NODE_LIST_FIELD(pExprs);
CLONE_NODE_LIST_FIELD(pPartitionKeys);
CLONE_NODE_LIST_FIELD(pTargets);
COPY_SCALAR_FIELD(needBlockOutputTsOrder);
COPY_SCALAR_FIELD(tsSlotId);
return TSDB_CODE_SUCCESS;
}

@@ -2271,6 +2271,7 @@ static const char* jkMergePhysiPlanNumOfChannels = "NumOfChannels";
static const char* jkMergePhysiPlanSrcGroupId = "SrcGroupId";
static const char* jkMergePhysiPlanGroupSort = "GroupSort";
static const char* jkMergePhysiPlanIgnoreGroupID = "IgnoreGroupID";
static const char* jkMergePhysiPlanInputWithGroupId = "InputWithGroupId";

static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
const SMergePhysiNode* pNode = (const SMergePhysiNode*)pObj;

@@ -2294,6 +2295,9 @@ static int32_t physiMergeNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkMergePhysiPlanIgnoreGroupID, pNode->ignoreGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkMergePhysiPlanInputWithGroupId, pNode->inputWithGroupId);
}

return code;
}

@@ -2327,7 +2331,8 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) {
static const char* jkSortPhysiPlanExprs = "Exprs";
static const char* jkSortPhysiPlanSortKeys = "SortKeys";
static const char* jkSortPhysiPlanTargets = "Targets";
static const char* jkSortPhysiPlanMaxRows = "MaxRows";
static const char* jkSortPhysiPlanCalcGroupIds = "CalcGroupIds";
static const char* jkSortPhysiPlanExcludePKCol = "ExcludePKCol";

static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) {
const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj;

@@ -2342,6 +2347,12 @@ static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkSortPhysiPlanTargets, pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkSortPhysiPlanCalcGroupIds, pNode->calcGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkSortPhysiPlanExcludePKCol, pNode->excludePkCol);
}

return code;
}

@@ -2359,6 +2370,12 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkSortPhysiPlanTargets, &pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkSortPhysiPlanCalcGroupIds, &pNode->calcGroupId);
}
if (TSDB_CODE_SUCCESS == code) {
code= tjsonGetBoolValue(pJson, jkSortPhysiPlanExcludePKCol, &pNode->excludePkCol);
}

return code;
}

@@ -2644,6 +2661,8 @@ static int32_t jsonToPhysiEventWindowNode(const SJson* pJson, void* pObj) {
static const char* jkPartitionPhysiPlanExprs = "Exprs";
static const char* jkPartitionPhysiPlanPartitionKeys = "PartitionKeys";
static const char* jkPartitionPhysiPlanTargets = "Targets";
static const char* jkPartitionPhysiPlanNeedBlockOutputTsOrder = "NeedBlockOutputTsOrder";
static const char* jkPartitionPhysiPlanTsSlotId = "tsSlotId";

static int32_t physiPartitionNodeToJson(const void* pObj, SJson* pJson) {
const SPartitionPhysiNode* pNode = (const SPartitionPhysiNode*)pObj;

@@ -2658,6 +2677,12 @@ static int32_t physiPartitionNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkPartitionPhysiPlanTargets, pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonAddBoolToObject(pJson, jkPartitionPhysiPlanNeedBlockOutputTsOrder, pNode->needBlockOutputTsOrder);
}
if (TSDB_CODE_SUCCESS == code) {
tjsonAddIntegerToObject(pJson, jkPartitionPhysiPlanTsSlotId, pNode->tsSlotId);
}

return code;
}

@@ -2675,6 +2700,12 @@ static int32_t jsonToPhysiPartitionNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkPartitionPhysiPlanTargets, &pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkPartitionPhysiPlanNeedBlockOutputTsOrder, &pNode->needBlockOutputTsOrder);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkPartitionPhysiPlanTsSlotId, &pNode->tsSlotId);
}

return code;
}

|
|||
PHY_MERGE_CODE_SRC_GROUP_ID,
|
||||
PHY_MERGE_CODE_GROUP_SORT,
|
||||
PHY_MERGE_CODE_IGNORE_GROUP_ID,
|
||||
PHY_MERGE_CODE_INPUT_WITH_GROUP_ID,
|
||||
};
|
||||
|
||||
static int32_t physiMergeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||
|
@ -2706,6 +2707,9 @@ static int32_t physiMergeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeBool(pEncoder, PHY_MERGE_CODE_IGNORE_GROUP_ID, pNode->ignoreGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeBool(pEncoder, PHY_MERGE_CODE_INPUT_WITH_GROUP_ID, pNode->inputWithGroupId);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -2738,6 +2742,9 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
case PHY_MERGE_CODE_IGNORE_GROUP_ID:
|
||||
code = tlvDecodeBool(pTlv, &pNode->ignoreGroupId);
|
||||
break;
|
||||
case PHY_MERGE_CODE_INPUT_WITH_GROUP_ID:
|
||||
code = tlvDecodeBool(pTlv, &pNode->inputWithGroupId);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -2746,7 +2753,14 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS };
|
||||
enum {
|
||||
PHY_SORT_CODE_BASE_NODE = 1,
|
||||
PHY_SORT_CODE_EXPR,
|
||||
PHY_SORT_CODE_SORT_KEYS,
|
||||
PHY_SORT_CODE_TARGETS,
|
||||
PHY_SORT_CODE_CALC_GROUPID,
|
||||
PHY_SORT_CODE_EXCLUDE_PK_COL
|
||||
};
|
||||
|
||||
static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||
const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj;
|
||||
|
@ -2761,6 +2775,12 @@ static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeObj(pEncoder, PHY_SORT_CODE_TARGETS, nodeListToMsg, pNode->pTargets);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeBool(pEncoder, PHY_SORT_CODE_CALC_GROUPID, pNode->calcGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeBool(pEncoder, PHY_SORT_CODE_EXCLUDE_PK_COL, pNode->excludePkCol);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -2784,6 +2804,11 @@ static int32_t msgToPhysiSortNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
case PHY_SORT_CODE_TARGETS:
|
||||
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets);
|
||||
break;
|
||||
case PHY_SORT_CODE_CALC_GROUPID:
|
||||
code = tlvDecodeBool(pTlv, &pNode->calcGroupId);
|
||||
break;
|
||||
case PHY_SORT_CODE_EXCLUDE_PK_COL:
|
||||
code = tlvDecodeBool(pTlv, &pNode->excludePkCol);
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
@ -3142,7 +3167,14 @@ static int32_t msgToPhysiEventWindowNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
enum { PHY_PARTITION_CODE_BASE_NODE = 1, PHY_PARTITION_CODE_EXPR, PHY_PARTITION_CODE_KEYS, PHY_PARTITION_CODE_TARGETS };
|
||||
enum {
|
||||
PHY_PARTITION_CODE_BASE_NODE = 1,
|
||||
PHY_PARTITION_CODE_EXPR,
|
||||
PHY_PARTITION_CODE_KEYS,
|
||||
PHY_PARTITION_CODE_TARGETS,
|
||||
PHY_PARTITION_CODE_HAS_OUTPUT_TS_ORDER,
|
||||
PHY_PARTITION_CODE_TS_SLOTID
|
||||
};
|
||||
|
||||
static int32_t physiPartitionNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||
const SPartitionPhysiNode* pNode = (const SPartitionPhysiNode*)pObj;
|
||||
|
@ -3157,6 +3189,12 @@ static int32_t physiPartitionNodeToMsg(const void* pObj, STlvEncoder* pEncoder)
|
|||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeObj(pEncoder, PHY_PARTITION_CODE_TARGETS, nodeListToMsg, pNode->pTargets);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeBool(pEncoder, PHY_PARTITION_CODE_HAS_OUTPUT_TS_ORDER, pNode->needBlockOutputTsOrder);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeI32(pEncoder, PHY_PARTITION_CODE_TS_SLOTID, pNode->tsSlotId);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -3180,6 +3218,12 @@ static int32_t msgToPhysiPartitionNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
case PHY_PARTITION_CODE_TARGETS:
|
||||
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets);
|
||||
break;
|
||||
case PHY_PARTITION_CODE_HAS_OUTPUT_TS_ORDER:
|
||||
code = tlvDecodeBool(pTlv, &pNode->needBlockOutputTsOrder);
|
||||
break;
|
||||
case PHY_PARTITION_CODE_TS_SLOTID:
|
||||
code = tlvDecodeI32(pTlv, &pNode->tsSlotId);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
|
|
@@ -359,6 +359,9 @@ bool addHintNodeToList(SAstCreateContext* pCxt, SNodeList** ppHintList, EHintOpt
}
break;
}
case HINT_SORT_FOR_GROUP:
if (paramNum > 0) return true;
break;
default:
return true;
}

@@ -421,6 +424,14 @@ SNodeList* createHintNodeList(SAstCreateContext* pCxt, const SToken* pLiteral) {
}
opt = HINT_NO_BATCH_SCAN;
break;
case TK_SORT_FOR_GROUP:
lastComma = false;
if (0 != opt || inParamList) {
quit = true;
break;
}
opt = HINT_SORT_FOR_GROUP;
break;
case TK_NK_LP:
lastComma = false;
if (0 == opt || inParamList) {

@@ -207,6 +207,7 @@ static SKeyword keywordTable[] = {
{"SMALLINT", TK_SMALLINT},
{"SNODE", TK_SNODE},
{"SNODES", TK_SNODES},
{"SORT_FOR_GROUP", TK_SORT_FOR_GROUP},
{"SOFFSET", TK_SOFFSET},
{"SPLIT", TK_SPLIT},
{"STABLE", TK_STABLE},

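These parser hunks register SORT_FOR_GROUP both as a keyword and as a hint that accepts no parameters (paramNum > 0 is rejected). Assuming TDengine's usual hint-comment form, the optimization below would presumably be requested as select /*+ sort_for_group() */ c1, count(*) from st partition by c1.
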
@@ -44,12 +44,15 @@ int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan);
int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan);
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList);

bool getBatchScanOptionFromHint(SNodeList* pList);
bool getBatchScanOptionFromHint(SNodeList* pList);
bool getSortForGroupOptHint(SNodeList* pList);
SLogicNode* getLogicNodeRootNode(SLogicNode* pCurr);
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes);
bool isPartTableAgg(SAggLogicNode* pAgg);
bool isPartTagAgg(SAggLogicNode* pAgg);
bool isPartTableWinodw(SWindowLogicNode* pWindow);
int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes);
bool isPartTableAgg(SAggLogicNode* pAgg);
bool isPartTagAgg(SAggLogicNode* pAgg);
bool isPartTableWinodw(SWindowLogicNode* pWindow);
bool keysHasCol(SNodeList* pKeys);
bool keysHasTbname(SNodeList* pKeys);

#define CLONE_LIMIT 1
#define CLONE_SLIMIT 1 << 1

@@ -741,6 +741,9 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
}
nodesDestroyList(pOutputGroupKeys);

pAgg->isGroupTb = pAgg->pGroupKeys ? keysHasTbname(pAgg->pGroupKeys) : 0;
pAgg->isPartTb = pSelect->pPartitionByList ? keysHasTbname(pSelect->pPartitionByList) : 0;

if (TSDB_CODE_SUCCESS == code) {
*pLogicNode = (SLogicNode*)pAgg;
} else {

@@ -962,6 +965,7 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva
nodesDestroyNode((SNode*)pWindow);
return TSDB_CODE_OUT_OF_MEMORY;
}
pWindow->isPartTb = pSelect->pPartitionByList ? keysHasTbname(pSelect->pPartitionByList) : 0;

return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
}

@@ -993,7 +997,6 @@ static int32_t createWindowLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSele
if (NULL == pSelect->pWindow) {
return TSDB_CODE_SUCCESS;
}

switch (nodeType(pSelect->pWindow)) {
case QUERY_NODE_STATE_WINDOW:
return createWindowLogicNodeByState(pCxt, (SStateWindowNode*)pSelect->pWindow, pSelect, pLogicNode);

@@ -1260,6 +1263,15 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS
}
}

if (keysHasCol(pPartition->pPartitionKeys) && pSelect->pWindow &&
nodeType(pSelect->pWindow) == QUERY_NODE_INTERVAL_WINDOW) {
pPartition->needBlockOutputTsOrder = true;
SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pSelect->pWindow;
SColumnNode* pTsCol = (SColumnNode*)pInterval->pCol;
pPartition->pkTsColId = pTsCol->colId;
pPartition->pkTsColTbId = pTsCol->tableId;
}

if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pTags) {
pPartition->pTags = nodesCloneList(pSelect->pTags);
if (NULL == pPartition->pTags) {

@@ -1661,22 +1661,6 @@ static int32_t smaIndexOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
return smaIndexOptimizeImpl(pCxt, pLogicSubplan, pScan);
}

static EDealRes partTagsOptHasColImpl(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
if (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType) {
*(bool*)pContext = true;
return DEAL_RES_END;
}
}
return DEAL_RES_CONTINUE;
}

static bool planOptNodeListHasCol(SNodeList* pKeys) {
bool hasCol = false;
nodesWalkExprs(pKeys, partTagsOptHasColImpl, &hasCol);
return hasCol;
}

static EDealRes partTagsOptHasTbname(SNode* pNode, void* pContext) {
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
if (COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType) {

@@ -1755,7 +1739,7 @@ static bool partTagsOptMayBeOptimized(SLogicNode* pNode) {
return false;
}

return !planOptNodeListHasCol(partTagsGetPartKeys(pNode)) && partTagsOptAreSupportedFuncs(partTagsGetFuncs(pNode));
return !keysHasCol(partTagsGetPartKeys(pNode)) && partTagsOptAreSupportedFuncs(partTagsGetFuncs(pNode));
}

static int32_t partTagsOptRebuildTbanme(SNodeList* pPartKeys) {

@@ -2042,6 +2026,7 @@ static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan*
}

int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pProjectNode, pChild);
if (pProjectNode->node.pHint && !pChild->pHint) TSWAP(pProjectNode->node.pHint, pChild->pHint);
if (TSDB_CODE_SUCCESS == code) {
NODES_CLEAR_LIST(pProjectNode->node.pChildren);
nodesDestroyNode((SNode*)pProjectNode);

@@ -2734,7 +2719,7 @@ static bool tagScanOptShouldBeOptimized(SLogicNode* pNode) {
}

SAggLogicNode* pAgg = (SAggLogicNode*)(pNode->pParent);
if (NULL == pAgg->pGroupKeys || NULL != pAgg->pAggFuncs || planOptNodeListHasCol(pAgg->pGroupKeys) ||
if (NULL == pAgg->pGroupKeys || NULL != pAgg->pAggFuncs || keysHasCol(pAgg->pGroupKeys) ||
!planOptNodeListHasTbname(pAgg->pGroupKeys)) {
return false;
}

@@ -2853,13 +2838,14 @@ static bool pushDownLimitTo(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPu
cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_LIMIT_SLIMIT);
return true;
}
case QUERY_NODE_LOGIC_PLAN_SORT:
if (((SSortLogicNode*)pNodeLimitPushTo)->calcGroupId) break;
// fall through
case QUERY_NODE_LOGIC_PLAN_FILL:
case QUERY_NODE_LOGIC_PLAN_SORT: {
cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_LIMIT_SLIMIT);
SNode* pChild = NULL;
FOREACH(pChild, pNodeLimitPushTo->pChildren) { pushDownLimitHow(pNodeLimitPushTo, (SLogicNode*)pChild); }
return true;
}
case QUERY_NODE_LOGIC_PLAN_AGG: {
if (nodeType(pNodeWithLimit) == QUERY_NODE_LOGIC_PLAN_PROJECT &&
(isPartTagAgg((SAggLogicNode*)pNodeLimitPushTo) || isPartTableAgg((SAggLogicNode*)pNodeLimitPushTo))) {

@@ -3587,6 +3573,96 @@ static int32_t stableJoinOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicS
return stbJoinOptRewriteStableJoin(pCxt, pNode, pLogicSubplan);
}

static bool partColOptShouldBeOptimized(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
SPartitionLogicNode* pPartition = (SPartitionLogicNode*)pNode;
if (keysHasCol(pPartition->pPartitionKeys)) return true;
}
return false;
}

static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) {
SNode* node;
int32_t code = TSDB_CODE_SUCCESS;
SSortLogicNode* pSort = (SSortLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SORT);
if (pSort) {
bool alreadyPartByPKTs = false;
pSort->groupSort = false;
FOREACH(node, pPartition->pPartitionKeys) {
SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
if (QUERY_NODE_COLUMN == nodeType(node) && ((SColumnNode*)node)->colId == pPartition->pkTsColId &&
((SColumnNode*)node)->tableId == pPartition->pkTsColTbId)
alreadyPartByPKTs = true;
if (!pOrder) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
nodesListMakeAppend(&pSort->pSortKeys, (SNode*)pOrder);
pOrder->order = ORDER_ASC;
pOrder->pExpr = nodesCloneNode(node);
if (!pOrder->pExpr) code = TSDB_CODE_OUT_OF_MEMORY;
}
}

if (pPartition->needBlockOutputTsOrder && !alreadyPartByPKTs) {
SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
if (!pOrder) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
pSort->excludePkCol = true;
nodesListMakeAppend(&pSort->pSortKeys, (SNode*)pOrder);
pOrder->order = ORDER_ASC;
pOrder->pExpr = 0;
FOREACH(node, pPartition->node.pTargets) {
if (nodeType(node) == QUERY_NODE_COLUMN) {
SColumnNode* pCol = (SColumnNode*)node;
if (pCol->colId == pPartition->pkTsColId && pCol->tableId == pPartition->pkTsColTbId) {
pOrder->pExpr = nodesCloneNode((SNode*)pCol);
break;
}
}
}
if (!pOrder->pExpr) {
code = TSDB_CODE_PAR_INTERNAL_ERROR;
}
}
}
}
if (code != TSDB_CODE_SUCCESS) {
nodesDestroyNode((SNode*)pSort);
pSort = NULL;
}
return pSort;
}

static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
int32_t code = TSDB_CODE_SUCCESS;
SPartitionLogicNode* pNode = (SPartitionLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, partColOptShouldBeOptimized);
if (NULL == pNode) return TSDB_CODE_SUCCESS;

SLogicNode* pRootNode = getLogicNodeRootNode((SLogicNode*)pNode);
if (!pRootNode->pHint || !getSortForGroupOptHint(pRootNode->pHint)) {
return code;
}

// replace with sort node
SSortLogicNode* pSort = partColOptCreateSort(pNode);
if (!pSort) {
// if sort create failed, we eat the error, skip the optimization
code = TSDB_CODE_SUCCESS;
} else {
TSWAP(pSort->node.pChildren, pNode->node.pChildren);
TSWAP(pSort->node.pTargets, pNode->node.pTargets);
optResetParent((SLogicNode*)pSort);
pSort->calcGroupId = true;
code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pNode, (SLogicNode*)pSort);
if (code == TSDB_CODE_SUCCESS) {
pCxt->optimized = true;
} else {
nodesDestroyNode((SNode*)pSort);
}
}
return code;
}

// clang-format off
static const SOptimizeRule optimizeRuleSet[] = {

@@ -3606,6 +3682,7 @@ static const SOptimizeRule optimizeRuleSet[] = {
{.pName = "TableCountScan", .optimizeFunc = tableCountScanOptimize},
{.pName = "EliminateProject", .optimizeFunc = eliminateProjOptimize},
{.pName = "EliminateSetOperator", .optimizeFunc = eliminateSetOpOptimize},
{.pName = "PartitionCols", .optimizeFunc = partitionColsOpt},
};
// clang-format on

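My reading of the rule added above, stated briefly: with the SORT_FOR_GROUP hint present, partitionColsOpt finds a PARTITION node whose keys include normal columns and swaps it for a SORT node with calcGroupId set, roughly turning Agg -> Partition(by c1) -> Scan into Agg -> Sort(c1 asc, calcGroupId) -> Scan; when an interval window needs block timestamp order preserved, the primary-key ts column is appended as the last sort key and excluded from the group-id keys (excludePkCol).
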
|
|

@ -1749,6 +1749,8 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pSortKeys = NULL;
  int32_t code = rewritePrecalcExprs(pCxt, pSortLogicNode->pSortKeys, &pPrecalcExprs, &pSortKeys);
  pSort->calcGroupId = pSortLogicNode->calcGroupId;
  pSort->excludePkCol = pSortLogicNode->excludePkCol;

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node

@ -1797,6 +1799,7 @@ static int32_t createPartitionPhysiNodeImpl(SPhysiPlanContext* pCxt, SNodeList*
  SNodeList* pPrecalcExprs = NULL;
  SNodeList* pPartitionKeys = NULL;
  int32_t code = rewritePrecalcExprs(pCxt, pPartLogicNode->pPartitionKeys, &pPrecalcExprs, &pPartitionKeys);
  pPart->needBlockOutputTsOrder = pPartLogicNode->needBlockOutputTsOrder;

  SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
  // push down expression to pOutputDataBlockDesc of child node

@ -1818,6 +1821,22 @@ static int32_t createPartitionPhysiNodeImpl(SPhysiPlanContext* pCxt, SNodeList*
    }
  }

  if (pPart->needBlockOutputTsOrder) {
    SNode* node;
    bool found = false;
    FOREACH(node, pPartLogicNode->node.pTargets) {
      if (nodeType(node) == QUERY_NODE_COLUMN) {
        SColumnNode* pCol = (SColumnNode*)node;
        if (pCol->tableId == pPartLogicNode->pkTsColTbId && pCol->colId == pPartLogicNode->pkTsColId) {
          pPart->tsSlotId = pCol->slotId;
          found = true;
          break;
        }
      }
    }
    if (!found) code = TSDB_CODE_PLAN_INTERNAL_ERROR;
  }

  if (TSDB_CODE_SUCCESS == code) {
    code = setConditionsSlotId(pCxt, (const SLogicNode*)pPartLogicNode, (SPhysiNode*)pPart);
}
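
The block above wires the new needBlockOutputTsOrder flag into the physical partition node: when the flag is set, the planner records the slot of the primary timestamp column (tsSlotId) so the operator can keep each output group in timestamp order, and a missing ts column in the targets is reported as TSDB_CODE_PLAN_INTERNAL_ERROR. A plausible case where this matters, adapted from the queries in the new partition_by_col.py test (so meters and c1 are test-schema names, not part of this hunk), is a partition feeding an interval window:

    select _wstart as ts, count(*), c1 from meters partition by c1 interval(1s);

The interval operator downstream of the partition presumably relies on rows arriving in ascending ts order within each group, which is what the recorded ts slot is there to guarantee.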

@ -1944,6 +1963,7 @@ static int32_t createMergePhysiNode(SPhysiPlanContext* pCxt, SMergeLogicNode* pM
  pMerge->srcGroupId = pMergeLogicNode->srcGroupId;
  pMerge->groupSort = pMergeLogicNode->groupSort;
  pMerge->ignoreGroupId = pMergeLogicNode->ignoreGroupId;
  pMerge->inputWithGroupId = pMergeLogicNode->inputWithGroupId;

  int32_t code = addDataBlockSlots(pCxt, pMergeLogicNode->pInputs, pMerge->node.pOutputDataBlockDesc);

@ -244,7 +244,12 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
    }
    pChild = nodesListGetNode(((SLogicNode*)pChild)->pChildren, 0);
  }
  return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild));
  if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild)) {
    return true;
  } else if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) {
    return stbSplHasMultiTbScan(streamQuery, (SLogicNode*)pChild);
  }
  return false;
}

static bool stbSplIsMultiTbScanChild(bool streamQuery, SLogicNode* pNode) {

@ -519,6 +524,11 @@ static int32_t stbSplRewriteFromMergeNode(SMergeLogicNode* pMerge, SLogicNode* p
      }
      break;
    }
    case QUERY_NODE_LOGIC_PLAN_SORT: {
      SSortLogicNode* pSort = (SSortLogicNode*)pNode;
      if (pSort->calcGroupId) pMerge->inputWithGroupId = true;
      break;
    }
    default:
      break;
  }

@ -358,12 +358,12 @@ static bool stbNotSystemScan(SLogicNode* pNode) {
  }
}

static bool stbHasPartTbname(SNodeList* pPartKeys) {
  if (NULL == pPartKeys) {
bool keysHasTbname(SNodeList* pKeys) {
  if (NULL == pKeys) {
    return false;
  }
  SNode* pPartKey = NULL;
  FOREACH(pPartKey, pPartKeys) {
  FOREACH(pPartKey, pKeys) {
    if (QUERY_NODE_GROUPING_SET == nodeType(pPartKey)) {
      pPartKey = nodesListGetNode(((SGroupingSetNode*)pPartKey)->pParameterList, 0);
    }

@ -390,10 +390,10 @@ bool isPartTableAgg(SAggLogicNode* pAgg) {
    return false;
  }
  if (NULL != pAgg->pGroupKeys) {
    return stbHasPartTbname(pAgg->pGroupKeys) &&
    return (pAgg->isGroupTb || keysHasTbname(pAgg->pGroupKeys)) &&
           stbNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0));
  }
  return stbHasPartTbname(stbGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)));
  return pAgg->isPartTb || keysHasTbname(stbGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)));
}

static bool stbHasPartTag(SNodeList* pPartKeys) {

@ -430,6 +430,17 @@ bool getBatchScanOptionFromHint(SNodeList* pList) {
  return batchScan;
}

bool getSortForGroupOptHint(SNodeList* pList) {
  SNode* pNode;
  FOREACH(pNode, pList) {
    SHintNode* pHint = (SHintNode*)pNode;
    if (pHint->option == HINT_SORT_FOR_GROUP) {
      return true;
    }
  }
  return false;
}

int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  SLogicNode* pCurr = (SLogicNode*)pNode;

@ -467,7 +478,7 @@ bool isPartTagAgg(SAggLogicNode* pAgg) {
}

bool isPartTableWinodw(SWindowLogicNode* pWindow) {
  return stbHasPartTbname(stbGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0)));
  return pWindow->isPartTb || keysHasTbname(stbGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0)));
}

bool cloneLimit(SLogicNode* pParent, SLogicNode* pChild, uint8_t cloneWhat) {

@ -490,3 +501,19 @@ bool cloneLimit(SLogicNode* pParent, SLogicNode* pChild, uint8_t cloneWhat) {
  }
  return cloned;
}

static EDealRes partTagsOptHasColImpl(SNode* pNode, void* pContext) {
  if (QUERY_NODE_COLUMN == nodeType(pNode)) {
    if (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType) {
      *(bool*)pContext = true;
      return DEAL_RES_END;
    }
  }
  return DEAL_RES_CONTINUE;
}

bool keysHasCol(SNodeList* pKeys) {
  bool hasCol = false;
  nodesWalkExprs(pKeys, partTagsOptHasColImpl, &hasCol);
  return hasCol;
}
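
keysHasCol (via partTagsOptHasColImpl) reports whether a key list references any normal data column, as opposed to only tags or tbname, which are constant within a child table. A small sketch of the distinction it draws, again using the meters schema from the new test (c1 is a data column, t1 a tag; the names are illustrative only):

    -- the partition keys contain a data column, so keysHasCol would return true
    select count(*), c1 from meters partition by c1;

    -- the partition keys are only a tag and tbname, so keysHasCol would return false
    select count(*), t1 from meters partition by t1, tbname;

How the planner consumes this result is not visible in this hunk; the example only illustrates which key lists count as containing a real column.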

@ -49,6 +49,10 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/partition_by_col.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqShow.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqDropStb.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb0.py

@ -0,0 +1,323 @@
import taos
import sys
import time
import socket
import os
import threading
import math
from datetime import datetime

from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
# from tmqCommon import *

COMPARE_DATA = 0
COMPARE_LEN = 1

class TDTestCase:
    def __init__(self):
        self.vgroups = 4
        self.ctbNum = 10
        self.rowsPerTbl = 10000
        self.duraion = '1h'

    def init(self, conn, logSql, replicaVar=1):
        self.replicaVar = int(replicaVar)
        tdLog.debug(f"start to execute {__file__}")
        tdSql.init(conn.cursor(), False)

    def create_database(self,tsql, dbName,dropFlag=1,vgroups=2,replica=1, duration:str='1d'):
        if dropFlag == 1:
            tsql.execute("drop database if exists %s"%(dbName))

        tsql.execute("create database if not exists %s vgroups %d replica %d duration %s"%(dbName, vgroups, replica, duration))
        tdLog.debug("complete to create database %s"%(dbName))
        return

    def create_stable(self,tsql, paraDict):
        colString = tdCom.gen_column_type_str(colname_prefix=paraDict["colPrefix"], column_elm_list=paraDict["colSchema"])
        tagString = tdCom.gen_tag_type_str(tagname_prefix=paraDict["tagPrefix"], tag_elm_list=paraDict["tagSchema"])
        sqlString = f"create table if not exists %s.%s (%s) tags (%s)"%(paraDict["dbName"], paraDict["stbName"], colString, tagString)
        tdLog.debug("%s"%(sqlString))
        tsql.execute(sqlString)
        return

    def create_ctable(self,tsql=None, dbName='dbx',stbName='stb',ctbPrefix='ctb',ctbNum=1,ctbStartIdx=0):
        for i in range(ctbNum):
            sqlString = "create table %s.%s%d using %s.%s tags(%d, 'tb%d', 'tb%d', %d, %d, %d)" % \
                (dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,(i+ctbStartIdx) % 5,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx)
            tsql.execute(sqlString)

        tdLog.debug("complete to create %d child tables by %s.%s" %(ctbNum, dbName, stbName))
        return

    def insert_data(self,tsql,dbName,ctbPrefix,ctbNum,rowsPerTbl,batchNum,startTs,tsStep):
        tdLog.debug("start to insert data ............")
        tsql.execute("use %s" %dbName)
        pre_insert = "insert into "
        sql = pre_insert

        for i in range(ctbNum):
            rowsBatched = 0
            sql += " %s%d values "%(ctbPrefix,i)
            for j in range(rowsPerTbl):
                if (i < ctbNum/2):
                    sql += "(%d, %d, %d, %d,%d,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10, j%10, j%10)
                else:
                    sql += "(%d, %d, NULL, %d,NULL,%d,%d,true,'binary%d', 'nchar%d') "%(startTs + j*tsStep, j%10, j%10, j%10, j%10, j%10, j%10)
                rowsBatched += 1
                if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
                    tsql.execute(sql)
                    rowsBatched = 0
                    if j < rowsPerTbl - 1:
                        sql = "insert into %s%d values " %(ctbPrefix,i)
                    else:
                        sql = "insert into "
        if sql != pre_insert:
            tsql.execute(sql)
        tdLog.debug("insert data ............ [OK]")
        return

    def prepareTestEnv(self):
        tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
        paraDict = {'dbName': 'test',
                    'dropFlag': 1,
                    'vgroups': 2,
                    'stbName': 'meters',
                    'colPrefix': 'c',
                    'tagPrefix': 't',
                    'colSchema': [{'type': 'INT', 'count':1},
                                  {'type': 'BIGINT', 'count':1},
                                  {'type': 'FLOAT', 'count':1},
                                  {'type': 'DOUBLE', 'count':1},
                                  {'type': 'smallint', 'count':1},
                                  {'type': 'tinyint', 'count':1},
                                  {'type': 'bool', 'count':1},
                                  {'type': 'binary', 'len':10, 'count':1},
                                  {'type': 'nchar', 'len':10, 'count':1}],
                    'tagSchema': [{'type': 'INT', 'count':1},{'type': 'nchar', 'len':20, 'count':1},{'type': 'binary', 'len':20, 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'smallint', 'count':1},{'type': 'DOUBLE', 'count':1}],
                    'ctbPrefix': 't',
                    'ctbStartIdx': 0,
                    'ctbNum': 100,
                    'rowsPerTbl': 10000,
                    'batchNum': 3000,
                    'startTs': 1537146000000,
                    'tsStep': 600000}

        paraDict['vgroups'] = self.vgroups
        paraDict['ctbNum'] = self.ctbNum
        paraDict['rowsPerTbl'] = self.rowsPerTbl

        tdLog.info("create database")
        self.create_database(tsql=tdSql, dbName=paraDict["dbName"], dropFlag=paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar, duration=self.duraion)

        tdLog.info("create stb")
        self.create_stable(tsql=tdSql, paraDict=paraDict)

        tdLog.info("create child tables")
        self.create_ctable(tsql=tdSql, dbName=paraDict["dbName"], \
                           stbName=paraDict["stbName"], ctbPrefix=paraDict["ctbPrefix"], \
                           ctbNum=paraDict["ctbNum"], ctbStartIdx=paraDict["ctbStartIdx"])
        self.insert_data(tsql=tdSql, dbName=paraDict["dbName"], \
                         ctbPrefix=paraDict["ctbPrefix"], ctbNum=paraDict["ctbNum"], \
                         rowsPerTbl=paraDict["rowsPerTbl"], batchNum=paraDict["batchNum"], \
                         startTs=paraDict["startTs"], tsStep=paraDict["tsStep"])
        return
    def check_explain_res_has_row(self, plan_str_expect: str, rows):
        plan_found = False
        for row in rows:
            if str(row).find(plan_str_expect) >= 0:
                tdLog.debug("plan: [%s] found in: [%s]" % (plan_str_expect, str(row)))
                plan_found = True
                break
        if not plan_found:
            tdLog.exit("plan: %s not found in res: [%s]" % (plan_str_expect, str(rows)))

    def test_sort_for_partition_hint(self):
        sql = 'select count(*), c1 from meters partition by c1'
        sql_hint = 'select /*+ sort_for_group() */count(*), c1 from meters partition by c1'
        self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
        self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))

        sql = 'select count(*), c1, tbname from meters partition by tbname, c1'
        sql_hint = 'select /*+ sort_for_group() */ count(*), c1, tbname from meters partition by tbname, c1'
        self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
        self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))

        sql = 'select count(*), c1, tbname from meters partition by tbname, c1 interval(1s)'
        sql_hint = 'select /*+ sort_for_group() */ count(*), c1, tbname from meters partition by tbname, c1 interval(1s)'
        self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
        self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))

        sql = 'select count(*), c1, t1 from meters partition by t1, c1'
        sql_hint = 'select /*+ sort_for_group() */ count(*), c1, t1 from meters partition by t1, c1'
        self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
        self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))

        sql = 'select count(*), c1, t1 from meters partition by t1, c1 interval(1s)'
        sql_hint = 'select /*+ sort_for_group() */ count(*), c1, t1 from meters partition by t1, c1 interval(1s)'
        self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
        self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))

        sql = 'select count(*), c1 from meters partition by c1 interval(1s)'
        sql_hint = 'select /*+ sort_for_group() */ count(*), c1 from meters partition by c1 interval(1s)'
        self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
        self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))

    def add_order_by(self, sql: str, order_by: str, select_list: str = "*") -> str:
        return "select %s from (%s)t order by %s" % (select_list, sql, order_by)

    def add_hint(self, sql: str) -> str:
        return "select /*+ sort_for_group() */ %s" % sql[6:]

    def query_with_time(self, sql):
        start = datetime.now()
        tdSql.query(sql, queryTimes=1)
        return (datetime.now().timestamp() - start.timestamp()) * 1000

    def explain_sql(self, sql: str):
        sql = "explain " + sql
        tdSql.query(sql, queryTimes=1)
        return tdSql.queryResult

    def query_and_compare_res(self, sql1, sql2, compare_what: int = 0):
        dur = self.query_with_time(sql1)
        tdLog.debug("sql1 query with time: [%f]" % dur)
        res1 = tdSql.queryResult
        dur = self.query_with_time(sql2)
        tdLog.debug("sql2 query with time: [%f]" % dur)
        res2 = tdSql.queryResult
        if res1 is None or res2 is None:
            tdLog.exit("res1 or res2 is None")
        if compare_what <= COMPARE_LEN:
            if len(res1) != len(res2):
                tdLog.exit("query and compare failed cause different rows, sql1: [%s], rows: [%d], sql2: [%s], rows: [%d]" % (sql1, len(res1), sql2, len(res2)))
        if compare_what == COMPARE_DATA:
            for i in range(0, len(res1)):
                if res1[i] != res2[i]:
                    tdLog.exit("compare failed for row: [%d], sqls: [%s] res1: [%s], sql2 : [%s] res2: [%s]" % (i, sql1, res1[i], sql2, res2[i]))
            tdLog.debug("sql: [%s] and sql: [%s] have same results, rows: [%d]" % (sql1, sql2, len(res1)))

    def prepare_and_query_and_compare(self, sqls: [], order_by: str, select_list: str = "*", compare_what: int = 0):
        for sql in sqls:
            sql_hint = self.add_order_by(self.add_hint(sql), order_by, select_list)
            sql = self.add_order_by(sql, order_by, select_list)
            self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))
            self.check_explain_res_has_row("Partition", self.explain_sql(sql))
            self.query_and_compare_res(sql, sql_hint, compare_what=compare_what)

    def test_sort_for_partition_res(self):
        sqls_par_c1_agg = [
            "select count(*), c1 from meters partition by c1",
            "select count(*), min(c2), max(c3), c1 from meters partition by c1",
            "select c1 from meters partition by c1",
        ]
        sqls_par_c1 = [
            "select * from meters partition by c1"
        ]
        sqls_par_c1_c2_agg = [
            "select count(*), c1, c2 from meters partition by c1, c2",
            "select c1, c2 from meters partition by c1, c2",
            "select count(*), c1, c2, min(c4), max(c5), sum(c6) from meters partition by c1, c2",
        ]
        sqls_par_c1_c2 = [
            "select * from meters partition by c1, c2"
        ]

        sqls_par_tbname_c1 = [
            "select count(*), c1 , tbname as a from meters partition by tbname, c1"
        ]
        sqls_par_tag_c1 = [
            "select count(*), c1, t1 from meters partition by t1, c1"
        ]
        self.prepare_and_query_and_compare(sqls_par_c1_agg, "c1")
        self.prepare_and_query_and_compare(sqls_par_c1, "c1, ts, c2", "c1, ts, c2")
        self.prepare_and_query_and_compare(sqls_par_c1_c2_agg, "c1, c2")
        self.prepare_and_query_and_compare(sqls_par_c1_c2, "c1, c2, ts, c3", "c1, c2, ts, c3")
        self.prepare_and_query_and_compare(sqls_par_tbname_c1, "a, c1")
        self.prepare_and_query_and_compare(sqls_par_tag_c1, "t1, c1")
    def get_interval_template_sqls(self, col_name):
        sqls = [
            'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1s)' % (col_name, col_name),
            #'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(30s)' % (col_name, col_name),
            #'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1m)' % (col_name, col_name),
            #'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(30m)' % (col_name, col_name),
            #'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1h)' % (col_name, col_name),

            'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1s)' % (col_name, col_name),
            #'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(30s)' % (col_name, col_name),
            #'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1m)' % (col_name, col_name),
            #'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(30m)' % (col_name, col_name),
            #'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1h)' % (col_name, col_name),

            'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1s)' % (col_name, col_name, col_name),
            #'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30s)' % (col_name, col_name, col_name),
            #'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1m)' % (col_name, col_name, col_name),
            #'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30m)' % (col_name, col_name, col_name),
            #'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1h)' % (col_name, col_name, col_name),

            'select _wstart as ts, count(*), tbname as a, %s from meters partition by %s, tbname interval(1s)' % (col_name, col_name),
            'select _wstart as ts, count(*), t1 as a, %s from meters partition by %s, t1 interval(1s)' % (col_name, col_name),
        ]
        order_list = 'a, %s, ts' % (col_name)
        return (sqls, order_list)

    def test_sort_for_partition_interval(self):
        sqls, order_list = self.get_interval_template_sqls('c1')
        self.prepare_and_query_and_compare(sqls, order_list)
        #sqls, order_list = self.get_interval_template_sqls('c2')
        #self.prepare_and_query(sqls, order_list)
        sqls, order_list = self.get_interval_template_sqls('c3')
        self.prepare_and_query_and_compare(sqls, order_list)
        #sqls, order_list = self.get_interval_template_sqls('c4')
        #self.prepare_and_query(sqls, order_list)
        #sqls, order_list = self.get_interval_template_sqls('c5')
        #self.prepare_and_query(sqls, order_list)
        sqls, order_list = self.get_interval_template_sqls('c6')
        self.prepare_and_query_and_compare(sqls, order_list)
        #sqls, order_list = self.get_interval_template_sqls('c7')
        #self.prepare_and_query(sqls, order_list)
        sqls, order_list = self.get_interval_template_sqls('c8')
        self.prepare_and_query_and_compare(sqls, order_list)
        sqls, order_list = self.get_interval_template_sqls('c9')
        self.prepare_and_query_and_compare(sqls, order_list)

    def test_sort_for_partition_no_agg_limit(self):
        sqls_template = [
            'select * from meters partition by c1 slimit %d limit %d',
            'select * from meters partition by c2 slimit %d limit %d',
            'select * from meters partition by c8 slimit %d limit %d',
        ]
        sqls = []
        for sql in sqls_template:
            sqls.append(sql % (1,1))
            sqls.append(sql % (1,10))
            sqls.append(sql % (10,10))
            sqls.append(sql % (100, 100))
        order_by_list = 'ts,c1,c2,c3,c4,c5,c6,c7,c8,c9,t1,t2,t3,t4,t5,t6'

        self.prepare_and_query_and_compare(sqls, order_by_list, compare_what=COMPARE_LEN)

    def run(self):
        self.prepareTestEnv()
        #time.sleep(99999999)
        self.test_sort_for_partition_hint()
        self.test_sort_for_partition_res()
        self.test_sort_for_partition_interval()
        self.test_sort_for_partition_no_agg_limit()

    def stop(self):
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")

event = threading.Event()

tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())
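
For the result checks, both the plain and the hinted statement are wrapped in an outer ORDER BY so that row order is deterministic before comparison. Roughly, add_hint and add_order_by turn the first entry of sqls_par_c1_agg into the following pair (shown only to illustrate the strings those helpers build):

    select * from (select count(*), c1 from meters partition by c1)t order by c1;
    select * from (select /*+ sort_for_group() */ count(*), c1 from meters partition by c1)t order by c1;

The explain checks then expect the first plan to contain a Partition node and the second a Sort node, while the returned data must match row for row.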

@ -18,6 +18,7 @@ python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4
python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4
python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4
python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4
python3 ./test.py -f 2-query/partition_by_col.py -Q 4
python3 ./test.py -f 7-tmq/tmqShow.py
python3 ./test.py -f 7-tmq/tmqDropStb.py
python3 ./test.py -f 7-tmq/subscribeStb0.py