Avoid creating the same ID task multiple times

This commit is contained in:
yihaoDeng 2023-06-01 11:13:27 +00:00
parent 0315e89591
commit 32c3cfd51a
1 changed files with 5 additions and 3 deletions

View File

@ -206,7 +206,7 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) {
void* streamQueueNextItem(SStreamQueue* queue); void* streamQueueNextItem(SStreamQueue* queue);
SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type); SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type);
void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit); void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit);
SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit); SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit);
@ -284,7 +284,7 @@ struct SStreamTask {
int16_t dispatchMsgType; int16_t dispatchMsgType;
SStreamStatus status; SStreamStatus status;
int32_t selfChildId; int32_t selfChildId;
int32_t nodeId; // vgroup id int32_t nodeId; // vgroup id
SEpSet epSet; SEpSet epSet;
SCheckpointInfo chkInfo; SCheckpointInfo chkInfo;
STaskExec exec; STaskExec exec;
@ -346,12 +346,14 @@ typedef struct SStreamMeta {
void* streamBackend; void* streamBackend;
int32_t streamBackendId; int32_t streamBackendId;
int64_t streamBackendRid; int64_t streamBackendRid;
SHashObj* pTaskBackendUnique;
} SStreamMeta; } SStreamMeta;
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo);
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo);
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam, SArray* pTaskList); SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam,
SArray* pTaskList);
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask); int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask); int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeStreamTask(SStreamTask* pTask); void tFreeStreamTask(SStreamTask* pTask);