Merge branch 'develop' into feature/TD-1925_new
This commit is contained in:
commit
95d31db2a1
|
@ -237,21 +237,21 @@ static bool bnCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) {
|
|||
bool isReady = false;
|
||||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||
SVnodeGid *pVnode = pVgroup->vnodeGid + i;
|
||||
SDnodeObj *pDnode = pVnode->pDnode;
|
||||
if (pVnode == pRmVnode) continue;
|
||||
int32_t vver = mnodeGetVgidVer(pVnode->vver);
|
||||
|
||||
mTrace("vgId:%d, check vgroup status, vindex:%d dnode:%d status:%s role:%s vver:%d, rmvver:%d" , pVgroup->vgId, i,
|
||||
pVnode->dnodeId, dnodeStatus[pVnode->pDnode->status], syncRole[pVnode->role], vver, rmVnodeVer);
|
||||
if (pVnode->pDnode->status == TAOS_DN_STATUS_DROPPING) continue;
|
||||
if (pVnode->pDnode->status == TAOS_DN_STATUS_OFFLINE) continue;
|
||||
mTrace("vgId:%d, check vgroup status, vindex:%d dnode:%d status:%s role:%s vver:%d, rmvver:%d", pVgroup->vgId, i,
|
||||
pVnode->dnodeId, dnodeStatus[pDnode->status], syncRole[pVnode->role], vver, rmVnodeVer);
|
||||
if (pDnode->status == TAOS_DN_STATUS_DROPPING) continue;
|
||||
if (pDnode->status == TAOS_DN_STATUS_OFFLINE) continue;
|
||||
if (pVnode->role != TAOS_SYNC_ROLE_SLAVE && pVnode->role != TAOS_SYNC_ROLE_MASTER) continue;
|
||||
|
||||
if (rmVnodeVer == 0 || vver >= rmVnodeVer) {
|
||||
mInfo("vgId:%d, is ready for vindex:%d in dnode:%d status:%s role:%s vver:%d larger than rmvver:%d", pVgroup->vgId, i,
|
||||
pVnode->dnodeId, dnodeStatus[pVnode->pDnode->status], syncRole[pVnode->role], vver, rmVnodeVer);
|
||||
mInfo("vgId:%d, is ready for vindex:%d in dnode:%d status:%s role:%s vver:%d larger than rmvver:%d",
|
||||
pVgroup->vgId, i, pVnode->dnodeId, dnodeStatus[pDnode->status], syncRole[pVnode->role], vver, rmVnodeVer);
|
||||
isReady = true;
|
||||
}
|
||||
|
||||
isReady = true;
|
||||
}
|
||||
|
||||
return isReady;
|
||||
|
|
|
@ -101,8 +101,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
|
|||
return syncCode;
|
||||
}
|
||||
|
||||
static int32_t vnodeCheckWrite(void *vparam) {
|
||||
SVnodeObj *pVnode = vparam;
|
||||
static int32_t vnodeCheckWrite(SVnodeObj *pVnode) {
|
||||
if (!(pVnode->accessState & TSDB_VN_WRITE_ACCCESS)) {
|
||||
vDebug("vgId:%d, no write auth, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
|
||||
return TSDB_CODE_VND_NO_WRITE_AUTH;
|
||||
|
@ -216,29 +215,21 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) {
|
||||
SVnodeObj *pVnode = vparam;
|
||||
SWalHead * pHead = wparam;
|
||||
int32_t code = 0;
|
||||
|
||||
if (qtype == TAOS_QTYPE_RPC) {
|
||||
code = vnodeCheckWrite(pVnode);
|
||||
if (code != TSDB_CODE_SUCCESS) return code;
|
||||
}
|
||||
|
||||
static SVWriteMsg *vnodeBuildVWriteMsg(SVnodeObj *pVnode, SWalHead *pHead, int32_t qtype, SRpcMsg *pRpcMsg) {
|
||||
if (pHead->len > TSDB_MAX_WAL_SIZE) {
|
||||
vError("vgId:%d, wal len:%d exceeds limit, hver:%" PRIu64, pVnode->vgId, pHead->len, pHead->version);
|
||||
return TSDB_CODE_WAL_SIZE_LIMIT;
|
||||
terrno = TSDB_CODE_WAL_SIZE_LIMIT;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t size = sizeof(SVWriteMsg) + sizeof(SWalHead) + pHead->len;
|
||||
SVWriteMsg *pWrite = taosAllocateQitem(size);
|
||||
if (pWrite == NULL) {
|
||||
return TSDB_CODE_VND_OUT_OF_MEMORY;
|
||||
terrno = TSDB_CODE_VND_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (rparam != NULL) {
|
||||
SRpcMsg *pRpcMsg = rparam;
|
||||
if (pRpcMsg != NULL) {
|
||||
pWrite->rpcMsg = *pRpcMsg;
|
||||
}
|
||||
|
||||
|
@ -248,6 +239,21 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
|
|||
|
||||
atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||
|
||||
return pWrite;
|
||||
}
|
||||
|
||||
static int32_t vnodeWriteToWQueueImp(SVWriteMsg *pWrite) {
|
||||
SVnodeObj *pVnode = pWrite->pVnode;
|
||||
|
||||
if (pWrite->qtype == TAOS_QTYPE_RPC) {
|
||||
int32_t code = vnodeCheckWrite(pVnode);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosFreeQitem(pWrite);
|
||||
vnodeRelease(pVnode);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t queued = atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
|
||||
if (queued > MAX_QUEUED_MSG_NUM) {
|
||||
int32_t ms = (queued / MAX_QUEUED_MSG_NUM) * 10 + 3;
|
||||
|
@ -256,15 +262,25 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
|
|||
taosMsleep(ms);
|
||||
}
|
||||
|
||||
code = vnodePerformFlowCtrl(pWrite);
|
||||
if (code != 0) return 0;
|
||||
|
||||
vTrace("vgId:%d, write into vwqueue, refCount:%d queued:%d", pVnode->vgId, pVnode->refCount, pVnode->queuedWMsg);
|
||||
|
||||
taosWriteQitem(pVnode->wqueue, qtype, pWrite);
|
||||
taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rparam) {
|
||||
SVWriteMsg *pWrite = vnodeBuildVWriteMsg(vparam, wparam, qtype, rparam);
|
||||
if (pWrite == NULL) {
|
||||
assert(terrno != 0);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
int32_t code = vnodePerformFlowCtrl(pWrite);
|
||||
if (code != 0) return 0;
|
||||
|
||||
return vnodeWriteToWQueueImp(pWrite);
|
||||
}
|
||||
|
||||
void vnodeFreeFromWQueue(void *vparam, SVWriteMsg *pWrite) {
|
||||
SVnodeObj *pVnode = vparam;
|
||||
|
||||
|
@ -294,7 +310,10 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
|
|||
vDebug("vgId:%d, msg:%p, write into vwqueue after flowctrl, retry:%d", pVnode->vgId, pWrite,
|
||||
pWrite->processedCount);
|
||||
pWrite->processedCount = 0;
|
||||
taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite);
|
||||
code = vnodeWriteToWQueueImp(pWrite);
|
||||
if (code != 0) {
|
||||
dnodeSendRpcVWriteRsp(pWrite->pVnode, pWrite, code);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -318,4 +337,4 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
|
|||
pWrite->processedCount);
|
||||
return TSDB_CODE_VND_ACTION_IN_PROGRESS;
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue