From 66c6e5ba05a109e9328e08dd096276b9f5bd7b5e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 20 Nov 2020 16:51:16 +0800 Subject: [PATCH 1/2] change sync log --- src/sync/src/syncMain.c | 15 +++++++-------- src/sync/src/syncRestore.c | 5 +++-- src/sync/src/syncRetrieve.c | 6 ++---- 3 files changed, 12 insertions(+), 14 deletions(-) diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 2ca77fa404..4281d85386 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -13,8 +13,7 @@ * along with this program. If not, see . */ -//#include -//#include +#define _DEFAULT_SOURCE #include "os.h" #include "hash.h" #include "tlog.h" @@ -392,7 +391,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) { int32_t retLen = taosWriteMsg(pPeer->peerFd, msg, msgLen); if (retLen == msgLen) { - sDebug("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version); + sTrace("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version); } else { sDebug("%s, failed to send forward ack, restart", pPeer->id); syncRestartConnection(pPeer); @@ -891,7 +890,7 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) { SSyncFwds *pSyncFwds = pNode->pSyncFwds; SFwdInfo * pFwdInfo; - sDebug("%s, forward-rsp is received, code:%x hver:%" PRIu64, pPeer->id, pFwdRsp->code, pFwdRsp->version); + sTrace("%s, forward-rsp is received, code:%x hver:%" PRIu64, pPeer->id, pFwdRsp->code, pFwdRsp->version); SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first; if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) { @@ -910,7 +909,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) { SSyncNode *pNode = pPeer->pSyncNode; SWalHead * pHead = (SWalHead *)cont; - sDebug("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len); + sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len); if (nodeRole == TAOS_SYNC_ROLE_SLAVE) { // nodeVersion = pHead->version; @@ -1191,7 +1190,7 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) { pFwdInfo->time = time; pSyncFwds->fwds++; - sDebug("vgId:%d, fwd info is saved, hver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds); + sTrace("vgId:%d, fwd info is saved, hver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds); } static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) { @@ -1228,7 +1227,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code } if (confirm && pFwdInfo->confirmed == 0) { - sDebug("vgId:%d, forward is confirmed, hver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code); + sTrace("vgId:%d, forward is confirmed, hver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code); (*pNode->confirmForward)(pNode->ahandle, pFwdInfo->mhandle, pFwdInfo->code); pFwdInfo->confirmed = 1; } @@ -1335,7 +1334,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle int32_t retLen = write(pPeer->peerFd, pSyncHead, fwdLen); if (retLen == fwdLen) { - sDebug("%s, forward is sent, hver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len); + sTrace("%s, forward is sent, hver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len); } else { sError("%s, failed to forward, hver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen); syncRestartConnection(pPeer); diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index e7901e6eb8..ebf1e91d90 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +#define _DEFAULT_SOURCE #include "os.h" #include "tlog.h" #include "tutil.h" @@ -154,7 +155,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len); if (ret < 0) break; - sDebug("%s, restore a record, qtype:wal len:%d hver:%" PRIu64, pPeer->id, pHead->len, pHead->version); + sTrace("%s, restore a record, qtype:wal len:%d hver:%" PRIu64, pPeer->id, pHead->len, pHead->version); if (lastVer == pHead->version) { sError("%s, failed to restore record, same hver:%" PRIu64 ", wal sync failed" PRIu64, pPeer->id, lastVer); @@ -222,7 +223,7 @@ int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) { memcpy(pRecv->offset, pHead, len); pRecv->offset += len; pRecv->forwards++; - sDebug("%s, fwd is saved into queue, hver:%" PRIu64 " fwds:%d", pPeer->id, pHead->version, pRecv->forwards); + sTrace("%s, fwd is saved into queue, hver:%" PRIu64 " fwds:%d", pPeer->id, pHead->version, pRecv->forwards); } else { sError("%s, buffer size:%d is too small", pPeer->id, pRecv->bufferSize); pRecv->code = -1; // set error code diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index 03f5a7bd94..c02bcbe11d 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -13,10 +13,8 @@ * along with this program. If not, see . */ -#include -#include +#define _DEFAULT_SOURCE #include -#include #include "os.h" #include "tlog.h" #include "tutil.h" @@ -268,7 +266,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi break; } - sDebug("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version); + sTrace("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version); int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize); if (ret != wsize) break; pPeer->sversion = pHead->version; From 2b887c4140cf1482aa0431ccccd6b835005e4fe5 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 20 Nov 2020 17:48:07 +0800 Subject: [PATCH 2/2] TD-2000 --- src/sync/src/syncMain.c | 13 +++++++------ src/sync/src/syncRestore.c | 5 +++-- src/sync/src/syncRetrieve.c | 18 +++++++++--------- src/sync/src/tarbitrator.c | 8 ++++---- 4 files changed, 23 insertions(+), 21 deletions(-) diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 4281d85386..843de9461f 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -820,7 +820,7 @@ static void syncProcessSyncRequest(char *msg, SSyncPeer *pPeer) { pthread_attr_destroy(&thattr); if (ret != 0) { - sError("%s, failed to create sync thread(%s)", pPeer->id, strerror(errno)); + sError("%s, failed to create sync thread since %s", pPeer->id, strerror(errno)); syncDecPeerRef(pPeer); } else { pPeer->sstatus = TAOS_SYNC_STATUS_START; @@ -1105,7 +1105,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { SFirstPkt firstPkt; if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { - sError("failed to read peer first pkt from ip:%s(%s)", ipstr, strerror(errno)); + sError("failed to read peer first pkt from ip:%s since %s", ipstr, strerror(errno)); taosCloseSocket(connFd); return; } @@ -1159,7 +1159,7 @@ static void syncProcessBrokenLink(void *param) { if (taosAcquireRef(tsSyncRefId, pNode->rid) == NULL) return; pthread_mutex_lock(&(pNode->mutex)); - sDebug("%s, TCP link is broken(%s)", pPeer->id, strerror(errno)); + sDebug("%s, TCP link is broken since %s", pPeer->id, strerror(errno)); pPeer->peerFd = -1; if (syncDecPeerRef(pPeer) != 0) { @@ -1242,9 +1242,10 @@ static void syncMonitorNodeRole(void *param, void *tmrId) { if (index == pNode->selfIndex) continue; SSyncPeer *pPeer = pNode->peerInfo[index]; - if (pPeer->role <= TAOS_SYNC_ROLE_UNSYNCED || nodeRole <= TAOS_SYNC_ROLE_UNSYNCED) { - syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_CHECK_ROLE, syncGenTranId()); - } + if (pPeer->role > TAOS_SYNC_ROLE_UNSYNCED && nodeRole > TAOS_SYNC_ROLE_UNSYNCED) continue; + if (pPeer->sstatus > TAOS_SYNC_STATUS_INIT || nodeSStatus > TAOS_SYNC_STATUS_INIT) continue; + + syncSendPeersStatusMsgToPeer(pPeer, 1, SYNC_STATUS_CHECK_ROLE, syncGenTranId()); } pNode->pRoleTimer = taosTmrStart(syncMonitorNodeRole, SYNC_ROLE_TIMER, (void *)pNode->rid, tsSyncTmrCtrl); diff --git a/src/sync/src/syncRestore.c b/src/sync/src/syncRestore.c index ebf1e91d90..cc2315fb15 100644 --- a/src/sync/src/syncRestore.c +++ b/src/sync/src/syncRestore.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "os.h" +#include "taoserror.h" #include "tlog.h" #include "tutil.h" #include "ttimer.h" @@ -127,7 +128,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) { } if (code < 0) { - sError("%s, failed to restore %s(%s)", pPeer->id, name, strerror(errno)); + sError("%s, failed to restore %s since %s", pPeer->id, name, strerror(errno)); } return code; @@ -167,7 +168,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) { } if (code < 0) { - sError("%s, failed to restore wal(%s)", pPeer->id, strerror(errno)); + sError("%s, failed to restore wal from syncFd:%d since %s", pPeer->id, pPeer->syncFd, strerror(errno)); } free(buffer); diff --git a/src/sync/src/syncRetrieve.c b/src/sync/src/syncRetrieve.c index c02bcbe11d..82f40854e8 100644 --- a/src/sync/src/syncRetrieve.c +++ b/src/sync/src/syncRetrieve.c @@ -32,7 +32,7 @@ static int32_t syncAddIntoWatchList(SSyncPeer *pPeer, char *name) { pPeer->watchNum = 0; pPeer->notifyFd = inotify_init1(IN_NONBLOCK); if (pPeer->notifyFd < 0) { - sError("%s, failed to init inotify(%s)", pPeer->id, strerror(errno)); + sError("%s, failed to init inotify since %s", pPeer->id, strerror(errno)); return -1; } @@ -49,14 +49,14 @@ static int32_t syncAddIntoWatchList(SSyncPeer *pPeer, char *name) { if (*wd >= 0) { if (inotify_rm_watch(pPeer->notifyFd, *wd) < 0) { - sError("%s, failed to remove wd:%d(%s)", pPeer->id, *wd, strerror(errno)); + sError("%s, failed to remove wd:%d since %s", pPeer->id, *wd, strerror(errno)); return -1; } } *wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_DELETE); if (*wd == -1) { - sError("%s, failed to add %s(%s)", pPeer->id, name, strerror(errno)); + sError("%s, failed to add %s since %s", pPeer->id, name, strerror(errno)); return -1; } else { sDebug("%s, monitor %s, wd:%d watchNum:%d", pPeer->id, name, *wd, pPeer->watchNum); @@ -73,7 +73,7 @@ static int32_t syncAreFilesModified(SSyncPeer *pPeer) { char buf[2048]; int32_t len = read(pPeer->notifyFd, buf, sizeof(buf)); if (len < 0 && errno != EAGAIN) { - sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); + sError("%s, failed to read notify FD since %s", pPeer->id, strerror(errno)); return -1; } @@ -159,7 +159,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) { } if (code < 0) { - sError("%s, failed to retrieve file(%s)", pPeer->id, strerror(errno)); + sError("%s, failed to retrieve file since %s", pPeer->id, strerror(errno)); } return code; @@ -199,7 +199,7 @@ static int32_t syncMonitorLastWal(SSyncPeer *pPeer, char *name) { taosClose(pPeer->notifyFd); pPeer->notifyFd = inotify_init1(IN_NONBLOCK); if (pPeer->notifyFd < 0) { - sError("%s, failed to init inotify(%s)", pPeer->id, strerror(errno)); + sError("%s, failed to init inotify since %s", pPeer->id, strerror(errno)); return -1; } @@ -214,7 +214,7 @@ static int32_t syncMonitorLastWal(SSyncPeer *pPeer, char *name) { *wd = inotify_add_watch(pPeer->notifyFd, name, IN_MODIFY | IN_CLOSE_WRITE); if (*wd == -1) { - sError("%s, failed to watch last wal(%s)", pPeer->id, strerror(errno)); + sError("%s, failed to watch last wal since %s", pPeer->id, strerror(errno)); return -1; } @@ -225,7 +225,7 @@ static int32_t syncCheckLastWalChanges(SSyncPeer *pPeer, uint32_t *pEvent) { char buf[2048]; int32_t len = read(pPeer->notifyFd, buf, sizeof(buf)); if (len < 0 && errno != EAGAIN) { - sError("%s, failed to read notify FD(%s)", pPeer->id, strerror(errno)); + sError("%s, failed to read notify FD since %s", pPeer->id, strerror(errno)); return -1; } @@ -422,7 +422,7 @@ static int32_t syncRetrieveWal(SSyncPeer *pPeer) { memset(&walHead, 0, sizeof(walHead)); code = taosWriteMsg(pPeer->syncFd, &walHead, sizeof(walHead)); } else { - sError("%s, failed to send wal(%s)", pPeer->id, strerror(errno)); + sError("%s, failed to send wal since %s", pPeer->id, strerror(errno)); } return code; diff --git a/src/sync/src/tarbitrator.c b/src/sync/src/tarbitrator.c index b7f819a3cd..4016042de2 100644 --- a/src/sync/src/tarbitrator.c +++ b/src/sync/src/tarbitrator.c @@ -115,14 +115,14 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { SFirstPkt firstPkt; if (taosReadMsg(connFd, &firstPkt, sizeof(firstPkt)) != sizeof(firstPkt)) { - sError("failed to read peer first pkt from ip:%s(%s)", ipstr, strerror(errno)); + sError("failed to read peer first pkt from ip:%s since %s", ipstr, strerror(errno)); taosCloseSocket(connFd); return; } - SNodeConn *pNode = (SNodeConn *)calloc(sizeof(SNodeConn), 1); + SNodeConn *pNode = calloc(sizeof(SNodeConn), 1); if (pNode == NULL) { - sError("failed to allocate memory(%s)", strerror(errno)); + sError("failed to allocate memory since %s", strerror(errno)); taosCloseSocket(connFd); return; } @@ -146,7 +146,7 @@ static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { static void arbProcessBrokenLink(void *param) { SNodeConn *pNode = param; - sDebug("%s, TCP link is broken(%s), close connection", pNode->id, strerror(errno)); + sDebug("%s, TCP link is broken since %s, close connection", pNode->id, strerror(errno)); tfree(pNode); }