refactor: do some internal refactor.
This commit is contained in:
parent
6693efaa85
commit
db848f7e6b
|
@ -137,161 +137,6 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset,
|
||||||
return pTask;
|
return pTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
|
||||||
if (tStartEncode(pEncoder) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pTask->ver) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1;
|
|
||||||
if (tEncodeI8(pEncoder, pTask->info.taskLevel) < 0) return -1;
|
|
||||||
if (tEncodeI8(pEncoder, pTask->outputInfo.type) < 0) return -1;
|
|
||||||
if (tEncodeI16(pEncoder, pTask->msgInfo.msgType) < 0) return -1;
|
|
||||||
|
|
||||||
if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1;
|
|
||||||
if (tEncodeI8(pEncoder, pTask->status.schedStatus) < 0) return -1;
|
|
||||||
|
|
||||||
if (tEncodeI32(pEncoder, pTask->info.selfChildId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pTask->info.nodeId) < 0) return -1;
|
|
||||||
if (tEncodeSEpSet(pEncoder, &pTask->info.epSet) < 0) return -1;
|
|
||||||
if (tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset) < 0) return -1;
|
|
||||||
|
|
||||||
if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointId) < 0) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer) < 0) return -1;
|
|
||||||
if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1;
|
|
||||||
|
|
||||||
if (tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)) return -1;
|
|
||||||
int32_t taskId = pTask->hTaskInfo.id.taskId;
|
|
||||||
if (tEncodeI32(pEncoder, taskId)) return -1;
|
|
||||||
|
|
||||||
if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId)) return -1;
|
|
||||||
taskId = pTask->streamTaskId.taskId;
|
|
||||||
if (tEncodeI32(pEncoder, taskId)) return -1;
|
|
||||||
|
|
||||||
if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer)) return -1;
|
|
||||||
if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1;
|
|
||||||
if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1;
|
|
||||||
|
|
||||||
int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList);
|
|
||||||
if (tEncodeI32(pEncoder, epSz) < 0) return -1;
|
|
||||||
for (int32_t i = 0; i < epSz; i++) {
|
|
||||||
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
|
|
||||||
if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
|
||||||
if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
|
|
||||||
if (tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid) < 0) return -1;
|
|
||||||
if (tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1;
|
|
||||||
if (tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1;
|
|
||||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
|
|
||||||
if (tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId) < 0) return -1;
|
|
||||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
|
|
||||||
if (tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved) < 0) return -1;
|
|
||||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
|
||||||
if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1;
|
|
||||||
if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1;
|
|
||||||
if (tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1;
|
|
||||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
|
||||||
if (tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1;
|
|
||||||
if (tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1;
|
|
||||||
}
|
|
||||||
if (tEncodeI64(pEncoder, pTask->info.triggerParam) < 0) return -1;
|
|
||||||
if (tEncodeI8(pEncoder, pTask->subtableWithoutMd5) < 0) return -1;
|
|
||||||
if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1;
|
|
||||||
|
|
||||||
tEndEncode(pEncoder);
|
|
||||||
return pEncoder->pos;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
|
||||||
int32_t taskId = 0;
|
|
||||||
|
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1;
|
|
||||||
if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) return -1;
|
|
||||||
|
|
||||||
if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1;
|
|
||||||
if (tDecodeI8(pDecoder, &pTask->info.taskLevel) < 0) return -1;
|
|
||||||
if (tDecodeI8(pDecoder, &pTask->outputInfo.type) < 0) return -1;
|
|
||||||
if (tDecodeI16(pDecoder, &pTask->msgInfo.msgType) < 0) return -1;
|
|
||||||
|
|
||||||
if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1;
|
|
||||||
if (tDecodeI8(pDecoder, &pTask->status.schedStatus) < 0) return -1;
|
|
||||||
|
|
||||||
if (tDecodeI32(pDecoder, &pTask->info.selfChildId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pTask->info.nodeId) < 0) return -1;
|
|
||||||
if (tDecodeSEpSet(pDecoder, &pTask->info.epSet) < 0) return -1;
|
|
||||||
if (tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset) < 0) return -1;
|
|
||||||
|
|
||||||
if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId) < 0) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer) < 0) return -1;
|
|
||||||
if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1;
|
|
||||||
|
|
||||||
if (tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &taskId)) return -1;
|
|
||||||
pTask->hTaskInfo.id.taskId = taskId;
|
|
||||||
|
|
||||||
if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &taskId)) return -1;
|
|
||||||
pTask->streamTaskId.taskId = taskId;
|
|
||||||
|
|
||||||
if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer)) return -1;
|
|
||||||
if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer)) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1;
|
|
||||||
if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1;
|
|
||||||
|
|
||||||
int32_t epSz = -1;
|
|
||||||
if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
|
|
||||||
|
|
||||||
pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES);
|
|
||||||
for (int32_t i = 0; i < epSz; i++) {
|
|
||||||
SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo));
|
|
||||||
if (pInfo == NULL) return -1;
|
|
||||||
if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) {
|
|
||||||
taosMemoryFreeClear(pInfo);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
taosArrayPush(pTask->upstreamInfo.pList, &pInfo);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
|
||||||
if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
|
|
||||||
if (tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid) < 0) return -1;
|
|
||||||
if (tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1;
|
|
||||||
pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
|
|
||||||
if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) return -1;
|
|
||||||
if (tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1;
|
|
||||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
|
|
||||||
if (tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId) < 0) return -1;
|
|
||||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
|
|
||||||
if (tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved) < 0) return -1;
|
|
||||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
|
||||||
if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1;
|
|
||||||
if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1;
|
|
||||||
if (tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1;
|
|
||||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
|
||||||
if (tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1;
|
|
||||||
if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1;
|
|
||||||
}
|
|
||||||
if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1;
|
|
||||||
if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){
|
|
||||||
if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1;
|
|
||||||
}
|
|
||||||
if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1;
|
|
||||||
|
|
||||||
tEndDecode(pDecoder);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
|
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
|
||||||
int64_t skip64;
|
int64_t skip64;
|
||||||
int8_t skip8;
|
int8_t skip8;
|
||||||
|
|
|
@ -429,4 +429,159 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) {
|
||||||
if (pMsg->pTaskStatus != NULL) {
|
if (pMsg->pTaskStatus != NULL) {
|
||||||
taosArrayDestroy(pMsg->pTaskStatus);
|
taosArrayDestroy(pMsg->pTaskStatus);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
|
||||||
|
if (tStartEncode(pEncoder) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pTask->ver) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pTask->info.taskLevel) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pTask->outputInfo.type) < 0) return -1;
|
||||||
|
if (tEncodeI16(pEncoder, pTask->msgInfo.msgType) < 0) return -1;
|
||||||
|
|
||||||
|
if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pTask->status.schedStatus) < 0) return -1;
|
||||||
|
|
||||||
|
if (tEncodeI32(pEncoder, pTask->info.selfChildId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pTask->info.nodeId) < 0) return -1;
|
||||||
|
if (tEncodeSEpSet(pEncoder, &pTask->info.epSet) < 0) return -1;
|
||||||
|
if (tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset) < 0) return -1;
|
||||||
|
|
||||||
|
if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointId) < 0) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1;
|
||||||
|
|
||||||
|
if (tEncodeI64(pEncoder, pTask->hTaskInfo.id.streamId)) return -1;
|
||||||
|
int32_t taskId = pTask->hTaskInfo.id.taskId;
|
||||||
|
if (tEncodeI32(pEncoder, taskId)) return -1;
|
||||||
|
|
||||||
|
if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId)) return -1;
|
||||||
|
taskId = pTask->streamTaskId.taskId;
|
||||||
|
if (tEncodeI32(pEncoder, taskId)) return -1;
|
||||||
|
|
||||||
|
if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer)) return -1;
|
||||||
|
if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer)) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pTask->dataRange.window.skey)) return -1;
|
||||||
|
if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey)) return -1;
|
||||||
|
|
||||||
|
int32_t epSz = taosArrayGetSize(pTask->upstreamInfo.pList);
|
||||||
|
if (tEncodeI32(pEncoder, epSz) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < epSz; i++) {
|
||||||
|
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->upstreamInfo.pList, i);
|
||||||
|
if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
||||||
|
if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
|
||||||
|
if (tEncodeI64(pEncoder, pTask->outputInfo.tbSink.stbUid) < 0) return -1;
|
||||||
|
if (tEncodeCStr(pEncoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1;
|
||||||
|
if (tEncodeSSchemaWrapper(pEncoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1;
|
||||||
|
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
|
||||||
|
if (tEncodeI64(pEncoder, pTask->outputInfo.smaSink.smaId) < 0) return -1;
|
||||||
|
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
|
||||||
|
if (tEncodeI8(pEncoder, pTask->outputInfo.fetchSink.reserved) < 0) return -1;
|
||||||
|
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1;
|
||||||
|
if (tEncodeI32(pEncoder, pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1;
|
||||||
|
if (tEncodeSEpSet(pEncoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1;
|
||||||
|
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
if (tSerializeSUseDbRspImp(pEncoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1;
|
||||||
|
if (tEncodeCStr(pEncoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1;
|
||||||
|
}
|
||||||
|
if (tEncodeI64(pEncoder, pTask->info.triggerParam) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pTask->subtableWithoutMd5) < 0) return -1;
|
||||||
|
if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1;
|
||||||
|
|
||||||
|
tEndEncode(pEncoder);
|
||||||
|
return pEncoder->pos;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
|
||||||
|
int32_t taskId = 0;
|
||||||
|
|
||||||
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1;
|
||||||
|
if (pTask->ver <= SSTREAM_TASK_INCOMPATIBLE_VER || pTask->ver > SSTREAM_TASK_VER) return -1;
|
||||||
|
|
||||||
|
if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pTask->info.taskLevel) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pTask->outputInfo.type) < 0) return -1;
|
||||||
|
if (tDecodeI16(pDecoder, &pTask->msgInfo.msgType) < 0) return -1;
|
||||||
|
|
||||||
|
if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pTask->status.schedStatus) < 0) return -1;
|
||||||
|
|
||||||
|
if (tDecodeI32(pDecoder, &pTask->info.selfChildId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pTask->info.nodeId) < 0) return -1;
|
||||||
|
if (tDecodeSEpSet(pDecoder, &pTask->info.epSet) < 0) return -1;
|
||||||
|
if (tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset) < 0) return -1;
|
||||||
|
|
||||||
|
if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId) < 0) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1;
|
||||||
|
|
||||||
|
if (tDecodeI64(pDecoder, &pTask->hTaskInfo.id.streamId)) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &taskId)) return -1;
|
||||||
|
pTask->hTaskInfo.id.taskId = taskId;
|
||||||
|
|
||||||
|
if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId)) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &taskId)) return -1;
|
||||||
|
pTask->streamTaskId.taskId = taskId;
|
||||||
|
|
||||||
|
if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.minVer)) return -1;
|
||||||
|
if (tDecodeU64(pDecoder, (uint64_t*)&pTask->dataRange.range.maxVer)) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey)) return -1;
|
||||||
|
if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey)) return -1;
|
||||||
|
|
||||||
|
int32_t epSz = -1;
|
||||||
|
if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
|
||||||
|
|
||||||
|
pTask->upstreamInfo.pList = taosArrayInit(epSz, POINTER_BYTES);
|
||||||
|
for (int32_t i = 0; i < epSz; i++) {
|
||||||
|
SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo));
|
||||||
|
if (pInfo == NULL) return -1;
|
||||||
|
if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) {
|
||||||
|
taosMemoryFreeClear(pInfo);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
taosArrayPush(pTask->upstreamInfo.pList, &pInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
||||||
|
if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
|
||||||
|
if (tDecodeI64(pDecoder, &pTask->outputInfo.tbSink.stbUid) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(pDecoder, pTask->outputInfo.tbSink.stbFullName) < 0) return -1;
|
||||||
|
pTask->outputInfo.tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
|
||||||
|
if (pTask->outputInfo.tbSink.pSchemaWrapper == NULL) return -1;
|
||||||
|
if (tDecodeSSchemaWrapper(pDecoder, pTask->outputInfo.tbSink.pSchemaWrapper) < 0) return -1;
|
||||||
|
} else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
|
||||||
|
if (tDecodeI64(pDecoder, &pTask->outputInfo.smaSink.smaId) < 0) return -1;
|
||||||
|
} else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
|
||||||
|
if (tDecodeI8(pDecoder, &pTask->outputInfo.fetchSink.reserved) < 0) return -1;
|
||||||
|
} else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.taskId) < 0) return -1;
|
||||||
|
if (tDecodeI32(pDecoder, &pTask->outputInfo.fixedDispatcher.nodeId) < 0) return -1;
|
||||||
|
if (tDecodeSEpSet(pDecoder, &pTask->outputInfo.fixedDispatcher.epSet) < 0) return -1;
|
||||||
|
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
if (tDeserializeSUseDbRspImp(pDecoder, &pTask->outputInfo.shuffleDispatcher.dbInfo) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(pDecoder, pTask->outputInfo.shuffleDispatcher.stbFullName) < 0) return -1;
|
||||||
|
}
|
||||||
|
if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1;
|
||||||
|
if (pTask->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER){
|
||||||
|
if (tDecodeI8(pDecoder, &pTask->subtableWithoutMd5) < 0) return -1;
|
||||||
|
}
|
||||||
|
if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1;
|
||||||
|
|
||||||
|
tEndDecode(pDecoder);
|
||||||
|
return 0;
|
||||||
}
|
}
|
Loading…
Reference in New Issue