refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-09-22 09:24:36 +08:00
parent b075b4438b
commit b1ba716299
3 changed files with 7 additions and 1 deletions

View File

@ -1335,6 +1335,8 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
tDecodeStreamDispatchReq(&decoder, &req);
tDecoderClear(&decoder);
tqDebug("s-task:0x%x recv dispatch msg from 0x%x(vgId:%d)", req.taskId, req.upstreamTaskId, req.upstreamNodeId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId);
if (pTask) {
SRpcMsg rsp = {.info = pMsg->info, .code = 0};

View File

@ -355,7 +355,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
const char* pStatus = streamGetTaskStatusStr(status);
if (status != TASK_STATUS__NORMAL) {
tqDebug("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, pStatus);
tqTrace("s-task:%s not ready for new submit block from wal, status:%s", pTask->id.idStr, pStatus);
streamMetaReleaseTask(pStreamMeta, pTask);
continue;
}

View File

@ -218,6 +218,10 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
ASSERT(pInfo != NULL);
if (!pTask->pMeta->leader) {
ASSERT(0);
}
// upstream task has restarted/leader-follower switch/transferred to other dnodes
if (pReq->stage > pInfo->stage) {
stError("s-task:%s upstream task:0x%x (vgId:%d) has restart/leader-switch/vnode-transfer, prev stage:%" PRId64