Merge pull request #18990 from taosdata/fix/TD-21299
fix: reduce drop dnode speed to make data more secure
This commit is contained in:
commit
bc092d7407
|
@ -68,7 +68,7 @@ typedef uint16_t tmsg_t;
|
|||
|
||||
static inline bool vnodeIsMsgBlock(tmsg_t type) {
|
||||
return (type == TDMT_VND_CREATE_TABLE) || (type == TDMT_VND_ALTER_TABLE) || (type == TDMT_VND_DROP_TABLE) ||
|
||||
(type == TDMT_VND_UPDATE_TAG_VAL);
|
||||
(type == TDMT_VND_UPDATE_TAG_VAL) || (type == TDMT_VND_ALTER_CONFIRM);
|
||||
}
|
||||
|
||||
static inline bool syncUtilUserCommit(tmsg_t msgType) {
|
||||
|
|
|
@ -1126,8 +1126,12 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
|
|||
}
|
||||
|
||||
if (!force) {
|
||||
#if 1
|
||||
{
|
||||
#else
|
||||
if (newVg.replica == 1) {
|
||||
mInfo("vgId:%d, will add 1 vnode, replca:1", pVgroup->vgId);
|
||||
#endif
|
||||
mInfo("vgId:%d, will add 1 vnode, replca:%d", pVgroup->vgId, newVg.replica);
|
||||
if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
|
||||
for (int32_t i = 0; i < newVg.replica - 1; ++i) {
|
||||
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
|
||||
|
@ -1155,6 +1159,9 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
|
|||
if (mndAddAlterVnodeReplicaAction(pMnode, pTrans, pDb, &newVg, newVg.vnodeGid[i].dnodeId) != 0) return -1;
|
||||
}
|
||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
|
||||
#if 1
|
||||
}
|
||||
#else
|
||||
} else { // new replica == 3
|
||||
mInfo("vgId:%d, will add 1 vnode, replca:3", pVgroup->vgId);
|
||||
if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
|
||||
|
@ -1181,6 +1188,7 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
|
|||
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVg, &newVg.vnodeGid[vnIndex]) != 0) return -1;
|
||||
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg) != 0) return -1;
|
||||
}
|
||||
#endif
|
||||
} else {
|
||||
mInfo("vgId:%d, will add 1 vnode and force remove 1 vnode", pVgroup->vgId);
|
||||
if (mndAddVnodeToVgroup(pMnode, pTrans, &newVg, pArray) != 0) return -1;
|
||||
|
|
|
@ -233,7 +233,7 @@ int32_t syncSendTimeoutRsp(int64_t rid, int64_t seq) {
|
|||
rpcSendResponse(&rpcMsg);
|
||||
return 0;
|
||||
} else {
|
||||
sInfo("no rpcinfo to send timeout response, seq:%" PRId64, seq);
|
||||
sError("no message handle to send timeout response, seq:%" PRId64, seq);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,11 +35,16 @@ SSyncRespMgr *syncRespMgrCreate(void *data, int64_t ttl) {
|
|||
pObj->seqNum = 0;
|
||||
taosThreadMutexInit(&(pObj->mutex), NULL);
|
||||
|
||||
SSyncNode *pNode = pObj->data;
|
||||
sTrace("vgId:%d, create resp manager", pNode->vgId);
|
||||
return pObj;
|
||||
}
|
||||
|
||||
void syncRespMgrDestroy(SSyncRespMgr *pObj) {
|
||||
if (pObj != NULL) {
|
||||
SSyncNode *pNode = pObj->data;
|
||||
sTrace("vgId:%d, destroy resp manager", pNode->vgId);
|
||||
|
||||
taosThreadMutexLock(&pObj->mutex);
|
||||
taosHashCleanup(pObj->pRespHash);
|
||||
taosThreadMutexUnlock(&pObj->mutex);
|
||||
|
@ -81,6 +86,8 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t seq, SRespStub *pStub) {
|
|||
|
||||
taosThreadMutexUnlock(&pObj->mutex);
|
||||
return 1; // get one object
|
||||
} else {
|
||||
sNError(pObj->data, "get message handle, no object of seq:%" PRIu64, seq);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pObj->mutex);
|
||||
|
@ -99,6 +106,8 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t seq, SRpcHandleInfo *p
|
|||
|
||||
taosThreadMutexUnlock(&pObj->mutex);
|
||||
return 1; // get one object
|
||||
} else {
|
||||
sNError(pObj->data, "get-and-del message handle, no object of seq:%" PRIu64, seq);
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pObj->mutex);
|
||||
|
@ -114,7 +123,7 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
|
|||
SArray *delIndexArray = taosArrayInit(4, sizeof(uint64_t));
|
||||
if (delIndexArray == NULL) return;
|
||||
|
||||
sDebug("vgId:%d, resp mgr begin clean by ttl", pSyncNode->vgId);
|
||||
sDebug("vgId:%d, resp manager begin clean by ttl", pSyncNode->vgId);
|
||||
while (pStub) {
|
||||
size_t len;
|
||||
void *key = taosHashGetKey(pStub, &len);
|
||||
|
@ -143,34 +152,39 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) {
|
|||
|
||||
// TODO: and make rpcMsg body, call commit cb
|
||||
// pSyncNode->pFsm->FpCommitCb(pSyncNode->pFsm, &pStub->rpcMsg, cbMeta);
|
||||
|
||||
pStub->rpcMsg.code = TSDB_CODE_SYN_NOT_LEADER;
|
||||
if (pStub->rpcMsg.info.handle != NULL) {
|
||||
tmsgSendRsp(&pStub->rpcMsg);
|
||||
}
|
||||
SRpcMsg rpcMsg = {.info = pStub->rpcMsg.info, .code = TSDB_CODE_SYN_TIMEOUT};
|
||||
sInfo("vgId:%d, message handle:%p expired, type:%s ahandle:%p", pSyncNode->vgId, rpcMsg.info.handle,
|
||||
TMSG_INFO(pStub->rpcMsg.msgType), rpcMsg.info.ahandle);
|
||||
rpcSendResponse(&rpcMsg);
|
||||
}
|
||||
|
||||
pStub = taosHashIterate(pObj->pRespHash, pStub);
|
||||
}
|
||||
|
||||
int32_t arraySize = taosArrayGetSize(delIndexArray);
|
||||
sDebug("vgId:%d, resp mgr end clean by ttl, sum:%d, cnt:%d, array-size:%d", pSyncNode->vgId, sum, cnt, arraySize);
|
||||
sDebug("vgId:%d, resp manager end clean by ttl, sum:%d, cnt:%d, array-size:%d", pSyncNode->vgId, sum, cnt, arraySize);
|
||||
|
||||
for (int32_t i = 0; i < arraySize; ++i) {
|
||||
uint64_t *pSeqNum = taosArrayGet(delIndexArray, i);
|
||||
taosHashRemove(pObj->pRespHash, pSeqNum, sizeof(uint64_t));
|
||||
sDebug("vgId:%d, resp mgr clean by ttl, seq:%" PRId64 "", pSyncNode->vgId, *pSeqNum);
|
||||
sDebug("vgId:%d, resp manager clean by ttl, seq:%" PRId64, pSyncNode->vgId, *pSeqNum);
|
||||
}
|
||||
taosArrayDestroy(delIndexArray);
|
||||
}
|
||||
|
||||
void syncRespCleanRsp(SSyncRespMgr *pObj) {
|
||||
SSyncNode *pNode = pObj->data;
|
||||
sTrace("vgId:%d, clean all rsp", pNode->vgId);
|
||||
|
||||
taosThreadMutexLock(&pObj->mutex);
|
||||
syncRespCleanByTTL(pObj, -1, true);
|
||||
taosThreadMutexUnlock(&pObj->mutex);
|
||||
}
|
||||
|
||||
void syncRespClean(SSyncRespMgr *pObj) {
|
||||
SSyncNode *pNode = pObj->data;
|
||||
sTrace("vgId:%d, clean rsp by ttl", pNode->vgId);
|
||||
|
||||
taosThreadMutexLock(&pObj->mutex);
|
||||
syncRespCleanByTTL(pObj, pObj->ttl, false);
|
||||
taosThreadMutexUnlock(&pObj->mutex);
|
||||
|
|
Loading…
Reference in New Issue