fix bugs found in regression test.
This commit is contained in:
parent
05e8b2ab7e
commit
acc083b2b2
|
@ -74,7 +74,7 @@ static void validateTimestampForSupplementResult(SQueryRuntimeEnv *pRuntimeEn
|
||||||
static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid);
|
static void getBasicCacheInfoSnapshot(SQuery *pQuery, SCacheInfo *pCacheInfo, int32_t vid);
|
||||||
static TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn);
|
static TSKEY getQueryPositionForCacheInvalid(SQueryRuntimeEnv *pRuntimeEnv, __block_search_fn_t searchFn);
|
||||||
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
|
static bool functionNeedToExecute(SQueryRuntimeEnv *pRuntimeEnv, SQLFunctionCtx *pCtx, int32_t functionId);
|
||||||
static void getNextTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pTimeWindow);
|
static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow);
|
||||||
|
|
||||||
static int32_t getGroupResultId(int32_t groupIndex) {
|
static int32_t getGroupResultId(int32_t groupIndex) {
|
||||||
int32_t base = 200000;
|
int32_t base = 200000;
|
||||||
|
@ -1024,28 +1024,33 @@ SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv *pRuntimeEnv, void *pBlock, int32_
|
||||||
* @return TRUE means query not completed, FALSE means query is completed
|
* @return TRUE means query not completed, FALSE means query is completed
|
||||||
*/
|
*/
|
||||||
static bool queryPausedInCurrentBlock(SQuery *pQuery, SBlockInfo *pBlockInfo, int32_t forwardStep) {
|
static bool queryPausedInCurrentBlock(SQuery *pQuery, SBlockInfo *pBlockInfo, int32_t forwardStep) {
|
||||||
|
// current query completed
|
||||||
|
if ((pQuery->lastKey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||||
|
(pQuery->lastKey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
|
||||||
|
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// output buffer is full, pause current query
|
||||||
if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) {
|
if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) {
|
||||||
// assert(pQuery->checkBufferInLoop == 1 && pQuery->over == QUERY_RESBUF_FULL && pQuery->pointsOffset == 0);
|
|
||||||
|
|
||||||
assert((QUERY_IS_ASC_QUERY(pQuery) && forwardStep + pQuery->pos <= pBlockInfo->size) ||
|
assert((QUERY_IS_ASC_QUERY(pQuery) && forwardStep + pQuery->pos <= pBlockInfo->size) ||
|
||||||
(!QUERY_IS_ASC_QUERY(pQuery) && pQuery->pos - forwardStep + 1 >= 0));
|
(!QUERY_IS_ASC_QUERY(pQuery) && pQuery->pos - forwardStep + 1 >= 0));
|
||||||
|
|
||||||
// current query completed
|
|
||||||
if ((pQuery->lastKey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
|
||||||
(pQuery->lastKey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
|
|
||||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
} else { // query completed
|
|
||||||
if ((pQuery->ekey <= pBlockInfo->keyLast && QUERY_IS_ASC_QUERY(pQuery)) ||
|
|
||||||
(pQuery->ekey >= pBlockInfo->keyFirst && !QUERY_IS_ASC_QUERY(pQuery))) {
|
|
||||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// query completed
|
||||||
|
if ((pQuery->ekey <= pBlockInfo->keyLast && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||||
|
(pQuery->ekey >= pBlockInfo->keyFirst && !QUERY_IS_ASC_QUERY(pQuery))) {
|
||||||
|
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1242,18 +1247,17 @@ static void *getGenericDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static SBlockInfo getBlockInfo(SQueryRuntimeEnv* pRuntimeEnv) {
|
static SBlockInfo getBlockInfo(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||||
SMeterObj* pMeterObj = pRuntimeEnv->pMeterObj;
|
SMeterObj *pMeterObj = pRuntimeEnv->pMeterObj;
|
||||||
|
|
||||||
void *pBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot);
|
void *pBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot);
|
||||||
assert(pBlock != NULL);
|
assert(pBlock != NULL);
|
||||||
|
|
||||||
int32_t blockType = IS_DISK_DATA_BLOCK(pQuery) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK;
|
int32_t blockType = IS_DISK_DATA_BLOCK(pQuery) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK;
|
||||||
return getBlockBasicInfo(pRuntimeEnv, pBlock, blockType);
|
return getBlockBasicInfo(pRuntimeEnv, pBlock, blockType);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t getFileIdFromKey(int32_t vid, TSKEY key) {
|
static int32_t getFileIdFromKey(int32_t vid, TSKEY key) {
|
||||||
SVnodeObj *pVnode = &vnodeList[vid];
|
SVnodeObj *pVnode = &vnodeList[vid];
|
||||||
int64_t delta = (int64_t)pVnode->cfg.daysPerFile * tsMsPerDay[(uint8_t)pVnode->cfg.precision];
|
int64_t delta = (int64_t)pVnode->cfg.daysPerFile * tsMsPerDay[(uint8_t)pVnode->cfg.precision];
|
||||||
|
@ -1513,6 +1517,15 @@ static STimeWindow getActiveTimeWindow(SWindowResInfo *pWindowResInfo, int64_t t
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(ts >= w.skey && ts <= w.ekey && w.skey != 0);
|
assert(ts >= w.skey && ts <= w.ekey && w.skey != 0);
|
||||||
|
|
||||||
|
// query border check
|
||||||
|
if (w.ekey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) {
|
||||||
|
w.ekey = pQuery->ekey;
|
||||||
|
}
|
||||||
|
if (w.skey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery)) {
|
||||||
|
w.skey = pQuery->ekey;
|
||||||
|
}
|
||||||
|
|
||||||
return w;
|
return w;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1557,7 +1570,7 @@ static int32_t addNewWindowResultBuf(SWindowResult *pWindowRes, SQueryDiskbasedR
|
||||||
|
|
||||||
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, int32_t sid,
|
static int32_t setWindowOutputBufByKey(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowResInfo, int32_t sid,
|
||||||
STimeWindow *win) {
|
STimeWindow *win) {
|
||||||
assert(win->skey < win->ekey);
|
assert(win->skey <= win->ekey);
|
||||||
SQueryDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
|
SQueryDiskbasedResultBuf *pResultBuf = pRuntimeEnv->pResultBuf;
|
||||||
|
|
||||||
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, TSDB_KEYSIZE);
|
SWindowResult *pWindowRes = doSetTimeWindowFromKey(pRuntimeEnv, pWindowResInfo, (char *)&win->skey, TSDB_KEYSIZE);
|
||||||
|
@ -1661,47 +1674,47 @@ static int32_t getNumOfRowsInTimeWindow(SQuery *pQuery, SBlockInfo *pBlockInfo,
|
||||||
TSKEY ekey, __block_search_fn_t searchFn, bool updateLastKey) {
|
TSKEY ekey, __block_search_fn_t searchFn, bool updateLastKey) {
|
||||||
assert(startPos >= 0 && startPos < pBlockInfo->size);
|
assert(startPos >= 0 && startPos < pBlockInfo->size);
|
||||||
|
|
||||||
int32_t forwardStep = -1;
|
int32_t num = -1;
|
||||||
int32_t order = pQuery->order.order;
|
int32_t order = pQuery->order.order;
|
||||||
|
|
||||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(order);
|
||||||
|
|
||||||
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
if (QUERY_IS_ASC_QUERY(pQuery)) {
|
||||||
if (ekey < pBlockInfo->keyLast) {
|
if (ekey < pBlockInfo->keyLast) {
|
||||||
forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, ekey, startPos, order, pPrimaryColumn);
|
num = getForwardStepsInBlock(pBlockInfo->size, searchFn, ekey, startPos, order, pPrimaryColumn);
|
||||||
if (forwardStep == 0) { // no qualified data in current block, do not update the lastKey value
|
if (num == 0) { // no qualified data in current block, do not update the lastKey value
|
||||||
assert(ekey < pPrimaryColumn[startPos]);
|
assert(ekey < pPrimaryColumn[startPos]);
|
||||||
} else {
|
} else {
|
||||||
if (updateLastKey) {
|
if (updateLastKey) {
|
||||||
pQuery->lastKey = MAX(ekey, pPrimaryColumn[startPos + (forwardStep - 1)]) + step;
|
pQuery->lastKey = MAX(ekey, pPrimaryColumn[startPos + (num - 1)]) + step;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
forwardStep = pBlockInfo->size - startPos;
|
num = pBlockInfo->size - startPos;
|
||||||
if (updateLastKey) {
|
if (updateLastKey) {
|
||||||
pQuery->lastKey = pBlockInfo->keyLast + step;
|
pQuery->lastKey = pBlockInfo->keyLast + step;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else { // desc
|
} else { // desc
|
||||||
if (ekey > pBlockInfo->keyFirst) {
|
if (ekey > pBlockInfo->keyFirst) {
|
||||||
forwardStep = getForwardStepsInBlock(pBlockInfo->size, searchFn, ekey, startPos, order, pPrimaryColumn);
|
num = getForwardStepsInBlock(pBlockInfo->size, searchFn, ekey, startPos, order, pPrimaryColumn);
|
||||||
if (forwardStep == 0) { // no qualified data in current block, do not update the lastKey value
|
if (num == 0) { // no qualified data in current block, do not update the lastKey value
|
||||||
assert(ekey > pPrimaryColumn[startPos]);
|
assert(ekey > pPrimaryColumn[startPos]);
|
||||||
} else {
|
} else {
|
||||||
if (updateLastKey) {
|
if (updateLastKey) {
|
||||||
pQuery->lastKey = MIN(ekey, pPrimaryColumn[startPos - (forwardStep - 1)]) + step;
|
pQuery->lastKey = MIN(ekey, pPrimaryColumn[startPos - (num - 1)]) + step;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
forwardStep = startPos + 1;
|
num = startPos + 1;
|
||||||
if (updateLastKey) {
|
if (updateLastKey) {
|
||||||
pQuery->lastKey = pBlockInfo->keyFirst + step;
|
pQuery->lastKey = pBlockInfo->keyFirst + step;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
assert(forwardStep >= 0);
|
assert(num >= 0);
|
||||||
return forwardStep;
|
return num;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus *pStatus, STimeWindow *pWin,
|
static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus *pStatus, STimeWindow *pWin,
|
||||||
|
@ -1723,7 +1736,7 @@ static void doBlockwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStat
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus *pStatus, STimeWindow *pWin) {
|
static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus *pStatus, STimeWindow *pWin, int32_t offset) {
|
||||||
SQuery * pQuery = pRuntimeEnv->pQuery;
|
SQuery * pQuery = pRuntimeEnv->pQuery;
|
||||||
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
SQLFunctionCtx *pCtx = pRuntimeEnv->pCtx;
|
||||||
|
|
||||||
|
@ -1733,7 +1746,7 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus
|
||||||
|
|
||||||
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
|
int32_t functionId = pQuery->pSelectExpr[k].pBase.functionId;
|
||||||
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
|
if (functionNeedToExecute(pRuntimeEnv, &pCtx[k], functionId)) {
|
||||||
aAggs[functionId].xFunction(&pCtx[k]);
|
aAggs[functionId].xFunctionF(&pCtx[k], offset);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1745,14 +1758,14 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
SQuery *pQuery = pRuntimeEnv->pQuery;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
getNextTimeWindow(pRuntimeEnv, pNextWin);
|
getNextTimeWindow(pQuery, pNextWin);
|
||||||
|
|
||||||
if (pWindowResInfo->startTime > pNextWin->skey || (pNextWin->skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
if (pWindowResInfo->startTime > pNextWin->skey || (pNextWin->skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||||
(pNextWin->ekey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
|
(pNextWin->ekey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// next time window not in current block
|
// next time window is not in current block
|
||||||
if ((pNextWin->skey > pBlockInfo->keyLast && QUERY_IS_ASC_QUERY(pQuery)) ||
|
if ((pNextWin->skey > pBlockInfo->keyLast && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||||
(pNextWin->ekey < pBlockInfo->keyFirst && !QUERY_IS_ASC_QUERY(pQuery))) {
|
(pNextWin->ekey < pBlockInfo->keyFirst && !QUERY_IS_ASC_QUERY(pQuery))) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1770,6 +1783,13 @@ static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pNextWin->ekey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) {
|
||||||
|
pNextWin->ekey = pQuery->ekey;
|
||||||
|
}
|
||||||
|
if (pNextWin->skey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery)) {
|
||||||
|
pNextWin->skey = pQuery->ekey;
|
||||||
|
}
|
||||||
|
|
||||||
return startPos;
|
return startPos;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1832,7 +1852,7 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t
|
||||||
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win) != TSDB_CODE_SUCCESS) {
|
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win) != TSDB_CODE_SUCCESS) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
TSKEY ekey = QUERY_IS_ASC_QUERY(pQuery) ? win.ekey : win.skey;
|
TSKEY ekey = QUERY_IS_ASC_QUERY(pQuery) ? win.ekey : win.skey;
|
||||||
forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, pQuery->pos, ekey, searchFn, false);
|
forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, pQuery->pos, ekey, searchFn, false);
|
||||||
|
|
||||||
|
@ -2029,7 +2049,7 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
|
||||||
|
|
||||||
int32_t numOfClosed = numOfClosedTimeWindow(pWindowResInfo);
|
int32_t numOfClosed = numOfClosedTimeWindow(pWindowResInfo);
|
||||||
assert(num >= 0 && num <= numOfClosed);
|
assert(num >= 0 && num <= numOfClosed);
|
||||||
|
|
||||||
for (int32_t i = 0; i < num; ++i) {
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
SWindowResult *pResult = &pWindowResInfo->pResult[i];
|
SWindowResult *pResult = &pWindowResInfo->pResult[i];
|
||||||
if (pResult->status.closed) { // remove the window slot from hash table
|
if (pResult->status.closed) { // remove the window slot from hash table
|
||||||
|
@ -2038,34 +2058,34 @@ void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t remain = pWindowResInfo->size - num;
|
int32_t remain = pWindowResInfo->size - num;
|
||||||
|
|
||||||
// clear all the closed windows from the window list
|
// clear all the closed windows from the window list
|
||||||
for (int32_t k = 0; k < remain; ++k) {
|
for (int32_t k = 0; k < remain; ++k) {
|
||||||
copyTimeWindowResBuf(pRuntimeEnv, &pWindowResInfo->pResult[k], &pWindowResInfo->pResult[num + k]);
|
copyTimeWindowResBuf(pRuntimeEnv, &pWindowResInfo->pResult[k], &pWindowResInfo->pResult[num + k]);
|
||||||
}
|
}
|
||||||
|
|
||||||
// move the unclosed window in the front of the window list
|
// move the unclosed window in the front of the window list
|
||||||
for (int32_t k = remain; k < pWindowResInfo->size; ++k) {
|
for (int32_t k = remain; k < pWindowResInfo->size; ++k) {
|
||||||
SWindowResult *pWindowRes = &pWindowResInfo->pResult[k];
|
SWindowResult *pWindowRes = &pWindowResInfo->pResult[k];
|
||||||
clearTimeWindowResBuf(pRuntimeEnv, pWindowRes);
|
clearTimeWindowResBuf(pRuntimeEnv, pWindowRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
pWindowResInfo->size = remain;
|
pWindowResInfo->size = remain;
|
||||||
|
|
||||||
for (int32_t k = 0; k < pWindowResInfo->size; ++k) {
|
for (int32_t k = 0; k < pWindowResInfo->size; ++k) {
|
||||||
SWindowResult *pResult = &pWindowResInfo->pResult[k];
|
SWindowResult *pResult = &pWindowResInfo->pResult[k];
|
||||||
int32_t *p = (int32_t *)taosGetDataFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey,
|
int32_t *p = (int32_t *)taosGetDataFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey,
|
||||||
TSDB_KEYSIZE);
|
TSDB_KEYSIZE);
|
||||||
int32_t v = (*p - remain);
|
int32_t v = (*p - remain);
|
||||||
|
|
||||||
// todo add the update function for hash table
|
// todo add the update function for hash table
|
||||||
taosDeleteFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE);
|
taosDeleteFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE);
|
||||||
taosAddToHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE, (char *)&v,
|
taosAddToHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE, (char *)&v,
|
||||||
sizeof(int32_t));
|
sizeof(int32_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
pWindowResInfo->curIndex = -1;
|
pWindowResInfo->curIndex = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2074,7 +2094,7 @@ void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0) {
|
if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t numOfClosed = numOfClosedTimeWindow(pWindowResInfo);
|
int32_t numOfClosed = numOfClosedTimeWindow(pWindowResInfo);
|
||||||
clearFirstNTimeWindow(pRuntimeEnv, numOfClosed);
|
clearFirstNTimeWindow(pRuntimeEnv, numOfClosed);
|
||||||
}
|
}
|
||||||
|
@ -2297,35 +2317,34 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t *
|
||||||
offset -= pCtx[0].startOffset;
|
offset -= pCtx[0].startOffset;
|
||||||
|
|
||||||
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
|
SWindowStatus *pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
|
||||||
doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &win);
|
doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &win, offset);
|
||||||
|
|
||||||
lastKey = ts;
|
lastKey = ts;
|
||||||
int32_t prev = pWindowResInfo->curIndex;
|
|
||||||
STimeWindow nextWin = win;
|
STimeWindow nextWin = win;
|
||||||
|
int32_t index = pWindowResInfo->curIndex;
|
||||||
|
int32_t sid = pRuntimeEnv->pMeterObj->sid;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
getNextTimeWindow(pRuntimeEnv, &nextWin);
|
getNextTimeWindow(pQuery, &nextWin);
|
||||||
if (pWindowResInfo->startTime > nextWin.skey || (nextWin.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
if (pWindowResInfo->startTime > nextWin.skey || (nextWin.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||||
(nextWin.skey > pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))) {
|
(nextWin.skey > pQuery->skey && !QUERY_IS_ASC_QUERY(pQuery))) {
|
||||||
pWindowResInfo->curIndex = prev;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ts >= nextWin.skey && ts <= nextWin.ekey) {
|
if (ts < nextWin.skey || ts > nextWin.ekey) {
|
||||||
// null data, failed to allocate more memory buffer
|
|
||||||
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &nextWin) !=
|
|
||||||
TSDB_CODE_SUCCESS) {
|
|
||||||
pWindowResInfo->curIndex = prev;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
|
|
||||||
doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin);
|
|
||||||
} else {
|
|
||||||
pWindowResInfo->curIndex = prev;
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// null data, failed to allocate more memory buffer
|
||||||
|
if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, sid, &nextWin) != TSDB_CODE_SUCCESS) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
pStatus = getTimeWindowResStatus(pWindowResInfo, curTimeWindow(pWindowResInfo));
|
||||||
|
doRowwiseApplyFunctions(pRuntimeEnv, pStatus, &nextWin, offset);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pWindowResInfo->curIndex = index;
|
||||||
} else { // other queries
|
} else { // other queries
|
||||||
// decide which group this rows belongs to according to current state value
|
// decide which group this rows belongs to according to current state value
|
||||||
if (groupbyStateValue) {
|
if (groupbyStateValue) {
|
||||||
|
@ -2454,7 +2473,12 @@ static int32_t applyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *
|
||||||
|
|
||||||
TSKEY lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? pBlockInfo->keyLast : pBlockInfo->keyFirst;
|
TSKEY lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? pBlockInfo->keyLast : pBlockInfo->keyFirst;
|
||||||
doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
|
doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo);
|
||||||
|
|
||||||
|
// interval query with limit applied
|
||||||
|
if (pQuery->intervalTime > 0 && pQuery->limit.limit > 0 && pQuery->limit.limit <= numOfClosedTimeWindow(pWindowResInfo)) {
|
||||||
|
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||||
|
}
|
||||||
|
|
||||||
assert(*numOfRes >= 0);
|
assert(*numOfRes >= 0);
|
||||||
|
|
||||||
// check if buffer is large enough for accommodating all qualified points
|
// check if buffer is large enough for accommodating all qualified points
|
||||||
|
@ -4025,7 +4049,7 @@ static int32_t doSkipDataBlock(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
assert(pBlock != NULL);
|
assert(pBlock != NULL);
|
||||||
|
|
||||||
SBlockInfo blockInfo = getBlockInfo(pRuntimeEnv);
|
SBlockInfo blockInfo = getBlockInfo(pRuntimeEnv);
|
||||||
|
|
||||||
int32_t maxReads = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.size - pQuery->pos : pQuery->pos + 1;
|
int32_t maxReads = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.size - pQuery->pos : pQuery->pos + 1;
|
||||||
assert(maxReads >= 0);
|
assert(maxReads >= 0);
|
||||||
|
|
||||||
|
@ -4055,7 +4079,7 @@ void forwardQueryStartPosition(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
|
|
||||||
void *pBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot);
|
void *pBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot);
|
||||||
assert(pBlock != NULL);
|
assert(pBlock != NULL);
|
||||||
|
|
||||||
SBlockInfo blockInfo = getBlockInfo(pRuntimeEnv);
|
SBlockInfo blockInfo = getBlockInfo(pRuntimeEnv);
|
||||||
|
|
||||||
// get the qualified data that can be skipped
|
// get the qualified data that can be skipped
|
||||||
|
@ -4093,9 +4117,9 @@ static bool forwardQueryStartPosIfNeeded(SQInfo *pQInfo, STableQuerySupportObj *
|
||||||
if (pQuery->intervalTime > 0) {
|
if (pQuery->intervalTime > 0) {
|
||||||
int16_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
int16_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||||
__block_search_fn_t searchFn = vnodeSearchKeyFunc[pRuntimeEnv->pMeterObj->searchAlgorithm];
|
__block_search_fn_t searchFn = vnodeSearchKeyFunc[pRuntimeEnv->pMeterObj->searchAlgorithm];
|
||||||
SWindowResInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
SWindowResInfo * pWindowResInfo = &pRuntimeEnv->windowResInfo;
|
||||||
|
|
||||||
TSKEY* primaryKey = (TSKEY*) pRuntimeEnv->primaryColBuffer->data;
|
TSKEY * primaryKey = (TSKEY *)pRuntimeEnv->primaryColBuffer->data;
|
||||||
STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQuery);
|
STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQuery);
|
||||||
|
|
||||||
while (pQuery->limit.offset > 0) {
|
while (pQuery->limit.offset > 0) {
|
||||||
|
@ -4110,7 +4134,7 @@ static bool forwardQueryStartPosIfNeeded(SQInfo *pQInfo, STableQuerySupportObj *
|
||||||
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) {
|
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// next block does not included in time range, abort query
|
// next block does not included in time range, abort query
|
||||||
blockInfo = getBlockInfo(pRuntimeEnv);
|
blockInfo = getBlockInfo(pRuntimeEnv);
|
||||||
if ((blockInfo.keyFirst > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
if ((blockInfo.keyFirst > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||||
|
@ -4118,7 +4142,7 @@ static bool forwardQueryStartPosIfNeeded(SQInfo *pQInfo, STableQuerySupportObj *
|
||||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
// set the window that start from the next data block
|
// set the window that start from the next data block
|
||||||
win = getActiveTimeWindow(pWindowResInfo, blockInfo.keyFirst, pQuery);
|
win = getActiveTimeWindow(pWindowResInfo, blockInfo.keyFirst, pQuery);
|
||||||
} else {
|
} else {
|
||||||
|
@ -4127,18 +4151,18 @@ static bool forwardQueryStartPosIfNeeded(SQInfo *pQInfo, STableQuerySupportObj *
|
||||||
if (IS_DISK_DATA_BLOCK(pQuery)) {
|
if (IS_DISK_DATA_BLOCK(pQuery)) {
|
||||||
getTimestampInDiskBlock(pRuntimeEnv, 0);
|
getTimestampInDiskBlock(pRuntimeEnv, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
STimeWindow nextWin = win;
|
STimeWindow nextWin = win;
|
||||||
int32_t startPos =
|
int32_t startPos =
|
||||||
getNextQualifiedWindow(pRuntimeEnv, &nextWin, pWindowResInfo, &blockInfo, primaryKey, searchFn);
|
getNextQualifiedWindow(pRuntimeEnv, &nextWin, pWindowResInfo, &blockInfo, primaryKey, searchFn);
|
||||||
|
|
||||||
if (startPos < 0) { // failed to find the qualified time window
|
if (startPos < 0) { // failed to find the qualified time window
|
||||||
assert((nextWin.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
assert((nextWin.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||||
(nextWin.ekey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery)));
|
(nextWin.ekey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery)));
|
||||||
|
|
||||||
setQueryStatus(pQuery, QUERY_COMPLETED);
|
setQueryStatus(pQuery, QUERY_COMPLETED);
|
||||||
break;
|
break;
|
||||||
} else { // set the abort info
|
} else { // set the abort info
|
||||||
pQuery->pos = startPos;
|
pQuery->pos = startPos;
|
||||||
pQuery->lastKey = primaryKey[startPos];
|
pQuery->lastKey = primaryKey[startPos];
|
||||||
win = nextWin;
|
win = nextWin;
|
||||||
|
@ -4152,7 +4176,7 @@ static bool forwardQueryStartPosIfNeeded(SQInfo *pQInfo, STableQuerySupportObj *
|
||||||
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) {
|
if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
blockInfo = getBlockInfo(pRuntimeEnv);
|
blockInfo = getBlockInfo(pRuntimeEnv);
|
||||||
if ((blockInfo.keyFirst > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
if ((blockInfo.keyFirst > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) ||
|
||||||
(blockInfo.keyLast < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
|
(blockInfo.keyLast < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) {
|
||||||
|
@ -5188,12 +5212,12 @@ static void doHandleDataBlockImpl(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo *pbl
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void getNextTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pTimeWindow) {
|
// previous time window may not be of the same size of pQuery->intervalTime
|
||||||
SQuery *pQuery = pRuntimeEnv->pQuery;
|
static void getNextTimeWindow(SQuery *pQuery, STimeWindow *pTimeWindow) {
|
||||||
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
int32_t factor = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order);
|
||||||
|
|
||||||
pTimeWindow->skey += (pQuery->slidingTime * factor);
|
pTimeWindow->skey += (pQuery->slidingTime * factor);
|
||||||
pTimeWindow->ekey += (pQuery->slidingTime * factor);
|
pTimeWindow->ekey = pTimeWindow->skey + (pQuery->intervalTime - 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
|
@ -5245,16 +5269,18 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) {
|
||||||
int32_t nextPos = accessPos + step;
|
int32_t nextPos = accessPos + step;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* set the next access position, nextPos only required by
|
* set the next access position, nextPos only required when the interval query and projection query
|
||||||
* 1. interval query.
|
* that cause output buffer overflow. When the query is completed, no need to load the next block any more.
|
||||||
* 2. multi-output query that may cause buffer overflow.
|
|
||||||
*/
|
*/
|
||||||
if (nextPos >= blockInfo.size || nextPos < 0) {
|
if (!Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED) && Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) {
|
||||||
moveToNextBlock(pRuntimeEnv, step, searchFn, !LOAD_DATA);
|
if (nextPos >= blockInfo.size || nextPos < 0) {
|
||||||
// slot/pos/fileId is updated in moveToNextBlock function
|
moveToNextBlock(pRuntimeEnv, step, searchFn, !LOAD_DATA);
|
||||||
savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, pQuery->pos);
|
|
||||||
} else {
|
// slot/pos/fileId is updated in moveToNextBlock function
|
||||||
savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, accessPos + step);
|
savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, pQuery->pos);
|
||||||
|
} else {
|
||||||
|
savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, nextPos);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
|
@ -7393,7 +7419,7 @@ static int32_t doCopyToSData(STableQuerySupportObj *pSupporter, SWindowResult *r
|
||||||
if (orderType == TSQL_SO_ASC) {
|
if (orderType == TSQL_SO_ASC) {
|
||||||
startIdx = pSupporter->subgroupIdx;
|
startIdx = pSupporter->subgroupIdx;
|
||||||
step = 1;
|
step = 1;
|
||||||
} else {// desc order copy all data
|
} else { // desc order copy all data
|
||||||
startIdx = totalSubset - pSupporter->subgroupIdx - 1;
|
startIdx = totalSubset - pSupporter->subgroupIdx - 1;
|
||||||
step = -1;
|
step = -1;
|
||||||
}
|
}
|
||||||
|
@ -7424,7 +7450,7 @@ static int32_t doCopyToSData(STableQuerySupportObj *pSupporter, SWindowResult *r
|
||||||
|
|
||||||
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
|
for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) {
|
||||||
int32_t size = pRuntimeEnv->pCtx[j].outputBytes;
|
int32_t size = pRuntimeEnv->pCtx[j].outputBytes;
|
||||||
|
|
||||||
char *out = pQuery->sdata[j]->data + numOfResult * size;
|
char *out = pQuery->sdata[j]->data + numOfResult * size;
|
||||||
char *in = getPosInResultPage(pRuntimeEnv, j, &result[i]);
|
char *in = getPosInResultPage(pRuntimeEnv, j, &result[i]);
|
||||||
memcpy(out, in + oldOffset * size, size * numOfRowsToCopy);
|
memcpy(out, in + oldOffset * size, size * numOfRowsToCopy);
|
||||||
|
|
Loading…
Reference in New Issue