Merge branch '3.0' into refact/fillhistory

This commit is contained in:
Haojun Liao 2023-06-28 13:48:26 +08:00 committed by GitHub
commit ed900d6ff6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
32 changed files with 1028 additions and 57 deletions

View File

@ -187,6 +187,9 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex);
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows);
int32_t colDataGetRowLength(const SColumnInfoData* pColumnInfoData, int32_t rowIdx);
void colDataTrim(SColumnInfoData* pColumnInfoData);
size_t blockDataGetNumOfCols(const SSDataBlock* pBlock);
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock);

View File

@ -184,6 +184,7 @@ extern int64_t tsStreamBufferSize;
extern int64_t tsCheckpointInterval;
extern bool tsFilterScalarMode;
extern int32_t tsMaxStreamBackendCache;
extern int32_t tsPQSortMemThreshold;
// #define NEEDTO_COMPRESSS_MSG(size) (tsCompressMsgSize != -1 && (size) > tsCompressMsgSize)

View File

@ -246,6 +246,7 @@ typedef struct SSortLogicNode {
SLogicNode node;
SNodeList* pSortKeys;
bool groupSort;
int64_t maxRows;
} SSortLogicNode;
typedef struct SPartitionLogicNode {
@ -523,6 +524,7 @@ typedef struct SSortPhysiNode {
SNodeList* pExprs; // these are expression list of order_by_clause and parameter expression of aggregate function
SNodeList* pSortKeys; // element is SOrderByExprNode, and SOrderByExprNode::pExpr is SColumnNode
SNodeList* pTargets;
int64_t maxRows;
} SSortPhysiNode;
typedef SSortPhysiNode SGroupSortPhysiNode;

View File

@ -17,6 +17,7 @@
#define _TD_UTIL_HEAP_H_
#include "os.h"
#include "tarray.h"
#ifdef __cplusplus
extern "C" {
@ -58,6 +59,48 @@ void heapDequeue(Heap* heap);
size_t heapSize(Heap* heap);
typedef bool (*pq_comp_fn)(void* l, void* r, void* param);
typedef struct PriorityQueueNode {
void* data;
} PriorityQueueNode;
typedef struct PriorityQueue PriorityQueue;
PriorityQueue* createPriorityQueue(pq_comp_fn fn, FDelete deleteFn, void* param);
void taosPQSetFn(PriorityQueue* pq, pq_comp_fn fn);
void destroyPriorityQueue(PriorityQueue* pq);
PriorityQueueNode* taosPQTop(PriorityQueue* pq);
size_t taosPQSize(PriorityQueue* pq);
void taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node);
void taosPQPop(PriorityQueue* pq);
typedef struct BoundedQueue BoundedQueue;
BoundedQueue* createBoundedQueue(uint32_t maxSize, pq_comp_fn fn, FDelete deleteFn, void* param);
void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn);
void destroyBoundedQueue(BoundedQueue* q);
void taosBQPush(BoundedQueue* q, PriorityQueueNode* n);
PriorityQueueNode* taosBQTop(BoundedQueue* q);
size_t taosBQSize(BoundedQueue* q);
size_t taosBQMaxSize(BoundedQueue* q);
void taosBQBuildHeap(BoundedQueue* q);
void taosBQPop(BoundedQueue* q);
#ifdef __cplusplus
}
#endif

View File

