fix merge issue
This commit is contained in:
parent
c13c0f5727
commit
b8883c59be
|
@ -457,17 +457,24 @@ static bool chkResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *p
|
|||
(SResultRow **)taosHashGet(pRuntimeEnv->pResultRowHashTable, pRuntimeEnv->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes));
|
||||
|
||||
// in case of repeat scan/reverse scan, no new time window added.
|
||||
if (QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQuery)) {
|
||||
if (QUERY_IS_INTERVAL_QUERY(pRuntimeEnv->pQueryAttr)) {
|
||||
if (!masterscan) { // the *p1 may be NULL in case of sliding+offset exists.
|
||||
return p1 != NULL;
|
||||
}
|
||||
|
||||
if (p1 != NULL) {
|
||||
for(int32_t i = pResultRowInfo->size - 1; i >= 0; --i) {
|
||||
if (pResultRowInfo->pResult[i] == (*p1)) {
|
||||
pResultRowInfo->curIndex = i;
|
||||
if (pResultRowInfo->size == 0) {
|
||||
existed = false;
|
||||
assert(pResultRowInfo->curPos == -1);
|
||||
} else if (pResultRowInfo->size == 1) {
|
||||
existed = (pResultRowInfo->pResult[0] == (*p1));
|
||||
} else { // check if current pResultRowInfo contains the existed pResultRow
|
||||
SET_RES_EXT_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, uid, pResultRowInfo);
|
||||
int64_t* index = taosHashGet(pRuntimeEnv->pResultRowListSet, pRuntimeEnv->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes));
|
||||
if (index != NULL) {
|
||||
existed = true;
|
||||
break;
|
||||
} else {
|
||||
existed = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -479,8 +486,8 @@ static bool chkResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *p
|
|||
}
|
||||
|
||||
|
||||
static SResultRow *doPrepareResultRowFromKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, char *pData,
|
||||
int16_t bytes, bool masterscan, uint64_t uid) {
|
||||
static SResultRow* doSetResultOutBufByKey(SQueryRuntimeEnv* pRuntimeEnv, SResultRowInfo* pResultRowInfo, int64_t tid,
|
||||
char* pData, int16_t bytes, bool masterscan, uint64_t tableGroupId) {
|
||||
bool existed = false;
|
||||
SET_RES_WINDOW_KEY(pRuntimeEnv->keyBuf, pData, bytes, tableGroupId);
|
||||
|
||||
|
@ -624,8 +631,9 @@ static STimeWindow getActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t
|
|||
}
|
||||
|
||||
// get the correct time window according to the handled timestamp
|
||||
static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SQuery *pQuery) {
|
||||
static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo * pResultRowInfo, int64_t ts, SQueryAttr *pQuery) {
|
||||
STimeWindow w = {0};
|
||||
#if 0
|
||||
|
||||
if (pResultRowInfo->curIndex == -1) { // the first window, from the previous stored value
|
||||
if (pResultRowInfo->prevSKey == TSKEY_INITIAL_VAL) {
|
||||
|
@ -653,7 +661,7 @@ static STimeWindow getCurrentActiveTimeWindow(SResultRowInfo * pResultRowInfo, i
|
|||
if (w.ekey > pQuery->window.ekey && QUERY_IS_ASC_QUERY(pQuery)) {
|
||||
w.ekey = pQuery->window.ekey;
|
||||
}
|
||||
|
||||
#endif
|
||||
return w;
|
||||
}
|
||||
|
||||
|
@ -712,8 +720,8 @@ static bool chkWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInf
|
|||
return chkResultRowFromKey(pRuntimeEnv, pResultRowInfo, (char *)&win->skey, TSDB_KEYSIZE, masterscan, groupId);
|
||||
}
|
||||
|
||||
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, STimeWindow *win,
|
||||
bool masterscan, SResultRow **pResult, int64_t groupId, SQLFunctionCtx* pCtx,
|
||||
static int32_t setResultOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SResultRowInfo *pResultRowInfo, int64_t tid, STimeWindow *win,
|
||||
bool masterscan, SResultRow **pResult, int64_t tableGroupId, SQLFunctionCtx* pCtx,
|
||||
int32_t numOfOutput, int32_t* rowCellInfoOffset) {
|
||||
assert(win->skey <= win->ekey);
|
||||
SDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
|
||||
|
@ -840,7 +848,7 @@ static void doUpdateResultRowIndex(SResultRowInfo*pResultRowInfo, TSKEY lastKey,
|
|||
}
|
||||
}
|
||||
|
||||
pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->curIndex]->win.skey;
|
||||
//pResultRowInfo->prevSKey = pResultRowInfo->pResult[pResultRowInfo->curIndex]->win.skey;
|
||||
}
|
||||
|
||||
static void updateResultRowInfoActiveIndex(SResultRowInfo* pResultRowInfo, SQueryAttr* pQueryAttr, TSKEY lastKey) {
|
||||
|
@ -1051,7 +1059,7 @@ static int32_t getNextQualifiedWindow(SQueryAttr* pQueryAttr, STimeWindow *pNext
|
|||
}
|
||||
|
||||
/* interp query with fill should not skip time window */
|
||||
if (pQuery->interpQuery && pQuery->fillType != TSDB_FILL_NONE) {
|
||||
if (pQueryAttr->interpQuery && pQueryAttr->fillType != TSDB_FILL_NONE) {
|
||||
return startPos;
|
||||
}
|
||||
|
||||
|
@ -1569,11 +1577,14 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
|
|||
|
||||
|
||||
static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResultRowInfo, SSDataBlock* pSDataBlock, int32_t groupId) {
|
||||
(void)getCurrentActiveTimeWindow;
|
||||
|
||||
#if 0
|
||||
STableIntervalOperatorInfo* pInfo = (STableIntervalOperatorInfo*) pOperatorInfo->info;
|
||||
|
||||
SQueryRuntimeEnv* pRuntimeEnv = pOperatorInfo->pRuntimeEnv;
|
||||
int32_t numOfOutput = pOperatorInfo->numOfOutput;
|
||||
SQuery* pQuery = pRuntimeEnv->pQuery;
|
||||
SQueryAttr* pQuery = pRuntimeEnv->pQueryAttr;
|
||||
|
||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||
bool ascQuery = QUERY_IS_ASC_QUERY(pQuery);
|
||||
|
@ -1638,6 +1649,7 @@ static void hashAllIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pRe
|
|||
}
|
||||
|
||||
updateResultRowInfoActiveIndex(pResultRowInfo, pQuery, pQuery->current->lastKey);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
|
@ -3086,18 +3098,12 @@ int32_t loadDataBlockOnDemand(SQueryRuntimeEnv* pRuntimeEnv, STableScanInfo* pTa
|
|||
TSKEY k = ascQuery? pBlock->info.window.skey : pBlock->info.window.ekey;
|
||||
|
||||
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQueryAttr);
|
||||
if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId,
|
||||
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
|
||||
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
|
||||
STimeWindow win = getActiveTimeWindow(pTableScanInfo->pResultRowInfo, k, pQuery);
|
||||
if (pQuery->interpQuery) {
|
||||
if (pQueryAttr->interpQuery) {
|
||||
needFilter = chkWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
|
||||
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
|
||||
pTableScanInfo->rowCellInfoOffset);
|
||||
} else {
|
||||
if (setWindowOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, &win, masterScan, &pResult, groupId,
|
||||
if (setResultOutputBufByKey(pRuntimeEnv, pTableScanInfo->pResultRowInfo, pBlock->info.tid, &win, masterScan, &pResult, groupId,
|
||||
pTableScanInfo->pCtx, pTableScanInfo->numOfOutput,
|
||||
pTableScanInfo->rowCellInfoOffset) != TSDB_CODE_SUCCESS) {
|
||||
longjmp(pRuntimeEnv->env, TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
|
@ -5818,6 +5824,64 @@ static SSDataBlock* doSTableIntervalAgg(void* param, bool* newgroup) {
|
|||
return pIntervalInfo->pRes;
|
||||
}
|
||||
|
||||
static SSDataBlock* doAllSTableIntervalAgg(void* param, bool* newgroup) {
|
||||
SOperatorInfo* pOperator = (SOperatorInfo*) param;
|
||||
if (pOperator->status == OP_EXEC_DONE) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
STableIntervalOperatorInfo* pIntervalInfo = pOperator->info;
|
||||
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||
|
||||
if (pOperator->status == OP_RES_TO_RETURN) {
|
||||
copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
|
||||
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
||||
return pIntervalInfo->pRes;
|
||||
}
|
||||
|
||||
SQueryAttr* pQueryAttr = pRuntimeEnv->pQueryAttr;
|
||||
int32_t order = pQueryAttr->order.order;
|
||||
|
||||
SOperatorInfo* upstream = pOperator->upstream[0];
|
||||
|
||||
while(1) {
|
||||
publishOperatorProfEvent(upstream, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||
SSDataBlock* pBlock = upstream->exec(upstream, newgroup);
|
||||
publishOperatorProfEvent(upstream, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||
|
||||
if (pBlock == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
// the pDataBlock are always the same one, no need to call this again
|
||||
STableQueryInfo* pTableQueryInfo = pRuntimeEnv->current;
|
||||
|
||||
setTagValue(pOperator, pTableQueryInfo->pTable, pIntervalInfo->pCtx, pOperator->numOfOutput);
|
||||
setInputDataBlock(pOperator, pIntervalInfo->pCtx, pBlock, pQueryAttr->order.order);
|
||||
setIntervalQueryRange(pRuntimeEnv, pBlock->info.window.skey);
|
||||
|
||||
hashAllIntervalAgg(pOperator, &pTableQueryInfo->resInfo, pBlock, pTableQueryInfo->groupIndex);
|
||||
}
|
||||
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
pQueryAttr->order.order = order; // TODO : restore the order
|
||||
doCloseAllTimeWindow(pRuntimeEnv);
|
||||
setQueryStatus(pRuntimeEnv, QUERY_COMPLETED);
|
||||
|
||||
copyToSDataBlock(pRuntimeEnv, 3000, pIntervalInfo->pRes, pIntervalInfo->rowCellInfoOffset);
|
||||
if (pIntervalInfo->pRes->info.rows == 0 || !hasRemainData(&pRuntimeEnv->groupResInfo)) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
}
|
||||
|
||||
return pIntervalInfo->pRes;
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) {
|
||||
SQueryRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv;
|
||||
STableQueryInfo* item = pRuntimeEnv->current;
|
||||
|
@ -6524,7 +6588,6 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRu
|
|||
pOperator->operatorType = OP_AllMultiTableTimeInterval;
|
||||
pOperator->blockingOptr = true;
|
||||
pOperator->status = OP_IN_EXECUTING;
|
||||
pOperator->upstream = upstream;
|
||||
pOperator->pExpr = pExpr;
|
||||
pOperator->numOfOutput = numOfOutput;
|
||||
pOperator->info = pInfo;
|
||||
|
@ -6533,6 +6596,8 @@ SOperatorInfo* createAllMultiTableTimeIntervalOperatorInfo(SQueryRuntimeEnv* pRu
|
|||
pOperator->exec = doAllSTableIntervalAgg;
|
||||
pOperator->cleanup = destroyBasicOperatorInfo;
|
||||
|
||||
appendUpstream(pOperator, upstream);
|
||||
|
||||
return pOperator;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue