diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index fd5591c488..e44bca123b 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -904,7 +904,7 @@ static int32_t doChkptStatusCheck(SStreamTask* pTask) { return -1; } - if ((pTmrInfo->launchChkptId != pActiveInfo->activeId) || (pActiveInfo->activeId == 0)) { + if (pTmrInfo->launchChkptId != pActiveInfo->activeId) { int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64 ", quit, ref:%d", @@ -1055,7 +1055,8 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { numOfNotSend = taosArrayGetSize(pNotSendList); if (numOfNotSend > 0) { stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id); - streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor"); + streamTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + "trigger-recv-monitor"); } else { int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index a3146ae9d4..78cbd844a0 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -526,6 +526,7 @@ static void doMonitorDispatchData(void* param, void* tmrId) { int32_t msgId = pMsgInfo->msgId; int32_t code = 0; int64_t now = taosGetTimestampMs(); + bool inDispatch = true; stDebug("s-task:%s start monitor dispatch data", id); @@ -550,12 +551,15 @@ static void doMonitorDispatchData(void* param, void* tmrId) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s not in dispatch procedure, abort from timer, ref:%d", pTask->id.idStr, ref); - pTask->msgInfo.inMonitor = 0; - streamMutexUnlock(&pMsgInfo->lock); - return; + pMsgInfo->inMonitor = 0; + inDispatch = false; } streamMutexUnlock(&pMsgInfo->lock); + if (!inDispatch) { + return; + } + int32_t numOfFailed = getFailedDispatchInfo(pMsgInfo, now); if (numOfFailed == 0) { stDebug("s-task:%s no error occurs, check again in %dms", id, DISPATCH_RETRY_INTERVAL_MS); @@ -638,15 +642,54 @@ void streamStartMonitorDispatchData(SStreamTask* pTask, int64_t waitDuration) { "dispatch-monitor"); } +static int32_t doAddDispatchBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, + SArray* vgInfo, uint32_t hashValue, int64_t now, bool* pFound) { + size_t numOfVgroups = taosArrayGetSize(vgInfo); + int32_t code = 0; + + *pFound = false; + + for (int32_t j = 0; j < numOfVgroups; j++) { + SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); + if (pVgInfo == NULL) { + continue; + } + + if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { + if ((code = streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j])) < 0) { + stError("s-task:%s failed to add dispatch block, code:%s", pTask->id.idStr, tstrerror(terrno)); + return code; + } + + if (pReqs[j].blockNum == 0) { + SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j); + if (pDstVgroupInfo != NULL) { + addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, false); + } + } + + pReqs[j].blockNum++; + *pFound = true; + break; + } + } + + return code; +} + int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, SSDataBlock* pDataBlock, int64_t groupId, int64_t now) { bool found = false; uint32_t hashValue = 0; - int32_t numOfVgroups = 0; + int32_t code = 0; + SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; - SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; if (pTask->pNameMap == NULL) { pTask->pNameMap = tSimpleHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); + if (pTask->pNameMap == NULL) { + stError("s-task:%s failed to init the name map, code:%s", pTask->id.idStr, tstrerror(terrno)); + return terrno; + } } void* pVal = tSimpleHashGet(pTask->pNameMap, &groupId, sizeof(int64_t)); @@ -669,11 +712,11 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S } } } else { - int32_t code = buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, - pDataBlock->info.parTbName); + code = buildCtbNameByGroupIdImpl(pTask->outputInfo.shuffleDispatcher.stbFullName, groupId, + pDataBlock->info.parTbName); if (code) { - stError("s-task:%s failed to build child table name for group:%" PRId64 ", code:%s", pTask->id.idStr, - groupId, tstrerror(code)); + stError("s-task:%s failed to build child table name for group:%" PRId64 ", code:%s", pTask->id.idStr, groupId, + tstrerror(code)); } } @@ -688,44 +731,21 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S memcpy(bln.parTbName, pDataBlock->info.parTbName, strlen(pDataBlock->info.parTbName)); // failed to put into name buffer, no need to do anything - if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) { // allow error, and do nothing - int32_t code = tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName)); + if (tSimpleHashGetSize(pTask->pNameMap) < MAX_BLOCK_NAME_NUM) { // allow error, and do nothing + code = tSimpleHashPut(pTask->pNameMap, &groupId, sizeof(int64_t), &bln, sizeof(SBlockName)); } } - numOfVgroups = taosArrayGetSize(vgInfo); - - // TODO: optimize search streamMutexLock(&pTask->msgInfo.lock); + code = doAddDispatchBlock(pTask, pReqs, pDataBlock, vgInfo, hashValue, now, &found); + streamMutexUnlock(&pTask->msgInfo.lock); - for (int32_t j = 0; j < numOfVgroups; j++) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); - if (pVgInfo == NULL) { - continue; - } - - if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { - if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { - streamMutexUnlock(&pTask->msgInfo.lock); - return -1; - } - - if (pReqs[j].blockNum == 0) { - SVgroupInfo* pDstVgroupInfo = taosArrayGet(vgInfo, j); - if (pDstVgroupInfo != NULL) { - addDispatchEntry(&pTask->msgInfo, pDstVgroupInfo->vgId, now, false); - } - } - - pReqs[j].blockNum++; - found = true; - break; - } + if (code) { + return code; } - streamMutexUnlock(&pTask->msgInfo.lock); if (!found) { - stError("s-task:%s not found req hash value:%u", pTask->id.idStr, hashValue); + stError("s-task:%s not found req hash value:%u, failed to add dispatch block", pTask->id.idStr, hashValue); return TSDB_CODE_STREAM_INTERNAL_ERROR; } else { return 0; @@ -919,7 +939,7 @@ static int32_t doTaskChkptStatusCheck(SStreamTask* pTask, int32_t num) { } static int32_t doFindNotConfirmUpstream(SArray** ppNotRspList, SArray* pList, int32_t num, int32_t vgId, int32_t level, - const char* id) { + const char* id) { SArray* pTmp = taosArrayInit(4, sizeof(int32_t)); if (pTmp == NULL) { return terrno; @@ -940,8 +960,8 @@ static int32_t doFindNotConfirmUpstream(SArray** ppNotRspList, SArray* pList, in stError("s-task:%s vgId:%d failed to record not rsp task, code: out of memory", id, vgId); return terrno; } else { - stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId, - level, pInfo->upstreamTaskId); + stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId, level, + pInfo->upstreamTaskId); } } @@ -987,13 +1007,48 @@ static void doSendChkptReadyMsg(SStreamTask* pTask, SArray* pNotRspList, int64_t } } -static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { +static int32_t chkptReadyMsgSendHelper(SStreamTask* pTask, SArray* pNotRspList) { + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; + SArray* pList = pActiveInfo->pReadyMsgList; + int32_t num = taosArrayGetSize(pList); + int32_t vgId = pTask->pMeta->vgId; + int32_t checkpointId = pActiveInfo->activeId; + const char* id = pTask->id.idStr; + int32_t notRsp = 0; + + int32_t code = doTaskChkptStatusCheck(pTask, num); + if (code) { + return code; + } + + code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id); + if (code) { + int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); + stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr, ref:%d", id, + tstrerror(code), ref); + return code; + } + + notRsp = taosArrayGetSize(pNotRspList); + if (notRsp == 0) { + streamClearChkptReadyMsg(pActiveInfo); + } else { + doSendChkptReadyMsg(pTask, pNotRspList, checkpointId, pList); + } + + return code; +} + +static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) { SStreamTask* pTask = param; int32_t vgId = pTask->pMeta->vgId; const char* id = pTask->id.idStr; SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; SArray* pNotRspList = NULL; + int32_t code = 0; + int32_t notRsp = 0; // check the status every 100ms if (streamTaskShouldStop(pTask)) { @@ -1004,7 +1059,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { } if (++pTmrInfo->activeCounter < 50) { - streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "chkpt-ready-monitor"); return; } @@ -1027,45 +1082,26 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) { } streamMutexLock(&pActiveInfo->lock); + code = chkptReadyMsgSendHelper(pTask, pNotRspList); + streamMutexUnlock(&pActiveInfo->lock); - SArray* pList = pActiveInfo->pReadyMsgList; - int32_t num = taosArrayGetSize(pList); - int32_t code = doTaskChkptStatusCheck(pTask, num); - if (code) { - streamMutexUnlock(&pActiveInfo->lock); + if (code != TSDB_CODE_SUCCESS) { streamMetaReleaseTask(pTask->pMeta, pTask); - return; - } - - code = doFindNotConfirmUpstream(&pNotRspList, pList, num, vgId, pTask->info.taskLevel, id); - if (code) { - int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); - stError("s-task:%s failed to find not rsp checkpoint-ready downstream, code:%s, out of tmr, ref:%d", id, - tstrerror(code), ref); - streamMutexUnlock(&pActiveInfo->lock); - streamMetaReleaseTask(pTask->pMeta, pTask); - taosArrayDestroy(pNotRspList); return; } - int32_t checkpointId = pActiveInfo->activeId; - int32_t notRsp = taosArrayGetSize(pNotRspList); - doSendChkptReadyMsg(pTask, pNotRspList, checkpointId, pList); - + notRsp = taosArrayGetSize(pNotRspList); if (notRsp > 0) { // send checkpoint-ready msg again - streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + stDebug("s-task:%s start to monitor checkpoint-ready msg recv status in 10s", id); + streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "chkpt-ready-monitor"); - streamMutexUnlock(&pActiveInfo->lock); } else { int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask); stDebug( "s-task:%s vgId:%d checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg and quit " "from timer, ref:%d", id, vgId, ref); - - streamClearChkptReadyMsg(pActiveInfo); - streamMutexUnlock(&pActiveInfo->lock); // release should be the last execution, since pTask may be destroy after it immidiately. streamMetaReleaseTask(pTask->pMeta, pTask); } @@ -1124,7 +1160,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) { stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref); streamMetaAcquireOneTask(pTask); - streamTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, + streamTmrStart(chkptReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "chkpt-ready-monitor"); // mark the timer monitor checkpointId @@ -1190,6 +1226,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch taosMemoryFree(buf); return terrno; } + SET_PAYLOAD_LEN(pRetrieve->data, actualLen, actualLen); int32_t payloadLen = actualLen + PAYLOAD_PREFIX_LEN; @@ -1359,29 +1396,11 @@ void initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, int32_t upstr pReadyInfo->childId = childId; } -int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, int32_t index, int64_t checkpointId) { - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - return TSDB_CODE_SUCCESS; - } - - SStreamUpstreamEpInfo* pInfo = NULL; - streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId, &pInfo); - if (pInfo == NULL) { - return TSDB_CODE_STREAM_TASK_NOT_EXIST; - } - - STaskCheckpointReadyInfo info = {0}; - initCheckpointReadyInfo(&info, pInfo->nodeId, pInfo->taskId, pInfo->childId, &pInfo->epSet, checkpointId); - - stDebug("s-task:%s (level:%d) prepare checkpoint-ready msg to upstream s-task:0x%" PRIx64 "-0x%x (vgId:%d) idx:%d", - pTask->id.idStr, pTask->info.taskLevel, pTask->id.streamId, pInfo->taskId, pInfo->nodeId, index); - +static int32_t doAddChkptReadyMsg(SStreamTask* pTask, STaskCheckpointReadyInfo* pInfo) { SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; - streamMutexLock(&pActiveInfo->lock); - void* px = taosArrayPush(pActiveInfo->pReadyMsgList, &info); + void* px = taosArrayPush(pActiveInfo->pReadyMsgList, pInfo); if (px == NULL) { - streamMutexUnlock(&pActiveInfo->lock); stError("s-task:%s failed to add readyMsg info, code: out of memory", pTask->id.idStr); return terrno; } @@ -1395,10 +1414,36 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, stDebug("s-task:%s %d/%d checkpoint-trigger recv", pTask->id.idStr, numOfRecv, total); } - streamMutexUnlock(&pActiveInfo->lock); return 0; } +int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, int32_t index, int64_t checkpointId) { + int32_t code = 0; + STaskCheckpointReadyInfo info = {0}; + + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + return TSDB_CODE_SUCCESS; + } + + SStreamUpstreamEpInfo* pInfo = NULL; + streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId, &pInfo); + if (pInfo == NULL) { + return TSDB_CODE_STREAM_TASK_NOT_EXIST; + } + + initCheckpointReadyInfo(&info, pInfo->nodeId, pInfo->taskId, pInfo->childId, &pInfo->epSet, checkpointId); + + stDebug("s-task:%s (level:%d) prepare checkpoint-ready msg to upstream s-task:0x%" PRIx64 "-0x%x (vgId:%d) idx:%d", + pTask->id.idStr, pTask->info.taskLevel, pTask->id.streamId, pInfo->taskId, pInfo->nodeId, index); + + SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; + + streamMutexLock(&pActiveInfo->lock); + code = doAddChkptReadyMsg(pTask, &info); + streamMutexUnlock(&pActiveInfo->lock); + return code; +} + void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo) { if (pActiveInfo == NULL) { return; diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 63e24b0975..98920e6f70 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -83,12 +83,14 @@ void streamTaskResumeInFuture(SStreamTask* pTask) { ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// void streamTaskResumeHelper(void* param, void* tmrId) { - SStreamTask* pTask = (SStreamTask*)param; - SStreamTaskId* pId = &pTask->id; - SStreamTaskState p = streamTaskGetStatus(pTask); + SStreamTask* pTask = (SStreamTask*)param; + SStreamTaskId* pId = &pTask->id; + SStreamTaskState p = streamTaskGetStatus(pTask); + int32_t code = 0; if (p.state == TASK_STATUS__DROPPING || p.state == TASK_STATUS__STOP) { int8_t status = streamTaskSetSchedStatusInactive(pTask); + TAOS_UNUSED(status); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s status:%s not resume task, ref:%d", pId->idStr, p.name, ref); @@ -97,13 +99,12 @@ void streamTaskResumeHelper(void* param, void* tmrId) { return; } - int32_t code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK); + code = streamTaskSchedTask(pTask->pMsgCb, pTask->info.nodeId, pId->streamId, pId->taskId, STREAM_EXEC_T_RESUME_TASK); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); if (code) { stError("s-task:%s sched task failed, code:%s, ref:%d", pId->idStr, tstrerror(code), ref); } else { - stDebug("trigger to resume s-task:%s after being idled for %dms, ref:%d", pId->idStr, pTask->status.schedIdleTime, - ref); + stDebug("trigger to resume s-task:%s after idled for %dms, ref:%d", pId->idStr, pTask->status.schedIdleTime, ref); // release the task ref count streamTaskClearSchedIdleInfo(pTask); diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index b376dbd16b..4d7bf2ba87 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -64,7 +64,6 @@ static int32_t streamTaskSetReady(SStreamTask* pTask) { int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) { SStreamScanHistoryReq req; - int32_t code = 0; initScanHistoryReq(pTask, &req, igUntreated); int32_t len = sizeof(SStreamScanHistoryReq); @@ -173,7 +172,7 @@ int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask) { code = streamTaskStartScanHistory(pTask); } - // NOTE: there will be an deadlock if launch fill history here. + // NOTE: there will be a deadlock if launch fill history here. // start the related fill-history task, when current task is ready // if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { // streamLaunchFillHistoryTask(pTask); @@ -219,7 +218,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", idStr, hStreamId, hTaskId); - // Set the execute conditions, including the query time window and the version range + // Set the execution conditions, including the query time window and the version range streamMetaRLock(pMeta); SStreamTask** pHTask = taosHashGet(pMeta->pTasksMap, &pTask->hTaskInfo.id, sizeof(pTask->hTaskInfo.id)); streamMetaRUnLock(pMeta);