refactor(sync): syncUtilJson2Line
This commit is contained in:
parent
2547e8d02e
commit
77cd6f44f4
|
@ -41,8 +41,8 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr);
|
||||||
void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr);
|
void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr);
|
||||||
void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index);
|
void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index);
|
||||||
SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
|
SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
|
||||||
cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr);
|
cJSON * syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr);
|
||||||
char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr);
|
char * syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr);
|
||||||
|
|
||||||
// void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term);
|
// void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term);
|
||||||
// SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
|
// SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
|
||||||
|
|
|
@ -61,6 +61,7 @@ bool syncUtilIsData(tmsg_t msgType);
|
||||||
bool syncUtilUserPreCommit(tmsg_t msgType);
|
bool syncUtilUserPreCommit(tmsg_t msgType);
|
||||||
bool syncUtilUserCommit(tmsg_t msgType);
|
bool syncUtilUserCommit(tmsg_t msgType);
|
||||||
bool syncUtilUserRollback(tmsg_t msgType);
|
bool syncUtilUserRollback(tmsg_t msgType);
|
||||||
|
void syncUtilJson2Line(char* jsonStr);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -296,7 +296,7 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
|
||||||
|
|
||||||
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
|
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
|
||||||
cJSON *pJson = snapshotSender2Json(pSender);
|
cJSON *pJson = snapshotSender2Json(pSender);
|
||||||
char *serialized = cJSON_Print(pJson);
|
char * serialized = cJSON_Print(pJson);
|
||||||
cJSON_Delete(pJson);
|
cJSON_Delete(pJson);
|
||||||
return serialized;
|
return serialized;
|
||||||
}
|
}
|
||||||
|
@ -416,7 +416,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
|
||||||
|
|
||||||
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
|
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
|
||||||
cJSON *pJson = snapshotReceiver2Json(pReceiver);
|
cJSON *pJson = snapshotReceiver2Json(pReceiver);
|
||||||
char *serialized = cJSON_Print(pJson);
|
char * serialized = cJSON_Print(pJson);
|
||||||
cJSON_Delete(pJson);
|
cJSON_Delete(pJson);
|
||||||
return serialized;
|
return serialized;
|
||||||
}
|
}
|
||||||
|
@ -438,7 +438,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
needRsp = true;
|
needRsp = true;
|
||||||
|
|
||||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||||
sTrace("snapshot recv begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
|
sTrace("snapshot recv begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack, pMsg->lastIndex,
|
||||||
|
pMsg->lastTerm, msgStr);
|
||||||
taosMemoryFree(msgStr);
|
taosMemoryFree(msgStr);
|
||||||
|
|
||||||
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
|
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
|
||||||
|
@ -447,10 +448,13 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
|
pSyncNode->pFsm->FpSnapshotStopWrite(pSyncNode->pFsm, pReceiver->pWriter, true);
|
||||||
|
|
||||||
pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
|
pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, pMsg->lastIndex + 1);
|
||||||
char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
|
char * logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
||||||
sInfo("snapshot recv finish, update log begin index:%ld, snapshot.lastApplyIndex:%ld, snapshot.lastApplyTerm:%lu, raft log:%s", pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logSimpleStr);
|
sInfo(
|
||||||
|
"snapshot recv finish, update log begin index:%ld, snapshot.lastApplyIndex:%ld, "
|
||||||
|
"snapshot.lastApplyTerm:%lu, raft log:%s",
|
||||||
|
pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logSimpleStr);
|
||||||
taosMemoryFree(logSimpleStr);
|
taosMemoryFree(logSimpleStr);
|
||||||
|
|
||||||
// walRestoreFromSnapshot(pSyncNode->pWal, pMsg->lastIndex);
|
// walRestoreFromSnapshot(pSyncNode->pWal, pMsg->lastIndex);
|
||||||
|
@ -462,7 +466,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
needRsp = true;
|
needRsp = true;
|
||||||
|
|
||||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||||
sTrace("snapshot recv end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
|
sTrace("snapshot recv end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack, pMsg->lastIndex,
|
||||||
|
pMsg->lastTerm, msgStr);
|
||||||
taosMemoryFree(msgStr);
|
taosMemoryFree(msgStr);
|
||||||
|
|
||||||
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
|
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
|
||||||
|
@ -471,9 +476,9 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
needRsp = false;
|
needRsp = false;
|
||||||
|
|
||||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||||
sTrace("snapshot recv force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
|
sTrace("snapshot recv force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack,
|
||||||
|
pMsg->lastIndex, pMsg->lastTerm, msgStr);
|
||||||
|
|
||||||
|
|
||||||
taosMemoryFree(msgStr);
|
taosMemoryFree(msgStr);
|
||||||
|
|
||||||
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
|
} else if (pMsg->seq > SYNC_SNAPSHOT_SEQ_BEGIN && pMsg->seq < SYNC_SNAPSHOT_SEQ_END) {
|
||||||
|
@ -485,7 +490,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
needRsp = true;
|
needRsp = true;
|
||||||
|
|
||||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||||
sTrace("snapshot recv receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
|
sTrace("snapshot recv receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack,
|
||||||
|
pMsg->lastIndex, pMsg->lastTerm, msgStr);
|
||||||
taosMemoryFree(msgStr);
|
taosMemoryFree(msgStr);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -240,4 +240,26 @@ bool syncUtilUserRollback(tmsg_t msgType) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncUtilJson2Line(char* jsonStr) {
|
||||||
|
int p, q, len;
|
||||||
|
p = 0;
|
||||||
|
q = 1;
|
||||||
|
len = strlen(jsonStr);
|
||||||
|
while (1) {
|
||||||
|
if (jsonStr[q] == '\0') {
|
||||||
|
jsonStr[p + 1] = '\0';
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (jsonStr[q] == '\n' || jsonStr[q] == ' ' || jsonStr[q] == '\t') {
|
||||||
|
q++;
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
jsonStr[p + 1] = jsonStr[q];
|
||||||
|
p++;
|
||||||
|
q++;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -66,9 +66,9 @@ int main(int argc, char** argv) {
|
||||||
syncIndexMgrSetIndex(pSyncIndexMgr, &ids[0], 100);
|
syncIndexMgrSetIndex(pSyncIndexMgr, &ids[0], 100);
|
||||||
syncIndexMgrSetIndex(pSyncIndexMgr, &ids[1], 200);
|
syncIndexMgrSetIndex(pSyncIndexMgr, &ids[1], 200);
|
||||||
syncIndexMgrSetIndex(pSyncIndexMgr, &ids[2], 300);
|
syncIndexMgrSetIndex(pSyncIndexMgr, &ids[2], 300);
|
||||||
//syncIndexMgrSetTerm(pSyncIndexMgr, &ids[0], 700);
|
// syncIndexMgrSetTerm(pSyncIndexMgr, &ids[0], 700);
|
||||||
//syncIndexMgrSetTerm(pSyncIndexMgr, &ids[1], 800);
|
// syncIndexMgrSetTerm(pSyncIndexMgr, &ids[1], 800);
|
||||||
//syncIndexMgrSetTerm(pSyncIndexMgr, &ids[2], 900);
|
// syncIndexMgrSetTerm(pSyncIndexMgr, &ids[2], 900);
|
||||||
{
|
{
|
||||||
char* serialized = syncIndexMgr2Str(pSyncIndexMgr);
|
char* serialized = syncIndexMgr2Str(pSyncIndexMgr);
|
||||||
assert(serialized != NULL);
|
assert(serialized != NULL);
|
||||||
|
@ -80,8 +80,8 @@ int main(int argc, char** argv) {
|
||||||
printf("---------------------------------------\n");
|
printf("---------------------------------------\n");
|
||||||
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
|
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
|
||||||
SyncIndex idx = syncIndexMgrGetIndex(pSyncIndexMgr, &ids[i]);
|
SyncIndex idx = syncIndexMgrGetIndex(pSyncIndexMgr, &ids[i]);
|
||||||
//SyncTerm term = syncIndexMgrGetTerm(pSyncIndexMgr, &ids[i]);
|
// SyncTerm term = syncIndexMgrGetTerm(pSyncIndexMgr, &ids[i]);
|
||||||
//printf("%d: index:%ld term:%lu \n", i, idx, term);
|
// printf("%d: index:%ld term:%lu \n", i, idx, term);
|
||||||
}
|
}
|
||||||
printf("---------------------------------------\n");
|
printf("---------------------------------------\n");
|
||||||
|
|
||||||
|
|
|
@ -78,6 +78,26 @@ void test5() {
|
||||||
syncTimeoutDestroy(pMsg2);
|
syncTimeoutDestroy(pMsg2);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void test6() {
|
||||||
|
SyncTimeout *pMsg = createMsg();
|
||||||
|
char * jsonStr = syncTimeout2Str(pMsg);
|
||||||
|
sTrace("jsonStr: %s", jsonStr);
|
||||||
|
|
||||||
|
syncUtilJson2Line(jsonStr);
|
||||||
|
sTrace("jsonStr: %s", jsonStr);
|
||||||
|
|
||||||
|
char str[10];
|
||||||
|
snprintf(str, sizeof(str), "%s", "{}");
|
||||||
|
sTrace("str: %s", str);
|
||||||
|
syncUtilJson2Line(str);
|
||||||
|
sTrace("str: %s", str);
|
||||||
|
|
||||||
|
snprintf(str, sizeof(str), "%s", "");
|
||||||
|
sTrace("str: %s", str);
|
||||||
|
syncUtilJson2Line(str);
|
||||||
|
sTrace("str: %s", str);
|
||||||
|
}
|
||||||
|
|
||||||
int main() {
|
int main() {
|
||||||
tsAsyncLog = 0;
|
tsAsyncLog = 0;
|
||||||
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
|
sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
|
||||||
|
@ -88,6 +108,7 @@ int main() {
|
||||||
test3();
|
test3();
|
||||||
test4();
|
test4();
|
||||||
test5();
|
test5();
|
||||||
|
test6();
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue