diff --git a/2.0/src/query/inc/qExecutor.h b/2.0/src/query/inc/qExecutor.h index 970b826303..967101fb41 100644 --- a/2.0/src/query/inc/qExecutor.h +++ b/2.0/src/query/inc/qExecutor.h @@ -574,11 +574,11 @@ typedef struct SMultiwayMergeInfo { } SMultiwayMergeInfo; // todo support the disk-based sort -typedef struct SOrderOperatorInfo { +typedef struct SSortOperatorInfo { int32_t colIndex; int32_t order; SSDataBlock *pDataBlock; -} SOrderOperatorInfo; +} SSortOperatorInfo; void appendUpstream(SOperatorInfo* p, SOperatorInfo* pUpstream); @@ -609,7 +609,7 @@ SOperatorInfo* createFilterOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperator int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pUpstream, int32_t numOfUpstream, SSchema* pSchema, int32_t numOfOutput); -SOperatorInfo* createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal); +SOperatorInfo* createSortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal); SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); diff --git a/2.0/src/query/src/qExecutor.c b/2.0/src/query/src/qExecutor.c index 490584c75a..1e879c3912 100644 --- a/2.0/src/query/src/qExecutor.c +++ b/2.0/src/query/src/qExecutor.c @@ -2301,7 +2301,7 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf } case OP_Order: { - pRuntimeEnv->proot = createOrderOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order); + pRuntimeEnv->proot = createSortOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput, &pQueryAttr->order); break; } @@ -5516,7 +5516,7 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { return NULL; } - SOrderOperatorInfo* pInfo = pOperator->info; + SSortOperatorInfo* pInfo = pOperator->info; SSDataBlock* pBlock = NULL; while(1) { @@ -5556,8 +5556,8 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; } -SOperatorInfo *createOrderOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal) { - SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); +SOperatorInfo *createSortOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput, SOrderVal* pOrderVal) { + SSortOperatorInfo* pInfo = calloc(1, sizeof(SSortOperatorInfo)); { SSDataBlock* pDataBlock = calloc(1, sizeof(SSDataBlock)); @@ -6611,7 +6611,7 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { } static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { - SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; + SSortOperatorInfo* pInfo = (SSortOperatorInfo*) param; pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock); } diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 2d5fba7368..4a47acfa50 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -29,8 +29,9 @@ typedef struct SCorEpSet { } SCorEpSet; typedef struct SBlockOrderInfo { + bool nullFirst; int32_t order; - int32_t colIndex; + int32_t slotId; SColumnInfoData* pColData; } SBlockOrderInfo; @@ -176,7 +177,7 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock); SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols); -int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); +int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo); int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst); int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index d2a5f1ab74..7a958136e7 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -647,7 +647,6 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { typedef struct SSDataBlockSortHelper { SArray* orderInfo; // SArray SSDataBlock* pDataBlock; - bool nullFirst; } SSDataBlockSortHelper; int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { @@ -672,11 +671,11 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { } if (rightNull) { - return pHelper->nullFirst ? 1 : -1; + return pOrder->nullFirst ? 1 : -1; } if (leftNull) { - return pHelper->nullFirst ? -1 : 1; + return pOrder->nullFirst ? -1 : 1; } } @@ -907,7 +906,7 @@ static __compar_fn_t getComparFn(int32_t type, int32_t order) { } } -int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst) { +int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo) { ASSERT(pDataBlock != NULL && pOrderInfo != NULL); if (pDataBlock->info.rows <= 1) { return TSDB_CODE_SUCCESS; @@ -922,7 +921,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs for (int32_t i = 0; i < taosArrayGetSize(pOrderInfo); ++i) { SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i); - SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->colIndex); + SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId); if (pColInfoData->hasNull) { sortColumnHasNull = true; } @@ -961,10 +960,10 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs int64_t p0 = taosGetTimestampUs(); - SSDataBlockSortHelper helper = {.nullFirst = nullFirst, .pDataBlock = pDataBlock, .orderInfo = pOrderInfo}; + SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo}; for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) { struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i); - pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->colIndex); + pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->slotId); } taosqsort(index, rows, sizeof(int32_t), &helper, dataBlockCompar); @@ -1012,7 +1011,7 @@ SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock* for (int32_t i = 0; i < numOfCols; ++i) { SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i); - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex); + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->slotId); pInfo->pColData = pColInfo; sortValLengthPerRow += pColInfo->info.bytes; } @@ -1106,7 +1105,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF // Allocate the additional buffer. int64_t p0 = taosGetTimestampUs(); - SSDataBlockSortHelper helper = {.nullFirst = nullFirst, .pDataBlock = pDataBlock, .orderInfo = pOrderInfo}; + SSDataBlockSortHelper helper = {.pDataBlock = pDataBlock, .orderInfo = pOrderInfo}; uint32_t rows = pDataBlock->info.rows; SHelper* index = createTupleIndex_rv(rows, helper.orderInfo, pDataBlock); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index fe48cc21e4..957678e953 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -593,8 +593,7 @@ typedef struct SSortedMergeOperatorInfo { SOptrBasicInfo binfo; bool hasVarCol; - SArray *orderInfo; // SArray - bool nullFirst; + SArray* pSortInfo; int32_t numOfSources; SSortHandle *pSortHandle; @@ -613,12 +612,10 @@ typedef struct SSortedMergeOperatorInfo { SAggSupporter aggSup; } SSortedMergeOperatorInfo; -typedef struct SOrderOperatorInfo { +typedef struct SSortOperatorInfo { uint32_t sortBufSize; // max buffer size for in-memory sort SSDataBlock *pDataBlock; - bool hasVarCol; // has variable length column, such as binary/varchar/nchar - SArray *orderInfo; - bool nullFirst; + SArray* pSortInfo; SSortHandle *pSortHandle; int32_t bufPageSize; int32_t numOfRowsInRes; @@ -629,7 +626,7 @@ typedef struct SOrderOperatorInfo { uint64_t totalSize; // total load bytes from remote uint64_t totalRows; // total number of rows uint64_t totalElapsed; // total elapsed time -} SOrderOperatorInfo; +} SSortOperatorInfo; typedef struct SDistinctDataInfo { int32_t index; @@ -655,8 +652,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, SSDataBlock* pResBlock, const SName* pName, SNode* pCondition, SEpSet epset, SArray* colList, SExecTaskInfo* pTaskInfo, bool showRewrite, int32_t accountId); SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/inc/tsort.h b/source/libs/executor/inc/tsort.h index ef7af2b4e3..8971ee33d3 100644 --- a/source/libs/executor/inc/tsort.h +++ b/source/libs/executor/inc/tsort.h @@ -63,7 +63,7 @@ typedef int32_t (*_sort_merge_compar_fn_t)(const void* p1, const void* p2, void* * @param type * @return */ -SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type, int32_t pageSize, int32_t numOfPages, SSchema* pSchema, int32_t numOfCols, const char* idstr); +SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr); /** * diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index f0cffafca2..2c6468a13f 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -19,8 +19,6 @@ #include "executil.h" #include "executorimpl.h" -//#include "queryLog.h" -#include "tbuffer.h" #include "tcompression.h" #include "tlosertree.h" diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f38888440c..266f910eb5 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -322,26 +322,6 @@ static void sortGroupResByOrderList(SGroupResInfo *pGroupResInfo, STaskRuntimeEn } //setup the output buffer for each operator -SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows) { - const static int32_t minSize = 8; - - SSDataBlock *res = taosMemoryCalloc(1, sizeof(SSDataBlock)); - res->info.numOfCols = numOfOutput; - res->pDataBlock = taosArrayInit(numOfOutput, sizeof(SColumnInfoData)); - for (int32_t i = 0; i < numOfOutput; ++i) { - SColumnInfoData idata = {{0}}; - idata.info.type = pExpr[i].base.resSchema.type; - idata.info.bytes = pExpr[i].base.resSchema.bytes; - idata.info.colId = pExpr[i].base.resSchema.colId; - - int32_t size = TMAX(idata.info.bytes * numOfRows, minSize); - idata.pData = taosMemoryCalloc(1, size); // at least to hold a pointer on x64 platform - taosArrayPush(res->pDataBlock, &idata); - } - - return res; -} - SSDataBlock* createOutputBuf_rv1(SDataBlockDescNode* pNode) { int32_t numOfCols = LIST_LENGTH(pNode->pSlots); @@ -5840,14 +5820,14 @@ static void cleanupAggSup(SAggSupporter* pAggSup); static void destroySortedMergeOperatorInfo(void* param, int32_t numOfOutput) { SSortedMergeOperatorInfo* pInfo = (SSortedMergeOperatorInfo*) param; - taosArrayDestroy(pInfo->orderInfo); + taosArrayDestroy(pInfo->pSortInfo); taosArrayDestroy(pInfo->groupInfo); if (pInfo->pSortHandle != NULL) { tsortDestroySortHandle(pInfo->pSortHandle); } - blockDataDestroy(pInfo->binfo.pRes); + blockDataDestroy(pInfo->binfo.pRes); cleanupAggSup(&pInfo->aggSup); } @@ -6105,12 +6085,10 @@ static SSDataBlock* doSortedMerge(SOperatorInfo *pOperator, bool* newgroup) { return getSortedBlockData(pInfo->pSortHandle, pInfo->binfo.pRes, pInfo->hasVarCol, pInfo->binfo.capacity); } - SSchema* p = blockDataExtractSchema(pInfo->binfo.pRes, NULL); int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, - numOfBufPage, p, pInfo->binfo.pRes->info.numOfCols, "GET_TASKID(pTaskInfo)"); + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_MULTISOURCE_MERGE, pInfo->bufPageSize, + numOfBufPage, pInfo->binfo.pRes, "GET_TASKID(pTaskInfo)"); - taosMemoryFreeClear(p); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); for(int32_t i = 0; i < pOperator->numOfDownstream; ++i) { @@ -6128,29 +6106,6 @@ static SSDataBlock* doSortedMerge(SOperatorInfo *pOperator, bool* newgroup) { return doMerge(pOperator); } -static SArray* createBlockOrder(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal) { - SArray* pOrderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); - - size_t numOfOrder = taosArrayGetSize(pOrderVal); - for (int32_t j = 0; j < numOfOrder; ++j) { - SBlockOrderInfo orderInfo = {0}; - SOrder* pOrder = taosArrayGet(pOrderVal, j); - orderInfo.order = pOrder->order; - - for (int32_t i = 0; i < numOfCols; ++i) { - SExprInfo* pExpr = &pExprInfo[i]; - if (pExpr->base.resSchema.colId == pOrder->col.colId) { - orderInfo.colIndex = i; - break; - } - } - - taosArrayPush(pOrderInfo, &orderInfo); - } - - return pOrderInfo; -} - static int32_t initGroupCol(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pGroupInfo, SSortedMergeOperatorInfo* pInfo) { if (pGroupInfo == NULL || taosArrayGetSize(pGroupInfo) == 0) { return 0; @@ -6199,7 +6154,7 @@ static int32_t initGroupCol(SExprInfo* pExprInfo, int32_t numOfCols, SArray* pGr return TSDB_CODE_SUCCESS; } -SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pSortInfo, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo) { SSortedMergeOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortedMergeOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -6228,7 +6183,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t // pRuntimeEnv->pQueryAttr->topBotQuery, false)); pInfo->sortBufSize = 1024 * 16; // 1MB pInfo->bufPageSize = 1024; - pInfo->orderInfo = createBlockOrder(pExprInfo, num, pOrderVal); + pInfo->pSortInfo = pSortInfo; pInfo->binfo.capacity = blockDataGetCapacityInRow(pInfo->binfo.pRes, pInfo->bufPageSize); @@ -6268,35 +6223,34 @@ static SSDataBlock* doSort(SOperatorInfo *pOperator, bool* newgroup) { } SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SOrderOperatorInfo* pInfo = pOperator->info; + SSortOperatorInfo* pInfo = pOperator->info; + bool hasVarCol = pInfo->pDataBlock->info.hasVarCol; + if (pOperator->status == OP_RES_TO_RETURN) { - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->hasVarCol, pInfo->numOfRowsInRes); + return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes); } - SSchema* p = blockDataExtractSchema(pInfo->pDataBlock, NULL); int32_t numOfBufPage = pInfo->sortBufSize / pInfo->bufPageSize; - pInfo->pSortHandle = tsortCreateSortHandle(pInfo->orderInfo, pInfo->nullFirst, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, - numOfBufPage, p, pInfo->pDataBlock->info.numOfCols, "GET_TASKID(pTaskInfo)"); + pInfo->pSortHandle = tsortCreateSortHandle(pInfo->pSortInfo, SORT_SINGLESOURCE_SORT, pInfo->bufPageSize, + numOfBufPage, pInfo->pDataBlock, "GET_TASKID(pTaskInfo)"); - taosMemoryFreeClear(p); tsortSetFetchRawDataFp(pInfo->pSortHandle, loadNextDataBlock); SGenericSource* ps = taosMemoryCalloc(1, sizeof(SGenericSource)); - ps->param = pOperator; + ps->param = pOperator->pDownstream[0]; tsortAddSource(pInfo->pSortHandle, ps); - // TODO set error code; int32_t code = tsortOpen(pInfo->pSortHandle); if (code != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, terrno); } pOperator->status = OP_RES_TO_RETURN; - return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, pInfo->hasVarCol, pInfo->numOfRowsInRes); + return getSortedBlockData(pInfo->pSortHandle, pInfo->pDataBlock, hasVarCol, pInfo->numOfRowsInRes); } -SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal, SExecTaskInfo* pTaskInfo) { - SOrderOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SOrderOperatorInfo)); +SOperatorInfo *createSortOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SArray* pSortInfo, SExecTaskInfo* pTaskInfo) { + SSortOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SSortOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { taosMemoryFreeClear(pInfo); @@ -6305,37 +6259,21 @@ SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx return NULL; } - pInfo->sortBufSize = 1024 * 16; // 1MB - pInfo->bufPageSize = 1024; - pInfo->numOfRowsInRes = 1024; + pInfo->sortBufSize = 1024 * 16; // 1MB, TODO dynamic set the available sort buffer + pInfo->bufPageSize = 1024; + pInfo->numOfRowsInRes = 1024; + pInfo->pDataBlock = pResBlock; + pInfo->pSortInfo = pSortInfo; - pInfo->orderInfo = createBlockOrder(pExprInfo, numOfCols, pOrderVal); - - for(int32_t i = 0; i < numOfCols; ++i) { - if (IS_VAR_DATA_TYPE(pExprInfo[i].base.resSchema.type)) { - pInfo->hasVarCol = true; - break; - } - } - - if (pInfo->orderInfo == NULL || pInfo->pDataBlock == NULL) { - taosMemoryFreeClear(pOperator); - destroyOrderOperatorInfo(pInfo, numOfCols); - taosMemoryFreeClear(pInfo); - - terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; - return NULL; - } - - pOperator->name = "Order"; + pOperator->name = "Sort"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_SORT; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - pOperator->getNextFn = doSort; - pOperator->closeFn = destroyOrderOperatorInfo; + pOperator->getNextFn = doSort; + pOperator->closeFn = destroyOrderOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -6496,7 +6434,6 @@ static bool aggDecodeResultRow(SOperatorInfo* pOperator, char *result, int32_t l offset += valueLen; initResultRow(resultRow); - pInfo->resultRowInfo.pPosition[pInfo->resultRowInfo.size++] = (SResultRowPosition) {.pageId = resultRow->pageId, .offset = resultRow->offset}; } @@ -7505,10 +7442,10 @@ static void destroyTagScanOperatorInfo(void* param, int32_t numOfOutput) { } static void destroyOrderOperatorInfo(void* param, int32_t numOfOutput) { - SOrderOperatorInfo* pInfo = (SOrderOperatorInfo*) param; + SSortOperatorInfo* pInfo = (SSortOperatorInfo*) param; pInfo->pDataBlock = blockDataDestroy(pInfo->pDataBlock); - taosArrayDestroy(pInfo->orderInfo); + taosArrayDestroy(pInfo->pSortInfo); } static void destroyConditionOperatorInfo(void* param, int32_t numOfOutput) { @@ -8035,7 +7972,7 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI offset += pExpr[index->colIndex].base.resSchema.bytes; } - pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pOperator->resultInfo.capacity); +// pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pOperator->resultInfo.capacity); pOperator->name = "SLimitOperator"; @@ -8328,7 +8265,7 @@ SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperato pInfo->outputCapacity = 4096; pInfo->pDistinctDataInfo = taosArrayInit(numOfOutput, sizeof(SDistinctDataInfo)); pInfo->pSet = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity); +// pInfo->pRes = createOutputBuf(pExpr, numOfOutput, (int32_t) pInfo->outputCapacity); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); @@ -8536,8 +8473,9 @@ static SArray* extractTableIdList(const STableGroupInfo* pTableGroupInfo); static SArray* extractScanColumnId(SNodeList* pNodeList); static SArray* extractColumnInfo(SNodeList* pNodeList); static SArray* extractColMatchInfo(SNodeList* pNodeList); +static SArray* createSortInfo(SNodeList* pNodeList); -SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { + SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == nodeType(pPhyNode)) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; @@ -8623,16 +8561,37 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; + //todo: set the correct primary timestamp key column int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - SInterval interval = {.interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, + SInterval interval = {.interval = pIntervalPhyNode->interval, + .sliding = pIntervalPhyNode->sliding, .intervalUnit = pIntervalPhyNode->intervalUnit, - .slidingUnit = pIntervalPhyNode->slidingUnit, .offset = pIntervalPhyNode->offset}; + .slidingUnit = pIntervalPhyNode->slidingUnit, + .offset = pIntervalPhyNode->offset}; return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, pTableGroupInfo, pTaskInfo); } - } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { + } else if (QUERY_NODE_PHYSICAL_PLAN_SORT == nodeType(pPhyNode)) { + size_t size = LIST_LENGTH(pPhyNode->pChildren); + assert(size == 1); + + for (int32_t i = 0; i < size; ++i) { + SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); + SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + + SSortPhysiNode* pSortPhyNode = (SSortPhysiNode*)pPhyNode; + + int32_t num = 0; + SExprInfo *pExprInfo = createExprInfo(pSortPhyNode->pExprs, NULL, &num); + SSDataBlock *pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); + SArray *info = createSortInfo(pSortPhyNode->pSortKeys); + return createSortOperatorInfo(op, pExprInfo, num, pResBlock, info, pTaskInfo); + } + } else { + ASSERT(0); + }/*else if (pPhyNode->info.type == OP_MultiTableAggregate) { size_t size = taosArrayGetSize(pPhyNode->pChildren); assert(size == 1); @@ -8725,6 +8684,38 @@ SArray* extractColumnInfo(SNodeList* pNodeList) { return pList; } +SArray* createSortInfo(SNodeList* pNodeList) { + size_t numOfCols = LIST_LENGTH(pNodeList); + SArray* pList = taosArrayInit(numOfCols, sizeof(SBlockOrderInfo)); + if (pList == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return pList; + } + + for(int32_t i = 0; i < numOfCols; ++i) { + STargetNode* pNode = (STargetNode*) nodesListGetNode(pNodeList, i); + SOrderByExprNode* pSortKey = (SOrderByExprNode*) pNode->pExpr; + SBlockOrderInfo bi = {0}; + bi.order = (pSortKey->order == ORDER_ASC)? TSDB_ORDER_ASC:TSDB_ORDER_DESC; + bi.nullFirst = (pSortKey->nullOrder == NULL_ORDER_FIRST); + + SColumnNode* pColNode = (SColumnNode*)pSortKey->pExpr; + bi.slotId = pColNode->slotId; +// pColNode->order; +// SColumn c = {0}; +// c.slotId = pColNode->slotId; +// c.colId = pColNode->colId; +// c.type = pColNode->node.resType.type; +// c.bytes = pColNode->node.resType.bytes; +// c.precision = pColNode->node.resType.precision; +// c.scale = pColNode->node.resType.scale; + + taosArrayPush(pList, &bi); + } + + return pList; +} + SArray* extractColMatchInfo(SNodeList* pNodeList) { size_t numOfCols = LIST_LENGTH(pNodeList); SArray* pList = taosArrayInit(numOfCols, sizeof(SColMatchInfo)); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 85ba462c9a..6711daed5e 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -23,20 +23,19 @@ #include "tsort.h" #include "tutil.h" -typedef struct STupleHandle { +struct STupleHandle { SSDataBlock* pBlock; int32_t rowIndex; -} STupleHandle; +}; -typedef struct SSortHandle { +struct SSortHandle { int32_t type; int32_t pageSize; int32_t numOfPages; SDiskbasedBuf *pBuf; - SArray *pOrderInfo; - bool nullFirst; + SArray *pSortInfo; SArray *pOrderedSource; _sort_fetch_block_fn_t fetchfp; @@ -60,7 +59,7 @@ typedef struct SSortHandle { bool inMemSort; bool needAdjust; STupleHandle tupleHandle; -} SSortHandle; +}; static int32_t msortComparFn(const void *pLeft, const void *pRight, void *param); @@ -90,18 +89,18 @@ static SSDataBlock* createDataBlock_rv(SSchema* pSchema, int32_t numOfCols) { * @param type * @return */ -SSortHandle* tsortCreateSortHandle(SArray* pOrderInfo, bool nullFirst, int32_t type, int32_t pageSize, int32_t numOfPages, SSchema* pSchema, int32_t numOfCols, const char* idstr) { +SSortHandle* tsortCreateSortHandle(SArray* pSortInfo, int32_t type, int32_t pageSize, int32_t numOfPages, SSDataBlock* pBlock, const char* idstr) { SSortHandle* pSortHandle = taosMemoryCalloc(1, sizeof(SSortHandle)); pSortHandle->type = type; pSortHandle->pageSize = pageSize; pSortHandle->numOfPages = numOfPages; - pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES); - pSortHandle->pOrderInfo = pOrderInfo; - pSortHandle->nullFirst = nullFirst; - pSortHandle->cmpParam.orderInfo = pOrderInfo; + pSortHandle->pSortInfo = pSortInfo; + pSortHandle->pDataBlock = createOneDataBlock(pBlock); + + pSortHandle->pOrderedSource = taosArrayInit(4, POINTER_BYTES); + pSortHandle->cmpParam.orderInfo = pSortInfo; - pSortHandle->pDataBlock = createDataBlock_rv(pSchema, numOfCols); tsortSetComparFp(pSortHandle, msortComparFn); if (idstr != NULL) { @@ -364,14 +363,14 @@ int32_t msortComparFn(const void *pLeft, const void *pRight, void *param) { for(int32_t i = 0; i < pInfo->size; ++i) { SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i); - SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->colIndex); + SColumnInfoData* pLeftColInfoData = TARRAY_GET_ELEM(pLeftBlock->pDataBlock, pOrder->slotId); bool leftNull = false; if (pLeftColInfoData->hasNull) { leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pLeftSource->src.rowIndex, pLeftBlock->pBlockAgg); } - SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->colIndex); + SColumnInfoData* pRightColInfoData = TARRAY_GET_ELEM(pRightBlock->pDataBlock, pOrder->slotId); bool rightNull = false; if (pRightColInfoData->hasNull) { rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pRightSource->src.rowIndex, pRightBlock->pBlockAgg); @@ -542,7 +541,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { if (size > sortBufSize) { // Perform the in-memory sort and then flush data in the buffer into disk. int64_t p = taosGetTimestampUs(); - blockDataSort(pHandle->pDataBlock, pHandle->pOrderInfo, pHandle->nullFirst); + blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); int64_t el = taosGetTimestampUs() - p; pHandle->sortElapsed += el; @@ -555,7 +554,7 @@ static int32_t createInitialSortedMultiSources(SSortHandle* pHandle) { size_t size = blockDataGetSize(pHandle->pDataBlock); // Perform the in-memory sort and then flush data in the buffer into disk. - blockDataSort(pHandle->pDataBlock, pHandle->pOrderInfo, pHandle->nullFirst); + blockDataSort(pHandle->pDataBlock, pHandle->pSortInfo); // All sorted data can fit in memory, external memory sort is not needed. Return to directly if (size <= sortBufSize) { diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 7fb9b2ad7e..c9b0b62013 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -966,7 +966,7 @@ TEST(testCase, inMem_sort_Test) { exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); taosArrayPush(pExprInfo, &exp1); - SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(10000, 5, 1000, data_asc, 1), pExprInfo, pOrderVal, NULL); + SOperatorInfo* pOperator = createSortOperatorInfo(createDummyOperator(10000, 5, 1000, data_asc, 1), pExprInfo, pOrderVal, NULL); bool newgroup = false; SSDataBlock* pRes = pOperator->getNextFn(pOperator, &newgroup); @@ -1035,7 +1035,7 @@ TEST(testCase, external_sort_Test) { exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); // taosArrayPush(pExprInfo, &exp1); - SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(10000, 1500, 1000, data_desc, 1), pExprInfo, pOrderVal, NULL); + SOperatorInfo* pOperator = createSortOperatorInfo(createDummyOperator(10000, 1500, 1000, data_desc, 1), pExprInfo, pOrderVal, NULL); bool newgroup = false; SSDataBlock* pRes = NULL;