Merge branch 'develop' into feature/2.0tsdb
This commit is contained in:
commit
5716d72e32
|
@ -361,7 +361,7 @@ int main(int argc, char *argv[]) {
|
||||||
arguments.num_of_DPT = 100000;
|
arguments.num_of_DPT = 100000;
|
||||||
arguments.num_of_RPR = 1000;
|
arguments.num_of_RPR = 1000;
|
||||||
arguments.use_metric = true;
|
arguments.use_metric = true;
|
||||||
arguments.insert_only = true;
|
arguments.insert_only = false;
|
||||||
// end change
|
// end change
|
||||||
|
|
||||||
argp_parse(&argp, argc, argv, 0, 0, &arguments);
|
argp_parse(&argp, argc, argv, 0, 0, &arguments);
|
||||||
|
@ -954,13 +954,13 @@ void *readMetric(void *sarg) {
|
||||||
|
|
||||||
for (int i = 1; i <= m; i++) {
|
for (int i = 1; i <= m; i++) {
|
||||||
if (i == 1) {
|
if (i == 1) {
|
||||||
sprintf(tempS, "index = %d", i);
|
sprintf(tempS, "areaid = %d", i);
|
||||||
} else {
|
} else {
|
||||||
sprintf(tempS, " or index = %d ", i);
|
sprintf(tempS, " or areaid = %d ", i);
|
||||||
}
|
}
|
||||||
strcat(condition, tempS);
|
strcat(condition, tempS);
|
||||||
|
|
||||||
sprintf(command, "select %s from m1 where %s", aggreFunc[j], condition);
|
sprintf(command, "select %s from meters where %s", aggreFunc[j], condition);
|
||||||
|
|
||||||
printf("Where condition: %s\n", condition);
|
printf("Where condition: %s\n", condition);
|
||||||
fprintf(fp, "%s\n", command);
|
fprintf(fp, "%s\n", command);
|
||||||
|
|
|
@ -42,6 +42,7 @@ typedef struct SDiskbasedResultBuf {
|
||||||
|
|
||||||
void* iBuf; // inmemory buf
|
void* iBuf; // inmemory buf
|
||||||
void* handle; // for debug purpose
|
void* handle; // for debug purpose
|
||||||
|
void* emptyDummyIdList; // dummy id list
|
||||||
} SDiskbasedResultBuf;
|
} SDiskbasedResultBuf;
|
||||||
|
|
||||||
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L)
|
#define DEFAULT_INTERN_BUF_PAGE_SIZE (1024L)
|
||||||
|
@ -87,7 +88,6 @@ SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId);
|
||||||
* @param id
|
* @param id
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
//#define getResBufPage(buf, id) ((tFilePage*)((buf)->pBuf + (buf)->pageSize * (id)))
|
|
||||||
static FORCE_INLINE tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) {
|
static FORCE_INLINE tFilePage* getResBufPage(SDiskbasedResultBuf* pResultBuf, int32_t id) {
|
||||||
if (id < pResultBuf->inMemPages) {
|
if (id < pResultBuf->inMemPages) {
|
||||||
return (tFilePage*) ((char*) pResultBuf->iBuf + id * pResultBuf->pageSize);
|
return (tFilePage*) ((char*) pResultBuf->iBuf + id * pResultBuf->pageSize);
|
||||||
|
|
|
@ -36,6 +36,7 @@ int32_t createDiskbasedResultBuffer(SDiskbasedResultBuf** pResultBuf, int32_t nu
|
||||||
|
|
||||||
pResBuf->fd = FD_INITIALIZER;
|
pResBuf->fd = FD_INITIALIZER;
|
||||||
pResBuf->pBuf = NULL;
|
pResBuf->pBuf = NULL;
|
||||||
|
pResBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
|
||||||
|
|
||||||
qDebug("QInfo:%p create resBuf for output, page size:%d, initial pages:%d, %" PRId64 "bytes", handle,
|
qDebug("QInfo:%p create resBuf for output, page size:%d, initial pages:%d, %" PRId64 "bytes", handle,
|
||||||
pResBuf->pageSize, pResBuf->numOfPages, pResBuf->totalBufSize);
|
pResBuf->pageSize, pResBuf->numOfPages, pResBuf->totalBufSize);
|
||||||
|
@ -173,7 +174,7 @@ int32_t getNumOfRowsPerPage(SDiskbasedResultBuf* pResultBuf) { return pResultBuf
|
||||||
SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
|
SIDList getDataBufPagesIdList(SDiskbasedResultBuf* pResultBuf, int32_t groupId) {
|
||||||
int32_t slot = getGroupIndex(pResultBuf, groupId);
|
int32_t slot = getGroupIndex(pResultBuf, groupId);
|
||||||
if (slot < 0) {
|
if (slot < 0) {
|
||||||
return taosArrayInit(1, sizeof(int32_t));
|
return pResultBuf->emptyDummyIdList;
|
||||||
} else {
|
} else {
|
||||||
return taosArrayGetP(pResultBuf->list, slot);
|
return taosArrayGetP(pResultBuf->list, slot);
|
||||||
}
|
}
|
||||||
|
@ -206,6 +207,7 @@ void destroyResultBuf(SDiskbasedResultBuf* pResultBuf, void* handle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pResultBuf->list);
|
taosArrayDestroy(pResultBuf->list);
|
||||||
|
taosArrayDestroy(pResultBuf->emptyDummyIdList);
|
||||||
taosHashCleanup(pResultBuf->idsTable);
|
taosHashCleanup(pResultBuf->idsTable);
|
||||||
|
|
||||||
tfree(pResultBuf->iBuf);
|
tfree(pResultBuf->iBuf);
|
||||||
|
|
|
@ -11,5 +11,5 @@ IF (HEADER_GTEST_INCLUDE_DIR AND LIB_GTEST_STATIC_DIR)
|
||||||
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
||||||
|
|
||||||
ADD_EXECUTABLE(queryTest ${SOURCE_LIST})
|
ADD_EXECUTABLE(queryTest ${SOURCE_LIST})
|
||||||
TARGET_LINK_LIBRARIES(queryTest taos query gtest pthread)
|
TARGET_LINK_LIBRARIES(queryTest taos query gtest pthread gcov)
|
||||||
ENDIF()
|
ENDIF()
|
||||||
|
|
|
@ -440,6 +440,7 @@ int tsdbUpdateFileHeader(SFile* pFile, uint32_t version);
|
||||||
int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo);
|
int tsdbEncodeSFileInfo(void** buf, const STsdbFileInfo* pInfo);
|
||||||
void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo);
|
void* tsdbDecodeSFileInfo(void* buf, STsdbFileInfo* pInfo);
|
||||||
void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
|
void tsdbRemoveFileGroup(STsdbRepo* pRepo, SFileGroup* pFGroup);
|
||||||
|
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
|
||||||
|
|
||||||
// ------------------ tsdbRWHelper.c
|
// ------------------ tsdbRWHelper.c
|
||||||
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
|
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
|
||||||
|
|
|
@ -31,7 +31,6 @@ static int tsdbCommitMeta(STsdbRepo *pRepo);
|
||||||
static void tsdbEndCommit(STsdbRepo *pRepo);
|
static void tsdbEndCommit(STsdbRepo *pRepo);
|
||||||
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
|
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
|
||||||
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
|
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
|
||||||
static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey);
|
|
||||||
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
|
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
|
||||||
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
|
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
|
||||||
|
|
||||||
|
@ -544,7 +543,7 @@ static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSK
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) {
|
void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TSKEY *minKey, TSKEY *maxKey) {
|
||||||
*minKey = fileId * daysPerFile * tsMsPerDay[precision];
|
*minKey = fileId * daysPerFile * tsMsPerDay[precision];
|
||||||
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
|
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -128,8 +128,7 @@ typedef struct STsdbQueryHandle {
|
||||||
|
|
||||||
static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle);
|
static void changeQueryHandleForLastrowQuery(TsdbQueryHandleT pqHandle);
|
||||||
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
|
static void changeQueryHandleForInterpQuery(TsdbQueryHandleT pHandle);
|
||||||
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
|
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock);
|
||||||
SArray* sa);
|
|
||||||
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
static int32_t binarySearchForKey(char* pValue, int num, TSKEY key, int order);
|
||||||
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
|
static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int maxRowsToRead, STimeWindow* win,
|
||||||
STsdbQueryHandle* pQueryHandle);
|
STsdbQueryHandle* pQueryHandle);
|
||||||
|
@ -695,7 +694,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
|
||||||
}
|
}
|
||||||
|
|
||||||
doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
|
doLoadFileDataBlock(pQueryHandle, pBlock, pCheckInfo);
|
||||||
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn);
|
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
|
||||||
} else {
|
} else {
|
||||||
/*
|
/*
|
||||||
* no data in cache, only load data from file
|
* no data in cache, only load data from file
|
||||||
|
@ -711,6 +710,7 @@ static void handleDataMergeIfNeeded(STsdbQueryHandle* pQueryHandle, SCompBlock*
|
||||||
cur->mixBlock = false;
|
cur->mixBlock = false;
|
||||||
cur->blockCompleted = true;
|
cur->blockCompleted = true;
|
||||||
cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1);
|
cur->lastKey = binfo.window.ekey + (ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1);
|
||||||
|
pCheckInfo->lastKey = cur->lastKey;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -734,7 +734,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
|
||||||
cur->pos = 0;
|
cur->pos = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn);
|
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
|
||||||
} else { // the whole block is loaded in to buffer
|
} else { // the whole block is loaded in to buffer
|
||||||
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
|
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
|
||||||
}
|
}
|
||||||
|
@ -751,7 +751,7 @@ static bool loadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlock
|
||||||
cur->pos = pBlock->numOfRows - 1;
|
cur->pos = pBlock->numOfRows - 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock, pQueryHandle->defaultLoadColumn);
|
doMergeTwoLevelData(pQueryHandle, pCheckInfo, pBlock);
|
||||||
} else {
|
} else {
|
||||||
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
|
handleDataMergeIfNeeded(pQueryHandle, pBlock, pCheckInfo);
|
||||||
}
|
}
|
||||||
|
@ -907,12 +907,12 @@ static int32_t copyDataFromFileBlock(STsdbQueryHandle* pQueryHandle, int32_t cap
|
||||||
|
|
||||||
pQueryHandle->cur.win.ekey = tsArray[end];
|
pQueryHandle->cur.win.ekey = tsArray[end];
|
||||||
pQueryHandle->cur.lastKey = tsArray[end] + step;
|
pQueryHandle->cur.lastKey = tsArray[end] + step;
|
||||||
|
|
||||||
return numOfRows + num;
|
return numOfRows + num;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row,
|
static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity, int32_t numOfRows, SDataRow row,
|
||||||
STsdbMeta *pMeta, int32_t numOfCols, STable* pTable) {
|
int32_t numOfCols, STable* pTable) {
|
||||||
char* pData = NULL;
|
char* pData = NULL;
|
||||||
|
|
||||||
// the schema version info is embeded in SDataRow
|
// the schema version info is embeded in SDataRow
|
||||||
|
@ -973,8 +973,7 @@ static void copyOneRowFromMem(STsdbQueryHandle* pQueryHandle, int32_t capacity,
|
||||||
|
|
||||||
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
|
// only return the qualified data to client in terms of query time window, data rows in the same block but do not
|
||||||
// be included in the query time window will be discarded
|
// be included in the query time window will be discarded
|
||||||
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock,
|
static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo* pCheckInfo, SCompBlock* pBlock) {
|
||||||
SArray* sa) {
|
|
||||||
SQueryFilePos* cur = &pQueryHandle->cur;
|
SQueryFilePos* cur = &pQueryHandle->cur;
|
||||||
SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
|
SDataBlockInfo blockInfo = GET_FILE_DATA_BLOCK_INFO(pCheckInfo, pBlock);
|
||||||
|
|
||||||
|
@ -987,7 +986,6 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
||||||
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
|
int32_t step = ASCENDING_TRAVERSE(pQueryHandle->order)? 1:-1;
|
||||||
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
|
int32_t numOfCols = taosArrayGetSize(pQueryHandle->pColumns);
|
||||||
|
|
||||||
STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
|
|
||||||
STable* pTable = pCheckInfo->pTableObj;
|
STable* pTable = pCheckInfo->pTableObj;
|
||||||
|
|
||||||
int32_t endPos = cur->pos;
|
int32_t endPos = cur->pos;
|
||||||
|
@ -1066,7 +1064,7 @@ static void doMergeTwoLevelData(STsdbQueryHandle* pQueryHandle, STableCheckInfo*
|
||||||
|
|
||||||
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
if ((key < tsArray[pos] && ASCENDING_TRAVERSE(pQueryHandle->order)) ||
|
||||||
(key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
|
(key > tsArray[pos] && !ASCENDING_TRAVERSE(pQueryHandle->order))) {
|
||||||
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, pMeta, numOfCols, pTable);
|
copyOneRowFromMem(pQueryHandle, pQueryHandle->outputCapacity, numOfRows, row, numOfCols, pTable);
|
||||||
numOfRows += 1;
|
numOfRows += 1;
|
||||||
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
if (cur->win.skey == TSKEY_INITIAL_VAL) {
|
||||||
cur->win.skey = key;
|
cur->win.skey = key;
|
||||||
|
@ -1406,8 +1404,21 @@ static int32_t getDataBlocksInFilesImpl(STsdbQueryHandle* pQueryHandle, bool* ex
|
||||||
|
|
||||||
int32_t numOfBlocks = 0;
|
int32_t numOfBlocks = 0;
|
||||||
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
int32_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||||
|
|
||||||
|
STsdbCfg* pCfg = &pQueryHandle->pTsdb->config;
|
||||||
|
STimeWindow win = TSWINDOW_INITIALIZER;
|
||||||
|
|
||||||
while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
|
while ((pQueryHandle->pFileGroup = tsdbGetFileGroupNext(&pQueryHandle->fileIter)) != NULL) {
|
||||||
|
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, pQueryHandle->pFileGroup->fileId, &win.skey, &win.ekey);
|
||||||
|
|
||||||
|
// current file are not overlapped with query time window, ignore remain files
|
||||||
|
if ((ASCENDING_TRAVERSE(pQueryHandle->order) && win.skey > pQueryHandle->window.ekey) ||
|
||||||
|
(!ASCENDING_TRAVERSE(pQueryHandle->order) && win.ekey < pQueryHandle->window.ekey)) {
|
||||||
|
tsdbDebug("%p remain files are not qualified for qrange:%"PRId64"-%"PRId64", ignore, %p", pQueryHandle, pQueryHandle->window.skey, pQueryHandle->window.ekey, pQueryHandle->qinfo)
|
||||||
|
pQueryHandle->pFileGroup = NULL;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
|
if ((code = getFileCompInfo(pQueryHandle, &numOfBlocks)) != TSDB_CODE_SUCCESS) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -1765,7 +1776,6 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
|
||||||
win->skey = TSKEY_INITIAL_VAL;
|
win->skey = TSKEY_INITIAL_VAL;
|
||||||
|
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
STsdbMeta* pMeta = tsdbGetMeta(pQueryHandle->pTsdb);
|
|
||||||
STable* pTable = pCheckInfo->pTableObj;
|
STable* pTable = pCheckInfo->pTableObj;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
|
@ -1787,7 +1797,7 @@ static int tsdbReadRowsFromCache(STableCheckInfo* pCheckInfo, TSKEY maxKey, int
|
||||||
}
|
}
|
||||||
|
|
||||||
win->ekey = key;
|
win->ekey = key;
|
||||||
copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, pMeta, numOfCols, pTable);
|
copyOneRowFromMem(pQueryHandle, maxRowsToRead, numOfRows, row, numOfCols, pTable);
|
||||||
|
|
||||||
if (++numOfRows >= maxRowsToRead) {
|
if (++numOfRows >= maxRowsToRead) {
|
||||||
moveToNextRowInMem(pCheckInfo);
|
moveToNextRowInMem(pCheckInfo);
|
||||||
|
|
|
@ -11,5 +11,5 @@ IF (HEADER_GTEST_INCLUDE_DIR AND LIB_GTEST_STATIC_DIR)
|
||||||
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
||||||
|
|
||||||
ADD_EXECUTABLE(utilTest ${SOURCE_LIST})
|
ADD_EXECUTABLE(utilTest ${SOURCE_LIST})
|
||||||
TARGET_LINK_LIBRARIES(utilTest tutil common gtest pthread)
|
TARGET_LINK_LIBRARIES(utilTest tutil common gtest pthread gcov)
|
||||||
ENDIF()
|
ENDIF()
|
||||||
|
|
Loading…
Reference in New Issue