fix(stream): fix error in decode stream task.

This commit is contained in:
Haojun Liao 2023-09-17 18:07:26 +08:00
parent 9d6199adf7
commit 57bf1cadc4
2 changed files with 9 additions and 17 deletions

View File

@ -799,17 +799,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
}
}
// pDb = mndAcquireDb(pMnode, streamObj.sourceDb);
// if (pDb->cfg.replications != 1) {
// mError("stream source db must have only 1 replica, but %s has %d", pDb->name, pDb->cfg.replications);
// terrno = TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB;
// mndReleaseDb(pMnode, pDb);
// pDb = NULL;
// goto _OVER;
// }
// mndReleaseDb(pMnode, pDb);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream");
if (pTrans == NULL) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());

View File

@ -97,9 +97,12 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1;
if (tEncodeI64(pEncoder, pTask->historyTaskId.streamId)) return -1;
if (tEncodeI32(pEncoder, pTask->historyTaskId.taskId)) return -1;
int32_t taskId = pTask->historyTaskId.taskId;
if (tEncodeI32(pEncoder, taskId)) return -1;
if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId)) return -1;
if (tEncodeI32(pEncoder, pTask->streamTaskId.taskId)) return -1;
taskId = pTask->streamTaskId.taskId;
if (tEncodeI32(pEncoder, taskId)) return -1;
if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer)) return -1;
if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)) return -1;
@ -141,6 +144,8 @@ int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
}
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
int32_t taskId = 0;
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1;
if (pTask->ver != SSTREAM_TASK_VER) return -1;
@ -165,14 +170,12 @@ int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1;
if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId)) return -1;
int32_t taskId = pTask->historyTaskId.taskId;
if (tDecodeI32(pDecoder, &taskId)) return -1;
pTask->historyTaskId.taskId = taskId;
if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1;
taskId = pTask->streamTaskId.taskId;
if (tDecodeI32(pDecoder, &taskId)) return -1;
pTask->streamTaskId.taskId = taskId;
if (tDecodeU64(pDecoder, &pTask->dataRange.range.minVer)) return -1;
if (tDecodeU64(pDecoder, &pTask->dataRange.range.maxVer)) return -1;