fix:[TD-30365] ci case error & drop topic error if vnode is splitted

This commit is contained in:
wangmm0220 2024-06-06 11:15:23 +08:00
parent 48ecba95da
commit 697b81aa8a
3 changed files with 9 additions and 9 deletions

View File

@ -133,7 +133,7 @@ int32_t tqMetaSaveCheckInfo(STQ* pTq, const char* key, const void* value, int32_
int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key); int32_t tqMetaDeleteCheckInfo(STQ* pTq, const char* key);
int32_t tqMetaRestoreCheckInfo(STQ* pTq); int32_t tqMetaRestoreCheckInfo(STQ* pTq);
int32_t tqMetaGetHandle(STQ* pTq, const char* key); int32_t tqMetaGetHandle(STQ* pTq, const char* key);
int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle); int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle, int64_t snapshotVer);
STqOffsetStore* tqOffsetOpen(STQ* pTq); STqOffsetStore* tqOffsetOpen(STQ* pTq);
int32_t tqMetaTransform(STQ* pTq); int32_t tqMetaTransform(STQ* pTq);

View File

@ -50,6 +50,9 @@ void tqDestroyTqHandle(void* data) {
if (pData->block != NULL) { if (pData->block != NULL) {
blockDataDestroy(pData->block); blockDataDestroy(pData->block);
} }
if (pData->pRef) {
walCloseRef(pData->pRef->pWal, pData->pRef->refId);
}
} }
static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) { static bool tqOffsetEqual(const STqOffset* pLeft, const STqOffset* pRight) {
@ -571,9 +574,6 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
taosMsleep(10); taosMsleep(10);
continue; continue;
} }
if (pHandle->pRef) {
walCloseRef(pTq->pVnode->pWal, pHandle->pRef->refId);
}
tqUnregisterPushHandle(pTq, pHandle); tqUnregisterPushHandle(pTq, pHandle);
@ -660,7 +660,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
goto end; goto end;
} }
STqHandle handle = {0}; STqHandle handle = {0};
ret = tqCreateHandle(pTq, &req, &handle); ret = tqCreateHandle(pTq, &req, &handle, walGetCommittedVer(pTq->pVnode->pWal));
if (ret < 0) { if (ret < 0) {
tqDestroyTqHandle(&handle); tqDestroyTqHandle(&handle);
goto end; goto end;
@ -689,7 +689,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
// update handle to avoid req->qmsg changed if spilt vnode is failed // update handle to avoid req->qmsg changed if spilt vnode is failed
STqHandle handle = {0}; STqHandle handle = {0};
ret = tqCreateHandle(pTq, &req, &handle); ret = tqCreateHandle(pTq, &req, &handle, pHandle->snapshotVer);
if (ret < 0) { if (ret < 0) {
tqDestroyTqHandle(&handle); tqDestroyTqHandle(&handle);
goto end; goto end;

View File

@ -346,7 +346,7 @@ end:
return code; return code;
} }
int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle, int64_t snapshotVer){
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN); memcpy(handle->subKey, req->subKey, TSDB_SUBSCRIBE_KEY_LEN);
@ -364,7 +364,7 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
handle->execHandle.execTb.qmsg = taosStrdup(req->qmsg); handle->execHandle.execTb.qmsg = taosStrdup(req->qmsg);
} }
handle->snapshotVer = walGetCommittedVer(pTq->pVnode->pWal); handle->snapshotVer = snapshotVer;
if(buildHandle(pTq, handle) < 0){ if(buildHandle(pTq, handle) < 0){
return -1; return -1;