refactor: do some internal refactor.
This commit is contained in:
parent
eb6f95c7dd
commit
c2a918a85e
|
@ -155,7 +155,7 @@ SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn,
|
||||||
void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
|
void operatorDummyCloseFn(void* param, int32_t numOfCols) {}
|
||||||
|
|
||||||
static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
|
static int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
|
||||||
int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs);
|
int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs);
|
||||||
|
|
||||||
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
static void initCtxOutputBuffer(SqlFunctionCtx* pCtx, int32_t size);
|
||||||
static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
|
static void setResultBufSize(STaskAttr* pQueryAttr, SResultInfo* pResultInfo);
|
||||||
|
@ -2136,23 +2136,13 @@ void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* p
|
||||||
* @param result
|
* @param result
|
||||||
*/
|
*/
|
||||||
int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
|
int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf, SGroupResInfo* pGroupResInfo,
|
||||||
int32_t orderType, int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) {
|
int32_t* rowCellOffset, SqlFunctionCtx* pCtx, int32_t numOfExprs) {
|
||||||
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
|
||||||
int32_t numOfResult = pBlock->info.rows; // there are already exists result rows
|
int32_t start = pGroupResInfo->index;
|
||||||
|
|
||||||
int32_t start = 0;
|
|
||||||
int32_t step = 1;
|
|
||||||
|
|
||||||
// qDebug("QInfo:0x%"PRIx64" start to copy data from windowResInfo to output buf", GET_TASKID(pRuntimeEnv));
|
// qDebug("QInfo:0x%"PRIx64" start to copy data from windowResInfo to output buf", GET_TASKID(pRuntimeEnv));
|
||||||
assert(orderType == TSDB_ORDER_ASC || orderType == TSDB_ORDER_DESC);
|
|
||||||
|
|
||||||
if (orderType == TSDB_ORDER_ASC) {
|
for (int32_t i = start; i < numOfRows; i += 1) {
|
||||||
start = pGroupResInfo->index;
|
|
||||||
} else { // desc order copy all data
|
|
||||||
start = numOfRows - pGroupResInfo->index - 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = start; (i < numOfRows) && (i >= 0); i += step) {
|
|
||||||
SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
|
SResKeyPos* pPos = taosArrayGetP(pGroupResInfo->pRows, i);
|
||||||
SFilePage* page = getBufPage(pBuf, pPos->pos.pageId);
|
SFilePage* page = getBufPage(pBuf, pPos->pos.pageId);
|
||||||
|
|
||||||
|
@ -2162,9 +2152,7 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO copy multiple rows?
|
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
|
||||||
int32_t numOfRowsToCopy = pRow->numOfRows;
|
|
||||||
if (numOfResult + numOfRowsToCopy >= pBlock->info.capacity) {
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2195,7 +2183,6 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* taskInfo, SSDataBlock* pBlock, SExprIn
|
||||||
}
|
}
|
||||||
|
|
||||||
releaseBufPage(pBuf, page);
|
releaseBufPage(pBuf, page);
|
||||||
|
|
||||||
pBlock->info.rows += pRow->numOfRows;
|
pBlock->info.rows += pRow->numOfRows;
|
||||||
if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full
|
if (pBlock->info.rows >= pBlock->info.capacity) { // output buffer is full
|
||||||
break;
|
break;
|
||||||
|
@ -2223,8 +2210,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t orderType = TSDB_ORDER_ASC;
|
doCopyToSDataBlock(pTaskInfo, pBlock, pExprInfo, pBuf, pGroupResInfo, rowCellOffset, pCtx, numOfExprs);
|
||||||
doCopyToSDataBlock(pTaskInfo, pBlock, pExprInfo, pBuf, pGroupResInfo, orderType, rowCellOffset, pCtx, numOfExprs);
|
|
||||||
|
|
||||||
// add condition (pBlock->info.rows >= 1) just to runtime happy
|
// add condition (pBlock->info.rows >= 1) just to runtime happy
|
||||||
blockDataUpdateTsWindow(pBlock);
|
blockDataUpdateTsWindow(pBlock);
|
||||||
|
|
Loading…
Reference in New Issue