@ -47,7 +47,17 @@ int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRo
}
}
static int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
int32_t colDataGetRowLength(const SColumnInfoData* pColumnInfoData, int32_t rowIdx) {
if (colDataIsNull_s(pColumnInfoData, rowIdx)) return 0;
if (!IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) return pColumnInfoData->info.bytes;
if (pColumnInfoData->info.type == TSDB_DATA_TYPE_JSON)
return getJsonValueLen(colDataGetData(pColumnInfoData, rowIdx));
else
return varDataTLen(colDataGetData(pColumnInfoData, rowIdx));
}
int32_t colDataGetFullLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
return pColumnInfoData->varmeta.length + sizeof(int32_t) * numOfRows;
} else {

View File

@ -62,6 +62,7 @@ int32_t tsNumOfQnodeFetchThreads = 1;
int32_t tsNumOfSnodeStreamThreads = 4;
int32_t tsNumOfSnodeWriteThreads = 1;
int32_t tsMaxStreamBackendCache = 128; // M
int32_t tsPQSortMemThreshold = 16; // M
// sync raft
int32_t tsElectInterval = 25 * 1000;
@ -533,6 +534,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddBool(pCfg, "filterScalarMode", tsFilterScalarMode, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxStreamBackendCache", tsMaxStreamBackendCache, 16, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "pqSortMemThreshold", tsPQSortMemThreshold, 1, 10240, 0) != 0) return -1;
GRANT_CFG_ADD;
return 0;
@ -914,6 +916,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsFilterScalarMode = cfgGetItem(pCfg, "filterScalarMode")->bval;
tsMaxStreamBackendCache = cfgGetItem(pCfg, "maxStreamBackendCache")->i32;
tsPQSortMemThreshold = cfgGetItem(pCfg, "pqSortMemThreshold")->i32;
GRANT_CFG_GET;
return 0;

View File

@ -79,6 +79,7 @@ typedef struct SColMatchItem {
int32_t srcSlotId;
int32_t dstSlotId;
bool needOutput;
SDataType dataType;
} SColMatchItem;
typedef struct SColMatchInfo {

View File

@ -64,10 +64,14 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void*
/**
*
* @param type
* @param maxRows keep maxRows at most
* @param maxTupleLength max len of one tuple, for check if heap sort is applicable
* @param sortBufSize sort memory buf size, for check if heap sort is applicable
* @return
*/
SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
SSDataBlock* pBlock, const char* idstr);
SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength,
uint32_t sortBufSize);
/**
*

View File

@ -1305,6 +1305,7 @@ int32_t extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
c.colId = pColNode->colId;
c.srcSlotId = pColNode->slotId;
c.dstSlotId = pNode->slotId;
c.dataType = pColNode->node.resType;
taosArrayPush(pList, &c);
}
}

View File

@ -2884,7 +2884,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) {
pInfo->sortBufSize = pInfo->bufPageSize * (kWay + 1);
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pSortInputBlock, pTaskInfo->id.str);
pInfo->pSortInputBlock, pTaskInfo->id.str, 0, 0, 0);
tsortSetFetchRawDataFp(pInfo->pSortHandle, getTableDataBlockImpl, NULL, NULL);

View File

@ -29,6 +29,8 @@ typedef struct SSortOperatorInfo {
int64_t startTs; // sort start time
uint64_t sortElapsed; // sort elapsed time, time to flush to disk not included.
SLimitInfo limitInfo;
uint64_t maxTupleLength;
int64_t maxRows;
} SSortOperatorInfo;
static SSDataBlock* doSort(SOperatorInfo* pOperator);
@ -36,6 +38,7 @@ static int32_t doOpenSortOperator(SOperatorInfo* pOperator);
static int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t* len);
static void destroySortOperatorInfo(void* param);
static int32_t calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys);
// todo add limit/offset impl
SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo) {
@ -51,6 +54,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
int32_t numOfCols = 0;
pOperator->exprSupp.pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols);
pOperator->exprSupp.numOfExprs = numOfCols;
calcSortOperMaxTupleLength(pInfo, pSortNode->pSortKeys);
pInfo->maxRows = pSortNode->maxRows;
int32_t numOfOutputCols = 0;
int32_t code =
@ -193,9 +198,9 @@ int32_t doOpenSortOperator(SOperatorInfo* pOperator) {
}
pInfo->startTs = taosGetTimestampUs();
// pInfo->binfo.pRes is not equalled to the input datablock.
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str);
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str,
pInfo->maxRows, pInfo->maxTupleLength, tsPQSortMemThreshold * 1024 * 1024);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, applyScalarFunction, pOperator);
@ -286,6 +291,20 @@ int32_t getExplainExecInfo(SOperatorInfo* pOptr, void** pOptrExplain, uint32_t*
return TSDB_CODE_SUCCESS;
}
static int32_t calcSortOperMaxTupleLength(SSortOperatorInfo* pSortOperInfo, SNodeList* pSortKeys) {
SColMatchInfo* pColItem = &pSortOperInfo->matchInfo;
size_t size = taosArrayGetSize(pColItem->pList);
for (size_t i = 0; i < size; ++i) {
pSortOperInfo->maxTupleLength += ((SColMatchItem*)taosArrayGet(pColItem->pList, i))->dataType.bytes;
}
size = LIST_LENGTH(pSortKeys);
for (size_t i = 0; i < size; ++i) {
SOrderByExprNode* pOrderExprNode = (SOrderByExprNode*)nodesListGetNode(pSortKeys, i);
pSortOperInfo->maxTupleLength += ((SColumnNode*)pOrderExprNode->pExpr)->node.resType.bytes;
}
return TSDB_CODE_SUCCESS;
}
//=====================================================================================
// Group Sort Operator
typedef enum EChildOperatorStatus { CHILD_OP_NEW_GROUP, CHILD_OP_SAME_GROUP, CHILD_OP_FINISHED } EChildOperatorStatus;
@ -384,7 +403,7 @@ int32_t beginSortGroup(SOperatorInfo* pOperator) {
// pInfo->binfo.pRes is not equalled to the input datablock.
pInfo->pCurrSortHandle =
tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str);
tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, -1, -1, NULL, pTaskInfo->id.str, 0, 0, 0);
tsortSetFetchRawDataFp(pInfo->pCurrSortHandle, fetchNextGroupSortDataBlock, applyScalarFunction, pOperator);
@ -582,7 +601,7 @@ int32_t openMultiwayMergeOperator(SOperatorInfo* pOperator) {
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, numOfBufPage,
pInfo->pInputBlock, pTaskInfo->id.str);
pInfo->pInputBlock, pTaskInfo->id.str, 0, 0, 0);
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
tsortSetCompareGroupId(pInfo->pSortHandle, pInfo->groupSort);

View File

@ -19,6 +19,7 @@
#include "tcompare.h"
#include "tdatablock.h"
#include "tdef.h"
#include "theap.h"
#include "tlosertree.h"
#include "tpagedbuf.h"
#include "tsort.h"
@ -41,6 +42,12 @@ struct SSortHandle {
int64_t startTs;
uint64_t totalElapsed;
uint64_t maxRows;
uint32_t maxTupleLength;
uint32_t sortBufSize;
BoundedQueue* pBoundedQueue;
uint32_t tmpRowIdx;
int32_t sourceId;
SSDataBlock* pDataBlock;
SMsortComparParam cmpParam;
@ -61,6 +68,47 @@ struct SSortHandle {
static int32_t msortComparFn(const void* pLeft, const void* pRight, void* param);
// | offset[0] | offset[1] |....| nullbitmap | data |...|
static void* createTuple(uint32_t columnNum, uint32_t tupleLen) {
uint32_t totalLen = sizeof(uint32_t) * columnNum + BitmapLen(columnNum) + tupleLen;
return taosMemoryCalloc(1, totalLen);
}
static void destoryTuple(void* t) { taosMemoryFree(t); }
#define tupleOffset(tuple, colIdx) ((uint32_t*)(tuple + sizeof(uint32_t) * colIdx))
#define tupleSetOffset(tuple, colIdx, offset) (*tupleOffset(tuple, colIdx) = offset)
#define tupleSetNull(tuple, colIdx, colNum) colDataSetNull_f((char*)tuple + sizeof(uint32_t) * colNum, colIdx)
#define tupleColIsNull(tuple, colIdx, colNum) colDataIsNull_f((char*)tuple + sizeof(uint32_t) * colNum, colIdx)
#define tupleGetDataStartOffset(colNum) (sizeof(uint32_t) * colNum + BitmapLen(colNum))
#define tupleSetData(tuple, offset, data, length) memcpy(tuple + offset, data, length)
/**
* @param t the tuple pointer addr, if realloced, *t is changed to the new addr
* @param offset copy data into pTuple start from offset
* @param colIndex the columnIndex, for setting null bitmap
* @return the next offset to add field
* */
static inline size_t tupleAddField(char** t, uint32_t colNum, uint32_t offset, uint32_t colIdx, void* data, size_t length,
bool isNull, uint32_t tupleLen) {
tupleSetOffset(*t, colIdx, offset);
if (isNull) {
tupleSetNull(*t, colIdx, colNum);
} else {
if (offset + length > tupleLen + tupleGetDataStartOffset(colNum)) {
*t = taosMemoryRealloc(*t, offset + length);
}
tupleSetData(*t, offset, data, length);
}
return offset + length;
}
static void* tupleGetField(char* t, uint32_t colIdx, uint32_t colNum) {
if (tupleColIsNull(t, colIdx, colNum)) return NULL;
return t + *tupleOffset(t, colIdx);
}
static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* param);
SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) {
return createOneDataBlock(pSortHandle->pDataBlock, false);
}
@ -71,7 +119,8 @@ SSDataBlock* tsortGetSortedDataBlock(const SSortHandle* pSortHandle) {
* @return
*/
SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages,
SSDataBlock* pBlock, const char* idstr) {
SSDataBlock* pBlock, const char* idstr, uint64_t maxRows, uint32_t maxTupleLength,
uint32_t sortBufSize) {
SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle));
pSortHandle->type = type;
@ -80,6 +129,13 @@ SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t page
pSortHandle->pSortInfo = pSortInfo;
pSortHandle->loops = 0;
pSortHandle->maxTupleLength = maxTupleLength;
if (maxRows < 0)
pSortHandle->sortBufSize = 0;
else
pSortHandle->sortBufSize = sortBufSize;
pSortHandle->maxRows = maxRows;
if (pBlock != NULL) {
pSortHandle->pDataBlock = createOneDataBlock(pBlock, false);
}
@ -150,7 +206,6 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
if (pSortHandle == NULL) {
return;
}
tsortClose(pSortHandle);
if (pSortHandle->pMergeTree != NULL) {
tMergeTreeDestroy(&pSortHandle->pMergeTree);
@ -159,6 +214,7 @@ void tsortDestroySortHandle(SSortHandle* pSortHandle) {
destroyDiskbasedBuf(pSortHandle->pBuf);
taosMemoryFreeClear(pSortHandle->idStr);
blockDataDestroy(pSortHandle->pDataBlock);
if (pSortHandle->pBoundedQueue) destroyBoundedQueue(pSortHandle->pBoundedQueue);
int64_t fetchUs = 0, fetchNum = 0;
tsortClearOrderdSource(pSortHandle->pOrderedSource, &fetchUs, &fetchNum);
@ -769,17 +825,7 @@ static int32_t createInitialSources(SSortHandle* pHandle) {
return code;
}
int32_t tsortOpen(SSortHandle* pHandle) {
if (pHandle->opened) {
return 0;
}
if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
return -1;
}
pHandle->opened = true;
static bool tsortOpenForBufMergeSort(SSortHandle* pHandle) {
int32_t code = createInitialSources(pHandle);
if (code != TSDB_CODE_SUCCESS) {
return code;
@ -840,7 +886,7 @@ int32_t tsortSetCompareGroupId(SSortHandle* pHandle, bool compareGroupId) {
return TSDB_CODE_SUCCESS;
}
STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
static STupleHandle* tsortBufMergeSortNextTuple(SSortHandle* pHandle) {
if (tsortIsClosed(pHandle)) {
return NULL;
}
@ -890,6 +936,168 @@ STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
return &pHandle->tupleHandle;
}
static bool tsortIsPQSortApplicable(SSortHandle* pHandle) {
if (pHandle->type != SORT_SINGLESOURCE_SORT) return false;
uint64_t maxRowsFitInMemory = pHandle->sortBufSize / (pHandle->maxTupleLength + sizeof(char*));
return maxRowsFitInMemory > pHandle->maxRows;
}
static bool tsortPQCompFn(void* a, void* b, void* param) {
SSortHandle* pHandle = param;
int32_t res = pHandle->comparFn(a, b, param);
if (res < 0) return 1;
return 0;
}
static bool tsortPQComFnReverse(void*a, void* b, void* param) {
SSortHandle* pHandle = param;
int32_t res = pHandle->comparFn(a, b, param);
if (res > 0) return 1;
return 0;
}
static int32_t colDataComparFn(const void* pLeft, const void* pRight, void* param) {
char* pLTuple = (char*)pLeft;
char* pRTuple = (char*)pRight;
SSortHandle* pHandle = (SSortHandle*)param;
SArray* orderInfo = (SArray*)pHandle->pSortInfo;
uint32_t colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
for (int32_t i = 0; i < orderInfo->size; ++i) {
SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(orderInfo, i);
void *lData = tupleGetField(pLTuple, pOrder->slotId, colNum);
void *rData = tupleGetField(pRTuple, pOrder->slotId, colNum);
if (!lData && !rData) continue;
if (!lData) return pOrder->nullFirst ? -1 : 1;
if (!rData) return pOrder->nullFirst ? 1 : -1;
int type = ((SColumnInfoData*)taosArrayGet(pHandle->pDataBlock->pDataBlock, pOrder->slotId))->info.type;
__compar_fn_t fn = getKeyComparFunc(type, pOrder->order);
int ret = fn(lData, rData);
if (ret == 0) {
continue;
} else {
return ret;
}
}
return 0;
}
static int32_t tsortOpenForPQSort(SSortHandle* pHandle) {
pHandle->pBoundedQueue = createBoundedQueue(pHandle->maxRows, tsortPQCompFn, destoryTuple, pHandle);
if (NULL == pHandle->pBoundedQueue) return TSDB_CODE_OUT_OF_MEMORY;
tsortSetComparFp(pHandle, colDataComparFn);
SSortSource** pSource = taosArrayGet(pHandle->pOrderedSource, 0);
SSortSource* source = *pSource;
pHandle->pDataBlock = NULL;
uint32_t tupleLen = 0;
PriorityQueueNode pqNode;
while (1) {
// fetch data
SSDataBlock* pBlock = pHandle->fetchfp(source->param);
if (NULL == pBlock) break;
if (pHandle->beforeFp != NULL) {
pHandle->beforeFp(pBlock, pHandle->param);
}
if (pHandle->pDataBlock == NULL) {
pHandle->pDataBlock = createOneDataBlock(pBlock, false);
}
if (pHandle->pDataBlock == NULL) return TSDB_CODE_OUT_OF_MEMORY;
size_t colNum = blockDataGetNumOfCols(pBlock);
if (tupleLen == 0) {
for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
tupleLen += pCol->info.bytes;
if (IS_VAR_DATA_TYPE(pCol->info.type)) {
tupleLen += sizeof(VarDataLenT);
}
}
}
size_t colLen = 0;
for (size_t rowIdx = 0; rowIdx < pBlock->info.rows; ++rowIdx) {
void* pTuple = createTuple(colNum, tupleLen);
if (pTuple == NULL) return TSDB_CODE_OUT_OF_MEMORY;
uint32_t offset = tupleGetDataStartOffset(colNum);
for (size_t colIdx = 0; colIdx < colNum; ++colIdx) {
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
if (colDataIsNull_s(pCol, rowIdx)) {
offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, 0, 0, true, tupleLen);
} else {
colLen = colDataGetRowLength(pCol, rowIdx);
offset = tupleAddField((char**)&pTuple, colNum, offset, colIdx, colDataGetData(pCol, rowIdx), colLen, false,
tupleLen);
}
}
pqNode.data = pTuple;
taosBQPush(pHandle->pBoundedQueue, &pqNode);
}
}
return TSDB_CODE_SUCCESS;
}
static STupleHandle* tsortPQSortNextTuple(SSortHandle* pHandle) {
blockDataCleanup(pHandle->pDataBlock);
blockDataEnsureCapacity(pHandle->pDataBlock, 1);
// abondan the top tuple if queue size bigger than max size
if (taosBQSize(pHandle->pBoundedQueue) == taosBQMaxSize(pHandle->pBoundedQueue) + 1) {
taosBQPop(pHandle->pBoundedQueue);
}
if (pHandle->tmpRowIdx == 0) {
// sort the results
taosBQSetFn(pHandle->pBoundedQueue, tsortPQComFnReverse);
taosBQBuildHeap(pHandle->pBoundedQueue);
}
if (taosBQSize(pHandle->pBoundedQueue) > 0) {
uint32_t colNum = blockDataGetNumOfCols(pHandle->pDataBlock);
PriorityQueueNode* node = taosBQTop(pHandle->pBoundedQueue);
char* pTuple = (char*)node->data;
for (uint32_t i = 0; i < colNum; ++i) {
void* pData = tupleGetField(pTuple, i, colNum);
if (!pData) {
colDataSetNULL(bdGetColumnInfoData(pHandle->pDataBlock, i), 0);
} else {
colDataAppend(bdGetColumnInfoData(pHandle->pDataBlock, i), 0, pData, false);
}
}
pHandle->pDataBlock->info.rows++;
pHandle->tmpRowIdx++;
taosBQPop(pHandle->pBoundedQueue);
}
if (pHandle->pDataBlock->info.rows == 0) return NULL;
pHandle->tupleHandle.pBlock = pHandle->pDataBlock;
return &pHandle->tupleHandle;
}
int32_t tsortOpen(SSortHandle* pHandle) {
if (pHandle->opened) {
return 0;
}
if (pHandle->fetchfp == NULL || pHandle->comparFn == NULL) {
return -1;
}
pHandle->opened = true;
if (tsortIsPQSortApplicable(pHandle))
return tsortOpenForPQSort(pHandle);
else
return tsortOpenForBufMergeSort(pHandle);
}
STupleHandle* tsortNextTuple(SSortHandle* pHandle) {
if (pHandle->pBoundedQueue)
return tsortPQSortNextTuple(pHandle);
else
return tsortBufMergeSortNextTuple(pHandle);
}
bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colIndex) {
SColumnInfoData* pColInfoSrc = taosArrayGet(pVHandle->pBlock->pDataBlock, colIndex);
return colDataIsNull_s(pColInfoSrc, pVHandle->rowIndex);

View File

@ -502,6 +502,7 @@ static int32_t logicSortCopy(const SSortLogicNode* pSrc, SSortLogicNode* pDst) {
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
CLONE_NODE_LIST_FIELD(pSortKeys);
COPY_SCALAR_FIELD(groupSort);
COPY_SCALAR_FIELD(maxRows);
return TSDB_CODE_SUCCESS;
}

View File

@ -2100,6 +2100,7 @@ static int32_t jsonToPhysiMergeNode(const SJson* pJson, void* pObj) {
static const char* jkSortPhysiPlanExprs = "Exprs";
static const char* jkSortPhysiPlanSortKeys = "SortKeys";
static const char* jkSortPhysiPlanTargets = "Targets";
static const char* jkSortPhysiPlanMaxRows = "MaxRows";
static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) {
const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj;
@ -2114,6 +2115,9 @@ static int32_t physiSortNodeToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = nodeListToJson(pJson, jkSortPhysiPlanTargets, pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSortPhysiPlanMaxRows, pNode->maxRows);
}
return code;
}
@ -2131,6 +2135,9 @@ static int32_t jsonToPhysiSortNode(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = jsonToNodeList(pJson, jkSortPhysiPlanTargets, &pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBigIntValue(pJson, jkSortPhysiPlanMaxRows, &pNode->maxRows);
}
return code;
}

View File

@ -2594,7 +2594,7 @@ static int32_t msgToPhysiMergeNode(STlvDecoder* pDecoder, void* pObj) {
return code;
}
enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS };
enum { PHY_SORT_CODE_BASE_NODE = 1, PHY_SORT_CODE_EXPR, PHY_SORT_CODE_SORT_KEYS, PHY_SORT_CODE_TARGETS, PHY_SORT_CODE_MAX_ROWS };
static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
const SSortPhysiNode* pNode = (const SSortPhysiNode*)pObj;
@ -2609,6 +2609,9 @@ static int32_t physiSortNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeObj(pEncoder, PHY_SORT_CODE_TARGETS, nodeListToMsg, pNode->pTargets);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeI64(pEncoder, PHY_SORT_CODE_MAX_ROWS, pNode->maxRows);
}
return code;
}
@ -2632,6 +2635,9 @@ static int32_t msgToPhysiSortNode(STlvDecoder* pDecoder, void* pObj) {
case PHY_SORT_CODE_TARGETS:
code = msgToNodeListFromTlv(pTlv, (void**)&pNode->pTargets);
break;
case PHY_SORT_CODE_MAX_ROWS:
code = tlvDecodeI64(pTlv, &pNode->maxRows);
break;
default:
break;
}

