commit
3cfd246dbe
|
@ -352,7 +352,7 @@ void syncConfirmForward(int64_t rid, uint64_t version, int32_t code) {
|
||||||
int32_t retLen = write(pPeer->peerFd, msg, msgLen);
|
int32_t retLen = write(pPeer->peerFd, msg, msgLen);
|
||||||
|
|
||||||
if (retLen == msgLen) {
|
if (retLen == msgLen) {
|
||||||
sDebug("%s, forward-rsp is sent, ver:%" PRIu64, pPeer->id, version);
|
sDebug("%s, forward-rsp is sent, code:%x hver:%" PRIu64, pPeer->id, code, version);
|
||||||
} else {
|
} else {
|
||||||
sDebug("%s, failed to send forward ack, restart", pPeer->id);
|
sDebug("%s, failed to send forward ack, restart", pPeer->id);
|
||||||
syncRestartConnection(pPeer);
|
syncRestartConnection(pPeer);
|
||||||
|
@ -831,7 +831,7 @@ static void syncProcessFwdResponse(char *cont, SSyncPeer *pPeer) {
|
||||||
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
|
SSyncFwds *pSyncFwds = pNode->pSyncFwds;
|
||||||
SFwdInfo * pFwdInfo;
|
SFwdInfo * pFwdInfo;
|
||||||
|
|
||||||
sDebug("%s, forward-rsp is received, ver:%" PRIu64, pPeer->id, pFwdRsp->version);
|
sDebug("%s, forward-rsp is received, code:%x ver:%" PRIu64, pPeer->id, pFwdRsp->code, pFwdRsp->version);
|
||||||
SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first;
|
SFwdInfo *pFirst = pSyncFwds->fwdInfo + pSyncFwds->first;
|
||||||
|
|
||||||
if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) {
|
if (pFirst->version <= pFwdRsp->version && pSyncFwds->fwds > 0) {
|
||||||
|
@ -1125,10 +1125,9 @@ static void syncSaveFwdInfo(SSyncNode *pNode, uint64_t version, void *mhandle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last;
|
SFwdInfo *pFwdInfo = pSyncFwds->fwdInfo + pSyncFwds->last;
|
||||||
|
memset(pFwdInfo, 0, sizeof(SFwdInfo));
|
||||||
pFwdInfo->version = version;
|
pFwdInfo->version = version;
|
||||||
pFwdInfo->mhandle = mhandle;
|
pFwdInfo->mhandle = mhandle;
|
||||||
pFwdInfo->acks = 0;
|
|
||||||
pFwdInfo->confirmed = 0;
|
|
||||||
pFwdInfo->time = time;
|
pFwdInfo->time = time;
|
||||||
|
|
||||||
pSyncFwds->fwds++;
|
pSyncFwds->fwds++;
|
||||||
|
|
|
@ -637,9 +637,8 @@ static int vnodeResetTsdb(SVnodeObj *pVnode) {
|
||||||
char rootDir[128] = "\0";
|
char rootDir[128] = "\0";
|
||||||
sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
|
sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
|
||||||
|
|
||||||
if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) !=
|
if (pVnode->status != TAOS_VN_STATUS_CLOSING && pVnode->status != TAOS_VN_STATUS_INIT) {
|
||||||
TAOS_VN_STATUS_READY) {
|
pVnode->status = TAOS_VN_STATUS_RESET;
|
||||||
return -1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void *tsdb = pVnode->tsdb;
|
void *tsdb = pVnode->tsdb;
|
||||||
|
|
|
@ -56,19 +56,19 @@ int32_t vnodeProcessRead(void *param, SVReadMsg *pRead) {
|
||||||
static int32_t vnodeCheckRead(void *param) {
|
static int32_t vnodeCheckRead(void *param) {
|
||||||
SVnodeObj *pVnode = param;
|
SVnodeObj *pVnode = param;
|
||||||
if (pVnode->status != TAOS_VN_STATUS_READY) {
|
if (pVnode->status != TAOS_VN_STATUS_READY) {
|
||||||
vDebug("vgId:%d, vnode status is %s, recCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
|
vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
|
||||||
pVnode->refCount, pVnode);
|
pVnode->refCount, pVnode);
|
||||||
return TSDB_CODE_APP_NOT_READY;
|
return TSDB_CODE_APP_NOT_READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
// tsdb may be in reset state
|
// tsdb may be in reset state
|
||||||
if (pVnode->tsdb == NULL) {
|
if (pVnode->tsdb == NULL) {
|
||||||
vDebug("vgId:%d, tsdb is null, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
|
vDebug("vgId:%d, tsdb is null, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
|
||||||
return TSDB_CODE_APP_NOT_READY;
|
return TSDB_CODE_APP_NOT_READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
||||||
vDebug("vgId:%d, replica:%d role:%s, recCount:%d pVnode:%p", pVnode->vgId, pVnode->syncCfg.replica,
|
vDebug("vgId:%d, replica:%d role:%s, refCount:%d pVnode:%p", pVnode->vgId, pVnode->syncCfg.replica,
|
||||||
syncRole[pVnode->role], pVnode->refCount, pVnode);
|
syncRole[pVnode->role], pVnode->refCount, pVnode);
|
||||||
return TSDB_CODE_APP_NOT_READY;
|
return TSDB_CODE_APP_NOT_READY;
|
||||||
}
|
}
|
||||||
|
|
|
@ -103,18 +103,18 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
|
||||||
static int32_t vnodeCheckWrite(void *param) {
|
static int32_t vnodeCheckWrite(void *param) {
|
||||||
SVnodeObj *pVnode = param;
|
SVnodeObj *pVnode = param;
|
||||||
if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
|
if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
|
||||||
vDebug("vgId:%d, no write auth, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
|
vDebug("vgId:%d, no write auth, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
|
||||||
return TSDB_CODE_VND_NO_WRITE_AUTH;
|
return TSDB_CODE_VND_NO_WRITE_AUTH;
|
||||||
}
|
}
|
||||||
|
|
||||||
// tsdb may be in reset state
|
// tsdb may be in reset state
|
||||||
if (pVnode->tsdb == NULL) {
|
if (pVnode->tsdb == NULL) {
|
||||||
vDebug("vgId:%d, tsdb is null, recCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
|
vDebug("vgId:%d, tsdb is null, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
|
||||||
return TSDB_CODE_APP_NOT_READY;
|
return TSDB_CODE_APP_NOT_READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->status == TAOS_VN_STATUS_CLOSING) {
|
if (pVnode->status == TAOS_VN_STATUS_CLOSING) {
|
||||||
vDebug("vgId:%d, vnode status is %s, recCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
|
vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
|
||||||
pVnode->refCount, pVnode);
|
pVnode->refCount, pVnode);
|
||||||
return TSDB_CODE_APP_NOT_READY;
|
return TSDB_CODE_APP_NOT_READY;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue