From 9213011d184c77ecdb6dc2035693726352000200 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 20 Jul 2020 12:32:58 +0800 Subject: [PATCH 1/4] [td-225] add the stop check during scan files --- src/query/inc/qResultbuf.h | 1 - src/tsdb/inc/tsdbMain.h | 1 + src/tsdb/src/tsdbMemTable.c | 3 +-- src/tsdb/src/tsdbRead.c | 38 +++++++++++++++++++++++-------------- 4 files changed, 26 insertions(+), 17 deletions(-) diff --git a/src/query/inc/qResultbuf.h b/src/query/inc/qResultbuf.h index 7df39c00fd..d76659cc91 100644 --- a/src/query/inc/qResultbuf.h +++ b/src/query/inc/qResultbuf.h @@ -87,7 +87,6 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId); * @param id * @return */ -//#define getResBufPage(buf, id) ((tFilePage*)((buf)->pBuf + (buf)->pageSize * (id))) static FORCE_INLINE tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) { if (id < pResultBuf->inMemPages) { return (tFilePage*) ((char*) pResultBuf->iBuf + id * pResultBuf->pageSize); diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 40f2dac660..210d95853c 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -426,6 +426,7 @@ int tsdbUpdateFileHeader(SFile* pFile, uint32_t version); int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo); void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo); void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup); +void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); // ------------------ tsdbRWHelper.c #define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index b29cec3cf9..1b7db635b3 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -31,7 +31,6 @@ static int tsdbCommitMeta(STsdbRepo *pRepo); static void tsdbEndCommit(STsdbRepo *pRepo); static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey); static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols); -static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey); static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo); static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables); @@ -544,7 +543,7 @@ static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSK return 0; } -static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) { +void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) { *minKey = fileId * daysPerFile * tsMsPerDay[precision]; *maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1; } diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 5b1afe3da8..37784577c4 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -128,8 +128,7 @@ typedef struct STsdbQueryHandle { static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle); static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle); -static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock, - SArray* sa); +static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock); static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order); static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win, STsdbQueryHandle* pQueryHandle); @@ -695,7 +694,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* } doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo); - doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn); + doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); } else { /* * no data in cache, only load data from file @@ -711,6 +710,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock* cur->mixBlock = false; cur->blockCompleted = true; cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1); + pCheckInfo->lastKey = cur->lastKey; } } @@ -734,7 +734,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock cur->pos = 0; } - doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn); + doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); } else { // the whole block is loaded in to buffer handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); } @@ -751,7 +751,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock cur->pos = pBlock->numOfRows - 1; } - doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn); + doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock); } else { handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo); } @@ -907,12 +907,12 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap pQueryHandle->cur.win.ekey = tsArray[end]; pQueryHandle->cur.lastKey = tsArray[end] + step; - + return numOfRows + num; } static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row, - STsdbMeta *pMeta, int32_t numOfCols, STable* pTable) { + int32_t numOfCols, STable* pTable) { char* pData = NULL; // the schema version info is embeded in SDataRow @@ -973,8 +973,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, // only return the qualified data to client in terms of query time window, data rows in the same block but do not // be included in the query time window will be discarded -static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock, - SArray* sa) { +static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) { SQueryFilePos* cur = &pQueryHandle->cur; SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock); @@ -987,7 +986,6 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1; int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns); - STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb); STable* pTable = pCheckInfo->pTableObj; int32_t endPos = cur->pos; @@ -1066,7 +1064,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) || (key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) { - copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, pMeta, numOfCols, pTable); + copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable); numOfRows += 1; if (cur->win.skey == TSKEY_INITIAL_VAL) { cur->win.skey = key; @@ -1406,8 +1404,21 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex int32_t numOfBlocks = 0; int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo); - + + STsdbCfg* pCfg = &pQueryHandle->pTsdb->config; + STimeWindow win = TSWINDOW_INITIALIZER; + while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) { + tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pQueryHandle->pFileGroup->fileId, &win.skey, &win.ekey); + + // current file are not overlapped with query time window, ignore remain files + if ((ASCENDING_TRAVERSE(pQueryHandle->order) && win.skey > pQueryHandle->window.ekey) || + (!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) { + tsdbDebug("%p remain files are not qualified for qrange:%"PRId64"-%"PRId64", ignore, %p", pQueryHandle, pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qinfo) + pQueryHandle->pFileGroup = NULL; + break; + } + if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) { break; } @@ -1765,7 +1776,6 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int win->skey = TSKEY_INITIAL_VAL; int64_t st = taosGetTimestampUs(); - STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb); STable* pTable = pCheckInfo->pTableObj; do { @@ -1787,7 +1797,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int } win->ekey = key; - copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, pMeta, numOfCols, pTable); + copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, numOfCols, pTable); if (++numOfRows >= maxRowsToRead) { moveToNextRowInMem(pCheckInfo); From e57e4a6d654d968b5588b620d3a867eaa4782f2a Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Mon, 20 Jul 2020 16:06:00 +0800 Subject: [PATCH 2/4] add gcov to support coverage test. --- src/query/tests/CMakeLists.txt | 4 ++-- src/util/tests/CMakeLists.txt | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/query/tests/CMakeLists.txt b/src/query/tests/CMakeLists.txt index 0ae8600756..1856223391 100644 --- a/src/query/tests/CMakeLists.txt +++ b/src/query/tests/CMakeLists.txt @@ -11,5 +11,5 @@ IF (HEADER_GTEST_INCLUDE_DIR AND LIB_GTEST_STATIC_DIR) AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ADD_EXECUTABLE(queryTest ${SOURCE_LIST}) - TARGET_LINK_LIBRARIES(queryTest taos query gtest pthread) -ENDIF() \ No newline at end of file + TARGET_LINK_LIBRARIES(queryTest taos query gtest pthread gcov) +ENDIF() diff --git a/src/util/tests/CMakeLists.txt b/src/util/tests/CMakeLists.txt index 9f66eba37a..b0b5d3013b 100644 --- a/src/util/tests/CMakeLists.txt +++ b/src/util/tests/CMakeLists.txt @@ -11,5 +11,5 @@ IF (HEADER_GTEST_INCLUDE_DIR AND LIB_GTEST_STATIC_DIR) AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ADD_EXECUTABLE(utilTest ${SOURCE_LIST}) - TARGET_LINK_LIBRARIES(utilTest tutil common gtest pthread) -ENDIF() \ No newline at end of file + TARGET_LINK_LIBRARIES(utilTest tutil common gtest pthread gcov) +ENDIF() From dfe2c4fc67756167c13cfebe0de4ca515e1514b8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 20 Jul 2020 17:16:59 +0800 Subject: [PATCH 3/4] [td-225] fix memory leaks. --- src/query/inc/qResultbuf.h | 1 + src/query/src/qResultbuf.c | 4 +++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/src/query/inc/qResultbuf.h b/src/query/inc/qResultbuf.h index d76659cc91..8c8afb0957 100644 --- a/src/query/inc/qResultbuf.h +++ b/src/query/inc/qResultbuf.h @@ -42,6 +42,7 @@ typedef struct SDiskbasedResultBuf { void* iBuf; // inmemory buf void* handle; // for debug purpose + void* emptyDummyIdList; // dummy id list } SDiskbasedResultBuf; #define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L) diff --git a/src/query/src/qResultbuf.c b/src/query/src/qResultbuf.c index 0eefb106b9..de59676e59 100644 --- a/src/query/src/qResultbuf.c +++ b/src/query/src/qResultbuf.c @@ -36,6 +36,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu pResBuf->fd = FD_INITIALIZER; pResBuf->pBuf = NULL; + pResBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t)); qDebug("QInfo:%p create resBuf for output, page size:%d, initial pages:%d, %" PRId64 "bytes", handle, pResBuf->pageSize, pResBuf->numOfPages, pResBuf->totalBufSize); @@ -173,7 +174,7 @@ int32_t getNumOfRowsPerPage(SDiskbasedResultBuf* pResultBuf) { return pResultBuf SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) { int32_t slot = getGroupIndex(pResultBuf, groupId); if (slot < 0) { - return taosArrayInit(1, sizeof(int32_t)); + return pResultBuf->emptyDummyIdList; } else { return taosArrayGetP(pResultBuf->list, slot); } @@ -206,6 +207,7 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) { } taosArrayDestroy(pResultBuf->list); + taosArrayDestroy(pResultBuf->emptyDummyIdList); taosHashCleanup(pResultBuf->idsTable); tfree(pResultBuf->iBuf); From ce0ffb408090d3ae64398e8d9fb77e1f9e12e33f Mon Sep 17 00:00:00 2001 From: liu0x54 Date: Tue, 21 Jul 2020 03:36:34 +0000 Subject: [PATCH 4/4] [TD-970] fix taosdemo bugs --- src/kit/taosdemo/taosdemo.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/kit/taosdemo/taosdemo.c b/src/kit/taosdemo/taosdemo.c index 305302b71a..79cfbe2a37 100644 --- a/src/kit/taosdemo/taosdemo.c +++ b/src/kit/taosdemo/taosdemo.c @@ -361,7 +361,7 @@ int main(int argc, char *argv[]) { arguments.num_of_DPT = 100000; arguments.num_of_RPR = 1000; arguments.use_metric = true; - arguments.insert_only = true; + arguments.insert_only = false; // end change argp_parse(&argp, argc, argv, 0, 0, &arguments); @@ -954,13 +954,13 @@ void *readMetric(void *sarg) { for (int i = 1; i <= m; i++) { if (i == 1) { - sprintf(tempS, "index = %d", i); + sprintf(tempS, "areaid = %d", i); } else { - sprintf(tempS, " or index = %d ", i); + sprintf(tempS, " or areaid = %d ", i); } strcat(condition, tempS); - sprintf(command, "select %s from m1 where %s", aggreFunc[j], condition); + sprintf(command, "select %s from meters where %s", aggreFunc[j], condition); printf("Where condition: %s\n", condition); fprintf(fp, "%s\n", command);