From d65299430a893301d4a3c6547580761922a67d06 Mon Sep 17 00:00:00 2001 From: hjxilinx Date: Fri, 14 Feb 2020 15:53:59 +0800 Subject: [PATCH] add the test code for hashtable --- src/system/detail/src/vnodeQueryImpl.c | 71 ++++++++++++++++---------- 1 file changed, 44 insertions(+), 27 deletions(-) diff --git a/src/system/detail/src/vnodeQueryImpl.c b/src/system/detail/src/vnodeQueryImpl.c index a05c563315..226c2df35c 100644 --- a/src/system/detail/src/vnodeQueryImpl.c +++ b/src/system/detail/src/vnodeQueryImpl.c @@ -1616,6 +1616,9 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, for (i = 0; i < pWindowResInfo->size; ++i) { SWindowResult *pResult = &pWindowResInfo->pResult[i]; + if (pResult->status.closed) { + continue; + } if ((pResult->window.ekey <= lastKey && QUERY_IS_ASC_QUERY(pQuery)) || (pResult->window.skey >= lastKey && !QUERY_IS_ASC_QUERY(pQuery))) { @@ -1632,7 +1635,6 @@ static void doCheckQueryCompleted(SQueryRuntimeEnv *pRuntimeEnv, TSKEY lastKey, } pWindowResInfo->prevSKey = skey; - assert(skey != 0); // the number of completed slots are larger than the threshold, dump to client immediately. int32_t v = numOfClosedSlidingWindow(pWindowResInfo); @@ -2321,9 +2323,6 @@ static int32_t rowwiseApplyAllFunctions(SQueryRuntimeEnv *pRuntimeEnv, int32_t * free(sasArray); - lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? pBlockInfo->keyLast : pBlockInfo->keyFirst; - doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); - /* * No need to calculate the number of output results for group-by normal columns, interval query * because the results of group by normal column is put into intermediate buffer. @@ -2436,7 +2435,10 @@ static int32_t applyFunctionsOnBlock(SQueryRuntimeEnv *pRuntimeEnv, SBlockInfo * *numOfRes = blockwiseApplyAllFunctions(pRuntimeEnv, newForwardStep, pPrimaryColumn, pFields, pBlockInfo, pWindowResInfo, searchFn); } - + + TSKEY lastKey = (QUERY_IS_ASC_QUERY(pQuery)) ? pBlockInfo->keyLast : pBlockInfo->keyFirst; + doCheckQueryCompleted(pRuntimeEnv, lastKey, pWindowResInfo); + assert(*numOfRes >= 0); // check if buffer is large enough for accommodating all qualified points @@ -5150,7 +5152,6 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { __block_search_fn_t searchFn = vnodeSearchKeyFunc[pMeterObj->searchAlgorithm]; int32_t blockLoadStatus = DISK_DATA_LOADED; - SQueryCostSummary * pSummary = &pRuntimeEnv->summary; int32_t step = GET_FORWARD_DIRECTION_FACTOR(pQuery->order.order); @@ -5193,32 +5194,33 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { * 1. interval query. * 2. multi-output query that may cause buffer overflow. */ - if (pQuery->intervalTime > 0 || - (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)/* && pQuery->checkBufferInLoop == 1*/)) { +// if (pQuery->intervalTime > 0 || Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { if (nextPos >= blockInfo.size || nextPos < 0) { moveToNextBlock(pRuntimeEnv, step, searchFn, !LOAD_DATA); - if (!Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK|QUERY_COMPLETED)) { +// if (!Q_STATUS_EQUAL(pQuery->over, QUERY_NO_DATA_TO_CHECK|QUERY_COMPLETED)) { // slot/pos/fileId is updated in moveToNextBlock function savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, pQuery->pos); - // check next block - void *pNextBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); - - int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK; - blockInfo = getBlockBasicInfo(pRuntimeEnv, pNextBlock, blockType); - - // check if need to close window result or not - if (pQuery->intervalTime > 0 && pQuery->slidingTime > 0) { - TSKEY t = (QUERY_IS_ASC_QUERY(pQuery))? blockInfo.keyFirst:blockInfo.keyLast; - doCheckQueryCompleted(pRuntimeEnv, t, &pRuntimeEnv->windowResInfo); - } - } +// // check next block +// void *pNextBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); +// +// int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK; +// blockInfo = getBlockBasicInfo(pRuntimeEnv, pNextBlock, blockType); +// +// // check if need to close window result or not +// if (pQuery->intervalTime > 0 && pQuery->slidingTime > 0) { +// TSKEY t = (QUERY_IS_ASC_QUERY(pQuery))? blockInfo.keyFirst:blockInfo.keyLast; +// doCheckQueryCompleted(pRuntimeEnv, t, &pRuntimeEnv->windowResInfo); +// } +// } } else { savePointPosition(&pRuntimeEnv->nextPos, pQuery->fileId, pQuery->slot, accessPos + step); } - } +// } else { +// assert(0); +// } break; } else { // query not completed, move to next block blockLoadStatus = moveToNextBlock(pRuntimeEnv, step, searchFn, LOAD_DATA); @@ -5240,17 +5242,32 @@ static int64_t doScanAllDataBlocks(SQueryRuntimeEnv *pRuntimeEnv) { break; } - // check if need to close window result or not - if (pQuery->intervalTime > 0 && pQuery->slidingTime > 0) { - TSKEY t = (QUERY_IS_ASC_QUERY(pQuery))? blockInfo.keyFirst:blockInfo.keyLast; - doCheckQueryCompleted(pRuntimeEnv, t, &pRuntimeEnv->windowResInfo); - } +// // check if need to close window result or not +// if (pQuery->intervalTime > 0 && pQuery->slidingTime > 0) { +// TSKEY t = (QUERY_IS_ASC_QUERY(pQuery))? blockInfo.keyFirst:blockInfo.keyLast; +// doCheckQueryCompleted(pRuntimeEnv, t, &pRuntimeEnv->windowResInfo); +// } if(Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { break; } } // while(1) + + if (pQuery->intervalTime > 0) { + if (Q_STATUS_EQUAL(pQuery->over, QUERY_COMPLETED|QUERY_NO_DATA_TO_CHECK)) { + closeAllSlidingWindow(&pRuntimeEnv->windowResInfo); + } else if (Q_STATUS_EQUAL(pQuery->over, QUERY_RESBUF_FULL)) { // check if window needs to be closed + void *pNextBlock = getGenericDataBlock(pMeterObj, pRuntimeEnv, pQuery->slot); + + int32_t blockType = (IS_DISK_DATA_BLOCK(pQuery)) ? BLK_FILE_BLOCK : BLK_CACHE_BLOCK; + SBlockInfo blockInfo = getBlockBasicInfo(pRuntimeEnv, pNextBlock, blockType); + + // check if need to close window result or not + TSKEY t = (QUERY_IS_ASC_QUERY(pQuery))? blockInfo.keyFirst:blockInfo.keyLast; + doCheckQueryCompleted(pRuntimeEnv, t, &pRuntimeEnv->windowResInfo); + } + } return cnt; }