Merge pull request #12715 from taosdata/fix/dnode
refactor: adjust msgcb
This commit is contained in:
commit
cec8ecfe09
|
@ -20,14 +20,11 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <stdbool.h>
|
||||
#include <stdint.h>
|
||||
//#include <tdatablock.h>
|
||||
#include "os.h"
|
||||
|
||||
#include "cJSON.h"
|
||||
#include "tdef.h"
|
||||
//#include "taosdef.h"
|
||||
//#include "trpc.h"
|
||||
//#include "wal.h"
|
||||
#include "tmsgcb.h"
|
||||
|
||||
typedef uint64_t SyncNodeId;
|
||||
typedef int32_t SyncGroupId;
|
||||
|
@ -132,11 +129,10 @@ typedef struct SSyncInfo {
|
|||
char path[TSDB_FILENAME_LEN];
|
||||
SWal* pWal;
|
||||
SSyncFSM* pFsm;
|
||||
SMsgCb* msgcb;
|
||||
|
||||
void* rpcClient;
|
||||
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
|
||||
void* queue;
|
||||
int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg);
|
||||
int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
|
||||
int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
|
||||
|
||||
} SSyncInfo;
|
||||
|
||||
|
|
|
@ -20,13 +20,10 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#include <stdbool.h>
|
||||
#include <stdint.h>
|
||||
//#include <tdatablock.h>
|
||||
#include "os.h"
|
||||
|
||||
#include "cJSON.h"
|
||||
//#include "taosdef.h"
|
||||
#include "trpc.h"
|
||||
//#include "wal.h"
|
||||
|
||||
// ------------------ ds -------------------
|
||||
typedef struct SRaftId {
|
||||
|
@ -43,8 +40,7 @@ void syncNodeRelease(SSyncNode* pNode);
|
|||
|
||||
int32_t syncGetRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg);
|
||||
int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg);
|
||||
void syncSetQ(int64_t rid, void* queueHandle);
|
||||
void syncSetRpc(int64_t rid, void* rpcHandle);
|
||||
void syncSetMsgCb(int64_t rid, const SMsgCb* msgcb);
|
||||
char* sync2SimpleStr(int64_t rid);
|
||||
|
||||
// set timer ms
|
||||
|
|
|
@ -85,10 +85,9 @@ int vnodeAsyncCommit(SVnode* pVnode);
|
|||
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
|
||||
int32_t vnodeSyncStart(SVnode* pVnode);
|
||||
void vnodeSyncClose(SVnode* pVnode);
|
||||
void vnodeSyncSetQ(SVnode* pVnode, void* qHandle);
|
||||
void vnodeSyncSetRpc(SVnode* pVnode, void* rpcHandle);
|
||||
int32_t vnodeSyncEqMsg(void* qHandle, SRpcMsg* pMsg);
|
||||
int32_t vnodeSendMsg(void* rpcHandle, const SEpSet* pEpSet, SRpcMsg* pMsg);
|
||||
void vnodeSyncSetMsgCb(SVnode* pVnode);
|
||||
int32_t vnodeSyncEqMsg(const SMsgCb* msgcb, SRpcMsg* pMsg);
|
||||
int32_t vnodeSyncSendMsg(const SEpSet* pEpSet, SRpcMsg* pMsg);
|
||||
void vnodeSyncCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
||||
void vnodeSyncPreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
||||
void vnodeSyncRollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
||||
|
|
|
@ -180,8 +180,7 @@ void vnodeClose(SVnode *pVnode) {
|
|||
|
||||
// start the sync timer after the queue is ready
|
||||
int32_t vnodeStart(SVnode *pVnode) {
|
||||
vnodeSyncSetQ(pVnode, NULL);
|
||||
vnodeSyncSetRpc(pVnode, NULL);
|
||||
vnodeSyncSetMsgCb(pVnode);
|
||||
vnodeSyncStart(pVnode);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -27,9 +27,8 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
|||
syncInfo.pWal = pVnode->pWal;
|
||||
|
||||
syncInfo.pFsm = syncVnodeMakeFsm(pVnode);
|
||||
syncInfo.rpcClient = NULL;
|
||||
syncInfo.FpSendMsg = vnodeSendMsg;
|
||||
syncInfo.queue = NULL;
|
||||
syncInfo.msgcb = NULL;
|
||||
syncInfo.FpSendMsg = vnodeSyncSendMsg;
|
||||
syncInfo.FpEqMsg = vnodeSyncEqMsg;
|
||||
|
||||
pVnode->sync = syncOpen(&syncInfo);
|
||||
|
@ -53,31 +52,13 @@ void vnodeSyncClose(SVnode *pVnode) {
|
|||
syncStop(pVnode->sync);
|
||||
}
|
||||
|
||||
void vnodeSyncSetQ(SVnode *pVnode, void *qHandle) { syncSetQ(pVnode->sync, (void *)(&(pVnode->msgCb))); }
|
||||
void vnodeSyncSetMsgCb(SVnode *pVnode) { syncSetMsgCb(pVnode->sync, &pVnode->msgCb); }
|
||||
|
||||
void vnodeSyncSetRpc(SVnode *pVnode, void *rpcHandle) { syncSetRpc(pVnode->sync, (void *)(&(pVnode->msgCb))); }
|
||||
int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { return tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg); }
|
||||
|
||||
int32_t vnodeSyncEqMsg(void *qHandle, SRpcMsg *pMsg) {
|
||||
int32_t ret = 0;
|
||||
SMsgCb *pMsgCb = qHandle;
|
||||
if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) {
|
||||
tmsgPutToQueue(qHandle, SYNC_QUEUE, pMsg);
|
||||
} else {
|
||||
vError("vnodeSyncEqMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||
int32_t ret = 0;
|
||||
SMsgCb *pMsgCb = rpcHandle;
|
||||
if (pMsgCb->queueFps[SYNC_QUEUE] != NULL) {
|
||||
int32_t vnodeSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||
pMsg->info.noResp = 1;
|
||||
tmsgSendReq(pEpSet, pMsg);
|
||||
} else {
|
||||
vError("vnodeSendMsg queue is NULL, SYNC_QUEUE:%d", SYNC_QUEUE);
|
||||
}
|
||||
return ret;
|
||||
return tmsgSendReq(pEpSet, pMsg);
|
||||
}
|
||||
|
||||
int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
||||
|
|
|
@ -36,10 +36,10 @@ typedef struct SSyncIO {
|
|||
STaosQueue *pMsgQ;
|
||||
STaosQset * pQset;
|
||||
TdThread consumerTid;
|
||||
|
||||
void * serverRpc;
|
||||
void * clientRpc;
|
||||
void *serverRpc;
|
||||
void *clientRpc;
|
||||
SEpSet myAddr;
|
||||
SMsgCb msgcb;
|
||||
|
||||
tmr_h qTimer;
|
||||
int32_t qTimerMS;
|
||||
|
@ -65,8 +65,8 @@ extern SSyncIO *gSyncIO;
|
|||
|
||||
int32_t syncIOStart(char *host, uint16_t port);
|
||||
int32_t syncIOStop();
|
||||
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg);
|
||||
int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg);
|
||||
int32_t syncIOSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg);
|
||||
int32_t syncIOEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg);
|
||||
|
||||
int32_t syncIOQTimerStart();
|
||||
int32_t syncIOQTimerStop();
|
||||
|
|
|
@ -160,10 +160,9 @@ typedef struct SSyncNode {
|
|||
|
||||
// sync io
|
||||
SWal* pWal;
|
||||
void* rpcClient;
|
||||
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
|
||||
void* queue;
|
||||
int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg);
|
||||
const SMsgCb* msgcb;
|
||||
int32_t (*FpSendMsg)(const SEpSet* pEpSet, SRpcMsg* pMsg);
|
||||
int32_t (*FpEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg);
|
||||
|
||||
// init internal
|
||||
SNodeInfo myNodeInfo;
|
||||
|
|
|
@ -66,7 +66,7 @@ int32_t syncIOStop() {
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||
int32_t syncIOSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
||||
assert(pEpSet->inUse == 0);
|
||||
assert(pEpSet->numOfEps == 1);
|
||||
|
||||
|
@ -83,11 +83,11 @@ int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) {
|
|||
|
||||
pMsg->info.handle = NULL;
|
||||
pMsg->info.noResp = 1;
|
||||
rpcSendRequest(clientRpc, pEpSet, pMsg, NULL);
|
||||
rpcSendRequest(gSyncIO->clientRpc, pEpSet, pMsg, NULL);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) {
|
||||
int32_t syncIOEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
||||
int32_t ret = 0;
|
||||
char logBuf[128];
|
||||
syncRpcMsgLog2((char *)"==syncIOEqMsg==", pMsg);
|
||||
|
@ -96,7 +96,7 @@ int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) {
|
|||
pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
|
||||
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
|
||||
|
||||
STaosQueue *pMsgQ = queue;
|
||||
STaosQueue *pMsgQ = gSyncIO->pMsgQ;
|
||||
taosWriteQitem(pMsgQ, pTemp);
|
||||
|
||||
return ret;
|
||||
|
|
|
@ -240,26 +240,14 @@ int32_t syncGetAndDelRespRpc(int64_t rid, uint64_t index, SRpcMsg* msg) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
void syncSetQ(int64_t rid, void* queue) {
|
||||
void syncSetMsgCb(int64_t rid, const SMsgCb *msgcb) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
sTrace("syncSetQ get pSyncNode is NULL, rid:%ld", rid);
|
||||
return;
|
||||
}
|
||||
assert(rid == pSyncNode->rid);
|
||||
pSyncNode->queue = queue;
|
||||
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
}
|
||||
|
||||
void syncSetRpc(int64_t rid, void* rpcHandle) {
|
||||
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
sTrace("syncSetRpc get pSyncNode is NULL, rid:%ld", rid);
|
||||
return;
|
||||
}
|
||||
assert(rid == pSyncNode->rid);
|
||||
pSyncNode->rpcClient = rpcHandle;
|
||||
pSyncNode->msgcb = msgcb;
|
||||
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
}
|
||||
|
@ -332,7 +320,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
|
|||
SRpcMsg rpcMsg;
|
||||
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
|
||||
if (pSyncNode->FpEqMsg != NULL) {
|
||||
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
|
||||
pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
|
||||
} else {
|
||||
sTrace("syncPropose pSyncNode->FpEqMsg is NULL");
|
||||
}
|
||||
|
@ -375,9 +363,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
|
|||
snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path);
|
||||
|
||||
pSyncNode->pWal = pSyncInfo->pWal;
|
||||
pSyncNode->rpcClient = pSyncInfo->rpcClient;
|
||||
pSyncNode->msgcb = pSyncInfo->msgcb;
|
||||
pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
|
||||
pSyncNode->queue = pSyncInfo->queue;
|
||||
pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
|
||||
|
||||
// init raft config
|
||||
|
@ -691,7 +678,7 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp
|
|||
// htonl
|
||||
syncUtilMsgHtoN(pMsg->pCont);
|
||||
|
||||
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
|
||||
pSyncNode->FpSendMsg(&epSet, pMsg);
|
||||
} else {
|
||||
sTrace("syncNodeSendMsgById pSyncNode->FpSendMsg is NULL");
|
||||
}
|
||||
|
@ -706,7 +693,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
|
|||
// htonl
|
||||
syncUtilMsgHtoN(pMsg->pCont);
|
||||
|
||||
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
|
||||
pSyncNode->FpSendMsg(&epSet, pMsg);
|
||||
} else {
|
||||
sTrace("syncNodeSendMsgByInfo pSyncNode->FpSendMsg is NULL");
|
||||
}
|
||||
|
@ -728,12 +715,12 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) {
|
|||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal);
|
||||
cJSON_AddStringToObject(pRoot, "pWal", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->rpcClient);
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
|
||||
cJSON_AddStringToObject(pRoot, "rpcClient", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpSendMsg);
|
||||
cJSON_AddStringToObject(pRoot, "FpSendMsg", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->queue);
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->msgcb);
|
||||
cJSON_AddStringToObject(pRoot, "queue", u64buf);
|
||||
snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->FpEqMsg);
|
||||
cJSON_AddStringToObject(pRoot, "FpEqMsg", u64buf);
|
||||
|
@ -1095,7 +1082,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) {
|
|||
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
|
||||
syncRpcMsgLog2((char*)"==syncNodeEqPingTimer==", &rpcMsg);
|
||||
if (pSyncNode->FpEqMsg != NULL) {
|
||||
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
|
||||
pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
|
||||
} else {
|
||||
sTrace("syncNodeEqPingTimer pSyncNode->FpEqMsg is NULL");
|
||||
}
|
||||
|
@ -1118,7 +1105,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) {
|
|||
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
|
||||
syncRpcMsgLog2((char*)"==syncNodeEqElectTimer==", &rpcMsg);
|
||||
if (pSyncNode->FpEqMsg != NULL) {
|
||||
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
|
||||
pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
|
||||
} else {
|
||||
sTrace("syncNodeEqElectTimer pSyncNode->FpEqMsg is NULL");
|
||||
}
|
||||
|
@ -1145,7 +1132,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
|
|||
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
|
||||
syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg);
|
||||
if (pSyncNode->FpEqMsg != NULL) {
|
||||
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
|
||||
pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
|
||||
} else {
|
||||
sTrace("syncNodeEqHeartbeatTimer pSyncNode->FpEqMsg is NULL");
|
||||
}
|
||||
|
@ -1175,10 +1162,10 @@ static int32_t syncNodeEqNoop(SSyncNode* ths) {
|
|||
assert(pSyncMsg->dataLen == entryLen);
|
||||
memcpy(pSyncMsg->data, serialized, entryLen);
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
SRpcMsg rpcMsg = {0};
|
||||
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
|
||||
if (ths->FpEqMsg != NULL) {
|
||||
ths->FpEqMsg(ths->queue, &rpcMsg);
|
||||
ths->FpEqMsg(ths->msgcb, &rpcMsg);
|
||||
} else {
|
||||
sTrace("syncNodeEqNoop pSyncNode->FpEqMsg is NULL");
|
||||
}
|
||||
|
|
|
@ -100,9 +100,8 @@ SWal* createWal(char* path, int32_t vgId) {
|
|||
int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path, bool isStandBy) {
|
||||
SSyncInfo syncInfo;
|
||||
syncInfo.vgId = vgId;
|
||||
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||
syncInfo.msgcb = &gSyncIO->msgcb;
|
||||
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||
syncInfo.queue = gSyncIO->pMsgQ;
|
||||
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||
syncInfo.pFsm = createFsm();
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
|
||||
|
|
|
@ -44,9 +44,8 @@ SWal* createWal(char* path, int32_t vgId) {
|
|||
SSyncNode* createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path) {
|
||||
SSyncInfo syncInfo;
|
||||
syncInfo.vgId = vgId;
|
||||
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||
syncInfo.msgcb = &gSyncIO->msgcb;
|
||||
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||
syncInfo.queue = gSyncIO->pMsgQ;
|
||||
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||
syncInfo.pFsm = NULL;
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
|
||||
|
|
|
@ -31,9 +31,8 @@ SSyncNode *pSyncNode;
|
|||
|
||||
SSyncNode *syncNodeInit() {
|
||||
syncInfo.vgId = 1234;
|
||||
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||
syncInfo.msgcb = &gSyncIO->msgcb;
|
||||
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||
syncInfo.queue = gSyncIO->pMsgQ;
|
||||
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||
syncInfo.pFsm = pFsm;
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
|
||||
|
|
|
@ -25,9 +25,7 @@ SSyncFSM* pFsm;
|
|||
|
||||
SSyncNode* syncNodeInit() {
|
||||
syncInfo.vgId = 1234;
|
||||
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||
syncInfo.queue = gSyncIO->pMsgQ;
|
||||
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||
syncInfo.pFsm = pFsm;
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
|
||||
|
@ -99,7 +97,7 @@ int main(int argc, char** argv) {
|
|||
SyncPingReply* pSyncMsg = syncPingReplyBuild2(&pSyncNode->myRaftId, &pSyncNode->myRaftId, 1000, "syncEnqTest");
|
||||
SRpcMsg rpcMsg;
|
||||
syncPingReply2RpcMsg(pSyncMsg, &rpcMsg);
|
||||
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
|
||||
pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
|
||||
taosMsleep(1000);
|
||||
}
|
||||
|
||||
|
|
|
@ -43,7 +43,7 @@ int main() {
|
|||
SRpcMsg rpcMsg;
|
||||
syncPingReply2RpcMsg(pSyncMsg, &rpcMsg);
|
||||
|
||||
syncIOSendMsg(gSyncIO->clientRpc, &epSet, &rpcMsg);
|
||||
syncIOSendMsg(&epSet, &rpcMsg);
|
||||
taosSsleep(1);
|
||||
}
|
||||
|
||||
|
|
|
@ -25,9 +25,8 @@ SSyncFSM* pFsm;
|
|||
|
||||
SSyncNode* syncNodeInit() {
|
||||
syncInfo.vgId = 1234;
|
||||
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||
syncInfo.msgcb = &gSyncIO->msgcb;
|
||||
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||
syncInfo.queue = gSyncIO->pMsgQ;
|
||||
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||
syncInfo.pFsm = pFsm;
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
|
||||
|
@ -103,7 +102,7 @@ int main(int argc, char** argv) {
|
|||
|
||||
SEpSet epSet;
|
||||
syncUtilnodeInfo2EpSet(&pSyncNode->myNodeInfo, &epSet);
|
||||
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, &rpcMsg);
|
||||
pSyncNode->FpSendMsg(&epSet, &rpcMsg);
|
||||
|
||||
taosMsleep(1000);
|
||||
}
|
||||
|
|
|
@ -28,9 +28,8 @@ SSyncNode* pSyncNode;
|
|||
|
||||
SSyncNode* syncNodeInit() {
|
||||
syncInfo.vgId = 1234;
|
||||
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||
syncInfo.msgcb = &gSyncIO->msgcb;
|
||||
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||
syncInfo.queue = gSyncIO->pMsgQ;
|
||||
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||
syncInfo.pFsm = pFsm;
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
|
||||
|
|
|
@ -25,9 +25,8 @@ SSyncFSM* pFsm;
|
|||
|
||||
SSyncNode* syncNodeInit() {
|
||||
syncInfo.vgId = 1234;
|
||||
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||
syncInfo.msgcb = &gSyncIO->msgcb;
|
||||
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||
syncInfo.queue = gSyncIO->pMsgQ;
|
||||
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||
syncInfo.pFsm = pFsm;
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./sync_init_test");
|
||||
|
|
|
@ -25,9 +25,8 @@ SSyncFSM* pFsm;
|
|||
|
||||
SSyncNode* syncNodeInit() {
|
||||
syncInfo.vgId = 1234;
|
||||
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||
syncInfo.msgcb = &gSyncIO->msgcb;
|
||||
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||
syncInfo.queue = gSyncIO->pMsgQ;
|
||||
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||
syncInfo.pFsm = pFsm;
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
|
||||
|
|
|
@ -25,9 +25,8 @@ SSyncFSM* pFsm;
|
|||
|
||||
SSyncNode* syncNodeInit() {
|
||||
syncInfo.vgId = 1234;
|
||||
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||
syncInfo.msgcb = &gSyncIO->msgcb;
|
||||
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||
syncInfo.queue = gSyncIO->pMsgQ;
|
||||
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||
syncInfo.pFsm = pFsm;
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
|
||||
|
|
|
@ -25,9 +25,8 @@ SSyncFSM* pFsm;
|
|||
|
||||
SSyncNode* syncNodeInit() {
|
||||
syncInfo.vgId = 1234;
|
||||
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||
syncInfo.msgcb = &gSyncIO->msgcb;
|
||||
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||
syncInfo.queue = gSyncIO->pMsgQ;
|
||||
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||
syncInfo.pFsm = pFsm;
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
|
||||
|
|
|
@ -97,9 +97,8 @@ SWal* createWal(char* path, int32_t vgId) {
|
|||
int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path) {
|
||||
SSyncInfo syncInfo;
|
||||
syncInfo.vgId = vgId;
|
||||
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||
syncInfo.msgcb = &gSyncIO->msgcb;
|
||||
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||
syncInfo.queue = gSyncIO->pMsgQ;
|
||||
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||
syncInfo.pFsm = createFsm();
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
|
||||
|
|
|
@ -83,9 +83,8 @@ void initFsm() {
|
|||
|
||||
SSyncNode *syncNodeInit() {
|
||||
syncInfo.vgId = 1234;
|
||||
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||
syncInfo.msgcb = &gSyncIO->msgcb;
|
||||
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||
syncInfo.queue = gSyncIO->pMsgQ;
|
||||
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||
syncInfo.pFsm = pFsm;
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir);
|
||||
|
@ -200,7 +199,7 @@ int main(int argc, char **argv) {
|
|||
SyncClientRequest *pSyncClientRequest = pMsg1;
|
||||
SRpcMsg rpcMsg;
|
||||
syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg);
|
||||
gSyncNode->FpEqMsg(gSyncNode->queue, &rpcMsg);
|
||||
gSyncNode->FpEqMsg(gSyncNode->msgcb, &rpcMsg);
|
||||
|
||||
taosMsleep(1000);
|
||||
}
|
||||
|
|
|
@ -27,9 +27,8 @@ SSyncNode* pSyncNode;
|
|||
|
||||
SSyncNode* syncNodeInit() {
|
||||
syncInfo.vgId = 1234;
|
||||
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||
syncInfo.msgcb = &gSyncIO->msgcb;
|
||||
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||
syncInfo.queue = gSyncIO->pMsgQ;
|
||||
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||
syncInfo.pFsm = pFsm;
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
|
||||
|
|
|
@ -27,9 +27,8 @@ SSyncNode* pSyncNode;
|
|||
|
||||
SSyncNode* syncNodeInit() {
|
||||
syncInfo.vgId = 1234;
|
||||
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||
syncInfo.msgcb = &gSyncIO->msgcb;
|
||||
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||
syncInfo.queue = gSyncIO->pMsgQ;
|
||||
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||
syncInfo.pFsm = pFsm;
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./");
|
||||
|
|
|
@ -62,9 +62,8 @@ void initFsm() {
|
|||
|
||||
SSyncNode *syncNodeInit() {
|
||||
syncInfo.vgId = 1234;
|
||||
syncInfo.rpcClient = gSyncIO->clientRpc;
|
||||
syncInfo.msgcb = &gSyncIO->msgcb;
|
||||
syncInfo.FpSendMsg = syncIOSendMsg;
|
||||
syncInfo.queue = gSyncIO->pMsgQ;
|
||||
syncInfo.FpEqMsg = syncIOEqMsg;
|
||||
syncInfo.pFsm = pFsm;
|
||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir);
|
||||
|
@ -178,7 +177,7 @@ int main(int argc, char **argv) {
|
|||
SyncClientRequest *pSyncClientRequest = pMsg1;
|
||||
SRpcMsg rpcMsg;
|
||||
syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg);
|
||||
gSyncNode->FpEqMsg(gSyncNode->queue, &rpcMsg);
|
||||
gSyncNode->FpEqMsg(gSyncNode->msgcb, &rpcMsg);
|
||||
|
||||
taosMsleep(1000);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue