fix(query): set buffer page dirty when place data in it.
This commit is contained in:
parent
25e2a9dd72
commit
03c916b436
|
@ -156,7 +156,7 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
|
||||||
void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
|
void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
|
||||||
|
|
||||||
static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
|
static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
|
||||||
int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs);
|
const int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs);
|
||||||
|
|
||||||
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
||||||
static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
|
static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
|
||||||
|
@ -182,10 +182,11 @@ static int compareRowData(const void* a, const void* b, const void* userData) {
|
||||||
SFilePage* page2 = getBufPage(pRuntimeEnv->pResultBuf, pRow2->pageId);
|
SFilePage* page2 = getBufPage(pRuntimeEnv->pResultBuf, pRow2->pageId);
|
||||||
|
|
||||||
int16_t offset = supporter->dataOffset;
|
int16_t offset = supporter->dataOffset;
|
||||||
char* in1 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page1, pRow1->offset, offset);
|
return 0;
|
||||||
char* in2 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page2, pRow2->offset, offset);
|
// char* in1 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page1, pRow1->offset, offset);
|
||||||
|
// char* in2 = getPosInResultPage(pRuntimeEnv->pQueryAttr, page2, pRow2->offset, offset);
|
||||||
|
|
||||||
return (in1 != NULL && in2 != NULL) ? supporter->comFunc(in1, in2) : 0;
|
// return (in1 != NULL && in2 != NULL) ? supporter->comFunc(in1, in2) : 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// setup the output buffer for each operator
|
// setup the output buffer for each operator
|
||||||
|
@ -333,6 +334,8 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId,
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
setBufPageDirty(pData, true);
|
||||||
|
|
||||||
// set the number of rows in current disk page
|
// set the number of rows in current disk page
|
||||||
SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
|
SResultRow* pResultRow = (SResultRow*)((char*)pData + pData->num);
|
||||||
pResultRow->pageId = pageId;
|
pResultRow->pageId = pageId;
|
||||||
|
@ -364,12 +367,14 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
||||||
if (isIntervalQuery) {
|
if (isIntervalQuery) {
|
||||||
if (masterscan && p1 != NULL) { // the *p1 may be NULL in case of sliding+offset exists.
|
if (masterscan && p1 != NULL) { // the *p1 may be NULL in case of sliding+offset exists.
|
||||||
pResult = getResultRowByPos(pResultBuf, p1);
|
pResult = getResultRowByPos(pResultBuf, p1);
|
||||||
|
ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
|
// In case of group by column query, the required SResultRow object must be existInCurrentResusltRowInfo in the
|
||||||
// pResultRowInfo object.
|
// pResultRowInfo object.
|
||||||
if (p1 != NULL) {
|
if (p1 != NULL) {
|
||||||
pResult = getResultRowByPos(pResultBuf, p1);
|
pResult = getResultRowByPos(pResultBuf, p1);
|
||||||
|
ASSERT(pResult->pageId == p1->pageId && pResult->offset == p1->offset);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1417,21 +1422,6 @@ static int32_t updateBlockLoadStatus(STaskAttr* pQuery, int32_t status) {
|
||||||
// }
|
// }
|
||||||
//}
|
//}
|
||||||
|
|
||||||
static void getIntermediateBufInfo(STaskRuntimeEnv* pRuntimeEnv, int32_t* ps, int32_t* rowsize) {
|
|
||||||
STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
|
||||||
int32_t MIN_ROWS_PER_PAGE = 4;
|
|
||||||
|
|
||||||
*rowsize = (int32_t)(pQueryAttr->resultRowSize *
|
|
||||||
getRowNumForMultioutput(pQueryAttr, pQueryAttr->topBotQuery, pQueryAttr->stableQuery));
|
|
||||||
int32_t overhead = sizeof(SFilePage);
|
|
||||||
|
|
||||||
// one page contains at least two rows
|
|
||||||
*ps = DEFAULT_INTERN_BUF_PAGE_SIZE;
|
|
||||||
while (((*rowsize) * MIN_ROWS_PER_PAGE) > (*ps) - overhead) {
|
|
||||||
*ps = ((*ps) << 1u);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// static FORCE_INLINE bool doFilterByBlockStatistics(STaskRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis,
|
// static FORCE_INLINE bool doFilterByBlockStatistics(STaskRuntimeEnv* pRuntimeEnv, SDataStatis *pDataStatis,
|
||||||
// SqlFunctionCtx *pCtx, int32_t numOfRows) {
|
// SqlFunctionCtx *pCtx, int32_t numOfRows) {
|
||||||
// STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
// STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||||
|
@ -1708,36 +1698,6 @@ int32_t loadDataBlockOnDemand(SExecTaskInfo* pTaskInfo, STableScanInfo* pTableSc
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
void copyToSDataBlock(SSDataBlock* pBlock, int32_t* offset, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pResBuf) {
|
|
||||||
pBlock->info.rows = 0;
|
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
while (pGroupResInfo->currentGroup < pGroupResInfo->totalGroup) {
|
|
||||||
// all results in current group have been returned to client, try next group
|
|
||||||
if ((pGroupResInfo->pRows == NULL) || taosArrayGetSize(pGroupResInfo->pRows) == 0) {
|
|
||||||
assert(pGroupResInfo->index == 0);
|
|
||||||
// if ((code = mergeIntoGroupResult(&pGroupResInfo, pRuntimeEnv, offset)) != TSDB_CODE_SUCCESS) {
|
|
||||||
return;
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
|
|
||||||
// doCopyToSDataBlock(pResBuf, pGroupResInfo, TSDB_ORDER_ASC, pBlock, );
|
|
||||||
|
|
||||||
// current data are all dumped to result buffer, clear it
|
|
||||||
if (!hasRemainDataInCurrentGroup(pGroupResInfo)) {
|
|
||||||
cleanupGroupResInfo(pGroupResInfo);
|
|
||||||
if (!incNextGroup(pGroupResInfo)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// enough results in data buffer, return
|
|
||||||
// if (pBlock->info.rows >= threshold) {
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) {
|
static void updateTableQueryInfoForReverseScan(STableQueryInfo* pTableQueryInfo) {
|
||||||
if (pTableQueryInfo == NULL) {
|
if (pTableQueryInfo == NULL) {
|
||||||
return;
|
return;
|
||||||
|
@ -1884,35 +1844,6 @@ void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo merged with the build group result.
|
|
||||||
void finalizeMultiTupleQueryResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo,
|
|
||||||
int32_t* rowCellInfoOffset) {
|
|
||||||
for (int32_t i = 0; i < pResultRowInfo->size; ++i) {
|
|
||||||
SResultRowPosition* pPos = &pResultRowInfo->pPosition[i];
|
|
||||||
|
|
||||||
SFilePage* bufPage = getBufPage(pBuf, pPos->pageId);
|
|
||||||
SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->offset);
|
|
||||||
|
|
||||||
// TODO ignore the close status anyway.
|
|
||||||
// if (!isResultRowClosed(pRow)) {
|
|
||||||
// continue;
|
|
||||||
// }
|
|
||||||
|
|
||||||
for (int32_t j = 0; j < numOfOutput; ++j) {
|
|
||||||
struct SResultRowEntryInfo* pResInfo = getResultCell(pRow, j, rowCellInfoOffset);
|
|
||||||
if (!isRowEntryInitialized(pResInfo)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pRow->numOfRows < pResInfo->numOfRes) {
|
|
||||||
pRow->numOfRows = pResInfo->numOfRes;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
releaseBufPage(pBuf, bufPage);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
STableQueryInfo* createTableQueryInfo(void* buf, STimeWindow win) {
|
STableQueryInfo* createTableQueryInfo(void* buf, STimeWindow win) {
|
||||||
STableQueryInfo* pTableQueryInfo = buf;
|
STableQueryInfo* pTableQueryInfo = buf;
|
||||||
pTableQueryInfo->lastKey = win.skey;
|
pTableQueryInfo->lastKey = win.skey;
|
||||||
|
@ -2062,20 +1993,34 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p
|
||||||
pAggInfo->groupId = groupId;
|
pAggInfo->groupId = groupId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void doUpdateNumOfRows(SResultRow* pRow, int32_t numOfExprs, const int32_t* rowCellOffset) {
|
||||||
|
for (int32_t j = 0; j < numOfExprs; ++j) {
|
||||||
|
struct SResultRowEntryInfo* pResInfo = getResultCell(pRow, j, rowCellOffset);
|
||||||
|
if (!isRowEntryInitialized(pResInfo)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pRow->numOfRows < pResInfo->numOfRes) {
|
||||||
|
pRow->numOfRows = pResInfo->numOfRes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
|
int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
|
||||||
int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) {
|
const int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) {
|
||||||
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
||||||
int32_t start = pGroupResInfo->index;
|
int32_t start = pGroupResInfo->index;
|
||||||
|
|
||||||
// qDebug("QInfo:0x%"PRIx64" start to copy data from windowResInfo to output buf", GET_TASKID(pRuntimeEnv));
|
|
||||||
|
|
||||||
for (int32_t i = start; i < numOfRows; i += 1) {
|
for (int32_t i = start; i < numOfRows; i += 1) {
|
||||||
SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
|
SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
|
||||||
SFilePage* page = getBufPage(pBuf, pPos->pos.pageId);
|
SFilePage* page = getBufPage(pBuf, pPos->pos.pageId);
|
||||||
|
|
||||||
SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
|
SResultRow* pRow = (SResultRow*)((char*)page + pPos->pos.offset);
|
||||||
|
|
||||||
|
doUpdateNumOfRows(pRow, numOfExprs, rowCellOffset);
|
||||||
if (pRow->numOfRows == 0) {
|
if (pRow->numOfRows == 0) {
|
||||||
pGroupResInfo->index += 1;
|
pGroupResInfo->index += 1;
|
||||||
|
releaseBufPage(pBuf, page);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2084,11 +2029,13 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
|
||||||
} else {
|
} else {
|
||||||
// current value belongs to different group, it can't be packed into one datablock
|
// current value belongs to different group, it can't be packed into one datablock
|
||||||
if (pBlock->info.groupId != pPos->groupId) {
|
if (pBlock->info.groupId != pPos->groupId) {
|
||||||
|
releaseBufPage(pBuf, page);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
|
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
|
||||||
|
releaseBufPage(pBuf, page);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2130,8 +2077,6 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprI
|
||||||
}
|
}
|
||||||
|
|
||||||
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf) {
|
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf) {
|
||||||
assert(pGroupResInfo->currentGroup <= pGroupResInfo->totalGroup);
|
|
||||||
|
|
||||||
SExprInfo* pExprInfo = pOperator->pExpr;
|
SExprInfo* pExprInfo = pOperator->pExpr;
|
||||||
int32_t numOfExprs = pOperator->numOfExprs;
|
int32_t numOfExprs = pOperator->numOfExprs;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
@ -2141,7 +2086,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
||||||
SqlFunctionCtx* pCtx = pbInfo->pCtx;
|
SqlFunctionCtx* pCtx = pbInfo->pCtx;
|
||||||
|
|
||||||
blockDataCleanup(pBlock);
|
blockDataCleanup(pBlock);
|
||||||
if (!hasRemainDataInCurrentGroup(pGroupResInfo)) {
|
if (!hashRemainDataInGroupInfo(pGroupResInfo)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3627,9 +3572,6 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
closeAllResultRows(&pAggInfo->binfo.resultRowInfo);
|
closeAllResultRows(&pAggInfo->binfo.resultRowInfo);
|
||||||
finalizeMultiTupleQueryResult(pOperator->numOfExprs, pAggInfo->aggSup.pResultBuf, &pAggInfo->binfo.resultRowInfo,
|
|
||||||
pAggInfo->binfo.rowCellInfoOffset);
|
|
||||||
|
|
||||||
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
|
initGroupedResultInfo(&pAggInfo->groupResInfo, pAggInfo->aggSup.pResultRowHashTable, 0);
|
||||||
OPTR_SET_OPENED(pOperator);
|
OPTR_SET_OPENED(pOperator);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -3651,7 +3593,7 @@ static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, pInfo, &pAggInfo->groupResInfo, pAggInfo->aggSup.pResultBuf);
|
||||||
if (pInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pAggInfo->groupResInfo)) {
|
if (pInfo->pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pAggInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -269,7 +269,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||||
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pRes->info.rows == 0 || !hashRemainDataInGroupInfo(&pInfo->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
}
|
}
|
||||||
return (pRes->info.rows == 0)? NULL:pRes;
|
return (pRes->info.rows == 0)? NULL:pRes;
|
||||||
|
@ -303,9 +303,6 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
pOperator->status = OP_RES_TO_RETURN;
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
closeAllResultRows(&pInfo->binfo.resultRowInfo);
|
closeAllResultRows(&pInfo->binfo.resultRowInfo);
|
||||||
|
|
||||||
finalizeMultiTupleQueryResult(pOperator->numOfExprs, pInfo->aggSup.pResultBuf, &pInfo->binfo.resultRowInfo,
|
|
||||||
pInfo->binfo.rowCellInfoOffset);
|
|
||||||
// if (!stableQuery) { // finalize include the update of result rows
|
// if (!stableQuery) { // finalize include the update of result rows
|
||||||
// finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs);
|
// finalizeQueryResult(pInfo->binfo.pCtx, pOperator->numOfExprs);
|
||||||
// } else {
|
// } else {
|
||||||
|
@ -320,7 +317,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
||||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->aggSup.pResultBuf);
|
||||||
doFilter(pInfo->pCondition, pRes, NULL);
|
doFilter(pInfo->pCondition, pRes, NULL);
|
||||||
|
|
||||||
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
|
bool hasRemain = hashRemainDataInGroupInfo(&pInfo->groupResInfo);
|
||||||
if (!hasRemain) {
|
if (!hasRemain) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
break;
|
break;
|
||||||
|
|
Loading…
Reference in New Issue