[td-13039] enable interval query on ordinary table.
This commit is contained in:
parent
a853c6ddf5
commit
e06cdb0872
|
@ -57,7 +57,8 @@ typedef struct SDataBlockInfo {
|
|||
STimeWindow window;
|
||||
int32_t rows;
|
||||
int32_t rowSize;
|
||||
int32_t numOfCols;
|
||||
int16_t numOfCols;
|
||||
int16_t hasVarCol;
|
||||
union {int64_t uid; int64_t blockId;};
|
||||
} SDataBlockInfo;
|
||||
|
||||
|
@ -96,13 +97,15 @@ typedef struct SColumnInfoData {
|
|||
|
||||
static FORCE_INLINE int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock) {
|
||||
int64_t tbUid = pBlock->info.uid;
|
||||
int32_t numOfCols = pBlock->info.numOfCols;
|
||||
int16_t numOfCols = pBlock->info.numOfCols;
|
||||
int16_t hasVarCol = pBlock->info.hasVarCol;
|
||||
int32_t rows = pBlock->info.rows;
|
||||
int32_t sz = taosArrayGetSize(pBlock->pDataBlock);
|
||||
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI64(buf, tbUid);
|
||||
tlen += taosEncodeFixedI32(buf, numOfCols);
|
||||
tlen += taosEncodeFixedI16(buf, numOfCols);
|
||||
tlen += taosEncodeFixedI16(buf, hasVarCol);
|
||||
tlen += taosEncodeFixedI32(buf, rows);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
|
@ -120,7 +123,8 @@ static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock)
|
|||
int32_t sz;
|
||||
|
||||
buf = taosDecodeFixedI64(buf, &pBlock->info.uid);
|
||||
buf = taosDecodeFixedI32(buf, &pBlock->info.numOfCols);
|
||||
buf = taosDecodeFixedI16(buf, &pBlock->info.numOfCols);
|
||||
buf = taosDecodeFixedI16(buf, &pBlock->info.hasVarCol);
|
||||
buf = taosDecodeFixedI32(buf, &pBlock->info.rows);
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pBlock->pDataBlock = taosArrayInit(sz, sizeof(SColumnInfoData));
|
||||
|
|
|
@ -117,7 +117,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
|
|||
|
||||
int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRows);
|
||||
int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows);
|
||||
void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol);
|
||||
void blockDataClearup(SSDataBlock* pDataBlock);
|
||||
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock);
|
||||
size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize);
|
||||
void* blockDataDestroy(SSDataBlock* pBlock);
|
||||
|
|
|
@ -1059,10 +1059,10 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
|
|||
// destroyTupleIndex(index);
|
||||
}
|
||||
|
||||
void blockDataClearup(SSDataBlock* pDataBlock, bool hasVarCol) {
|
||||
void blockDataClearup(SSDataBlock* pDataBlock) {
|
||||
pDataBlock->info.rows = 0;
|
||||
|
||||
if (hasVarCol) {
|
||||
if (pDataBlock->info.hasVarCol) {
|
||||
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
|
||||
|
@ -1148,7 +1148,9 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) {
|
|||
|
||||
SSDataBlock* pBlock = calloc(1, sizeof(SSDataBlock));
|
||||
pBlock->pDataBlock = taosArrayInit(numOfCols, sizeof(SColumnInfoData));
|
||||
|
||||
pBlock->info.numOfCols = numOfCols;
|
||||
pBlock->info.hasVarCol = pDataBlock->info.hasVarCol;
|
||||
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumnInfoData colInfo = {0};
|
||||
|
|
|
@ -50,7 +50,7 @@ typedef struct SGroupResInfo {
|
|||
int32_t totalGroup;
|
||||
int32_t currentGroup;
|
||||
int32_t index;
|
||||
SArray* pRows; // SArray<SResultRow*>
|
||||
SArray* pRows; // SArray<SResultRowPosition*>
|
||||
bool ordered;
|
||||
int32_t position;
|
||||
} SGroupResInfo;
|
||||
|
@ -67,10 +67,15 @@ typedef struct SResultRow {
|
|||
char *key; // start key of current result row
|
||||
} SResultRow;
|
||||
|
||||
typedef struct SResultRowPosition {
|
||||
int32_t pageId;
|
||||
int32_t offset;
|
||||
} SResultRowPosition;
|
||||
|
||||
typedef struct SResultRowInfo {
|
||||
SList *pRows;
|
||||
SResultRowPosition *pPosition;
|
||||
SResultRow **pResult; // result list
|
||||
// int16_t type:8; // data type for hash key
|
||||
int32_t size; // number of result set
|
||||
int32_t capacity; // max capacity
|
||||
int32_t curPos; // current active result row index of pResult list
|
||||
|
@ -131,7 +136,7 @@ static FORCE_INLINE char* getPosInResultPage_rv(SFilePage* page, int32_t rowOffs
|
|||
assert(rowOffset >= 0);
|
||||
|
||||
int32_t numOfRows = 1;//(int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
|
||||
return ((char *)page->data) + rowOffset + offset * numOfRows;
|
||||
return (char*) page + rowOffset + offset * numOfRows;
|
||||
}
|
||||
|
||||
//bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
|
||||
|
@ -139,12 +144,7 @@ static FORCE_INLINE char* getPosInResultPage_rv(SFilePage* page, int32_t rowOffs
|
|||
|
||||
__filter_func_t getFilterOperator(int32_t lowerOptr, int32_t upperOptr);
|
||||
|
||||
SResultRowPool* initResultRowPool(size_t size);
|
||||
SResultRow* getNewResultRow(SResultRowPool* p);
|
||||
int64_t getResultRowPoolMemSize(SResultRowPool* p);
|
||||
void* destroyResultRowPool(SResultRowPool* p);
|
||||
int32_t getNumOfAllocatedResultRows(SResultRowPool* p);
|
||||
int32_t getNumOfUsedResultRows(SResultRowPool* p);
|
||||
|
||||
typedef struct {
|
||||
SArray* pResult; // SArray<SResPair>
|
||||
|
|
|
@ -463,7 +463,7 @@ typedef struct SAggSupporter {
|
|||
SHashObj* pResultRowListSet; // used to check if current ResultRowInfo has ResultRow object or not
|
||||
SArray* pResultRowArrayList; // The array list that contains the Result rows
|
||||
char* keyBuf; // window key buffer
|
||||
SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
|
||||
// SResultRowPool *pool; // The window result objects pool, all the resultRow Objects are allocated and managed by this object.
|
||||
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
|
||||
} SAggSupporter;
|
||||
|
||||
|
@ -678,8 +678,8 @@ SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numO
|
|||
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols);
|
||||
|
||||
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
|
||||
void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo,
|
||||
int32_t* rowCellInfoOffset);
|
||||
void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||
|
||||
void clearOutputBuf(SOptrBasicInfo* pBInfo, int32_t* bufCapacity);
|
||||
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||
|
||||
|
|
|
@ -59,7 +59,8 @@ int32_t initResultRowInfo(SResultRowInfo *pResultRowInfo, int32_t size) {
|
|||
pResultRowInfo->capacity = size;
|
||||
|
||||
pResultRowInfo->pResult = calloc(pResultRowInfo->capacity, POINTER_BYTES);
|
||||
if (pResultRowInfo->pResult == NULL) {
|
||||
pResultRowInfo->pPosition = calloc(pResultRowInfo->capacity, sizeof(SResultRowPosition));
|
||||
if (pResultRowInfo->pResult == NULL || pResultRowInfo->pPosition == NULL) {
|
||||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -182,22 +183,6 @@ size_t getResultRowSize(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
|||
return rowSize;
|
||||
}
|
||||
|
||||
SResultRowPool* initResultRowPool(size_t size) {
|
||||
SResultRowPool* p = calloc(1, sizeof(SResultRowPool));
|
||||
if (p == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
p->numOfElemPerBlock = 128;
|
||||
|
||||
p->elemSize = (int32_t) size;
|
||||
p->blockSize = p->numOfElemPerBlock * p->elemSize;
|
||||
p->position.pos = 0;
|
||||
|
||||
p->pData = taosArrayInit(8, POINTER_BYTES);
|
||||
return p;
|
||||
}
|
||||
|
||||
SResultRow* getNewResultRow(SResultRowPool* p) {
|
||||
if (p == NULL) {
|
||||
return NULL;
|
||||
|
@ -221,132 +206,6 @@ SResultRow* getNewResultRow(SResultRowPool* p) {
|
|||
return ptr;
|
||||
}
|
||||
|
||||
int64_t getResultRowPoolMemSize(SResultRowPool* p) {
|
||||
if (p == NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return taosArrayGetSize(p->pData) * p->blockSize;
|
||||
}
|
||||
|
||||
int32_t getNumOfAllocatedResultRows(SResultRowPool* p) {
|
||||
return (int32_t) taosArrayGetSize(p->pData) * p->numOfElemPerBlock;
|
||||
}
|
||||
|
||||
int32_t getNumOfUsedResultRows(SResultRowPool* p) {
|
||||
return getNumOfAllocatedResultRows(p) - p->numOfElemPerBlock + p->position.pos;
|
||||
}
|
||||
|
||||
void* destroyResultRowPool(SResultRowPool* p) {
|
||||
if (p == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
size_t size = taosArrayGetSize(p->pData);
|
||||
for(int32_t i = 0; i < size; ++i) {
|
||||
void** ptr = taosArrayGet(p->pData, i);
|
||||
tfree(*ptr);
|
||||
}
|
||||
|
||||
taosArrayDestroy(p->pData);
|
||||
|
||||
tfree(p);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void interResToBinary(SBufferWriter* bw, SArray* pRes, int32_t tagLen) {
|
||||
uint32_t numOfGroup = (uint32_t) taosArrayGetSize(pRes);
|
||||
tbufWriteUint32(bw, numOfGroup);
|
||||
tbufWriteUint16(bw, tagLen);
|
||||
|
||||
for(int32_t i = 0; i < numOfGroup; ++i) {
|
||||
SInterResult* pOne = taosArrayGet(pRes, i);
|
||||
if (tagLen > 0) {
|
||||
tbufWriteBinary(bw, pOne->tags, tagLen);
|
||||
}
|
||||
|
||||
uint32_t numOfCols = (uint32_t) taosArrayGetSize(pOne->pResult);
|
||||
tbufWriteUint32(bw, numOfCols);
|
||||
for(int32_t j = 0; j < numOfCols; ++j) {
|
||||
SStddevInterResult* p = taosArrayGet(pOne->pResult, j);
|
||||
uint32_t numOfRows = (uint32_t) taosArrayGetSize(p->pResult);
|
||||
|
||||
tbufWriteUint16(bw, p->colId);
|
||||
tbufWriteUint32(bw, numOfRows);
|
||||
|
||||
for(int32_t k = 0; k < numOfRows; ++k) {
|
||||
// SResPair v = *(SResPair*) taosArrayGet(p->pResult, k);
|
||||
// tbufWriteDouble(bw, v.avg);
|
||||
// tbufWriteInt64(bw, v.key);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
SArray* interResFromBinary(const char* data, int32_t len) {
|
||||
SBufferReader br = tbufInitReader(data, len, false);
|
||||
uint32_t numOfGroup = tbufReadUint32(&br);
|
||||
uint16_t tagLen = tbufReadUint16(&br);
|
||||
|
||||
char* tag = NULL;
|
||||
if (tagLen > 0) {
|
||||
tag = calloc(1, tagLen);
|
||||
}
|
||||
|
||||
SArray* pResult = taosArrayInit(4, sizeof(SInterResult));
|
||||
|
||||
for(int32_t i = 0; i < numOfGroup; ++i) {
|
||||
if (tagLen > 0) {
|
||||
memset(tag, 0, tagLen);
|
||||
tbufReadToBinary(&br, tag, tagLen);
|
||||
}
|
||||
|
||||
uint32_t numOfCols = tbufReadUint32(&br);
|
||||
|
||||
SArray* p = taosArrayInit(numOfCols, sizeof(SStddevInterResult));
|
||||
for(int32_t j = 0; j < numOfCols; ++j) {
|
||||
// int16_t colId = tbufReadUint16(&br);
|
||||
int32_t numOfRows = tbufReadUint32(&br);
|
||||
|
||||
// SStddevInterResult interRes = {.colId = colId, .pResult = taosArrayInit(4, sizeof(struct SResPair)),};
|
||||
for(int32_t k = 0; k < numOfRows; ++k) {
|
||||
// SResPair px = {0};
|
||||
// px.avg = tbufReadDouble(&br);
|
||||
// px.key = tbufReadInt64(&br);
|
||||
//
|
||||
// taosArrayPush(interRes.pResult, &px);
|
||||
}
|
||||
|
||||
// taosArrayPush(p, &interRes);
|
||||
}
|
||||
|
||||
char* p1 = NULL;
|
||||
if (tagLen > 0) {
|
||||
p1 = malloc(tagLen);
|
||||
memcpy(p1, tag, tagLen);
|
||||
}
|
||||
|
||||
SInterResult d = {.pResult = p, .tags = p1,};
|
||||
taosArrayPush(pResult, &d);
|
||||
}
|
||||
|
||||
tfree(tag);
|
||||
return pResult;
|
||||
}
|
||||
|
||||
void freeInterResult(void* param) {
|
||||
SInterResult* pResult = (SInterResult*) param;
|
||||
tfree(pResult->tags);
|
||||
|
||||
int32_t numOfCols = (int32_t) taosArrayGetSize(pResult->pResult);
|
||||
for(int32_t i = 0; i < numOfCols; ++i) {
|
||||
SStddevInterResult *p = taosArrayGet(pResult->pResult, i);
|
||||
taosArrayDestroy(p->pResult);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pResult->pResult);
|
||||
}
|
||||
|
||||
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
|
||||
assert(pGroupResInfo != NULL);
|
||||
|
||||
|
@ -360,7 +219,7 @@ void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo)
|
|||
taosArrayDestroy(pGroupResInfo->pRows);
|
||||
}
|
||||
|
||||
pGroupResInfo->pRows = taosArrayFromList(pResultInfo->pResult, pResultInfo->size, POINTER_BYTES);
|
||||
pGroupResInfo->pRows = taosArrayFromList(pResultInfo->pPosition, pResultInfo->size, sizeof(SResultRowPosition));
|
||||
pGroupResInfo->index = 0;
|
||||
assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
|
||||
}
|
||||
|
|
|
@ -26,15 +26,16 @@
|
|||
#include "tsort.h"
|
||||
#include "ttime.h"
|
||||
|
||||
#include "../../function/inc/taggfunction.h"
|
||||
#include "executorimpl.h"
|
||||
#include "function.h"
|
||||
#include "query.h"
|
||||
#include "tcompare.h"
|
||||
#include "tcompression.h"
|
||||
#include "thash.h"
|
||||
#include "ttypes.h"
|
||||
#include "query.h"
|
||||
#include "vnode.h"
|
||||
#include "tsdb.h"
|
||||
#include "ttypes.h"
|
||||
#include "vnode.h"
|
||||
|
||||
#define IS_MAIN_SCAN(runtime) ((runtime)->scanFlag == MAIN_SCAN)
|
||||
#define IS_REVERSE_SCAN(runtime) ((runtime)->scanFlag == REVERSE_SCAN)
|
||||
|
@ -235,7 +236,7 @@ static int32_t operatorDummyOpenFn(SOperatorInfo *pOperator) {
|
|||
|
||||
static void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
|
||||
|
||||
static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity);
|
||||
static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset);
|
||||
|
||||
static int32_t getGroupbyColumnIndex(SGroupbyExpr *pGroupbyExpr, SSDataBlock* pDataBlock);
|
||||
static int32_t setGroupResultOutputBuf(STaskRuntimeEnv *pRuntimeEnv, SOptrBasicInfo *binf, int32_t numOfCols, char *pData, int16_t type, int16_t bytes, int32_t groupIndex);
|
||||
|
@ -444,10 +445,12 @@ static void prepareResultListBuffer(SResultRowInfo* pResultRowInfo, jmp_buf env)
|
|||
longjmp(env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
pResultRowInfo->pPosition = realloc(pResultRowInfo->pPosition, newCapacity * sizeof(SResultRowPosition));
|
||||
pResultRowInfo->pResult = (SResultRow **)t;
|
||||
|
||||
int32_t inc = (int32_t)newCapacity - pResultRowInfo->capacity;
|
||||
memset(&pResultRowInfo->pResult[pResultRowInfo->capacity], 0, POINTER_BYTES * inc);
|
||||
memset(&pResultRowInfo->pPosition[pResultRowInfo->capacity], 0, sizeof(SResultRowPosition));
|
||||
|
||||
pResultRowInfo->capacity = (int32_t)newCapacity;
|
||||
}
|
||||
|
@ -564,7 +567,47 @@ static SResultRow* doSetResultOutBufByKey(STaskRuntimeEnv* pRuntimeEnv, SResultR
|
|||
return pResultRowInfo->pResult[pResultRowInfo->curPos];
|
||||
}
|
||||
|
||||
static SResultRow* doSetResultOutBufByKey_rv(SResultRowInfo* pResultRowInfo, int64_t tid, char* pData, int16_t bytes,
|
||||
SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, int32_t interBufSize) {
|
||||
SFilePage *pData = NULL;
|
||||
|
||||
// in the first scan, new space needed for results
|
||||
int32_t pageId = -1;
|
||||
SIDList list = getDataBufPagesIdList(pResultBuf, tableGroupId);
|
||||
|
||||
if (taosArrayGetSize(list) == 0) {
|
||||
pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
|
||||
pData->num = sizeof(SFilePage);
|
||||
} else {
|
||||
SPageInfo* pi = getLastPageInfo(list);
|
||||
pData = getBufPage(pResultBuf, getPageId(pi));
|
||||
pageId = getPageId(pi);
|
||||
|
||||
if (pData->num + interBufSize + sizeof(SResultRow) > getBufPageSize(pResultBuf)) {
|
||||
// release current page first, and prepare the next one
|
||||
releaseBufPageInfo(pResultBuf, pi);
|
||||
|
||||
pData = getNewBufPage(pResultBuf, tableGroupId, &pageId);
|
||||
if (pData != NULL) {
|
||||
pData->num = sizeof(SFilePage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (pData == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
// set the number of rows in current disk page
|
||||
SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
|
||||
pResultRow->pageId = pageId;
|
||||
pResultRow->offset = (int32_t)pData->num;
|
||||
|
||||
pData->num += interBufSize + sizeof(SResultRow);
|
||||
|
||||
return pResultRow;
|
||||
}
|
||||
|
||||
static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, int64_t tid, char* pData, int16_t bytes,
|
||||
bool masterscan, uint64_t tableGroupId, SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup) {
|
||||
bool existed = false;
|
||||
SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, tableGroupId);
|
||||
|
@ -608,7 +651,7 @@ static SResultRow* doSetResultOutBufByKey_rv(SResultRowInfo* pResultRowInfo, int
|
|||
|
||||
SResultRow *pResult = NULL;
|
||||
if (p1 == NULL) {
|
||||
pResult = getNewResultRow(pSup->pool);
|
||||
pResult = getNewResultRow_rv(pResultBuf, tableGroupId, pSup->resultRowSize);
|
||||
int32_t ret = initResultRow(pResult);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
|
@ -623,6 +666,7 @@ static SResultRow* doSetResultOutBufByKey_rv(SResultRowInfo* pResultRowInfo, int
|
|||
}
|
||||
|
||||
pResultRowInfo->curPos = pResultRowInfo->size;
|
||||
pResultRowInfo->pPosition[pResultRowInfo->size] = (SResultRowPosition) {.pageId = pResult->pageId, .offset = pResult->offset};
|
||||
pResultRowInfo->pResult[pResultRowInfo->size++] = pResult;
|
||||
|
||||
int64_t index = pResultRowInfo->curPos;
|
||||
|
@ -742,6 +786,7 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedBuf *pRes
|
|||
|
||||
if (taosArrayGetSize(list) == 0) {
|
||||
pData = getNewBufPage(pResultBuf, tid, &pageId);
|
||||
pData->num = sizeof(SFilePage);
|
||||
} else {
|
||||
SPageInfo* pi = getLastPageInfo(list);
|
||||
pData = getBufPage(pResultBuf, getPageId(pi));
|
||||
|
@ -750,9 +795,10 @@ static int32_t addNewWindowResultBuf(SResultRow *pWindowRes, SDiskbasedBuf *pRes
|
|||
if (pData->num + size > getBufPageSize(pResultBuf)) {
|
||||
// release current page first, and prepare the next one
|
||||
releaseBufPageInfo(pResultBuf, pi);
|
||||
|
||||
pData = getNewBufPage(pResultBuf, tid, &pageId);
|
||||
if (pData != NULL) {
|
||||
assert(pData->num == 0); // number of elements must be 0 for new allocated buffer
|
||||
pData->num = sizeof(SFilePage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -814,7 +860,7 @@ static int32_t setResultOutputBufByKey_rv(SResultRowInfo *pResultRowInfo, int64_
|
|||
bool masterscan, SResultRow **pResult, int64_t tableGroupId, SqlFunctionCtx* pCtx,
|
||||
int32_t numOfOutput, int32_t* rowCellInfoOffset, SDiskbasedBuf *pBuf, SAggSupporter *pAggSup, SExecTaskInfo* pTaskInfo) {
|
||||
assert(win->skey <= win->ekey);
|
||||
SResultRow *pResultRow = doSetResultOutBufByKey_rv(pResultRowInfo, id, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId,
|
||||
SResultRow *pResultRow = doSetResultOutBufByKey_rv(pBuf, pResultRowInfo, id, (char *)&win->skey, TSDB_KEYSIZE, masterscan, tableGroupId,
|
||||
pTaskInfo, true, pAggSup);
|
||||
|
||||
if (pResultRow == NULL) {
|
||||
|
@ -822,19 +868,10 @@ static int32_t setResultOutputBufByKey_rv(SResultRowInfo *pResultRowInfo, int64_
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// not assign result buffer yet, add new result buffer
|
||||
if (pResultRow->pageId == -1) { // todo intermediate result size
|
||||
int32_t ret = addNewWindowResultBuf(pResultRow, pBuf, (int32_t) tableGroupId, 0);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// set time window for current result
|
||||
pResultRow->win = (*win);
|
||||
*pResult = pResultRow;
|
||||
setResultRowOutputBufInitCtx_rv(pBuf, pResultRow, pCtx, numOfOutput, rowCellInfoOffset);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -990,15 +1027,16 @@ static int32_t getNumOfRowsInTimeWindow(SDataBlockInfo *pDataBlockInfo, TSKEY *p
|
|||
static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
|
||||
int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
|
||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||
pCtx[k].size = forwardStep;
|
||||
pCtx[k].startTs = pWin->skey;
|
||||
|
||||
// keep it temporarialy
|
||||
int32_t startOffset = pCtx[k].startRow;
|
||||
bool hasAgg = pCtx[k].isAggSet;
|
||||
int32_t startOffset = pCtx[k].input.startRowIndex;
|
||||
bool hasAgg = pCtx[k].input.colDataAggIsSet;
|
||||
int32_t numOfRows = pCtx[k].input.numOfRows;
|
||||
|
||||
int32_t pos = (order == TSDB_ORDER_ASC) ? offset : offset - (forwardStep - 1);
|
||||
pCtx[k].startRow = pos;
|
||||
pCtx[k].input.startRowIndex = pos;
|
||||
pCtx[k].input.numOfRows = forwardStep;
|
||||
|
||||
if (tsCol != NULL) {
|
||||
pCtx[k].ptsList = &tsCol[pos];
|
||||
|
@ -1011,12 +1049,13 @@ static void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, int32_t of
|
|||
}
|
||||
|
||||
if (functionNeedToExecute(&pCtx[k])) {
|
||||
// pCtx[k].fpSet.process(&pCtx[k]);
|
||||
pCtx[k].fpSet.process(&pCtx[k]);
|
||||
}
|
||||
|
||||
// restore it
|
||||
pCtx[k].isAggSet = hasAgg;
|
||||
pCtx[k].startRow = startOffset;
|
||||
pCtx[k].input.colDataAggIsSet = hasAgg;
|
||||
pCtx[k].input.startRowIndex = startOffset;
|
||||
pCtx[k].input.numOfRows = numOfRows;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1440,8 +1479,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|||
if (pSDataBlock->pDataBlock != NULL) {
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, 0);
|
||||
tsCols = (int64_t*) pColDataInfo->pData;
|
||||
assert(tsCols[0] == pSDataBlock->info.window.skey &&
|
||||
tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey);
|
||||
assert(tsCols[0] == pSDataBlock->info.window.skey && tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey);
|
||||
}
|
||||
|
||||
int32_t startPos = ascQuery? 0 : (pSDataBlock->info.rows - 1);
|
||||
|
@ -1533,7 +1571,6 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|||
// updateResultRowInfoActiveIndex(pResultRowInfo, &pInfo->win, pRuntimeEnv->current->lastKey, true, false);
|
||||
}
|
||||
|
||||
|
||||
static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) {
|
||||
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info;
|
||||
|
||||
|
@ -3276,10 +3313,9 @@ void switchCtxOrder(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO fix this bug.
|
||||
int32_t initResultRow(SResultRow *pResultRow) {
|
||||
pResultRow->pEntryInfo = (struct SResultRowEntryInfo*)((char*)pResultRow + sizeof(SResultRow));
|
||||
pResultRow->pageId = -1;
|
||||
pResultRow->offset = -1;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -3335,7 +3371,7 @@ void setFunctionResultOutput(SOptrBasicInfo* pInfo, SAggSupporter* pSup, int32_t
|
|||
|
||||
int64_t tid = 0;
|
||||
int64_t groupId = 0;
|
||||
SResultRow* pRow = doSetResultOutBufByKey_rv(pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, groupId, pTaskInfo, false, pSup);
|
||||
SResultRow* pRow = doSetResultOutBufByKey_rv(NULL, pResultRowInfo, tid, (char *)&tid, sizeof(tid), true, groupId, pTaskInfo, false, pSup);
|
||||
|
||||
for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) {
|
||||
SColumnInfoData* pData = taosArrayGet(pDataBlock->pDataBlock, i);
|
||||
|
@ -3476,48 +3512,44 @@ static void setupEnvForReverseScan(STableScanInfo *pTableScanInfo, SqlFunctionCt
|
|||
pTableScanInfo->reverseTimes = 0;
|
||||
}
|
||||
|
||||
void finalizeQueryResult(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) {
|
||||
int32_t numOfOutput = pOperator->numOfOutput;
|
||||
// if (pQueryAttr->groupbyColumn || QUERY_IS_INTERVAL_QUERY(pQueryAttr) || pQueryAttr->sw.gap > 0 || pQueryAttr->stateWindow) {
|
||||
// // for each group result, call the finalize function for each column
|
||||
// if (pQueryAttr->groupbyColumn) {
|
||||
// closeAllResultRows(pResultRowInfo);
|
||||
// }
|
||||
//
|
||||
// for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
|
||||
// SResultRow *buf = pResultRowInfo->pResult[i];
|
||||
// if (!isResultRowClosed(pResultRowInfo, i)) {
|
||||
// continue;
|
||||
// }
|
||||
//
|
||||
// setResultOutputBuf(pRuntimeEnv, buf, pCtx, numOfOutput, rowCellInfoOffset);
|
||||
//
|
||||
// for (int32_t j = 0; j < numOfOutput; ++j) {
|
||||
//// pCtx[j].startTs = buf->win.skey;
|
||||
//// if (pCtx[j].functionId < 0) {
|
||||
//// doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
|
||||
//// } else {
|
||||
//// aAggs[pCtx[j].functionId].xFinalize(&pCtx[j]);
|
||||
//// }
|
||||
// }
|
||||
//
|
||||
//
|
||||
// /*
|
||||
// * set the number of output results for group by normal columns, the number of output rows usually is 1 except
|
||||
// * the top and bottom query
|
||||
// */
|
||||
// buf->numOfRows = (uint16_t)getNumOfResult(pCtx, numOfOutput);
|
||||
// }
|
||||
//
|
||||
// } else {
|
||||
void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput) {
|
||||
for (int32_t j = 0; j < numOfOutput; ++j) {
|
||||
// if (pCtx[j].functionId < 0) {
|
||||
// doInvokeUdf(pRuntimeEnv->pUdfInfo, &pCtx[j], 0, TSDB_UDF_FUNC_FINALIZE);
|
||||
// } else {
|
||||
pCtx[j].fpSet.finalize(&pCtx[j]);
|
||||
// }
|
||||
}
|
||||
// }
|
||||
}
|
||||
|
||||
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf *pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset) {
|
||||
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
|
||||
SResultRowPosition* pPos = &pResultRowInfo->pPosition[i];
|
||||
|
||||
SFilePage* bufPage = getBufPage(pBuf, pPos->pageId);
|
||||
SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->offset);
|
||||
if (!isResultRowClosed(pResultRowInfo, i)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < numOfOutput; ++j) {
|
||||
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset);
|
||||
|
||||
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
|
||||
if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
pCtx[j].fpSet.finalize(&pCtx[j]);
|
||||
|
||||
if (pRow->numOfRows < pResInfo->numOfRes) {
|
||||
pRow->numOfRows = pResInfo->numOfRes;
|
||||
}
|
||||
}
|
||||
|
||||
releaseBufPage(pBuf, bufPage);
|
||||
/*
|
||||
* set the number of output results for group by normal columns, the number of output rows usually is 1 except
|
||||
* the top and bottom query
|
||||
*/
|
||||
// buf->numOfRows = (uint16_t)getNumOfResult(pCtx, numOfOutput);
|
||||
}
|
||||
}
|
||||
|
||||
static bool hasMainOutput(STaskAttr *pQueryAttr) {
|
||||
|
@ -3612,31 +3644,29 @@ void setResultRowOutputBufInitCtx_rv(SDiskbasedBuf * pBuf, SResultRow *pResult,
|
|||
// Note: pResult->pos[i]->num == 0, there is only fixed number of results for each group
|
||||
SFilePage* bufPage = getBufPage(pBuf, pResult->pageId);
|
||||
|
||||
int32_t offset = 0;
|
||||
// int32_t offset = 0;
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset);
|
||||
|
||||
struct SResultRowEntryInfo* pResInfo = pCtx[i].resultInfo;
|
||||
if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
|
||||
offset += pCtx[i].resDataInfo.bytes;
|
||||
// offset += pCtx[i].resDataInfo.bytes;
|
||||
continue;
|
||||
}
|
||||
|
||||
pCtx[i].pOutput = getPosInResultPage_rv(bufPage, pResult->offset, offset);
|
||||
offset += pCtx[i].resDataInfo.bytes;
|
||||
// offset += pCtx[i].resDataInfo.bytes;
|
||||
|
||||
int32_t functionId = pCtx[i].functionId;
|
||||
if (functionId < 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) {
|
||||
if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput;
|
||||
}
|
||||
|
||||
// if (!pResInfo->initialized) {
|
||||
// aAggs[functionId].init(&pCtx[i], pResInfo);
|
||||
// int32_t functionId = pCtx[i].functionId;
|
||||
// if (functionId < 0) {
|
||||
// continue;
|
||||
// }
|
||||
// if (functionId == FUNCTION_TOP || functionId == FUNCTION_BOTTOM || functionId == FUNCTION_DIFF) {
|
||||
// if (i > 0) pCtx[i].ptsOutputBuf = pCtx[i - 1].pOutput;
|
||||
// }
|
||||
|
||||
if (!pResInfo->initialized) {
|
||||
pCtx[i].fpSet.init(&pCtx[i], pResInfo);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3650,7 +3680,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, i
|
|||
int32_t* rowCellInfoOffset = pAggInfo->binfo.rowCellInfoOffset;
|
||||
|
||||
SResultRow* pResultRow =
|
||||
doSetResultOutBufByKey_rv(pResultRowInfo, tid, (char*)&tableGroupId, sizeof(tableGroupId), true, uid, pTaskInfo, false, &pAggInfo->aggSup);
|
||||
doSetResultOutBufByKey_rv(pAggInfo->pResultBuf, pResultRowInfo, tid, (char*)&tableGroupId, sizeof(tableGroupId), true, uid, pTaskInfo, false, &pAggInfo->aggSup);
|
||||
assert (pResultRow != NULL);
|
||||
|
||||
/*
|
||||
|
@ -3891,8 +3921,7 @@ void setIntervalQueryRange(STaskRuntimeEnv *pRuntimeEnv, TSKEY key) {
|
|||
* @param pQInfo
|
||||
* @param result
|
||||
*/
|
||||
|
||||
static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity) {
|
||||
static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResInfo, int32_t orderType, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset) {
|
||||
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
||||
int32_t numOfResult = pBlock->info.rows; // there are already exists result rows
|
||||
|
||||
|
@ -3910,13 +3939,19 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResI
|
|||
step = -1;
|
||||
}
|
||||
|
||||
int32_t nrows = pBlock->info.rows;
|
||||
|
||||
for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) {
|
||||
SResultRow* pRow = taosArrayGetP(pGroupResInfo->pRows, i);
|
||||
SResultRowPosition* pPos = taosArrayGet(pGroupResInfo->pRows, i);
|
||||
SFilePage *page = getBufPage(pBuf, pPos->pageId);
|
||||
|
||||
SResultRow* pRow = (SResultRow*)((char*)page + pPos->offset);
|
||||
if (pRow->numOfRows == 0) {
|
||||
pGroupResInfo->index += 1;
|
||||
continue;
|
||||
}
|
||||
|
||||
// TODO copy multiple rows?
|
||||
int32_t numOfRowsToCopy = pRow->numOfRows;
|
||||
if (numOfResult + numOfRowsToCopy >= rowCapacity) {
|
||||
break;
|
||||
|
@ -3924,20 +3959,17 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResI
|
|||
|
||||
pGroupResInfo->index += 1;
|
||||
|
||||
SFilePage *page = getBufPage(pBuf, pRow->pageId);
|
||||
|
||||
int32_t offset = 0;
|
||||
for (int32_t j = 0; j < pBlock->info.numOfCols; ++j) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, j);
|
||||
int32_t bytes = pColInfoData->info.bytes;
|
||||
SResultRowEntryInfo* pEntryInfo = getResultCell(pRow, j, rowCellOffset);
|
||||
|
||||
char *out = pColInfoData->pData + numOfResult * bytes;
|
||||
char *in = getPosInResultPage_rv(page, pRow->offset, offset);
|
||||
memcpy(out, in, bytes * numOfRowsToCopy);
|
||||
|
||||
offset += bytes;
|
||||
char* in = GET_ROWCELL_INTERBUF(pEntryInfo);
|
||||
colDataAppend(pColInfoData, nrows, in, pEntryInfo->numOfRes == 0);
|
||||
}
|
||||
|
||||
releaseBufPage(pBuf, page);
|
||||
nrows += 1;
|
||||
|
||||
numOfResult += numOfRowsToCopy;
|
||||
if (numOfResult == rowCapacity) { // output buffer is full
|
||||
break;
|
||||
|
@ -3949,16 +3981,16 @@ static int32_t doCopyToSDataBlock(SDiskbasedBuf *pBuf, SGroupResInfo* pGroupResI
|
|||
return 0;
|
||||
}
|
||||
|
||||
static void toSDatablock(SGroupResInfo *pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity) {
|
||||
static void toSDatablock(SGroupResInfo *pGroupResInfo, SDiskbasedBuf* pBuf, SSDataBlock* pBlock, int32_t rowCapacity, int32_t* rowCellOffset) {
|
||||
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
|
||||
|
||||
pBlock->info.rows = 0;
|
||||
blockDataClearup(pBlock);
|
||||
if (!hasRemainDataInCurrentGroup(pGroupResInfo)) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t orderType = TSDB_ORDER_ASC;//(pQueryAttr->pGroupbyExpr != NULL) ? pQueryAttr->pGroupbyExpr->orderType : TSDB_ORDER_ASC;
|
||||
doCopyToSDataBlock(pBuf, pGroupResInfo, orderType, pBlock, rowCapacity);
|
||||
doCopyToSDataBlock(pBuf, pGroupResInfo, orderType, pBlock, rowCapacity, rowCellOffset);
|
||||
|
||||
// add condition (pBlock->info.rows >= 1) just to runtime happy
|
||||
blockDataUpdateTsWindow(pBlock);
|
||||
|
@ -5173,9 +5205,8 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo *pOperator, bool* newgroup) {
|
|||
SExchangeInfo *pExchangeInfo = pOperator->info;
|
||||
SExecTaskInfo *pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
int32_t code = pOperator->_openFn(pOperator);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
pTaskInfo->code = code;
|
||||
pTaskInfo->code = pOperator->_openFn(pOperator);
|
||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -5761,7 +5792,7 @@ static void appendOneRowToDataBlock(SSDataBlock *pBlock, STupleHandle* pTupleHan
|
|||
}
|
||||
|
||||
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, bool hasVarCol, int32_t capacity) {
|
||||
blockDataClearup(pDataBlock, hasVarCol);
|
||||
blockDataClearup(pDataBlock);
|
||||
|
||||
while(1) {
|
||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
||||
|
@ -5917,7 +5948,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) {
|
|||
|
||||
while(1) {
|
||||
|
||||
blockDataClearup(pDataBlock, pInfo->hasVarCol);
|
||||
blockDataClearup(pDataBlock);
|
||||
while (1) {
|
||||
STupleHandle* pTupleHandle = tsortNextTuple(pHandle);
|
||||
if (pTupleHandle == NULL) {
|
||||
|
@ -6234,7 +6265,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo *pOperator) {
|
|||
doAggregateImpl(pOperator, 0, pInfo->pCtx);
|
||||
}
|
||||
|
||||
finalizeQueryResult(pOperator, pInfo->pCtx, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
|
||||
finalizeQueryResult(pInfo->pCtx, pOperator->numOfOutput);
|
||||
|
||||
OPTR_SET_OPENED(pOperator);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -6270,7 +6301,7 @@ static SSDataBlock* doMultiTableAggregate(SOperatorInfo *pOperator, bool* newgro
|
|||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
toSDatablock(&pAggInfo->groupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity);
|
||||
toSDatablock(&pAggInfo->groupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity, pAggInfo->binfo.rowCellInfoOffset);
|
||||
|
||||
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
|
@ -6319,7 +6350,7 @@ static SSDataBlock* doMultiTableAggregate(SOperatorInfo *pOperator, bool* newgro
|
|||
updateNumOfRowsInResultRows(pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo, pInfo->rowCellInfoOffset);
|
||||
|
||||
initGroupResInfo(&pAggInfo->groupResInfo, &pInfo->resultRowInfo);
|
||||
toSDatablock(&pAggInfo->groupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity);
|
||||
toSDatablock(&pAggInfo->groupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity, pAggInfo->binfo.rowCellInfoOffset);
|
||||
|
||||
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
|
@ -6333,7 +6364,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo *pOperator, bool* newgroup)
|
|||
SOptrBasicInfo *pInfo = &pProjectInfo->binfo;
|
||||
|
||||
SSDataBlock* pRes = pInfo->pRes;
|
||||
blockDataClearup(pRes, pProjectInfo->hasVarCol);
|
||||
blockDataClearup(pRes);
|
||||
|
||||
if (pProjectInfo->existDataBlock) { // TODO refactor
|
||||
// STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
|
||||
|
@ -6496,29 +6527,21 @@ static SSDataBlock* doFilter(void* param, bool* newgroup) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static SSDataBlock* doIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
static int32_t doOpenIntervalAgg(SOperatorInfo *pOperator) {
|
||||
if (OPTR_IS_OPENED(pOperator)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STableIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
// toSDatablock(pAggInfo->pGroupResInfo, pAggInfo->pResultBuf, pInfo->pRes, pAggInfo->binfo.capacity);
|
||||
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
}
|
||||
|
||||
return pInfo->binfo.pRes;
|
||||
}
|
||||
|
||||
// int32_t order = pQueryAttr->order.order;
|
||||
// STimeWindow win = pQueryAttr->window;
|
||||
bool newgroup = false;
|
||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||
|
||||
while (1) {
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
SSDataBlock* pBlock = downstream->getNextFn(downstream, newgroup);
|
||||
SSDataBlock* pBlock = downstream->getNextFn(downstream, &newgroup);
|
||||
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
|
||||
if (pBlock == NULL) {
|
||||
|
@ -6532,17 +6555,29 @@ static SSDataBlock* doIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
|
|||
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
|
||||
}
|
||||
|
||||
// restore the value
|
||||
// pQueryAttr->order.order = order;
|
||||
// pQueryAttr->window = win;
|
||||
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
closeAllResultRows(&pInfo->binfo.resultRowInfo);
|
||||
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
||||
finalizeQueryResult(pOperator, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
|
||||
finalizeMultiTupleQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->pResultBuf, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
|
||||
|
||||
initGroupResInfo(&pInfo->groupResInfo, &pInfo->binfo.resultRowInfo);
|
||||
toSDatablock(&pInfo->groupResInfo, pInfo->pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity);
|
||||
OPTR_SET_OPENED(pOperator);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SSDataBlock* doIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
|
||||
STableIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pTaskInfo->code = pOperator->_openFn(pOperator);
|
||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity);
|
||||
toSDatablock(&pInfo->groupResInfo, pInfo->pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity, pInfo->binfo.rowCellInfoOffset);
|
||||
|
||||
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
|
@ -6598,7 +6633,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
|
|||
pOperator->status = OP_RES_TO_RETURN;
|
||||
closeAllResultRows(&pIntervalInfo->binfo.resultRowInfo);
|
||||
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
||||
finalizeQueryResult(pOperator, pIntervalInfo->binfo.pCtx, &pIntervalInfo->binfo.resultRowInfo, pIntervalInfo->binfo.rowCellInfoOffset);
|
||||
finalizeQueryResult(pIntervalInfo->binfo.pCtx, pOperator->numOfOutput);
|
||||
|
||||
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->binfo.resultRowInfo);
|
||||
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
||||
|
@ -6847,7 +6882,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo *pOperator, bool* newgroup) {
|
|||
pOperator->status = OP_RES_TO_RETURN;
|
||||
closeAllResultRows(&pBInfo->resultRowInfo);
|
||||
setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
|
||||
finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
|
||||
finalizeQueryResult(pBInfo->pCtx, pOperator->numOfOutput);
|
||||
|
||||
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo);
|
||||
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
|
||||
|
@ -6896,7 +6931,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo *pOperator, bool* newgroup)
|
|||
pOperator->status = OP_RES_TO_RETURN;
|
||||
closeAllResultRows(&pBInfo->resultRowInfo);
|
||||
// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
|
||||
finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset);
|
||||
finalizeQueryResult(pBInfo->pCtx, pOperator->numOfOutput);
|
||||
|
||||
// initGroupResInfo(&pBInfo->groupResInfo, &pBInfo->resultRowInfo);
|
||||
// toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes);
|
||||
|
@ -6950,7 +6985,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo *pOperator, bool* newgrou
|
|||
// setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED);
|
||||
|
||||
if (!pRuntimeEnv->pQueryAttr->stableQuery) { // finalize include the update of result rows
|
||||
finalizeQueryResult(pOperator, pInfo->binfo.pCtx, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
|
||||
finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfOutput);
|
||||
} else {
|
||||
updateNumOfRowsInResultRows(pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
|
||||
}
|
||||
|
@ -7109,11 +7144,11 @@ int32_t doInitAggInfoSup(SAggSupporter* pAggSup, SqlFunctionCtx *pCtx, int32_t n
|
|||
pAggSup->keyBuf = calloc(1, sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES);
|
||||
pAggSup->pResultRowHashTable = taosHashInit(10, hashFn, true, HASH_NO_LOCK);
|
||||
pAggSup->pResultRowListSet = taosHashInit(100, hashFn, false, HASH_NO_LOCK);
|
||||
pAggSup->pool = initResultRowPool(pAggSup->resultRowSize);
|
||||
// pAggSup->pool = initResultRowPool(pAggSup->resultRowSize);
|
||||
pAggSup->pResultRowArrayList = taosArrayInit(10, sizeof(SResultRowCell));
|
||||
|
||||
if (pAggSup->keyBuf == NULL || pAggSup->pResultRowArrayList == NULL || pAggSup->pResultRowListSet == NULL ||
|
||||
pAggSup->pResultRowHashTable == NULL || pAggSup->pool == NULL) {
|
||||
pAggSup->pResultRowHashTable == NULL) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
@ -7125,7 +7160,7 @@ static void cleanupAggSup(SAggSupporter* pAggSup) {
|
|||
taosHashCleanup(pAggSup->pResultRowHashTable);
|
||||
taosHashCleanup(pAggSup->pResultRowListSet);
|
||||
taosArrayDestroy(pAggSup->pResultRowArrayList);
|
||||
destroyResultRowPool(pAggSup->pool);
|
||||
// destroyResultRowPool(pAggSup->pool);
|
||||
}
|
||||
|
||||
static int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||
|
@ -7460,7 +7495,10 @@ const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
|
|||
pInfo->win = pTaskInfo->window;
|
||||
pInfo->interval = *pInterval;
|
||||
|
||||
int32_t numOfRows = 1;
|
||||
pInfo->win.skey = INT64_MIN;
|
||||
pInfo->win.ekey = INT64_MAX;
|
||||
|
||||
int32_t numOfRows = 4096;
|
||||
int32_t code = initAggInfo(&pInfo->binfo, &pInfo->aggSup, pExprInfo, numOfCols, numOfRows, pResBlock);
|
||||
// pInfo->pTableQueryInfo = initTableQueryInfo(pTableGroupInfo);
|
||||
if (code != TSDB_CODE_SUCCESS/* || pInfo->pTableQueryInfo == NULL*/) {
|
||||
|
@ -7483,7 +7521,7 @@ const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo) {
|
|||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pOperator->numOfOutput = numOfCols;
|
||||
pOperator->info = pInfo;
|
||||
pOperator->_openFn = operatorDummyOpenFn;
|
||||
pOperator->_openFn = doOpenIntervalAgg;
|
||||
pOperator->getNextFn = doIntervalAgg;
|
||||
pOperator->closeFn = destroyIntervalOperatorInfo;
|
||||
|
||||
|
@ -8304,10 +8342,14 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
|
|||
SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, i);
|
||||
SOperatorInfo* op = doCreateOperatorTreeNode(pChildNode, pTaskInfo, pHandle, queryId, taskId, pTableGroupInfo);
|
||||
|
||||
SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode;
|
||||
|
||||
int32_t num = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(((SIntervalPhysiNode*)pPhyNode)->pFuncs, &num);
|
||||
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->pFuncs, &num);
|
||||
SSDataBlock* pResBlock = createOutputBuf_rv1(pPhyNode->pOutputDataBlockDesc);
|
||||
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, NULL, pTableGroupInfo, pTaskInfo);
|
||||
|
||||
SInterval interval = {.interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, .intervalUnit = 'a', .slidingUnit = 'a'};
|
||||
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, pTableGroupInfo, pTaskInfo);
|
||||
}
|
||||
} /*else if (pPhyNode->info.type == OP_MultiTableAggregate) {
|
||||
size_t size = taosArrayGetSize(pPhyNode->pChildren);
|
||||
|
|
|
@ -37,7 +37,6 @@ typedef struct SSortHandle {
|
|||
|
||||
SArray *pOrderInfo;
|
||||
bool nullFirst;
|
||||
bool hasVarCol;
|
||||
SArray *pOrderedSource;
|
||||
|
||||
_sort_fetch_block_fn_t fetchfp;
|
||||
|
@ -77,6 +76,10 @@ static SSDataBlock* createDataBlock_rv(SSchema* pSchema, int32_t numOfCols) {
|
|||
colInfo.info.bytes = pSchema[i].bytes;
|
||||
colInfo.info.colId = pSchema[i].colId;
|
||||
taosArrayPush(pBlock->pDataBlock, &colInfo);
|
||||
|
||||
if (IS_VAR_DATA_TYPE(colInfo.info.type)) {
|
||||
pBlock->info.hasVarCol = true;
|
||||
}
|
||||
}
|
||||
|
||||
return pBlock;
|
||||
|
@ -155,7 +158,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
|
|||
|
||||
while(start < pDataBlock->info.rows) {
|
||||
int32_t stop = 0;
|
||||
blockDataSplitRows(pDataBlock, pHandle->hasVarCol, start, &stop, pHandle->pageSize);
|
||||
blockDataSplitRows(pDataBlock, pDataBlock->info.hasVarCol, start, &stop, pHandle->pageSize);
|
||||
SSDataBlock* p = blockDataExtractBlock(pDataBlock, start, stop - start + 1);
|
||||
if (p == NULL) {
|
||||
return terrno;
|
||||
|
@ -179,7 +182,7 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) {
|
|||
start = stop + 1;
|
||||
}
|
||||
|
||||
blockDataClearup(pDataBlock, pHandle->hasVarCol);
|
||||
blockDataClearup(pDataBlock);
|
||||
|
||||
SSDataBlock* pBlock = createOneDataBlock(pDataBlock);
|
||||
int32_t code = doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId);
|
||||
|
@ -309,7 +312,7 @@ static int32_t adjustMergeTreeForNextTuple(SExternalMemSource *pSource, SMultiwa
|
|||
}
|
||||
|
||||
static SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SMsortComparParam* cmpParam, int32_t capacity) {
|
||||
blockDataClearup(pHandle->pDataBlock, pHandle->hasVarCol);
|
||||
blockDataClearup(pHandle->pDataBlock);
|
||||
|
||||
while(1) {
|
||||
if (cmpParam->numOfSources == pHandle->numOfCompletedSources) {
|
||||
|
@ -475,7 +478,7 @@ static int32_t doInternalMergeSort(SSortHandle* pHandle) {
|
|||
setBufPageDirty(pPage, true);
|
||||
releaseBufPage(pHandle->pBuf, pPage);
|
||||
|
||||
blockDataClearup(pDataBlock, pHandle->hasVarCol);
|
||||
blockDataClearup(pDataBlock);
|
||||
}
|
||||
|
||||
tMergeTreeDestroy(pHandle->pMergeTree);
|
||||
|
|
|
@ -52,10 +52,6 @@ static void doFinalizer(SResultRowEntryInfo* pResInfo) { cleanupResultRowEntry(p
|
|||
|
||||
void functionFinalizer(SqlFunctionCtx *pCtx) {
|
||||
SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx);
|
||||
if (pResInfo->hasResult != DATA_SET_FLAG) {
|
||||
// setNull(pCtx->pOutput, pCtx->resDataInfo.type, pCtx->resDataInfo.bytes);
|
||||
}
|
||||
|
||||
doFinalizer(pResInfo);
|
||||
}
|
||||
|
||||
|
|
|
@ -310,9 +310,12 @@ static SLogicNode* createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInt
|
|||
pWindow->node.id = pCxt->planNodeId++;
|
||||
|
||||
pWindow->winType = WINDOW_TYPE_INTERVAL;
|
||||
pWindow->interval = ((SValueNode*)pInterval->pInterval)->datum.i;
|
||||
SValueNode* pIntervalNode = (SValueNode*)((SRawExprNode*)(pInterval->pInterval))->pNode;
|
||||
|
||||
pWindow->interval = pIntervalNode->datum.i;
|
||||
pWindow->offset = (NULL != pInterval->pOffset ? ((SValueNode*)pInterval->pOffset)->datum.i : 0);
|
||||
pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : 0);
|
||||
pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : pWindow->interval);
|
||||
|
||||
if (NULL != pInterval->pFill) {
|
||||
pWindow->pFill = nodesCloneNode(pInterval->pFill);
|
||||
CHECK_ALLOC(pWindow->pFill, (SLogicNode*)pWindow);
|
||||
|
|
Loading…
Reference in New Issue