refact: adjust util log
This commit is contained in:
parent
aef00cfb83
commit
660bfde593
|
@ -75,8 +75,6 @@ int32_t syncUtilElectRandomMS(int32_t min, int32_t max);
|
||||||
int32_t syncUtilQuorum(int32_t replicaNum);
|
int32_t syncUtilQuorum(int32_t replicaNum);
|
||||||
const char* syncStr(ESyncState state);
|
const char* syncStr(ESyncState state);
|
||||||
void syncUtilMsgHtoN(void* msg);
|
void syncUtilMsgHtoN(void* msg);
|
||||||
bool syncUtilUserPreCommit(tmsg_t msgType);
|
|
||||||
bool syncUtilUserRollback(tmsg_t msgType);
|
|
||||||
|
|
||||||
void syncUtilGenerateArbToken(int32_t nodeId, int32_t groupId, char* buf);
|
void syncUtilGenerateArbToken(int32_t nodeId, int32_t groupId, char* buf);
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,7 @@ int32_t syncInit() {
|
||||||
uint32_t seed = (uint32_t)(taosGetTimestampNs() & 0x00000000FFFFFFFF);
|
uint32_t seed = (uint32_t)(taosGetTimestampNs() & 0x00000000FFFFFFFF);
|
||||||
taosSeedRand(seed);
|
taosSeedRand(seed);
|
||||||
|
|
||||||
memset(&gSyncEnv, 0, sizeof(SSyncEnv));
|
(void)memset(&gSyncEnv, 0, sizeof(SSyncEnv));
|
||||||
gSyncEnv.pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV");
|
gSyncEnv.pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV");
|
||||||
|
|
||||||
gNodeRefId = taosOpenRef(200, (RefFp)syncNodeClose);
|
gNodeRefId = taosOpenRef(200, (RefFp)syncNodeClose);
|
||||||
|
@ -59,7 +59,7 @@ int32_t syncInit() {
|
||||||
void syncCleanUp() {
|
void syncCleanUp() {
|
||||||
atomic_store_8(&gSyncEnv.isStart, 0);
|
atomic_store_8(&gSyncEnv.isStart, 0);
|
||||||
taosTmrCleanUp(gSyncEnv.pTimerManager);
|
taosTmrCleanUp(gSyncEnv.pTimerManager);
|
||||||
memset(&gSyncEnv, 0, sizeof(SSyncEnv));
|
(void)memset(&gSyncEnv, 0, sizeof(SSyncEnv));
|
||||||
|
|
||||||
if (gNodeRefId != -1) {
|
if (gNodeRefId != -1) {
|
||||||
sDebug("sync node rset is closed, rsetId:%d", gNodeRefId);
|
sDebug("sync node rset is closed, rsetId:%d", gNodeRefId);
|
||||||
|
@ -77,8 +77,7 @@ void syncCleanUp() {
|
||||||
int64_t syncNodeAdd(SSyncNode *pNode) {
|
int64_t syncNodeAdd(SSyncNode *pNode) {
|
||||||
pNode->rid = taosAddRef(gNodeRefId, pNode);
|
pNode->rid = taosAddRef(gNodeRefId, pNode);
|
||||||
if (pNode->rid < 0) {
|
if (pNode->rid < 0) {
|
||||||
terrno = TSDB_CODE_SYN_WRONG_REF;
|
return terrno = TSDB_CODE_SYN_WRONG_REF;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sDebug("vgId:%d, sync node refId:%" PRId64 " is added to rsetId:%d", pNode->vgId, pNode->rid, gNodeRefId);
|
sDebug("vgId:%d, sync node refId:%" PRId64 " is added to rsetId:%d", pNode->vgId, pNode->rid, gNodeRefId);
|
||||||
|
@ -111,8 +110,7 @@ void syncNodeRelease(SSyncNode *pNode) {
|
||||||
int64_t syncHbTimerDataAdd(SSyncHbTimerData *pData) {
|
int64_t syncHbTimerDataAdd(SSyncHbTimerData *pData) {
|
||||||
pData->rid = taosAddRef(gHbDataRefId, pData);
|
pData->rid = taosAddRef(gHbDataRefId, pData);
|
||||||
if (pData->rid < 0) {
|
if (pData->rid < 0) {
|
||||||
terrno = TSDB_CODE_SYN_WRONG_REF;
|
return terrno = TSDB_CODE_SYN_WRONG_REF;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return pData->rid;
|
return pData->rid;
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
#include "syncSnapshot.h"
|
#include "syncSnapshot.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
|
|
||||||
void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
|
static void syncCfg2SimpleStr(const SSyncCfg* pCfg, char* buf, int32_t bufLen) {
|
||||||
int32_t len = snprintf(buf, bufLen, "{num:%d, as:%d, [", pCfg->replicaNum, pCfg->myIndex);
|
int32_t len = snprintf(buf, bufLen, "{num:%d, as:%d, [", pCfg->replicaNum, pCfg->myIndex);
|
||||||
for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
|
for (int32_t i = 0; i < pCfg->replicaNum; ++i) {
|
||||||
len += snprintf(buf + len, bufLen - len, "%s:%d", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort);
|
len += snprintf(buf + len, bufLen - len, "%s:%d", pCfg->nodeInfo[i].nodeFqdn, pCfg->nodeInfo[i].nodePort);
|
||||||
|
@ -43,14 +43,13 @@ void syncUtilNodeInfo2EpSet(const SNodeInfo* pInfo, SEpSet* pEpSet) {
|
||||||
|
|
||||||
bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId) {
|
bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId* raftId) {
|
||||||
uint32_t ipv4 = 0xFFFFFFFF;
|
uint32_t ipv4 = 0xFFFFFFFF;
|
||||||
sDebug(
|
sDebug("vgId:%d, resolve sync addr from fqdn, dnode:%d cluster:%" PRId64 " fqdn:%s port:%u", vgId, pInfo->nodeId,
|
||||||
"vgId:%d, start to resolve sync addr fqdn in %d seconds, "
|
pInfo->clusterId, pInfo->nodeFqdn, pInfo->nodePort);
|
||||||
"dnode:%d cluster:%" PRId64 " fqdn:%s port:%u ",
|
for (int32_t i = 0; i < tsResolveFQDNRetryTime; i++) {
|
||||||
vgId, tsResolveFQDNRetryTime, pInfo->nodeId, pInfo->clusterId, pInfo->nodeFqdn, pInfo->nodePort);
|
|
||||||
for (int i = 0; i < tsResolveFQDNRetryTime; i++) {
|
|
||||||
int32_t code = taosGetIpv4FromFqdn(pInfo->nodeFqdn, &ipv4);
|
int32_t code = taosGetIpv4FromFqdn(pInfo->nodeFqdn, &ipv4);
|
||||||
if (code) {
|
if (code) {
|
||||||
sError("failed to resolve ipv4 addr, fqdn:%s, wait one second", pInfo->nodeFqdn);
|
sError("vgId:%d, failed to resolve sync addr, dnode:%d fqdn:%s, wait one second", vgId, pInfo->nodeId,
|
||||||
|
pInfo->nodeFqdn);
|
||||||
taosSsleep(1);
|
taosSsleep(1);
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
|
@ -58,7 +57,7 @@ bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId*
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ipv4 == 0xFFFFFFFF || ipv4 == 1) {
|
if (ipv4 == 0xFFFFFFFF || ipv4 == 1) {
|
||||||
sError("failed to resolve ipv4 addr, fqdn:%s", pInfo->nodeFqdn);
|
sError("vgId:%d, failed to resolve sync addr, fqdn:%s", vgId, pInfo->nodeFqdn);
|
||||||
terrno = TSDB_CODE_TSC_INVALID_FQDN;
|
terrno = TSDB_CODE_TSC_INVALID_FQDN;
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -68,14 +67,20 @@ bool syncUtilNodeInfo2RaftId(const SNodeInfo* pInfo, SyncGroupId vgId, SRaftId*
|
||||||
raftId->addr = SYNC_ADDR(pInfo);
|
raftId->addr = SYNC_ADDR(pInfo);
|
||||||
raftId->vgId = vgId;
|
raftId->vgId = vgId;
|
||||||
|
|
||||||
sInfo("vgId:%d, sync addr:%" PRIu64 ", dnode:%d cluster:%" PRId64 " fqdn:%s ip:%s port:%u ipv4:%u", vgId,
|
sInfo("vgId:%d, sync addr:%" PRIu64 " is resolved, dnode:%d cluster:%" PRId64 " fqdn:%s port:%u ip:%s ipv4:%u", vgId,
|
||||||
raftId->addr, pInfo->nodeId, pInfo->clusterId, pInfo->nodeFqdn, ipbuf, pInfo->nodePort, ipv4);
|
raftId->addr, pInfo->nodeId, pInfo->clusterId, pInfo->nodeFqdn, pInfo->nodePort, ipbuf, ipv4);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) {
|
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) {
|
||||||
if (pId1->addr == pId2->addr && pId1->vgId == pId2->vgId) return true;
|
if (pId1->addr == pId2->addr && pId1->vgId == pId2->vgId) {
|
||||||
if ((CID(pId1) == 0 || CID(pId2) == 0) && (DID(pId1) == DID(pId2)) && pId1->vgId == pId2->vgId) return true;
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((CID(pId1) == 0 || CID(pId2) == 0) && (DID(pId1) == DID(pId2)) && pId1->vgId == pId2->vgId) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -98,10 +103,6 @@ void syncUtilMsgHtoN(void* msg) {
|
||||||
pHead->vgId = htonl(pHead->vgId);
|
pHead->vgId = htonl(pHead->vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool syncUtilUserPreCommit(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; }
|
|
||||||
|
|
||||||
bool syncUtilUserRollback(tmsg_t msgType) { return msgType != TDMT_SYNC_NOOP && msgType != TDMT_SYNC_LEADER_TRANSFER; }
|
|
||||||
|
|
||||||
void syncUtilGenerateArbToken(int32_t nodeId, int32_t groupId, char* buf) {
|
void syncUtilGenerateArbToken(int32_t nodeId, int32_t groupId, char* buf) {
|
||||||
(void)memset(buf, 0, TSDB_ARB_TOKEN_SIZE);
|
(void)memset(buf, 0, TSDB_ARB_TOKEN_SIZE);
|
||||||
int32_t randVal = taosSafeRand() % 1000;
|
int32_t randVal = taosSafeRand() % 1000;
|
||||||
|
@ -142,18 +143,18 @@ static void syncLogBufferStates2Str(SSyncNode* pSyncNode, char* buf, int32_t buf
|
||||||
if (pBuf == NULL) {
|
if (pBuf == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
int len = 0;
|
int32_t len = 0;
|
||||||
len += snprintf(buf + len, bufLen - len, "[%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pBuf->startIndex,
|
len += snprintf(buf + len, bufLen - len, "[%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")", pBuf->startIndex,
|
||||||
pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncLogReplStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
|
static void syncLogReplStates2Str(SSyncNode* pSyncNode, char* buf, int32_t bufLen) {
|
||||||
int len = 0;
|
int32_t len = 0;
|
||||||
len += snprintf(buf + len, bufLen - len, "%s", "{");
|
len += snprintf(buf + len, bufLen - len, "%s", "{");
|
||||||
for (int32_t i = 0; i < pSyncNode->replicaNum; i++) {
|
for (int32_t i = 0; i < pSyncNode->replicaNum; i++) {
|
||||||
SSyncLogReplMgr* pMgr = pSyncNode->logReplMgrs[i];
|
SSyncLogReplMgr* pMgr = pSyncNode->logReplMgrs[i];
|
||||||
if (pMgr == NULL) break;
|
if (pMgr == NULL) break;
|
||||||
len += snprintf(buf + len, bufLen - len, "%d:%d [%" PRId64 " %" PRId64 ", %" PRId64 ")", i, pMgr->restored,
|
len += snprintf(buf + len, bufLen - len, "%d:%d [%" PRId64 " %" PRId64 ", %" PRId64 "]", i, pMgr->restored,
|
||||||
pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex);
|
pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex);
|
||||||
if (i + 1 < pSyncNode->replicaNum) {
|
if (i + 1 < pSyncNode->replicaNum) {
|
||||||
len += snprintf(buf + len, bufLen - len, "%s", ", ");
|
len += snprintf(buf + len, bufLen - len, "%s", ", ");
|
||||||
|
@ -280,7 +281,7 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
|
||||||
" end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRId64 " last-cfg:%" PRId64
|
" end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRId64 " last-cfg:%" PRId64
|
||||||
", seq:%d, ack:%d, "
|
", seq:%d, ack:%d, "
|
||||||
" buf:[%" PRId64 " %" PRId64 ", %" PRId64
|
" buf:[%" PRId64 " %" PRId64 ", %" PRId64
|
||||||
"), finish:%d, as:%d, to-dnode:%d}"
|
"], finish:%d, as:%d, to-dnode:%d}"
|
||||||
", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64
|
", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64
|
||||||
", min-match:%" PRId64 ", snap:{last-index:%" PRId64 ", term:%" PRIu64
|
", min-match:%" PRId64 ", snap:{last-index:%" PRId64 ", term:%" PRIu64
|
||||||
"}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
|
"}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
|
||||||
|
|
Loading…
Reference in New Issue