diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9d7786cfba..72309620d1 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -113,14 +113,14 @@ typedef struct { int64_t ver; int32_t* dataRef; SPackedData submit; -} SStreamDataSubmit2; +} SStreamDataSubmit; typedef struct { int8_t type; int64_t ver; SArray* dataRefs; // SArray SArray* submits; // SArray -} SStreamMergedSubmit2; +} SStreamMergedSubmit; typedef struct { int8_t type; @@ -209,10 +209,10 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) { void* streamQueueNextItem(SStreamQueue* queue); -SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type); -void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit); +SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type); +void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit); -SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit); +SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit); typedef struct { char* qmsg; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index cfed5f0529..2a4a471b97 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -183,7 +183,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq); // tq util char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); -int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver); +int32_t tqAddBlockNLaunchTask(SStreamTask* pTask, SPackedData* pData); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index edea7724b5..bd1ccfe5dc 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -312,7 +312,6 @@ int32_t extractSubmitMsgFromWal(SWalReader* pReader, SPackedData* 
pPackedData) { void* data = taosMemoryMalloc(len); if (data == NULL) { - // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0); return -1; diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index b956027741..8ada268c4d 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -120,8 +120,6 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - - // append the data for the stream tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); } else { @@ -145,17 +143,9 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - SStreamDataSubmit2* p = streamDataSubmitNew(packData, STREAM_INPUT__DATA_SUBMIT); - if (p == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("%s failed to create data submit for stream since out of memory", pTask->id.idStr); - streamMetaReleaseTask(pStreamMeta, pTask); - continue; - } - noNewDataInWal = false; - code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver); + code = tqAddBlockNLaunchTask(pTask, &packData); if (code == TSDB_CODE_SUCCESS) { pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, @@ -164,8 +154,6 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { tqError("s-task:%s append input queue failed, ver:%" PRId64, pTask->id.idStr, pTask->chkInfo.currentVer); } - streamDataSubmitDestroy(p); - taosFreeQitem(p); streamMetaReleaseTask(pStreamMeta, pTask); } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 93acd3a2b0..8f9a490048 100644 --- 
a/source/dnode/vnode/src/tq/tqUtil.c
+++ b/source/dnode/vnode/src/tq/tqUtil.c
@@ -26,10 +26,26 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
   return taosStrdup(buf);
 }
 
-int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) {
-  int32_t code = tAppendDataToInputQueue(pTask, pQueueItem);
+// Wrap the packed submit data in a stream submit item and append it to the input queue of pTask.
+// tAppendDataToInputQueue() enqueues its own clone of a submit item, so the temporary item built
+// here is always released again before this function goes on; only the clone stays in the queue.
+// Returns -1 when the submit item cannot be allocated or when the input queue rejects the data.
+int32_t tqAddBlockNLaunchTask(SStreamTask* pTask, SPackedData *pPackedData) {
+  SStreamDataSubmit* p = streamDataSubmitNew(pPackedData, STREAM_INPUT__DATA_SUBMIT);
+  if (p == NULL) {
+    // streamDataSubmitNew() returns NULL on allocation failure; stop before p gets dereferenced below
+    // NOTE(review): who releases pPackedData->msgStr on this path is not visible here - confirm in the caller
+    terrno = TSDB_CODE_OUT_OF_MEMORY;
+    tqError("s-task:%s failed to create data submit for stream since out of memory", pTask->id.idStr);
+    return -1;
+  }
+
+  int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*) p);
+  streamDataSubmitDestroy(p);
+  taosFreeQitem(p);
+
   if (code < 0) {
-    tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, ver);
+    tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, pPackedData->ver);
     return -1;
   }
 
diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c
index 635024519e..90f4bac242 100644
--- a/source/libs/stream/src/stream.c
+++ b/source/libs/stream/src/stream.c
@@ -18,6 +18,9 @@
 
 #define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480
 #define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (100)
+#define ONE_MB_F (1048576.0)
+
+#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q)/ONE_MB_F)
 
 int32_t streamInit() {
   int8_t old;
@@ -288,7 +291,22 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
   int8_t type = pItem->type;
 
   if (type == STREAM_INPUT__DATA_SUBMIT) {
-    SStreamDataSubmit2* pSubmitBlock = streamSubmitBlockClone((SStreamDataSubmit2*)pItem);
+    int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
+    double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
+
+    SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
+    qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
+           px->submit.msgLen, px->submit.ver, numOfBlocks, size);
+
+    if ((pTask->taskLevel ==
TASK_LEVEL__SOURCE) &&
+        (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) {
+      qError("s-task:%s input queue is full, capacity(num:%d size:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr,
+             STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE,
+             numOfBlocks, size);
+      return -1;  // rejected before the item is cloned, so there is no clone to release on this path
+    }
+
+    SStreamDataSubmit* pSubmitBlock = streamSubmitBlockClone((SStreamDataSubmit*)pItem);
     if (pSubmitBlock == NULL) {
       qDebug("task %d %p submit enqueue failed since out of memory", pTask->id.taskId, pTask);
       terrno = TSDB_CODE_OUT_OF_MEMORY;
@@ -296,26 +314,11 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
       return -1;
     }
 
-    int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
-    double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0;
-
-    qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
-           pSubmitBlock->submit.msgLen, pSubmitBlock->submit.ver, numOfBlocks, size);
-
-    if ((pTask->taskLevel == TASK_LEVEL__SOURCE) &&
-        (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) {
-      qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr,
-             STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE,
-             numOfBlocks, size);
-      streamDataSubmitDestroy(pSubmitBlock);
-      return -1;
-    }
-
     taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock);
   } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
              type == STREAM_INPUT__REF_DATA_BLOCK) {
     int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
-    double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0;
+    double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue);
 
     if ((pTask->taskLevel == TASK_LEVEL__SOURCE) &&
         (numOfBlocks >
STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index e574cdbe8a..67177268d3 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -67,8 +67,8 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock return 0; } -SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) { - SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, submit.msgLen); +SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) { + SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, pData->msgLen); if (pDataSubmit == NULL) { return NULL; } @@ -79,14 +79,14 @@ SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) { return NULL; } - pDataSubmit->submit = submit; + pDataSubmit->submit = *pData; *pDataSubmit->dataRef = 1; // initialize the reference count to be 1 pDataSubmit->type = type; return pDataSubmit; } -void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit) { +void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) { int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1); ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT); @@ -96,8 +96,8 @@ void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit) { } } -SStreamMergedSubmit2* streamMergedSubmitNew() { - SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)taosAllocateQitem(sizeof(SStreamMergedSubmit2), DEF_QITEM, 0); +SStreamMergedSubmit* streamMergedSubmitNew() { + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0); if (pMerged == NULL) { return NULL; } @@ -116,30 +116,30 @@ SStreamMergedSubmit2* streamMergedSubmitNew() { return pMerged; } -int32_t 
streamMergeSubmit(SStreamMergedSubmit2* pMerged, SStreamDataSubmit2* pSubmit) { +int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) { taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef); taosArrayPush(pMerged->submits, &pSubmit->submit); pMerged->ver = pSubmit->ver; return 0; } -static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit2* pDataSubmit) { +static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) { atomic_add_fetch_32(pDataSubmit->dataRef, 1); } -SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit) { +SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit) { int32_t len = 0; if (pSubmit->type == STREAM_INPUT__DATA_SUBMIT) { len = pSubmit->submit.msgLen; } - SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, len); + SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, len); if (pSubmitClone == NULL) { return NULL; } streamDataSubmitRefInc(pSubmit); - memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit2)); + memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit)); return pSubmitClone; } @@ -152,17 +152,17 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* taosFreeQitem(pElem); return dst; } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)dst; - SStreamDataSubmit2* pBlockSrc = (SStreamDataSubmit2*)pElem; + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst; + SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)pElem; streamMergeSubmit(pMerged, pBlockSrc); taosFreeQitem(pElem); return dst; } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamMergedSubmit2* pMerged = streamMergedSubmitNew(); + SStreamMergedSubmit* pMerged = streamMergedSubmitNew(); // todo handle error - 
streamMergeSubmit(pMerged, (SStreamDataSubmit2*)dst); - streamMergeSubmit(pMerged, (SStreamDataSubmit2*)pElem); + streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst); + streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem); taosFreeQitem(dst); taosFreeQitem(pElem); return (SStreamQueueItem*)pMerged; @@ -180,10 +180,10 @@ void streamFreeQitem(SStreamQueueItem* data) { taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes); taosFreeQitem(data); } else if (type == STREAM_INPUT__DATA_SUBMIT) { - streamDataSubmitDestroy((SStreamDataSubmit2*)data); + streamDataSubmitDestroy((SStreamDataSubmit*)data); taosFreeQitem(data); } else if (type == STREAM_INPUT__MERGED_SUBMIT) { - SStreamMergedSubmit2* pMerge = (SStreamMergedSubmit2*)data; + SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data; int32_t sz = taosArrayGetSize(pMerge->submits); for (int32_t i = 0; i < sz; i++) { int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 0fb78fb589..c2e32f1027 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -51,7 +51,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); - const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data; + const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)data; qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); @@ -63,7 +63,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* qDebug("s-task:%s set sdata blocks as input 
num:%d, ver:%"PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer);
       qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK);
     } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
-      const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data;
+      const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)data;
 
       SArray* pBlockList = pMerged->submits;
       int32_t numOfBlocks = taosArrayGetSize(pBlockList);
@@ -366,11 +366,11 @@ int32_t streamExecForAll(SStreamTask* pTask) {
       qRes->blocks = pRes;
 
       if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_SUBMIT) {
-        SStreamDataSubmit2* pSubmit = (SStreamDataSubmit2*)pInput;
+        SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pInput;
         qRes->childId = pTask->selfChildId;
         qRes->sourceVer = pSubmit->ver;
       } else if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__MERGED_SUBMIT) {
-        SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)pInput;
+        SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pInput;
         qRes->childId = pTask->selfChildId;
         qRes->sourceVer = pMerged->ver;
       }
diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c
index 20abcca197..52d6525769 100644
--- a/source/libs/stream/src/streamQueue.c
+++ b/source/libs/stream/src/streamQueue.c
@@ -105,3 +105,72 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) {
   return (SStreamQueueRes){0};
 }
 #endif
+
+#define MAX_STREAM_EXEC_BATCH_NUM 128
+#define MIN_STREAM_EXEC_BATCH_NUM 16
+
+// todo refactor:
+// read data from input queue
+typedef struct SQueueReader {
+  SStreamQueue* pQueue;
+  int32_t       taskLevel;
+  int32_t       maxBlocks;     // maximum block in one batch
+  int32_t       waitDuration;  // maximum wait time to format several block into a batch to process, unit: ms
+} SQueueReader;
+
+// Collect queued items into one batch item.
+//  - the first item fetched becomes the batch; later items are folded in with streamMergeQueueItem()
+//  - a source-level reader holding fewer than MIN_STREAM_EXEC_BATCH_NUM items retries an empty queue,
+//    sleeping 1ms per attempt, for at most pReader->waitDuration attempts
+//  - a sink-level reader returns right after the first item
+//  - reading stops once pReader->maxBlocks items are batched, or when a merge fails
+// idstr is only used for logging. Returns the batch, or NULL when no item could be read.
+SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char* idstr) {
+  int32_t           numOfBlocks = 0;  // items folded into pRet so far, the first one included
+  int32_t           tryCount = 0;     // empty-queue retries spent so far
+  SStreamQueueItem* pRet = NULL;
+
+  while (1) {
+    SStreamQueueItem* qItem = streamQueueNextItem(pReader->pQueue);
+    if (qItem == NULL) {
+      if (pReader->taskLevel == TASK_LEVEL__SOURCE && numOfBlocks < MIN_STREAM_EXEC_BATCH_NUM &&
+          tryCount < pReader->waitDuration) {
+        tryCount++;
+        taosMsleep(1);
+        qDebug("===stream===try again batchSize:%d", numOfBlocks);
+        continue;
+      }
+
+      qDebug("===stream===break batchSize:%d", numOfBlocks);
+      break;
+    }
+
+    if (pRet == NULL) {
+      pRet = qItem;
+    } else {
+      // todo we need to sort the data block, instead of just appending into the array list.
+      void* newRet = streamMergeQueueItem(pRet, qItem);
+      if (newRet == NULL) {
+        streamQueueProcessFail(pReader->pQueue);
+        break;
+      }
+
+      pRet = newRet;
+    }
+
+    // count every accepted item, the first one included, so maxBlocks really bounds the batch size
+    numOfBlocks++;
+    streamQueueProcessSuccess(pReader->pQueue);
+
+    if (pReader->taskLevel == TASK_LEVEL__SINK) {
+      break;
+    }
+
+    if (numOfBlocks >= pReader->maxBlocks) {
+      qDebug("maximum blocks limit:%d reached, processing, %s", pReader->maxBlocks, idstr);
+      break;
+    }
+  }
+
+  return pRet;
+}