|
|
|
@ -25,12 +25,12 @@ static inline bool vnodeIsMsgWeak(tmsg_t type) { return false; }
|
|
|
|
|
// Increment the vnode's counter for a message whose type passes vnodeIsMsgBlock(), then trace
// the updated count. Message types for which vnodeIsMsgBlock() returns false are ignored.
//   pVnode: vnode whose counter is incremented; its config.vgId is used in the trace line
//   type:   message type being accounted for
// NOTE(review): `count` is declared twice below, once against pVnode->syncCount and once against
// pVnode->blockCount. This looks like before/after residue of a field rename - confirm which
// line is live before relying on either.
static inline void vnodeAccumBlockMsg(SVnode *pVnode, tmsg_t type) {
|
|
|
|
|
// Nothing to account for unless the message type is a blocking one.
if (!vnodeIsMsgBlock(type)) return;
|
|
|
|
|
|
|
|
|
|
// Atomic increment; `count` receives the value returned by atomic_add_fetch_32().
int32_t count = atomic_add_fetch_32(&pVnode->syncCount, 1);
|
|
|
|
|
int32_t count = atomic_add_fetch_32(&pVnode->blockCount, 1);
|
|
|
|
|
vTrace("vgId:%d, accum block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static inline void vnodeWaitBlockMsg(SVnode *pVnode) {
|
|
|
|
|
int32_t count = atomic_load_32(&pVnode->syncCount);
|
|
|
|
|
int32_t count = atomic_load_32(&pVnode->blockCount);
|
|
|
|
|
if (count <= 0) return;
|
|
|
|
|
|
|
|
|
|
vTrace("vgId:%d, wait block finish, count:%d", pVnode->config.vgId, count);
|
|
|
|
@ -40,10 +40,10 @@ static inline void vnodeWaitBlockMsg(SVnode *pVnode) {
|
|
|
|
|
static inline void vnodePostBlockMsg(SVnode *pVnode, tmsg_t type) {
|
|
|
|
|
if (!vnodeIsMsgBlock(type)) return;
|
|
|
|
|
|
|
|
|
|
int32_t count = atomic_load_32(&pVnode->syncCount);
|
|
|
|
|
int32_t count = atomic_load_32(&pVnode->blockCount);
|
|
|
|
|
if (count <= 0) return;
|
|
|
|
|
|
|
|
|
|
count = atomic_sub_fetch_32(&pVnode->syncCount, 1);
|
|
|
|
|
count = atomic_sub_fetch_32(&pVnode->blockCount, 1);
|
|
|
|
|
vTrace("vgId:%d, post block, count:%d type:%s", pVnode->config.vgId, count, TMSG_INFO(type));
|
|
|
|
|
if (count <= 0) {
|
|
|
|
|
tsem_post(&pVnode->syncSem);
|
|
|
|
@ -84,8 +84,10 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
|
|
|
|
terrno = TSDB_CODE_INVALID_MSG;
|
|
|
|
|
return TSDB_CODE_INVALID_MSG;
|
|
|
|
|
}
|
|
|
|
|
STraceId *trace = &pMsg->info.traceId;
|
|
|
|
|
|
|
|
|
|
const STraceId *trace = &pMsg->info.traceId;
|
|
|
|
|
vGTrace("vgId:%d, start to alter vnode replica to %d, handle:%p", TD_VID(pVnode), req.replica, pMsg->info.handle);
|
|
|
|
|
|
|
|
|
|
SSyncCfg cfg = {.replicaNum = req.replica, .myIndex = req.selfIndex};
|
|
|
|
|
for (int32_t r = 0; r < req.replica; ++r) {
|
|
|
|
|
SNodeInfo *pNode = &cfg.nodeInfo[r];
|
|
|
|
@ -126,32 +128,23 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|
|
|
|
|
|
|
|
|
for (int32_t m = 0; m < numOfMsgs; m++) {
|
|
|
|
|
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
|
|
|
|
|
STraceId *trace = &pMsg->info.traceId;
|
|
|
|
|
const STraceId *trace = &pMsg->info.traceId;
|
|
|
|
|
vGTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle);
|
|
|
|
|
|
|
|
|
|
if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) {
|
|
|
|
|
code = vnodeProcessAlterReplicaReq(pVnode, pMsg);
|
|
|
|
|
code = vnodePreProcessReq(pVnode, pMsg);
|
|
|
|
|
if (code != 0) {
|
|
|
|
|
vError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
|
|
|
|
|
} else {
|
|
|
|
|
code = vnodePreprocessReq(pVnode, pMsg);
|
|
|
|
|
if (code != 0) {
|
|
|
|
|
vError("vgId:%d, failed to pre-process msg:%p since %s", vgId, pMsg, terrstr());
|
|
|
|
|
if (pMsg->msgType == TDMT_VND_ALTER_REPLICA) {
|
|
|
|
|
code = vnodeProcessAlterReplicaReq(pVnode, pMsg);
|
|
|
|
|
} else {
|
|
|
|
|
code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType));
|
|
|
|
|
if (code == 1) {
|
|
|
|
|
do {
|
|
|
|
|
static int32_t cnt = 0;
|
|
|
|
|
if (cnt++ % 1000 == 1) {
|
|
|
|
|
vInfo("vgId:%d, msg:%p apply right now, apply index:%ld, msgtype:%s,%d", vgId, pMsg,
|
|
|
|
|
pMsg->info.conn.applyIndex, TMSG_INFO(pMsg->msgType), pMsg->msgType);
|
|
|
|
|
}
|
|
|
|
|
} while (0);
|
|
|
|
|
|
|
|
|
|
if (code > 0) {
|
|
|
|
|
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
|
|
|
|
|
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
|
|
|
|
|
rsp.code = terrno;
|
|
|
|
|
vInfo("vgId:%d, msg:%p failed to apply right now since %s", vgId, pMsg, terrstr());
|
|
|
|
|
vError("vgId:%d, msg:%p failed to apply right now since %s", vgId, pMsg, terrstr());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (rsp.info.handle != NULL) {
|
|
|
|
|
tmsgSendRsp(&rsp);
|
|
|
|
|
}
|
|
|
|
@ -161,33 +154,27 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|
|
|
|
|
|
|
|
|
if (code == 0) {
|
|
|
|
|
vnodeAccumBlockMsg(pVnode, pMsg->msgType);
|
|
|
|
|
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
|
|
|
|
|
SEpSet newEpSet = {0};
|
|
|
|
|
syncGetRetryEpSet(pVnode->sync, &newEpSet);
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
syncGetEpSet(pVnode->sync, &newEpSet);
|
|
|
|
|
SEp *pEp = &newEpSet.eps[newEpSet.inUse];
|
|
|
|
|
if (pEp->port == tsServerPort && strcmp(pEp->fqdn, tsLocalFqdn) == 0) {
|
|
|
|
|
newEpSet.inUse = (newEpSet.inUse + 1) % newEpSet.numOfEps;
|
|
|
|
|
}
|
|
|
|
|
*/
|
|
|
|
|
|
|
|
|
|
vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps,
|
|
|
|
|
newEpSet.inUse);
|
|
|
|
|
for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
|
|
|
|
|
vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, i, newEpSet.eps[i].fqdn, newEpSet.eps[i].port);
|
|
|
|
|
}
|
|
|
|
|
pMsg->info.hasEpSet = 1;
|
|
|
|
|
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
|
|
|
|
|
tmsgSendRedirectRsp(&rsp, &newEpSet);
|
|
|
|
|
} else {
|
|
|
|
|
if (code != 1) {
|
|
|
|
|
} else if (code < 0) {
|
|
|
|
|
if (terrno == TSDB_CODE_SYN_NOT_LEADER) {
|
|
|
|
|
SEpSet newEpSet = {0};
|
|
|
|
|
syncGetRetryEpSet(pVnode->sync, &newEpSet);
|
|
|
|
|
vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps,
|
|
|
|
|
newEpSet.inUse);
|
|
|
|
|
for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
|
|
|
|
|
vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, i, newEpSet.eps[i].fqdn, newEpSet.eps[i].port);
|
|
|
|
|
}
|
|
|
|
|
pMsg->info.hasEpSet = 1;
|
|
|
|
|
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
|
|
|
|
|
tmsgSendRedirectRsp(&rsp, &newEpSet);
|
|
|
|
|
} else {
|
|
|
|
|
if (terrno != 0) code = terrno;
|
|
|
|
|
vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code);
|
|
|
|
|
SRpcMsg rsp = {.code = code, .info = pMsg->info};
|
|
|
|
|
tmsgSendRsp(&rsp);
|
|
|
|
|
if (rsp.info.handle != NULL) {
|
|
|
|
|
tmsgSendRsp(&rsp);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
|
|
|
|
@ -206,7 +193,7 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|
|
|
|
|
|
|
|
|
for (int32_t i = 0; i < numOfMsgs; ++i) {
|
|
|
|
|
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
|
|
|
|
|
STraceId *trace = &pMsg->info.traceId;
|
|
|
|
|
const STraceId *trace = &pMsg->info.traceId;
|
|
|
|
|
vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p", vgId, pMsg, TMSG_INFO(pMsg->msgType),
|
|
|
|
|
pMsg->info.handle);
|
|
|
|
|
|
|
|
|
@ -229,172 +216,150 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|
|
|
|
int32_t ret = 0;
|
|
|
|
|
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|
|
|
|
int32_t code = 0;
|
|
|
|
|
const STraceId *trace = &pMsg->info.traceId;
|
|
|
|
|
|
|
|
|
|
if (syncEnvIsStart()) {
|
|
|
|
|
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
|
|
|
|
|
assert(pSyncNode != NULL);
|
|
|
|
|
|
|
|
|
|
SMsgHead *pHead = pMsg->pCont;
|
|
|
|
|
STraceId *trace = &pMsg->info.traceId;
|
|
|
|
|
|
|
|
|
|
do {
|
|
|
|
|
char *syncNodeStr = sync2SimpleStr(pVnode->sync);
|
|
|
|
|
static int64_t vndTick = 0;
|
|
|
|
|
if (++vndTick % 10 == 1) {
|
|
|
|
|
vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
|
|
|
|
|
}
|
|
|
|
|
if (gRaftDetailLog) {
|
|
|
|
|
char logBuf[512] = {0};
|
|
|
|
|
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType,
|
|
|
|
|
syncNodeStr);
|
|
|
|
|
syncRpcMsgLog2(logBuf, pMsg);
|
|
|
|
|
}
|
|
|
|
|
taosMemoryFree(syncNodeStr);
|
|
|
|
|
} while (0);
|
|
|
|
|
|
|
|
|
|
SRpcMsg *pRpcMsg = pMsg;
|
|
|
|
|
|
|
|
|
|
// ToDo: ugly! use function pointer
|
|
|
|
|
// use different strategy
|
|
|
|
|
if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) {
|
|
|
|
|
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
|
|
|
|
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncTimeoutDestroy(pSyncMsg);
|
|
|
|
|
|
|
|
|
|
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) {
|
|
|
|
|
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncPingDestroy(pSyncMsg);
|
|
|
|
|
|
|
|
|
|
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
|
|
|
|
|
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncPingReplyDestroy(pSyncMsg);
|
|
|
|
|
|
|
|
|
|
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
|
|
|
|
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
|
|
|
|
|
syncClientRequestDestroy(pSyncMsg);
|
|
|
|
|
|
|
|
|
|
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
|
|
|
|
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncRequestVoteDestroy(pSyncMsg);
|
|
|
|
|
|
|
|
|
|
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
|
|
|
|
|
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncRequestVoteReplyDestroy(pSyncMsg);
|
|
|
|
|
|
|
|
|
|
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
|
|
|
|
|
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncAppendEntriesDestroy(pSyncMsg);
|
|
|
|
|
|
|
|
|
|
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
|
|
|
|
|
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncAppendEntriesReplyDestroy(pSyncMsg);
|
|
|
|
|
|
|
|
|
|
} else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
|
|
|
|
|
ret = vnodeSetStandBy(pVnode);
|
|
|
|
|
if (ret != 0 && terrno != 0) ret = terrno;
|
|
|
|
|
SRpcMsg rsp = {.code = ret, .info = pMsg->info};
|
|
|
|
|
tmsgSendRsp(&rsp);
|
|
|
|
|
} else {
|
|
|
|
|
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
|
|
|
|
|
ret = -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
} else {
|
|
|
|
|
// use wal first strategy
|
|
|
|
|
|
|
|
|
|
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
|
|
|
|
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncTimeoutDestroy(pSyncMsg);
|
|
|
|
|
|
|
|
|
|
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) {
|
|
|
|
|
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncPingDestroy(pSyncMsg);
|
|
|
|
|
|
|
|
|
|
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
|
|
|
|
|
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncPingReplyDestroy(pSyncMsg);
|
|
|
|
|
|
|
|
|
|
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
|
|
|
|
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
|
|
|
|
|
syncClientRequestDestroy(pSyncMsg);
|
|
|
|
|
|
|
|
|
|
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) {
|
|
|
|
|
SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pRpcMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
ret = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncClientRequestBatchDestroyDeep(pSyncMsg);
|
|
|
|
|
|
|
|
|
|
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
|
|
|
|
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncRequestVoteDestroy(pSyncMsg);
|
|
|
|
|
|
|
|
|
|
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
|
|
|
|
|
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncRequestVoteReplyDestroy(pSyncMsg);
|
|
|
|
|
|
|
|
|
|
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_BATCH) {
|
|
|
|
|
SyncAppendEntriesBatch *pSyncMsg = syncAppendEntriesBatchFromRpcMsg2(pRpcMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
ret = syncNodeOnAppendEntriesSnapshot2Cb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncAppendEntriesBatchDestroy(pSyncMsg);
|
|
|
|
|
|
|
|
|
|
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
|
|
|
|
|
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
ret = syncNodeOnAppendEntriesReplySnapshot2Cb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncAppendEntriesReplyDestroy(pSyncMsg);
|
|
|
|
|
|
|
|
|
|
} else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
|
|
|
|
|
ret = vnodeSetStandBy(pVnode);
|
|
|
|
|
if (ret != 0 && terrno != 0) ret = terrno;
|
|
|
|
|
SRpcMsg rsp = {.code = ret, .info = pMsg->info};
|
|
|
|
|
tmsgSendRsp(&rsp);
|
|
|
|
|
} else {
|
|
|
|
|
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
|
|
|
|
|
ret = -1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
syncNodeRelease(pSyncNode);
|
|
|
|
|
} else {
|
|
|
|
|
vError("==vnodeProcessSyncReq== error syncEnv stop");
|
|
|
|
|
ret = -1;
|
|
|
|
|
if (!syncEnvIsStart()) {
|
|
|
|
|
vGError("vgId:%d, msg:%p failed to process since sync env not start", pVnode->config.vgId);
|
|
|
|
|
terrno = TSDB_CODE_APP_ERROR;
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (ret != 0 && terrno == 0) {
|
|
|
|
|
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
|
|
|
|
|
if (pSyncNode == NULL) {
|
|
|
|
|
vGError("vgId:%d, msg:%p failed to process since invalid sync node", pVnode->config.vgId);
|
|
|
|
|
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
|
|
|
|
return -1;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#if 1
|
|
|
|
|
char *syncNodeStr = sync2SimpleStr(pVnode->sync);
|
|
|
|
|
static int64_t vndTick = 0;
|
|
|
|
|
if (++vndTick % 10 == 1) {
|
|
|
|
|
vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
|
|
|
|
|
}
|
|
|
|
|
if (gRaftDetailLog) {
|
|
|
|
|
char logBuf[512] = {0};
|
|
|
|
|
snprintf(logBuf, sizeof(logBuf), "vnode process syncmsg, msgType:%d, syncNode:%s", pMsg->msgType, syncNodeStr);
|
|
|
|
|
syncRpcMsgLog2(logBuf, pMsg);
|
|
|
|
|
}
|
|
|
|
|
taosMemoryFree(syncNodeStr);
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) {
|
|
|
|
|
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
|
|
|
|
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncTimeoutDestroy(pSyncMsg);
|
|
|
|
|
} else if (pMsg->msgType == TDMT_SYNC_PING) {
|
|
|
|
|
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncPingDestroy(pSyncMsg);
|
|
|
|
|
} else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
|
|
|
|
|
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncPingReplyDestroy(pSyncMsg);
|
|
|
|
|
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
|
|
|
|
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
|
|
|
|
|
syncClientRequestDestroy(pSyncMsg);
|
|
|
|
|
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
|
|
|
|
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncRequestVoteDestroy(pSyncMsg);
|
|
|
|
|
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
|
|
|
|
|
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncRequestVoteReplyDestroy(pSyncMsg);
|
|
|
|
|
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
|
|
|
|
|
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
code = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncAppendEntriesDestroy(pSyncMsg);
|
|
|
|
|
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
|
|
|
|
|
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncAppendEntriesReplyDestroy(pSyncMsg);
|
|
|
|
|
} else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
|
|
|
|
|
code = vnodeSetStandBy(pVnode);
|
|
|
|
|
if (code != 0 && terrno != 0) code = terrno;
|
|
|
|
|
SRpcMsg rsp = {.code = code, .info = pMsg->info};
|
|
|
|
|
tmsgSendRsp(&rsp);
|
|
|
|
|
} else {
|
|
|
|
|
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType);
|
|
|
|
|
code = -1;
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
// use wal first strategy
|
|
|
|
|
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
|
|
|
|
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncTimeoutDestroy(pSyncMsg);
|
|
|
|
|
} else if (pMsg->msgType == TDMT_SYNC_PING) {
|
|
|
|
|
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
code = syncNodeOnPingCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncPingDestroy(pSyncMsg);
|
|
|
|
|
} else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) {
|
|
|
|
|
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncPingReplyDestroy(pSyncMsg);
|
|
|
|
|
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
|
|
|
|
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
|
|
|
|
|
syncClientRequestDestroy(pSyncMsg);
|
|
|
|
|
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) {
|
|
|
|
|
SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncClientRequestBatchDestroyDeep(pSyncMsg);
|
|
|
|
|
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
|
|
|
|
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
code = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncRequestVoteDestroy(pSyncMsg);
|
|
|
|
|
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
|
|
|
|
|
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
code = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncRequestVoteReplyDestroy(pSyncMsg);
|
|
|
|
|
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_BATCH) {
|
|
|
|
|
SyncAppendEntriesBatch *pSyncMsg = syncAppendEntriesBatchFromRpcMsg2(pMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
code = syncNodeOnAppendEntriesSnapshot2Cb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncAppendEntriesBatchDestroy(pSyncMsg);
|
|
|
|
|
} else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
|
|
|
|
|
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
|
|
|
|
|
ASSERT(pSyncMsg != NULL);
|
|
|
|
|
code = syncNodeOnAppendEntriesReplySnapshot2Cb(pSyncNode, pSyncMsg);
|
|
|
|
|
syncAppendEntriesReplyDestroy(pSyncMsg);
|
|
|
|
|
} else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
|
|
|
|
|
code = vnodeSetStandBy(pVnode);
|
|
|
|
|
if (code != 0 && terrno != 0) code = terrno;
|
|
|
|
|
SRpcMsg rsp = {.code = code, .info = pMsg->info};
|
|
|
|
|
tmsgSendRsp(&rsp);
|
|
|
|
|
} else {
|
|
|
|
|
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType);
|
|
|
|
|
code = -1;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
syncNodeRelease(pSyncNode);
|
|
|
|
|
if (code != 0 && terrno == 0) {
|
|
|
|
|
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
|
|
|
|
}
|
|
|
|
|
return ret;
|
|
|
|
|
return code;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
|
|
|
@ -427,7 +392,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
|
|
|
|
|
syncGetAndDelRespRpc(pVnode->sync, cbMeta.newCfgSeqNum, &rpcMsg.info);
|
|
|
|
|
rpcMsg.info.conn.applyIndex = cbMeta.index;
|
|
|
|
|
|
|
|
|
|
STraceId *trace = (STraceId *)&pMsg->info.traceId;
|
|
|
|
|
const STraceId *trace = (STraceId *)&pMsg->info.traceId;
|
|
|
|
|
vGTrace("vgId:%d, alter vnode replica is confirmed, type:%s contLen:%d seq:%" PRIu64 " handle:%p", TD_VID(pVnode),
|
|
|
|
|
TMSG_INFO(pMsg->msgType), pMsg->contLen, cbMeta.seqNum, rpcMsg.info.handle);
|
|
|
|
|
if (rpcMsg.info.handle != NULL) {
|
|
|
|
@ -444,9 +409,8 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
|
|
|
|
|
char logBuf[256] = {0};
|
|
|
|
|
|
|
|
|
|
snprintf(logBuf, sizeof(logBuf),
|
|
|
|
|
"==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n",
|
|
|
|
|
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
|
|
|
|
|
beginIndex);
|
|
|
|
|
"commitCb execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n", pFsm,
|
|
|
|
|
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex);
|
|
|
|
|
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
|
|
|
|
|
|
|
|
|
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
|
|
|
@ -459,16 +423,15 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
|
|
|
|
|
|
|
|
|
|
// Pre-commit callback: formats a one-line description of the callback metadata into a local
// buffer and hands it, together with the message, to syncRpcMsgLog2(). No other work is done here.
//   pFsm:   state-machine handle; only its pointer value is printed
//   pMsg:   message passed through to syncRpcMsgLog2()
//   cbMeta: callback metadata; index, isWeak, code and state are printed
// NOTE(review): two snprintf() calls write to logBuf back to back, so only the text of the
// second one reaches syncRpcMsgLog2(). This looks like old/new variants of the same line -
// confirm which one is intended.
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
|
|
|
|
// Zero-initialised scratch buffer; snprintf() is bounded by sizeof(logBuf).
char logBuf[256] = {0};
|
|
|
|
|
snprintf(logBuf, sizeof(logBuf),
|
|
|
|
|
"==callback== ==PreCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, cbMeta.index,
|
|
|
|
|
cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
|
|
|
|
snprintf(logBuf, sizeof(logBuf), "preCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm,
|
|
|
|
|
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
|
|
|
|
// The cast drops the const qualifier from pMsg; presumably syncRpcMsgLog2() takes a non-const
// pointer and does not modify the message - TODO confirm.
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Rollback callback: formats a one-line description of the callback metadata into a local
// buffer and hands it, together with the message, to syncRpcMsgLog2(). No other work is done here.
//   pFsm:   state-machine handle; only its pointer value is printed
//   pMsg:   message passed through to syncRpcMsgLog2()
//   cbMeta: callback metadata; index, isWeak, code and state are printed
// NOTE(review): two snprintf() calls write to logBuf back to back, so only the text of the
// second one reaches syncRpcMsgLog2(). This looks like old/new variants of the same line -
// confirm which one is intended.
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
|
|
|
|
// Zero-initialised scratch buffer; snprintf() is bounded by sizeof(logBuf).
char logBuf[256] = {0};
|
|
|
|
|
snprintf(logBuf, sizeof(logBuf), "==callback== ==RollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n",
|
|
|
|
|
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
|
|
|
|
snprintf(logBuf, sizeof(logBuf), "rollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm,
|
|
|
|
|
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state));
|
|
|
|
|
// The cast drops the const qualifier from pMsg; presumably syncRpcMsgLog2() takes a non-const
// pointer and does not modify the message - TODO confirm.
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|