|
|
|
@ -631,37 +631,29 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// get the correct time window according to the handled timestamp
|
|
|
|
|
static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SQueryAttr *pQuery) {
|
|
|
|
|
static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SQueryAttr *pQueryAttr) {
|
|
|
|
|
STimeWindow w = {0};
|
|
|
|
|
#if 0
|
|
|
|
|
|
|
|
|
|
if (pResultRowInfo->curIndex == -1) { // the first window, from the previous stored value
|
|
|
|
|
if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) {
|
|
|
|
|
getInitialStartTimeWindow(pQuery, ts, &w);
|
|
|
|
|
pResultRowInfo->prevSKey = w.skey;
|
|
|
|
|
} else {
|
|
|
|
|
w.skey = pResultRowInfo->prevSKey;
|
|
|
|
|
}
|
|
|
|
|
if (pResultRowInfo->curPos == -1) { // the first window, from the previous stored value
|
|
|
|
|
getInitialStartTimeWindow(pQueryAttr, ts, &w);
|
|
|
|
|
|
|
|
|
|
if (pQuery->interval.intervalUnit == 'n' || pQuery->interval.intervalUnit == 'y') {
|
|
|
|
|
w.ekey = taosTimeAdd(w.skey, pQuery->interval.interval, pQuery->interval.intervalUnit, pQuery->precision) - 1;
|
|
|
|
|
if (pQueryAttr->interval.intervalUnit == 'n' || pQueryAttr->interval.intervalUnit == 'y') {
|
|
|
|
|
w.ekey = taosTimeAdd(w.skey, pQueryAttr->interval.interval, pQueryAttr->interval.intervalUnit, pQueryAttr->precision) - 1;
|
|
|
|
|
} else {
|
|
|
|
|
w.ekey = w.skey + pQuery->interval.interval - 1;
|
|
|
|
|
w.ekey = w.skey + pQueryAttr->interval.interval - 1;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
int32_t slot = curTimeWindowIndex(pResultRowInfo);
|
|
|
|
|
SResultRow* pWindowRes = getResultRow(pResultRowInfo, slot);
|
|
|
|
|
w = pWindowRes->win;
|
|
|
|
|
w = getResultRow(pResultRowInfo, pResultRowInfo->curPos)->win;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* query border check, skey should not be bounded by the query time range, since the value skey will
|
|
|
|
|
* be used as the time window index value. So we only change ekey of time window accordingly.
|
|
|
|
|
*/
|
|
|
|
|
if (w.ekey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) {
|
|
|
|
|
w.ekey = pQuery->window.ekey;
|
|
|
|
|
if (w.ekey > pQueryAttr->window.ekey && QUERY_IS_ASC_QUERY(pQueryAttr)) {
|
|
|
|
|
w.ekey = pQueryAttr->window.ekey;
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
return w;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1059,7 +1051,7 @@ static int32_t getNextQualifiedWindow(SQueryAttr* pQueryAttr, STimeWindow *pNext
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/* interp query with fill should not skip time window */
|
|
|
|
|
if (pQueryAttr->interpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
|
|
|
|
|
if (pQueryAttr->pointInterpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
|
|
|
|
|
return startPos;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1576,18 +1568,15 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t groupId) {
|
|
|
|
|
(void)getCurrentActiveTimeWindow;
|
|
|
|
|
|
|
|
|
|
#if 0
|
|
|
|
|
static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t tableGroupId) {
|
|
|
|
|
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info;
|
|
|
|
|
|
|
|
|
|
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
|
|
|
|
|
int32_t numOfOutput = pOperatorInfo->numOfOutput;
|
|
|
|
|
SQueryAttr* pQuery = pRuntimeEnv->pQueryAttr;
|
|
|
|
|
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
|
|
|
|
|
|
|
|
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
|
|
|
|
bool ascQuery = QUERY_IS_ASC_QUERY(pQuery);
|
|
|
|
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQueryAttr->order.order);
|
|
|
|
|
bool ascQuery = QUERY_IS_ASC_QUERY(pQueryAttr);
|
|
|
|
|
|
|
|
|
|
TSKEY* tsCols = NULL;
|
|
|
|
|
if (pSDataBlock->pDataBlock != NULL) {
|
|
|
|
@ -1598,34 +1587,35 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t startPos = ascQuery? 0 : (pSDataBlock->info.rows - 1);
|
|
|
|
|
TSKEY ts = getStartTsKey(pQuery, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows);
|
|
|
|
|
TSKEY ts = getStartTsKey(pQueryAttr, &pSDataBlock->info.window, tsCols, pSDataBlock->info.rows);
|
|
|
|
|
|
|
|
|
|
STimeWindow win = getCurrentActiveTimeWindow(pResultRowInfo, ts, pQuery);
|
|
|
|
|
STimeWindow win = getCurrentActiveTimeWindow(pResultRowInfo, ts, pQueryAttr);
|
|
|
|
|
bool masterScan = IS_MASTER_SCAN(pRuntimeEnv);
|
|
|
|
|
|
|
|
|
|
SResultRow* pResult = NULL;
|
|
|
|
|
int32_t forwardStep = 0;
|
|
|
|
|
int32_t ret = 0;
|
|
|
|
|
|
|
|
|
|
while (1) {
|
|
|
|
|
// null data, failed to allocate more memory buffer
|
|
|
|
|
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId,
|
|
|
|
|
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
|
|
|
|
ret = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult,
|
|
|
|
|
tableGroupId, pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
|
|
|
|
|
if (ret != TSDB_CODE_SUCCESS) {
|
|
|
|
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
TSKEY ekey = reviseWindowEkey(pQuery, &win);
|
|
|
|
|
forwardStep = getNumOfRowsInTimeWindow(pQuery, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
|
|
|
|
|
TSKEY ekey = reviseWindowEkey(pQueryAttr, &win);
|
|
|
|
|
forwardStep = getNumOfRowsInTimeWindow(pRuntimeEnv, &pSDataBlock->info, tsCols, startPos, ekey, binarySearchForKey, true);
|
|
|
|
|
|
|
|
|
|
// window start(end) key interpolation
|
|
|
|
|
doWindowBorderInterpolation(pOperatorInfo, pSDataBlock, pInfo->pCtx, pResult, &win, startPos, forwardStep);
|
|
|
|
|
doApplyFunctions(pRuntimeEnv, pInfo->pCtx, &win, startPos, forwardStep, tsCols, pSDataBlock->info.rows, numOfOutput);
|
|
|
|
|
|
|
|
|
|
int32_t prevEndPos = (forwardStep - 1) * step + startPos;
|
|
|
|
|
startPos = getNextQualifiedWindow(pQuery, &win, &pSDataBlock->info, tsCols, binarySearchForKey, prevEndPos);
|
|
|
|
|
startPos = getNextQualifiedWindow(pQueryAttr, &win, &pSDataBlock->info, tsCols, binarySearchForKey, prevEndPos);
|
|
|
|
|
if (startPos < 0) {
|
|
|
|
|
if (win.skey <= pQuery->window.ekey) {
|
|
|
|
|
int32_t code = setWindowOutputBufByKey(pRuntimeEnv, pResultRowInfo, &win, masterScan, &pResult, groupId,
|
|
|
|
|
if (win.skey <= pQueryAttr->window.ekey) {
|
|
|
|
|
int32_t code = setResultOutputBufByKey(pRuntimeEnv, pResultRowInfo, pSDataBlock->info.tid, &win, masterScan, &pResult, tableGroupId,
|
|
|
|
|
pInfo->pCtx, numOfOutput, pInfo->rowCellInfoOffset);
|
|
|
|
|
if (code != TSDB_CODE_SUCCESS || pResult == NULL) {
|
|
|
|
|
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
|
|
|
@ -1643,13 +1633,12 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|
|
|
|
setResultRowInterpo(pResult, RESULT_ROW_END_INTERP);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (pQuery->timeWindowInterpo) {
|
|
|
|
|
if (pQueryAttr->timeWindowInterpo) {
|
|
|
|
|
int32_t rowIndex = ascQuery? (pSDataBlock->info.rows-1):0;
|
|
|
|
|
saveDataBlockLastRow(pRuntimeEnv, &pSDataBlock->info, pSDataBlock->pDataBlock, rowIndex);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
updateResultRowInfoActiveIndex(pResultRowInfo, pQuery, pQuery->current->lastKey);
|
|
|
|
|
#endif
|
|
|
|
|
updateResultRowInfoActiveIndex(pResultRowInfo, pQueryAttr, pRuntimeEnv->current->lastKey);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -2150,6 +2139,12 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|
|
|
|
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case OP_AllMultiTableTimeInterval: {
|
|
|
|
|
pRuntimeEnv->proot =
|
|
|
|
|
createAllMultiTableTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
|
|
|
|
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case OP_TimeWindow: {
|
|
|
|
|
pRuntimeEnv->proot =
|
|
|
|
|
createTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
|
|
|
@ -2159,6 +2154,15 @@ static int32_t setupQueryRuntimeEnv(SQueryRuntimeEnv *pRuntimeEnv, int32_t numOf
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case OP_AllTimeWindow: {
|
|
|
|
|
pRuntimeEnv->proot =
|
|
|
|
|
createAllTimeIntervalOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
|
|
|
|
int32_t opType = pRuntimeEnv->proot->upstream[0]->operatorType;
|
|
|
|
|
if (opType != OP_DummyInput && opType != OP_Join) {
|
|
|
|
|
setTableScanFilterOperatorInfo(pRuntimeEnv->proot->upstream[0]->info, pRuntimeEnv->proot);
|
|
|
|
|
}
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
case OP_Groupby: {
|
|
|
|
|
pRuntimeEnv->proot =
|
|
|
|
|
createGroupbyOperatorInfo(pRuntimeEnv, pRuntimeEnv->proot, pQueryAttr->pExpr1, pQueryAttr->numOfOutput);
|
|
|
|
@ -3098,7 +3102,7 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
|
|
|
|
|
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
|
|
|
|
|
|
|
|
|
|
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
|
|
|
|
|
if (pQueryAttr->interpQuery) {
|
|
|
|
|
if (pQueryAttr->pointInterpQuery) {
|
|
|
|
|
needFilter = chkWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
|
|
|
|
|
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
|
|
|
|
|
pTableScanInfo->rowCellInfoOffset);
|
|
|
|
@ -5769,6 +5773,66 @@ static SSDataBlock* doIntervalAgg(void* param, bool* newgroup) {
|
|
|
|
|
return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static SSDataBlock* doAllIntervalAgg(void* param, bool* newgroup) {
|
|
|
|
|
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
|
|
|
|
if (pOperator->status == OP_EXEC_DONE) {
|
|
|
|
|
return NULL;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
STableIntervalOperatorInfo* pIntervalInfo = pOperator->info;
|
|
|
|
|
|
|
|
|
|
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
|
|
|
|
if (pOperator->status == OP_RES_TO_RETURN) {
|
|
|
|
|
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
|
|
|
|
|
|
|
|
|
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
|
|
|
|
pOperator->status = OP_EXEC_DONE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return pIntervalInfo->pRes;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
|
|
|
|
int32_t order = pQueryAttr->order.order;
|
|
|
|
|
STimeWindow win = pQueryAttr->window;
|
|
|
|
|
|
|
|
|
|
SOperatorInfo* upstream = pOperator->upstream[0];
|
|
|
|
|
|
|
|
|
|
while(1) {
|
|
|
|
|
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
|
|
|
|
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
|
|
|
|
|
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
|
|
|
|
|
|
|
|
|
if (pBlock == NULL) {
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
setTagValue(pOperator, pRuntimeEnv->current->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
|
|
|
|
|
|
|
|
|
|
// the pDataBlock are always the same one, no need to call this again
|
|
|
|
|
setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order);
|
|
|
|
|
hashAllIntervalAgg(pOperator, &pIntervalInfo->resultRowInfo, pBlock, 0);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// restore the value
|
|
|
|
|
pQueryAttr->order.order = order;
|
|
|
|
|
pQueryAttr->window = win;
|
|
|
|
|
|
|
|
|
|
pOperator->status = OP_RES_TO_RETURN;
|
|
|
|
|
closeAllResultRows(&pIntervalInfo->resultRowInfo);
|
|
|
|
|
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
|
|
|
|
|
finalizeQueryResult(pOperator, pIntervalInfo->pCtx, &pIntervalInfo->resultRowInfo, pIntervalInfo->rowCellInfoOffset);
|
|
|
|
|
|
|
|
|
|
initGroupResInfo(&pRuntimeEnv->groupResInfo, &pIntervalInfo->resultRowInfo);
|
|
|
|
|
toSSDataBlock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pIntervalInfo->pRes);
|
|
|
|
|
|
|
|
|
|
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) {
|
|
|
|
|
pOperator->status = OP_EXEC_DONE;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return pIntervalInfo->pRes->info.rows == 0? NULL:pIntervalInfo->pRes;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
|
|
|
|
|
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
|
|
|
|
if (pOperator->status == OP_EXEC_DONE) {
|
|
|
|
@ -6502,6 +6566,32 @@ SOperatorInfo* createTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOp
|
|
|
|
|
appendUpstream(pOperator, upstream);
|
|
|
|
|
return pOperator;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
SOperatorInfo* createAllTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
|
|
|
|
STableIntervalOperatorInfo* pInfo = calloc(1, sizeof(STableIntervalOperatorInfo));
|
|
|
|
|
|
|
|
|
|
pInfo->pCtx = createSQLFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->rowCellInfoOffset);
|
|
|
|
|
pInfo->pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity);
|
|
|
|
|
initResultRowInfo(&pInfo->resultRowInfo, 8, TSDB_DATA_TYPE_INT);
|
|
|
|
|
|
|
|
|
|
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
|
|
|
|
|
|
|
|
|
|
pOperator->name = "AllTimeIntervalAggOperator";
|
|
|
|
|
pOperator->operatorType = OP_AllTimeWindow;
|
|
|
|
|
pOperator->blockingOptr = true;
|
|
|
|
|
pOperator->status = OP_IN_EXECUTING;
|
|
|
|
|
pOperator->pExpr = pExpr;
|
|
|
|
|
pOperator->numOfOutput = numOfOutput;
|
|
|
|
|
pOperator->info = pInfo;
|
|
|
|
|
pOperator->pRuntimeEnv = pRuntimeEnv;
|
|
|
|
|
pOperator->exec = doAllIntervalAgg;
|
|
|
|
|
pOperator->cleanup = destroyBasicOperatorInfo;
|
|
|
|
|
|
|
|
|
|
appendUpstream(pOperator, upstream);
|
|
|
|
|
return pOperator;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
|
|
|
|
SStateWindowOperatorInfo* pInfo = calloc(1, sizeof(SStateWindowOperatorInfo));
|
|
|
|
|
pInfo->colIndex = -1;
|
|
|
|
@ -6525,7 +6615,6 @@ SOperatorInfo* createStatewindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOpe
|
|
|
|
|
appendUpstream(pOperator, upstream);
|
|
|
|
|
return pOperator;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
SOperatorInfo* createSWindowOperatorInfo(SQueryRuntimeEnv* pRuntimeEnv, SOperatorInfo* upstream, SExprInfo* pExpr, int32_t numOfOutput) {
|
|
|
|
|
SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo));
|
|
|
|
|
|
|
|
|
|