forward to peer only the data packet is from client or CQ
This commit is contained in:
parent
d7e03ac0e2
commit
3366f4af3e
|
@ -405,9 +405,9 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
|
||||||
|
|
||||||
static void vnodeNotifyFileSynced(void *ahandle) {
|
static void vnodeNotifyFileSynced(void *ahandle) {
|
||||||
SVnodeObj *pVnode = ahandle;
|
SVnodeObj *pVnode = ahandle;
|
||||||
vTrace("pVnode:%p vgId:%d, data file is synced", pVnode, pVnode->vgId);
|
vTrace("vgId:%d, data file is synced", pVnode->vgId);
|
||||||
|
|
||||||
// clsoe tsdb, then open tsdb
|
// close tsdb, then open tsdb
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||||
|
|
|
@ -65,7 +65,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
|
||||||
pRet->len = sizeof(SQueryTableRsp);
|
pRet->len = sizeof(SQueryTableRsp);
|
||||||
pRet->rsp = pRsp;
|
pRet->rsp = pRsp;
|
||||||
|
|
||||||
vTrace("vgId:%d QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo);
|
vTrace("vgId:%d, QInfo:%p, dnode query msg disposed", pVnode->vgId, pQInfo);
|
||||||
} else {
|
} else {
|
||||||
pQInfo = pCont;
|
pQInfo = pCont;
|
||||||
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
code = TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
|
@ -83,7 +83,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
vTrace("vgId:%d QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo);
|
vTrace("vgId:%d, QInfo:%p, retrieve msg is received", pVnode->vgId, pQInfo);
|
||||||
|
|
||||||
pRet->code = qRetrieveQueryResultInfo(pQInfo);
|
pRet->code = qRetrieveQueryResultInfo(pQInfo);
|
||||||
if (pRet->code != TSDB_CODE_SUCCESS) {
|
if (pRet->code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -104,6 +104,6 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, void *pCont, int32_t cont
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
vTrace("vgId:%d QInfo:%p, retrieve msg is disposed", pVnode->vgId, pQInfo);
|
vTrace("vgId:%d, QInfo:%p, retrieve msg is disposed", pVnode->vgId, pQInfo);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -51,10 +51,7 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
|
||||||
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL)
|
if (vnodeProcessWriteMsgFp[pHead->msgType] == NULL)
|
||||||
return TSDB_CODE_MSG_NOT_PROCESSED;
|
return TSDB_CODE_MSG_NOT_PROCESSED;
|
||||||
|
|
||||||
if (pVnode->status != TAOS_VN_STATUS_READY && qtype == TAOS_QTYPE_RPC)
|
if (pHead->version == 0) { // from client or CQ
|
||||||
return TSDB_CODE_NOT_ACTIVE_VNODE;
|
|
||||||
|
|
||||||
if (pHead->version == 0) { // from client
|
|
||||||
if (pVnode->status != TAOS_VN_STATUS_READY)
|
if (pVnode->status != TAOS_VN_STATUS_READY)
|
||||||
return TSDB_CODE_NOT_ACTIVE_VNODE;
|
return TSDB_CODE_NOT_ACTIVE_VNODE;
|
||||||
|
|
||||||
|
@ -64,12 +61,10 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
|
||||||
// assign version
|
// assign version
|
||||||
pVnode->version++;
|
pVnode->version++;
|
||||||
pHead->version = pVnode->version;
|
pHead->version = pVnode->version;
|
||||||
} else {
|
} else { // from wal or forward
|
||||||
// for data from WAL or forward, version may be smaller
|
// for data from WAL or forward, version may be smaller
|
||||||
if (pHead->version <= pVnode->version) return 0;
|
if (pHead->version <= pVnode->version) return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// more status and role checking here
|
|
||||||
|
|
||||||
pVnode->version = pHead->version;
|
pVnode->version = pHead->version;
|
||||||
|
|
||||||
|
@ -77,9 +72,13 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
|
||||||
code = walWrite(pVnode->wal, pHead);
|
code = walWrite(pVnode->wal, pHead);
|
||||||
if (code < 0) return code;
|
if (code < 0) return code;
|
||||||
|
|
||||||
int32_t syncCode = syncForwardToPeer(pVnode->sync, pHead, item);
|
// forward to peers if data is from RPC or CQ
|
||||||
|
int32_t syncCode = 0;
|
||||||
|
if (qtype == TAOS_QTYPE_RPC || qtype == TAOS_QTYPE_CQ)
|
||||||
|
syncCode = syncForwardToPeer(pVnode->sync, pHead, item);
|
||||||
if (syncCode < 0) return syncCode;
|
if (syncCode < 0) return syncCode;
|
||||||
|
|
||||||
|
// write data locally
|
||||||
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item);
|
code = (*vnodeProcessWriteMsgFp[pHead->msgType])(pVnode, pHead->cont, item);
|
||||||
if (code < 0) return code;
|
if (code < 0) return code;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue