Merge pull request #25246 from taosdata/fix/3_liaohj

fix(stream): add lock for when set checkpoint dispatch msg.
This commit is contained in:
Haojun Liao 2024-04-03 12:42:29 +08:00 committed by GitHub
commit a3bcd9e9c9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 29 additions and 5 deletions

View File

@ -355,6 +355,8 @@ typedef struct SMetaHbInfo SMetaHbInfo;
typedef struct SDispatchMsgInfo { typedef struct SDispatchMsgInfo {
SStreamDispatchReq* pData; // current dispatch data SStreamDispatchReq* pData; // current dispatch data
int8_t dispatchMsgType; int8_t dispatchMsgType;
int64_t checkpointId;// checkpoint id msg
int32_t transId; // transId for current checkpoint
int16_t msgType; // dispatch msg type int16_t msgType; // dispatch msg type
int32_t retryCount; // retry send data count int32_t retryCount; // retry send data count
int64_t startTs; // dispatch start time, record total elapsed time for dispatch int64_t startTs; // dispatch start time, record total elapsed time for dispatch

View File

@ -850,12 +850,18 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr); tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);
taosThreadMutexLock(&pTask->lock);
// clear flag set during do checkpoint, and open inputQ for all upstream tasks // clear flag set during do checkpoint, and open inputQ for all upstream tasks
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) { if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) {
tqDebug("s-task:%s reset task status from checkpoint, current checkpointingId:%" PRId64 ", transId:%d",
pTask->id.idStr, pTask->chkInfo.checkpointingId, pTask->chkInfo.transId);
streamTaskClearCheckInfo(pTask, true); streamTaskClearCheckInfo(pTask, true);
streamTaskSetStatusReady(pTask); streamTaskSetStatusReady(pTask);
} }
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -321,6 +321,8 @@ void clearBufferedDispatchMsg(SStreamTask* pTask) {
destroyDispatchMsg(pMsgInfo->pData, getNumOfDispatchBranch(pTask)); destroyDispatchMsg(pMsgInfo->pData, getNumOfDispatchBranch(pTask));
} }
pMsgInfo->checkpointId = -1;
pMsgInfo->transId = -1;
pMsgInfo->pData = NULL; pMsgInfo->pData = NULL;
pMsgInfo->dispatchMsgType = 0; pMsgInfo->dispatchMsgType = 0;
} }
@ -332,6 +334,12 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
pTask->msgInfo.dispatchMsgType = pData->type; pTask->msgInfo.dispatchMsgType = pData->type;
if (pData->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
SSDataBlock* p = taosArrayGet(pData->blocks, 0);
pTask->msgInfo.checkpointId = p->info.version;
pTask->msgInfo.transId = p->info.window.ekey;
}
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq)); SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq));
@ -950,9 +958,21 @@ void streamClearChkptReadyMsg(SStreamTask* pTask) {
// this message has been sent successfully, let's try next one. // this message has been sent successfully, let's try next one.
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) { static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData); stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData);
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
if (delayDispatch) { if (delayDispatch) {
taosThreadMutexLock(&pTask->lock);
// we only set the dispatch msg info for current checkpoint trans
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK && pTask->chkInfo.checkpointingId == pTask->msgInfo.checkpointId) {
ASSERT(pTask->chkInfo.transId == pTask->msgInfo.transId);
pTask->chkInfo.dispatchCheckpointTrigger = true; pTask->chkInfo.dispatchCheckpointTrigger = true;
stDebug("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 " transId:%d confirmed",
pTask->id.idStr, pTask->msgInfo.checkpointId, pTask->msgInfo.transId);
} else {
stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 " transId:%d discard, since expired",
pTask->id.idStr, pTask->msgInfo.checkpointId, pTask->msgInfo.transId);
}
taosThreadMutexUnlock(&pTask->lock);
} }
clearBufferedDispatchMsg(pTask); clearBufferedDispatchMsg(pTask);

View File

@ -543,8 +543,6 @@ void streamTaskSetStatusReady(SStreamTask* pTask) {
return; return;
} }
taosThreadMutexLock(&pTask->lock);
pSM->prev.state = pSM->current; pSM->prev.state = pSM->current;
pSM->prev.evt = 0; pSM->prev.evt = 0;
@ -552,8 +550,6 @@ void streamTaskSetStatusReady(SStreamTask* pTask) {
pSM->startTs = taosGetTimestampMs(); pSM->startTs = taosGetTimestampMs();
pSM->pActiveTrans = NULL; pSM->pActiveTrans = NULL;
taosArrayClear(pSM->pWaitingEventList); taosArrayClear(pSM->pWaitingEventList);
taosThreadMutexUnlock(&pTask->lock);
} }
STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn, STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn,