diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 3648af5fcb..8bd3db0217 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -38,6 +38,15 @@ extern "C" { dataPos += sizeof(int32_t); \ } +#define SDB_GET_INT16(pData, pRow, dataPos, val) \ + { \ + if (sdbGetRawInt16(pRaw, dataPos, val) != 0) { \ + sdbFreeRow(pRow); \ + return NULL; \ + } \ + dataPos += sizeof(int16_t); \ + } + #define SDB_GET_INT8(pData, pRow, dataPos, val) \ { \ if (sdbGetRawInt8(pRaw, dataPos, val) != 0) { \ @@ -74,6 +83,15 @@ extern "C" { dataPos += sizeof(int32_t); \ } +#define SDB_SET_INT16(pRaw, dataPos, val) \ + { \ + if (sdbSetRawInt16(pRaw, dataPos, val) != 0) { \ + sdbFreeRaw(pRaw); \ + return NULL; \ + } \ + dataPos += sizeof(int16_t); \ + } + #define SDB_SET_INT8(pRaw, dataPos, val) \ { \ if (sdbSetRawInt8(pRaw, dataPos, val) != 0) { \ @@ -100,6 +118,7 @@ extern "C" { } \ } +typedef struct SMnode SMnode; typedef struct SSdbRaw SSdbRaw; typedef struct SSdbRow SSdbRow; typedef enum { SDB_KEY_BINARY = 1, SDB_KEY_INT32 = 2, SDB_KEY_INT64 = 3 } EKeyType; @@ -130,7 +149,7 @@ typedef struct SSdb SSdb; typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj); typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj); typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj); -typedef int32_t (*SdbDeployFp)(SSdb *pSdb); +typedef int32_t (*SdbDeployFp)(SMnode *pMnode); typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw); typedef SSdbRaw *(*SdbEncodeFp)(void *pObj); @@ -190,6 +209,12 @@ typedef struct SSdbOpt { * */ const char *path; + + /** + * @brief The mnode object. + * + */ + SMnode *pMnode; } SSdbOpt; /** @@ -291,12 +316,14 @@ int32_t sdbGetSize(SSdb *pSdb, ESdbType type); SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen); void sdbFreeRaw(SSdbRaw *pRaw); int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val); +int32_t sdbSetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t val); int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val); int32_t sdbSetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t val); int32_t sdbSetRawBinary(SSdbRaw *pRaw, int32_t dataPos, const char *pVal, int32_t valLen); int32_t sdbSetRawDataLen(SSdbRaw *pRaw, int32_t dataLen); int32_t sdbSetRawStatus(SSdbRaw *pRaw, ESdbStatus status); int32_t sdbGetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t *val); +int32_t sdbGetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t *val); int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val); int32_t sdbGetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t *val); int32_t sdbGetRawBinary(SSdbRaw *pRaw, int32_t dataPos, char *pVal, int32_t valLen); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 9b8c6eccc8..8bd3bd3d34 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -78,6 +78,38 @@ typedef enum { typedef enum { TRN_POLICY_ROLLBACK = 1, TRN_POLICY_RETRY = 2 } ETrnPolicy; +typedef enum { + DND_STATUS_OFFLINE = 0, + DND_STATUS_READY = 1, + DND_STATUS_CREATING = 2, + DND_STATUS_DROPPING = 3 +} EDndStatus; + +typedef enum { + DND_REASON_ONLINE = 0, + DND_REASON_STATUS_MSG_TIMEOUT, + DND_REASON_STATUS_NOT_RECEIVED, + DND_REASON_RESET_BY_MNODE, + DND_REASON_VERSION_NOT_MATCH, + DND_REASON_DNODE_ID_NOT_MATCH, + DND_REASON_CLUSTER_ID_NOT_MATCH, + DND_REASON_NUM_OF_MNODES_NOT_MATCH, + DND_REASON_ENABLE_BALANCE_NOT_MATCH, + DND_REASON_MN_EQUAL_VN_NOT_MATCH, + DND_REASON_OFFLINE_THRESHOLD_NOT_MATCH, + DND_REASON_STATUS_INTERVAL_NOT_MATCH, + DND_REASON_MAX_TAB_PER_VN_NOT_MATCH, + DND_REASON_MAX_VG_PER_DB_NOT_MATCH, + DND_REASON_ARBITRATOR_NOT_MATCH, + DND_REASON_TIME_ZONE_NOT_MATCH, + DND_REASON_LOCALE_NOT_MATCH, + DND_REASON_CHARSET_NOT_MATCH, + DND_REASON_FLOW_CTRL_NOT_MATCH, + DND_REASON_SLAVE_QUERY_NOT_MATCH, + DND_REASON_ADJUST_MASTER_NOT_MATCH, + DND_REASON_OTHERS +} EDndReason; + typedef struct STrans { int32_t id; ETrnStage stage; @@ -99,29 +131,32 @@ typedef struct SClusterObj { } SClusterObj; typedef struct SDnodeObj { - int32_t id; - int32_t vnodes; - int64_t createdTime; - int64_t updateTime; - int64_t lastAccess; - int64_t rebootTime; // time stamp for last reboot - char fqdn[TSDB_FQDN_LEN]; - char ep[TSDB_EP_LEN]; - uint16_t port; - int16_t numOfCores; // from dnode status msg - int8_t alternativeRole; // from dnode status msg, 0-any, 1-mgmt, 2-dnode - int8_t status; // set in balance function - int8_t offlineReason; + int32_t id; + int64_t createdTime; + int64_t updateTime; + int64_t rebootTime; + int64_t lastAccessTime; + int16_t numOfMnodes; + int16_t numOfVnodes; + int16_t numOfQnodes; + int16_t numOfSupportMnodes; + int16_t numOfSupportVnodes; + int16_t numOfSupportQnodes; + EDndStatus status; + EDndReason offlineReason; + uint16_t port; + char fqdn[TSDB_FQDN_LEN]; + char ep[TSDB_EP_LEN]; } SDnodeObj; typedef struct SMnodeObj { int32_t id; + int64_t createdTime; + int64_t updateTime; int8_t status; int8_t role; int32_t roleTerm; int64_t roleTime; - int64_t createdTime; - int64_t updateTime; SDnodeObj *pDnode; } SMnodeObj; diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 745c0a0447..9559a96255 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_TRANSACTION_INT_H_ -#define _TD_TRANSACTION_INT_H_ +#ifndef _TD_MND_TRANS_H_ +#define _TD_MND_TRANS_H_ #include "mndInt.h" @@ -44,4 +44,4 @@ SSdbRow *mndTransActionDecode(SSdbRaw *pRaw); } #endif -#endif /*_TD_TRANSACTION_INT_H_*/ +#endif /*_TD_MND_TRANS_H_*/ diff --git a/source/dnode/mnode/impl/src/mndAcct.c b/source/dnode/mnode/impl/src/mndAcct.c index dd91de1c95..73b0167422 100644 --- a/source/dnode/mnode/impl/src/mndAcct.c +++ b/source/dnode/mnode/impl/src/mndAcct.c @@ -79,7 +79,7 @@ static int32_t mnodeAcctActionUpdate(SSdb *pSdb, SAcctObj *pSrcAcct, SAcctObj *p return 0; } -static int32_t mnodeCreateDefaultAcct(SSdb *pSdb) { +static int32_t mnodeCreateDefaultAcct(SMnode *pMnode) { int32_t code = 0; SAcctObj acctObj = {0}; @@ -98,7 +98,7 @@ static int32_t mnodeCreateDefaultAcct(SSdb *pSdb) { if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); - return sdbWrite(pSdb, pRaw); + return sdbWrite(pMnode->pSdb, pRaw); } int32_t mndInitAcct(SMnode *pMnode) { diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 11f3dc1ee9..d9e13178cc 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -14,8 +14,115 @@ */ #define _DEFAULT_SOURCE -#include "os.h" -#include "mndInt.h" +#include "mndTrans.h" -int32_t mndInitDnode(SMnode *pMnode) { return 0; } -void mndCleanupDnode(SMnode *pMnode) {} \ No newline at end of file +#define SDB_DNODE_VER 1 + +static SSdbRaw *mndDnodeActionEncode(SDnodeObj *pDnode) { + SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, SDB_DNODE_VER, sizeof(SDnodeObj)); + if (pRaw == NULL) return NULL; + + int32_t dataPos = 0; + SDB_SET_INT32(pRaw, dataPos, pDnode->id); + SDB_SET_INT64(pRaw, dataPos, pDnode->createdTime) + SDB_SET_INT64(pRaw, dataPos, pDnode->updateTime) + SDB_SET_INT16(pRaw, dataPos, pDnode->port) + SDB_SET_BINARY(pRaw, dataPos, pDnode->fqdn, TSDB_FQDN_LEN) + SDB_SET_DATALEN(pRaw, dataPos); + + return pRaw; +} + +static SSdbRow *mndDnodeActionDecode(SSdbRaw *pRaw) { + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; + + if (sver != SDB_DNODE_VER) { + terrno = TSDB_CODE_SDB_INVALID_DATA_VER; + mError("failed to decode dnode since %s", terrstr()); + return NULL; + } + + SSdbRow *pRow = sdbAllocRow(sizeof(SDnodeObj)); + SDnodeObj *pDnode = sdbGetRowObj(pRow); + if (pDnode == NULL) return NULL; + + int32_t dataPos = 0; + SDB_GET_INT32(pRaw, pRow, dataPos, &pDnode->id) + SDB_GET_INT64(pRaw, pRow, dataPos, &pDnode->createdTime) + SDB_GET_INT64(pRaw, pRow, dataPos, &pDnode->updateTime) + SDB_GET_INT16(pRaw, pRow, dataPos, &pDnode->port) + SDB_GET_BINARY(pRaw, pRow, dataPos, pDnode->fqdn, TSDB_FQDN_LEN) + + return pRow; +} + +static void mnodeResetDnode(SDnodeObj *pDnode) { + pDnode->rebootTime = 0; + pDnode->lastAccessTime = 0; + pDnode->numOfMnodes = 0; + pDnode->numOfVnodes = 0; + pDnode->numOfQnodes = 0; + pDnode->numOfSupportMnodes = 0; + pDnode->numOfSupportVnodes = 0; + pDnode->numOfSupportQnodes = 0; + pDnode->status = DND_STATUS_OFFLINE; + pDnode->offlineReason = DND_REASON_RESET_BY_MNODE; + snprintf(pDnode->ep, TSDB_EP_LEN, "%s:%u", pDnode->fqdn, pDnode->port); +} + +static int32_t mndDnodeActionInsert(SSdb *pSdb, SDnodeObj *pDnode) { + mnodeResetDnode(pDnode); + return 0; +} + +static int32_t mndDnodeActionDelete(SSdb *pSdb, SDnodeObj *pDnode) { return 0; } + +static int32_t mndDnodeActionUpdate(SSdb *pSdb, SDnodeObj *pSrcDnode, SDnodeObj *pDstDnode) { + pSrcDnode->id = pDstDnode->id; + pSrcDnode->createdTime = pDstDnode->createdTime; + pSrcDnode->updateTime = pDstDnode->updateTime; + pSrcDnode->port = pDstDnode->port; + memcpy(pSrcDnode->fqdn, pDstDnode->fqdn, TSDB_FQDN_LEN); + mnodeResetDnode(pSrcDnode); +} + +static int32_t mndCreateDefaultDnode(SMnode *pMnode) { + SDnodeObj dnodeObj = {0}; + dnodeObj.id = 0; + dnodeObj.createdTime = taosGetTimestampMs(); + dnodeObj.updateTime = dnodeObj.createdTime; + dnodeObj.port = pMnode->replicas[0].port; + memcpy(&dnodeObj.fqdn, pMnode->replicas[0].fqdn, TSDB_FQDN_LEN); + + SSdbRaw *pRaw = mndDnodeActionEncode(&dnodeObj); + if (pRaw == NULL) return -1; + sdbSetRawStatus(pRaw, SDB_STATUS_READY); + + return sdbWrite(pMnode->pSdb, pRaw); +} + +static int32_t mndProcessCreateDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } + +static int32_t mndProcessDropDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } + +static int32_t mndProcessConfigDnodeMsg(SMnode *pMnode, SMnodeMsg *pMsg) { return 0; } + +int32_t mndInitDnode(SMnode *pMnode) { + SSdbTable table = {.sdbType = SDB_USER, + .keyType = SDB_KEY_BINARY, + .deployFp = (SdbDeployFp)mndCreateDefaultDnode, + .encodeFp = (SdbEncodeFp)mndDnodeActionEncode, + .decodeFp = (SdbDecodeFp)mndDnodeActionDecode, + .insertFp = (SdbInsertFp)mndDnodeActionInsert, + .updateFp = (SdbUpdateFp)mndDnodeActionUpdate, + .deleteFp = (SdbDeleteFp)mndDnodeActionDelete}; + + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_DNODE, mndProcessCreateDnodeMsg); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_DNODE, mndProcessDropDnodeMsg); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CONFIG_DNODE, mndProcessConfigDnodeMsg); + + return sdbSetTable(pMnode->pSdb, table); +} + +void mndCleanupDnode(SMnode *pMnode) {} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index d63e3662e0..4156d2ab37 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -21,7 +21,7 @@ #define SDB_USER_VER 1 static SSdbRaw *mndUserActionEncode(SUserObj *pUser) { - SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, SDB_USER_VER, sizeof(SAcctObj)); + SSdbRaw *pRaw = sdbAllocRaw(SDB_USER, SDB_USER_VER, sizeof(SUserObj)); if (pRaw == NULL) return NULL; int32_t dataPos = 0; @@ -97,7 +97,7 @@ static int32_t mndUserActionUpdate(SSdb *pSdb, SUserObj *pSrcUser, SUserObj *pDs return 0; } -static int32_t mndCreateDefaultUser(SSdb *pSdb, char *acct, char *user, char *pass) { +static int32_t mndCreateDefaultUser(SMnode *pMnode, char *acct, char *user, char *pass) { SUserObj userObj = {0}; tstrncpy(userObj.user, user, TSDB_USER_LEN); tstrncpy(userObj.acct, acct, TSDB_USER_LEN); @@ -113,15 +113,15 @@ static int32_t mndCreateDefaultUser(SSdb *pSdb, char *acct, char *user, char *pa if (pRaw == NULL) return -1; sdbSetRawStatus(pRaw, SDB_STATUS_READY); - return sdbWrite(pSdb, pRaw); + return sdbWrite(pMnode->pSdb, pRaw); } -static int32_t mndCreateDefaultUsers(SSdb *pSdb) { - if (mndCreateDefaultUser(pSdb, TSDB_DEFAULT_USER, TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) { +static int32_t mndCreateDefaultUsers(SMnode *pMnode) { + if (mndCreateDefaultUser(pMnode, TSDB_DEFAULT_USER, TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) { return -1; } - if (mndCreateDefaultUser(pSdb, TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) { + if (mndCreateDefaultUser(pMnode, TSDB_DEFAULT_USER, "_" TSDB_DEFAULT_USER, TSDB_DEFAULT_PASS) != 0) { return -1; } diff --git a/source/dnode/mnode/sdb/inc/sdbInt.h b/source/dnode/mnode/sdb/inc/sdbInt.h index 6162617568..e492f28557 100644 --- a/source/dnode/mnode/sdb/inc/sdbInt.h +++ b/source/dnode/mnode/sdb/inc/sdbInt.h @@ -38,8 +38,8 @@ extern "C" { typedef struct SSdbRaw { int8_t type; - int8_t sver; int8_t status; + int8_t sver; int8_t reserved; int32_t dataLen; char pData[]; @@ -53,6 +53,7 @@ typedef struct SSdbRow { } SSdbRow; typedef struct SSdb { + SMnode *pMnode; char *currDir; char *syncDir; char *tmpDir; diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 4f1267498c..1ceb3862ee 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -27,7 +27,7 @@ SSdb *sdbInit(SSdbOpt *pOption) { } char path[PATH_MAX + 100]; - snprintf(path, PATH_MAX + 100, "%s", pOption->path); + snprintf(path, PATH_MAX + 100, "%s%sdata", pOption->path, TD_DIRSEP); pSdb->currDir = strdup(path); snprintf(path, PATH_MAX + 100, "%s%ssync", pOption->path, TD_DIRSEP); pSdb->syncDir = strdup(path); @@ -44,6 +44,7 @@ SSdb *sdbInit(SSdbOpt *pOption) { taosInitRWLatch(&pSdb->locks[i]); } + pSdb->pMnode = pOption->pMnode; mDebug("sdb init successfully"); return pSdb; } diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index 37fcdc19ef..b285675b85 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -46,7 +46,7 @@ static int32_t sdbRunDeployFp(SSdb *pSdb) { SdbDeployFp fp = pSdb->deployFps[i]; if (fp == NULL) continue; - if ((*fp)(pSdb) != 0) { + if ((*fp)(pSdb->pMnode) != 0) { mError("failed to deploy sdb:%d since %s", i, terrstr()); return -1; } diff --git a/source/dnode/mnode/sdb/src/sdbRaw.c b/source/dnode/mnode/sdb/src/sdbRaw.c index 2abff74168..7ed1a427f5 100644 --- a/source/dnode/mnode/sdb/src/sdbRaw.c +++ b/source/dnode/mnode/sdb/src/sdbRaw.c @@ -61,6 +61,21 @@ int32_t sdbSetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t val) { return 0; } +int32_t sdbSetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t val) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + sizeof(int16_t) > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + *(int16_t *)(pRaw->pData + dataPos) = val; + return 0; +} + int32_t sdbSetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t val) { if (pRaw == NULL) { terrno = TSDB_CODE_INVALID_PTR; @@ -146,6 +161,21 @@ int32_t sdbGetRawInt32(SSdbRaw *pRaw, int32_t dataPos, int32_t *val) { return 0; } +int32_t sdbGetRawInt16(SSdbRaw *pRaw, int32_t dataPos, int16_t *val) { + if (pRaw == NULL) { + terrno = TSDB_CODE_INVALID_PTR; + return -1; + } + + if (dataPos + sizeof(int16_t) > pRaw->dataLen) { + terrno = TSDB_CODE_SDB_INVALID_DATA_LEN; + return -1; + } + + *val = *(int16_t *)(pRaw->pData + dataPos); + return 0; +} + int32_t sdbGetRawInt64(SSdbRaw *pRaw, int32_t dataPos, int64_t *val) { if (pRaw == NULL) { terrno = TSDB_CODE_INVALID_PTR;