fix(stream): fix race condition.

This commit is contained in:
Haojun Liao 2023-12-22 09:34:49 +08:00
parent 0fab9a1827
commit c9475060de
4 changed files with 16 additions and 12 deletions

View File

@ -90,7 +90,7 @@ bool streamTransConflictOtherTrans(SMnode* pMnode, int64_t streamUid, const char
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
if (strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) {
mWarn("conflict with other transId:%d streamUid:%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid,
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid,
tInfo.name);
return true;
} else {
@ -98,7 +98,7 @@ bool streamTransConflictOtherTrans(SMnode* pMnode, int64_t streamUid, const char
}
} else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) ||
(strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0)) {
mWarn("conflict with other transId:%d streamUid:%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid,
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid,
tInfo.name);
return true;
}

View File

@ -126,10 +126,13 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
int32_t tqStopStreamTasks(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = TD_VID(pTq->pVnode);
int32_t num = taosArrayGetSize(pMeta->pTaskList);
streamMetaRLock(pMeta);
int32_t num = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d stop all %d stream task(s)", vgId, num);
if (num == 0) {
streamMetaRUnLock(pMeta);
return TSDB_CODE_SUCCESS;
}
@ -149,6 +152,8 @@ int32_t tqStopStreamTasks(STQ* pTq) {
}
taosArrayDestroy(pTaskList);
streamMetaRUnLock(pMeta);
return 0;
}

View File

@ -39,6 +39,8 @@ void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
SStreamMeta* pMeta = pTq->pStreamMeta;
int64_t stage = pMeta->stage;
streamMetaWLock(pMeta);
pMeta->stage = state.term;
// mark the sign to send msg before close all tasks
@ -52,9 +54,11 @@ void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
state.term, stage, isLeader);
streamMetaStartHb(pMeta);
} else {
tqInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d", pMeta->vgId, state.term, stage,
isLeader);
tqInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId,
state.term, stage, isLeader, pMeta->sendMsgBeforeClosing);
}
streamMetaWUnLock(pMeta);
}
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) {

View File

@ -1328,14 +1328,9 @@ int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, voi
}
SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) {
SArray* pTaskList = NULL;
bool sendMsg = false;
streamMetaWLock(pMeta);
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
sendMsg = pMeta->sendMsgBeforeClosing;
streamMetaWUnLock(pMeta);
SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
bool sendMsg = pMeta->sendMsgBeforeClosing;
if (!sendMsg) {
stDebug("vgId:%d no need to send msg to mnode before closing tasks", pMeta->vgId);
return pTaskList;