fix(stream): fix invalid read.
This commit is contained in:
parent
b4a6523aa6
commit
84eaed0bbb
|
@ -22,6 +22,8 @@
|
||||||
typedef struct SBuildScanWalMsgParam {
|
typedef struct SBuildScanWalMsgParam {
|
||||||
int64_t metaId;
|
int64_t metaId;
|
||||||
int32_t numOfTasks;
|
int32_t numOfTasks;
|
||||||
|
int8_t restored;
|
||||||
|
SMsgCb msgCb;
|
||||||
} SBuildScanWalMsgParam;
|
} SBuildScanWalMsgParam;
|
||||||
|
|
||||||
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta);
|
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta);
|
||||||
|
@ -74,7 +76,6 @@ int32_t tqScanWal(STQ* pTq) {
|
||||||
|
|
||||||
static void doStartScanWal(void* param, void* tmrId) {
|
static void doStartScanWal(void* param, void* tmrId) {
|
||||||
int32_t vgId = 0;
|
int32_t vgId = 0;
|
||||||
STQ* pTq = NULL;
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
|
SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param;
|
||||||
|
@ -100,16 +101,17 @@ static void doStartScanWal(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
vgId = pMeta->vgId;
|
vgId = pMeta->vgId;
|
||||||
pTq = pMeta->ahandle;
|
|
||||||
|
|
||||||
tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks,
|
tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d, vnd restored:%d", vgId, pParam->numOfTasks,
|
||||||
pTq->pVnode->restored);
|
pParam->restored);
|
||||||
|
#if 0
|
||||||
|
// wait for the vnode is freed, and invalid read may occur.
|
||||||
|
taosMsleep(10000);
|
||||||
|
#endif
|
||||||
|
|
||||||
if (pTq->pVnode != NULL) {
|
code = streamTaskSchedTask(&pParam->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
|
||||||
code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA);
|
if (code) {
|
||||||
if (code) {
|
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
|
||||||
tqError("vgId:%d failed sched task to scan wal, code:%s", vgId, tstrerror(code));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
|
code = taosReleaseRef(streamMetaRefPool, pParam->metaId);
|
||||||
|
@ -135,6 +137,8 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
|
||||||
|
|
||||||
pParam->metaId = pMeta->rid;
|
pParam->metaId = pMeta->rid;
|
||||||
pParam->numOfTasks = numOfTasks;
|
pParam->numOfTasks = numOfTasks;
|
||||||
|
pParam->restored = pTq->pVnode->restored;
|
||||||
|
pParam->msgCb = pTq->pVnode->msgCb;
|
||||||
|
|
||||||
code = streamTimerGetInstance(&pTimer);
|
code = streamTimerGetInstance(&pTimer);
|
||||||
if (code) {
|
if (code) {
|
||||||
|
|
Loading…
Reference in New Issue