fix: data error of partition by col + interval
This commit is contained in:
parent
2c4e0fee07
commit
4a131000e4
|
@ -28,6 +28,7 @@ typedef struct SBlockOrderInfo {
|
||||||
bool nullFirst;
|
bool nullFirst;
|
||||||
int32_t order;
|
int32_t order;
|
||||||
int32_t slotId;
|
int32_t slotId;
|
||||||
|
void* compFn;
|
||||||
SColumnInfoData* pColData;
|
SColumnInfoData* pColData;
|
||||||
} SBlockOrderInfo;
|
} SBlockOrderInfo;
|
||||||
|
|
||||||
|
@ -210,6 +211,10 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock);
|
||||||
size_t blockDataGetSerialMetaSize(uint32_t numOfCols);
|
size_t blockDataGetSerialMetaSize(uint32_t numOfCols);
|
||||||
|
|
||||||
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
|
int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo);
|
||||||
|
/**
|
||||||
|
* @brief find how many rows already in order start from first row
|
||||||
|
*/
|
||||||
|
int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo);
|
||||||
|
|
||||||
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload);
|
int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows, bool clearPayload);
|
||||||
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
|
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
|
||||||
|
|
|
@ -136,6 +136,8 @@ typedef struct SAggLogicNode {
|
||||||
bool hasTimeLineFunc;
|
bool hasTimeLineFunc;
|
||||||
bool onlyHasKeepOrderFunc;
|
bool onlyHasKeepOrderFunc;
|
||||||
bool hasGroupKeyOptimized;
|
bool hasGroupKeyOptimized;
|
||||||
|
bool isGroupTb;
|
||||||
|
bool isPartTb; // true if partition keys has tbname
|
||||||
} SAggLogicNode;
|
} SAggLogicNode;
|
||||||
|
|
||||||
typedef struct SProjectLogicNode {
|
typedef struct SProjectLogicNode {
|
||||||
|
@ -263,6 +265,7 @@ typedef struct SWindowLogicNode {
|
||||||
int8_t igExpired;
|
int8_t igExpired;
|
||||||
int8_t igCheckUpdate;
|
int8_t igCheckUpdate;
|
||||||
EWindowAlgorithm windowAlgo;
|
EWindowAlgorithm windowAlgo;
|
||||||
|
bool isPartTb;
|
||||||
} SWindowLogicNode;
|
} SWindowLogicNode;
|
||||||
|
|
||||||
typedef struct SFillLogicNode {
|
typedef struct SFillLogicNode {
|
||||||
|
@ -281,6 +284,7 @@ typedef struct SSortLogicNode {
|
||||||
bool groupSort;
|
bool groupSort;
|
||||||
bool skipPKSortOpt;
|
bool skipPKSortOpt;
|
||||||
bool calcGroupId;
|
bool calcGroupId;
|
||||||
|
bool excludePkCol; // exclude PK ts col when calc group id
|
||||||
} SSortLogicNode;
|
} SSortLogicNode;
|
||||||
|
|
||||||
typedef struct SPartitionLogicNode {
|
typedef struct SPartitionLogicNode {
|
||||||
|
@ -288,6 +292,9 @@ typedef struct SPartitionLogicNode {
|
||||||
SNodeList* pPartitionKeys;
|
SNodeList* pPartitionKeys;
|
||||||
SNodeList* pTags;
|
SNodeList* pTags;
|
||||||
SNode* pSubtable;
|
SNode* pSubtable;
|
||||||
|
|
||||||
|
bool needBlockOutputTsOrder; // if true, partition output block will have ts order maintained
|
||||||
|
int32_t tsSlotId;
|
||||||
} SPartitionLogicNode;
|
} SPartitionLogicNode;
|
||||||
|
|
||||||
typedef enum ESubplanType {
|
typedef enum ESubplanType {
|
||||||
|
@ -604,6 +611,7 @@ typedef struct SSortPhysiNode {
|
||||||
SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode
|
SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode
|
||||||
SNodeList* pTargets;
|
SNodeList* pTargets;
|
||||||
bool calcGroupId;
|
bool calcGroupId;
|
||||||
|
bool excludePkCol;
|
||||||
} SSortPhysiNode;
|
} SSortPhysiNode;
|
||||||
|
|
||||||
typedef SSortPhysiNode SGroupSortPhysiNode;
|
typedef SSortPhysiNode SGroupSortPhysiNode;
|
||||||
|
@ -613,6 +621,9 @@ typedef struct SPartitionPhysiNode {
|
||||||
SNodeList* pExprs; // these are expression list of partition_by_clause
|
SNodeList* pExprs; // these are expression list of partition_by_clause
|
||||||
SNodeList* pPartitionKeys;
|
SNodeList* pPartitionKeys;
|
||||||
SNodeList* pTargets;
|
SNodeList* pTargets;
|
||||||
|
|
||||||
|
bool needBlockOutputTsOrder;
|
||||||
|
int32_t tsSlotId;
|
||||||
} SPartitionPhysiNode;
|
} SPartitionPhysiNode;
|
||||||
|
|
||||||
typedef struct SStreamPartitionPhysiNode {
|
typedef struct SStreamPartitionPhysiNode {
|
||||||
|
|
|
@ -933,7 +933,13 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
__compar_fn_t fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
|
|
||||||
|
__compar_fn_t fn;
|
||||||
|
if (pOrder->compFn) {
|
||||||
|
fn = pOrder->compFn;
|
||||||
|
} else {
|
||||||
|
fn = getKeyComparFunc(pColInfoData->info.type, pOrder->order);
|
||||||
|
}
|
||||||
|
|
||||||
int ret = fn(left1, right1);
|
int ret = fn(left1, right1);
|
||||||
if (ret == 0) {
|
if (ret == 0) {
|
||||||
|
@ -1099,6 +1105,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) {
|
||||||
struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i);
|
struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i);
|
||||||
pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
|
pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId);
|
||||||
|
pInfo->compFn = getKeyComparFunc(pInfo->pColData->info.type, pInfo->order);
|
||||||
}
|
}
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
@ -2515,3 +2522,20 @@ void trimDataBlock(SSDataBlock* pBlock, int32_t totalRows, const bool* pBoolList
|
||||||
int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
|
int32_t blockGetEncodeSize(const SSDataBlock* pBlock) {
|
||||||
return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);
|
return blockDataGetSerialMetaSize(taosArrayGetSize(pBlock->pDataBlock)) + blockDataGetSize(pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t blockDataGetSortedRows(SSDataBlock* pDataBlock, SArray* pOrderInfo) {
|
||||||
|
if (!pDataBlock || !pOrderInfo) return 0;
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) {
|
||||||
|
SBlockOrderInfo* pOrder = taosArrayGet(pOrderInfo, i);
|
||||||
|
pOrder->pColData = taosArrayGet(pDataBlock->pDataBlock, pOrder->slotId);
|
||||||
|
pOrder->compFn = getKeyComparFunc(pOrder->pColData->info.type, pOrder->order);
|
||||||
|
}
|
||||||
|
SSDataBlockSortHelper sortHelper = {.orderInfo = pOrderInfo, .pDataBlock = pDataBlock};
|
||||||
|
int32_t rowIdx = 0, nextRowIdx = 1;
|
||||||
|
for (; rowIdx < pDataBlock->info.rows && nextRowIdx < pDataBlock->info.rows; ++rowIdx, ++nextRowIdx) {
|
||||||
|
if (dataBlockCompar(&nextRowIdx, &rowIdx, &sortHelper) < 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nextRowIdx;
|
||||||
|
}
|
||||||
|
|
|
@ -198,22 +198,20 @@ SSDataBlock* createTagValBlockForFilter(SArray* pColList, int32_t numOfTables, S
|
||||||
SStorageAPI* pStorageAPI);
|
SStorageAPI* pStorageAPI);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief extract col data according to sort/group cols
|
* @brief build a tuple into keyBuf
|
||||||
* @param pSortGroupCols sort keys or group keys, array of SColumnNode
|
* @param [out] keyBuf the output buf
|
||||||
* @param [out] pColVals col vals extracted, array of SGroupKeys
|
* @param [in] pSortGroupCols the cols to build
|
||||||
|
* @param [in] pBlock block the tuple in
|
||||||
*/
|
*/
|
||||||
void extractCols(SArray* pSortGroupCols, SArray* pColVals, SSDataBlock* pBlock, int32_t rowIndex);
|
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock, int32_t rowIndex);
|
||||||
/**
|
|
||||||
* @breif build keys buffer with col values
|
int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pDataBlock,
|
||||||
* @retval key length
|
int32_t rowIndex);
|
||||||
* @param [out] buf buffer to store result key
|
|
||||||
*/
|
|
||||||
int32_t buildKeys(char* buf, SArray* pColVals);
|
|
||||||
|
|
||||||
uint64_t calcGroupId(char *pData, int32_t len);
|
uint64_t calcGroupId(char *pData, int32_t len);
|
||||||
|
|
||||||
SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys);
|
SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys);
|
||||||
|
|
||||||
int32_t extractSortGroupKeysInfo(SArray** pColVals, int32_t* keyLen, char** keyBuf, const SArray* pColList);
|
int32_t extractKeysLen(const SArray* keys);
|
||||||
|
|
||||||
#endif // TDENGINE_EXECUTIL_H
|
#endif // TDENGINE_EXECUTIL_H
|
||||||
|
|
|
@ -194,7 +194,15 @@ void tsortSetClosed(SSortHandle* pHandle);
|
||||||
void tsortSetSingleTableMerge(SSortHandle* pHandle);
|
void tsortSetSingleTableMerge(SSortHandle* pHandle);
|
||||||
void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), void* param);
|
void tsortSetAbortCheckFn(SSortHandle* pHandle, bool (*checkFn)(void* param), void* param);
|
||||||
|
|
||||||
int32_t tsortBuildKeys(SArray* pSortGroupCols, SArray* pColVals, STupleHandle* pTuple, char* keyBuf);
|
/**
|
||||||
|
* @brief comp the tuple with keyBuf, if not equal, new keys will be built in keyBuf, newLen will be stored in keyLen
|
||||||
|
* @param [in] pSortCols cols to comp and build
|
||||||
|
* @param [in, out] pass in the old keys, if comp not equal, new keys will be built in it.
|
||||||
|
* @param [in, out] keyLen the old keysLen, if comp not equal, new keysLen will be stored in it.
|
||||||
|
* @param [in] the tuple to comp with
|
||||||
|
* @retval 0 if comp equal, 1 if not
|
||||||
|
*/
|
||||||
|
int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* keyLen, const STupleHandle* pTuple);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -2262,75 +2262,71 @@ void updateTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pWin, int64_t
|
||||||
ts[4] = pWin->ekey + delta; // window end key
|
ts[4] = pWin->ekey + delta; // window end key
|
||||||
}
|
}
|
||||||
|
|
||||||
void extractCols(SArray* pSortGroupCols, SArray* pColVals, SSDataBlock* pBlock, int32_t rowIndex) {
|
int32_t compKeys(const SArray* pSortGroupCols, const char* oldkeyBuf, int32_t oldKeysLen, const SSDataBlock* pBlock, int32_t rowIndex) {
|
||||||
SColumnDataAgg* pColAgg = NULL;
|
SColumnDataAgg* pColAgg = NULL;
|
||||||
|
const char* isNull = oldkeyBuf;
|
||||||
|
const char* p = oldkeyBuf + sizeof(int8_t) * taosArrayGetSize(pSortGroupCols);
|
||||||
|
|
||||||
size_t numOfGroupCols = taosArrayGetSize(pSortGroupCols);
|
for (int32_t i = 0; i < taosArrayGetSize(pSortGroupCols); ++i) {
|
||||||
|
const SColumn* pCol = (SColumn*)taosArrayGet(pSortGroupCols, i);
|
||||||
|
const SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
|
||||||
|
if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId];
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfGroupCols; ++i) {
|
|
||||||
SColumn* pCol = (SColumn*) taosArrayGet(pSortGroupCols, i);
|
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
|
|
||||||
|
|
||||||
// valid range check. todo: return error code.
|
|
||||||
if (pCol->slotId > taosArrayGetSize(pBlock->pDataBlock)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pBlock->pBlockAgg != NULL) {
|
|
||||||
pColAgg = pBlock->pBlockAgg[pCol->slotId]; // TODO is agg data matched?
|
|
||||||
}
|
|
||||||
|
|
||||||
SGroupKeys* pkey = taosArrayGet(pColVals, i);
|
|
||||||
if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
|
if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
|
||||||
pkey->isNull = true;
|
if (isNull[i] != 1) return 1;
|
||||||
} else {
|
} else {
|
||||||
pkey->isNull = false;
|
if (isNull[i] != 0) return 1;
|
||||||
char* val = colDataGetData(pColInfoData, rowIndex);
|
const char* val = colDataGetData(pColInfoData, rowIndex);
|
||||||
if (pkey->type == TSDB_DATA_TYPE_JSON) {
|
if (pCol->type == TSDB_DATA_TYPE_JSON) {
|
||||||
if (tTagIsJson(val)) {
|
int32_t len = getJsonValueLen(val);
|
||||||
terrno = TSDB_CODE_QRY_JSON_IN_GROUP_ERROR;
|
if (memcmp(p, val, len) != 0) return 1;
|
||||||
return;
|
p += len;
|
||||||
}
|
} else if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||||
int32_t dataLen = getJsonValueLen(val);
|
if (memcmp(p, val, varDataTLen(val)) != 0) return 1;
|
||||||
memcpy(pkey->pData, val, dataLen);
|
p += varDataTLen(val);
|
||||||
} else if (IS_VAR_DATA_TYPE(pkey->type)) {
|
|
||||||
memcpy(pkey->pData, val, varDataTLen(val));
|
|
||||||
ASSERT(varDataTLen(val) <= pkey->bytes);
|
|
||||||
} else {
|
} else {
|
||||||
memcpy(pkey->pData, val, pkey->bytes);
|
if (0 != memcmp(p, val, pCol->bytes)) return 1;
|
||||||
|
p += pCol->bytes;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if ((int32_t)(p - oldkeyBuf) != oldKeysLen) return 1;
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t buildKeys(char* buf, SArray* pColVals) {
|
int32_t buildKeys(char* keyBuf, const SArray* pSortGroupCols, const SSDataBlock* pBlock,
|
||||||
size_t numOfGroupCols = taosArrayGetSize(pColVals);
|
int32_t rowIndex) {
|
||||||
|
uint32_t colNum = taosArrayGetSize(pSortGroupCols);
|
||||||
|
SColumnDataAgg* pColAgg = NULL;
|
||||||
|
char* isNull = keyBuf;
|
||||||
|
char* p = keyBuf + sizeof(int8_t) * colNum;
|
||||||
|
|
||||||
char* isNull = (char*)buf;
|
for (int32_t i = 0; i < colNum; ++i) {
|
||||||
char* pStart = (char*)buf + sizeof(int8_t) * numOfGroupCols;
|
const SColumn* pCol = (SColumn*)taosArrayGet(pSortGroupCols, i);
|
||||||
for (int32_t i = 0; i < numOfGroupCols; ++i) {
|
const SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pCol->slotId);
|
||||||
SGroupKeys* pkey = taosArrayGet(pColVals, i);
|
if (pCol->slotId > taosArrayGetSize(pBlock->pDataBlock)) continue;
|
||||||
if (pkey->isNull) {
|
|
||||||
|
if (pBlock->pBlockAgg) pColAgg = pBlock->pBlockAgg[pCol->slotId];
|
||||||
|
|
||||||
|
if (colDataIsNull(pColInfoData, pBlock->info.rows, rowIndex, pColAgg)) {
|
||||||
isNull[i] = 1;
|
isNull[i] = 1;
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
isNull[i] = 0;
|
|
||||||
if (pkey->type == TSDB_DATA_TYPE_JSON) {
|
|
||||||
int32_t dataLen = getJsonValueLen(pkey->pData);
|
|
||||||
memcpy(pStart, (pkey->pData), dataLen);
|
|
||||||
pStart += dataLen;
|
|
||||||
} else if (IS_VAR_DATA_TYPE(pkey->type)) {
|
|
||||||
varDataCopy(pStart, pkey->pData);
|
|
||||||
pStart += varDataTLen(pkey->pData);
|
|
||||||
ASSERT(varDataTLen(pkey->pData) <= pkey->bytes);
|
|
||||||
} else {
|
} else {
|
||||||
memcpy(pStart, pkey->pData, pkey->bytes);
|
isNull[i] = 0;
|
||||||
pStart += pkey->bytes;
|
const char* val = colDataGetData(pColInfoData, rowIndex);
|
||||||
|
if (pCol->type == TSDB_DATA_TYPE_JSON) {
|
||||||
|
int32_t len = getJsonValueLen(val);
|
||||||
|
memcpy(p, val, len);
|
||||||
|
p += len;
|
||||||
|
} else if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||||
|
varDataCopy(p, val);
|
||||||
|
p += varDataTLen(val);
|
||||||
|
} else {
|
||||||
|
memcpy(p, val, pCol->bytes);
|
||||||
|
p += pCol->bytes;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return (int32_t)(pStart - (char*)buf);
|
return (int32_t)(p - keyBuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t calcGroupId(char* pData, int32_t len) {
|
uint64_t calcGroupId(char* pData, int32_t len) {
|
||||||
|
@ -2356,37 +2352,13 @@ SNodeList* makeColsNodeArrFromSortKeys(SNodeList* pSortKeys) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t extractSortGroupKeysInfo(SArray** pColVals, int32_t* keyLen, char** keyBuf, const SArray* pColList) {
|
int32_t extractKeysLen(const SArray* keys) {
|
||||||
*pColVals = taosArrayInit(4, sizeof(SGroupKeys));
|
int32_t len = 0;
|
||||||
if ((*pColVals) == NULL) {
|
int32_t keyNum = taosArrayGetSize(keys);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
for (int32_t i = 0; i < keyNum; ++i) {
|
||||||
|
SColumn* pCol = (SColumn*)taosArrayGet(keys, i);
|
||||||
|
len += pCol->bytes;
|
||||||
}
|
}
|
||||||
|
len += sizeof(int8_t) * keyNum; //null flag
|
||||||
*keyLen = 0;
|
return len;
|
||||||
int32_t numOfGroupCols = taosArrayGetSize(pColList);
|
|
||||||
for (int32_t i = 0; i < numOfGroupCols; ++i) {
|
|
||||||
SColumn* pCol = (SColumn*)taosArrayGet(pColList, i);
|
|
||||||
(*keyLen) += pCol->bytes; // actual data + null_flag
|
|
||||||
|
|
||||||
SGroupKeys key = {0};
|
|
||||||
key.bytes = pCol->bytes;
|
|
||||||
key.type = pCol->type;
|
|
||||||
key.isNull = false;
|
|
||||||
key.pData = taosMemoryCalloc(1, pCol->bytes);
|
|
||||||
if (key.pData == NULL) {
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayPush((*pColVals), &key);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t nullFlagSize = sizeof(int8_t) * numOfGroupCols;
|
|
||||||
(*keyLen) += nullFlagSize;
|
|
||||||
|
|
||||||
(*keyBuf) = taosMemoryCalloc(1, (*keyLen));
|
|
||||||
if ((*keyBuf) == NULL) {
|
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,6 +56,10 @@ typedef struct SPartitionOperatorInfo {
|
||||||
int32_t groupIndex; // group index
|
int32_t groupIndex; // group index
|
||||||
int32_t pageIndex; // page index of current group
|
int32_t pageIndex; // page index of current group
|
||||||
SExprSupp scalarSup;
|
SExprSupp scalarSup;
|
||||||
|
|
||||||
|
int32_t remainRows;
|
||||||
|
int32_t orderedRows;
|
||||||
|
SArray* pOrderInfoArr;
|
||||||
} SPartitionOperatorInfo;
|
} SPartitionOperatorInfo;
|
||||||
|
|
||||||
static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
|
static void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInfo** pGroupInfo, int32_t len);
|
||||||
|
@ -685,6 +689,8 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
|
||||||
SPartitionOperatorInfo* pInfo = pOperator->info;
|
SPartitionOperatorInfo* pInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
|
if (pInfo->remainRows == 0) {
|
||||||
|
blockDataCleanup(pInfo->binfo.pRes);
|
||||||
SDataGroupInfo* pGroupInfo =
|
SDataGroupInfo* pGroupInfo =
|
||||||
(pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL;
|
(pInfo->groupIndex != -1) ? taosArrayGet(pInfo->sortedGroupArray, pInfo->groupIndex) : NULL;
|
||||||
if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) {
|
if (pInfo->groupIndex == -1 || pInfo->pageIndex >= taosArrayGetSize(pGroupInfo->pPageList)) {
|
||||||
|
@ -712,10 +718,20 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
pInfo->pageIndex += 1;
|
pInfo->pageIndex += 1;
|
||||||
releaseBufPage(pInfo->pBuf, page);
|
releaseBufPage(pInfo->pBuf, page);
|
||||||
|
|
||||||
pInfo->binfo.pRes->info.dataLoad = 1;
|
|
||||||
blockDataUpdateTsWindow(pInfo->binfo.pRes, 0);
|
|
||||||
pInfo->binfo.pRes->info.id.groupId = pGroupInfo->groupId;
|
pInfo->binfo.pRes->info.id.groupId = pGroupInfo->groupId;
|
||||||
|
pInfo->binfo.pRes->info.dataLoad = 1;
|
||||||
|
pInfo->orderedRows = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pInfo->pOrderInfoArr) {
|
||||||
|
pInfo->binfo.pRes->info.rows += pInfo->remainRows;
|
||||||
|
blockDataTrimFirstRows(pInfo->binfo.pRes, pInfo->orderedRows);
|
||||||
|
pInfo->orderedRows = blockDataGetSortedRows(pInfo->binfo.pRes, pInfo->pOrderInfoArr);
|
||||||
|
pInfo->remainRows = pInfo->binfo.pRes->info.rows - pInfo->orderedRows;
|
||||||
|
pInfo->binfo.pRes->info.rows = pInfo->orderedRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDataUpdateTsWindow(pInfo->binfo.pRes, 0);
|
||||||
|
|
||||||
pOperator->resultInfo.totalRows += pInfo->binfo.pRes->info.rows;
|
pOperator->resultInfo.totalRows += pInfo->binfo.pRes->info.rows;
|
||||||
return pInfo->binfo.pRes;
|
return pInfo->binfo.pRes;
|
||||||
|
@ -732,7 +748,6 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
|
||||||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
blockDataCleanup(pRes);
|
|
||||||
return buildPartitionResult(pOperator);
|
return buildPartitionResult(pOperator);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -815,6 +830,7 @@ static void destroyPartitionOperatorInfo(void* param) {
|
||||||
|
|
||||||
cleanupExprSupp(&pInfo->scalarSup);
|
cleanupExprSupp(&pInfo->scalarSup);
|
||||||
destroyDiskbasedBuf(pInfo->pBuf);
|
destroyDiskbasedBuf(pInfo->pBuf);
|
||||||
|
taosArrayDestroy(pInfo->pOrderInfoArr);
|
||||||
taosMemoryFreeClear(param);
|
taosMemoryFreeClear(param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -832,6 +848,17 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition
|
||||||
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);
|
SExprInfo* pExprInfo = createExprInfo(pPartNode->pTargets, NULL, &numOfCols);
|
||||||
pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys);
|
pInfo->pGroupCols = makeColumnArrayFromList(pPartNode->pPartitionKeys);
|
||||||
|
|
||||||
|
if (pPartNode->needBlockOutputTsOrder) {
|
||||||
|
SBlockOrderInfo order = {.order = ORDER_ASC, .pColData = NULL, .nullFirst = false, .slotId = pPartNode->tsSlotId};
|
||||||
|
pInfo->pOrderInfoArr = taosArrayInit(1, sizeof(SBlockOrderInfo));
|
||||||
|
if (!pInfo->pOrderInfoArr) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
pTaskInfo->code = terrno;
|
||||||
|
goto _error;
|
||||||
|
}
|
||||||
|
taosArrayPush(pInfo->pOrderInfoArr, &order);
|
||||||
|
}
|
||||||
|
|
||||||
if (pPartNode->pExprs != NULL) {
|
if (pPartNode->pExprs != NULL) {
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num);
|
SExprInfo* pExprInfo1 = createExprInfo(pPartNode->pExprs, NULL, &num);
|
||||||
|
|
|
@ -22,11 +22,10 @@
|
||||||
typedef struct SSortOpGroupIdCalc {
|
typedef struct SSortOpGroupIdCalc {
|
||||||
STupleHandle* pSavedTuple;
|
STupleHandle* pSavedTuple;
|
||||||
SArray* pSortColsArr;
|
SArray* pSortColsArr;
|
||||||
SArray* pSortColVals;
|
|
||||||
char* keyBuf;
|
char* keyBuf;
|
||||||
char* lastKeyBuf;
|
int32_t lastKeysLen; // default to be 0
|
||||||
int32_t lastKeysLen;
|
|
||||||
uint64_t lastGroupId;
|
uint64_t lastGroupId;
|
||||||
|
bool excludePKCol;
|
||||||
} SSortOpGroupIdCalc;
|
} SSortOpGroupIdCalc;
|
||||||
|
|
||||||
typedef struct SSortOperatorInfo {
|
typedef struct SSortOperatorInfo {
|
||||||
|
@ -93,7 +92,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
|
||||||
pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
|
pInfo->pSortInfo = createSortInfo(pSortNode->pSortKeys);
|
||||||
|
|
||||||
if (pSortNode->calcGroupId) {
|
if (pSortNode->calcGroupId) {
|
||||||
SSortOpGroupIdCalc* pGroupIdCalc = taosMemoryCalloc(1, sizeof(SSortOpGroupIdCalc));
|
int32_t keyLen;
|
||||||
|
SSortOpGroupIdCalc* pGroupIdCalc = pInfo->pGroupIdCalc = taosMemoryCalloc(1, sizeof(SSortOpGroupIdCalc));
|
||||||
if (!pGroupIdCalc) {
|
if (!pGroupIdCalc) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _error;
|
goto _error;
|
||||||
|
@ -105,17 +105,15 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
|
||||||
if (!pGroupIdCalc->pSortColsArr) code = TSDB_CODE_OUT_OF_MEMORY;
|
if (!pGroupIdCalc->pSortColsArr) code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
nodesClearList(pSortColsNodeArr);
|
nodesClearList(pSortColsNodeArr);
|
||||||
}
|
}
|
||||||
int32_t keyLen;
|
|
||||||
if (TSDB_CODE_SUCCESS == code)
|
|
||||||
code = extractSortGroupKeysInfo(&pGroupIdCalc->pSortColVals, &keyLen, &pGroupIdCalc->keyBuf,
|
|
||||||
pGroupIdCalc->pSortColsArr);
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
pGroupIdCalc->lastKeysLen = 0;
|
// PK ts col should always at last, see partColOptCreateSort
|
||||||
pGroupIdCalc->lastKeyBuf = taosMemoryCalloc(1, keyLen);
|
if (pSortNode->excludePkCol) taosArrayPop(pGroupIdCalc->pSortColsArr);
|
||||||
if (!pGroupIdCalc->lastKeyBuf) code = TSDB_CODE_OUT_OF_MEMORY;
|
keyLen = extractKeysLen(pGroupIdCalc->pSortColsArr);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
pInfo->pGroupIdCalc = pGroupIdCalc;
|
pGroupIdCalc->lastKeysLen = 0;
|
||||||
|
pGroupIdCalc->keyBuf = taosMemoryCalloc(1, keyLen);
|
||||||
|
if (!pGroupIdCalc->keyBuf) code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (code != TSDB_CODE_SUCCESS) goto _error;
|
if (code != TSDB_CODE_SUCCESS) goto _error;
|
||||||
|
@ -172,35 +170,40 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief get next tuple with group id attached, all tuples fetched from tsortNextTuple are sorted by group keys
|
* @brief get next tuple with group id attached, here assume that all tuples are sorted by group keys
|
||||||
* @param pBlock the output block, the group id will be saved in it
|
* @param [in, out] pBlock the output block, the group id will be saved in it
|
||||||
* @retval NULL if next group tuple arrived, the pre fetched tuple will be saved in pInfo.pSavedTuple
|
* @retval NULL if next group tuple arrived and this new group tuple will be saved in pInfo.pSavedTuple
|
||||||
|
* @retval NULL if no more tuples
|
||||||
*/
|
*/
|
||||||
static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInfo* pInfo, SSDataBlock* pBlock) {
|
||||||
STupleHandle *ret = pInfo->pGroupIdCalc->pSavedTuple;
|
STupleHandle* retTuple = pInfo->pGroupIdCalc->pSavedTuple;
|
||||||
pInfo->pGroupIdCalc->pSavedTuple = NULL;
|
if (!retTuple) {
|
||||||
if (!ret) {
|
retTuple = tsortNextTuple(pHandle);
|
||||||
ret = tsortNextTuple(pHandle);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ret) {
|
if (retTuple) {
|
||||||
int32_t len = tsortBuildKeys(pInfo->pGroupIdCalc->pSortColsArr, pInfo->pGroupIdCalc->pSortColVals, ret,
|
int32_t newGroup;
|
||||||
pInfo->pGroupIdCalc->keyBuf);
|
if (pInfo->pGroupIdCalc->pSavedTuple) {
|
||||||
bool newGroup = len != pInfo->pGroupIdCalc->lastKeysLen
|
newGroup = true;
|
||||||
? true
|
pInfo->pGroupIdCalc->pSavedTuple = NULL;
|
||||||
: memcmp(pInfo->pGroupIdCalc->lastKeyBuf, pInfo->pGroupIdCalc->keyBuf, len) != 0;
|
|
||||||
bool emptyBlock = pBlock->info.rows == 0;
|
|
||||||
if (newGroup && !emptyBlock) {
|
|
||||||
// new group arrived, and we have already copied some tuples for cur group, save the new group tuple, return NULL
|
|
||||||
pInfo->pGroupIdCalc->pSavedTuple = ret;
|
|
||||||
ret = NULL;
|
|
||||||
} else {
|
} else {
|
||||||
|
newGroup = tsortCompAndBuildKeys(pInfo->pGroupIdCalc->pSortColsArr, pInfo->pGroupIdCalc->keyBuf,
|
||||||
|
&pInfo->pGroupIdCalc->lastKeysLen, retTuple);
|
||||||
|
}
|
||||||
|
bool emptyBlock = pBlock->info.rows == 0;
|
||||||
if (newGroup) {
|
if (newGroup) {
|
||||||
ASSERT(emptyBlock);
|
if (!emptyBlock) {
|
||||||
pInfo->pGroupIdCalc->lastKeysLen = len;
|
// new group arrived, and we have already copied some tuples for cur group, save the new group tuple, return
|
||||||
pInfo->pGroupIdCalc->lastGroupId = pBlock->info.id.groupId = calcGroupId(pInfo->pGroupIdCalc->keyBuf, len);
|
// NULL. Note that the keyBuf and lastKeysLen has been updated to new value
|
||||||
TSWAP(pInfo->pGroupIdCalc->keyBuf, pInfo->pGroupIdCalc->lastKeyBuf);
|
pInfo->pGroupIdCalc->pSavedTuple = retTuple;
|
||||||
} else if (emptyBlock) {
|
retTuple = NULL;
|
||||||
|
} else {
|
||||||
|
// new group with empty block
|
||||||
|
pInfo->pGroupIdCalc->lastGroupId = pBlock->info.id.groupId =
|
||||||
|
calcGroupId(pInfo->pGroupIdCalc->keyBuf, pInfo->pGroupIdCalc->lastKeysLen);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (emptyBlock) {
|
||||||
// new block but not new group, assign last group id to it
|
// new block but not new group, assign last group id to it
|
||||||
pBlock->info.id.groupId = pInfo->pGroupIdCalc->lastGroupId;
|
pBlock->info.id.groupId = pInfo->pGroupIdCalc->lastGroupId;
|
||||||
} else {
|
} else {
|
||||||
|
@ -209,7 +212,7 @@ static STupleHandle* nextTupleWithGroupId(SSortHandle* pHandle, SSortOperatorInf
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return ret;
|
return retTuple;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
|
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity, SArray* pColMatchInfo,
|
||||||
|
@ -401,14 +404,8 @@ static int32_t calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNod
|
||||||
|
|
||||||
static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc) {
|
static void destroySortOpGroupIdCalc(SSortOpGroupIdCalc* pCalc) {
|
||||||
if (pCalc) {
|
if (pCalc) {
|
||||||
for (int i = 0; i < taosArrayGetSize(pCalc->pSortColVals); i++) {
|
|
||||||
SGroupKeys key = *(SGroupKeys*)taosArrayGet(pCalc->pSortColVals, i);
|
|
||||||
taosMemoryFree(key.pData);
|
|
||||||
}
|
|
||||||
taosArrayDestroy(pCalc->pSortColVals);
|
|
||||||
taosArrayDestroy(pCalc->pSortColsArr);
|
taosArrayDestroy(pCalc->pSortColsArr);
|
||||||
taosMemoryFree(pCalc->keyBuf);
|
taosMemoryFree(pCalc->keyBuf);
|
||||||
taosMemoryFree(pCalc->lastKeyBuf);
|
|
||||||
taosMemoryFree(pCalc);
|
taosMemoryFree(pCalc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1555,7 +1555,14 @@ SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
|
||||||
return info;
|
return info;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsortBuildKeys(SArray* pSortGroupCols, SArray* pColVals, STupleHandle* pTuple, char* keyBuf) {
|
int32_t tsortCompAndBuildKeys(const SArray* pSortCols, char* keyBuf, int32_t* keyLen,
|
||||||
extractCols(pSortGroupCols, pColVals, pTuple->pBlock, pTuple->rowIndex);
|
const STupleHandle* pTuple) {
|
||||||
return buildKeys(keyBuf, pColVals);
|
int32_t ret;
|
||||||
|
if (0 == compKeys(pSortCols, keyBuf, *keyLen, pTuple->pBlock, pTuple->rowIndex)) {
|
||||||
|
ret = 0;
|
||||||
|
} else {
|
||||||
|
*keyLen = buildKeys(keyBuf, pSortCols, pTuple->pBlock, pTuple->rowIndex);
|
||||||
|
ret = 1;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
|
@ -444,6 +444,7 @@ static int32_t logicAggCopy(const SAggLogicNode* pSrc, SAggLogicNode* pDst) {
|
||||||
CLONE_NODE_LIST_FIELD(pGroupKeys);
|
CLONE_NODE_LIST_FIELD(pGroupKeys);
|
||||||
CLONE_NODE_LIST_FIELD(pAggFuncs);
|
CLONE_NODE_LIST_FIELD(pAggFuncs);
|
||||||
COPY_SCALAR_FIELD(hasGroupKeyOptimized);
|
COPY_SCALAR_FIELD(hasGroupKeyOptimized);
|
||||||
|
COPY_SCALAR_FIELD(isPartTb);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -532,6 +533,7 @@ static int32_t logicSortCopy(const SSortLogicNode* pSrc, SSortLogicNode* pDst) {
|
||||||
CLONE_NODE_LIST_FIELD(pSortKeys);
|
CLONE_NODE_LIST_FIELD(pSortKeys);
|
||||||
COPY_SCALAR_FIELD(groupSort);
|
COPY_SCALAR_FIELD(groupSort);
|
||||||
COPY_SCALAR_FIELD(calcGroupId);
|
COPY_SCALAR_FIELD(calcGroupId);
|
||||||
|
COPY_SCALAR_FIELD(excludePkCol);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -540,6 +542,8 @@ static int32_t logicPartitionCopy(const SPartitionLogicNode* pSrc, SPartitionLog
|
||||||
CLONE_NODE_LIST_FIELD(pPartitionKeys);
|
CLONE_NODE_LIST_FIELD(pPartitionKeys);
|
||||||
CLONE_NODE_LIST_FIELD(pTags);
|
CLONE_NODE_LIST_FIELD(pTags);
|
||||||
CLONE_NODE_FIELD(pSubtable);
|
CLONE_NODE_FIELD(pSubtable);
|
||||||
|
COPY_SCALAR_FIELD(needBlockOutputTsOrder);
|
||||||
|
COPY_SCALAR_FIELD(tsSlotId);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -679,6 +683,8 @@ static int32_t physiPartitionCopy(const SPartitionPhysiNode* pSrc, SPartitionPhy
|
||||||
CLONE_NODE_LIST_FIELD(pExprs);
|
CLONE_NODE_LIST_FIELD(pExprs);
|
||||||
CLONE_NODE_LIST_FIELD(pPartitionKeys);
|
CLONE_NODE_LIST_FIELD(pPartitionKeys);
|
||||||
CLONE_NODE_LIST_FIELD(pTargets);
|
CLONE_NODE_LIST_FIELD(pTargets);
|
||||||
|
COPY_SCALAR_FIELD(needBlockOutputTsOrder);
|
||||||
|
COPY_SCALAR_FIELD(tsSlotId);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2328,6 +2328,7 @@ static const char* jkSortPhysiPlanExprs = "Exprs";
|
||||||
static const char* jkSortPhysiPlanSortKeys = "SortKeys";
|
static const char* jkSortPhysiPlanSortKeys = "SortKeys";
|
||||||
static const char* jkSortPhysiPlanTargets = "Targets";
|
static const char* jkSortPhysiPlanTargets = "Targets";
|
||||||
static const char* jkSortPhysiPlanCalcGroupIds = "CalcGroupIds";
|
static const char* jkSortPhysiPlanCalcGroupIds = "CalcGroupIds";
|
||||||
|
static const char* jkSortPhysiPlanExcludePKCol = "ExcludePKCol";
|
||||||
|
|
||||||
static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj;
|
const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj;
|
||||||
|
@ -2345,6 +2346,9 @@ static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddBoolToObject(pJson, jkSortPhysiPlanCalcGroupIds, pNode->calcGroupId);
|
code = tjsonAddBoolToObject(pJson, jkSortPhysiPlanCalcGroupIds, pNode->calcGroupId);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddBoolToObject(pJson, jkSortPhysiPlanExcludePKCol, pNode->excludePkCol);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2365,6 +2369,9 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetBoolValue(pJson, jkSortPhysiPlanCalcGroupIds, &pNode->calcGroupId);
|
code = tjsonGetBoolValue(pJson, jkSortPhysiPlanCalcGroupIds, &pNode->calcGroupId);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code= tjsonGetBoolValue(pJson, jkSortPhysiPlanExcludePKCol, &pNode->excludePkCol);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2650,6 +2657,8 @@ static int32_t jsonToPhysiEventWindowNode(const SJson* pJson, void* pObj) {
|
||||||
static const char* jkPartitionPhysiPlanExprs = "Exprs";
|
static const char* jkPartitionPhysiPlanExprs = "Exprs";
|
||||||
static const char* jkPartitionPhysiPlanPartitionKeys = "PartitionKeys";
|
static const char* jkPartitionPhysiPlanPartitionKeys = "PartitionKeys";
|
||||||
static const char* jkPartitionPhysiPlanTargets = "Targets";
|
static const char* jkPartitionPhysiPlanTargets = "Targets";
|
||||||
|
static const char* jkPartitionPhysiPlanNeedBlockOutputTsOrder = "NeedBlockOutputTsOrder";
|
||||||
|
static const char* jkPartitionPhysiPlanTsSlotId = "tsSlotId";
|
||||||
|
|
||||||
static int32_t physiPartitionNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t physiPartitionNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const SPartitionPhysiNode* pNode = (const SPartitionPhysiNode*)pObj;
|
const SPartitionPhysiNode* pNode = (const SPartitionPhysiNode*)pObj;
|
||||||
|
@ -2664,6 +2673,12 @@ static int32_t physiPartitionNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = nodeListToJson(pJson, jkPartitionPhysiPlanTargets, pNode->pTargets);
|
code = nodeListToJson(pJson, jkPartitionPhysiPlanTargets, pNode->pTargets);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
tjsonAddBoolToObject(pJson, jkPartitionPhysiPlanNeedBlockOutputTsOrder, pNode->needBlockOutputTsOrder);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
tjsonAddIntegerToObject(pJson, jkPartitionPhysiPlanTsSlotId, pNode->tsSlotId);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2681,6 +2696,12 @@ static int32_t jsonToPhysiPartitionNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = jsonToNodeList(pJson, jkPartitionPhysiPlanTargets, &pNode->pTargets);
|
code = jsonToNodeList(pJson, jkPartitionPhysiPlanTargets, &pNode->pTargets);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetBoolValue(pJson, jkPartitionPhysiPlanNeedBlockOutputTsOrder, &pNode->needBlockOutputTsOrder);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetIntValue(pJson, jkPartitionPhysiPlanTsSlotId, &pNode->tsSlotId);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2746,7 +2746,14 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS, PHY_SORT_CODE_CALC_GROUPID };
|
enum {
|
||||||
|
PHY_SORT_CODE_BASE_NODE = 1,
|
||||||
|
PHY_SORT_CODE_EXPR,
|
||||||
|
PHY_SORT_CODE_SORT_KEYS,
|
||||||
|
PHY_SORT_CODE_TARGETS,
|
||||||
|
PHY_SORT_CODE_CALC_GROUPID,
|
||||||
|
PHY_SORT_CODE_EXCLUDE_PK_COL
|
||||||
|
};
|
||||||
|
|
||||||
static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||||
const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj;
|
const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj;
|
||||||
|
@ -2764,6 +2771,9 @@ static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tlvEncodeBool(pEncoder, PHY_SORT_CODE_CALC_GROUPID, pNode->calcGroupId);
|
code = tlvEncodeBool(pEncoder, PHY_SORT_CODE_CALC_GROUPID, pNode->calcGroupId);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeBool(pEncoder, PHY_SORT_CODE_EXCLUDE_PK_COL, pNode->excludePkCol);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -2790,6 +2800,8 @@ static int32_t msgToPhysiSortNode(STlvDecoder* pDecoder, void* pObj) {
|
||||||
case PHY_SORT_CODE_CALC_GROUPID:
|
case PHY_SORT_CODE_CALC_GROUPID:
|
||||||
code = tlvDecodeBool(pTlv, &pNode->calcGroupId);
|
code = tlvDecodeBool(pTlv, &pNode->calcGroupId);
|
||||||
break;
|
break;
|
||||||
|
case PHY_SORT_CODE_EXCLUDE_PK_COL:
|
||||||
|
code = tlvDecodeBool(pTlv, &pNode->excludePkCol);
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -3148,7 +3160,14 @@ static int32_t msgToPhysiEventWindowNode(STlvDecoder* pDecoder, void* pObj) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
enum { PHY_PARTITION_CODE_BASE_NODE = 1, PHY_PARTITION_CODE_EXPR, PHY_PARTITION_CODE_KEYS, PHY_PARTITION_CODE_TARGETS };
|
enum {
|
||||||
|
PHY_PARTITION_CODE_BASE_NODE = 1,
|
||||||
|
PHY_PARTITION_CODE_EXPR,
|
||||||
|
PHY_PARTITION_CODE_KEYS,
|
||||||
|
PHY_PARTITION_CODE_TARGETS,
|
||||||
|
PHY_PARTITION_CODE_HAS_OUTPUT_TS_ORDER,
|
||||||
|
PHY_PARTITION_CODE_TS_SLOTID
|
||||||
|
};
|
||||||
|
|
||||||
static int32_t physiPartitionNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
static int32_t physiPartitionNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
||||||
const SPartitionPhysiNode* pNode = (const SPartitionPhysiNode*)pObj;
|
const SPartitionPhysiNode* pNode = (const SPartitionPhysiNode*)pObj;
|
||||||
|
@ -3163,6 +3182,12 @@ static int32_t physiPartitionNodeToMsg(const void* pObj, STlvEncoder* pEncoder)
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tlvEncodeObj(pEncoder, PHY_PARTITION_CODE_TARGETS, nodeListToMsg, pNode->pTargets);
|
code = tlvEncodeObj(pEncoder, PHY_PARTITION_CODE_TARGETS, nodeListToMsg, pNode->pTargets);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeBool(pEncoder, PHY_PARTITION_CODE_HAS_OUTPUT_TS_ORDER, pNode->needBlockOutputTsOrder);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tlvEncodeI32(pEncoder, PHY_PARTITION_CODE_TS_SLOTID, pNode->tsSlotId);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -3186,6 +3211,12 @@ static int32_t msgToPhysiPartitionNode(STlvDecoder* pDecoder, void* pObj) {
|
||||||
case PHY_PARTITION_CODE_TARGETS:
|
case PHY_PARTITION_CODE_TARGETS:
|
||||||
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets);
|
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets);
|
||||||
break;
|
break;
|
||||||
|
case PHY_PARTITION_CODE_HAS_OUTPUT_TS_ORDER:
|
||||||
|
code = tlvDecodeBool(pTlv, &pNode->needBlockOutputTsOrder);
|
||||||
|
break;
|
||||||
|
case PHY_PARTITION_CODE_TS_SLOTID:
|
||||||
|
code = tlvDecodeI32(pTlv, &pNode->tsSlotId);
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,6 +51,8 @@ int32_t collectTableAliasFromNodes(SNode* pNode, SSHashObj** ppRes);
|
||||||
bool isPartTableAgg(SAggLogicNode* pAgg);
|
bool isPartTableAgg(SAggLogicNode* pAgg);
|
||||||
bool isPartTagAgg(SAggLogicNode* pAgg);
|
bool isPartTagAgg(SAggLogicNode* pAgg);
|
||||||
bool isPartTableWinodw(SWindowLogicNode* pWindow);
|
bool isPartTableWinodw(SWindowLogicNode* pWindow);
|
||||||
|
bool keysHasCol(SNodeList* pKeys);
|
||||||
|
bool keysHasTbname(SNodeList* pKeys);
|
||||||
|
|
||||||
#define CLONE_LIMIT 1
|
#define CLONE_LIMIT 1
|
||||||
#define CLONE_SLIMIT 1 << 1
|
#define CLONE_SLIMIT 1 << 1
|
||||||
|
|
|
@ -738,6 +738,9 @@ static int32_t createAggLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect,
|
||||||
}
|
}
|
||||||
nodesDestroyList(pOutputGroupKeys);
|
nodesDestroyList(pOutputGroupKeys);
|
||||||
|
|
||||||
|
pAgg->isGroupTb = pAgg->pGroupKeys ? keysHasTbname(pAgg->pGroupKeys) : 0;
|
||||||
|
pAgg->isPartTb = pSelect->pPartitionByList ? keysHasTbname(pSelect->pPartitionByList) : 0;
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
*pLogicNode = (SLogicNode*)pAgg;
|
*pLogicNode = (SLogicNode*)pAgg;
|
||||||
} else {
|
} else {
|
||||||
|
@ -959,6 +962,7 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva
|
||||||
nodesDestroyNode((SNode*)pWindow);
|
nodesDestroyNode((SNode*)pWindow);
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
|
pWindow->isPartTb = pSelect->pPartitionByList ? keysHasTbname(pSelect->pPartitionByList) : 0;
|
||||||
|
|
||||||
return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
|
return createWindowLogicNodeFinalize(pCxt, pSelect, pWindow, pLogicNode);
|
||||||
}
|
}
|
||||||
|
@ -1256,6 +1260,14 @@ static int32_t createPartitionLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pS
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (keysHasCol(pPartition->pPartitionKeys) && pSelect->pWindow &&
|
||||||
|
nodeType(pSelect->pWindow) == QUERY_NODE_INTERVAL_WINDOW) {
|
||||||
|
pPartition->needBlockOutputTsOrder = true;
|
||||||
|
SIntervalWindowNode* pInterval = (SIntervalWindowNode*)pSelect->pWindow;
|
||||||
|
SColumnNode* pTsCol = (SColumnNode*)pInterval->pCol;
|
||||||
|
pPartition->tsSlotId = pTsCol->slotId;
|
||||||
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pTags) {
|
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pTags) {
|
||||||
pPartition->pTags = nodesCloneList(pSelect->pTags);
|
pPartition->pTags = nodesCloneList(pSelect->pTags);
|
||||||
if (NULL == pPartition->pTags) {
|
if (NULL == pPartition->pTags) {
|
||||||
|
|
|
@ -1661,22 +1661,6 @@ static int32_t smaIndexOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
|
||||||
return smaIndexOptimizeImpl(pCxt, pLogicSubplan, pScan);
|
return smaIndexOptimizeImpl(pCxt, pLogicSubplan, pScan);
|
||||||
}
|
}
|
||||||
|
|
||||||
static EDealRes partTagsOptHasColImpl(SNode* pNode, void* pContext) {
|
|
||||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
|
||||||
if (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType) {
|
|
||||||
*(bool*)pContext = true;
|
|
||||||
return DEAL_RES_END;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return DEAL_RES_CONTINUE;
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool planOptNodeListHasCol(SNodeList* pKeys) {
|
|
||||||
bool hasCol = false;
|
|
||||||
nodesWalkExprs(pKeys, partTagsOptHasColImpl, &hasCol);
|
|
||||||
return hasCol;
|
|
||||||
}
|
|
||||||
|
|
||||||
static EDealRes partTagsOptHasTbname(SNode* pNode, void* pContext) {
|
static EDealRes partTagsOptHasTbname(SNode* pNode, void* pContext) {
|
||||||
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
if (COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType) {
|
if (COLUMN_TYPE_TBNAME == ((SColumnNode*)pNode)->colType) {
|
||||||
|
@ -1755,7 +1739,7 @@ static bool partTagsOptMayBeOptimized(SLogicNode* pNode) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
return !planOptNodeListHasCol(partTagsGetPartKeys(pNode)) && partTagsOptAreSupportedFuncs(partTagsGetFuncs(pNode));
|
return !keysHasCol(partTagsGetPartKeys(pNode)) && partTagsOptAreSupportedFuncs(partTagsGetFuncs(pNode));
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t partTagsOptRebuildTbanme(SNodeList* pPartKeys) {
|
static int32_t partTagsOptRebuildTbanme(SNodeList* pPartKeys) {
|
||||||
|
@ -2042,7 +2026,7 @@ static int32_t eliminateProjOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan*
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pProjectNode, pChild);
|
int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pProjectNode, pChild);
|
||||||
TSWAP(pProjectNode->node.pHint, pChild->pHint);
|
if (pProjectNode->node.pHint && !pChild->pHint) TSWAP(pProjectNode->node.pHint, pChild->pHint);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
NODES_CLEAR_LIST(pProjectNode->node.pChildren);
|
NODES_CLEAR_LIST(pProjectNode->node.pChildren);
|
||||||
nodesDestroyNode((SNode*)pProjectNode);
|
nodesDestroyNode((SNode*)pProjectNode);
|
||||||
|
@ -2735,7 +2719,7 @@ static bool tagScanOptShouldBeOptimized(SLogicNode* pNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SAggLogicNode* pAgg = (SAggLogicNode*)(pNode->pParent);
|
SAggLogicNode* pAgg = (SAggLogicNode*)(pNode->pParent);
|
||||||
if (NULL == pAgg->pGroupKeys || NULL != pAgg->pAggFuncs || planOptNodeListHasCol(pAgg->pGroupKeys) ||
|
if (NULL == pAgg->pGroupKeys || NULL != pAgg->pAggFuncs || keysHasCol(pAgg->pGroupKeys) ||
|
||||||
!planOptNodeListHasTbname(pAgg->pGroupKeys)) {
|
!planOptNodeListHasTbname(pAgg->pGroupKeys)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -3591,8 +3575,7 @@ static int32_t stableJoinOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicS
|
||||||
static bool partColOptShouldBeOptimized(SLogicNode* pNode) {
|
static bool partColOptShouldBeOptimized(SLogicNode* pNode) {
|
||||||
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
|
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
|
||||||
SPartitionLogicNode* pPartition = (SPartitionLogicNode*)pNode;
|
SPartitionLogicNode* pPartition = (SPartitionLogicNode*)pNode;
|
||||||
if (pPartition->node.pParent && nodeType(pPartition->node.pParent) == QUERY_NODE_LOGIC_PLAN_WINDOW) return false;
|
if (keysHasCol(pPartition->pPartitionKeys)) return true;
|
||||||
if (planOptNodeListHasCol(pPartition->pPartitionKeys)) return true;
|
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -3604,6 +3587,7 @@ static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) {
|
||||||
if (pSort) {
|
if (pSort) {
|
||||||
pSort->groupSort = false;
|
pSort->groupSort = false;
|
||||||
TSWAP(pSort->node.pChildren, pPartition->node.pChildren);
|
TSWAP(pSort->node.pChildren, pPartition->node.pChildren);
|
||||||
|
optResetParent((SLogicNode*)pSort);
|
||||||
FOREACH(node, pPartition->pPartitionKeys) {
|
FOREACH(node, pPartition->pPartitionKeys) {
|
||||||
SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
|
SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
|
||||||
if (!pOrder) {
|
if (!pOrder) {
|
||||||
|
@ -3615,6 +3599,30 @@ static SSortLogicNode* partColOptCreateSort(SPartitionLogicNode* pPartition) {
|
||||||
if (!pOrder->pExpr) code = TSDB_CODE_OUT_OF_MEMORY;
|
if (!pOrder->pExpr) code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pPartition->needBlockOutputTsOrder) {
|
||||||
|
SOrderByExprNode* pOrder = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
|
||||||
|
if (!pOrder) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
} else {
|
||||||
|
pSort->excludePkCol = true;
|
||||||
|
nodesListMakeAppend(&pSort->pSortKeys, (SNode*)pOrder);
|
||||||
|
pOrder->order = ORDER_ASC;
|
||||||
|
pOrder->pExpr = 0;
|
||||||
|
FOREACH(node, pPartition->node.pTargets) {
|
||||||
|
if (nodeType(node) == QUERY_NODE_COLUMN) {
|
||||||
|
SColumnNode* pCol = (SColumnNode*)node;
|
||||||
|
if (pCol->slotId == pPartition->tsSlotId) {
|
||||||
|
pOrder->pExpr = nodesCloneNode((SNode*)pCol);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (!pOrder->pExpr) {
|
||||||
|
code = TSDB_CODE_PAR_INTERNAL_ERROR;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
pSort->node.pTargets = nodesCloneList(((SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0))->pTargets);
|
pSort->node.pTargets = nodesCloneList(((SLogicNode*)nodesListGetNode(pSort->node.pChildren, 0))->pTargets);
|
||||||
|
@ -3639,13 +3647,17 @@ static int32_t partitionColsOpt(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
|
||||||
|
|
||||||
// replace with sort node
|
// replace with sort node
|
||||||
SSortLogicNode* pSort = partColOptCreateSort(pNode);
|
SSortLogicNode* pSort = partColOptCreateSort(pNode);
|
||||||
if (!pSort) code = TSDB_CODE_OUT_OF_MEMORY;
|
if (!pSort) {
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
// if sort create failed, we eat the error, skip the optimization
|
||||||
|
code = TSDB_CODE_SUCCESS;
|
||||||
|
} else {
|
||||||
pSort->calcGroupId = true;
|
pSort->calcGroupId = true;
|
||||||
code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pNode, (SLogicNode*)pSort);
|
code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pNode, (SLogicNode*)pSort);
|
||||||
}
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
pCxt->optimized = true;
|
pCxt->optimized = true;
|
||||||
|
} else {
|
||||||
|
nodesDestroyNode((SNode*)pSort);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1750,6 +1750,7 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
|
||||||
SNodeList* pSortKeys = NULL;
|
SNodeList* pSortKeys = NULL;
|
||||||
int32_t code = rewritePrecalcExprs(pCxt, pSortLogicNode->pSortKeys, &pPrecalcExprs, &pSortKeys);
|
int32_t code = rewritePrecalcExprs(pCxt, pSortLogicNode->pSortKeys, &pPrecalcExprs, &pSortKeys);
|
||||||
pSort->calcGroupId = pSortLogicNode->calcGroupId;
|
pSort->calcGroupId = pSortLogicNode->calcGroupId;
|
||||||
|
pSort->excludePkCol = pSortLogicNode->excludePkCol;
|
||||||
|
|
||||||
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
|
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
|
||||||
// push down expression to pOutputDataBlockDesc of child node
|
// push down expression to pOutputDataBlockDesc of child node
|
||||||
|
@ -1798,6 +1799,8 @@ static int32_t createPartitionPhysiNodeImpl(SPhysiPlanContext* pCxt, SNodeList*
|
||||||
SNodeList* pPrecalcExprs = NULL;
|
SNodeList* pPrecalcExprs = NULL;
|
||||||
SNodeList* pPartitionKeys = NULL;
|
SNodeList* pPartitionKeys = NULL;
|
||||||
int32_t code = rewritePrecalcExprs(pCxt, pPartLogicNode->pPartitionKeys, &pPrecalcExprs, &pPartitionKeys);
|
int32_t code = rewritePrecalcExprs(pCxt, pPartLogicNode->pPartitionKeys, &pPrecalcExprs, &pPartitionKeys);
|
||||||
|
pPart->needBlockOutputTsOrder = pPartLogicNode->needBlockOutputTsOrder;
|
||||||
|
pPart->tsSlotId = pPartLogicNode->tsSlotId;
|
||||||
|
|
||||||
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
|
SDataBlockDescNode* pChildTupe = (((SPhysiNode*)nodesListGetNode(pChildren, 0))->pOutputDataBlockDesc);
|
||||||
// push down expression to pOutputDataBlockDesc of child node
|
// push down expression to pOutputDataBlockDesc of child node
|
||||||
|
|
|
@ -358,12 +358,12 @@ static bool stbNotSystemScan(SLogicNode* pNode) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool stbHasPartTbname(SNodeList* pPartKeys) {
|
bool keysHasTbname(SNodeList* pKeys) {
|
||||||
if (NULL == pPartKeys) {
|
if (NULL == pKeys) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
SNode* pPartKey = NULL;
|
SNode* pPartKey = NULL;
|
||||||
FOREACH(pPartKey, pPartKeys) {
|
FOREACH(pPartKey, pKeys) {
|
||||||
if (QUERY_NODE_GROUPING_SET == nodeType(pPartKey)) {
|
if (QUERY_NODE_GROUPING_SET == nodeType(pPartKey)) {
|
||||||
pPartKey = nodesListGetNode(((SGroupingSetNode*)pPartKey)->pParameterList, 0);
|
pPartKey = nodesListGetNode(((SGroupingSetNode*)pPartKey)->pParameterList, 0);
|
||||||
}
|
}
|
||||||
|
@ -390,10 +390,10 @@ bool isPartTableAgg(SAggLogicNode* pAgg) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
if (NULL != pAgg->pGroupKeys) {
|
if (NULL != pAgg->pGroupKeys) {
|
||||||
return stbHasPartTbname(pAgg->pGroupKeys) &&
|
return (pAgg->isGroupTb || keysHasTbname(pAgg->pGroupKeys)) &&
|
||||||
stbNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0));
|
stbNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0));
|
||||||
}
|
}
|
||||||
return stbHasPartTbname(stbGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)));
|
return pAgg->isPartTb || keysHasTbname(stbGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)));
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool stbHasPartTag(SNodeList* pPartKeys) {
|
static bool stbHasPartTag(SNodeList* pPartKeys) {
|
||||||
|
@ -478,7 +478,7 @@ bool isPartTagAgg(SAggLogicNode* pAgg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isPartTableWinodw(SWindowLogicNode* pWindow) {
|
bool isPartTableWinodw(SWindowLogicNode* pWindow) {
|
||||||
return stbHasPartTbname(stbGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0)));
|
return pWindow->isPartTb || keysHasTbname(stbGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0)));
|
||||||
}
|
}
|
||||||
|
|
||||||
bool cloneLimit(SLogicNode* pParent, SLogicNode* pChild, uint8_t cloneWhat) {
|
bool cloneLimit(SLogicNode* pParent, SLogicNode* pChild, uint8_t cloneWhat) {
|
||||||
|
@ -501,3 +501,19 @@ bool cloneLimit(SLogicNode* pParent, SLogicNode* pChild, uint8_t cloneWhat) {
|
||||||
}
|
}
|
||||||
return cloned;
|
return cloned;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static EDealRes partTagsOptHasColImpl(SNode* pNode, void* pContext) {
|
||||||
|
if (QUERY_NODE_COLUMN == nodeType(pNode)) {
|
||||||
|
if (COLUMN_TYPE_TAG != ((SColumnNode*)pNode)->colType && COLUMN_TYPE_TBNAME != ((SColumnNode*)pNode)->colType) {
|
||||||
|
*(bool*)pContext = true;
|
||||||
|
return DEAL_RES_END;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return DEAL_RES_CONTINUE;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool keysHasCol(SNodeList* pKeys) {
|
||||||
|
bool hasCol = false;
|
||||||
|
nodesWalkExprs(pKeys, partTagsOptHasColImpl, &hasCol);
|
||||||
|
return hasCol;
|
||||||
|
}
|
||||||
|
|
|
@ -86,7 +86,15 @@ class TDTestCase:
|
||||||
'stbName': 'meters',
|
'stbName': 'meters',
|
||||||
'colPrefix': 'c',
|
'colPrefix': 'c',
|
||||||
'tagPrefix': 't',
|
'tagPrefix': 't',
|
||||||
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'FLOAT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'smallint', 'count':1},{'type': 'tinyint', 'count':1},{'type': 'bool', 'count':1},{'type': 'binary', 'len':10, 'count':1},{'type': 'nchar', 'len':10, 'count':1}],
|
'colSchema': [{'type': 'INT', 'count':1},
|
||||||
|
{'type': 'BIGINT', 'count':1},
|
||||||
|
{'type': 'FLOAT', 'count':1},
|
||||||
|
{'type': 'DOUBLE', 'count':1},
|
||||||
|
{'type': 'smallint', 'count':1},
|
||||||
|
{'type': 'tinyint', 'count':1},
|
||||||
|
{'type': 'bool', 'count':1},
|
||||||
|
{'type': 'binary', 'len':10, 'count':1},
|
||||||
|
{'type': 'nchar', 'len':10, 'count':1}],
|
||||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'nchar', 'len':20, 'count':1},{'type': 'binary', 'len':20, 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'smallint', 'count':1},{'type': 'DOUBLE', 'count':1}],
|
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'nchar', 'len':20, 'count':1},{'type': 'binary', 'len':20, 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'smallint', 'count':1},{'type': 'DOUBLE', 'count':1}],
|
||||||
'ctbPrefix': 't',
|
'ctbPrefix': 't',
|
||||||
'ctbStartIdx': 0,
|
'ctbStartIdx': 0,
|
||||||
|
@ -128,26 +136,35 @@ class TDTestCase:
|
||||||
|
|
||||||
|
|
||||||
def test_sort_for_partition_hint(self):
|
def test_sort_for_partition_hint(self):
|
||||||
sql = 'explain select count(*), c1 from meters partition by c1'
|
sql = 'select count(*), c1 from meters partition by c1'
|
||||||
sql_hint = 'explain select /*+ sort_for_group() */count(*), c1 from meters partition by c1'
|
sql_hint = 'select /*+ sort_for_group() */count(*), c1 from meters partition by c1'
|
||||||
tdSql.query(sql)
|
self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
|
||||||
self.check_explain_res_has_row("Partition on", tdSql.queryResult)
|
self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))
|
||||||
tdSql.query(sql_hint)
|
|
||||||
self.check_explain_res_has_row("Sort", tdSql.queryResult)
|
|
||||||
|
|
||||||
sql = 'explain select count(*), c1, tbname from meters partition by tbname, c1'
|
sql = 'select count(*), c1, tbname from meters partition by tbname, c1'
|
||||||
sql_hint = 'explain select /*+ sort_for_group() */ count(*), c1, tbname from meters partition by tbname, c1'
|
sql_hint = 'select /*+ sort_for_group() */ count(*), c1, tbname from meters partition by tbname, c1'
|
||||||
tdSql.query(sql)
|
self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
|
||||||
self.check_explain_res_has_row("Partition on", tdSql.queryResult)
|
self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))
|
||||||
tdSql.query(sql_hint)
|
|
||||||
self.check_explain_res_has_row("Sort", tdSql.queryResult)
|
|
||||||
|
|
||||||
sql_interval = 'explain select count(*), c1 from meters partition by c1 interval(1s)'
|
sql = 'select count(*), c1, tbname from meters partition by tbname, c1 interval(1s)'
|
||||||
sql_interval_hint = 'explain select /*+ sort_for_group() */ count(*), c1 from meters partition by c1 interval(1s)'
|
sql_hint = 'select /*+ sort_for_group() */ count(*), c1, tbname from meters partition by tbname, c1 interval(1s)'
|
||||||
tdSql.query(sql_interval)
|
self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
|
||||||
self.check_explain_res_has_row("Partition on", tdSql.queryResult)
|
self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))
|
||||||
tdSql.query(sql_interval_hint)
|
|
||||||
self.check_explain_res_has_row("Partition on", tdSql.queryResult)
|
sql = 'select count(*), c1, t1 from meters partition by t1, c1'
|
||||||
|
sql_hint = 'select /*+ sort_for_group() */ count(*), c1, t1 from meters partition by t1, c1'
|
||||||
|
self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
|
||||||
|
self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))
|
||||||
|
|
||||||
|
sql = 'select count(*), c1, t1 from meters partition by t1, c1 interval(1s)'
|
||||||
|
sql_hint = 'select /*+ sort_for_group() */ count(*), c1, t1 from meters partition by t1, c1 interval(1s)'
|
||||||
|
self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
|
||||||
|
self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))
|
||||||
|
|
||||||
|
sql = 'select count(*), c1 from meters partition by c1 interval(1s)'
|
||||||
|
sql_hint = 'select /*+ sort_for_group() */ count(*), c1 from meters partition by c1 interval(1s)'
|
||||||
|
self.check_explain_res_has_row("Partition on", self.explain_sql(sql))
|
||||||
|
self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))
|
||||||
|
|
||||||
def add_order_by(self, sql: str, order_by: str, select_list: str = "*") -> str:
|
def add_order_by(self, sql: str, order_by: str, select_list: str = "*") -> str:
|
||||||
return "select %s from (%s)t order by %s" % (select_list, sql, order_by)
|
return "select %s from (%s)t order by %s" % (select_list, sql, order_by)
|
||||||
|
@ -157,9 +174,14 @@ class TDTestCase:
|
||||||
|
|
||||||
def query_with_time(self, sql):
|
def query_with_time(self, sql):
|
||||||
start = datetime.now()
|
start = datetime.now()
|
||||||
tdSql.query(sql)
|
tdSql.query(sql, queryTimes=1)
|
||||||
return (datetime.now().timestamp() - start.timestamp()) * 1000
|
return (datetime.now().timestamp() - start.timestamp()) * 1000
|
||||||
|
|
||||||
|
def explain_sql(self, sql: str):
|
||||||
|
sql = "explain " + sql
|
||||||
|
tdSql.query(sql)
|
||||||
|
return tdSql.queryResult
|
||||||
|
|
||||||
def query_and_compare_res(self, sql1, sql2):
|
def query_and_compare_res(self, sql1, sql2):
|
||||||
dur = self.query_with_time(sql1)
|
dur = self.query_with_time(sql1)
|
||||||
tdLog.debug("sql1 query with time: [%f]" % dur)
|
tdLog.debug("sql1 query with time: [%f]" % dur)
|
||||||
|
@ -180,8 +202,9 @@ class TDTestCase:
|
||||||
for sql in sqls:
|
for sql in sqls:
|
||||||
sql_hint = self.add_order_by(self.add_hint(sql), order_by, select_list)
|
sql_hint = self.add_order_by(self.add_hint(sql), order_by, select_list)
|
||||||
sql = self.add_order_by(sql, order_by, select_list)
|
sql = self.add_order_by(sql, order_by, select_list)
|
||||||
|
self.check_explain_res_has_row("Sort", self.explain_sql(sql_hint))
|
||||||
|
self.check_explain_res_has_row("Partition", self.explain_sql(sql))
|
||||||
self.query_and_compare_res(sql, sql_hint)
|
self.query_and_compare_res(sql, sql_hint)
|
||||||
pass
|
|
||||||
|
|
||||||
def test_sort_for_partition_res(self):
|
def test_sort_for_partition_res(self):
|
||||||
sqls_par_c1_agg = [
|
sqls_par_c1_agg = [
|
||||||
|
@ -200,7 +223,7 @@ class TDTestCase:
|
||||||
]
|
]
|
||||||
|
|
||||||
sqls_par_tbname_c1 = [
|
sqls_par_tbname_c1 = [
|
||||||
"select count(*), c1 , tbname as tb from meters partition by tbname, c1"
|
"select count(*), c1 , tbname as a from meters partition by tbname, c1"
|
||||||
]
|
]
|
||||||
sqls_par_tag_c1 = [
|
sqls_par_tag_c1 = [
|
||||||
"select count(*), c1, t1 from meters partition by t1, c1"
|
"select count(*), c1, t1 from meters partition by t1, c1"
|
||||||
|
@ -209,13 +232,60 @@ class TDTestCase:
|
||||||
self.prepare_and_query(sqls_par_c1, "c1, ts, c2", "c1, ts, c2")
|
self.prepare_and_query(sqls_par_c1, "c1, ts, c2", "c1, ts, c2")
|
||||||
self.prepare_and_query(sqls_par_c1_c2_agg, "c1, c2")
|
self.prepare_and_query(sqls_par_c1_c2_agg, "c1, c2")
|
||||||
self.prepare_and_query(sqls_par_c1_c2, "c1, c2, ts, c3", "c1, c2, ts, c3")
|
self.prepare_and_query(sqls_par_c1_c2, "c1, c2, ts, c3", "c1, c2, ts, c3")
|
||||||
self.prepare_and_query(sqls_par_tbname_c1, "tb, c1")
|
self.prepare_and_query(sqls_par_tbname_c1, "a, c1")
|
||||||
self.prepare_and_query(sqls_par_tag_c1, "t1, c1")
|
self.prepare_and_query(sqls_par_tag_c1, "t1, c1")
|
||||||
|
|
||||||
|
def get_interval_template_sqls(self, col_name):
|
||||||
|
sqls = [
|
||||||
|
'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1s)' % (col_name, col_name),
|
||||||
|
'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(30s)' % (col_name, col_name),
|
||||||
|
'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1m)' % (col_name, col_name),
|
||||||
|
'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(30m)' % (col_name, col_name),
|
||||||
|
'select _wstart as ts, count(*), tbname as a, %s from meters partition by tbname, %s interval(1h)' % (col_name, col_name),
|
||||||
|
|
||||||
|
'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1s)' % (col_name, col_name),
|
||||||
|
'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(30s)' % (col_name, col_name),
|
||||||
|
'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1m)' % (col_name, col_name),
|
||||||
|
'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(30m)' % (col_name, col_name),
|
||||||
|
'select _wstart as ts, count(*), t1 as a, %s from meters partition by t1, %s interval(1h)' % (col_name, col_name),
|
||||||
|
|
||||||
|
'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1s)' % (col_name, col_name, col_name),
|
||||||
|
'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30s)' % (col_name, col_name, col_name),
|
||||||
|
'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1m)' % (col_name, col_name, col_name),
|
||||||
|
'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(30m)' % (col_name, col_name, col_name),
|
||||||
|
'select _wstart as ts, count(*), %s as a, %s from meters partition by %s interval(1h)' % (col_name, col_name, col_name),
|
||||||
|
|
||||||
|
'select _wstart as ts, count(*), tbname as a, %s from meters partition by %s, tbname interval(1s)' % (col_name, col_name),
|
||||||
|
'select _wstart as ts, count(*), t1 as a, %s from meters partition by %s, t1 interval(1s)' % (col_name, col_name),
|
||||||
|
]
|
||||||
|
order_list = 'a, %s, ts' % (col_name)
|
||||||
|
return (sqls, order_list)
|
||||||
|
|
||||||
|
def test_sort_for_partition_interval(self):
|
||||||
|
sqls, order_list = self.get_interval_template_sqls('c1')
|
||||||
|
self.prepare_and_query(sqls, order_list)
|
||||||
|
sqls, order_list = self.get_interval_template_sqls('c2')
|
||||||
|
self.prepare_and_query(sqls, order_list)
|
||||||
|
sqls, order_list = self.get_interval_template_sqls('c3')
|
||||||
|
self.prepare_and_query(sqls, order_list)
|
||||||
|
sqls, order_list = self.get_interval_template_sqls('c4')
|
||||||
|
self.prepare_and_query(sqls, order_list)
|
||||||
|
sqls, order_list = self.get_interval_template_sqls('c5')
|
||||||
|
self.prepare_and_query(sqls, order_list)
|
||||||
|
sqls, order_list = self.get_interval_template_sqls('c6')
|
||||||
|
self.prepare_and_query(sqls, order_list)
|
||||||
|
sqls, order_list = self.get_interval_template_sqls('c7')
|
||||||
|
self.prepare_and_query(sqls, order_list)
|
||||||
|
sqls, order_list = self.get_interval_template_sqls('c8')
|
||||||
|
self.prepare_and_query(sqls, order_list)
|
||||||
|
sqls, order_list = self.get_interval_template_sqls('c9')
|
||||||
|
self.prepare_and_query(sqls, order_list)
|
||||||
|
|
||||||
def run(self):
|
def run(self):
|
||||||
self.prepareTestEnv()
|
self.prepareTestEnv()
|
||||||
self.test_sort_for_partition_hint()
|
self.test_sort_for_partition_hint()
|
||||||
self.test_sort_for_partition_res()
|
self.test_sort_for_partition_res()
|
||||||
|
self.test_sort_for_partition_interval()
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
tdSql.close()
|
tdSql.close()
|
||||||
|
|
Loading…
Reference in New Issue