merge 3.0

This commit is contained in:
54liuyao 2024-09-25 15:25:54 +08:00
parent 68b2ca0f7b
commit dfa374bc8a
1 changed files with 1 additions and 32 deletions

View File

@ -27,7 +27,7 @@ static int32_t streamMergedSubmitNew(SStreamMergedSubmit** pSubmit) {
if ((*pSubmit)->submits == NULL) {
taosFreeQitem(*pSubmit);
*pSubmit = NULL;
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
(*pSubmit)->type = STREAM_INPUT__MERGED_SUBMIT;
@ -215,37 +215,6 @@ void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) {
}
}
int32_t streamMergedSubmitNew(SStreamMergedSubmit** pSubmit) {
*pSubmit = NULL;
int32_t code = taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0, (void**)pSubmit);
if (code) {
return TSDB_CODE_OUT_OF_MEMORY;
}
(*pSubmit)->submits = taosArrayInit(0, sizeof(SPackedData));
if ((*pSubmit)->submits == NULL) {
taosFreeQitem(*pSubmit);
*pSubmit = NULL;
return terrno;
}
(*pSubmit)->type = STREAM_INPUT__MERGED_SUBMIT;
return TSDB_CODE_SUCCESS;
}
int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) {
void* p = taosArrayPush(pMerged->submits, &pSubmit->submit);
if (p == NULL) {
return terrno;
}
if (pSubmit->ver > pMerged->ver) {
pMerged->ver = pSubmit->ver;
}
return 0;
}
// todo handle memory error
int32_t streamQueueMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem, SStreamQueueItem** pRes) {
*pRes = NULL;