fix(stream): fix the dead lock.

This commit is contained in:
Haojun Liao 2023-12-24 23:30:00 +08:00
parent bf895fadcb
commit cd85dae3fd
4 changed files with 17 additions and 7 deletions

View File

@ -109,8 +109,6 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq);
int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId); int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId);
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
int32_t streamSaveAllTaskStatus(SStreamMeta* pMeta, int64_t checkpointId);
int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId); int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId);

View File

@ -114,6 +114,7 @@ static int32_t streamAlignCheckpoint(SStreamTask* pTask) {
return atomic_sub_fetch_32(&pTask->chkInfo.downstreamAlignNum, 1); return atomic_sub_fetch_32(&pTask->chkInfo.downstreamAlignNum, 1);
} }
// todo handle down the transId of checkpoint to sink/agg tasks.
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) { static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) {
SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pChkpoint == NULL) { if (pChkpoint == NULL) {

View File

@ -1102,7 +1102,10 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId); entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId);
entry.activeCheckpointId = (*pTask)->chkInfo.checkpointingId; entry.activeCheckpointId = (*pTask)->chkInfo.checkpointingId;
entry.chkpointTransId = (*pTask)->chkInfo.transId; entry.chkpointTransId = (*pTask)->chkInfo.transId;
stInfo("s-task:%s send kill checkpoint trans info, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId);
if (entry.checkpointFailed) {
stInfo("s-task:%s send kill checkpoint trans info, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId);
}
} }
if ((*pTask)->exec.pWalReader != NULL) { if ((*pTask)->exec.pWalReader != NULL) {

View File

@ -379,10 +379,11 @@ int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask) {
stDebug("s-task:%s enter into scan-history data stage, status:%s", id, p); stDebug("s-task:%s enter into scan-history data stage, status:%s", id, p);
streamTaskStartScanHistory(pTask); streamTaskStartScanHistory(pTask);
// start the related fill-history task, when current task is ready // NOTE: there will be an deadlock if launch fill history here.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { // // start the related fill-history task, when current task is ready
streamLaunchFillHistoryTask(pTask); // if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
} // streamLaunchFillHistoryTask(pTask);
// }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -400,6 +401,13 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
int64_t initTs = pTask->execInfo.init; int64_t initTs = pTask->execInfo.init;
int64_t startTs = pTask->execInfo.start; int64_t startTs = pTask->execInfo.start;
streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, initTs, startTs, true); streamMetaUpdateTaskDownstreamStatus(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, initTs, startTs, true);
// start the related fill-history task, when current task is ready
// not invoke in success callback due to the deadlock.
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
stDebug("s-task:%s try to launch related fill-history task", pTask->id.idStr);
streamLaunchFillHistoryTask(pTask);
}
} }
static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) { static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {