diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 72d0fa3232..688b58622a 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -27,7 +27,7 @@ static int32_t streamMergedSubmitNew(SStreamMergedSubmit** pSubmit) { if ((*pSubmit)->submits == NULL) { taosFreeQitem(*pSubmit); *pSubmit = NULL; - return TSDB_CODE_OUT_OF_MEMORY; + return terrno; } (*pSubmit)->type = STREAM_INPUT__MERGED_SUBMIT; @@ -215,37 +215,6 @@ void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) { } } -int32_t streamMergedSubmitNew(SStreamMergedSubmit** pSubmit) { - *pSubmit = NULL; - - int32_t code = taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0, (void**)pSubmit); - if (code) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - (*pSubmit)->submits = taosArrayInit(0, sizeof(SPackedData)); - if ((*pSubmit)->submits == NULL) { - taosFreeQitem(*pSubmit); - *pSubmit = NULL; - return terrno; - } - - (*pSubmit)->type = STREAM_INPUT__MERGED_SUBMIT; - return TSDB_CODE_SUCCESS; -} - -int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) { - void* p = taosArrayPush(pMerged->submits, &pSubmit->submit); - if (p == NULL) { - return terrno; - } - - if (pSubmit->ver > pMerged->ver) { - pMerged->ver = pSubmit->ver; - } - return 0; -} - // todo handle memory error int32_t streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem, SStreamQueueItem** pRes) { *pRes = NULL;