Merge branch 'develop' into feature/query
This commit is contained in:
commit
bb9a5213fa
|
@ -106,13 +106,14 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
|
|||
SCMCorVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->corVgroupInfo;
|
||||
|
||||
taosCorBeginWrite(&pVgroupInfo->version);
|
||||
//TODO(dengyihao), dont care vgid
|
||||
tscDebug("before: Endpoint in use: %d", pVgroupInfo->inUse);
|
||||
pVgroupInfo->inUse = pEpSet->inUse;
|
||||
pVgroupInfo->numOfEps = pEpSet->numOfEps;
|
||||
for (int32_t i = 0; pVgroupInfo->numOfEps; i++) {
|
||||
for (int32_t i = 0; i < pVgroupInfo->numOfEps; i++) {
|
||||
strncpy(pVgroupInfo->epAddr[i].fqdn, pEpSet->fqdn[i], TSDB_FQDN_LEN);
|
||||
pVgroupInfo->epAddr[i].port = pEpSet->port[i];
|
||||
}
|
||||
tscDebug("after: EndPoint in use: %d", pVgroupInfo->inUse);
|
||||
taosCorEndWrite(&pVgroupInfo->version);
|
||||
}
|
||||
void tscPrintMgmtEp() {
|
||||
|
@ -283,9 +284,7 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
|||
}
|
||||
|
||||
if (pEpSet) {
|
||||
//SRpcEpSet dump;
|
||||
tscEpSetHtons(pEpSet);
|
||||
if (tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
|
||||
if (!tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
|
||||
if(pCmd->command < TSDB_SQL_MGMT) {
|
||||
tscUpdateVgroupInfo(pSql, pEpSet);
|
||||
} else {
|
||||
|
|
|
@ -53,14 +53,12 @@ typedef struct {
|
|||
int32_t tsdbId;
|
||||
int32_t cacheBlockSize;
|
||||
int32_t totalBlocks;
|
||||
int32_t maxTables; // maximum number of tables this repository can have
|
||||
int32_t daysPerFile; // day per file sharding policy
|
||||
int32_t keep; // day of data to keep
|
||||
int32_t keep1;
|
||||
int32_t keep2;
|
||||
int32_t minRowsPerFileBlock; // minimum rows per file block
|
||||
int32_t maxRowsPerFileBlock; // maximum rows per file block
|
||||
int32_t commitTime;
|
||||
int8_t precision;
|
||||
int8_t compression;
|
||||
} STsdbCfg;
|
||||
|
|
|
@ -37,14 +37,12 @@ static int32_t saveVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
|
|||
len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pVnode->cfgVersion);
|
||||
len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pVnode->tsdbCfg.cacheBlockSize);
|
||||
len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pVnode->tsdbCfg.totalBlocks);
|
||||
len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnode->tsdbCfg.maxTables);
|
||||
len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pVnode->tsdbCfg.daysPerFile);
|
||||
len += snprintf(content + len, maxLen - len, " \"daysToKeep\": %d,\n", pVnode->tsdbCfg.keep);
|
||||
len += snprintf(content + len, maxLen - len, " \"daysToKeep1\": %d,\n", pVnode->tsdbCfg.keep1);
|
||||
len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnode->tsdbCfg.keep2);
|
||||
len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnode->tsdbCfg.minRowsPerFileBlock);
|
||||
len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnode->tsdbCfg.maxRowsPerFileBlock);
|
||||
len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnode->tsdbCfg.commitTime);
|
||||
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnode->tsdbCfg.precision);
|
||||
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnode->tsdbCfg.compression);
|
||||
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnode->walCfg.walLevel);
|
||||
|
@ -136,12 +134,12 @@ static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
|
|||
}
|
||||
pVnode->tsdbCfg.totalBlocks = totalBlocks->valueint;
|
||||
|
||||
cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
|
||||
if (!maxTables || maxTables->type != cJSON_Number) {
|
||||
printf("vgId:%d, failed to read vnode cfg, maxTables not found\n", pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->tsdbCfg.maxTables = maxTables->valueint;
|
||||
// cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
|
||||
// if (!maxTables || maxTables->type != cJSON_Number) {
|
||||
// printf("vgId:%d, failed to read vnode cfg, maxTables not found\n", pVnode->vgId);
|
||||
// goto PARSE_OVER;
|
||||
// }
|
||||
// pVnode->tsdbCfg.maxTables = maxTables->valueint;
|
||||
|
||||
cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
|
||||
if (!daysPerFile || daysPerFile->type != cJSON_Number) {
|
||||
|
@ -185,12 +183,12 @@ static int32_t readVnodeCfg(SVnodeObj *pVnode, char* cfgFile)
|
|||
}
|
||||
pVnode->tsdbCfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint;
|
||||
|
||||
cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime");
|
||||
if (!commitTime || commitTime->type != cJSON_Number) {
|
||||
printf("vgId:%d, failed to read vnode cfg, commitTime not found\n", pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint;
|
||||
// cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime");
|
||||
// if (!commitTime || commitTime->type != cJSON_Number) {
|
||||
// printf("vgId:%d, failed to read vnode cfg, commitTime not found\n", pVnode->vgId);
|
||||
// goto PARSE_OVER;
|
||||
// }
|
||||
// pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint;
|
||||
|
||||
cJSON *precision = cJSON_GetObjectItem(root, "precision");
|
||||
if (!precision || precision->type != cJSON_Number) {
|
||||
|
|
|
@ -70,6 +70,7 @@ typedef struct {
|
|||
pthread_rwlock_t rwLock;
|
||||
|
||||
int32_t nTables;
|
||||
int32_t maxTables;
|
||||
STable** tables;
|
||||
SList* superList;
|
||||
SHashObj* uidMap;
|
||||
|
@ -111,9 +112,11 @@ typedef struct {
|
|||
|
||||
typedef struct {
|
||||
T_REF_DECLARE();
|
||||
SRWLatch latch;
|
||||
TSKEY keyFirst;
|
||||
TSKEY keyLast;
|
||||
int64_t numOfRows;
|
||||
int32_t maxTables;
|
||||
STableData** tData;
|
||||
SList* actList;
|
||||
SList* bufBlockList;
|
||||
|
@ -304,6 +307,7 @@ typedef struct {
|
|||
|
||||
// Operations
|
||||
// ------------------ tsdbMeta.c
|
||||
#define TSDB_INIT_NTABLES 1024
|
||||
#define TABLE_TYPE(t) (t)->type
|
||||
#define TABLE_NAME(t) (t)->name
|
||||
#define TABLE_CHAR_NAME(t) TABLE_NAME(t)->data
|
||||
|
@ -395,6 +399,7 @@ int tsdbInsertRowToMem(STsdbRepo* pRepo, SDataRow row, STable* pTable);
|
|||
int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
|
||||
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
|
||||
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
|
||||
void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem);
|
||||
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
|
||||
int tsdbAsyncCommit(STsdbRepo* pRepo);
|
||||
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
|
||||
|
@ -429,7 +434,7 @@ STsdbFileH* tsdbNewFileH(STsdbCfg* pCfg);
|
|||
void tsdbFreeFileH(STsdbFileH* pFileH);
|
||||
int tsdbOpenFileH(STsdbRepo* pRepo);
|
||||
void tsdbCloseFileH(STsdbRepo* pRepo);
|
||||
SFileGroup* tsdbCreateFGroupIfNeed(STsdbRepo* pRepo, char* dataDir, int fid, int maxTables);
|
||||
SFileGroup* tsdbCreateFGroupIfNeed(STsdbRepo* pRepo, char* dataDir, int fid);
|
||||
void tsdbInitFileGroupIter(STsdbFileH* pFileH, SFileGroupIter* pIter, int direction);
|
||||
void tsdbSeekFileGroupIter(SFileGroupIter* pIter, int fid);
|
||||
SFileGroup* tsdbGetFileGroupNext(SFileGroupIter* pIter);
|
||||
|
@ -511,6 +516,7 @@ void tsdbGetDataFileName(STsdbRepo* pRepo, int fid, int type, char* fname
|
|||
int tsdbLockRepo(STsdbRepo* pRepo);
|
||||
int tsdbUnlockRepo(STsdbRepo* pRepo);
|
||||
char* tsdbGetDataDirName(char* rootDir);
|
||||
int tsdbGetNextMaxTables(int tid);
|
||||
STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo);
|
||||
STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo);
|
||||
|
||||
|
|
|
@ -149,7 +149,7 @@ void tsdbCloseFileH(STsdbRepo *pRepo) {
|
|||
}
|
||||
}
|
||||
|
||||
SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid, int maxTables) {
|
||||
SFileGroup *tsdbCreateFGroupIfNeed(STsdbRepo *pRepo, char *dataDir, int fid) {
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
|
||||
if (pFileH->nFGroups >= pFileH->maxFGroups) return NULL;
|
||||
|
|
|
@ -62,7 +62,6 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo);
|
|||
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
|
||||
static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression);
|
||||
static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep);
|
||||
static int tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables);
|
||||
static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
|
||||
static int keyFGroupCompFunc(const void *key, const void *fgroup);
|
||||
static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg);
|
||||
|
@ -85,10 +84,10 @@ int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
|
|||
if (tsdbSetRepoEnv(rootDir, pCfg) < 0) return -1;
|
||||
|
||||
tsdbDebug(
|
||||
"vgId:%d tsdb env create succeed! cacheBlockSize %d totalBlocks %d maxTables %d daysPerFile %d keep "
|
||||
"vgId:%d tsdb env create succeed! cacheBlockSize %d totalBlocks %d daysPerFile %d keep "
|
||||
"%d minRowsPerFileBlock %d maxRowsPerFileBlock %d precision %d compression %d",
|
||||
pCfg->tsdbId, pCfg->cacheBlockSize, pCfg->totalBlocks, pCfg->maxTables, pCfg->daysPerFile, pCfg->keep,
|
||||
pCfg->minRowsPerFileBlock, pCfg->maxRowsPerFileBlock, pCfg->precision, pCfg->compression);
|
||||
pCfg->tsdbId, pCfg->cacheBlockSize, pCfg->totalBlocks, pCfg->daysPerFile, pCfg->keep, pCfg->minRowsPerFileBlock,
|
||||
pCfg->maxRowsPerFileBlock, pCfg->precision, pCfg->compression);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -307,13 +306,6 @@ int32_t tsdbConfigRepo(TSDB_REPO_T *repo, STsdbCfg *pCfg) {
|
|||
tsdbAlterCacheTotalBlocks(pRepo, pCfg->totalBlocks);
|
||||
configChanged = true;
|
||||
}
|
||||
if (pRCfg->maxTables != pCfg->maxTables) {
|
||||
if (tsdbAlterMaxTables(pRepo, pCfg->maxTables) < 0) {
|
||||
tsdbError("vgId:%d failed to configure repo when alter maxTables since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
configChanged = true;
|
||||
}
|
||||
|
||||
if (configChanged) {
|
||||
if (tsdbSaveConfig(pRepo->rootDir, &pRepo->config) < 0) {
|
||||
|
@ -385,6 +377,18 @@ char *tsdbGetDataDirName(char *rootDir) {
|
|||
return fname;
|
||||
}
|
||||
|
||||
int tsdbGetNextMaxTables(int tid) {
|
||||
ASSERT(tid >= 1 && tid <= TSDB_MAX_TABLES);
|
||||
int maxTables = TSDB_INIT_NTABLES;
|
||||
while (true) {
|
||||
maxTables = MIN(maxTables, TSDB_MAX_TABLES);
|
||||
if (tid <= maxTables) break;
|
||||
maxTables *= 2;
|
||||
}
|
||||
|
||||
return maxTables + 1;
|
||||
}
|
||||
|
||||
STsdbMeta * tsdbGetMeta(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbMeta; }
|
||||
STsdbFileH * tsdbGetFile(TSDB_REPO_T *pRepo) { return ((STsdbRepo *)pRepo)->tsdbFileH; }
|
||||
STsdbRepoInfo *tsdbGetStatus(TSDB_REPO_T *pRepo) { return NULL; }
|
||||
|
@ -417,17 +421,6 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
// Check maxTables
|
||||
if (pCfg->maxTables == -1) {
|
||||
pCfg->maxTables = TSDB_DEFAULT_TABLES+1;
|
||||
} else {
|
||||
if (pCfg->maxTables - 1 < TSDB_MIN_TABLES || pCfg->maxTables - 1 > TSDB_MAX_TABLES) {
|
||||
tsdbError("vgId:%d invalid maxTables configuration! maxTables %d TSDB_MIN_TABLES %d TSDB_MAX_TABLES %d",
|
||||
pCfg->tsdbId, pCfg->maxTables - 1, TSDB_MIN_TABLES, TSDB_MAX_TABLES);
|
||||
goto _err;
|
||||
}
|
||||
}
|
||||
|
||||
// Check daysPerFile
|
||||
if (pCfg->daysPerFile == -1) {
|
||||
pCfg->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE;
|
||||
|
@ -713,6 +706,7 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
|
|||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
int64_t points = 0;
|
||||
|
||||
ASSERT(pBlock->tid < pMeta->maxTables);
|
||||
STable *pTable = pMeta->tables[pBlock->tid];
|
||||
ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
|
||||
|
||||
|
@ -779,7 +773,6 @@ static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
|
|||
}
|
||||
|
||||
static int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
||||
// TODO
|
||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
SFileGroup *pFGroup = NULL;
|
||||
|
@ -792,7 +785,7 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
|||
tsdbInitFileGroupIter(pFileH, &iter, TSDB_ORDER_DESC);
|
||||
while ((pFGroup = tsdbGetFileGroupNext(&iter)) != NULL) {
|
||||
if (tsdbSetAndOpenHelperFile(&rhelper, pFGroup) < 0) goto _err;
|
||||
for (int i = 1; i < pRepo->config.maxTables; i++) {
|
||||
for (int i = 1; i < pMeta->maxTables; i++) {
|
||||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable == NULL) continue;
|
||||
tsdbSetHelperTable(&rhelper, pTable, pRepo);
|
||||
|
@ -868,36 +861,6 @@ static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbAlterMaxTables(STsdbRepo *pRepo, int32_t maxTables) {
|
||||
// TODO
|
||||
int oldMaxTables = pRepo->config.maxTables;
|
||||
if (oldMaxTables < pRepo->config.maxTables) {
|
||||
terrno = TSDB_CODE_TDB_INVALID_ACTION;
|
||||
return -1;
|
||||
}
|
||||
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
|
||||
pMeta->tables = realloc(pMeta->tables, maxTables * sizeof(STable *));
|
||||
memset(&pMeta->tables[oldMaxTables], 0, sizeof(STable *) * (maxTables - oldMaxTables));
|
||||
pRepo->config.maxTables = maxTables;
|
||||
|
||||
if (pRepo->mem) {
|
||||
pRepo->mem->tData = realloc(pRepo->mem->tData, maxTables * sizeof(STableData *));
|
||||
memset(POINTER_SHIFT(pRepo->mem->tData, sizeof(STableData *) * oldMaxTables), 0,
|
||||
sizeof(STableData *) * (maxTables - oldMaxTables));
|
||||
}
|
||||
|
||||
if (pRepo->imem) {
|
||||
pRepo->imem->tData = realloc(pRepo->imem->tData, maxTables * sizeof(STableData *));
|
||||
memset(POINTER_SHIFT(pRepo->imem->tData, sizeof(STableData *) * oldMaxTables), 0,
|
||||
sizeof(STableData *) * (maxTables - oldMaxTables));
|
||||
}
|
||||
|
||||
tsdbDebug("vgId:%d, tsdb maxTables is changed from %d to %d!", pRepo->config.tsdbId, oldMaxTables, maxTables);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int keyFGroupCompFunc(const void *key, const void *fgroup) {
|
||||
int fid = *(int *)key;
|
||||
SFileGroup *pFGroup = (SFileGroup *)fgroup;
|
||||
|
@ -914,7 +877,6 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg) {
|
|||
tlen += taosEncodeVariantI32(buf, pCfg->tsdbId);
|
||||
tlen += taosEncodeFixedI32(buf, pCfg->cacheBlockSize);
|
||||
tlen += taosEncodeVariantI32(buf, pCfg->totalBlocks);
|
||||
tlen += taosEncodeVariantI32(buf, pCfg->maxTables);
|
||||
tlen += taosEncodeVariantI32(buf, pCfg->daysPerFile);
|
||||
tlen += taosEncodeVariantI32(buf, pCfg->keep);
|
||||
tlen += taosEncodeVariantI32(buf, pCfg->keep1);
|
||||
|
@ -931,7 +893,6 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) {
|
|||
buf = taosDecodeVariantI32(buf, &(pCfg->tsdbId));
|
||||
buf = taosDecodeFixedI32(buf, &(pCfg->cacheBlockSize));
|
||||
buf = taosDecodeVariantI32(buf, &(pCfg->totalBlocks));
|
||||
buf = taosDecodeVariantI32(buf, &(pCfg->maxTables));
|
||||
buf = taosDecodeVariantI32(buf, &(pCfg->daysPerFile));
|
||||
buf = taosDecodeVariantI32(buf, &(pCfg->keep));
|
||||
buf = taosDecodeVariantI32(buf, &(pCfg->keep1));
|
||||
|
@ -1037,7 +998,7 @@ static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
|
|||
pBlock->schemaLen = htonl(pBlock->schemaLen);
|
||||
pBlock->numOfRows = htons(pBlock->numOfRows);
|
||||
|
||||
if (pBlock->tid <= 0 || pBlock->tid >= pRepo->config.maxTables) {
|
||||
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {
|
||||
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
|
||||
pBlock->tid);
|
||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||
|
@ -1120,7 +1081,7 @@ TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) {
|
|||
static void tsdbStartStream(STsdbRepo *pRepo) {
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
|
||||
for (int i = 0; i < pRepo->config.maxTables; i++) {
|
||||
for (int i = 0; i < pMeta->maxTables; i++) {
|
||||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
|
||||
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql,
|
||||
|
@ -1133,7 +1094,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) {
|
|||
static void tsdbStopStream(STsdbRepo *pRepo) {
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
|
||||
for (int i = 0; i < pRepo->config.maxTables; i++) {
|
||||
for (int i = 0; i < pMeta->maxTables; i++) {
|
||||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
|
||||
(*pRepo->appH.cqDropFunc)(pTable->cqhandle);
|
||||
|
|
|
@ -21,7 +21,7 @@
|
|||
static FORCE_INLINE STsdbBufBlock *tsdbGetCurrBufBlock(STsdbRepo *pRepo);
|
||||
|
||||
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes);
|
||||
static SMemTable * tsdbNewMemTable(STsdbCfg *pCfg);
|
||||
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
|
||||
static void tsdbFreeMemTable(SMemTable *pMemTable);
|
||||
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
|
||||
static void tsdbFreeTableData(STableData *pTableData);
|
||||
|
@ -30,13 +30,15 @@ static void * tsdbCommitData(void *arg);
|
|||
static int tsdbCommitMeta(STsdbRepo *pRepo);
|
||||
static void tsdbEndCommit(STsdbRepo *pRepo);
|
||||
static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSKEY maxKey);
|
||||
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
|
||||
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
|
||||
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
|
||||
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
|
||||
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
|
||||
|
||||
// ---------------- INTERNAL FUNCTIONS ----------------
|
||||
int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
||||
STsdbCfg * pCfg = &pRepo->config;
|
||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||
int32_t level = 0;
|
||||
int32_t headSize = 0;
|
||||
TSKEY key = dataRowKey(row);
|
||||
|
@ -45,7 +47,7 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
|||
SSkipList * pSList = NULL;
|
||||
int bytes = 0;
|
||||
|
||||
if (pMemTable != NULL && pMemTable->tData[TABLE_TID(pTable)] != NULL &&
|
||||
if (pMemTable != NULL && TABLE_TID(pTable) < pMemTable->maxTables && pMemTable->tData[TABLE_TID(pTable)] != NULL &&
|
||||
pMemTable->tData[TABLE_TID(pTable)]->uid == TABLE_UID(pTable)) {
|
||||
pTableData = pMemTable->tData[TABLE_TID(pTable)];
|
||||
pSList = pTableData->pData;
|
||||
|
@ -66,13 +68,20 @@ int tsdbInsertRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
|||
// Operations above may change pRepo->mem, retake those values
|
||||
ASSERT(pRepo->mem != NULL);
|
||||
pMemTable = pRepo->mem;
|
||||
|
||||
if (TABLE_TID(pTable) >= pMemTable->maxTables) {
|
||||
if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) return -1;;
|
||||
}
|
||||
pTableData = pMemTable->tData[TABLE_TID(pTable)];
|
||||
|
||||
if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
|
||||
if (pTableData != NULL) { // destroy the table skiplist (may have race condition problem)
|
||||
taosWLockLatch(&(pMemTable->latch));
|
||||
pMemTable->tData[TABLE_TID(pTable)] = NULL;
|
||||
tsdbFreeTableData(pTableData);
|
||||
taosWUnLockLatch(&(pMemTable->latch));
|
||||
}
|
||||
|
||||
pTableData = tsdbNewTableData(pCfg, pTable);
|
||||
if (pTableData == NULL) {
|
||||
tsdbError("vgId:%d failed to insert row with key %" PRId64
|
||||
|
@ -122,7 +131,6 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
|
|||
int ref = T_REF_DEC(pMemTable);
|
||||
tsdbDebug("vgId:%d unref memtable %p ref %d", REPO_ID(pRepo), pMemTable, ref);
|
||||
if (ref == 0) {
|
||||
STsdbCfg * pCfg = &pRepo->config;
|
||||
STsdbBufPool *pBufPool = pRepo->pPool;
|
||||
|
||||
SListNode *pNode = NULL;
|
||||
|
@ -139,7 +147,7 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
|
|||
}
|
||||
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
||||
|
||||
for (int i = 0; i < pCfg->maxTables; i++) {
|
||||
for (int i = 0; i < pMemTable->maxTables; i++) {
|
||||
if (pMemTable->tData[i] != NULL) {
|
||||
tsdbFreeTableData(pMemTable->tData[i]);
|
||||
}
|
||||
|
@ -161,11 +169,24 @@ int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) {
|
|||
tsdbRefMemTable(pRepo, *pIMem);
|
||||
|
||||
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
||||
tsdbDebug("vgId:%d take memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), *pMem, *pIMem);
|
||||
|
||||
if (*pMem != NULL) taosRLockLatch(&((*pMem)->latch));
|
||||
|
||||
tsdbDebug("vgId:%d take memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), *pMem, *pIMem);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) {
|
||||
if (pMem != NULL) {
|
||||
taosRUnLockLatch(&(pMem->latch));
|
||||
tsdbUnRefMemTable(pRepo, pMem);
|
||||
}
|
||||
|
||||
if (pIMem != NULL) {
|
||||
tsdbUnRefMemTable(pRepo, pIMem);
|
||||
}
|
||||
}
|
||||
|
||||
void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
|
||||
STsdbCfg * pCfg = &pRepo->config;
|
||||
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
|
||||
|
@ -182,7 +203,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
|
|||
}
|
||||
|
||||
if (pRepo->mem == NULL) {
|
||||
SMemTable *pMemTable = tsdbNewMemTable(&pRepo->config);
|
||||
SMemTable *pMemTable = tsdbNewMemTable(pRepo);
|
||||
if (pMemTable == NULL) return NULL;
|
||||
|
||||
if (tsdbLockRepo(pRepo) < 0) {
|
||||
|
@ -329,7 +350,9 @@ static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) {
|
|||
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
|
||||
}
|
||||
|
||||
static SMemTable* tsdbNewMemTable(STsdbCfg* pCfg) {
|
||||
static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) {
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
|
||||
SMemTable *pMemTable = (SMemTable *)calloc(1, sizeof(*pMemTable));
|
||||
if (pMemTable == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
|
@ -340,7 +363,8 @@ static SMemTable* tsdbNewMemTable(STsdbCfg* pCfg) {
|
|||
pMemTable->keyLast = 0;
|
||||
pMemTable->numOfRows = 0;
|
||||
|
||||
pMemTable->tData = (STableData**)calloc(pCfg->maxTables, sizeof(STableData*));
|
||||
pMemTable->maxTables = pMeta->maxTables;
|
||||
pMemTable->tData = (STableData **)calloc(pMemTable->maxTables, sizeof(STableData *));
|
||||
if (pMemTable->tData == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
|
@ -398,9 +422,6 @@ static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
// TODO: operation here should not be here, remove it
|
||||
pTableData->pData->level = 1;
|
||||
|
||||
return pTableData;
|
||||
|
||||
_err:
|
||||
|
@ -473,7 +494,7 @@ static void *tsdbCommitData(void *arg) {
|
|||
|
||||
_exit:
|
||||
tdFreeDataCols(pDataCols);
|
||||
tsdbDestroyCommitIters(iters, pCfg->maxTables);
|
||||
tsdbDestroyCommitIters(iters, pMem->maxTables);
|
||||
tsdbDestroyHelper(&whelper);
|
||||
tsdbEndCommit(pRepo);
|
||||
tsdbInfo("vgId:%d commit over", pRepo->config.tsdbId);
|
||||
|
@ -552,12 +573,13 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
|||
STsdbCfg * pCfg = &pRepo->config;
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
SFileGroup *pGroup = NULL;
|
||||
SMemTable * pMem = pRepo->imem;
|
||||
|
||||
TSKEY minKey = 0, maxKey = 0;
|
||||
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
|
||||
|
||||
// Check if there are data to commit to this file
|
||||
int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey);
|
||||
int hasDataToCommit = tsdbHasDataToCommit(iters, pMem->maxTables, minKey, maxKey);
|
||||
if (!hasDataToCommit) {
|
||||
tsdbDebug("vgId:%d no data to commit to file %d", REPO_ID(pRepo), fid);
|
||||
return 0;
|
||||
|
@ -570,7 +592,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
|||
return -1;
|
||||
}
|
||||
|
||||
if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid, pCfg->maxTables)) == NULL) {
|
||||
if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) {
|
||||
tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
|
||||
goto _err;
|
||||
}
|
||||
|
@ -582,7 +604,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
|||
}
|
||||
|
||||
// Loop to commit data in each table
|
||||
for (int tid = 1; tid < pCfg->maxTables; tid++) {
|
||||
for (int tid = 1; tid < pMem->maxTables; tid++) {
|
||||
SCommitIter *pIter = iters + tid;
|
||||
if (pIter->pTable == NULL) continue;
|
||||
|
||||
|
@ -643,11 +665,10 @@ _err:
|
|||
}
|
||||
|
||||
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
|
||||
STsdbCfg * pCfg = &(pRepo->config);
|
||||
SMemTable *pMem = pRepo->imem;
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
|
||||
SCommitIter *iters = (SCommitIter *)calloc(pCfg->maxTables, sizeof(SCommitIter));
|
||||
SCommitIter *iters = (SCommitIter *)calloc(pMem->maxTables, sizeof(SCommitIter));
|
||||
if (iters == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
|
@ -656,7 +677,7 @@ static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
|
|||
if (tsdbRLockRepoMeta(pRepo) < 0) goto _err;
|
||||
|
||||
// reference all tables
|
||||
for (int i = 0; i < pCfg->maxTables; i++) {
|
||||
for (int i = 0; i < pMem->maxTables; i++) {
|
||||
if (pMeta->tables[i] != NULL) {
|
||||
tsdbRefTable(pMeta->tables[i]);
|
||||
iters[i].pTable = pMeta->tables[i];
|
||||
|
@ -665,7 +686,7 @@ static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
|
|||
|
||||
if (tsdbUnlockRepoMeta(pRepo) < 0) goto _err;
|
||||
|
||||
for (int i = 0; i < pCfg->maxTables; i++) {
|
||||
for (int i = 0; i < pMem->maxTables; i++) {
|
||||
if ((iters[i].pTable != NULL) && (pMem->tData[i] != NULL) && (TABLE_UID(iters[i].pTable) == pMem->tData[i]->uid)) {
|
||||
if ((iters[i].pIter = tSkipListCreateIter(pMem->tData[i]->pData)) == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
|
@ -679,7 +700,7 @@ static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo) {
|
|||
return iters;
|
||||
|
||||
_err:
|
||||
tsdbDestroyCommitIters(iters, pCfg->maxTables);
|
||||
tsdbDestroyCommitIters(iters, pMem->maxTables);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -694,4 +715,26 @@ static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
|
|||
}
|
||||
|
||||
free(iters);
|
||||
}
|
||||
|
||||
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables) {
|
||||
ASSERT(pMemTable->maxTables < maxTables);
|
||||
|
||||
STableData **pTableData = (STableData **)calloc(maxTables, sizeof(STableData *));
|
||||
if (pTableData == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
memcpy((void *)pTableData, (void *)pMemTable->tData, sizeof(STableData *) * pMemTable->maxTables);
|
||||
|
||||
STableData **tData = pMemTable->tData;
|
||||
|
||||
taosWLockLatch(&(pMemTable->latch));
|
||||
pMemTable->maxTables = maxTables;
|
||||
pMemTable->tData = pTableData;
|
||||
taosWUnLockLatch(&(pMemTable->latch));
|
||||
|
||||
tfree(tData);
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -49,6 +49,7 @@ static int tsdbGetTableEncodeSize(int8_t act, STable *pTable);
|
|||
static void * tsdbInsertTableAct(STsdbRepo *pRepo, int8_t act, void *buf, STable *pTable);
|
||||
static int tsdbRemoveTableFromStore(STsdbRepo *pRepo, STable *pTable);
|
||||
static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable);
|
||||
static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid);
|
||||
|
||||
// ------------------ OUTER FUNCTIONS ------------------
|
||||
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
|
||||
|
@ -60,13 +61,13 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
|
|||
int tid = pCfg->tableId.tid;
|
||||
STable * pTable = NULL;
|
||||
|
||||
if (tid < 0 || tid >= pRepo->config.maxTables) {
|
||||
if (tid < 1 || tid > TSDB_MAX_TABLES) {
|
||||
tsdbError("vgId:%d failed to create table since invalid tid %d", REPO_ID(pRepo), tid);
|
||||
terrno = TSDB_CODE_TDB_IVD_CREATE_TABLE_INFO;
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (pMeta->tables[tid] != NULL) {
|
||||
if (tid < pMeta->maxTables && pMeta->tables[tid] != NULL) {
|
||||
if (TABLE_UID(pMeta->tables[tid]) == pCfg->tableId.uid) {
|
||||
tsdbError("vgId:%d table %s already exists, tid %d uid %" PRId64, REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
||||
TABLE_TID(pTable), TABLE_UID(pTable));
|
||||
|
@ -422,7 +423,8 @@ STsdbMeta *tsdbNewMeta(STsdbCfg *pCfg) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
pMeta->tables = (STable **)calloc(pCfg->maxTables, sizeof(STable *));
|
||||
pMeta->maxTables = TSDB_INIT_NTABLES + 1;
|
||||
pMeta->tables = (STable **)calloc(pMeta->maxTables, sizeof(STable *));
|
||||
if (pMeta->tables == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
|
@ -434,7 +436,7 @@ STsdbMeta *tsdbNewMeta(STsdbCfg *pCfg) {
|
|||
goto _err;
|
||||
}
|
||||
|
||||
pMeta->uidMap = taosHashInit(pCfg->maxTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
|
||||
pMeta->uidMap = taosHashInit(TSDB_INIT_NTABLES * 1.1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false);
|
||||
if (pMeta->uidMap == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
|
@ -484,14 +486,13 @@ _err:
|
|||
}
|
||||
|
||||
int tsdbCloseMeta(STsdbRepo *pRepo) {
|
||||
STsdbCfg * pCfg = &pRepo->config;
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
SListNode *pNode = NULL;
|
||||
STable * pTable = NULL;
|
||||
|
||||
if (pMeta == NULL) return 0;
|
||||
tdCloseKVStore(pMeta->pStore);
|
||||
for (int i = 1; i < pCfg->maxTables; i++) {
|
||||
for (int i = 1; i < pMeta->maxTables; i++) {
|
||||
tsdbFreeTable(pMeta->tables[i]);
|
||||
}
|
||||
|
||||
|
@ -624,9 +625,8 @@ static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
|
|||
static void tsdbOrgMeta(void *pHandle) {
|
||||
STsdbRepo *pRepo = (STsdbRepo *)pHandle;
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
STsdbCfg * pCfg = &pRepo->config;
|
||||
|
||||
for (int i = 1; i < pCfg->maxTables; i++) {
|
||||
for (int i = 1; i < pMeta->maxTables; i++) {
|
||||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable != NULL && pTable->type == TSDB_CHILD_TABLE) {
|
||||
tsdbAddTableIntoIndex(pMeta, pTable, true);
|
||||
|
@ -781,6 +781,9 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo
|
|||
goto _err;
|
||||
}
|
||||
} else {
|
||||
if (TABLE_TID(pTable) >= pMeta->maxTables) {
|
||||
if (tsdbAdjustMetaTables(pRepo, TABLE_TID(pTable)) < 0) goto _err;
|
||||
}
|
||||
if (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE && addIdx) { // add STABLE to the index
|
||||
if (tsdbAddTableIntoIndex(pMeta, pTable, true) < 0) {
|
||||
tsdbDebug("vgId:%d failed to add table %s to meta while add table to index since %s", REPO_ID(pRepo),
|
||||
|
@ -788,6 +791,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo
|
|||
goto _err;
|
||||
}
|
||||
}
|
||||
ASSERT(TABLE_TID(pTable) < pMeta->maxTables);
|
||||
pMeta->tables[TABLE_TID(pTable)] = pTable;
|
||||
pMeta->nTables++;
|
||||
}
|
||||
|
@ -827,7 +831,6 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
|
|||
SListIter lIter = {0};
|
||||
SListNode *pNode = NULL;
|
||||
STable * tTable = NULL;
|
||||
STsdbCfg * pCfg = &(pRepo->config);
|
||||
|
||||
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
|
||||
int maxCols = schemaNCols(pSchema);
|
||||
|
@ -860,7 +863,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
|
|||
if (maxCols == pMeta->maxCols || maxRowBytes == pMeta->maxRowBytes) {
|
||||
maxCols = 0;
|
||||
maxRowBytes = 0;
|
||||
for (int i = 0; i < pCfg->maxTables; i++) {
|
||||
for (int i = 0; i < pMeta->maxTables; i++) {
|
||||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable != NULL) {
|
||||
pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
|
||||
|
@ -1266,5 +1269,28 @@ static int tsdbRmTableFromMeta(STsdbRepo *pRepo, STable *pTable) {
|
|||
tsdbRemoveTableFromMeta(pRepo, pTable, true, true);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbAdjustMetaTables(STsdbRepo *pRepo, int tid) {
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
ASSERT(tid >= pMeta->maxTables);
|
||||
|
||||
int maxTables = tsdbGetNextMaxTables(tid);
|
||||
|
||||
STable **tables = (STable **)calloc(maxTables, sizeof(STable *));
|
||||
if (tables == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
memcpy((void *)tables, (void *)pMeta->tables, sizeof(STable *) * pMeta->maxTables);
|
||||
pMeta->maxTables = maxTables;
|
||||
|
||||
STable **tTables = pMeta->tables;
|
||||
pMeta->tables = tables;
|
||||
tfree(tTables);
|
||||
tsdbDebug("vgId:%d tsdb meta maxTables is adjusted as %d", REPO_ID(pRepo), maxTables);
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -1231,8 +1231,8 @@ static int tsdbLoadColData(SRWHelper *pHelper, SFile *pFile, SCompBlock *pCompBl
|
|||
if (tsdbCheckAndDecodeColumnData(pDataCol, pHelper->pBuffer, pCompCol->len, pCompBlock->algorithm,
|
||||
pCompBlock->numOfRows, pHelper->pRepo->config.maxRowsPerFileBlock,
|
||||
pHelper->compBuffer, tsizeof(pHelper->compBuffer)) < 0) {
|
||||
tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pHelper->pRepo), pFile->fname, pCompCol->colId,
|
||||
(int64_t)pCompCol->offset);
|
||||
tsdbError("vgId:%d file %s is broken at column %d offset %" PRId64, REPO_ID(pHelper->pRepo), pFile->fname,
|
||||
pCompCol->colId, offset);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -318,14 +318,17 @@ static bool initTableMemIterator(STsdbQueryHandle* pHandle, STableCheckInfo* pCh
|
|||
|
||||
assert(pCheckInfo->iter == NULL && pCheckInfo->iiter == NULL);
|
||||
|
||||
if (pHandle->mem && pHandle->mem->tData[pCheckInfo->tableId.tid] != NULL) {
|
||||
// TODO: add uid check
|
||||
if (pHandle->mem && pCheckInfo->tableId.tid < pHandle->mem->maxTables &&
|
||||
pHandle->mem->tData[pCheckInfo->tableId.tid] != NULL) {
|
||||
pCheckInfo->iter = tSkipListCreateIterFromVal(pHandle->mem->tData[pCheckInfo->tableId.tid]->pData,
|
||||
(const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
|
||||
(const char*)&pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
|
||||
}
|
||||
|
||||
if (pHandle->imem && pHandle->imem->tData[pCheckInfo->tableId.tid] != NULL) {
|
||||
if (pHandle->imem && pCheckInfo->tableId.tid < pHandle->imem->maxTables &&
|
||||
pHandle->imem->tData[pCheckInfo->tableId.tid] != NULL) {
|
||||
pCheckInfo->iiter = tSkipListCreateIterFromVal(pHandle->imem->tData[pCheckInfo->tableId.tid]->pData,
|
||||
(const char*) &pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
|
||||
(const char*)&pCheckInfo->lastKey, TSDB_DATA_TYPE_TIMESTAMP, order);
|
||||
}
|
||||
|
||||
// both iterators are NULL, no data in buffer right now
|
||||
|
@ -1566,8 +1569,7 @@ static int32_t getDataBlocksInFiles(STsdbQueryHandle* pQueryHandle, bool* exists
|
|||
|
||||
static bool doHasDataInBuffer(STsdbQueryHandle* pQueryHandle) {
|
||||
size_t numOfTables = taosArrayGetSize(pQueryHandle->pTableCheckInfo);
|
||||
assert(numOfTables <= ((STsdbRepo*)pQueryHandle->pTsdb)->config.maxTables);
|
||||
|
||||
|
||||
while (pQueryHandle->activeIndex < numOfTables) {
|
||||
if (hasMoreDataInCache(pQueryHandle)) {
|
||||
return true;
|
||||
|
@ -2454,8 +2456,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle) {
|
|||
tfree(pQueryHandle->statis);
|
||||
|
||||
// todo check error
|
||||
tsdbUnRefMemTable(pQueryHandle->pTsdb, pQueryHandle->mem);
|
||||
tsdbUnRefMemTable(pQueryHandle->pTsdb, pQueryHandle->imem);
|
||||
tsdbUnTakeMemSnapShot(pQueryHandle->pTsdb, pQueryHandle->mem, pQueryHandle->imem);
|
||||
|
||||
tsdbDestroyHelper(&pQueryHandle->rhelper);
|
||||
|
||||
|
|
|
@ -98,7 +98,7 @@ static void tsdbSetCfg(STsdbCfg *pCfg, int32_t tsdbId, int32_t cacheBlockSize, i
|
|||
pCfg->tsdbId = tsdbId;
|
||||
pCfg->cacheBlockSize = cacheBlockSize;
|
||||
pCfg->totalBlocks = totalBlocks;
|
||||
pCfg->maxTables = maxTables;
|
||||
// pCfg->maxTables = maxTables;
|
||||
pCfg->daysPerFile = daysPerFile;
|
||||
pCfg->keep = keep;
|
||||
pCfg->minRowsPerFileBlock = minRows;
|
||||
|
|
|
@ -123,7 +123,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
|
|||
tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId;
|
||||
tsdbCfg.cacheBlockSize = pVnodeCfg->cfg.cacheBlockSize;
|
||||
tsdbCfg.totalBlocks = pVnodeCfg->cfg.totalBlocks;
|
||||
tsdbCfg.maxTables = pVnodeCfg->cfg.maxTables;
|
||||
// tsdbCfg.maxTables = pVnodeCfg->cfg.maxTables;
|
||||
tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;
|
||||
tsdbCfg.keep = pVnodeCfg->cfg.daysToKeep;
|
||||
tsdbCfg.minRowsPerFileBlock = pVnodeCfg->cfg.minRowsPerFileBlock;
|
||||
|
@ -630,14 +630,14 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
|||
len += snprintf(content + len, maxLen - len, " \"cfgVersion\": %d,\n", pVnodeCfg->cfg.cfgVersion);
|
||||
len += snprintf(content + len, maxLen - len, " \"cacheBlockSize\": %d,\n", pVnodeCfg->cfg.cacheBlockSize);
|
||||
len += snprintf(content + len, maxLen - len, " \"totalBlocks\": %d,\n", pVnodeCfg->cfg.totalBlocks);
|
||||
len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnodeCfg->cfg.maxTables);
|
||||
// len += snprintf(content + len, maxLen - len, " \"maxTables\": %d,\n", pVnodeCfg->cfg.maxTables);
|
||||
len += snprintf(content + len, maxLen - len, " \"daysPerFile\": %d,\n", pVnodeCfg->cfg.daysPerFile);
|
||||
len += snprintf(content + len, maxLen - len, " \"daysToKeep\": %d,\n", pVnodeCfg->cfg.daysToKeep);
|
||||
len += snprintf(content + len, maxLen - len, " \"daysToKeep1\": %d,\n", pVnodeCfg->cfg.daysToKeep1);
|
||||
len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnodeCfg->cfg.daysToKeep2);
|
||||
len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock);
|
||||
len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock);
|
||||
len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime);
|
||||
// len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime);
|
||||
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision);
|
||||
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression);
|
||||
len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnodeCfg->cfg.walLevel);
|
||||
|
@ -729,12 +729,12 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
|
|||
}
|
||||
pVnode->tsdbCfg.totalBlocks = totalBlocks->valueint;
|
||||
|
||||
cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
|
||||
if (!maxTables || maxTables->type != cJSON_Number) {
|
||||
vError("vgId:%d, failed to read vnode cfg, maxTables not found", pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->tsdbCfg.maxTables = maxTables->valueint;
|
||||
// cJSON *maxTables = cJSON_GetObjectItem(root, "maxTables");
|
||||
// if (!maxTables || maxTables->type != cJSON_Number) {
|
||||
// vError("vgId:%d, failed to read vnode cfg, maxTables not found", pVnode->vgId);
|
||||
// goto PARSE_OVER;
|
||||
// }
|
||||
// pVnode->tsdbCfg.maxTables = maxTables->valueint;
|
||||
|
||||
cJSON *daysPerFile = cJSON_GetObjectItem(root, "daysPerFile");
|
||||
if (!daysPerFile || daysPerFile->type != cJSON_Number) {
|
||||
|
@ -778,12 +778,12 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
|
|||
}
|
||||
pVnode->tsdbCfg.maxRowsPerFileBlock = maxRowsPerFileBlock->valueint;
|
||||
|
||||
cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime");
|
||||
if (!commitTime || commitTime->type != cJSON_Number) {
|
||||
vError("vgId:%d, failed to read vnode cfg, commitTime not found", pVnode->vgId);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint;
|
||||
// cJSON *commitTime = cJSON_GetObjectItem(root, "commitTime");
|
||||
// if (!commitTime || commitTime->type != cJSON_Number) {
|
||||
// vError("vgId:%d, failed to read vnode cfg, commitTime not found", pVnode->vgId);
|
||||
// goto PARSE_OVER;
|
||||
// }
|
||||
// pVnode->tsdbCfg.commitTime = (int8_t)commitTime->valueint;
|
||||
|
||||
cJSON *precision = cJSON_GetObjectItem(root, "precision");
|
||||
if (!precision || precision->type != cJSON_Number) {
|
||||
|
|
|
@ -0,0 +1,88 @@
|
|||
#!/bin/bash
|
||||
|
||||
DATA_DIR=/mnt/root/testdata
|
||||
NUM_LOOP=1
|
||||
NUM_OF_FILES=100
|
||||
OUT_FILE=cassandraWrite.out
|
||||
|
||||
rowsPerRequest=(1 10 50 100 500 1000 2000)
|
||||
|
||||
function printTo {
|
||||
if $verbose ; then
|
||||
echo $1
|
||||
fi
|
||||
}
|
||||
|
||||
function runTest {
|
||||
for c in `seq 1 $clients`; do
|
||||
avgRPR[$c]=0
|
||||
done
|
||||
|
||||
printf "R/R, "
|
||||
for c in `seq 1 $clients`; do
|
||||
if [ "$c" == "1" ]; then
|
||||
printf "$c client, "
|
||||
else
|
||||
printf "$c clients, "
|
||||
fi
|
||||
done
|
||||
printf "\n"
|
||||
|
||||
for r in ${rowsPerRequest[@]}; do
|
||||
printf "$r, "
|
||||
for c in `seq 1 $clients`; do
|
||||
totalRPR=0
|
||||
for i in `seq 1 $NUM_LOOP`; do
|
||||
printTo "loop i:$i, java -jar $CAS_TEST_DIR/cassandratest/target/cassandratest-1.0-SNAPSHOT-jar-with-dependencies.jar \
|
||||
-datadir $DATA_DIR \
|
||||
-numofFiles $NUM_OF_FILES \
|
||||
-rowsperrequest $r \
|
||||
-writeclients $c \
|
||||
-conf $CAS_TEST_DIR/application.conf"
|
||||
java -jar $CAS_TEST_DIR/cassandratest/target/cassandratest-1.0-SNAPSHOT-jar-with-dependencies.jar \
|
||||
-datadir $DATA_DIR \
|
||||
-numofFiles $NUM_OF_FILES \
|
||||
-rowsperrequest $r \
|
||||
-writeclients $c \
|
||||
-conf $CAS_TEST_DIR/application.conf \
|
||||
2>&1 > $OUT_FILE
|
||||
RPR=`cat $OUT_FILE | grep "insertation speed:" | awk '{print $(NF-1)}'`
|
||||
totalRPR=`echo "scale=4; $totalRPR + $RPR" | bc`
|
||||
printTo "rows:$r, clients:$c, i:$i RPR:$RPR"
|
||||
done
|
||||
avgRPR[$c]=`echo "scale=4; $totalRPR / $NUM_LOOP" | bc`
|
||||
done
|
||||
for c in `seq 1 $clients`; do
|
||||
printf "${avgRPR[$c]}, "
|
||||
done
|
||||
printf "\n"
|
||||
done
|
||||
}
|
||||
|
||||
################ Main ################
|
||||
|
||||
verbose=false
|
||||
clients=1
|
||||
|
||||
while : ; do
|
||||
case $1 in
|
||||
-v)
|
||||
verbose=true
|
||||
shift ;;
|
||||
|
||||
-c)
|
||||
clients=$2
|
||||
shift 2;;
|
||||
*)
|
||||
break ;;
|
||||
esac
|
||||
done
|
||||
|
||||
printTo "Cassandra Test begin.."
|
||||
|
||||
WORK_DIR=/mnt/root/TDengine
|
||||
CAS_TEST_DIR=$WORK_DIR/tests/comparisonTest/cassandra
|
||||
|
||||
runTest
|
||||
|
||||
printTo "Cassandra Test done!"
|
|
@ -162,6 +162,7 @@ python3 ./test.py -f client/client.py
|
|||
# Misc
|
||||
python3 testCompress.py
|
||||
python3 testNoCompress.py
|
||||
python3 testMinTablesPerVnode.py
|
||||
|
||||
# functions
|
||||
python3 ./test.py -f functions/function_avg.py
|
||||
|
@ -180,4 +181,4 @@ python3 ./test.py -f functions/function_spread.py
|
|||
python3 ./test.py -f functions/function_stddev.py
|
||||
python3 ./test.py -f functions/function_sum.py
|
||||
python3 ./test.py -f functions/function_top.py
|
||||
python3 ./test.py -f functions/function_twa.py
|
||||
python3 ./test.py -f functions/function_twa.py
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
EXEC_DIR=`dirname "$0"`
|
||||
if [[ $EXEC_DIR != "." ]]
|
||||
then
|
||||
echo "ERROR: Please execute `basename "$0"` in its own directory (for now anyway, pardon the dust)"
|
||||
exit -1
|
||||
fi
|
||||
CURR_DIR=`pwd`
|
||||
IN_TDINTERNAL="community"
|
||||
if [[ "$CURR_DIR" == *"$IN_TDINTERNAL"* ]]; then
|
||||
TAOS_DIR=$CURR_DIR/../../..
|
||||
else
|
||||
TAOS_DIR=$CURR_DIR/../..
|
||||
fi
|
||||
TAOSD_DIR=`find $TAOS_DIR -name "taosd"|grep bin|head -n1`
|
||||
LIB_DIR=`echo $TAOSD_DIR|rev|cut -d '/' -f 3,4,5,6|rev`/lib
|
||||
export PYTHONPATH=$(pwd)/../../src/connector/python/linux/python3
|
||||
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$LIB_DIR
|
||||
|
||||
if [[ "$1" == *"test.py"* ]]; then
|
||||
python3 ./test.py $@
|
||||
else
|
||||
python3 $1 $@
|
||||
fi
|
|
@ -0,0 +1,131 @@
|
|||
#!/usr/bin/python
|
||||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
# install pip
|
||||
# pip install src/connector/python/linux/python2/
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
import sys
|
||||
import getopt
|
||||
import subprocess
|
||||
from distutils.log import warn as printf
|
||||
|
||||
from util.log import *
|
||||
from util.dnodes import *
|
||||
from util.cases import *
|
||||
from util.sql import *
|
||||
|
||||
import taos
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
fileName = "all"
|
||||
deployPath = ""
|
||||
testCluster = False
|
||||
valgrind = 0
|
||||
logSql = True
|
||||
stop = 0
|
||||
opts, args = getopt.gnu_getopt(sys.argv[1:], 'l:sgh', [
|
||||
'logSql', 'stop', 'valgrind', 'help'])
|
||||
for key, value in opts:
|
||||
if key in ['-h', '--help']:
|
||||
tdLog.printNoPrefix(
|
||||
'A collection of test cases written using Python')
|
||||
tdLog.printNoPrefix('-l <True:False> logSql Flag')
|
||||
tdLog.printNoPrefix('-s stop All dnodes')
|
||||
tdLog.printNoPrefix('-g valgrind Test Flag')
|
||||
sys.exit(0)
|
||||
|
||||
if key in ['-l', '--logSql']:
|
||||
if (value.upper() == "TRUE"):
|
||||
logSql = True
|
||||
elif (value.upper() == "FALSE"):
|
||||
logSql = False
|
||||
else:
|
||||
tdLog.printNoPrefix("logSql value %s is invalid" % logSql)
|
||||
sys.exit(0)
|
||||
|
||||
if key in ['-g', '--valgrind']:
|
||||
valgrind = 1
|
||||
|
||||
if key in ['-s', '--stop']:
|
||||
stop = 1
|
||||
|
||||
if (stop != 0):
|
||||
if (valgrind == 0):
|
||||
toBeKilled = "taosd"
|
||||
else:
|
||||
toBeKilled = "valgrind.bin"
|
||||
|
||||
killCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}' | xargs kill -HUP > /dev/null 2>&1" % toBeKilled
|
||||
|
||||
psCmd = "ps -ef|grep -w %s| grep -v grep | awk '{print $2}'" % toBeKilled
|
||||
processID = subprocess.check_output(psCmd, shell=True)
|
||||
|
||||
while(processID):
|
||||
os.system(killCmd)
|
||||
time.sleep(1)
|
||||
processID = subprocess.check_output(psCmd, shell=True)
|
||||
|
||||
for port in range(6030, 6041):
|
||||
usePortPID = "lsof -i tcp:%d | grep LISTEn | awk '{print $2}'" % port
|
||||
processID = subprocess.check_output(usePortPID, shell=True)
|
||||
|
||||
if processID:
|
||||
killCmd = "kill -TERM %s" % processID
|
||||
os.system(killCmd)
|
||||
fuserCmd = "fuser -k -n tcp %d" % port
|
||||
os.system(fuserCmd)
|
||||
if valgrind:
|
||||
time.sleep(2)
|
||||
|
||||
tdLog.info('stop All dnodes')
|
||||
sys.exit(0)
|
||||
|
||||
tdDnodes.init(deployPath)
|
||||
tdDnodes.setTestCluster(testCluster)
|
||||
tdDnodes.setValgrind(valgrind)
|
||||
|
||||
tdDnodes.stopAll()
|
||||
tdDnodes.addSimExtraCfg("minTablesPerVnode", "100")
|
||||
tdDnodes.deploy(1)
|
||||
tdDnodes.start(1)
|
||||
|
||||
host = '127.0.0.1'
|
||||
|
||||
tdLog.info("Procedures for tdengine deployed in %s" % (host))
|
||||
|
||||
tdCases.logSql(logSql)
|
||||
|
||||
conn = taos.connect(
|
||||
host,
|
||||
config=tdDnodes.getSimCfgPath())
|
||||
|
||||
tdSql.init(conn.cursor(), True)
|
||||
|
||||
tdSql.execute("DROP DATABASE IF EXISTS db")
|
||||
tdSql.execute("CREATE DATABASE IF NOT EXISTS db")
|
||||
tdSql.execute("USE db")
|
||||
|
||||
for i in range(0, 100):
|
||||
tdSql.execute(
|
||||
"CREATE TABLE IF NOT EXISTS tb%d (ts TIMESTAMP, temperature INT, humidity FLOAT)" % i)
|
||||
|
||||
for i in range(1, 6):
|
||||
tdSql.execute("INSERT INTO tb99 values (now + %da, %d, %f)" % (i, i, i * 1.0))
|
||||
|
||||
tdSql.execute("DROP TABLE tb99")
|
||||
tdSql.execute(
|
||||
"CREATE TABLE IF NOT EXISTS tb99 (ts TIMESTAMP, temperature INT, humidity FLOAT)")
|
||||
tdSql.query("SELECT * FROM tb99")
|
||||
tdSql.checkRows(0)
|
||||
|
||||
conn.close()
|
Loading…
Reference in New Issue