fix(stream): fix syntax error.
This commit is contained in:
parent
eb42d47d96
commit
af259945b6
|
@ -794,6 +794,7 @@ static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) {
|
||||||
|
|
||||||
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
||||||
SStreamTask* pTask = NULL;
|
SStreamTask* pTask = NULL;
|
||||||
|
int32_t vgId = pMeta->vgId;
|
||||||
|
|
||||||
// pre-delete operation
|
// pre-delete operation
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
|
@ -806,19 +807,19 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
// desc the paused task counter
|
// desc the paused task counter
|
||||||
if (streamTaskShouldPause(pTask)) {
|
if (streamTaskShouldPause(pTask)) {
|
||||||
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
|
int32_t num = atomic_sub_fetch_32(&pMeta->numOfPausedTasks, 1);
|
||||||
stInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", pMeta->vgId, pTask->id.idStr, num);
|
stInfo("vgId:%d s-task:%s drop stream task. pause task num:%d", vgId, pTask->id.idStr, num);
|
||||||
}
|
}
|
||||||
|
|
||||||
// handle the dropping event
|
// handle the dropping event
|
||||||
(void)streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
|
(void)streamTaskHandleEventAsync(pTask->status.pSM, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL);
|
||||||
} else {
|
} else {
|
||||||
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", pMeta->vgId, taskId);
|
stDebug("vgId:%d failed to find the task:0x%x, it may be dropped already", vgId, taskId);
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, pMeta->vgId);
|
stDebug("s-task:0x%x vgId:%d set task status:dropping and start to unregister it", taskId, vgId);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int32_t timerActive = 0;
|
int32_t timerActive = 0;
|
||||||
|
@ -859,13 +860,13 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
int32_t size = (int32_t) taosHashGetSize(pMeta->pTasksMap);
|
int32_t size = (int32_t) taosHashGetSize(pMeta->pTasksMap);
|
||||||
int32_t sizeInList = taosArrayGetSize(pMeta->pTaskList);
|
int32_t sizeInList = taosArrayGetSize(pMeta->pTaskList);
|
||||||
if (sizeInList != size) {
|
if (sizeInList != size) {
|
||||||
stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", pMeta->vgId, sizeInList, size);
|
stError("vgId:%d tasks number not consistent in list:%d and map:%d, ", vgId, sizeInList, size);
|
||||||
}
|
}
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
int32_t numOfTmr = pTask->status.timerActive;
|
int32_t numOfTmr = pTask->status.timerActive;
|
||||||
if (numOfTmr != 0) {
|
if (numOfTmr != 0) {
|
||||||
stError("s-task:%s vgId:%d invalid timer Active record:%d, internal error", pTask->id.idStr, numOfTmr);
|
stError("s-task:%s vgId:%d invalid timer Active record:%d, internal error", pTask->id.idStr, vgId, numOfTmr);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
|
if (pTask->info.delaySchedParam != 0 && pTask->info.fillHistory == 0) {
|
||||||
|
@ -877,7 +878,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
} else {
|
} else {
|
||||||
stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", pMeta->vgId, taskId);
|
stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId);
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue