Merge pull request #24542 from taosdata/fix/refactorRetry1
refactor retry
This commit is contained in:
commit
86272dee4e
|
@ -47,10 +47,11 @@ typedef struct SCorEpSet {
|
||||||
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp);
|
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp);
|
||||||
void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);
|
void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port);
|
||||||
|
|
||||||
bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2);
|
bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2);
|
||||||
void epsetAssign(SEpSet* dst, const SEpSet* pSrc);
|
void epsetAssign(SEpSet* dst, const SEpSet* pSrc);
|
||||||
void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet);
|
void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet);
|
||||||
SEpSet getEpSet_s(SCorEpSet* pEpSet);
|
SEpSet getEpSet_s(SCorEpSet* pEpSet);
|
||||||
|
void epsetSort(SEpSet* pEpSet);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -15,11 +15,8 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "tmisce.h"
|
#include "tmisce.h"
|
||||||
#include "tjson.h"
|
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
#include "tlog.h"
|
#include "tjson.h"
|
||||||
#include "tname.h"
|
|
||||||
|
|
||||||
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp) {
|
int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp) {
|
||||||
pEp->port = 0;
|
pEp->port = 0;
|
||||||
memset(pEp->fqdn, 0, TSDB_FQDN_LEN);
|
memset(pEp->fqdn, 0, TSDB_FQDN_LEN);
|
||||||
|
@ -63,7 +60,7 @@ bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2) {
|
||||||
|
|
||||||
void epsetAssign(SEpSet* pDst, const SEpSet* pSrc) {
|
void epsetAssign(SEpSet* pDst, const SEpSet* pSrc) {
|
||||||
if (pSrc == NULL || pDst == NULL) {
|
if (pSrc == NULL || pDst == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pDst->inUse = pSrc->inUse;
|
pDst->inUse = pSrc->inUse;
|
||||||
|
@ -73,6 +70,47 @@ void epsetAssign(SEpSet* pDst, const SEpSet* pSrc) {
|
||||||
tstrncpy(pDst->eps[i].fqdn, pSrc->eps[i].fqdn, tListLen(pSrc->eps[i].fqdn));
|
tstrncpy(pDst->eps[i].fqdn, pSrc->eps[i].fqdn, tListLen(pSrc->eps[i].fqdn));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
void epAssign(SEp* pDst, SEp* pSrc) {
|
||||||
|
if (pSrc == NULL || pDst == NULL) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
memset(pDst->fqdn, 0, tListLen(pSrc->fqdn));
|
||||||
|
tstrncpy(pDst->fqdn, pSrc->fqdn, tListLen(pSrc->fqdn));
|
||||||
|
pDst->port = pSrc->port;
|
||||||
|
}
|
||||||
|
void epsetSort(SEpSet* pDst) {
|
||||||
|
if (pDst->numOfEps <= 1) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
int validIdx = false;
|
||||||
|
SEp ep = {0};
|
||||||
|
if (pDst->inUse >= 0 && pDst->inUse < pDst->numOfEps) {
|
||||||
|
validIdx = true;
|
||||||
|
epAssign(&ep, &pDst->eps[pDst->inUse]);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < pDst->numOfEps - 1; i++) {
|
||||||
|
for (int j = 0; j < pDst->numOfEps - 1 - i; j++) {
|
||||||
|
SEp* f = &pDst->eps[j];
|
||||||
|
SEp* s = &pDst->eps[j + 1];
|
||||||
|
int cmp = strncmp(f->fqdn, s->fqdn, sizeof(f->fqdn));
|
||||||
|
if (cmp > 0 || (cmp == 0 && f->port > s->port)) {
|
||||||
|
SEp ep = {0};
|
||||||
|
epAssign(&ep, f);
|
||||||
|
epAssign(f, s);
|
||||||
|
epAssign(s, &ep);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (validIdx == true)
|
||||||
|
for (int i = 0; i < pDst->numOfEps; i++) {
|
||||||
|
int cmp = strncmp(ep.fqdn, pDst->eps[i].fqdn, sizeof(ep.fqdn));
|
||||||
|
if (cmp == 0 && ep.port == pDst->eps[i].port) {
|
||||||
|
pDst->inUse = i;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet) {
|
void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet) {
|
||||||
taosCorBeginWrite(&pEpSet->version);
|
taosCorBeginWrite(&pEpSet->version);
|
||||||
|
|
|
@ -12,9 +12,10 @@
|
||||||
#include "tcommon.h"
|
#include "tcommon.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "tdef.h"
|
#include "tdef.h"
|
||||||
#include "tvariant.h"
|
#include "tmisce.h"
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
#include "ttokendef.h"
|
#include "ttokendef.h"
|
||||||
|
#include "tvariant.h"
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
//
|
//
|
||||||
|
@ -25,11 +26,10 @@ int main(int argc, char** argv) {
|
||||||
return RUN_ALL_TESTS();
|
return RUN_ALL_TESTS();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
TEST(testCase, toUIntegerEx_test) {
|
TEST(testCase, toUIntegerEx_test) {
|
||||||
uint64_t val = 0;
|
uint64_t val = 0;
|
||||||
|
|
||||||
char* s = "123";
|
char* s = "123";
|
||||||
int32_t ret = toUIntegerEx(s, strlen(s), TK_NK_INTEGER, &val);
|
int32_t ret = toUIntegerEx(s, strlen(s), TK_NK_INTEGER, &val);
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(val, 123);
|
ASSERT_EQ(val, 123);
|
||||||
|
@ -59,7 +59,7 @@ TEST(testCase, toUIntegerEx_test) {
|
||||||
ASSERT_EQ(val, 18699);
|
ASSERT_EQ(val, 18699);
|
||||||
|
|
||||||
s = "-1";
|
s = "-1";
|
||||||
ret = toUIntegerEx(s, strlen(s),TK_NK_INTEGER, &val);
|
ret = toUIntegerEx(s, strlen(s), TK_NK_INTEGER, &val);
|
||||||
ASSERT_EQ(ret, -1);
|
ASSERT_EQ(ret, -1);
|
||||||
|
|
||||||
s = "-0b10010";
|
s = "-0b10010";
|
||||||
|
@ -103,7 +103,7 @@ TEST(testCase, toUIntegerEx_test) {
|
||||||
TEST(testCase, toIntegerEx_test) {
|
TEST(testCase, toIntegerEx_test) {
|
||||||
int64_t val = 0;
|
int64_t val = 0;
|
||||||
|
|
||||||
char* s = "123";
|
char* s = "123";
|
||||||
int32_t ret = toIntegerEx(s, strlen(s), TK_NK_INTEGER, &val);
|
int32_t ret = toIntegerEx(s, strlen(s), TK_NK_INTEGER, &val);
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(val, 123);
|
ASSERT_EQ(val, 123);
|
||||||
|
@ -166,7 +166,7 @@ TEST(testCase, toIntegerEx_test) {
|
||||||
s = "-9223372036854775808";
|
s = "-9223372036854775808";
|
||||||
ret = toIntegerEx(s, strlen(s), TK_NK_INTEGER, &val);
|
ret = toIntegerEx(s, strlen(s), TK_NK_INTEGER, &val);
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(val, -9223372036854775808);
|
// ASSERT_EQ(val, -9223372036854775808);
|
||||||
|
|
||||||
// out of range
|
// out of range
|
||||||
s = "9323372036854775807";
|
s = "9323372036854775807";
|
||||||
|
@ -186,7 +186,7 @@ TEST(testCase, toIntegerEx_test) {
|
||||||
TEST(testCase, toInteger_test) {
|
TEST(testCase, toInteger_test) {
|
||||||
int64_t val = 0;
|
int64_t val = 0;
|
||||||
|
|
||||||
char* s = "123";
|
char* s = "123";
|
||||||
int32_t ret = toInteger(s, strlen(s), 10, &val);
|
int32_t ret = toInteger(s, strlen(s), 10, &val);
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(val, 123);
|
ASSERT_EQ(val, 123);
|
||||||
|
@ -223,10 +223,10 @@ TEST(testCase, toInteger_test) {
|
||||||
s = "-9223372036854775808";
|
s = "-9223372036854775808";
|
||||||
ret = toInteger(s, strlen(s), 10, &val);
|
ret = toInteger(s, strlen(s), 10, &val);
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
ASSERT_EQ(val, -9223372036854775808);
|
// ASSERT_EQ(val, -9223372036854775808);
|
||||||
|
|
||||||
// out of range
|
// out of range
|
||||||
s = "9323372036854775807";
|
s = "9323372036854775807";
|
||||||
ret = toInteger(s, strlen(s), 10, &val);
|
ret = toInteger(s, strlen(s), 10, &val);
|
||||||
ASSERT_EQ(ret, -1);
|
ASSERT_EQ(ret, -1);
|
||||||
|
|
||||||
|
@ -418,9 +418,10 @@ void check_tm(const STm* tm, int32_t y, int32_t mon, int32_t d, int32_t h, int32
|
||||||
ASSERT_EQ(tm->fsec, fsec);
|
ASSERT_EQ(tm->fsec, fsec);
|
||||||
}
|
}
|
||||||
|
|
||||||
void test_timestamp_tm_conversion(int64_t ts, int32_t precision, int32_t y, int32_t mon, int32_t d, int32_t h, int32_t m, int32_t s, int64_t fsec) {
|
void test_timestamp_tm_conversion(int64_t ts, int32_t precision, int32_t y, int32_t mon, int32_t d, int32_t h,
|
||||||
int64_t ts_tmp;
|
int32_t m, int32_t s, int64_t fsec) {
|
||||||
char buf[128] = {0};
|
int64_t ts_tmp;
|
||||||
|
char buf[128] = {0};
|
||||||
struct STm tm;
|
struct STm tm;
|
||||||
taosFormatUtcTime(buf, 128, ts, precision);
|
taosFormatUtcTime(buf, 128, ts, precision);
|
||||||
printf("formated ts of %ld, precision: %d is: %s\n", ts, precision, buf);
|
printf("formated ts of %ld, precision: %d is: %s\n", ts, precision, buf);
|
||||||
|
@ -457,7 +458,7 @@ TEST(timeTest, timestamp2tm) {
|
||||||
test_timestamp_tm_conversion(ts, TSDB_TIME_PRECISION_MILLI, 1970 - 1900, 0 /* mon start from 0*/, 1, 8, 0, 0,
|
test_timestamp_tm_conversion(ts, TSDB_TIME_PRECISION_MILLI, 1970 - 1900, 0 /* mon start from 0*/, 1, 8, 0, 0,
|
||||||
000000000L);
|
000000000L);
|
||||||
|
|
||||||
ts = -62198784343000; // milliseconds before epoch, Friday, January 1, -0001 12:00:00 AM GMT+08:06
|
ts = -62198784343000; // milliseconds before epoch, Friday, January 1, -0001 12:00:00 AM GMT+08:06
|
||||||
test_timestamp_tm_conversion(ts, TSDB_TIME_PRECISION_MILLI, -1 - 1900, 0 /* mon start from 0*/, 1,
|
test_timestamp_tm_conversion(ts, TSDB_TIME_PRECISION_MILLI, -1 - 1900, 0 /* mon start from 0*/, 1,
|
||||||
0 /* hour start from 0*/, 0, 0, 000000000L);
|
0 /* hour start from 0*/, 0, 0, 000000000L);
|
||||||
}
|
}
|
||||||
|
@ -472,7 +473,7 @@ void test_ts2char(int64_t ts, const char* format, int32_t precison, const char*
|
||||||
TEST(timeTest, ts2char) {
|
TEST(timeTest, ts2char) {
|
||||||
osDefaultInit();
|
osDefaultInit();
|
||||||
if (tsTimezone != TdEastZone8) GTEST_SKIP();
|
if (tsTimezone != TdEastZone8) GTEST_SKIP();
|
||||||
int64_t ts;
|
int64_t ts;
|
||||||
const char* format = "YYYY-MM-DD";
|
const char* format = "YYYY-MM-DD";
|
||||||
ts = 0;
|
ts = 0;
|
||||||
test_ts2char(ts, format, TSDB_TIME_PRECISION_MILLI, "1970-01-01");
|
test_ts2char(ts, format, TSDB_TIME_PRECISION_MILLI, "1970-01-01");
|
||||||
|
@ -493,12 +494,13 @@ TEST(timeTest, ts2char) {
|
||||||
"2023-023-23-3-2023-023-23-3-年-OCTOBER -OCT-October -Oct-october "
|
"2023-023-23-3-2023-023-23-3-年-OCTOBER -OCT-October -Oct-october "
|
||||||
"-oct-月-286-13-6-286-13-6-FRIDAY -Friday -friday -日");
|
"-oct-月-286-13-6-286-13-6-FRIDAY -Friday -friday -日");
|
||||||
#endif
|
#endif
|
||||||
ts = 1697182085123L; // Friday, October 13, 2023 3:28:05.123 PM GMT+08:00
|
ts = 1697182085123L; // Friday, October 13, 2023 3:28:05.123 PM GMT+08:00
|
||||||
test_ts2char(ts, "HH24:hh24:HH12:hh12:HH:hh:MI:mi:SS:ss:MS:ms:US:us:NS:ns:PM:AM:pm:am", TSDB_TIME_PRECISION_MILLI,
|
test_ts2char(ts, "HH24:hh24:HH12:hh12:HH:hh:MI:mi:SS:ss:MS:ms:US:us:NS:ns:PM:AM:pm:am", TSDB_TIME_PRECISION_MILLI,
|
||||||
"15:15:03:03:03:03:28:28:05:05:123:123:123000:123000:123000000:123000000:PM:PM:pm:pm");
|
"15:15:03:03:03:03:28:28:05:05:123:123:123000:123000:123000000:123000000:PM:PM:pm:pm");
|
||||||
|
|
||||||
// double quotes normal output
|
// double quotes normal output
|
||||||
test_ts2char(ts, "\\\"HH24:hh24:HH12:hh12:HH:hh:MI:mi:SS:ss:MS:ms:US:us:NS:ns:PM:AM:pm:am\\\"", TSDB_TIME_PRECISION_MILLI,
|
test_ts2char(ts, "\\\"HH24:hh24:HH12:hh12:HH:hh:MI:mi:SS:ss:MS:ms:US:us:NS:ns:PM:AM:pm:am\\\"",
|
||||||
|
TSDB_TIME_PRECISION_MILLI,
|
||||||
"\"15:15:03:03:03:03:28:28:05:05:123:123:123000:123000:123000000:123000000:PM:PM:pm:pm\"");
|
"\"15:15:03:03:03:03:28:28:05:05:123:123:123000:123000:123000000:123000000:PM:PM:pm:pm\"");
|
||||||
test_ts2char(ts, "\\\"HH24:hh24:HH12:hh12:HH:hh:MI:mi:SS:ss:MS:ms:US:us:NS:ns:PM:AM:pm:am", TSDB_TIME_PRECISION_MILLI,
|
test_ts2char(ts, "\\\"HH24:hh24:HH12:hh12:HH:hh:MI:mi:SS:ss:MS:ms:US:us:NS:ns:PM:AM:pm:am", TSDB_TIME_PRECISION_MILLI,
|
||||||
"\"15:15:03:03:03:03:28:28:05:05:123:123:123000:123000:123000000:123000000:PM:PM:pm:pm");
|
"\"15:15:03:03:03:03:28:28:05:05:123:123:123000:123000:123000000:123000000:PM:PM:pm:pm");
|
||||||
|
@ -506,14 +508,18 @@ TEST(timeTest, ts2char) {
|
||||||
test_ts2char(ts, "\"HH24:hh24:HH12:hh12:HH:hh:MI:mi:SS:ss:MS:ms:US:us:NS:ns:PM:AM:pm:am", TSDB_TIME_PRECISION_MILLI,
|
test_ts2char(ts, "\"HH24:hh24:HH12:hh12:HH:hh:MI:mi:SS:ss:MS:ms:US:us:NS:ns:PM:AM:pm:am", TSDB_TIME_PRECISION_MILLI,
|
||||||
"HH24:hh24:HH12:hh12:HH:hh:MI:mi:SS:ss:MS:ms:US:us:NS:ns:PM:AM:pm:am");
|
"HH24:hh24:HH12:hh12:HH:hh:MI:mi:SS:ss:MS:ms:US:us:NS:ns:PM:AM:pm:am");
|
||||||
test_ts2char(ts, "yyyy-mm-dd hh24:mi:ss.nsamaaa", TSDB_TIME_PRECISION_MILLI, "2023-10-13 15:28:05.123000000pmaaa");
|
test_ts2char(ts, "yyyy-mm-dd hh24:mi:ss.nsamaaa", TSDB_TIME_PRECISION_MILLI, "2023-10-13 15:28:05.123000000pmaaa");
|
||||||
test_ts2char(ts, "aaa--yyyy-mm-dd hh24:mi:ss.nsamaaa", TSDB_TIME_PRECISION_MILLI, "aaa--2023-10-13 15:28:05.123000000pmaaa");
|
test_ts2char(ts, "aaa--yyyy-mm-dd hh24:mi:ss.nsamaaa", TSDB_TIME_PRECISION_MILLI,
|
||||||
test_ts2char(ts, "add--yyyy-mm-dd hh24:mi:ss.nsamaaa", TSDB_TIME_PRECISION_MILLI, "a13--2023-10-13 15:28:05.123000000pmaaa");
|
"aaa--2023-10-13 15:28:05.123000000pmaaa");
|
||||||
|
test_ts2char(ts, "add--yyyy-mm-dd hh24:mi:ss.nsamaaa", TSDB_TIME_PRECISION_MILLI,
|
||||||
|
"a13--2023-10-13 15:28:05.123000000pmaaa");
|
||||||
|
|
||||||
ts = 1693946405000;
|
ts = 1693946405000;
|
||||||
test_ts2char(ts, "Day, Month dd, YYYY hh24:mi:ss AM TZH:tzh", TSDB_TIME_PRECISION_MILLI, "Wednesday, September 06, 2023 04:40:05 AM +08:+08");
|
test_ts2char(ts, "Day, Month dd, YYYY hh24:mi:ss AM TZH:tzh", TSDB_TIME_PRECISION_MILLI,
|
||||||
|
"Wednesday, September 06, 2023 04:40:05 AM +08:+08");
|
||||||
|
|
||||||
ts = -62198784343000; // milliseconds before epoch, Friday, January 1, -0001 12:00:00 AM GMT+08:06
|
ts = -62198784343000; // milliseconds before epoch, Friday, January 1, -0001 12:00:00 AM GMT+08:06
|
||||||
test_ts2char(ts, "Day, Month dd, YYYY hh12:mi:ss AM", TSDB_TIME_PRECISION_MILLI, "Friday , January 01, -001 12:00:00 AM");
|
test_ts2char(ts, "Day, Month dd, YYYY hh12:mi:ss AM", TSDB_TIME_PRECISION_MILLI,
|
||||||
|
"Friday , January 01, -001 12:00:00 AM");
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(timeTest, char2ts) {
|
TEST(timeTest, char2ts) {
|
||||||
|
@ -609,7 +615,7 @@ TEST(timeTest, char2ts) {
|
||||||
ASSERT_EQ(-1, TEST_char2ts("yyyyMMdd ", &ts, TSDB_TIME_PRECISION_MICRO, "2100/2/1"));
|
ASSERT_EQ(-1, TEST_char2ts("yyyyMMdd ", &ts, TSDB_TIME_PRECISION_MICRO, "2100/2/1"));
|
||||||
// nothing to be converted to dd
|
// nothing to be converted to dd
|
||||||
ASSERT_EQ(0, TEST_char2ts("yyyyMMdd ", &ts, TSDB_TIME_PRECISION_MICRO, "210012"));
|
ASSERT_EQ(0, TEST_char2ts("yyyyMMdd ", &ts, TSDB_TIME_PRECISION_MICRO, "210012"));
|
||||||
ASSERT_EQ(ts, 4131273600000000LL); // 2100-12-1
|
ASSERT_EQ(ts, 4131273600000000LL); // 2100-12-1
|
||||||
ASSERT_EQ(-1, TEST_char2ts("yyyyMMdd ", &ts, TSDB_TIME_PRECISION_MICRO, "21001"));
|
ASSERT_EQ(-1, TEST_char2ts("yyyyMMdd ", &ts, TSDB_TIME_PRECISION_MICRO, "21001"));
|
||||||
ASSERT_EQ(-1, TEST_char2ts("yyyyMM-dd ", &ts, TSDB_TIME_PRECISION_MICRO, "23a1-1"));
|
ASSERT_EQ(-1, TEST_char2ts("yyyyMM-dd ", &ts, TSDB_TIME_PRECISION_MICRO, "23a1-1"));
|
||||||
|
|
||||||
|
@ -635,8 +641,55 @@ TEST(timeTest, char2ts) {
|
||||||
ASSERT_EQ(0, TEST_char2ts("yyyy年 MM/ddTZH", &ts, TSDB_TIME_PRECISION_MICRO, "1970年 1/1+0"));
|
ASSERT_EQ(0, TEST_char2ts("yyyy年 MM/ddTZH", &ts, TSDB_TIME_PRECISION_MICRO, "1970年 1/1+0"));
|
||||||
ASSERT_EQ(ts, 0);
|
ASSERT_EQ(ts, 0);
|
||||||
ASSERT_EQ(0, TEST_char2ts("yyyy年 a a a MM/ddTZH", &ts, TSDB_TIME_PRECISION_MICRO, "1970年 a a a 1/1+0"));
|
ASSERT_EQ(0, TEST_char2ts("yyyy年 a a a MM/ddTZH", &ts, TSDB_TIME_PRECISION_MICRO, "1970年 a a a 1/1+0"));
|
||||||
ASSERT_EQ(0, TEST_char2ts("yyyy年 a a a a a a a a a a a a a a a MM/ddTZH", &ts, TSDB_TIME_PRECISION_MICRO, "1970年 a "));
|
ASSERT_EQ(0, TEST_char2ts("yyyy年 a a a a a a a a a a a a a a a MM/ddTZH", &ts, TSDB_TIME_PRECISION_MICRO,
|
||||||
|
"1970年 a "));
|
||||||
ASSERT_EQ(-3, TEST_char2ts("yyyy-mm-DDD", &ts, TSDB_TIME_PRECISION_MILLI, "1970-01-001"));
|
ASSERT_EQ(-3, TEST_char2ts("yyyy-mm-DDD", &ts, TSDB_TIME_PRECISION_MILLI, "1970-01-001"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST(timeTest, epSet) {
|
||||||
|
{
|
||||||
|
SEpSet ep = {0};
|
||||||
|
addEpIntoEpSet(&ep, "local", 14);
|
||||||
|
addEpIntoEpSet(&ep, "aocal", 13);
|
||||||
|
addEpIntoEpSet(&ep, "abcal", 12);
|
||||||
|
addEpIntoEpSet(&ep, "abcaleb", 11);
|
||||||
|
epsetSort(&ep);
|
||||||
|
ASSERT_EQ(strcmp(ep.eps[0].fqdn, "abcal"), 0);
|
||||||
|
ASSERT_EQ(ep.eps[0].port, 12);
|
||||||
|
|
||||||
|
ASSERT_EQ(strcmp(ep.eps[1].fqdn, "abcaleb"), 0);
|
||||||
|
ASSERT_EQ(ep.eps[1].port, 11);
|
||||||
|
|
||||||
|
ASSERT_EQ(strcmp(ep.eps[2].fqdn, "aocal"), 0);
|
||||||
|
ASSERT_EQ(ep.eps[2].port, 13);
|
||||||
|
|
||||||
|
ASSERT_EQ(strcmp(ep.eps[3].fqdn, "local"), 0);
|
||||||
|
ASSERT_EQ(ep.eps[3].port, 14);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
SEpSet ep = {0};
|
||||||
|
addEpIntoEpSet(&ep, "local", 14);
|
||||||
|
addEpIntoEpSet(&ep, "local", 13);
|
||||||
|
addEpIntoEpSet(&ep, "local", 12);
|
||||||
|
addEpIntoEpSet(&ep, "local", 11);
|
||||||
|
epsetSort(&ep);
|
||||||
|
ASSERT_EQ(strcmp(ep.eps[0].fqdn, "local"), 0);
|
||||||
|
ASSERT_EQ(ep.eps[0].port, 11);
|
||||||
|
|
||||||
|
ASSERT_EQ(strcmp(ep.eps[0].fqdn, "local"), 0);
|
||||||
|
ASSERT_EQ(ep.eps[1].port, 12);
|
||||||
|
|
||||||
|
ASSERT_EQ(strcmp(ep.eps[0].fqdn, "local"), 0);
|
||||||
|
ASSERT_EQ(ep.eps[2].port, 13);
|
||||||
|
|
||||||
|
ASSERT_EQ(strcmp(ep.eps[0].fqdn, "local"), 0);
|
||||||
|
ASSERT_EQ(ep.eps[3].port, 14);
|
||||||
|
}
|
||||||
|
{
|
||||||
|
SEpSet ep = {0};
|
||||||
|
addEpIntoEpSet(&ep, "local", 14);
|
||||||
|
epsetSort(&ep);
|
||||||
|
ASSERT_EQ(ep.numOfEps, 1);
|
||||||
|
}
|
||||||
|
}
|
||||||
#pragma GCC diagnostic pop
|
#pragma GCC diagnostic pop
|
||||||
|
|
|
@ -223,7 +223,7 @@ int32_t dmWriteEps(SDnodeData *pData) {
|
||||||
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
if((code == dmInitDndInfo(pData)) != 0) goto _OVER;
|
if ((code == dmInitDndInfo(pData)) != 0) goto _OVER;
|
||||||
pJson = tjsonCreateObject();
|
pJson = tjsonCreateObject();
|
||||||
if (pJson == NULL) goto _OVER;
|
if (pJson == NULL) goto _OVER;
|
||||||
pData->engineVer = tsVersion;
|
pData->engineVer = tsVersion;
|
||||||
|
@ -289,6 +289,7 @@ static void dmResetEps(SDnodeData *pData, SArray *dnodeEps) {
|
||||||
pData->mnodeEps.eps[mIndex] = pDnodeEp->ep;
|
pData->mnodeEps.eps[mIndex] = pDnodeEp->ep;
|
||||||
mIndex++;
|
mIndex++;
|
||||||
}
|
}
|
||||||
|
epsetSort(&pData->mnodeEps);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfEps; i++) {
|
for (int32_t i = 0; i < numOfEps; i++) {
|
||||||
SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i);
|
SDnodeEp *pDnodeEp = taosArrayGet(dnodeEps, i);
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "mndMnode.h"
|
#include "mndMnode.h"
|
||||||
|
#include "audit.h"
|
||||||
#include "mndCluster.h"
|
#include "mndCluster.h"
|
||||||
#include "mndDnode.h"
|
#include "mndDnode.h"
|
||||||
#include "mndPrivilege.h"
|
#include "mndPrivilege.h"
|
||||||
|
@ -22,7 +23,6 @@
|
||||||
#include "mndSync.h"
|
#include "mndSync.h"
|
||||||
#include "mndTrans.h"
|
#include "mndTrans.h"
|
||||||
#include "tmisce.h"
|
#include "tmisce.h"
|
||||||
#include "audit.h"
|
|
||||||
|
|
||||||
#define MNODE_VER_NUMBER 2
|
#define MNODE_VER_NUMBER 2
|
||||||
#define MNODE_RESERVE_SIZE 64
|
#define MNODE_RESERVE_SIZE 64
|
||||||
|
@ -168,7 +168,7 @@ static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER)
|
SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER)
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
|
SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
|
||||||
if(sver >=2){
|
if (sver >= 2) {
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pObj->role, _OVER)
|
SDB_GET_INT32(pRaw, dataPos, &pObj->role, _OVER)
|
||||||
SDB_GET_INT64(pRaw, dataPos, &pObj->lastIndex, _OVER)
|
SDB_GET_INT64(pRaw, dataPos, &pObj->lastIndex, _OVER)
|
||||||
}
|
}
|
||||||
|
@ -251,6 +251,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
|
||||||
pEpSet->inUse = pEpSet->numOfEps;
|
pEpSet->inUse = pEpSet->numOfEps;
|
||||||
} else {
|
} else {
|
||||||
pEpSet->inUse = (pEpSet->numOfEps + 1) % totalMnodes;
|
pEpSet->inUse = (pEpSet->numOfEps + 1) % totalMnodes;
|
||||||
|
// pEpSet->inUse = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (pObj->pDnode != NULL) {
|
if (pObj->pDnode != NULL) {
|
||||||
|
@ -266,6 +267,7 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
|
||||||
if (pEpSet->inUse >= pEpSet->numOfEps) {
|
if (pEpSet->inUse >= pEpSet->numOfEps) {
|
||||||
pEpSet->inUse = 0;
|
pEpSet->inUse = 0;
|
||||||
}
|
}
|
||||||
|
epsetSort(pEpSet);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) {
|
||||||
|
@ -320,8 +322,8 @@ static int32_t mndBuildCreateMnodeRedoAction(STrans *pTrans, SDCreateMnodeReq *p
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans,
|
static int32_t mndBuildAlterMnodeTypeRedoAction(STrans *pTrans, SDAlterMnodeTypeReq *pAlterMnodeTypeReq,
|
||||||
SDAlterMnodeTypeReq *pAlterMnodeTypeReq, SEpSet *pAlterMnodeTypeEpSet) {
|
SEpSet *pAlterMnodeTypeEpSet) {
|
||||||
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterMnodeTypeReq);
|
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, pAlterMnodeTypeReq);
|
||||||
void *pReq = taosMemoryMalloc(contLen);
|
void *pReq = taosMemoryMalloc(contLen);
|
||||||
tSerializeSDCreateMnodeReq(pReq, contLen, pAlterMnodeTypeReq);
|
tSerializeSDCreateMnodeReq(pReq, contLen, pAlterMnodeTypeReq);
|
||||||
|
@ -396,13 +398,12 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
|
||||||
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
|
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
if(pMObj->role == TAOS_SYNC_ROLE_VOTER){
|
if (pMObj->role == TAOS_SYNC_ROLE_VOTER) {
|
||||||
createReq.replicas[numOfReplicas].id = pMObj->id;
|
createReq.replicas[numOfReplicas].id = pMObj->id;
|
||||||
createReq.replicas[numOfReplicas].port = pMObj->pDnode->port;
|
createReq.replicas[numOfReplicas].port = pMObj->pDnode->port;
|
||||||
memcpy(createReq.replicas[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
memcpy(createReq.replicas[numOfReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
||||||
numOfReplicas++;
|
numOfReplicas++;
|
||||||
}
|
} else {
|
||||||
else{
|
|
||||||
createReq.learnerReplicas[numOfLearnerReplicas].id = pMObj->id;
|
createReq.learnerReplicas[numOfLearnerReplicas].id = pMObj->id;
|
||||||
createReq.learnerReplicas[numOfLearnerReplicas].port = pMObj->pDnode->port;
|
createReq.learnerReplicas[numOfLearnerReplicas].port = pMObj->pDnode->port;
|
||||||
memcpy(createReq.learnerReplicas[numOfLearnerReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
memcpy(createReq.learnerReplicas[numOfLearnerReplicas].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
||||||
|
@ -441,18 +442,17 @@ int32_t mndSetRestoreCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
|
||||||
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
|
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
if(pMObj->id == pDnode->id) {
|
if (pMObj->id == pDnode->id) {
|
||||||
sdbRelease(pSdb, pMObj);
|
sdbRelease(pSdb, pMObj);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(pMObj->role == TAOS_SYNC_ROLE_VOTER){
|
if (pMObj->role == TAOS_SYNC_ROLE_VOTER) {
|
||||||
createReq.replicas[createReq.replica].id = pMObj->id;
|
createReq.replicas[createReq.replica].id = pMObj->id;
|
||||||
createReq.replicas[createReq.replica].port = pMObj->pDnode->port;
|
createReq.replicas[createReq.replica].port = pMObj->pDnode->port;
|
||||||
memcpy(createReq.replicas[createReq.replica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
memcpy(createReq.replicas[createReq.replica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
||||||
createReq.replica++;
|
createReq.replica++;
|
||||||
}
|
} else {
|
||||||
else{
|
|
||||||
createReq.learnerReplicas[createReq.learnerReplica].id = pMObj->id;
|
createReq.learnerReplicas[createReq.learnerReplica].id = pMObj->id;
|
||||||
createReq.learnerReplicas[createReq.learnerReplica].port = pMObj->pDnode->port;
|
createReq.learnerReplicas[createReq.learnerReplica].port = pMObj->pDnode->port;
|
||||||
memcpy(createReq.learnerReplicas[createReq.learnerReplica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
memcpy(createReq.learnerReplicas[createReq.learnerReplica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
||||||
|
@ -480,23 +480,22 @@ int32_t mndSetRestoreCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndSetAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
|
static int32_t mndSetAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
SDAlterMnodeTypeReq alterReq = {0};
|
SDAlterMnodeTypeReq alterReq = {0};
|
||||||
SEpSet createEpset = {0};
|
SEpSet createEpset = {0};
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SMnodeObj *pMObj = NULL;
|
SMnodeObj *pMObj = NULL;
|
||||||
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
|
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
if(pMObj->role == TAOS_SYNC_ROLE_VOTER){
|
if (pMObj->role == TAOS_SYNC_ROLE_VOTER) {
|
||||||
alterReq.replicas[alterReq.replica].id = pMObj->id;
|
alterReq.replicas[alterReq.replica].id = pMObj->id;
|
||||||
alterReq.replicas[alterReq.replica].port = pMObj->pDnode->port;
|
alterReq.replicas[alterReq.replica].port = pMObj->pDnode->port;
|
||||||
memcpy(alterReq.replicas[alterReq.replica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
memcpy(alterReq.replicas[alterReq.replica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
||||||
alterReq.replica++;
|
alterReq.replica++;
|
||||||
}
|
} else {
|
||||||
else{
|
|
||||||
alterReq.learnerReplicas[alterReq.learnerReplica].id = pMObj->id;
|
alterReq.learnerReplicas[alterReq.learnerReplica].id = pMObj->id;
|
||||||
alterReq.learnerReplicas[alterReq.learnerReplica].port = pMObj->pDnode->port;
|
alterReq.learnerReplicas[alterReq.learnerReplica].port = pMObj->pDnode->port;
|
||||||
memcpy(alterReq.learnerReplicas[alterReq.learnerReplica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
memcpy(alterReq.learnerReplicas[alterReq.learnerReplica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
||||||
|
@ -524,28 +523,27 @@ static int32_t mndSetAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, S
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSetRestoreAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
|
int32_t mndSetRestoreAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
SDAlterMnodeTypeReq alterReq = {0};
|
SDAlterMnodeTypeReq alterReq = {0};
|
||||||
SEpSet createEpset = {0};
|
SEpSet createEpset = {0};
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
SMnodeObj *pMObj = NULL;
|
SMnodeObj *pMObj = NULL;
|
||||||
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
|
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
|
||||||
if(pMObj->id == pDnode->id) {
|
if (pMObj->id == pDnode->id) {
|
||||||
sdbRelease(pSdb, pMObj);
|
sdbRelease(pSdb, pMObj);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(pMObj->role == TAOS_SYNC_ROLE_VOTER){
|
if (pMObj->role == TAOS_SYNC_ROLE_VOTER) {
|
||||||
alterReq.replicas[alterReq.replica].id = pMObj->id;
|
alterReq.replicas[alterReq.replica].id = pMObj->id;
|
||||||
alterReq.replicas[alterReq.replica].port = pMObj->pDnode->port;
|
alterReq.replicas[alterReq.replica].port = pMObj->pDnode->port;
|
||||||
memcpy(alterReq.replicas[alterReq.replica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
memcpy(alterReq.replicas[alterReq.replica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
||||||
alterReq.replica++;
|
alterReq.replica++;
|
||||||
}
|
} else {
|
||||||
else{
|
|
||||||
alterReq.learnerReplicas[alterReq.learnerReplica].id = pMObj->id;
|
alterReq.learnerReplicas[alterReq.learnerReplica].id = pMObj->id;
|
||||||
alterReq.learnerReplicas[alterReq.learnerReplica].port = pMObj->pDnode->port;
|
alterReq.learnerReplicas[alterReq.learnerReplica].port = pMObj->pDnode->port;
|
||||||
memcpy(alterReq.learnerReplicas[alterReq.learnerReplica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
memcpy(alterReq.learnerReplicas[alterReq.learnerReplica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN);
|
||||||
|
@ -959,8 +957,11 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
int32_t updatingMnodes = 0;
|
int32_t updatingMnodes = 0;
|
||||||
int32_t readyMnodes = 0;
|
int32_t readyMnodes = 0;
|
||||||
SSyncCfg cfg = {.myIndex = -1, .lastIndex = 0,};
|
SSyncCfg cfg = {
|
||||||
SyncIndex maxIndex = 0;
|
.myIndex = -1,
|
||||||
|
.lastIndex = 0,
|
||||||
|
};
|
||||||
|
SyncIndex maxIndex = 0;
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetchAll(pSdb, SDB_MNODE, pIter, (void **)&pObj, &objStatus, false);
|
pIter = sdbFetchAll(pSdb, SDB_MNODE, pIter, (void **)&pObj, &objStatus, false);
|
||||||
|
@ -986,17 +987,17 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
|
||||||
if (pObj->pDnode->id == pMnode->selfDnodeId) {
|
if (pObj->pDnode->id == pMnode->selfDnodeId) {
|
||||||
cfg.myIndex = cfg.totalReplicaNum;
|
cfg.myIndex = cfg.totalReplicaNum;
|
||||||
}
|
}
|
||||||
if(pNode->nodeRole == TAOS_SYNC_ROLE_VOTER){
|
if (pNode->nodeRole == TAOS_SYNC_ROLE_VOTER) {
|
||||||
cfg.replicaNum++;
|
cfg.replicaNum++;
|
||||||
}
|
}
|
||||||
cfg.totalReplicaNum++;
|
cfg.totalReplicaNum++;
|
||||||
if(pObj->lastIndex > cfg.lastIndex){
|
if (pObj->lastIndex > cfg.lastIndex) {
|
||||||
cfg.lastIndex = pObj->lastIndex;
|
cfg.lastIndex = pObj->lastIndex;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (objStatus == SDB_STATUS_DROPPING) {
|
if (objStatus == SDB_STATUS_DROPPING) {
|
||||||
if(pObj->lastIndex > cfg.lastIndex){
|
if (pObj->lastIndex > cfg.lastIndex) {
|
||||||
cfg.lastIndex = pObj->lastIndex;
|
cfg.lastIndex = pObj->lastIndex;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1006,10 +1007,10 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
|
||||||
sdbReleaseLock(pSdb, pObj, false);
|
sdbReleaseLock(pSdb, pObj, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
//if (readyMnodes <= 0 || updatingMnodes <= 0) {
|
// if (readyMnodes <= 0 || updatingMnodes <= 0) {
|
||||||
// mInfo("vgId:1, mnode sync not reconfig since readyMnodes:%d updatingMnodes:%d", readyMnodes, updatingMnodes);
|
// mInfo("vgId:1, mnode sync not reconfig since readyMnodes:%d updatingMnodes:%d", readyMnodes, updatingMnodes);
|
||||||
// return;
|
// return;
|
||||||
//}
|
// }
|
||||||
|
|
||||||
if (cfg.myIndex == -1) {
|
if (cfg.myIndex == -1) {
|
||||||
#if 1
|
#if 1
|
||||||
|
@ -1023,8 +1024,8 @@ static void mndReloadSyncConfig(SMnode *pMnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMnode->syncMgmt.sync > 0) {
|
if (pMnode->syncMgmt.sync > 0) {
|
||||||
mInfo("vgId:1, mnode sync reconfig, totalReplica:%d replica:%d myIndex:%d",
|
mInfo("vgId:1, mnode sync reconfig, totalReplica:%d replica:%d myIndex:%d", cfg.totalReplicaNum, cfg.replicaNum,
|
||||||
cfg.totalReplicaNum, cfg.replicaNum, cfg.myIndex);
|
cfg.myIndex);
|
||||||
|
|
||||||
for (int32_t i = 0; i < cfg.totalReplicaNum; ++i) {
|
for (int32_t i = 0; i < cfg.totalReplicaNum; ++i) {
|
||||||
SNodeInfo *pNode = &cfg.nodeInfo[i];
|
SNodeInfo *pNode = &cfg.nodeInfo[i];
|
||||||
|
|
|
@ -877,6 +877,7 @@ SEpSet mndGetVgroupEpset(SMnode *pMnode, const SVgObj *pVgroup) {
|
||||||
addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port);
|
addEpIntoEpSet(&epset, pDnode->fqdn, pDnode->port);
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
mndReleaseDnode(pMnode, pDnode);
|
||||||
}
|
}
|
||||||
|
epsetSort(&epset);
|
||||||
|
|
||||||
return epset;
|
return epset;
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,6 +36,7 @@
|
||||||
#include "syncUtil.h"
|
#include "syncUtil.h"
|
||||||
#include "syncVoteMgr.h"
|
#include "syncVoteMgr.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
|
#include "tmisce.h"
|
||||||
#include "tref.h"
|
#include "tref.h"
|
||||||
|
|
||||||
static void syncNodeEqPingTimer(void* param, void* tmrId);
|
static void syncNodeEqPingTimer(void* param, void* tmrId);
|
||||||
|
@ -106,7 +107,7 @@ _err:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg *cfg){
|
int32_t syncNodeGetConfig(int64_t rid, SSyncCfg* cfg) {
|
||||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||||
|
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
|
@ -546,7 +547,7 @@ SSyncState syncGetState(int64_t rid) {
|
||||||
state.progress = -1;
|
state.progress = -1;
|
||||||
}
|
}
|
||||||
sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", "
|
sDebug("vgId:%d, learner progress state, commitIndex:%" PRId64 " totalIndex:%" PRId64 ", "
|
||||||
"progress:%lf, progress:%d",
|
"progress:%lf, progress:%d",
|
||||||
pSyncNode->vgId,
|
pSyncNode->vgId,
|
||||||
pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
|
pSyncNode->pLogBuf->commitIndex, pSyncNode->pLogBuf->totalIndex, progress, state.progress);
|
||||||
*/
|
*/
|
||||||
|
@ -579,17 +580,21 @@ void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet) {
|
||||||
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
SSyncNode* pSyncNode = syncNodeAcquire(rid);
|
||||||
if (pSyncNode == NULL) return;
|
if (pSyncNode == NULL) return;
|
||||||
|
|
||||||
|
int j = 0;
|
||||||
for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
|
for (int32_t i = 0; i < pSyncNode->raftCfg.cfg.totalReplicaNum; ++i) {
|
||||||
if (pSyncNode->raftCfg.cfg.nodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;
|
if (pSyncNode->raftCfg.cfg.nodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER) continue;
|
||||||
SEp* pEp = &pEpSet->eps[i];
|
SEp* pEp = &pEpSet->eps[j];
|
||||||
tstrncpy(pEp->fqdn, pSyncNode->raftCfg.cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
|
tstrncpy(pEp->fqdn, pSyncNode->raftCfg.cfg.nodeInfo[i].nodeFqdn, TSDB_FQDN_LEN);
|
||||||
pEp->port = (pSyncNode->raftCfg.cfg.nodeInfo)[i].nodePort;
|
pEp->port = (pSyncNode->raftCfg.cfg.nodeInfo)[i].nodePort;
|
||||||
pEpSet->numOfEps++;
|
pEpSet->numOfEps++;
|
||||||
sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port);
|
sDebug("vgId:%d, sync get retry epset, index:%d %s:%d", pSyncNode->vgId, i, pEp->fqdn, pEp->port);
|
||||||
|
j++;
|
||||||
}
|
}
|
||||||
if (pEpSet->numOfEps > 0) {
|
if (pEpSet->numOfEps > 0) {
|
||||||
pEpSet->inUse = (pSyncNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
|
pEpSet->inUse = (pSyncNode->raftCfg.cfg.myIndex + 1) % pEpSet->numOfEps;
|
||||||
|
// pEpSet->inUse = 0;
|
||||||
}
|
}
|
||||||
|
epsetSort(pEpSet);
|
||||||
|
|
||||||
sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
|
sInfo("vgId:%d, sync get retry epset numOfEps:%d inUse:%d", pSyncNode->vgId, pEpSet->numOfEps, pEpSet->inUse);
|
||||||
syncNodeRelease(pSyncNode);
|
syncNodeRelease(pSyncNode);
|
||||||
|
@ -614,7 +619,7 @@ int32_t syncCheckMember(int64_t rid) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER){
|
if (pSyncNode->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -682,24 +687,24 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_
|
||||||
}
|
}
|
||||||
|
|
||||||
// optimized one replica
|
// optimized one replica
|
||||||
if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
|
if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
|
||||||
SyncIndex retIndex;
|
SyncIndex retIndex;
|
||||||
int32_t code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
|
int32_t code = syncNodeOnClientRequest(pSyncNode, pMsg, &retIndex);
|
||||||
if (code >= 0) {
|
if (code >= 0) {
|
||||||
pMsg->info.conn.applyIndex = retIndex;
|
pMsg->info.conn.applyIndex = retIndex;
|
||||||
pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);
|
pMsg->info.conn.applyTerm = raftStoreGetTerm(pSyncNode);
|
||||||
|
|
||||||
//after raft member change, need to handle 1->2 switching point
|
// after raft member change, need to handle 1->2 switching point
|
||||||
//at this point, need to switch entry handling thread
|
// at this point, need to switch entry handling thread
|
||||||
if(pSyncNode->replicaNum == 1){
|
if (pSyncNode->replicaNum == 1) {
|
||||||
sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
|
sTrace("vgId:%d, propose optimized msg, index:%" PRId64 " type:%s", pSyncNode->vgId, retIndex,
|
||||||
TMSG_INFO(pMsg->msgType));
|
TMSG_INFO(pMsg->msgType));
|
||||||
return 1;
|
return 1;
|
||||||
}
|
} else {
|
||||||
else{
|
sTrace("vgId:%d, propose optimized msg, return to normal, index:%" PRId64
|
||||||
sTrace("vgId:%d, propose optimized msg, return to normal, index:%" PRId64 " type:%s, "
|
" type:%s, "
|
||||||
"handle:%p", pSyncNode->vgId, retIndex,
|
"handle:%p",
|
||||||
TMSG_INFO(pMsg->msgType), pMsg->info.handle);
|
pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType), pMsg->info.handle);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -844,7 +849,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
|
||||||
goto _error;
|
goto _error;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion){
|
if (vnodeVersion > pSyncNode->raftCfg.cfg.changeVersion) {
|
||||||
if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
|
if (pSyncInfo->syncCfg.totalReplicaNum > 0 && syncIsConfigChanged(&pSyncNode->raftCfg.cfg, &pSyncInfo->syncCfg)) {
|
||||||
sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
|
sInfo("vgId:%d, use sync config from input options and write to cfg file", pSyncNode->vgId);
|
||||||
pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
|
pSyncNode->raftCfg.cfg = pSyncInfo->syncCfg;
|
||||||
|
@ -856,15 +861,13 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
|
||||||
sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
|
sInfo("vgId:%d, use sync config from sync cfg file", pSyncNode->vgId);
|
||||||
pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
|
pSyncInfo->syncCfg = pSyncNode->raftCfg.cfg;
|
||||||
}
|
}
|
||||||
}
|
} else {
|
||||||
else{
|
sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d", pSyncNode->vgId, vnodeVersion,
|
||||||
sInfo("vgId:%d, skip save sync cfg file since request ver:%d <= file ver:%d",
|
pSyncInfo->syncCfg.changeVersion);
|
||||||
pSyncNode->vgId, vnodeVersion, pSyncInfo->syncCfg.changeVersion);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// init by SSyncInfo
|
||||||
// init by SSyncInfo
|
|
||||||
pSyncNode->vgId = pSyncInfo->vgId;
|
pSyncNode->vgId = pSyncInfo->vgId;
|
||||||
SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
|
SSyncCfg* pCfg = &pSyncNode->raftCfg.cfg;
|
||||||
bool updated = false;
|
bool updated = false;
|
||||||
|
@ -879,7 +882,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
|
||||||
pNode->nodeId, pNode->clusterId);
|
pNode->nodeId, pNode->clusterId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if(vnodeVersion > pSyncInfo->syncCfg.changeVersion){
|
if (vnodeVersion > pSyncInfo->syncCfg.changeVersion) {
|
||||||
if (updated) {
|
if (updated) {
|
||||||
sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
|
sInfo("vgId:%d, save config info since dnode info changed", pSyncNode->vgId);
|
||||||
if (syncWriteCfgFile(pSyncNode) != 0) {
|
if (syncWriteCfgFile(pSyncNode) != 0) {
|
||||||
|
@ -888,7 +891,7 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pSyncNode->pWal = pSyncInfo->pWal;
|
pSyncNode->pWal = pSyncInfo->pWal;
|
||||||
pSyncNode->msgcb = pSyncInfo->msgcb;
|
pSyncNode->msgcb = pSyncInfo->msgcb;
|
||||||
pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
|
pSyncNode->syncSendMSg = pSyncInfo->syncSendMSg;
|
||||||
|
@ -2335,47 +2338,49 @@ int32_t syncCacheEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, LRUHand
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncBuildConfigFromReq(SAlterVnodeReplicaReq *pReq, SSyncCfg *cfg){//TODO SAlterVnodeReplicaReq name is proper?
|
void syncBuildConfigFromReq(SAlterVnodeReplicaReq* pReq, SSyncCfg* cfg) { // TODO SAlterVnodeReplicaReq name is proper?
|
||||||
cfg->replicaNum = 0;
|
cfg->replicaNum = 0;
|
||||||
cfg->totalReplicaNum = 0;
|
cfg->totalReplicaNum = 0;
|
||||||
|
|
||||||
for (int i = 0; i < pReq->replica; ++i) {
|
for (int i = 0; i < pReq->replica; ++i) {
|
||||||
SNodeInfo *pNode = &cfg->nodeInfo[i];
|
SNodeInfo* pNode = &cfg->nodeInfo[i];
|
||||||
pNode->nodeId = pReq->replicas[i].id;
|
pNode->nodeId = pReq->replicas[i].id;
|
||||||
pNode->nodePort = pReq->replicas[i].port;
|
pNode->nodePort = pReq->replicas[i].port;
|
||||||
tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
|
tstrncpy(pNode->nodeFqdn, pReq->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
|
||||||
pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
|
pNode->nodeRole = TAOS_SYNC_ROLE_VOTER;
|
||||||
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
|
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
|
||||||
sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId, pNode->nodeRole);
|
sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort,
|
||||||
|
pNode->nodeId, pNode->nodeRole);
|
||||||
cfg->replicaNum++;
|
cfg->replicaNum++;
|
||||||
}
|
}
|
||||||
if(pReq->selfIndex != -1){
|
if (pReq->selfIndex != -1) {
|
||||||
cfg->myIndex = pReq->selfIndex;
|
cfg->myIndex = pReq->selfIndex;
|
||||||
}
|
}
|
||||||
for (int i = cfg->replicaNum; i < pReq->replica + pReq->learnerReplica; ++i) {
|
for (int i = cfg->replicaNum; i < pReq->replica + pReq->learnerReplica; ++i) {
|
||||||
SNodeInfo *pNode = &cfg->nodeInfo[i];
|
SNodeInfo* pNode = &cfg->nodeInfo[i];
|
||||||
pNode->nodeId = pReq->learnerReplicas[cfg->totalReplicaNum].id;
|
pNode->nodeId = pReq->learnerReplicas[cfg->totalReplicaNum].id;
|
||||||
pNode->nodePort = pReq->learnerReplicas[cfg->totalReplicaNum].port;
|
pNode->nodePort = pReq->learnerReplicas[cfg->totalReplicaNum].port;
|
||||||
pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
|
pNode->nodeRole = TAOS_SYNC_ROLE_LEARNER;
|
||||||
tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[cfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn));
|
tstrncpy(pNode->nodeFqdn, pReq->learnerReplicas[cfg->totalReplicaNum].fqdn, sizeof(pNode->nodeFqdn));
|
||||||
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
|
(void)tmsgUpdateDnodeInfo(&pNode->nodeId, &pNode->clusterId, pNode->nodeFqdn, &pNode->nodePort);
|
||||||
sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort, pNode->nodeId, pNode->nodeRole);
|
sInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d nodeRole:%d", pReq->vgId, i, pNode->nodeFqdn, pNode->nodePort,
|
||||||
|
pNode->nodeId, pNode->nodeRole);
|
||||||
cfg->totalReplicaNum++;
|
cfg->totalReplicaNum++;
|
||||||
}
|
}
|
||||||
cfg->totalReplicaNum += pReq->replica;
|
cfg->totalReplicaNum += pReq->replica;
|
||||||
if(pReq->learnerSelfIndex != -1){
|
if (pReq->learnerSelfIndex != -1) {
|
||||||
cfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
|
cfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
|
||||||
}
|
}
|
||||||
cfg->changeVersion = pReq->changeVersion;
|
cfg->changeVersion = pReq->changeVersion;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry){
|
int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry) {
|
||||||
if(pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE){
|
if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMsgHead *head = (SMsgHead *)pEntry->data;
|
SMsgHead* head = (SMsgHead*)pEntry->data;
|
||||||
void *pReq = POINTER_SHIFT(head, sizeof(SMsgHead));
|
void* pReq = POINTER_SHIFT(head, sizeof(SMsgHead));
|
||||||
|
|
||||||
SAlterVnodeTypeReq req = {0};
|
SAlterVnodeTypeReq req = {0};
|
||||||
if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
|
if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
|
||||||
|
@ -2386,17 +2391,17 @@ int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry){
|
||||||
SSyncCfg cfg = {0};
|
SSyncCfg cfg = {0};
|
||||||
syncBuildConfigFromReq(&req, &cfg);
|
syncBuildConfigFromReq(&req, &cfg);
|
||||||
|
|
||||||
if(cfg.totalReplicaNum >= 1 && ths->state == TAOS_SYNC_STATE_LEADER){
|
if (cfg.totalReplicaNum >= 1 && ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
bool incfg = false;
|
bool incfg = false;
|
||||||
for(int32_t j = 0; j < cfg.totalReplicaNum; ++j){
|
for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
|
||||||
if(strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0
|
if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
|
||||||
&& ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort){
|
ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
|
||||||
incfg = true;
|
incfg = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(!incfg){
|
if (!incfg) {
|
||||||
SyncTerm currentTerm = raftStoreGetTerm(ths);
|
SyncTerm currentTerm = raftStoreGetTerm(ths);
|
||||||
syncNodeStepDown(ths, currentTerm);
|
syncNodeStepDown(ths, currentTerm);
|
||||||
return 1;
|
return 1;
|
||||||
|
@ -2405,26 +2410,25 @@ int32_t syncNodeCheckChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry){
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg *cfg, char *str){
|
void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg* cfg, char* str) {
|
||||||
sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d, "
|
sInfo("vgId:%d, %s. SyncNode, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64
|
||||||
"restoreFinish:%d",
|
", changeVersion:%d, "
|
||||||
ths->vgId, str,
|
"restoreFinish:%d",
|
||||||
ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
|
ths->vgId, str, ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion,
|
||||||
ths->restoreFinish);
|
ths->restoreFinish);
|
||||||
|
|
||||||
sInfo("vgId:%d, %s, myNodeInfo, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d",
|
sInfo("vgId:%d, %s, myNodeInfo, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
|
||||||
ths->vgId, str, ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn,
|
ths->myNodeInfo.clusterId, ths->myNodeInfo.nodeId, ths->myNodeInfo.nodeFqdn, ths->myNodeInfo.nodePort,
|
||||||
ths->myNodeInfo.nodePort, ths->myNodeInfo.nodeRole);
|
ths->myNodeInfo.nodeRole);
|
||||||
|
|
||||||
for (int32_t i = 0; i < ths->peersNum; ++i){
|
for (int32_t i = 0; i < ths->peersNum; ++i) {
|
||||||
sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d",
|
sInfo("vgId:%d, %s, peersNodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str,
|
||||||
ths->vgId, str, i, ths->peersNodeInfo[i].clusterId,
|
i, ths->peersNodeInfo[i].clusterId, ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
|
||||||
ths->peersNodeInfo[i].nodeId, ths->peersNodeInfo[i].nodeFqdn,
|
ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
|
||||||
ths->peersNodeInfo[i].nodePort, ths->peersNodeInfo[i].nodeRole);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < ths->peersNum; ++i){
|
for (int32_t i = 0; i < ths->peersNum; ++i) {
|
||||||
char buf[256];
|
char buf[256];
|
||||||
int32_t len = 256;
|
int32_t len = 256;
|
||||||
int32_t n = 0;
|
int32_t n = 0;
|
||||||
n += snprintf(buf + n, len - n, "%s", "{");
|
n += snprintf(buf + n, len - n, "%s", "{");
|
||||||
|
@ -2434,37 +2438,33 @@ void syncNodeLogConfigInfo(SSyncNode* ths, SSyncCfg *cfg, char *str){
|
||||||
}
|
}
|
||||||
n += snprintf(buf + n, len - n, "%s", "}");
|
n += snprintf(buf + n, len - n, "%s", "}");
|
||||||
|
|
||||||
sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d",
|
sInfo("vgId:%d, %s, peersEpset%d, %s, inUse:%d", ths->vgId, str, i, buf, ths->peersEpset->inUse);
|
||||||
ths->vgId, str, i, buf, ths->peersEpset->inUse);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < ths->peersNum; ++i){
|
for (int32_t i = 0; i < ths->peersNum; ++i) {
|
||||||
sInfo("vgId:%d, %s, peersId%d, addr:%"PRId64,
|
sInfo("vgId:%d, %s, peersId%d, addr:%" PRId64, ths->vgId, str, i, ths->peersId[i].addr);
|
||||||
ths->vgId, str, i, ths->peersId[i].addr);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i){
|
for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
|
||||||
sInfo("vgId:%d, %s, nodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d",
|
sInfo("vgId:%d, %s, nodeInfo%d, clusterId:%" PRId64 ", nodeId:%d, Fqdn:%s, port:%d, role:%d", ths->vgId, str, i,
|
||||||
ths->vgId, str, i, ths->raftCfg.cfg.nodeInfo[i].clusterId,
|
ths->raftCfg.cfg.nodeInfo[i].clusterId, ths->raftCfg.cfg.nodeInfo[i].nodeId,
|
||||||
ths->raftCfg.cfg.nodeInfo[i].nodeId, ths->raftCfg.cfg.nodeInfo[i].nodeFqdn,
|
ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, ths->raftCfg.cfg.nodeInfo[i].nodePort,
|
||||||
ths->raftCfg.cfg.nodeInfo[i].nodePort, ths->raftCfg.cfg.nodeInfo[i].nodeRole);
|
ths->raftCfg.cfg.nodeInfo[i].nodeRole);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i){
|
for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
|
||||||
sInfo("vgId:%d, %s, replicasId%d, addr:%" PRId64,
|
sInfo("vgId:%d, %s, replicasId%d, addr:%" PRId64, ths->vgId, str, i, ths->replicasId[i].addr);
|
||||||
ths->vgId, str, i, ths->replicasId[i].addr);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg *cfg){
|
int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg* cfg) {
|
||||||
int32_t i = 0;
|
int32_t i = 0;
|
||||||
|
|
||||||
//change peersNodeInfo
|
// change peersNodeInfo
|
||||||
i = 0;
|
i = 0;
|
||||||
for(int32_t j = 0; j < cfg->totalReplicaNum; ++j){
|
for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
|
||||||
if(!(strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0
|
if (!(strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
|
||||||
&& ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort)){
|
ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort)) {
|
||||||
ths->peersNodeInfo[i].nodeRole = cfg->nodeInfo[j].nodeRole;
|
ths->peersNodeInfo[i].nodeRole = cfg->nodeInfo[j].nodeRole;
|
||||||
ths->peersNodeInfo[i].clusterId = cfg->nodeInfo[j].clusterId;
|
ths->peersNodeInfo[i].clusterId = cfg->nodeInfo[j].clusterId;
|
||||||
tstrncpy(ths->peersNodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN);
|
tstrncpy(ths->peersNodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN);
|
||||||
|
@ -2483,11 +2483,11 @@ int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg *cfg){
|
||||||
}
|
}
|
||||||
ths->peersNum = i;
|
ths->peersNum = i;
|
||||||
|
|
||||||
//change cfg nodeInfo
|
// change cfg nodeInfo
|
||||||
ths->raftCfg.cfg.replicaNum = 0;
|
ths->raftCfg.cfg.replicaNum = 0;
|
||||||
i = 0;
|
i = 0;
|
||||||
for(int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
|
for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
|
||||||
if(cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER){
|
if (cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
|
||||||
ths->raftCfg.cfg.replicaNum++;
|
ths->raftCfg.cfg.replicaNum++;
|
||||||
}
|
}
|
||||||
ths->raftCfg.cfg.nodeInfo[i].nodeRole = cfg->nodeInfo[j].nodeRole;
|
ths->raftCfg.cfg.nodeInfo[i].nodeRole = cfg->nodeInfo[j].nodeRole;
|
||||||
|
@ -2495,9 +2495,9 @@ int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg *cfg){
|
||||||
tstrncpy(ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN);
|
tstrncpy(ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn, TSDB_FQDN_LEN);
|
||||||
ths->raftCfg.cfg.nodeInfo[i].nodeId = cfg->nodeInfo[j].nodeId;
|
ths->raftCfg.cfg.nodeInfo[i].nodeId = cfg->nodeInfo[j].nodeId;
|
||||||
ths->raftCfg.cfg.nodeInfo[i].nodePort = cfg->nodeInfo[j].nodePort;
|
ths->raftCfg.cfg.nodeInfo[i].nodePort = cfg->nodeInfo[j].nodePort;
|
||||||
if((strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0
|
if ((strcmp(ths->myNodeInfo.nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
|
||||||
&& ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort)){
|
ths->myNodeInfo.nodePort == cfg->nodeInfo[j].nodePort)) {
|
||||||
ths->raftCfg.cfg.myIndex = i;
|
ths->raftCfg.cfg.myIndex = i;
|
||||||
}
|
}
|
||||||
i++;
|
i++;
|
||||||
}
|
}
|
||||||
|
@ -2506,26 +2506,26 @@ int32_t syncNodeRebuildPeerAndCfg(SSyncNode* ths, SSyncCfg *cfg){
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg *cfg){
|
void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg* cfg) {
|
||||||
//change peersNodeInfo
|
// change peersNodeInfo
|
||||||
for (int32_t i = 0; i < ths->peersNum; ++i) {
|
for (int32_t i = 0; i < ths->peersNum; ++i) {
|
||||||
for(int32_t j = 0; j < cfg->totalReplicaNum; ++j){
|
for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
|
||||||
if(strcmp(ths->peersNodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0
|
if (strcmp(ths->peersNodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
|
||||||
&& ths->peersNodeInfo[i].nodePort == cfg->nodeInfo[j].nodePort){
|
ths->peersNodeInfo[i].nodePort == cfg->nodeInfo[j].nodePort) {
|
||||||
if(cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER){
|
if (cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
|
||||||
ths->peersNodeInfo[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
|
ths->peersNodeInfo[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//change cfg nodeInfo
|
// change cfg nodeInfo
|
||||||
ths->raftCfg.cfg.replicaNum = 0;
|
ths->raftCfg.cfg.replicaNum = 0;
|
||||||
for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
|
for (int32_t i = 0; i < ths->raftCfg.cfg.totalReplicaNum; ++i) {
|
||||||
for(int32_t j = 0; j < cfg->totalReplicaNum; ++j){
|
for (int32_t j = 0; j < cfg->totalReplicaNum; ++j) {
|
||||||
if(strcmp(ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0
|
if (strcmp(ths->raftCfg.cfg.nodeInfo[i].nodeFqdn, cfg->nodeInfo[j].nodeFqdn) == 0 &&
|
||||||
&& ths->raftCfg.cfg.nodeInfo[i].nodePort == cfg->nodeInfo[j].nodePort){
|
ths->raftCfg.cfg.nodeInfo[i].nodePort == cfg->nodeInfo[j].nodePort) {
|
||||||
if(cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER){
|
if (cfg->nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
|
||||||
ths->raftCfg.cfg.nodeInfo[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
|
ths->raftCfg.cfg.nodeInfo[i].nodeRole = TAOS_SYNC_ROLE_VOTER;
|
||||||
ths->raftCfg.cfg.replicaNum++;
|
ths->raftCfg.cfg.replicaNum++;
|
||||||
}
|
}
|
||||||
|
@ -2534,8 +2534,8 @@ void syncNodeChangePeerAndCfgToVoter(SSyncNode* ths, SSyncCfg *cfg){
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum){
|
int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum) {
|
||||||
//1.rebuild replicasId, remove deleted one
|
// 1.rebuild replicasId, remove deleted one
|
||||||
SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
|
SRaftId oldReplicasId[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
|
||||||
memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));
|
memcpy(oldReplicasId, ths->replicasId, sizeof(oldReplicasId));
|
||||||
|
|
||||||
|
@ -2545,9 +2545,8 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
|
||||||
syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]);
|
syncUtilNodeInfo2RaftId(&ths->raftCfg.cfg.nodeInfo[i], ths->vgId, &ths->replicasId[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 2.rebuild MatchIndex, remove deleted one
|
||||||
//2.rebuild MatchIndex, remove deleted one
|
SSyncIndexMgr* oldIndex = ths->pMatchIndex;
|
||||||
SSyncIndexMgr *oldIndex = ths->pMatchIndex;
|
|
||||||
|
|
||||||
ths->pMatchIndex = syncIndexMgrCreate(ths);
|
ths->pMatchIndex = syncIndexMgrCreate(ths);
|
||||||
|
|
||||||
|
@ -2555,9 +2554,8 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
|
||||||
|
|
||||||
syncIndexMgrDestroy(oldIndex);
|
syncIndexMgrDestroy(oldIndex);
|
||||||
|
|
||||||
|
// 3.rebuild NextIndex, remove deleted one
|
||||||
//3.rebuild NextIndex, remove deleted one
|
SSyncIndexMgr* oldNextIndex = ths->pNextIndex;
|
||||||
SSyncIndexMgr *oldNextIndex = ths->pNextIndex;
|
|
||||||
|
|
||||||
ths->pNextIndex = syncIndexMgrCreate(ths);
|
ths->pNextIndex = syncIndexMgrCreate(ths);
|
||||||
|
|
||||||
|
@ -2565,17 +2563,15 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
|
||||||
|
|
||||||
syncIndexMgrDestroy(oldNextIndex);
|
syncIndexMgrDestroy(oldNextIndex);
|
||||||
|
|
||||||
|
// 4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
|
||||||
//4.rebuild pVotesGranted, pVotesRespond, no need to keep old vote state, only rebuild
|
|
||||||
voteGrantedUpdate(ths->pVotesGranted, ths);
|
voteGrantedUpdate(ths->pVotesGranted, ths);
|
||||||
votesRespondUpdate(ths->pVotesRespond, ths);
|
votesRespondUpdate(ths->pVotesRespond, ths);
|
||||||
|
|
||||||
|
// 5.rebuild logReplMgr
|
||||||
//5.rebuild logReplMgr
|
for (int i = 0; i < oldtotalReplicaNum; ++i) {
|
||||||
for(int i = 0; i < oldtotalReplicaNum; ++i){
|
sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
|
||||||
sDebug("vgId:%d, old logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId, i,
|
i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
|
||||||
ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
|
ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
|
||||||
ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncLogReplMgr* oldLogReplMgrs = NULL;
|
SSyncLogReplMgr* oldLogReplMgrs = NULL;
|
||||||
|
@ -2584,32 +2580,32 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
|
||||||
if (NULL == oldLogReplMgrs) return -1;
|
if (NULL == oldLogReplMgrs) return -1;
|
||||||
memset(oldLogReplMgrs, 0, length);
|
memset(oldLogReplMgrs, 0, length);
|
||||||
|
|
||||||
for(int i = 0; i < oldtotalReplicaNum; i++){
|
for (int i = 0; i < oldtotalReplicaNum; i++) {
|
||||||
oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
|
oldLogReplMgrs[i] = *(ths->logReplMgrs[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
syncNodeLogReplDestroy(ths);
|
syncNodeLogReplDestroy(ths);
|
||||||
syncNodeLogReplInit(ths);
|
syncNodeLogReplInit(ths);
|
||||||
|
|
||||||
for(int i = 0; i < ths->totalReplicaNum; ++i){
|
for (int i = 0; i < ths->totalReplicaNum; ++i) {
|
||||||
for(int j = 0; j < oldtotalReplicaNum; j++){
|
for (int j = 0; j < oldtotalReplicaNum; j++) {
|
||||||
if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
|
if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
|
||||||
*(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
|
*(ths->logReplMgrs[i]) = oldLogReplMgrs[j];
|
||||||
ths->logReplMgrs[i]->peerId = i;
|
ths->logReplMgrs[i]->peerId = i;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
for(int i = 0; i < ths->totalReplicaNum; ++i){
|
|
||||||
sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")" , ths->vgId, i,
|
|
||||||
ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
|
|
||||||
ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
//6.rebuild sender
|
for (int i = 0; i < ths->totalReplicaNum; ++i) {
|
||||||
for(int i = 0; i < oldtotalReplicaNum; ++i){
|
sDebug("vgId:%d, new logReplMgrs i:%d, peerId:%d, restoreed:%d, [%" PRId64 " %" PRId64 ", %" PRId64 ")", ths->vgId,
|
||||||
sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64,
|
i, ths->logReplMgrs[i]->peerId, ths->logReplMgrs[i]->restored, ths->logReplMgrs[i]->startIndex,
|
||||||
ths->vgId, i, ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime)
|
ths->logReplMgrs[i]->matchIndex, ths->logReplMgrs[i]->endIndex);
|
||||||
|
}
|
||||||
|
|
||||||
|
// 6.rebuild sender
|
||||||
|
for (int i = 0; i < oldtotalReplicaNum; ++i) {
|
||||||
|
sDebug("vgId:%d, old sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
|
||||||
|
ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
|
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
|
||||||
|
@ -2633,13 +2629,12 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
|
||||||
sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
|
sSDebug(pSender, "snapshot sender create while open sync node, data:%p", pSender);
|
||||||
}
|
}
|
||||||
|
|
||||||
for(int i = 0; i < ths->totalReplicaNum; i++){
|
for (int i = 0; i < ths->totalReplicaNum; i++) {
|
||||||
sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64,
|
sDebug("vgId:%d, new sender i:%d, replicaIndex:%d, lastSendTime:%" PRId64, ths->vgId, i,
|
||||||
ths->vgId, i, ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime)
|
ths->senders[i]->replicaIndex, ths->senders[i]->lastSendTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 7.rebuild synctimer
|
||||||
//7.rebuild synctimer
|
|
||||||
syncNodeStopHeartbeatTimer(ths);
|
syncNodeStopHeartbeatTimer(ths);
|
||||||
|
|
||||||
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
|
for (int32_t i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; ++i) {
|
||||||
|
@ -2648,16 +2643,15 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
|
||||||
|
|
||||||
syncNodeStartHeartbeatTimer(ths);
|
syncNodeStartHeartbeatTimer(ths);
|
||||||
|
|
||||||
|
// 8.rebuild peerStates
|
||||||
//8.rebuild peerStates
|
|
||||||
SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
|
SPeerState oldState[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA] = {0};
|
||||||
for(int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++){
|
for (int i = 0; i < TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA; i++) {
|
||||||
oldState[i] = ths->peerStates[i];
|
oldState[i] = ths->peerStates[i];
|
||||||
}
|
}
|
||||||
|
|
||||||
for(int i = 0; i < ths->totalReplicaNum; i++){
|
for (int i = 0; i < ths->totalReplicaNum; i++) {
|
||||||
for(int j = 0; j < oldtotalReplicaNum; j++){
|
for (int j = 0; j < oldtotalReplicaNum; j++) {
|
||||||
if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])){
|
if (syncUtilSameId(&ths->replicasId[i], &oldReplicasId[j])) {
|
||||||
ths->peerStates[i] = oldState[j];
|
ths->peerStates[i] = oldState[j];
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2668,32 +2662,32 @@ int32_t syncNodeRebuildAndCopyIfExist(SSyncNode* ths, int32_t oldtotalReplicaNum
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodeChangeToVoter(SSyncNode* ths){
|
void syncNodeChangeToVoter(SSyncNode* ths) {
|
||||||
//replicasId, only need to change replicaNum when 1->3
|
// replicasId, only need to change replicaNum when 1->3
|
||||||
ths->replicaNum = ths->raftCfg.cfg.replicaNum;
|
ths->replicaNum = ths->raftCfg.cfg.replicaNum;
|
||||||
sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
|
sDebug("vgId:%d, totalReplicaNum:%d", ths->vgId, ths->totalReplicaNum);
|
||||||
for (int32_t i = 0; i < ths->totalReplicaNum; ++i){
|
for (int32_t i = 0; i < ths->totalReplicaNum; ++i) {
|
||||||
sDebug("vgId:%d, i:%d, replicaId.addr:%" PRIx64, ths->vgId, i, ths->replicasId[i].addr);
|
sDebug("vgId:%d, i:%d, replicaId.addr:%" PRIx64, ths->vgId, i, ths->replicasId[i].addr);
|
||||||
}
|
}
|
||||||
|
|
||||||
//pMatchIndex, pNextIndex, only need to change replicaNum when 1->3
|
// pMatchIndex, pNextIndex, only need to change replicaNum when 1->3
|
||||||
ths->pMatchIndex->replicaNum = ths->raftCfg.cfg.replicaNum;
|
ths->pMatchIndex->replicaNum = ths->raftCfg.cfg.replicaNum;
|
||||||
ths->pNextIndex->replicaNum = ths->raftCfg.cfg.replicaNum;
|
ths->pNextIndex->replicaNum = ths->raftCfg.cfg.replicaNum;
|
||||||
|
|
||||||
sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, ths->pMatchIndex->totalReplicaNum);
|
sDebug("vgId:%d, pMatchIndex->totalReplicaNum:%d", ths->vgId, ths->pMatchIndex->totalReplicaNum);
|
||||||
for (int32_t i = 0; i < ths->pMatchIndex->totalReplicaNum; ++i){
|
for (int32_t i = 0; i < ths->pMatchIndex->totalReplicaNum; ++i) {
|
||||||
sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, i, ths->pMatchIndex->index[i]);
|
sDebug("vgId:%d, i:%d, match.index:%" PRId64, ths->vgId, i, ths->pMatchIndex->index[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
//pVotesGranted, pVotesRespond
|
// pVotesGranted, pVotesRespond
|
||||||
voteGrantedUpdate(ths->pVotesGranted, ths);
|
voteGrantedUpdate(ths->pVotesGranted, ths);
|
||||||
votesRespondUpdate(ths->pVotesRespond, ths);
|
votesRespondUpdate(ths->pVotesRespond, ths);
|
||||||
|
|
||||||
//logRepMgrs
|
// logRepMgrs
|
||||||
//no need to change logRepMgrs when 1->3
|
// no need to change logRepMgrs when 1->3
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncNodeResetPeerAndCfg(SSyncNode* ths){
|
void syncNodeResetPeerAndCfg(SSyncNode* ths) {
|
||||||
SNodeInfo node = {0};
|
SNodeInfo node = {0};
|
||||||
for (int32_t i = 0; i < ths->peersNum; ++i) {
|
for (int32_t i = 0; i < ths->peersNum; ++i) {
|
||||||
memcpy(&ths->peersNodeInfo[i], &node, sizeof(SNodeInfo));
|
memcpy(&ths->peersNodeInfo[i], &node, sizeof(SNodeInfo));
|
||||||
|
@ -2704,13 +2698,13 @@ void syncNodeResetPeerAndCfg(SSyncNode* ths){
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){
|
int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str) {
|
||||||
if(pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE){
|
if (pEntry->originalRpcType != TDMT_SYNC_CONFIG_CHANGE) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMsgHead *head = (SMsgHead *)pEntry->data;
|
SMsgHead* head = (SMsgHead*)pEntry->data;
|
||||||
void *pReq = POINTER_SHIFT(head, sizeof(SMsgHead));
|
void* pReq = POINTER_SHIFT(head, sizeof(SMsgHead));
|
||||||
|
|
||||||
SAlterVnodeTypeReq req = {0};
|
SAlterVnodeTypeReq req = {0};
|
||||||
if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
|
if (tDeserializeSAlterVnodeReplicaReq(pReq, head->contLen, &req) != 0) {
|
||||||
|
@ -2719,141 +2713,143 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){
|
||||||
}
|
}
|
||||||
|
|
||||||
SSyncCfg cfg = {0};
|
SSyncCfg cfg = {0};
|
||||||
syncBuildConfigFromReq(&req, &cfg);
|
syncBuildConfigFromReq(&req, &cfg);
|
||||||
|
|
||||||
if(cfg.changeVersion <= ths->raftCfg.cfg.changeVersion){
|
if (cfg.changeVersion <= ths->raftCfg.cfg.changeVersion) {
|
||||||
sInfo("vgId:%d, skip conf change entry since lower version. "
|
sInfo(
|
||||||
"this entry, index:%" PRId64 ", term:%" PRId64 ", totalReplicaNum:%d, changeVersion:%d; "
|
"vgId:%d, skip conf change entry since lower version. "
|
||||||
"current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64", changeVersion:%d",
|
"this entry, index:%" PRId64 ", term:%" PRId64
|
||||||
ths->vgId,
|
", totalReplicaNum:%d, changeVersion:%d; "
|
||||||
pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion,
|
"current node, replicaNum:%d, peersNum:%d, lastConfigIndex:%" PRId64 ", changeVersion:%d",
|
||||||
ths->replicaNum, ths->peersNum, ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
|
ths->vgId, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
|
||||||
|
ths->raftCfg.lastConfigIndex, ths->raftCfg.cfg.changeVersion);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(strcmp(str, "Commit") == 0){
|
if (strcmp(str, "Commit") == 0) {
|
||||||
sInfo("vgId:%d, change config from %s. "
|
sInfo(
|
||||||
"this, i:%" PRId64 ", trNum:%d, vers:%d; "
|
"vgId:%d, change config from %s. "
|
||||||
"node, rNum:%d, pNum:%d, trNum:%d, "
|
"this, i:%" PRId64
|
||||||
"buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), "
|
", trNum:%d, vers:%d; "
|
||||||
"cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
|
"node, rNum:%d, pNum:%d, trNum:%d, "
|
||||||
ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion,
|
"buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
|
||||||
ths->replicaNum, ths->peersNum, ths->totalReplicaNum,
|
"), "
|
||||||
ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex,
|
"cond:(next i:%" PRId64 ", t:%" PRId64 " ==%s)",
|
||||||
pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
|
ths->vgId, str, pEntry->index - 1, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum, ths->peersNum,
|
||||||
}
|
ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex,
|
||||||
else{
|
ths->pLogBuf->endIndex, pEntry->index, pEntry->term, TMSG_INFO(pEntry->originalRpcType));
|
||||||
sInfo("vgId:%d, change config from %s. "
|
} else {
|
||||||
"this, i:%" PRId64 ", t:%" PRId64 ", trNum:%d, vers:%d; "
|
sInfo(
|
||||||
"node, rNum:%d, pNum:%d, trNum:%d, "
|
"vgId:%d, change config from %s. "
|
||||||
"buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 "), "
|
"this, i:%" PRId64 ", t:%" PRId64
|
||||||
"cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
|
", trNum:%d, vers:%d; "
|
||||||
ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion,
|
"node, rNum:%d, pNum:%d, trNum:%d, "
|
||||||
ths->replicaNum, ths->peersNum, ths->totalReplicaNum,
|
"buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
|
||||||
ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex, ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex,
|
"), "
|
||||||
pEntry->index -1, ths->commitIndex, ths->pLogBuf->commitIndex);
|
"cond:(pre i:%" PRId64 "==ci:%" PRId64 ", bci:%" PRId64 ")",
|
||||||
|
ths->vgId, str, pEntry->index, pEntry->term, cfg.totalReplicaNum, cfg.changeVersion, ths->replicaNum,
|
||||||
|
ths->peersNum, ths->totalReplicaNum, ths->pLogBuf->startIndex, ths->pLogBuf->commitIndex,
|
||||||
|
ths->pLogBuf->matchIndex, ths->pLogBuf->endIndex, pEntry->index - 1, ths->commitIndex,
|
||||||
|
ths->pLogBuf->commitIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
syncNodeLogConfigInfo(ths, &cfg, "before config change");
|
syncNodeLogConfigInfo(ths, &cfg, "before config change");
|
||||||
|
|
||||||
int32_t oldTotalReplicaNum = ths->totalReplicaNum;
|
int32_t oldTotalReplicaNum = ths->totalReplicaNum;
|
||||||
|
|
||||||
if(cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2){//remove replica
|
if (cfg.totalReplicaNum == 1 || cfg.totalReplicaNum == 2) { // remove replica
|
||||||
|
|
||||||
bool incfg = false;
|
bool incfg = false;
|
||||||
for(int32_t j = 0; j < cfg.totalReplicaNum; ++j){
|
for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
|
||||||
if(strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0
|
if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
|
||||||
&& ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort){
|
ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
|
||||||
incfg = true;
|
incfg = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(incfg){//remove other
|
if (incfg) { // remove other
|
||||||
syncNodeResetPeerAndCfg(ths);
|
syncNodeResetPeerAndCfg(ths);
|
||||||
|
|
||||||
//no need to change myNodeInfo
|
// no need to change myNodeInfo
|
||||||
|
|
||||||
if(syncNodeRebuildPeerAndCfg(ths, &cfg) != 0){
|
if (syncNodeRebuildPeerAndCfg(ths, &cfg) != 0) {
|
||||||
return -1;
|
|
||||||
};
|
|
||||||
|
|
||||||
if(syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0){
|
|
||||||
return -1;
|
return -1;
|
||||||
};
|
};
|
||||||
}
|
|
||||||
else{//remove myself
|
if (syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0) {
|
||||||
//no need to do anything actually, to change the following to reduce distruptive server chance
|
return -1;
|
||||||
|
};
|
||||||
|
} else { // remove myself
|
||||||
|
// no need to do anything actually, to change the following to reduce distruptive server chance
|
||||||
|
|
||||||
syncNodeResetPeerAndCfg(ths);
|
syncNodeResetPeerAndCfg(ths);
|
||||||
|
|
||||||
//change myNodeInfo
|
// change myNodeInfo
|
||||||
ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;
|
ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_LEARNER;
|
||||||
|
|
||||||
//change peer and cfg
|
// change peer and cfg
|
||||||
ths->peersNum = 0;
|
ths->peersNum = 0;
|
||||||
memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo));
|
memcpy(&ths->raftCfg.cfg.nodeInfo[0], &ths->myNodeInfo, sizeof(SNodeInfo));
|
||||||
ths->raftCfg.cfg.replicaNum = 0;
|
ths->raftCfg.cfg.replicaNum = 0;
|
||||||
ths->raftCfg.cfg.totalReplicaNum = 1;
|
ths->raftCfg.cfg.totalReplicaNum = 1;
|
||||||
|
|
||||||
//change other
|
// change other
|
||||||
if(syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0){
|
if (syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
//change state
|
// change state
|
||||||
ths->state = TAOS_SYNC_STATE_LEARNER;
|
ths->state = TAOS_SYNC_STATE_LEARNER;
|
||||||
}
|
}
|
||||||
|
|
||||||
ths->restoreFinish = false;
|
ths->restoreFinish = false;
|
||||||
}
|
} else { // add replica, or change replica type
|
||||||
else{//add replica, or change replica type
|
if (ths->totalReplicaNum == 3) { // change replica type
|
||||||
if(ths->totalReplicaNum == 3){ //change replica type
|
sInfo("vgId:%d, begin change replica type", ths->vgId);
|
||||||
sInfo("vgId:%d, begin change replica type", ths->vgId);
|
|
||||||
|
|
||||||
//change myNodeInfo
|
// change myNodeInfo
|
||||||
for(int32_t j = 0; j < cfg.totalReplicaNum; ++j){
|
for (int32_t j = 0; j < cfg.totalReplicaNum; ++j) {
|
||||||
if(strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0
|
if (strcmp(ths->myNodeInfo.nodeFqdn, cfg.nodeInfo[j].nodeFqdn) == 0 &&
|
||||||
&& ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort){
|
ths->myNodeInfo.nodePort == cfg.nodeInfo[j].nodePort) {
|
||||||
if(cfg.nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER){
|
if (cfg.nodeInfo[j].nodeRole == TAOS_SYNC_ROLE_VOTER) {
|
||||||
ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
|
ths->myNodeInfo.nodeRole = TAOS_SYNC_ROLE_VOTER;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//change peer and cfg
|
// change peer and cfg
|
||||||
syncNodeChangePeerAndCfgToVoter(ths, &cfg);
|
syncNodeChangePeerAndCfgToVoter(ths, &cfg);
|
||||||
|
|
||||||
//change other
|
// change other
|
||||||
syncNodeChangeToVoter(ths);
|
syncNodeChangeToVoter(ths);
|
||||||
|
|
||||||
//change state
|
// change state
|
||||||
if(ths->state ==TAOS_SYNC_STATE_LEARNER){
|
if (ths->state == TAOS_SYNC_STATE_LEARNER) {
|
||||||
if(ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER ){
|
if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_VOTER) {
|
||||||
ths->state = TAOS_SYNC_STATE_FOLLOWER;
|
ths->state = TAOS_SYNC_STATE_FOLLOWER;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ths->restoreFinish = false;
|
ths->restoreFinish = false;
|
||||||
}
|
} else { // add replica
|
||||||
else{//add replica
|
|
||||||
sInfo("vgId:%d, begin add replica", ths->vgId);
|
sInfo("vgId:%d, begin add replica", ths->vgId);
|
||||||
|
|
||||||
//no need to change myNodeInfo
|
// no need to change myNodeInfo
|
||||||
|
|
||||||
//change peer and cfg
|
// change peer and cfg
|
||||||
if(syncNodeRebuildPeerAndCfg(ths, &cfg) != 0){
|
if (syncNodeRebuildPeerAndCfg(ths, &cfg) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
};
|
};
|
||||||
|
|
||||||
//change other
|
// change other
|
||||||
if(syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0){
|
if (syncNodeRebuildAndCopyIfExist(ths, oldTotalReplicaNum) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
};
|
};
|
||||||
|
|
||||||
//no need to change state
|
// no need to change state
|
||||||
|
|
||||||
if(ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER){
|
if (ths->myNodeInfo.nodeRole == TAOS_SYNC_ROLE_LEARNER) {
|
||||||
ths->restoreFinish = false;
|
ths->restoreFinish = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2867,7 +2863,7 @@ int32_t syncNodeChangeConfig(SSyncNode* ths, SSyncRaftEntry* pEntry, char* str){
|
||||||
|
|
||||||
syncNodeLogConfigInfo(ths, &cfg, "after config change");
|
syncNodeLogConfigInfo(ths, &cfg, "after config change");
|
||||||
|
|
||||||
if(syncWriteCfgFile(ths) != 0){
|
if (syncWriteCfgFile(ths) != 0) {
|
||||||
sError("vgId:%d, failed to create sync cfg file", ths->vgId);
|
sError("vgId:%d, failed to create sync cfg file", ths->vgId);
|
||||||
return -1;
|
return -1;
|
||||||
};
|
};
|
||||||
|
@ -2896,7 +2892,7 @@ int32_t syncNodeAppend(SSyncNode* ths, SSyncRaftEntry* pEntry) {
|
||||||
code = 0;
|
code = 0;
|
||||||
_out:;
|
_out:;
|
||||||
// proceed match index, with replicating on needed
|
// proceed match index, with replicating on needed
|
||||||
SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append");
|
SyncIndex matchIndex = syncLogBufferProceed(ths->pLogBuf, ths, NULL, "Append");
|
||||||
|
|
||||||
sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
|
sTrace("vgId:%d, append raft entry. index:%" PRId64 ", term:%" PRId64 " pBuf: [%" PRId64 " %" PRId64 " %" PRId64
|
||||||
", %" PRId64 ")",
|
", %" PRId64 ")",
|
||||||
|
@ -2927,7 +2923,7 @@ bool syncNodeHeartbeatReplyTimeout(SSyncNode* pSyncNode) {
|
||||||
int32_t toCount = 0;
|
int32_t toCount = 0;
|
||||||
int64_t tsNow = taosGetTimestampMs();
|
int64_t tsNow = taosGetTimestampMs();
|
||||||
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
for (int32_t i = 0; i < pSyncNode->peersNum; ++i) {
|
||||||
if(pSyncNode->peersNodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER){
|
if (pSyncNode->peersNodeInfo[i].nodeRole == TAOS_SYNC_ROLE_LEARNER) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
int64_t recvTime = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
|
int64_t recvTime = syncIndexMgrGetRecvTime(pSyncNode->pMatchIndex, &(pSyncNode->peersId[i]));
|
||||||
|
@ -3191,9 +3187,9 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
|
||||||
pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
|
pEntry = syncEntryBuildFromRpcMsg(pMsg, term, index);
|
||||||
}
|
}
|
||||||
|
|
||||||
//1->2, config change is add in write thread, and will continue in sync thread
|
// 1->2, config change is add in write thread, and will continue in sync thread
|
||||||
//need save message for it
|
// need save message for it
|
||||||
if(pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE){
|
if (pMsg->msgType == TDMT_SYNC_CONFIG_CHANGE) {
|
||||||
SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
|
SRespStub stub = {.createTime = taosGetTimestampMs(), .rpcMsg = *pMsg};
|
||||||
uint64_t seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
|
uint64_t seqNum = syncRespMgrAdd(ths->pSyncRespMgr, &stub);
|
||||||
pEntry->seqNum = seqNum;
|
pEntry->seqNum = seqNum;
|
||||||
|
@ -3209,21 +3205,21 @@ int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIn
|
||||||
(*pRetIndex) = index;
|
(*pRetIndex) = index;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE){
|
if (pEntry->originalRpcType == TDMT_SYNC_CONFIG_CHANGE) {
|
||||||
int32_t code = syncNodeCheckChangeConfig(ths, pEntry);
|
int32_t code = syncNodeCheckChangeConfig(ths, pEntry);
|
||||||
if(code < 0){
|
if (code < 0) {
|
||||||
sError("vgId:%d, failed to check change config since %s.", ths->vgId, terrstr());
|
sError("vgId:%d, failed to check change config since %s.", ths->vgId, terrstr());
|
||||||
syncEntryDestroy(pEntry);
|
syncEntryDestroy(pEntry);
|
||||||
pEntry = NULL;
|
pEntry = NULL;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(code > 0){
|
if (code > 0) {
|
||||||
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
|
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
|
||||||
(void)syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
|
(void)syncRespMgrGetAndDel(ths->pSyncRespMgr, pEntry->seqNum, &rsp.info);
|
||||||
if (rsp.info.handle != NULL) {
|
if (rsp.info.handle != NULL) {
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
}
|
}
|
||||||
syncEntryDestroy(pEntry);
|
syncEntryDestroy(pEntry);
|
||||||
pEntry = NULL;
|
pEntry = NULL;
|
||||||
return -1;
|
return -1;
|
||||||
|
|
Loading…
Reference in New Issue