split sync/status channel
This commit is contained in:
parent
3f0236ec17
commit
b5e5167b8f
|
@ -53,9 +53,12 @@ typedef struct {
|
||||||
void* mgmt;
|
void* mgmt;
|
||||||
void* clientRpc;
|
void* clientRpc;
|
||||||
void* serverRpc;
|
void* serverRpc;
|
||||||
|
void* statusRpc;
|
||||||
|
void* syncRpc;
|
||||||
PutToQueueFp putToQueueFp;
|
PutToQueueFp putToQueueFp;
|
||||||
GetQueueSizeFp qsizeFp;
|
GetQueueSizeFp qsizeFp;
|
||||||
SendReqFp sendReqFp;
|
SendReqFp sendReqFp;
|
||||||
|
SendReqFp sendSyncReqFp;
|
||||||
SendRspFp sendRspFp;
|
SendRspFp sendRspFp;
|
||||||
RegisterBrokenLinkArgFp registerBrokenLinkArgFp;
|
RegisterBrokenLinkArgFp registerBrokenLinkArgFp;
|
||||||
ReleaseHandleFp releaseHandleFp;
|
ReleaseHandleFp releaseHandleFp;
|
||||||
|
@ -67,6 +70,7 @@ void tmsgSetDefault(const SMsgCb* msgcb);
|
||||||
int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg);
|
int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg);
|
||||||
int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype);
|
int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype);
|
||||||
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg);
|
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg);
|
||||||
|
int32_t tmsgSendSyncReq(const SEpSet* epSet, SRpcMsg* pMsg);
|
||||||
void tmsgSendRsp(SRpcMsg* pMsg);
|
void tmsgSendRsp(SRpcMsg* pMsg);
|
||||||
void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg);
|
void tmsgRegisterBrokenLinkArg(SRpcMsg* pMsg);
|
||||||
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type);
|
void tmsgReleaseHandle(SRpcHandleInfo* pHandle, int8_t type);
|
||||||
|
|
|
@ -161,7 +161,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
|
||||||
SEpSet epSet = {0};
|
SEpSet epSet = {0};
|
||||||
int8_t epUpdated = 0;
|
int8_t epUpdated = 0;
|
||||||
dmGetMnodeEpSet(pMgmt->pData, &epSet);
|
dmGetMnodeEpSet(pMgmt->pData, &epSet);
|
||||||
rpcSendRecvWithTimeout(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, 5000);
|
rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 10);
|
||||||
if (rpcRsp.code != 0) {
|
if (rpcRsp.code != 0) {
|
||||||
dmRotateMnodeEpSet(pMgmt->pData);
|
dmRotateMnodeEpSet(pMgmt->pData);
|
||||||
char tbuf[512];
|
char tbuf[512];
|
||||||
|
|
|
@ -48,6 +48,8 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
void *serverRpc;
|
void *serverRpc;
|
||||||
void *clientRpc;
|
void *clientRpc;
|
||||||
|
void *statusRpc;
|
||||||
|
void *syncRpc;
|
||||||
SDnodeHandle msgHandles[TDMT_MAX];
|
SDnodeHandle msgHandles[TDMT_MAX];
|
||||||
} SDnodeTrans;
|
} SDnodeTrans;
|
||||||
|
|
||||||
|
@ -136,8 +138,10 @@ int32_t dmInitServer(SDnode *pDnode);
|
||||||
void dmCleanupServer(SDnode *pDnode);
|
void dmCleanupServer(SDnode *pDnode);
|
||||||
int32_t dmInitClient(SDnode *pDnode);
|
int32_t dmInitClient(SDnode *pDnode);
|
||||||
int32_t dmInitStatusClient(SDnode *pDnode);
|
int32_t dmInitStatusClient(SDnode *pDnode);
|
||||||
|
int32_t dmInitSyncClient(SDnode *pDnode);
|
||||||
void dmCleanupClient(SDnode *pDnode);
|
void dmCleanupClient(SDnode *pDnode);
|
||||||
void dmCleanupStatusClient(SDnode *pDnode);
|
void dmCleanupStatusClient(SDnode *pDnode);
|
||||||
|
void dmCleanupSyncClient(SDnode *pDnode);
|
||||||
SMsgCb dmGetMsgcb(SDnode *pDnode);
|
SMsgCb dmGetMsgcb(SDnode *pDnode);
|
||||||
#ifdef TD_MODULE_OPTIMIZE
|
#ifdef TD_MODULE_OPTIMIZE
|
||||||
int32_t dmInitMsgHandle(SDnode *pDnode, SMgmtWrapper *wrappers);
|
int32_t dmInitMsgHandle(SDnode *pDnode, SMgmtWrapper *wrappers);
|
||||||
|
|
|
@ -93,6 +93,9 @@ int32_t dmInitDnode(SDnode *pDnode) {
|
||||||
indexInit(tsNumOfCommitThreads);
|
indexInit(tsNumOfCommitThreads);
|
||||||
streamMetaInit();
|
streamMetaInit();
|
||||||
|
|
||||||
|
dmInitStatusClient(pDnode);
|
||||||
|
dmInitSyncClient(pDnode);
|
||||||
|
|
||||||
dmReportStartup("dnode-transport", "initialized");
|
dmReportStartup("dnode-transport", "initialized");
|
||||||
dDebug("dnode is created, ptr:%p", pDnode);
|
dDebug("dnode is created, ptr:%p", pDnode);
|
||||||
code = 0;
|
code = 0;
|
||||||
|
@ -112,7 +115,9 @@ void dmCleanupDnode(SDnode *pDnode) {
|
||||||
|
|
||||||
dmCleanupClient(pDnode);
|
dmCleanupClient(pDnode);
|
||||||
dmCleanupStatusClient(pDnode);
|
dmCleanupStatusClient(pDnode);
|
||||||
|
dmCleanupSyncClient(pDnode);
|
||||||
dmCleanupServer(pDnode);
|
dmCleanupServer(pDnode);
|
||||||
|
|
||||||
dmClearVars(pDnode);
|
dmClearVars(pDnode);
|
||||||
rpcCleanup();
|
rpcCleanup();
|
||||||
streamMetaCleanup();
|
streamMetaCleanup();
|
||||||
|
|
|
@ -322,6 +322,23 @@ static inline int32_t dmSendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
static inline int32_t dmSendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||||
|
SDnode *pDnode = dmInstance();
|
||||||
|
if (pDnode->status != DND_STAT_RUNNING && pMsg->msgType < TDMT_SYNC_MSG) {
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
pMsg->pCont = NULL;
|
||||||
|
if (pDnode->status == DND_STAT_INIT) {
|
||||||
|
terrno = TSDB_CODE_APP_IS_STARTING;
|
||||||
|
} else {
|
||||||
|
terrno = TSDB_CODE_APP_IS_STOPPING;
|
||||||
|
}
|
||||||
|
dError("failed to send rpc msg:%s since %s, handle:%p", TMSG_INFO(pMsg->msgType), terrstr(), pMsg->info.handle);
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
rpcSendRequest(pDnode->trans.syncRpc, pEpSet, pMsg, NULL);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { rpcRegisterBrokenLinkArg(pMsg); }
|
static inline void dmRegisterBrokenLinkArg(SRpcMsg *pMsg) { rpcRegisterBrokenLinkArg(pMsg); }
|
||||||
|
|
||||||
|
@ -421,16 +438,61 @@ int32_t dmInitStatusClient(SDnode *pDnode) {
|
||||||
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
||||||
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||||
|
|
||||||
// pTrans->statusClientRpc = rpcOpen(&rpcInit);
|
pTrans->statusRpc = rpcOpen(&rpcInit);
|
||||||
// if (pTrans->statusClientRpc == NULL) {
|
if (pTrans->statusRpc == NULL) {
|
||||||
// dError("failed to init dnode rpc status client");
|
dError("failed to init dnode rpc status client");
|
||||||
// return -1;
|
return -1;
|
||||||
// }
|
}
|
||||||
|
|
||||||
dDebug("dnode rpc status client is initialized");
|
dDebug("dnode rpc status client is initialized");
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t dmInitSyncClient(SDnode *pDnode) {
|
||||||
|
SDnodeTrans *pTrans = &pDnode->trans;
|
||||||
|
|
||||||
|
SRpcInit rpcInit = {0};
|
||||||
|
rpcInit.label = "DND-SYNC";
|
||||||
|
rpcInit.numOfThreads = tsNumOfRpcThreads;
|
||||||
|
rpcInit.cfp = (RpcCfp)dmProcessRpcMsg;
|
||||||
|
rpcInit.sessions = 1024;
|
||||||
|
rpcInit.connType = TAOS_CONN_CLIENT;
|
||||||
|
rpcInit.user = TSDB_DEFAULT_USER;
|
||||||
|
rpcInit.idleTime = tsShellActivityTimer * 1000;
|
||||||
|
rpcInit.parent = pDnode;
|
||||||
|
rpcInit.rfp = rpcRfp;
|
||||||
|
rpcInit.compressSize = tsCompressMsgSize;
|
||||||
|
|
||||||
|
rpcInit.retryMinInterval = tsRedirectPeriod;
|
||||||
|
rpcInit.retryStepFactor = tsRedirectFactor;
|
||||||
|
rpcInit.retryMaxInterval = tsRedirectMaxPeriod;
|
||||||
|
rpcInit.retryMaxTimeout = tsMaxRetryWaitTime;
|
||||||
|
|
||||||
|
rpcInit.failFastInterval = 5000; // interval threshold(ms)
|
||||||
|
rpcInit.failFastThreshold = 3; // failed threshold
|
||||||
|
rpcInit.ffp = dmFailFastFp;
|
||||||
|
|
||||||
|
int32_t connLimitNum = 100;
|
||||||
|
connLimitNum = TMAX(connLimitNum, 10);
|
||||||
|
connLimitNum = TMIN(connLimitNum, 500);
|
||||||
|
|
||||||
|
rpcInit.connLimitNum = connLimitNum;
|
||||||
|
rpcInit.connLimitLock = 1;
|
||||||
|
rpcInit.supportBatch = 1;
|
||||||
|
rpcInit.batchSize = 8 * 1024;
|
||||||
|
rpcInit.timeToGetConn = tsTimeToGetAvailableConn;
|
||||||
|
taosVersionStrToInt(version, &(rpcInit.compatibilityVer));
|
||||||
|
|
||||||
|
pTrans->syncRpc = rpcOpen(&rpcInit);
|
||||||
|
if (pTrans->syncRpc == NULL) {
|
||||||
|
dError("failed to init dnode rpc sync client");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
dDebug("dnode rpc sync client is initialized");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
void dmCleanupClient(SDnode *pDnode) {
|
void dmCleanupClient(SDnode *pDnode) {
|
||||||
SDnodeTrans *pTrans = &pDnode->trans;
|
SDnodeTrans *pTrans = &pDnode->trans;
|
||||||
if (pTrans->clientRpc) {
|
if (pTrans->clientRpc) {
|
||||||
|
@ -441,11 +503,19 @@ void dmCleanupClient(SDnode *pDnode) {
|
||||||
}
|
}
|
||||||
void dmCleanupStatusClient(SDnode *pDnode) {
|
void dmCleanupStatusClient(SDnode *pDnode) {
|
||||||
SDnodeTrans *pTrans = &pDnode->trans;
|
SDnodeTrans *pTrans = &pDnode->trans;
|
||||||
// if (pTrans->statusClientRpc) {
|
if (pTrans->statusRpc) {
|
||||||
// rpcClose(pTrans->statusClientRpc);
|
rpcClose(pTrans->statusRpc);
|
||||||
// pTrans->statusClientRpc = NULL;
|
pTrans->statusRpc = NULL;
|
||||||
// dDebug("dnode rpc status client is closed");
|
dDebug("dnode rpc status client is closed");
|
||||||
// }
|
}
|
||||||
|
}
|
||||||
|
void dmCleanupSyncClient(SDnode *pDnode) {
|
||||||
|
SDnodeTrans *pTrans = &pDnode->trans;
|
||||||
|
if (pTrans->syncRpc) {
|
||||||
|
rpcClose(pTrans->syncRpc);
|
||||||
|
pTrans->syncRpc = NULL;
|
||||||
|
dDebug("dnode rpc sync client is closed");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dmInitServer(SDnode *pDnode) {
|
int32_t dmInitServer(SDnode *pDnode) {
|
||||||
|
@ -486,7 +556,10 @@ SMsgCb dmGetMsgcb(SDnode *pDnode) {
|
||||||
SMsgCb msgCb = {
|
SMsgCb msgCb = {
|
||||||
.clientRpc = pDnode->trans.clientRpc,
|
.clientRpc = pDnode->trans.clientRpc,
|
||||||
.serverRpc = pDnode->trans.serverRpc,
|
.serverRpc = pDnode->trans.serverRpc,
|
||||||
|
.statusRpc = pDnode->trans.statusRpc,
|
||||||
|
.syncRpc = pDnode->trans.syncRpc,
|
||||||
.sendReqFp = dmSendReq,
|
.sendReqFp = dmSendReq,
|
||||||
|
.sendSyncReqFp = dmSendSyncReq,
|
||||||
.sendRspFp = dmSendRsp,
|
.sendRspFp = dmSendRsp,
|
||||||
.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
|
.registerBrokenLinkArgFp = dmRegisterBrokenLinkArg,
|
||||||
.releaseHandleFp = dmReleaseHandle,
|
.releaseHandleFp = dmReleaseHandle,
|
||||||
|
|
|
@ -32,6 +32,10 @@ int32_t sendReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||||
terrno = TSDB_CODE_INVALID_PTR;
|
terrno = TSDB_CODE_INVALID_PTR;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
int32_t sendSyncReq(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||||
|
terrno = TSDB_CODE_INVALID_PTR;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
char *i642str(int64_t val) {
|
char *i642str(int64_t val) {
|
||||||
static char str[24] = {0};
|
static char str[24] = {0};
|
||||||
|
@ -568,6 +572,7 @@ void mndDumpSdb() {
|
||||||
SMsgCb msgCb = {0};
|
SMsgCb msgCb = {0};
|
||||||
msgCb.reportStartupFp = reportStartup;
|
msgCb.reportStartupFp = reportStartup;
|
||||||
msgCb.sendReqFp = sendReq;
|
msgCb.sendReqFp = sendReq;
|
||||||
|
msgCb.sendSyncReqFp = sendSyncReq;
|
||||||
msgCb.sendRspFp = sendRsp;
|
msgCb.sendRspFp = sendRsp;
|
||||||
msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack
|
msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack
|
||||||
tmsgSetDefault(&msgCb);
|
tmsgSetDefault(&msgCb);
|
||||||
|
@ -590,7 +595,7 @@ void mndDumpSdb() {
|
||||||
dumpTopic(pSdb, json);
|
dumpTopic(pSdb, json);
|
||||||
dumpConsumer(pSdb, json);
|
dumpConsumer(pSdb, json);
|
||||||
dumpSubscribe(pSdb, json);
|
dumpSubscribe(pSdb, json);
|
||||||
// dumpOffset(pSdb, json);
|
// dumpOffset(pSdb, json);
|
||||||
dumpStream(pSdb, json);
|
dumpStream(pSdb, json);
|
||||||
dumpAcct(pSdb, json);
|
dumpAcct(pSdb, json);
|
||||||
dumpAuth(pSdb, json);
|
dumpAuth(pSdb, json);
|
||||||
|
@ -605,7 +610,7 @@ void mndDumpSdb() {
|
||||||
char *pCont = tjsonToString(json);
|
char *pCont = tjsonToString(json);
|
||||||
int32_t contLen = strlen(pCont);
|
int32_t contLen = strlen(pCont);
|
||||||
char file[] = "sdb.json";
|
char file[] = "sdb.json";
|
||||||
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC| TD_FILE_WRITE_THROUGH);
|
TdFilePtr pFile = taosOpenFile(file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
|
||||||
if (pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
mError("failed to write %s since %s", file, terrstr());
|
mError("failed to write %s since %s", file, terrstr());
|
||||||
|
|
|
@ -61,6 +61,7 @@ class MndTestTrans2 : public ::testing::Test {
|
||||||
static SMsgCb msgCb = {0};
|
static SMsgCb msgCb = {0};
|
||||||
msgCb.reportStartupFp = reportStartup;
|
msgCb.reportStartupFp = reportStartup;
|
||||||
msgCb.sendReqFp = sendReq;
|
msgCb.sendReqFp = sendReq;
|
||||||
|
msgCb.sendSyncReqFp = sendSyncReq;
|
||||||
msgCb.sendRspFp = sendRsp;
|
msgCb.sendRspFp = sendRsp;
|
||||||
msgCb.queueFps[SYNC_QUEUE] = putToQueue;
|
msgCb.queueFps[SYNC_QUEUE] = putToQueue;
|
||||||
msgCb.queueFps[WRITE_QUEUE] = putToQueue;
|
msgCb.queueFps[WRITE_QUEUE] = putToQueue;
|
||||||
|
|
|
@ -14,11 +14,11 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "tq.h"
|
|
||||||
#include "sync.h"
|
#include "sync.h"
|
||||||
|
#include "tq.h"
|
||||||
|
#include "tqCommon.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
#include "vnd.h"
|
#include "vnd.h"
|
||||||
#include "tqCommon.h"
|
|
||||||
|
|
||||||
#define BATCH_ENABLE 0
|
#define BATCH_ENABLE 0
|
||||||
|
|
||||||
|
@ -411,7 +411,7 @@ static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
static int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||||
int32_t code = tmsgSendReq(pEpSet, pMsg);
|
int32_t code = tmsgSendSyncReq(pEpSet, pMsg);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
pMsg->pCont = NULL;
|
pMsg->pCont = NULL;
|
||||||
|
@ -477,8 +477,8 @@ static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, SFsmCbMeta
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
|
static int32_t vnodeSnapshotStartRead(const SSyncFSM *pFsm, void *pParam, void **ppReader) {
|
||||||
SVnode *pVnode = pFsm->data;
|
SVnode *pVnode = pFsm->data;
|
||||||
int32_t code = vnodeSnapReaderOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapReader **)ppReader);
|
int32_t code = vnodeSnapReaderOpen(pVnode, (SSnapshotParam *)pParam, (SVSnapReader **)ppReader);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -555,7 +555,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
|
||||||
walApplyVer(pVnode->pWal, commitIdx);
|
walApplyVer(pVnode->pWal, commitIdx);
|
||||||
pVnode->restored = true;
|
pVnode->restored = true;
|
||||||
|
|
||||||
SStreamMeta* pMeta = pVnode->pTq->pStreamMeta;
|
SStreamMeta *pMeta = pVnode->pTq->pStreamMeta;
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
|
|
||||||
if (pMeta->startInfo.tasksWillRestart) {
|
if (pMeta->startInfo.tasksWillRestart) {
|
||||||
|
|
|
@ -48,6 +48,14 @@ int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
int32_t tmsgSendSyncReq(const SEpSet* epSet, SRpcMsg* pMsg) {
|
||||||
|
int32_t code = (*defaultMsgCb.sendSyncReqFp)(epSet, pMsg);
|
||||||
|
if (code != 0) {
|
||||||
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
pMsg->pCont = NULL;
|
||||||
|
}
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
void tmsgSendRsp(SRpcMsg* pMsg) {
|
void tmsgSendRsp(SRpcMsg* pMsg) {
|
||||||
#if 1
|
#if 1
|
||||||
|
|
Loading…
Reference in New Issue