refactor(sync): add double queues in mnode
This commit is contained in:
parent
fa30b94c67
commit
5745223bca
|
@ -98,6 +98,7 @@ int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad);
|
|||
*/
|
||||
int32_t mndProcessRpcMsg(SRpcMsg *pMsg);
|
||||
int32_t mndProcessSyncMsg(SRpcMsg *pMsg);
|
||||
int32_t mndProcessSyncCtrlMsg(SRpcMsg *pMsg);
|
||||
int32_t mndPreProcessQueryMsg(SRpcMsg *pMsg);
|
||||
void mndPostProcessQueryMsg(SRpcMsg *pMsg);
|
||||
|
||||
|
|
|
@ -34,6 +34,7 @@ typedef struct SMnodeMgmt {
|
|||
SSingleWorker readWorker;
|
||||
SSingleWorker writeWorker;
|
||||
SSingleWorker syncWorker;
|
||||
SSingleWorker syncCtrlWorker;
|
||||
SSingleWorker monitorWorker;
|
||||
bool stopped;
|
||||
int32_t refCount;
|
||||
|
@ -56,6 +57,7 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt);
|
|||
void mmStopWorker(SMnodeMgmt *pMgmt);
|
||||
int32_t mmPutMsgToWriteQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t mmPutMsgToSyncCtrlQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
|
||||
|
|
|
@ -257,6 +257,9 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_MNODE_STANDBY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SET_VNODE_STANDBY_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_HEARTBEAT_REPLY, mmPutMsgToSyncCtrlQueue, 1) == NULL) goto _OVER;
|
||||
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
|
|
|
@ -77,6 +77,24 @@ static void mmProcessRpcMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
|||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void mmProcessSyncCtrlMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||
SMnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
pMsg->info.node = pMgmt->pMnode;
|
||||
|
||||
const STraceId *trace = &pMsg->info.traceId;
|
||||
dGTrace("msg:%p, get from mnode-sync-ctrl queue", pMsg);
|
||||
|
||||
SMsgHead *pHead = pMsg->pCont;
|
||||
pHead->contLen = ntohl(pHead->contLen);
|
||||
pHead->vgId = ntohl(pHead->vgId);
|
||||
|
||||
int32_t code = mndProcessSyncCtrlMsg(pMsg);
|
||||
|
||||
dGTrace("msg:%p, is freed, code:0x%x", pMsg, code);
|
||||
rpcFreeCont(pMsg->pCont);
|
||||
taosFreeQitem(pMsg);
|
||||
}
|
||||
|
||||
static void mmProcessSyncMsg(SQueueInfo *pInfo, SRpcMsg *pMsg) {
|
||||
SMnodeMgmt *pMgmt = pInfo->ahandle;
|
||||
pMsg->info.node = pMgmt->pMnode;
|
||||
|
@ -118,6 +136,10 @@ int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
|||
return mmPutMsgToWorker(pMgmt, &pMgmt->syncWorker, pMsg);
|
||||
}
|
||||
|
||||
int32_t mmPutMsgToSyncCtrlQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
return mmPutMsgToWorker(pMgmt, &pMgmt->syncCtrlWorker, pMsg);
|
||||
}
|
||||
|
||||
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||
return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
|
||||
}
|
||||
|
@ -158,6 +180,9 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
|
|||
case SYNC_QUEUE:
|
||||
pWorker = &pMgmt->syncWorker;
|
||||
break;
|
||||
case SYNC_CTRL_QUEUE:
|
||||
pWorker = &pMgmt->syncCtrlWorker;
|
||||
break;
|
||||
default:
|
||||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
@ -237,6 +262,18 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
SSingleWorkerCfg scCfg = {
|
||||
.min = 1,
|
||||
.max = 1,
|
||||
.name = "mnode-sync-ctrl",
|
||||
.fp = (FItem)mmProcessSyncCtrlMsg,
|
||||
.param = pMgmt,
|
||||
};
|
||||
if (tSingleWorkerInit(&pMgmt->syncCtrlWorker, &scCfg) != 0) {
|
||||
dError("failed to start mnode mnode-sync-ctrl worker since %s", terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSingleWorkerCfg mCfg = {
|
||||
.min = 1,
|
||||
.max = 1,
|
||||
|
@ -262,5 +299,6 @@ void mmStopWorker(SMnodeMgmt *pMgmt) {
|
|||
tSingleWorkerCleanup(&pMgmt->readWorker);
|
||||
tSingleWorkerCleanup(&pMgmt->writeWorker);
|
||||
tSingleWorkerCleanup(&pMgmt->syncWorker);
|
||||
tSingleWorkerCleanup(&pMgmt->syncCtrlWorker);
|
||||
dDebug("mnode workers are closed");
|
||||
}
|
||||
|
|
|
@ -444,6 +444,45 @@ void mndStop(SMnode *pMnode) {
|
|||
mndCleanupTimer(pMnode);
|
||||
}
|
||||
|
||||
int32_t mndProcessSyncCtrlMsg(SRpcMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
int32_t code = 0;
|
||||
|
||||
mInfo("vgId:%d, process sync ctrl msg", 1);
|
||||
|
||||
if (!syncEnvIsStart()) {
|
||||
mError("failed to process sync msg:%p type:%s since syncEnv stop", pMsg, TMSG_INFO(pMsg->msgType));
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSyncNode *pSyncNode = syncNodeAcquire(pMgmt->sync);
|
||||
if (pSyncNode == NULL) {
|
||||
mError("failed to process sync msg:%p type:%s since syncNode is null", pMsg, TMSG_INFO(pMsg->msgType));
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pMsg->msgType == TDMT_SYNC_HEARTBEAT) {
|
||||
SyncHeartbeat *pSyncMsg = syncHeartbeatFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnHeartbeat(pSyncNode, pSyncMsg);
|
||||
syncHeartbeatDestroy(pSyncMsg);
|
||||
|
||||
} else if (pMsg->msgType == TDMT_SYNC_HEARTBEAT_REPLY) {
|
||||
SyncHeartbeatReply *pSyncMsg = syncHeartbeatReplyFromRpcMsg2(pMsg);
|
||||
code = syncNodeOnHeartbeatReply(pSyncNode, pSyncMsg);
|
||||
syncHeartbeatReplyDestroy(pSyncMsg);
|
||||
}
|
||||
|
||||
syncNodeRelease(pSyncNode);
|
||||
|
||||
if (code != 0) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->info.node;
|
||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||
|
|
|
@ -40,14 +40,14 @@ typedef struct SSyncSnapshotSender {
|
|||
bool start;
|
||||
int32_t seq;
|
||||
int32_t ack;
|
||||
void * pReader;
|
||||
void * pCurrentBlock;
|
||||
void *pReader;
|
||||
void *pCurrentBlock;
|
||||
int32_t blockLen;
|
||||
SSnapshotParam snapshotParam;
|
||||
SSnapshot snapshot;
|
||||
SSyncCfg lastConfig;
|
||||
int64_t sendingMS;
|
||||
SSyncNode * pSyncNode;
|
||||
SSyncNode *pSyncNode;
|
||||
int32_t replicaIndex;
|
||||
SyncTerm term;
|
||||
SyncTerm privateTerm;
|
||||
|
@ -64,20 +64,22 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender);
|
|||
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
|
||||
|
||||
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
|
||||
char * snapshotSender2Str(SSyncSnapshotSender *pSender);
|
||||
char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event);
|
||||
char *snapshotSender2Str(SSyncSnapshotSender *pSender);
|
||||
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event);
|
||||
|
||||
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId);
|
||||
|
||||
//---------------------------------------------------
|
||||
typedef struct SSyncSnapshotReceiver {
|
||||
bool start;
|
||||
int32_t ack;
|
||||
void * pWriter;
|
||||
void *pWriter;
|
||||
SyncTerm term;
|
||||
SyncTerm privateTerm;
|
||||
SSnapshotParam snapshotParam;
|
||||
SSnapshot snapshot;
|
||||
SRaftId fromId;
|
||||
SSyncNode * pSyncNode;
|
||||
SSyncNode *pSyncNode;
|
||||
|
||||
} SSyncSnapshotReceiver;
|
||||
|
||||
|
@ -89,8 +91,8 @@ bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver)
|
|||
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver);
|
||||
|
||||
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
|
||||
char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
|
||||
char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event);
|
||||
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
|
||||
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event);
|
||||
|
||||
//---------------------------------------------------
|
||||
// on message
|
||||
|
|
|
@ -1493,15 +1493,19 @@ static int32_t syncNodeDoStartHeartbeatTimer(SSyncNode* pSyncNode) {
|
|||
}
|
||||
|
||||
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
|
||||
pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
|
||||
int32_t ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
|
||||
int32_t ret = 0;
|
||||
|
||||
do {
|
||||
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
|
||||
syncHbTimerStart(pSyncNode, pSyncTimer);
|
||||
}
|
||||
} while (0);
|
||||
if (syncNodeIsMnode(pSyncNode)) {
|
||||
pSyncNode->heartbeatTimerMS = pSyncNode->hbBaseLine;
|
||||
ret = syncNodeDoStartHeartbeatTimer(pSyncNode);
|
||||
} else {
|
||||
do {
|
||||
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
SSyncTimer* pSyncTimer = syncNodeGetHbTimer(pSyncNode, &(pSyncNode->peersId[i]));
|
||||
syncHbTimerStart(pSyncNode, pSyncTimer);
|
||||
}
|
||||
} while (0);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
@ -2954,6 +2958,8 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
|
|||
}
|
||||
|
||||
int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex) {
|
||||
syncNodeEventLog(ths, "on client request");
|
||||
|
||||
int32_t ret = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -3003,6 +3009,8 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SyncClientRequest* pMsg, SyncInd
|
|||
}
|
||||
|
||||
int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* pMsg) {
|
||||
syncNodeEventLog(ths, "on client request batch");
|
||||
|
||||
int32_t code = 0;
|
||||
|
||||
if (ths->state != TAOS_SYNC_STATE_LEADER) {
|
||||
|
|
|
@ -487,6 +487,8 @@ int32_t syncNodeDoAppendEntries(SSyncNode* pSyncNode, SRaftId* pDestId) {
|
|||
SyncIndex logEndIndex = pSyncNode->pLogStore->syncLogEndIndex(pSyncNode->pLogStore);
|
||||
if (nextIndex < logStartIndex || nextIndex > logEndIndex) {
|
||||
// start snapshot
|
||||
int32_t code = syncNodeStartSnapshot(pSyncNode, pDestId);
|
||||
ASSERT(code == 0);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -546,6 +548,8 @@ int32_t syncNodeDoAppendEntries(SSyncNode* pSyncNode, SRaftId* pDestId) {
|
|||
}
|
||||
|
||||
int32_t syncNodeDoReplicate(SSyncNode* pSyncNode) {
|
||||
syncNodeEventLog(pSyncNode, "do replicate");
|
||||
|
||||
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -376,14 +376,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
|
|||
|
||||
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
|
||||
cJSON *pJson = snapshotSender2Json(pSender);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
|
||||
int32_t len = 256;
|
||||
char * s = taosMemoryMalloc(len);
|
||||
char *s = taosMemoryMalloc(len);
|
||||
|
||||
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||
char host[64];
|
||||
|
@ -402,6 +402,8 @@ char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
|
|||
return s;
|
||||
}
|
||||
|
||||
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { return 0; }
|
||||
|
||||
// -------------------------------------
|
||||
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId fromId) {
|
||||
bool condition = (pSyncNode->pFsm->FpSnapshotStartWrite != NULL) && (pSyncNode->pFsm->FpSnapshotStopWrite != NULL) &&
|
||||
|
@ -655,7 +657,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
|
|||
cJSON_AddStringToObject(pFromId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pReceiver->fromId.addr;
|
||||
cJSON * pTmp = pFromId;
|
||||
cJSON *pTmp = pFromId;
|
||||
char host[128] = {0};
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
|
@ -688,14 +690,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
|
|||
|
||||
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
|
||||
cJSON *pJson = snapshotReceiver2Json(pReceiver);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
|
||||
int32_t len = 256;
|
||||
char * s = taosMemoryMalloc(len);
|
||||
char *s = taosMemoryMalloc(len);
|
||||
|
||||
SRaftId fromId = pReceiver->fromId;
|
||||
char host[128];
|
||||
|
|
|
@ -111,6 +111,9 @@ else
|
|||
endi
|
||||
|
||||
|
||||
|
||||
return 0
|
||||
|
||||
vg_ready:
|
||||
print ====> create stable/child table
|
||||
sql create table stb (ts timestamp, c1 int, c2 float, c3 double) tags (t1 int)
|
||||
|
@ -123,6 +126,12 @@ endi
|
|||
sql create table ct1 using stb tags(1000)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
print ====> step1 insert 1000 records
|
||||
$N = 1000
|
||||
$count = 0
|
||||
|
|
Loading…
Reference in New Issue