View File

@ -1027,6 +1027,7 @@ static int32_t createSortLogicNode(SLogicPlanContext* pCxt, SSelectStmt* pSelect
return TSDB_CODE_OUT_OF_MEMORY;
}
pSort->maxRows = -1;
pSort->groupSort = pSelect->groupSort;
pSort->node.groupAction = pSort->groupSort ? GROUP_ACTION_KEEP : GROUP_ACTION_CLEAR;
pSort->node.requireDataOrder = DATA_ORDER_LEVEL_NONE;
@ -1298,6 +1299,7 @@ static int32_t createSetOpSortLogicNode(SLogicPlanContext* pCxt, SSetOperator* p
return TSDB_CODE_OUT_OF_MEMORY;
}
pSort->maxRows = -1;
TSWAP(pSort->node.pLimit, pSetOperator->pLimit);
int32_t code = TSDB_CODE_SUCCESS;

View File

@ -770,7 +770,8 @@ static bool pushDownCondOptIsColEqualOnCond(SJoinLogicNode* pJoin, SNode* pCond)
SColumnNode* pLeft = (SColumnNode*)(pOper->pLeft);
SColumnNode* pRight = (SColumnNode*)(pOper->pRight);
// TODO: add cast to operator and remove this restriction of optimization
if (pLeft->node.resType.type != pRight->node.resType.type || pLeft->node.resType.bytes != pRight->node.resType.bytes) {
if (pLeft->node.resType.type != pRight->node.resType.type ||
pLeft->node.resType.bytes != pRight->node.resType.bytes) {
return false;
}
SNodeList* pLeftCols = ((SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 0))->pTargets;
@ -2629,8 +2630,16 @@ static int32_t tagScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
}
static bool pushDownLimitOptShouldBeOptimized(SLogicNode* pNode) {
if (NULL == pNode->pLimit || 1 != LIST_LENGTH(pNode->pChildren) ||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
if (NULL == pNode->pLimit || 1 != LIST_LENGTH(pNode->pChildren)) {
return false;
}
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) {
SLimitNode* pChildLimit = (SLimitNode*)(pChild->pLimit);
// if we have pushed down, we skip it
if ((*(SSortLogicNode*)pChild).maxRows != -1) return false;
} else if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pChild)) {
return false;
}
return true;
@ -2644,8 +2653,18 @@ static int32_t pushDownLimitOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLog
SLogicNode* pChild = (SLogicNode*)nodesListGetNode(pNode->pChildren, 0);
nodesDestroyNode(pChild->pLimit);
if (QUERY_NODE_LOGIC_PLAN_SORT == nodeType(pChild)) {
SLimitNode* pLimitNode = (SLimitNode*)pNode->pLimit;
int64_t maxRows = -1;
if (pLimitNode->limit != -1) {
maxRows = pLimitNode->limit;
if (pLimitNode->offset != -1) maxRows += pLimitNode->offset;
}
((SSortLogicNode*)pChild)->maxRows = maxRows;
} else {
pChild->pLimit = pNode->pLimit;
pNode->pLimit = NULL;
}
pCxt->optimized = true;
return TSDB_CODE_SUCCESS;

