fix(stream): add some logs.

This commit is contained in:
Haojun Liao 2024-10-10 15:54:15 +08:00
parent 5e94ed5e25
commit 5c1cffed69
3 changed files with 11 additions and 6 deletions

View File

@ -1119,10 +1119,6 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
int32_t code = 0; int32_t code = 0;
if (pTask == NULL) {
return -1;
}
streamTaskResume(pTask); streamTaskResume(pTask);
ETaskStatus status = streamTaskGetStatus(pTask).state; ETaskStatus status = streamTaskGetStatus(pTask).state;
@ -1150,7 +1146,6 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
} }
} }
streamMetaReleaseTask(pMeta, pTask);
return code; return code;
} }
@ -1173,6 +1168,7 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode); code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
if (code != 0) { if (code != 0) {
streamMetaReleaseTask(pMeta, pTask);
return code; return code;
} }
@ -1186,6 +1182,7 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
streamMutexUnlock(&pHTask->lock); streamMutexUnlock(&pHTask->lock);
code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode); code = tqProcessTaskResumeImpl(handle, pHTask, sversion, pReq->igUntreated, fromVnode);
streamMetaReleaseTask(pMeta, pHTask);
} }
return code; return code;

View File

@ -759,6 +759,10 @@ void streamMetaAcquireOneTask(SStreamTask* pTask) {
} }
void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) { void streamMetaReleaseTask(SStreamMeta* UNUSED_PARAM(pMeta), SStreamTask* pTask) {
if (pTask == NULL) {
return;
}
int32_t taskId = pTask->id.taskId; int32_t taskId = pTask->id.taskId;
int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1); int32_t ref = atomic_sub_fetch_32(&pTask->refCnt, 1);

View File

@ -63,7 +63,11 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3
pRunReq->reqType = execType; pRunReq->reqType = execType;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)}; SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
return tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg); int32_t code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
if (code) {
stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId);
}
return code;
} }
void streamTaskClearSchedIdleInfo(SStreamTask* pTask) { pTask->status.schedIdleTime = 0; } void streamTaskClearSchedIdleInfo(SStreamTask* pTask) { pTask->status.schedIdleTime = 0; }