fix(stream): check failedId before sending checkpoint msg
This commit is contained in:
parent
d397c9a9b7
commit
96d4a2908e
|
@ -714,7 +714,7 @@ int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeChec
|
|||
void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId);
|
||||
bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId);
|
||||
void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal);
|
||||
int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask);
|
||||
int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask, int64_t sendingChkptId);
|
||||
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId);
|
||||
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
|
||||
SRpcHandleInfo* pInfo, int32_t code);
|
||||
|
|
|
@ -1208,34 +1208,37 @@ void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_
|
|||
|
||||
// record the dispatch checkpoint trigger info in the list
|
||||
// memory insufficient may cause the stream computing stopped
|
||||
int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
|
||||
int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask, int64_t sendingChkptId) {
|
||||
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
|
||||
int64_t now = taosGetTimestampMs();
|
||||
int32_t code = 0;
|
||||
|
||||
streamMutexLock(&pInfo->lock);
|
||||
|
||||
pInfo->dispatchTrigger = true;
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;
|
||||
if (sendingChkptId > pInfo->failedId) {
|
||||
pInfo->dispatchTrigger = true;
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;
|
||||
|
||||
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId};
|
||||
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
|
||||
if (px == NULL) { // pause the stream task, if memory not enough
|
||||
code = terrno;
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) {
|
||||
SVgroupInfo* pVgInfo = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, i);
|
||||
if (pVgInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId};
|
||||
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
|
||||
STaskTriggerSendInfo p = {
|
||||
.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId};
|
||||
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
|
||||
if (px == NULL) { // pause the stream task, if memory not enough
|
||||
code = terrno;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) {
|
||||
SVgroupInfo* pVgInfo = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, i);
|
||||
if (pVgInfo == NULL) {
|
||||
continue;
|
||||
}
|
||||
|
||||
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId};
|
||||
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
|
||||
if (px == NULL) { // pause the stream task, if memory not enough
|
||||
code = terrno;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -845,7 +845,15 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
|||
streamMutexUnlock(&pTask->msgInfo.lock);
|
||||
|
||||
code = doBuildDispatchMsg(pTask, pBlock);
|
||||
|
||||
int64_t chkptId = 0;
|
||||
if (code == 0) {
|
||||
if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
||||
SSDataBlock* p = taosArrayGet(pBlock->blocks, 0);
|
||||
if (pBlock != NULL) {
|
||||
chkptId = p->info.version;
|
||||
}
|
||||
}
|
||||
destroyStreamDataBlock(pBlock);
|
||||
} else { // todo handle build dispatch msg failed
|
||||
}
|
||||
|
@ -862,7 +870,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
|||
continue;
|
||||
}
|
||||
|
||||
code = streamTaskInitTriggerDispatchInfo(pTask);
|
||||
code = streamTaskInitTriggerDispatchInfo(pTask, chkptId);
|
||||
if (code != TSDB_CODE_SUCCESS) { // todo handle error
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue