sync integration
This commit is contained in:
parent
f7fb9b023f
commit
e0332e9c12
|
@ -20,7 +20,7 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int32_t vnodeSyncOpen(SVnode *pVnode);
|
int32_t vnodeSyncOpen(SVnode *pVnode, char *path);
|
||||||
int32_t vnodeSyncStart(SVnode *pVnode);
|
int32_t vnodeSyncStart(SVnode *pVnode);
|
||||||
void vnodeSyncClose(SVnode *pVnode);
|
void vnodeSyncClose(SVnode *pVnode);
|
||||||
|
|
||||||
|
|
|
@ -82,6 +82,18 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
|
||||||
if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
|
||||||
if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
|
if (tjsonAddIntegerToObject(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
|
||||||
|
|
||||||
|
// sync integration
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1;
|
||||||
|
if (tjsonAddIntegerToObject(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1;
|
||||||
|
SJson *pNodeInfoArr = tjsonCreateArray();
|
||||||
|
tjsonAddItemToObject(pJson, "syncCfg.nodeInfo", pNodeInfoArr);
|
||||||
|
for (int i = 0; i < pCfg->syncCfg.replicaNum; ++i) {
|
||||||
|
SJson *pNodeInfo = tjsonCreateObject();
|
||||||
|
tjsonAddIntegerToObject(pNodeInfo, "nodePort", (pCfg->syncCfg.nodeInfo)[i].nodePort);
|
||||||
|
tjsonAddStringToObject(pNodeInfo, "nodeFqdn", (pCfg->syncCfg.nodeInfo)[i].nodeFqdn);
|
||||||
|
tjsonAddItemToArray(pNodeInfoArr, pNodeInfo);
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -120,6 +132,21 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
|
||||||
if (tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
|
if (tjsonGetNumberValue(pJson, "hashEnd", pCfg->hashEnd) < 0) return -1;
|
||||||
if (tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
|
if (tjsonGetNumberValue(pJson, "hashMethod", pCfg->hashMethod) < 0) return -1;
|
||||||
|
|
||||||
|
// sync integration
|
||||||
|
if (tjsonGetNumberValue(pJson, "syncCfg.replicaNum", pCfg->syncCfg.replicaNum) < 0) return -1;
|
||||||
|
if (tjsonGetNumberValue(pJson, "syncCfg.myIndex", pCfg->syncCfg.myIndex) < 0) return -1;
|
||||||
|
|
||||||
|
SJson *pNodeInfoArr = tjsonGetObjectItem(pJson, "syncCfg.nodeInfo");
|
||||||
|
int arraySize = tjsonGetArraySize(pNodeInfoArr);
|
||||||
|
assert(arraySize == pCfg->syncCfg.replicaNum);
|
||||||
|
|
||||||
|
for (int i = 0; i < arraySize; ++i) {
|
||||||
|
cJSON *pNodeInfo = tjsonGetArrayItem(pNodeInfoArr, i);
|
||||||
|
assert(pNodeInfo != NULL);
|
||||||
|
tjsonGetNumberValue(pNodeInfo, "nodePort", (pCfg->syncCfg.nodeInfo)[i].nodePort);
|
||||||
|
tjsonGetStringValue(pNodeInfo, "nodeFqdn", (pCfg->syncCfg.nodeInfo)[i].nodeFqdn);
|
||||||
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -126,7 +126,7 @@ SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb) {
|
||||||
|
|
||||||
// sync integration
|
// sync integration
|
||||||
// open sync
|
// open sync
|
||||||
if (vnodeSyncOpen(pVnode)) {
|
if (vnodeSyncOpen(pVnode, dir)) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
|
|
||||||
// sync integration
|
// sync integration
|
||||||
|
|
||||||
int32_t vnodeSyncOpen(SVnode *pVnode) {
|
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
||||||
SSyncInfo syncInfo;
|
SSyncInfo syncInfo;
|
||||||
syncInfo.vgId = pVnode->config.vgId;
|
syncInfo.vgId = pVnode->config.vgId;
|
||||||
SSyncCfg *pCfg = &(syncInfo.syncCfg);
|
SSyncCfg *pCfg = &(syncInfo.syncCfg);
|
||||||
|
@ -28,7 +28,7 @@ int32_t vnodeSyncOpen(SVnode *pVnode) {
|
||||||
pCfg->myIndex = pVnode->config.syncCfg.myIndex;
|
pCfg->myIndex = pVnode->config.syncCfg.myIndex;
|
||||||
memcpy(pCfg->nodeInfo, pVnode->config.syncCfg.nodeInfo, sizeof(pCfg->nodeInfo));
|
memcpy(pCfg->nodeInfo, pVnode->config.syncCfg.nodeInfo, sizeof(pCfg->nodeInfo));
|
||||||
|
|
||||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", pVnode->path);
|
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s/sync", path);
|
||||||
syncInfo.pWal = pVnode->pWal;
|
syncInfo.pWal = pVnode->pWal;
|
||||||
|
|
||||||
syncInfo.pFsm = syncVnodeMakeFsm(pVnode);
|
syncInfo.pFsm = syncVnodeMakeFsm(pVnode);
|
||||||
|
|
Loading…
Reference in New Issue