Merge pull request #29598 from taosdata/fix/disp_lost
fix(stream): adjust the free stream meta position and check the close flag
This commit is contained in:
commit
3b74de43cb
|
@ -255,6 +255,9 @@ int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||||
int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver);
|
int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver);
|
||||||
int32_t tqScanWal(STQ* pTq);
|
int32_t tqScanWal(STQ* pTq);
|
||||||
|
|
||||||
|
// error injection
|
||||||
|
void streamMetaFreeTQDuringScanWalError(STQ* pTq);
|
||||||
|
|
||||||
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd);
|
||||||
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
|
int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId);
|
||||||
// tq-mq
|
// tq-mq
|
||||||
|
|
|
@ -75,12 +75,14 @@ int32_t tqOpen(const char* path, SVnode* pVnode) {
|
||||||
if (pTq == NULL) {
|
if (pTq == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
pVnode->pTq = pTq;
|
pVnode->pTq = pTq;
|
||||||
|
pTq->pVnode = pVnode;
|
||||||
|
|
||||||
pTq->path = taosStrdup(path);
|
pTq->path = taosStrdup(path);
|
||||||
if (pTq->path == NULL) {
|
if (pTq->path == NULL) {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
pTq->pVnode = pVnode;
|
|
||||||
|
|
||||||
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
pTq->pHandle = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
|
||||||
if (pTq->pHandle == NULL) {
|
if (pTq->pHandle == NULL) {
|
||||||
|
@ -131,11 +133,19 @@ void tqClose(STQ* pTq) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t vgId = 0;
|
||||||
|
if (pTq->pVnode != NULL) {
|
||||||
|
vgId = TD_VID(pTq->pVnode);
|
||||||
|
} else if (pTq->pStreamMeta != NULL) {
|
||||||
|
vgId = pTq->pStreamMeta->vgId;
|
||||||
|
}
|
||||||
|
|
||||||
|
// close the stream meta first
|
||||||
|
streamMetaClose(pTq->pStreamMeta);
|
||||||
|
|
||||||
void* pIter = taosHashIterate(pTq->pPushMgr, NULL);
|
void* pIter = taosHashIterate(pTq->pPushMgr, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
STqHandle* pHandle = *(STqHandle**)pIter;
|
STqHandle* pHandle = *(STqHandle**)pIter;
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
|
||||||
|
|
||||||
if (pHandle->msg != NULL) {
|
if (pHandle->msg != NULL) {
|
||||||
tqPushEmptyDataRsp(pHandle, vgId);
|
tqPushEmptyDataRsp(pHandle, vgId);
|
||||||
rpcFreeCont(pHandle->msg->pCont);
|
rpcFreeCont(pHandle->msg->pCont);
|
||||||
|
@ -151,8 +161,12 @@ void tqClose(STQ* pTq) {
|
||||||
taosHashCleanup(pTq->pOffset);
|
taosHashCleanup(pTq->pOffset);
|
||||||
taosMemoryFree(pTq->path);
|
taosMemoryFree(pTq->path);
|
||||||
tqMetaClose(pTq);
|
tqMetaClose(pTq);
|
||||||
qDebug("vgId:%d end to close tq", pTq->pStreamMeta != NULL ? pTq->pStreamMeta->vgId : -1);
|
qDebug("vgId:%d end to close tq", vgId);
|
||||||
streamMetaClose(pTq->pStreamMeta);
|
|
||||||
|
#if 0
|
||||||
|
streamMetaFreeTQDuringScanWalError(pTq);
|
||||||
|
#endif
|
||||||
|
|
||||||
taosMemoryFree(pTq);
|
taosMemoryFree(pTq);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -22,6 +22,8 @@
|
||||||
// Parameter block for the scan-wal start routine: doStartScanWal casts its
// `param` argument to this type, and tqScanWalInFuture fills it in beforehand.
typedef struct SBuildScanWalMsgParam {
|
typedef struct SBuildScanWalMsgParam {
|
||||||
int64_t metaId;
|
int64_t metaId;  // ref id of the stream meta (copied from pMeta->rid); released via streamMetaRefPool
|
||||||
int32_t numOfTasks;
|
int32_t numOfTasks;  // task count supplied by the caller; only logged in the code visible here
|
||||||
|
int8_t restored;  // copy of pTq->pVnode->restored taken when the param is built
|
||||||
|
SMsgCb msgCb;  // copy of pTq->pVnode->msgCb, so doStartScanWal need not dereference the vnode
|
||||||
} SBuildScanWalMsgParam;
|
} SBuildScanWalMsgParam;
|
||||||
|
|
||||||
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta);
|
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta);
|
||||||
|
@ -74,7 +76,6 @@ int32_t tqScanWal(STQ* pTq) {
|
||||||
|
|
||||||
static void doStartScanWal(void* param, void* tmrId) {
|
static void doStartScanWal(void* param, void* tmrId) {
|
||||||
int32_t vgId = 0;
|
int32_t vgId = 0;
|
||||||
STQ* pTq = NULL;
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
|
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
|
||||||
|
@ -86,13 +87,29 @@ static void doStartScanWal(void* param, void* tmrId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pMeta->closeFlag) {
|
||||||
|
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
|
||||||
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
tqDebug("vgId:%d jump out of scan wal timer since closed", vgId);
|
||||||
|
} else {
|
||||||
|
tqError("vgId:%d failed to release ref for streamMeta, rid:%" PRId64 " code:%s", vgId, pParam->metaId,
|
||||||
|
tstrerror(code));
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(pParam);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
vgId = pMeta->vgId;
|
vgId = pMeta->vgId;
|
||||||
pTq = pMeta->ahandle;
|
|
||||||
|
|
||||||
tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks,
|
tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks,
|
||||||
pTq->pVnode->restored);
|
pParam->restored);
|
||||||
|
#if 0
|
||||||
|
// wait for the vnode to be freed; an invalid read may then occur.
|
||||||
|
taosMsleep(10000);
|
||||||
|
#endif
|
||||||
|
|
||||||
code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
|
code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
|
||||||
if (code) {
|
if (code) {
|
||||||
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
|
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
@ -120,6 +137,8 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
|
||||||
|
|
||||||
pParam->metaId = pMeta->rid;
|
pParam->metaId = pMeta->rid;
|
||||||
pParam->numOfTasks = numOfTasks;
|
pParam->numOfTasks = numOfTasks;
|
||||||
|
pParam->restored = pTq->pVnode->restored;
|
||||||
|
pParam->msgCb = pTq->pVnode->msgCb;
|
||||||
|
|
||||||
code = streamTimerGetInstance(&pTimer);
|
code = streamTimerGetInstance(&pTimer);
|
||||||
if (code) {
|
if (code) {
|
||||||
|
@ -330,13 +349,13 @@ int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfIt
|
||||||
|
|
||||||
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) {
|
int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) {
|
||||||
int32_t vgId = pStreamMeta->vgId;
|
int32_t vgId = pStreamMeta->vgId;
|
||||||
|
SArray* pTaskList = NULL;
|
||||||
int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList);
|
int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList);
|
||||||
if (numOfTasks == 0) {
|
if (numOfTasks == 0) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// clone the task list, to avoid the task update during scan wal files
|
// clone the task list, to avoid the task update during scan wal files
|
||||||
SArray* pTaskList = NULL;
|
|
||||||
streamMetaWLock(pStreamMeta);
|
streamMetaWLock(pStreamMeta);
|
||||||
pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
|
pTaskList = taosArrayDup(pStreamMeta->pTaskList, NULL);
|
||||||
streamMetaWUnLock(pStreamMeta);
|
streamMetaWUnLock(pStreamMeta);
|
||||||
|
@ -447,3 +466,11 @@ int32_t doScanWalAsync(STQ* pTq, bool ckPause) {
|
||||||
|
|
||||||
return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
|
return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
 * Error-injection helper: builds a scan-wal parameter block and invokes
 * doStartScanWal() directly instead of going through the timer. The only call
 * site visible in this change sits inside an `#if 0` section of tqClose().
 *
 * Only metaId and numOfTasks are set; the remaining fields (restored, msgCb)
 * stay zeroed by the calloc.
 *
 * @param pTq  tq handle whose stream meta ref id is passed on; nothing is done
 *             when it, or its stream meta, is NULL.
 */
void streamMetaFreeTQDuringScanWalError(STQ* pTq) {
  // tqClose() treats pStreamMeta as possibly NULL, so guard before dereferencing.
  if (pTq == NULL || pTq->pStreamMeta == NULL) {
    return;
  }

  SBuildScanWalMsgParam* p = taosMemoryCalloc(1, sizeof(SBuildScanWalMsgParam));
  if (p == NULL) {
    tqError("vgId:%d failed to alloc scan-wal param for error injection", pTq->pStreamMeta->vgId);
    return;
  }

  p->metaId = pTq->pStreamMeta->rid;
  p->numOfTasks = 0;

  // NOTE(review): doStartScanWal frees the param on its close-flag path; presumably it
  // owns the param on every path -- confirm against the full function.
  doStartScanWal(p, NULL);
}
|
|
@ -331,7 +331,6 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
|
||||||
} else {
|
} else {
|
||||||
stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
|
stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
|
||||||
}
|
}
|
||||||
// taosMemoryFree(param);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -576,6 +576,7 @@ void streamMetaClose(SStreamMeta* pMeta) {
|
||||||
if (pMeta == NULL) {
|
if (pMeta == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = taosRemoveRef(streamMetaRefPool, pMeta->rid);
|
int32_t code = taosRemoveRef(streamMetaRefPool, pMeta->rid);
|
||||||
if (code) {
|
if (code) {
|
||||||
stError("vgId:%d failed to remove meta ref:%" PRId64 ", code:%s", pMeta->vgId, pMeta->rid, tstrerror(code));
|
stError("vgId:%d failed to remove meta ref:%" PRId64 ", code:%s", pMeta->vgId, pMeta->rid, tstrerror(code));
|
||||||
|
|
Loading…
Reference in New Issue