diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index a902aae9f4..1e622f615d 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -137,161 +137,6 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, return pTask; } -int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { - if (tStartEncode(pEncoder) < 0) return -1; - if (tEncodeI64(pEncoder, pTask->ver) < 0) return -1; - if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->info.taskLevel) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->outputInfo.type) < 0) return -1; - if (tEncodeI16(pEncoder, pTask->msgInfo.msgType) < 0) return -1; - - if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->status.schedStatus) < 0) return -1; - - if (tEncodeI32(pEncoder, pTask->info.selfChildId) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->info.nodeId) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pTask->info.epSet) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset) < 0) return -1; - - if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointId) < 0) return -1; - if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1; - - if (tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)) return -1; - int32_t taskId = pTask->hTaskInfo.id.taskId; - if (tEncodeI32(pEncoder, taskId)) return -1; - - if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId)) return -1; - taskId = pTask->streamTaskId.taskId; - if (tEncodeI32(pEncoder, taskId)) return -1; - - if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer)) return -1; - if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)) return -1; - if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1; - if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1; - - int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList); - if (tEncodeI32(pEncoder, epSz) < 0) return -1; - for (int32_t i = 0; i < epSz; i++) { - SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); - if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1; - } - - if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; - } - - if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { - if (tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid) < 0) return -1; - if (tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1; - if (tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1; - } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { - if (tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId) < 0) return -1; - } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { - if (tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved) < 0) return -1; - } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1; - if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1; - } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - if (tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1; - if (tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1; - } - if (tEncodeI64(pEncoder, pTask->info.triggerParam) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->subtableWithoutMd5) < 0) return -1; - if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1; - - tEndEncode(pEncoder); - return pEncoder->pos; -} - -int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { - int32_t taskId = 0; - - if (tStartDecode(pDecoder) < 0) return -1; - if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1; - if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) return -1; - - if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->info.taskLevel) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->outputInfo.type) < 0) return -1; - if (tDecodeI16(pDecoder, &pTask->msgInfo.msgType) < 0) return -1; - - if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->status.schedStatus) < 0) return -1; - - if (tDecodeI32(pDecoder, &pTask->info.selfChildId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->info.nodeId) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pTask->info.epSet) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset) < 0) return -1; - - if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId) < 0) return -1; - if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1; - - if (tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)) return -1; - if (tDecodeI32(pDecoder, &taskId)) return -1; - pTask->hTaskInfo.id.taskId = taskId; - - if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1; - if (tDecodeI32(pDecoder, &taskId)) return -1; - pTask->streamTaskId.taskId = taskId; - - if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer)) return -1; - if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer)) return -1; - if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1; - if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1; - - int32_t epSz = -1; - if (tDecodeI32(pDecoder, &epSz) < 0) return -1; - - pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES); - for (int32_t i = 0; i < epSz; i++) { - SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo)); - if (pInfo == NULL) return -1; - if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) { - taosMemoryFreeClear(pInfo); - return -1; - } - taosArrayPush(pTask->upstreamInfo.pList, &pInfo); - } - - if (pTask->info.taskLevel != TASK_LEVEL__SINK) { - if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; - } - - if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { - if (tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1; - pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); - if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) return -1; - if (tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1; - } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { - if (tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId) < 0) return -1; - } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { - if (tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved) < 0) return -1; - } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1; - if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1; - } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - if (tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1; - if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1; - } - if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1; - if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ - if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1; - } - if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1; - - tEndDecode(pDecoder); - return 0; -} - int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) { int64_t skip64; int8_t skip8; diff --git a/source/libs/stream/src/streammsg.c b/source/libs/stream/src/streammsg.c index b37299f54f..5e52b927c6 100644 --- a/source/libs/stream/src/streammsg.c +++ b/source/libs/stream/src/streammsg.c @@ -429,4 +429,159 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) { if (pMsg->pTaskStatus != NULL) { taosArrayDestroy(pMsg->pTaskStatus); } +} + +int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->ver) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->info.taskLevel) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->outputInfo.type) < 0) return -1; + if (tEncodeI16(pEncoder, pTask->msgInfo.msgType) < 0) return -1; + + if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->status.schedStatus) < 0) return -1; + + if (tEncodeI32(pEncoder, pTask->info.selfChildId) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->info.nodeId) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pTask->info.epSet) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset) < 0) return -1; + + if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointId) < 0) return -1; + if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1; + + if (tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)) return -1; + int32_t taskId = pTask->hTaskInfo.id.taskId; + if (tEncodeI32(pEncoder, taskId)) return -1; + + if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId)) return -1; + taskId = pTask->streamTaskId.taskId; + if (tEncodeI32(pEncoder, taskId)) return -1; + + if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer)) return -1; + if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)) return -1; + if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1; + if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1; + + int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList); + if (tEncodeI32(pEncoder, epSz) < 0) return -1; + for (int32_t i = 0; i < epSz; i++) { + SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i); + if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1; + } + + if (pTask->info.taskLevel != TASK_LEVEL__SINK) { + if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; + } + + if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { + if (tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid) < 0) return -1; + if (tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1; + if (tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1; + } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { + if (tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId) < 0) return -1; + } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { + if (tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved) < 0) return -1; + } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1; + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + if (tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1; + if (tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1; + } + if (tEncodeI64(pEncoder, pTask->info.triggerParam) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->subtableWithoutMd5) < 0) return -1; + if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1; + + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { + int32_t taskId = 0; + + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1; + if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) return -1; + + if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->info.taskLevel) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->outputInfo.type) < 0) return -1; + if (tDecodeI16(pDecoder, &pTask->msgInfo.msgType) < 0) return -1; + + if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->status.schedStatus) < 0) return -1; + + if (tDecodeI32(pDecoder, &pTask->info.selfChildId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->info.nodeId) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pTask->info.epSet) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset) < 0) return -1; + + if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId) < 0) return -1; + if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1; + + if (tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)) return -1; + if (tDecodeI32(pDecoder, &taskId)) return -1; + pTask->hTaskInfo.id.taskId = taskId; + + if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1; + if (tDecodeI32(pDecoder, &taskId)) return -1; + pTask->streamTaskId.taskId = taskId; + + if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer)) return -1; + if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer)) return -1; + if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1; + if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1; + + int32_t epSz = -1; + if (tDecodeI32(pDecoder, &epSz) < 0) return -1; + + pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES); + for (int32_t i = 0; i < epSz; i++) { + SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo)); + if (pInfo == NULL) return -1; + if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) { + taosMemoryFreeClear(pInfo); + return -1; + } + taosArrayPush(pTask->upstreamInfo.pList, &pInfo); + } + + if (pTask->info.taskLevel != TASK_LEVEL__SINK) { + if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; + } + + if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) { + if (tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1; + pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); + if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) return -1; + if (tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1; + } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) { + if (tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId) < 0) return -1; + } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) { + if (tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved) < 0) return -1; + } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { + if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1; + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + if (tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1; + if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1; + } + if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1; + if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){ + if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1; + } + if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1; + + tEndDecode(pDecoder); + return 0; } \ No newline at end of file