Merge pull request #1636 from taosdata/feature/mpeer
old wal of sdb is deleted after recovery
This commit is contained in:
commit
d41b230819
|
@ -23,8 +23,6 @@ extern "C" {
|
||||||
int32_t dnodeInitMClient();
|
int32_t dnodeInitMClient();
|
||||||
void dnodeCleanupMClient();
|
void dnodeCleanupMClient();
|
||||||
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg);
|
void dnodeSendMsgToMnode(SRpcMsg *rpcMsg);
|
||||||
void * dnodeGetMnodeList();
|
|
||||||
int32_t dnodeGetDnodeId();
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,11 +22,11 @@
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
#include "twal.h"
|
#include "twal.h"
|
||||||
|
#include "vnode.h"
|
||||||
#include "dnodeMClient.h"
|
#include "dnodeMClient.h"
|
||||||
#include "dnodeMgmt.h"
|
#include "dnodeMgmt.h"
|
||||||
#include "dnodeRead.h"
|
#include "dnodeRead.h"
|
||||||
#include "dnodeWrite.h"
|
#include "dnodeWrite.h"
|
||||||
#include "vnode.h"
|
|
||||||
|
|
||||||
static int32_t dnodeOpenVnodes();
|
static int32_t dnodeOpenVnodes();
|
||||||
static void dnodeCloseVnodes();
|
static void dnodeCloseVnodes();
|
||||||
|
|
|
@ -118,7 +118,7 @@ void dnodeProcessModuleStatus(uint32_t moduleStatus) {
|
||||||
dPrint("module status is received, start mgmt module", tsModuleStatus, moduleStatus);
|
dPrint("module status is received, start mgmt module", tsModuleStatus, moduleStatus);
|
||||||
tsModule[TSDB_MOD_MGMT].enable = true;
|
tsModule[TSDB_MOD_MGMT].enable = true;
|
||||||
dnodeSetModuleStatus(TSDB_MOD_MGMT);
|
dnodeSetModuleStatus(TSDB_MOD_MGMT);
|
||||||
(*tsModule[TSDB_MOD_MGMT].stopFp)();
|
(*tsModule[TSDB_MOD_MGMT].startFp)();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tsModule[TSDB_MOD_MGMT].enable && !enableMgmtModule) {
|
if (tsModule[TSDB_MOD_MGMT].enable && !enableMgmtModule) {
|
||||||
|
|
|
@ -43,6 +43,8 @@ void dnodeSendRpcWriteRsp(void *pVnode, void *param, int32_t code);
|
||||||
|
|
||||||
bool dnodeIsFirstDeploy();
|
bool dnodeIsFirstDeploy();
|
||||||
uint32_t dnodeGetMnodeMasteIp();
|
uint32_t dnodeGetMnodeMasteIp();
|
||||||
|
void * dnodeGetMnodeList();
|
||||||
|
int32_t dnodeGetDnodeId();
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -584,6 +584,7 @@ typedef struct {
|
||||||
char dnodeName[TSDB_NODE_NAME_LEN + 1];
|
char dnodeName[TSDB_NODE_NAME_LEN + 1];
|
||||||
uint32_t privateIp;
|
uint32_t privateIp;
|
||||||
uint32_t publicIp;
|
uint32_t publicIp;
|
||||||
|
uint32_t moduleStatus;
|
||||||
uint32_t lastReboot; // time stamp for last reboot
|
uint32_t lastReboot; // time stamp for last reboot
|
||||||
uint16_t numOfTotalVnodes; // from config file
|
uint16_t numOfTotalVnodes; // from config file
|
||||||
uint16_t openVnodes;
|
uint16_t openVnodes;
|
||||||
|
|
|
@ -31,7 +31,6 @@ int32_t mgmtInitDnodes();
|
||||||
void mgmtCleanupDnodes();
|
void mgmtCleanupDnodes();
|
||||||
|
|
||||||
char* mgmtGetDnodeStatusStr(int32_t dnodeStatus);
|
char* mgmtGetDnodeStatusStr(int32_t dnodeStatus);
|
||||||
bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int moduleType);
|
|
||||||
void mgmtMonitorDnodeModule();
|
void mgmtMonitorDnodeModule();
|
||||||
|
|
||||||
int32_t mgmtGetDnodesNum();
|
int32_t mgmtGetDnodesNum();
|
||||||
|
|
|
@ -129,7 +129,7 @@ static int32_t mgmtDnodeActionDecode(SSdbOperDesc *pOper) {
|
||||||
|
|
||||||
static int32_t mgmtDnodeActionRestored() {
|
static int32_t mgmtDnodeActionRestored() {
|
||||||
int32_t numOfRows = sdbGetNumOfRows(tsDnodeSdb);
|
int32_t numOfRows = sdbGetNumOfRows(tsDnodeSdb);
|
||||||
if (numOfRows <= 0 && strcmp(tsMasterIp, tsPrivateIp) == 0) {
|
if (numOfRows <= 0 && dnodeIsFirstDeploy()) {
|
||||||
uint32_t ip = inet_addr(tsPrivateIp);
|
uint32_t ip = inet_addr(tsPrivateIp);
|
||||||
mgmtCreateDnode(ip);
|
mgmtCreateDnode(ip);
|
||||||
SDnodeObj *pDnode = mgmtGetDnodeByIp(ip);
|
SDnodeObj *pDnode = mgmtGetDnodeByIp(ip);
|
||||||
|
@ -276,6 +276,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
|
||||||
pStatus->dnodeId = htonl(pStatus->dnodeId);
|
pStatus->dnodeId = htonl(pStatus->dnodeId);
|
||||||
pStatus->privateIp = htonl(pStatus->privateIp);
|
pStatus->privateIp = htonl(pStatus->privateIp);
|
||||||
pStatus->publicIp = htonl(pStatus->publicIp);
|
pStatus->publicIp = htonl(pStatus->publicIp);
|
||||||
|
pStatus->moduleStatus = htonl(pStatus->moduleStatus);
|
||||||
pStatus->lastReboot = htonl(pStatus->lastReboot);
|
pStatus->lastReboot = htonl(pStatus->lastReboot);
|
||||||
pStatus->numOfCores = htons(pStatus->numOfCores);
|
pStatus->numOfCores = htons(pStatus->numOfCores);
|
||||||
pStatus->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes);
|
pStatus->numOfTotalVnodes = htons(pStatus->numOfTotalVnodes);
|
||||||
|
@ -311,6 +312,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
|
||||||
pDnode->diskAvailable = pStatus->diskAvailable;
|
pDnode->diskAvailable = pStatus->diskAvailable;
|
||||||
pDnode->alternativeRole = pStatus->alternativeRole;
|
pDnode->alternativeRole = pStatus->alternativeRole;
|
||||||
pDnode->totalVnodes = pStatus->numOfTotalVnodes;
|
pDnode->totalVnodes = pStatus->numOfTotalVnodes;
|
||||||
|
pDnode->moduleStatus = pStatus->moduleStatus;
|
||||||
|
|
||||||
if (pStatus->dnodeId == 0) {
|
if (pStatus->dnodeId == 0) {
|
||||||
mTrace("dnode:%d, first access, privateIp:%s, name:%s", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName);
|
mTrace("dnode:%d, first access, privateIp:%s, name:%s", pDnode->dnodeId, taosIpStr(pDnode->privateIp), pDnode->dnodeName);
|
||||||
|
@ -353,7 +355,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
|
||||||
mgmtGetMnodeList(&pRsp->mnodes);
|
mgmtGetMnodeList(&pRsp->mnodes);
|
||||||
|
|
||||||
pRsp->dnodeState.dnodeId = htonl(pDnode->dnodeId);
|
pRsp->dnodeState.dnodeId = htonl(pDnode->dnodeId);
|
||||||
pRsp->dnodeState.moduleStatus = htonl(pDnode->moduleStatus);
|
pRsp->dnodeState.moduleStatus = htonl((int32_t)pDnode->isMgmt);
|
||||||
pRsp->dnodeState.createdTime = htonl(pDnode->createdTime / 1000);
|
pRsp->dnodeState.createdTime = htonl(pDnode->createdTime / 1000);
|
||||||
pRsp->dnodeState.numOfVnodes = 0;
|
pRsp->dnodeState.numOfVnodes = 0;
|
||||||
|
|
||||||
|
@ -391,10 +393,6 @@ static int32_t mgmtCreateDnode(uint32_t ip) {
|
||||||
pDnode->totalVnodes = TSDB_INVALID_VNODE_NUM;
|
pDnode->totalVnodes = TSDB_INVALID_VNODE_NUM;
|
||||||
sprintf(pDnode->dnodeName, "n%d", sdbGetId(tsDnodeSdb) + 1);
|
sprintf(pDnode->dnodeName, "n%d", sdbGetId(tsDnodeSdb) + 1);
|
||||||
|
|
||||||
if (pDnode->privateIp == inet_addr(tsMasterIp)) {
|
|
||||||
pDnode->moduleStatus |= (1 << TSDB_MOD_MGMT);
|
|
||||||
}
|
|
||||||
|
|
||||||
SSdbOperDesc oper = {
|
SSdbOperDesc oper = {
|
||||||
.type = SDB_OPER_GLOBAL,
|
.type = SDB_OPER_GLOBAL,
|
||||||
.table = tsDnodeSdb,
|
.table = tsDnodeSdb,
|
||||||
|
@ -620,7 +618,7 @@ static int32_t mgmtRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, voi
|
||||||
return numOfRows;
|
return numOfRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) {
|
static bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int32_t moduleType) {
|
||||||
uint32_t status = pDnode->moduleStatus & (1 << moduleType);
|
uint32_t status = pDnode->moduleStatus & (1 << moduleType);
|
||||||
return status > 0;
|
return status > 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,6 +20,7 @@
|
||||||
#include "treplica.h"
|
#include "treplica.h"
|
||||||
#include "tgrant.h"
|
#include "tgrant.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
|
#include "dnode.h"
|
||||||
#include "mgmtDef.h"
|
#include "mgmtDef.h"
|
||||||
#include "mgmtLog.h"
|
#include "mgmtLog.h"
|
||||||
#include "mgmtAcct.h"
|
#include "mgmtAcct.h"
|
||||||
|
@ -100,6 +101,10 @@ int32_t mgmtStartSystem() {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (replicaInit() < 0) {
|
||||||
|
mError("failed to init replica")
|
||||||
|
}
|
||||||
|
|
||||||
if (mgmtInitDClient() < 0) {
|
if (mgmtInitDClient() < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -108,10 +113,6 @@ int32_t mgmtStartSystem() {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (replicaInit() < 0) {
|
|
||||||
mError("failed to init dnode balance")
|
|
||||||
}
|
|
||||||
|
|
||||||
grantReset(TSDB_GRANT_ALL, 0);
|
grantReset(TSDB_GRANT_ALL, 0);
|
||||||
tsMgmtIsRunning = true;
|
tsMgmtIsRunning = true;
|
||||||
|
|
||||||
|
|
|
@ -55,6 +55,12 @@ static int32_t mgmtMnodeActionInsert(SSdbOperDesc *pOper) {
|
||||||
|
|
||||||
static int32_t mgmtMnodeActionDelete(SSdbOperDesc *pOper) {
|
static int32_t mgmtMnodeActionDelete(SSdbOperDesc *pOper) {
|
||||||
SMnodeObj *pMnode = pOper->pObj;
|
SMnodeObj *pMnode = pOper->pObj;
|
||||||
|
|
||||||
|
SDnodeObj *pDnode = mgmtGetDnode(pMnode->mnodeId);
|
||||||
|
if (pDnode == NULL) return TSDB_CODE_DNODE_NOT_EXIST;
|
||||||
|
pDnode->isMgmt = false;
|
||||||
|
mgmtReleaseDnode(pDnode);
|
||||||
|
|
||||||
mTrace("mnode:%d, is dropped from sdb", pMnode->mnodeId);
|
mTrace("mnode:%d, is dropped from sdb", pMnode->mnodeId);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -69,10 +69,16 @@ static void *(*sdbGetIndexFp[])(void *handle, void *key) = {sdbGetStrHashData, s
|
||||||
static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIntHash, sdbCloseIntHash};
|
static void (*sdbCleanUpIndexFp[])(void *handle) = {sdbCloseStrHash, sdbCloseIntHash, sdbCloseIntHash};
|
||||||
static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData, sdbFetchIntHashData};
|
static void *(*sdbFetchRowFp[])(void *handle, void *ptr, void **ppRow) = {sdbFetchStrHashData, sdbFetchIntHashData, sdbFetchIntHashData};
|
||||||
|
|
||||||
uint64_t sdbGetVersion() { return tsSdbObj->version; }
|
|
||||||
int32_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->autoIndex; }
|
int32_t sdbGetId(void *handle) { return ((SSdbTable *)handle)->autoIndex; }
|
||||||
int64_t sdbGetNumOfRows(void *handle) { return ((SSdbTable *)handle)->numOfRows; }
|
int64_t sdbGetNumOfRows(void *handle) { return ((SSdbTable *)handle)->numOfRows; }
|
||||||
|
|
||||||
|
uint64_t sdbGetVersion() {
|
||||||
|
if (tsSdbObj)
|
||||||
|
return tsSdbObj->version;
|
||||||
|
else
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static char *sdbGetActionStr(int32_t action) {
|
static char *sdbGetActionStr(int32_t action) {
|
||||||
switch (action) {
|
switch (action) {
|
||||||
case SDB_ACTION_INSERT:
|
case SDB_ACTION_INSERT:
|
||||||
|
@ -147,10 +153,6 @@ void sdbCleanUp() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SSdbObject *sdbGetObj() {
|
|
||||||
return tsSdbObj;
|
|
||||||
}
|
|
||||||
|
|
||||||
void sdbIncRef(void *handle, void *pRow) {
|
void sdbIncRef(void *handle, void *pRow) {
|
||||||
if (pRow) {
|
if (pRow) {
|
||||||
SSdbTable *pTable = handle;
|
SSdbTable *pTable = handle;
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
#include "dnode.h"
|
||||||
#include "mgmtDef.h"
|
#include "mgmtDef.h"
|
||||||
#include "mgmtLog.h"
|
#include "mgmtLog.h"
|
||||||
#include "mgmtAcct.h"
|
#include "mgmtAcct.h"
|
||||||
|
@ -93,7 +94,7 @@ static int32_t mgmtUserActionDecode(SSdbOperDesc *pOper) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mgmtUserActionRestored() {
|
static int32_t mgmtUserActionRestored() {
|
||||||
if (strcmp(tsMasterIp, tsPrivateIp) == 0) {
|
if (dnodeIsFirstDeploy()) {
|
||||||
SAcctObj *pAcct = mgmtGetAcct("root");
|
SAcctObj *pAcct = mgmtGetAcct("root");
|
||||||
mgmtCreateUser(pAcct, "root", "taosdata");
|
mgmtCreateUser(pAcct, "root", "taosdata");
|
||||||
mgmtCreateUser(pAcct, "monitor", tsInternalPass);
|
mgmtCreateUser(pAcct, "monitor", tsInternalPass);
|
||||||
|
|
|
@ -110,12 +110,7 @@ short tsDaysPerFile = 10;
|
||||||
int tsDaysToKeep = 3650;
|
int tsDaysToKeep = 3650;
|
||||||
int tsReplications = TSDB_REPLICA_MIN_NUM;
|
int tsReplications = TSDB_REPLICA_MIN_NUM;
|
||||||
|
|
||||||
#ifdef _MPEER
|
|
||||||
int tsNumOfMPeers = 3;
|
int tsNumOfMPeers = 3;
|
||||||
#else
|
|
||||||
int tsNumOfMPeers = 1;
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int tsMaxShellConns = 2000;
|
int tsMaxShellConns = 2000;
|
||||||
int tsMaxTables = 100000;
|
int tsMaxTables = 100000;
|
||||||
|
|
||||||
|
@ -556,7 +551,7 @@ static void doInitGlobalConfig() {
|
||||||
tsInitConfigOption(cfg++, "tblocks", &tsNumOfBlocksPerMeter, TSDB_CFG_VTYPE_SHORT,
|
tsInitConfigOption(cfg++, "tblocks", &tsNumOfBlocksPerMeter, TSDB_CFG_VTYPE_SHORT,
|
||||||
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
|
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
|
||||||
32, 4096, 0, TSDB_CFG_UTYPE_NONE);
|
32, 4096, 0, TSDB_CFG_UTYPE_NONE);
|
||||||
#ifdef _MPEER
|
#ifdef _SYNC
|
||||||
tsInitConfigOption(cfg++, "numOfMPeers", &tsNumOfMPeers, TSDB_CFG_VTYPE_INT,
|
tsInitConfigOption(cfg++, "numOfMPeers", &tsNumOfMPeers, TSDB_CFG_VTYPE_INT,
|
||||||
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLUSTER,
|
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW | TSDB_CFG_CTYPE_B_CLUSTER,
|
||||||
1, 3, 0, TSDB_CFG_UTYPE_NONE);
|
1, 3, 0, TSDB_CFG_UTYPE_NONE);
|
||||||
|
|
|
@ -52,6 +52,7 @@ static uint32_t walSignature = 0xFAFBFDFE;
|
||||||
static int walHandleExistingFiles(const char *path);
|
static int walHandleExistingFiles(const char *path);
|
||||||
static int walRestoreWalFile(const char *name, void *pVnode, FWalWrite writeFp);
|
static int walRestoreWalFile(const char *name, void *pVnode, FWalWrite writeFp);
|
||||||
static int walRemoveWalFiles(const char *path);
|
static int walRemoveWalFiles(const char *path);
|
||||||
|
static int walMoveOldWalFilesBack(const char *path);
|
||||||
|
|
||||||
void *walOpen(const char *path, const SWalCfg *pCfg) {
|
void *walOpen(const char *path, const SWalCfg *pCfg) {
|
||||||
SWal *pWal = calloc(sizeof(SWal), 1);
|
SWal *pWal = calloc(sizeof(SWal), 1);
|
||||||
|
@ -213,7 +214,11 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
code = walRemoveWalFiles(opath);
|
if (pWal->keep) {
|
||||||
|
code = walMoveOldWalFilesBack(pWal->path);
|
||||||
|
} else {
|
||||||
|
code = walRemoveWalFiles(opath);
|
||||||
|
}
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
if (remove(opath) < 0) {
|
if (remove(opath) < 0) {
|
||||||
wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno));
|
wError("wal:%s, failed to remove directory(%s)", opath, strerror(errno));
|
||||||
|
@ -365,4 +370,40 @@ static int walRemoveWalFiles(const char *path) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int walMoveOldWalFilesBack(const char *path) {
|
||||||
|
char oname[TSDB_FILENAME_LEN * 3];
|
||||||
|
char nname[TSDB_FILENAME_LEN * 3];
|
||||||
|
char opath[TSDB_FILENAME_LEN];
|
||||||
|
struct dirent *ent;
|
||||||
|
int plen = strlen(walPrefix);
|
||||||
|
int code = 0;
|
||||||
|
|
||||||
|
sprintf(opath, "%s/old", path);
|
||||||
|
|
||||||
|
if (access(opath, F_OK) == 0) {
|
||||||
|
// move all old files to wal directory
|
||||||
|
int count = 0;
|
||||||
|
|
||||||
|
DIR *dir = opendir(opath);
|
||||||
|
while ((ent = readdir(dir))!= NULL) {
|
||||||
|
if ( strncmp(ent->d_name, walPrefix, plen) == 0) {
|
||||||
|
sprintf(oname, "%s/%s", opath, ent->d_name);
|
||||||
|
sprintf(nname, "%s/%s", path, ent->d_name);
|
||||||
|
if (rename(oname, nname) < 0) {
|
||||||
|
wError("wal:%s, failed to move to new:%s", oname, nname);
|
||||||
|
code = -1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
wTrace("wal:%s, %d old files are move back for keep option is set", path, count);
|
||||||
|
closedir(dir);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue