diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 930b7be3ef..0b647934ff 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -54,12 +54,12 @@ enum { enum { STREAM_INPUT__DATA_SUBMIT = 1, STREAM_INPUT__DATA_BLOCK, + STREAM_INPUT__MERGED_SUBMIT, // STREAM_INPUT__TABLE_SCAN, STREAM_INPUT__TQ_SCAN, STREAM_INPUT__DATA_RETRIEVE, STREAM_INPUT__GET_RES, STREAM_INPUT__CHECKPOINT, - STREAM_INPUT__DROP, }; typedef enum EStreamType { diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8c69c0f2de..7057227a00 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -77,6 +77,13 @@ typedef struct { SSubmitReq* data; } SStreamDataSubmit; +typedef struct { + int8_t type; + int64_t ver; + SArray* dataRefs; // SArray + SArray* reqs; // SArray +} SStreamMergedSubmit; + typedef struct { int8_t type; diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 6be15222db..6b447d05ad 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -77,6 +77,27 @@ FAIL: return NULL; } +SStreamMergedSubmit* streamMergedSubmitNew() { + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM); + if (pMerged == NULL) return NULL; + pMerged->reqs = taosArrayInit(0, sizeof(void*)); + pMerged->dataRefs = taosArrayInit(0, sizeof(void*)); + if (pMerged->dataRefs == NULL || pMerged->reqs == NULL) goto FAIL; + return pMerged; +FAIL: + if (pMerged->reqs) taosArrayDestroy(pMerged->reqs); + if (pMerged->dataRefs) taosArrayDestroy(pMerged->dataRefs); + taosFreeQitem(pMerged); + return NULL; +} + +int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) { + taosArrayPush(pMerged->dataRefs, pSubmit->dataRef); + taosArrayPush(pMerged->reqs, pSubmit->data); + pMerged->ver = pSubmit->ver; + return 0; +} + static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) { atomic_add_fetch_32(pDataSubmit->dataRef, 1); } @@ -102,10 +123,26 @@ void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) { int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) { ASSERT(elem); - if (dst->type == elem->type && dst->type == STREAM_INPUT__DATA_BLOCK) { + if (dst->type == STREAM_INPUT__DATA_BLOCK && elem->type == STREAM_INPUT__DATA_BLOCK) { SStreamDataBlock* pBlock = (SStreamDataBlock*)dst; SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)elem; taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks); + taosArrayDestroy(pBlockSrc->blocks); + taosFreeQitem(elem); + return 0; + } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) { + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst; + SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)elem; + streamMergeSubmit(pMerged, pBlockSrc); + taosFreeQitem(elem); + return 0; + } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && elem->type == STREAM_INPUT__DATA_SUBMIT) { + SStreamMergedSubmit* pMerged = streamMergedSubmitNew(); + ASSERT(pMerged); + streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst); + streamMergeSubmit(pMerged, (SStreamDataSubmit*)elem); + taosFreeQitem(dst); + taosFreeQitem(elem); return 0; } else { return -1; @@ -123,5 +160,20 @@ void streamFreeQitem(SStreamQueueItem* data) { } else if (type == STREAM_INPUT__DATA_SUBMIT) { streamDataSubmitRefDec((SStreamDataSubmit*)data); taosFreeQitem(data); + } else if (type == STREAM_INPUT__MERGED_SUBMIT) { + SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data; + int32_t sz = taosArrayGetSize(pMerge->reqs); + for (int32_t i = 0; i < sz; i++) { + int32_t* ref = taosArrayGet(pMerge->dataRefs, i); + (*ref)--; + if (*ref == 0) { + void* data = taosArrayGet(pMerge->reqs, i); + taosMemoryFree(data); + taosMemoryFree(ref); + } + } + taosArrayDestroy(pMerge->reqs); + taosArrayDestroy(pMerge->dataRefs); + taosFreeQitem(pMerge); } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index b59a812678..c49e6c9b6c 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -33,9 +33,12 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) SArray* blocks = pBlock->blocks; qDebug("task %d %p set ssdata input", pTask->taskId, pTask); qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_BLOCK, false); - } else if (pItem->type == STREAM_INPUT__DROP) { - // TODO exec drop - return 0; + } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data; + SArray* blocks = pMerged->reqs; + qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_SUBMIT, false); + } else { + ASSERT(0); } // exec @@ -155,11 +158,9 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { if (data == NULL) { data = qItem; streamQueueProcessSuccess(pTask->inputQueue); - if (qItem->type == STREAM_INPUT__DATA_BLOCK) { - /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ - } else { - break; - } + /*if (qItem->type == STREAM_INPUT__DATA_BLOCK) {*/ + /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ + /*}*/ } else { if (streamAppendQueueItem(data, qItem) < 0) { streamQueueProcessFail(pTask->inputQueue); @@ -168,11 +169,10 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { cnt++; /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ streamQueueProcessSuccess(pTask->inputQueue); - taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks); - taosFreeQitem(qItem); } } } + if (pTask->taskStatus == TASK_STATUS__DROPPING) { if (data) streamFreeQitem(data); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); @@ -194,6 +194,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { if (taosArrayGetSize(pRes) != 0) { SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); if (qRes == NULL) { + // TODO log failed ver streamQueueProcessFail(pTask->inputQueue); taosArrayDestroy(pRes); return NULL; @@ -201,6 +202,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { qRes->type = STREAM_INPUT__DATA_BLOCK; qRes->blocks = pRes; if (streamTaskOutput(pTask, qRes) < 0) { + // TODO log failed ver /*streamQueueProcessFail(pTask->inputQueue);*/ taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosFreeQitem(qRes);