fix: reduce drop dnode speed to make data more secure

parent 191c43654d
commit ead5d8edf2

@@ -68,7 +68,7 @@ typedef uint16_t tmsg_t;

 static inline bool vnodeIsMsgBlock(tmsg_t type) {
   return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
-         (type == TDMT_VND_UPDATE_TAG_VAL);
+         (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM);
 }

 static inline bool syncUtilUserCommit(tmsg_t msgType) {
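
The hunk above adds TDMT_VND_ALTER_CONFIRM to the set of message types that vnodeIsMsgBlock treats as blocking, so the confirm step of a vgroup alteration is presumably waited on before the vnode processes further writes. A minimal standalone sketch of the patched predicate follows; the numeric enum values (and TDMT_VND_SUBMIT as a non-blocking example) are stand-ins so it compiles on its own, not the real message definitions.

/* Standalone sketch: hypothetical TDMT_* values, not the real headers. */
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

typedef uint16_t tmsg_t;

enum { /* illustration-only numeric values */
  TDMT_VND_CREATE_TABLE = 1,
  TDMT_VND_ALTER_TABLE = 2,
  TDMT_VND_DROP_TABLE = 3,
  TDMT_VND_UPDATE_TAG_VAL = 4,
  TDMT_VND_ALTER_CONFIRM = 5,
  TDMT_VND_SUBMIT = 6,
};

/* Same shape as the patched predicate: alter-confirm now counts as blocking. */
static inline bool vnodeIsMsgBlock(tmsg_t type) {
  return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
         (type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM);
}

int main(void) {
  printf("alter-confirm blocks: %d\n", vnodeIsMsgBlock(TDMT_VND_ALTER_CONFIRM)); /* 1 after this patch */
  printf("submit blocks:        %d\n", vnodeIsMsgBlock(TDMT_VND_SUBMIT));        /* 0, a regular write */
  return 0;
}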

@@ -1126,8 +1126,12 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
   }

   if (!force) {
+#if 1
+    {
+#else
     if (newVg.replica == 1) {
-      mInfo("vgId:%d, will add 1 vnode, replca:1", pVgroup->vgId);
+#endif
+      mInfo("vgId:%d, will add 1 vnode, replca:%d", pVgroup->vgId, newVg.replica);
       if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
       for (int32_t i = 0; i < newVg.replica - 1; ++i) {
         if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;

@@ -1155,6 +1159,9 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
         if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
       }
       if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
+#if 1
+    }
+#else
     } else {  // new replica == 3
       mInfo("vgId:%d, will add 1 vnode, replca:3", pVgroup->vgId);
       if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;

@@ -1181,6 +1188,7 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
       if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]) != 0) return -1;
       if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
     }
+#endif
   } else {
     mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId);
     if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
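
Taken together, the three hunks above keep the old replica-specific branches in the source but compile them out with #if 1 / #else / #endif: a plain block replaces "if (newVg.replica == 1)" and the later "else // new replica == 3" branch, so the same add-vnode, alter-replica, confirm sequence runs for any replica count and the log line reports the actual replica. A small sketch of that preprocessor pattern, with toy print statements standing in for the mnode helpers (moveVgroup is an invented name, not an mnode API):

#include <stdio.h>

static void moveVgroup(int replica, int force) {
  if (!force) {
#if 1
    {                     /* compiled: one block covers every replica count */
#else
    if (replica == 1) {   /* old branch, kept in the source but compiled out */
#endif
      printf("add 1 vnode, alter replica to %d, then confirm\n", replica + 1);
#if 1
    }
#else
    } else {              /* old replica==3 branch, also compiled out */
      printf("replica 3 specific path\n");
    }
#endif
  } else {
    printf("force: add 1 vnode and force remove 1 vnode\n");
  }
}

int main(void) {
  moveVgroup(1, 0);
  moveVgroup(2, 0); /* with the unified block, a 2-replica vgroup takes the same path */
  return 0;
}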

@@ -233,7 +233,7 @@ int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
     rpcSendResponse(&rpcMsg);
     return 0;
   } else {
-    sInfo("no rpcinfo to send timeout response, seq:%" PRId64, seq);
+    sError("no message handle to send timeout response, seq:%" PRId64, seq);
     return -1;
   }
 }

@@ -35,11 +35,16 @@ SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
   pObj->seqNum = 0;
   taosThreadMutexInit(&(pObj->mutex), NULL);

+  SSyncNode *pNode = pObj->data;
+  sTrace("vgId:%d, create resp manager", pNode->vgId);
   return pObj;
 }

 void syncRespMgrDestroy(SSyncRespMgr *pObj) {
   if (pObj != NULL) {
+    SSyncNode *pNode = pObj->data;
+    sTrace("vgId:%d, destroy resp manager", pNode->vgId);
+
     taosThreadMutexLock(&pObj->mutex);
     taosHashCleanup(pObj->pRespHash);
     taosThreadMutexUnlock(&pObj->mutex);

@@ -81,6 +86,8 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t seq, SRespStub *pStub) {

     taosThreadMutexUnlock(&pObj->mutex);
     return 1;  // get one object
+  } else {
+    sNError(pObj->data, "get message handle, no object of seq:%" PRIu64, seq);
   }

   taosThreadMutexUnlock(&pObj->mutex);

@@ -99,6 +106,8 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t seq, SRpcHandleInfo *p

     taosThreadMutexUnlock(&pObj->mutex);
     return 1;  // get one object
+  } else {
+    sNError(pObj->data, "get-and-del message handle, no object of seq:%" PRIu64, seq);
   }

   taosThreadMutexUnlock(&pObj->mutex);
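
Both hunks above add an else branch so a lookup miss is reported through sNError instead of passing silently. A generic sketch of the same get-and-delete-under-a-mutex pattern, using pthread and a small fixed table as a hypothetical stand-in for the hash behind SSyncRespMgr:

#include <inttypes.h>
#include <pthread.h>
#include <stdint.h>
#include <stdio.h>

#define MAX_STUBS 8

typedef struct {
  uint64_t seq;
  int      inUse;
  /* the real stub also carries the pending rpc message / handle */
} RespStub;

typedef struct {
  pthread_mutex_t mutex;
  RespStub        stubs[MAX_STUBS];
} RespMgr;

/* Returns 1 and copies the stub when found, 0 after logging when the seq is unknown,
 * mirroring the 1 "get one object" convention visible in the diff. */
static int respMgrGetAndDel(RespMgr *pMgr, uint64_t seq, RespStub *pOut) {
  pthread_mutex_lock(&pMgr->mutex);
  for (int i = 0; i < MAX_STUBS; ++i) {
    if (pMgr->stubs[i].inUse && pMgr->stubs[i].seq == seq) {
      *pOut = pMgr->stubs[i];
      pMgr->stubs[i].inUse = 0;  /* delete while still holding the lock */
      pthread_mutex_unlock(&pMgr->mutex);
      return 1;  /* got one object */
    }
  }
  /* this is the case the patch now reports instead of ignoring */
  fprintf(stderr, "get-and-del message handle, no object of seq:%" PRIu64 "\n", seq);
  pthread_mutex_unlock(&pMgr->mutex);
  return 0;
}

int main(void) {
  RespMgr mgr = {.mutex = PTHREAD_MUTEX_INITIALIZER};
  mgr.stubs[0] = (RespStub){.seq = 7, .inUse = 1};

  RespStub out;
  printf("seq 7 -> %d\n", respMgrGetAndDel(&mgr, 7, &out)); /* 1, found and removed */
  printf("seq 9 -> %d\n", respMgrGetAndDel(&mgr, 9, &out)); /* 0, logged as missing */
  return 0;
}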

@@ -114,7 +123,7 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
   SArray *delIndexArray = taosArrayInit(4, sizeof(uint64_t));
   if (delIndexArray == NULL) return;

-  sDebug("vgId:%d, resp mgr begin clean by ttl", pSyncNode->vgId);
+  sDebug("vgId:%d, resp manager begin clean by ttl", pSyncNode->vgId);
   while (pStub) {
     size_t len;
     void *key = taosHashGetKey(pStub, &len);

@@ -143,34 +152,39 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {

       // TODO: and make rpcMsg body, call commit cb
       // pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &pStub->rpcMsg, cbMeta);
-      pStub->rpcMsg.code = TSDB_CODE_SYN_NOT_LEADER;
-      if (pStub->rpcMsg.info.handle != NULL) {
-        tmsgSendRsp(&pStub->rpcMsg);
-      }
+      SRpcMsg rpcMsg = {.info = pStub->rpcMsg.info, .code = TSDB_CODE_SYN_TIMEOUT};
+      sInfo("vgId:%d, message handle:%p expired, type:%s ahandle:%p", pSyncNode->vgId, rpcMsg.info.handle,
+            TMSG_INFO(pStub->rpcMsg.msgType), rpcMsg.info.ahandle);
+      rpcSendResponse(&rpcMsg);
     }

     pStub = taosHashIterate(pObj->pRespHash, pStub);
   }

   int32_t arraySize = taosArrayGetSize(delIndexArray);
-  sDebug("vgId:%d, resp mgr end clean by ttl, sum:%d, cnt:%d, array-size:%d", pSyncNode->vgId, sum, cnt, arraySize);
+  sDebug("vgId:%d, resp manager end clean by ttl, sum:%d, cnt:%d, array-size:%d", pSyncNode->vgId, sum, cnt, arraySize);

   for (int32_t i = 0; i < arraySize; ++i) {
     uint64_t *pSeqNum = taosArrayGet(delIndexArray, i);
     taosHashRemove(pObj->pRespHash, pSeqNum, sizeof(uint64_t));
-    sDebug("vgId:%d, resp mgr clean by ttl, seq:%" PRId64 "", pSyncNode->vgId, *pSeqNum);
+    sDebug("vgId:%d, resp manager clean by ttl, seq:%" PRId64, pSyncNode->vgId, *pSeqNum);
   }
   taosArrayDestroy(delIndexArray);
 }

 void syncRespCleanRsp(SSyncRespMgr *pObj) {
+  SSyncNode *pNode = pObj->data;
+  sTrace("vgId:%d, clean all rsp", pNode->vgId);
+
   taosThreadMutexLock(&pObj->mutex);
   syncRespCleanByTTL(pObj, -1, true);
   taosThreadMutexUnlock(&pObj->mutex);
 }

 void syncRespClean(SSyncRespMgr *pObj) {
+  SSyncNode *pNode = pObj->data;
+  sTrace("vgId:%d, clean rsp by ttl", pNode->vgId);
+
   taosThreadMutexLock(&pObj->mutex);
   syncRespCleanByTTL(pObj, pObj->ttl, false);
   taosThreadMutexUnlock(&pObj->mutex);
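
The hunk above changes how an expired stub is answered: instead of re-tagging the stored message with TSDB_CODE_SYN_NOT_LEADER and pushing it back through tmsgSendRsp, the cleaner now builds a fresh response carrying TSDB_CODE_SYN_TIMEOUT, logs the expired handle, and replies with rpcSendResponse. A toy sketch of the surrounding collect-then-delete TTL sweep (stand-in types, not the sync module's) shows why expired entries are answered during the walk but only removed afterwards, and how a ttl of -1 is used to flush everything:

#include <stdint.h>
#include <stdio.h>
#include <time.h>

#define MAX_ENTRIES 8

typedef struct {
  uint64_t seq;
  int64_t  createTime;  /* seconds */
  int      inUse;
} Entry;

static void respondTimeout(uint64_t seq) {
  /* stands in for building an SRpcMsg with TSDB_CODE_SYN_TIMEOUT and calling rpcSendResponse */
  printf("seq:%llu expired, answer with timeout code\n", (unsigned long long)seq);
}

static void cleanByTTL(Entry *entries, int n, int64_t now, int64_t ttl) {
  uint64_t delSeq[MAX_ENTRIES];
  int      delCnt = 0;

  for (int i = 0; i < n; ++i) {  /* pass 1: answer expired entries and collect their keys */
    if (!entries[i].inUse) continue;
    if (ttl >= 0 && now - entries[i].createTime <= ttl) continue;  /* ttl < 0 means "clean all" */
    respondTimeout(entries[i].seq);
    delSeq[delCnt++] = entries[i].seq;
  }

  for (int i = 0; i < delCnt; ++i) {  /* pass 2: delete, so the walk never removes under itself */
    for (int j = 0; j < n; ++j) {
      if (entries[j].inUse && entries[j].seq == delSeq[i]) entries[j].inUse = 0;
    }
  }
  printf("cleaned %d expired entries\n", delCnt);
}

int main(void) {
  int64_t now = (int64_t)time(NULL);
  Entry   entries[MAX_ENTRIES] = {
      {.seq = 1, .createTime = now - 100, .inUse = 1}, /* old, expires with ttl 30 */
      {.seq = 2, .createTime = now - 5, .inUse = 1},   /* fresh, survives */
  };
  cleanByTTL(entries, MAX_ENTRIES, now, 30);
  return 0;
}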