Merge pull request #9094 from taosdata/feature/dnode3
TD-10431 get sdbGetMaxId func
This commit is contained in:
commit
2e643995d8
|
@ -295,6 +295,15 @@ void sdbCancelFetch(SSdb *pSdb, void *pIter);
|
||||||
*/
|
*/
|
||||||
int32_t sdbGetSize(SSdb *pSdb, ESdbType type);
|
int32_t sdbGetSize(SSdb *pSdb, ESdbType type);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Get the max id of the table, keyType of table should be INT32
|
||||||
|
*
|
||||||
|
* @param pSdb The sdb object.
|
||||||
|
* @param pIter The type of the table.
|
||||||
|
* @record int32_t The max id of the table
|
||||||
|
*/
|
||||||
|
int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type);
|
||||||
|
|
||||||
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen);
|
SSdbRaw *sdbAllocRaw(ESdbType type, int8_t sver, int32_t dataLen);
|
||||||
void sdbFreeRaw(SSdbRaw *pRaw);
|
void sdbFreeRaw(SSdbRaw *pRaw);
|
||||||
int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val);
|
int32_t sdbSetRawInt8(SSdbRaw *pRaw, int32_t dataPos, int8_t val);
|
||||||
|
|
|
@ -19,7 +19,7 @@
|
||||||
#include "mndShow.h"
|
#include "mndShow.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
#include "tutil.h"
|
#include "tep.h"
|
||||||
|
|
||||||
#define TSDB_DNODE_VER 1
|
#define TSDB_DNODE_VER 1
|
||||||
#define TSDB_DNODE_RESERVE_SIZE 64
|
#define TSDB_DNODE_RESERVE_SIZE 64
|
||||||
|
@ -27,8 +27,6 @@
|
||||||
#define TSDB_CONIIG_VALUE_LEN 48
|
#define TSDB_CONIIG_VALUE_LEN 48
|
||||||
#define TSDB_CONFIG_NUMBER 8
|
#define TSDB_CONFIG_NUMBER 8
|
||||||
|
|
||||||
static int32_t id = 2;
|
|
||||||
|
|
||||||
static const char *offlineReason[] = {
|
static const char *offlineReason[] = {
|
||||||
"",
|
"",
|
||||||
"status msg timeout",
|
"status msg timeout",
|
||||||
|
@ -389,7 +387,7 @@ static int32_t mndProcessStatusMsg(SMnodeMsg *pMsg) {
|
||||||
|
|
||||||
static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *pCreate) {
|
static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg *pCreate) {
|
||||||
SDnodeObj dnodeObj = {0};
|
SDnodeObj dnodeObj = {0};
|
||||||
dnodeObj.id = id++;
|
dnodeObj.id = sdbGetMaxId(pMnode->pSdb, SDB_DNODE);
|
||||||
dnodeObj.createdTime = taosGetTimestampMs();
|
dnodeObj.createdTime = taosGetTimestampMs();
|
||||||
dnodeObj.updateTime = dnodeObj.createdTime;
|
dnodeObj.updateTime = dnodeObj.createdTime;
|
||||||
taosGetFqdnPortFromEp(pCreate->ep, dnodeObj.fqdn, &dnodeObj.port);
|
taosGetFqdnPortFromEp(pCreate->ep, dnodeObj.fqdn, &dnodeObj.port);
|
||||||
|
|
|
@ -262,11 +262,6 @@ void mndReleaseTrans(SMnode *pMnode, STrans *pTrans) {
|
||||||
sdbRelease(pSdb, pTrans);
|
sdbRelease(pSdb, pTrans);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndGenerateTransId() {
|
|
||||||
static int32_t tmp = 0;
|
|
||||||
return ++tmp;
|
|
||||||
}
|
|
||||||
|
|
||||||
char *mndTransStageStr(ETrnStage stage) {
|
char *mndTransStageStr(ETrnStage stage) {
|
||||||
switch (stage) {
|
switch (stage) {
|
||||||
case TRN_STAGE_PREPARE:
|
case TRN_STAGE_PREPARE:
|
||||||
|
@ -303,7 +298,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, void *rpcHandle) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTrans->id = mndGenerateTransId();
|
pTrans->id = sdbGetMaxId(pMnode->pSdb, SDB_TRANS);
|
||||||
pTrans->stage = TRN_STAGE_PREPARE;
|
pTrans->stage = TRN_STAGE_PREPARE;
|
||||||
pTrans->policy = policy;
|
pTrans->policy = policy;
|
||||||
pTrans->rpcHandle = rpcHandle;
|
pTrans->rpcHandle = rpcHandle;
|
||||||
|
|
|
@ -219,58 +219,6 @@ static int32_t mndCreateUser(SMnode *pMnode, char *acct, char *user, char *pass,
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOldUser, SUserObj *pNewUser, SMnodeMsg *pMsg) {
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
|
||||||
if (pTrans == NULL) {
|
|
||||||
mError("user:%s, failed to update since %s", pOldUser->user, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
mDebug("trans:%d, used to update user:%s", pTrans->id, pOldUser->user);
|
|
||||||
|
|
||||||
SSdbRaw *pRedoRaw = mndUserActionEncode(pNewUser);
|
|
||||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
|
||||||
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
|
||||||
|
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
|
||||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndDropUser(SMnode *pMnode, SMnodeMsg *pMsg, SUserObj *pUser) {
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
|
||||||
if (pTrans == NULL) {
|
|
||||||
mError("user:%s, failed to drop since %s", pUser->user, terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
mDebug("trans:%d, used to drop user:%s", pTrans->id, pUser->user);
|
|
||||||
|
|
||||||
SSdbRaw *pRedoRaw = mndUserActionEncode(pUser);
|
|
||||||
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
|
||||||
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED);
|
|
||||||
|
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
|
||||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
mndTransDrop(pTrans);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t mndProcessCreateUserMsg(SMnodeMsg *pMsg) {
|
static int32_t mndProcessCreateUserMsg(SMnodeMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pMsg->pMnode;
|
||||||
SCreateUserMsg *pCreate = pMsg->rpcMsg.pCont;
|
SCreateUserMsg *pCreate = pMsg->rpcMsg.pCont;
|
||||||
|
@ -315,6 +263,32 @@ static int32_t mndProcessCreateUserMsg(SMnodeMsg *pMsg) {
|
||||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndUpdateUser(SMnode *pMnode, SUserObj *pOldUser, SUserObj *pNewUser, SMnodeMsg *pMsg) {
|
||||||
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
||||||
|
if (pTrans == NULL) {
|
||||||
|
mError("user:%s, failed to update since %s", pOldUser->user, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
mDebug("trans:%d, used to update user:%s", pTrans->id, pOldUser->user);
|
||||||
|
|
||||||
|
SSdbRaw *pRedoRaw = mndUserActionEncode(pNewUser);
|
||||||
|
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||||
|
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
|
||||||
|
|
||||||
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndProcessAlterUserMsg(SMnodeMsg *pMsg) {
|
static int32_t mndProcessAlterUserMsg(SMnodeMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pMsg->pMnode;
|
||||||
SAlterUserMsg *pAlter = pMsg->rpcMsg.pCont;
|
SAlterUserMsg *pAlter = pMsg->rpcMsg.pCont;
|
||||||
|
@ -363,6 +337,32 @@ static int32_t mndProcessAlterUserMsg(SMnodeMsg *pMsg) {
|
||||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndDropUser(SMnode *pMnode, SMnodeMsg *pMsg, SUserObj *pUser) {
|
||||||
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle);
|
||||||
|
if (pTrans == NULL) {
|
||||||
|
mError("user:%s, failed to drop since %s", pUser->user, terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
mDebug("trans:%d, used to drop user:%s", pTrans->id, pUser->user);
|
||||||
|
|
||||||
|
SSdbRaw *pRedoRaw = mndUserActionEncode(pUser);
|
||||||
|
if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
|
||||||
|
mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPED);
|
||||||
|
|
||||||
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
|
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
mndTransDrop(pTrans);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndProcessDropUserMsg(SMnodeMsg *pMsg) {
|
static int32_t mndProcessDropUserMsg(SMnodeMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pMsg->pMnode;
|
||||||
SDropUserMsg *pDrop = pMsg->rpcMsg.pCont;
|
SDropUserMsg *pDrop = pMsg->rpcMsg.pCont;
|
||||||
|
|
|
@ -59,6 +59,7 @@ typedef struct SSdb {
|
||||||
char *tmpDir;
|
char *tmpDir;
|
||||||
int64_t lastCommitVer;
|
int64_t lastCommitVer;
|
||||||
int64_t curVer;
|
int64_t curVer;
|
||||||
|
int32_t maxId[SDB_MAX];
|
||||||
EKeyType keyTypes[SDB_MAX];
|
EKeyType keyTypes[SDB_MAX];
|
||||||
SHashObj *hashObjs[SDB_MAX];
|
SHashObj *hashObjs[SDB_MAX];
|
||||||
SRWLatch locks[SDB_MAX];
|
SRWLatch locks[SDB_MAX];
|
||||||
|
|
|
@ -127,6 +127,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pSdb->maxId[sdbType] = 0;
|
||||||
pSdb->hashObjs[sdbType] = hash;
|
pSdb->hashObjs[sdbType] = hash;
|
||||||
taosInitRWLatch(&pSdb->locks[sdbType]);
|
taosInitRWLatch(&pSdb->locks[sdbType]);
|
||||||
mDebug("sdb table:%d is initialized", sdbType);
|
mDebug("sdb table:%d is initialized", sdbType);
|
||||||
|
|
|
@ -72,6 +72,10 @@ static int32_t sdbInsertRow(SSdb *pSdb, SHashObj *hash, SSdbRaw *pRaw, SSdbRow *
|
||||||
|
|
||||||
taosWUnLockLatch(pLock);
|
taosWUnLockLatch(pLock);
|
||||||
|
|
||||||
|
if (pSdb->keyTypes[pRow->type] == SDB_KEY_INT32) {
|
||||||
|
pSdb->maxId[pRow->type] = MAX(pSdb->maxId[pRow->type], *((int32_t *)pRow->pObj));
|
||||||
|
}
|
||||||
|
|
||||||
SdbInsertFp insertFp = pSdb->insertFps[pRow->type];
|
SdbInsertFp insertFp = pSdb->insertFps[pRow->type];
|
||||||
if (insertFp != NULL) {
|
if (insertFp != NULL) {
|
||||||
code = (*insertFp)(pSdb, pRow->pObj);
|
code = (*insertFp)(pSdb, pRow->pObj);
|
||||||
|
@ -290,3 +294,28 @@ int32_t sdbGetSize(SSdb *pSdb, ESdbType type) {
|
||||||
|
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t sdbGetMaxId(SSdb *pSdb, ESdbType type) {
|
||||||
|
SHashObj *hash = sdbGetHash(pSdb, type);
|
||||||
|
if (hash == NULL) return -1;
|
||||||
|
|
||||||
|
if (pSdb->keyTypes[type] != SDB_KEY_INT32) return -1;
|
||||||
|
|
||||||
|
int32_t maxId = 0;
|
||||||
|
|
||||||
|
SRWLatch *pLock = &pSdb->locks[type];
|
||||||
|
taosRLockLatch(pLock);
|
||||||
|
|
||||||
|
SSdbRow **ppRow = taosHashIterate(hash, NULL);
|
||||||
|
while (ppRow != NULL) {
|
||||||
|
SSdbRow *pRow = *ppRow;
|
||||||
|
int32_t id = *(int32_t *)pRow->pObj;
|
||||||
|
maxId = MAX(id, maxId);
|
||||||
|
ppRow = taosHashIterate(hash, ppRow);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosRUnLockLatch(pLock);
|
||||||
|
|
||||||
|
maxId = MAX(maxId, pSdb->maxId[type]);
|
||||||
|
return maxId + 1;
|
||||||
|
}
|
||||||
|
|
|
@ -19,5 +19,5 @@ target_link_libraries(
|
||||||
|
|
||||||
# test
|
# test
|
||||||
if(${BUILD_TEST})
|
if(${BUILD_TEST})
|
||||||
add_subdirectory(test)
|
# add_subdirectory(test)
|
||||||
endif(${BUILD_TEST})
|
endif(${BUILD_TEST})
|
Loading…
Reference in New Issue