commit
91823d06f0
|
@ -1051,7 +1051,10 @@ static int32_t sdbWriteFwdToQueue(int32_t vgId, void *wparam, int32_t qtype, voi
|
||||||
memcpy(pRow->pHead, pHead, sizeof(SWalHead) + pHead->len);
|
memcpy(pRow->pHead, pHead, sizeof(SWalHead) + pHead->len);
|
||||||
pRow->rowData = pRow->pHead->cont;
|
pRow->rowData = pRow->pHead->cont;
|
||||||
|
|
||||||
return sdbWriteToQueue(pRow, qtype);
|
int32_t code = sdbWriteToQueue(pRow, qtype);
|
||||||
|
if (code == TSDB_CODE_MND_ACTION_IN_PROGRESS) code = 0;
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sdbWriteRowToQueue(SSdbRow *pInputRow, int32_t action) {
|
static int32_t sdbWriteRowToQueue(SSdbRow *pInputRow, int32_t action) {
|
||||||
|
|
|
@ -195,7 +195,11 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer, uint64_t *wver) {
|
||||||
}
|
}
|
||||||
lastVer = pHead->version;
|
lastVer = pHead->version;
|
||||||
|
|
||||||
(*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_WAL, NULL);
|
ret = (*pNode->writeToCache)(pNode->vgId, pHead, TAOS_QTYPE_WAL, NULL);
|
||||||
|
if (ret != 0) {
|
||||||
|
sError("%s, failed to restore record since %s, hver:%" PRIu64, pPeer->id, tstrerror(ret), pHead->version);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
|
|
|
@ -37,6 +37,7 @@ bool vnodeSetResetStatus(SVnodeObj* pVnode);
|
||||||
|
|
||||||
bool vnodeInInitStatus(SVnodeObj* pVnode);
|
bool vnodeInInitStatus(SVnodeObj* pVnode);
|
||||||
bool vnodeInReadyStatus(SVnodeObj* pVnode);
|
bool vnodeInReadyStatus(SVnodeObj* pVnode);
|
||||||
|
bool vnodeInReadyOrUpdatingStatus(SVnodeObj* pVnode);
|
||||||
bool vnodeInClosingStatus(SVnodeObj* pVnode);
|
bool vnodeInClosingStatus(SVnodeObj* pVnode);
|
||||||
bool vnodeInResetStatus(SVnodeObj* pVnode);
|
bool vnodeInResetStatus(SVnodeObj* pVnode);
|
||||||
|
|
||||||
|
|
|
@ -135,6 +135,18 @@ bool vnodeInReadyStatus(SVnodeObj* pVnode) {
|
||||||
return in;
|
return in;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool vnodeInReadyOrUpdatingStatus(SVnodeObj* pVnode) {
|
||||||
|
bool in = false;
|
||||||
|
pthread_mutex_lock(&pVnode->statusMutex);
|
||||||
|
|
||||||
|
if (pVnode->status == TAOS_VN_STATUS_READY || pVnode->status == TAOS_VN_STATUS_UPDATING) {
|
||||||
|
in = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&pVnode->statusMutex);
|
||||||
|
return in;
|
||||||
|
}
|
||||||
|
|
||||||
bool vnodeInClosingStatus(SVnodeObj* pVnode) {
|
bool vnodeInClosingStatus(SVnodeObj* pVnode) {
|
||||||
bool in = false;
|
bool in = false;
|
||||||
pthread_mutex_lock(&pVnode->statusMutex);
|
pthread_mutex_lock(&pVnode->statusMutex);
|
||||||
|
|
|
@ -245,15 +245,16 @@ static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
|
||||||
if (pWrite->qtype == TAOS_QTYPE_RPC) {
|
if (pWrite->qtype == TAOS_QTYPE_RPC) {
|
||||||
int32_t code = vnodeCheckWrite(pVnode);
|
int32_t code = vnodeCheckWrite(pVnode);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
vError("vgId:%d, failed to write into vwqueue since %s", pVnode->vgId, tstrerror(code));
|
||||||
taosFreeQitem(pWrite);
|
taosFreeQitem(pWrite);
|
||||||
vnodeRelease(pVnode);
|
vnodeRelease(pVnode);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!vnodeInReadyStatus(pVnode)) {
|
if (!vnodeInReadyOrUpdatingStatus(pVnode)) {
|
||||||
vDebug("vgId:%d, vnode status is %s, refCount:%d pVnode:%p", pVnode->vgId, vnodeStatus[pVnode->status],
|
vError("vgId:%d, failed to write into vwqueue, vstatus is %s, refCount:%d pVnode:%p", pVnode->vgId,
|
||||||
pVnode->refCount, pVnode);
|
vnodeStatus[pVnode->status], pVnode->refCount, pVnode);
|
||||||
taosFreeQitem(pWrite);
|
taosFreeQitem(pWrite);
|
||||||
vnodeRelease(pVnode);
|
vnodeRelease(pVnode);
|
||||||
return TSDB_CODE_APP_NOT_READY;
|
return TSDB_CODE_APP_NOT_READY;
|
||||||
|
|
Loading…
Reference in New Issue