fix(stream): remove failed task in hash table and array list.

This commit is contained in:
Haojun Liao 2024-11-22 19:22:37 +08:00
parent 9ea53119e5
commit d397c9a9b7
2 changed files with 9 additions and 2 deletions

View File

@ -652,7 +652,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
if (code < 0) { if (code < 0) {
tqError("failed to add s-task:0x%x into vgId:%d meta, existed:%d, code:%s", vgId, taskId, numOfTasks, tqError("vgId:%d failed to register s-task:0x%x into meta, existed tasks:%d, code:%s", vgId, taskId, numOfTasks,
tstrerror(code)); tstrerror(code));
return code; return code;
} }

View File

@ -723,8 +723,9 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
pTask->id.refId = refId = taosAddRef(streamTaskRefPool, pTask); pTask->id.refId = refId = taosAddRef(streamTaskRefPool, pTask);
code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t)); code = taosHashPut(pMeta->pTasksMap, &id, sizeof(id), &pTask->id.refId, sizeof(int64_t));
if (code) { // todo remove it from task list if (code) {
stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId); stError("s-task:0x%" PRIx64 " failed to register task into meta-list, code: out of memory", id.taskId);
void* pUnused = taosArrayPop(pMeta->pTaskList);
int32_t ret = taosRemoveRef(streamTaskRefPool, refId); int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
if (ret != 0) { if (ret != 0) {
@ -734,6 +735,9 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
} }
if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) { if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) {
int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
void* pUnused = taosArrayPop(pMeta->pTaskList);
int32_t ret = taosRemoveRef(streamTaskRefPool, refId); int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
if (ret) { if (ret) {
stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId); stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
@ -742,6 +746,9 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
} }
if ((code = streamMetaCommit(pMeta)) != 0) { if ((code = streamMetaCommit(pMeta)) != 0) {
int32_t unused = taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));
void* pUnused = taosArrayPop(pMeta->pTaskList);
int32_t ret = taosRemoveRef(streamTaskRefPool, refId); int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
if (ret) { if (ret) {
stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId); stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);