Merge pull request #15679 from taosdata/fix/dnode
refactor: adjust sync log
This commit is contained in:
commit
0e29a23a30
|
@ -146,7 +146,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
|
||||||
|
|
||||||
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, pHead->vgId);
|
||||||
if (pVnode == NULL) {
|
if (pVnode == NULL) {
|
||||||
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, msgtype:%s qtype:%d", pHead->vgId, pMsg,
|
dGError("vgId:%d, msg:%p failed to put into vnode queue since %s, type:%s qtype:%d", pHead->vgId, pMsg,
|
||||||
terrstr(), TMSG_INFO(pMsg->msgType), qtype);
|
terrstr(), TMSG_INFO(pMsg->msgType), qtype);
|
||||||
return terrno != 0 ? terrno : -1;
|
return terrno != 0 ? terrno : -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,7 +71,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbDebug("vgId:%d, tsdb is opened for %s, days:%d, keep:%d,%d,%d", TD_VID(pVnode), pTsdb->path, pTsdb->keepCfg.days,
|
tsdbDebug("vgId:%d, tsdb is opened at %s, days:%d, keep:%d,%d,%d", TD_VID(pVnode), pTsdb->path, pTsdb->keepCfg.days,
|
||||||
pTsdb->keepCfg.keep0, pTsdb->keepCfg.keep1, pTsdb->keepCfg.keep2);
|
pTsdb->keepCfg.keep0, pTsdb->keepCfg.keep1, pTsdb->keepCfg.keep2);
|
||||||
|
|
||||||
*ppTsdb = pTsdb;
|
*ppTsdb = pTsdb;
|
||||||
|
|
|
@ -40,7 +40,7 @@ int vnodeOpenBufPool(SVnode *pVnode, int64_t size) {
|
||||||
pVnode->pPool = pPool;
|
pVnode->pPool = pPool;
|
||||||
}
|
}
|
||||||
|
|
||||||
vDebug("vgId:%d, vnode buffer pool is opened, pool size: %" PRId64, TD_VID(pVnode), size);
|
vDebug("vgId:%d, vnode buffer pool is opened, size:%" PRId64, TD_VID(pVnode), size);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -296,7 +296,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
vTrace("message in fetch queue is processing");
|
vTrace("vgId:%d, msg:%p in fetch queue is processing", pVnode->config.vgId, pMsg);
|
||||||
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
|
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG ||
|
||||||
pMsg->msgType == TDMT_VND_BATCH_META) &&
|
pMsg->msgType == TDMT_VND_BATCH_META) &&
|
||||||
!vnodeIsLeader(pVnode)) {
|
!vnodeIsLeader(pVnode)) {
|
||||||
|
|
|
@ -518,15 +518,15 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
|
||||||
rpcMsg.info.conn.applyTerm = cbMeta.term;
|
rpcMsg.info.conn.applyTerm = cbMeta.term;
|
||||||
|
|
||||||
vInfo("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
|
vInfo("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
|
||||||
", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
|
", weak:%d, code:%d, state:%d %s, type:%s",
|
||||||
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.term, rpcMsg.info.conn.applyIndex, cbMeta.isWeak,
|
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.term, rpcMsg.info.conn.applyIndex, cbMeta.isWeak,
|
||||||
cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
|
cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType));
|
||||||
|
|
||||||
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
|
||||||
} else {
|
} else {
|
||||||
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
|
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
|
||||||
vError("vgId:%d, sync commit error, msgtype:%d,%s, index:%ld, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync),
|
vError("vgId:%d, commit-cb execute error, type:%s, index:%" PRId64 ", error:0x%x %s", syncGetVgId(pVnode->sync),
|
||||||
pMsg->msgType, TMSG_INFO(pMsg->msgType), cbMeta.index, cbMeta.code, tstrerror(cbMeta.code));
|
TMSG_INFO(pMsg->msgType), cbMeta.index, cbMeta.code, tstrerror(cbMeta.code));
|
||||||
if (rsp.info.handle != NULL) {
|
if (rsp.info.handle != NULL) {
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
}
|
}
|
||||||
|
@ -537,10 +537,9 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
|
||||||
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
if (cbMeta.isWeak == 1) {
|
if (cbMeta.isWeak == 1) {
|
||||||
SVnode *pVnode = pFsm->data;
|
SVnode *pVnode = pFsm->data;
|
||||||
vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64
|
vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
|
||||||
", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
|
|
||||||
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
|
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
|
||||||
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
|
syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType));
|
||||||
|
|
||||||
if (cbMeta.code == 0) {
|
if (cbMeta.code == 0) {
|
||||||
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
|
||||||
|
@ -552,8 +551,8 @@ static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMet
|
||||||
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
|
||||||
} else {
|
} else {
|
||||||
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
|
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
|
||||||
vError("vgId:%d, sync pre-commit error, msgtype:%d,%s, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync),
|
vError("vgId:%d, pre-commit-cb execute error, type:%s, error:0x%x %s", syncGetVgId(pVnode->sync),
|
||||||
pMsg->msgType, TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
|
TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
|
||||||
if (rsp.info.handle != NULL) {
|
if (rsp.info.handle != NULL) {
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
}
|
}
|
||||||
|
@ -563,9 +562,9 @@ static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMet
|
||||||
|
|
||||||
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||||
SVnode *pVnode = pFsm->data;
|
SVnode *pVnode = pFsm->data;
|
||||||
vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
|
vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s",
|
||||||
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
|
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
|
||||||
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
|
syncUtilState2String(cbMeta.state), TMSG_INFO(pMsg->msgType));
|
||||||
}
|
}
|
||||||
|
|
||||||
#define USE_TSDB_SNAPSHOT
|
#define USE_TSDB_SNAPSHOT
|
||||||
|
|
|
@ -198,7 +198,7 @@ TFileReader* tfileReaderCreate(IFileCtx* ctx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (0 != tfileReaderLoadFst(reader)) {
|
if (0 != tfileReaderLoadFst(reader)) {
|
||||||
indexError("failed to load index fst, suid: %" PRIu64 ", colName: %s, errno: %d", reader->header.suid,
|
indexError("failed to load index fst, suid:%" PRIu64 ", colName:%s, code:0x%x", reader->header.suid,
|
||||||
reader->header.colName, errno);
|
reader->header.colName, errno);
|
||||||
tfileReaderDestroy(reader);
|
tfileReaderDestroy(reader);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -874,7 +874,7 @@ static int tfileReaderLoadHeader(TFileReader* reader) {
|
||||||
int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0);
|
int64_t nread = reader->ctx->readFrom(reader->ctx, buf, sizeof(buf), 0);
|
||||||
|
|
||||||
if (nread == -1) {
|
if (nread == -1) {
|
||||||
indexError("actual Read: %d, to read: %d, errno: %d, filename: %s", (int)(nread), (int)sizeof(buf), errno,
|
indexError("actual Read: %d, to read: %d, code:0x%x, filename: %s", (int)(nread), (int)sizeof(buf), errno,
|
||||||
reader->ctx->file.buf);
|
reader->ctx->file.buf);
|
||||||
} else {
|
} else {
|
||||||
indexInfo("actual Read: %d, to read: %d, filename: %s", (int)(nread), (int)sizeof(buf), reader->ctx->file.buf);
|
indexInfo("actual Read: %d, to read: %d, filename: %s", (int)(nread), (int)sizeof(buf), reader->ctx->file.buf);
|
||||||
|
|
|
@ -94,7 +94,7 @@ int64_t syncOpen(const SSyncInfo* pSyncInfo) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
sDebug("vgId:%d, rid:%" PRId64 " is added to rsetId:%" PRId64, pSyncInfo->vgId, pSyncNode->rid, tsNodeRefId);
|
sDebug("vgId:%d, sync rid:%" PRId64 " is added to rsetId:%" PRId64, pSyncInfo->vgId, pSyncNode->rid, tsNodeRefId);
|
||||||
return pSyncNode->rid;
|
return pSyncNode->rid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -142,7 +142,7 @@ void syncStop(int64_t rid) {
|
||||||
|
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
taosRemoveRef(tsNodeRefId, rid);
|
taosRemoveRef(tsNodeRefId, rid);
|
||||||
sDebug("vgId:%d, rid:%" PRId64 " is removed from rsetId:%" PRId64, vgId, rid, tsNodeRefId);
|
sDebug("vgId:%d, sync rid:%" PRId64 " is removed from rsetId:%" PRId64, vgId, rid, tsNodeRefId);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncSetStandby(int64_t rid) {
|
int32_t syncSetStandby(int64_t rid) {
|
||||||
|
@ -730,8 +730,7 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIs
|
||||||
for (int i = 0; i < arrSize; ++i) {
|
for (int i = 0; i < arrSize; ++i) {
|
||||||
do {
|
do {
|
||||||
char eventLog[128];
|
char eventLog[128];
|
||||||
snprintf(eventLog, sizeof(eventLog), "propose type:%s,%d, batch:%d", TMSG_INFO(pMsgPArr[i]->msgType),
|
snprintf(eventLog, sizeof(eventLog), "propose type:%s, batch:%d", TMSG_INFO(pMsgPArr[i]->msgType), arrSize);
|
||||||
pMsgPArr[i]->msgType, arrSize);
|
|
||||||
syncNodeEventLog(pSyncNode, eventLog);
|
syncNodeEventLog(pSyncNode, eventLog);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
@ -791,7 +790,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
|
||||||
|
|
||||||
do {
|
do {
|
||||||
char eventLog[128];
|
char eventLog[128];
|
||||||
snprintf(eventLog, sizeof(eventLog), "propose type:%s,%d", TMSG_INFO(pMsg->msgType), pMsg->msgType);
|
snprintf(eventLog, sizeof(eventLog), "propose type:%s", TMSG_INFO(pMsg->msgType));
|
||||||
syncNodeEventLog(pSyncNode, eventLog);
|
syncNodeEventLog(pSyncNode, eventLog);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
@ -799,7 +798,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
|
||||||
if (pSyncNode->changing && pMsg->msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH) {
|
if (pSyncNode->changing && pMsg->msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH) {
|
||||||
ret = -1;
|
ret = -1;
|
||||||
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
|
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
|
||||||
sError("vgId:%d, sync propose not ready, type:%s,%d", pSyncNode->vgId, TMSG_INFO(pMsg->msgType), pMsg->msgType);
|
sError("vgId:%d, failed to sync propose since not ready, type:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
|
||||||
goto _END;
|
goto _END;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -808,8 +807,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
|
||||||
if (!syncNodeCanChange(pSyncNode)) {
|
if (!syncNodeCanChange(pSyncNode)) {
|
||||||
ret = -1;
|
ret = -1;
|
||||||
terrno = TSDB_CODE_SYN_RECONFIG_NOT_READY;
|
terrno = TSDB_CODE_SYN_RECONFIG_NOT_READY;
|
||||||
sError("vgId:%d, sync reconfig not ready, type:%s,%d", pSyncNode->vgId, TMSG_INFO(pMsg->msgType),
|
sError("vgId:%d, failed to sync reconfig since not ready, type:%s", pSyncNode->vgId, TMSG_INFO(pMsg->msgType));
|
||||||
pMsg->msgType);
|
|
||||||
goto _END;
|
goto _END;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -836,13 +834,12 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
|
||||||
rpcFreeCont(rpcMsg.pCont);
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
|
syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
|
||||||
ret = 1;
|
ret = 1;
|
||||||
sDebug("vgId:%d, optimized index:%" PRId64 " success, msgtype:%s,%d", pSyncNode->vgId, retIndex,
|
sDebug("vgId:%d, sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType));
|
||||||
TMSG_INFO(pMsg->msgType), pMsg->msgType);
|
|
||||||
} else {
|
} else {
|
||||||
ret = -1;
|
ret = -1;
|
||||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
sError("vgId:%d, optimized index:%" PRId64 " error, msgtype:%s,%d", pSyncNode->vgId, retIndex,
|
sError("vgId:%d, failed to sync optimize index:%" PRId64 ", type:%s", pSyncNode->vgId, retIndex,
|
||||||
TMSG_INFO(pMsg->msgType), pMsg->msgType);
|
TMSG_INFO(pMsg->msgType));
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
@ -851,7 +848,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
|
||||||
} else {
|
} else {
|
||||||
ret = -1;
|
ret = -1;
|
||||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
sError("vgId:%d, enqueue msg error, FpEqMsg is NULL", pSyncNode->vgId);
|
sError("vgId:%d, failed to enqueue msg since its null", pSyncNode->vgId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -861,8 +858,8 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
|
||||||
} else {
|
} else {
|
||||||
ret = -1;
|
ret = -1;
|
||||||
terrno = TSDB_CODE_SYN_NOT_LEADER;
|
terrno = TSDB_CODE_SYN_NOT_LEADER;
|
||||||
sError("vgId:%d, sync propose not leader, %s, msgtype:%s,%d", pSyncNode->vgId,
|
sError("vgId:%d, sync propose not leader, %s, type:%s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state),
|
||||||
syncUtilState2String(pSyncNode->state), TMSG_INFO(pMsg->msgType), pMsg->msgType);
|
TMSG_INFO(pMsg->msgType));
|
||||||
goto _END;
|
goto _END;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -887,7 +884,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path);
|
snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
|
||||||
if (!taosCheckExistFile(pSyncNode->configPath)) {
|
if (!taosCheckExistFile(pSyncNode->configPath)) {
|
||||||
// create a new raft config file
|
// create a new raft config file
|
||||||
SRaftCfgMeta meta;
|
SRaftCfgMeta meta;
|
||||||
|
@ -910,8 +907,9 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
|
||||||
// init by SSyncInfo
|
// init by SSyncInfo
|
||||||
pSyncNode->vgId = pSyncInfo->vgId;
|
pSyncNode->vgId = pSyncInfo->vgId;
|
||||||
memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
|
memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path));
|
||||||
snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s/raft_store.json", pSyncInfo->path);
|
snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s%sraft_store.json", pSyncInfo->path,
|
||||||
snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path);
|
TD_DIRSEP);
|
||||||
|
snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s%sraft_config.json", pSyncInfo->path, TD_DIRSEP);
|
||||||
|
|
||||||
pSyncNode->pWal = pSyncInfo->pWal;
|
pSyncNode->pWal = pSyncInfo->pWal;
|
||||||
pSyncNode->msgcb = pSyncInfo->msgcb;
|
pSyncNode->msgcb = pSyncInfo->msgcb;
|
||||||
|
@ -2764,7 +2762,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
|
||||||
ESyncState state = flag;
|
ESyncState state = flag;
|
||||||
|
|
||||||
char eventLog[128];
|
char eventLog[128];
|
||||||
snprintf(eventLog, sizeof(eventLog), "commit by wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex);
|
snprintf(eventLog, sizeof(eventLog), "commit wal from index:%" PRId64 " to index:%" PRId64, beginIndex, endIndex);
|
||||||
syncNodeEventLog(ths, eventLog);
|
syncNodeEventLog(ths, eventLog);
|
||||||
|
|
||||||
// execute fsm
|
// execute fsm
|
||||||
|
@ -2782,13 +2780,13 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
|
||||||
// user commit
|
// user commit
|
||||||
if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
|
if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
|
||||||
bool internalExecute = true;
|
bool internalExecute = true;
|
||||||
if ((ths->replicaNum == 1) && ths->restoreFinish && (ths->vgId != 1)) {
|
if ((ths->replicaNum == 1) && ths->restoreFinish && ths->vgId != 1) {
|
||||||
internalExecute = false;
|
internalExecute = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
do {
|
do {
|
||||||
char logBuf[128];
|
char logBuf[128];
|
||||||
snprintf(logBuf, sizeof(logBuf), "index:%" PRId64 ", internalExecute:%d", i, internalExecute);
|
snprintf(logBuf, sizeof(logBuf), "commit index:%" PRId64 ", internal:%d", i, internalExecute);
|
||||||
syncNodeEventLog(ths, logBuf);
|
syncNodeEventLog(ths, logBuf);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
|
|
@ -229,8 +229,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
|
||||||
|
|
||||||
do {
|
do {
|
||||||
char eventLog[128];
|
char eventLog[128];
|
||||||
snprintf(eventLog, sizeof(eventLog), "write index:%" PRId64 ", type:%s,%d, type2:%s,%d", pEntry->index,
|
snprintf(eventLog, sizeof(eventLog), "write index:%" PRId64 ", type:%s, origin type:%s", pEntry->index,
|
||||||
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
|
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType));
|
||||||
syncNodeEventLog(pData->pSyncNode, eventLog);
|
syncNodeEventLog(pData->pSyncNode, eventLog);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
@ -468,8 +468,8 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
|
||||||
|
|
||||||
do {
|
do {
|
||||||
char eventLog[128];
|
char eventLog[128];
|
||||||
snprintf(eventLog, sizeof(eventLog), "write2 index:%" PRId64 ", type:%s,%d, type2:%s,%d", pEntry->index,
|
snprintf(eventLog, sizeof(eventLog), "write2 index:%" PRId64 ", type:%s, origin type:%s", pEntry->index,
|
||||||
TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType);
|
TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType));
|
||||||
syncNodeEventLog(pData->pSyncNode, eventLog);
|
syncNodeEventLog(pData->pSyncNode, eventLog);
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
|
|
@ -50,9 +50,8 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
|
||||||
|
|
||||||
SSyncNode *pSyncNode = pObj->data;
|
SSyncNode *pSyncNode = pObj->data;
|
||||||
char eventLog[128];
|
char eventLog[128];
|
||||||
snprintf(eventLog, sizeof(eventLog), "resp mgr add, type:%s,%d, seq:%" PRIu64 ", handle:%p, ahandle:%p",
|
snprintf(eventLog, sizeof(eventLog), "save response handle, type:%s, seq:%" PRIu64 ", handle:%p, ahandle:%p",
|
||||||
TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, pStub->rpcMsg.info.handle,
|
TMSG_INFO(pStub->rpcMsg.msgType), keyCode, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
|
||||||
pStub->rpcMsg.info.ahandle);
|
|
||||||
syncNodeEventLog(pSyncNode, eventLog);
|
syncNodeEventLog(pSyncNode, eventLog);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&(pObj->mutex));
|
taosThreadMutexUnlock(&(pObj->mutex));
|
||||||
|
@ -77,9 +76,8 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
|
||||||
|
|
||||||
SSyncNode *pSyncNode = pObj->data;
|
SSyncNode *pSyncNode = pObj->data;
|
||||||
char eventLog[128];
|
char eventLog[128];
|
||||||
snprintf(eventLog, sizeof(eventLog), "resp mgr get, type:%s,%d, seq:%" PRIu64 ", handle:%p, ahandle:%p",
|
snprintf(eventLog, sizeof(eventLog), "get response handle, type:%s, seq:%" PRIu64 ", handle:%p, ahandle:%p",
|
||||||
TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle,
|
TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
|
||||||
pStub->rpcMsg.info.ahandle);
|
|
||||||
syncNodeEventLog(pSyncNode, eventLog);
|
syncNodeEventLog(pSyncNode, eventLog);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&(pObj->mutex));
|
taosThreadMutexUnlock(&(pObj->mutex));
|
||||||
|
@ -98,9 +96,8 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
|
||||||
|
|
||||||
SSyncNode *pSyncNode = pObj->data;
|
SSyncNode *pSyncNode = pObj->data;
|
||||||
char eventLog[128];
|
char eventLog[128];
|
||||||
snprintf(eventLog, sizeof(eventLog), "resp mgr get-and-del, type:%s,%d, seq:%" PRIu64 ", handle:%p, ahandle:%p",
|
snprintf(eventLog, sizeof(eventLog), "get-and-del response handle, type:%s, seq:%" PRIu64 ", handle:%p, ahandle:%p",
|
||||||
TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, pStub->rpcMsg.info.handle,
|
TMSG_INFO(pStub->rpcMsg.msgType), index, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle);
|
||||||
pStub->rpcMsg.info.ahandle);
|
|
||||||
syncNodeEventLog(pSyncNode, eventLog);
|
syncNodeEventLog(pSyncNode, eventLog);
|
||||||
|
|
||||||
taosHashRemove(pObj->pRespHash, &index, sizeof(index));
|
taosHashRemove(pObj->pRespHash, &index, sizeof(index));
|
||||||
|
|
|
@ -93,7 +93,7 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) {
|
||||||
// // We specified a non-zero wait. Time must advance.
|
// // We specified a non-zero wait. Time must advance.
|
||||||
// if (ft_before.dwLowDateTime == ft_after.dwLowDateTime && ft_before.dwHighDateTime == ft_after.dwHighDateTime)
|
// if (ft_before.dwLowDateTime == ft_after.dwLowDateTime && ft_before.dwHighDateTime == ft_after.dwHighDateTime)
|
||||||
// {
|
// {
|
||||||
// printf("nanoseconds: %d, rc: %d, errno: %d. before filetime: %d, %d; after filetime: %d, %d\n",
|
// printf("nanoseconds: %d, rc: %d, code:0x%x. before filetime: %d, %d; after filetime: %d, %d\n",
|
||||||
// nanosecs, rc, errno,
|
// nanosecs, rc, errno,
|
||||||
// (int)ft_before.dwLowDateTime, (int)ft_before.dwHighDateTime,
|
// (int)ft_before.dwLowDateTime, (int)ft_before.dwHighDateTime,
|
||||||
// (int)ft_after.dwLowDateTime, (int)ft_after.dwHighDateTime);
|
// (int)ft_after.dwLowDateTime, (int)ft_after.dwHighDateTime);
|
||||||
|
|
|
@ -0,0 +1,47 @@
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
|
system sh/deploy.sh -n dnode2 -i 2
|
||||||
|
system sh/cfg.sh -n dnode1 -c supportVnodes -v 0
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
system sh/exec.sh -n dnode2 -s start
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
print =============== step1: create dnodes
|
||||||
|
sql create dnode $hostname port 7200
|
||||||
|
|
||||||
|
$x = 0
|
||||||
|
step1:
|
||||||
|
$x = $x + 1
|
||||||
|
sleep 1000
|
||||||
|
if $x == 10 then
|
||||||
|
print ====> dnode not ready!
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql show dnodes
|
||||||
|
print ===> rows: $rows
|
||||||
|
print ===> $data00 $data01 $data02 $data03 $data04 $data05
|
||||||
|
print ===> $data10 $data11 $data12 $data13 $data14 $data15
|
||||||
|
if $rows != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
if $data(1)[4] != ready then
|
||||||
|
goto step1
|
||||||
|
endi
|
||||||
|
if $data(2)[4] != ready then
|
||||||
|
goto step1
|
||||||
|
endi
|
||||||
|
|
||||||
|
print =============== step2: create database
|
||||||
|
sql create database db vgroups 1 replica 1
|
||||||
|
sql show databases
|
||||||
|
if $rows != 3 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
sql use db;
|
||||||
|
sql create table stb (ts timestamp, c int) tags (t int);
|
||||||
|
sql create table t0 using stb tags (0);
|
||||||
|
sql insert into t0 values(now, 1);
|
||||||
|
sql insert into t0 values(now+1s, 1);
|
||||||
|
|
||||||
|
return
|
Loading…
Reference in New Issue