Merge pull request #12097 from taosdata/feature/3.0_liaohj
fix(query): set the correct primary timestamp column for state window operator.
This commit is contained in:
commit
d20d78729e
|
@ -155,7 +155,6 @@ static int32_t checkForCachedLast(STsdbReadHandle* pTsdbReadHandle);
|
||||||
|
|
||||||
static void changeQueryHandleForInterpQuery(tsdbReaderT pHandle);
|
static void changeQueryHandleForInterpQuery(tsdbReaderT pHandle);
|
||||||
static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock);
|
static void doMergeTwoLevelData(STsdbReadHandle* pTsdbReadHandle, STableCheckInfo* pCheckInfo, SBlock* pBlock);
|
||||||
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
|
||||||
static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
|
static int32_t tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
|
||||||
STsdbReadHandle* pTsdbReadHandle);
|
STsdbReadHandle* pTsdbReadHandle);
|
||||||
static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
|
static int32_t tsdbCheckInfoCompar(const void* key1, const void* key2);
|
||||||
|
@ -1337,6 +1336,8 @@ static int32_t handleDataMergeIfNeeded(STsdbReadHandle* pTsdbReadHandle, SBlock*
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
||||||
|
|
||||||
static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo,
|
static int32_t loadFileDataBlock(STsdbReadHandle* pTsdbReadHandle, SBlock* pBlock, STableCheckInfo* pCheckInfo,
|
||||||
bool* exists) {
|
bool* exists) {
|
||||||
SQueryFilePos* cur = &pTsdbReadHandle->cur;
|
SQueryFilePos* cur = &pTsdbReadHandle->cur;
|
||||||
|
|
|
@ -73,7 +73,7 @@ typedef struct SResKeyPos {
|
||||||
} SResKeyPos;
|
} SResKeyPos;
|
||||||
|
|
||||||
typedef struct SResultRowInfo {
|
typedef struct SResultRowInfo {
|
||||||
SResultRowPosition *pPosition;
|
SResultRowPosition *pPosition; // todo remove this
|
||||||
int32_t size; // number of result set
|
int32_t size; // number of result set
|
||||||
int32_t capacity; // max capacity
|
int32_t capacity; // max capacity
|
||||||
SResultRowPosition cur;
|
SResultRowPosition cur;
|
||||||
|
|
|
@ -197,7 +197,7 @@ typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggS
|
||||||
struct SOptrBasicInfo* pInfo, char* result, int32_t length);
|
struct SOptrBasicInfo* pInfo, char* result, int32_t length);
|
||||||
|
|
||||||
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
|
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr);
|
||||||
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr, bool* newgroup);
|
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr);
|
||||||
typedef void (*__optr_close_fn_t)(void* param, int32_t num);
|
typedef void (*__optr_close_fn_t)(void* param, int32_t num);
|
||||||
typedef int32_t (*__optr_get_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain);
|
typedef int32_t (*__optr_get_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain);
|
||||||
|
|
||||||
|
@ -425,7 +425,7 @@ typedef struct STimeWindowSupp {
|
||||||
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
|
SColumnInfoData timeWindowData; // query time window info for scalar function execution.
|
||||||
} STimeWindowAggSupp;
|
} STimeWindowAggSupp;
|
||||||
|
|
||||||
typedef struct STableIntervalOperatorInfo {
|
typedef struct SIntervalAggOperatorInfo {
|
||||||
SOptrBasicInfo binfo; // basic info
|
SOptrBasicInfo binfo; // basic info
|
||||||
SGroupResInfo groupResInfo; // multiple results build supporter
|
SGroupResInfo groupResInfo; // multiple results build supporter
|
||||||
SInterval interval; // interval info
|
SInterval interval; // interval info
|
||||||
|
@ -440,7 +440,7 @@ typedef struct STableIntervalOperatorInfo {
|
||||||
SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
|
SArray* pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
|
||||||
STimeWindowAggSupp twAggSup;
|
STimeWindowAggSupp twAggSup;
|
||||||
struct SFillInfo* pFillInfo; // fill info
|
struct SFillInfo* pFillInfo; // fill info
|
||||||
} STableIntervalOperatorInfo;
|
} SIntervalAggOperatorInfo;
|
||||||
|
|
||||||
typedef struct SAggOperatorInfo {
|
typedef struct SAggOperatorInfo {
|
||||||
SOptrBasicInfo binfo;
|
SOptrBasicInfo binfo;
|
||||||
|
@ -533,6 +533,7 @@ typedef struct SSessionAggOperatorInfo {
|
||||||
SWindowRowsSup winSup;
|
SWindowRowsSup winSup;
|
||||||
bool reptScan; // next round scan
|
bool reptScan; // next round scan
|
||||||
int64_t gap; // session window gap
|
int64_t gap; // session window gap
|
||||||
|
int32_t tsSlotId; // primary timestamp slot id
|
||||||
STimeWindowAggSupp twAggSup;
|
STimeWindowAggSupp twAggSup;
|
||||||
} SSessionAggOperatorInfo;
|
} SSessionAggOperatorInfo;
|
||||||
|
|
||||||
|
@ -550,6 +551,7 @@ typedef struct SStateWindowOperatorInfo {
|
||||||
int32_t colIndex; // start row index
|
int32_t colIndex; // start row index
|
||||||
bool hasKey;
|
bool hasKey;
|
||||||
SStateKeys stateKey;
|
SStateKeys stateKey;
|
||||||
|
int32_t tsSlotId; // primary timestamp column slot id
|
||||||
STimeWindowAggSupp twAggSup;
|
STimeWindowAggSupp twAggSup;
|
||||||
// bool reptScan;
|
// bool reptScan;
|
||||||
} SStateWindowOperatorInfo;
|
} SStateWindowOperatorInfo;
|
||||||
|
@ -606,6 +608,9 @@ typedef struct SJoinOperatorInfo {
|
||||||
SNode *pOnCondition;
|
SNode *pOnCondition;
|
||||||
} SJoinOperatorInfo;
|
} SJoinOperatorInfo;
|
||||||
|
|
||||||
|
#define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED)
|
||||||
|
#define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED)
|
||||||
|
|
||||||
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
|
SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t streamFn,
|
||||||
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
|
__optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_encode_fn_t encode,
|
||||||
__optr_decode_fn_t decode, __optr_get_explain_fn_t explain);
|
__optr_decode_fn_t decode, __optr_get_explain_fn_t explain);
|
||||||
|
@ -616,8 +621,8 @@ int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t
|
||||||
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
int32_t initAggInfo(SOptrBasicInfo* pBasicInfo, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
|
SSDataBlock* pResultBlock, size_t keyBufSize, const char* pkey);
|
||||||
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
|
void initResultSizeInfo(SOperatorInfo* pOperator, int32_t numOfRows);
|
||||||
void doBuildResultDatablock(SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo,
|
void doBuildResultDatablock(SOptrBasicInfo *pbInfo, SGroupResInfo* pGroupResInfo, SExprInfo* pExprInfo, SDiskbasedBuf* pBuf);
|
||||||
SDiskbasedBuf* pBuf, int32_t* rowCellOffset, SqlFunctionCtx* pCtx);
|
|
||||||
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf,
|
void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf,
|
||||||
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
|
SResultRowInfo* pResultRowInfo, int32_t* rowCellInfoOffset);
|
||||||
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
|
void doApplyFunctions(SqlFunctionCtx* pCtx, STimeWindow* pWin, SColumnInfoData* pTimeWindowData, int32_t offset,
|
||||||
|
@ -635,6 +640,16 @@ void doSetOperatorCompleted(SOperatorInfo* pOperator);
|
||||||
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
|
void doFilter(const SNode* pFilterNode, SSDataBlock* pBlock);
|
||||||
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
|
SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, int32_t** rowCellInfoOffset);
|
||||||
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols);
|
void relocateColumnData(SSDataBlock* pBlock, const SArray* pColMatchInfo, SArray* pCols);
|
||||||
|
void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWindow);
|
||||||
|
void cleanupAggSup(SAggSupporter* pAggSup);
|
||||||
|
void destroyBasicOperatorInfo(void* param, int32_t numOfOutput);
|
||||||
|
|
||||||
|
void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput,
|
||||||
|
int32_t* rowCellInfoOffset);
|
||||||
|
|
||||||
|
SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo,
|
||||||
|
char* pData, int16_t bytes, bool masterscan, uint64_t groupId,
|
||||||
|
SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup);
|
||||||
|
|
||||||
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
|
@ -656,10 +671,12 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||||
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
|
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
|
|
||||||
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
SSDataBlock* pResBlock, SInterval* pInterval, int32_t primaryTsSlotId,
|
||||||
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
|
STimeWindowAggSupp *pTwAggSupp, const STableGroupInfo* pTableGroupInfo, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResBlock, int64_t gap, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
|
SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp,
|
||||||
|
SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
|
SSDataBlock* pResultBlock, SArray* pGroupColList, SNode* pCondition,
|
||||||
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo,
|
SExprInfo* pScalarExprInfo, int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo,
|
||||||
|
@ -672,11 +689,12 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExp
|
||||||
SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal,
|
SInterval* pInterval, STimeWindow* pWindow, SSDataBlock* pResBlock, int32_t fillType, SNodeListNode* fillVal,
|
||||||
bool multigroupResult, SExecTaskInfo* pTaskInfo);
|
bool multigroupResult, SExecTaskInfo* pTaskInfo);
|
||||||
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
|
SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols,
|
||||||
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, SExecTaskInfo* pTaskInfo);
|
SSDataBlock* pResBlock, STimeWindowAggSupp *pTwAggSupp, int32_t tsSlotId, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo,
|
SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo,
|
||||||
const STableGroupInfo* pTableGroupInfo);
|
const STableGroupInfo* pTableGroupInfo);
|
||||||
|
|
||||||
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols,
|
||||||
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
|
SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo);
|
||||||
|
|
||||||
|
@ -697,7 +715,7 @@ void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlo
|
||||||
void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput);
|
||||||
|
|
||||||
STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win);
|
STableQueryInfo* createTableQueryInfo(void* buf, STimeWindow win);
|
||||||
|
|
||||||
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
|
bool isTaskKilled(SExecTaskInfo* pTaskInfo);
|
||||||
int32_t checkForQueryBuf(size_t numOfTables);
|
int32_t checkForQueryBuf(size_t numOfTables);
|
||||||
|
|
|
@ -154,14 +154,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
||||||
|
|
||||||
qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
|
qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
|
||||||
|
|
||||||
bool newgroup = false;
|
|
||||||
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||||
int64_t st = 0;
|
|
||||||
|
|
||||||
st = taosGetTimestampUs();
|
|
||||||
*pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &newgroup);
|
|
||||||
|
|
||||||
|
int64_t st = taosGetTimestampUs();
|
||||||
|
*pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
|
||||||
uint64_t el = (taosGetTimestampUs() - st);
|
uint64_t el = (taosGetTimestampUs() - st);
|
||||||
|
|
||||||
pTaskInfo->cost.elapsedTime += el;
|
pTaskInfo->cost.elapsedTime += el;
|
||||||
|
|
||||||
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -256,7 +256,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgroup) {
|
static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -265,7 +265,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
|
||||||
SSDataBlock* pRes = pInfo->binfo.pRes;
|
SSDataBlock* pRes = pInfo->binfo.pRes;
|
||||||
|
|
||||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
doBuildResultDatablock(pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx);
|
doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||||
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
if (pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
pOperator->status = OP_EXEC_DONE;
|
pOperator->status = OP_EXEC_DONE;
|
||||||
}
|
}
|
||||||
|
@ -277,7 +277,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
break;
|
break;
|
||||||
|
@ -311,7 +311,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou
|
||||||
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, false);
|
initGroupedResultInfo(&pInfo->groupResInfo, pInfo->aggSup.pResultRowHashTable, false);
|
||||||
|
|
||||||
while(1) {
|
while(1) {
|
||||||
doBuildResultDatablock(pRes, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf, pInfo->binfo.rowCellInfoOffset, pInfo->binfo.pCtx);
|
doBuildResultDatablock(&pInfo->binfo, &pInfo->groupResInfo, pOperator->pExpr, pInfo->aggSup.pResultBuf);
|
||||||
doFilter(pInfo->pCondition, pRes);
|
doFilter(pInfo->pCondition, pRes);
|
||||||
|
|
||||||
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
|
bool hasRemain = hasRemainDataInCurrentGroup(&pInfo->groupResInfo);
|
||||||
|
@ -537,11 +537,12 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) {
|
||||||
|
|
||||||
pInfo->pageIndex += 1;
|
pInfo->pageIndex += 1;
|
||||||
|
|
||||||
|
blockDataUpdateTsWindow(pInfo->binfo.pRes);
|
||||||
pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId;
|
pInfo->binfo.pRes->info.groupId = pGroupInfo->groupId;
|
||||||
return pInfo->binfo.pRes;
|
return pInfo->binfo.pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) {
|
static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -558,7 +559,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup);
|
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||||
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -257,11 +257,9 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction
|
||||||
pTableScanInfo->cond.order = TSDB_ORDER_DESC;
|
pTableScanInfo->cond.order = TSDB_ORDER_DESC;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
|
static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
|
||||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||||
|
|
||||||
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
SSDataBlock* pBlock = pTableScanInfo->pResBlock;
|
||||||
*newgroup = false;
|
|
||||||
|
|
||||||
while (tsdbNextDataBlock(pTableScanInfo->dataReader)) {
|
while (tsdbNextDataBlock(pTableScanInfo->dataReader)) {
|
||||||
if (isTaskKilled(pOperator->pTaskInfo)) {
|
if (isTaskKilled(pOperator->pTaskInfo)) {
|
||||||
|
@ -289,7 +287,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
|
static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
||||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
||||||
|
@ -298,10 +296,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
*newgroup = false;
|
|
||||||
|
|
||||||
while (pTableScanInfo->current < pTableScanInfo->scanInfo.numOfAsc) {
|
while (pTableScanInfo->current < pTableScanInfo->scanInfo.numOfAsc) {
|
||||||
SSDataBlock* p = doTableScanImpl(pOperator, newgroup);
|
SSDataBlock* p = doTableScanImpl(pOperator);
|
||||||
if (p != NULL) {
|
if (p != NULL) {
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
@ -334,7 +330,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey);
|
||||||
|
|
||||||
while (pTableScanInfo->current < total) {
|
while (pTableScanInfo->current < total) {
|
||||||
SSDataBlock* p = doTableScanImpl(pOperator, newgroup);
|
SSDataBlock* p = doTableScanImpl(pOperator);
|
||||||
if (p != NULL) {
|
if (p != NULL) {
|
||||||
return p;
|
return p;
|
||||||
}
|
}
|
||||||
|
@ -421,13 +417,12 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle) {
|
||||||
return pOperator;
|
return pOperator;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator, bool* newgroup) {
|
static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) {
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
STableScanInfo* pTableScanInfo = pOperator->info;
|
STableScanInfo* pTableScanInfo = pOperator->info;
|
||||||
*newgroup = false;
|
|
||||||
|
|
||||||
STableBlockDistInfo tableBlockDist = {0};
|
STableBlockDistInfo tableBlockDist = {0};
|
||||||
tableBlockDist.numOfTables = 1; // TODO set the correct number of tables
|
tableBlockDist.numOfTables = 1; // TODO set the correct number of tables
|
||||||
|
@ -514,7 +509,7 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) {
|
||||||
taosArrayClear(pInfo->pBlockLists);
|
taosArrayClear(pInfo->pBlockLists);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) {
|
static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
||||||
// NOTE: this operator does never check if current status is done or not
|
// NOTE: this operator does never check if current status is done or not
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SStreamBlockScanInfo* pInfo = pOperator->info;
|
SStreamBlockScanInfo* pInfo = pOperator->info;
|
||||||
|
@ -859,7 +854,7 @@ static SSDataBlock* buildSysTableMetaBlock() {
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) {
|
static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) {
|
||||||
// build message and send to mnode to fetch the content of system tables.
|
// build message and send to mnode to fetch the content of system tables.
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
SSysTableScanInfo* pInfo = pOperator->info;
|
SSysTableScanInfo* pInfo = pOperator->info;
|
||||||
|
@ -1191,7 +1186,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe
|
||||||
return pOperator;
|
return pOperator;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) {
|
static SSDataBlock* doTagScan(SOperatorInfo* pOperator) {
|
||||||
#if 0
|
#if 0
|
||||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -55,7 +55,7 @@ typedef struct SDummyInputInfo {
|
||||||
SSDataBlock* pBlock;
|
SSDataBlock* pBlock;
|
||||||
} SDummyInputInfo;
|
} SDummyInputInfo;
|
||||||
|
|
||||||
SSDataBlock* getDummyBlock(SOperatorInfo* pOperator, bool* newgroup) {
|
SSDataBlock* getDummyBlock(SOperatorInfo* pOperator) {
|
||||||
SDummyInputInfo* pInfo = static_cast<SDummyInputInfo*>(pOperator->info);
|
SDummyInputInfo* pInfo = static_cast<SDummyInputInfo*>(pOperator->info);
|
||||||
if (pInfo->current >= pInfo->totalPages) {
|
if (pInfo->current >= pInfo->totalPages) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -121,7 +121,7 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator, bool* newgroup) {
|
SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) {
|
||||||
SDummyInputInfo* pInfo = static_cast<SDummyInputInfo*>(pOperator->info);
|
SDummyInputInfo* pInfo = static_cast<SDummyInputInfo*>(pOperator->info);
|
||||||
if (pInfo->current >= pInfo->totalPages) {
|
if (pInfo->current >= pInfo->totalPages) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
Loading…
Reference in New Issue