diff --git a/include/common/tcommon.h b/include/common/tcommon.h index b5bd088006..579b1bc46d 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -54,10 +54,11 @@ typedef struct SColumnDataAgg { } SColumnDataAgg; typedef struct SDataBlockInfo { - STimeWindow window; - int32_t rows; - int32_t rowSize; - int32_t numOfCols; + STimeWindow window; + int32_t rows; + int32_t rowSize; + int16_t numOfCols; + int16_t hasVarCol; union {int64_t uid; int64_t blockId;}; } SDataBlockInfo; @@ -96,13 +97,15 @@ typedef struct SColumnInfoData { static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) { int64_t tbUid = pBlock->info.uid; - int32_t numOfCols = pBlock->info.numOfCols; + int16_t numOfCols = pBlock->info.numOfCols; + int16_t hasVarCol = pBlock->info.hasVarCol; int32_t rows = pBlock->info.rows; int32_t sz = taosArrayGetSize(pBlock->pDataBlock); int32_t tlen = 0; tlen += taosEncodeFixedI64(buf, tbUid); - tlen += taosEncodeFixedI32(buf, numOfCols); + tlen += taosEncodeFixedI16(buf, numOfCols); + tlen += taosEncodeFixedI16(buf, hasVarCol); tlen += taosEncodeFixedI32(buf, rows); tlen += taosEncodeFixedI32(buf, sz); for (int32_t i = 0; i < sz; i++) { @@ -120,7 +123,8 @@ static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) int32_t sz; buf = taosDecodeFixedI64(buf, &pBlock->info.uid); - buf = taosDecodeFixedI32(buf, &pBlock->info.numOfCols); + buf = taosDecodeFixedI16(buf, &pBlock->info.numOfCols); + buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol); buf = taosDecodeFixedI32(buf, &pBlock->info.rows); buf = taosDecodeFixedI32(buf, &sz); pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData)); diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index c2249f408a..7e60013aa1 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -117,7 +117,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); -void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol); +void blockDataClearup(SSDataBlock* pDataBlock); SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock); size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); void* blockDataDestroy(SSDataBlock* pBlock); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 4070224ab8..7e4f1d9025 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1059,10 +1059,10 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF // destroyTupleIndex(index); } -void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol) { +void blockDataClearup(SSDataBlock* pDataBlock) { pDataBlock->info.rows = 0; - if (hasVarCol) { + if (pDataBlock->info.hasVarCol) { for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); @@ -1148,7 +1148,9 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) { SSDataBlock* pBlock = calloc(1, sizeof(SSDataBlock)); pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData)); + pBlock->info.numOfCols = numOfCols; + pBlock->info.hasVarCol = pDataBlock->info.hasVarCol; for(int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData colInfo = {0}; diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index b34067ba4e..bcbfeb7015 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -50,7 +50,7 @@ typedef struct SGroupResInfo { int32_t totalGroup; int32_t currentGroup; int32_t index; - SArray* pRows; // SArray + SArray* pRows; // SArray bool ordered; int32_t position; } SGroupResInfo; @@ -67,10 +67,15 @@ typedef struct SResultRow { char *key; // start key of current result row } SResultRow; +typedef struct SResultRowPosition { + int32_t pageId; + int32_t offset; +} SResultRowPosition; + typedef struct SResultRowInfo { - SList* pRows; - SResultRow** pResult; // result list -// int16_t type:8; // data type for hash key + SList *pRows; + SResultRowPosition *pPosition; + SResultRow **pResult; // result list int32_t size; // number of result set int32_t capacity; // max capacity int32_t curPos; // current active result row index of pResult list @@ -131,7 +136,7 @@ static FORCE_INLINE char* getPosInResultPage_rv(SFilePage* page, int32_t rowOffs assert(rowOffset >= 0); int32_t numOfRows = 1;//(int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery); - return ((char *)page->data) + rowOffset + offset * numOfRows; + return (char*) page + rowOffset + offset * numOfRows; } //bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type); @@ -139,12 +144,7 @@ static FORCE_INLINE char* getPosInResultPage_rv(SFilePage* page, int32_t rowOffs __filter_func_t getFilterOperator(int32_t lowerOptr, int32_t upperOptr); -SResultRowPool* initResultRowPool(size_t size); SResultRow* getNewResultRow(SResultRowPool* p); -int64_t getResultRowPoolMemSize(SResultRowPool* p); -void* destroyResultRowPool(SResultRowPool* p); -int32_t getNumOfAllocatedResultRows(SResultRowPool* p); -int32_t getNumOfUsedResultRows(SResultRowPool* p); typedef struct { SArray* pResult; // SArray diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 351903dfdc..97a51a46ce 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -463,7 +463,7 @@ typedef struct SAggSupporter { SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not SArray* pResultRowArrayList; // The array list that contains the Result rows char* keyBuf; // window key buffer - SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. +// SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object. int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row } SAggSupporter; @@ -678,8 +678,8 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols); void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order); -void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, - int32_t* rowCellInfoOffset); +void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput); + void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity); void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 9d77e23d38..a04a10ef95 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -59,7 +59,8 @@ int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size) { pResultRowInfo->capacity = size; pResultRowInfo->pResult = calloc(pResultRowInfo->capacity, POINTER_BYTES); - if (pResultRowInfo->pResult == NULL) { + pResultRowInfo->pPosition = calloc(pResultRowInfo->capacity, sizeof(SResultRowPosition)); + if (pResultRowInfo->pResult == NULL || pResultRowInfo->pPosition == NULL) { return TSDB_CODE_QRY_OUT_OF_MEMORY; } @@ -182,22 +183,6 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) { return rowSize; } -SResultRowPool* initResultRowPool(size_t size) { - SResultRowPool* p = calloc(1, sizeof(SResultRowPool)); - if (p == NULL) { - return NULL; - } - - p->numOfElemPerBlock = 128; - - p->elemSize = (int32_t) size; - p->blockSize = p->numOfElemPerBlock * p->elemSize; - p->position.pos = 0; - - p->pData = taosArrayInit(8, POINTER_BYTES); - return p; -} - SResultRow* getNewResultRow(SResultRowPool* p) { if (p == NULL) { return NULL; @@ -221,132 +206,6 @@ SResultRow* getNewResultRow(SResultRowPool* p) { return ptr; } -int64_t getResultRowPoolMemSize(SResultRowPool* p) { - if (p == NULL) { - return 0; - } - - return taosArrayGetSize(p->pData) * p->blockSize; -} - -int32_t getNumOfAllocatedResultRows(SResultRowPool* p) { - return (int32_t) taosArrayGetSize(p->pData) * p->numOfElemPerBlock; -} - -int32_t getNumOfUsedResultRows(SResultRowPool* p) { - return getNumOfAllocatedResultRows(p) - p->numOfElemPerBlock + p->position.pos; -} - -void* destroyResultRowPool(SResultRowPool* p) { - if (p == NULL) { - return NULL; - } - - size_t size = taosArrayGetSize(p->pData); - for(int32_t i = 0; i < size; ++i) { - void** ptr = taosArrayGet(p->pData, i); - tfree(*ptr); - } - - taosArrayDestroy(p->pData); - - tfree(p); - return NULL; -} - -void interResToBinary(SBufferWriter* bw, SArray* pRes, int32_t tagLen) { - uint32_t numOfGroup = (uint32_t) taosArrayGetSize(pRes); - tbufWriteUint32(bw, numOfGroup); - tbufWriteUint16(bw, tagLen); - - for(int32_t i = 0; i < numOfGroup; ++i) { - SInterResult* pOne = taosArrayGet(pRes, i); - if (tagLen > 0) { - tbufWriteBinary(bw, pOne->tags, tagLen); - } - - uint32_t numOfCols = (uint32_t) taosArrayGetSize(pOne->pResult); - tbufWriteUint32(bw, numOfCols); - for(int32_t j = 0; j < numOfCols; ++j) { - SStddevInterResult* p = taosArrayGet(pOne->pResult, j); - uint32_t numOfRows = (uint32_t) taosArrayGetSize(p->pResult); - - tbufWriteUint16(bw, p->colId); - tbufWriteUint32(bw, numOfRows); - - for(int32_t k = 0; k < numOfRows; ++k) { -// SResPair v = *(SResPair*) taosArrayGet(p->pResult, k); -// tbufWriteDouble(bw, v.avg); -// tbufWriteInt64(bw, v.key); - } - } - } -} - -SArray* interResFromBinary(const char* data, int32_t len) { - SBufferReader br = tbufInitReader(data, len, false); - uint32_t numOfGroup = tbufReadUint32(&br); - uint16_t tagLen = tbufReadUint16(&br); - - char* tag = NULL; - if (tagLen > 0) { - tag = calloc(1, tagLen); - } - - SArray* pResult = taosArrayInit(4, sizeof(SInterResult)); - - for(int32_t i = 0; i < numOfGroup; ++i) { - if (tagLen > 0) { - memset(tag, 0, tagLen); - tbufReadToBinary(&br, tag, tagLen); - } - - uint32_t numOfCols = tbufReadUint32(&br); - - SArray* p = taosArrayInit(numOfCols, sizeof(SStddevInterResult)); - for(int32_t j = 0; j < numOfCols; ++j) { -// int16_t colId = tbufReadUint16(&br); - int32_t numOfRows = tbufReadUint32(&br); - -// SStddevInterResult interRes = {.colId = colId, .pResult = taosArrayInit(4, sizeof(struct SResPair)),}; - for(int32_t k = 0; k < numOfRows; ++k) { -// SResPair px = {0}; -// px.avg = tbufReadDouble(&br); -// px.key = tbufReadInt64(&br); -// -// taosArrayPush(interRes.pResult, &px); - } - -// taosArrayPush(p, &interRes); - } - - char* p1 = NULL; - if (tagLen > 0) { - p1 = malloc(tagLen); - memcpy(p1, tag, tagLen); - } - - SInterResult d = {.pResult = p, .tags = p1,}; - taosArrayPush(pResult, &d); - } - - tfree(tag); - return pResult; -} - -void freeInterResult(void* param) { - SInterResult* pResult = (SInterResult*) param; - tfree(pResult->tags); - - int32_t numOfCols = (int32_t) taosArrayGetSize(pResult->pResult); - for(int32_t i = 0; i < numOfCols; ++i) { - SStddevInterResult *p = taosArrayGet(pResult->pResult, i); - taosArrayDestroy(p->pResult); - } - - taosArrayDestroy(pResult->pResult); -} - void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) { assert(pGroupResInfo != NULL); @@ -360,7 +219,7 @@ void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo) taosArrayDestroy(pGroupResInfo->pRows); } - pGroupResInfo->pRows = taosArrayFromList(pResultInfo->pResult, pResultInfo->size, POINTER_BYTES); + pGroupResInfo->pRows = taosArrayFromList(pResultInfo->pPosition, pResultInfo->size, sizeof(SResultRowPosition)); pGroupResInfo->index = 0; assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo)); } diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9d3a80d7b4..ef2dbea4fd 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -26,15 +26,16 @@ #include "tsort.h" #include "ttime.h" +#include "../../function/inc/taggfunction.h" #include "executorimpl.h" #include "function.h" +#include "query.h" #include "tcompare.h" #include "tcompression.h" #include "thash.h" -#include "ttypes.h" -#include "query.h" -#include "vnode.h" #include "tsdb.h" +#include "ttypes.h" +#include "vnode.h" #define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN) #define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN) @@ -235,7 +236,7 @@ static int32_t operatorDummyOpenFn(SOperatorInfo *pOperator) { static void operatorDummyCloseFn(void* param, int32_t numOfCols) {} -static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity); +static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset); static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock); static int32_t setGroupResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binf, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex); @@ -444,10 +445,12 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, jmp_buf env) longjmp(env, TSDB_CODE_QRY_OUT_OF_MEMORY); } + pResultRowInfo->pPosition = realloc(pResultRowInfo->pPosition, newCapacity * sizeof(SResultRowPosition)); pResultRowInfo->pResult = (SResultRow **)t; int32_t inc = (int32_t)newCapacity - pResultRowInfo->capacity; memset(&pResultRowInfo->pResult[pResultRowInfo->capacity], 0, POINTER_BYTES * inc); + memset(&pResultRowInfo->pPosition[pResultRowInfo->capacity], 0, sizeof(SResultRowPosition)); pResultRowInfo->capacity = (int32_t)newCapacity; } @@ -564,7 +567,47 @@ static SResultRow* doSetResultOutBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultR return pResultRowInfo->pResult[pResultRowInfo->curPos]; } -static SResultRow* doSetResultOutBufByKey_rv(SResultRowInfo* pResultRowInfo, int64_t tid, char* pData, int16_t bytes, +SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) { + SFilePage *pData = NULL; + + // in the first scan, new space needed for results + int32_t pageId = -1; + SIDList list = getDataBufPagesIdList(pResultBuf, tableGroupId); + + if (taosArrayGetSize(list) == 0) { + pData = getNewBufPage(pResultBuf, tableGroupId, &pageId); + pData->num = sizeof(SFilePage); + } else { + SPageInfo* pi = getLastPageInfo(list); + pData = getBufPage(pResultBuf, getPageId(pi)); + pageId = getPageId(pi); + + if (pData->num + interBufSize + sizeof(SResultRow) > getBufPageSize(pResultBuf)) { + // release current page first, and prepare the next one + releaseBufPageInfo(pResultBuf, pi); + + pData = getNewBufPage(pResultBuf, tableGroupId, &pageId); + if (pData != NULL) { + pData->num = sizeof(SFilePage); + } + } + } + + if (pData == NULL) { + return NULL; + } + + // set the number of rows in current disk page + SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num); + pResultRow->pageId = pageId; + pResultRow->offset = (int32_t)pData->num; + + pData->num += interBufSize + sizeof(SResultRow); + + return pResultRow; +} + +static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, int64_t tid, char* pData, int16_t bytes, bool masterscan, uint64_t tableGroupId, SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup) { bool existed = false; SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, tableGroupId); @@ -608,7 +651,7 @@ static SResultRow* doSetResultOutBufByKey_rv(SResultRowInfo* pResultRowInfo, int SResultRow *pResult = NULL; if (p1 == NULL) { - pResult = getNewResultRow(pSup->pool); + pResult = getNewResultRow_rv(pResultBuf, tableGroupId, pSup->resultRowSize); int32_t ret = initResultRow(pResult); if (ret != TSDB_CODE_SUCCESS) { longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -623,6 +666,7 @@ static SResultRow* doSetResultOutBufByKey_rv(SResultRowInfo* pResultRowInfo, int } pResultRowInfo->curPos = pResultRowInfo->size; + pResultRowInfo->pPosition[pResultRowInfo->size] = (SResultRowPosition) {.pageId = pResult->pageId, .offset = pResult->offset}; pResultRowInfo->pResult[pResultRowInfo->size++] = pResult; int64_t index = pResultRowInfo->curPos; @@ -742,6 +786,7 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedBuf *pRes if (taosArrayGetSize(list) == 0) { pData = getNewBufPage(pResultBuf, tid, &pageId); + pData->num = sizeof(SFilePage); } else { SPageInfo* pi = getLastPageInfo(list); pData = getBufPage(pResultBuf, getPageId(pi)); @@ -750,9 +795,10 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedBuf *pRes if (pData->num + size > getBufPageSize(pResultBuf)) { // release current page first, and prepare the next one releaseBufPageInfo(pResultBuf, pi); + pData = getNewBufPage(pResultBuf, tid, &pageId); if (pData != NULL) { - assert(pData->num == 0); // number of elements must be 0 for new allocated buffer + pData->num = sizeof(SFilePage); } } } @@ -814,7 +860,7 @@ static int32_t setResultOutputBufByKey_rv(SResultRowInfo *pResultRowInfo, int64_ bool masterscan, SResultRow **pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset, SDiskbasedBuf *pBuf, SAggSupporter *pAggSup, SExecTaskInfo* pTaskInfo) { assert(win->skey <= win->ekey); - SResultRow *pResultRow = doSetResultOutBufByKey_rv(pResultRowInfo, id, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId, + SResultRow *pResultRow = doSetResultOutBufByKey_rv(pBuf, pResultRowInfo, id, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId, pTaskInfo, true, pAggSup); if (pResultRow == NULL) { @@ -822,19 +868,10 @@ static int32_t setResultOutputBufByKey_rv(SResultRowInfo *pResultRowInfo, int64_ return TSDB_CODE_SUCCESS; } - // not assign result buffer yet, add new result buffer - if (pResultRow->pageId == -1) { // todo intermediate result size - int32_t ret = addNewWindowResultBuf(pResultRow, pBuf, (int32_t) tableGroupId, 0); - if (ret != TSDB_CODE_SUCCESS) { - return -1; - } - } - // set time window for current result pResultRow->win = (*win); *pResult = pResultRow; setResultRowOutputBufInitCtx_rv(pBuf, pResultRow, pCtx, numOfOutput, rowCellInfoOffset); - return TSDB_CODE_SUCCESS; } @@ -988,17 +1025,18 @@ static int32_t getNumOfRowsInTimeWindow(SDataBlockInfo *pDataBlockInfo, TSKEY *p } static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, int32_t forwardStep, TSKEY* tsCol, - int32_t numOfTotal, int32_t numOfOutput, int32_t order) { + int32_t numOfTotal, int32_t numOfOutput, int32_t order) { for (int32_t k = 0; k < numOfOutput; ++k) { - pCtx[k].size = forwardStep; pCtx[k].startTs = pWin->skey; // keep it temporarialy - int32_t startOffset = pCtx[k].startRow; - bool hasAgg = pCtx[k].isAggSet; + int32_t startOffset = pCtx[k].input.startRowIndex; + bool hasAgg = pCtx[k].input.colDataAggIsSet; + int32_t numOfRows = pCtx[k].input.numOfRows; int32_t pos = (order == TSDB_ORDER_ASC) ? offset : offset - (forwardStep - 1); - pCtx[k].startRow = pos; + pCtx[k].input.startRowIndex = pos; + pCtx[k].input.numOfRows = forwardStep; if (tsCol != NULL) { pCtx[k].ptsList = &tsCol[pos]; @@ -1011,12 +1049,13 @@ static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t of } if (functionNeedToExecute(&pCtx[k])) { -// pCtx[k].fpSet.process(&pCtx[k]); + pCtx[k].fpSet.process(&pCtx[k]); } // restore it - pCtx[k].isAggSet = hasAgg; - pCtx[k].startRow = startOffset; + pCtx[k].input.colDataAggIsSet = hasAgg; + pCtx[k].input.startRowIndex = startOffset; + pCtx[k].input.numOfRows = numOfRows; } } @@ -1440,8 +1479,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul if (pSDataBlock->pDataBlock != NULL) { SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0); tsCols = (int64_t*) pColDataInfo->pData; - assert(tsCols[0] == pSDataBlock->info.window.skey && - tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey); + assert(tsCols[0] == pSDataBlock->info.window.skey && tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey); } int32_t startPos = ascQuery? 0 : (pSDataBlock->info.rows - 1); @@ -1533,7 +1571,6 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul // updateResultRowInfoActiveIndex(pResultRowInfo, &pInfo->win, pRuntimeEnv->current->lastKey, true, false); } - static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) { STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info; @@ -3276,10 +3313,9 @@ void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) { } } +// TODO fix this bug. int32_t initResultRow(SResultRow *pResultRow) { pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow)); - pResultRow->pageId = -1; - pResultRow->offset = -1; return TSDB_CODE_SUCCESS; } @@ -3335,7 +3371,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t int64_t tid = 0; int64_t groupId = 0; - SResultRow* pRow = doSetResultOutBufByKey_rv(pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, groupId, pTaskInfo, false, pSup); + SResultRow* pRow = doSetResultOutBufByKey_rv(NULL, pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, groupId, pTaskInfo, false, pSup); for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i); @@ -3476,48 +3512,44 @@ static void setupEnvForReverseScan(STableScanInfo *pTableScanInfo, SqlFunctionCt pTableScanInfo->reverseTimes = 0; } -void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { - int32_t numOfOutput = pOperator->numOfOutput; -// if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow) { -// // for each group result, call the finalize function for each column -// if (pQueryAttr->groupbyColumn) { -// closeAllResultRows(pResultRowInfo); -// } -// -// for (int32_t i = 0; i < pResultRowInfo->size; ++i) { -// SResultRow *buf = pResultRowInfo->pResult[i]; -// if (!isResultRowClosed(pResultRowInfo, i)) { -// continue; -// } -// -// setResultOutputBuf(pRuntimeEnv, buf, pCtx, numOfOutput, rowCellInfoOffset); -// -// for (int32_t j = 0; j < numOfOutput; ++j) { -//// pCtx[j].startTs = buf->win.skey; -//// if (pCtx[j].functionId < 0) { -//// doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); -//// } else { -//// aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]); -//// } -// } -// -// -// /* -// * set the number of output results for group by normal columns, the number of output rows usually is 1 except -// * the top and bottom query -// */ -// buf->numOfRows = (uint16_t)getNumOfResult(pCtx, numOfOutput); -// } -// -// } else { - for (int32_t j = 0; j < numOfOutput; ++j) { -// if (pCtx[j].functionId < 0) { -// doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE); -// } else { - pCtx[j].fpSet.finalize(&pCtx[j]); -// } +void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput) { + for (int32_t j = 0; j < numOfOutput; ++j) { + pCtx[j].fpSet.finalize(&pCtx[j]); + } +} + +void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf *pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) { + for (int32_t i = 0; i < pResultRowInfo->size; ++i) { + SResultRowPosition* pPos = &pResultRowInfo->pPosition[i]; + + SFilePage* bufPage = getBufPage(pBuf, pPos->pageId); + SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->offset); + if (!isResultRowClosed(pResultRowInfo, i)) { + continue; } -// } + + for (int32_t j = 0; j < numOfOutput; ++j) { + pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset); + + struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo; + if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { + continue; + } + + pCtx[j].fpSet.finalize(&pCtx[j]); + + if (pRow->numOfRows < pResInfo->numOfRes) { + pRow->numOfRows = pResInfo->numOfRes; + } + } + + releaseBufPage(pBuf, bufPage); + /* + * set the number of output results for group by normal columns, the number of output rows usually is 1 except + * the top and bottom query + */ +// buf->numOfRows = (uint16_t)getNumOfResult(pCtx, numOfOutput); + } } static bool hasMainOutput(STaskAttr *pQueryAttr) { @@ -3612,31 +3644,29 @@ void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf * pBuf, SResultRow *pResult, // Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group SFilePage* bufPage = getBufPage(pBuf, pResult->pageId); - int32_t offset = 0; +// int32_t offset = 0; for (int32_t i = 0; i < numOfOutput; ++i) { pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo; if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) { - offset += pCtx[i].resDataInfo.bytes; +// offset += pCtx[i].resDataInfo.bytes; continue; } - pCtx[i].pOutput = getPosInResultPage_rv(bufPage, pResult->offset, offset); - offset += pCtx[i].resDataInfo.bytes; +// offset += pCtx[i].resDataInfo.bytes; - int32_t functionId = pCtx[i].functionId; - if (functionId < 0) { - continue; +// int32_t functionId = pCtx[i].functionId; +// if (functionId < 0) { +// continue; +// } +// if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) { +// if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput; +// } + + if (!pResInfo->initialized) { + pCtx[i].fpSet.init(&pCtx[i], pResInfo); } - - if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) { - if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput; - } - - // if (!pResInfo->initialized) { - // aAggs[functionId].init(&pCtx[i], pResInfo); - // } } } @@ -3650,7 +3680,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, i int32_t* rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset; SResultRow* pResultRow = - doSetResultOutBufByKey_rv(pResultRowInfo, tid, (char*)&tableGroupId, sizeof(tableGroupId), true, uid, pTaskInfo, false, &pAggInfo->aggSup); + doSetResultOutBufByKey_rv(pAggInfo->pResultBuf, pResultRowInfo, tid, (char*)&tableGroupId, sizeof(tableGroupId), true, uid, pTaskInfo, false, &pAggInfo->aggSup); assert (pResultRow != NULL); /* @@ -3891,8 +3921,7 @@ void setIntervalQueryRange(STaskRuntimeEnv *pRuntimeEnv, TSKEY key) { * @param pQInfo * @param result */ - -static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity) { +static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset) { int32_t numOfRows = getNumOfTotalRes(pGroupResInfo); int32_t numOfResult = pBlock->info.rows; // there are already exists result rows @@ -3910,13 +3939,19 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResI step = -1; } + int32_t nrows = pBlock->info.rows; + for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) { - SResultRow* pRow = taosArrayGetP(pGroupResInfo->pRows, i); + SResultRowPosition* pPos = taosArrayGet(pGroupResInfo->pRows, i); + SFilePage *page = getBufPage(pBuf, pPos->pageId); + + SResultRow* pRow = (SResultRow*)((char*)page + pPos->offset); if (pRow->numOfRows == 0) { pGroupResInfo->index += 1; continue; } + // TODO copy multiple rows? int32_t numOfRowsToCopy = pRow->numOfRows; if (numOfResult + numOfRowsToCopy >= rowCapacity) { break; @@ -3924,20 +3959,17 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResI pGroupResInfo->index += 1; - SFilePage *page = getBufPage(pBuf, pRow->pageId); - - int32_t offset = 0; for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, j); - int32_t bytes = pColInfoData->info.bytes; + SResultRowEntryInfo* pEntryInfo = getResultCell(pRow, j, rowCellOffset); - char *out = pColInfoData->pData + numOfResult * bytes; - char *in = getPosInResultPage_rv(page, pRow->offset, offset); - memcpy(out, in, bytes * numOfRowsToCopy); - - offset += bytes; + char* in = GET_ROWCELL_INTERBUF(pEntryInfo); + colDataAppend(pColInfoData, nrows, in, pEntryInfo->numOfRes == 0); } + releaseBufPage(pBuf, page); + nrows += 1; + numOfResult += numOfRowsToCopy; if (numOfResult == rowCapacity) { // output buffer is full break; @@ -3949,16 +3981,16 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResI return 0; } -static void toSDatablock(SGroupResInfo *pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity) { +static void toSDatablock(SGroupResInfo *pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset) { assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup); - pBlock->info.rows = 0; + blockDataClearup(pBlock); if (!hasRemainDataInCurrentGroup(pGroupResInfo)) { return; } int32_t orderType = TSDB_ORDER_ASC;//(pQueryAttr->pGroupbyExpr != NULL) ? pQueryAttr->pGroupbyExpr->orderType : TSDB_ORDER_ASC; - doCopyToSDataBlock(pBuf, pGroupResInfo, orderType, pBlock, rowCapacity); + doCopyToSDataBlock(pBuf, pGroupResInfo, orderType, pBlock, rowCapacity, rowCellOffset); // add condition (pBlock->info.rows >= 1) just to runtime happy blockDataUpdateTsWindow(pBlock); @@ -5173,9 +5205,8 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo *pOperator, bool* newgroup) { SExchangeInfo *pExchangeInfo = pOperator->info; SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo; - int32_t code = pOperator->_openFn(pOperator); - if (code != TSDB_CODE_SUCCESS) { - pTaskInfo->code = code; + pTaskInfo->code = pOperator->_openFn(pOperator); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { return NULL; } @@ -5761,7 +5792,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, STupleHandle* pTupleHan } static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, bool hasVarCol, int32_t capacity) { - blockDataClearup(pDataBlock, hasVarCol); + blockDataClearup(pDataBlock); while(1) { STupleHandle* pTupleHandle = tsortNextTuple(pHandle); @@ -5917,7 +5948,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { while(1) { - blockDataClearup(pDataBlock, pInfo->hasVarCol); + blockDataClearup(pDataBlock); while (1) { STupleHandle* pTupleHandle = tsortNextTuple(pHandle); if (pTupleHandle == NULL) { @@ -6234,7 +6265,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo *pOperator) { doAggregateImpl(pOperator, 0, pInfo->pCtx); } - finalizeQueryResult(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); + finalizeQueryResult(pInfo->pCtx, pOperator->numOfOutput); OPTR_SET_OPENED(pOperator); return TSDB_CODE_SUCCESS; @@ -6270,7 +6301,7 @@ static SSDataBlock* doMultiTableAggregate(SOperatorInfo *pOperator, bool* newgro SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; if (pOperator->status == OP_RES_TO_RETURN) { - toSDatablock(&pAggInfo->groupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity); + toSDatablock(&pAggInfo->groupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity, pAggInfo->binfo.rowCellInfoOffset); if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) { pOperator->status = OP_EXEC_DONE; @@ -6319,7 +6350,7 @@ static SSDataBlock* doMultiTableAggregate(SOperatorInfo *pOperator, bool* newgro updateNumOfRowsInResultRows(pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset); initGroupResInfo(&pAggInfo->groupResInfo, &pInfo->resultRowInfo); - toSDatablock(&pAggInfo->groupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity); + toSDatablock(&pAggInfo->groupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity, pAggInfo->binfo.rowCellInfoOffset); if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); @@ -6333,7 +6364,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup) SOptrBasicInfo *pInfo = &pProjectInfo->binfo; SSDataBlock* pRes = pInfo->pRes; - blockDataClearup(pRes, pProjectInfo->hasVarCol); + blockDataClearup(pRes); if (pProjectInfo->existDataBlock) { // TODO refactor // STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current; @@ -6496,53 +6527,57 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) { return NULL; } -static SSDataBlock* doIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; +static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) { + if (OPTR_IS_OPENED(pOperator)) { + return TSDB_CODE_SUCCESS; } STableIntervalOperatorInfo* pInfo = pOperator->info; - if (pOperator->status == OP_RES_TO_RETURN) { -// toSDatablock(pAggInfo->pGroupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity); - if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { - doSetOperatorCompleted(pOperator); - } - - return pInfo->binfo.pRes; - } - -// int32_t order = pQueryAttr->order.order; -// STimeWindow win = pQueryAttr->window; + // int32_t order = pQueryAttr->order.order; + // STimeWindow win = pQueryAttr->window; + bool newgroup = false; SOperatorInfo* downstream = pOperator->pDownstream[0]; - while(1) { + while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->getNextFn(downstream, &newgroup); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; } -// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); + // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, TSDB_ORDER_ASC); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); } - // restore the value -// pQueryAttr->order.order = order; -// pQueryAttr->window = win; - - pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pInfo->binfo.resultRowInfo); - setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - finalizeQueryResult(pOperator, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); + finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->pResultBuf, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo); - toSDatablock(&pInfo->groupResInfo, pInfo->pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity); + OPTR_SET_OPENED(pOperator); + return TSDB_CODE_SUCCESS; +} + +static SSDataBlock* doIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { + STableIntervalOperatorInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + pTaskInfo->code = pOperator->_openFn(pOperator); + if (pTaskInfo->code != TSDB_CODE_SUCCESS) { + return NULL; + } + + blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity); + toSDatablock(&pInfo->groupResInfo, pInfo->pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset); if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) { doSetOperatorCompleted(pOperator); @@ -6598,7 +6633,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pIntervalInfo->binfo.resultRowInfo); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - finalizeQueryResult(pOperator, pIntervalInfo->binfo.pCtx, &pIntervalInfo->binfo.resultRowInfo, pIntervalInfo->binfo.rowCellInfoOffset); + finalizeQueryResult(pIntervalInfo->binfo.pCtx, pOperator->numOfOutput); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->binfo.resultRowInfo); // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes); @@ -6847,7 +6882,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo *pOperator, bool* newgroup) { pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); + finalizeQueryResult(pBInfo->pCtx, pOperator->numOfOutput); initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo); // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); @@ -6896,7 +6931,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo *pOperator, bool* newgroup) pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); // setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); - finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); + finalizeQueryResult(pBInfo->pCtx, pOperator->numOfOutput); // initGroupResInfo(&pBInfo->groupResInfo, &pBInfo->resultRowInfo); // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); @@ -6950,7 +6985,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou // setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); if (!pRuntimeEnv->pQueryAttr->stableQuery) { // finalize include the update of result rows - finalizeQueryResult(pOperator, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); + finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput); } else { updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset); } @@ -7109,11 +7144,11 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t n pAggSup->keyBuf = calloc(1, sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES); pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK); pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK); - pAggSup->pool = initResultRowPool(pAggSup->resultRowSize); +// pAggSup->pool = initResultRowPool(pAggSup->resultRowSize); pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell)); if (pAggSup->keyBuf == NULL || pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL || - pAggSup->pResultRowHashTable == NULL || pAggSup->pool == NULL) { + pAggSup->pResultRowHashTable == NULL) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -7125,7 +7160,7 @@ static void cleanupAggSup(SAggSupporter* pAggSup) { taosHashCleanup(pAggSup->pResultRowHashTable); taosHashCleanup(pAggSup->pResultRowListSet); taosArrayDestroy(pAggSup->pResultRowArrayList); - destroyResultRowPool(pAggSup->pool); +// destroyResultRowPool(pAggSup->pool); } static int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, @@ -7448,7 +7483,7 @@ SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, int32_t numOfD } SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SInterval* pInterval, -const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { + const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -7460,7 +7495,10 @@ const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { pInfo->win = pTaskInfo->window; pInfo->interval = *pInterval; - int32_t numOfRows = 1; + pInfo->win.skey = INT64_MIN; + pInfo->win.ekey = INT64_MAX; + + int32_t numOfRows = 4096; int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock); // pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo); if (code != TSDB_CODE_SUCCESS/* || pInfo->pTableQueryInfo == NULL*/) { @@ -7483,7 +7521,7 @@ const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) { pOperator->pTaskInfo = pTaskInfo; pOperator->numOfOutput = numOfCols; pOperator->info = pInfo; - pOperator->_openFn = operatorDummyOpenFn; + pOperator->_openFn = doOpenIntervalAgg; pOperator->getNextFn = doIntervalAgg; pOperator->closeFn = destroyIntervalOperatorInfo; @@ -8304,10 +8342,14 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i); SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo); + SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; + int32_t num = 0; - SExprInfo* pExprInfo = createExprInfo(((SIntervalPhysiNode*)pPhyNode)->pFuncs, &num); + SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->pFuncs, &num); SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc); - return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, NULL, pTableGroupInfo, pTaskInfo); + + SInterval interval = {.interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, .intervalUnit = 'a', .slidingUnit = 'a'}; + return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, pTableGroupInfo, pTaskInfo); } } /*else if (pPhyNode->info.type == OP_MultiTableAggregate) { size_t size = taosArrayGetSize(pPhyNode->pChildren); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 34dd248ba7..08bab762be 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -37,7 +37,6 @@ typedef struct SSortHandle { SArray *pOrderInfo; bool nullFirst; - bool hasVarCol; SArray *pOrderedSource; _sort_fetch_block_fn_t fetchfp; @@ -77,6 +76,10 @@ static SSDataBlock* createDataBlock_rv(SSchema* pSchema, int32_t numOfCols) { colInfo.info.bytes = pSchema[i].bytes; colInfo.info.colId = pSchema[i].colId; taosArrayPush(pBlock->pDataBlock, &colInfo); + + if (IS_VAR_DATA_TYPE(colInfo.info.type)) { + pBlock->info.hasVarCol = true; + } } return pBlock; @@ -155,7 +158,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { while(start < pDataBlock->info.rows) { int32_t stop = 0; - blockDataSplitRows(pDataBlock, pHandle->hasVarCol, start, &stop, pHandle->pageSize); + blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize); SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1); if (p == NULL) { return terrno; @@ -179,7 +182,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { start = stop + 1; } - blockDataClearup(pDataBlock, pHandle->hasVarCol); + blockDataClearup(pDataBlock); SSDataBlock* pBlock = createOneDataBlock(pDataBlock); int32_t code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId); @@ -309,7 +312,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa } static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) { - blockDataClearup(pHandle->pDataBlock, pHandle->hasVarCol); + blockDataClearup(pHandle->pDataBlock); while(1) { if (cmpParam->numOfSources == pHandle->numOfCompletedSources) { @@ -475,7 +478,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) { setBufPageDirty(pPage, true); releaseBufPage(pHandle->pBuf, pPage); - blockDataClearup(pDataBlock, pHandle->hasVarCol); + blockDataClearup(pDataBlock); } tMergeTreeDestroy(pHandle->pMergeTree); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index aaaee6d56c..78e0e9f8e7 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -52,10 +52,6 @@ static void doFinalizer(SResultRowEntryInfo* pResInfo) { cleanupResultRowEntry(p void functionFinalizer(SqlFunctionCtx *pCtx) { SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - if (pResInfo->hasResult != DATA_SET_FLAG) { -// setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes); - } - doFinalizer(pResInfo); } diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index 7520ea3c9e..3fae580de9 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -310,9 +310,12 @@ static SLogicNode* createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInt pWindow->node.id = pCxt->planNodeId++; pWindow->winType = WINDOW_TYPE_INTERVAL; - pWindow->interval = ((SValueNode*)pInterval->pInterval)->datum.i; + SValueNode* pIntervalNode = (SValueNode*)((SRawExprNode*)(pInterval->pInterval))->pNode; + + pWindow->interval = pIntervalNode->datum.i; pWindow->offset = (NULL != pInterval->pOffset ? ((SValueNode*)pInterval->pOffset)->datum.i : 0); - pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : 0); + pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : pWindow->interval); + if (NULL != pInterval->pFill) { pWindow->pFill = nodesCloneNode(pInterval->pFill); CHECK_ALLOC(pWindow->pFill, (SLogicNode*)pWindow);