diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1d4bbf073e..8316e6ef50 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -206,7 +206,7 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) { void* streamQueueNextItem(SStreamQueue* queue); SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type); -void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit); +void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit); SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit); @@ -284,7 +284,7 @@ struct SStreamTask { int16_t dispatchMsgType; SStreamStatus status; int32_t selfChildId; - int32_t nodeId; // vgroup id + int32_t nodeId; // vgroup id SEpSet epSet; SCheckpointInfo chkInfo; STaskExec exec; @@ -346,12 +346,14 @@ typedef struct SStreamMeta { void* streamBackend; int32_t streamBackendId; int64_t streamBackendRid; + SHashObj* pTaskBackendUnique; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); -SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, SArray* pTaskList); +SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, + SArray* pTaskList); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); void tFreeStreamTask(SStreamTask* pTask);