refactor: do some internal refactor, and set the correct intermediate buffer size for average function.
This commit is contained in:
parent
2964542617
commit
25e2a9dd72
|
@ -42,11 +42,8 @@
|
|||
#define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str)
|
||||
|
||||
typedef struct SGroupResInfo {
|
||||
int32_t totalGroup;
|
||||
int32_t currentGroup;
|
||||
int32_t index;
|
||||
SArray* pRows; // SArray<SResultRowPosition*>
|
||||
bool ordered;
|
||||
SArray* pRows; // SArray<SResKeyPos>
|
||||
int32_t position;
|
||||
} SGroupResInfo;
|
||||
|
||||
|
@ -80,18 +77,12 @@ typedef struct SResultRowInfo {
|
|||
SResultRowPosition cur;
|
||||
} SResultRowInfo;
|
||||
|
||||
struct STaskAttr;
|
||||
struct STaskRuntimeEnv;
|
||||
struct SUdfInfo;
|
||||
struct SqlFunctionCtx;
|
||||
|
||||
int32_t getOutputInterResultBufSize(struct STaskAttr* pQueryAttr);
|
||||
|
||||
size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||
int32_t initResultRowInfo(SResultRowInfo* pResultRowInfo, int32_t size);
|
||||
void cleanupResultRowInfo(SResultRowInfo* pResultRowInfo);
|
||||
|
||||
void resetResultRowInfo(struct STaskRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo);
|
||||
int32_t numOfClosedResultRows(SResultRowInfo* pResultRowInfo);
|
||||
void closeAllResultRows(SResultRowInfo* pResultRowInfo);
|
||||
|
||||
|
@ -99,9 +90,7 @@ void initResultRow(SResultRow *pResultRow);
|
|||
void closeResultRow(SResultRow* pResultRow);
|
||||
bool isResultRowClosed(SResultRow* pResultRow);
|
||||
|
||||
struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset);
|
||||
|
||||
int32_t getRowNumForMultioutput(struct STaskAttr* pQueryAttr, bool topBottomQuery, bool stable);
|
||||
struct SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, const int32_t* offset);
|
||||
|
||||
static FORCE_INLINE SResultRow *getResultRow(SDiskbasedBuf* pBuf, SResultRowInfo *pResultRowInfo, int32_t slot) {
|
||||
ASSERT(pResultRowInfo != NULL && slot >= 0 && slot < pResultRowInfo->size);
|
||||
|
@ -118,39 +107,13 @@ static FORCE_INLINE SResultRow *getResultRowByPos(SDiskbasedBuf* pBuf, SResultRo
|
|||
return pRow;
|
||||
}
|
||||
|
||||
static FORCE_INLINE char* getPosInResultPage(struct STaskAttr* pQueryAttr, SFilePage* page, int32_t rowOffset,
|
||||
int32_t offset) {
|
||||
assert(rowOffset >= 0 && pQueryAttr != NULL);
|
||||
|
||||
// int32_t numOfRows = (int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
|
||||
return ((char *)page->data);
|
||||
|
||||
}
|
||||
|
||||
static FORCE_INLINE char* getPosInResultPage_rv(SFilePage* page, int32_t rowOffset, int32_t offset) {
|
||||
assert(rowOffset >= 0);
|
||||
|
||||
int32_t numOfRows = 1;//(int32_t)getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery);
|
||||
return (char*) page + rowOffset + offset * numOfRows;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
SArray* pResult; // SArray<SResPair>
|
||||
int32_t colId;
|
||||
} SStddevInterResult;
|
||||
|
||||
void initGroupedResultInfo(SGroupResInfo* pGroupResInfo, SHashObj* pHashmap, int32_t order);
|
||||
void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList);
|
||||
|
||||
void cleanupGroupResInfo(SGroupResInfo* pGroupResInfo);
|
||||
bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo);
|
||||
bool hasRemainData(SGroupResInfo* pGroupResInfo);
|
||||
bool hashRemainDataInGroupInfo(SGroupResInfo* pGroupResInfo);
|
||||
|
||||
bool incNextGroup(SGroupResInfo* pGroupResInfo);
|
||||
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo);
|
||||
|
||||
int32_t mergeIntoGroupResult(SGroupResInfo* pGroupResInfo, struct STaskRuntimeEnv *pRuntimeEnv, int32_t* offset);
|
||||
|
||||
//int32_t initUdfInfo(struct SUdfInfo* pUdfInfo);
|
||||
|
||||
#endif // TDENGINE_QUERYUTIL_H
|
||||
|
|
|
@ -666,7 +666,6 @@ int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInf
|
|||
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
|
||||
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf);
|
||||
|
||||
void finalizeMultiTupleQueryResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
|
||||
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
|
||||
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
|
||||
int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* pData, int16_t type, int16_t bytes,
|
||||
|
|
|
@ -114,12 +114,6 @@ void closeAllResultRows(SResultRowInfo *pResultRowInfo) {
|
|||
assert(pResultRowInfo->size >= 0 && pResultRowInfo->capacity >= pResultRowInfo->size);
|
||||
|
||||
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
|
||||
// SResultRow* pRow = pResultRowInfo->pResult[i];
|
||||
// if (pRow->closed) {
|
||||
// continue;
|
||||
// }
|
||||
|
||||
// pRow->closed = true;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -144,11 +138,11 @@ void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow) {
|
|||
for (int32_t i = 0; i < pRuntimeEnv->pQueryAttr->numOfOutput; ++i) {
|
||||
struct SResultRowEntryInfo *pEntryInfo = NULL;//pResultRow->pEntryInfo[i];
|
||||
|
||||
int16_t size = pRuntimeEnv->pQueryAttr->pExpr1[i].base.resSchema.bytes;
|
||||
char * s = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResultRow->offset, offset);
|
||||
memset(s, 0, size);
|
||||
// int16_t size = pRuntimeEnv->pQueryAttr->pExpr1[i].base.resSchema.bytes;
|
||||
// char * s = getPosInResultPage(pRuntimeEnv->pQueryAttr, page, pResultRow->offset, offset);
|
||||
// memset(s, 0, size);
|
||||
|
||||
offset += size;
|
||||
// offset += size;
|
||||
cleanupResultRowEntry(pEntryInfo);
|
||||
}
|
||||
}
|
||||
|
@ -161,7 +155,7 @@ void clearResultRow(STaskRuntimeEnv *pRuntimeEnv, SResultRow *pResultRow) {
|
|||
}
|
||||
|
||||
// TODO refactor: use macro
|
||||
SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, int32_t* offset) {
|
||||
SResultRowEntryInfo* getResultCell(const SResultRow* pRow, int32_t index, const int32_t* offset) {
|
||||
assert(index >= 0 && offset != NULL);
|
||||
return (SResultRowEntryInfo*)((char*) pRow->pEntryInfo + offset[index]);
|
||||
}
|
||||
|
@ -247,7 +241,7 @@ void initMultiResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayL
|
|||
ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
|
||||
}
|
||||
|
||||
bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo) {
|
||||
bool hashRemainDataInGroupInfo(SGroupResInfo* pGroupResInfo) {
|
||||
if (pGroupResInfo->pRows == NULL) {
|
||||
return false;
|
||||
}
|
||||
|
@ -255,18 +249,6 @@ bool hasRemainDataInCurrentGroup(SGroupResInfo* pGroupResInfo) {
|
|||
return pGroupResInfo->index < taosArrayGetSize(pGroupResInfo->pRows);
|
||||
}
|
||||
|
||||
bool hasRemainData(SGroupResInfo* pGroupResInfo) {
|
||||
if (hasRemainDataInCurrentGroup(pGroupResInfo)) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return pGroupResInfo->currentGroup < pGroupResInfo->totalGroup;
|
||||
}
|
||||
|
||||
bool incNextGroup(SGroupResInfo* pGroupResInfo) {
|
||||
return (++pGroupResInfo->currentGroup) < pGroupResInfo->totalGroup;
|
||||
}
|
||||
|
||||
int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo) {
|
||||
assert(pGroupResInfo != NULL);
|
||||
if (pGroupResInfo->pRows == 0) {
|
||||
|
@ -387,11 +369,6 @@ void orderTheResultRows(STaskRuntimeEnv* pRuntimeEnv) {
|
|||
}
|
||||
|
||||
static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupResInfo* pGroupResInfo, uint64_t groupId, int32_t* rowCellInfoOffset) {
|
||||
if (!pGroupResInfo->ordered) {
|
||||
orderTheResultRows(pRuntimeEnv);
|
||||
pGroupResInfo->ordered = true;
|
||||
}
|
||||
|
||||
if (pGroupResInfo->pRows == NULL) {
|
||||
pGroupResInfo->pRows = taosArrayInit(100, POINTER_BYTES);
|
||||
}
|
||||
|
@ -403,6 +380,7 @@ static int32_t mergeIntoGroupResultImplRv(STaskRuntimeEnv *pRuntimeEnv, SGroupRe
|
|||
break;
|
||||
}
|
||||
|
||||
|
||||
int64_t num = getNumOfResultWindowRes(pRuntimeEnv, &pResultRowCell->pos, rowCellInfoOffset);
|
||||
if (num <= 0) {
|
||||
continue;
|
||||
|
|
|
@ -815,9 +815,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
closeAllResultRows(&pInfo->binfo.resultRowInfo);
|
||||
finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo,
|
||||
pInfo->binfo.rowCellInfoOffset);
|
||||
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, pInfo->order);
|
||||
OPTR_SET_OPENED(pOperator);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -933,7 +930,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||
if (pBInfo->pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -960,13 +957,11 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
closeAllResultRows(&pBInfo->resultRowInfo);
|
||||
finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo,
|
||||
pBInfo->rowCellInfoOffset);
|
||||
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
|
||||
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||
if (pBInfo->pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
}
|
||||
|
||||
|
@ -994,7 +989,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) {
|
|||
blockDataEnsureCapacity(pBlock, pOperator->resultInfo.capacity);
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
|
||||
if (pBlock->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||
if (pBlock->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
}
|
||||
|
||||
|
@ -1082,7 +1077,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||
if (pInfo->binfo.pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||
|
@ -1413,7 +1408,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
|
|||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||
if (pBInfo->pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
return NULL;
|
||||
}
|
||||
|
@ -1440,13 +1435,11 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
|
|||
// restore the value
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
closeAllResultRows(&pBInfo->resultRowInfo);
|
||||
finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pBInfo->resultRowInfo,
|
||||
pBInfo->rowCellInfoOffset);
|
||||
|
||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, TSDB_ORDER_ASC);
|
||||
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||
if (pBInfo->pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
}
|
||||
|
||||
|
@ -1461,7 +1454,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
|
|||
STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
// doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
||||
if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) {
|
||||
if (pSliceInfo->binfo.pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) {
|
||||
doSetOperatorCompleted(pOperator);
|
||||
}
|
||||
|
||||
|
@ -1493,7 +1486,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) {
|
|||
// initGroupedResultInfo(&pSliceInfo->groupResInfo, &pSliceInfo->binfo.resultRowInfo);
|
||||
// doBuildResultDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pSliceInfo->pRes);
|
||||
|
||||
if (pSliceInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pSliceInfo->groupResInfo)) {
|
||||
if (pSliceInfo->binfo.pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pSliceInfo->groupResInfo)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
||||
|
@ -1695,7 +1688,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
return NULL;
|
||||
} else if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||
if (pInfo->binfo.pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||
|
|
|
@ -543,7 +543,7 @@ bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
|||
}
|
||||
|
||||
bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) {
|
||||
pEnv->calcMemSize = sizeof(double);
|
||||
pEnv->calcMemSize = sizeof(SAvgRes);
|
||||
return true;
|
||||
}
|
||||
|
||||
|
|
|
@ -187,9 +187,6 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
|
|||
}
|
||||
} else { // NOTE: the size may be -1, the this recycle page has not been flushed to disk yet.
|
||||
size = pg->length;
|
||||
if (size == -1) {
|
||||
printf("----\n");
|
||||
}
|
||||
}
|
||||
|
||||
ASSERT(size > 0 || (pg->offset == -1 && pg->length == -1));
|
||||
|
|
Loading…
Reference in New Issue