View File

@ -1374,6 +1374,7 @@ static int32_t createSortPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
if (NULL == pSort) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pSort->maxRows = pSortLogicNode->maxRows;
SNodeList* pPrecalcExprs = NULL;
SNodeList* pSortKeys = NULL;

View File

@ -1018,6 +1018,7 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOut
splSetParent((SLogicNode*)pPartSort);
pPartSort->pSortKeys = pSortKeys;
pPartSort->groupSort = pSort->groupSort;
pPartSort->maxRows = pSort->maxRows;
code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys);
}

View File

@ -482,6 +482,7 @@ int64_t syncLogBufferProceed(SSyncLogBuffer* pBuf, SSyncNode* pNode, SyncTerm* p
if (syncLogStorePersist(pLogStore, pNode, pEntry) < 0) {
sError("vgId:%d, failed to persist sync log entry from buffer since %s. index:%" PRId64, pNode->vgId, terrstr(),
pEntry->index);
taosMsleep(1);
goto _out;
}
ASSERT(pEntry->index == pBuf->matchIndex);

View File

@ -364,7 +364,7 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64
if (timeDiff > SYNC_HEARTBEAT_SLOW_MS) {
pSyncNode->hbSlowNum++;
sNInfo(pSyncNode,
sNTrace(pSyncNode,
"recv sync-heartbeat from dnode:%d slow {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);

View File

@ -187,3 +187,172 @@ void heapRemove(Heap* heap, HeapNode* node) {
}
void heapDequeue(Heap* heap) { heapRemove(heap, heap->min); }
struct PriorityQueue {
SArray* container;
pq_comp_fn fn;
FDelete deleteFn;
void* param;
};
PriorityQueue* createPriorityQueue(pq_comp_fn fn, FDelete deleteFn, void* param) {
PriorityQueue* pq = (PriorityQueue*)taosMemoryCalloc(1, sizeof(PriorityQueue));
pq->container = taosArrayInit(1, sizeof(PriorityQueueNode));
pq->fn = fn;
pq->deleteFn = deleteFn;
pq->param = param;
return pq;
}
void taosPQSetFn(PriorityQueue* pq, pq_comp_fn fn) {
pq->fn = fn;
}
void destroyPriorityQueue(PriorityQueue* pq) {
if (pq->deleteFn)
taosArrayDestroyP(pq->container, pq->deleteFn);
else
taosArrayDestroy(pq->container);
taosMemoryFree(pq);
}
static size_t pqParent(size_t i) { return (--i) >> 1; /* (i - 1) / 2 */ }
static size_t pqLeft(size_t i) { return (i << 1) | 1; /* i * 2 + 1 */ }
static size_t pqRight(size_t i) { return (++i) << 1; /* (i + 1) * 2 */}
static void pqSwapPQNode(PriorityQueueNode* a, PriorityQueueNode* b) {
void * tmp = a->data;
a->data = b->data;
b->data = tmp;
}
#define pqContainerGetEle(pq, i) ((PriorityQueueNode*)taosArrayGet((pq)->container, (i)))
#define pqContainerSize(pq) (taosArrayGetSize((pq)->container))
size_t taosPQSize(PriorityQueue* pq) { return pqContainerSize(pq); }
static void pqHeapify(PriorityQueue* pq, size_t from, size_t last) {
size_t largest = from;
do {
from = largest;
size_t l = pqLeft(from);
size_t r = pqRight(from);
if (l < last && pq->fn(pqContainerGetEle(pq, from)->data, pqContainerGetEle(pq, l)->data, pq->param)) {
largest = l;
}
if (r < last && pq->fn(pqContainerGetEle(pq, largest)->data, pqContainerGetEle(pq, r)->data, pq->param)) {
largest = r;
}
if (largest != from) {
pqSwapPQNode(pqContainerGetEle(pq, from), pqContainerGetEle(pq, largest));
}
} while (largest != from);
}
static void pqBuildHeap(PriorityQueue* pq) {
if (pqContainerSize(pq) > 1) {
for (size_t i = pqContainerSize(pq) - 1; i > 0; --i) {
pqHeapify(pq, i, pqContainerSize(pq));
}
pqHeapify(pq, 0, pqContainerSize(pq));
}
}
static void pqReverseHeapify(PriorityQueue* pq, size_t i) {
while (i > 0 && !pq->fn(pqContainerGetEle(pq, i)->data, pqContainerGetEle(pq, pqParent(i))->data, pq->param)) {
size_t parentIdx = pqParent(i);
pqSwapPQNode(pqContainerGetEle(pq, i), pqContainerGetEle(pq, parentIdx));
i = parentIdx;
}
}
static void pqUpdate(PriorityQueue* pq, size_t i) {
if (i == 0 || pq->fn(pqContainerGetEle(pq, i)->data, pqContainerGetEle(pq, pqParent(i))->data, pq->param)) {
// if value in pos i is smaller than parent, heapify down from i to the end
pqHeapify(pq, i, pqContainerSize(pq));
} else {
// if value in pos i is big than parent, heapify up from i
pqReverseHeapify(pq, i);
}
}
static void pqRemove(PriorityQueue* pq, size_t i) {
if (i == pqContainerSize(pq) - 1) {
taosArrayPop(pq->container);
return;
}
taosArraySet(pq->container, i, taosArrayGet(pq->container, pqContainerSize(pq) - 1));
taosArrayPop(pq->container);
pqUpdate(pq, i);
}
PriorityQueueNode* taosPQTop(PriorityQueue* pq) {
return pqContainerGetEle(pq, 0);
}
void taosPQPush(PriorityQueue* pq, const PriorityQueueNode* node) {
taosArrayPush(pq->container, node);
pqReverseHeapify(pq, pqContainerSize(pq) - 1);
}
void taosPQPop(PriorityQueue* pq) {
PriorityQueueNode* top = taosPQTop(pq);
if (pq->deleteFn) pq->deleteFn(top->data);
pqRemove(pq, 0);
}
struct BoundedQueue {
PriorityQueue* queue;
uint32_t maxSize;
};
BoundedQueue* createBoundedQueue(uint32_t maxSize, pq_comp_fn fn, FDelete deleteFn, void* param) {
BoundedQueue* q = (BoundedQueue*)taosMemoryCalloc(1, sizeof(BoundedQueue));
q->queue = createPriorityQueue(fn, deleteFn, param);
taosArrayEnsureCap(q->queue->container, maxSize + 1);
q->maxSize = maxSize;
return q;
}
void taosBQSetFn(BoundedQueue* q, pq_comp_fn fn) {
taosPQSetFn(q->queue, fn);
}
void destroyBoundedQueue(BoundedQueue* q) {
if (!q) return;
destroyPriorityQueue(q->queue);
taosMemoryFree(q);
}
void taosBQPush(BoundedQueue* q, PriorityQueueNode* n) {
if (pqContainerSize(q->queue) == q->maxSize + 1) {
PriorityQueueNode* top = pqContainerGetEle(q->queue, 0);
void *p = top->data;
top->data = n->data;
n->data = p;
if (q->queue->deleteFn) q->queue->deleteFn(n->data);
pqHeapify(q->queue, 0, taosBQSize(q));
} else {
taosPQPush(q->queue, n);
}
}
PriorityQueueNode* taosBQTop(BoundedQueue* q) {
return taosPQTop(q->queue);
}
void taosBQBuildHeap(BoundedQueue *q) {
pqBuildHeap(q->queue);
}
size_t taosBQMaxSize(BoundedQueue* q) {
return q->maxSize;
}
size_t taosBQSize(BoundedQueue* q) {
return taosPQSize(q->queue);
}
void taosBQPop(BoundedQueue* q) {
taosPQPop(q->queue);
}

View File

@ -33,6 +33,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeStb3.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/subscribeDb0.py -N 3 -n 3
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/ins_topics_test.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqClientConsLog.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/delete_stable.py

View File

@ -468,7 +468,7 @@ if $data01 != 1 then
endi
## supertable aggregation + where + interval + group by order by tag + limit offset
sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9),t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc limit 2 offset 0
sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9),t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc, max(c1) asc limit 2 offset 0
if $rows != 2 then
return -1
endi

View File

@ -508,7 +508,7 @@ endi
### supertable aggregation + where + interval + group by order by tag + limit offset
## TBASE-345
sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9), t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc limit 3 offset 0
sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9), t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 5 and c1 > 0 and c2 < 9 and c3 > 1 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc, max(c1) asc limit 3 offset 0
if $rows != 3 then
return -1
endi
@ -554,7 +554,7 @@ if $data09 != 4 then
return -1
endi
sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9), t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 8 and c1 > 0 and c2 < 9 and c3 > 4 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc limit 3 offset 0
sql select _wstart, max(c1), min(c2), avg(c3), sum(c5), spread(c6), first(c7), last(c8), first(c9), t1 from $stb where ts >= $ts0 and ts <= $tsu and t1 > 1 and t1 < 8 and c1 > 0 and c2 < 9 and c3 > 4 and c4 < 7 and c5 > 4 partition by t1 interval(5m) order by t1 desc, max(c1) asc limit 3 offset 0
if $rows != 3 then
return -1
endi

View File

@ -126,7 +126,6 @@ endi
if $data10 != 1 then
return -1
endi
sql (select 'ab' as options from union_tb1 limit 1) union all (select 'dd' as options from union_tb0 limit 1) order by options;
if $rows != 2 then
return -1

View File

@ -2558,3 +2558,243 @@ taos> select a.ts, a.c2, b.c2 from meters as a join (select * from meters order
2022-05-24 00:01:08.000 | 210 | 210 |
2022-05-24 00:01:08.000 | 210 | 210 |
taos> select ts, c2 from meters order by c2;
ts | c2 |
========================================
2022-05-21 00:01:08.000 | 11 |
2022-05-21 00:01:08.000 | 11 |
2022-05-18 00:01:08.000 | 58 |
2022-05-18 00:01:08.000 | 58 |
2022-05-17 00:01:08.000 | 59 |
2022-05-17 00:01:08.000 | 59 |
2022-05-23 00:01:08.000 | 116 |
2022-05-23 00:01:08.000 | 116 |
2022-05-20 00:01:08.000 | 120 |
2022-05-20 00:01:08.000 | 120 |
2022-05-16 00:01:08.000 | 136 |
2022-05-16 00:01:08.000 | 136 |
2022-05-22 00:01:08.000 | 196 |
2022-05-22 00:01:08.000 | 196 |
2022-05-24 00:01:08.000 | 210 |
2022-05-24 00:01:08.000 | 210 |
2022-05-15 00:01:08.000 | 234 |
2022-05-15 00:01:08.000 | 234 |
2022-05-19 00:01:08.000 | 243 |
2022-05-19 00:01:08.000 | 243 |
taos> select ts, c2 from meters order by c2 limit 4;
ts | c2 |
========================================
2022-05-21 00:01:08.000 | 11 |
2022-05-21 00:01:08.000 | 11 |
2022-05-18 00:01:08.000 | 58 |
2022-05-18 00:01:08.000 | 58 |
taos> select ts, c2 from meters order by c2 limit 2,2;
ts | c2 |
========================================
2022-05-18 00:01:08.000 | 58 |
2022-05-18 00:01:08.000 | 58 |
taos> select ts, c2 from meters order by ts asc, c2 desc limit 10;
ts | c2 |
========================================
2022-05-15 00:01:08.000 | 234 |
2022-05-15 00:01:08.000 | 234 |
2022-05-16 00:01:08.000 | 136 |
2022-05-16 00:01:08.000 | 136 |
2022-05-17 00:01:08.000 | 59 |
2022-05-17 00:01:08.000 | 59 |
2022-05-18 00:01:08.000 | 58 |
2022-05-18 00:01:08.000 | 58 |
2022-05-19 00:01:08.000 | 243 |
2022-05-19 00:01:08.000 | 243 |
taos> select ts, c2 from meters order by ts asc, c2 desc limit 5,5;
ts | c2 |
========================================
2022-05-17 00:01:08.000 | 59 |
2022-05-18 00:01:08.000 | 58 |
2022-05-18 00:01:08.000 | 58 |
2022-05-19 00:01:08.000 | 243 |
2022-05-19 00:01:08.000 | 243 |
taos> select ts, c2 from d1 order by c2;
ts | c2 |
========================================
2022-05-21 00:01:08.000 | 11 |
2022-05-18 00:01:08.000 | 58 |
2022-05-17 00:01:08.000 | 59 |
2022-05-23 00:01:08.000 | 116 |
2022-05-20 00:01:08.000 | 120 |
2022-05-16 00:01:08.000 | 136 |
2022-05-22 00:01:08.000 | 196 |
2022-05-24 00:01:08.000 | 210 |
2022-05-15 00:01:08.000 | 234 |
2022-05-19 00:01:08.000 | 243 |
taos> select ts, c2 from d1 order by c2 limit 4;
ts | c2 |
========================================
2022-05-21 00:01:08.000 | 11 |
2022-05-18 00:01:08.000 | 58 |
2022-05-17 00:01:08.000 | 59 |
2022-05-23 00:01:08.000 | 116 |
taos> select ts, c2 from d1 order by c2 limit 2,2;
ts | c2 |
========================================
2022-05-17 00:01:08.000 | 59 |
2022-05-23 00:01:08.000 | 116 |
taos> select ts, c2 from d1 order by ts asc, c2 desc limit 10;
ts | c2 |
========================================
2022-05-15 00:01:08.000 | 234 |
2022-05-16 00:01:08.000 | 136 |
2022-05-17 00:01:08.000 | 59 |
2022-05-18 00:01:08.000 | 58 |
2022-05-19 00:01:08.000 | 243 |
2022-05-20 00:01:08.000 | 120 |
2022-05-21 00:01:08.000 | 11 |
2022-05-22 00:01:08.000 | 196 |
2022-05-23 00:01:08.000 | 116 |
2022-05-24 00:01:08.000 | 210 |
taos> select ts, c2 from d1 order by ts asc, c2 desc limit 5,5;
ts | c2 |
========================================
2022-05-20 00:01:08.000 | 120 |
2022-05-21 00:01:08.000 | 11 |
2022-05-22 00:01:08.000 | 196 |
2022-05-23 00:01:08.000 | 116 |
2022-05-24 00:01:08.000 | 210 |
taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc;
_wstart | d | avg(c) |
================================================================================
2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000 |
2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000 |
2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000 |
2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000 |
2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000 |
2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000 |
2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000 |
2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.200000000 |
2022-05-18 13:00:00.000 | 2022-05-18 14:25:00.000 | 169.000000000 |
2022-05-15 15:00:00.000 | 2022-05-15 18:01:00.000 | 160.500000000 |
2022-05-19 19:00:00.000 | 2022-05-19 19:13:00.000 | 144.600000000 |
2022-05-15 20:00:00.000 | 2022-05-16 00:01:00.000 | 136.000000000 |
2022-05-18 08:00:00.000 | 2022-05-18 09:37:00.000 | 132.000000000 |
2022-05-16 01:00:00.000 | 2022-05-16 04:49:00.000 | 120.600000000 |
2022-05-20 00:00:00.000 | 2022-05-20 00:01:00.000 | 120.000000000 |
2022-05-16 06:00:00.000 | 2022-05-16 09:37:00.000 | 105.200000000 |
2022-05-18 03:00:00.000 | 2022-05-18 04:49:00.000 | 95.000000000 |
2022-05-20 05:00:00.000 | 2022-05-20 06:01:00.000 | 92.750000000 |
2022-05-16 11:00:00.000 | 2022-05-16 14:25:00.000 | 89.800000000 |
2022-05-16 16:00:00.000 | 2022-05-16 19:13:00.000 | 74.400000000 |
2022-05-20 10:00:00.000 | 2022-05-20 12:01:00.000 | 65.500000000 |
2022-05-16 21:00:00.000 | 2022-05-17 00:01:00.000 | 59.000000000 |
2022-05-17 02:00:00.000 | 2022-05-17 04:49:00.000 | 58.800000000 |
2022-05-17 07:00:00.000 | 2022-05-17 09:37:00.000 | 58.600000000 |
2022-05-17 12:00:00.000 | 2022-05-17 14:25:00.000 | 58.400000000 |
2022-05-17 17:00:00.000 | 2022-05-17 19:13:00.000 | 58.200000000 |
2022-05-17 22:00:00.000 | 2022-05-18 00:01:00.000 | 58.000000000 |
2022-05-20 15:00:00.000 | 2022-05-20 18:01:00.000 | 38.250000000 |
2022-05-20 20:00:00.000 | 2022-05-21 00:01:00.000 | 11.000000000 |
taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2;
_wstart | d | avg(c) |
================================================================================
2022-05-18 23:00:00.000 | 2022-05-19 00:01:00.000 | 243.000000000 |
2022-05-15 00:00:00.000 | 2022-05-15 00:01:00.000 | 234.000000000 |
taos> select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2,6;
_wstart | d | avg(c) |
================================================================================
2022-05-19 04:00:00.000 | 2022-05-19 04:49:00.000 | 218.400000000 |
2022-05-15 05:00:00.000 | 2022-05-15 06:01:00.000 | 209.500000000 |
2022-05-18 18:00:00.000 | 2022-05-18 19:13:00.000 | 206.000000000 |
2022-05-19 09:00:00.000 | 2022-05-19 09:37:00.000 | 193.800000000 |
2022-05-15 10:00:00.000 | 2022-05-15 12:01:00.000 | 185.000000000 |
2022-05-19 14:00:00.000 | 2022-05-19 14:25:00.000 | 169.200000000 |
taos> select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 10;
last(ts) | d |
========================================
2022-05-19 00:01:08.000 | 243 |
2022-05-15 00:01:08.000 | 234 |
2022-05-24 00:01:08.000 | 210 |
2022-05-22 00:01:08.000 | 196 |
2022-05-16 00:01:08.000 | 136 |
2022-05-20 00:01:08.000 | 120 |
2022-05-23 00:01:08.000 | 116 |
2022-05-17 00:01:08.000 | 59 |
2022-05-18 00:01:08.000 | 58 |
2022-05-21 00:01:08.000 | 11 |
taos> select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 2,8;
last(ts) | d |
========================================
2022-05-24 00:01:08.000 | 210 |
2022-05-22 00:01:08.000 | 196 |
2022-05-16 00:01:08.000 | 136 |
2022-05-20 00:01:08.000 | 120 |
2022-05-23 00:01:08.000 | 116 |
2022-05-17 00:01:08.000 | 59 |
2022-05-18 00:01:08.000 | 58 |
2022-05-21 00:01:08.000 | 11 |
taos> select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 9,1;
last(ts) | d |
========================================
2022-05-21 00:01:08.000 | 11 |
taos> select last(ts), c2 as d from d1 group by c2 order by c2 asc limit 2,8;
last(ts) | d |
========================================
2022-05-17 00:01:08.000 | 59 |
2022-05-23 00:01:08.000 | 116 |
2022-05-20 00:01:08.000 | 120 |
2022-05-16 00:01:08.000 | 136 |
2022-05-22 00:01:08.000 | 196 |
2022-05-24 00:01:08.000 | 210 |
2022-05-15 00:01:08.000 | 234 |
2022-05-19 00:01:08.000 | 243 |
taos> select last(ts), c2 as d from d1 group by c2 order by c2 asc limit 9,1;
last(ts) | d |
========================================
2022-05-19 00:01:08.000 | 243 |
taos> select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 10;
ts | d |
========================================
2022-05-24 00:01:08.000 | 210 |
2022-05-23 00:01:08.000 | 116 |
2022-05-22 00:01:08.000 | 196 |
2022-05-21 00:01:08.000 | 11 |
2022-05-20 00:01:08.000 | 120 |
2022-05-19 00:01:08.000 | 243 |
2022-05-18 00:01:08.000 | 58 |
2022-05-17 00:01:08.000 | 59 |
2022-05-16 00:01:08.000 | 136 |
2022-05-15 00:01:08.000 | 234 |
taos> select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 2,8;
ts | d |
========================================
2022-05-22 00:01:08.000 | 196 |
2022-05-21 00:01:08.000 | 11 |
2022-05-20 00:01:08.000 | 120 |
2022-05-19 00:01:08.000 | 243 |
2022-05-18 00:01:08.000 | 58 |
2022-05-17 00:01:08.000 | 59 |
2022-05-16 00:01:08.000 | 136 |
2022-05-15 00:01:08.000 | 234 |
taos> select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 9,1;
ts | d |
========================================
2022-05-15 00:01:08.000 | 234 |

View File

@ -71,3 +71,30 @@ select a.ts, a.c2, b.c2 from meters as a join meters as b on a.ts = b.ts order b
explain verbose true select a.ts, a.c2, b.c2 from meters as a join (select ts, c2 from meters order by ts desc) b on a.ts = b.ts order by a.ts desc\G;
explain verbose true select a.ts, a.c2, b.c2 from meters as a join (select ts, c2 from meters order by ts desc) b on a.ts = b.ts order by a.ts asc\G;
select a.ts, a.c2, b.c2 from meters as a join (select * from meters order by ts desc) b on a.ts = b.ts order by a.ts asc;
select ts, c2 from meters order by c2;
select ts, c2 from meters order by c2 limit 4;
select ts, c2 from meters order by c2 limit 2,2;
select ts, c2 from meters order by ts asc, c2 desc limit 10;
select ts, c2 from meters order by ts asc, c2 desc limit 5,5;
select ts, c2 from d1 order by c2;
select ts, c2 from d1 order by c2 limit 4;
select ts, c2 from d1 order by c2 limit 2,2;
select ts, c2 from d1 order by ts asc, c2 desc limit 10;
select ts, c2 from d1 order by ts asc, c2 desc limit 5,5;
select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc;
select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2;
select _wstart, first(a) as d, avg(c) from (select _wstart as a, last(ts) as b, avg(c2) as c from meters interval(10s) order by a desc) where a > '2022-05-15 00:01:00.000' and a < '2022-05-21 00:01:08.000' interval(5h) fill(linear) order by avg(c) desc limit 2,6;
select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 10;
select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 2,8;
select last(ts), c2 as d from d1 group by c2 order by c2 desc limit 9,1;
select last(ts), c2 as d from d1 group by c2 order by c2 asc limit 2,8;
select last(ts), c2 as d from d1 group by c2 order by c2 asc limit 9,1;
select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 10;
select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 2,8;
select last(ts) as ts, c2 as d from d1 group by c2 order by ts desc, c2 asc limit 9,1;

View File

@ -328,9 +328,28 @@ class TDTestCase:
tdLog.exit("split vgroup transaction is not finished after executing 50s")
return False
# split error
def expectSplitError(self, dbName):
vgids = self.getVGroup(dbName)
selid = random.choice(vgids)
sql = f"split vgroup {selid}"
tdLog.info(sql)
tdSql.error(sql)
# expect split ok
def expectSplitOk(self, dbName):
# split vgroup
vgList1 = self.getVGroup(dbName)
self.splitVGroup(dbName)
vgList2 = self.getVGroup(dbName)
vgNum1 = len(vgList1) + 1
vgNum2 = len(vgList2)
if vgNum1 != vgNum2:
tdLog.exit(f" vglist len={vgNum1} is not same for expect {vgNum2}")
return
# split empty database
def splitEmptyDB(self):
dbName = "emptydb"
vgNum = 2
# create database
@ -339,17 +358,33 @@ class TDTestCase:
tdSql.execute(sql)
# split vgroup
self.splitVGroup(dbName)
vgList = self.getVGroup(dbName)
vgNum1 = len(vgList)
vgNum2 = vgNum + 1
if vgNum1 != vgNum2:
tdLog.exit(f" vglist len={vgNum1} is not same for expect {vgNum2}")
return
self.expectSplitOk(dbName)
# forbid
def checkForbid(self):
# stream
tdLog.info("check forbid split having stream...")
tdSql.execute("create database streamdb;")
tdSql.execute("use streamdb;")
tdSql.execute("create table ta(ts timestamp, age int);")
tdSql.execute("create stream ma into sta as select count(*) from ta interval(1s);")
self.expectSplitError("streamdb")
tdSql.execute("drop stream ma;")
self.expectSplitOk("streamdb")
# topic
tdLog.info("check forbid split having topic...")
tdSql.execute("create database topicdb wal_retention_period 10;")
tdSql.execute("use topicdb;")
tdSql.execute("create table ta(ts timestamp, age int);")
tdSql.execute("create topic toa as select * from ta;")
self.expectSplitError("topicdb")
tdSql.execute("drop topic toa;")
self.expectSplitOk("topicdb")
# run
def run(self):
# prepare env
self.prepareEnv()
@ -360,12 +395,13 @@ class TDTestCase:
# check two db query result same
self.checkResult()
tdLog.info(f"split vgroup i={i} passed.")
# split empty db
self.splitEmptyDB()
# check topic and stream forib
self.checkForbid()
# stop
def stop(self):

View File

@ -321,7 +321,7 @@ class TDTestCase:
limit = 5
offset = paraDict["rowsPerTbl"] * 2
offset = offset - 2
sqlStr = f"select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 partition by t1 interval(5m) fill(value, -1, -2, -3, -4 ,-7 ,'-8', '-9') order by t1 limit %d offset %d"%(limit, offset)
sqlStr = f"select max(c1), min(c2), sum(c3), avg(c4), first(c7), last(c8), first(c9) from lm2_tb0 where ts >= 1537146000000 and ts <= 1543145400000 partition by t1 interval(5m) fill(value, -1, -2, -3, -4 ,-7 ,'-8', '-9') order by t1, max(c1) limit %d offset %d"%(limit, offset)
# tdLog.info("====sql:%s"%(sqlStr))
tdSql.query(sqlStr)
tdSql.checkRows(1)

View File

@ -0,0 +1,82 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
#
# The option for wal_retetion_period and wal_retention_size is work well
#
import taos
from taos.tmq import Consumer
import os
import sys
import threading
import json
import time
from datetime import date
from datetime import datetime
from datetime import timedelta
from os import path
# consume topic
def consume_topic(topic_name, consume_cnt, wait):
print("start consume...")
consumer = Consumer(
{
"group.id": "tg2",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"enable.auto.commit": "true",
}
)
print("start subscrite...")
consumer.subscribe([topic_name])
cnt = 0
try:
while True and cnt < consume_cnt:
res = consumer.poll(1)
if not res:
if wait:
continue
else:
break
err = res.error()
if err is not None:
raise err
val = res.value()
cnt += 1
print(f" consume {cnt} ")
for block in val:
print(block.fetchall())
finally:
consumer.unsubscribe()
consumer.close()
if __name__ == "__main__":
print(sys.argv)
if len(sys.argv) < 2:
print(" please input topic name for consume . -c for wait")
else:
wait = False
if "-c" == sys.argv[1]:
wait = True
topic = sys.argv[2]
else:
topic = sys.argv[1]
print(f' wait={wait} topic={topic}')
consume_topic(topic, 10000000, wait)

View File

@ -0,0 +1,84 @@
import time
import os
import subprocess
import random
import platform
class dnode():
def __init__(self, pid, path):
self.pid = pid
self.path = path
# run exePath no wait finished
def runNoWait(exePath):
if platform.system().lower() == 'windows':
cmd = f"mintty -h never {exePath}"
else:
cmd = f"nohup {exePath} > /dev/null 2>&1 & "
if os.system(cmd) != 0:
return False
else:
return True
# get online dnodes
def getDnodes():
cmd = "ps aux | grep taosd | awk '{{print $2,$11,$12,$13}}'"
result = os.system(cmd)
result=subprocess.check_output(cmd,shell=True)
strout = result.decode('utf-8').split("\n")
dnodes = []
for line in strout:
cols = line.split(' ')
if len(cols) != 4:
continue
exepath = cols[1]
if len(exepath) < 5 :
continue
if exepath[-5:] != 'taosd':
continue
# add to list
path = cols[1] + " " + cols[2] + " " + cols[3]
dnodes.append(dnode(cols[0], path))
print(" show dnodes cnt=%d...\n"%(len(dnodes)))
for dn in dnodes:
print(f" pid={dn.pid} path={dn.path}")
return dnodes
def restartDnodes(dnodes, cnt, seconds):
print(f"start dnode cnt={cnt} wait={seconds}s")
selects = random.sample(dnodes, cnt)
for select in selects:
print(f" kill -9 {select.pid}")
cmd = f"kill -9 {select.pid}"
os.system(cmd)
print(f" restart {select.path}")
if runNoWait(select.path) == False:
print(f"run {select.path} failed.")
raise Exception("exe failed.")
print(f" sleep {seconds}s ...")
time.sleep(seconds)
def run():
# kill seconds interval
killLoop = 10
minKill = 1
maxKill = 10
for i in range(killLoop):
dnodes = getDnodes()
killCnt = 0
if len(dnodes) > 0:
killCnt = random.randint(1, len(dnodes))
restartDnodes(dnodes, killCnt, random.randint(1, 5))
seconds = random.randint(minKill, maxKill)
print(f"----------- kill loop i={i} killCnt={killCnt} done. do sleep {seconds}s ... \n")
time.sleep(seconds)
if __name__ == '__main__':
run()