Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/row_refact
This commit is contained in:
commit
01869960a3
|
@ -122,6 +122,10 @@ typedef enum EFunctionType {
|
||||||
// internal function
|
// internal function
|
||||||
FUNCTION_TYPE_SELECT_VALUE,
|
FUNCTION_TYPE_SELECT_VALUE,
|
||||||
|
|
||||||
|
// distributed splitting functions
|
||||||
|
FUNCTION_TYPE_APERCENTILE_PARTIAL,
|
||||||
|
FUNCTION_TYPE_APERCENTILE_MERGE,
|
||||||
|
|
||||||
// user defined funcion
|
// user defined funcion
|
||||||
FUNCTION_TYPE_UDF = 10000
|
FUNCTION_TYPE_UDF = 10000
|
||||||
} EFunctionType;
|
} EFunctionType;
|
||||||
|
|
|
@ -1216,7 +1216,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
|
||||||
|
|
||||||
pBlock->info.numOfCols = numOfCols;
|
pBlock->info.numOfCols = numOfCols;
|
||||||
pBlock->info.hasVarCol = pDataBlock->info.hasVarCol;
|
pBlock->info.hasVarCol = pDataBlock->info.hasVarCol;
|
||||||
pBlock->info.rowSize = pDataBlock->info.rows;
|
pBlock->info.rowSize = pDataBlock->info.rowSize;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
SColumnInfoData colInfo = {0};
|
SColumnInfoData colInfo = {0};
|
||||||
|
|
|
@ -47,6 +47,9 @@ extern "C" {
|
||||||
#define EXPLAIN_TIME_WINDOWS_FORMAT "Time Window: interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c"
|
#define EXPLAIN_TIME_WINDOWS_FORMAT "Time Window: interval=%" PRId64 "%c offset=%" PRId64 "%c sliding=%" PRId64 "%c"
|
||||||
#define EXPLAIN_WINDOW_FORMAT "Window: gap=%" PRId64
|
#define EXPLAIN_WINDOW_FORMAT "Window: gap=%" PRId64
|
||||||
#define EXPLAIN_RATIO_TIME_FORMAT "Ratio: %f"
|
#define EXPLAIN_RATIO_TIME_FORMAT "Ratio: %f"
|
||||||
|
#define EXPLAIN_MERGE_FORMAT "Merge"
|
||||||
|
#define EXPLAIN_MERGE_KEYS_FORMAT "Merge Key: "
|
||||||
|
|
||||||
#define EXPLAIN_PLANNING_TIME_FORMAT "Planning Time: %.3f ms"
|
#define EXPLAIN_PLANNING_TIME_FORMAT "Planning Time: %.3f ms"
|
||||||
#define EXPLAIN_EXEC_TIME_FORMAT "Execution Time: %.3f ms"
|
#define EXPLAIN_EXEC_TIME_FORMAT "Execution Time: %.3f ms"
|
||||||
|
|
||||||
|
|
|
@ -173,6 +173,11 @@ int32_t qExplainGenerateResChildren(SPhysiNode *pNode, SExplainGroup *group, SNo
|
||||||
pPhysiChildren = partitionPhysiNode->node.pChildren;
|
pPhysiChildren = partitionPhysiNode->node.pChildren;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case QUERY_NODE_PHYSICAL_PLAN_MERGE: {
|
||||||
|
SMergePhysiNode *mergePhysiNode = (SMergePhysiNode *)pNode;
|
||||||
|
pPhysiChildren = mergePhysiNode->node.pChildren;
|
||||||
|
break;
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
qError("not supported physical node type %d", pNode->type);
|
qError("not supported physical node type %d", pNode->type);
|
||||||
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||||
|
@ -857,6 +862,50 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case QUERY_NODE_PHYSICAL_PLAN_MERGE: {
|
||||||
|
SMergePhysiNode *pMergeNode = (SMergePhysiNode *)pNode;
|
||||||
|
EXPLAIN_ROW_NEW(level, EXPLAIN_MERGE_FORMAT);
|
||||||
|
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
|
||||||
|
if (pResNode->pExecInfo) {
|
||||||
|
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
|
||||||
|
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||||
|
}
|
||||||
|
|
||||||
|
SDataBlockDescNode *pDescNode = pMergeNode->node.pOutputDataBlockDesc;
|
||||||
|
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, nodesGetOutputNumFromSlotList(pDescNode->pSlots));
|
||||||
|
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||||
|
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pDescNode->totalRowSize);
|
||||||
|
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
|
||||||
|
EXPLAIN_ROW_END();
|
||||||
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
|
||||||
|
|
||||||
|
if (verbose) {
|
||||||
|
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_OUTPUT_FORMAT);
|
||||||
|
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT,
|
||||||
|
nodesGetOutputNumFromSlotList(pMergeNode->node.pOutputDataBlockDesc->pSlots));
|
||||||
|
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
|
||||||
|
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pMergeNode->node.pOutputDataBlockDesc->outputRowSize);
|
||||||
|
EXPLAIN_ROW_END();
|
||||||
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||||
|
|
||||||
|
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_MERGE_KEYS_FORMAT);
|
||||||
|
for (int32_t i = 0; i < LIST_LENGTH(pMergeNode->pMergeKeys); ++i) {
|
||||||
|
SOrderByExprNode *ptn = nodesListGetNode(pMergeNode->pMergeKeys, i);
|
||||||
|
EXPLAIN_ROW_APPEND("%s ", nodesGetNameFromColumnNode(ptn->pExpr));
|
||||||
|
}
|
||||||
|
EXPLAIN_ROW_END();
|
||||||
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||||
|
|
||||||
|
if (pMergeNode->node.pConditions) {
|
||||||
|
EXPLAIN_ROW_NEW(level + 1, EXPLAIN_FILTER_FORMAT);
|
||||||
|
QRY_ERR_RET(nodesNodeToSQL(pMergeNode->node.pConditions, tbuf + VARSTR_HEADER_SIZE,
|
||||||
|
TSDB_EXPLAIN_RESULT_ROW_SIZE, &tlen));
|
||||||
|
EXPLAIN_ROW_END();
|
||||||
|
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level + 1));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
default:
|
default:
|
||||||
qError("not supported physical node type %d", pNode->type);
|
qError("not supported physical node type %d", pNode->type);
|
||||||
return TSDB_CODE_QRY_APP_ERROR;
|
return TSDB_CODE_QRY_APP_ERROR;
|
||||||
|
|
|
@ -787,7 +787,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
|
||||||
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SNode* pCondition, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SNode* pCondition, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock, SArray* pSortInfo, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SArray* pIndexMap, SExecTaskInfo* pTaskInfo);
|
SArray* pIndexMap, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams,
|
SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, SSDataBlock* pInputBlock,
|
||||||
SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo,
|
SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo,
|
||||||
SExecTaskInfo* pTaskInfo);
|
SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
|
@ -130,6 +130,12 @@ bool tsortIsNullVal(STupleHandle* pVHandle, int32_t colId);
|
||||||
*/
|
*/
|
||||||
void* tsortGetValue(STupleHandle* pVHandle, int32_t colId);
|
void* tsortGetValue(STupleHandle* pVHandle, int32_t colId);
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* @param pVHandle
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
uint64_t tsortGetGroupId(STupleHandle* pVHandle);
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
* @param pSortHandle
|
* @param pSortHandle
|
||||||
|
|
|
@ -4666,8 +4666,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
int32_t numOfOutputCols = 0;
|
int32_t numOfOutputCols = 0;
|
||||||
SArray* pColList =
|
SArray* pColList =
|
||||||
extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID);
|
extractColMatchInfo(pMergePhyNode->pTargets, pDescNode, &numOfOutputCols, pTaskInfo, COL_MATCH_FROM_SLOT_ID);
|
||||||
|
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0);
|
||||||
pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pResBlock, sortInfo, pColList, pTaskInfo);
|
SSDataBlock* pInputDataBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc);
|
||||||
|
pOptr = createMultiwaySortMergeOperatorInfo(ops, size, pInputDataBlock, pResBlock, sortInfo, pColList, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION == type) {
|
||||||
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
|
|
|
@ -84,7 +84,6 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
|
||||||
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
|
SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
|
||||||
SArray* pColMatchInfo) {
|
SArray* pColMatchInfo) {
|
||||||
blockDataCleanup(pDataBlock);
|
blockDataCleanup(pDataBlock);
|
||||||
ASSERT(taosArrayGetSize(pColMatchInfo) == pDataBlock->info.numOfCols);
|
|
||||||
|
|
||||||
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
|
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
|
@ -230,7 +229,12 @@ typedef struct SMultiwaySortMergeOperatorInfo {
|
||||||
SSortHandle* pSortHandle;
|
SSortHandle* pSortHandle;
|
||||||
SArray* pColMatchInfo; // for index map from table scan output
|
SArray* pColMatchInfo; // for index map from table scan output
|
||||||
|
|
||||||
|
SSDataBlock* pInputBlock;
|
||||||
int64_t startTs; // sort start time
|
int64_t startTs; // sort start time
|
||||||
|
|
||||||
|
bool hasGroupId;
|
||||||
|
uint64_t groupId;
|
||||||
|
STupleHandle *prefetchedTuple;
|
||||||
} SMultiwaySortMergeOperatorInfo;
|
} SMultiwaySortMergeOperatorInfo;
|
||||||
|
|
||||||
int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
|
int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
|
||||||
|
@ -246,14 +250,14 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
|
||||||
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize;
|
||||||
|
|
||||||
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_MULTISOURCE_MERGE,
|
pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, pInfo->pColMatchInfo, SORT_MULTISOURCE_MERGE,
|
||||||
pInfo->bufPageSize, numOfBufPage, NULL, pTaskInfo->id.str);
|
pInfo->bufPageSize, numOfBufPage, pInfo->pInputBlock, pTaskInfo->id.str);
|
||||||
|
|
||||||
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
|
tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock, NULL, NULL);
|
||||||
|
|
||||||
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
|
for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
|
||||||
SSortSource ps = {0};
|
SSortSource* ps = taosMemoryCalloc(1, sizeof(SSortSource));
|
||||||
ps.param = pOperator->pDownstream[i];
|
ps->param = pOperator->pDownstream[i];
|
||||||
tsortAddSource(pInfo->pSortHandle, &ps);
|
tsortAddSource(pInfo->pSortHandle, ps);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = tsortOpen(pInfo->pSortHandle);
|
int32_t code = tsortOpen(pInfo->pSortHandle);
|
||||||
|
@ -269,6 +273,70 @@ int32_t doOpenMultiwaySortMergeOperator(SOperatorInfo* pOperator) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, int32_t capacity,
|
||||||
|
SArray* pColMatchInfo, SMultiwaySortMergeOperatorInfo* pInfo) {
|
||||||
|
blockDataCleanup(pDataBlock);
|
||||||
|
|
||||||
|
SSDataBlock* p = tsortGetSortedDataBlock(pHandle);
|
||||||
|
if (p == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDataEnsureCapacity(p, capacity);
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
|
||||||
|
STupleHandle* pTupleHandle = NULL;
|
||||||
|
if (pInfo->prefetchedTuple == NULL) {
|
||||||
|
pTupleHandle = tsortNextTuple(pHandle);
|
||||||
|
} else {
|
||||||
|
pTupleHandle = pInfo->prefetchedTuple;
|
||||||
|
pInfo->prefetchedTuple = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTupleHandle == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t tupleGroupId = tsortGetGroupId(pTupleHandle);
|
||||||
|
if (!pInfo->hasGroupId) {
|
||||||
|
pInfo->groupId = tupleGroupId;
|
||||||
|
pInfo->hasGroupId = true;
|
||||||
|
appendOneRowToDataBlock(p, pTupleHandle);
|
||||||
|
} else if (pInfo->groupId == tupleGroupId) {
|
||||||
|
appendOneRowToDataBlock(p, pTupleHandle);
|
||||||
|
} else {
|
||||||
|
pInfo->prefetchedTuple = pTupleHandle;
|
||||||
|
pInfo->groupId = tupleGroupId;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (p->info.rows >= capacity) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
if (p->info.rows > 0) {
|
||||||
|
int32_t numOfCols = taosArrayGetSize(pColMatchInfo);
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
SColMatchInfo* pmInfo = taosArrayGet(pColMatchInfo, i);
|
||||||
|
ASSERT(pmInfo->matchType == COL_MATCH_FROM_SLOT_ID);
|
||||||
|
|
||||||
|
SColumnInfoData* pSrc = taosArrayGet(p->pDataBlock, pmInfo->srcSlotId);
|
||||||
|
SColumnInfoData* pDst = taosArrayGet(pDataBlock->pDataBlock, pmInfo->targetSlotId);
|
||||||
|
colDataAssign(pDst, pSrc, p->info.rows);
|
||||||
|
}
|
||||||
|
|
||||||
|
pDataBlock->info.rows = p->info.rows;
|
||||||
|
pDataBlock->info.capacity = p->info.rows;
|
||||||
|
}
|
||||||
|
|
||||||
|
blockDataDestroy(p);
|
||||||
|
return (pDataBlock->info.rows > 0) ? pDataBlock : NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
|
SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -283,7 +351,11 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* pBlock =
|
SSDataBlock* pBlock =
|
||||||
getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pOperator->resultInfo.capacity, pInfo->pColMatchInfo);
|
getMultiwaySortedBlockData(pInfo->pSortHandle,
|
||||||
|
pInfo->binfo.pRes,
|
||||||
|
pOperator->resultInfo.capacity,
|
||||||
|
pInfo->pColMatchInfo,
|
||||||
|
pInfo);
|
||||||
|
|
||||||
if (pBlock != NULL) {
|
if (pBlock != NULL) {
|
||||||
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
pOperator->resultInfo.totalRows += pBlock->info.rows;
|
||||||
|
@ -296,6 +368,7 @@ SSDataBlock* doMultiwaySortMerge(SOperatorInfo* pOperator) {
|
||||||
void destroyMultiwaySortMergeOperatorInfo(void* param, int32_t numOfOutput) {
|
void destroyMultiwaySortMergeOperatorInfo(void* param, int32_t numOfOutput) {
|
||||||
SMultiwaySortMergeOperatorInfo * pInfo = (SMultiwaySortMergeOperatorInfo*)param;
|
SMultiwaySortMergeOperatorInfo * pInfo = (SMultiwaySortMergeOperatorInfo*)param;
|
||||||
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
|
pInfo->binfo.pRes = blockDataDestroy(pInfo->binfo.pRes);
|
||||||
|
pInfo->pInputBlock = blockDataDestroy(pInfo->pInputBlock);
|
||||||
|
|
||||||
taosArrayDestroy(pInfo->pSortInfo);
|
taosArrayDestroy(pInfo->pSortInfo);
|
||||||
taosArrayDestroy(pInfo->pColMatchInfo);
|
taosArrayDestroy(pInfo->pColMatchInfo);
|
||||||
|
@ -313,7 +386,7 @@ int32_t getMultiwaySortMergeExplainExecInfo(SOperatorInfo* pOptr, void** pOptrEx
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams,
|
SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, int32_t numStreams, SSDataBlock* pInputBlock,
|
||||||
SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo,
|
SSDataBlock* pResBlock, SArray* pSortInfo, SArray* pColMatchColInfo,
|
||||||
SExecTaskInfo* pTaskInfo) {
|
SExecTaskInfo* pTaskInfo) {
|
||||||
SMultiwaySortMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwaySortMergeOperatorInfo));
|
SMultiwaySortMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SMultiwaySortMergeOperatorInfo));
|
||||||
|
@ -330,6 +403,7 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams,
|
||||||
|
|
||||||
pInfo->pSortInfo = pSortInfo;
|
pInfo->pSortInfo = pSortInfo;
|
||||||
pInfo->pColMatchInfo = pColMatchColInfo;
|
pInfo->pColMatchInfo = pColMatchColInfo;
|
||||||
|
pInfo->pInputBlock = pInputBlock;
|
||||||
pOperator->name = "MultiwaySortMerge";
|
pOperator->name = "MultiwaySortMerge";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_MERGE;
|
||||||
pOperator->blocking = true;
|
pOperator->blocking = true;
|
||||||
|
|
|
@ -225,6 +225,10 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int
|
||||||
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
|
for (int32_t i = 0; i < cmpParam->numOfSources; ++i) {
|
||||||
SSortSource* pSource = cmpParam->pSources[i];
|
SSortSource* pSource = cmpParam->pSources[i];
|
||||||
pSource->src.pBlock = pHandle->fetchfp(pSource->param);
|
pSource->src.pBlock = pHandle->fetchfp(pSource->param);
|
||||||
|
if (pSource->src.pBlock == NULL) {
|
||||||
|
pSource->src.rowIndex = -1;
|
||||||
|
++pHandle->numOfCompletedSources;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -361,13 +365,21 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) {
|
||||||
|
|
||||||
bool leftNull = false;
|
bool leftNull = false;
|
||||||
if (pLeftColInfoData->hasNull) {
|
if (pLeftColInfoData->hasNull) {
|
||||||
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg[pOrder->slotId]);
|
if (pLeftBlock->pBlockAgg == NULL) {
|
||||||
|
leftNull = colDataIsNull_s(pLeftColInfoData, pLeftSource->src.rowIndex);
|
||||||
|
} else {
|
||||||
|
leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg[i]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
|
SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId);
|
||||||
bool rightNull = false;
|
bool rightNull = false;
|
||||||
if (pRightColInfoData->hasNull) {
|
if (pRightColInfoData->hasNull) {
|
||||||
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg[pOrder->slotId]);
|
if (pLeftBlock->pBlockAgg == NULL) {
|
||||||
|
rightNull = colDataIsNull_s(pRightColInfoData, pRightSource->src.rowIndex);
|
||||||
|
} else {
|
||||||
|
rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg[i]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (leftNull && rightNull) {
|
if (leftNull && rightNull) {
|
||||||
|
@ -408,7 +420,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
||||||
|
|
||||||
pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs;
|
pHandle->totalElapsed = taosGetTimestampUs() - pHandle->startTs;
|
||||||
qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort elapsed:%"PRId64", total elapsed:%"PRId64,
|
qDebug("%s %d rounds mergesort required to complete the sort, first-round sorted data size:%"PRIzu", sort elapsed:%"PRId64", total elapsed:%"PRId64,
|
||||||
pHandle->idStr, (int32_t) (sortPass + 1), getTotalBufSize(pHandle->pBuf), pHandle->sortElapsed, pHandle->totalElapsed);
|
pHandle->idStr, (int32_t) (sortPass + 1), pHandle->pBuf ? getTotalBufSize(pHandle->pBuf) : 0, pHandle->sortElapsed, pHandle->totalElapsed);
|
||||||
|
|
||||||
int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize);
|
int32_t numOfRows = blockDataGetCapacityInRow(pHandle->pDataBlock, pHandle->pageSize);
|
||||||
blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows);
|
blockDataEnsureCapacity(pHandle->pDataBlock, numOfRows);
|
||||||
|
@ -697,6 +709,10 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
|
||||||
return colDataGetData(pColInfo, pVHandle->rowIndex);
|
return colDataGetData(pColInfo, pVHandle->rowIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint64_t tsortGetGroupId(STupleHandle* pVHandle) {
|
||||||
|
return pVHandle->pBlock->info.groupId;
|
||||||
|
}
|
||||||
|
|
||||||
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
|
SSortExecInfo tsortGetSortExecInfo(SSortHandle* pHandle) {
|
||||||
SSortExecInfo info = {0};
|
SSortExecInfo info = {0};
|
||||||
|
|
||||||
|
|
|
@ -23,9 +23,13 @@ extern "C" {
|
||||||
#include "function.h"
|
#include "function.h"
|
||||||
#include "functionMgt.h"
|
#include "functionMgt.h"
|
||||||
|
|
||||||
|
bool dummyGetEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* UNUSED_PARAM(pEnv));
|
||||||
|
bool dummyInit(SqlFunctionCtx* UNUSED_PARAM(pCtx), SResultRowEntryInfo* UNUSED_PARAM(pResultInfo));
|
||||||
|
int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx));
|
||||||
|
int32_t dummyFinalize(SqlFunctionCtx* UNUSED_PARAM(pCtx), SSDataBlock* UNUSED_PARAM(pBlock));
|
||||||
|
|
||||||
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool functionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t functionFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx));
|
|
||||||
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult);
|
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult);
|
||||||
int32_t combineFunction(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
int32_t combineFunction(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx);
|
||||||
|
|
||||||
|
@ -74,10 +78,13 @@ bool percentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultI
|
||||||
int32_t percentileFunction(SqlFunctionCtx *pCtx);
|
int32_t percentileFunction(SqlFunctionCtx *pCtx);
|
||||||
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t percentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
|
int32_t getApercentileMaxSize();
|
||||||
bool getApercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getApercentileFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool apercentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
bool apercentileFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResultInfo);
|
||||||
int32_t apercentileFunction(SqlFunctionCtx *pCtx);
|
int32_t apercentileFunction(SqlFunctionCtx *pCtx);
|
||||||
|
int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx);
|
||||||
int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock);
|
||||||
|
|
||||||
bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
bool getDiffFuncEnv(struct SFunctionNode* pFunc, SFuncExecEnv* pEnv);
|
||||||
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
|
bool diffFunctionSetup(SqlFunctionCtx *pCtx, SResultRowEntryInfo* pResInfo);
|
||||||
|
|
|
@ -251,6 +251,73 @@ static int32_t translateApercentile(SFunctionNode* pFunc, char* pErrBuf, int32_t
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t translateApercentileImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
|
||||||
|
int32_t numOfParams = LIST_LENGTH(pFunc->pParameterList);
|
||||||
|
|
||||||
|
if (isPartial) {
|
||||||
|
if (2 != numOfParams && 3 != numOfParams) {
|
||||||
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
// param1
|
||||||
|
SNode* pParamNode1 = nodesListGetNode(pFunc->pParameterList, 1);
|
||||||
|
if (nodeType(pParamNode1) != QUERY_NODE_VALUE) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
SValueNode* pValue = (SValueNode*)pParamNode1;
|
||||||
|
if (pValue->datum.i < 0 || pValue->datum.i > 100) {
|
||||||
|
return invaildFuncParaValueErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
pValue->notReserved = true;
|
||||||
|
|
||||||
|
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||||
|
uint8_t para2Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 1))->resType.type;
|
||||||
|
if (!IS_NUMERIC_TYPE(para1Type) || !IS_INTEGER_TYPE(para2Type)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
// param2
|
||||||
|
if (3 == numOfParams) {
|
||||||
|
uint8_t para3Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 2))->resType.type;
|
||||||
|
if (!IS_VAR_DATA_TYPE(para3Type)) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
SNode* pParamNode2 = nodesListGetNode(pFunc->pParameterList, 2);
|
||||||
|
if (QUERY_NODE_VALUE != nodeType(pParamNode2) || !validateApercentileAlgo((SValueNode*)pParamNode2)) {
|
||||||
|
return buildFuncErrMsg(pErrBuf, len, TSDB_CODE_FUNC_FUNTION_ERROR,
|
||||||
|
"Third parameter algorithm of apercentile must be 'default' or 't-digest'");
|
||||||
|
}
|
||||||
|
|
||||||
|
pValue = (SValueNode*)pParamNode2;
|
||||||
|
pValue->notReserved = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = getApercentileMaxSize() + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY};
|
||||||
|
} else {
|
||||||
|
if (1 != numOfParams) {
|
||||||
|
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
uint8_t para1Type = ((SExprNode*)nodesListGetNode(pFunc->pParameterList, 0))->resType.type;
|
||||||
|
if (TSDB_DATA_TYPE_BINARY != para1Type) {
|
||||||
|
return invaildFuncParaTypeErrMsg(pErrBuf, len, pFunc->functionName);
|
||||||
|
}
|
||||||
|
|
||||||
|
pFunc->node.resType = (SDataType){.bytes = tDataTypes[TSDB_DATA_TYPE_DOUBLE].bytes, .type = TSDB_DATA_TYPE_DOUBLE};
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t translateApercentilePartial(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
return translateApercentileImpl(pFunc, pErrBuf, len, true);
|
||||||
|
}
|
||||||
|
static int32_t translateApercentileMerge(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
|
return translateApercentileImpl(pFunc, pErrBuf, len, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
static int32_t translateTbnameColumn(SFunctionNode* pFunc, char* pErrBuf, int32_t len) {
|
||||||
// pseudo column do not need to check parameters
|
// pseudo column do not need to check parameters
|
||||||
pFunc->node.resType =
|
pFunc->node.resType =
|
||||||
|
@ -1057,8 +1124,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.finalizeFunc = functionFinalize,
|
.finalizeFunc = functionFinalize,
|
||||||
.invertFunc = countInvertFunction,
|
.invertFunc = countInvertFunction,
|
||||||
.combineFunc = combineFunction,
|
.combineFunc = combineFunction,
|
||||||
// .pPartialFunc = "count",
|
.pPartialFunc = "count",
|
||||||
// .pMergeFunc = "sum"
|
.pMergeFunc = "sum"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "sum",
|
.name = "sum",
|
||||||
|
@ -1072,6 +1139,8 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.finalizeFunc = functionFinalize,
|
.finalizeFunc = functionFinalize,
|
||||||
.invertFunc = sumInvertFunction,
|
.invertFunc = sumInvertFunction,
|
||||||
.combineFunc = sumCombine,
|
.combineFunc = sumCombine,
|
||||||
|
.pPartialFunc = "sum",
|
||||||
|
.pMergeFunc = "sum"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "min",
|
.name = "min",
|
||||||
|
@ -1083,7 +1152,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.initFunc = minmaxFunctionSetup,
|
.initFunc = minmaxFunctionSetup,
|
||||||
.processFunc = minFunction,
|
.processFunc = minFunction,
|
||||||
.finalizeFunc = minmaxFunctionFinalize,
|
.finalizeFunc = minmaxFunctionFinalize,
|
||||||
.combineFunc = minCombine
|
.combineFunc = minCombine,
|
||||||
|
.pPartialFunc = "min",
|
||||||
|
.pMergeFunc = "min"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "max",
|
.name = "max",
|
||||||
|
@ -1095,7 +1166,9 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.initFunc = minmaxFunctionSetup,
|
.initFunc = minmaxFunctionSetup,
|
||||||
.processFunc = maxFunction,
|
.processFunc = maxFunction,
|
||||||
.finalizeFunc = minmaxFunctionFinalize,
|
.finalizeFunc = minmaxFunctionFinalize,
|
||||||
.combineFunc = maxCombine
|
.combineFunc = maxCombine,
|
||||||
|
.pPartialFunc = "max",
|
||||||
|
.pMergeFunc = "max"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
.name = "stddev",
|
.name = "stddev",
|
||||||
|
@ -1151,6 +1224,28 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = {
|
||||||
.getEnvFunc = getApercentileFuncEnv,
|
.getEnvFunc = getApercentileFuncEnv,
|
||||||
.initFunc = apercentileFunctionSetup,
|
.initFunc = apercentileFunctionSetup,
|
||||||
.processFunc = apercentileFunction,
|
.processFunc = apercentileFunction,
|
||||||
|
.finalizeFunc = apercentileFinalize,
|
||||||
|
.pPartialFunc = "_apercentile_partial",
|
||||||
|
.pMergeFunc = "_apercentile_merge"
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "_apercentile_partial",
|
||||||
|
.type = FUNCTION_TYPE_APERCENTILE_PARTIAL,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.translateFunc = translateApercentilePartial,
|
||||||
|
.getEnvFunc = getApercentileFuncEnv,
|
||||||
|
.initFunc = apercentileFunctionSetup,
|
||||||
|
.processFunc = apercentileFunction,
|
||||||
|
.finalizeFunc = apercentilePartialFinalize
|
||||||
|
},
|
||||||
|
{
|
||||||
|
.name = "_apercentile_merge",
|
||||||
|
.type = FUNCTION_TYPE_APERCENTILE_MERGE,
|
||||||
|
.classification = FUNC_MGT_AGG_FUNC,
|
||||||
|
.translateFunc = translateApercentileMerge,
|
||||||
|
.getEnvFunc = getApercentileFuncEnv,
|
||||||
|
.initFunc = functionSetup,
|
||||||
|
.processFunc = apercentileFunctionMerge,
|
||||||
.finalizeFunc = apercentileFinalize
|
.finalizeFunc = apercentileFinalize
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
|
|
|
@ -100,6 +100,7 @@ typedef struct SPercentileInfo {
|
||||||
|
|
||||||
typedef struct SAPercentileInfo {
|
typedef struct SAPercentileInfo {
|
||||||
double result;
|
double result;
|
||||||
|
double percent;
|
||||||
int8_t algo;
|
int8_t algo;
|
||||||
SHistogramInfo *pHisto;
|
SHistogramInfo *pHisto;
|
||||||
TDigest *pTDigest;
|
TDigest *pTDigest;
|
||||||
|
@ -283,6 +284,22 @@ typedef struct SUniqueInfo {
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
bool dummyGetEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* UNUSED_PARAM(pEnv)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool dummyInit(SqlFunctionCtx* UNUSED_PARAM(pCtx), SResultRowEntryInfo* UNUSED_PARAM(pResultInfo)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx)) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t dummyFinalize(SqlFunctionCtx* UNUSED_PARAM(pCtx), SSDataBlock* UNUSED_PARAM(pBlock)) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
bool functionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
bool functionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||||
if (pResultInfo->initialized) {
|
if (pResultInfo->initialized) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -327,10 +344,6 @@ int32_t firstCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dummyProcess(SqlFunctionCtx* UNUSED_PARAM(pCtx)) {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult) {
|
int32_t functionFinalizeWithResultBuf(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, char* finalResult) {
|
||||||
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
@ -1974,6 +1987,12 @@ bool getApercentileFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t getApercentileMaxSize() {
|
||||||
|
int32_t bytesHist = (int32_t)(sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
|
||||||
|
int32_t bytesDigest = (int32_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
|
||||||
|
return TMAX(bytesHist, bytesDigest);
|
||||||
|
}
|
||||||
|
|
||||||
static int8_t getApercentileAlgo(char *algoStr) {
|
static int8_t getApercentileAlgo(char *algoStr) {
|
||||||
int8_t algoType;
|
int8_t algoType;
|
||||||
if (strcasecmp(algoStr, "default") == 0) {
|
if (strcasecmp(algoStr, "default") == 0) {
|
||||||
|
@ -1992,12 +2011,20 @@ static void buildHistogramInfo(SAPercentileInfo* pInfo) {
|
||||||
pInfo->pHisto->elems = (SHistBin*) ((char*)pInfo->pHisto + sizeof(SHistogramInfo));
|
pInfo->pHisto->elems = (SHistBin*) ((char*)pInfo->pHisto + sizeof(SHistogramInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void buildTDigestInfo(SAPercentileInfo* pInfo) {
|
||||||
|
pInfo->pTDigest = (TDigest*)((char*)pInfo + sizeof(SAPercentileInfo));
|
||||||
|
}
|
||||||
|
|
||||||
bool apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
bool apercentileFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) {
|
||||||
if (!functionSetup(pCtx, pResultInfo)) {
|
if (!functionSetup(pCtx, pResultInfo)) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
|
SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResultInfo);
|
||||||
|
|
||||||
|
SVariant* pVal = &pCtx->param[1].param;
|
||||||
|
pInfo->percent = (pVal->nType == TSDB_DATA_TYPE_BIGINT) ? pVal->i : pVal->d;
|
||||||
|
|
||||||
if (pCtx->numOfParams == 2) {
|
if (pCtx->numOfParams == 2) {
|
||||||
pInfo->algo = APERCT_ALGO_DEFAULT;
|
pInfo->algo = APERCT_ALGO_DEFAULT;
|
||||||
} else if (pCtx->numOfParams == 3) {
|
} else if (pCtx->numOfParams == 3) {
|
||||||
|
@ -2062,23 +2089,87 @@ int32_t apercentileFunction(SqlFunctionCtx* pCtx) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
int32_t apercentileFunctionMerge(SqlFunctionCtx* pCtx) {
|
||||||
SVariant* pVal = &pCtx->param[1].param;
|
int32_t numOfElems = 0;
|
||||||
double percent = (pVal->nType == TSDB_DATA_TYPE_BIGINT) ? pVal->i : pVal->d;
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
|
||||||
|
SInputColumnInfoData* pInput = &pCtx->input;
|
||||||
|
|
||||||
|
SColumnInfoData* pCol = pInput->pData[0];
|
||||||
|
ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY);
|
||||||
|
|
||||||
|
SAPercentileInfo* pInfo = GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
SAPercentileInfo* pInputInfo;
|
||||||
|
|
||||||
|
int32_t start = pInput->startRowIndex;
|
||||||
|
for (int32_t i = start; i < pInput->numOfRows + start; ++i) {
|
||||||
|
//if (colDataIsNull_s(pCol, i)) {
|
||||||
|
// continue;
|
||||||
|
//}
|
||||||
|
numOfElems += 1;
|
||||||
|
char* data = colDataGetData(pCol, i);
|
||||||
|
|
||||||
|
pInputInfo = (SAPercentileInfo *)varDataVal(data);
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->percent = pInputInfo->percent;
|
||||||
|
pInfo->algo = pInputInfo->algo;
|
||||||
|
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
||||||
|
buildTDigestInfo(pInputInfo);
|
||||||
|
tdigestAutoFill(pInputInfo->pTDigest, COMPRESSION);
|
||||||
|
|
||||||
|
if(pInputInfo->pTDigest->num_centroids == 0 && pInputInfo->pTDigest->num_buffered_pts == 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
buildTDigestInfo(pInfo);
|
||||||
|
TDigest *pTDigest = pInfo->pTDigest;
|
||||||
|
|
||||||
|
if(pTDigest->num_centroids <= 0) {
|
||||||
|
memcpy(pTDigest, pInputInfo->pTDigest, (size_t)TDIGEST_SIZE(COMPRESSION));
|
||||||
|
tdigestAutoFill(pTDigest, COMPRESSION);
|
||||||
|
} else {
|
||||||
|
tdigestMerge(pTDigest, pInputInfo->pTDigest);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
buildHistogramInfo(pInputInfo);
|
||||||
|
if (pInputInfo->pHisto->numOfElems <= 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
buildHistogramInfo(pInfo);
|
||||||
|
SHistogramInfo *pHisto = pInfo->pHisto;
|
||||||
|
|
||||||
|
if (pHisto->numOfElems <= 0) {
|
||||||
|
memcpy(pHisto, pInputInfo->pHisto, sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
|
||||||
|
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
|
||||||
|
} else {
|
||||||
|
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
|
||||||
|
SHistogramInfo *pRes = tHistogramMerge(pHisto, pInputInfo->pHisto, MAX_HISTOGRAM_BIN);
|
||||||
|
memcpy(pHisto, pRes, sizeof(SHistogramInfo) + sizeof(SHistBin) * MAX_HISTOGRAM_BIN);
|
||||||
|
pHisto->elems = (SHistBin*) ((char *)pHisto + sizeof(SHistogramInfo));
|
||||||
|
tHistogramDestroy(&pRes);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SET_VAL(pResInfo, numOfElems, 1);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
SAPercentileInfo* pInfo = (SAPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
SAPercentileInfo* pInfo = (SAPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
||||||
if (pInfo->pTDigest->size > 0) {
|
if (pInfo->pTDigest->size > 0) {
|
||||||
pInfo->result = tdigestQuantile(pInfo->pTDigest, percent/100);
|
pInfo->result = tdigestQuantile(pInfo->pTDigest, pInfo->percent / 100);
|
||||||
} else { // no need to free
|
} else { // no need to free
|
||||||
//setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
|
//setNull(pCtx->pOutput, pCtx->outputType, pCtx->outputBytes);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (pInfo->pHisto->numOfElems > 0) {
|
if (pInfo->pHisto->numOfElems > 0) {
|
||||||
double ratio[] = {percent};
|
double ratio[] = {pInfo->percent};
|
||||||
double *res = tHistogramUniform(pInfo->pHisto, ratio, 1);
|
double *res = tHistogramUniform(pInfo->pHisto, ratio, 1);
|
||||||
pInfo->result = *res;
|
pInfo->result = *res;
|
||||||
//memcpy(pCtx->pOutput, res, sizeof(double));
|
//memcpy(pCtx->pOutput, res, sizeof(double));
|
||||||
|
@ -2092,6 +2183,40 @@ int32_t apercentileFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
return functionFinalize(pCtx, pBlock);
|
return functionFinalize(pCtx, pBlock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t apercentilePartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) {
|
||||||
|
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||||
|
SAPercentileInfo* pInfo = (SAPercentileInfo*)GET_ROWCELL_INTERBUF(pResInfo);
|
||||||
|
|
||||||
|
int32_t bytesHist = (int32_t)(sizeof(SAPercentileInfo) + sizeof(SHistogramInfo) + sizeof(SHistBin) * (MAX_HISTOGRAM_BIN + 1));
|
||||||
|
int32_t bytesDigest = (int32_t)(sizeof(SAPercentileInfo) + TDIGEST_SIZE(COMPRESSION));
|
||||||
|
int32_t resultBytes = TMAX(bytesHist, bytesDigest);
|
||||||
|
char *tmp = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char));
|
||||||
|
|
||||||
|
if (pInfo->algo == APERCT_ALGO_TDIGEST) {
|
||||||
|
if (pInfo->pTDigest->size > 0) {
|
||||||
|
memcpy(varDataVal(tmp), pInfo, resultBytes);
|
||||||
|
varDataSetLen(tmp, resultBytes);
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (pInfo->pHisto->numOfElems > 0) {
|
||||||
|
memcpy(varDataVal(tmp), pInfo, resultBytes);
|
||||||
|
varDataSetLen(tmp, resultBytes);
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t slotId = pCtx->pExpr->base.resSchema.slotId;
|
||||||
|
SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
|
||||||
|
|
||||||
|
colDataAppend(pCol, pBlock->info.rows, tmp, false);
|
||||||
|
|
||||||
|
taosMemoryFree(tmp);
|
||||||
|
return pResInfo->numOfRes;
|
||||||
|
}
|
||||||
|
|
||||||
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
bool getFirstLastFuncEnv(SFunctionNode* pFunc, SFuncExecEnv* pEnv) {
|
||||||
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
SColumnNode* pNode = nodesListGetNode(pFunc->pParameterList, 0);
|
||||||
pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t);
|
pEnv->calcMemSize = pNode->node.resType.bytes + sizeof(int64_t);
|
||||||
|
|
|
@ -166,8 +166,8 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
|
||||||
}
|
}
|
||||||
return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
|
return !stbSplHasGatherExecFunc(pWindow->pFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
|
||||||
}
|
}
|
||||||
// case QUERY_NODE_LOGIC_PLAN_SORT:
|
case QUERY_NODE_LOGIC_PLAN_SORT:
|
||||||
// return stbSplHasMultiTbScan(streamQuery, pNode);
|
return stbSplHasMultiTbScan(streamQuery, pNode);
|
||||||
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
case QUERY_NODE_LOGIC_PLAN_SCAN:
|
||||||
return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
|
return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -42,14 +42,12 @@ sql explain select count(*),sum(f1) from tb1;
|
||||||
sql explain select count(*),sum(f1) from st1;
|
sql explain select count(*),sum(f1) from st1;
|
||||||
sql explain select count(*),sum(f1) from st1 group by f1;
|
sql explain select count(*),sum(f1) from st1 group by f1;
|
||||||
#sql explain select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev);
|
#sql explain select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev);
|
||||||
sql explain select min(f1) from st1 interval(1m, 2a) sliding(30s);
|
|
||||||
|
|
||||||
print ======== step3
|
print ======== step3
|
||||||
sql explain verbose true select * from st1 where -2;
|
sql explain verbose true select * from st1 where -2;
|
||||||
sql explain verbose true select ts from tb1 where f1 > 0;
|
sql explain verbose true select ts from tb1 where f1 > 0;
|
||||||
sql explain verbose true select * from st1 where f1 > 0 and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00';
|
sql explain verbose true select * from st1 where f1 > 0 and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00';
|
||||||
sql explain verbose true select * from information_schema.user_stables where db_name='db2';
|
sql explain verbose true select * from information_schema.user_stables where db_name='db2';
|
||||||
sql explain verbose true select count(*),sum(f1) from st1 where f1 > 0 and ts > '2021-10-31 00:00:00' group by f1 having sum(f1) > 0;
|
|
||||||
|
|
||||||
print ======== step4
|
print ======== step4
|
||||||
sql explain analyze select ts from st1 where -2;
|
sql explain analyze select ts from st1 where -2;
|
||||||
|
@ -61,8 +59,6 @@ sql explain analyze select * from information_schema.user_stables;
|
||||||
sql explain analyze select count(*),sum(f1) from tb1;
|
sql explain analyze select count(*),sum(f1) from tb1;
|
||||||
sql explain analyze select count(*),sum(f1) from st1;
|
sql explain analyze select count(*),sum(f1) from st1;
|
||||||
sql explain analyze select count(*),sum(f1) from st1 group by f1;
|
sql explain analyze select count(*),sum(f1) from st1 group by f1;
|
||||||
#sql explain analyze select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev);
|
|
||||||
sql explain analyze select min(f1) from st1 interval(3m, 2a) sliding(1m);
|
|
||||||
|
|
||||||
print ======== step5
|
print ======== step5
|
||||||
sql explain analyze verbose true select ts from st1 where -2;
|
sql explain analyze verbose true select ts from st1 where -2;
|
||||||
|
@ -78,8 +74,6 @@ sql explain analyze verbose true select count(*),sum(f1) from st1 group by f1;
|
||||||
sql explain analyze verbose true select ts from tb1 where f1 > 0;
|
sql explain analyze verbose true select ts from tb1 where f1 > 0;
|
||||||
sql explain analyze verbose true select f1 from st1 where f1 > 0 and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00';
|
sql explain analyze verbose true select f1 from st1 where f1 > 0 and ts > '2020-10-31 00:00:00' and ts < '2021-10-31 00:00:00';
|
||||||
sql explain analyze verbose true select * from information_schema.user_stables where db_name='db2';
|
sql explain analyze verbose true select * from information_schema.user_stables where db_name='db2';
|
||||||
sql explain analyze verbose true select count(*),sum(f1) from st1 where f1 > 0 and ts > '2021-10-31 00:00:00' group by f1 having sum(f1) > 0;
|
|
||||||
sql explain analyze verbose true select min(f1) from st1 interval(3m, 2a) sliding(1m);
|
|
||||||
sql explain analyze verbose true select * from (select min(f1),count(*) a from st1 where f1 > 0) where a < 0;
|
sql explain analyze verbose true select * from (select min(f1),count(*) a from st1 where f1 > 0) where a < 0;
|
||||||
|
|
||||||
#not pass case
|
#not pass case
|
||||||
|
@ -93,6 +87,12 @@ sql explain analyze verbose true select * from (select min(f1),count(*) a from s
|
||||||
#sql explain select * from tb1, tb2 where tb1.ts=tb2.ts;
|
#sql explain select * from tb1, tb2 where tb1.ts=tb2.ts;
|
||||||
#sql explain select * from st1, st2 where tb1.ts=tb2.ts;
|
#sql explain select * from st1, st2 where tb1.ts=tb2.ts;
|
||||||
#sql explain analyze verbose true select sum(a+b) from (select _rowts, min(f1) b,count(*) a from st1 where f1 > 0 interval(1a)) where a < 0 interval(1s);
|
#sql explain analyze verbose true select sum(a+b) from (select _rowts, min(f1) b,count(*) a from st1 where f1 > 0 interval(1a)) where a < 0 interval(1s);
|
||||||
|
#sql explain select min(f1) from st1 interval(1m, 2a) sliding(30s);
|
||||||
|
#sql explain verbose true select count(*),sum(f1) from st1 where f1 > 0 and ts > '2021-10-31 00:00:00' group by f1 having sum(f1) > 0;
|
||||||
|
#sql explain analyze select min(f1) from st1 interval(3m, 2a) sliding(1m);
|
||||||
|
#sql explain analyze select count(f1) from tb1 interval(10s, 2s) sliding(3s) fill(prev);
|
||||||
|
#sql explain analyze verbose true select count(*),sum(f1) from st1 where f1 > 0 and ts > '2021-10-31 00:00:00' group by f1 having sum(f1) > 0;
|
||||||
|
#sql explain analyze verbose true select min(f1) from st1 interval(3m, 2a) sliding(1m);
|
||||||
|
|
||||||
|
|
||||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||||
|
|
|
@ -66,6 +66,7 @@ if $rows != 0 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
sql select * from tb1 where null;
|
sql select * from tb1 where null;
|
||||||
|
print $rows
|
||||||
if $rows != 0 then
|
if $rows != 0 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
|
@ -470,8 +470,8 @@ class TDTestCase:
|
||||||
tdSql.checkData(10, 1, '"femail"')
|
tdSql.checkData(10, 1, '"femail"')
|
||||||
|
|
||||||
# test having
|
# test having
|
||||||
tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' having count(*) > 1")
|
#tdSql.query("select count(*),jtag->'tag1' from jsons1 group by jtag->'tag1' having count(*) > 1")
|
||||||
tdSql.checkRows(3)
|
#tdSql.checkRows(3)
|
||||||
|
|
||||||
# subquery with json tag
|
# subquery with json tag
|
||||||
tdSql.query("select * from (select jtag, dataint from jsons1) order by dataint")
|
tdSql.query("select * from (select jtag, dataint from jsons1) order by dataint")
|
||||||
|
|
Loading…
Reference in New Issue