Merge branch 'develop' of https://github.com/taosdata/TDengine into develop
This commit is contained in:
commit
e095a35c67
|
@ -53,14 +53,12 @@ typedef struct {
|
|||
int32_t tsdbId;
|
||||
int32_t cacheBlockSize;
|
||||
int32_t totalBlocks;
|
||||
int32_t maxTables; // maximum number of tables this repository can have
|
||||
int32_t daysPerFile; // day per file sharding policy
|
||||
int32_t keep; // day of data to keep
|
||||
int32_t keep1;
|
||||
int32_t keep2;
|
||||
int32_t minRowsPerFileBlock; // minimum rows per file block
|
||||
int32_t maxRowsPerFileBlock; // maximum rows per file block
|
||||
int32_t commitTime;
|
||||
int8_t precision;
|
||||
int8_t compression;
|
||||
} STsdbCfg;
|
||||
|
|
|
@ -37,14 +37,12 @@ static int32_t saveVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
|
|||
len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pVnode->cfgVersion);
|
||||
len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pVnode->tsdbCfg.cacheBlockSize);
|
||||
len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pVnode->tsdbCfg.totalBlocks);
|
||||
len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnode->tsdbCfg.maxTables);
|
||||
len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pVnode->tsdbCfg.daysPerFile);
|
||||
len += snprintf(content + len, maxLen - len, " \"daysToKeep\": %d,\n", pVnode->tsdbCfg.keep);
|
||||
len += snprintf(content + len, maxLen - len, " \"daysToKeep1\": %d,\n", pVnode->tsdbCfg.keep1);
|
||||
len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnode->tsdbCfg.keep2);
|
||||
len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnode->tsdbCfg.minRowsPerFileBlock);
|
||||
len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnode->tsdbCfg.maxRowsPerFileBlock);
|
||||
len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnode->tsdbCfg.commitTime);
|
||||
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnode->tsdbCfg.precision);
|
||||
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnode->tsdbCfg.compression);
|
||||
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnode->walCfg.walLevel);
|
||||
|
@ -136,12 +134,12 @@ static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
|
|||
}
|
||||
pVnode->tsdbCfg.totalBlocks = totalBlocks->valueint;
|
||||
|
||||
cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
|
||||
if (!maxTables || maxTables->type != cJSON_Number) {
|
||||
printf("vgId:%d, failed to read vnode cfg, maxTables not found\n", pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->tsdbCfg.maxTables = maxTables->valueint;
|
||||
// cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
|
||||
// if (!maxTables || maxTables->type != cJSON_Number) {
|
||||
// printf("vgId:%d, failed to read vnode cfg, maxTables not found\n", pVnode->vgId);
|
||||
// goto PARSE_OVER;
|
||||
// }
|
||||
// pVnode->tsdbCfg.maxTables = maxTables->valueint;
|
||||
|
||||
cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
|
||||
if (!daysPerFile || daysPerFile->type != cJSON_Number) {
|
||||
|
@ -185,12 +183,12 @@ static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
|
|||
}
|
||||
pVnode->tsdbCfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint;
|
||||
|
||||
cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime");
|
||||
if (!commitTime || commitTime->type != cJSON_Number) {
|
||||
printf("vgId:%d, failed to read vnode cfg, commitTime not found\n", pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint;
|
||||
// cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime");
|
||||
// if (!commitTime || commitTime->type != cJSON_Number) {
|
||||
// printf("vgId:%d, failed to read vnode cfg, commitTime not found\n", pVnode->vgId);
|
||||
// goto PARSE_OVER;
|
||||
// }
|
||||
// pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint;
|
||||
|
||||
cJSON *precision = cJSON_GetObjectItem(root, "precision");
|
||||
if (!precision || precision->type != cJSON_Number) {
|
||||
|
|
|
@ -70,6 +70,7 @@ typedef struct {
|
|||
pthread_rwlock_t rwLock;
|
||||
|
||||
int32_t nTables;
|
||||
int32_t maxTables;
|
||||
STable** tables;
|
||||
SList* superList;
|
||||
SHashObj* uidMap;
|
||||
|
@ -111,9 +112,11 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
T_REF_DECLARE();
|
||||
SRWLatch latch;
|
||||
TSKEY keyFirst;
|
||||
TSKEY keyLast;
|
||||
int64_t numOfRows;
|
||||
int32_t maxTables;
|
||||
STableData** tData;
|
||||
SList* actList;
|
||||
SList* bufBlockList;
|
||||
|
@ -304,6 +307,7 @@ typedef struct {
|
|||
|
||||
// Operations
|
||||
// ------------------ tsdbMeta.c
|
||||
#define TSDB_INIT_NTABLES 1024
|
||||
#define TABLE_TYPE(t) (t)->type
|
||||
#define TABLE_NAME(t) (t)->name
|
||||
#define TABLE_CHAR_NAME(t) TABLE_NAME(t)->data
|
||||
|
@ -395,6 +399,7 @@ int tsdbInsertRowToMem(STsdbRepo* pRepo, SDataRow row, STable* pTable);
|
|||
int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
|
||||
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
|
||||
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
|
||||
void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem);
|
||||
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
|
||||
int tsdbAsyncCommit(STsdbRepo* pRepo);
|
||||
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
|
||||
|
@ -429,7 +434,7 @@ STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg);
|
|||
void tsdbFreeFileH(STsdbFileH* pFileH);
|
||||
int tsdbOpenFileH(STsdbRepo* pRepo);
|
||||
void tsdbCloseFileH(STsdbRepo* pRepo);
|
||||
SFileGroup* tsdbCreateFGroupIfNeed(STsdbRepo* pRepo, char* dataDir, int fid, int maxTables);
|
||||
SFileGroup* tsdbCreateFGroupIfNeed(STsdbRepo* pRepo, char* dataDir, int fid);
|
||||
void tsdbInitFileGroupIter(STsdbFileH* pFileH, SFileGroupIter* pIter, int direction);
|
||||
void tsdbSeekFileGroupIter(SFileGroupIter* pIter, int fid);
|
||||
SFileGroup* tsdbGetFileGroupNext(SFileGroupIter* pIter);
|
||||
|
@ -511,6 +516,7 @@ void tsdbGetDataFileName(STsdbRepo* pRepo, int fid, int type, char* fname
|
|||
int tsdbLockRepo(STsdbRepo* pRepo);
|
||||
int tsdbUnlockRepo(STsdbRepo* pRepo);
|
||||
char* tsdbGetDataDirName(char* rootDir);
|
||||
int tsdbGetNextMaxTables(int tid);
|
||||
STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo);
|
||||
STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo);
|
||||
|
||||
|
|
|
@ -149,7 +149,7 @@ void tsdbCloseFileH(STsdbRepo *pRepo) {
|
|||
}
|
||||
}
|
||||
|
||||
SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int maxTables) {
|
||||
SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid) {
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
|
||||
if (pFileH->nFGroups >= pFileH->maxFGroups) return NULL;
|
||||
|
|
|
@ -62,7 +62,6 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo);
|
|||
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
|
||||
static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression);
|
||||
static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep);
|
||||
static int tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables);
|
||||
static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
|
||||
static int keyFGroupCompFunc(const void *key, const void *fgroup);
|
||||
static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg);
|
||||
|
@ -85,10 +84,10 @@ int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
|
|||
if (tsdbSetRepoEnv(rootDir, pCfg) < 0) return -1;
|
||||
|
||||
tsdbDebug(
|
||||
"vgId:%d tsdb env create succeed! cacheBlockSize %d totalBlocks %d maxTables %d daysPerFile %d keep "
|
||||
"vgId:%d tsdb env create succeed! cacheBlockSize %d totalBlocks %d daysPerFile %d keep "
|
||||
"%d minRowsPerFileBlock %d maxRowsPerFileBlock %d precision %d compression %d",
|
||||
pCfg->tsdbId, pCfg->cacheBlockSize, pCfg->totalBlocks, pCfg->maxTables, pCfg->daysPerFile, pCfg->keep,
|
||||
pCfg->minRowsPerFileBlock, pCfg->maxRowsPerFileBlock, pCfg->precision, pCfg->compression);
|
||||
pCfg->tsdbId, pCfg->cacheBlockSize, pCfg->totalBlocks, pCfg->daysPerFile, pCfg->keep, pCfg->minRowsPerFileBlock,
|
||||
pCfg->maxRowsPerFileBlock, pCfg->precision, pCfg->compression);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -307,13 +306,6 @@ int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) {
|
|||
tsdbAlterCacheTotalBlocks(pRepo, pCfg->totalBlocks);
|
||||
configChanged = true;
|
||||
}
|
||||
if (pRCfg->maxTables != pCfg->maxTables) {
|
||||
if (tsdbAlterMaxTables(pRepo, pCfg->maxTables) < 0) {
|
||||
tsdbError("vgId:%d failed to configure repo when alter maxTables since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
configChanged = true;
|
||||
}
|
||||
|
||||
if (configChanged) {
|
||||
if (tsdbSaveConfig(pRepo->rootDir, &pRepo->config) < 0) {
|
||||
|
@ -385,6 +377,18 @@ char *tsdbGetDataDirName(char *rootDir) {
|
|||
return fname;
|
||||
}
|
||||
|
||||
int tsdbGetNextMaxTables(int tid) {
|
||||
ASSERT(tid >= 1 && tid <= TSDB_MAX_TABLES);
|
||||
int maxTables = TSDB_INIT_NTABLES;
|
||||
while (true) {
|
||||
maxTables = MIN(maxTables, TSDB_MAX_TABLES);
|
||||
if (tid <= maxTables) break;
|
||||
maxTables *= 2;
|
||||
}
|
||||
|
||||
return maxTables + 1;
|
||||
}
|
||||
|
||||
STsdbMeta * tsdbGetMeta(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbMeta; }
|
||||
STsdbFileH * tsdbGetFile(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbFileH; }
|
||||
STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) { return NULL; }
|
||||
|
@ -417,17 +421,6 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
// Check maxTables
|
||||
if (pCfg->maxTables == -1) {
|
||||
pCfg->maxTables = TSDB_DEFAULT_TABLES+1;
|
||||
} else {
|
||||
if (pCfg->maxTables - 1 < TSDB_MIN_TABLES || pCfg->maxTables - 1 > TSDB_MAX_TABLES) {
|
||||
tsdbError("vgId:%d invalid maxTables configuration! maxTables %d TSDB_MIN_TABLES %d TSDB_MAX_TABLES %d",
|
||||
pCfg->tsdbId, pCfg->maxTables - 1, TSDB_MIN_TABLES, TSDB_MAX_TABLES);
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
// Check daysPerFile
|
||||
if (pCfg->daysPerFile == -1) {
|
||||
pCfg->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE;
|
||||
|
@ -713,6 +706,7 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
|
|||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
int64_t points = 0;
|
||||
|
||||
ASSERT(pBlock->tid < pMeta->maxTables);
|
||||
STable *pTable = pMeta->tables[pBlock->tid];
|
||||
ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
|
||||
|
||||
|
@ -779,7 +773,6 @@ static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
|
|||
}
|
||||
|
||||
static int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
||||
// TODO
|
||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
SFileGroup *pFGroup = NULL;
|
||||
|
@ -792,7 +785,7 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
|||
tsdbInitFileGroupIter(pFileH, &iter, TSDB_ORDER_DESC);
|
||||
while ((pFGroup = tsdbGetFileGroupNext(&iter)) != NULL) {
|
||||
if (tsdbSetAndOpenHelperFile(&rhelper, pFGroup) < 0) goto _err;
|
||||
for (int i = 1; i < pRepo->config.maxTables; i++) {
|
||||
for (int i = 1; i < pMeta->maxTables; i++) {
|
||||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable == NULL) continue;
|
||||
tsdbSetHelperTable(&rhelper, pTable, pRepo);
|
||||
|
@ -868,36 +861,6 @@ static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) {
|
||||
// TODO
|
||||
int oldMaxTables = pRepo->config.maxTables;
|
||||
if (oldMaxTables < pRepo->config.maxTables) {
|
||||
terrno = TSDB_CODE_TDB_INVALID_ACTION;
|
||||
return -1;
|
||||
}
|
||||
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
|
||||
pMeta->tables = realloc(pMeta->tables, maxTables * sizeof(STable *));
|
||||
memset(&pMeta->tables[oldMaxTables], 0, sizeof(STable *) * (maxTables - oldMaxTables));
|
||||
pRepo->config.maxTables = maxTables;
|
||||
|
||||
if (pRepo->mem) {
|
||||
pRepo->mem->tData = realloc(pRepo->mem->tData, maxTables * sizeof(STableData *));
|
||||
memset(POINTER_SHIFT(pRepo->mem->tData, sizeof(STableData *) * oldMaxTables), 0,
|
||||
sizeof(STableData *) * (maxTables - oldMaxTables));
|
||||
}
|
||||
|
||||
if (pRepo->imem) {
|
||||
pRepo->imem->tData = realloc(pRepo->imem->tData, maxTables * sizeof(STableData *));
|
||||
memset(POINTER_SHIFT(pRepo->imem->tData, sizeof(STableData *) * oldMaxTables), 0,
|
||||
sizeof(STableData *) * (maxTables - oldMaxTables));
|
||||
}
|
||||
|
||||
tsdbDebug("vgId:%d, tsdb maxTables is changed from %d to %d!", pRepo->config.tsdbId, oldMaxTables, maxTables);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int keyFGroupCompFunc(const void *key, const void *fgroup) {
|
||||
int fid = *(int *)key;
|
||||
SFileGroup *pFGroup = (SFileGroup *)fgroup;
|
||||
|
@ -914,7 +877,6 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg) {
|
|||
tlen += taosEncodeVariantI32(buf, pCfg->tsdbId);
|
||||
tlen += taosEncodeFixedI32(buf, pCfg->cacheBlockSize);
|
||||
tlen += taosEncodeVariantI32(buf, pCfg->totalBlocks);
|
||||
tlen += taosEncodeVariantI32(buf, pCfg->maxTables);
|
||||
tlen += taosEncodeVariantI32(buf, pCfg->daysPerFile);
|
||||
tlen += taosEncodeVariantI32(buf, pCfg->keep);
|
||||
tlen += taosEncodeVariantI32(buf, pCfg->keep1);
|
||||
|
@ -931,7 +893,6 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) {
|
|||
buf = taosDecodeVariantI32(buf, &(pCfg->tsdbId));
|
||||
buf = taosDecodeFixedI32(buf, &(pCfg->cacheBlockSize));
|
||||
buf = taosDecodeVariantI32(buf, &(pCfg->totalBlocks));
|
||||
buf = taosDecodeVariantI32(buf, &(pCfg->maxTables));
|
||||
buf = taosDecodeVariantI32(buf, &(pCfg->daysPerFile));
|
||||
buf = taosDecodeVariantI32(buf, &(pCfg->keep));
|
||||
buf = taosDecodeVariantI32(buf, &(pCfg->keep1));
|
||||
|
@ -1037,7 +998,7 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
|
|||
pBlock->schemaLen = htonl(pBlock->schemaLen);
|
||||
pBlock->numOfRows = htons(pBlock->numOfRows);
|
||||
|
||||
if (pBlock->tid <= 0 || pBlock->tid >= pRepo->config.maxTables) {
|
||||
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {
|
||||
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
|
||||
pBlock->tid);
|
||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||
|
@ -1120,7 +1081,7 @@ TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) {
|
|||
static void tsdbStartStream(STsdbRepo *pRepo) {
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
|
||||
for (int i = 0; i < pRepo->config.maxTables; i++) {
|
||||
for (int i = 0; i < pMeta->maxTables; i++) {
|
||||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
|
||||
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql,
|
||||
|
@ -1133,7 +1094,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) {
|
|||
static void tsdbStopStream(STsdbRepo *pRepo) {
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
|
||||
for (int i = 0; i < pRepo->config.maxTables; i++) {
|
||||
for (int i = 0; i < pMeta->maxTables; i++) {
|
||||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
|
||||
(*pRepo->appH.cqDropFunc)(pTable->cqhandle);
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo);
|
||||
|
||||
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes);
|
||||
static SMemTable * tsdbNewMemTable(STsdbCfg *pCfg);
|
||||
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
|
||||
static void tsdbFreeMemTable(SMemTable *pMemTable);
|
||||
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
|
||||
static void tsdbFreeTableData(STableData *pTableData);
|
||||
|
@ -30,13 +30,15 @@ static void * tsdbCommitData(void *arg);
|
|||
static int tsdbCommitMeta(STsdbRepo *pRepo);
|
||||
static void tsdbEndCommit(STsdbRepo *pRepo);
|
||||
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
|
||||
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
|
||||
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
|
||||
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
|
||||
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
|
||||
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
|
||||
|
||||
// ---------------- INTERNAL FUNCTIONS ----------------
|
||||
int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
||||
STsdbCfg * pCfg = &pRepo->config;
|
||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||
int32_t level = 0;
|
||||
int32_t headSize = 0;
|
||||
TSKEY key = dataRowKey(row);
|
||||
|
@ -45,7 +47,7 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
|||
SSkipList * pSList = NULL;
|
||||
int bytes = 0;
|
||||
|
||||
if (pMemTable != NULL && pMemTable->tData[TABLE_TID(pTable)] != NULL &&
|
||||
if (pMemTable != NULL && TABLE_TID(pTable) < pMemTable->maxTables && pMemTable->tData[TABLE_TID(pTable)] != NULL &&
|
||||
pMemTable->tData[TABLE_TID(pTable)]->uid == TABLE_UID(pTable)) {
|
||||
pTableData = pMemTable->tData[TABLE_TID(pTable)];
|
||||
pSList = pTableData->pData;
|
||||
|
@ -66,13 +68,20 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
|||
// Operations above may change pRepo->mem, retake those values
|
||||
ASSERT(pRepo->mem != NULL);
|
||||
pMemTable = pRepo->mem;
|
||||
|
||||
if (TABLE_TID(pTable) >= pMemTable->maxTables) {
|
||||
if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) return -1;;
|
||||
}
|
||||
pTableData = pMemTable->tData[TABLE_TID(pTable)];
|
||||
|
||||
if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
|
||||
if (pTableData != NULL) { // destroy the table skiplist (may have race condition problem)
|
||||
taosWLockLatch(&(pMemTable->latch));
|
||||
pMemTable->tData[TABLE_TID(pTable)] = NULL;
|
||||
tsdbFreeTableData(pTableData);
|
||||
taosWUnLockLatch(&(pMemTable->latch));
|
||||
}
|
||||
|
||||
pTableData = tsdbNewTableData(pCfg, pTable);
|
||||
if (pTableData == NULL) {
|
||||
tsdbError("vgId:%d failed to insert row with key %" PRId64
|
||||
|
@ -122,7 +131,6 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
|
|||
int ref = T_REF_DEC(pMemTable);
|
||||
tsdbDebug("vgId:%d unref memtable %p ref %d", REPO_ID(pRepo), pMemTable, ref);
|
||||
if (ref == 0) {
|
||||
STsdbCfg * pCfg = &pRepo->config;
|
||||
STsdbBufPool *pBufPool = pRepo->pPool;
|
||||
|
||||
SListNode *pNode = NULL;
|
||||
|
@ -139,7 +147,7 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
|
|||
}
|
||||
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
||||
|
||||
for (int i = 0; i < pCfg->maxTables; i++) {
|
||||
for (int i = 0; i < pMemTable->maxTables; i++) {
|
||||
if (pMemTable->tData[i] != NULL) {
|
||||
tsdbFreeTableData(pMemTable->tData[i]);
|
||||
}
|
||||
|
@ -161,11 +169,24 @@ int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) {
|
|||
tsdbRefMemTable(pRepo, *pIMem);
|
||||
|
||||
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
||||
tsdbDebug("vgId:%d take memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), *pMem, *pIMem);
|
||||
|
||||
if (*pMem != NULL) taosRLockLatch(&((*pMem)->latch));
|
||||
|
||||
tsdbDebug("vgId:%d take memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), *pMem, *pIMem);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) {
|
||||
if (pMem != NULL) {
|
||||
taosRUnLockLatch(&(pMem->latch));
|
||||
tsdbUnRefMemTable(pRepo, pMem);
|
||||
}
|
||||
|
||||
if (pIMem != NULL) {
|
||||
tsdbUnRefMemTable(pRepo, pIMem);
|
||||
}
|
||||
}
|
||||
|
||||
void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
|
||||
STsdbCfg * pCfg = &pRepo->config;
|
||||
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
|
||||
|
@ -182,7 +203,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
|
|||
}
|
||||
|
||||
if (pRepo->mem == NULL) {
|
||||
SMemTable *pMemTable = tsdbNewMemTable(&pRepo->config);
|
||||
SMemTable *pMemTable = tsdbNewMemTable(pRepo);
|
||||
if (pMemTable == NULL) return NULL;
|
||||
|
||||
if (tsdbLockRepo(pRepo) < 0) {
|
||||
|
@ -329,7 +350,9 @@ static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) {
|
|||
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
|
||||
}
|
||||
|
||||
static SMemTable* tsdbNewMemTable(STsdbCfg* pCfg) {
|
||||
static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) {
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
|
||||
SMemTable *pMemTable = (SMemTable *)calloc(1, sizeof(*pMemTable));
|
||||
if (pMemTable == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
|
@ -340,7 +363,8 @@ static SMemTable* tsdbNewMemTable(STsdbCfg* pCfg) {
|
|||
pMemTable->keyLast = 0;
|
||||
pMemTable->numOfRows = 0;
|
||||
|
||||
pMemTable->tData = (STableData**)calloc(pCfg->maxTables, sizeof(STableData*));
|
||||
pMemTable->maxTables = pMeta->maxTables;
|
||||
pMemTable->tData = (STableData **)calloc(pMemTable->maxTables, sizeof(STableData *));
|
||||
if (pMemTable->tData == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
|
@ -398,9 +422,6 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
// TODO: operation here should not be here, remove it
|
||||
pTableData->pData->level = 1;
|
||||
|
||||
return pTableData;
|
||||
|
||||
_err:
|
||||
|
@ -473,7 +494,7 @@ static void *tsdbCommitData(void *arg) {
|
|||
|
||||
_exit:
|
||||
tdFreeDataCols(pDataCols);
|
||||
tsdbDestroyCommitIters(iters, pCfg->maxTables);
|
||||
tsdbDestroyCommitIters(iters, pMem->maxTables);
|
||||
tsdbDestroyHelper(&whelper);
|
||||
tsdbEndCommit(pRepo);
|
||||
tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
|
||||
|
@ -552,12 +573,13 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
|||
STsdbCfg * pCfg = &pRepo->config;
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
SFileGroup *pGroup = NULL;
|
||||
SMemTable * pMem = pRepo->imem;
|
||||
|
||||
TSKEY minKey = 0, maxKey = 0;
|
||||
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
|
||||
|
||||
// Check if there are data to commit to this file
|
||||
int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey);
|
||||
int hasDataToCommit = tsdbHasDataToCommit(iters, pMem->maxTables, minKey, maxKey);
|
||||
if (!hasDataToCommit) {
|
||||
tsdbDebug("vgId:%d no data to commit to file %d", REPO_ID(pRepo), fid);
|
||||
return 0;
|
||||
|
@ -570,7 +592,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
|||
return -1;
|
||||
}
|
||||
|
||||
if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid, pCfg->maxTables)) == NULL) {
|
||||
if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) {
|
||||
tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
@ -582,7 +604,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
|||
}
|
||||
|
||||
// Loop to commit data in each table
|
||||
for (int tid = 1; tid < pCfg->maxTables; tid++) {
|
||||
for (int tid = 1; tid < pMem->maxTables; tid++) {
|
||||
SCommitIter *pIter = iters + tid;
|
||||
if (pIter->pTable == NULL) continue;
|
||||
|
||||
|
@ -643,11 +665,10 @@ _err:
|
|||
}
|
||||
|
||||
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
|
||||
STsdbCfg * pCfg = &(pRepo->config);
|
||||
SMemTable *pMem = pRepo->imem;
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
|
||||
SCommitIter *iters = (SCommitIter *)calloc(pCfg->maxTables, sizeof(SCommitIter));
|
||||
SCommitIter *iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter));
|
||||
if (iters == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
|
@ -656,7 +677,7 @@ static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
|
|||
if (tsdbRLockRepoMeta(pRepo) < 0) goto _err;
|
||||
|
||||
// reference all tables
|
||||
for (int i = 0; i < pCfg->maxTables; i++) {
|
||||
for (int i = 0; i < pMem->maxTables; i++) {
|
||||
if (pMeta->tables[i] != NULL) {
|
||||
tsdbRefTable(pMeta->tables[i]);
|
||||
iters[i].pTable = pMeta->tables[i];
|
||||
|
@ -665,7 +686,7 @@ static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
|
|||
|
||||
if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err;
|
||||
|
||||
for (int i = 0; i < pCfg->maxTables; i++) {
|
||||
for (int i = 0; i < pMem->maxTables; i++) {
|
||||
if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(iters[i].pTable) == pMem->tData[i]->uid)) {
|
||||
if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
|
@ -679,7 +700,7 @@ static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
|
|||
return iters;
|
||||
|
||||
_err:
|
||||
tsdbDestroyCommitIters(iters, pCfg->maxTables);
|
||||
tsdbDestroyCommitIters(iters, pMem->maxTables);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -694,4 +715,26 @@ static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
|
|||
}
|
||||
|
||||
free(iters);
|
||||
}
|
||||
|
||||
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) {
|
||||
ASSERT(pMemTable->maxTables < maxTables);
|
||||
|
||||
STableData **pTableData = (STableData **)calloc(maxTables, sizeof(STableData *));
|
||||
if (pTableData == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
memcpy((void *)pTableData, (void *)pMemTable->tData, sizeof(STableData *) * pMemTable->maxTables);
|
||||
|
||||
STableData **tData = pMemTable->tData;
|
||||
|
||||
taosWLockLatch(&(pMemTable->latch));
|
||||
pMemTable->maxTables = maxTables;
|
||||
pMemTable->tData = pTableData;
|
||||
taosWUnLockLatch(&(pMemTable->latch));
|
||||
|
||||
tfree(tData);
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -49,6 +49,7 @@ static int tsdbGetTableEncodeSize(int8_t act, STable *pTable);
|
|||
static void * tsdbInsertTableAct(STsdbRepo *pRepo, int8_t act, void *buf, STable *pTable);
|
||||
static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable);
|
||||
static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable);
|
||||
static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid);
|
||||
|
||||
// ------------------ OUTER FUNCTIONS ------------------
|
||||
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
|
||||
|
@ -60,13 +61,13 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
|
|||
int tid = pCfg->tableId.tid;
|
||||
STable * pTable = NULL;
|
||||
|
||||
if (tid < 0 || tid >= pRepo->config.maxTables) {
|
||||
if (tid < 1 || tid > TSDB_MAX_TABLES) {
|
||||
tsdbError("vgId:%d failed to create table since invalid tid %d", REPO_ID(pRepo), tid);
|
||||
terrno = TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (pMeta->tables[tid] != NULL) {
|
||||
if (tid < pMeta->maxTables && pMeta->tables[tid] != NULL) {
|
||||
if (TABLE_UID(pMeta->tables[tid]) == pCfg->tableId.uid) {
|
||||
tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
||||
TABLE_TID(pTable), TABLE_UID(pTable));
|
||||
|
@ -422,7 +423,8 @@ STsdbMeta *tsdbNewMeta(STsdbCfg *pCfg) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
pMeta->tables = (STable **)calloc(pCfg->maxTables, sizeof(STable *));
|
||||
pMeta->maxTables = TSDB_INIT_NTABLES + 1;
|
||||
pMeta->tables = (STable **)calloc(pMeta->maxTables, sizeof(STable *));
|
||||
if (pMeta->tables == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
|
@ -434,7 +436,7 @@ STsdbMeta *tsdbNewMeta(STsdbCfg *pCfg) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
pMeta->uidMap = taosHashInit(pCfg->maxTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
|
||||
pMeta->uidMap = taosHashInit(TSDB_INIT_NTABLES * 1.1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
|
||||
if (pMeta->uidMap == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
|
@ -484,14 +486,13 @@ _err:
|
|||
}
|
||||
|
||||
int tsdbCloseMeta(STsdbRepo *pRepo) {
|
||||
STsdbCfg * pCfg = &pRepo->config;
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
SListNode *pNode = NULL;
|
||||
STable * pTable = NULL;
|
||||
|
||||
if (pMeta == NULL) return 0;
|
||||
tdCloseKVStore(pMeta->pStore);
|
||||
for (int i = 1; i < pCfg->maxTables; i++) {
|
||||
for (int i = 1; i < pMeta->maxTables; i++) {
|
||||
tsdbFreeTable(pMeta->tables[i]);
|
||||
}
|
||||
|
||||
|
@ -624,9 +625,8 @@ static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
|
|||
static void tsdbOrgMeta(void *pHandle) {
|
||||
STsdbRepo *pRepo = (STsdbRepo *)pHandle;
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
STsdbCfg * pCfg = &pRepo->config;
|
||||
|
||||
for (int i = 1; i < pCfg->maxTables; i++) {
|
||||
for (int i = 1; i < pMeta->maxTables; i++) {
|
||||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable != NULL && pTable->type == TSDB_CHILD_TABLE) {
|
||||
tsdbAddTableIntoIndex(pMeta, pTable, true);
|
||||
|
@ -781,6 +781,9 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo
|
|||
goto _err;
|
||||
}
|
||||
} else {
|
||||
if (TABLE_TID(pTable) >= pMeta->maxTables) {
|
||||
if (tsdbAdjustMetaTables(pRepo, TABLE_TID(pTable)) < 0) goto _err;
|
||||
}
|
||||
if (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE && addIdx) { // add STABLE to the index
|
||||
if (tsdbAddTableIntoIndex(pMeta, pTable, true) < 0) {
|
||||
tsdbDebug("vgId:%d failed to add table %s to meta while add table to index since %s", REPO_ID(pRepo),
|
||||
|
@ -788,6 +791,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo
|
|||
goto _err;
|
||||
}
|
||||
}
|
||||
ASSERT(TABLE_TID(pTable) < pMeta->maxTables);
|
||||
pMeta->tables[TABLE_TID(pTable)] = pTable;
|
||||
pMeta->nTables++;
|
||||
}
|
||||
|
@ -827,7 +831,6 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
|
|||
SListIter lIter = {0};
|
||||
SListNode *pNode = NULL;
|
||||
STable * tTable = NULL;
|
||||
STsdbCfg * pCfg = &(pRepo->config);
|
||||
|
||||
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
|
||||
int maxCols = schemaNCols(pSchema);
|
||||
|
@ -860,7 +863,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
|
|||
if (maxCols == pMeta->maxCols || maxRowBytes == pMeta->maxRowBytes) {
|
||||
maxCols = 0;
|
||||
maxRowBytes = 0;
|
||||
for (int i = 0; i < pCfg->maxTables; i++) {
|
||||
for (int i = 0; i < pMeta->maxTables; i++) {
|
||||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable != NULL) {
|
||||
pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
|
||||
|
@ -1266,5 +1269,28 @@ static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable) {
|
|||
tsdbRemoveTableFromMeta(pRepo, pTable, true, true);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid) {
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
ASSERT(tid >= pMeta->maxTables);
|
||||
|
||||
int maxTables = tsdbGetNextMaxTables(tid);
|
||||
|
||||
STable **tables = (STable **)calloc(maxTables, sizeof(STable *));
|
||||
if (tables == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy((void *)tables, (void *)pMeta->tables, sizeof(STable *) * pMeta->maxTables);
|
||||
pMeta->maxTables = maxTables;
|
||||
|
||||
STable **tTables = pMeta->tables;
|
||||
pMeta->tables = tables;
|
||||
tfree(tTables);
|
||||
tsdbDebug("vgId:%d tsdb meta maxTables is adjusted as %d", REPO_ID(pRepo), maxTables);
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -1231,8 +1231,8 @@ static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBl
|
|||
if (tsdbCheckAndDecodeColumnData(pDataCol, pHelper->pBuffer, pCompCol->len, pCompBlock->algorithm,
|
||||
pCompBlock->numOfRows, pHelper->pRepo->config.maxRowsPerFileBlock,
|
||||
pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0) {
|
||||
tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pHelper->pRepo), pFile->fname, pCompCol->colId,
|
||||
(int64_t)pCompCol->offset);
|
||||
tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pHelper->pRepo), pFile->fname,
|
||||
pCompCol->colId, offset);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -317,17 +317,20 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
|
|||
}
|
||||
|
||||
assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL);
|
||||
|
||||
if (pHandle->mem && pHandle->mem->tData[pCheckInfo->tableId.tid] != NULL) {
|
||||
|
||||
// TODO: add uid check
|
||||
if (pHandle->mem && pCheckInfo->tableId.tid < pHandle->mem->maxTables &&
|
||||
pHandle->mem->tData[pCheckInfo->tableId.tid] != NULL) {
|
||||
pCheckInfo->iter = tSkipListCreateIterFromVal(pHandle->mem->tData[pCheckInfo->tableId.tid]->pData,
|
||||
(const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
|
||||
(const char*)&pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
|
||||
}
|
||||
|
||||
if (pHandle->imem && pHandle->imem->tData[pCheckInfo->tableId.tid] != NULL) {
|
||||
|
||||
if (pHandle->imem && pCheckInfo->tableId.tid < pHandle->imem->maxTables &&
|
||||
pHandle->imem->tData[pCheckInfo->tableId.tid] != NULL) {
|
||||
pCheckInfo->iiter = tSkipListCreateIterFromVal(pHandle->imem->tData[pCheckInfo->tableId.tid]->pData,
|
||||
(const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
|
||||
(const char*)&pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
|
||||
}
|
||||
|
||||
|
||||
// both iterators are NULL, no data in buffer right now
|
||||
if (pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL) {
|
||||
return false;
|
||||
|
@ -1529,7 +1532,6 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
|
|||
|
||||
static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) {
|
||||
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||
assert(numOfTables <= ((STsdbRepo*)pQueryHandle->pTsdb)->config.maxTables);
|
||||
|
||||
while (pQueryHandle->activeIndex < numOfTables) {
|
||||
if (hasMoreDataInCache(pQueryHandle)) {
|
||||
|
@ -2418,8 +2420,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
|
|||
tfree(pQueryHandle->statis);
|
||||
|
||||
// todo check error
|
||||
tsdbUnRefMemTable(pQueryHandle->pTsdb, pQueryHandle->mem);
|
||||
tsdbUnRefMemTable(pQueryHandle->pTsdb, pQueryHandle->imem);
|
||||
tsdbUnTakeMemSnapShot(pQueryHandle->pTsdb, pQueryHandle->mem, pQueryHandle->imem);
|
||||
|
||||
tsdbDestroyHelper(&pQueryHandle->rhelper);
|
||||
|
||||
|
|
|
@ -98,7 +98,7 @@ static void tsdbSetCfg(STsdbCfg *pCfg, int32_t tsdbId, int32_t cacheBlockSize, i
|
|||
pCfg->tsdbId = tsdbId;
|
||||
pCfg->cacheBlockSize = cacheBlockSize;
|
||||
pCfg->totalBlocks = totalBlocks;
|
||||
pCfg->maxTables = maxTables;
|
||||
// pCfg->maxTables = maxTables;
|
||||
pCfg->daysPerFile = daysPerFile;
|
||||
pCfg->keep = keep;
|
||||
pCfg->minRowsPerFileBlock = minRows;
|
||||
|
|
|
@ -123,7 +123,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
|
|||
tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId;
|
||||
tsdbCfg.cacheBlockSize = pVnodeCfg->cfg.cacheBlockSize;
|
||||
tsdbCfg.totalBlocks = pVnodeCfg->cfg.totalBlocks;
|
||||
tsdbCfg.maxTables = pVnodeCfg->cfg.maxTables;
|
||||
// tsdbCfg.maxTables = pVnodeCfg->cfg.maxTables;
|
||||
tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;
|
||||
tsdbCfg.keep = pVnodeCfg->cfg.daysToKeep;
|
||||
tsdbCfg.minRowsPerFileBlock = pVnodeCfg->cfg.minRowsPerFileBlock;
|
||||
|
@ -630,14 +630,14 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
|||
len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pVnodeCfg->cfg.cfgVersion);
|
||||
len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pVnodeCfg->cfg.cacheBlockSize);
|
||||
len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pVnodeCfg->cfg.totalBlocks);
|
||||
len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnodeCfg->cfg.maxTables);
|
||||
// len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnodeCfg->cfg.maxTables);
|
||||
len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pVnodeCfg->cfg.daysPerFile);
|
||||
len += snprintf(content + len, maxLen - len, " \"daysToKeep\": %d,\n", pVnodeCfg->cfg.daysToKeep);
|
||||
len += snprintf(content + len, maxLen - len, " \"daysToKeep1\": %d,\n", pVnodeCfg->cfg.daysToKeep1);
|
||||
len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnodeCfg->cfg.daysToKeep2);
|
||||
len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock);
|
||||
len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock);
|
||||
len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime);
|
||||
// len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime);
|
||||
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision);
|
||||
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression);
|
||||
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnodeCfg->cfg.walLevel);
|
||||
|
@ -729,12 +729,12 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
|
|||
}
|
||||
pVnode->tsdbCfg.totalBlocks = totalBlocks->valueint;
|
||||
|
||||
cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
|
||||
if (!maxTables || maxTables->type != cJSON_Number) {
|
||||
vError("vgId:%d, failed to read vnode cfg, maxTables not found", pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->tsdbCfg.maxTables = maxTables->valueint;
|
||||
// cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
|
||||
// if (!maxTables || maxTables->type != cJSON_Number) {
|
||||
// vError("vgId:%d, failed to read vnode cfg, maxTables not found", pVnode->vgId);
|
||||
// goto PARSE_OVER;
|
||||
// }
|
||||
// pVnode->tsdbCfg.maxTables = maxTables->valueint;
|
||||
|
||||
cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
|
||||
if (!daysPerFile || daysPerFile->type != cJSON_Number) {
|
||||
|
@ -778,12 +778,12 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
|
|||
}
|
||||
pVnode->tsdbCfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint;
|
||||
|
||||
cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime");
|
||||
if (!commitTime || commitTime->type != cJSON_Number) {
|
||||
vError("vgId:%d, failed to read vnode cfg, commitTime not found", pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint;
|
||||
// cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime");
|
||||
// if (!commitTime || commitTime->type != cJSON_Number) {
|
||||
// vError("vgId:%d, failed to read vnode cfg, commitTime not found", pVnode->vgId);
|
||||
// goto PARSE_OVER;
|
||||
// }
|
||||
// pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint;
|
||||
|
||||
cJSON *precision = cJSON_GetObjectItem(root, "precision");
|
||||
if (!precision || precision->type != cJSON_Number) {
|
||||
|
|
Loading…
Reference in New Issue