change sync log
This commit is contained in:
parent
7749987f53
commit
66c6e5ba05
|
@ -13,8 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
//#include <stdint.h>
|
#define _DEFAULT_SOURCE
|
||||||
//#include <stdbool.h>
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "hash.h"
|
#include "hash.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
|
@ -392,7 +391,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
|
||||||
int32_t retLen = taosWriteMsg(pPeer->peerFd, msg, msgLen);
|
int32_t retLen = taosWriteMsg(pPeer->peerFd, msg, msgLen);
|
||||||
|
|
||||||
if (retLen == msgLen) {
|
if (retLen == msgLen) {
|
||||||
sDebug("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version);
|
sTrace("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version);
|
||||||
} else {
|
} else {
|
||||||
sDebug("%s, failed to send forward ack, restart", pPeer->id);
|
sDebug("%s, failed to send forward ack, restart", pPeer->id);
|
||||||
syncRestartConnection(pPeer);
|
syncRestartConnection(pPeer);
|
||||||
|
@ -891,7 +890,7 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
|
||||||
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
|
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
|
||||||
SFwdInfo * pFwdInfo;
|
SFwdInfo * pFwdInfo;
|
||||||
|
|
||||||
sDebug("%s, forward-rsp is received, code:%x hver:%" PRIu64, pPeer->id, pFwdRsp->code, pFwdRsp->version);
|
sTrace("%s, forward-rsp is received, code:%x hver:%" PRIu64, pPeer->id, pFwdRsp->code, pFwdRsp->version);
|
||||||
SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first;
|
SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first;
|
||||||
|
|
||||||
if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) {
|
if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) {
|
||||||
|
@ -910,7 +909,7 @@ static void syncProcessForwardFromPeer(char *cont, SSyncPeer *pPeer) {
|
||||||
SSyncNode *pNode = pPeer->pSyncNode;
|
SSyncNode *pNode = pPeer->pSyncNode;
|
||||||
SWalHead * pHead = (SWalHead *)cont;
|
SWalHead * pHead = (SWalHead *)cont;
|
||||||
|
|
||||||
sDebug("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
|
sTrace("%s, forward is received, hver:%" PRIu64 ", len:%d", pPeer->id, pHead->version, pHead->len);
|
||||||
|
|
||||||
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
|
if (nodeRole == TAOS_SYNC_ROLE_SLAVE) {
|
||||||
// nodeVersion = pHead->version;
|
// nodeVersion = pHead->version;
|
||||||
|
@ -1191,7 +1190,7 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
|
||||||
pFwdInfo->time = time;
|
pFwdInfo->time = time;
|
||||||
|
|
||||||
pSyncFwds->fwds++;
|
pSyncFwds->fwds++;
|
||||||
sDebug("vgId:%d, fwd info is saved, hver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds);
|
sTrace("vgId:%d, fwd info is saved, hver:%" PRIu64 " fwds:%d ", pNode->vgId, version, pSyncFwds->fwds);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
|
static void syncRemoveConfirmedFwdInfo(SSyncNode *pNode) {
|
||||||
|
@ -1228,7 +1227,7 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
|
||||||
}
|
}
|
||||||
|
|
||||||
if (confirm && pFwdInfo->confirmed == 0) {
|
if (confirm && pFwdInfo->confirmed == 0) {
|
||||||
sDebug("vgId:%d, forward is confirmed, hver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code);
|
sTrace("vgId:%d, forward is confirmed, hver:%" PRIu64 " code:%x", pNode->vgId, pFwdInfo->version, pFwdInfo->code);
|
||||||
(*pNode->confirmForward)(pNode->ahandle, pFwdInfo->mhandle, pFwdInfo->code);
|
(*pNode->confirmForward)(pNode->ahandle, pFwdInfo->mhandle, pFwdInfo->code);
|
||||||
pFwdInfo->confirmed = 1;
|
pFwdInfo->confirmed = 1;
|
||||||
}
|
}
|
||||||
|
@ -1335,7 +1334,7 @@ static int32_t syncForwardToPeerImpl(SSyncNode *pNode, void *data, void *mhandle
|
||||||
|
|
||||||
int32_t retLen = write(pPeer->peerFd, pSyncHead, fwdLen);
|
int32_t retLen = write(pPeer->peerFd, pSyncHead, fwdLen);
|
||||||
if (retLen == fwdLen) {
|
if (retLen == fwdLen) {
|
||||||
sDebug("%s, forward is sent, hver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len);
|
sTrace("%s, forward is sent, hver:%" PRIu64 " contLen:%d", pPeer->id, pWalHead->version, pWalHead->len);
|
||||||
} else {
|
} else {
|
||||||
sError("%s, failed to forward, hver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen);
|
sError("%s, failed to forward, hver:%" PRIu64 " retLen:%d", pPeer->id, pWalHead->version, retLen);
|
||||||
syncRestartConnection(pPeer);
|
syncRestartConnection(pPeer);
|
||||||
|
|
|
@ -13,6 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
@ -154,7 +155,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
|
||||||
ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len);
|
ret = taosReadMsg(pPeer->syncFd, pHead->cont, pHead->len);
|
||||||
if (ret < 0) break;
|
if (ret < 0) break;
|
||||||
|
|
||||||
sDebug("%s, restore a record, qtype:wal len:%d hver:%" PRIu64, pPeer->id, pHead->len, pHead->version);
|
sTrace("%s, restore a record, qtype:wal len:%d hver:%" PRIu64, pPeer->id, pHead->len, pHead->version);
|
||||||
|
|
||||||
if (lastVer == pHead->version) {
|
if (lastVer == pHead->version) {
|
||||||
sError("%s, failed to restore record, same hver:%" PRIu64 ", wal sync failed" PRIu64, pPeer->id, lastVer);
|
sError("%s, failed to restore record, same hver:%" PRIu64 ", wal sync failed" PRIu64, pPeer->id, lastVer);
|
||||||
|
@ -222,7 +223,7 @@ int32_t syncSaveIntoBuffer(SSyncPeer *pPeer, SWalHead *pHead) {
|
||||||
memcpy(pRecv->offset, pHead, len);
|
memcpy(pRecv->offset, pHead, len);
|
||||||
pRecv->offset += len;
|
pRecv->offset += len;
|
||||||
pRecv->forwards++;
|
pRecv->forwards++;
|
||||||
sDebug("%s, fwd is saved into queue, hver:%" PRIu64 " fwds:%d", pPeer->id, pHead->version, pRecv->forwards);
|
sTrace("%s, fwd is saved into queue, hver:%" PRIu64 " fwds:%d", pPeer->id, pHead->version, pRecv->forwards);
|
||||||
} else {
|
} else {
|
||||||
sError("%s, buffer size:%d is too small", pPeer->id, pRecv->bufferSize);
|
sError("%s, buffer size:%d is too small", pPeer->id, pRecv->bufferSize);
|
||||||
pRecv->code = -1; // set error code
|
pRecv->code = -1; // set error code
|
||||||
|
|
|
@ -13,10 +13,8 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <stdint.h>
|
#define _DEFAULT_SOURCE
|
||||||
#include <stdbool.h>
|
|
||||||
#include <sys/inotify.h>
|
#include <sys/inotify.h>
|
||||||
#include <unistd.h>
|
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
@ -268,7 +266,7 @@ static int32_t syncRetrieveLastWal(SSyncPeer *pPeer, char *name, uint64_t fversi
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
sDebug("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version);
|
sTrace("%s, last wal is forwarded, hver:%" PRIu64, pPeer->id, pHead->version);
|
||||||
int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
|
int32_t ret = taosWriteMsg(pPeer->syncFd, pHead, wsize);
|
||||||
if (ret != wsize) break;
|
if (ret != wsize) break;
|
||||||
pPeer->sversion = pHead->version;
|
pPeer->sversion = pHead->version;
|
||||||
|
|
Loading…
Reference in New Issue