fix(stream): fix syntax check failure.
This commit is contained in:
parent
0fa7bf1a26
commit
51f3f29d5b
|
@ -1126,7 +1126,10 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
|
|||
tqError("vgId:%d, s-task:0x%x update qualified table error for stream task", vgId, taskId);
|
||||
}
|
||||
}
|
||||
taosReleaseRef(streamTaskRefPool, refId);
|
||||
int32_t ret = taosReleaseRef(streamTaskRefPool, refId);
|
||||
if (ret) {
|
||||
tqError("vgId:%d release task refId failed, refId:%" PRId64, vgId, refId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -135,7 +135,7 @@ int32_t metaRefMgtAdd(int64_t vgId, int64_t* rid) {
|
|||
void metaRefMgtRemove(int64_t* pRefId) {
|
||||
streamMutexLock(&gMetaRefMgt.mutex);
|
||||
|
||||
taosHashRemove(gMetaRefMgt.pTable, &pRefId, sizeof(pRefId));
|
||||
int32_t code = taosHashRemove(gMetaRefMgt.pTable, &pRefId, sizeof(pRefId));
|
||||
taosMemoryFree(pRefId);
|
||||
streamMutexUnlock(&gMetaRefMgt.mutex);
|
||||
}
|
||||
|
@ -534,7 +534,10 @@ void streamMetaClear(SStreamMeta* pMeta) {
|
|||
p->info.delaySchedParam = 0;
|
||||
}
|
||||
|
||||
taosRemoveRef(streamTaskRefPool, refId);
|
||||
int32_t code = taosRemoveRef(streamTaskRefPool, refId);
|
||||
if (code) {
|
||||
stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
|
||||
}
|
||||
}
|
||||
|
||||
if (pMeta->streamBackendRid != 0) {
|
||||
|
@ -722,12 +725,19 @@ int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTa
|
|||
}
|
||||
|
||||
if ((code = streamMetaSaveTask(pMeta, pTask)) != 0) {
|
||||
taosRemoveRef(streamTaskRefPool, refId);
|
||||
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
|
||||
if (ret) {
|
||||
stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
if ((code = streamMetaCommit(pMeta)) != 0) {
|
||||
taosRemoveRef(streamTaskRefPool, refId);
|
||||
int32_t ret = taosRemoveRef(streamTaskRefPool, refId);
|
||||
if (ret) {
|
||||
stError("vgId:%d remove task refId failed, refId:%" PRId64, pMeta->vgId, refId);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1215,7 +1225,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
|
|||
streamMetaReleaseTask(pMeta, pTask);
|
||||
ret = taosRemoveRef(streamTaskRefPool, refId);
|
||||
if (ret) {
|
||||
stError("vgId:%d failed to remove task:0x%x, refId:%"PRId64, pMeta->vgId, pTaskId->taskId, refId);
|
||||
stError("vgId:%d failed to remove task:0x%x, refId:%" PRId64, pMeta->vgId, pTaskId->taskId, refId);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1285,8 +1285,12 @@ int32_t streamTaskAllocRefId(SStreamTask* pTask, int64_t** pRefId) {
|
|||
*pRefId = taosMemoryMalloc(sizeof(int64_t));
|
||||
if (*pRefId != NULL) {
|
||||
**pRefId = pTask->id.refId;
|
||||
metaRefMgtAdd(pTask->pMeta->vgId, *pRefId);
|
||||
return 0;
|
||||
int32_t code = metaRefMgtAdd(pTask->pMeta->vgId, *pRefId);
|
||||
if (code != 0) {
|
||||
stError("s-task:%s failed to add refId:%" PRId64 " into refId-mgmt, code:%s", pTask->id.idStr, pTask->id.refId,
|
||||
tstrerror(code));
|
||||
}
|
||||
return code;
|
||||
} else {
|
||||
stError("s-task:%s failed to alloc new ref id, code:%s", pTask->id.idStr, tstrerror(terrno));
|
||||
return terrno;
|
||||
|
|
Loading…
Reference in New Issue