Merge pull request #16276 from taosdata/feature/3_liaohj
fix(query): add one more condition check when merge file block
This commit is contained in:
commit
dd37065010
|
@ -1992,7 +1992,7 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
|
|||
TSDBROW fRow = tsdbRowFromBlockData(pBlockData, pDumpInfo->rowIndex);
|
||||
|
||||
// no last block
|
||||
if (pLastBlockReader->lastBlockData.nRow == 0) {
|
||||
if (pLastBlockReader->lastBlockData.nRow == 0 || (!hasDataInLastBlock(pLastBlockReader))) {
|
||||
if (tryCopyDistinctRowFromFileBlock(pReader, pBlockData, key, pDumpInfo)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
|
@ -2495,13 +2495,12 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) {
|
|||
}
|
||||
|
||||
static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||
TSDBKEY key = {0};
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SBlock* pBlock = NULL;
|
||||
|
||||
SReaderStatus* pStatus = &pReader->status;
|
||||
SDataBlockIter* pBlockIter = &pStatus->blockIter;
|
||||
|
||||
TSDBKEY key = {0};
|
||||
SBlock* pBlock = NULL;
|
||||
STableBlockScanInfo* pScanInfo = NULL;
|
||||
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter);
|
||||
SLastBlockReader* pLastBlockReader = pReader->status.fileIter.pLastBlockReader;
|
||||
|
|
|
@ -860,8 +860,8 @@ int32_t handleLimitOffset(SOperatorInfo *pOperator, SLimitInfo* pLimitInfo, SSDa
|
|||
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
|
||||
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
|
||||
|
||||
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
|
||||
int32_t forwardStep, TSKEY* tsCol, int32_t numOfTotal, int32_t numOfOutput, int32_t order);
|
||||
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset,
|
||||
int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput);
|
||||
|
||||
int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, int32_t numOfOutput, SArray* pColList, char** pNextStart);
|
||||
void updateLoadRemoteInfo(SLoadRemoteDataInfo *pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs,
|
||||
|
|
|
@ -378,15 +378,30 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow
|
|||
|
||||
void cleanupExecTimeWindowInfo(SColumnInfoData* pColData) { colDataDestroy(pColData); }
|
||||
|
||||
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow* pWin,
|
||||
SColumnInfoData* pTimeWindowData, int32_t offset, int32_t forwardStep, TSKEY* tsCol,
|
||||
int32_t numOfTotal, int32_t numOfOutput, int32_t order) {
|
||||
typedef struct {
|
||||
bool hasAgg;
|
||||
int32_t numOfRows;
|
||||
int32_t startOffset;
|
||||
} SFunctionCtxStatus;
|
||||
|
||||
static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
|
||||
pStatus->hasAgg = pCtx->input.colDataAggIsSet;
|
||||
pStatus->numOfRows = pCtx->input.numOfRows;
|
||||
pStatus->startOffset = pCtx->input.startRowIndex;
|
||||
}
|
||||
|
||||
static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) {
|
||||
pCtx->input.colDataAggIsSet = pStatus->hasAgg;
|
||||
pCtx->input.numOfRows = pStatus->numOfRows;
|
||||
pCtx->input.startRowIndex = pStatus->startOffset;
|
||||
}
|
||||
|
||||
void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset,
|
||||
int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) {
|
||||
for (int32_t k = 0; k < numOfOutput; ++k) {
|
||||
// keep it temporarily
|
||||
// todo no need this??
|
||||
bool hasAgg = pCtx[k].input.colDataAggIsSet;
|
||||
int32_t numOfRows = pCtx[k].input.numOfRows;
|
||||
int32_t startOffset = pCtx[k].input.startRowIndex;
|
||||
SFunctionCtxStatus status = {0};
|
||||
functionCtxSave(&pCtx[k], &status);
|
||||
|
||||
pCtx[k].input.startRowIndex = offset;
|
||||
pCtx[k].input.numOfRows = forwardStep;
|
||||
|
@ -424,9 +439,7 @@ void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, STimeWindow
|
|||
}
|
||||
|
||||
// restore it
|
||||
pCtx[k].input.colDataAggIsSet = hasAgg;
|
||||
pCtx[k].input.startRowIndex = startOffset;
|
||||
pCtx[k].input.numOfRows = numOfRows;
|
||||
functionCtxRestore(&pCtx[k], &status);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -277,7 +277,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
}
|
||||
|
||||
int32_t rowIndex = j - num;
|
||||
doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->exprSupp.numOfExprs, TSDB_ORDER_ASC);
|
||||
doApplyFunctions(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs);
|
||||
|
||||
// assign the group keys or user input constant values if required
|
||||
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
|
||||
|
@ -295,7 +295,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
|||
}
|
||||
|
||||
int32_t rowIndex = pBlock->info.rows - num;
|
||||
doApplyFunctions(pTaskInfo, pCtx, &w, NULL, rowIndex, num, NULL, pBlock->info.rows, pOperator->exprSupp.numOfExprs, TSDB_ORDER_ASC);
|
||||
doApplyFunctions(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs);
|
||||
doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -641,8 +641,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num
|
|||
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
||||
setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP);
|
||||
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &w, &pInfo->twAggSup.timeWindowData, startPos, 0, tsCols, pBlock->info.rows,
|
||||
numOfExprs, pInfo->inputOrder);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, numOfExprs);
|
||||
|
||||
if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) {
|
||||
closeResultRow(pr);
|
||||
|
@ -986,8 +985,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|||
if ((!pInfo->ignoreExpiredData || !isCloseWindow(&win, &pInfo->twAggSup)) &&
|
||||
inSlidingWindow(&pInfo->interval, &win, &pBlock->info)) {
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &win, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
|
||||
pBlock->info.rows, numOfOutput, pInfo->inputOrder);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
|
||||
pBlock->info.rows, numOfOutput);
|
||||
}
|
||||
|
||||
doCloseWindow(pResultRowInfo, pInfo, pResult);
|
||||
|
@ -1026,8 +1025,8 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|||
doWindowBorderInterpolation(pInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pSup);
|
||||
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
|
||||
pBlock->info.rows, numOfOutput, pInfo->inputOrder);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
|
||||
pBlock->info.rows, numOfOutput);
|
||||
doCloseWindow(pResultRowInfo, pInfo, pResult);
|
||||
}
|
||||
|
||||
|
@ -1190,8 +1189,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
|
|||
}
|
||||
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
|
||||
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
|
||||
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
|
||||
|
||||
// here we start a new session window
|
||||
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
|
||||
|
@ -1215,8 +1214,8 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI
|
|||
}
|
||||
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
|
||||
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
|
||||
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
|
||||
}
|
||||
|
||||
static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) {
|
||||
|
@ -1934,8 +1933,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
|
|||
|
||||
// pInfo->numOfRows data belong to the current session window
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &window, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
|
||||
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
|
||||
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
|
||||
|
||||
// here we start a new session window
|
||||
doKeepNewWindowStartInfo(pRowSup, tsList, j, gid);
|
||||
|
@ -1952,8 +1951,8 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator
|
|||
}
|
||||
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &pRowSup->win, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
|
||||
pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex,
|
||||
pRowSup->numOfRows, pBlock->info.rows, numOfOutput);
|
||||
}
|
||||
|
||||
static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
|
||||
|
@ -2952,8 +2951,8 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
|
|||
setResultBufPageDirty(pInfo->aggSup.pResultBuf, &pResultRowInfo->cur);
|
||||
}
|
||||
updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &nextWin, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
|
||||
pSDataBlock->info.rows, numOfOutput, TSDB_ORDER_ASC);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows,
|
||||
pSDataBlock->info.rows, numOfOutput);
|
||||
int32_t prevEndPos = (forwardRows - 1) * step + startPos;
|
||||
ASSERT(pSDataBlock->info.window.skey > 0 && pSDataBlock->info.window.ekey > 0);
|
||||
startPos = getNextQualifiedWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos, pInfo->order);
|
||||
|
@ -3776,8 +3775,7 @@ static int32_t doOneWindowAggImpl(int32_t tsColId, SOptrBasicInfo* pBinfo, SStre
|
|||
return TSDB_CODE_QRY_OUT_OF_MEMORY;
|
||||
}
|
||||
updateTimeWindowInfo(pTimeWindowData, &pCurWin->win, false);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &pCurWin->win, pTimeWindowData, startIndex, winRows, tsCols,
|
||||
pSDataBlock->info.rows, numOutput, TSDB_ORDER_ASC);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, pTimeWindowData, startIndex, winRows, pSDataBlock->info.rows, numOutput);
|
||||
SFilePage* bufPage = getBufPage(pAggSup->pResultBuf, pCurWin->pos.pageId);
|
||||
setBufPageDirty(bufPage, true);
|
||||
releaseBufPage(pAggSup->pResultBuf, bufPage);
|
||||
|
@ -4938,8 +4936,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
|
|||
}
|
||||
|
||||
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
|
||||
tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
|
||||
pBlock->info.rows, numOfOutput);
|
||||
|
||||
outputMergeAlignedIntervalResult(pOperatorInfo, tableGroupId, pResultBlock, miaInfo->curTs);
|
||||
miaInfo->curTs = tsCols[currPos];
|
||||
|
@ -4960,8 +4958,8 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR
|
|||
}
|
||||
|
||||
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &currWin, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
|
||||
tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder);
|
||||
doApplyFunctions(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos,
|
||||
pBlock->info.rows, numOfOutput);
|
||||
}
|
||||
|
||||
static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
|
||||
|
@ -5253,8 +5251,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
|
|||
}
|
||||
|
||||
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true);
|
||||
doApplyFunctions(pTaskInfo, pExprSup->pCtx, &win, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, tsCols,
|
||||
pBlock->info.rows, numOfOutput, iaInfo->inputOrder);
|
||||
doApplyFunctions(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
|
||||
pBlock->info.rows, numOfOutput);
|
||||
doCloseWindow(pResultRowInfo, iaInfo, pResult);
|
||||
|
||||
// output previous interval results after this interval (&win) is closed
|
||||
|
@ -5285,8 +5283,8 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
|
|||
doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup);
|
||||
|
||||
updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true);
|
||||
doApplyFunctions(pTaskInfo, pExprSup->pCtx, &nextWin, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
|
||||
tsCols, pBlock->info.rows, numOfOutput, iaInfo->inputOrder);
|
||||
doApplyFunctions(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows,
|
||||
pBlock->info.rows, numOfOutput);
|
||||
doCloseWindow(pResultRowInfo, iaInfo, pResult);
|
||||
|
||||
// output previous interval results after this interval (&nextWin) is closed
|
||||
|
|
Loading…
Reference in New Issue