fix(stream): adjust the free stream meta position and check the close flag of streamMeta before starting scan wal.

This commit is contained in:
Haojun Liao 2025-01-17 19:15:08 +08:00
parent 4075223206
commit 6274eea7c4
4 changed files with 35 additions and 10 deletions

View File

@ -75,12 +75,14 @@ int32_t tqOpen(const char* path, SVnode* pVnode) {
if (pTq == NULL) {
return terrno;
}
pVnode->pTq = pTq;
pTq->pVnode = pVnode;
pTq->path = taosStrdup(path);
if (pTq->path == NULL) {
return terrno;
}
pTq->pVnode = pVnode;
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
if (pTq->pHandle == NULL) {
@ -131,11 +133,19 @@ void tqClose(STQ* pTq) {
return;
}
int32_t vgId = 0;
if (pTq->pVnode != NULL) {
vgId = TD_VID(pTq->pVnode);
} else if (pTq->pStreamMeta != NULL) {
vgId = pTq->pStreamMeta->vgId;
}
// close the stream meta firstly
streamMetaClose(pTq->pStreamMeta);
void* pIter = taosHashIterate(pTq->pPushMgr, NULL);
while (pIter) {
STqHandle* pHandle = *(STqHandle**)pIter;
int32_t vgId = TD_VID(pTq->pVnode);
if (pHandle->msg != NULL) {
tqPushEmptyDataRsp(pHandle, vgId);
rpcFreeCont(pHandle->msg->pCont);
@ -151,8 +161,8 @@ void tqClose(STQ* pTq) {
taosHashCleanup(pTq->pOffset);
taosMemoryFree(pTq->path);
tqMetaClose(pTq);
qDebug("vgId:%d end to close tq", pTq->pStreamMeta != NULL ? pTq->pStreamMeta->vgId : -1);
streamMetaClose(pTq->pStreamMeta);
qDebug("vgId:%d end to close tq", vgId);
taosMemoryFree(pTq);
}

View File

@ -86,16 +86,31 @@ static void doStartScanWal(void* param, void* tmrId) {
return;
}
if (pMeta->closeFlag) {
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
if (code == TSDB_CODE_SUCCESS) {
tqDebug("vgId:%d jump out of scan wal timer since closed", vgId);
} else {
tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
tstrerror(code));
}
taosMemoryFree(pParam);
return;
}
vgId = pMeta->vgId;
pTq = pMeta->ahandle;
tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks,
pTq->pVnode->restored);
if (pTq->pVnode != NULL) {
code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
if (code) {
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
}
}
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
if (code) {
@ -330,13 +345,13 @@ int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfIt
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) {
int32_t vgId = pStreamMeta->vgId;
SArray* pTaskList = NULL;
int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
// clone the task list, to avoid the task update during scan wal files
SArray* pTaskList = NULL;
streamMetaWLock(pStreamMeta);
pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
streamMetaWUnLock(pStreamMeta);

View File

@ -331,7 +331,6 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
} else {
stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
}
// taosMemoryFree(param);
return;
}

View File

@ -576,6 +576,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
if (pMeta == NULL) {
return;
}
int32_t code = taosRemoveRef(streamMetaRefPool, pMeta->rid);
if (code) {
stError("vgId:%d failed to remove meta ref:%" PRId64 ", code:%s", pMeta->vgId, pMeta->rid, tstrerror(code));