diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index b7a516190b..b169d82574 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -337,6 +337,7 @@ struct SStreamTask { SMsgCb* pMsgCb; // msg handle SStreamState* pState; // state backend SArray* pRspMsgList; + TdThreadMutex lock; // the followings attributes don't be serialized int32_t notReadyTasks; diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 6288a048f7..8a7b61135b 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -91,6 +91,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { pTask->exec.pExecutor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle, 0); ASSERT(pTask->exec.pExecutor); + taosThreadMutexInit(&pTask->lock, NULL); streamSetupScheduleTrigger(pTask); qDebug("snode:%d expand stream task on snode, s-task:%s, checkpoint ver:%" PRId64 " child id:%d, level:%d", SNODE_HANDLE, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index aeb9b55d12..5cae6793be 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -921,6 +921,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->status.taskStatus = TASK_STATUS__NORMAL; } + taosThreadMutexInit(&pTask->lock, NULL); streamSetupScheduleTrigger(pTask); tqInfo("vgId:%d expand stream task, s-task:%s, checkpoint ver:%" PRId64 diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index a22a9ec534..88af841f05 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -691,10 +691,11 @@ int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len); info.msg.info = *pRpcInfo; - // todo: fix race condition here + taosThreadMutexLock(&pTask->lock); if (pTask->pRspMsgList == NULL) { pTask->pRspMsgList = taosArrayInit(4, sizeof(SStreamContinueExecInfo)); } + taosThreadMutexUnlock(&pTask->lock); taosArrayPush(pTask->pRspMsgList, &info); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index ca4586a1b4..d54d5fa8b8 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -251,5 +251,10 @@ void tFreeStreamTask(SStreamTask* pTask) { tSimpleHashCleanup(pTask->pNameMap); } + if (pTask->pRspMsgList != NULL) { + pTask->pRspMsgList = taosArrayDestroy(pTask->pRspMsgList); + } + + taosThreadMutexDestroy(&pTask->lock); taosMemoryFree(pTask); }