[td-3047] refactor.
This commit is contained in:
parent
7488757485
commit
d88664a3e6
|
@ -210,6 +210,7 @@ typedef struct SQuery {
|
|||
|
||||
int32_t srcRowSize; // todo extract struct
|
||||
int32_t resultRowSize;
|
||||
int32_t intermediateResultRowSize; // intermediate result row size, in case of top-k query.
|
||||
int32_t maxSrcColumnSize;
|
||||
int32_t tagLen; // tag value length of current query
|
||||
SSqlGroupbyExpr* pGroupbyExpr;
|
||||
|
|
|
@ -76,7 +76,7 @@ typedef struct SDiskbasedResultBuf {
|
|||
SResultBufStatis statis;
|
||||
} SDiskbasedResultBuf;
|
||||
|
||||
#define DEFAULT_INTERN_BUF_PAGE_SIZE (256L) // in bytes
|
||||
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) // in bytes
|
||||
#define PAGE_INFO_INITIALIZER (SPageDiskInfo){-1, -1}
|
||||
|
||||
/**
|
||||
|
|
|
@ -54,9 +54,9 @@ static FORCE_INLINE SResultRow *getResultRow(SResultRowInfo *pResultRowInfo, int
|
|||
static FORCE_INLINE char *getPosInResultPage(SQuery *pQuery, tFilePage* page, int32_t rowOffset, int16_t offset) {
|
||||
assert(rowOffset >= 0 && pQuery != NULL);
|
||||
|
||||
// int32_t realRowId = (int32_t)(rowId * GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery));
|
||||
int32_t numOfRows = GET_ROW_PARAM_FOR_MULTIOUTPUT(pQuery, pQuery->topBotQuery, pQuery->stableQuery);
|
||||
// return ((char *)page->data) + offset * numOfRowsPerPage + bytes * realRowId;
|
||||
return ((char *)page->data) + rowOffset + offset;
|
||||
return ((char *)page->data) + rowOffset + offset * numOfRows;
|
||||
}
|
||||
|
||||
bool isNullOperator(SColumnFilterElem *pFilter, const char* minval, const char* maxval, int16_t type);
|
||||
|
@ -82,7 +82,7 @@ void interResToBinary(SBufferWriter* bw, SArray* pRes, int32_t tagLen);
|
|||
SArray* interResFromBinary(const char* data, int32_t len);
|
||||
void freeInterResult(void* param);
|
||||
|
||||
void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo, int32_t offset);
|
||||
void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo);
|
||||
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
|
||||
bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo);
|
||||
bool hasRemainData(SGroupResInfo* pGroupResInfo);
|
||||
|
|
|
@ -653,7 +653,8 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRow
|
|||
|
||||
// not assign result buffer yet, add new result buffer
|
||||
if (pResultRow->pageId == -1) {
|
||||
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, (int32_t) groupId, pRuntimeEnv->pQuery->resultRowSize);
|
||||
|
||||
int32_t ret = addNewWindowResultBuf(pResultRow, pResultBuf, (int32_t) groupId, pRuntimeEnv->pQuery->intermediateResultRowSize);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -2444,8 +2445,8 @@ static SColumnInfo* doGetTagColumnInfoById(SColumnInfo* pTagColList, int32_t num
|
|||
static void doSetTagValueInParam(void* pTable, int32_t tagColId, tVariant *tag, int16_t type, int16_t bytes);
|
||||
|
||||
//TODO refactor
|
||||
int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock *pBlock,
|
||||
uint32_t *status) {
|
||||
int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTableScanInfo, SSDataBlock* pBlock,
|
||||
uint32_t* status) {
|
||||
*status = BLK_DATA_NO_NEEDED;
|
||||
pBlock->pDataBlock = NULL;
|
||||
pBlock->pBlockStatis = NULL;
|
||||
|
@ -2502,19 +2503,23 @@ int32_t loadDataBlockOnDemand_rv(SQueryRuntimeEnv *pRuntimeEnv, STableScanInfo*
|
|||
int32_t numOfOutput = pTableScanInfo->numOfOutput;
|
||||
SQLFunctionCtx* pCtx = pTableScanInfo->pCtx;
|
||||
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
int32_t functionId = pCtx[i].functionId;
|
||||
int32_t colId = pTableScanInfo->pExpr[i].base.colInfo.colId;
|
||||
if (pQuery->groupbyColumn) {
|
||||
(*status) = BLK_DATA_ALL_NEEDED;
|
||||
} else {
|
||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||
int32_t functionId = pCtx[i].functionId;
|
||||
int32_t colId = pTableScanInfo->pExpr[i].base.colInfo.colId;
|
||||
|
||||
// group by + first/last should not apply the first/last block filter
|
||||
if (!pQuery->groupbyColumn && (functionId == TSDB_FUNC_FIRST_DST || functionId == TSDB_FUNC_LAST_DST)) {
|
||||
(*status) |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId);
|
||||
if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) {
|
||||
// group by + first/last should not apply the first/last block filter
|
||||
if (functionId != TSDB_FUNC_FIRST_DST && functionId != TSDB_FUNC_LAST_DST) {
|
||||
(*status) |= aAggs[functionId].dataReqFunc(&pTableScanInfo->pCtx[i], &pBlock->info.window, colId);
|
||||
if (((*status) & BLK_DATA_ALL_NEEDED) == BLK_DATA_ALL_NEEDED) {
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
(*status) |= BLK_DATA_ALL_NEEDED;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
(*status) |= BLK_DATA_ALL_NEEDED;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -3495,12 +3500,6 @@ static int32_t doCopyToSData_rv(SQueryRuntimeEnv* pRuntimeEnv, SGroupResInfo* pG
|
|||
|
||||
int32_t numOfRowsToCopy = pRow->numOfRows;
|
||||
|
||||
//current output space is not enough to accommodate all data of this page, prepare more space
|
||||
// if (numOfRowsToCopy > (pRuntimeEnv->resultInfo.capacity - numOfResult)) {
|
||||
// int32_t newSize = pRuntimeEnv->resultInfo.capacity + (numOfRowsToCopy - numOfResult);
|
||||
// expandBuffer(pRuntimeEnv, newSize, pRuntimeEnv->qinfo);
|
||||
// }
|
||||
|
||||
pGroupResInfo->index += 1;
|
||||
|
||||
tFilePage *page = getResBufPage(pRuntimeEnv->pResultBuf, pRow->pageId);
|
||||
|
@ -4169,7 +4168,7 @@ int32_t doInitQInfo(SQInfo *pQInfo, STSBuf *pTsBuf, SArray* prevResult, void *ts
|
|||
|
||||
int32_t ps = DEFAULT_PAGE_SIZE;
|
||||
int32_t rowsize = 0;
|
||||
getIntermediateBufInfo(pRuntimeEnv, &ps, &rowsize);
|
||||
getIntermediateBufInfo(pRuntimeEnv, &ps, &pQuery->intermediateResultRowSize);
|
||||
int32_t TENMB = 1024*1024*10;
|
||||
|
||||
if (isSTableQuery && !onlyQueryTags(pQuery)) {
|
||||
|
@ -4630,7 +4629,7 @@ static SSDataBlock* doSTableAggregate(void* param) {
|
|||
updateWindowResNumOfRes_rv(pRuntimeEnv, pInfo->pCtx, pOperator->numOfOutput, &pInfo->resultRowInfo,
|
||||
pInfo->rowCellInfoOffset);
|
||||
|
||||
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->resultRowInfo, 0);
|
||||
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->resultRowInfo);
|
||||
|
||||
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->pRes);
|
||||
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
||||
|
@ -4785,7 +4784,7 @@ static SSDataBlock* doIntervalAgg(void* param) {
|
|||
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
|
||||
finalizeQueryResult_rv(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset);
|
||||
|
||||
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo, 0);
|
||||
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo);
|
||||
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
||||
|
||||
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
||||
|
@ -4894,7 +4893,7 @@ static SSDataBlock* doHashGroupbyAgg(void* param) {
|
|||
|
||||
updateWindowResNumOfRes_rv(pRuntimeEnv, pInfo->binfo.pCtx, pOperator->numOfOutput, &pInfo->binfo.resultRowInfo, pInfo->binfo.rowCellInfoOffset);
|
||||
|
||||
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->binfo.resultRowInfo, 0);
|
||||
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pInfo->binfo.resultRowInfo);
|
||||
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pInfo->binfo.pRes);
|
||||
|
||||
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
||||
|
|
|
@ -344,13 +344,13 @@ void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo) {
|
|||
pGroupResInfo->index = 0;
|
||||
}
|
||||
|
||||
void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo, int32_t offset) {
|
||||
void initGroupResInfo(SGroupResInfo* pGroupResInfo, SResultRowInfo* pResultInfo) {
|
||||
if (pGroupResInfo->pRows != NULL) {
|
||||
taosArrayDestroy(pGroupResInfo->pRows);
|
||||
}
|
||||
|
||||
pGroupResInfo->pRows = taosArrayFromList(pResultInfo->pResult, pResultInfo->size, POINTER_BYTES);
|
||||
pGroupResInfo->index = offset;
|
||||
pGroupResInfo->index = 0;
|
||||
|
||||
assert(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
|
||||
}
|
||||
|
|
|
@ -73,6 +73,60 @@ if $row != 100 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
sql select bottom(c3, 5) from tb_tb1 interval(1y);
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 0.00000 then
|
||||
print expect 0.00000, actual:$data01
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data11 != 0.00000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data21 != 0.00000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data31 != 0.00000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select top(c4, 5) from tb_tb1 interval(1y);
|
||||
if $rows != 5 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 9.000000000 then
|
||||
print expect 9.000000000, acutal:$data01
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data11 != 9.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data21 != 9.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data31 != 9.000000000 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select top(c3, 5) from tb_tb1 interval(40h)
|
||||
if $rows != 25 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data01 != 9.00000 then
|
||||
print expect 9.00000, actual:$data01
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql select last(*) from tb_tb9
|
||||
if $row != 1 then
|
||||
return -1
|
||||
|
|
Loading…
Reference in New Issue