[td-13039] support stream execution interval query.
This commit is contained in:
parent
dd66affd0a
commit
dba4a72dac
|
@ -469,8 +469,7 @@ typedef struct {
|
||||||
int32_t tz; // query client timezone
|
int32_t tz; // query client timezone
|
||||||
char intervalUnit;
|
char intervalUnit;
|
||||||
char slidingUnit;
|
char slidingUnit;
|
||||||
char
|
char offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
|
||||||
offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration.
|
|
||||||
int8_t precision;
|
int8_t precision;
|
||||||
int64_t interval;
|
int64_t interval;
|
||||||
int64_t sliding;
|
int64_t sliding;
|
||||||
|
|
|
@ -161,20 +161,8 @@ typedef struct STaskCostInfo {
|
||||||
typedef struct SOperatorCostInfo {
|
typedef struct SOperatorCostInfo {
|
||||||
uint64_t openCost;
|
uint64_t openCost;
|
||||||
uint64_t execCost;
|
uint64_t execCost;
|
||||||
// uint64_t totalRows;
|
|
||||||
// uint64_t totalBytes;
|
|
||||||
} SOperatorCostInfo;
|
} SOperatorCostInfo;
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t vgroupLimit;
|
|
||||||
int64_t ts;
|
|
||||||
} SOrderedPrjQueryInfo;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
char* tags;
|
|
||||||
SArray* pResult; // SArray<SStddevInterResult>
|
|
||||||
} SInterResult;
|
|
||||||
|
|
||||||
// The basic query information extracted from the SQueryInfo tree to support the
|
// The basic query information extracted from the SQueryInfo tree to support the
|
||||||
// execution of query in a data node.
|
// execution of query in a data node.
|
||||||
typedef struct STaskAttr {
|
typedef struct STaskAttr {
|
||||||
|
@ -230,7 +218,6 @@ typedef struct STaskAttr {
|
||||||
SColumnInfo* tagColList;
|
SColumnInfo* tagColList;
|
||||||
int32_t numOfFilterCols;
|
int32_t numOfFilterCols;
|
||||||
int64_t* fillVal;
|
int64_t* fillVal;
|
||||||
SOrderedPrjQueryInfo prjInfo; // limit value for each vgroup, only available in global order projection query.
|
|
||||||
|
|
||||||
SSingleColumnFilterInfo* pFilterInfo;
|
SSingleColumnFilterInfo* pFilterInfo;
|
||||||
// SFilterInfo *pFilters;
|
// SFilterInfo *pFilters;
|
||||||
|
@ -245,6 +232,7 @@ struct SOperatorInfo;
|
||||||
|
|
||||||
typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char **result, int32_t *length);
|
typedef void (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char **result, int32_t *length);
|
||||||
typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char *result, int32_t length);
|
typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, char *result, int32_t length);
|
||||||
|
|
||||||
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* param);
|
typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* param);
|
||||||
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* param, bool* newgroup);
|
typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* param, bool* newgroup);
|
||||||
typedef void (*__optr_close_fn_t)(void* param, int32_t num);
|
typedef void (*__optr_close_fn_t)(void* param, int32_t num);
|
||||||
|
@ -330,11 +318,12 @@ typedef struct SOperatorInfo {
|
||||||
SResultInfo resultInfo;
|
SResultInfo resultInfo;
|
||||||
struct SOperatorInfo** pDownstream; // downstram pointer list
|
struct SOperatorInfo** pDownstream; // downstram pointer list
|
||||||
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
|
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
|
||||||
__optr_fn_t getNextFn;
|
|
||||||
__optr_fn_t cleanupFn;
|
|
||||||
__optr_close_fn_t closeFn;
|
|
||||||
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
|
__optr_open_fn_t _openFn; // DO NOT invoke this function directly
|
||||||
__optr_encode_fn_t encodeResultRow; //
|
__optr_fn_t getNextFn;
|
||||||
|
__optr_fn_t getStreamResFn; // execute the aggregate in the stream model.
|
||||||
|
__optr_fn_t cleanupFn; // call this function to release the allocated resources ASAP
|
||||||
|
__optr_close_fn_t closeFn;
|
||||||
|
__optr_encode_fn_t encodeResultRow;
|
||||||
__optr_decode_fn_t decodeResultRow;
|
__optr_decode_fn_t decodeResultRow;
|
||||||
} SOperatorInfo;
|
} SOperatorInfo;
|
||||||
|
|
||||||
|
@ -363,18 +352,18 @@ typedef struct SQInfo {
|
||||||
STaskCostInfo summary;
|
STaskCostInfo summary;
|
||||||
} SQInfo;
|
} SQInfo;
|
||||||
|
|
||||||
enum {
|
typedef enum {
|
||||||
DATA_NOT_READY = 0x1,
|
EX_SOURCE_DATA_NOT_READY = 0x1,
|
||||||
DATA_READY = 0x2,
|
EX_SOURCE_DATA_READY = 0x2,
|
||||||
DATA_EXHAUSTED = 0x3,
|
EX_SOURCE_DATA_EXHAUSTED = 0x3,
|
||||||
};
|
} EX_SOURCE_STATUS;
|
||||||
|
|
||||||
typedef struct SSourceDataInfo {
|
typedef struct SSourceDataInfo {
|
||||||
struct SExchangeInfo *pEx;
|
struct SExchangeInfo *pEx;
|
||||||
int32_t index;
|
int32_t index;
|
||||||
SRetrieveTableRsp *pRsp;
|
SRetrieveTableRsp *pRsp;
|
||||||
uint64_t totalRows;
|
uint64_t totalRows;
|
||||||
int32_t status;
|
EX_SOURCE_STATUS status;
|
||||||
} SSourceDataInfo;
|
} SSourceDataInfo;
|
||||||
|
|
||||||
typedef struct SLoadRemoteDataInfo {
|
typedef struct SLoadRemoteDataInfo {
|
||||||
|
@ -383,12 +372,6 @@ typedef struct SLoadRemoteDataInfo {
|
||||||
uint64_t totalElapsed; // total elapsed time
|
uint64_t totalElapsed; // total elapsed time
|
||||||
} SLoadRemoteDataInfo;
|
} SLoadRemoteDataInfo;
|
||||||
|
|
||||||
enum {
|
|
||||||
EX_SOURCE_DATA_NOT_READY = 0x1,
|
|
||||||
EX_SOURCE_DATA_READY = 0x2,
|
|
||||||
EX_SOURCE_DATA_EXHAUSTED = 0x3,
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef struct SExchangeInfo {
|
typedef struct SExchangeInfo {
|
||||||
SArray* pSources;
|
SArray* pSources;
|
||||||
SArray* pSourceDataInfo;
|
SArray* pSourceDataInfo;
|
||||||
|
@ -483,17 +466,23 @@ typedef struct SAggSupporter {
|
||||||
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
|
int32_t resultRowSize; // the result buffer size for each result row, with the meta data size for each row
|
||||||
} SAggSupporter;
|
} SAggSupporter;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
OPTR_EXEC_MODEL_BATCH = 0x1,
|
||||||
|
OPTR_EXEC_MODEL_STREAM = 0x2,
|
||||||
|
} OPTR_EXEC_MODEL;
|
||||||
|
|
||||||
typedef struct STableIntervalOperatorInfo {
|
typedef struct STableIntervalOperatorInfo {
|
||||||
SOptrBasicInfo binfo;
|
SOptrBasicInfo binfo; // basic info
|
||||||
SGroupResInfo groupResInfo;
|
SGroupResInfo groupResInfo; // multiple results build supporter
|
||||||
SInterval interval;
|
SInterval interval; // interval info
|
||||||
STimeWindow win;
|
STimeWindow win; // query time range
|
||||||
int32_t precision;
|
bool timeWindowInterpo; // interpolation needed or not
|
||||||
bool timeWindowInterpo;
|
char **pRow; // previous row/tuple of already processed datablock
|
||||||
char **pRow;
|
SAggSupporter aggSup; // aggregate supporter
|
||||||
SAggSupporter aggSup;
|
STableQueryInfo *pCurrent; // current tableQueryInfo struct
|
||||||
STableQueryInfo *pCurrent;
|
int32_t order; // current SSDataBlock scan order
|
||||||
int32_t order;
|
OPTR_EXEC_MODEL execModel; // operator execution model [batch model|stream model]
|
||||||
|
SArray *pUpdatedWindow; // updated time window due to the input data block from the downstream operator.
|
||||||
} STableIntervalOperatorInfo;
|
} STableIntervalOperatorInfo;
|
||||||
|
|
||||||
typedef struct SAggOperatorInfo {
|
typedef struct SAggOperatorInfo {
|
||||||
|
@ -695,12 +684,6 @@ SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorI
|
||||||
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
|
SOperatorInfo* createJoinOperatorInfo(SOperatorInfo** pdownstream, int32_t numOfDownstream, SSchema* pSchema,
|
||||||
int32_t numOfOutput);
|
int32_t numOfOutput);
|
||||||
|
|
||||||
void doSetFilterColumnInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, SSDataBlock* pBlock);
|
|
||||||
bool doFilterDataBlock(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols, int32_t numOfRows, int8_t* p);
|
|
||||||
void doCompactSDataBlock(SSDataBlock* pBlock, int32_t numOfRows, int8_t* p);
|
|
||||||
|
|
||||||
SSDataBlock* createOutputBuf(SExprInfo* pExpr, int32_t numOfOutput, int32_t numOfRows);
|
|
||||||
|
|
||||||
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols);
|
void* doDestroyFilterInfo(SSingleColumnFilterInfo* pFilterInfo, int32_t numOfFilterCols);
|
||||||
|
|
||||||
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
|
void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlock* pBlock, int32_t order);
|
||||||
|
|
|
@ -1054,7 +1054,7 @@ static int32_t getNextQualifiedWindow(SInterval* pInterval, STimeWindow* pNext,
|
||||||
int32_t order = pInfo->order;
|
int32_t order = pInfo->order;
|
||||||
bool ascQuery = (order == TSDB_ORDER_ASC);
|
bool ascQuery = (order == TSDB_ORDER_ASC);
|
||||||
|
|
||||||
int32_t precision = pInfo->precision;
|
int32_t precision = pInterval->precision;
|
||||||
getNextTimeWindow(pInterval, precision, order, pNext);
|
getNextTimeWindow(pInterval, precision, order, pNext);
|
||||||
|
|
||||||
// next time window is not in current block
|
// next time window is not in current block
|
||||||
|
@ -1489,15 +1489,20 @@ static void doWindowBorderInterpolation(SOperatorInfo* pOperatorInfo, SSDataBloc
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock,
|
static SArray* hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock,
|
||||||
int32_t tableGroupId) {
|
int32_t tableGroupId) {
|
||||||
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*)pOperatorInfo->info;
|
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*)pOperatorInfo->info;
|
||||||
|
|
||||||
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperatorInfo->pTaskInfo;
|
||||||
int32_t numOfOutput = pOperatorInfo->numOfOutput;
|
int32_t numOfOutput = pOperatorInfo->numOfOutput;
|
||||||
|
|
||||||
|
SArray* pUpdated = NULL;
|
||||||
|
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
||||||
|
pUpdated = taosArrayInit(4, sizeof(SResultRowPosition));
|
||||||
|
}
|
||||||
|
|
||||||
int32_t step = 1;
|
int32_t step = 1;
|
||||||
bool ascQuery = true;
|
bool ascScan = true;
|
||||||
|
|
||||||
int32_t prevIndex = pResultRowInfo->curPos;
|
int32_t prevIndex = pResultRowInfo->curPos;
|
||||||
|
|
||||||
|
@ -1509,10 +1514,10 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey);
|
tsCols[pSDataBlock->info.rows - 1] == pSDataBlock->info.window.ekey);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t startPos = ascQuery ? 0 : (pSDataBlock->info.rows - 1);
|
int32_t startPos = ascScan? 0 : (pSDataBlock->info.rows - 1);
|
||||||
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascQuery);
|
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols, pSDataBlock->info.rows, ascScan);
|
||||||
|
|
||||||
STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, &pInfo->interval, pInfo->precision, &pInfo->win);
|
STimeWindow win = getActiveTimeWindow(pResultRowInfo, ts, &pInfo->interval, pInfo->interval.precision, &pInfo->win);
|
||||||
bool masterScan = true;
|
bool masterScan = true;
|
||||||
|
|
||||||
SResultRow* pResult = NULL;
|
SResultRow* pResult = NULL;
|
||||||
|
@ -1523,6 +1528,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
||||||
|
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
|
||||||
|
taosArrayPush(pUpdated, &pos);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t forwardStep = 0;
|
int32_t forwardStep = 0;
|
||||||
TSKEY ekey = win.ekey;
|
TSKEY ekey = win.ekey;
|
||||||
forwardStep =
|
forwardStep =
|
||||||
|
@ -1534,8 +1544,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already.
|
for (int32_t j = prevIndex; j < curIndex; ++j) { // previous time window may be all closed already.
|
||||||
SResultRow* pRes = getResultRow(pResultRowInfo, j);
|
SResultRow* pRes = getResultRow(pResultRowInfo, j);
|
||||||
if (pRes->closed) {
|
if (pRes->closed) {
|
||||||
assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) &&
|
assert(resultRowInterpolated(pRes, RESULT_ROW_START_INTERP) && resultRowInterpolated(pRes, RESULT_ROW_END_INTERP));
|
||||||
resultRowInterpolated(pRes, RESULT_ROW_END_INTERP));
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1548,7 +1557,6 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
|
assert(!resultRowInterpolated(pResult, RESULT_ROW_END_INTERP));
|
||||||
|
|
||||||
doTimeWindowInterpolation(pOperatorInfo, &pInfo->binfo, pSDataBlock->pDataBlock, *(TSKEY*)pInfo->pRow[0], -1,
|
doTimeWindowInterpolation(pOperatorInfo, &pInfo->binfo, pSDataBlock->pDataBlock, *(TSKEY*)pInfo->pRow[0], -1,
|
||||||
tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP);
|
tsCols[startPos], startPos, w.ekey, RESULT_ROW_END_INTERP);
|
||||||
|
|
||||||
|
@ -1589,6 +1597,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
longjmp(pTaskInfo->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) {
|
||||||
|
SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset};
|
||||||
|
taosArrayPush(pUpdated, &pos);
|
||||||
|
}
|
||||||
|
|
||||||
ekey = nextWin.ekey; // reviseWindowEkey(pQueryAttr, &nextWin);
|
ekey = nextWin.ekey; // reviseWindowEkey(pQueryAttr, &nextWin);
|
||||||
forwardStep =
|
forwardStep =
|
||||||
getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
getNumOfRowsInTimeWindow(&pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC);
|
||||||
|
@ -1601,10 +1614,11 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pInfo->timeWindowInterpo) {
|
if (pInfo->timeWindowInterpo) {
|
||||||
int32_t rowIndex = ascQuery ? (pSDataBlock->info.rows - 1) : 0;
|
int32_t rowIndex = ascScan ? (pSDataBlock->info.rows - 1) : 0;
|
||||||
saveDataBlockLastRow(pInfo->pRow, pSDataBlock->pDataBlock, rowIndex, pSDataBlock->info.numOfCols);
|
saveDataBlockLastRow(pInfo->pRow, pSDataBlock->pDataBlock, rowIndex, pSDataBlock->info.numOfCols);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return pUpdated;
|
||||||
// updateResultRowInfoActiveIndex(pResultRowInfo, &pInfo->win, pRuntimeEnv->current->lastKey, true, false);
|
// updateResultRowInfoActiveIndex(pResultRowInfo, &pInfo->win, pRuntimeEnv->current->lastKey, true, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3590,6 +3604,42 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void finalizeUpdatedResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SDiskbasedBuf* pBuf, SArray* pUpdateList,
|
||||||
|
int32_t* rowCellInfoOffset) {
|
||||||
|
size_t num = taosArrayGetSize(pUpdateList);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
|
SResultRowPosition* pPos = taosArrayGet(pUpdateList, i);
|
||||||
|
|
||||||
|
SFilePage* bufPage = getBufPage(pBuf, pPos->pageId);
|
||||||
|
SResultRow* pRow = (SResultRow*)((char*)bufPage + pPos->offset);
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < numOfOutput; ++j) {
|
||||||
|
pCtx[j].resultInfo = getResultCell(pRow, j, rowCellInfoOffset);
|
||||||
|
|
||||||
|
struct SResultRowEntryInfo* pResInfo = pCtx[j].resultInfo;
|
||||||
|
if (isRowEntryCompleted(pResInfo) && isRowEntryInitialized(pResInfo)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pCtx[j].fpSet.process) { // TODO set the dummy function.
|
||||||
|
pCtx[j].fpSet.finalize(&pCtx[j]);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pRow->numOfRows < pResInfo->numOfRes) {
|
||||||
|
pRow->numOfRows = pResInfo->numOfRes;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
releaseBufPage(pBuf, bufPage);
|
||||||
|
/*
|
||||||
|
* set the number of output results for group by normal columns, the number of output rows usually is 1 except
|
||||||
|
* the top and bottom query
|
||||||
|
*/
|
||||||
|
// buf->numOfRows = (uint16_t)getNumOfResult(pCtx, numOfOutput);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static bool hasMainOutput(STaskAttr* pQueryAttr) {
|
static bool hasMainOutput(STaskAttr* pQueryAttr) {
|
||||||
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
|
for (int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
|
||||||
int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]);
|
int32_t functionId = getExprFunctionId(&pQueryAttr->pExpr1[i]);
|
||||||
|
@ -5074,12 +5124,12 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
|
||||||
for (int32_t i = 0; i < totalSources; ++i) {
|
for (int32_t i = 0; i < totalSources; ++i) {
|
||||||
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
|
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, i);
|
||||||
|
|
||||||
if (pDataInfo->status == DATA_EXHAUSTED) {
|
if (pDataInfo->status == EX_SOURCE_DATA_EXHAUSTED) {
|
||||||
completed += 1;
|
completed += 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDataInfo->status != DATA_READY) {
|
if (pDataInfo->status != EX_SOURCE_DATA_READY) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5093,7 +5143,7 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
|
||||||
" try next",
|
" try next",
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, i + 1, pDataInfo->totalRows,
|
||||||
pExchangeInfo->loadInfo.totalRows);
|
pExchangeInfo->loadInfo.totalRows);
|
||||||
pDataInfo->status = DATA_EXHAUSTED;
|
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||||
completed += 1;
|
completed += 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -5111,16 +5161,15 @@ static SSDataBlock* concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SEx
|
||||||
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
|
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pDataInfo->totalRows,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pDataInfo->totalRows,
|
||||||
pLoadInfo->totalRows, pLoadInfo->totalSize, i + 1, totalSources);
|
pLoadInfo->totalRows, pLoadInfo->totalSize, i + 1, totalSources);
|
||||||
pDataInfo->status = DATA_EXHAUSTED;
|
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||||
} else {
|
} else {
|
||||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64
|
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64 ", totalBytes:%" PRIu64,
|
||||||
", totalBytes:%" PRIu64,
|
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pLoadInfo->totalRows,
|
||||||
pLoadInfo->totalSize);
|
pLoadInfo->totalSize);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pDataInfo->status != DATA_EXHAUSTED) {
|
if (pDataInfo->status != EX_SOURCE_DATA_EXHAUSTED) {
|
||||||
pDataInfo->status = DATA_NOT_READY;
|
pDataInfo->status = EX_SOURCE_DATA_NOT_READY;
|
||||||
code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
|
code = doSendFetchDataRequest(pExchangeInfo, pTaskInfo, i);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _error;
|
goto _error;
|
||||||
|
@ -5223,7 +5272,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pExchangeInfo->current + 1,
|
||||||
pDataInfo->totalRows, pLoadInfo->totalRows);
|
pDataInfo->totalRows, pLoadInfo->totalRows);
|
||||||
|
|
||||||
pDataInfo->status = DATA_EXHAUSTED;
|
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||||
pExchangeInfo->current += 1;
|
pExchangeInfo->current += 1;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -5240,7 +5289,7 @@ static SSDataBlock* seqLoadRemoteData(SOperatorInfo* pOperator) {
|
||||||
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pDataInfo->totalRows,
|
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pRes->info.rows, pDataInfo->totalRows,
|
||||||
pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, totalSources);
|
pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, totalSources);
|
||||||
|
|
||||||
pDataInfo->status = DATA_EXHAUSTED;
|
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
|
||||||
pExchangeInfo->current += 1;
|
pExchangeInfo->current += 1;
|
||||||
} else {
|
} else {
|
||||||
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64
|
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " numOfRows:%d, totalRows:%" PRIu64
|
||||||
|
@ -6798,37 +6847,6 @@ static SSDataBlock* doLimit(SOperatorInfo* pOperator, bool* newgroup) {
|
||||||
return pBlock;
|
return pBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSDataBlock* doFilter(void* param, bool* newgroup) {
|
|
||||||
SOperatorInfo* pOperator = (SOperatorInfo*)param;
|
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
SFilterOperatorInfo* pCondInfo = pOperator->info;
|
|
||||||
STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
|
||||||
SSDataBlock* pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup);
|
|
||||||
publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC);
|
|
||||||
|
|
||||||
if (pBlock == NULL) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
doSetFilterColumnInfo(pCondInfo->pFilterInfo, pCondInfo->numOfFilterCols, pBlock);
|
|
||||||
assert(pRuntimeEnv->pTsBuf == NULL);
|
|
||||||
filterRowsInDataBlock(pRuntimeEnv, pCondInfo->pFilterInfo, pCondInfo->numOfFilterCols, pBlock, true);
|
|
||||||
|
|
||||||
if (pBlock->info.rows > 0) {
|
|
||||||
return pBlock;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
doSetOperatorCompleted(pOperator);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
if (OPTR_IS_OPENED(pOperator)) {
|
if (OPTR_IS_OPENED(pOperator)) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -6837,7 +6855,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
STableIntervalOperatorInfo* pInfo = pOperator->info;
|
STableIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
int32_t order = TSDB_ORDER_ASC;
|
int32_t order = TSDB_ORDER_ASC;
|
||||||
// STimeWindow win = pQueryAttr->window;
|
// STimeWindow win = {0};
|
||||||
bool newgroup = false;
|
bool newgroup = false;
|
||||||
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
|
||||||
|
@ -6851,7 +6869,6 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
|
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
|
||||||
|
|
||||||
// the pDataBlock are always the same one, no need to call this again
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
|
||||||
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
|
hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
|
||||||
|
@ -6890,6 +6907,59 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro
|
||||||
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SSDataBlock* doStreamIntervalAgg(SOperatorInfo *pOperator) {
|
||||||
|
STableIntervalOperatorInfo* pInfo = pOperator->info;
|
||||||
|
int32_t order = TSDB_ORDER_ASC;
|
||||||
|
|
||||||
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||||
|
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity,
|
||||||
|
pInfo->binfo.rowCellInfoOffset);
|
||||||
|
if (pInfo->binfo.pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pInfo->groupResInfo)) {
|
||||||
|
pOperator->status = OP_EXEC_DONE;
|
||||||
|
}
|
||||||
|
return pInfo->binfo.pRes;
|
||||||
|
}
|
||||||
|
|
||||||
|
// STimeWindow win = {0};
|
||||||
|
bool newgroup = false;
|
||||||
|
SOperatorInfo* downstream = pOperator->pDownstream[0];
|
||||||
|
|
||||||
|
SArray* pUpdated = NULL;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||||
|
SSDataBlock* pBlock = downstream->getNextFn(downstream, &newgroup);
|
||||||
|
publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||||
|
|
||||||
|
if (pBlock == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
// The timewindows that overlaps the timestamps of the input pBlock need to be recalculated and return to the caller.
|
||||||
|
// Note that all the time window are not close till now.
|
||||||
|
|
||||||
|
// setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput);
|
||||||
|
// the pDataBlock are always the same one, no need to call this again
|
||||||
|
setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order);
|
||||||
|
pUpdated = hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
finalizeUpdatedResult(pInfo->binfo.pCtx, pOperator->numOfOutput, pInfo->aggSup.pResultBuf, pUpdated, pInfo->binfo.rowCellInfoOffset);
|
||||||
|
|
||||||
|
blockDataEnsureCapacity(pInfo->binfo.pRes, pInfo->binfo.capacity);
|
||||||
|
toSDatablock(&pInfo->groupResInfo, pInfo->aggSup.pResultBuf, pInfo->binfo.pRes, pInfo->binfo.capacity,
|
||||||
|
pInfo->binfo.rowCellInfoOffset);
|
||||||
|
|
||||||
|
ASSERT(pInfo->binfo.pRes->info.rows > 0);
|
||||||
|
pOperator->status = OP_RES_TO_RETURN;
|
||||||
|
|
||||||
|
return pInfo->binfo.pRes->info.rows == 0 ? NULL : pInfo->binfo.pRes;
|
||||||
|
}
|
||||||
|
|
||||||
static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
|
static SSDataBlock* doAllIntervalAgg(SOperatorInfo *pOperator, bool* newgroup) {
|
||||||
if (pOperator->status == OP_EXEC_DONE) {
|
if (pOperator->status == OP_EXEC_DONE) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -7773,9 +7843,11 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo*
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->order = TSDB_ORDER_ASC;
|
pInfo->order = TSDB_ORDER_ASC;
|
||||||
pInfo->precision = TSDB_TIME_PRECISION_MILLI;
|
|
||||||
pInfo->win = pTaskInfo->window;
|
pInfo->win = pTaskInfo->window;
|
||||||
pInfo->interval = *pInterval;
|
pInfo->interval = *pInterval;
|
||||||
|
|
||||||
|
pInfo->execModel = OPTR_EXEC_MODEL_BATCH;
|
||||||
|
|
||||||
pInfo->win.skey = INT64_MIN;
|
pInfo->win.skey = INT64_MIN;
|
||||||
pInfo->win.ekey = INT64_MAX;
|
pInfo->win.ekey = INT64_MAX;
|
||||||
|
|
||||||
|
@ -8748,7 +8820,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
.intervalUnit = pIntervalPhyNode->intervalUnit,
|
.intervalUnit = pIntervalPhyNode->intervalUnit,
|
||||||
.slidingUnit = pIntervalPhyNode->slidingUnit,
|
.slidingUnit = pIntervalPhyNode->slidingUnit,
|
||||||
.offset = pIntervalPhyNode->offset,
|
.offset = pIntervalPhyNode->offset,
|
||||||
.precision = TSDB_TIME_PRECISION_MILLI,};
|
.precision = TSDB_TIME_PRECISION_MILLI};
|
||||||
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, pTableGroupInfo, pTaskInfo);
|
return createIntervalOperatorInfo(op, pExprInfo, num, pResBlock, &interval, pTableGroupInfo, pTaskInfo);
|
||||||
}
|
}
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == nodeType(pPhyNode)) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SORT == nodeType(pPhyNode)) {
|
||||||
|
|
Loading…
Reference in New Issue