fix(stream): do some internal refactor and rewrite the error code for encode.
This commit is contained in:
parent
d013e02762
commit
e0a275f1d4
|
@ -586,6 +586,10 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
|
||||||
bool isLeader, bool restored) {
|
bool isLeader, bool restored) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
|
int32_t numOfTasks = 0;
|
||||||
|
int32_t taskId = -1;
|
||||||
|
int64_t streamId = -1;
|
||||||
|
bool added = false;
|
||||||
|
|
||||||
if (tsDisableStream) {
|
if (tsDisableStream) {
|
||||||
tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId);
|
tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId);
|
||||||
|
@ -613,13 +617,12 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2.save task, use the latest commit version as the initial start version of stream task.
|
// 2.save task, use the latest commit version as the initial start version of stream task.
|
||||||
int32_t taskId = pTask->id.taskId;
|
taskId = pTask->id.taskId;
|
||||||
int64_t streamId = pTask->id.streamId;
|
streamId = pTask->id.streamId;
|
||||||
bool added = false;
|
|
||||||
|
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
code = streamMetaRegisterTask(pMeta, sversion, pTask, &added);
|
code = streamMetaRegisterTask(pMeta, sversion, pTask, &added);
|
||||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
|
@ -654,7 +657,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
|
||||||
tqDebug("vgId:%d not leader, not launch stream task s-task:0x%x", vgId, taskId);
|
tqDebug("vgId:%d not leader, not launch stream task s-task:0x%x", vgId, taskId);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store", vgId, taskId);
|
tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store, total:%d", vgId, taskId, numOfTasks);
|
||||||
tFreeStreamTask(pTask);
|
tFreeStreamTask(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -268,6 +268,7 @@ int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTa
|
||||||
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
|
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
|
||||||
tmsgSendRsp(&rspMsg);
|
tmsgSendRsp(&rspMsg);
|
||||||
|
|
||||||
|
code = (code >= 0)? 0:code;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue