fix bugs in sliding query processing
This commit is contained in:
parent
eb39c4bf3f
commit
2a89ac7d7f
|
@ -609,6 +609,10 @@ int32_t parseIntervalClause(SQueryInfo* pQueryInfo, SQuerySQL* pQuerySql) {
|
||||||
|
|
||||||
// for top/bottom + interval query, we do not add additional timestamp column in the front
|
// for top/bottom + interval query, we do not add additional timestamp column in the front
|
||||||
if (isTopBottomQuery(pQueryInfo)) {
|
if (isTopBottomQuery(pQueryInfo)) {
|
||||||
|
if (parseSlidingClause(pQueryInfo, pQuerySql) != TSDB_CODE_SUCCESS) {
|
||||||
|
return TSDB_CODE_INVALID_SQL;
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -282,7 +282,7 @@ void clearGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pOneOutpu
|
||||||
void copyGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult* dst, const SWindowResult* src);
|
void copyGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult* dst, const SWindowResult* src);
|
||||||
|
|
||||||
void resetSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo* pWindowResInfo);
|
void resetSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo* pWindowResInfo);
|
||||||
void clearCompletedSlidingWindows(SQueryRuntimeEnv* pRuntimeEnv);
|
void clearClosedSlidingWindows(SQueryRuntimeEnv* pRuntimeEnv);
|
||||||
int32_t numOfClosedSlidingWindow(SWindowResInfo* pWindowResInfo);
|
int32_t numOfClosedSlidingWindow(SWindowResInfo* pWindowResInfo);
|
||||||
void closeSlidingWindow(SWindowResInfo* pWindowResInfo, int32_t slot);
|
void closeSlidingWindow(SWindowResInfo* pWindowResInfo, int32_t slot);
|
||||||
void closeAllSlidingWindow(SWindowResInfo* pWindowResInfo);
|
void closeAllSlidingWindow(SWindowResInfo* pWindowResInfo);
|
||||||
|
|
|
@ -1010,16 +1010,10 @@ SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv *pRuntimeEnv, void *pBlock, int32_
|
||||||
return blockInfo;
|
return blockInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool checkQueryRangeAgainstNextBlock(SBlockInfo *pBlockInfo, SQueryRuntimeEnv *pRuntimeEnv) {
|
static bool checkQueryRangeAgainstNextBlock(SBlockInfo *pBlockInfo, SQuery *pQuery) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
|
||||||
|
|
||||||
if ((QUERY_IS_ASC_QUERY(pQuery) && pBlockInfo->keyFirst > pQuery->ekey) ||
|
if ((QUERY_IS_ASC_QUERY(pQuery) && pBlockInfo->keyFirst > pQuery->ekey) ||
|
||||||
(!QUERY_IS_ASC_QUERY(pQuery) && pBlockInfo->keyLast < pQuery->ekey)) {
|
(!QUERY_IS_ASC_QUERY(pQuery) && pBlockInfo->keyLast < pQuery->ekey)) {
|
||||||
int32_t pos = QUERY_IS_ASC_QUERY(pQuery) ? 0 : pBlockInfo->size - 1;
|
|
||||||
|
|
||||||
savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, pos);
|
|
||||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1033,7 +1027,7 @@ static bool checkQueryRangeAgainstNextBlock(SBlockInfo *pBlockInfo, SQueryRuntim
|
||||||
* @param forwardStep
|
* @param forwardStep
|
||||||
* @return TRUE means query not completed, FALSE means query is completed
|
* @return TRUE means query not completed, FALSE means query is completed
|
||||||
*/
|
*/
|
||||||
static bool queryCompleteInBlock(SQuery *pQuery, SBlockInfo *pBlockInfo, int32_t forwardStep) {
|
static bool queryPausedInCurrentBlock(SQuery *pQuery, SBlockInfo *pBlockInfo, int32_t forwardStep) {
|
||||||
if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) {
|
if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) {
|
||||||
// assert(pQuery->checkBufferInLoop == 1 && pQuery->over == QUERY_RESBUF_FULL && pQuery->pointsOffset == 0);
|
// assert(pQuery->checkBufferInLoop == 1 && pQuery->over == QUERY_RESBUF_FULL && pQuery->pointsOffset == 0);
|
||||||
|
|
||||||
|
@ -1466,12 +1460,12 @@ static SWindowResult *doSetSlidingWindowFromKey(SQueryRuntimeEnv *pRuntimeEnv, S
|
||||||
// todo
|
// todo
|
||||||
}
|
}
|
||||||
|
|
||||||
pWindowResInfo->capacity = newCap;
|
|
||||||
|
|
||||||
for (int32_t i = pWindowResInfo->capacity; i < newCap; ++i) {
|
for (int32_t i = pWindowResInfo->capacity; i < newCap; ++i) {
|
||||||
SPosInfo pos = {-1, -1};
|
SPosInfo pos = {-1, -1};
|
||||||
createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &pos);
|
createQueryResultInfo(pQuery, &pWindowResInfo->pResult[i], pRuntimeEnv->stableQuery, &pos);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pWindowResInfo->capacity = newCap;
|
||||||
}
|
}
|
||||||
|
|
||||||
// add a new result set for a new group
|
// add a new result set for a new group
|
||||||
|
@ -1490,13 +1484,16 @@ static STimeWindow getActiveSlidingWindow(SWindowResInfo *pWindowResInfo, int64_
|
||||||
w.skey = pWindowResInfo->prevSKey;
|
w.skey = pWindowResInfo->prevSKey;
|
||||||
w.ekey = w.skey + pQuery->intervalTime - 1;
|
w.ekey = w.skey + pQuery->intervalTime - 1;
|
||||||
} else {
|
} else {
|
||||||
int32_t slot = curSlidingWindow(pWindowResInfo);
|
int32_t slot = curSlidingWindow(pWindowResInfo);
|
||||||
STimeWindow *window = &pWindowResInfo->pResult[slot].window;
|
w = pWindowResInfo->pResult[slot].window;
|
||||||
|
}
|
||||||
|
// STimeWindow *window = &pWindowResInfo->pResult[slot].window;
|
||||||
|
|
||||||
if (window->skey <= ts && window->ekey >= ts) {
|
if (w.skey > ts || w.ekey < ts) {
|
||||||
w = *window; // belongs to current active window
|
// if (w.skey <= ts && w.ekey >= ts) {
|
||||||
} else {
|
// w = *window; // belongs to current active window
|
||||||
int64_t st = window->skey;
|
// } else {
|
||||||
|
int64_t st = w.skey;
|
||||||
|
|
||||||
while (st > ts) {
|
while (st > ts) {
|
||||||
st -= pQuery->slidingTime;
|
st -= pQuery->slidingTime;
|
||||||
|
@ -1509,7 +1506,6 @@ static STimeWindow getActiveSlidingWindow(SWindowResInfo *pWindowResInfo, int64_
|
||||||
w.skey = st;
|
w.skey = st;
|
||||||
w.ekey = w.skey + pQuery->intervalTime - 1;
|
w.ekey = w.skey + pQuery->intervalTime - 1;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
assert(ts >= w.skey && ts <= w.ekey);
|
assert(ts >= w.skey && ts <= w.ekey);
|
||||||
return w;
|
return w;
|
||||||
|
@ -1582,6 +1578,7 @@ static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowRes
|
||||||
}
|
}
|
||||||
|
|
||||||
static SWindowStatus *getSlidingWindowStatus(SWindowResInfo *pWindowResInfo, int32_t slot) {
|
static SWindowStatus *getSlidingWindowStatus(SWindowResInfo *pWindowResInfo, int32_t slot) {
|
||||||
|
assert(slot >= 0 && slot < pWindowResInfo->size);
|
||||||
return &pWindowResInfo->pResult[slot].status;
|
return &pWindowResInfo->pResult[slot].status;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1629,8 +1626,14 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// all windows are closed, set the last one to be the skey
|
||||||
|
if (skey == 0) {
|
||||||
|
skey = pWindowResInfo->pResult[pWindowResInfo->size-1].window.skey;
|
||||||
|
}
|
||||||
|
|
||||||
pWindowResInfo->prevSKey = skey;
|
pWindowResInfo->prevSKey = skey;
|
||||||
|
assert(skey != 0);
|
||||||
|
|
||||||
// the number of completed slots are larger than the threshold, dump to client immediately.
|
// the number of completed slots are larger than the threshold, dump to client immediately.
|
||||||
int32_t v = numOfClosedSlidingWindow(pWindowResInfo);
|
int32_t v = numOfClosedSlidingWindow(pWindowResInfo);
|
||||||
if (v > pWindowResInfo->threshold) {
|
if (v > pWindowResInfo->threshold) {
|
||||||
|
@ -1639,6 +1642,8 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey,
|
||||||
|
|
||||||
dTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, v);
|
dTrace("QInfo:%p total window:%d, closed:%d", GET_QINFO_ADDR(pQuery), pWindowResInfo->size, v);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert(pWindowResInfo->prevSKey != 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1696,8 +1701,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
|
||||||
TSKEY ts = primaryKeyCol[offset];
|
TSKEY ts = primaryKeyCol[offset];
|
||||||
|
|
||||||
STimeWindow win = getActiveSlidingWindow(pWindowResInfo, ts, pQuery);
|
STimeWindow win = getActiveSlidingWindow(pWindowResInfo, ts, pQuery);
|
||||||
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win);
|
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win) != TSDB_CODE_SUCCESS) {
|
||||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1715,24 +1719,28 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
|
||||||
forwardStep = pQuery->pos + 1;
|
forwardStep = pQuery->pos + 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
|
SWindowStatus* pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo));
|
||||||
pCtx[k].nStartQueryTimestamp = win.skey;
|
|
||||||
pCtx[k].size = forwardStep;
|
if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) {
|
||||||
pCtx[k].startOffset = pQuery->pos;
|
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
|
||||||
|
pCtx[k].nStartQueryTimestamp = win.skey;
|
||||||
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
|
pCtx[k].size = forwardStep;
|
||||||
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
|
pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery)) ? pQuery->pos : pQuery->pos - (forwardStep - 1);
|
||||||
aAggs[functionId].xFunction(&pCtx[k]);
|
|
||||||
|
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
|
||||||
|
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
|
||||||
|
aAggs[functionId].xFunction(&pCtx[k]);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t index = pWindowResInfo->curIndex;
|
int32_t index = pWindowResInfo->curIndex;
|
||||||
STimeWindow nextWin = win;
|
STimeWindow nextWin = win;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
getNextLogicalQueryRange(pRuntimeEnv, &nextWin);
|
getNextLogicalQueryRange(pRuntimeEnv, &nextWin);
|
||||||
|
|
||||||
if (pWindowResInfo->startTime > nextWin.skey || (nextWin.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
if (pWindowResInfo->startTime > nextWin.skey || (nextWin.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||||
(nextWin.ekey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
|
(nextWin.ekey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
|
||||||
pWindowResInfo->curIndex = index;
|
pWindowResInfo->curIndex = index;
|
||||||
|
@ -1782,23 +1790,21 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
|
||||||
forwardStep = startPos + 1;
|
forwardStep = startPos + 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo));
|
||||||
|
|
||||||
|
if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) {
|
||||||
|
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
|
||||||
|
pCtx[k].nStartQueryTimestamp = nextWin.skey;
|
||||||
|
pCtx[k].size = forwardStep;
|
||||||
|
pCtx[k].startOffset = (QUERY_IS_ASC_QUERY(pQuery))? startPos : startPos - (forwardStep - 1);
|
||||||
|
|
||||||
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
|
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
|
||||||
pCtx[k].nStartQueryTimestamp = nextWin.skey;
|
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
|
||||||
pCtx[k].size = forwardStep;
|
aAggs[functionId].xFunction(&pCtx[k]);
|
||||||
pCtx[k].startOffset = startPos;
|
}
|
||||||
|
|
||||||
SWindowStatus *pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo));
|
|
||||||
if (!IS_MASTER_SCAN(pRuntimeEnv) && !pStatus->closed) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
|
|
||||||
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
|
|
||||||
aAggs[functionId].xFunction(&pCtx[k]);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// } else {
|
// } else {
|
||||||
// pWindowResInfo->curIndex = index;
|
// pWindowResInfo->curIndex = index;
|
||||||
// break;
|
// break;
|
||||||
|
@ -1941,8 +1947,8 @@ void resetSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWind
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
|
for (int32_t i = 0; i < pWindowResInfo->size; ++i) {
|
||||||
SWindowResult *pOneRes = &pWindowResInfo->pResult[i];
|
SWindowResult *pWindowRes = &pWindowResInfo->pResult[i];
|
||||||
clearGroupResultBuf(pRuntimeEnv, pOneRes);
|
clearGroupResultBuf(pRuntimeEnv, pWindowRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
pWindowResInfo->curIndex = -1;
|
pWindowResInfo->curIndex = -1;
|
||||||
|
@ -1956,7 +1962,7 @@ void resetSlidingWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWind
|
||||||
pWindowResInfo->prevSKey = 0;
|
pWindowResInfo->prevSKey = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void clearCompletedSlidingWindows(SQueryRuntimeEnv *pRuntimeEnv) {
|
void clearClosedSlidingWindows(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||||
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0) {
|
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0) {
|
||||||
return;
|
return;
|
||||||
|
@ -1972,33 +1978,34 @@ void clearCompletedSlidingWindows(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// no window is closed, no need to clear the window list
|
||||||
if (i == 0) {
|
if (i == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t remain = pWindowResInfo->size - i;
|
int32_t unclosed = pWindowResInfo->size - i;
|
||||||
|
|
||||||
// clear remain list
|
// clear all the closed windows from the window list
|
||||||
for (int32_t k = 0; k < remain; ++k) {
|
for (int32_t k = 0; k < unclosed; ++k) {
|
||||||
copyGroupResultBuf(pRuntimeEnv, &pWindowResInfo->pResult[k], &pWindowResInfo->pResult[i + k]);
|
copyGroupResultBuf(pRuntimeEnv, &pWindowResInfo->pResult[k], &pWindowResInfo->pResult[i + k]);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t k = remain; k < pWindowResInfo->size; ++k) {
|
// move the unclosed window in the front of the window list
|
||||||
SWindowResult *pOneRes = &pWindowResInfo->pResult[k];
|
for (int32_t k = unclosed; k < pWindowResInfo->size; ++k) {
|
||||||
clearGroupResultBuf(pRuntimeEnv, pOneRes);
|
SWindowResult *pWindowRes = &pWindowResInfo->pResult[k];
|
||||||
|
clearGroupResultBuf(pRuntimeEnv, pWindowRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
pWindowResInfo->size = remain;
|
pWindowResInfo->size = unclosed;
|
||||||
|
|
||||||
for (int32_t k = 0; k < pWindowResInfo->size; ++k) {
|
for (int32_t k = 0; k < pWindowResInfo->size; ++k) {
|
||||||
SWindowResult *pResult = &pWindowResInfo->pResult[k];
|
SWindowResult *pResult = &pWindowResInfo->pResult[k];
|
||||||
int32_t *p = (int32_t *)taosGetDataFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey,
|
int32_t *p = (int32_t *)taosGetDataFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey,
|
||||||
TSDB_KEYSIZE);
|
TSDB_KEYSIZE);
|
||||||
int32_t v = *p;
|
int32_t v = (*p - i);
|
||||||
v = (v - i);
|
|
||||||
|
//todo add the update function for hash table
|
||||||
taosDeleteFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE);
|
taosDeleteFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE);
|
||||||
|
|
||||||
taosAddToHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE, (char *)&v,
|
taosAddToHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE, (char *)&v,
|
||||||
sizeof(int32_t));
|
sizeof(int32_t));
|
||||||
}
|
}
|
||||||
|
@ -2213,7 +2220,7 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
|
||||||
// decide the time window according to the primary timestamp
|
// decide the time window according to the primary timestamp
|
||||||
int64_t ts = primaryKeyCol[offset];
|
int64_t ts = primaryKeyCol[offset];
|
||||||
STimeWindow win = getActiveSlidingWindow(pWindowResInfo, ts, pQuery);
|
STimeWindow win = getActiveSlidingWindow(pWindowResInfo, ts, pQuery);
|
||||||
|
|
||||||
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win);
|
int32_t ret = setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win);
|
||||||
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
if (ret != TSDB_CODE_SUCCESS) { // null data, too many state code
|
||||||
continue;
|
continue;
|
||||||
|
@ -2221,33 +2228,29 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
|
||||||
|
|
||||||
// all startOffset are identical
|
// all startOffset are identical
|
||||||
offset -= pCtx[0].startOffset;
|
offset -= pCtx[0].startOffset;
|
||||||
|
|
||||||
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
|
SWindowStatus *pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo));
|
||||||
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
|
if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) {
|
||||||
pCtx[k].nStartQueryTimestamp = win.skey;
|
|
||||||
|
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
|
||||||
SWindowStatus *pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo));
|
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
|
||||||
|
pCtx[k].nStartQueryTimestamp = win.skey;
|
||||||
if (!IS_MASTER_SCAN(pRuntimeEnv) && !pStatus->closed) {
|
|
||||||
// qTrace("QInfo:%p not completed in supplementary scan, ignore funcId:%d, window:%lld-%lld",
|
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
|
||||||
// GET_QINFO_ADDR(pQuery), functionId, pStatus->window.skey, pStatus->window.ekey);
|
aAggs[functionId].xFunctionF(&pCtx[k], offset);
|
||||||
continue;
|
}
|
||||||
}
|
|
||||||
|
|
||||||
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
|
|
||||||
aAggs[functionId].xFunctionF(&pCtx[k], offset);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
lastKey = ts;
|
lastKey = ts;
|
||||||
int32_t index = pWindowResInfo->curIndex;
|
int32_t prev = pWindowResInfo->curIndex;
|
||||||
|
|
||||||
STimeWindow nextWin = win;
|
STimeWindow nextWin = win;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
getNextLogicalQueryRange(pRuntimeEnv, &nextWin);
|
getNextLogicalQueryRange(pRuntimeEnv, &nextWin);
|
||||||
if (pWindowResInfo->startTime > nextWin.skey || (nextWin.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
if (pWindowResInfo->startTime > nextWin.skey || (nextWin.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||||
(nextWin.skey > pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))) {
|
(nextWin.skey > pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))) {
|
||||||
pWindowResInfo->curIndex = index;
|
pWindowResInfo->curIndex = prev;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2255,27 +2258,23 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
|
||||||
// null data, failed to allocate more memory buffer
|
// null data, failed to allocate more memory buffer
|
||||||
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &nextWin) !=
|
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &nextWin) !=
|
||||||
TSDB_CODE_SUCCESS) {
|
TSDB_CODE_SUCCESS) {
|
||||||
pWindowResInfo->curIndex = index;
|
pWindowResInfo->curIndex = prev;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo));
|
||||||
|
if (IS_MASTER_SCAN(pRuntimeEnv) || pStatus->closed) {
|
||||||
|
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
|
||||||
|
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
|
||||||
|
pCtx[k].nStartQueryTimestamp = nextWin.skey;
|
||||||
|
|
||||||
for (int32_t k = 0; k < pQuery->numOfOutputCols; ++k) {
|
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
|
||||||
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
|
aAggs[functionId].xFunctionF(&pCtx[k], offset);
|
||||||
pCtx[k].nStartQueryTimestamp = nextWin.skey;
|
}
|
||||||
|
|
||||||
SWindowStatus *pStatus = getSlidingWindowStatus(pWindowResInfo, curSlidingWindow(pWindowResInfo));
|
|
||||||
if (!IS_MASTER_SCAN(pRuntimeEnv) && !pStatus->closed) {
|
|
||||||
// qTrace("QInfo:%p not completed in supplementary scan, ignore funcId:%d, window:%lld-%lld",
|
|
||||||
// GET_QINFO_ADDR(pQuery), functionId, pStatus->window.skey, pStatus->window.ekey);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
|
|
||||||
aAggs[functionId].xFunctionF(&pCtx[k], offset);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pWindowResInfo->curIndex = index;
|
pWindowResInfo->curIndex = prev;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2389,47 +2388,37 @@ static int32_t applyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||||
|
|
||||||
validateQueryRangeAndData(pRuntimeEnv, pPrimaryColumn, pBlockInfo);
|
validateQueryRangeAndData(pRuntimeEnv, pPrimaryColumn, pBlockInfo);
|
||||||
|
|
||||||
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
||||||
if (pQuery->ekey < pBlockInfo->keyLast) {
|
if (pQuery->ekey < pBlockInfo->keyLast) {
|
||||||
forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, pQuery->ekey, pQuery->pos, pQuery->order.order,
|
forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, pQuery->ekey, pQuery->pos, pQuery->order.order,
|
||||||
pPrimaryColumn);
|
pPrimaryColumn);
|
||||||
assert(forwardStep >= 0);
|
if (forwardStep == 0) { // no qualified data in current block, do not update the lastKey value
|
||||||
|
|
||||||
if (forwardStep == 0) {
|
|
||||||
// no qualified data in current block, do not update the lastKey value
|
|
||||||
assert(pQuery->ekey < pPrimaryColumn[pQuery->pos]);
|
assert(pQuery->ekey < pPrimaryColumn[pQuery->pos]);
|
||||||
} else { // todo MAX()!
|
} else {
|
||||||
pQuery->lastKey = pQuery->ekey + step; // pPrimaryColumn[pQuery->pos + (forwardStep - 1)] + step;
|
pQuery->lastKey = MAX(pQuery->ekey, pPrimaryColumn[pQuery->pos + (forwardStep - 1)]) + step;
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
forwardStep = pBlockInfo->size - pQuery->pos;
|
forwardStep = pBlockInfo->size - pQuery->pos;
|
||||||
assert(forwardStep > 0);
|
|
||||||
|
|
||||||
pQuery->lastKey = pBlockInfo->keyLast + step;
|
pQuery->lastKey = pBlockInfo->keyLast + step;
|
||||||
}
|
}
|
||||||
} else { // desc
|
} else { // desc
|
||||||
if (pQuery->ekey > pBlockInfo->keyFirst) {
|
if (pQuery->ekey > pBlockInfo->keyFirst) {
|
||||||
forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, pQuery->ekey, pQuery->pos, pQuery->order.order,
|
forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, pQuery->ekey, pQuery->pos, pQuery->order.order,
|
||||||
pPrimaryColumn);
|
pPrimaryColumn);
|
||||||
assert(forwardStep >= 0);
|
if (forwardStep == 0) { // no qualified data in current block, do not update the lastKey value
|
||||||
|
|
||||||
if (forwardStep == 0) {
|
|
||||||
// no qualified data in current block, do not update the lastKey value
|
|
||||||
assert(pQuery->ekey > pPrimaryColumn[pQuery->pos]);
|
assert(pQuery->ekey > pPrimaryColumn[pQuery->pos]);
|
||||||
} else {
|
} else {
|
||||||
pQuery->lastKey = pQuery->ekey + step; // pPrimaryColumn[pQuery->pos - (forwardStep - 1)] + step;
|
pQuery->lastKey = MIN(pQuery->ekey, pPrimaryColumn[pQuery->pos - (forwardStep - 1)]) + step;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
forwardStep = pQuery->pos + 1;
|
forwardStep = pQuery->pos + 1;
|
||||||
assert(forwardStep > 0);
|
|
||||||
|
|
||||||
pQuery->lastKey = pBlockInfo->keyFirst + step;
|
pQuery->lastKey = pBlockInfo->keyFirst + step;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
assert(forwardStep >= 0);
|
||||||
|
|
||||||
int32_t newForwardStep = reviseForwardSteps(pRuntimeEnv, forwardStep);
|
int32_t newForwardStep = reviseForwardSteps(pRuntimeEnv, forwardStep);
|
||||||
assert(newForwardStep <= forwardStep && newForwardStep >= 0);
|
assert(newForwardStep <= forwardStep && newForwardStep >= 0);
|
||||||
|
@ -4380,8 +4369,8 @@ static int32_t createQueryResultBuffer(SQueryRuntimeEnv *pRuntimeEnv, int32_t nu
|
||||||
// pRuntimeEnv->windowResInfo.pResult = calloc(numOfRows, sizeof(SWindowResult));
|
// pRuntimeEnv->windowResInfo.pResult = calloc(numOfRows, sizeof(SWindowResult));
|
||||||
//
|
//
|
||||||
// for (int32_t k = 0; k < numOfRows; ++k) {
|
// for (int32_t k = 0; k < numOfRows; ++k) {
|
||||||
// SWindowResult *pOneRes = &pRuntimeEnv->windowResInfo.pResult[k];
|
// SWindowResult *pWindowRes = &pRuntimeEnv->windowResInfo.pResult[k];
|
||||||
// pOneRes->nAlloc = 1;
|
// pWindowRes->nAlloc = 1;
|
||||||
//
|
//
|
||||||
// /*
|
// /*
|
||||||
// * for single table top/bottom query, the output for group by normal column, the output rows is
|
// * for single table top/bottom query, the output for group by normal column, the output rows is
|
||||||
|
@ -4391,7 +4380,7 @@ static int32_t createQueryResultBuffer(SQueryRuntimeEnv *pRuntimeEnv, int32_t nu
|
||||||
// assert(pQuery->numOfOutputCols > 1);
|
// assert(pQuery->numOfOutputCols > 1);
|
||||||
//
|
//
|
||||||
// SSqlFunctionExpr *pExpr = &pQuery->pSelectExpr[1];
|
// SSqlFunctionExpr *pExpr = &pQuery->pSelectExpr[1];
|
||||||
// pOneRes->nAlloc = pExpr->pBase.arg[0].argValue.i64;
|
// pWindowRes->nAlloc = pExpr->pBase.arg[0].argValue.i64;
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// if (page == NULL || page->numOfElems >= pRuntimeEnv->numOfRowsPerPage) {
|
// if (page == NULL || page->numOfElems >= pRuntimeEnv->numOfRowsPerPage) {
|
||||||
|
@ -4401,7 +4390,7 @@ static int32_t createQueryResultBuffer(SQueryRuntimeEnv *pRuntimeEnv, int32_t nu
|
||||||
// assert(pageId >= 0);
|
// assert(pageId >= 0);
|
||||||
//
|
//
|
||||||
// SPosInfo posInfo = {.pageId = pageId, .rowId = page->numOfElems};
|
// SPosInfo posInfo = {.pageId = pageId, .rowId = page->numOfElems};
|
||||||
// createQueryResultInfo(pQuery, pOneRes, isSTableQuery, &posInfo);
|
// createQueryResultInfo(pQuery, pWindowRes, isSTableQuery, &posInfo);
|
||||||
// page->numOfElems += 1; // next row is available
|
// page->numOfElems += 1; // next row is available
|
||||||
// }
|
// }
|
||||||
|
|
||||||
|
@ -4563,8 +4552,9 @@ int32_t vnodeQuerySingleMeterPrepare(SQInfo *pQInfo, SMeterObj *pMeterObj, SMete
|
||||||
pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, false);
|
pRuntimeEnv->numOfRowsPerPage = getNumOfRowsInResultPage(pQuery, false);
|
||||||
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) {
|
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) {
|
||||||
int32_t rows = initialNumOfRows(pSupporter);
|
int32_t rows = initialNumOfRows(pSupporter);
|
||||||
|
|
||||||
if ((code = createQueryResultBuffer(pRuntimeEnv, rows, false)) != TSDB_CODE_SUCCESS) {
|
code = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, rows, pQuery->rowSize);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4771,7 +4761,8 @@ int32_t vnodeMultiMeterQueryPrepare(SQInfo *pQInfo, SQuery *pQuery, void *param)
|
||||||
vnodeRecordAllFiles(pQInfo, pMeter->vnode);
|
vnodeRecordAllFiles(pQInfo, pMeter->vnode);
|
||||||
|
|
||||||
if (pQuery->intervalTime == 0 && pQuery->slidingTime <= 0) {
|
if (pQuery->intervalTime == 0 && pQuery->slidingTime <= 0) {
|
||||||
if ((ret = createQueryResultBuffer(pRuntimeEnv, 3, true)) != TSDB_CODE_SUCCESS) {
|
ret = createDiskbasedResultBuffer(&pRuntimeEnv->pResultBuf, 3, pQuery->rowSize);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5194,7 +5185,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
|
|
||||||
cnt += forwardStep;
|
cnt += forwardStep;
|
||||||
|
|
||||||
if (queryCompleteInBlock(pQuery, &blockInfo, forwardStep)) {
|
if (queryPausedInCurrentBlock(pQuery, &blockInfo, forwardStep)) {
|
||||||
int32_t nextPos = accessPos + step;
|
int32_t nextPos = accessPos + step;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -5203,33 +5194,38 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
* 2. multi-output query that may cause buffer overflow.
|
* 2. multi-output query that may cause buffer overflow.
|
||||||
*/
|
*/
|
||||||
if (pQuery->intervalTime > 0 ||
|
if (pQuery->intervalTime > 0 ||
|
||||||
(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL) && pQuery->checkBufferInLoop == 1)) {
|
(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)/* && pQuery->checkBufferInLoop == 1*/)) {
|
||||||
if (nextPos >= blockInfo.size || nextPos < 0) {
|
if (nextPos >= blockInfo.size || nextPos < 0) {
|
||||||
moveToNextBlock(pRuntimeEnv, step, searchFn, !LOAD_DATA);
|
moveToNextBlock(pRuntimeEnv, step, searchFn, !LOAD_DATA);
|
||||||
|
|
||||||
// slot/pos/fileId is updated in moveToNextBlock function
|
if (!Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK|QUERY_COMPLETED)) {
|
||||||
savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, pQuery->pos);
|
// slot/pos/fileId is updated in moveToNextBlock function
|
||||||
|
savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, pQuery->pos);
|
||||||
|
|
||||||
|
// check next block
|
||||||
|
void *pNextBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot);
|
||||||
|
|
||||||
|
int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK;
|
||||||
|
blockInfo = getBlockBasicInfo(pRuntimeEnv, pNextBlock, blockType);
|
||||||
|
|
||||||
|
// check if need to close window result or not
|
||||||
|
if (pQuery->intervalTime > 0 && pQuery->slidingTime > 0) {
|
||||||
|
TSKEY t = (QUERY_IS_ASC_QUERY(pQuery))? blockInfo.keyFirst:blockInfo.keyLast;
|
||||||
|
doCheckQueryCompleted(pRuntimeEnv, t, &pRuntimeEnv->windowResInfo);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, accessPos + step);
|
savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, accessPos + step);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
} else { // query not completed, move to next block
|
} else { // query not completed, move to next block
|
||||||
int64_t start = taosGetTimestampUs();
|
|
||||||
|
|
||||||
blockLoadStatus = moveToNextBlock(pRuntimeEnv, step, searchFn, LOAD_DATA);
|
blockLoadStatus = moveToNextBlock(pRuntimeEnv, step, searchFn, LOAD_DATA);
|
||||||
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) {
|
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) {
|
||||||
savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, pQuery->pos);
|
|
||||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t delta = (taosGetTimestampUs() - start);
|
|
||||||
if (IS_DISK_DATA_BLOCK(pQuery)) {
|
|
||||||
pSummary->fileTimeUs += delta;
|
|
||||||
} else {
|
|
||||||
pSummary->cacheTimeUs += delta;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// check next block
|
// check next block
|
||||||
|
@ -5237,9 +5233,23 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
|
|
||||||
int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK;
|
int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK;
|
||||||
blockInfo = getBlockBasicInfo(pRuntimeEnv, pNextBlock, blockType);
|
blockInfo = getBlockBasicInfo(pRuntimeEnv, pNextBlock, blockType);
|
||||||
if (!checkQueryRangeAgainstNextBlock(&blockInfo, pRuntimeEnv)) {
|
|
||||||
|
if ((QUERY_IS_ASC_QUERY(pQuery) && blockInfo.keyFirst > pQuery->ekey) ||
|
||||||
|
(!QUERY_IS_ASC_QUERY(pQuery) && blockInfo.keyLast < pQuery->ekey)) {
|
||||||
|
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// check if need to close window result or not
|
||||||
|
if (pQuery->intervalTime > 0 && pQuery->slidingTime > 0) {
|
||||||
|
TSKEY t = (QUERY_IS_ASC_QUERY(pQuery))? blockInfo.keyFirst:blockInfo.keyLast;
|
||||||
|
doCheckQueryCompleted(pRuntimeEnv, t, &pRuntimeEnv->windowResInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
if(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
} // while(1)
|
} // while(1)
|
||||||
|
|
||||||
return cnt;
|
return cnt;
|
||||||
|
@ -5914,26 +5924,39 @@ void createQueryResultInfo(SQuery *pQuery, SWindowResult *pResultRow, bool isSTa
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void clearGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pOneOutputRes) {
|
void clearGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *pWindowRes) {
|
||||||
if (pOneOutputRes == NULL) {
|
if (pWindowRes == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) {
|
for (int32_t i = 0; i < pRuntimeEnv->pQuery->numOfOutputCols; ++i) {
|
||||||
SResultInfo *pResultInfo = &pOneOutputRes->resultInfo[i];
|
SResultInfo *pResultInfo = &pWindowRes->resultInfo[i];
|
||||||
|
|
||||||
char * s = getPosInResultPage(pRuntimeEnv, i, pOneOutputRes);
|
char * s = getPosInResultPage(pRuntimeEnv, i, pWindowRes);
|
||||||
size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].resBytes;
|
size_t size = pRuntimeEnv->pQuery->pSelectExpr[i].resBytes;
|
||||||
memset(s, 0, size);
|
memset(s, 0, size);
|
||||||
|
|
||||||
resetResultInfo(pResultInfo);
|
resetResultInfo(pResultInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pWindowRes->numOfRows = 0;
|
||||||
|
pWindowRes->nAlloc = 0;
|
||||||
|
pWindowRes->pos = (SPosInfo){-1, -1};
|
||||||
|
pWindowRes->status.closed = false;
|
||||||
|
pWindowRes->window = (STimeWindow) {0, 0};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The source window result pos attribution of the source window result does not assign to the destination,
|
||||||
|
* since the attribute of "Pos" is bound to each window result when the window result is created in the
|
||||||
|
* disk-based result buffer.
|
||||||
|
*/
|
||||||
void copyGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, const SWindowResult *src) {
|
void copyGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, const SWindowResult *src) {
|
||||||
dst->numOfRows = src->numOfRows;
|
dst->numOfRows = src->numOfRows;
|
||||||
dst->nAlloc = src->nAlloc;
|
dst->nAlloc = src->nAlloc;
|
||||||
|
dst->window = src->window;
|
||||||
|
dst->status = src->status;
|
||||||
|
|
||||||
int32_t nOutputCols = pRuntimeEnv->pQuery->numOfOutputCols;
|
int32_t nOutputCols = pRuntimeEnv->pQuery->numOfOutputCols;
|
||||||
|
|
||||||
for (int32_t i = 0; i < nOutputCols; ++i) {
|
for (int32_t i = 0; i < nOutputCols; ++i) {
|
||||||
|
@ -5956,16 +5979,16 @@ void copyGroupResultBuf(SQueryRuntimeEnv *pRuntimeEnv, SWindowResult *dst, const
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void destroyGroupResultBuf(SWindowResult *pOneOutputRes, int32_t nOutputCols) {
|
void destroyGroupResultBuf(SWindowResult *pWindowRes, int32_t nOutputCols) {
|
||||||
if (pOneOutputRes == NULL) {
|
if (pWindowRes == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < nOutputCols; ++i) {
|
for (int32_t i = 0; i < nOutputCols; ++i) {
|
||||||
free(pOneOutputRes->resultInfo[i].interResultBuf);
|
free(pWindowRes->resultInfo[i].interResultBuf);
|
||||||
}
|
}
|
||||||
|
|
||||||
free(pOneOutputRes->resultInfo);
|
free(pWindowRes->resultInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
|
void resetCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
|
@ -6005,7 +6028,7 @@ void forwardCtxOutputBuf(SQueryRuntimeEnv *pRuntimeEnv, int64_t output) {
|
||||||
|
|
||||||
// set next output position
|
// set next output position
|
||||||
if (IS_OUTER_FORWARD(aAggs[functionId].nStatus)) {
|
if (IS_OUTER_FORWARD(aAggs[functionId].nStatus)) {
|
||||||
pRuntimeEnv->pCtx[j].aOutputBuf += pRuntimeEnv->pCtx[j].outputBytes * output /** factor*/;
|
pRuntimeEnv->pCtx[j].aOutputBuf += pRuntimeEnv->pCtx[j].outputBytes * output;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
|
if (functionId == TSDB_FUNC_TOP || functionId == TSDB_FUNC_BOTTOM) {
|
||||||
|
@ -6239,8 +6262,7 @@ void vnodeScanAllData(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
|
|
||||||
doSingleMeterSupplementScan(pRuntimeEnv);
|
doSingleMeterSupplementScan(pRuntimeEnv);
|
||||||
|
|
||||||
// update the pQuery->skey/pQuery->ekey to limit the scan scope of sliding query during
|
// update the pQuery->skey/pQuery->ekey to limit the scan scope of sliding query during supplementary scan
|
||||||
// supplementary scan
|
|
||||||
pQuery->skey = newSkey;
|
pQuery->skey = newSkey;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -6375,30 +6397,30 @@ void forwardIntervalQueryRange(SMeterQuerySupportObj *pSupporter, SQueryRuntimeE
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t r = getNextIntervalQueryRange(pSupporter, pRuntimeEnv, &pQuery->skey, &pQuery->ekey);
|
// int32_t r = getNextIntervalQueryRange(pSupporter, pRuntimeEnv, &pQuery->skey, &pQuery->ekey);
|
||||||
if (r == QUERY_COMPLETED) {
|
// if (r == QUERY_COMPLETED) {
|
||||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
// setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||||
return;
|
// return;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
getNextLogicalQueryRange(pRuntimeEnv, &pRuntimeEnv->intervalWindow);
|
// getNextLogicalQueryRange(pRuntimeEnv, &pRuntimeEnv->intervalWindow);
|
||||||
|
//
|
||||||
/* ensure the search in cache will return right position */
|
// /* ensure the search in cache will return right position */
|
||||||
pQuery->lastKey = pQuery->skey;
|
// pQuery->lastKey = pQuery->skey;
|
||||||
|
//
|
||||||
TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos);
|
// TSKEY nextTimestamp = loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos);
|
||||||
if ((nextTimestamp > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
// if ((nextTimestamp > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||||
(nextTimestamp < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery)) ||
|
// (nextTimestamp < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||||
Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) {
|
// Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) {
|
||||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
// setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||||
return;
|
// return;
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
// bridge the gap in group by time function
|
// // bridge the gap in group by time function
|
||||||
if ((nextTimestamp > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
// if ((nextTimestamp > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||||
(nextTimestamp < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
|
// (nextTimestamp < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
|
||||||
getAlignedIntervalQueryRange(pRuntimeEnv, nextTimestamp, pSupporter->rawSKey, pSupporter->rawEKey);
|
// getAlignedIntervalQueryRange(pRuntimeEnv, nextTimestamp, pSupporter->rawSKey, pSupporter->rawEKey);
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t offsetComparator(const void *pLeft, const void *pRight) {
|
static int32_t offsetComparator(const void *pLeft, const void *pRight) {
|
||||||
|
|
|
@ -1114,11 +1114,8 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
assert((pQuery->skey <= pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
|
||||||
(pQuery->skey >= pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery)));
|
|
||||||
|
|
||||||
initCtxOutputBuf(pRuntimeEnv);
|
initCtxOutputBuf(pRuntimeEnv);
|
||||||
clearCompletedSlidingWindows(pRuntimeEnv);
|
clearClosedSlidingWindows(pRuntimeEnv);
|
||||||
|
|
||||||
vnodeScanAllData(pRuntimeEnv);
|
vnodeScanAllData(pRuntimeEnv);
|
||||||
if (isQueryKilled(pQuery)) {
|
if (isQueryKilled(pQuery)) {
|
||||||
|
@ -1141,16 +1138,17 @@ static void vnodeSingleMeterIntervalMainLooper(SMeterQuerySupportObj *pSupporter
|
||||||
pQuery->limit.offset--;
|
pQuery->limit.offset--;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pQuery->pointsRead += maxOutput;
|
// assert(0);
|
||||||
forwardCtxOutputBuf(pRuntimeEnv, maxOutput);
|
// pQuery->pointsRead += maxOutput;
|
||||||
|
// forwardCtxOutputBuf(pRuntimeEnv, maxOutput);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) {
|
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
forwardIntervalQueryRange(pSupporter, pRuntimeEnv);
|
loadRequiredBlockIntoMem(pRuntimeEnv, &pRuntimeEnv->nextPos);
|
||||||
if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED|QUERY_RESBUF_FULL)) {
|
if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1180,7 +1178,13 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) {
|
||||||
while (1) {
|
while (1) {
|
||||||
resetCtxOutputBuf(pRuntimeEnv);
|
resetCtxOutputBuf(pRuntimeEnv);
|
||||||
vnodeSingleMeterIntervalMainLooper(pSupporter, pRuntimeEnv);
|
vnodeSingleMeterIntervalMainLooper(pSupporter, pRuntimeEnv);
|
||||||
|
|
||||||
|
if (pQuery->intervalTime > 0) {
|
||||||
|
pSupporter->subgroupIdx = 0;
|
||||||
|
pQuery->pointsRead = 0;
|
||||||
|
copyFromGroupBuf(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
||||||
|
}
|
||||||
|
|
||||||
// the offset is handled at prepare stage if no interpolation involved
|
// the offset is handled at prepare stage if no interpolation involved
|
||||||
if (pQuery->interpoType == TSDB_INTERPO_NONE) {
|
if (pQuery->interpoType == TSDB_INTERPO_NONE) {
|
||||||
doRevisedResultsByLimit(pQInfo);
|
doRevisedResultsByLimit(pQInfo);
|
||||||
|
@ -1208,8 +1212,9 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->slidingTime > 0 && pQuery->intervalTime > 0)) {
|
// all data scanned, the group by normal column can return
|
||||||
pQInfo->pMeterQuerySupporter->subgroupIdx = 0;
|
if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) {
|
||||||
|
pSupporter->subgroupIdx = 0;
|
||||||
pQuery->pointsRead = 0;
|
pQuery->pointsRead = 0;
|
||||||
copyFromGroupBuf(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
copyFromGroupBuf(pQInfo, pRuntimeEnv->windowResInfo.pResult);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue