Merge pull request #1654 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
This commit is contained in:
slguan 2020-04-18 21:52:15 +08:00 committed by GitHub
commit c65b3c5ca8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 2158 additions and 544 deletions

View File

@ -111,7 +111,6 @@ typedef struct SDataCol {
int len;
int offset;
void * pData; // Original data
void * pCData; // Compressed data
} SDataCol;
typedef struct {
@ -133,9 +132,12 @@ typedef struct {
SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows);
void tdResetDataCols(SDataCols *pCols);
void tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
void tdFreeDataCols(SDataCols *pCols);
void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols);
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop);
int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge);
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows);
#ifdef __cplusplus
}

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dataformat.h"
#include "tutil.h"
static int tdFLenFromSchema(STSchema *pSchema);
@ -338,6 +339,28 @@ void tdFreeDataCols(SDataCols *pCols) {
}
}
SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) {
SDataCols *pRet = tdNewDataCols(pDataCols->maxRowSize, pDataCols->maxCols, pDataCols->maxPoints);
if (pRet == NULL) return NULL;
pRet->numOfCols = pDataCols->numOfCols;
pRet->sversion = pDataCols->sversion;
if (keepData) pRet->numOfPoints = pDataCols->numOfPoints;
for (int i = 0; i < pDataCols->numOfCols; i++) {
pRet->cols[i].type = pDataCols->cols[i].type;
pRet->cols[i].colId = pDataCols->cols[i].colId;
pRet->cols[i].bytes = pDataCols->cols[i].bytes;
pRet->cols[i].len = pDataCols->cols[i].len;
pRet->cols[i].offset = pDataCols->cols[i].offset;
pRet->cols[i].pData = (void *)((char *)pRet->buf + ((char *)(pDataCols->cols[i].pData) - (char *)(pDataCols->buf)));
if (keepData) memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pRet->cols[i].bytes * pDataCols->numOfPoints);
}
return pRet;
}
void tdResetDataCols(SDataCols *pCols) {
pCols->numOfPoints = 0;
for (int i = 0; i < pCols->maxCols; i++) {
@ -382,6 +405,58 @@ static int tdFLenFromSchema(STSchema *pSchema) {
return ret;
}
int tdMergeDataCols(SDataCols *target, SDataCols *source) {
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfPoints);
SDataCols *pTarget = tdDupDataCols(target, true);
if (pTarget == NULL) goto _err;
// tdResetDataCols(target);
int iter1 = 0;
int iter2 = 0;
tdMergeTwoDataCols(target,pTarget, &iter1, source, &iter2, pTarget->numOfPoints + rowsToMerge);
tdFreeDataCols(pTarget);
return 0;
_err:
tdFreeDataCols(pTarget);
return -1;
}
void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows) {
tdResetDataCols(target);
while (target->numOfPoints < tRows) {
if (*iter1 >= src1->numOfPoints && *iter2 >= src2->numOfPoints) break;
TSKEY key1 = (*iter1 >= src1->numOfPoints) ? INT64_MAX : ((TSKEY *)(src1->cols[0].pData))[*iter1];
TSKEY key2 = (*iter2 >= src2->numOfPoints) ? INT64_MAX : ((TSKEY *)(src2->cols[0].pData))[*iter2];
if (key1 < key2) {
for (int i = 0; i < src1->numOfCols; i++) {
ASSERT(target->cols[i].type == src1->cols[i].type);
memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints),
(void *)((char *)(src1->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * (*iter1)),
TYPE_BYTES[target->cols[i].type]);
target->cols[i].len += TYPE_BYTES[target->cols[i].type];
}
target->numOfPoints++;
(*iter1)++;
} else if (key1 > key2) {
for (int i = 0; i < src2->numOfCols; i++) {
ASSERT(target->cols[i].type == src2->cols[i].type);
memcpy((void *)((char *)(target->cols[i].pData) + TYPE_BYTES[target->cols[i].type] * target->numOfPoints),
(void *)((char *)(src2->cols[i].pData) + TYPE_BYTES[src2->cols[i].type] * (*iter2)),
TYPE_BYTES[target->cols[i].type]);
target->cols[i].len += TYPE_BYTES[target->cols[i].type];
}
target->numOfPoints++;
(*iter2)++;
} else {
ASSERT(false);
}
}
}

View File

@ -46,6 +46,7 @@ typedef struct {
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION
typedef struct {
int8_t precision;
int8_t compression;
int32_t tsdbId;
int32_t maxTables; // maximum number of tables this repository can have
int32_t daysPerFile; // day per file sharding policy

View File

@ -220,11 +220,12 @@ STsdbFileH *tsdbInitFileH(char *dataDir, int maxFiles);
void tsdbCloseFileH(STsdbFileH *pFileH);
int tsdbCreateFile(char *dataDir, int fileId, const char *suffix, int maxTables, SFile *pFile, int writeHeader,
int toClose);
int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables);
SFileGroup *tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables);
int tsdbOpenFile(SFile *pFile, int oflag);
int tsdbCloseFile(SFile *pFile);
SFileGroup *tsdbOpenFilesForCommit(STsdbFileH *pFileH, int fid);
int tsdbRemoveFileGroup(STsdbFileH *pFile, int fid);
int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname);
#define TSDB_FGROUP_ITER_FORWARD TSDB_ORDER_ASC
#define TSDB_FGROUP_ITER_BACKWARD TSDB_ORDER_DESC
@ -270,6 +271,8 @@ typedef struct {
TSKEY keyLast;
} SCompBlock;
// Maximum number of sub-blocks a super-block can have
#define TSDB_MAX_SUBBLOCKS 8
#define IS_SUPER_BLOCK(pBlock) ((pBlock)->numOfSubBlocks >= 1)
#define IS_SUB_BLOCK(pBlock) ((pBlock)->numOfSubBlocks == 0)
@ -309,17 +312,9 @@ typedef struct {
STsdbFileH *tsdbGetFile(tsdb_repo_t *pRepo);
int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast,
SDataCols *pCols);
int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables);
int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf);
int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf);
int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf);
int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData);
int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast,
SDataCols *pCols);
SFileGroup *tsdbSearchFGroup(STsdbFileH *pFileH, int fid);
void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey);
// TSDB repository definition
@ -379,6 +374,100 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo);
int32_t tsdbLockRepo(tsdb_repo_t *repo);
int32_t tsdbUnLockRepo(tsdb_repo_t *repo);
typedef enum { TSDB_WRITE_HELPER, TSDB_READ_HELPER } tsdb_rw_helper_t;
typedef struct {
tsdb_rw_helper_t type; // helper type
int maxTables;
int maxRowSize;
int maxRows;
int maxCols;
int minRowsPerFileBlock;
int maxRowsPerFileBlock;
int8_t compress;
} SHelperCfg;
typedef struct {
int fid;
TSKEY minKey;
TSKEY maxKey;
// For read/write purpose
SFile headF;
SFile dataF;
SFile lastF;
// For write purpose only
SFile nHeadF;
SFile nLastF;
} SHelperFile;
typedef struct {
int64_t uid;
int32_t tid;
int32_t sversion;
} SHelperTable;
typedef struct {
// Global configuration
SHelperCfg config;
int8_t state;
// For file set usage
SHelperFile files;
SCompIdx * pCompIdx;
// For table set usage
SHelperTable tableInfo;
SCompInfo * pCompInfo;
bool hasOldLastBlock;
// For block set usage
SCompData *pCompData;
SDataCols *pDataCols[2];
} SRWHelper;
// --------- Helper state
#define TSDB_HELPER_CLEAR_STATE 0x0 // Clear state
#define TSDB_HELPER_FILE_SET_AND_OPEN 0x1 // File is set
#define TSDB_HELPER_IDX_LOAD 0x2 // SCompIdx part is loaded
#define TSDB_HELPER_TABLE_SET 0x4 // Table is set
#define TSDB_HELPER_INFO_LOAD 0x8 // SCompInfo part is loaded
#define TSDB_HELPER_FILE_DATA_LOAD 0x10 // SCompData part is loaded
#define TSDB_HELPER_TYPE(h) ((h)->config.type)
#define helperSetState(h, s) (((h)->state) |= (s))
#define helperClearState(h, s) ((h)->state &= (~(s)))
#define helperHasState(h, s) ((((h)->state) & (s)) == (s))
#define blockAtIdx(h, idx) ((h)->pCompInfo->blocks + idx)
int tsdbInitReadHelper(SRWHelper *pHelper, STsdbRepo *pRepo);
int tsdbInitWriteHelper(SRWHelper *pHelper, STsdbRepo *pRepo);
// int tsdbInitHelper(SRWHelper *pHelper, SHelperCfg *pCfg);
void tsdbDestroyHelper(SRWHelper *pHelper);
void tsdbResetHelper(SRWHelper *pHelper);
// --------- For set operations
int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup);
// void tsdbSetHelperTable(SRWHelper *pHelper, SHelperTable *pHelperTable, STSchema *pSchema);
void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo);
int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError);
// --------- For read operations
int tsdbLoadCompIdx(SRWHelper *pHelper, void *target);
int tsdbLoadCompInfo(SRWHelper *pHelper, void *target);
int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target);
int tsdbLoadBlockDataCols(SRWHelper *pHelper, SDataCols *pDataCols, int blkIdx, int16_t *colIds, int numOfColIds);
int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SDataCols *target);
// --------- For write operations
int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols);
int tsdbMoveLastBlockIfNeccessary(SRWHelper *pHelper);
int tsdbWriteCompInfo(SRWHelper *pHelper);
int tsdbWriteCompIdx(SRWHelper *pHelper);
#ifdef __cplusplus
}
#endif

View File

@ -21,10 +21,12 @@
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include <libgen.h>
#include "talgo.h"
#include "tchecksum.h"
#include "tsdbMain.h"
#include "tutil.h"
#include "talgo.h"
const char *tsdbFileSuffix[] = {
".head", // TSDB_FILE_TYPE_HEAD
@ -34,7 +36,6 @@ const char *tsdbFileSuffix[] = {
static int compFGroupKey(const void *key, const void *fgroup);
static int compFGroup(const void *arg1, const void *arg2);
static int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname);
static int tsdbWriteFileHead(SFile *pFile);
static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables);
static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid);
@ -93,24 +94,36 @@ static int tsdbOpenFGroup(STsdbFileH *pFileH, char *dataDir, int fid) {
return 0;
}
int tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) {
if (pFileH->numOfFGroups >= pFileH->maxFGroups) return -1;
/**
* Create the file group if the file group not exists.
*
* @return A pointer to
*/
SFileGroup *tsdbCreateFGroup(STsdbFileH *pFileH, char *dataDir, int fid, int maxTables) {
if (pFileH->numOfFGroups >= pFileH->maxFGroups) return NULL;
SFileGroup fGroup;
SFileGroup *pFGroup = &fGroup;
if (tsdbSearchFGroup(pFileH, fid) == NULL) { // if not exists, create one
SFileGroup *pGroup = tsdbSearchFGroup(pFileH, fid);
if (pGroup == NULL) { // if not exists, create one
pFGroup->fileId = fid;
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
if (tsdbCreateFile(dataDir, fid, tsdbFileSuffix[type], maxTables, &(pFGroup->files[type]), type == TSDB_FILE_TYPE_HEAD ? 1 : 0, 1) < 0) {
// TODO: deal with the ERROR here, remove those creaed file
return -1;
}
if (tsdbCreateFile(dataDir, fid, tsdbFileSuffix[type], maxTables, &(pFGroup->files[type]),
type == TSDB_FILE_TYPE_HEAD ? 1 : 0, 1) < 0)
goto _err;
}
pFileH->fGroup[pFileH->numOfFGroups++] = fGroup;
qsort((void *)(pFileH->fGroup), pFileH->numOfFGroups, sizeof(SFileGroup), compFGroup);
return tsdbSearchFGroup(pFileH, fid);
}
return 0;
return pGroup;
_err:
// TODO: deal with the err here
return NULL;
}
int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) {
@ -183,27 +196,27 @@ SFileGroup *tsdbGetFileGroupNext(SFileGroupIter *pIter) {
return ret;
}
int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData) {
SCompBlock *pBlock = pStartBlock;
for (int i = 0; i < numOfBlocks; i++) {
if (tsdbLoadCompCols(pFile, pBlock, (void *)pCompData) < 0) return -1;
pCols->numOfPoints += (pCompData->cols[0].len / 8);
for (int iCol = 0; iCol < pBlock->numOfCols; iCol++) {
SCompCol *pCompCol = &(pCompData->cols[iCol]);
// pCols->numOfPoints += pBlock->numOfPoints;
int k = 0;
for (; k < pCols->numOfCols; k++) {
if (pCompCol->colId == pCols->cols[k].colId) break;
}
// int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData) {
// SCompBlock *pBlock = pStartBlock;
// for (int i = 0; i < numOfBlocks; i++) {
// if (tsdbLoadCompCols(pFile, pBlock, (void *)pCompData) < 0) return -1;
// pCols->numOfPoints += (pCompData->cols[0].len / 8);
// for (int iCol = 0; iCol < pBlock->numOfCols; iCol++) {
// SCompCol *pCompCol = &(pCompData->cols[iCol]);
// // pCols->numOfPoints += pBlock->numOfPoints;
// int k = 0;
// for (; k < pCols->numOfCols; k++) {
// if (pCompCol->colId == pCols->cols[k].colId) break;
// }
if (tsdbLoadColData(pFile, pCompCol, pBlock->offset,
(void *)((char *)(pCols->cols[k].pData) + pCols->cols[k].len)) < 0)
return -1;
}
pStartBlock++;
}
return 0;
}
// if (tsdbLoadColData(pFile, pCompCol, pBlock->offset,
// (void *)((char *)(pCols->cols[k].pData) + pCols->cols[k].len)) < 0)
// return -1;
// }
// pStartBlock++;
// }
// return 0;
// }
int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, SDataCols *pCols) {
SCompBlock *pSuperBlock = TSDB_COMPBLOCK_AT(pCompInfo, idx);
@ -239,42 +252,42 @@ int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInf
return 0;
}
int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables) {
SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]);
if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
// int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables) {
// SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]);
// if (lseek(pFile->fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
if (read(pFile->fd, buf, sizeof(SCompIdx) * maxTables) < 0) return -1;
// TODO: need to check the correctness
return 0;
}
// if (read(pFile->fd, buf, sizeof(SCompIdx) * maxTables) < 0) return -1;
// // TODO: need to check the correctness
// return 0;
// }
int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf) {
SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]);
// int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf) {
// SFile *pFile = &(pGroup->files[TSDB_FILE_TYPE_HEAD]);
if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) return -1;
// if (lseek(pFile->fd, pIdx->offset, SEEK_SET) < 0) return -1;
if (read(pFile->fd, buf, pIdx->len) < 0) return -1;
// if (read(pFile->fd, buf, pIdx->len) < 0) return -1;
// TODO: need to check the correctness
// // TODO: need to check the correctness
return 0;
}
// return 0;
// }
int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf) {
// assert(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
// int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf) {
// // assert(pBlock->numOfSubBlocks == 0 || pBlock->numOfSubBlocks == 1);
if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) return -1;
size_t size = sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols;
if (read(pFile->fd, buf, size) < 0) return -1;
// if (lseek(pFile->fd, pBlock->offset, SEEK_SET) < 0) return -1;
// size_t size = sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols;
// if (read(pFile->fd, buf, size) < 0) return -1;
return 0;
}
// return 0;
// }
int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf) {
if (lseek(pFile->fd, blockBaseOffset + pCol->offset, SEEK_SET) < 0) return -1;
if (read(pFile->fd, buf, pCol->len) < 0) return -1;
return 0;
}
// int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf) {
// if (lseek(pFile->fd, blockBaseOffset + pCol->offset, SEEK_SET) < 0) return -1;
// if (read(pFile->fd, buf, pCol->len) < 0) return -1;
// return 0;
// }
static int compFGroupKey(const void *key, const void *fgroup) {
int fid = *(int *)key;
@ -299,7 +312,7 @@ static int tsdbWriteFileHead(SFile *pFile) {
}
static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) {
int size = sizeof(SCompIdx) * maxTables;
int size = sizeof(SCompIdx) * maxTables + sizeof(TSCKSUM);
void *buf = calloc(1, size);
if (buf == NULL) return -1;
@ -308,6 +321,8 @@ static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) {
return -1;
}
taosCalcChecksumAppend(0, (uint8_t *)buf, size);
if (write(pFile->fd, buf, size) < 0) {
free(buf);
return -1;
@ -319,7 +334,7 @@ static int tsdbWriteHeadFileIdx(SFile *pFile, int maxTables) {
return 0;
}
static int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname) {
int tsdbGetFileName(char *dataDir, int fileId, const char *suffix, char *fname) {
if (dataDir == NULL || fname == NULL) return -1;
sprintf(fname, "%s/f%d%s", dataDir, fileId, suffix);

View File

@ -12,15 +12,16 @@
#include <tlog.h>
#include <unistd.h>
// #include "taosdef.h"
// #include "disk.h"
#include "os.h"
#include "talgo.h"
#include "tsdb.h"
#include "tsdbMain.h"
#include "tscompression.h"
#define TSDB_DEFAULT_PRECISION TSDB_PRECISION_MILLI // default precision
#define IS_VALID_PRECISION(precision) (((precision) >= TSDB_PRECISION_MILLI) && ((precision) <= TSDB_PRECISION_NANO))
#define TSDB_DEFAULT_COMPRESSION TWO_STAGE_COMP
#define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP))
#define TSDB_MIN_ID 0
#define TSDB_MAX_ID INT_MAX
#define TSDB_MIN_TABLES 10
@ -57,11 +58,12 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock);
static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg);
static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname);
static void * tsdbCommitData(void *arg);
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols);
static int tsdbHasDataInRange(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey);
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SRWHelper *pHelper,
SDataCols *pDataCols);
static TSKEY tsdbNextIterKey(SSkipListIterator *pIter);
static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey);
static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len,
int64_t uid);
// static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len,
// int64_t uid);
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
#define TSDB_GET_TABLE_BY_NAME(pRepo, name)
@ -82,6 +84,7 @@ void tsdbSetDefaultCfg(STsdbCfg *pCfg) {
pCfg->maxRowsPerFileBlock = -1;
pCfg->keep = -1;
pCfg->maxCacheSize = -1;
pCfg->compression = TWO_STAGE_COMP;
}
/**
@ -397,6 +400,7 @@ int tsdbInitTableCfg(STableCfg *config, ETableType type, int64_t uid, int32_t ti
config->superUid = TSDB_INVALID_SUPER_TABLE_ID;
config->tableId.uid = uid;
config->tableId.tid = tid;
config->name = strdup("test1");
return 0;
}
@ -571,6 +575,13 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
if (!IS_VALID_PRECISION(pCfg->precision)) return -1;
}
// Check compression
if (pCfg->compression == -1) {
pCfg->compression = TSDB_DEFAULT_COMPRESSION;
} else {
if (!IS_VALID_COMPRESSION(pCfg->compression)) return -1;
}
// Check tsdbId
if (pCfg->tsdbId < 0) return -1;
@ -785,6 +796,9 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
}
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols) {
ASSERT(maxRowsToRead > 0);
if (pIter == NULL) return 0;
int numOfRows = 0;
do {
@ -823,19 +837,16 @@ static SSkipListIterator **tsdbCreateTableIters(STsdbMeta *pMeta, int maxTables)
if (pTable == NULL || pTable->imem == NULL) continue;
iters[tid] = tSkipListCreateIter(pTable->imem->pData);
if (iters[tid] == NULL) {
tsdbDestroyTableIters(iters, maxTables);
return NULL;
}
if (iters[tid] == NULL) goto _err;
if (!tSkipListIterNext(iters[tid])) {
// No data in this iterator
tSkipListDestroyIter(iters[tid]);
iters[tid] = NULL;
}
if (!tSkipListIterNext(iters[tid])) goto _err;
}
return iters;
_err:
tsdbDestroyTableIters(iters, maxTables);
return NULL;
}
static void tsdbFreeMemTable(SMemTable *pMemTable) {
@ -847,14 +858,17 @@ static void tsdbFreeMemTable(SMemTable *pMemTable) {
// Commit to file
static void *tsdbCommitData(void *arg) {
// TODO
printf("Starting to commit....\n");
STsdbRepo * pRepo = (STsdbRepo *)arg;
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbCache *pCache = pRepo->tsdbCache;
STsdbCfg * pCfg = &(pRepo->config);
STsdbCfg * pCfg = &(pRepo->config);
SDataCols * pDataCols = NULL;
SRWHelper whelper = {0};
if (pCache->imem == NULL) return NULL;
pRepo->appH.walCallBack(pRepo->appH.appH);
// Create the iterator to read from cache
SSkipListIterator **iters = tsdbCreateTableIters(pMeta, pCfg->maxTables);
if (iters == NULL) {
@ -862,23 +876,21 @@ static void *tsdbCommitData(void *arg) {
return NULL;
}
// Create a data column buffer for commit
SDataCols *pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock);
if (pDataCols == NULL) {
// TODO: deal with the error
return NULL;
}
if (tsdbInitWriteHelper(&whelper, pRepo) < 0) goto _exit;
if ((pDataCols = tdNewDataCols(pMeta->maxRowBytes, pMeta->maxCols, pCfg->maxRowsPerFileBlock)) == NULL) goto _exit;
int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst, pCfg->daysPerFile, pCfg->precision);
int efid = tsdbGetKeyFileId(pCache->imem->keyLast, pCfg->daysPerFile, pCfg->precision);
// Loop to commit to each file
for (int fid = sfid; fid <= efid; fid++) {
if (tsdbCommitToFile(pRepo, fid, iters, pDataCols) < 0) {
// TODO: deal with the error here
// assert(0);
if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) {
ASSERT(false);
goto _exit;
}
}
_exit:
tdFreeDataCols(pDataCols);
tsdbDestroyTableIters(iters, pCfg->maxTables);
@ -888,7 +900,6 @@ static void *tsdbCommitData(void *arg) {
free(pCache->imem);
pCache->imem = NULL;
pRepo->commit = 0;
// TODO: free the skiplist
for (int i = 0; i < pCfg->maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable && pTable->imem) {
@ -901,19 +912,12 @@ static void *tsdbCommitData(void *arg) {
return NULL;
}
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols) {
int isNewLastFile = 0;
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SRWHelper *pHelper, SDataCols *pDataCols) {
STsdbMeta * pMeta = pRepo->tsdbMeta;
STsdbFileH *pFileH = pRepo->tsdbFileH;
STsdbCfg * pCfg = &pRepo->config;
SFile hFile, lFile;
SFileGroup *pGroup = NULL;
SCompIdx * pIndices = NULL;
SCompInfo * pCompInfo = NULL;
// size_t compInfoSize = 0;
// SCompBlock compBlock;
// SCompBlock *pBlock = &compBlock;
TSKEY minKey = 0, maxKey = 0;
tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
@ -922,334 +926,93 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey);
if (!hasDataToCommit) return 0; // No data to commit, just return
// TODO: make it more flexible
pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + sizeof(SCompBlock) * 1000);
// Create and open files for commit
tsdbGetDataDirName(pRepo, dataDir);
if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) { /* TODO */
}
pGroup = tsdbOpenFilesForCommit(pFileH, fid);
if (pGroup == NULL) { /* TODO */
}
tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &hFile, 1, 1);
tsdbOpenFile(&hFile, O_RDWR);
if (0 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) {
// TODO: make it not to write the last file every time
tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0);
isNewLastFile = 1;
}
if ((pGroup = tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables)) == NULL) goto _err;
// Load the SCompIdx
pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables);
if (pIndices == NULL) { /* TODO*/
}
if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) { /* TODO */
}
lseek(hFile.fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, SEEK_SET);
// Open files for write/read
if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) goto _err;
// Loop to commit data in each table
for (int tid = 0; tid < pCfg->maxTables; tid++) {
STable * pTable = pMeta->tables[tid];
if (pTable == NULL) continue;
SSkipListIterator *pIter = iters[tid];
SCompIdx * pIdx = &pIndices[tid];
int nNewBlocks = 0;
if (pTable == NULL || pIter == NULL) continue;
/* If no new data to write for this table, just write the old data to new file
* if there are.
*/
if (!tsdbHasDataInRange(pIter, minKey, maxKey)) {
// has old data
if (pIdx->len > 0) {
goto _table_over;
// if (isNewLastFile && pIdx->hasLast) {
if (0) {
// need to move the last block to new file
if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) { /* TODO */
}
if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */
}
tdInitDataCols(pCols, tsdbGetTableSchema(pMeta, pTable));
SCompBlock *pTBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks);
int nBlocks = 0;
TSDB_COMPBLOCK_GET_START_AND_SIZE(pCompInfo, pTBlock, nBlocks);
SCompData tBlock;
int64_t toffset;
int32_t tlen;
tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_LAST], pTBlock, nBlocks, pCols, &tBlock);
tsdbWriteBlockToFileImpl(&lFile, pCols, pCols->numOfPoints, &toffset, &tlen, pTable->tableId.uid);
pTBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks);
pTBlock->offset = toffset;
pTBlock->len = tlen;
pTBlock->numOfPoints = pCols->numOfPoints;
pTBlock->numOfSubBlocks = 1;
pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR);
if (nBlocks > 1) {
pIdx->len -= (sizeof(SCompBlock) * nBlocks);
}
write(hFile.fd, (void *)pCompInfo, pIdx->len);
} else {
pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR);
sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len);
hFile.info.size += pIdx->len;
}
}
continue;
}
pCompInfo->delimiter = TSDB_FILE_DELIMITER;
pCompInfo->checksum = 0;
pCompInfo->uid = pTable->tableId.uid;
// Load SCompBlock part if neccessary
// int isCompBlockLoaded = 0;
if (0) {
// if (pIdx->offset > 0) {
if (pIdx->hasLast || tsdbHasDataInRange(pIter, minKey, pIdx->maxKey)) {
// has last block || cache key overlap with commit key
pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len + sizeof(SCompBlock) * 100);
if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { /* TODO */
}
// if (pCompInfo->uid == pTable->tableId.uid) isCompBlockLoaded = 1;
} else {
// TODO: No need to load the SCompBlock part, just sendfile the SCompBlock part
// and write those new blocks to it
}
}
tdInitDataCols(pCols, tsdbGetTableSchema(pMeta, pTable));
// Set the helper and the buffer dataCols object to help to write this table
tsdbSetHelperTable(pHelper, pTable, pRepo);
tdInitDataCols(pDataCols, tsdbGetTableSchema(pMeta, pTable));
// Loop to write the data in the cache to files. If no data to write, just break the loop
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;
while (1) {
tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols);
if (pCols->numOfPoints == 0) break;
int nLoop = 0;
while (true) {
int rowsRead = tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pDataCols);
assert(rowsRead >= 0);
if (pDataCols->numOfPoints == 0) break;
nLoop++;
int pointsWritten = pCols->numOfPoints;
// TODO: all write to the end of .data file
int64_t toffset = 0;
int32_t tlen = 0;
tsdbWriteBlockToFileImpl(&pGroup->files[TSDB_FILE_TYPE_DATA], pCols, pCols->numOfPoints, &toffset, &tlen, pTable->tableId.uid);
ASSERT(dataColsKeyFirst(pDataCols) >= minKey && dataColsKeyFirst(pDataCols) <= maxKey);
ASSERT(dataColsKeyLast(pDataCols) >= minKey && dataColsKeyLast(pDataCols) <= maxKey);
// Make the compBlock
SCompBlock *pTBlock = pCompInfo->blocks + nNewBlocks++;
pTBlock->offset = toffset;
pTBlock->len = tlen;
pTBlock->keyFirst = dataColsKeyFirst(pCols);
pTBlock->keyLast = dataColsKeyLast(pCols);
pTBlock->last = 0;
pTBlock->algorithm = 0;
pTBlock->numOfPoints = pCols->numOfPoints;
pTBlock->sversion = pTable->sversion;
pTBlock->numOfSubBlocks = 1;
pTBlock->numOfCols = pCols->numOfCols;
int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols);
ASSERT(rowsWritten != 0);
if (rowsWritten < 0) goto _err;
ASSERT(rowsWritten <= pDataCols->numOfPoints);
if (dataColsKeyLast(pCols) > pIdx->maxKey) pIdx->maxKey = dataColsKeyLast(pCols);
tdPopDataColsPoints(pCols, pointsWritten);
maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pCols->numOfPoints;
tdPopDataColsPoints(pDataCols, rowsWritten);
maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pDataCols->numOfPoints;
}
ASSERT(pDataCols->numOfPoints == 0);
// Move the last block to the new .l file if neccessary
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) goto _err;
_table_over:
// Write the SCompBlock part
pIdx->offset = lseek(hFile.fd, 0, SEEK_END);
if (pIdx->len > 0) {
int bytes = tsendfile(hFile.fd, pGroup->files[TSDB_FILE_TYPE_HEAD].fd, NULL, pIdx->len);
if (bytes < pIdx->len) {
printf("Failed to send file, reason: %s\n", strerror(errno));
}
if (nNewBlocks > 0) {
write(hFile.fd, (void *)(pCompInfo->blocks), sizeof(SCompBlock) * nNewBlocks);
pIdx->len += (sizeof(SCompBlock) * nNewBlocks);
}
} else {
if (nNewBlocks > 0) {
write(hFile.fd, (void *)pCompInfo, sizeof(SCompInfo) + sizeof(SCompBlock) * nNewBlocks);
pIdx->len += sizeof(SCompInfo) + sizeof(SCompBlock) * nNewBlocks;
}
}
pIdx->checksum = 0;
pIdx->numOfSuperBlocks += nNewBlocks;
pIdx->hasLast = 0;
if (tsdbWriteCompInfo(pHelper) < 0) goto _err;
}
// Write the SCompIdx part
if (lseek(hFile.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {/* TODO */}
if (write(hFile.fd, (void *)pIndices, sizeof(SCompIdx) * pCfg->maxTables) < 0) {/* TODO */}
if (tsdbWriteCompIdx(pHelper) < 0) goto _err;
// close the files
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
tsdbCloseFile(&pGroup->files[type]);
}
tsdbCloseFile(&hFile);
if (isNewLastFile) tsdbCloseFile(&lFile);
// TODO: replace the .head and .last file
rename(hFile.fname, pGroup->files[TSDB_FILE_TYPE_HEAD].fname);
pGroup->files[TSDB_FILE_TYPE_HEAD].info = hFile.info;
if (isNewLastFile) {
rename(lFile.fname, pGroup->files[TSDB_FILE_TYPE_LAST].fname);
pGroup->files[TSDB_FILE_TYPE_LAST].info = lFile.info;
}
if (pIndices) free(pIndices);
if (pCompInfo) free(pCompInfo);
tsdbCloseHelperFile(pHelper, 0);
// TODO: make it atomic with some methods
pGroup->files[TSDB_FILE_TYPE_HEAD] = pHelper->files.headF;
pGroup->files[TSDB_FILE_TYPE_DATA] = pHelper->files.dataF;
pGroup->files[TSDB_FILE_TYPE_LAST] = pHelper->files.lastF;
return 0;
}
static int tsdbHasDataInRange(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey) {
if (pIter == NULL) return 0;
SSkipListNode *node = tSkipListIterGet(pIter);
if (node == NULL) return 0;
SDataRow row = SL_GET_NODE_DATA(node);
if (dataRowKey(row) >= minKey && dataRowKey(row) <= maxKey) return 1;
return 0;
}
static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey) {
for (int i = 0; i < nIters; i++) {
SSkipListIterator *pIter = iters[i];
if (tsdbHasDataInRange(pIter, minKey, maxKey)) return 1;
}
return 0;
}
static int tsdbWriteBlockToFileImpl(SFile *pFile, SDataCols *pCols, int pointsToWrite, int64_t *offset, int32_t *len, int64_t uid) {
size_t size = sizeof(SCompData) + sizeof(SCompCol) * pCols->numOfCols;
SCompData *pCompData = (SCompData *)malloc(size);
if (pCompData == NULL) return -1;
pCompData->delimiter = TSDB_FILE_DELIMITER;
pCompData->uid = uid;
pCompData->numOfCols = pCols->numOfCols;
*offset = lseek(pFile->fd, 0, SEEK_END);
*len = size;
int toffset = size;
for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
SCompCol *pCompCol = pCompData->cols + iCol;
SDataCol *pDataCol = pCols->cols + iCol;
pCompCol->colId = pDataCol->colId;
pCompCol->type = pDataCol->type;
pCompCol->offset = toffset;
// TODO: add compression
pCompCol->len = TYPE_BYTES[pCompCol->type] * pointsToWrite;
toffset += pCompCol->len;
}
// Write the block
if (write(pFile->fd, (void *)pCompData, size) < 0) goto _err;
*len += size;
for (int iCol = 0; iCol < pCols->numOfCols; iCol++) {
SDataCol *pDataCol = pCols->cols + iCol;
SCompCol *pCompCol = pCompData->cols + iCol;
if (write(pFile->fd, pDataCol->pData, pCompCol->len) < 0) goto _err;
*len += pCompCol->len;
}
tfree(pCompData);
return 0;
_err:
tfree(pCompData);
_err:
ASSERT(false);
tsdbCloseHelperFile(pHelper, 1);
return -1;
}
static int compareKeyBlock(const void *arg1, const void *arg2) {
TSKEY key = *(TSKEY *)arg1;
SCompBlock *pBlock = (SCompBlock *)arg2;
/**
* Return the next iterator key.
*
* @return the next key if iter has
* -1 if iter not
*/
static TSKEY tsdbNextIterKey(SSkipListIterator *pIter) {
if (pIter == NULL) return -1;
if (key < pBlock->keyFirst) {
return -1;
} else if (key > pBlock->keyLast) {
return 1;
SSkipListNode *node = tSkipListIterGet(pIter);
if (node == NULL) return -1;
SDataRow row = SL_GET_NODE_DATA(node);
return dataRowKey(row);
}
static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey) {
TSKEY nextKey;
for (int i = 0; i < nIters; i++) {
SSkipListIterator *pIter = iters[i];
nextKey = tsdbNextIterKey(pIter);
if (nextKey > 0 && (nextKey >= minKey && nextKey <= maxKey)) return 1;
}
return 0;
}
int tsdbWriteBlockToFile(STsdbRepo *pRepo, SFileGroup *pGroup, SCompIdx *pIdx, SCompInfo *pCompInfo, SDataCols *pCols, SCompBlock *pCompBlock, SFile *lFile, int64_t uid) {
STsdbCfg * pCfg = &(pRepo->config);
SFile * pFile = NULL;
int numOfPointsToWrite = 0;
int64_t offset = 0;
int32_t len = 0;
memset((void *)pCompBlock, 0, sizeof(SCompBlock));
if (pCompInfo == NULL) {
// Just append the data block to .data or .l or .last file
numOfPointsToWrite = pCols->numOfPoints;
if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // Write to .data file
pFile = &(pGroup->files[TSDB_FILE_TYPE_DATA]);
} else { // Write to .last or .l file
pCompBlock->last = 1;
if (lFile) {
pFile = lFile;
} else {
pFile = &(pGroup->files[TSDB_FILE_TYPE_LAST]);
}
}
tsdbWriteBlockToFileImpl(pFile, pCols, numOfPointsToWrite, &offset, &len, uid);
pCompBlock->offset = offset;
pCompBlock->len = len;
pCompBlock->algorithm = 2; // TODO : add to configuration
pCompBlock->sversion = pCols->sversion;
pCompBlock->numOfPoints = pCols->numOfPoints;
pCompBlock->numOfSubBlocks = 1;
pCompBlock->numOfCols = pCols->numOfCols;
pCompBlock->keyFirst = dataColsKeyFirst(pCols);
pCompBlock->keyLast = dataColsKeyLast(pCols);
} else {
// Need to merge the block to either the last block or the other block
TSKEY keyFirst = dataColsKeyFirst(pCols);
SCompBlock *pMergeBlock = NULL;
// Search the block to merge in
void *ptr = taosbsearch((void *)&keyFirst, (void *)(pCompInfo->blocks), sizeof(SCompBlock), pIdx->numOfSuperBlocks,
compareKeyBlock, TD_GE);
if (ptr == NULL) {
// No block greater or equal than the key, but there are data in the .last file, need to merge the last file block
// and merge the data
pMergeBlock = TSDB_COMPBLOCK_AT(pCompInfo, pIdx->numOfSuperBlocks - 1);
} else {
pMergeBlock = (SCompBlock *)ptr;
}
if (pMergeBlock->last) {
if (pMergeBlock->last + pCols->numOfPoints > pCfg->minRowsPerFileBlock) {
// Need to load the data from .last and combine data in pCols to write to .data file
} else { // Just append the block to .last or .l file
if (lFile) {
// read the block from .last file and merge with pCols, write to .l file
} else {
// tsdbWriteBlockToFileImpl();
}
}
} else { // The block need to merge in .data file
}
}
return numOfPointsToWrite;
}
}

1128
src/tsdb/src/tsdbRWHelper.c Normal file

File diff suppressed because it is too large Load Diff

View File

@ -120,6 +120,7 @@ typedef struct STsdbQueryHandle {
SFileGroup* pFileGroup;
SFileGroupIter fileIter;
SCompIdx* compIndex;
SRWHelper rhelper;
} STsdbQueryHandle;
static void tsdbInitDataBlockLoadInfo(SDataBlockLoadInfo* pBlockLoadInfo) {
@ -142,7 +143,8 @@ tsdb_query_handle_t* tsdbQueryTables(tsdb_repo_t* tsdb, STsdbQueryCond* pCond, S
pQueryHandle->order = pCond->order;
pQueryHandle->window = pCond->twindow;
pQueryHandle->pTsdb = tsdb;
pQueryHandle->compIndex = calloc(10000, sizeof(SCompIdx)),
pQueryHandle->compIndex = calloc(10000, sizeof(SCompIdx));
tsdbInitReadHelper(&pQueryHandle->rhelper, (STsdbRepo*) tsdb);
pQueryHandle->cur.fid = -1;
@ -289,14 +291,10 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
SFileGroup* fileGroup = pQueryHandle->pFileGroup;
assert(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname > 0);
if (fileGroup->files[TSDB_FILE_TYPE_HEAD].fd == FD_INITIALIZER) {
fileGroup->files[TSDB_FILE_TYPE_HEAD].fd = open(fileGroup->files[TSDB_FILE_TYPE_HEAD].fname, O_RDONLY);
} else {
assert(FD_VALID(fileGroup->files[TSDB_FILE_TYPE_HEAD].fd));
}
tsdbSetAndOpenHelperFile(&pQueryHandle->rhelper, fileGroup);
// load all the comp offset value for all tables in this file
tsdbLoadCompIdx(fileGroup, pQueryHandle->compIndex, 10000); // todo set dynamic max tables
// tsdbLoadCompIdx(fileGroup, pQueryHandle->compIndex, 10000); // todo set dynamic max tables
*numOfBlocks = 0;
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
@ -304,7 +302,7 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
for (int32_t i = 0; i < numOfTables; ++i) {
STableCheckInfo* pCheckInfo = taosArrayGet(pQueryHandle->pTableCheckInfo, i);
SCompIdx* compIndex = &pQueryHandle->compIndex[pCheckInfo->tableId.tid];
SCompIdx* compIndex = &pQueryHandle->rhelper.pCompIdx[pCheckInfo->tableId.tid];
if (compIndex->len == 0 || compIndex->numOfSuperBlocks == 0) { // no data block in this file, try next file
continue;//no data blocks in the file belongs to pCheckInfo->pTable
} else {
@ -318,8 +316,13 @@ static int32_t getFileCompInfo(STsdbQueryHandle* pQueryHandle, int32_t* numOfBlo
pCheckInfo->compSize = compIndex->len;
}
tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo);
// tsdbLoadCompBlocks(fileGroup, compIndex, pCheckInfo->pCompInfo);
STable* pTable = tsdbGetTableByUid(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->tableId.uid);
assert(pTable != NULL);
tsdbSetHelperTable(&pQueryHandle->rhelper, pTable, pQueryHandle->pTsdb);
tsdbLoadCompInfo(&(pQueryHandle->rhelper), (void *)(pCheckInfo->pCompInfo));
SCompInfo* pCompInfo = pCheckInfo->pCompInfo;
TSKEY s = MIN(pCheckInfo->lastKey, pQueryHandle->window.ekey);
@ -410,12 +413,12 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
tdInitDataCols(pCheckInfo->pDataCols, tsdbGetTableSchema(tsdbGetMeta(pQueryHandle->pTsdb), pCheckInfo->pTableObj));
SFile* pFile = &pQueryHandle->pFileGroup->files[TSDB_FILE_TYPE_DATA];
if (pFile->fd == FD_INITIALIZER) {
pFile->fd = open(pFile->fname, O_RDONLY);
}
// SFile* pFile = &pQueryHandle->pFileGroup->files[TSDB_FILE_TYPE_DATA];
// if (pFile->fd == FD_INITIALIZER) {
// pFile->fd = open(pFile->fname, O_RDONLY);
// }
if (tsdbLoadDataBlock(pFile, pBlock, 1, pCheckInfo->pDataCols, data) == 0) {
if (tsdbLoadBlockData(&(pQueryHandle->rhelper), pBlock, NULL) == 0) {
SDataBlockLoadInfo* pBlockLoadInfo = &pQueryHandle->dataBlockLoadInfo;
pBlockLoadInfo->fileGroup = pQueryHandle->pFileGroup;
@ -428,8 +431,8 @@ static bool doLoadFileDataBlock(STsdbQueryHandle* pQueryHandle, SCompBlock* pBlo
taosArrayDestroy(sa);
tfree(data);
TSKEY* d = (TSKEY*)pCheckInfo->pDataCols->cols[PRIMARYKEY_TIMESTAMP_COL_INDEX].pData;
assert(d[0] == pBlock->keyFirst && d[pBlock->numOfPoints - 1] == pBlock->keyLast);
// TSKEY* d = (TSKEY*)pCheckInfo->pDataCols->cols[PRIMARYKEY_TIMESTAMP_COL_INDEX].pData;
// assert(d[0] == pBlock->keyFirst && d[pBlock->numOfPoints - 1] == pBlock->keyLast);
return blockLoaded;
}
@ -585,7 +588,7 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf
}
}
int32_t start = MIN(cur->pos, endPos);
// int32_t start = MIN(cur->pos, endPos);
// move the data block in the front to data block if needed
int32_t numOfCols = QH_GET_NUM_OF_COLS(pQueryHandle);
@ -597,9 +600,10 @@ static void filterDataInDataBlock(STsdbQueryHandle* pQueryHandle, STableCheckInf
SColumnInfoData* pCol = taosArrayGet(pQueryHandle->pColumns, j);
if (pCol->info.colId == colId) {
SDataCol* pDataCol = &pCols->cols[i];
memmove(pCol->pData, pDataCol->pData + pCol->info.bytes * start,
pQueryHandle->realNumOfRows * pCol->info.bytes);
// SDataCol* pDataCol = &pCols->cols[i];
pCol->pData = pQueryHandle->rhelper.pDataCols[0]->cols[i].pData;
// memmove(pCol->pData, pDataCol->pData + pCol->info.bytes * start,
// pQueryHandle->realNumOfRows * pCol->info.bytes);
break;
}
}
@ -1517,14 +1521,15 @@ void tsdbCleanupQueryHandle(tsdb_query_handle_t queryHandle) {
taosArrayDestroy(pQueryHandle->pTableCheckInfo);
tfree(pQueryHandle->compIndex);
size_t cols = taosArrayGetSize(pQueryHandle->pColumns);
for (int32_t i = 0; i < cols; ++i) {
SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
tfree(pColInfo->pData);
}
// size_t cols = taosArrayGetSize(pQueryHandle->pColumns);
// for (int32_t i = 0; i < cols; ++i) {
// SColumnInfoData* pColInfo = taosArrayGet(pQueryHandle->pColumns, i);
// // tfree(pColInfo->pData);
// }
taosArrayDestroy(pQueryHandle->pColumns);
tfree(pQueryHandle->pDataBlockInfo);
tsdbDestroyHelper(&pQueryHandle->rhelper);
tfree(pQueryHandle);
}

View File

@ -5,12 +5,84 @@
#include "dataformat.h"
#include "tsdbMain.h"
double getCurTime() {
static double getCurTime() {
struct timeval tv;
gettimeofday(&tv, NULL);
return tv.tv_sec + tv.tv_usec * 1E-6;
}
typedef struct {
tsdb_repo_t *pRepo;
int tid;
int64_t uid;
int sversion;
TSKEY startTime;
TSKEY interval;
int totalRows;
int rowsPerSubmit;
STSchema * pSchema;
} SInsertInfo;
static int insertData(SInsertInfo *pInfo) {
SSubmitMsg *pMsg =
(SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(pInfo->pSchema) * pInfo->rowsPerSubmit);
if (pMsg == NULL) return -1;
TSKEY start_time = pInfo->startTime;
// Loop to write data
double stime = getCurTime();
for (int k = 0; k < pInfo->totalRows/pInfo->rowsPerSubmit; k++) {
memset((void *)pMsg, 0, sizeof(SSubmitMsg));
SSubmitBlk *pBlock = pMsg->blocks;
pBlock->uid = pInfo->uid;
pBlock->tid = pInfo->tid;
pBlock->sversion = pInfo->sversion;
pBlock->len = 0;
for (int i = 0; i < pInfo->rowsPerSubmit; i++) {
// start_time += 1000;
start_time += pInfo->interval;
SDataRow row = (SDataRow)(pBlock->data + pBlock->len);
tdInitDataRow(row, pInfo->pSchema);
for (int j = 0; j < schemaNCols(pInfo->pSchema); j++) {
if (j == 0) { // Just for timestamp
tdAppendColVal(row, (void *)(&start_time), schemaColAt(pInfo->pSchema, j));
} else { // For int
int val = 10;
tdAppendColVal(row, (void *)(&val), schemaColAt(pInfo->pSchema, j));
}
}
pBlock->len += dataRowLen(row);
}
pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len;
pMsg->numOfBlocks = 1;
pBlock->len = htonl(pBlock->len);
pBlock->numOfRows = htonl(pBlock->numOfRows);
pBlock->uid = htobe64(pBlock->uid);
pBlock->tid = htonl(pBlock->tid);
pBlock->sversion = htonl(pBlock->sversion);
pBlock->padding = htonl(pBlock->padding);
pMsg->length = htonl(pMsg->length);
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
pMsg->compressed = htonl(pMsg->numOfBlocks);
if (tsdbInsertData(pInfo->pRepo, pMsg) < 0) {
tfree(pMsg);
return -1;
}
}
double etime = getCurTime();
printf("Spent %f seconds to write %d records\n", etime - stime, pInfo->totalRows);
tfree(pMsg);
return 0;
}
TEST(TsdbTest, DISABLED_tableEncodeDecode) {
// TEST(TsdbTest, tableEncodeDecode) {
STable *pTable = (STable *)malloc(sizeof(STable));
@ -48,135 +120,132 @@ TEST(TsdbTest, DISABLED_tableEncodeDecode) {
ASSERT_EQ(memcmp(pTable->schema, tTable->schema, sizeof(STSchema) + sizeof(STColumn) * nCols), 0);
}
TEST(TsdbTest, DISABLED_createRepo) {
// TEST(TsdbTest, createRepo) {
// STsdbCfg config;
// TEST(TsdbTest, DISABLED_createRepo) {
TEST(TsdbTest, createRepo) {
STsdbCfg config;
STsdbRepo *repo;
// // 1. Create a tsdb repository
// tsdbSetDefaultCfg(&config);
// tsdb_repo_t *pRepo = tsdbCreateRepo("/home/ubuntu/work/ttest/vnode0", &config, NULL);
// ASSERT_NE(pRepo, nullptr);
// 1. Create a tsdb repository
tsdbSetDefaultCfg(&config);
ASSERT_EQ(tsdbCreateRepo("/home/ubuntu/work/ttest/vnode0", &config, NULL), 0);
// // 2. Create a normal table
// STableCfg tCfg;
// ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_SUPER_TABLE, 987607499877672L, 0), -1);
// ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_NORMAL_TABLE, 987607499877672L, 0), 0);
tsdb_repo_t *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL);
ASSERT_NE(pRepo, nullptr);
// int nCols = 5;
// STSchema *schema = tdNewSchema(nCols);
// 2. Create a normal table
STableCfg tCfg;
ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_SUPER_TABLE, 987607499877672L, 0), -1);
ASSERT_EQ(tsdbInitTableCfg(&tCfg, TSDB_NORMAL_TABLE, 987607499877672L, 0), 0);
// for (int i = 0; i < nCols; i++) {
// if (i == 0) {
// tdSchemaAppendCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1);
// } else {
// tdSchemaAppendCol(schema, TSDB_DATA_TYPE_INT, i, -1);
// }
// }
int nCols = 5;
STSchema *schema = tdNewSchema(nCols);
// tsdbTableSetSchema(&tCfg, schema, true);
// tsdbCreateTable(pRepo, &tCfg);
// // // 3. Loop to write some simple data
// int nRows = 1;
// int rowsPerSubmit = 1;
// int64_t start_time = 1584081000000;
// SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * rowsPerSubmit);
// double stime = getCurTime();
// for (int k = 0; k < nRows/rowsPerSubmit; k++) {
// memset((void *)pMsg, 0, sizeof(SSubmitMsg));
// SSubmitBlk *pBlock = pMsg->blocks;
// pBlock->uid = 987607499877672L;
// pBlock->tid = 0;
// pBlock->sversion = 0;
// pBlock->len = 0;
// for (int i = 0; i < rowsPerSubmit; i++) {
// // start_time += 1000;
// start_time += 1000;
// SDataRow row = (SDataRow)(pBlock->data + pBlock->len);
// tdInitDataRow(row, schema);
// for (int j = 0; j < schemaNCols(schema); j++) {
// if (j == 0) { // Just for timestamp
// tdAppendColVal(row, (void *)(&start_time), schemaColAt(schema, j));
// } else { // For int
// int val = 10;
// tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j));
// }
// }
// pBlock->len += dataRowLen(row);
// }
// pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len;
// pMsg->numOfBlocks = 1;
// pBlock->len = htonl(pBlock->len);
// pBlock->numOfRows = htonl(pBlock->numOfRows);
// pBlock->uid = htobe64(pBlock->uid);
// pBlock->tid = htonl(pBlock->tid);
// pBlock->sversion = htonl(pBlock->sversion);
// pBlock->padding = htonl(pBlock->padding);
// pMsg->length = htonl(pMsg->length);
// pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
// pMsg->compressed = htonl(pMsg->numOfBlocks);
// tsdbInsertData(pRepo, pMsg);
// }
// double etime = getCurTime();
// void *ptr = malloc(150000);
// free(ptr);
// printf("Spent %f seconds to write %d records\n", etime - stime, nRows);
// tsdbCloseRepo(pRepo);
}
// TEST(TsdbTest, DISABLED_openRepo) {
TEST(TsdbTest, openRepo) {
tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/build/test/data/vnode/vnode2/tsdb", NULL);
ASSERT_NE(repo, nullptr);
STsdbRepo *pRepo = (STsdbRepo *)repo;
SFileGroup *pGroup = tsdbSearchFGroup(pRepo->tsdbFileH, 1655);
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
tsdbOpenFile(&pGroup->files[type], O_RDONLY);
for (int i = 0; i < nCols; i++) {
if (i == 0) {
tdSchemaAppendCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1);
} else {
tdSchemaAppendCol(schema, TSDB_DATA_TYPE_INT, i, -1);
}
}
SCompIdx *pIdx = (SCompIdx *)calloc(pRepo->config.maxTables, sizeof(SCompIdx));
tsdbLoadCompIdx(pGroup, (void *)pIdx, pRepo->config.maxTables);
tsdbTableSetSchema(&tCfg, schema, true);
SCompInfo *pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + pIdx[1].len);
tsdbCreateTable(pRepo, &tCfg);
tsdbLoadCompBlocks(pGroup, &pIdx[1], (void *)pCompInfo);
// Insert Some Data
SInsertInfo iInfo = {
.pRepo = pRepo,
.tid = tCfg.tableId.tid,
.uid = tCfg.tableId.uid,
.sversion = tCfg.sversion,
.startTime = 1584081000000,
.interval = 1000,
.totalRows = 50,
.rowsPerSubmit = 1,
.pSchema = schema
};
int blockIdx = 0;
SCompBlock *pBlock = &(pCompInfo->blocks[blockIdx]);
ASSERT_EQ(insertData(&iInfo), 0);
SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols);
// Close the repository
tsdbCloseRepo(pRepo);
tsdbLoadCompCols(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, (void *)pCompData);
// Open the repository again
pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL);
repo = (STsdbRepo *)pRepo;
ASSERT_NE(pRepo, nullptr);
STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, pCompData->uid);
SDataCols *pDataCols = tdNewDataCols(tdMaxRowBytesFromSchema(tsdbGetTableSchema(pRepo->tsdbMeta, pTable)), 5, 10);
tdInitDataCols(pDataCols, tsdbGetTableSchema(pRepo->tsdbMeta, pTable));
// Insert more data
iInfo.startTime = iInfo.startTime + iInfo.interval * iInfo.totalRows;
iInfo.totalRows = 10;
iInfo.pRepo = pRepo;
ASSERT_EQ(insertData(&iInfo), 0);
tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, 1, pDataCols, pCompData);
// Close the repository
tsdbCloseRepo(pRepo);
tdResetDataCols(pDataCols);
// Open the repository again
pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0", NULL);
repo = (STsdbRepo *)pRepo;
ASSERT_NE(pRepo, nullptr);
tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock + 1, 1, pDataCols, pCompData);
// Read from file
SRWHelper rhelper;
tsdbInitReadHelper(&rhelper, repo);
SFileGroup *pFGroup = tsdbSearchFGroup(repo->tsdbFileH, 1833);
ASSERT_NE(pFGroup, nullptr);
ASSERT_GE(tsdbSetAndOpenHelperFile(&rhelper, pFGroup), 0);
STable *pTable = tsdbGetTableByUid(repo->tsdbMeta, tCfg.tableId.uid);
ASSERT_NE(pTable, nullptr);
tsdbSetHelperTable(&rhelper, pTable, repo);
ASSERT_EQ(tsdbLoadCompInfo(&rhelper, NULL), 0);
ASSERT_EQ(tsdbLoadBlockData(&rhelper, blockAtIdx(&rhelper, 0), NULL), 0);
int k = 0;
}
TEST(TsdbTest, DISABLED_openRepo) {
// TEST(TsdbTest, openRepo) {
// tsdb_repo_t *repo = tsdbOpenRepo("/home/ubuntu/work/build/test/data/vnode/vnode2/tsdb", NULL);
// ASSERT_NE(repo, nullptr);
// STsdbRepo *pRepo = (STsdbRepo *)repo;
// SFileGroup *pGroup = tsdbSearchFGroup(pRepo->tsdbFileH, 1655);
// for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
// tsdbOpenFile(&pGroup->files[type], O_RDONLY);
// }
// SCompIdx *pIdx = (SCompIdx *)calloc(pRepo->config.maxTables, sizeof(SCompIdx));
// tsdbLoadCompIdx(pGroup, (void *)pIdx, pRepo->config.maxTables);
// SCompInfo *pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + pIdx[1].len);
// tsdbLoadCompBlocks(pGroup, &pIdx[1], (void *)pCompInfo);
// int blockIdx = 0;
// SCompBlock *pBlock = &(pCompInfo->blocks[blockIdx]);
// SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * pBlock->numOfCols);
// tsdbLoadCompCols(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, (void *)pCompData);
// STable *pTable = tsdbGetTableByUid(pRepo->tsdbMeta, pCompData->uid);
// SDataCols *pDataCols = tdNewDataCols(tdMaxRowBytesFromSchema(tsdbGetTableSchema(pRepo->tsdbMeta, pTable)), 5, 10);
// tdInitDataCols(pDataCols, tsdbGetTableSchema(pRepo->tsdbMeta, pTable));
// tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock, 1, pDataCols, pCompData);
// tdResetDataCols(pDataCols);
// tsdbLoadDataBlock(&pGroup->files[TSDB_FILE_TYPE_DATA], pBlock + 1, 1, pDataCols, pCompData);
// int k = 0;
}

View File

@ -176,6 +176,13 @@ uint32_t ip2uint(const char *const ip_addr);
void taosSetAllocMode(int mode, const char* path, bool autoDump);
void taosDumpMemoryLeak();
void * tmalloc(size_t size);
void * tcalloc(size_t nmemb, size_t size);
size_t tsizeof(void *ptr);
void tmemset(void *ptr, int c);
void * trealloc(void *ptr, size_t size);
void tzfree(void *ptr);
#ifdef TAOS_MEM_CHECK
void * taos_malloc(size_t size, const char *file, uint32_t line);

View File

@ -618,3 +618,48 @@ char *taosCharsetReplace(char *charsetstr) {
return strdup(charsetstr);
}
void *tmalloc(size_t size) {
if (size <= 0) return NULL;
void *ret = malloc(size + sizeof(size_t));
if (ret == NULL) return NULL;
*(size_t *)ret = size;
return (void *)((char *)ret + sizeof(size_t));
}
void *tcalloc(size_t nmemb, size_t size) {
size_t tsize = nmemb * size;
void * ret = tmalloc(tsize);
if (ret == NULL) return NULL;
tmemset(ret, 0);
return ret;
}
size_t tsizeof(void *ptr) { return (ptr) ? (*(size_t *)((char *)ptr - sizeof(size_t))) : 0; }
void tmemset(void *ptr, int c) { memset(ptr, c, tsizeof(ptr)); }
void * trealloc(void *ptr, size_t size) {
if (ptr == NULL) return tmalloc(size);
if (size <= tsizeof(ptr)) return ptr;
void * tptr = (void *)((char *)ptr - sizeof(size_t));
size_t tsize = size + sizeof(size_t);
tptr = realloc(tptr, tsize);
if (tptr == NULL) return NULL;
*(size_t *)tptr = size;
return (void *)((char *)tptr + sizeof(size_t));
}
void tzfree(void *ptr) {
if (ptr) {
free((void *)((char *)ptr - sizeof(size_t)));
}
}

View File

@ -0,0 +1,414 @@
#include <gtest/gtest.h>
#include "talgo.h"
static int compareFunc(const void *arg1, const void *arg2) { return (*(int *)arg1) - (*(int *)arg2); }
TEST(testCase, taosbsearch_equal) {
// For equal test
int key = 3;
void *pRet = NULL;
pRet = taosbsearch((void *)&key, NULL, 0, sizeof(int), compareFunc, TD_EQ);
ASSERT_EQ(pRet, nullptr);
// 1 element
int array1[1] = {5};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_EQ);
ASSERT_EQ(pRet, nullptr);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_EQ);
ASSERT_EQ(pRet, nullptr);
key = 5;
pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_EQ);
ASSERT_NE(pRet, nullptr);
ASSERT_EQ(*(int *)pRet, key);
// 2 element
int array2[2] = {3, 6};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_EQ);
ASSERT_EQ(pRet, nullptr);
key = 3;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_EQ);
ASSERT_EQ(*(int *)pRet, key);
key = 4;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_EQ);
ASSERT_EQ(pRet, nullptr);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_EQ);
ASSERT_EQ(*(int *)pRet, 6);
key = 7;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_EQ);
ASSERT_EQ(pRet, nullptr);
// 3 element
int array3[3] = {3, 6, 8};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_EQ);
ASSERT_EQ(pRet, nullptr);
key = 3;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_EQ);
ASSERT_EQ(*(int *)pRet, 3);
key = 4;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_EQ);
ASSERT_EQ(pRet, nullptr);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_EQ);
ASSERT_EQ(*(int *)pRet, 6);
key = 7;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_EQ);
ASSERT_EQ(pRet, nullptr);
key = 8;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_EQ);
ASSERT_EQ(*(int *)pRet, 8);
key = 9;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_EQ);
ASSERT_EQ(pRet, nullptr);
}
TEST(testCase, taosbsearch_greater_or_equal) {
// For equal test
int key = 3;
void *pRet = NULL;
pRet = taosbsearch((void *)&key, NULL, 0, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(pRet, nullptr);
// 1 element
int array1[1] = {5};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 5);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(pRet, nullptr);
key = 5;
pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_GE);
ASSERT_NE(pRet, nullptr);
ASSERT_EQ(*(int *)pRet, 5);
// 2 element
int array2[2] = {3, 6};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 3);
key = 3;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 3);
key = 4;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 6);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 6);
key = 7;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(pRet, nullptr);
// 3 element
int array3[3] = {3, 6, 8};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 3);
key = 3;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 3);
key = 4;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 6);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 6);
key = 7;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 8);
key = 8;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 8);
key = 9;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(pRet, nullptr);
// 4 element
int array4[4] = {3, 6, 8, 11};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 3);
key = 3;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 3);
key = 4;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 6);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 6);
key = 7;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 8);
key = 8;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 8);
key = 9;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 11);
key = 11;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 11);
key = 13;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(pRet, nullptr);
// 5 element
int array5[5] = {3, 6, 8, 11, 15};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 3);
key = 3;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 3);
key = 4;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 6);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 6);
key = 7;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 8);
key = 8;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 8);
key = 9;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 11);
key = 11;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 11);
key = 13;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 15);
key = 15;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(*(int *)pRet, 15);
key = 17;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_GE);
ASSERT_EQ(pRet, nullptr);
}
TEST(testCase, taosbsearch_less_or_equal) {
// For equal test
int key = 3;
void *pRet = NULL;
pRet = taosbsearch((void *)&key, NULL, 0, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(pRet, nullptr);
// 1 element
int array1[1] = {5};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(pRet, nullptr);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 5);
key = 5;
pRet = taosbsearch((void *)&key, (void *)array1, 1, sizeof(int), compareFunc, TD_LE);
ASSERT_NE(pRet, nullptr);
ASSERT_EQ(*(int *)pRet, 5);
// 2 element
int array2[2] = {3, 6};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(pRet, nullptr);
key = 3;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 3);
key = 4;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 3);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 6);
key = 7;
pRet = taosbsearch((void *)&key, (void *)array2, 2, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 6);
// 3 element
int array3[3] = {3, 6, 8};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(pRet, nullptr);
key = 3;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 3);
key = 4;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 3);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 6);
key = 7;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 6);
key = 8;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 8);
key = 9;
pRet = taosbsearch((void *)&key, (void *)array3, 3, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 8);
// 4 element
int array4[4] = {3, 6, 8, 11};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(pRet, nullptr);
key = 3;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 3);
key = 4;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 3);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 6);
key = 7;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 6);
key = 8;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 8);
key = 9;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 8);
key = 11;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 11);
key = 13;
pRet = taosbsearch((void *)&key, (void *)array4, 4, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 11);
// 5 element
int array5[5] = {3, 6, 8, 11, 15};
key = 1;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(pRet, nullptr);
key = 3;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 3);
key = 4;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 3);
key = 6;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 6);
key = 7;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 6);
key = 8;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 8);
key = 9;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 8);
key = 11;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 11);
key = 13;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 11);
key = 15;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 15);
key = 17;
pRet = taosbsearch((void *)&key, (void *)array5, 5, sizeof(int), compareFunc, TD_LE);
ASSERT_EQ(*(int *)pRet, 15);
}

View File

@ -88,6 +88,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
STsdbCfg tsdbCfg = {0};
tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.compression = -1;
tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId;
tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions;
tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;