fix(stream): set correct return code.
This commit is contained in:
parent
0d131116e7
commit
93cc7aa9ad
|
@ -351,7 +351,7 @@ int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
SStreamTask* pTask = NULL;
|
SStreamTask* pTask = NULL;
|
||||||
int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
|
int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
|
||||||
if (pTask) {
|
if (pTask && (code == 0)) {
|
||||||
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
||||||
if (streamProcessDispatchMsg(pTask, &req, &rsp) != 0) {
|
if (streamProcessDispatchMsg(pTask, &req, &rsp) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -441,7 +441,7 @@ int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
SStreamTask* pTask = NULL;
|
SStreamTask* pTask = NULL;
|
||||||
code = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId, &pTask);
|
code = streamMetaAcquireTask(pMeta, req.streamId, req.dstTaskId, &pTask);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL || code != 0) {
|
||||||
tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
|
tqError("vgId:%d process retrieve req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
|
||||||
req.dstTaskId);
|
req.dstTaskId);
|
||||||
tCleanupStreamRetrieveReq(&req);
|
tCleanupStreamRetrieveReq(&req);
|
||||||
|
@ -520,7 +520,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
|
||||||
|
|
||||||
SStreamTask* pTask = NULL;
|
SStreamTask* pTask = NULL;
|
||||||
code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId, &pTask);
|
code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.upstreamTaskId, &pTask);
|
||||||
if (pTask == NULL) {
|
if ((pTask == NULL) || (code != 0)) {
|
||||||
return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
|
return streamMetaAddFailedTask(pMeta, rsp.streamId, rsp.upstreamTaskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -550,7 +550,7 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg)
|
||||||
code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
|
code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.downstreamTaskId);
|
tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.downstreamTaskId);
|
||||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId,
|
tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId,
|
||||||
|
@ -642,7 +642,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
|
||||||
if (restored) {
|
if (restored) {
|
||||||
SStreamTask* p = NULL;
|
SStreamTask* p = NULL;
|
||||||
code = streamMetaAcquireTask(pMeta, streamId, taskId, &p);
|
code = streamMetaAcquireTask(pMeta, streamId, taskId, &p);
|
||||||
if ((p != NULL) && (p->info.fillHistory == 0)) {
|
if ((p != NULL) && (code == 0) && (p->info.fillHistory == 0)) {
|
||||||
code = tqStreamStartOneTaskAsync(pMeta, cb, streamId, taskId);
|
code = tqStreamStartOneTaskAsync(pMeta, cb, streamId, taskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -831,7 +831,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
||||||
SStreamTask* pTask = NULL;
|
SStreamTask* pTask = NULL;
|
||||||
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
||||||
|
|
||||||
if (pTask != NULL || (code != 0)) {
|
if (pTask != NULL && (code == 0)) {
|
||||||
char* pStatus = NULL;
|
char* pStatus = NULL;
|
||||||
if (streamTaskReadyToRun(pTask, &pStatus)) {
|
if (streamTaskReadyToRun(pTask, &pStatus)) {
|
||||||
int64_t execTs = pTask->status.lastExecTs;
|
int64_t execTs = pTask->status.lastExecTs;
|
||||||
|
@ -852,7 +852,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
||||||
|
|
||||||
SStreamTask* pTask = NULL;
|
SStreamTask* pTask = NULL;
|
||||||
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
||||||
if (pTask != NULL || (code != 0)) { // even in halt status, the data in inputQ must be processed
|
if ((pTask != NULL) && (code == 0)) { // even in halt status, the data in inputQ must be processed
|
||||||
char* p = NULL;
|
char* p = NULL;
|
||||||
if (streamTaskReadyToRun(pTask, &p)) {
|
if (streamTaskReadyToRun(pTask, &p)) {
|
||||||
tqDebug("vgId:%d s-task:%s status:%s start to process block from inputQ, next checked ver:%" PRId64, vgId,
|
tqDebug("vgId:%d s-task:%s status:%s start to process block from inputQ, next checked ver:%" PRId64, vgId,
|
||||||
|
@ -963,7 +963,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
|
||||||
|
|
||||||
SStreamTask* pTask = NULL;
|
SStreamTask* pTask = NULL;
|
||||||
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->upstreamTaskId, &pTask);
|
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->upstreamTaskId, &pTask);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL || (code != 0)) {
|
||||||
tqError("vgId:%d process retrieve checkpoint trigger, checkpointId:%" PRId64
|
tqError("vgId:%d process retrieve checkpoint trigger, checkpointId:%" PRId64
|
||||||
" from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already",
|
" from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already",
|
||||||
pMeta->vgId, pReq->checkpointId, (int32_t)pReq->downstreamTaskId, pReq->upstreamTaskId);
|
pMeta->vgId, pReq->checkpointId, (int32_t)pReq->downstreamTaskId, pReq->upstreamTaskId);
|
||||||
|
@ -1031,11 +1031,11 @@ int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg)
|
||||||
|
|
||||||
SStreamTask* pTask = NULL;
|
SStreamTask* pTask = NULL;
|
||||||
int32_t code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->taskId, &pTask);
|
int32_t code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->taskId, &pTask);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL || (code != 0)) {
|
||||||
tqError(
|
tqError(
|
||||||
"vgId:%d process retrieve checkpoint-trigger, failed to acquire task:0x%x, it may have been dropped already",
|
"vgId:%d process retrieve checkpoint-trigger, failed to acquire task:0x%x, it may have been dropped already",
|
||||||
pMeta->vgId, pRsp->taskId);
|
pMeta->vgId, pRsp->taskId);
|
||||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("s-task:%s recv re-send checkpoint-trigger msg from upstream:0x%x, checkpointId:%" PRId64 ", transId:%d",
|
tqDebug("s-task:%s recv re-send checkpoint-trigger msg from upstream:0x%x, checkpointId:%" PRId64 ", transId:%d",
|
||||||
|
@ -1051,7 +1051,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
|
||||||
|
|
||||||
SStreamTask* pTask = NULL;
|
SStreamTask* pTask = NULL;
|
||||||
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL || (code != 0)) {
|
||||||
tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
|
tqError("vgId:%d process pause req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
|
||||||
pReq->taskId);
|
pReq->taskId);
|
||||||
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
|
// since task is in [STOP|DROPPING] state, it is safe to assume the pause is active
|
||||||
|
@ -1065,7 +1065,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
pHistoryTask = NULL;
|
pHistoryTask = NULL;
|
||||||
code = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId, &pHistoryTask);
|
code = streamMetaAcquireTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId, &pHistoryTask);
|
||||||
if (pHistoryTask == NULL) {
|
if (pHistoryTask == NULL || (code != 0)) {
|
||||||
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
|
tqError("vgId:%d process pause req, failed to acquire fill-history task:0x%" PRIx64
|
||||||
", it may have been dropped already",
|
", it may have been dropped already",
|
||||||
pMeta->vgId, pTask->hTaskInfo.id.taskId);
|
pMeta->vgId, pTask->hTaskInfo.id.taskId);
|
||||||
|
@ -1135,7 +1135,7 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
|
||||||
|
|
||||||
SStreamTask* pTask = NULL;
|
SStreamTask* pTask = NULL;
|
||||||
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL || (code != 0)) {
|
||||||
tqError("s-task:0x%x failed to acquire task to resume, it may have been dropped or stopped", pReq->taskId);
|
tqError("s-task:0x%x failed to acquire task to resume, it may have been dropped or stopped", pReq->taskId);
|
||||||
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
||||||
}
|
}
|
||||||
|
@ -1153,7 +1153,7 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
|
||||||
STaskId* pHTaskId = &pTask->hTaskInfo.id;
|
STaskId* pHTaskId = &pTask->hTaskInfo.id;
|
||||||
SStreamTask* pHTask = NULL;
|
SStreamTask* pHTask = NULL;
|
||||||
code = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask);
|
code = streamMetaAcquireTask(pMeta, pHTaskId->streamId, pHTaskId->taskId, &pHTask);
|
||||||
if (pHTask) {
|
if (pHTask && (code == 0)) {
|
||||||
streamMutexLock(&pHTask->lock);
|
streamMutexLock(&pHTask->lock);
|
||||||
SStreamTaskState p = streamTaskGetStatus(pHTask);
|
SStreamTaskState p = streamTaskGetStatus(pHTask);
|
||||||
tqDebug("s-task:%s related history task start to resume from paused, current status:%s", pHTask->id.idStr, p.name);
|
tqDebug("s-task:%s related history task start to resume from paused, current status:%s", pHTask->id.idStr, p.name);
|
||||||
|
@ -1191,7 +1191,7 @@ int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
if (pTask == NULL || (code != 0)) {
|
if (pTask == NULL || (code != 0)) {
|
||||||
tqError("vgId:%d failed to acquire task:0x%x when handling checkpoint-ready msg, it may have been dropped",
|
tqError("vgId:%d failed to acquire task:0x%x when handling checkpoint-ready msg, it may have been dropped",
|
||||||
pRsp->downstreamNodeId, pRsp->downstreamTaskId);
|
pRsp->downstreamNodeId, pRsp->downstreamTaskId);
|
||||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = streamTaskProcessCheckpointReadyRsp(pTask, pRsp->upstreamTaskId, pRsp->checkpointId);
|
code = streamTaskProcessCheckpointReadyRsp(pTask, pRsp->upstreamTaskId, pRsp->checkpointId);
|
||||||
|
|
Loading…
Reference in New Issue