fix(stream): add some logs.

This commit is contained in:
Haojun Liao 2024-06-17 19:34:09 +08:00
parent 0f4ffdf49a
commit 60fac80345
2 changed files with 4 additions and 3 deletions

View File

@ -635,8 +635,8 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
// drop the related fill-history task firstly
if (hTaskId.taskId != 0 && hTaskId.streamId != 0) {
streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId);
streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId);
}
// drop the stream task now

View File

@ -677,8 +677,8 @@ static void doRemoveIdFromList(SStreamMeta* pMeta, int32_t num, SStreamTaskId* i
for (int32_t i = 0; i < num; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
if (pTaskId->streamId == id->streamId && pTaskId->taskId == id->taskId) {
stDebug("vgId:%d remove streamId:0x%" PRIx64 " taskId:0x%x succ", pMeta->vgId, id->streamId, id->taskId);
taosArrayRemove(pMeta->pTaskList, i);
stDebug("vgId:%d remove streamId:0x%" PRIx64 " taskId:0x%x", pMeta->vgId, id->streamId, id->taskId);
remove = true;
break;
} else {
@ -723,7 +723,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
}
streamMetaWUnLock(pMeta);
stDebug("s-task:0x%x set task status:dropping and start to unregister it", taskId);
stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, pMeta->vgId);
while (1) {
streamMetaRLock(pMeta);
@ -750,6 +750,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask) {
pTask = *ppTask;
ASSERT(pTask->id.taskId == id.taskId && pTask->id.streamId == id.streamId);
// it is an fill-history task, remove the related stream task's id that points to it
if (pTask->info.fillHistory == 0) {