DB option: new option cacheLastRow
This commit is contained in:
parent
d48ce512f0
commit
131af1718f
|
@ -94,6 +94,7 @@ extern int32_t tsFsyncPeriod;
|
||||||
extern int32_t tsReplications;
|
extern int32_t tsReplications;
|
||||||
extern int32_t tsQuorum;
|
extern int32_t tsQuorum;
|
||||||
extern int32_t tsUpdate;
|
extern int32_t tsUpdate;
|
||||||
|
extern int32_t tsCacheLastRow;
|
||||||
|
|
||||||
// balance
|
// balance
|
||||||
extern int32_t tsEnableBalance;
|
extern int32_t tsEnableBalance;
|
||||||
|
|
|
@ -127,6 +127,7 @@ int32_t tsFsyncPeriod = TSDB_DEFAULT_FSYNC_PERIOD;
|
||||||
int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION;
|
int32_t tsReplications = TSDB_DEFAULT_DB_REPLICA_OPTION;
|
||||||
int32_t tsQuorum = TSDB_DEFAULT_DB_QUORUM_OPTION;
|
int32_t tsQuorum = TSDB_DEFAULT_DB_QUORUM_OPTION;
|
||||||
int32_t tsUpdate = TSDB_DEFAULT_DB_UPDATE_OPTION;
|
int32_t tsUpdate = TSDB_DEFAULT_DB_UPDATE_OPTION;
|
||||||
|
int32_t tsCacheLastRow = TSDB_DEFAULT_CACHE_BLOCK_SIZE;
|
||||||
int32_t tsMaxVgroupsPerDb = 0;
|
int32_t tsMaxVgroupsPerDb = 0;
|
||||||
int32_t tsMinTablePerVnode = TSDB_TABLES_STEP;
|
int32_t tsMinTablePerVnode = TSDB_TABLES_STEP;
|
||||||
int32_t tsMaxTablePerVnode = TSDB_DEFAULT_TABLES;
|
int32_t tsMaxTablePerVnode = TSDB_DEFAULT_TABLES;
|
||||||
|
|
|
@ -141,6 +141,7 @@ static SCreateVnodeMsg* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
|
||||||
pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock);
|
pCreate->cfg.maxRowsPerFileBlock = htonl(pCreate->cfg.maxRowsPerFileBlock);
|
||||||
pCreate->cfg.fsyncPeriod = htonl(pCreate->cfg.fsyncPeriod);
|
pCreate->cfg.fsyncPeriod = htonl(pCreate->cfg.fsyncPeriod);
|
||||||
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
|
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
|
||||||
|
pCreate->cfg.cacheLastRow = htonl(pCreate->cfg.cacheLastRow);
|
||||||
|
|
||||||
for (int32_t j = 0; j < pCreate->cfg.replications; ++j) {
|
for (int32_t j = 0; j < pCreate->cfg.replications; ++j) {
|
||||||
pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId);
|
pCreate->nodes[j].nodeId = htonl(pCreate->nodes[j].nodeId);
|
||||||
|
|
|
@ -369,6 +369,10 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size, void* buf
|
||||||
#define TSDB_MAX_DB_UPDATE 1
|
#define TSDB_MAX_DB_UPDATE 1
|
||||||
#define TSDB_DEFAULT_DB_UPDATE_OPTION 0
|
#define TSDB_DEFAULT_DB_UPDATE_OPTION 0
|
||||||
|
|
||||||
|
#define TSDB_MIN_DB_CACHE_LAST_ROW 0
|
||||||
|
#define TSDB_MAX_DB_CACHE_LAST_ROW 1
|
||||||
|
#define TSDB_DEFAULT_CACHE_LAST_ROW 0
|
||||||
|
|
||||||
#define TSDB_MIN_FSYNC_PERIOD 0
|
#define TSDB_MIN_FSYNC_PERIOD 0
|
||||||
#define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond
|
#define TSDB_MAX_FSYNC_PERIOD 180000 // millisecond
|
||||||
#define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second
|
#define TSDB_DEFAULT_FSYNC_PERIOD 3000 // three second
|
||||||
|
|
|
@ -548,7 +548,8 @@ typedef struct {
|
||||||
int8_t quorum;
|
int8_t quorum;
|
||||||
int8_t ignoreExist;
|
int8_t ignoreExist;
|
||||||
int8_t update;
|
int8_t update;
|
||||||
int8_t reserve[9];
|
int8_t cacheLastRow;
|
||||||
|
int8_t reserve[8];
|
||||||
} SCreateDbMsg, SAlterDbMsg;
|
} SCreateDbMsg, SAlterDbMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -660,7 +661,8 @@ typedef struct {
|
||||||
int8_t wals;
|
int8_t wals;
|
||||||
int8_t quorum;
|
int8_t quorum;
|
||||||
int8_t update;
|
int8_t update;
|
||||||
int8_t reserved[15];
|
int8_t cacheLastRow;
|
||||||
|
int8_t reserved[14];
|
||||||
} SVnodeCfg;
|
} SVnodeCfg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -173,7 +173,8 @@ typedef struct {
|
||||||
int8_t replications;
|
int8_t replications;
|
||||||
int8_t quorum;
|
int8_t quorum;
|
||||||
int8_t update;
|
int8_t update;
|
||||||
int8_t reserved[11];
|
int8_t cacheLastRow;
|
||||||
|
int8_t reserved[10];
|
||||||
} SDbCfg;
|
} SDbCfg;
|
||||||
|
|
||||||
typedef struct SDbObj {
|
typedef struct SDbObj {
|
||||||
|
|
|
@ -322,6 +322,11 @@ static int32_t mnodeCheckDbCfg(SDbCfg *pCfg) {
|
||||||
return TSDB_CODE_MND_INVALID_DB_OPTION;
|
return TSDB_CODE_MND_INVALID_DB_OPTION;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pCfg->cacheLastRow < TSDB_MIN_DB_CACHE_LAST_ROW || pCfg->cacheLastRow > TSDB_MAX_DB_CACHE_LAST_ROW) {
|
||||||
|
mError("invalid db option cacheLastRow:%d valid range: [%d, %d]", pCfg->cacheLastRow, TSDB_MIN_DB_CACHE_LAST_ROW, TSDB_MAX_DB_CACHE_LAST_ROW);
|
||||||
|
return TSDB_CODE_MND_INVALID_DB_OPTION;
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -343,6 +348,7 @@ static void mnodeSetDefaultDbCfg(SDbCfg *pCfg) {
|
||||||
if (pCfg->replications < 0) pCfg->replications = tsReplications;
|
if (pCfg->replications < 0) pCfg->replications = tsReplications;
|
||||||
if (pCfg->quorum < 0) pCfg->quorum = tsQuorum;
|
if (pCfg->quorum < 0) pCfg->quorum = tsQuorum;
|
||||||
if (pCfg->update < 0) pCfg->update = tsUpdate;
|
if (pCfg->update < 0) pCfg->update = tsUpdate;
|
||||||
|
if (pCfg->cacheLastRow < 0) pCfg->cacheLastRow = tsCacheLastRow;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mnodeCreateDbCb(SMnodeMsg *pMsg, int32_t code) {
|
static int32_t mnodeCreateDbCb(SMnodeMsg *pMsg, int32_t code) {
|
||||||
|
@ -396,7 +402,8 @@ static int32_t mnodeCreateDb(SAcctObj *pAcct, SCreateDbMsg *pCreate, SMnodeMsg *
|
||||||
.walLevel = pCreate->walLevel,
|
.walLevel = pCreate->walLevel,
|
||||||
.replications = pCreate->replications,
|
.replications = pCreate->replications,
|
||||||
.quorum = pCreate->quorum,
|
.quorum = pCreate->quorum,
|
||||||
.update = pCreate->update
|
.update = pCreate->update,
|
||||||
|
.cacheLastRow = pCreate->cacheLastRow
|
||||||
};
|
};
|
||||||
|
|
||||||
mnodeSetDefaultDbCfg(&pDb->cfg);
|
mnodeSetDefaultDbCfg(&pDb->cfg);
|
||||||
|
@ -750,6 +757,10 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
|
||||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
*(int8_t *)pWrite = pDb->cfg.compression;
|
*(int8_t *)pWrite = pDb->cfg.compression;
|
||||||
cols++;
|
cols++;
|
||||||
|
|
||||||
|
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||||
|
*(int8_t *)pWrite = pDb->cfg.cacheLastRow;
|
||||||
|
cols++;
|
||||||
#ifndef __CLOUD_VERSION__
|
#ifndef __CLOUD_VERSION__
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
@ -864,6 +875,7 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) {
|
||||||
int8_t quorum = pAlter->quorum;
|
int8_t quorum = pAlter->quorum;
|
||||||
int8_t precision = pAlter->precision;
|
int8_t precision = pAlter->precision;
|
||||||
int8_t update = pAlter->update;
|
int8_t update = pAlter->update;
|
||||||
|
int8_t cacheLastRow = pAlter->cacheLastRow;
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
@ -976,6 +988,11 @@ static SDbCfg mnodeGetAlterDbOption(SDbObj *pDb, SAlterDbMsg *pAlter) {
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (cacheLastRow >= 0 && cacheLastRow != pDb->cfg.cacheLastRow) {
|
||||||
|
mDebug("db:%s, cacheLastRow:%d change to %d", pDb->name, pDb->cfg.cacheLastRow, cacheLastRow);
|
||||||
|
newCfg.cacheLastRow = cacheLastRow;
|
||||||
|
}
|
||||||
|
|
||||||
return newCfg;
|
return newCfg;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -859,6 +859,7 @@ static SCreateVnodeMsg *mnodeBuildVnodeMsg(SVgObj *pVgroup) {
|
||||||
pCfg->wals = 3;
|
pCfg->wals = 3;
|
||||||
pCfg->quorum = pDb->cfg.quorum;
|
pCfg->quorum = pDb->cfg.quorum;
|
||||||
pCfg->update = pDb->cfg.update;
|
pCfg->update = pDb->cfg.update;
|
||||||
|
pCfg->cacheLastRow = pDb->cfg.cacheLastRow;
|
||||||
|
|
||||||
SVnodeDesc *pNodes = pVnode->nodes;
|
SVnodeDesc *pNodes = pVnode->nodes;
|
||||||
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
|
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
|
||||||
|
|
|
@ -33,6 +33,7 @@ static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) {
|
||||||
pVnode->tsdbCfg.maxRowsPerFileBlock = vnodeMsg->cfg.maxRowsPerFileBlock;
|
pVnode->tsdbCfg.maxRowsPerFileBlock = vnodeMsg->cfg.maxRowsPerFileBlock;
|
||||||
pVnode->tsdbCfg.precision = vnodeMsg->cfg.precision;
|
pVnode->tsdbCfg.precision = vnodeMsg->cfg.precision;
|
||||||
pVnode->tsdbCfg.compression = vnodeMsg->cfg.compression;
|
pVnode->tsdbCfg.compression = vnodeMsg->cfg.compression;
|
||||||
|
pVnode->tsdbCfg.cacheLastRow = vnodeMsg->cfg.cacheLastRow;
|
||||||
pVnode->walCfg.walLevel = vnodeMsg->cfg.walLevel;
|
pVnode->walCfg.walLevel = vnodeMsg->cfg.walLevel;
|
||||||
pVnode->walCfg.fsyncPeriod = vnodeMsg->cfg.fsyncPeriod;
|
pVnode->walCfg.fsyncPeriod = vnodeMsg->cfg.fsyncPeriod;
|
||||||
pVnode->walCfg.keep = TAOS_WAL_NOT_KEEP;
|
pVnode->walCfg.keep = TAOS_WAL_NOT_KEEP;
|
||||||
|
@ -207,6 +208,13 @@ int32_t vnodeReadCfg(SVnodeObj *pVnode) {
|
||||||
}
|
}
|
||||||
vnodeMsg.cfg.quorum = (int8_t)quorum->valueint;
|
vnodeMsg.cfg.quorum = (int8_t)quorum->valueint;
|
||||||
|
|
||||||
|
cJSON *cacheLastRow = cJSON_GetObjectItem(root, "cacheLastRow");
|
||||||
|
if (!cacheLastRow || cacheLastRow->type != cJSON_Number) {
|
||||||
|
vError("vgId: %d, failed to read %s, cacheLastRow not found", pVnode->vgId, file);
|
||||||
|
goto PARSE_VCFG_ERROR;
|
||||||
|
}
|
||||||
|
vnodeMsg.cfg.cacheLastRow = (int8_t)cacheLastRow->valueint;
|
||||||
|
|
||||||
cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
|
cJSON *nodeInfos = cJSON_GetObjectItem(root, "nodeInfos");
|
||||||
if (!nodeInfos || nodeInfos->type != cJSON_Array) {
|
if (!nodeInfos || nodeInfos->type != cJSON_Array) {
|
||||||
vError("vgId:%d, failed to read %s, nodeInfos not found", pVnode->vgId, file);
|
vError("vgId:%d, failed to read %s, nodeInfos not found", pVnode->vgId, file);
|
||||||
|
@ -294,6 +302,7 @@ int32_t vnodeWriteCfg(SCreateVnodeMsg *pMsg) {
|
||||||
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pMsg->cfg.replications);
|
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pMsg->cfg.replications);
|
||||||
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pMsg->cfg.wals);
|
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pMsg->cfg.wals);
|
||||||
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pMsg->cfg.quorum);
|
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pMsg->cfg.quorum);
|
||||||
|
len += snprintf(content + len, maxLen - len, " \"cacheLastRow\": %d,\n", pMsg->cfg.cacheLastRow);
|
||||||
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
|
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
|
||||||
for (int32_t i = 0; i < pMsg->cfg.replications; i++) {
|
for (int32_t i = 0; i < pMsg->cfg.replications; i++) {
|
||||||
SVnodeDesc *node = &pMsg->nodes[i];
|
SVnodeDesc *node = &pMsg->nodes[i];
|
||||||
|
|
|
@ -86,6 +86,7 @@ int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) {
|
||||||
tsdbCfg.precision = pVnodeCfg->cfg.precision;
|
tsdbCfg.precision = pVnodeCfg->cfg.precision;
|
||||||
tsdbCfg.compression = pVnodeCfg->cfg.compression;
|
tsdbCfg.compression = pVnodeCfg->cfg.compression;
|
||||||
tsdbCfg.update = pVnodeCfg->cfg.update;
|
tsdbCfg.update = pVnodeCfg->cfg.update;
|
||||||
|
tsdbCfg.cacheLastRow = pVnodeCfg->cfg.cacheLastRow;
|
||||||
|
|
||||||
char tsdbDir[TSDB_FILENAME_LEN] = {0};
|
char tsdbDir[TSDB_FILENAME_LEN] = {0};
|
||||||
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
|
||||||
|
|
Loading…
Reference in New Issue