refactor: let all operations of mnode into the sync log
This commit is contained in:
parent
7f3c60421a
commit
9beb5ebee3
|
@ -93,6 +93,7 @@ typedef struct SMnode {
|
||||||
int32_t selfId;
|
int32_t selfId;
|
||||||
int64_t clusterId;
|
int64_t clusterId;
|
||||||
TdThread thread;
|
TdThread thread;
|
||||||
|
bool deploy;
|
||||||
bool stopped;
|
bool stopped;
|
||||||
int8_t replica;
|
int8_t replica;
|
||||||
int8_t selfIndex;
|
int8_t selfIndex;
|
||||||
|
|
|
@ -90,13 +90,40 @@ static int32_t mndCreateDefaultDnode(SMnode *pMnode) {
|
||||||
dnodeObj.updateTime = dnodeObj.createdTime;
|
dnodeObj.updateTime = dnodeObj.createdTime;
|
||||||
dnodeObj.port = pMnode->replicas[0].port;
|
dnodeObj.port = pMnode->replicas[0].port;
|
||||||
memcpy(&dnodeObj.fqdn, pMnode->replicas[0].fqdn, TSDB_FQDN_LEN);
|
memcpy(&dnodeObj.fqdn, pMnode->replicas[0].fqdn, TSDB_FQDN_LEN);
|
||||||
|
snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port);
|
||||||
|
|
||||||
SSdbRaw *pRaw = mndDnodeActionEncode(&dnodeObj);
|
SSdbRaw *pRaw = mndDnodeActionEncode(&dnodeObj);
|
||||||
if (pRaw == NULL) return -1;
|
if (pRaw == NULL) return -1;
|
||||||
if (sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) return -1;
|
if (sdbSetRawStatus(pRaw, SDB_STATUS_READY) != 0) return -1;
|
||||||
|
|
||||||
mDebug("dnode:%d, will be created while deploy sdb, raw:%p", dnodeObj.id, pRaw);
|
mDebug("dnode:%d, will be created while deploy sdb, raw:%p", dnodeObj.id, pRaw);
|
||||||
|
|
||||||
|
#if 0
|
||||||
return sdbWrite(pMnode->pSdb, pRaw);
|
return sdbWrite(pMnode->pSdb, pRaw);
|
||||||
|
#else
|
||||||
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_DNODE, NULL);
|
||||||
|
if (pTrans == NULL) {
|
||||||
|
mError("dnode:%s, failed to create since %s", dnodeObj.ep, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
mDebug("trans:%d, used to create dnode:%s", pTrans->id, dnodeObj.ep);
|
||||||
|
|
||||||
|
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
|
||||||
|
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return 0;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
|
static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) {
|
||||||
|
@ -701,7 +728,7 @@ static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)&pDnode->id, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)&pDnode->id, false);
|
||||||
|
|
||||||
char buf[tListLen(pDnode->ep) + VARSTR_HEADER_SIZE] = {0};
|
char buf[tListLen(pDnode->ep) + VARSTR_HEADER_SIZE] = {0};
|
||||||
STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);
|
STR_WITH_MAXSIZE_TO_VARSTR(buf, pDnode->ep, pShow->pMeta->pSchemas[cols].bytes);
|
||||||
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, buf, false);
|
colDataAppend(pColInfo, numOfRows, buf, false);
|
||||||
|
|
|
@ -110,7 +110,33 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
|
||||||
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
mDebug("mnode:%d, will be created while deploy sdb, raw:%p", mnodeObj.id, pRaw);
|
mDebug("mnode:%d, will be created while deploy sdb, raw:%p", mnodeObj.id, pRaw);
|
||||||
|
|
||||||
|
#if 0
|
||||||
return sdbWrite(pMnode->pSdb, pRaw);
|
return sdbWrite(pMnode->pSdb, pRaw);
|
||||||
|
#else
|
||||||
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_DNODE, NULL);
|
||||||
|
if (pTrans == NULL) {
|
||||||
|
mError("mnode:%d, failed to create since %s", mnodeObj.id, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
mDebug("trans:%d, used to create mnode:%d", pTrans->id, mnodeObj.id);
|
||||||
|
|
||||||
|
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
|
||||||
|
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return 0;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) {
|
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) {
|
||||||
|
|
|
@ -563,7 +563,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnType type, const S
|
||||||
pTrans->policy = policy;
|
pTrans->policy = policy;
|
||||||
pTrans->type = type;
|
pTrans->type = type;
|
||||||
pTrans->createdTime = taosGetTimestampMs();
|
pTrans->createdTime = taosGetTimestampMs();
|
||||||
pTrans->rpcInfo = pReq->info;
|
if (pReq != NULL) pTrans->rpcInfo = pReq->info;
|
||||||
pTrans->redoLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *));
|
pTrans->redoLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *));
|
||||||
pTrans->undoLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *));
|
pTrans->undoLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *));
|
||||||
pTrans->commitLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *));
|
pTrans->commitLogs = taosArrayInit(TRANS_ARRAY_SIZE, sizeof(void *));
|
||||||
|
|
|
@ -78,7 +78,33 @@ static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char
|
||||||
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
mDebug("user:%s, will be created while deploy sdb, raw:%p", userObj.user, pRaw);
|
mDebug("user:%s, will be created while deploy sdb, raw:%p", userObj.user, pRaw);
|
||||||
|
|
||||||
|
#if 0
|
||||||
return sdbWrite(pMnode->pSdb, pRaw);
|
return sdbWrite(pMnode->pSdb, pRaw);
|
||||||
|
#else
|
||||||
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_USER, NULL);
|
||||||
|
if (pTrans == NULL) {
|
||||||
|
mError("user:%s, failed to create since %s", userObj.user, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
mDebug("trans:%d, used to create user:%s", pTrans->id, userObj.user);
|
||||||
|
|
||||||
|
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
|
||||||
|
mError("trans:%d, failed to commit redo log since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return 0;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCreateDefaultUsers(SMnode *pMnode) {
|
static int32_t mndCreateDefaultUsers(SMnode *pMnode) {
|
||||||
|
|
|
@ -153,8 +153,14 @@ static int32_t mndInitSdb(SMnode *pMnode) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndDeploySdb(SMnode *pMnode) { return sdbDeploy(pMnode->pSdb); }
|
static int32_t mndOpenSdb(SMnode *pMnode) {
|
||||||
static int32_t mndReadSdb(SMnode *pMnode) { return sdbReadFile(pMnode->pSdb); }
|
if (!pMnode->deploy) {
|
||||||
|
return sdbReadFile(pMnode->pSdb);
|
||||||
|
} else {
|
||||||
|
// return sdbDeploy(pMnode->pSdb);;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void mndCleanupSdb(SMnode *pMnode) {
|
static void mndCleanupSdb(SMnode *pMnode) {
|
||||||
if (pMnode->pSdb) {
|
if (pMnode->pSdb) {
|
||||||
|
@ -176,7 +182,7 @@ static int32_t mndAllocStep(SMnode *pMnode, char *name, MndInitFp initFp, MndCle
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndInitSteps(SMnode *pMnode, bool deploy) {
|
static int32_t mndInitSteps(SMnode *pMnode) {
|
||||||
if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-sdb", mndInitSdb, mndCleanupSdb) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-trans", mndInitTrans, mndCleanupTrans) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-cluster", mndInitCluster, mndCleanupCluster) != 0) return -1;
|
||||||
|
@ -201,11 +207,7 @@ static int32_t mndInitSteps(SMnode *pMnode, bool deploy) {
|
||||||
if (mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-perfs", mndInitPerfs, mndCleanupPerfs) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1;
|
||||||
if (deploy) {
|
if (mndAllocStep(pMnode, "mnode-sdb", mndOpenSdb, NULL) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-sdb-deploy", mndDeploySdb, NULL) != 0) return -1;
|
|
||||||
} else {
|
|
||||||
if (mndAllocStep(pMnode, "mnode-sdb-read", mndReadSdb, NULL) != 0) return -1;
|
|
||||||
}
|
|
||||||
if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
|
||||||
if (mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery) != 0) return -1;
|
if (mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery) != 0) return -1;
|
||||||
|
@ -280,6 +282,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
|
||||||
(void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
|
(void)taosParseTime(timestr, &pMnode->checkTime, (int32_t)strlen(timestr), TSDB_TIME_PRECISION_MILLI, 0);
|
||||||
mndSetOptions(pMnode, pOption);
|
mndSetOptions(pMnode, pOption);
|
||||||
|
|
||||||
|
pMnode->deploy = pOption->deploy;
|
||||||
pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
|
pMnode->pSteps = taosArrayInit(24, sizeof(SMnodeStep));
|
||||||
if (pMnode->pSteps == NULL) {
|
if (pMnode->pSteps == NULL) {
|
||||||
taosMemoryFree(pMnode);
|
taosMemoryFree(pMnode);
|
||||||
|
@ -297,7 +300,7 @@ SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = mndInitSteps(pMnode, pOption->deploy);
|
code = mndInitSteps(pMnode);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
code = terrno;
|
code = terrno;
|
||||||
mError("failed to open mnode since %s", terrstr());
|
mError("failed to open mnode since %s", terrstr());
|
||||||
|
@ -332,6 +335,9 @@ void mndClose(SMnode *pMnode) {
|
||||||
|
|
||||||
int32_t mndStart(SMnode *pMnode) {
|
int32_t mndStart(SMnode *pMnode) {
|
||||||
mndSyncStart(pMnode);
|
mndSyncStart(pMnode);
|
||||||
|
if (pMnode->deploy && sdbDeploy(pMnode->pSdb) != 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
return mndInitTimer(pMnode);
|
return mndInitTimer(pMnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue