Merge pull request #28900 from taosdata/fix/liaohj
fix(stream): check whether dispatch trigger block is expired or not
This commit is contained in:
commit
642a5152f7
|
@ -714,7 +714,7 @@ int32_t streamTaskSetActiveCheckpointInfo(SStreamTask* pTask, int64_t activeChec
|
||||||
void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId);
|
void streamTaskSetFailedChkptInfo(SStreamTask* pTask, int32_t transId, int64_t checkpointId);
|
||||||
bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId);
|
bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId);
|
||||||
void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal);
|
void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal);
|
||||||
int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask);
|
int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask, int64_t sendingChkptId);
|
||||||
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId);
|
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId);
|
||||||
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
|
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
|
||||||
SRpcHandleInfo* pInfo, int32_t code);
|
SRpcHandleInfo* pInfo, int32_t code);
|
||||||
|
|
|
@ -379,9 +379,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) {
|
if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) {
|
||||||
mError("vgId:%d HbMsgId:%d already handled, bh msg discard", pEntry->nodeId, req.msgId);
|
mError("vgId:%d HbMsgId:%d already handled, bh msg discard, and send HbRsp", pEntry->nodeId, req.msgId);
|
||||||
|
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
|
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
|
||||||
|
|
||||||
streamMutexUnlock(&execInfo.lock);
|
streamMutexUnlock(&execInfo.lock);
|
||||||
|
|
|
@ -652,7 +652,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
tqError("failed to add s-task:0x%x into vgId:%d meta, existed:%d, code:%s", vgId, taskId, numOfTasks,
|
tqError("vgId:%d failed to register s-task:0x%x into meta, existed tasks:%d, code:%s", vgId, taskId, numOfTasks,
|
||||||
tstrerror(code));
|
tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1208,34 +1208,37 @@ void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_
|
||||||
|
|
||||||
// record the dispatch checkpoint trigger info in the list
|
// record the dispatch checkpoint trigger info in the list
|
||||||
// memory insufficient may cause the stream computing stopped
|
// memory insufficient may cause the stream computing stopped
|
||||||
int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
|
int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask, int64_t sendingChkptId) {
|
||||||
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
|
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
streamMutexLock(&pInfo->lock);
|
streamMutexLock(&pInfo->lock);
|
||||||
|
|
||||||
pInfo->dispatchTrigger = true;
|
if (sendingChkptId > pInfo->failedId) {
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
pInfo->dispatchTrigger = true;
|
||||||
STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;
|
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;
|
||||||
|
|
||||||
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId};
|
STaskTriggerSendInfo p = {
|
||||||
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
|
.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId};
|
||||||
if (px == NULL) { // pause the stream task, if memory not enough
|
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
|
||||||
code = terrno;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) {
|
|
||||||
SVgroupInfo* pVgInfo = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, i);
|
|
||||||
if (pVgInfo == NULL) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId};
|
|
||||||
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
|
|
||||||
if (px == NULL) { // pause the stream task, if memory not enough
|
if (px == NULL) { // pause the stream task, if memory not enough
|
||||||
code = terrno;
|
code = terrno;
|
||||||
break;
|
}
|
||||||
|
} else {
|
||||||
|
for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) {
|
||||||
|
SVgroupInfo* pVgInfo = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, i);
|
||||||
|
if (pVgInfo == NULL) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId};
|
||||||
|
void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
|
||||||
|
if (px == NULL) { // pause the stream task, if memory not enough
|
||||||
|
code = terrno;
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -845,7 +845,15 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
||||||
streamMutexUnlock(&pTask->msgInfo.lock);
|
streamMutexUnlock(&pTask->msgInfo.lock);
|
||||||
|
|
||||||
code = doBuildDispatchMsg(pTask, pBlock);
|
code = doBuildDispatchMsg(pTask, pBlock);
|
||||||
|
|
||||||
|
int64_t chkptId = 0;
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
|
if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
||||||
|
SSDataBlock* p = taosArrayGet(pBlock->blocks, 0);
|
||||||
|
if (pBlock != NULL) {
|
||||||
|
chkptId = p->info.version;
|
||||||
|
}
|
||||||
|
}
|
||||||
destroyStreamDataBlock(pBlock);
|
destroyStreamDataBlock(pBlock);
|
||||||
} else { // todo handle build dispatch msg failed
|
} else { // todo handle build dispatch msg failed
|
||||||
}
|
}
|
||||||
|
@ -862,7 +870,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = streamTaskInitTriggerDispatchInfo(pTask);
|
code = streamTaskInitTriggerDispatchInfo(pTask, chkptId);
|
||||||
if (code != TSDB_CODE_SUCCESS) { // todo handle error
|
if (code != TSDB_CODE_SUCCESS) { // todo handle error
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -723,8 +723,9 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
||||||
|
|
||||||
pTask->id.refId = refId = taosAddRef(streamTaskRefPool, pTask);
|
pTask->id.refId = refId = taosAddRef(streamTaskRefPool, pTask);
|
||||||
code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t));
|
code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t));
|
||||||
if (code) { // todo remove it from task list
|
if (code) {
|
||||||
stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
|
stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
|
||||||
|
void* pUnused = taosArrayPop(pMeta->pTaskList);
|
||||||
|
|
||||||
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
|
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
|
@ -734,6 +735,9 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) {
|
if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) {
|
||||||
|
int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
|
void* pUnused = taosArrayPop(pMeta->pTaskList);
|
||||||
|
|
||||||
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
|
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
|
||||||
if (ret) {
|
if (ret) {
|
||||||
stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
|
stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
|
||||||
|
@ -742,6 +746,9 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((code = streamMetaCommit(pMeta)) != 0) {
|
if ((code = streamMetaCommit(pMeta)) != 0) {
|
||||||
|
int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
|
void* pUnused = taosArrayPop(pMeta->pTaskList);
|
||||||
|
|
||||||
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
|
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
|
||||||
if (ret) {
|
if (ret) {
|
||||||
stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
|
stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
|
||||||
|
|
Loading…
Reference in New Issue