Merge pull request #17385 from taosdata/feature/stream
fix: error handling
This commit is contained in:
commit
eb26fbeaf2
|
@ -966,14 +966,14 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
||||||
SCMSubscribeReq req = {0};
|
SCMSubscribeReq req = {0};
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
|
tscDebug("call tmq subscribe, consumer: %ld, topic num %d", tmq->consumerId, sz);
|
||||||
|
|
||||||
req.consumerId = tmq->consumerId;
|
req.consumerId = tmq->consumerId;
|
||||||
tstrncpy(req.clientId, tmq->clientId, 256);
|
tstrncpy(req.clientId, tmq->clientId, 256);
|
||||||
tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
|
tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
|
||||||
req.topicNames = taosArrayInit(sz, sizeof(void*));
|
req.topicNames = taosArrayInit(sz, sizeof(void*));
|
||||||
if (req.topicNames == NULL) goto FAIL;
|
if (req.topicNames == NULL) goto FAIL;
|
||||||
|
|
||||||
tscDebug("call tmq subscribe, consumer: %ld, topic num %d", tmq->consumerId, sz);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
char* topic = taosArrayGetP(container, i);
|
char* topic = taosArrayGetP(container, i);
|
||||||
|
|
||||||
|
|
|
@ -416,7 +416,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
|
SStreamTask* pTask = tNewSStreamTask(pStream->uid);
|
||||||
if (pInnerTask == NULL) {
|
if (pTask == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
sdbRelease(pSdb, pVgroup);
|
sdbRelease(pSdb, pVgroup);
|
||||||
qDestroyQueryPlan(pPlan);
|
qDestroyQueryPlan(pPlan);
|
||||||
|
|
|
@ -1010,16 +1010,21 @@ int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver) {
|
||||||
|
|
||||||
if (streamTaskInput(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
|
if (streamTaskInput(pTask, (SStreamQueueItem*)pRefBlock) < 0) {
|
||||||
qError("stream task input del failed, task id %d", pTask->taskId);
|
qError("stream task input del failed, task id %d", pTask->taskId);
|
||||||
|
|
||||||
|
taosFreeQitem(pRefBlock);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamSchedExec(pTask) < 0) {
|
if (streamSchedExec(pTask) < 0) {
|
||||||
qError("stream task launch failed, task id %d", pTask->taskId);
|
qError("stream task launch failed, task id %d", pTask->taskId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
streamTaskInputFail(pTask);
|
streamTaskInputFail(pTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ref = atomic_sub_fetch_32(pRef, 1);
|
int32_t ref = atomic_sub_fetch_32(pRef, 1);
|
||||||
ASSERT(ref >= 0);
|
ASSERT(ref >= 0);
|
||||||
if (ref == 0) {
|
if (ref == 0) {
|
||||||
|
|
|
@ -45,6 +45,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
|
||||||
if (taosRemoveFile(fnameStr) < 0) {
|
if (taosRemoveFile(fnameStr) < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
wError("vgId:%d restore from snapshot, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr());
|
wError("vgId:%d restore from snapshot, cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr());
|
||||||
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
wInfo("vgId:%d restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr);
|
wInfo("vgId:%d restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr);
|
||||||
|
@ -53,6 +54,7 @@ int32_t walRestoreFromSnapshot(SWal *pWal, int64_t ver) {
|
||||||
if (taosRemoveFile(fnameStr) < 0) {
|
if (taosRemoveFile(fnameStr) < 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
wError("vgId:%d cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr());
|
wError("vgId:%d cannot remove file %s since %s", pWal->cfg.vgId, fnameStr, terrstr());
|
||||||
|
taosThreadMutexUnlock(&pWal->mutex);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
wInfo("vgId:%d restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr);
|
wInfo("vgId:%d restore from snapshot, remove file %s", pWal->cfg.vgId, fnameStr);
|
||||||
|
|
Loading…
Reference in New Issue