fix(stream): fix error and do some internal refactor.

This commit is contained in:
Haojun Liao 2024-07-17 19:17:53 +08:00
parent 9bdc65ceb1
commit 0b6a49ac7d
2 changed files with 44 additions and 8 deletions

View File

@ -99,6 +99,8 @@ int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpda
}
int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg* pMsg) {
int32_t code = 0;
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pMsg->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pMsg->taskId) < 0) return -1;
@ -111,13 +113,17 @@ int32_t tDecodeStreamTaskUpdateMsg(SDecoder* pDecoder, SStreamTaskNodeUpdateMsg*
if (tDecodeI32(pDecoder, &info.nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &info.prevEp) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &info.newEp) < 0) return -1;
taosArrayPush(pMsg->pNodeList, &info);
void* p = taosArrayPush(pMsg->pNodeList, &info);
if (p == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
}
}
if (tDecodeI32(pDecoder, &pMsg->transId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
return code;
}
int32_t tEncodeStreamTaskCheckReq(SEncoder* pEncoder, const SStreamTaskCheckReq* pReq) {
@ -257,8 +263,18 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
if (tDecodeI32(pDecoder, &len1) < 0) return -1;
if (tDecodeBinaryAlloc(pDecoder, &data, &len2) < 0) return -1;
ASSERT(len1 == len2);
taosArrayPush(pReq->dataLen, &len1);
taosArrayPush(pReq->data, &data);
void* p = taosArrayPush(pReq->dataLen, &len1);
if (p == NULL) {
tEndDecode(pDecoder);
return TSDB_CODE_OUT_OF_MEMORY;
}
p = taosArrayPush(pReq->data, &data);
if (p == NULL) {
tEndDecode(pDecoder);
return TSDB_CODE_OUT_OF_MEMORY;
}
}
tEndDecode(pDecoder);
@ -371,6 +387,8 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
}
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
int32_t code = 0;
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->numOfTasks) < 0) return -1;
@ -413,7 +431,11 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tDecodeI64(pDecoder, &entry.hTaskId) < 0) return -1;
entry.id.taskId = taskId;
taosArrayPush(pReq->pTaskStatus, &entry);
void* p = taosArrayPush(pReq->pTaskStatus, &entry);
if (p == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
int32_t numOfVgs = 0;
@ -424,12 +446,20 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
for (int j = 0; j < numOfVgs; ++j) {
int32_t vgId = 0;
if (tDecodeI32(pDecoder, &vgId) < 0) return -1;
taosArrayPush(pReq->pUpdateNodes, &vgId);
void* p = taosArrayPush(pReq->pUpdateNodes, &vgId);
if (p == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
if (tDecodeI32(pDecoder, &pReq->msgId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
_err:
tEndDecode(pDecoder);
return code;
}
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) {
@ -572,7 +602,11 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
taosMemoryFreeClear(pInfo);
return -1;
}
taosArrayPush(pTask->upstreamInfo.pList, &pInfo);
void* p = taosArrayPush(pTask->upstreamInfo.pList, &pInfo);
if (p == NULL) {
tEndDecode(pDecoder);
return -1;
}
}
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {

View File

@ -43,7 +43,9 @@ SStreamState *stateCreate(const char *path) {
pTask->ver = 1024;
pTask->id.streamId = 1023;
pTask->id.taskId = 1111111;
SStreamMeta *pMeta = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL);
SStreamMeta *pMeta = NULL;
int32_t code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pMeta);
pTask->pMeta = pMeta;
SStreamState *p = streamStateOpen((char *)path, pTask, 0, 0);