From 7874adece96fe22dea581a82534dc5f8272abfb9 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Tue, 18 Feb 2020 01:30:52 +0800 Subject: [PATCH] fix bugs in handling the sliding query. --- src/system/detail/inc/vnodeQueryImpl.h | 13 +- src/system/detail/src/vnodeQueryImpl.c | 346 +++++++++++----------- src/system/detail/src/vnodeQueryProcess.c | 71 +++-- 3 files changed, 223 insertions(+), 207 deletions(-) diff --git a/src/system/detail/inc/vnodeQueryImpl.h b/src/system/detail/inc/vnodeQueryImpl.h index ba3f7efde8..4ad6e8e78b 100644 --- a/src/system/detail/inc/vnodeQueryImpl.h +++ b/src/system/detail/inc/vnodeQueryImpl.h @@ -56,7 +56,7 @@ typedef enum { * the program will call this function again, if this status is set. * used to transfer from QUERY_RESBUF_FULL */ - QUERY_NOT_COMPLETED = 0x1, + QUERY_NOT_COMPLETED = 0x1u, /* * output buffer is full, so, the next query will be employed, @@ -66,7 +66,7 @@ typedef enum { * this status is only exist in group-by clause and * diff/add/division/multiply/ query. */ - QUERY_RESBUF_FULL = 0x2, + QUERY_RESBUF_FULL = 0x2u, /* * query is over @@ -76,14 +76,13 @@ typedef enum { * 2. when the query range on timestamp is satisfied, it is also denoted as * query_compeleted */ - QUERY_COMPLETED = 0x4, + QUERY_COMPLETED = 0x4u, /* * all data has been scanned, so current search is stopped, * At last, the function will transfer this status to QUERY_COMPLETED */ - QUERY_NO_DATA_TO_CHECK = 0x8, - + QUERY_NO_DATA_TO_CHECK = 0x8u, } vnodeQueryStatus; typedef struct SPointInterpoSupporter { @@ -170,7 +169,7 @@ void disableFunctForSuppleScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order); void enableFunctForMasterScan(SQueryRuntimeEnv* pRuntimeEnv, int32_t order); int32_t mergeMetersResultToOneGroups(STableQuerySupportObj* pSupporter); -void copyFromGroupBuf(SQInfo* pQInfo, SWindowResult* result); +void copyFromWindowResToSData(SQInfo* pQInfo, SWindowResult* result); SBlockInfo getBlockBasicInfo(SQueryRuntimeEnv* pRuntimeEnv, void* pBlock, int32_t blockType); SCacheBlock* getCacheDataBlock(SMeterObj* pMeterObj, SQueryRuntimeEnv* pRuntimeEnv, int32_t slot); @@ -291,6 +290,8 @@ int32_t initWindowResInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRun void cleanupTimeWindowInfo(SWindowResInfo* pWindowResInfo, SQueryRuntimeEnv* pRuntimeEnv); void resetTimeWindowInfo(SQueryRuntimeEnv* pRuntimeEnv, SWindowResInfo* pWindowResInfo); +void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num); + void clearClosedTimeWindow(SQueryRuntimeEnv* pRuntimeEnv); int32_t numOfClosedTimeWindow(SWindowResInfo* pWindowResInfo); void closeTimeWindow(SWindowResInfo* pWindowResInfo, int32_t slot); diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index 5e82c299d3..bd7cd224f1 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -1248,6 +1248,18 @@ static void *getGenericDataBlock(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim } } +static SBlockInfo getBlockInfo(SQueryRuntimeEnv* pRuntimeEnv) { + SQuery *pQuery = pRuntimeEnv->pQuery; + SMeterObj* pMeterObj = pRuntimeEnv->pMeterObj; + + void *pBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); + assert(pBlock != NULL); + + int32_t blockType = IS_DISK_DATA_BLOCK(pQuery) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK; + return getBlockBasicInfo(pRuntimeEnv, pBlock, blockType); +} + + static int32_t getFileIdFromKey(int32_t vid, TSKEY key) { SVnodeObj *pVnode = &vnodeList[vid]; int64_t delta = (int64_t)pVnode->cfg.daysPerFile * tsMsPerDay[(uint8_t)pVnode->cfg.precision]; @@ -1733,6 +1745,41 @@ static void doRowwiseApplyFunctions(SQueryRuntimeEnv *pRuntimeEnv, SWindowStatus } } +static int32_t getNextQualifiedWindow(SQueryRuntimeEnv *pRuntimeEnv, STimeWindow *pNextWin, + SWindowResInfo *pWindowResInfo, SBlockInfo *pBlockInfo, TSKEY *primaryKeys, + __block_search_fn_t searchFn) { + SQuery *pQuery = pRuntimeEnv->pQuery; + + while (1) { + getNextTimeWindow(pRuntimeEnv, pNextWin); + + if (pWindowResInfo->startTime > pNextWin->skey || (pNextWin->skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (pNextWin->ekey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { + return -1; + } + + // next time window not in current block + if ((pNextWin->skey > pBlockInfo->keyLast && QUERY_IS_ASC_QUERY(pQuery)) || + (pNextWin->ekey < pBlockInfo->keyFirst && !QUERY_IS_ASC_QUERY(pQuery))) { + return -1; + } + + TSKEY startKey = QUERY_IS_ASC_QUERY(pQuery) ? pNextWin->skey : pNextWin->ekey; + int32_t startPos = searchFn((char *)primaryKeys, pBlockInfo->size, startKey, pQuery->order.order); + + /* + * This time window does not cover any data, try next time window, + * this case may happen when the time window is too small + */ + if ((primaryKeys[startPos] > pNextWin->ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (primaryKeys[startPos] < pNextWin->skey && !QUERY_IS_ASC_QUERY(pQuery))) { + continue; + } + + return startPos; + } +} + /** * * @param pRuntimeEnv @@ -1791,7 +1838,11 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, pRuntimeEnv->pMeterObj->sid, &win) != TSDB_CODE_SUCCESS) { return 0; } - + + if (win.skey == 1433955937630) { + int32_t k = 1; + } + TSKEY ekey = QUERY_IS_ASC_QUERY(pQuery) ? win.ekey : win.skey; forwardStep = getNumOfRowsInTimeWindow(pQuery, pBlockInfo, primaryKeyCol, pQuery->pos, ekey, searchFn, false); @@ -1802,31 +1853,16 @@ static int32_t blockwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t STimeWindow nextWin = win; while (1) { - getNextTimeWindow(pRuntimeEnv, &nextWin); - - if (pWindowResInfo->startTime > nextWin.skey || (nextWin.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || - (nextWin.ekey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { + int32_t startPos = + getNextQualifiedWindow(pRuntimeEnv, &nextWin, pWindowResInfo, pBlockInfo, primaryKeyCol, searchFn); + if (startPos < 0) { break; } - // next time window not in current block - if ((nextWin.skey > pBlockInfo->keyLast && QUERY_IS_ASC_QUERY(pQuery)) || - (nextWin.ekey < pBlockInfo->keyFirst && !QUERY_IS_ASC_QUERY(pQuery))) { - break; + if (nextWin.skey == 1433955937630) { + int32_t k = 1; } - - TSKEY startKey = QUERY_IS_ASC_QUERY(pQuery) ? nextWin.skey : nextWin.ekey; - int32_t startPos = searchFn((char *)primaryKeyCol, pBlockInfo->size, startKey, pQuery->order.order); - - /* - * This time window does not cover any data, try next time window - * when the time window is too small, this case may happen - */ - if ((primaryKeyCol[startPos] > nextWin.ekey && QUERY_IS_ASC_QUERY(pQuery)) || - (primaryKeyCol[startPos] < nextWin.skey && !QUERY_IS_ASC_QUERY(pQuery))) { - continue; - } - + // null data, failed to allocate more memory buffer int32_t sid = pRuntimeEnv->pMeterObj->sid; if (setWindowOutputBufByKey(pRuntimeEnv, pWindowResInfo, sid, &nextWin) != TSDB_CODE_SUCCESS) { @@ -1999,14 +2035,16 @@ void resetTimeWindowInfo(SQueryRuntimeEnv *pRuntimeEnv, SWindowResInfo *pWindowR pWindowResInfo->prevSKey = 0; } -void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) { +void clearFirstNTimeWindow(SQueryRuntimeEnv *pRuntimeEnv, int32_t num) { SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; - if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0) { + if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0 || num == 0) { return; } - int32_t i = 0; - for (i = 0; i < pWindowResInfo->size; ++i) { + int32_t numOfClosed = numOfClosedTimeWindow(pWindowResInfo); + assert(num >= 0 && num <= numOfClosed); + + for (int32_t i = 0; i < num; ++i) { SWindowResult *pResult = &pWindowResInfo->pResult[i]; if (pResult->status.closed) { // remove the window slot from hash table taosDeleteFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE); @@ -2014,42 +2052,47 @@ void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) { break; } } - - // no window is closed, no need to clear the window list - if (i == 0) { - return; - } - - int32_t unclosed = pWindowResInfo->size - i; - + + int32_t remain = pWindowResInfo->size - num; + // clear all the closed windows from the window list - for (int32_t k = 0; k < unclosed; ++k) { - copyTimeWindowResBuf(pRuntimeEnv, &pWindowResInfo->pResult[k], &pWindowResInfo->pResult[i + k]); + for (int32_t k = 0; k < remain; ++k) { + copyTimeWindowResBuf(pRuntimeEnv, &pWindowResInfo->pResult[k], &pWindowResInfo->pResult[num + k]); } - + // move the unclosed window in the front of the window list - for (int32_t k = unclosed; k < pWindowResInfo->size; ++k) { + for (int32_t k = remain; k < pWindowResInfo->size; ++k) { SWindowResult *pWindowRes = &pWindowResInfo->pResult[k]; clearTimeWindowResBuf(pRuntimeEnv, pWindowRes); } - - pWindowResInfo->size = unclosed; - + + pWindowResInfo->size = remain; + for (int32_t k = 0; k < pWindowResInfo->size; ++k) { SWindowResult *pResult = &pWindowResInfo->pResult[k]; int32_t *p = (int32_t *)taosGetDataFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE); - int32_t v = (*p - i); - + int32_t v = (*p - remain); + // todo add the update function for hash table taosDeleteFromHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE); taosAddToHashTable(pWindowResInfo->hashList, (const char *)&pResult->window.skey, TSDB_KEYSIZE, (char *)&v, sizeof(int32_t)); } - + pWindowResInfo->curIndex = -1; } +void clearClosedTimeWindow(SQueryRuntimeEnv *pRuntimeEnv) { + SWindowResInfo *pWindowResInfo = &pRuntimeEnv->windowResInfo; + if (pWindowResInfo == NULL || pWindowResInfo->capacity == 0 || pWindowResInfo->size == 0) { + return; + } + + int32_t numOfClosed = numOfClosedTimeWindow(pWindowResInfo); + clearFirstNTimeWindow(pRuntimeEnv, numOfClosed); +} + int32_t numOfClosedTimeWindow(SWindowResInfo *pWindowResInfo) { int32_t i = 0; while (i < pWindowResInfo->size && pWindowResInfo->pResult[i].status.closed) { @@ -4031,9 +4074,8 @@ static int32_t doSkipDataBlock(SQueryRuntimeEnv *pRuntimeEnv) { void *pBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); assert(pBlock != NULL); - int32_t blockType = IS_DISK_DATA_BLOCK(pQuery) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK; - SBlockInfo blockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, blockType); - + SBlockInfo blockInfo = getBlockInfo(pRuntimeEnv); + int32_t maxReads = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.size - pQuery->pos : pQuery->pos + 1; assert(maxReads >= 0); @@ -4062,9 +4104,9 @@ void forwardQueryStartPosition(SQueryRuntimeEnv *pRuntimeEnv) { } void *pBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); - - int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK; - SBlockInfo blockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, blockType); + assert(pBlock != NULL); + + SBlockInfo blockInfo = getBlockInfo(pRuntimeEnv); // get the qualified data that can be skipped int32_t maxReads = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.size - pQuery->pos : pQuery->pos + 1; @@ -4099,119 +4141,88 @@ static bool forwardQueryStartPosIfNeeded(SQInfo *pQInfo, STableQuerySupportObj * * not valid. otherwise, we only forward pQuery->limit.offset number of points */ if (pQuery->intervalTime > 0) { - // while (1) { - // /* - // * the skey may not be the aligned start time - // * 1. it is the value of first existed data point, therefore, the range - // * between skey and ekey may be less than the interval value. - // * 2. the ekey may not be the actual end value of time interval, in case of the - // */ - // if (QUERY_IS_ASC_QUERY(pQuery)) { - // pQuery->skey = pQuery->ekey + 1; - // } else { - // pQuery->skey = pQuery->ekey - 1; - // } - // - // // boundary check - // if ((pQuery->skey > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) || - // (pQuery->skey < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery))) { - // setQueryStatus(pQuery, QUERY_COMPLETED); - // - // sem_post(&pQInfo->dataReady); - // pQInfo->over = 1; - // return false; - // } - // - // /* - // * NOTE: the end key must be set the last value, to cover all possible - // * data. Otherwise, it may contain no data with only one interval time range - // */ - // pQuery->ekey = pSupporter->rawEKey; - // pQuery->lastKey = pQuery->skey; - // - // // todo opt performance - // if (normalizedFirstQueryRange(dataInDisk, dataInCache, pSupporter, NULL, NULL) == false) { - // sem_post(&pQInfo->dataReady); // hack for next read for empty return - // pQInfo->over = 1; - // return false; - // } - // - // if (--pQuery->limit.offset == 0) { - // break; - // } - // } - int16_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); __block_search_fn_t searchFn = vnodeSearchKeyFunc[pRuntimeEnv->pMeterObj->searchAlgorithm]; - - STimeWindow win = getActiveTimeWindow(&pRuntimeEnv->windowResInfo, pRuntimeEnv->windowResInfo.prevSKey, pQuery); + SWindowResInfo* pWindowResInfo = &pRuntimeEnv->windowResInfo; + + TSKEY* primaryKey = (TSKEY*) pRuntimeEnv->primaryColBuffer->data; + STimeWindow win = getActiveTimeWindow(pWindowResInfo, pWindowResInfo->prevSKey, pQuery); while (pQuery->limit.offset > 0) { - void *pBlock = getGenericDataBlock(pRuntimeEnv->pMeterObj, pRuntimeEnv, pQuery->slot); + SBlockInfo blockInfo = getBlockInfo(pRuntimeEnv); - int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK; - SBlockInfo blockInfo = getBlockBasicInfo(pRuntimeEnv, pBlock, blockType); - - if (pQuery->ekey < blockInfo.keyLast) { - break; - } - + // time window ended in current data block if (win.ekey <= blockInfo.keyLast) { - while(1) { - getNextTimeWindow(pRuntimeEnv, &win); - - if ((win.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || - (win.ekey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { - break; - } - - TSKEY startKey = QUERY_IS_ASC_QUERY(pQuery) ? win.skey : win.ekey; - int32_t startPos = searchFn((char *)primaryKeyCol, blockInfo.size, startKey, pQuery->order.order); - - /* - * This time window does not cover any data, try next time window - * when the time window is too small, this case may happen - */ - if ((primaryKeyCol[startPos] > win.ekey && QUERY_IS_ASC_QUERY(pQuery)) || - (primaryKeyCol[startPos] < win.skey && !QUERY_IS_ASC_QUERY(pQuery))) { - continue; - } - - break; - } - - if ((win.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || - (win.ekey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { - break; - } - pQuery->limit.offset -= 1; - + if (pQuery->limit.offset == 0) { + int32_t k = 1; + } if (win.ekey == blockInfo.keyLast) { moveToNextBlock(pRuntimeEnv, step, searchFn, false); if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) { break; } + + // next block does not included in time range, abort query + blockInfo = getBlockInfo(pRuntimeEnv); + if ((blockInfo.keyFirst > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (blockInfo.keyLast < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { + setQueryStatus(pQuery, QUERY_COMPLETED); + break; + } + + // set the window that start from the next data block + win = getActiveTimeWindow(pWindowResInfo, blockInfo.keyFirst, pQuery); + } else { + // the time window is closed in current data block, load disk file block into memory to + // check the next time window + if (IS_DISK_DATA_BLOCK(pQuery)) { + getTimestampInDiskBlock(pRuntimeEnv, 0); + } + + STimeWindow nextWin = win; + int32_t startPos = + getNextQualifiedWindow(pRuntimeEnv, &nextWin, pWindowResInfo, &blockInfo, primaryKey, searchFn); + + if (startPos < 0) { // failed to find the qualified time window + assert((nextWin.skey > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (nextWin.ekey < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))); + + setQueryStatus(pQuery, QUERY_COMPLETED); + break; + } else { // set the abort info + pQuery->pos = startPos; + pQuery->lastKey = primaryKey[startPos]; + win = nextWin; + } } - + continue; } - + moveToNextBlock(pRuntimeEnv, step, searchFn, false); if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK)) { break; } - } - - if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK) || pQuery->limit.offset > 0) { - setQueryStatus(pQuery, QUERY_COMPLETED); + blockInfo = getBlockInfo(pRuntimeEnv); + if ((blockInfo.keyFirst > pQuery->ekey && QUERY_IS_ASC_QUERY(pQuery)) || + (blockInfo.keyLast < pQuery->ekey && !QUERY_IS_ASC_QUERY(pQuery))) { + setQueryStatus(pQuery, QUERY_COMPLETED); + break; + } + } + + if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED) || pQuery->limit.offset > 0) { + setQueryStatus(pQuery, QUERY_COMPLETED); + sem_post(&pQInfo->dataReady); // hack for next read for empty return; pQInfo->over = 1; return false; } else { - - + if (IS_DISK_DATA_BLOCK(pQuery)) { + getTimestampInDiskBlock(pRuntimeEnv, 0); + } } } else { // forward the start position for projection query forwardQueryStartPosition(&pSupporter->runtimeEnv); @@ -5290,7 +5301,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { * 1. interval query. * 2. multi-output query that may cause buffer overflow. */ - // if (pQuery->intervalTime > 0 || Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { if (nextPos >= blockInfo.size || nextPos < 0) { moveToNextBlock(pRuntimeEnv, step, searchFn, !LOAD_DATA); // slot/pos/fileId is updated in moveToNextBlock function @@ -5309,10 +5319,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { } // check next block - void *pNextBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); - - int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK; - blockInfo = getBlockBasicInfo(pRuntimeEnv, pNextBlock, blockType); + blockInfo = getBlockInfo(pRuntimeEnv); if ((QUERY_IS_ASC_QUERY(pQuery) && blockInfo.keyFirst > pQuery->ekey) || (!QUERY_IS_ASC_QUERY(pQuery) && blockInfo.keyLast < pQuery->ekey)) { @@ -5330,10 +5337,7 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED | QUERY_NO_DATA_TO_CHECK)) { closeAllTimeWindow(&pRuntimeEnv->windowResInfo); } else if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { // check if window needs to be closed - void *pNextBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); - - int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK; - SBlockInfo blockInfo = getBlockBasicInfo(pRuntimeEnv, pNextBlock, blockType); + SBlockInfo blockInfo = getBlockInfo(pRuntimeEnv); // check if need to close window result or not TSKEY t = (QUERY_IS_ASC_QUERY(pQuery)) ? blockInfo.keyFirst : blockInfo.keyLast; @@ -7796,7 +7800,7 @@ static int32_t getNumOfSubset(STableQuerySupportObj *pSupporter) { SQuery *pQuery = pSupporter->runtimeEnv.pQuery; int32_t totalSubset = 0; - if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->intervalTime > 0 && pQuery->slidingTime > 0)) { + if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || (pQuery->intervalTime > 0)) { totalSubset = numOfClosedTimeWindow(&pSupporter->runtimeEnv.windowResInfo); } else { totalSubset = pSupporter->pSidSet->numOfSubSet; @@ -7805,26 +7809,26 @@ static int32_t getNumOfSubset(STableQuerySupportObj *pSupporter) { return totalSubset; } -static int32_t doCopyFromGroupBuf(STableQuerySupportObj *pSupporter, SWindowResult *result, int32_t orderType) { +static int32_t doCopyToSData(STableQuerySupportObj *pSupporter, SWindowResult *result, int32_t orderType) { SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; SQuery * pQuery = pRuntimeEnv->pQuery; int32_t numOfResult = 0; int32_t startIdx = 0; - int32_t forward = 1; - - dTrace("QInfo:%p start to copy data to dest buf", GET_QINFO_ADDR(pSupporter->runtimeEnv.pQuery)); + int32_t step = -1; + dTrace("QInfo:%p start to copy data from windowResInfo to pQuery buf", GET_QINFO_ADDR(pQuery)); int32_t totalSubset = getNumOfSubset(pSupporter); if (orderType == TSQL_SO_ASC) { startIdx = pSupporter->subgroupIdx; - } else { // desc + step = 1; + } else {// desc order copy all data startIdx = totalSubset - pSupporter->subgroupIdx - 1; - forward = -1; + step = -1; } - for (int32_t i = startIdx; (i < totalSubset) && (i >= 0); i += forward) { + for (int32_t i = startIdx; (i < totalSubset) && (i >= 0); i += step) { if (result[i].numOfRows == 0) { pSupporter->offset = 0; pSupporter->subgroupIdx += 1; @@ -7836,8 +7840,11 @@ static int32_t doCopyFromGroupBuf(STableQuerySupportObj *pSupporter, SWindowResu int32_t numOfRowsToCopy = result[i].numOfRows - pSupporter->offset; int32_t oldOffset = pSupporter->offset; + /* + * current output space is not enough to keep all the result data of this group, only copy partial results + * to SQuery object's result buffer + */ if (numOfRowsToCopy > pQuery->pointsToRead - numOfResult) { - // current output space is not enough for the keep the data of this group numOfRowsToCopy = pQuery->pointsToRead - numOfResult; pSupporter->offset += numOfRowsToCopy; } else { @@ -7846,10 +7853,11 @@ static int32_t doCopyFromGroupBuf(STableQuerySupportObj *pSupporter, SWindowResu } for (int32_t j = 0; j < pQuery->numOfOutputCols; ++j) { - int32_t elemSize = pRuntimeEnv->pCtx[j].outputBytes; - char * outputBuf = pQuery->sdata[j]->data + numOfResult * elemSize; - char * p = getPosInResultPage(pRuntimeEnv, j, &result[i]); - memcpy(outputBuf, p + oldOffset * elemSize, elemSize * numOfRowsToCopy); + int32_t size = pRuntimeEnv->pCtx[j].outputBytes; + + char *out = pQuery->sdata[j]->data + numOfResult * size; + char *in = getPosInResultPage(pRuntimeEnv, j, &result[i]); + memcpy(out, in + oldOffset * size, size * numOfRowsToCopy); } numOfResult += numOfRowsToCopy; @@ -7858,7 +7866,7 @@ static int32_t doCopyFromGroupBuf(STableQuerySupportObj *pSupporter, SWindowResu } } - dTrace("QInfo:%p done copy data to dst buf", GET_QINFO_ADDR(pSupporter->runtimeEnv.pQuery)); + dTrace("QInfo:%p copy data to SQuery buf completed", GET_QINFO_ADDR(pQuery)); #ifdef _DEBUG_VIEW displayInterResult(pQuery->sdata, pQuery, numOfResult); @@ -7867,16 +7875,20 @@ static int32_t doCopyFromGroupBuf(STableQuerySupportObj *pSupporter, SWindowResu } /** - * copyFromGroupBuf support copy data in ascending/descending order + * copyFromWindowResToSData support copy data in ascending/descending order + * For interval query of both super table and table, copy the data in ascending order, since the output results are + * ordered in SWindowResutl already. While handling the group by query for both table and super table, + * all group result are completed already. + * * @param pQInfo * @param result */ -void copyFromGroupBuf(SQInfo *pQInfo, SWindowResult *result) { +void copyFromWindowResToSData(SQInfo *pQInfo, SWindowResult *result) { SQuery * pQuery = &pQInfo->query; STableQuerySupportObj *pSupporter = pQInfo->pTableQuerySupporter; int32_t orderType = (pQuery->pGroupbyExpr != NULL) ? pQuery->pGroupbyExpr->orderType : TSQL_SO_ASC; - int32_t numOfResult = doCopyFromGroupBuf(pSupporter, result, orderType); + int32_t numOfResult = doCopyToSData(pSupporter, result, orderType); pQuery->pointsRead += numOfResult; assert(pQuery->pointsRead <= pQuery->pointsToRead); @@ -7906,7 +7918,6 @@ void applyIntervalQueryOnBlock(STableQuerySupportObj *pSupporter, SMeterDataInfo getNumOfRowsInTimeWindow(pQuery, pBlockInfo, pPrimaryKey, pQuery->pos, pQuery->ekey, searchFn, true); int32_t numOfRes = 0; - int64_t st = taosGetTimestampUs(); if (pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) { numOfRes = rowwiseApplyAllFunctions(pRuntimeEnv, &forwardStep, pFields, pBlockInfo, pWindowResInfo); } else { @@ -7921,9 +7932,6 @@ void applyIntervalQueryOnBlock(STableQuerySupportObj *pSupporter, SMeterDataInfo } } - int64_t e = taosGetTimestampUs() - st; - printf("-------------------------------total result:%d\n", pRuntimeEnv->windowResInfo.size); - int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); if ((pQuery->lastKey > pSupporter->rawEKey && QUERY_IS_ASC_QUERY(pQuery)) || (pQuery->lastKey < pSupporter->rawEKey && !QUERY_IS_ASC_QUERY(pQuery))) { diff --git a/src/system/detail/src/vnodeQueryProcess.c b/src/system/detail/src/vnodeQueryProcess.c index 1fbaa5b5ca..b937cffbbb 100644 --- a/src/system/detail/src/vnodeQueryProcess.c +++ b/src/system/detail/src/vnodeQueryProcess.c @@ -663,7 +663,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { * we need to return it to client in the first place. */ if (pSupporter->subgroupIdx > 0) { - copyFromGroupBuf(pQInfo, pRuntimeEnv->windowResInfo.pResult); + copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); pQInfo->pointsRead += pQuery->pointsRead; if (pQuery->pointsRead > 0) { @@ -814,7 +814,7 @@ static void vnodeSTableSeqProcessor(SQInfo *pQInfo) { pQInfo->pTableQuerySupporter->subgroupIdx = 0; pQuery->pointsRead = 0; - copyFromGroupBuf(pQInfo, pWindowResInfo->pResult); + copyFromWindowResToSData(pQInfo, pWindowResInfo->pResult); } pQInfo->pointsRead += pQuery->pointsRead; @@ -915,7 +915,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { displayInterResult(pQuery->sdata, pQuery, pQuery->sdata[0]->len); #endif } else { - copyFromGroupBuf(pQInfo, pRuntimeEnv->windowResInfo.pResult); + copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); } pQInfo->pointsRead += pQuery->pointsRead; @@ -979,7 +979,7 @@ static void vnodeMultiMeterQueryProcessor(SQInfo *pQInfo) { #endif } } else { // not a interval query - copyFromGroupBuf(pQInfo, pRuntimeEnv->windowResInfo.pResult); + copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); } // handle the limitation of output buffer @@ -1090,7 +1090,6 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter while (1) { initCtxOutputBuf(pRuntimeEnv); - clearClosedTimeWindow(pRuntimeEnv); vnodeScanAllData(pRuntimeEnv); if (isQueryKilled(pQuery)) { @@ -1101,18 +1100,19 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter doFinalizeResult(pRuntimeEnv); - int64_t maxOutput = getNumOfResult(pRuntimeEnv); +// int64_t maxOutput = getNumOfResult(pRuntimeEnv); // here we can ignore the records in case of no interpolation + // todo handle offset, in case of top/bottom interval query if ((pQuery->numOfFilterCols > 0 || pRuntimeEnv->pTSBuf != NULL) && pQuery->limit.offset > 0 && pQuery->interpoType == TSDB_INTERPO_NONE) { // maxOutput <= 0, means current query does not generate any results - // todo handle offset, in case of top/bottom interval query - if (maxOutput > 0) { - pQuery->limit.offset--; - } + int32_t numOfClosed = numOfClosedTimeWindow(&pRuntimeEnv->windowResInfo); + + int32_t c = MIN(numOfClosed, pQuery->limit.offset); + clearFirstNTimeWindow(pRuntimeEnv, c); + pQuery->limit.offset -= c; } else { - // assert(0); // pQuery->pointsRead += maxOutput; // forwardCtxOutputBuf(pRuntimeEnv, maxOutput); } @@ -1126,16 +1126,16 @@ static void vnodeSingleMeterIntervalMainLooper(STableQuerySupportObj *pSupporter break; } - /* - * the scan limitation mechanism is upon here, - * 1. since there is only one(k) record is generated in one scan operation - * 2. remain space is not sufficient for next query output, abort - */ - if ((pQuery->pointsRead % pQuery->pointsToRead == 0 && pQuery->pointsRead != 0) || - ((pQuery->pointsRead + maxOutput) > pQuery->pointsToRead)) { - setQueryStatus(pQuery, QUERY_RESBUF_FULL); - break; - } +// /* +// * the scan limitation mechanism is upon here, +// * 1. since there is only one(k) record is generated in one scan operation +// * 2. remain space is not sufficient for next query output, abort +// */ +// if ((pQuery->pointsRead % pQuery->pointsToRead == 0 && pQuery->pointsRead != 0) || +// ((pQuery->pointsRead + maxOutput) > pQuery->pointsToRead)) { +// setQueryStatus(pQuery, QUERY_RESBUF_FULL); +// break; +// } } } @@ -1154,9 +1154,11 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) { vnodeSingleMeterIntervalMainLooper(pSupporter, pRuntimeEnv); if (pQuery->intervalTime > 0) { - pSupporter->subgroupIdx = 0; + pSupporter->subgroupIdx = 0; // always start from 0 pQuery->pointsRead = 0; - copyFromGroupBuf(pQInfo, pRuntimeEnv->windowResInfo.pResult); + copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); + + clearFirstNTimeWindow(pRuntimeEnv, pSupporter->subgroupIdx); } // the offset is handled at prepare stage if no interpolation involved @@ -1190,7 +1192,7 @@ static void vnodeSingleTableIntervalProcessor(SQInfo *pQInfo) { if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { pSupporter->subgroupIdx = 0; pQuery->pointsRead = 0; - copyFromGroupBuf(pQInfo, pRuntimeEnv->windowResInfo.pResult); + copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); } pQInfo->pointsRead += pQuery->pointsRead; @@ -1220,13 +1222,14 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) { SQuery * pQuery = &pQInfo->query; SMeterObj *pMeterObj = pQInfo->pObj; + STableQuerySupportObj* pSupporter = pQInfo->pTableQuerySupporter; + SQueryRuntimeEnv *pRuntimeEnv = &pSupporter->runtimeEnv; + + assert(pRuntimeEnv->pMeterObj == pMeterObj); dTrace("vid:%d sid:%d id:%s, query thread is created, numOfQueries:%d, QInfo:%p", pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pMeterObj->numOfQueries, pQInfo); - SQueryRuntimeEnv *pRuntimeEnv = &pQInfo->pTableQuerySupporter->runtimeEnv; - assert(pRuntimeEnv->pMeterObj == pMeterObj); - if (vnodeHasRemainResults(pQInfo)) { /* * There are remain results that are not returned due to result interpolation @@ -1258,12 +1261,16 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) { // here we have scan all qualified data in both data file and cache if (Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK | QUERY_COMPLETED)) { // continue to get push data from the group result - if (isGroupbyNormalCol(pQuery->pGroupbyExpr)) { + if (isGroupbyNormalCol(pQuery->pGroupbyExpr) || pQuery->intervalTime > 0) { pQuery->pointsRead = 0; - if (pQInfo->pTableQuerySupporter->subgroupIdx > 0) { - copyFromGroupBuf(pQInfo, pQInfo->pTableQuerySupporter->runtimeEnv.windowResInfo.pResult); + pSupporter->subgroupIdx = 0; // always start from 0 + + if (pRuntimeEnv->windowResInfo.size > 0) { + copyFromWindowResToSData(pQInfo, pRuntimeEnv->windowResInfo.pResult); pQInfo->pointsRead += pQuery->pointsRead; - + + clearFirstNTimeWindow(pRuntimeEnv, pSupporter->subgroupIdx); + if (pQuery->pointsRead > 0) { dTrace("QInfo:%p vid:%d sid:%d id:%s, %d points returned %d from group results, totalRead:%d totalReturn:%d", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQuery->pointsRead, pQInfo->pointsRead, @@ -1281,7 +1288,7 @@ void vnodeSingleTableQuery(SSchedMsg *pMsg) { dTrace("QInfo:%p vid:%d sid:%d id:%s, query over, %d points are returned", pQInfo, pMeterObj->vnode, pMeterObj->sid, pMeterObj->meterId, pQInfo->pointsRead); - vnodePrintQueryStatistics(pQInfo->pTableQuerySupporter); + vnodePrintQueryStatistics(pSupporter); sem_post(&pQInfo->dataReady); vnodeDecRefCount(pQInfo);