sync index

This commit is contained in:
Minghao Li 2022-03-14 14:05:40 +08:00
parent 51ceed983f
commit 1a78917767
7 changed files with 72 additions and 29 deletions

View File

@ -27,6 +27,9 @@ extern "C" {
#include "syncRaftEntry.h" #include "syncRaftEntry.h"
#include "taosdef.h" #include "taosdef.h"
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
typedef struct SSyncLogStoreData { typedef struct SSyncLogStoreData {
SSyncNode* pSyncNode; SSyncNode* pSyncNode;
SWal* pWal; SWal* pWal;

View File

@ -52,6 +52,8 @@ const char* syncUtilState2String(ESyncState state);
bool syncUtilCanPrint(char c); bool syncUtilCanPrint(char c);
char* syncUtilprintBin(char* ptr, uint32_t len); char* syncUtilprintBin(char* ptr, uint32_t len);
char* syncUtilprintBin2(char* ptr, uint32_t len); char* syncUtilprintBin2(char* ptr, uint32_t len);
SyncIndex syncUtilMinIndex(SyncIndex a, SyncIndex b);
SyncIndex syncUtilMaxIndex(SyncIndex a, SyncIndex b);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -327,33 +327,6 @@ char* syncNode2Str(const SSyncNode* pSyncNode) {
return serialized; return serialized;
} }
// for debug --------------
void syncNodePrint(SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
printf("syncNodePrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
void syncNodePrint2(char* s, SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
printf("syncNodePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
void syncNodeLog(SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
sTrace("syncNodeLog | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void syncNodeLog2(char* s, SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
sTrace("syncNodeLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
}
int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
SEpSet epSet; SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet); syncUtilraftId2EpSet(destRaftId, &epSet);
@ -491,6 +464,33 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
return 0; return 0;
} }
// for debug --------------
void syncNodePrint(SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
printf("syncNodePrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
void syncNodePrint2(char* s, SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
printf("syncNodePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
void syncNodeLog(SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
sTrace("syncNodeLog | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void syncNodeLog2(char* s, SSyncNode* pObj) {
char* serialized = syncNode2Str(pObj);
sTrace("syncNodeLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
}
// ------ local funciton --------- // ------ local funciton ---------
static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
int32_t ret = 0; int32_t ret = 0;

View File

@ -130,7 +130,7 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) {
cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal); snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal);
cJSON_AddStringToObject(pRoot, "pWal", u64buf); cJSON_AddStringToObject(pRoot, "pWal", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastIndex(pLogStore)); snprintf(u64buf, sizeof(u64buf), "%ld", logStoreLastIndex(pLogStore));
cJSON_AddStringToObject(pRoot, "LastIndex", u64buf); cJSON_AddStringToObject(pRoot, "LastIndex", u64buf);
snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore)); snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore));
cJSON_AddStringToObject(pRoot, "LastTerm", u64buf); cJSON_AddStringToObject(pRoot, "LastTerm", u64buf);

View File

@ -14,7 +14,9 @@
*/ */
#include "syncReplication.h" #include "syncReplication.h"
#include "syncIndexMgr.h"
#include "syncMessage.h" #include "syncMessage.h"
#include "syncRaftEntry.h"
// TLA+ Spec // TLA+ Spec
// AppendEntries(i, j) == // AppendEntries(i, j) ==
@ -42,7 +44,24 @@
// /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>> // /\ UNCHANGED <<serverVars, candidateVars, leaderVars, logVars>>
// //
int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
int32_t ret = 0; int32_t ret = 0;
for (int i = 0; i < pSyncNode->peersNum; ++i) {
SRaftId* pDestId = &(pSyncNode->peersId[i]);
SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
SyncIndex preLogIndex = nextIndex - 1;
SyncTerm preLogTerm = 0;
if (preLogIndex >= 0) {
SSyncRaftEntry* pPreEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, preLogIndex);
preLogTerm = pPreEntry->term;
} else {
preLogTerm = 0;
}
// SyncTerm lastIndex =
// pSyncNode->pLogStore->getLastIndex < nextIndex ? pSyncNode->pLogStore->getLastIndex : nextIndex;
}
return ret; return ret;
} }

View File

@ -185,3 +185,13 @@ char* syncUtilprintBin2(char* ptr, uint32_t len) {
} }
return s; return s;
} }
SyncIndex syncUtilMinIndex(SyncIndex a, SyncIndex b) {
SyncIndex r = a < b ? a : b;
return r;
}
SyncIndex syncUtilMaxIndex(SyncIndex a, SyncIndex b) {
SyncIndex r = a > b ? a : b;
return r;
}

View File

@ -81,7 +81,10 @@ SSyncNode* syncNodeInit() {
SSyncNode* syncInitTest() { return syncNodeInit(); } SSyncNode* syncInitTest() { return syncNodeInit(); }
void logStoreTest() { void logStoreTest() {
logStorePrint(pSyncNode->pLogStore); logStorePrint2((char*)"logStoreTest2", pSyncNode->pLogStore);
assert(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) == SYNC_INDEX_INVALID);
for (int i = 0; i < 5; ++i) { for (int i = 0; i < 5; ++i) {
int32_t dataLen = 10; int32_t dataLen = 10;
SSyncRaftEntry* pEntry = syncEntryBuild(dataLen); SSyncRaftEntry* pEntry = syncEntryBuild(dataLen);
@ -97,6 +100,10 @@ void logStoreTest() {
// syncEntryPrint2((char*)"write entry:", pEntry); // syncEntryPrint2((char*)"write entry:", pEntry);
pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry); pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry);
syncEntryDestory(pEntry); syncEntryDestory(pEntry);
if (i == 0) {
assert(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) == SYNC_INDEX_BEGIN);
}
} }
logStorePrint(pSyncNode->pLogStore); logStorePrint(pSyncNode->pLogStore);
@ -129,6 +136,8 @@ int main(int argc, char** argv) {
ret = syncEnvStart(); ret = syncEnvStart();
assert(ret == 0); assert(ret == 0);
taosRemoveDir("./wal_test");
pSyncNode = syncInitTest(); pSyncNode = syncInitTest();
assert(pSyncNode != NULL); assert(pSyncNode != NULL);