From c57e99e4c76a7ce3f150d4691c37763e09b86c89 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 8 Feb 2022 10:21:00 +0800 Subject: [PATCH] [td-11818] 1. Add orderby unit test; 2. refactor API; --- 2.0/src/query/tests/resultBufferTest.cpp | 6 +- include/common/tep.h | 3 + include/util/tlosertree.h | 4 +- include/util/tpagedfile.h | 92 +-- source/common/src/tep.c | 128 +++- source/libs/executor/inc/executorimpl.h | 673 ++++++++++---------- source/libs/executor/src/executil.c | 2 +- source/libs/executor/src/executorimpl.c | 307 ++++++++- source/libs/executor/test/executorTests.cpp | 138 +++- source/libs/function/inc/tpercentile.h | 2 +- source/libs/function/src/tpercentile.c | 10 +- source/util/src/tlosertree.c | 8 +- source/util/src/tpagedfile.c | 111 +++- 13 files changed, 1023 insertions(+), 461 deletions(-) diff --git a/2.0/src/query/tests/resultBufferTest.cpp b/2.0/src/query/tests/resultBufferTest.cpp index 9724b98f7c..4c523a52bf 100644 --- a/2.0/src/query/tests/resultBufferTest.cpp +++ b/2.0/src/query/tests/resultBufferTest.cpp @@ -13,7 +13,7 @@ namespace { // simple test void simpleTest() { - SDiskbasedResultBuf* pResultBuf = NULL; + SDiskbasedBuf* pResultBuf = NULL; int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4096, 1); int32_t pageId = 0; @@ -55,7 +55,7 @@ void simpleTest() { } void writeDownTest() { - SDiskbasedResultBuf* pResultBuf = NULL; + SDiskbasedBuf* pResultBuf = NULL; int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, 1); int32_t pageId = 0; @@ -102,7 +102,7 @@ void writeDownTest() { } void recyclePageTest() { - SDiskbasedResultBuf* pResultBuf = NULL; + SDiskbasedBuf* pResultBuf = NULL; int32_t ret = createDiskbasedResultBuffer(&pResultBuf, 1024, 4*1024, 1); int32_t pageId = 0; diff --git a/include/common/tep.h b/include/common/tep.h index 7e90b46c3c..b894d972d4 100644 --- a/include/common/tep.h +++ b/include/common/tep.h @@ -45,7 +45,10 @@ size_t colDataGetNumOfRows(const SSDataBlock* pBlock); int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc); int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t pageSize); +SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount); + int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock); +int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf); size_t blockDataGetSize(const SSDataBlock* pBlock); size_t blockDataGetRowSize(const SSDataBlock* pBlock); diff --git a/include/util/tlosertree.h b/include/util/tlosertree.h index d6ffde82ca..2977287be6 100644 --- a/include/util/tlosertree.h +++ b/include/util/tlosertree.h @@ -24,7 +24,7 @@ typedef int (*__merge_compare_fn_t)(const void *, const void *, void *param); typedef struct SLoserTreeNode { int32_t index; - void *pData; + void *pData; // TODO remove it? } SLoserTreeNode; typedef struct SLoserTreeInfo { @@ -35,7 +35,7 @@ typedef struct SLoserTreeInfo { SLoserTreeNode *pNode; } SLoserTreeInfo; -uint32_t tLoserTreeCreate(SLoserTreeInfo **pTree, int32_t numOfEntries, void *param, __merge_compare_fn_t compareFn); +int32_t tLoserTreeCreate(SLoserTreeInfo **pTree, uint32_t numOfEntries, void *param, __merge_compare_fn_t compareFn); void tLoserTreeInit(SLoserTreeInfo *pTree); diff --git a/include/util/tpagedfile.h b/include/util/tpagedfile.h index 5bc4dc92a0..c5c6c5cb59 100644 --- a/include/util/tpagedfile.h +++ b/include/util/tpagedfile.h @@ -26,54 +26,8 @@ extern "C" { #include "tlockfree.h" typedef struct SArray* SIDList; - -typedef struct SPageDiskInfo { - int32_t offset; - int32_t length; -} SPageDiskInfo; - -typedef struct SPageInfo { - SListNode* pn; // point to list node - int32_t pageId; - SPageDiskInfo info; - void* pData; - bool used; // set current page is in used -} SPageInfo; - -typedef struct SFreeListItem { - int32_t offset; - int32_t len; -} SFreeListItem; - -typedef struct SResultBufStatis { - int32_t flushBytes; - int32_t loadBytes; - int32_t getPages; - int32_t releasePages; - int32_t flushPages; -} SResultBufStatis; - -typedef struct SDiskbasedResultBuf { - int32_t numOfPages; - int64_t totalBufSize; - int64_t fileSize; // disk file size - FILE* file; - int32_t allocateId; // allocated page id - char* path; // file path - int32_t pageSize; // current used page size - int32_t inMemPages; // numOfPages that are allocated in memory - SHashObj* groupSet; // id hash table - SHashObj* all; - SList* lruList; - void* emptyDummyIdList; // dummy id list - void* assistBuf; // assistant buffer for compress/decompress data - SArray* pFree; // free area in file - bool comp; // compressed before flushed to disk - int32_t nextPos; // next page flush position - - uint64_t qId; // for debug purpose - SResultBufStatis statis; -} SDiskbasedResultBuf; +typedef struct SPageInfo SPageInfo; +typedef struct SDiskbasedBuf SDiskbasedBuf; #define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes #define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1} @@ -93,7 +47,7 @@ typedef struct SFilePage { * @param handle * @return */ -int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir); +int32_t createDiskbasedResultBuffer(SDiskbasedBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir); /** * @@ -102,7 +56,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pa * @param pageId * @return */ -SFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId); +SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pageId); /** * @@ -110,7 +64,7 @@ SFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32 * @param groupId * @return */ -SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId); +SIDList getDataBufPagesIdList(SDiskbasedBuf* pResultBuf, int32_t groupId); /** * get the specified buffer page by id @@ -118,49 +72,69 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId); * @param id * @return */ -SFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id); +SFilePage* getResBufPage(SDiskbasedBuf* pResultBuf, int32_t id); /** * release the referenced buf pages * @param pResultBuf * @param page */ -void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page); +void releaseResBufPage(SDiskbasedBuf* pResultBuf, void* page); /** * * @param pResultBuf * @param pi */ -void releaseResBufPageInfo(SDiskbasedResultBuf* pResultBuf, SPageInfo* pi); - +void releaseResBufPageInfo(SDiskbasedBuf* pResultBuf, struct SPageInfo* pi); /** * get the total buffer size in the format of disk file * @param pResultBuf * @return */ -size_t getResBufSize(const SDiskbasedResultBuf* pResultBuf); +size_t getResBufSize(const SDiskbasedBuf* pResultBuf); /** * get the number of groups in the result buffer * @param pResultBuf * @return */ -size_t getNumOfResultBufGroupId(const SDiskbasedResultBuf* pResultBuf); +size_t getNumOfResultBufGroupId(const SDiskbasedBuf* pResultBuf); /** * destroy result buffer * @param pResultBuf */ -void destroyResultBuf(SDiskbasedResultBuf* pResultBuf); +void destroyResultBuf(SDiskbasedBuf* pResultBuf); /** * * @param pList * @return */ -SPageInfo* getLastPageInfo(SIDList pList); +struct SPageInfo* getLastPageInfo(SIDList pList); + +/** + * + * @param pPgInfo + * @return + */ +int32_t getPageId(const struct SPageInfo* pPgInfo); + +/** + * Return the buffer page size. + * @param pResultBuf + * @return + */ +int32_t getBufPageSize(const SDiskbasedBuf* pResultBuf); + +/** + * + * @param pResultBuf + * @return + */ +bool isAllDataInMemBuf(const SDiskbasedBuf* pResultBuf); #ifdef __cplusplus } diff --git a/source/common/src/tep.c b/source/common/src/tep.c index ce088bd173..554dc4e552 100644 --- a/source/common/src/tep.c +++ b/source/common/src/tep.c @@ -246,9 +246,11 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co } pColumnInfoData->varmeta.offset = (int32_t*) p; - memcpy(pColumnInfoData->varmeta.offset + sizeof(int32_t) * numOfRow1, pSource->varmeta.offset, sizeof(int32_t) * numOfRow2); + for(int32_t i = 0; i < numOfRow2; ++i) { + pColumnInfoData->varmeta.offset[i + numOfRow1] = pSource->varmeta.offset[i] + pColumnInfoData->varmeta.length; + } - // copy the + // copy data uint32_t len = pSource->varmeta.length; uint32_t oldLen = pColumnInfoData->varmeta.length; if (pColumnInfoData->varmeta.allocLen < len + oldLen) { @@ -261,7 +263,8 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co pColumnInfoData->varmeta.allocLen = len + oldLen; } - memcpy(pColumnInfoData->pData + oldLen, pSource->pData + sizeof(int32_t), len); + memcpy(pColumnInfoData->pData + oldLen, pSource->pData, len); + pColumnInfoData->varmeta.length = len + oldLen; } else { doBitmapMerge(pColumnInfoData, numOfRow1, pSource, numOfRow2); @@ -414,18 +417,62 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd } } +SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int32_t rowCount) { + if (pBlock == NULL || startIndex < 0 || rowCount > pBlock->info.rows || rowCount + startIndex > pBlock->info.rows) { + return NULL; + } + + SSDataBlock* pDst = calloc(1, sizeof(SSDataBlock)); + pDst->info = pBlock->info; + + pDst->info.rows = 0; + pDst->pDataBlock = taosArrayInit(pBlock->info.numOfCols, sizeof(SColumnInfoData)); + + for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData colInfo = {0}; + SColumnInfoData* pSrcCol = taosArrayGet(pBlock->pDataBlock, i); + colInfo.info = pSrcCol->info; + + if (IS_VAR_DATA_TYPE(pSrcCol->info.type)) { + SVarColAttr* pAttr = &colInfo.varmeta; + pAttr->offset = calloc(rowCount, sizeof(int32_t)); + } else { + colInfo.nullbitmap = calloc(1, BitmapLen(rowCount)); + colInfo.pData = calloc(rowCount, colInfo.info.bytes); + } + + taosArrayPush(pDst->pDataBlock, &colInfo); + } + + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); + SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i); + + for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) { + bool isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg); + char* p = colDataGet(pColData, j); + + colDataAppend(pDstCol, j - startIndex, p, isNull); + } + } + + pDst->info.rows = rowCount; + return pDst; +} + + /** * - * +---------------------------+---------------------+ - * |the number of rows(4 bytes)| column #1 | - * |---------------------+ - * | | null bitmap| values | - * +---------------------------+---------------------+ + * +------------------+---------------+--------------------+ + * |the number of rows| column length | column #1 | + * | (4 bytes) | (4 bytes) |--------------------+ + * | | | null bitmap| values| + * +------------------+---------------+--------------------+ * @param buf * @param pBlock * @return */ -int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { +int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { // TODO add the column length!! ASSERT(pBlock != NULL); // write the number of rows @@ -447,6 +494,10 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { } uint32_t dataSize = colDataGetSize(pCol, numOfRows); + + *(int32_t*) pStart = dataSize; + pStart += sizeof(int32_t); + memcpy(pStart, pCol->pData, dataSize); pStart += dataSize; } @@ -454,6 +505,48 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { return 0; } +int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { + pBlock->info.rows = *(int32_t*) buf; + + int32_t numOfCols = pBlock->info.numOfCols; + const char* pStart = buf + sizeof(uint32_t); + + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); + + size_t metaSize = pBlock->info.rows * sizeof(int32_t); + if (IS_VAR_DATA_TYPE(pCol->info.type)) { + char* p = realloc(pCol->varmeta.offset, metaSize); + if (p == NULL) { + // TODO handle error + } + + pCol->varmeta.offset = (int32_t*)p; + memcpy(pCol->varmeta.offset, pStart, metaSize); + pStart += metaSize; + } else { + char* p = realloc(pCol->nullbitmap, BitmapLen(pBlock->info.rows)); + if (p == NULL) { + // TODO handle error + } + + pCol->nullbitmap = p; + memcpy(pCol->nullbitmap, pStart, BitmapLen(pBlock->info.rows)); + pStart += BitmapLen(pBlock->info.rows); + } + + int32_t colLength = *(int32_t*) pStart; + pStart += sizeof(int32_t); + + if (pCol->pData == NULL) { + pCol->pData = malloc(pCol->info.bytes * 4096); // TODO refactor the memory mgmt + } + + memcpy(pCol->pData, pStart, colLength); + pStart += colLength; + } +} + size_t blockDataGetRowSize(const SSDataBlock* pBlock) { ASSERT(pBlock != NULL); size_t rowSize = 0; @@ -507,12 +600,12 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { switch(pColInfoData->info.type) { case TSDB_DATA_TYPE_INT: { if (*(int32_t*) left1 == *(int32_t*) right1) { - continue;// TODO continue + break; } else { if (pOrder->order == TSDB_ORDER_ASC) { - return (*(int32_t*) left1 < *(int32_t*) right1)? -1:1; + return (*(int32_t*) left1 <= *(int32_t*) right1)? -1:1; } else { - return (*(int32_t*) left1 < *(int32_t*) right1)? 1:-1; + return (*(int32_t*) left1 <= *(int32_t*) right1)? 1:-1; } } } @@ -624,10 +717,13 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs return terrno; } + int64_t p0 = taosGetTimestampUs(); + SSDataBlockSortHelper helper = {.nullFirst = nullFirst, .pDataBlock = pDataBlock, .orderInfo = pOrderInfo}; taosqsort(index, rows, sizeof(int32_t), &helper, dataBlockCompar); - int32_t numOfCols = pDataBlock->info.numOfCols; + int64_t p1 = taosGetTimestampUs(); + SColumnInfoData* pCols = createHelpColInfoData(pDataBlock); if (pCols == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -640,7 +736,10 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs printf("%d, %d, %d\n", index[i], ((int32_t*)px->pData)[i], ((int32_t*)px->pData)[index[i]]); } #endif + int64_t p2 = taosGetTimestampUs(); + blockDataAssign(pCols, pDataBlock, index); + int64_t p3 = taosGetTimestampUs(); #if 0 for(int32_t i = 0; i < pDataBlock->info.rows; ++i) { @@ -655,5 +754,8 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs #endif copyBackToBlock(pDataBlock, pCols); + int64_t p4 = taosGetTimestampUs(); + + printf("sort:%ld, create:%ld, assign:%ld, copyback:%ld\n", p1-p0, p2 - p1, p3 - p2, p4-p3); destroyTupleIndex(index); } diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index f2a068869c..b9c8012cff 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -15,8 +15,13 @@ #ifndef TDENGINE_EXECUTORIMPL_H #define TDENGINE_EXECUTORIMPL_H +#ifdef __cplusplus +extern "C" { +#endif + #include "os.h" #include "common.h" +#include "tlosertree.h" #include "ttszip.h" #include "tvariant.h" @@ -36,14 +41,14 @@ struct SColumnFilterElem; typedef int32_t (*__block_search_fn_t)(char* data, int32_t num, int64_t key, int32_t order); #define IS_QUERY_KILLED(_q) ((_q)->code == TSDB_CODE_TSC_QUERY_CANCELLED) -#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u) +#define Q_STATUS_EQUAL(p, s) (((p) & (s)) != 0u) #define QUERY_IS_ASC_QUERY(q) (GET_FORWARD_DIRECTION_FACTOR((q)->order.order) == QUERY_ASC_FORWARD_STEP) -#define GET_TABLEGROUP(q, _index) ((SArray*) taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) +#define GET_TABLEGROUP(q, _index) ((SArray*)taosArrayGetP((q)->tableqinfoGroupInfo.pGroupList, (_index))) -#define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL? 0:((_r)->outputBuf)->info.rows) +#define GET_NUM_OF_RESULTS(_r) (((_r)->outputBuf) == NULL ? 0 : ((_r)->outputBuf)->info.rows) -#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData? 1 : 0) +#define NEEDTO_COMPRESS_QUERY(size) ((size) > tsCompressColData ? 1 : 0) enum { // when this task starts to execute, this status will set @@ -62,8 +67,8 @@ enum { }; typedef struct SResultRowCell { - uint64_t groupId; - SResultRow *pRow; + uint64_t groupId; + SResultRow* pRow; } SResultRowCell; /** @@ -80,24 +85,24 @@ typedef struct SColumnFilterElem { int16_t bytes; // column length __filter_func_t fp; SColumnFilterInfo filterInfo; - void *q; + void* q; } SColumnFilterElem; typedef struct SSingleColumnFilterInfo { void* pData; - void* pData2; //used for nchar column + void* pData2; // used for nchar column int32_t numOfFilters; SColumnInfo info; SColumnFilterElem* pFilters; } SSingleColumnFilterInfo; typedef struct STableQueryInfo { - TSKEY lastKey; - int32_t groupIndex; // group id in table list - SVariant tag; - STimeWindow win; // todo remove it later - STSCursor cur; - void* pTable; // for retrieve the page id list + TSKEY lastKey; + int32_t groupIndex; // group id in table list + SVariant tag; + STimeWindow win; // todo remove it later + STSCursor cur; + void* pTable; // for retrieve the page id list SResultRowInfo resInfo; } STableQueryInfo; @@ -109,11 +114,11 @@ typedef enum { typedef struct { EQueryProfEventType eventType; - int64_t eventTime; + int64_t eventTime; union { - uint8_t operatorType; //for operator event - int32_t abortCode; //for query abort event + uint8_t operatorType; // for operator event + int32_t abortCode; // for query abort event }; } SQueryProfEvent; @@ -124,33 +129,33 @@ typedef struct { } SOperatorProfResult; typedef struct STaskCostInfo { - int64_t created; - int64_t start; - int64_t end; + int64_t created; + int64_t start; + int64_t end; - uint64_t loadStatisTime; - uint64_t loadFileBlockTime; - uint64_t loadDataInCacheTime; - uint64_t loadStatisSize; - uint64_t loadFileBlockSize; - uint64_t loadDataInCacheSize; + uint64_t loadStatisTime; + uint64_t loadFileBlockTime; + uint64_t loadDataInCacheTime; + uint64_t loadStatisSize; + uint64_t loadFileBlockSize; + uint64_t loadDataInCacheSize; - uint64_t loadDataTime; - uint64_t totalRows; - uint64_t totalCheckedRows; - uint32_t totalBlocks; - uint32_t loadBlocks; - uint32_t loadBlockStatis; - uint32_t discardBlocks; - uint64_t elapsedTime; - uint64_t firstStageMergeTime; - uint64_t winInfoSize; - uint64_t tableInfoSize; - uint64_t hashSize; - uint64_t numOfTimeWindows; + uint64_t loadDataTime; + uint64_t totalRows; + uint64_t totalCheckedRows; + uint32_t totalBlocks; + uint32_t loadBlocks; + uint32_t loadBlockStatis; + uint32_t discardBlocks; + uint64_t elapsedTime; + uint64_t firstStageMergeTime; + uint64_t winInfoSize; + uint64_t tableInfoSize; + uint64_t hashSize; + uint64_t numOfTimeWindows; - SArray *queryProfEvents; //SArray - SHashObj *operatorProfResults; //map + SArray* queryProfEvents; // SArray + SHashObj* operatorProfResults; // map } STaskCostInfo; typedef struct { @@ -166,67 +171,67 @@ typedef struct { // The basic query information extracted from the SQueryInfo tree to support the // execution of query in a data node. typedef struct STaskAttr { - SLimit limit; - SLimit slimit; + SLimit limit; + SLimit slimit; // todo comment it - bool stableQuery; // super table query or not - bool topBotQuery; // TODO used bitwise flag - bool groupbyColumn; // denote if this is a groupby normal column query - bool hasTagResults; // if there are tag values in final result or not - bool timeWindowInterpo;// if the time window start/end required interpolation - bool queryBlockDist; // if query data block distribution - bool stabledev; // super table stddev query - bool tsCompQuery; // is tscomp query - bool diffQuery; // is diff query - bool simpleAgg; - bool pointInterpQuery; // point interpolation query - bool needReverseScan; // need reverse scan - bool distinct; // distinct query or not - bool stateWindow; // window State on sub/normal table - bool createFilterOperator; // if filter operator is needed - bool multigroupResult; // multigroup result can exist in one SSDataBlock - int32_t interBufSize; // intermediate buffer sizse + bool stableQuery; // super table query or not + bool topBotQuery; // TODO used bitwise flag + bool groupbyColumn; // denote if this is a groupby normal column query + bool hasTagResults; // if there are tag values in final result or not + bool timeWindowInterpo; // if the time window start/end required interpolation + bool queryBlockDist; // if query data block distribution + bool stabledev; // super table stddev query + bool tsCompQuery; // is tscomp query + bool diffQuery; // is diff query + bool simpleAgg; + bool pointInterpQuery; // point interpolation query + bool needReverseScan; // need reverse scan + bool distinct; // distinct query or not + bool stateWindow; // window State on sub/normal table + bool createFilterOperator; // if filter operator is needed + bool multigroupResult; // multigroup result can exist in one SSDataBlock + int32_t interBufSize; // intermediate buffer sizse - int32_t havingNum; // having expr number + int32_t havingNum; // having expr number - SOrder order; - int16_t numOfCols; - int16_t numOfTags; + SOrder order; + int16_t numOfCols; + int16_t numOfTags; - STimeWindow window; - SInterval interval; - SSessionWindow sw; - int16_t precision; - int16_t numOfOutput; - int16_t fillType; + STimeWindow window; + SInterval interval; + SSessionWindow sw; + int16_t precision; + int16_t numOfOutput; + int16_t fillType; - int32_t srcRowSize; // todo extract struct - int32_t resultRowSize; - int32_t intermediateResultRowSize; // intermediate result row size, in case of top-k query. - int32_t maxTableColumnWidth; - int32_t tagLen; // tag value length of current query - SGroupbyExpr *pGroupbyExpr; + int32_t srcRowSize; // todo extract struct + int32_t resultRowSize; + int32_t intermediateResultRowSize; // intermediate result row size, in case of top-k query. + int32_t maxTableColumnWidth; + int32_t tagLen; // tag value length of current query + SGroupbyExpr* pGroupbyExpr; - SExprInfo* pExpr1; - SExprInfo* pExpr2; - int32_t numOfExpr2; - SExprInfo* pExpr3; - int32_t numOfExpr3; + SExprInfo* pExpr1; + SExprInfo* pExpr2; + int32_t numOfExpr2; + SExprInfo* pExpr3; + int32_t numOfExpr3; - SColumnInfo* tableCols; - SColumnInfo* tagColList; - int32_t numOfFilterCols; - int64_t* fillVal; - SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query. + SColumnInfo* tableCols; + SColumnInfo* tagColList; + int32_t numOfFilterCols; + int64_t* fillVal; + SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query. SSingleColumnFilterInfo* pFilterInfo; -// SFilterInfo *pFilters; - - void* tsdb; - STableGroupInfo tableGroupInfo; // table list SArray - int32_t vgId; - SArray *pUdfInfo; // no need to free + // SFilterInfo *pFilters; + + void* tsdb; + STableGroupInfo tableGroupInfo; // table list SArray + int32_t vgId; + SArray* pUdfInfo; // no need to free } STaskAttr; typedef int32_t (*__optr_prepare_fn_t)(void* param); @@ -236,176 +241,172 @@ typedef void (*__optr_cleanup_fn_t)(void* param, int32_t num); struct SOperatorInfo; typedef struct STaskIdInfo { - uint64_t queryId; // this is also a request id - uint64_t subplanId; - uint64_t templateId; - char *str; + uint64_t queryId; // this is also a request id + uint64_t subplanId; + uint64_t templateId; + char* str; } STaskIdInfo; typedef struct SExecTaskInfo { STaskIdInfo id; - char *content; + char* content; uint32_t status; STimeWindow window; STaskCostInfo cost; - int64_t owner; // if it is in execution + int64_t owner; // if it is in execution int32_t code; - uint64_t totalRows; // total number of rows + uint64_t totalRows; // total number of rows STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure - char *sql; // query sql string - jmp_buf env; // - struct SOperatorInfo *pRoot; + char* sql; // query sql string + jmp_buf env; // + struct SOperatorInfo* pRoot; } SExecTaskInfo; typedef struct STaskRuntimeEnv { - jmp_buf env; - STaskAttr* pQueryAttr; - uint32_t status; // query status - void* qinfo; - uint8_t scanFlag; // denotes reversed scan of data or not - void* pTsdbReadHandle; + jmp_buf env; + STaskAttr* pQueryAttr; + uint32_t status; // query status + void* qinfo; + uint8_t scanFlag; // denotes reversed scan of data or not + void* pTsdbReadHandle; - int32_t prevGroupId; // previous executed group id - bool enableGroupData; - SDiskbasedResultBuf* pResultBuf; // query result buffer based on blocked-wised disk file - SHashObj* pResultRowHashTable; // quick locate the window object for each result - SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not - SArray* pResultRowArrayList; // The array list that contains the Result rows - char* keyBuf; // window key buffer - SResultRowPool* pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. - char** prevRow; + int32_t prevGroupId; // previous executed group id + bool enableGroupData; + SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file + SHashObj* pResultRowHashTable; // quick locate the window object for each result + SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not + SArray* pResultRowArrayList; // The array list that contains the Result rows + char* keyBuf; // window key buffer + SResultRowPool* + pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. + char** prevRow; - SArray* prevResult; // intermediate result, SArray - STSBuf* pTsBuf; // timestamp filter list - STSCursor cur; + SArray* prevResult; // intermediate result, SArray + STSBuf* pTsBuf; // timestamp filter list + STSCursor cur; - char* tagVal; // tag value of current data block - struct SScalarFunctionSupport * scalarSup; + char* tagVal; // tag value of current data block + struct SScalarFunctionSupport* scalarSup; - SSDataBlock *outputBuf; - STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure - struct SOperatorInfo *proot; + SSDataBlock* outputBuf; + STableGroupInfo tableqinfoGroupInfo; // this is a group array list, including SArray structure + struct SOperatorInfo* proot; SGroupResInfo groupResInfo; - int64_t currentOffset; // dynamic offset value + int64_t currentOffset; // dynamic offset value - STableQueryInfo *current; - SRspResultInfo resultInfo; - SHashObj *pTableRetrieveTsMap; - struct SUdfInfo *pUdfInfo; + STableQueryInfo* current; + SRspResultInfo resultInfo; + SHashObj* pTableRetrieveTsMap; + struct SUdfInfo* pUdfInfo; } STaskRuntimeEnv; enum { - OP_IN_EXECUTING = 1, - OP_RES_TO_RETURN = 2, - OP_EXEC_DONE = 3, + OP_IN_EXECUTING = 1, + OP_RES_TO_RETURN = 2, + OP_EXEC_DONE = 3, }; typedef struct SOperatorInfo { - uint8_t operatorType; - bool blockingOptr; // block operator or not - uint8_t status; // denote if current operator is completed - int32_t numOfOutput; // number of columns of the current operator results - char *name; // name, used to show the query execution plan - void *info; // extension attribution - SExprInfo *pExpr; - STaskRuntimeEnv *pRuntimeEnv; // todo remove it - SExecTaskInfo *pTaskInfo; + uint8_t operatorType; + bool blockingOptr; // block operator or not + uint8_t status; // denote if current operator is completed + int32_t numOfOutput; // number of columns of the current operator results + char* name; // name, used to show the query execution plan + void* info; // extension attribution + SExprInfo* pExpr; + STaskRuntimeEnv* pRuntimeEnv; // todo remove it + SExecTaskInfo* pTaskInfo; - struct SOperatorInfo **pDownstream; // downstram pointer list - int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator - __optr_prepare_fn_t prepareFn; - __operator_fn_t exec; - __optr_cleanup_fn_t cleanupFn; + struct SOperatorInfo** pDownstream; // downstram pointer list + int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator + __optr_prepare_fn_t prepareFn; + __operator_fn_t exec; + __optr_cleanup_fn_t cleanupFn; } SOperatorInfo; -enum { - QUERY_RESULT_NOT_READY = 1, - QUERY_RESULT_READY = 2, -}; - typedef struct { int32_t numOfTags; int32_t numOfCols; - SColumnInfo *colList; + SColumnInfo* colList; } SQueriedTableInfo; typedef struct SQInfo { - void* signature; - uint64_t qId; - int32_t code; // error code to returned to client - int64_t owner; // if it is in execution + void* signature; + uint64_t qId; + int32_t code; // error code to returned to client + int64_t owner; // if it is in execution STaskRuntimeEnv runtimeEnv; STaskAttr query; - void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables; + void* pBuf; // allocated buffer for STableQueryInfo, sizeof(STableQueryInfo)*numOfTables; - pthread_mutex_t lock; // used to synchronize the rsp/query threads - tsem_t ready; - int32_t dataReady; // denote if query result is ready or not - void* rspContext; // response context - int64_t startExecTs; // start to exec timestamp - char* sql; // query sql string - STaskCostInfo summary; + pthread_mutex_t lock; // used to synchronize the rsp/query threads + tsem_t ready; + int32_t dataReady; // denote if query result is ready or not + void* rspContext; // response context + int64_t startExecTs; // start to exec timestamp + char* sql; // query sql string + STaskCostInfo summary; } SQInfo; typedef struct STaskParam { - char *sql; - char *tagCond; - char *colCond; - char *tbnameCond; - char *prevResult; - SArray *pTableIdList; - SSqlExpr **pExpr; - SSqlExpr **pSecExpr; - SExprInfo *pExprs; - SExprInfo *pSecExprs; + char* sql; + char* tagCond; + char* colCond; + char* tbnameCond; + char* prevResult; + SArray* pTableIdList; + SSqlExpr** pExpr; + SSqlExpr** pSecExpr; + SExprInfo* pExprs; + SExprInfo* pSecExprs; - SFilterInfo *pFilters; + SFilterInfo* pFilters; - SColIndex *pGroupColIndex; - SColumnInfo *pTagColumnInfo; - SGroupbyExpr *pGroupbyExpr; + SColIndex* pGroupColIndex; + SColumnInfo* pTagColumnInfo; + SGroupbyExpr* pGroupbyExpr; int32_t tableScanOperator; - SArray *pOperator; - struct SUdfInfo *pUdfInfo; + SArray* pOperator; + struct SUdfInfo* pUdfInfo; } STaskParam; typedef struct SExchangeInfo { - SArray *pSources; + SArray* pSources; tsem_t ready; - void *pTransporter; - SRetrieveTableRsp *pRsp; - SSDataBlock *pResult; + void* pTransporter; + SRetrieveTableRsp* pRsp; + SSDataBlock* pResult; int32_t current; uint64_t rowsOfCurrentSource; - uint64_t totalSize; // total load bytes from remote - uint64_t totalRows; // total number of rows - uint64_t totalElapsed;// total elapsed time + uint64_t totalSize; // total load bytes from remote + uint64_t totalRows; // total number of rows + uint64_t totalElapsed; // total elapsed time } SExchangeInfo; typedef struct STableScanInfo { - void *pTsdbReadHandle; - int32_t numOfBlocks; // extract basic running information. - int32_t numOfSkipped; - int32_t numOfBlockStatis; - int64_t numOfRows; - - int32_t order; // scan order - int32_t times; // repeat counts - int32_t current; - int32_t reverseTimes; // 0 by default + void* pTsdbReadHandle; + int32_t numOfBlocks; // extract basic running information. + int32_t numOfSkipped; + int32_t numOfBlockStatis; + int64_t numOfRows; - SqlFunctionCtx *pCtx; // next operator query context - SResultRowInfo *pResultRowInfo; - int32_t *rowCellInfoOffset; - SExprInfo *pExpr; + int32_t order; // scan order + int32_t times; // repeat counts + int32_t current; + int32_t reverseTimes; // 0 by default + + SqlFunctionCtx* pCtx; // next operator query context + SResultRowInfo* pResultRowInfo; + int32_t* rowCellInfoOffset; + SExprInfo* pExpr; SSDataBlock block; int32_t numOfOutput; int64_t elapsedTime; int32_t prevGroupId; // previous table group id - int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan + int32_t scanFlag; // table scan flag to denote if it is a repeat/reverse/main scan } STableScanInfo; typedef struct STagScanInfo { @@ -416,32 +417,33 @@ typedef struct STagScanInfo { } STagScanInfo; typedef struct SStreamBlockScanInfo { - SSDataBlock *pRes; // result SSDataBlock - SColumnInfo *pCols; // the output column info - uint64_t numOfRows; // total scanned rows - uint64_t numOfExec; // execution times - void *readerHandle;// stream block reader handle + SSDataBlock* pRes; // result SSDataBlock + SColumnInfo* pCols; // the output column info + uint64_t numOfRows; // total scanned rows + uint64_t numOfExec; // execution times + void* readerHandle; // stream block reader handle } SStreamBlockScanInfo; typedef struct SOptrBasicInfo { - SResultRowInfo resultRowInfo; - int32_t *rowCellInfoOffset; // offset value for each row result cell info - SqlFunctionCtx *pCtx; - SSDataBlock *pRes; + SResultRowInfo resultRowInfo; + int32_t* rowCellInfoOffset; // offset value for each row result cell info + SqlFunctionCtx* pCtx; + SSDataBlock* pRes; } SOptrBasicInfo; typedef struct SOptrBasicInfo STableIntervalOperatorInfo; typedef struct SAggOperatorInfo { - SOptrBasicInfo binfo; - uint32_t seed; - SDiskbasedResultBuf *pResultBuf; // query result buffer based on blocked-wised disk file - SHashObj* pResultRowHashTable; // quick locate the window object for each result - SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not - SArray* pResultRowArrayList; // The array list that contains the Result rows - char* keyBuf; // window key buffer - SResultRowPool* pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. - STableQueryInfo *current; + SOptrBasicInfo binfo; + uint32_t seed; + SDiskbasedBuf* pResultBuf; // query result buffer based on blocked-wised disk file + SHashObj* pResultRowHashTable; // quick locate the window object for each result + SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not + SArray* pResultRowArrayList; // The array list that contains the Result rows + char* keyBuf; // window key buffer + SResultRowPool* + pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. + STableQueryInfo* current; } SAggOperatorInfo; typedef struct SProjectOperatorInfo { @@ -449,52 +451,52 @@ typedef struct SProjectOperatorInfo { int32_t bufCapacity; uint32_t seed; - SSDataBlock *existDataBlock; + SSDataBlock* existDataBlock; } SProjectOperatorInfo; typedef struct SLimitOperatorInfo { - int64_t limit; - int64_t total; + int64_t limit; + int64_t total; } SLimitOperatorInfo; typedef struct SSLimitOperatorInfo { - int64_t groupTotal; - int64_t currentGroupOffset; + int64_t groupTotal; + int64_t currentGroupOffset; - int64_t rowsTotal; - int64_t currentOffset; - SLimit limit; - SLimit slimit; + int64_t rowsTotal; + int64_t currentOffset; + SLimit limit; + SLimit slimit; - char **prevRow; - SArray *orderColumnList; + char** prevRow; + SArray* orderColumnList; bool hasPrev; bool ignoreCurrentGroup; bool multigroupResult; - SSDataBlock *pRes; // result buffer - SSDataBlock *pPrevBlock; + SSDataBlock* pRes; // result buffer + SSDataBlock* pPrevBlock; int64_t capacity; int64_t threshold; } SSLimitOperatorInfo; typedef struct SFilterOperatorInfo { - SSingleColumnFilterInfo *pFilterInfo; - int32_t numOfFilterCols; + SSingleColumnFilterInfo* pFilterInfo; + int32_t numOfFilterCols; } SFilterOperatorInfo; typedef struct SFillOperatorInfo { - struct SFillInfo *pFillInfo; - SSDataBlock *pRes; - int64_t totalInputRows; - void **p; - SSDataBlock *existNewGroupBlock; - bool multigroupResult; + struct SFillInfo* pFillInfo; + SSDataBlock* pRes; + int64_t totalInputRows; + void** p; + SSDataBlock* existNewGroupBlock; + bool multigroupResult; } SFillOperatorInfo; typedef struct SGroupbyOperatorInfo { SOptrBasicInfo binfo; int32_t colIndex; - char *prevData; // previous group by value + char* prevData; // previous group by value } SGroupbyOperatorInfo; typedef struct SSWindowOperatorInfo { @@ -503,16 +505,16 @@ typedef struct SSWindowOperatorInfo { TSKEY prevTs; // previous timestamp int32_t numOfRows; // number of rows int32_t start; // start row index - bool reptScan; // next round scan + bool reptScan; // next round scan } SSWindowOperatorInfo; typedef struct SStateWindowOperatorInfo { SOptrBasicInfo binfo; STimeWindow curWindow; // current time window int32_t numOfRows; // number of rows - int32_t colIndex; // start row index + int32_t colIndex; // start row index int32_t start; - char* prevData; // previous data + char* prevData; // previous data bool reptScan; } SStateWindowOperatorInfo; @@ -520,83 +522,106 @@ typedef struct SDistinctDataInfo { int32_t index; int32_t type; int32_t bytes; -} SDistinctDataInfo; +} SDistinctDataInfo; typedef struct SDistinctOperatorInfo { - SHashObj *pSet; - SSDataBlock *pRes; - bool recordNullVal; //has already record the null value, no need to try again - int64_t threshold; - int64_t outputCapacity; - int32_t totalBytes; - char* buf; - SArray* pDistinctDataInfo; + SHashObj* pSet; + SSDataBlock* pRes; + bool recordNullVal; // has already record the null value, no need to try again + int64_t threshold; + int64_t outputCapacity; + int32_t totalBytes; + char* buf; + SArray* pDistinctDataInfo; } SDistinctOperatorInfo; struct SGlobalMerger; typedef struct SMultiwayMergeInfo { - struct SGlobalMerger *pMerge; - SOptrBasicInfo binfo; - int32_t bufCapacity; - int64_t seed; - char **prevRow; - SArray *orderColumnList; - int32_t resultRowFactor; + struct SGlobalMerger* pMerge; + SOptrBasicInfo binfo; + int32_t bufCapacity; + int64_t seed; + char** prevRow; + SArray* orderColumnList; + int32_t resultRowFactor; - bool hasGroupColData; - char **currentGroupColData; - SArray *groupColumnList; - bool hasDataBlockForNewGroup; - SSDataBlock *pExistBlock; + bool hasGroupColData; + char** currentGroupColData; + SArray* groupColumnList; + bool hasDataBlockForNewGroup; + SSDataBlock* pExistBlock; - SArray *udfInfo; - bool hasPrev; - bool multiGroupResults; + SArray* udfInfo; + bool hasPrev; + bool multiGroupResults; } SMultiwayMergeInfo; -// todo support the disk-based sort typedef struct SOrderOperatorInfo { - uint32_t sortBufSize; // max buffer size for in-memory sort - SArray *orderInfo; // SArray - SSDataBlock *pDataBlock; - bool nullFirst; // null value is put in the front + int32_t sourceId; + uint32_t sortBufSize; // max buffer size for in-memory sort + SArray* orderInfo; // SArray + SSDataBlock* pDataBlock; + bool nullFirst; // null value is put in the front + bool hasVarCol; // has variable length column, such as binary/varchar/nchar + SDiskbasedBuf* pSortInternalBuf; + int32_t numOfSources; + int32_t numOfCompleted; + SLoserTreeInfo *pMergeTree; + SArray *pSources; // SArray } SOrderOperatorInfo; SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray* pSchema, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createDataBlocksOptScanInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, + int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableSeqScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createProjectOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, + int32_t numOfOutput); SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); -SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult); -SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, + int32_t numOfOutput); +SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, + SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, + int32_t numOfOutput); +SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, + int32_t numOfOutput, bool multigroupResult); +SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, + int32_t numOfOutput); +SOperatorInfo* createMultiTableAggOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, + SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, + SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, + SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTagScanOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); +SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, + int32_t numOfOutput); SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createMultiwaySortOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows, void* merger); -SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp); -SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult); +SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, + SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, + bool groupResultMixedUp); +SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, + int32_t numOfOutput); +SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, + int32_t numOfOutput, void* merger, bool multigroupResult); SOperatorInfo* createFilterOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SColumnInfo* pCols, int32_t numOfFilter); -SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, int32_t numOfOutput); -SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SOrder* pOrderVal); -SOperatorInfo* createMergeSortOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, SOrder* pOrderVal); +SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema, + int32_t numOfOutput); +SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal); +SOperatorInfo* createMergeSortOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, + SOrder* pOrderVal); -//SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); -//SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); -//SSDataBlock* doSLimit(void* param, bool* newgroup); +// SSDataBlock* doGlobalAggregate(void* param, bool* newgroup); +// SSDataBlock* doMultiwayMergeSort(void* param, bool* newgroup); +// SSDataBlock* doSLimit(void* param, bool* newgroup); -//int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId); +// int32_t doCreateFilterInfo(SColumnInfo* pCols, int32_t numOfCols, int32_t numOfFilterCols, SSingleColumnFilterInfo** pFilterInfo, uint64_t qId); void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock); bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p); void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p); @@ -606,58 +631,64 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols); void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); -void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset); -void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity, int32_t numOfInputRows); -void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t *bufCapacity); +void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, + int32_t* rowCellInfoOffset); +void updateOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity, int32_t numOfInputRows); +void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity); void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); -int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo, - SSqlExpr** pExprMsg, SColumnInfo* pTagCols, int32_t queryType, void* pMsg, struct SUdfInfo* pUdfInfo); +int32_t createQueryFunc(SQueriedTableInfo* pTableInfo, int32_t numOfOutput, SExprInfo** pExprInfo, SSqlExpr** pExprMsg, + SColumnInfo* pTagCols, int32_t queryType, void* pMsg, struct SUdfInfo* pUdfInfo); -int32_t createIndirectQueryFuncExprFromMsg(SQueryTableReq *pQueryMsg, int32_t numOfOutput, SExprInfo **pExprInfo, - SSqlExpr **pExpr, SExprInfo *prevExpr, struct SUdfInfo *pUdfInfo); +int32_t createIndirectQueryFuncExprFromMsg(SQueryTableReq* pQueryMsg, int32_t numOfOutput, SExprInfo** pExprInfo, + SSqlExpr** pExpr, SExprInfo* prevExpr, struct SUdfInfo* pUdfInfo); -int32_t createQueryFilter(char *data, uint16_t len, SFilterInfo** pFilters); +int32_t createQueryFilter(char* data, uint16_t len, SFilterInfo** pFilters); -SGroupbyExpr *createGroupbyExprFromMsg(SQueryTableReq *pQueryMsg, SColIndex *pColIndex, int32_t *code); +SGroupbyExpr* createGroupbyExprFromMsg(SQueryTableReq* pQueryMsg, SColIndex* pColIndex, int32_t* code); int32_t initQInfo(STsBufInfo* pTsBufInfo, void* tsdb, void* sourceOptr, SQInfo* pQInfo, STaskParam* param, char* start, int32_t prevResultLen, void* merger); int32_t createFilterInfo(STaskAttr* pQueryAttr, uint64_t qId); -void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); +void freeColumnFilterInfo(SColumnFilterInfo* pFilter, int32_t numOfFilters); -STableQueryInfo *createTableQueryInfo(STaskAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, void* buf); +STableQueryInfo* createTableQueryInfo(STaskAttr* pQueryAttr, void* pTable, bool groupbyColumn, STimeWindow win, + void* buf); STableQueryInfo* createTmpTableQueryInfo(STimeWindow win); -int32_t buildArithmeticExprFromMsg(SExprInfo *pArithExprInfo, void *pQueryMsg); +int32_t buildArithmeticExprFromMsg(SExprInfo* pArithExprInfo, void* pQueryMsg); -bool isTaskKilled(SExecTaskInfo *pTaskInfo); +bool isTaskKilled(SExecTaskInfo* pTaskInfo); int32_t checkForQueryBuf(size_t numOfTables); -bool checkNeedToCompressQueryCol(SQInfo *pQInfo); -void setQueryStatus(STaskRuntimeEnv *pRuntimeEnv, int8_t status); +bool checkNeedToCompressQueryCol(SQInfo* pQInfo); +void setQueryStatus(STaskRuntimeEnv* pRuntimeEnv, int8_t status); bool onlyQueryTags(STaskAttr* pQueryAttr); -//void destroyUdfInfo(struct SUdfInfo* pUdfInfo); +// void destroyUdfInfo(struct SUdfInfo* pUdfInfo); -int32_t doDumpQueryResult(SQInfo *pQInfo, char *data, int8_t compressed, int32_t *compLen); +int32_t doDumpQueryResult(SQInfo* pQInfo, char* data, int8_t compressed, int32_t* compLen); -size_t getResultSize(SQInfo *pQInfo, int64_t *numOfRows); -void setTaskKilled(SExecTaskInfo *pTaskInfo); +size_t getResultSize(SQInfo* pQInfo, int64_t* numOfRows); +void setTaskKilled(SExecTaskInfo* pTaskInfo); void publishOperatorProfEvent(SOperatorInfo* operatorInfo, EQueryProfEventType eventType); -void publishQueryAbortEvent(SExecTaskInfo * pTaskInfo, int32_t code); +void publishQueryAbortEvent(SExecTaskInfo* pTaskInfo, int32_t code); void calculateOperatorProfResults(SQInfo* pQInfo); -void queryCostStatis(SExecTaskInfo *pTaskInfo); +void queryCostStatis(SExecTaskInfo* pTaskInfo); -void doDestroyTask(SExecTaskInfo *pTaskInfo); -void freeQueryAttr(STaskAttr *pQuery); +void doDestroyTask(SExecTaskInfo* pTaskInfo); +void freeQueryAttr(STaskAttr* pQuery); int32_t getMaximumIdleDurationSec(); -void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx *pCtx, int32_t idx, int32_t type); -void setTaskStatus(SExecTaskInfo *pTaskInfo, int8_t status); +void doInvokeUdf(struct SUdfInfo* pUdfInfo, SqlFunctionCtx* pCtx, int32_t idx, int32_t type); +void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId); +#ifdef __cplusplus +} +#endif + #endif // TDENGINE_EXECUTORIMPL_H diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 78093ce080..a6abe0662f 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -141,7 +141,7 @@ void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow, int16_ return; } - // the result does not put into the SDiskbasedResultBuf, ignore it. + // the result does not put into the SDiskbasedBuf, ignore it. if (pResultRow->pageId >= 0) { SFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pResultRow->pageId); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f0d16f3733..e59671b7cc 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -693,7 +693,7 @@ static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo * pResultRowInfo, i } // a new buffer page for each table. Needs to opt this design -static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf *pResultBuf, int32_t tid, uint32_t size) { +static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedBuf *pResultBuf, int32_t tid, uint32_t size) { if (pWindowRes->pageId != -1) { return 0; } @@ -708,10 +708,10 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedResultBuf pData = getNewDataBuf(pResultBuf, tid, &pageId); } else { SPageInfo* pi = getLastPageInfo(list); - pData = getResBufPage(pResultBuf, pi->pageId); - pageId = pi->pageId; + pData = getResBufPage(pResultBuf, getPageId(pi)); + pageId = getPageId(pi); - if (pData->num + size > pResultBuf->pageSize) { + if (pData->num + size > getBufPageSize(pResultBuf)) { // release current page first, and prepare the next one releaseResBufPageInfo(pResultBuf, pi); pData = getNewDataBuf(pResultBuf, tid, &pageId); @@ -748,7 +748,7 @@ static int32_t setResultOutputBufByKey(STaskRuntimeEnv *pRuntimeEnv, SResultRowI bool masterscan, SResultRow **pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) { assert(win->skey <= win->ekey); - SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; + SDiskbasedBuf *pResultBuf = pRuntimeEnv->pResultBuf; SResultRow *pResultRow = doSetResultOutBufByKey(pRuntimeEnv, pResultRowInfo, tid, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId); if (pResultRow == NULL) { @@ -1755,7 +1755,7 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) { } static int32_t setGroupResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binfo, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex) { - SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf; + SDiskbasedBuf *pResultBuf = pRuntimeEnv->pResultBuf; int32_t *rowCellInfoOffset = binfo->rowCellInfoOffset; SResultRowInfo *pResultRowInfo = &binfo->resultRowInfo; @@ -5535,6 +5535,195 @@ SOperatorInfo *createMultiwaySortOperatorInfo(STaskRuntimeEnv *pRuntimeEnv, SExp return pOperator; } +typedef struct SExternalMemSource { + SArray* pageIdList; + int32_t pageIndex; + int32_t sourceId; + int32_t rowIndex; + SSDataBlock *pBlock; +} SExternalMemSource; + +typedef struct SCompareParam { + SExternalMemSource **pSources; + int32_t num; + SArray *orderInfo; // SArray + bool nullFirst; +} SCompareParam; + +int32_t doMergeSortCompar(const void *pLeft, const void *pRight, void *param) { + int32_t pLeftIdx = *(int32_t *)pLeft; + int32_t pRightIdx = *(int32_t *)pRight; + + SCompareParam *pParam = (SCompareParam *)param; + SExternalMemSource **pSources = pParam->pSources; + + SArray *pInfo = pParam->orderInfo; + + // this input is exhausted, set the special value to denote this + if (pSources[pLeftIdx]->rowIndex == -1) { + return 1; + } + + if (pSources[pRightIdx]->rowIndex == -1) { + return -1; + } + + SSDataBlock* pLeftBlock = pSources[pLeftIdx]->pBlock; + SSDataBlock* pRightBlock = pSources[pRightIdx]->pBlock; + + size_t num = taosArrayGetSize(pInfo); + for(int32_t i = 0; i < num; ++i) { + SBlockOrderInfo* pOrder = taosArrayGet(pInfo, i); + + SColumnInfoData* pLeftColInfoData = taosArrayGet(pLeftBlock->pDataBlock, pOrder->colIndex); + bool leftNull = colDataIsNull(pLeftColInfoData, pLeftBlock->info.rows, pSources[pLeftIdx]->rowIndex, pLeftBlock->pBlockAgg); + + SColumnInfoData* pRightColInfoData = taosArrayGet(pRightBlock->pDataBlock, pOrder->colIndex); + bool rightNull = colDataIsNull(pRightColInfoData, pRightBlock->info.rows, pSources[pRightIdx]->rowIndex, pRightBlock->pBlockAgg); + + if (leftNull && rightNull) { + continue; // continue to next slot + } + + if (rightNull) { + return pParam->nullFirst? 1:-1; + } + + if (leftNull) { + return pParam->nullFirst? -1:1; + } + + void* left1 = colDataGet(pLeftColInfoData, pSources[pLeftIdx]->rowIndex); + void* right1 = colDataGet(pRightColInfoData, pSources[pRightIdx]->rowIndex); + + switch(pLeftColInfoData->info.type) { + case TSDB_DATA_TYPE_INT: + if (*(int32_t*) left1 == *(int32_t*) right1) { + break; + } else { + if (pOrder->order == TSDB_ORDER_ASC) { + return *(int32_t*) left1 <= *(int32_t*) right1? -1:1; + } else { + return *(int32_t*) left1 <= *(int32_t*) right1? 1:-1; + } + } + default: + assert(0); + } + } +} + +int32_t loadNewDataBlock(SExternalMemSource *pSource, SOrderOperatorInfo* pInfo) { + pSource->rowIndex = 0; + pSource->pageIndex += 1; + + if (pSource->pageIndex < taosArrayGetSize(pSource->pageIdList)) { + struct SPageInfo* pPgInfo = *(struct SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); + + SFilePage* pPage = getResBufPage(pInfo->pSortInternalBuf, getPageId(pPgInfo)); + return blockDataFromBuf(pSource->pBlock, pPage->data); + } else { + pInfo->numOfCompleted += 1; + pSource->rowIndex = -1; + pSource->pageIndex = -1; + + return 0; + } +} + +void adjustLoserTreeFromNewData(SExternalMemSource *pSource, SLoserTreeInfo *pTree, SOrderOperatorInfo* pInfo) { + /* + * load a new SDataBlock into memory of a given intermediate data-set source, + * since it's last record in buffer has been chosen to be processed, as the winner of loser-tree + */ + if (pSource->rowIndex >= pSource->pBlock->info.rows) { + // TODO check if has remain pages. + loadNewDataBlock(pSource, pInfo); + } + + /* + * Adjust loser tree otherwise, according to new candidate data + * if the loser tree is rebuild completed, we do not need to adjust + */ + int32_t leafNodeIdx = pTree->pNode[0].index + pInfo->numOfSources; + +#ifdef _DEBUG_VIEW + printf("before adjust:\t"); + tLoserTreeDisplay(pTree); +#endif + + tLoserTreeAdjust(pTree, leafNodeIdx); + +#ifdef _DEBUG_VIEW + printf("\nafter adjust:\t"); + tLoserTreeDisplay(pTree); + printf("\n"); +#endif +} + +static void appendOneRowToDataBlock(SSDataBlock *pBlock, const SSDataBlock* pSource, int32_t* rowIndex) { + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + + SColumnInfoData* pSrcColInfo = taosArrayGet(pSource->pDataBlock, i); + bool isNull = colDataIsNull(pSrcColInfo, pSource->info.rows, *rowIndex, NULL); + char* pData = colDataGet(pSrcColInfo, *rowIndex); + + colDataAppend(pColInfo, pBlock->info.rows, pData, isNull); + } + + pBlock->info.rows += 1; + *rowIndex += 1; +} + +void addToDiskBasedBuf(SOrderOperatorInfo* pInfo) { + int32_t start = 0; + + while(start < pInfo->pDataBlock->info.rows) { + int32_t stop = 0; + blockDataSplitRows(pInfo->pDataBlock, pInfo->hasVarCol, start, &stop, getBufPageSize(pInfo->pSortInternalBuf)); + SSDataBlock* p = blockDataExtractBlock(pInfo->pDataBlock, start, stop - start + 1); + + int32_t pageId = -1; + SFilePage* pPage = getNewDataBuf(pInfo->pSortInternalBuf, pInfo->sourceId, &pageId); + blockDataToBuf(pPage->data, p); + + start = stop + 1; + } + + int32_t numOfCols = pInfo->pDataBlock->info.numOfCols; + + pInfo->pDataBlock->info.rows = 0; + if (pInfo->hasVarCol) { + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* p = taosArrayGet(pInfo->pDataBlock->pDataBlock, i); + + if (IS_VAR_DATA_TYPE(p->info.type)) { + p->varmeta.length = 0; + } + } + } + + pInfo->sourceId += 1; + + // TODO extract method + SExternalMemSource* pSource = calloc(1, sizeof(SExternalMemSource)); + pSource->pageIdList = getDataBufPagesIdList(pInfo->pSortInternalBuf, pInfo->sourceId - 1); + pSource->sourceId = pInfo->sourceId - 1; + pSource->pBlock = calloc(1, sizeof(SSDataBlock)); + pSource->pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + pSource->pBlock->info.numOfCols = numOfCols; + + for(int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData colInfo = {0}; + SColumnInfoData* p = taosArrayGet(pInfo->pDataBlock->pDataBlock, i); + colInfo.info = p->info; + taosArrayPush(pSource->pBlock->pDataBlock, &colInfo); + } + + taosArrayPush(pInfo->pSources, &pSource); +} + static SSDataBlock* doSort(void* param, bool* newgroup) { SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { @@ -5542,8 +5731,8 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { } SOrderOperatorInfo* pInfo = pOperator->info; - SSDataBlock* pBlock = NULL; + while(1) { publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); pBlock = pOperator->pDownstream[0]->exec(pOperator->pDownstream[0], newgroup); @@ -5551,7 +5740,6 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { // start to flush data into disk and try do multiway merge sort if (pBlock == NULL) { - doSetOperatorCompleted(pOperator); break; } @@ -5563,39 +5751,106 @@ static SSDataBlock* doSort(void* param, bool* newgroup) { size_t size = blockDataGetSize(pInfo->pDataBlock); if (size > pInfo->sortBufSize) { // Perform the in-memory sort and then flush data in the buffer into disk. + int64_t p = taosGetTimestampUs(); blockDataSort(pInfo->pDataBlock, pInfo->orderInfo, pInfo->nullFirst); + printf("sort time:%ld\n", taosGetTimestampUs() - p); // flush to disk + addToDiskBasedBuf(pInfo); } } -// int32_t numOfCols = pInfo->pDataBlock->info.numOfCols; -// void** pCols = calloc(numOfCols, POINTER_BYTES); -// SSchema* pSchema = calloc(numOfCols, sizeof(SSchema)); -// -// for(int32_t i = 0; i < numOfCols; ++i) { -// SColumnInfoData* p1 = taosArrayGet(pInfo->pDataBlock->pDataBlock, i); -// pCols[i] = p1->pData; -// pSchema[i].colId = p1->info.colId; -// pSchema[i].bytes = p1->info.bytes; -// pSchema[i].type = (uint8_t) p1->info.type; -// } + if (pInfo->pDataBlock->info.rows > 0) { + pInfo->numOfSources += 1; -// __compar_fn_t comp = getKeyComparFunc(pSchema[pInfo->colIndex].type, pInfo->order); -// taoscQSort(pCols, pInfo->pDataBlock->info.rows, sizeof(int32_t), pInfo, comp); + // Perform the in-memory sort and then flush data in the buffer into disk. + blockDataSort(pInfo->pDataBlock, pInfo->orderInfo, pInfo->nullFirst); + + // All sorted data are resident in memory, external memory sort is not needed. + // Return to the upstream operator directly + if (isAllDataInMemBuf(pInfo->pSortInternalBuf)) { + pOperator->status = OP_RES_TO_RETURN; + return (pInfo->pDataBlock->info.rows == 0)? NULL:pInfo->pDataBlock; + } + + // flush to disk + addToDiskBasedBuf(pInfo); + } + + SCompareParam cmpParam = {0}; + cmpParam.nullFirst = pInfo->nullFirst; + cmpParam.orderInfo = pInfo->orderInfo; + cmpParam.num = pInfo->numOfSources; + cmpParam.pSources = pInfo->pSources->pData; + + pInfo->numOfSources = taosArrayGetSize(pInfo->pSources); + for(int32_t i = 0; i < pInfo->numOfSources; ++i) { + SExternalMemSource* pSource = cmpParam.pSources[i]; + SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); + + SFilePage* pPage = getResBufPage(pInfo->pSortInternalBuf, getPageId(pPgInfo)); + int32_t code = blockDataFromBuf(cmpParam.pSources[i]->pBlock, pPage->data); + } + + int32_t code = tLoserTreeCreate(&pInfo->pMergeTree, pInfo->numOfSources, &cmpParam, doMergeSortCompar); + + while(1) { + if (pInfo->numOfSources == pInfo->numOfCompleted) { + break; + } + + SExternalMemSource *pSource = cmpParam.pSources[pInfo->pMergeTree->pNode[0].index]; + appendOneRowToDataBlock(pInfo->pDataBlock, pSource->pBlock, &pSource->rowIndex); + adjustLoserTreeFromNewData(pSource, pInfo->pMergeTree, pInfo); + + if (pInfo->pDataBlock->info.rows >= 4096) { + return pInfo->pDataBlock; + } + } -// tfree(pCols); -// tfree(pSchema); return (pInfo->pDataBlock->info.rows > 0)? pInfo->pDataBlock:NULL; } -SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SOrder* pOrderVal) { +static SArray* createBlockOrder(SArray* pExprInfo, SArray* pOrderVal) { + SArray* pOrderInfo = taosArrayInit(1, sizeof(SBlockOrderInfo)); + + size_t numOfOrder = taosArrayGetSize(pOrderVal); + for (int32_t j = 0; j < numOfOrder; ++j) { + SBlockOrderInfo orderInfo = {0}; + SOrder* pOrder = taosArrayGet(pOrderVal, j); + orderInfo.order = pOrder->order; + + for (int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) { + SExprInfo* pExpr = taosArrayGet(pExprInfo, i); + if (pExpr->base.resSchema.colId == pOrder->col.info.colId) { + orderInfo.colIndex = i; + break; + } + } + + taosArrayPush(pOrderInfo, &orderInfo); + } + + return pOrderInfo; +} + +SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SArray* pExprInfo, SArray* pOrderVal) { SOrderOperatorInfo* pInfo = calloc(1, sizeof(SOrderOperatorInfo)); pInfo->sortBufSize = 1024 * 1024; // 1MB pInfo->pDataBlock = createOutputBuf_rv(pExprInfo, 4096); - pInfo->orderInfo = taosArrayInit(1, sizeof(SOrder)); - taosArrayPush(pInfo->orderInfo, pOrderVal); // todo more than one order column + pInfo->orderInfo = createBlockOrder(pExprInfo, pOrderVal); + pInfo->pSources = taosArrayInit(4, POINTER_BYTES); + + for(int32_t i = 0; i < taosArrayGetSize(pExprInfo); ++i) { + SExprInfo* pExpr = taosArrayGetP(pExprInfo, i); + if (IS_VAR_DATA_TYPE(pExpr->base.resSchema.type)) { + pInfo->hasVarCol = true; + break; + } + } + + int32_t code = createDiskbasedResultBuffer(&pInfo->pSortInternalBuf, 4096, 4096*1000, 1, "/tmp/"); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "Order"; diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index f0cabc6515..f777f6fe23 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -33,6 +33,83 @@ #include "stub.h" #include "executor.h" +namespace { + +typedef struct SDummyInputInfo { + int32_t max; + int32_t current; + int32_t startVal; +} SDummyInputInfo; + +SSDataBlock* getDummyBlock(void* param, bool* newgroup) { + SOperatorInfo* pOperator = static_cast(param); + SDummyInputInfo* pInfo = static_cast(pOperator->info); + if (pInfo->current >= pInfo->max) { + return NULL; + } + + SSDataBlock* pBlock = static_cast(calloc(1, sizeof(SSDataBlock))); + assert(pBlock != NULL); + + pBlock->pDataBlock = taosArrayInit(4, sizeof(SColumnInfoData)); + + int32_t numOfRows = 1000; + + SColumnInfoData colInfo = {0}; + colInfo.info.type = TSDB_DATA_TYPE_INT; + colInfo.info.bytes = sizeof(int32_t); + colInfo.info.colId = 1; + colInfo.pData = static_cast(calloc(numOfRows, sizeof(int32_t))); + colInfo.nullbitmap = static_cast(calloc(1, (numOfRows + 7) / 8)); + + taosArrayPush(pBlock->pDataBlock, &colInfo); + + SColumnInfoData colInfo1 = {0}; + colInfo1.info.type = TSDB_DATA_TYPE_BINARY; + colInfo1.info.bytes = 40; + colInfo1.info.colId = 2; + + colInfo1.varmeta.allocLen = 0;//numOfRows * sizeof(int32_t); + colInfo1.varmeta.length = 0; + colInfo1.varmeta.offset = static_cast(calloc(1, numOfRows * sizeof(int32_t))); + + taosArrayPush(pBlock->pDataBlock, &colInfo1); + + char buf[128] = {0}; + char b1[128] = {0}; + for(int32_t i = 0; i < numOfRows; ++i) { + SColumnInfoData* pColInfo = static_cast(taosArrayGet(pBlock->pDataBlock, 0)); + + int32_t v = (--pInfo->startVal); + colDataAppend(pColInfo, i, reinterpret_cast(&v), false); + + sprintf(buf, "this is %d row", i); + STR_TO_VARSTR(b1, buf); + + SColumnInfoData* pColInfo2 = static_cast(taosArrayGet(pBlock->pDataBlock, 1)); + colDataAppend(pColInfo2, i, b1, false); + } + + pBlock->info.rows = numOfRows; + pBlock->info.numOfCols = 2; + + pInfo->current += 1; + return pBlock; +} + +SOperatorInfo* createDummyOperator(int32_t numOfBlocks) { + SOperatorInfo* pOperator = static_cast(calloc(1, sizeof(SOperatorInfo))); + pOperator->name = "dummyInputOpertor4Test"; + pOperator->exec = getDummyBlock; + + SDummyInputInfo *pInfo = (SDummyInputInfo*) calloc(1, sizeof(SDummyInputInfo)); + pInfo->max = numOfBlocks; + pInfo->startVal = 100000; + + pOperator->info = pInfo; + return pOperator; +} +} int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); @@ -128,9 +205,66 @@ TEST(testCase, build_executor_tree_Test) { SExecTaskInfo* pTaskInfo = nullptr; DataSinkHandle sinkHandle = nullptr; - SReadHandle handle = {.reader = NULL, .meta = NULL}; + SReadHandle handle = {.reader = reinterpret_cast(0x1), .meta = reinterpret_cast(0x1)}; - int32_t code = qCreateExecTask(&handle, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle); +// int32_t code = qCreateExecTask(&handle, 2, 1, NULL, (void**) &pTaskInfo, &sinkHandle); } +//TEST(testCase, inMem_sort_Test) { +// SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); +// SOrder o = {.order = TSDB_ORDER_ASC}; +// o.col.info.colId = 1; +// o.col.info.type = TSDB_DATA_TYPE_INT; +// taosArrayPush(pOrderVal, &o); +// +// SArray* pExprInfo = taosArrayInit(4, sizeof(SExprInfo)); +// SExprInfo *exp = static_cast(calloc(1, sizeof(SExprInfo))); +// exp->base.resSchema = createSchema(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1, "res"); +// taosArrayPush(pExprInfo, &exp); +// +// SExprInfo *exp1 = static_cast(calloc(1, sizeof(SExprInfo))); +// exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); +// taosArrayPush(pExprInfo, &exp1); +// +// SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(5), pExprInfo, pOrderVal); +// +// bool newgroup = false; +// SSDataBlock* pRes = pOperator->exec(pOperator, &newgroup); +// +// SColumnInfoData* pCol1 = static_cast(taosArrayGet(pRes->pDataBlock, 0)); +// SColumnInfoData* pCol2 = static_cast(taosArrayGet(pRes->pDataBlock, 1)); +// for(int32_t i = 0; i < pRes->info.rows; ++i) { +// char* p = colDataGet(pCol2, i); +// printf("%d: %d, %s\n", i, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p)); +// } +//} + +TEST(testCase, external_sort_Test) { + SArray* pOrderVal = taosArrayInit(4, sizeof(SOrder)); + SOrder o = {.order = TSDB_ORDER_ASC}; + o.col.info.colId = 1; + o.col.info.type = TSDB_DATA_TYPE_INT; + taosArrayPush(pOrderVal, &o); + + SArray* pExprInfo = taosArrayInit(4, sizeof(SExprInfo)); + SExprInfo *exp = static_cast(calloc(1, sizeof(SExprInfo))); + exp->base.resSchema = createSchema(TSDB_DATA_TYPE_INT, sizeof(int32_t), 1, "res"); + taosArrayPush(pExprInfo, &exp); + + SExprInfo *exp1 = static_cast(calloc(1, sizeof(SExprInfo))); + exp1->base.resSchema = createSchema(TSDB_DATA_TYPE_BINARY, 40, 2, "res1"); + taosArrayPush(pExprInfo, &exp1); + + SOperatorInfo* pOperator = createOrderOperatorInfo(createDummyOperator(100), pExprInfo, pOrderVal); + + bool newgroup = false; + SSDataBlock* pRes = pOperator->exec(pOperator, &newgroup); + + SColumnInfoData* pCol1 = static_cast(taosArrayGet(pRes->pDataBlock, 0)); + SColumnInfoData* pCol2 = static_cast(taosArrayGet(pRes->pDataBlock, 1)); + for(int32_t i = 0; i < pRes->info.rows; ++i) { + char* p = colDataGet(pCol2, i); + printf("%d: %d, %s\n", i, ((int32_t*)pCol1->pData)[i], (char*)varDataVal(p)); + } +} #pragma GCC diagnostic pop \ No newline at end of file diff --git a/source/libs/function/inc/tpercentile.h b/source/libs/function/inc/tpercentile.h index 563a63f6a5..33fe5ccd20 100644 --- a/source/libs/function/inc/tpercentile.h +++ b/source/libs/function/inc/tpercentile.h @@ -63,7 +63,7 @@ typedef struct tMemBucket { __compar_fn_t comparFn; tMemBucketSlot * pSlots; - SDiskbasedResultBuf *pBuffer; + SDiskbasedBuf *pBuffer; __perc_hash_func_t hashFunc; } tMemBucket; diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index 5d8876fee1..c90d8e209d 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -35,9 +35,9 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) int32_t offset = 0; for(int32_t i = 0; i < list->size; ++i) { - SPageInfo* pgInfo = *(SPageInfo**) taosArrayGet(list, i); + struct SPageInfo* pgInfo = *(struct SPageInfo**) taosArrayGet(list, i); - SFilePage* pg = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId); + SFilePage* pg = getResBufPage(pMemBucket->pBuffer, getPageId(pgInfo)); memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes)); offset += (int32_t)(pg->num * pMemBucket->bytes); @@ -98,8 +98,8 @@ double findOnlyResult(tMemBucket *pMemBucket) { SIDList list = getDataBufPagesIdList(pMemBucket->pBuffer, groupId); assert(list->size == 1); - SPageInfo* pgInfo = (SPageInfo*) taosArrayGetP(list, 0); - SFilePage* pPage = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId); + struct SPageInfo* pgInfo = (struct SPageInfo*) taosArrayGetP(list, 0); + SFilePage* pPage = getResBufPage(pMemBucket->pBuffer, getPageId(pgInfo)); assert(pPage->num == 1); double v = 0; @@ -471,7 +471,7 @@ double getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction) for (int32_t f = 0; f < list->size; ++f) { SPageInfo *pgInfo = *(SPageInfo **)taosArrayGet(list, f); - SFilePage *pg = getResBufPage(pMemBucket->pBuffer, pgInfo->pageId); + SFilePage *pg = getResBufPage(pMemBucket->pBuffer, getPageId(pgInfo)); tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num); releaseResBufPageInfo(pMemBucket->pBuffer, pgInfo); diff --git a/source/util/src/tlosertree.c b/source/util/src/tlosertree.c index 6155ba4c1a..5e1cf8bb1f 100644 --- a/source/util/src/tlosertree.c +++ b/source/util/src/tlosertree.c @@ -39,8 +39,8 @@ void tLoserTreeDisplay(SLoserTreeInfo* pTree) { printf("\n"); } -uint32_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* param, __merge_compare_fn_t compareFn) { - int32_t totalEntries = numOfEntries << 1; +int32_t tLoserTreeCreate(SLoserTreeInfo** pTree, uint32_t numOfSources, void* param, __merge_compare_fn_t compareFn) { + int32_t totalEntries = numOfSources << 1u; *pTree = (SLoserTreeInfo*)calloc(1, sizeof(SLoserTreeInfo) + sizeof(SLoserTreeNode) * totalEntries); if ((*pTree) == NULL) { @@ -50,7 +50,7 @@ uint32_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* pa (*pTree)->pNode = (SLoserTreeNode*)(((char*)(*pTree)) + sizeof(SLoserTreeInfo)); - (*pTree)->numOfEntries = numOfEntries; + (*pTree)->numOfEntries = numOfSources; (*pTree)->totalEntries = totalEntries; (*pTree)->param = param; (*pTree)->comparFn = compareFn; @@ -63,7 +63,7 @@ uint32_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* pa tLoserTreeDisplay(*pTree); #endif - for (int32_t i = totalEntries - 1; i >= numOfEntries; i--) { + for (int32_t i = totalEntries - 1; i >= numOfSources; i--) { tLoserTreeAdjust(*pTree, i); } diff --git a/source/util/src/tpagedfile.c b/source/util/src/tpagedfile.c index 3373d09876..a8cb8563a4 100644 --- a/source/util/src/tpagedfile.c +++ b/source/util/src/tpagedfile.c @@ -7,10 +7,58 @@ #define GET_DATA_PAYLOAD(_p) ((char *)(_p)->pData + POINTER_BYTES) #define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages) -int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) { - *pResultBuf = calloc(1, sizeof(SDiskbasedResultBuf)); +typedef struct SFreeListItem { + int32_t offset; + int32_t len; +} SFreeListItem; - SDiskbasedResultBuf* pResBuf = *pResultBuf; +typedef struct SPageDiskInfo { + int32_t offset; + int32_t length; +} SPageDiskInfo; + +typedef struct SPageInfo { + SListNode* pn; // point to list node + void* pData; + int32_t pageId; + SPageDiskInfo info; + bool used; // set current page is in used +} SPageInfo; + +typedef struct SDiskbasedBufStatis { + int32_t flushBytes; + int32_t loadBytes; + int32_t getPages; + int32_t releasePages; + int32_t flushPages; +} SDiskbasedBufStatis; + +typedef struct SDiskbasedBuf { + int32_t numOfPages; + int64_t totalBufSize; + int64_t fileSize; // disk file size + FILE* file; + int32_t allocateId; // allocated page id + char* path; // file path + int32_t pageSize; // current used page size + int32_t inMemPages; // numOfPages that are allocated in memory + SHashObj* groupSet; // id hash table + SHashObj* all; + SList* lruList; + void* emptyDummyIdList; // dummy id list + void* assistBuf; // assistant buffer for compress/decompress data + SArray* pFree; // free area in file + bool comp; // compressed before flushed to disk + int32_t nextPos; // next page flush position + + uint64_t qId; // for debug purpose + SDiskbasedBufStatis statis; +} SDiskbasedBuf; + +int32_t createDiskbasedResultBuffer(SDiskbasedBuf** pResultBuf, int32_t pagesize, int32_t inMemBufSize, uint64_t qId, const char* dir) { + *pResultBuf = calloc(1, sizeof(SDiskbasedBuf)); + + SDiskbasedBuf* pResBuf = *pResultBuf; if (pResBuf == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -47,7 +95,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t pa return TSDB_CODE_SUCCESS; } -static int32_t createDiskFile(SDiskbasedResultBuf* pResultBuf) { +static int32_t createDiskFile(SDiskbasedBuf* pResultBuf) { pResultBuf->file = fopen(pResultBuf->path, "wb+"); if (pResultBuf->file == NULL) { // qError("failed to create tmp file: %s on disk. %s", pResultBuf->path, strerror(errno)); @@ -57,7 +105,7 @@ static int32_t createDiskFile(SDiskbasedResultBuf* pResultBuf) { return TSDB_CODE_SUCCESS; } -static char* doCompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbasedResultBuf* pResultBuf) { // do nothing +static char* doCompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbasedBuf* pResultBuf) { // do nothing if (!pResultBuf->comp) { *dst = srcSize; return data; @@ -69,7 +117,7 @@ static char* doCompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbase return data; } -static char* doDecompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbasedResultBuf* pResultBuf) { // do nothing +static char* doDecompressData(void* data, int32_t srcSize, int32_t *dst, SDiskbasedBuf* pResultBuf) { // do nothing if (!pResultBuf->comp) { *dst = srcSize; return data; @@ -82,7 +130,7 @@ static char* doDecompressData(void* data, int32_t srcSize, int32_t *dst, SDiskba return data; } -static int32_t allocatePositionInFile(SDiskbasedResultBuf* pResultBuf, size_t size) { +static int32_t allocatePositionInFile(SDiskbasedBuf* pResultBuf, size_t size) { if (pResultBuf->pFree == NULL) { return pResultBuf->nextPos; } else { @@ -105,7 +153,7 @@ static int32_t allocatePositionInFile(SDiskbasedResultBuf* pResultBuf, size_t si } } -static char* doFlushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) { +static char* doFlushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) { assert(!pg->used && pg->pData != NULL); int32_t size = -1; @@ -163,7 +211,7 @@ static char* doFlushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) { return ret; } -static char* flushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) { +static char* flushPageToDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) { int32_t ret = TSDB_CODE_SUCCESS; assert(((int64_t) pResultBuf->numOfPages * pResultBuf->pageSize) == pResultBuf->totalBufSize && pResultBuf->numOfPages >= pResultBuf->inMemPages); @@ -178,7 +226,7 @@ static char* flushPageToDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) { } // load file block data in disk -static char* loadPageFromDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) { +static char* loadPageFromDisk(SDiskbasedBuf* pResultBuf, SPageInfo* pg) { int32_t ret = fseek(pResultBuf->file, pg->info.offset, SEEK_SET); ret = (int32_t)fread(GET_DATA_PAYLOAD(pg), 1, pg->info.length, pResultBuf->file); if (ret != pg->info.length) { @@ -194,7 +242,7 @@ static char* loadPageFromDisk(SDiskbasedResultBuf* pResultBuf, SPageInfo* pg) { return (char*)GET_DATA_PAYLOAD(pg); } -static SIDList addNewGroup(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { +static SIDList addNewGroup(SDiskbasedBuf* pResultBuf, int32_t groupId) { assert(taosHashGet(pResultBuf->groupSet, (const char*) &groupId, sizeof(int32_t)) == NULL); SArray* pa = taosArrayInit(1, POINTER_BYTES); @@ -204,7 +252,7 @@ static SIDList addNewGroup(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { return pa; } -static SPageInfo* registerPage(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t pageId) { +static SPageInfo* registerPage(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t pageId) { SIDList list = NULL; char** p = taosHashGet(pResultBuf->groupSet, (const char*)&groupId, sizeof(int32_t)); @@ -227,7 +275,7 @@ static SPageInfo* registerPage(SDiskbasedResultBuf* pResultBuf, int32_t groupId, return *(SPageInfo**) taosArrayPush(list, &ppi); } -static SListNode* getEldestUnrefedPage(SDiskbasedResultBuf* pResultBuf) { +static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pResultBuf) { SListIter iter = {0}; tdListInitIter(pResultBuf->lruList, &iter, TD_LIST_BACKWARD); @@ -246,7 +294,7 @@ static SListNode* getEldestUnrefedPage(SDiskbasedResultBuf* pResultBuf) { return pn; } -static char* evicOneDataPage(SDiskbasedResultBuf* pResultBuf) { +static char* evicOneDataPage(SDiskbasedBuf* pResultBuf) { char* bufPage = NULL; SListNode* pn = getEldestUnrefedPage(pResultBuf); @@ -290,7 +338,7 @@ static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + 2 + sizeof(SFilePage); } -SFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32_t* pageId) { +SFilePage* getNewDataBuf(SDiskbasedBuf* pResultBuf, int32_t groupId, int32_t* pageId) { pResultBuf->statis.getPages += 1; char* availablePage = NULL; @@ -327,7 +375,7 @@ SFilePage* getNewDataBuf(SDiskbasedResultBuf* pResultBuf, int32_t groupId, int32 return (void *)(GET_DATA_PAYLOAD(pi)); } -SFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) { +SFilePage* getResBufPage(SDiskbasedBuf* pResultBuf, int32_t id) { assert(pResultBuf != NULL && id >= 0); pResultBuf->statis.getPages += 1; @@ -373,7 +421,7 @@ SFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) { } } -void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page) { +void releaseResBufPage(SDiskbasedBuf* pResultBuf, void* page) { assert(pResultBuf != NULL && page != NULL); char* p = (char*) page - POINTER_BYTES; @@ -381,18 +429,18 @@ void releaseResBufPage(SDiskbasedResultBuf* pResultBuf, void* page) { releaseResBufPageInfo(pResultBuf, ppi); } -void releaseResBufPageInfo(SDiskbasedResultBuf* pResultBuf, SPageInfo* pi) { +void releaseResBufPageInfo(SDiskbasedBuf* pResultBuf, SPageInfo* pi) { assert(pi->pData != NULL && pi->used); pi->used = false; pResultBuf->statis.releasePages += 1; } -size_t getNumOfResultBufGroupId(const SDiskbasedResultBuf* pResultBuf) { return taosHashGetSize(pResultBuf->groupSet); } +size_t getNumOfResultBufGroupId(const SDiskbasedBuf* pResultBuf) { return taosHashGetSize(pResultBuf->groupSet); } -size_t getResBufSize(const SDiskbasedResultBuf* pResultBuf) { return (size_t)pResultBuf->totalBufSize; } +size_t getResBufSize(const SDiskbasedBuf* pResultBuf) { return (size_t)pResultBuf->totalBufSize; } -SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { +SIDList getDataBufPagesIdList(SDiskbasedBuf* pResultBuf, int32_t groupId) { assert(pResultBuf != NULL); char** p = taosHashGet(pResultBuf->groupSet, (const char*)&groupId, sizeof(int32_t)); @@ -403,7 +451,7 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) } } -void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) { +void destroyResultBuf(SDiskbasedBuf* pResultBuf) { if (pResultBuf == NULL) { return; } @@ -444,8 +492,23 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf) { tfree(pResultBuf); } -SPageInfo* getLastPageInfo(SIDList pList) { +struct SPageInfo* getLastPageInfo(SIDList pList) { size_t size = taosArrayGetSize(pList); - return (SPageInfo*) taosArrayGetP(pList, size - 1); + SPageInfo* pPgInfo = taosArrayGetP(pList, size - 1); + return pPgInfo; } +int32_t getPageId(const struct SPageInfo* pPgInfo) { + ASSERT(pPgInfo != NULL); + return pPgInfo->pageId; +} + +int32_t getBufPageSize(const SDiskbasedBuf* pResultBuf) { + return pResultBuf->pageSize; +} + +bool isAllDataInMemBuf(const SDiskbasedBuf* pResultBuf) { + return pResultBuf->fileSize == 0; +} + +