diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 169a9ee17f..c0412d2617 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -187,6 +187,9 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex); int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows); +int32_t colDataGetRowLength(const SColumnInfoData* pColumnInfoData, int32_t rowIdx); +void colDataTrim(SColumnInfoData* pColumnInfoData); + size_t blockDataGetNumOfCols(const SSDataBlock* pBlock); size_t blockDataGetNumOfRows(const SSDataBlock* pBlock); diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 8aa17e46d1..d53f78b41e 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -184,6 +184,7 @@ extern int64_t tsStreamBufferSize; extern int64_t tsCheckpointInterval; extern bool tsFilterScalarMode; extern int32_t tsMaxStreamBackendCache; +extern int32_t tsPQSortMemThreshold; // #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize) diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 453c5d4914..c1481da80c 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -246,6 +246,7 @@ typedef struct SSortLogicNode { SLogicNode node; SNodeList* pSortKeys; bool groupSort; + int64_t maxRows; } SSortLogicNode; typedef struct SPartitionLogicNode { @@ -523,6 +524,7 @@ typedef struct SSortPhysiNode { SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode SNodeList* pTargets; + int64_t maxRows; } SSortPhysiNode; typedef SSortPhysiNode SGroupSortPhysiNode; diff --git a/include/util/theap.h b/include/util/theap.h index fb5ff8301a..8ddeeb28a4 100644 --- a/include/util/theap.h +++ b/include/util/theap.h @@ -17,6 +17,7 @@ #define _TD_UTIL_HEAP_H_ #include "os.h" +#include "tarray.h" #ifdef __cplusplus extern "C" { @@ -58,6 +59,48 @@ void heapDequeue(Heap* heap); size_t heapSize(Heap* heap); +typedef bool (*pq_comp_fn)(void* l, void* r, void* param); + +typedef struct PriorityQueueNode { + void* data; +} PriorityQueueNode; + +typedef struct PriorityQueue PriorityQueue; + +PriorityQueue* createPriorityQueue(pq_comp_fn fn, FDelete deleteFn, void* param); + +void taosPQSetFn(PriorityQueue* pq, pq_comp_fn fn); + +void destroyPriorityQueue(PriorityQueue* pq); + +PriorityQueueNode* taosPQTop(PriorityQueue* pq); + +size_t taosPQSize(PriorityQueue* pq); + +void taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node); + +void taosPQPop(PriorityQueue* pq); + +typedef struct BoundedQueue BoundedQueue; + +BoundedQueue* createBoundedQueue(uint32_t maxSize, pq_comp_fn fn, FDelete deleteFn, void* param); + +void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn); + +void destroyBoundedQueue(BoundedQueue* q); + +void taosBQPush(BoundedQueue* q, PriorityQueueNode* n); + +PriorityQueueNode* taosBQTop(BoundedQueue* q); + +size_t taosBQSize(BoundedQueue* q); + +size_t taosBQMaxSize(BoundedQueue* q); + +void taosBQBuildHeap(BoundedQueue* q); + +void taosBQPop(BoundedQueue* q); + #ifdef __cplusplus } #endif diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 7bbce934d2..b2f03fa7ba 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -47,7 +47,17 @@ int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRo } } 
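/*
 * [editor's aside, not part of the patch] The PriorityQueue/BoundedQueue API added to
 * theap.h above ships without a usage example. Below is a minimal, hedged sketch of the
 * intended bounded top-k pattern, inferred from how tsort.c uses the API later in this
 * patch (push everything, drop the one surplus element, flip the comparator, drain).
 * The helper names intLess, intGreater, freeItem and keepSmallestK are illustrative only
 * and not part of the API.
 */
#include <stdio.h>
#include "theap.h"

static bool intLess(void* l, void* r, void* param) {
  (void)param;
  return *(int32_t*)l < *(int32_t*)r;  // true: l ranks before r, so the queue retains the smallest values
}
static bool intGreater(void* l, void* r, void* param) {
  (void)param;
  return *(int32_t*)l > *(int32_t*)r;  // reversed comparator, used only when draining results
}
// FDelete wrapper, mirroring the pattern tsort.c uses for its tuple destructor
static void freeItem(void* p) { taosMemoryFree(p); }

// keep the k smallest values of a stream and print them in ascending order
static void keepSmallestK(const int32_t* vals, int32_t n, uint32_t k) {
  BoundedQueue*     q = createBoundedQueue(k, intLess, freeItem, NULL);
  PriorityQueueNode node;
  for (int32_t i = 0; i < n; ++i) {
    int32_t* p = taosMemoryCalloc(1, sizeof(int32_t));
    *p = vals[i];
    node.data = p;
    taosBQPush(q, &node);  // once the queue is full, the currently worst-ranked element is evicted
  }
  // the queue may hold k+1 elements; drop the surplus worst one before draining
  if (taosBQSize(q) == taosBQMaxSize(q) + 1) taosBQPop(q);
  taosBQSetFn(q, intGreater);  // flip the comparator ...
  taosBQBuildHeap(q);          // ... and re-heapify so the smallest value sits on top
  while (taosBQSize(q) > 0) {
    printf("%d\n", *(int32_t*)taosBQTop(q)->data);
    taosBQPop(q);
  }
  destroyBoundedQueue(q);
}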
-static int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) { +int32_t colDataGetRowLength(const SColumnInfoData* pColumnInfoData, int32_t rowIdx) { + if (colDataIsNull_s(pColumnInfoData, rowIdx)) return 0; + + if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) return pColumnInfoData->info.bytes; + if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON) + return getJsonValueLen(colDataGetData(pColumnInfoData, rowIdx)); + else + return varDataTLen(colDataGetData(pColumnInfoData, rowIdx)); +} + +int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) { if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { return pColumnInfoData->varmeta.length + sizeof(int32_t) * numOfRows; } else { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 948174b565..5f6ec92d50 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -62,6 +62,7 @@ int32_t tsNumOfQnodeFetchThreads = 1; int32_t tsNumOfSnodeStreamThreads = 4; int32_t tsNumOfSnodeWriteThreads = 1; int32_t tsMaxStreamBackendCache = 128; // M +int32_t tsPQSortMemThreshold = 16; // M // sync raft int32_t tsElectInterval = 25 * 1000; @@ -533,6 +534,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "filterScalarMode", tsFilterScalarMode, 0) != 0) return -1; if (cfgAddInt32(pCfg, "maxStreamBackendCache", tsMaxStreamBackendCache, 16, 1024, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "pqSortMemThreshold", tsPQSortMemThreshold, 1, 10240, 0) != 0) return -1; GRANT_CFG_ADD; return 0; @@ -914,6 +916,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsFilterScalarMode = cfgGetItem(pCfg, "filterScalarMode")->bval; tsMaxStreamBackendCache = cfgGetItem(pCfg, "maxStreamBackendCache")->i32; + tsPQSortMemThreshold = cfgGetItem(pCfg, "pqSortMemThreshold")->i32; GRANT_CFG_GET; return 0; diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index 30911c6061..33c9d845b9 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -75,10 +75,11 @@ typedef struct SResultRowInfo { } SResultRowInfo; typedef struct SColMatchItem { - int32_t colId; - int32_t srcSlotId; - int32_t dstSlotId; - bool needOutput; + int32_t colId; + int32_t srcSlotId; + int32_t dstSlotId; + bool needOutput; + SDataType dataType; } SColMatchItem; typedef struct SColMatchInfo { diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index 78c56c0405..7a0d236a37 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -64,10 +64,14 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void* /** * * @param type + * @param maxRows keep maxRows at most + * @param maxTupleLength max len of one tuple, for check if heap sort is applicable + * @param sortBufSize sort memory buf size, for check if heap sort is applicable * @return */ SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages, - SSDataBlock* pBlock, const char* idstr); + SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength, + uint32_t sortBufSize); /** * diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 0928029557..cfea233a1c 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1305,6 +1305,7 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod c.colId = pColNode->colId; 
c.srcSlotId = pColNode->slotId; c.dstSlotId = pNode->slotId; + c.dataType = pColNode->node.resType; taosArrayPush(pList, &c); } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7f510d6745..d56fa9de78 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2884,7 +2884,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { pInfo->sortBufSize = pInfo->bufPageSize * (kWay + 1); int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, - pInfo->pSortInputBlock, pTaskInfo->id.str); + pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0); tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 585c2e8c54..20fb588a02 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -29,6 +29,8 @@ typedef struct SSortOperatorInfo { int64_t startTs; // sort start time uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included. SLimitInfo limitInfo; + uint64_t maxTupleLength; + int64_t maxRows; } SSortOperatorInfo; static SSDataBlock* doSort(SOperatorInfo* pOperator); @@ -36,6 +38,7 @@ static int32_t doOpenSortOperator(SOperatorInfo* pOperator); static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len); static void destroySortOperatorInfo(void* param); +static int32_t calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys); // todo add limit/offset impl SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) { @@ -51,6 +54,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* int32_t numOfCols = 0; pOperator->exprSupp.pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols); pOperator->exprSupp.numOfExprs = numOfCols; + calcSortOperMaxTupleLength(pInfo, pSortNode->pSortKeys); + pInfo->maxRows = pSortNode->maxRows; int32_t numOfOutputCols = 0; int32_t code = @@ -193,9 +198,9 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) { } pInfo->startTs = taosGetTimestampUs(); - // pInfo->binfo.pRes is not equalled to the input datablock. 
- pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str); + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, + pInfo->maxRows, pInfo->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator); @@ -286,6 +291,20 @@ int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* return TSDB_CODE_SUCCESS; } +static int32_t calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys) { + SColMatchInfo* pColItem = &pSortOperInfo->matchInfo; + size_t size = taosArrayGetSize(pColItem->pList); + for (size_t i = 0; i < size; ++i) { + pSortOperInfo->maxTupleLength += ((SColMatchItem*)taosArrayGet(pColItem->pList, i))->dataType.bytes; + } + size = LIST_LENGTH(pSortKeys); + for (size_t i = 0; i < size; ++i) { + SOrderByExprNode* pOrderExprNode = (SOrderByExprNode*)nodesListGetNode(pSortKeys, i); + pSortOperInfo->maxTupleLength += ((SColumnNode*)pOrderExprNode->pExpr)->node.resType.bytes; + } + return TSDB_CODE_SUCCESS; +} + //===================================================================================== // Group Sort Operator typedef enum EChildOperatorStatus { CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED } EChildOperatorStatus; @@ -384,7 +403,7 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) { // pInfo->binfo.pRes is not equalled to the input datablock. pInfo->pCurrSortHandle = - tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str); + tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, 0, 0, 0); tsortSetFetchRawDataFp(pInfo->pCurrSortHandle, fetchNextGroupSortDataBlock, applyScalarFunction, pOperator); @@ -582,7 +601,7 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) { int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage, - pInfo->pInputBlock, pTaskInfo->id.str); + pInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL); tsortSetCompareGroupId(pInfo->pSortHandle, pInfo->groupSort); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 58b3428b5b..daf06c81d1 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -19,6 +19,7 @@ #include "tcompare.h" #include "tdatablock.h" #include "tdef.h" +#include "theap.h" #include "tlosertree.h" #include "tpagedbuf.h" #include "tsort.h" @@ -41,6 +42,12 @@ struct SSortHandle { int64_t startTs; uint64_t totalElapsed; + uint64_t maxRows; + uint32_t maxTupleLength; + uint32_t sortBufSize; + BoundedQueue* pBoundedQueue; + uint32_t tmpRowIdx; + int32_t sourceId; SSDataBlock* pDataBlock; SMsortComparParam cmpParam; @@ -61,6 +68,47 @@ struct SSortHandle { static int32_t msortComparFn(const void* pLeft, const void* pRight, void* param); +// | offset[0] | offset[1] |....| nullbitmap | data |...| +static void* createTuple(uint32_t columnNum, uint32_t tupleLen) { + uint32_t totalLen = sizeof(uint32_t) * columnNum + BitmapLen(columnNum) + tupleLen; + return taosMemoryCalloc(1, totalLen); +} +static void destoryTuple(void* t) { taosMemoryFree(t); } + +#define tupleOffset(tuple, colIdx) ((uint32_t*)(tuple + sizeof(uint32_t) * colIdx)) 
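/*
 * [editor's aside, not part of the patch] A concrete illustration of the tuple layout
 * sketched above, assuming two columns (colNum = 2): column 0 an INT and column 1 a
 * VARCHAR. The byte positions follow directly from the surrounding macros:
 *
 *   bytes 0..3   offset[0]  -> where column 0's value starts in the data area
 *   bytes 4..7   offset[1]  -> where column 1's value starts
 *   byte  8      null bitmap, BitmapLen(2) == 1 byte
 *   bytes 9..    data area; tupleGetDataStartOffset(2) == 2 * sizeof(uint32_t) + 1 == 9
 *
 * For the VARCHAR column the stored bytes include the VarData length header, matching
 * colDataGetRowLength() added in tdatablock.c. A NULL value sets its bit in the bitmap
 * and consumes no bytes in the data area; tupleGetField checks the bitmap and returns
 * NULL for it.
 */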
+#define tupleSetOffset(tuple, colIdx, offset) (*tupleOffset(tuple, colIdx) = offset) +#define tupleSetNull(tuple, colIdx, colNum) colDataSetNull_f((char*)tuple + sizeof(uint32_t) * colNum, colIdx) +#define tupleColIsNull(tuple, colIdx, colNum) colDataIsNull_f((char*)tuple + sizeof(uint32_t) * colNum, colIdx) +#define tupleGetDataStartOffset(colNum) (sizeof(uint32_t) * colNum + BitmapLen(colNum)) +#define tupleSetData(tuple, offset, data, length) memcpy(tuple + offset, data, length) + +/** + * @param t the tuple pointer addr, if realloced, *t is changed to the new addr + * @param offset copy data into pTuple start from offset + * @param colIndex the columnIndex, for setting null bitmap + * @return the next offset to add field + * */ +static inline size_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, uint32_t colIdx, void* data, size_t length, + bool isNull, uint32_t tupleLen) { + tupleSetOffset(*t, colIdx, offset); + if (isNull) { + tupleSetNull(*t, colIdx, colNum); + } else { + if (offset + length > tupleLen + tupleGetDataStartOffset(colNum)) { + *t = taosMemoryRealloc(*t, offset + length); + } + tupleSetData(*t, offset, data, length); + } + return offset + length; +} + +static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) { + if (tupleColIsNull(t, colIdx, colNum)) return NULL; + return t + *tupleOffset(t, colIdx); +} + +static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* param); + SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) { return createOneDataBlock(pSortHandle->pDataBlock, false); } @@ -71,7 +119,8 @@ SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) { * @return */ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages, - SSDataBlock* pBlock, const char* idstr) { + SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength, + uint32_t sortBufSize) { SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle)); pSortHandle->type = type; @@ -80,6 +129,13 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page pSortHandle->pSortInfo = pSortInfo; pSortHandle->loops = 0; + pSortHandle->maxTupleLength = maxTupleLength; + if (maxRows < 0) + pSortHandle->sortBufSize = 0; + else + pSortHandle->sortBufSize = sortBufSize; + pSortHandle->maxRows = maxRows; + if (pBlock != NULL) { pSortHandle->pDataBlock = createOneDataBlock(pBlock, false); } @@ -150,7 +206,6 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) { if (pSortHandle == NULL) { return; } - tsortClose(pSortHandle); if (pSortHandle->pMergeTree != NULL) { tMergeTreeDestroy(&pSortHandle->pMergeTree); @@ -159,6 +214,7 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) { destroyDiskbasedBuf(pSortHandle->pBuf); taosMemoryFreeClear(pSortHandle->idStr); blockDataDestroy(pSortHandle->pDataBlock); + if (pSortHandle->pBoundedQueue) destroyBoundedQueue(pSortHandle->pBoundedQueue); int64_t fetchUs = 0, fetchNum = 0; tsortClearOrderdSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum); @@ -769,17 +825,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) { return code; } -int32_t tsortOpen(SSortHandle* pHandle) { - if (pHandle->opened) { - return 0; - } - - if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) { - return -1; - } - - pHandle->opened = true; - +static bool tsortOpenForBufMergeSort(SSortHandle* pHandle) { int32_t code = createInitialSources(pHandle); if (code != TSDB_CODE_SUCCESS) { return code; @@ -840,7 
+886,7 @@ int32_t tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId) { return TSDB_CODE_SUCCESS; } -STupleHandle* tsortNextTuple(SSortHandle* pHandle) { +static STupleHandle* tsortBufMergeSortNextTuple(SSortHandle* pHandle) { if (tsortIsClosed(pHandle)) { return NULL; } @@ -890,6 +936,168 @@ STupleHandle* tsortNextTuple(SSortHandle* pHandle) { return &pHandle->tupleHandle; } +static bool tsortIsPQSortApplicable(SSortHandle* pHandle) { + if (pHandle->type != SORT_SINGLESOURCE_SORT) return false; + uint64_t maxRowsFitInMemory = pHandle->sortBufSize / (pHandle->maxTupleLength + sizeof(char*)); + return maxRowsFitInMemory > pHandle->maxRows; +} + +static bool tsortPQCompFn(void* a, void* b, void* param) { + SSortHandle* pHandle = param; + int32_t res = pHandle->comparFn(a, b, param); + if (res < 0) return 1; + return 0; +} + +static bool tsortPQComFnReverse(void*a, void* b, void* param) { + SSortHandle* pHandle = param; + int32_t res = pHandle->comparFn(a, b, param); + if (res > 0) return 1; + return 0; +} + +static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* param) { + char* pLTuple = (char*)pLeft; + char* pRTuple = (char*)pRight; + SSortHandle* pHandle = (SSortHandle*)param; + SArray* orderInfo = (SArray*)pHandle->pSortInfo; + uint32_t colNum = blockDataGetNumOfCols(pHandle->pDataBlock); + for (int32_t i = 0; i < orderInfo->size; ++i) { + SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(orderInfo, i); + void *lData = tupleGetField(pLTuple, pOrder->slotId, colNum); + void *rData = tupleGetField(pRTuple, pOrder->slotId, colNum); + if (!lData && !rData) continue; + if (!lData) return pOrder->nullFirst ? -1 : 1; + if (!rData) return pOrder->nullFirst ? 1 : -1; + + int type = ((SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId))->info.type; + __compar_fn_t fn = getKeyComparFunc(type, pOrder->order); + + int ret = fn(lData, rData); + if (ret == 0) { + continue; + } else { + return ret; + } + } + return 0; +} + +static int32_t tsortOpenForPQSort(SSortHandle* pHandle) { + pHandle->pBoundedQueue = createBoundedQueue(pHandle->maxRows, tsortPQCompFn, destoryTuple, pHandle); + if (NULL == pHandle->pBoundedQueue) return TSDB_CODE_OUT_OF_MEMORY; + tsortSetComparFp(pHandle, colDataComparFn); + + SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0); + SSortSource* source = *pSource; + + pHandle->pDataBlock = NULL; + uint32_t tupleLen = 0; + PriorityQueueNode pqNode; + while (1) { + // fetch data + SSDataBlock* pBlock = pHandle->fetchfp(source->param); + if (NULL == pBlock) break; + + if (pHandle->beforeFp != NULL) { + pHandle->beforeFp(pBlock, pHandle->param); + } + if (pHandle->pDataBlock == NULL) { + pHandle->pDataBlock = createOneDataBlock(pBlock, false); + } + if (pHandle->pDataBlock == NULL) return TSDB_CODE_OUT_OF_MEMORY; + + size_t colNum = blockDataGetNumOfCols(pBlock); + + if (tupleLen == 0) { + for (size_t colIdx = 0; colIdx < colNum; ++colIdx) { + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx); + tupleLen += pCol->info.bytes; + if (IS_VAR_DATA_TYPE(pCol->info.type)) { + tupleLen += sizeof(VarDataLenT); + } + } + } + size_t colLen = 0; + for (size_t rowIdx = 0; rowIdx < pBlock->info.rows; ++rowIdx) { + void* pTuple = createTuple(colNum, tupleLen); + if (pTuple == NULL) return TSDB_CODE_OUT_OF_MEMORY; + + uint32_t offset = tupleGetDataStartOffset(colNum); + for (size_t colIdx = 0; colIdx < colNum; ++colIdx) { + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx); + if (colDataIsNull_s(pCol, 
rowIdx)) { + offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen); + } else { + colLen = colDataGetRowLength(pCol, rowIdx); + offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false, + tupleLen); + } + } + pqNode.data = pTuple; + taosBQPush(pHandle->pBoundedQueue, &pqNode); + } + } + return TSDB_CODE_SUCCESS; +} + +static STupleHandle* tsortPQSortNextTuple(SSortHandle* pHandle) { + blockDataCleanup(pHandle->pDataBlock); + blockDataEnsureCapacity(pHandle->pDataBlock, 1); + // abondan the top tuple if queue size bigger than max size + if (taosBQSize(pHandle->pBoundedQueue) == taosBQMaxSize(pHandle->pBoundedQueue) + 1) { + taosBQPop(pHandle->pBoundedQueue); + } + if (pHandle->tmpRowIdx == 0) { + // sort the results + taosBQSetFn(pHandle->pBoundedQueue, tsortPQComFnReverse); + taosBQBuildHeap(pHandle->pBoundedQueue); + } + if (taosBQSize(pHandle->pBoundedQueue) > 0) { + uint32_t colNum = blockDataGetNumOfCols(pHandle->pDataBlock); + PriorityQueueNode* node = taosBQTop(pHandle->pBoundedQueue); + char* pTuple = (char*)node->data; + + for (uint32_t i = 0; i < colNum; ++i) { + void* pData = tupleGetField(pTuple, i, colNum); + if (!pData) { + colDataSetNULL(bdGetColumnInfoData(pHandle->pDataBlock, i), 0); + } else { + colDataAppend(bdGetColumnInfoData(pHandle->pDataBlock, i), 0, pData, false); + } + } + pHandle->pDataBlock->info.rows++; + pHandle->tmpRowIdx++; + taosBQPop(pHandle->pBoundedQueue); + } + if (pHandle->pDataBlock->info.rows == 0) return NULL; + pHandle->tupleHandle.pBlock = pHandle->pDataBlock; + return &pHandle->tupleHandle; +} + +int32_t tsortOpen(SSortHandle* pHandle) { + if (pHandle->opened) { + return 0; + } + + if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) { + return -1; + } + + pHandle->opened = true; + if (tsortIsPQSortApplicable(pHandle)) + return tsortOpenForPQSort(pHandle); + else + return tsortOpenForBufMergeSort(pHandle); +} + +STupleHandle* tsortNextTuple(SSortHandle* pHandle) { + if (pHandle->pBoundedQueue) + return tsortPQSortNextTuple(pHandle); + else + return tsortBufMergeSortNextTuple(pHandle); +} + bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) { SColumnInfoData* pColInfoSrc = taosArrayGet(pVHandle->pBlock->pDataBlock, colIndex); return colDataIsNull_s(pColInfoSrc, pVHandle->rowIndex); diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 6e4dde4ec1..8305daa45e 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -502,6 +502,7 @@ static int32_t logicSortCopy(const SSortLogicNode* pSrc, SSortLogicNode* pDst) { COPY_BASE_OBJECT_FIELD(node, logicNodeCopy); CLONE_NODE_LIST_FIELD(pSortKeys); COPY_SCALAR_FIELD(groupSort); + COPY_SCALAR_FIELD(maxRows); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index 0b449c5bfe..99790e0a93 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -2100,6 +2100,7 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) { static const char* jkSortPhysiPlanExprs = "Exprs"; static const char* jkSortPhysiPlanSortKeys = "SortKeys"; static const char* jkSortPhysiPlanTargets = "Targets"; +static const char* jkSortPhysiPlanMaxRows = "MaxRows"; static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) { const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj; @@ -2114,6 +2115,9 @@ static int32_t 
physiSortNodeToJson(const void* pObj, SJson* pJson) { if (TSDB_CODE_SUCCESS == code) { code = nodeListToJson(pJson, jkSortPhysiPlanTargets, pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonAddIntegerToObject(pJson, jkSortPhysiPlanMaxRows, pNode->maxRows); + } return code; } @@ -2131,6 +2135,9 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) { if (TSDB_CODE_SUCCESS == code) { code = jsonToNodeList(pJson, jkSortPhysiPlanTargets, &pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = tjsonGetBigIntValue(pJson, jkSortPhysiPlanMaxRows, &pNode->maxRows); + } return code; } diff --git a/source/libs/nodes/src/nodesMsgFuncs.c b/source/libs/nodes/src/nodesMsgFuncs.c index 1ca37defa4..e79a520615 100644 --- a/source/libs/nodes/src/nodesMsgFuncs.c +++ b/source/libs/nodes/src/nodesMsgFuncs.c @@ -2594,7 +2594,7 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) { return code; } -enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS }; +enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS, PHY_SORT_CODE_MAX_ROWS }; static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj; @@ -2609,6 +2609,9 @@ static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) { if (TSDB_CODE_SUCCESS == code) { code = tlvEncodeObj(pEncoder, PHY_SORT_CODE_TARGETS, nodeListToMsg, pNode->pTargets); } + if (TSDB_CODE_SUCCESS == code) { + code = tlvEncodeI64(pEncoder, PHY_SORT_CODE_MAX_ROWS, pNode->maxRows); + } return code; } @@ -2632,6 +2635,9 @@ static int32_t msgToPhysiSortNode(STlvDecoder* pDecoder, void* pObj) { case PHY_SORT_CODE_TARGETS: code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets); break; + case PHY_SORT_CODE_MAX_ROWS: + code = tlvDecodeI64(pTlv, &pNode->maxRows); + break; default: break; } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 713f12e229..4a8d100db3 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -1027,6 +1027,7 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect return TSDB_CODE_OUT_OF_MEMORY; } + pSort->maxRows = -1; pSort->groupSort = pSelect->groupSort; pSort->node.groupAction = pSort->groupSort ? 
GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR; pSort->node.requireDataOrder = DATA_ORDER_LEVEL_NONE; @@ -1298,6 +1299,7 @@ static int32_t createSetOpSortLogicNode(SLogicPlanContext* pCxt, SSetOperator* p return TSDB_CODE_OUT_OF_MEMORY; } + pSort->maxRows = -1; TSWAP(pSort->node.pLimit, pSetOperator->pLimit); int32_t code = TSDB_CODE_SUCCESS; diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 2d1a758f33..82d883714d 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -123,7 +123,7 @@ static void optSetParentOrder(SLogicNode* pNode, EOrder order, SLogicNode* pNode pNode->inputTsOrder = order; switch (nodeType(pNode)) { // for those nodes that will change the order, stop propagating - //case QUERY_NODE_LOGIC_PLAN_WINDOW: + // case QUERY_NODE_LOGIC_PLAN_WINDOW: case QUERY_NODE_LOGIC_PLAN_JOIN: case QUERY_NODE_LOGIC_PLAN_AGG: case QUERY_NODE_LOGIC_PLAN_SORT: @@ -769,8 +769,9 @@ static bool pushDownCondOptIsColEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond) } SColumnNode* pLeft = (SColumnNode*)(pOper->pLeft); SColumnNode* pRight = (SColumnNode*)(pOper->pRight); - //TODO: add cast to operator and remove this restriction of optimization - if (pLeft->node.resType.type != pRight->node.resType.type || pLeft->node.resType.bytes != pRight->node.resType.bytes) { + // TODO: add cast to operator and remove this restriction of optimization + if (pLeft->node.resType.type != pRight->node.resType.type || + pLeft->node.resType.bytes != pRight->node.resType.bytes) { return false; } SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets; @@ -2575,7 +2576,7 @@ static void tagScanOptCloneAncestorSlimit(SLogicNode* pTableScanNode) { SLogicNode* pNode = tagScanOptFindAncestorWithSlimit(pTableScanNode); if (NULL != pNode) { - //TODO: only set the slimit now. push down slimit later + // TODO: only set the slimit now. 
push down slimit later pTableScanNode->pSlimit = nodesCloneNode(pNode->pSlimit); ((SLimitNode*)pTableScanNode->pSlimit)->limit += ((SLimitNode*)pTableScanNode->pSlimit)->offset; ((SLimitNode*)pTableScanNode->pSlimit)->offset = 0; @@ -2629,8 +2630,16 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp } static bool pushDownLimitOptShouldBeOptimized(SLogicNode* pNode) { - if (NULL == pNode->pLimit || 1 != LIST_LENGTH(pNode->pChildren) || - QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) { + if (NULL == pNode->pLimit || 1 != LIST_LENGTH(pNode->pChildren)) { + return false; + } + + SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0); + if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) { + SLimitNode* pChildLimit = (SLimitNode*)(pChild->pLimit); + // if we have pushed down, we skip it + if ((*(SSortLogicNode*)pChild).maxRows != -1) return false; + } else if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild)) { return false; } return true; @@ -2644,8 +2653,18 @@ static int32_t pushDownLimitOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0); nodesDestroyNode(pChild->pLimit); - pChild->pLimit = pNode->pLimit; - pNode->pLimit = NULL; + if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) { + SLimitNode* pLimitNode = (SLimitNode*)pNode->pLimit; + int64_t maxRows = -1; + if (pLimitNode->limit != -1) { + maxRows = pLimitNode->limit; + if (pLimitNode->offset != -1) maxRows += pLimitNode->offset; + } + ((SSortLogicNode*)pChild)->maxRows = maxRows; + } else { + pChild->pLimit = pNode->pLimit; + pNode->pLimit = NULL; + } pCxt->optimized = true; return TSDB_CODE_SUCCESS; @@ -2898,7 +2917,7 @@ static SSortLogicNode* sortNonPriKeySatisfied(SLogicNode* pNode) { if (sortPriKeyOptIsPriKeyOrderBy(pSort->pSortKeys)) { return NULL; } - SNode* pSortKeyNode = NULL, *pSortKeyExpr = NULL; + SNode *pSortKeyNode = NULL, *pSortKeyExpr = NULL; FOREACH(pSortKeyNode, pSort->pSortKeys) { pSortKeyExpr = ((SOrderByExprNode*)pSortKeyNode)->pExpr; switch (nodeType(pSortKeyExpr)) { @@ -2931,7 +2950,7 @@ static int32_t sortNonPriKeyOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog optFindEligibleNode(pLogicSubplan->pNode, sortNonPriKeyShouldOptimize, pNodeList); SNode* pNode = NULL; FOREACH(pNode, pNodeList) { - SSortLogicNode* pSort = (SSortLogicNode*)pNode; + SSortLogicNode* pSort = (SSortLogicNode*)pNode; SOrderByExprNode* pOrderByExpr = (SOrderByExprNode*)nodesListGetNode(pSort->pSortKeys, 0); pSort->node.outputTsOrder = pOrderByExpr->order; optSetParentOrder(pSort->node.pParent, pOrderByExpr->order, NULL); diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index b3d94a5e47..a349e2c0e9 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -1374,6 +1374,7 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren if (NULL == pSort) { return TSDB_CODE_OUT_OF_MEMORY; } + pSort->maxRows = pSortLogicNode->maxRows; SNodeList* pPrecalcExprs = NULL; SNodeList* pSortKeys = NULL; diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 246ee13fb0..f352a2bba3 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -1018,6 +1018,7 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOut splSetParent((SLogicNode*)pPartSort); pPartSort->pSortKeys = 
pSortKeys; pPartSort->groupSort = pSort->groupSort; + pPartSort->maxRows = pSort->maxRows; code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys); } diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index 794d80bbfa..92f34db16d 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -482,6 +482,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p if (syncLogStorePersist(pLogStore, pNode, pEntry) < 0) { sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId, terrstr(), pEntry->index); + taosMsleep(1); goto _out; } ASSERT(pEntry->index == pBuf->matchIndex); diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index cf796c3862..ae1c775a18 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -364,10 +364,10 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64 if (timeDiff > SYNC_HEARTBEAT_SLOW_MS) { pSyncNode->hbSlowNum++; - sNInfo(pSyncNode, - "recv sync-heartbeat from dnode:%d slow {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64 - ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64, - DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff); + sNTrace(pSyncNode, + "recv sync-heartbeat from dnode:%d slow {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64 + ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64, + DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff); } sNTrace(pSyncNode, diff --git a/source/util/src/theap.c b/source/util/src/theap.c index 8c1a1db057..d60606008f 100644 --- a/source/util/src/theap.c +++ b/source/util/src/theap.c @@ -187,3 +187,172 @@ void heapRemove(Heap* heap, HeapNode* node) { } void heapDequeue(Heap* heap) { heapRemove(heap, heap->min); } + + +struct PriorityQueue { + SArray* container; + pq_comp_fn fn; + FDelete deleteFn; + void* param; +}; +PriorityQueue* createPriorityQueue(pq_comp_fn fn, FDelete deleteFn, void* param) { + PriorityQueue* pq = (PriorityQueue*)taosMemoryCalloc(1, sizeof(PriorityQueue)); + pq->container = taosArrayInit(1, sizeof(PriorityQueueNode)); + pq->fn = fn; + pq->deleteFn = deleteFn; + pq->param = param; + return pq; +} + +void taosPQSetFn(PriorityQueue* pq, pq_comp_fn fn) { + pq->fn = fn; +} + +void destroyPriorityQueue(PriorityQueue* pq) { + if (pq->deleteFn) + taosArrayDestroyP(pq->container, pq->deleteFn); + else + taosArrayDestroy(pq->container); + taosMemoryFree(pq); +} + +static size_t pqParent(size_t i) { return (--i) >> 1; /* (i - 1) / 2 */ } +static size_t pqLeft(size_t i) { return (i << 1) | 1; /* i * 2 + 1 */ } +static size_t pqRight(size_t i) { return (++i) << 1; /* (i + 1) * 2 */} +static void pqSwapPQNode(PriorityQueueNode* a, PriorityQueueNode* b) { + void * tmp = a->data; + a->data = b->data; + b->data = tmp; +} + +#define pqContainerGetEle(pq, i) ((PriorityQueueNode*)taosArrayGet((pq)->container, (i))) +#define pqContainerSize(pq) (taosArrayGetSize((pq)->container)) + +size_t taosPQSize(PriorityQueue* pq) { return pqContainerSize(pq); } + +static void pqHeapify(PriorityQueue* pq, size_t from, size_t last) { + size_t largest = from; + do { + from = largest; + size_t l = pqLeft(from); + size_t r = pqRight(from); + if (l < last && pq->fn(pqContainerGetEle(pq, from)->data, pqContainerGetEle(pq, l)->data, pq->param)) { + largest = l; + } + if 
(r < last && pq->fn(pqContainerGetEle(pq, largest)->data, pqContainerGetEle(pq, r)->data, pq->param)) { + largest = r; + } + if (largest != from) { + pqSwapPQNode(pqContainerGetEle(pq, from), pqContainerGetEle(pq, largest)); + } + } while (largest != from); +} + +static void pqBuildHeap(PriorityQueue* pq) { + if (pqContainerSize(pq) > 1) { + for (size_t i = pqContainerSize(pq) - 1; i > 0; --i) { + pqHeapify(pq, i, pqContainerSize(pq)); + } + pqHeapify(pq, 0, pqContainerSize(pq)); + } +} + +static void pqReverseHeapify(PriorityQueue* pq, size_t i) { + while (i > 0 && !pq->fn(pqContainerGetEle(pq, i)->data, pqContainerGetEle(pq, pqParent(i))->data, pq->param)) { + size_t parentIdx = pqParent(i); + pqSwapPQNode(pqContainerGetEle(pq, i), pqContainerGetEle(pq, parentIdx)); + i = parentIdx; + } +} + +static void pqUpdate(PriorityQueue* pq, size_t i) { + if (i == 0 || pq->fn(pqContainerGetEle(pq, i)->data, pqContainerGetEle(pq, pqParent(i))->data, pq->param)) { + // if value in pos i is smaller than parent, heapify down from i to the end + pqHeapify(pq, i, pqContainerSize(pq)); + } else { + // if value in pos i is big than parent, heapify up from i + pqReverseHeapify(pq, i); + } +} + +static void pqRemove(PriorityQueue* pq, size_t i) { + if (i == pqContainerSize(pq) - 1) { + taosArrayPop(pq->container); + return; + } + + taosArraySet(pq->container, i, taosArrayGet(pq->container, pqContainerSize(pq) - 1)); + taosArrayPop(pq->container); + pqUpdate(pq, i); +} + +PriorityQueueNode* taosPQTop(PriorityQueue* pq) { + return pqContainerGetEle(pq, 0); +} + +void taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node) { + taosArrayPush(pq->container, node); + pqReverseHeapify(pq, pqContainerSize(pq) - 1); +} + +void taosPQPop(PriorityQueue* pq) { + PriorityQueueNode* top = taosPQTop(pq); + if (pq->deleteFn) pq->deleteFn(top->data); + pqRemove(pq, 0); +} + +struct BoundedQueue { + PriorityQueue* queue; + uint32_t maxSize; +}; + +BoundedQueue* createBoundedQueue(uint32_t maxSize, pq_comp_fn fn, FDelete deleteFn, void* param) { + BoundedQueue* q = (BoundedQueue*)taosMemoryCalloc(1, sizeof(BoundedQueue)); + q->queue = createPriorityQueue(fn, deleteFn, param); + taosArrayEnsureCap(q->queue->container, maxSize + 1); + q->maxSize = maxSize; + return q; +} + +void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn) { + taosPQSetFn(q->queue, fn); +} + +void destroyBoundedQueue(BoundedQueue* q) { + if (!q) return; + destroyPriorityQueue(q->queue); + taosMemoryFree(q); +} + +void taosBQPush(BoundedQueue* q, PriorityQueueNode* n) { + if (pqContainerSize(q->queue) == q->maxSize + 1) { + PriorityQueueNode* top = pqContainerGetEle(q->queue, 0); + void *p = top->data; + top->data = n->data; + n->data = p; + if (q->queue->deleteFn) q->queue->deleteFn(n->data); + pqHeapify(q->queue, 0, taosBQSize(q)); + } else { + taosPQPush(q->queue, n); + } +} + +PriorityQueueNode* taosBQTop(BoundedQueue* q) { + return taosPQTop(q->queue); +} + +void taosBQBuildHeap(BoundedQueue *q) { + pqBuildHeap(q->queue); +} + +size_t taosBQMaxSize(BoundedQueue* q) { + return q->maxSize; +} + +size_t taosBQSize(BoundedQueue* q) { + return taosPQSize(q->queue); +} + +void taosBQPop(BoundedQueue* q) { + taosPQPop(q->queue); +} diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index f561bd4ed7..90a7f3fe42 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -33,6 +33,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb3.py ,,y,system-test,./pytest.sh python3 ./test.py -f 
7-tmq/subscribeDb0.py -N 3 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ins_topics_test.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py ,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py diff --git a/tests/script/tsim/parser/limit1_stb.sim b/tests/script/tsim/parser/limit1_stb.sim index 731a218de5..027a4f5c79 100644 --- a/tests/script/tsim/parser/limit1_stb.sim +++ b/tests/script/tsim/parser/limit1_stb.sim @@ -468,7 +468,7 @@ if $data01 != 1 then endi ## supertable aggregation + where + interval + group by order by tag + limit offset -sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9),t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc limit 2 offset 0 +sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9),t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc, max(c1) asc limit 2 offset 0 if $rows != 2 then return -1 endi diff --git a/tests/script/tsim/parser/limit_stb.sim b/tests/script/tsim/parser/limit_stb.sim index 6950df9ee1..46bd6260c3 100644 --- a/tests/script/tsim/parser/limit_stb.sim +++ b/tests/script/tsim/parser/limit_stb.sim @@ -508,7 +508,7 @@ endi ### supertable aggregation + where + interval + group by order by tag + limit offset ## TBASE-345 -sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9), t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc limit 3 offset 0 +sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9), t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc, max(c1) asc limit 3 offset 0 if $rows != 3 then return -1 endi @@ -554,7 +554,7 @@ if $data09 != 4 then return -1 endi -sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9), t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 8 and c1 > 0 and c2 < 9 and c3 > 4 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc limit 3 offset 0 +sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9), t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 8 and c1 > 0 and c2 < 9 and c3 > 4 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc, max(c1) asc limit 3 offset 0 if $rows != 3 then return -1 endi diff --git a/tests/script/tsim/parser/union.sim b/tests/script/tsim/parser/union.sim index dee5da96e8..f0c534ad11 100644 --- a/tests/script/tsim/parser/union.sim +++ b/tests/script/tsim/parser/union.sim @@ -126,7 +126,6 @@ endi if $data10 != 1 then return -1 endi - sql (select 'ab' as options from union_tb1 limit 1) union all (select 'dd' as options from union_tb0 limit 1) order by options; if $rows != 2 then return -1 diff --git a/tests/script/tsim/query/r/explain_tsorder.result b/tests/script/tsim/query/r/explain_tsorder.result index 6c63a343de..b69a77ada5 100644 --- a/tests/script/tsim/query/r/explain_tsorder.result +++ b/tests/script/tsim/query/r/explain_tsorder.result @@ 
-2558,3 +2558,243 @@ taos> select a.ts, a.c2, b.c2 from meters as a join (select * from meters order 2022-05-24 00:01:08.000 | 210 | 210 | 2022-05-24 00:01:08.000 | 210 | 210 | +taos> select ts, c2 from meters order by c2; + ts | c2 | +======================================== + 2022-05-21 00:01:08.000 | 11 | + 2022-05-21 00:01:08.000 | 11 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-23 00:01:08.000 | 116 | + 2022-05-23 00:01:08.000 | 116 | + 2022-05-20 00:01:08.000 | 120 | + 2022-05-20 00:01:08.000 | 120 | + 2022-05-16 00:01:08.000 | 136 | + 2022-05-16 00:01:08.000 | 136 | + 2022-05-22 00:01:08.000 | 196 | + 2022-05-22 00:01:08.000 | 196 | + 2022-05-24 00:01:08.000 | 210 | + 2022-05-24 00:01:08.000 | 210 | + 2022-05-15 00:01:08.000 | 234 | + 2022-05-15 00:01:08.000 | 234 | + 2022-05-19 00:01:08.000 | 243 | + 2022-05-19 00:01:08.000 | 243 | + +taos> select ts, c2 from meters order by c2 limit 4; + ts | c2 | +======================================== + 2022-05-21 00:01:08.000 | 11 | + 2022-05-21 00:01:08.000 | 11 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-18 00:01:08.000 | 58 | + +taos> select ts, c2 from meters order by c2 limit 2,2; + ts | c2 | +======================================== + 2022-05-18 00:01:08.000 | 58 | + 2022-05-18 00:01:08.000 | 58 | + +taos> select ts, c2 from meters order by ts asc, c2 desc limit 10; + ts | c2 | +======================================== + 2022-05-15 00:01:08.000 | 234 | + 2022-05-15 00:01:08.000 | 234 | + 2022-05-16 00:01:08.000 | 136 | + 2022-05-16 00:01:08.000 | 136 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-19 00:01:08.000 | 243 | + 2022-05-19 00:01:08.000 | 243 | + +taos> select ts, c2 from meters order by ts asc, c2 desc limit 5,5; + ts | c2 | +======================================== + 2022-05-17 00:01:08.000 | 59 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-19 00:01:08.000 | 243 | + 2022-05-19 00:01:08.000 | 243 | + +taos> select ts, c2 from d1 order by c2; + ts | c2 | +======================================== + 2022-05-21 00:01:08.000 | 11 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-23 00:01:08.000 | 116 | + 2022-05-20 00:01:08.000 | 120 | + 2022-05-16 00:01:08.000 | 136 | + 2022-05-22 00:01:08.000 | 196 | + 2022-05-24 00:01:08.000 | 210 | + 2022-05-15 00:01:08.000 | 234 | + 2022-05-19 00:01:08.000 | 243 | + +taos> select ts, c2 from d1 order by c2 limit 4; + ts | c2 | +======================================== + 2022-05-21 00:01:08.000 | 11 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-23 00:01:08.000 | 116 | + +taos> select ts, c2 from d1 order by c2 limit 2,2; + ts | c2 | +======================================== + 2022-05-17 00:01:08.000 | 59 | + 2022-05-23 00:01:08.000 | 116 | + +taos> select ts, c2 from d1 order by ts asc, c2 desc limit 10; + ts | c2 | +======================================== + 2022-05-15 00:01:08.000 | 234 | + 2022-05-16 00:01:08.000 | 136 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-19 00:01:08.000 | 243 | + 2022-05-20 00:01:08.000 | 120 | + 2022-05-21 00:01:08.000 | 11 | + 2022-05-22 00:01:08.000 | 196 | + 2022-05-23 00:01:08.000 | 116 | + 2022-05-24 00:01:08.000 | 210 | + +taos> select ts, c2 from d1 order by ts asc, c2 desc limit 5,5; + ts | c2 | +======================================== + 
2022-05-20 00:01:08.000 | 120 | + 2022-05-21 00:01:08.000 | 11 | + 2022-05-22 00:01:08.000 | 196 | + 2022-05-23 00:01:08.000 | 116 | + 2022-05-24 00:01:08.000 | 210 | + +taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc; + _wstart | d | avg(c) | +================================================================================ + 2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000 | + 2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000 | + 2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000 | + 2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000 | + 2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000 | + 2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000 | + 2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000 | + 2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.200000000 | + 2022-05-18 13:00:00.000 | 2022-05-18 14:25:00.000 | 169.000000000 | + 2022-05-15 15:00:00.000 | 2022-05-15 18:01:00.000 | 160.500000000 | + 2022-05-19 19:00:00.000 | 2022-05-19 19:13:00.000 | 144.600000000 | + 2022-05-15 20:00:00.000 | 2022-05-16 00:01:00.000 | 136.000000000 | + 2022-05-18 08:00:00.000 | 2022-05-18 09:37:00.000 | 132.000000000 | + 2022-05-16 01:00:00.000 | 2022-05-16 04:49:00.000 | 120.600000000 | + 2022-05-20 00:00:00.000 | 2022-05-20 00:01:00.000 | 120.000000000 | + 2022-05-16 06:00:00.000 | 2022-05-16 09:37:00.000 | 105.200000000 | + 2022-05-18 03:00:00.000 | 2022-05-18 04:49:00.000 | 95.000000000 | + 2022-05-20 05:00:00.000 | 2022-05-20 06:01:00.000 | 92.750000000 | + 2022-05-16 11:00:00.000 | 2022-05-16 14:25:00.000 | 89.800000000 | + 2022-05-16 16:00:00.000 | 2022-05-16 19:13:00.000 | 74.400000000 | + 2022-05-20 10:00:00.000 | 2022-05-20 12:01:00.000 | 65.500000000 | + 2022-05-16 21:00:00.000 | 2022-05-17 00:01:00.000 | 59.000000000 | + 2022-05-17 02:00:00.000 | 2022-05-17 04:49:00.000 | 58.800000000 | + 2022-05-17 07:00:00.000 | 2022-05-17 09:37:00.000 | 58.600000000 | + 2022-05-17 12:00:00.000 | 2022-05-17 14:25:00.000 | 58.400000000 | + 2022-05-17 17:00:00.000 | 2022-05-17 19:13:00.000 | 58.200000000 | + 2022-05-17 22:00:00.000 | 2022-05-18 00:01:00.000 | 58.000000000 | + 2022-05-20 15:00:00.000 | 2022-05-20 18:01:00.000 | 38.250000000 | + 2022-05-20 20:00:00.000 | 2022-05-21 00:01:00.000 | 11.000000000 | + +taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2; + _wstart | d | avg(c) | +================================================================================ + 2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000 | + 2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000 | + +taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2,6; + _wstart | d | avg(c) | +================================================================================ + 2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000 | + 2022-05-15 05:00:00.000 | 
2022-05-15 06:01:00.000 | 209.500000000 | + 2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000 | + 2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000 | + 2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000 | + 2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.200000000 | + +taos> select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 10; + last(ts) | d | +======================================== + 2022-05-19 00:01:08.000 | 243 | + 2022-05-15 00:01:08.000 | 234 | + 2022-05-24 00:01:08.000 | 210 | + 2022-05-22 00:01:08.000 | 196 | + 2022-05-16 00:01:08.000 | 136 | + 2022-05-20 00:01:08.000 | 120 | + 2022-05-23 00:01:08.000 | 116 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-21 00:01:08.000 | 11 | + +taos> select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 2,8; + last(ts) | d | +======================================== + 2022-05-24 00:01:08.000 | 210 | + 2022-05-22 00:01:08.000 | 196 | + 2022-05-16 00:01:08.000 | 136 | + 2022-05-20 00:01:08.000 | 120 | + 2022-05-23 00:01:08.000 | 116 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-21 00:01:08.000 | 11 | + +taos> select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 9,1; + last(ts) | d | +======================================== + 2022-05-21 00:01:08.000 | 11 | + +taos> select last(ts), c2 as d from d1 group by c2 order by c2 asc limit 2,8; + last(ts) | d | +======================================== + 2022-05-17 00:01:08.000 | 59 | + 2022-05-23 00:01:08.000 | 116 | + 2022-05-20 00:01:08.000 | 120 | + 2022-05-16 00:01:08.000 | 136 | + 2022-05-22 00:01:08.000 | 196 | + 2022-05-24 00:01:08.000 | 210 | + 2022-05-15 00:01:08.000 | 234 | + 2022-05-19 00:01:08.000 | 243 | + +taos> select last(ts), c2 as d from d1 group by c2 order by c2 asc limit 9,1; + last(ts) | d | +======================================== + 2022-05-19 00:01:08.000 | 243 | + +taos> select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 10; + ts | d | +======================================== + 2022-05-24 00:01:08.000 | 210 | + 2022-05-23 00:01:08.000 | 116 | + 2022-05-22 00:01:08.000 | 196 | + 2022-05-21 00:01:08.000 | 11 | + 2022-05-20 00:01:08.000 | 120 | + 2022-05-19 00:01:08.000 | 243 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-16 00:01:08.000 | 136 | + 2022-05-15 00:01:08.000 | 234 | + +taos> select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 2,8; + ts | d | +======================================== + 2022-05-22 00:01:08.000 | 196 | + 2022-05-21 00:01:08.000 | 11 | + 2022-05-20 00:01:08.000 | 120 | + 2022-05-19 00:01:08.000 | 243 | + 2022-05-18 00:01:08.000 | 58 | + 2022-05-17 00:01:08.000 | 59 | + 2022-05-16 00:01:08.000 | 136 | + 2022-05-15 00:01:08.000 | 234 | + +taos> select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 9,1; + ts | d | +======================================== + 2022-05-15 00:01:08.000 | 234 | + diff --git a/tests/script/tsim/query/t/explain_tsorder.sql b/tests/script/tsim/query/t/explain_tsorder.sql index d3264d8895..056ac440fe 100644 --- a/tests/script/tsim/query/t/explain_tsorder.sql +++ b/tests/script/tsim/query/t/explain_tsorder.sql @@ -71,3 +71,30 @@ select a.ts, a.c2, b.c2 from meters as a join meters as b on a.ts = b.ts order b explain verbose true select a.ts, a.c2, b.c2 from meters as a join (select ts, c2 from meters order by ts desc) b on a.ts = b.ts order by a.ts desc\G; 
explain verbose true select a.ts, a.c2, b.c2 from meters as a join (select ts, c2 from meters order by ts desc) b on a.ts = b.ts order by a.ts asc\G; select a.ts, a.c2, b.c2 from meters as a join (select * from meters order by ts desc) b on a.ts = b.ts order by a.ts asc; + +select ts, c2 from meters order by c2; +select ts, c2 from meters order by c2 limit 4; +select ts, c2 from meters order by c2 limit 2,2; + +select ts, c2 from meters order by ts asc, c2 desc limit 10; +select ts, c2 from meters order by ts asc, c2 desc limit 5,5; + +select ts, c2 from d1 order by c2; +select ts, c2 from d1 order by c2 limit 4; +select ts, c2 from d1 order by c2 limit 2,2; + +select ts, c2 from d1 order by ts asc, c2 desc limit 10; +select ts, c2 from d1 order by ts asc, c2 desc limit 5,5; + +select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc; +select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2; +select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2,6; + +select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 10; +select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 2,8; +select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 9,1; +select last(ts), c2 as d from d1 group by c2 order by c2 asc limit 2,8; +select last(ts), c2 as d from d1 group by c2 order by c2 asc limit 9,1; +select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 10; +select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 2,8; +select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 9,1; diff --git a/tests/system-test/0-others/splitVGroup.py b/tests/system-test/0-others/splitVGroup.py index 32001f34b0..4509961066 100644 --- a/tests/system-test/0-others/splitVGroup.py +++ b/tests/system-test/0-others/splitVGroup.py @@ -328,9 +328,28 @@ class TDTestCase: tdLog.exit("split vgroup transaction is not finished after executing 50s") return False + # split error + def expectSplitError(self, dbName): + vgids = self.getVGroup(dbName) + selid = random.choice(vgids) + sql = f"split vgroup {selid}" + tdLog.info(sql) + tdSql.error(sql) + + # expect split ok + def expectSplitOk(self, dbName): + # split vgroup + vgList1 = self.getVGroup(dbName) + self.splitVGroup(dbName) + vgList2 = self.getVGroup(dbName) + vgNum1 = len(vgList1) + 1 + vgNum2 = len(vgList2) + if vgNum1 != vgNum2: + tdLog.exit(f" vglist len={vgNum1} is not same for expect {vgNum2}") + return + # split empty database - def splitEmptyDB(self): - + def splitEmptyDB(self): dbName = "emptydb" vgNum = 2 # create database @@ -339,17 +358,33 @@ class TDTestCase: tdSql.execute(sql) # split vgroup - self.splitVGroup(dbName) - vgList = self.getVGroup(dbName) - vgNum1 = len(vgList) - vgNum2 = vgNum + 1 - if vgNum1 != vgNum2: - tdLog.exit(f" vglist len={vgNum1} is not same for expect {vgNum2}") - return + self.expectSplitOk(dbName) + + + # forbid + def checkForbid(self): + # 
stream + tdLog.info("check forbid split having stream...") + tdSql.execute("create database streamdb;") + tdSql.execute("use streamdb;") + tdSql.execute("create table ta(ts timestamp, age int);") + tdSql.execute("create stream ma into sta as select count(*) from ta interval(1s);") + self.expectSplitError("streamdb") + tdSql.execute("drop stream ma;") + self.expectSplitOk("streamdb") + + # topic + tdLog.info("check forbid split having topic...") + tdSql.execute("create database topicdb wal_retention_period 10;") + tdSql.execute("use topicdb;") + tdSql.execute("create table ta(ts timestamp, age int);") + tdSql.execute("create topic toa as select * from ta;") + self.expectSplitError("topicdb") + tdSql.execute("drop topic toa;") + self.expectSplitOk("topicdb") # run def run(self): - # prepare env self.prepareEnv() @@ -360,12 +395,13 @@ class TDTestCase: # check two db query result same self.checkResult() - tdLog.info(f"split vgroup i={i} passed.") # split empty db - self.splitEmptyDB() + self.splitEmptyDB() + # check topic and stream forib + self.checkForbid() # stop def stop(self): diff --git a/tests/system-test/2-query/limit.py b/tests/system-test/2-query/limit.py index c00e3b7d56..4774602d69 100644 --- a/tests/system-test/2-query/limit.py +++ b/tests/system-test/2-query/limit.py @@ -321,7 +321,7 @@ class TDTestCase: limit = 5 offset = paraDict["rowsPerTbl"] * 2 offset = offset - 2 - sqlStr = f"select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 partition by t1 interval(5m) fill(value, -1, -2, -3, -4 ,-7 ,'-8', '-9') order by t1 limit %d offset %d"%(limit, offset) + sqlStr = f"select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 partition by t1 interval(5m) fill(value, -1, -2, -3, -4 ,-7 ,'-8', '-9') order by t1, max(c1) limit %d offset %d"%(limit, offset) # tdLog.info("====sql:%s"%(sqlStr)) tdSql.query(sqlStr) tdSql.checkRows(1) diff --git a/tests/system-test/eco-system/util/Consumer.py b/tests/system-test/eco-system/util/Consumer.py new file mode 100644 index 0000000000..b483253a95 --- /dev/null +++ b/tests/system-test/eco-system/util/Consumer.py @@ -0,0 +1,82 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. 
+# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +# +# The option for wal_retetion_period and wal_retention_size is work well +# + +import taos +from taos.tmq import Consumer + +import os +import sys +import threading +import json +import time +from datetime import date +from datetime import datetime +from datetime import timedelta +from os import path + + +# consume topic +def consume_topic(topic_name, consume_cnt, wait): + print("start consume...") + consumer = Consumer( + { + "group.id": "tg2", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "enable.auto.commit": "true", + } + ) + print("start subscrite...") + consumer.subscribe([topic_name]) + + cnt = 0 + try: + while True and cnt < consume_cnt: + res = consumer.poll(1) + if not res: + if wait: + continue + else: + break + err = res.error() + if err is not None: + raise err + val = res.value() + cnt += 1 + print(f" consume {cnt} ") + for block in val: + print(block.fetchall()) + finally: + consumer.unsubscribe() + consumer.close() + + +if __name__ == "__main__": + print(sys.argv) + if len(sys.argv) < 2: + + print(" please input topic name for consume . -c for wait") + else: + wait = False + if "-c" == sys.argv[1]: + wait = True + topic = sys.argv[2] + else: + topic = sys.argv[1] + + print(f' wait={wait} topic={topic}') + consume_topic(topic, 10000000, wait) \ No newline at end of file diff --git a/tests/system-test/eco-system/util/restartDnodes.py b/tests/system-test/eco-system/util/restartDnodes.py new file mode 100644 index 0000000000..feee260fdf --- /dev/null +++ b/tests/system-test/eco-system/util/restartDnodes.py @@ -0,0 +1,84 @@ +import time +import os +import subprocess +import random +import platform + +class dnode(): + def __init__(self, pid, path): + self.pid = pid + self.path = path + +# run exePath no wait finished +def runNoWait(exePath): + if platform.system().lower() == 'windows': + cmd = f"mintty -h never {exePath}" + else: + cmd = f"nohup {exePath} > /dev/null 2>&1 & " + + if os.system(cmd) != 0: + return False + else: + return True + +# get online dnodes +def getDnodes(): + cmd = "ps aux | grep taosd | awk '{{print $2,$11,$12,$13}}'" + result = os.system(cmd) + result=subprocess.check_output(cmd,shell=True) + strout = result.decode('utf-8').split("\n") + dnodes = [] + + for line in strout: + cols = line.split(' ') + if len(cols) != 4: + continue + exepath = cols[1] + if len(exepath) < 5 : + continue + if exepath[-5:] != 'taosd': + continue + + # add to list + path = cols[1] + " " + cols[2] + " " + cols[3] + dnodes.append(dnode(cols[0], path)) + + print(" show dnodes cnt=%d...\n"%(len(dnodes))) + for dn in dnodes: + print(f" pid={dn.pid} path={dn.path}") + + return dnodes + +def restartDnodes(dnodes, cnt, seconds): + print(f"start dnode cnt={cnt} wait={seconds}s") + selects = random.sample(dnodes, cnt) + for select in selects: + print(f" kill -9 {select.pid}") + cmd = f"kill -9 {select.pid}" + os.system(cmd) + print(f" restart {select.path}") + if runNoWait(select.path) == False: + print(f"run {select.path} failed.") + raise Exception("exe failed.") + print(f" sleep {seconds}s ...") + time.sleep(seconds) + +def run(): + # kill seconds interval + killLoop = 10 + minKill = 1 + maxKill = 10 + for i in range(killLoop): + dnodes = getDnodes() + killCnt = 0 + if 
len(dnodes) > 0: + killCnt = random.randint(1, len(dnodes)) + restartDnodes(dnodes, killCnt, random.randint(1, 5)) + + seconds = random.randint(minKill, maxKill) + print(f"----------- kill loop i={i} killCnt={killCnt} done. do sleep {seconds}s ... \n") + time.sleep(seconds) + + +if __name__ == '__main__': + run() \ No newline at end of file
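
Editor's note, not part of the patch: for review convenience, the sketch below restates, under stated assumptions, the two decisions this patch adds around the new heap-sort path. The planner's limit pushdown rewrites a LIMIT/OFFSET sitting above a sort into SSortLogicNode.maxRows (limit plus offset when both are set), and tsort.c only takes the priority-queue path when all maxRows candidate tuples fit in the pqSortMemThreshold budget (default 16 MB). The function names deriveMaxRows and canUsePQSort and the standalone form are illustrative; the real checks live in pushDownLimitOptimize() and tsortIsPQSortApplicable().

#include <stdbool.h>
#include <stdint.h>

// maxRows as derived by pushDownLimitOptimize(); -1 means "no usable limit"
static int64_t deriveMaxRows(int64_t limit, int64_t offset) {
  if (limit == -1) return -1;
  return (offset != -1) ? limit + offset : limit;
}

// mirrors tsortIsPQSortApplicable(): every kept tuple costs its maximum length plus one
// pointer, and all maxRows of them must fit in the sort buffer budget (given in MB)
static bool canUsePQSort(int64_t maxRows, uint32_t maxTupleLength, uint32_t pqSortMemThresholdMB) {
  if (maxRows <= 0) return false;  // guard added for clarity in this sketch; the operator signals "no limit" with maxRows = -1
  uint64_t sortBufSize = (uint64_t)pqSortMemThresholdMB * 1024 * 1024;
  uint64_t rowsThatFit = sortBufSize / (maxTupleLength + sizeof(char*));
  return rowsThatFit > (uint64_t)maxRows;
}

// e.g. "ORDER BY c2 LIMIT 2,8" (offset 2, limit 8) with ~300-byte tuples:
//   deriveMaxRows(8, 2) == 10, canUsePQSort(10, 300, 16) == true
//   -> the heap-sort path keeps only 10 tuples instead of buffering the whole input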