fix: skip msg
This commit is contained in:
parent
292bf01902
commit
0d56da7508
|
@ -121,6 +121,7 @@ typedef struct {
|
||||||
struct STQ {
|
struct STQ {
|
||||||
SVnode* pVnode;
|
SVnode* pVnode;
|
||||||
char* path;
|
char* path;
|
||||||
|
int64_t walLogLastVer;
|
||||||
|
|
||||||
SRWLatch pushLock;
|
SRWLatch pushLock;
|
||||||
|
|
||||||
|
|
|
@ -201,6 +201,7 @@ int32_t tqProcessTaskRecover1Req(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t version, char* msg, int32_t msgLen);
|
||||||
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskRecoverFinishReq(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg);
|
int32_t tqProcessTaskRecoverFinishRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
|
int32_t tqCheckLogInWal(STQ* pTq, int64_t version);
|
||||||
|
|
||||||
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema,
|
SSubmitReq* tqBlockToSubmit(SVnode* pVnode, const SArray* pBlocks, const STSchema* pSchema,
|
||||||
SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName,
|
SSchemaWrapper* pTagSchemaWrapper, bool createTb, int64_t suid, const char* stbFullName,
|
||||||
|
|
|
@ -80,6 +80,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
||||||
}
|
}
|
||||||
pTq->path = strdup(path);
|
pTq->path = strdup(path);
|
||||||
pTq->pVnode = pVnode;
|
pTq->pVnode = pVnode;
|
||||||
|
pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
|
||||||
|
|
||||||
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||||
taosHashSetFreeFp(pTq->pHandle, destroySTqHandle);
|
taosHashSetFreeFp(pTq->pHandle, destroySTqHandle);
|
||||||
|
@ -1536,3 +1537,5 @@ FAIL:
|
||||||
taosFreeQitem(pMsg);
|
taosFreeQitem(pMsg);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tqCheckLogInWal(STQ* pTq, int64_t version) { return version <= pTq->walLogLastVer; }
|
||||||
|
|
|
@ -197,6 +197,10 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp
|
||||||
|
|
||||||
if (!syncUtilUserCommit(pMsg->msgType)) goto _exit;
|
if (!syncUtilUserCommit(pMsg->msgType)) goto _exit;
|
||||||
|
|
||||||
|
if (pMsg->msgType == TDMT_VND_STREAM_RECOVER_BLOCKING_STAGE) {
|
||||||
|
if (tqCheckLogInWal(pVnode->pTq, version)) return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// skip header
|
// skip header
|
||||||
pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
len = pMsg->contLen - sizeof(SMsgHead);
|
len = pMsg->contLen - sizeof(SMsgHead);
|
||||||
|
|
Loading…
Reference in New Issue