From 8e15c64499caabc5ce87b49994654bbe53cc6da2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 13 May 2023 23:22:36 +0800 Subject: [PATCH 01/10] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 10 ++--- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tqRead.c | 1 - source/dnode/vnode/src/tq/tqRestore.c | 14 +------ source/dnode/vnode/src/tq/tqUtil.c | 11 +++-- source/libs/stream/src/stream.c | 37 +++++++++-------- source/libs/stream/src/streamData.c | 36 ++++++++--------- source/libs/stream/src/streamExec.c | 8 ++-- source/libs/stream/src/streamQueue.c | 58 +++++++++++++++++++++++++++ 9 files changed, 115 insertions(+), 62 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9d7786cfba..72309620d1 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -113,14 +113,14 @@ typedef struct { int64_t ver; int32_t* dataRef; SPackedData submit; -} SStreamDataSubmit2; +} SStreamDataSubmit; typedef struct { int8_t type; int64_t ver; SArray* dataRefs; // SArray SArray* submits; // SArray -} SStreamMergedSubmit2; +} SStreamMergedSubmit; typedef struct { int8_t type; @@ -209,10 +209,10 @@ static FORCE_INLINE void streamQueueProcessFail(SStreamQueue* queue) { void* streamQueueNextItem(SStreamQueue* queue); -SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type); -void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit); +SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type); +void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit); -SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit); +SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit); typedef struct { char* qmsg; diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index cfed5f0529..2a4a471b97 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -183,7 +183,7 @@ int32_t tqStreamTasksScanWal(STQ* pTq); // tq util char* createStreamTaskIdStr(int64_t streamId, int32_t taskId); -int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver); +int32_t tqAddBlockNLaunchTask(SStreamTask* pTask, SPackedData* pData); int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index edea7724b5..bd1ccfe5dc 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -312,7 +312,6 @@ int32_t extractSubmitMsgFromWal(SWalReader* pReader, SPackedData* pPackedData) { void* data = taosMemoryMalloc(len); if (data == NULL) { - // todo: for all stream in this vnode, keep this offset in the offset files, and wait for a moment, and then retry terrno = TSDB_CODE_OUT_OF_MEMORY; tqError("vgId:%d, failed to copy submit data for stream processing, since out of memory", 0); return -1; diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index b956027741..8ada268c4d 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -120,8 +120,6 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - - // append the data for the stream tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); } else { @@ -145,17 +143,9 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - SStreamDataSubmit2* p = streamDataSubmitNew(packData, STREAM_INPUT__DATA_SUBMIT); - if (p == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("%s failed to create data submit for stream since out of memory", pTask->id.idStr); - streamMetaReleaseTask(pStreamMeta, pTask); - continue; - } - noNewDataInWal = false; - code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)p, packData.ver); + code = tqAddBlockNLaunchTask(pTask, &packData); if (code == TSDB_CODE_SUCCESS) { pTask->chkInfo.currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, @@ -164,8 +154,6 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { tqError("s-task:%s append input queue failed, ver:%" PRId64, pTask->id.idStr, pTask->chkInfo.currentVer); } - streamDataSubmitDestroy(p); - taosFreeQitem(p); streamMetaReleaseTask(pStreamMeta, pTask); } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 93acd3a2b0..8f9a490048 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -26,10 +26,15 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { return taosStrdup(buf); } -int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueItem, int64_t ver) { - int32_t code = tAppendDataToInputQueue(pTask, pQueueItem); +int32_t tqAddBlockNLaunchTask(SStreamTask* pTask, SPackedData *pPackedData) { + SStreamDataSubmit* p = streamDataSubmitNew(pPackedData, STREAM_INPUT__DATA_SUBMIT); + + int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*) p); + streamDataSubmitDestroy(p); + taosFreeQitem(p); + if (code < 0) { - tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, ver); + tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, pPackedData->ver); return -1; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 635024519e..90f4bac242 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -18,6 +18,9 @@ #define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480 #define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (100) +#define ONE_MB_F (1048576.0) + +#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q)/ONE_MB_F) int32_t streamInit() { int8_t old; @@ -288,7 +291,22 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { int8_t type = pItem->type; if (type == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit2* pSubmitBlock = streamSubmitBlockClone((SStreamDataSubmit2*)pItem); + int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; + double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); + + SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; + qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, + px->submit.msgLen, px->submit.ver, numOfBlocks, size); + + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && + (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { + qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, + STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, + numOfBlocks, size); + return -1; + } + + SStreamDataSubmit* pSubmitBlock = streamSubmitBlockClone((SStreamDataSubmit*)pItem); if (pSubmitBlock == NULL) { qDebug("task %d %p submit enqueue failed since out of memory", pTask->id.taskId, pTask); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -296,26 +314,11 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { return -1; } - int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; - double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0; - - qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, - pSubmitBlock->submit.msgLen, pSubmitBlock->submit.ver, numOfBlocks, size); - - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && - (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { - qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, - STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, - numOfBlocks, size); - streamDataSubmitDestroy(pSubmitBlock); - return -1; - } - taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; - double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0; + double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index e574cdbe8a..67177268d3 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -67,8 +67,8 @@ int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock return 0; } -SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) { - SStreamDataSubmit2* pDataSubmit = (SStreamDataSubmit2*)taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, submit.msgLen); +SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) { + SStreamDataSubmit* pDataSubmit = (SStreamDataSubmit*)taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, pData->msgLen); if (pDataSubmit == NULL) { return NULL; } @@ -79,14 +79,14 @@ SStreamDataSubmit2* streamDataSubmitNew(SPackedData submit, int32_t type) { return NULL; } - pDataSubmit->submit = submit; + pDataSubmit->submit = *pData; *pDataSubmit->dataRef = 1; // initialize the reference count to be 1 pDataSubmit->type = type; return pDataSubmit; } -void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit) { +void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) { int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1); ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT); @@ -96,8 +96,8 @@ void streamDataSubmitDestroy(SStreamDataSubmit2* pDataSubmit) { } } -SStreamMergedSubmit2* streamMergedSubmitNew() { - SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)taosAllocateQitem(sizeof(SStreamMergedSubmit2), DEF_QITEM, 0); +SStreamMergedSubmit* streamMergedSubmitNew() { + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)taosAllocateQitem(sizeof(SStreamMergedSubmit), DEF_QITEM, 0); if (pMerged == NULL) { return NULL; } @@ -116,30 +116,30 @@ SStreamMergedSubmit2* streamMergedSubmitNew() { return pMerged; } -int32_t streamMergeSubmit(SStreamMergedSubmit2* pMerged, SStreamDataSubmit2* pSubmit) { +int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) { taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef); taosArrayPush(pMerged->submits, &pSubmit->submit); pMerged->ver = pSubmit->ver; return 0; } -static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit2* pDataSubmit) { +static FORCE_INLINE void streamDataSubmitRefInc(SStreamDataSubmit* pDataSubmit) { atomic_add_fetch_32(pDataSubmit->dataRef, 1); } -SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit) { +SStreamDataSubmit* streamSubmitBlockClone(SStreamDataSubmit* pSubmit) { int32_t len = 0; if (pSubmit->type == STREAM_INPUT__DATA_SUBMIT) { len = pSubmit->submit.msgLen; } - SStreamDataSubmit2* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit2), DEF_QITEM, len); + SStreamDataSubmit* pSubmitClone = taosAllocateQitem(sizeof(SStreamDataSubmit), DEF_QITEM, len); if (pSubmitClone == NULL) { return NULL; } streamDataSubmitRefInc(pSubmit); - memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit2)); + memcpy(pSubmitClone, pSubmit, sizeof(SStreamDataSubmit)); return pSubmitClone; } @@ -152,17 +152,17 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* taosFreeQitem(pElem); return dst; } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)dst; - SStreamDataSubmit2* pBlockSrc = (SStreamDataSubmit2*)pElem; + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst; + SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)pElem; streamMergeSubmit(pMerged, pBlockSrc); taosFreeQitem(pElem); return dst; } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamMergedSubmit2* pMerged = streamMergedSubmitNew(); + SStreamMergedSubmit* pMerged = streamMergedSubmitNew(); // todo handle error - streamMergeSubmit(pMerged, (SStreamDataSubmit2*)dst); - streamMergeSubmit(pMerged, (SStreamDataSubmit2*)pElem); + streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst); + streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem); taosFreeQitem(dst); taosFreeQitem(pElem); return (SStreamQueueItem*)pMerged; @@ -180,10 +180,10 @@ void streamFreeQitem(SStreamQueueItem* data) { taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes); taosFreeQitem(data); } else if (type == STREAM_INPUT__DATA_SUBMIT) { - streamDataSubmitDestroy((SStreamDataSubmit2*)data); + streamDataSubmitDestroy((SStreamDataSubmit*)data); taosFreeQitem(data); } else if (type == STREAM_INPUT__MERGED_SUBMIT) { - SStreamMergedSubmit2* pMerge = (SStreamMergedSubmit2*)data; + SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data; int32_t sz = taosArrayGetSize(pMerge->submits); for (int32_t i = 0; i < sz; i++) { int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 0fb78fb589..c2e32f1027 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -51,7 +51,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); - const SStreamDataSubmit2* pSubmit = (const SStreamDataSubmit2*)data; + const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)data; qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); @@ -63,7 +63,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* qDebug("s-task:%s set sdata blocks as input num:%d, ver:%"PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { - const SStreamMergedSubmit2* pMerged = (const SStreamMergedSubmit2*)data; + const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)data; SArray* pBlockList = pMerged->submits; int32_t numOfBlocks = taosArrayGetSize(pBlockList); @@ -366,11 +366,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { qRes->blocks = pRes; if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit2* pSubmit = (SStreamDataSubmit2*)pInput; + SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pInput; qRes->childId = pTask->selfChildId; qRes->sourceVer = pSubmit->ver; } else if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__MERGED_SUBMIT) { - SStreamMergedSubmit2* pMerged = (SStreamMergedSubmit2*)pInput; + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pInput; qRes->childId = pTask->selfChildId; qRes->sourceVer = pMerged->ver; } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 20abcca197..52d6525769 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -105,3 +105,61 @@ SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) { return (SStreamQueueRes){0}; } #endif + +#define MAX_STREAM_EXEC_BATCH_NUM 128 +#define MIN_STREAM_EXEC_BATCH_NUM 16 + +// todo refactor: +// read data from input queue +typedef struct SQueueReader { + SStreamQueue* pQueue; + int32_t taskLevel; + int32_t maxBlocks; // maximum block in one batch + int32_t waitDuration; // maximum wait time to format several block into a batch to process, unit: ms +} SQueueReader; + +SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char* idstr) { + int32_t numOfBlocks = 0; + int32_t tryCount = 0; + SStreamQueueItem* pRet = NULL; + + while (1) { + SStreamQueueItem* qItem = streamQueueNextItem(pReader->pQueue); + if (qItem == NULL) { + if (pReader->taskLevel == TASK_LEVEL__SOURCE && numOfBlocks < MIN_STREAM_EXEC_BATCH_NUM && tryCount < pReader->waitDuration) { + tryCount++; + taosMsleep(1); + qDebug("===stream===try again batchSize:%d", numOfBlocks); + continue; + } + + qDebug("===stream===break batchSize:%d", numOfBlocks); + break; + } + + if (pRet == NULL) { + pRet = qItem; + streamQueueProcessSuccess(pReader->pQueue); + if (pReader->taskLevel == TASK_LEVEL__SINK) { + break; + } + } else { + // todo we need to sort the data block, instead of just appending into the array list. + void* newRet = NULL; + if ((newRet = streamMergeQueueItem(pRet, qItem)) == NULL) { + streamQueueProcessFail(pReader->pQueue); + break; + } else { + numOfBlocks++; + pRet = newRet; + streamQueueProcessSuccess(pReader->pQueue); + if (numOfBlocks > pReader->maxBlocks) { + qDebug("maximum blocks limit:%d reached, processing, %s", pReader->maxBlocks, idstr); + break; + } + } + } + } + + return pRet; +} From 3d00f6ce547f00a7a857b72cec7af5c6241a6aef Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 14 May 2023 22:08:17 +0800 Subject: [PATCH 02/10] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tqSink.c | 39 +++++++++++++++--------------- source/libs/stream/src/stream.c | 10 ++++---- 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 4a9e3dcee7..e2f9089730 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -19,10 +19,10 @@ #define MAX_CATCH_NUM 10240 -typedef struct STblInfo { +typedef struct STableSinkInfo { uint64_t uid; char tbName[TSDB_TABLE_NAME_LEN]; -} STblInfo; +} STableSinkInfo; int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr) { @@ -97,16 +97,17 @@ end: return ret; } -int32_t tqGetTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STblInfo** pTbl) { - void* pVal = tSimpleHashGet(tblInfo, &groupId, sizeof(uint64_t)); +static int32_t tqGetTableInfo(SSHashObj* pTableInfoMap,uint64_t groupId, STableSinkInfo** pInfo) { + void* pVal = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t)); if (pVal) { - *pTbl = *(STblInfo**)pVal; + *pInfo = *(STableSinkInfo**)pVal; return TSDB_CODE_SUCCESS; } + return TSDB_CODE_FAILED; } -int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STblInfo* pTbl) { +int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STableSinkInfo* pTbl) { if (tSimpleHashGetSize(tblInfo) > MAX_CATCH_NUM) { return TSDB_CODE_SUCCESS; } @@ -274,7 +275,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d crTblArray = NULL; } else { SSubmitTbData tbData = {0}; - tqDebug("tq sink pipe, convert block1 %d, rows: %d", i, rows); + tqDebug("tq sink pipe, convert block:%d, rows:%d", i, rows); if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) { goto _end; @@ -283,35 +284,35 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d tbData.suid = suid; tbData.uid = 0; // uid is assigned by vnode tbData.sver = pTSchema->version; - STblInfo* pTblMeta = NULL; - int32_t res = tqGetTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, &pTblMeta); + STableSinkInfo* pTableSinkInfo = NULL; + int32_t res = tqGetTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, &pTableSinkInfo); if (res != TSDB_CODE_SUCCESS) { - pTblMeta = taosMemoryCalloc(1, sizeof(STblInfo)); + pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo)); } char* ctbName = pDataBlock->info.parTbName; if (!ctbName[0]) { if (res == TSDB_CODE_SUCCESS) { - memcpy(ctbName, pTblMeta->tbName, strlen(pTblMeta->tbName)); + memcpy(ctbName, pTableSinkInfo->tbName, strlen(pTableSinkInfo->tbName)); } else { char* tmp = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); memcpy(ctbName, tmp, strlen(tmp)); - memcpy(pTblMeta->tbName, tmp, strlen(tmp)); + memcpy(pTableSinkInfo->tbName, tmp, strlen(tmp)); taosMemoryFree(tmp); - tqDebug("vgId:%d, gropuid:%" PRIu64 " datablock tabel name is null", TD_VID(pVnode), + tqDebug("vgId:%d, gropuId:%" PRIu64 " datablock table name is null", TD_VID(pVnode), pDataBlock->info.id.groupId); } } if (res == TSDB_CODE_SUCCESS) { - tbData.uid = pTblMeta->uid; + tbData.uid = pTableSinkInfo->uid; } else { SMetaReader mr = {0}; metaReaderInit(&mr, pVnode->pMeta, 0); if (metaGetTableEntryByName(&mr, ctbName) < 0) { metaReaderClear(&mr); - taosMemoryFree(pTblMeta); + taosMemoryFree(pTableSinkInfo); tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName); SVCreateTbReq* pCreateTbReq = NULL; @@ -371,7 +372,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName, mr.me.type); metaReaderClear(&mr); - taosMemoryFree(pTblMeta); + taosMemoryFree(pTableSinkInfo); continue; } @@ -380,13 +381,13 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ", actual suid %" PRId64 "", TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid); metaReaderClear(&mr); - taosMemoryFree(pTblMeta); + taosMemoryFree(pTableSinkInfo); continue; } tbData.uid = mr.me.uid; - pTblMeta->uid = mr.me.uid; - tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTblMeta); + pTableSinkInfo->uid = mr.me.uid; + tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTableSinkInfo); metaReaderClear(&mr); } } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 90f4bac242..6047f74ab2 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -284,7 +284,9 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S } bool tInputQueueIsFull(const SStreamTask* pTask) { - return taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUEU_CAPACITY; + bool isFull = taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUEU_CAPACITY; + double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); + return (isFull || size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE); } int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { @@ -298,8 +300,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, px->submit.msgLen, px->submit.ver, numOfBlocks, size); - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && - (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) { qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, size); @@ -320,8 +321,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && - (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, size); From ac287572efdc2f54a5cb33aba6fa5fca64629e84 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 May 2023 00:03:53 +0800 Subject: [PATCH 03/10] fix: free table sink info --- source/dnode/vnode/src/tq/tqSink.c | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index e2f9089730..0bd7d9a57b 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -17,7 +17,7 @@ #include "tmsg.h" #include "tq.h" -#define MAX_CATCH_NUM 10240 +#define MAX_CACHE_TABLE_INFO_NUM 10240 typedef struct STableSinkInfo { uint64_t uid; @@ -108,9 +108,10 @@ static int32_t tqGetTableInfo(SSHashObj* pTableInfoMap,uint64_t groupId, STableS } int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STableSinkInfo* pTbl) { - if (tSimpleHashGetSize(tblInfo) > MAX_CATCH_NUM) { - return TSDB_CODE_SUCCESS; + if (tSimpleHashGetSize(tblInfo) > MAX_CACHE_TABLE_INFO_NUM) { + return TSDB_CODE_FAILED; } + return tSimpleHashPut(tblInfo, &groupId, sizeof(uint64_t), &pTbl, POINTER_BYTES); } @@ -387,7 +388,10 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d tbData.uid = mr.me.uid; pTableSinkInfo->uid = mr.me.uid; - tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTableSinkInfo); + int32_t code = tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTableSinkInfo); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFreeClear(pTableSinkInfo); + } metaReaderClear(&mr); } } From 3b6e052c6b1d895113eed54e064cb99479703248 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 May 2023 10:11:08 +0800 Subject: [PATCH 04/10] enh(stream): avoid clone submit block. --- source/libs/stream/src/stream.c | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 6047f74ab2..a0a7d5caf7 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -307,15 +307,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { return -1; } - SStreamDataSubmit* pSubmitBlock = streamSubmitBlockClone((SStreamDataSubmit*)pItem); - if (pSubmitBlock == NULL) { - qDebug("task %d %p submit enqueue failed since out of memory", pTask->id.taskId, pTask); - terrno = TSDB_CODE_OUT_OF_MEMORY; - atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); - return -1; - } - - taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock); + taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; @@ -340,10 +332,6 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); } -#if 0 - atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL); -#endif - return 0; } From 9258dcd7404e81748758b3986ab1ebbe4250d129 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 15 May 2023 11:19:17 +0800 Subject: [PATCH 05/10] fix:mem leak --- source/libs/executor/inc/executorInt.h | 1 - source/libs/executor/src/groupoperator.c | 6 ++++++ source/libs/stream/src/streamExec.c | 3 ++- 3 files changed, 8 insertions(+), 2 deletions(-) diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index c9d0eebe2b..d561d86472 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -297,7 +297,6 @@ typedef struct SPartitionBySupporter { typedef struct SPartitionDataInfo { uint64_t groupId; char* tbname; - SArray* tags; SArray* rowIds; } SPartitionDataInfo; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 280edf8b53..e2ba2a07c8 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -1215,6 +1215,11 @@ SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) { return pBlock; } +void freePartItem(void* ptr) { + SPartitionDataInfo* pPart = (SPartitionDataInfo*)ptr; + taosArrayDestroy(pPart->rowIds); +} + SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) { int32_t code = TSDB_CODE_SUCCESS; @@ -1293,6 +1298,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pPartitions = taosHashInit(1024, hashFn, false, HASH_NO_LOCK); + taosHashSetFreeFp(pInfo->pPartitions, freePartItem); pInfo->tsColIndex = 0; pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 0fb78fb589..918a203df4 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -149,7 +149,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { int32_t batchCnt = 0; while (1) { if (streamTaskShouldStop(&pTask->status) || streamTaskShouldPause(&pTask->status)) { - taosArrayDestroy(pRes); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return 0; } @@ -203,6 +203,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { code = streamTaskOutput(pTask, qRes); if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { taosFreeQitem(pRes); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return code; } From 17592ea2947bd2983781da16485ee25a81ef9d33 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 May 2023 13:49:26 +0800 Subject: [PATCH 06/10] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tqUtil.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 8f9a490048..e78820bad8 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -28,11 +28,7 @@ char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { int32_t tqAddBlockNLaunchTask(SStreamTask* pTask, SPackedData *pPackedData) { SStreamDataSubmit* p = streamDataSubmitNew(pPackedData, STREAM_INPUT__DATA_SUBMIT); - int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*) p); - streamDataSubmitDestroy(p); - taosFreeQitem(p); - if (code < 0) { tqError("s-task:%s failed to put into queue, too many, next start ver:%" PRId64, pTask->id.idStr, pPackedData->ver); return -1; From 4c64260a7afd78a7cc91ee0aa5b7eed0d1d8b6d0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 May 2023 14:15:10 +0800 Subject: [PATCH 07/10] refactor: do internal refactor. --- source/libs/stream/src/stream.c | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index a0a7d5caf7..1c75519647 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -16,11 +16,10 @@ #include "streamInc.h" #include "ttimer.h" -#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480 -#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (100) -#define ONE_MB_F (1048576.0) - -#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q)/ONE_MB_F) +#define STREAM_TASK_INPUT_QUEUEU_CAPACITY 20480 +#define STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE (50) +#define ONE_MB_F (1048576.0) +#define QUEUE_MEM_SIZE_IN_MB(_q) (taosQueueMemorySize(_q)/ONE_MB_F) int32_t streamInit() { int8_t old; From 502efb3d019ebe3dc3db2848e8983270b5c41f47 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Mon, 15 May 2023 16:26:24 +0800 Subject: [PATCH 08/10] op stream selectivity buff --- include/common/tcommon.h | 2 +- include/libs/stream/streamState.h | 5 +- include/libs/stream/tstreamFileState.h | 5 +- source/libs/executor/src/timewindowoperator.c | 48 ++++++++++++++----- source/libs/function/src/builtinsimpl.c | 9 +--- source/libs/stream/src/streamState.c | 30 +++++++----- source/libs/stream/src/tstreamFileState.c | 16 +++++-- 7 files changed, 73 insertions(+), 42 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 2bc67e439f..51d31ca91f 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -82,7 +82,7 @@ typedef struct STuplePos { int32_t pageId; int32_t offset; }; - STupleKey streamTupleKey; + SWinKey streamTupleKey; }; } STuplePos; diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 9b80ce2786..0de091bc4e 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -77,9 +77,8 @@ typedef struct { int64_t number; } SStreamStateCur; -int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen); -int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen); -int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key); +int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); +int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen); int32_t streamStatePut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen); int32_t streamStateGet(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen); diff --git a/include/libs/stream/tstreamFileState.h b/include/libs/stream/tstreamFileState.h index d50f0e0a31..7124e2d251 100644 --- a/include/libs/stream/tstreamFileState.h +++ b/include/libs/stream/tstreamFileState.h @@ -38,8 +38,8 @@ typedef SList SStreamSnapshot; typedef TSKEY (*GetTsFun)(void*); -SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, - TSKEY delMark); +SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, + GetTsFun fp, void* pFile, TSKEY delMark); void streamFileStateDestroy(SStreamFileState* pFileState); void streamFileStateClear(SStreamFileState* pFileState); bool needClearDiskBuff(SStreamFileState* pFileState); @@ -56,6 +56,7 @@ int32_t recoverSnapshot(SStreamFileState* pFileState); int32_t getSnapshotIdList(SStreamFileState* pFileState, SArray* list); int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark); +int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState); #ifdef __cplusplus } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index ca3165bf92..7570e3c112 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2675,6 +2675,29 @@ TSKEY compareTs(void* pKey) { return pWinKey->ts; } +int32_t getSelectivityBufSize(SqlFunctionCtx* pCtx) { + if (pCtx->subsidiaries.rowLen == 0) { + int32_t rowLen = 0; + for (int32_t j = 0; j < pCtx->subsidiaries.num; ++j) { + SqlFunctionCtx* pc = pCtx->subsidiaries.pCtx[j]; + rowLen += pc->pExpr->base.resSchema.bytes; + } + + return rowLen + pCtx->subsidiaries.num * sizeof(bool); + } else { + return pCtx->subsidiaries.rowLen; + } +} + +int32_t getMaxFunResSize(SExprSupp* pSup, int32_t numOfCols) { + int32_t size = 0; + for (int32_t i = 0; i < numOfCols; ++i) { + int32_t resSize = getSelectivityBufSize(pSup->pCtx + i); + size = TMAX(size, resSize); + } + return size; +} + SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; @@ -2721,8 +2744,11 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); + pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); + *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); + streamStateSetNumber(pInfo->pState, -1); int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, - pTaskInfo->streamInfo.pState); + pInfo->pState); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -2731,10 +2757,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); - pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); - *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); - streamStateSetNumber(pInfo->pState, -1); - initResultRowInfo(&pInfo->binfo.resultRowInfo); pInfo->numOfChild = numOfChild; @@ -2767,7 +2789,8 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; - pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, + int32_t funResSize= getMaxFunResSize(&pOperator->exprSupp, numOfCols); + pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, pInfo->twAggSup.deleteMark); pInfo->dataVersion = 0; @@ -4886,9 +4909,13 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; initResultSizeInfo(&pOperator->resultInfo, 4096); + pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); + *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); + streamStateSetNumber(pInfo->pState, -1); + size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, - pTaskInfo->streamInfo.pState); + pInfo->pState); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4909,10 +4936,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pDelRes = createSpecialDataBlock(STREAM_DELETE_RESULT); initResultRowInfo(&pInfo->binfo.resultRowInfo); - pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState)); - *(pInfo->pState) = *(pTaskInfo->streamInfo.pState); - streamStateSetNumber(pInfo->pState, -1); - pInfo->pPhyNode = NULL; // create new child pInfo->pPullDataMap = NULL; pInfo->pPullWins = NULL; // SPullWindowInfo @@ -4925,7 +4948,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->numOfDatapack = 0; pInfo->pUpdated = NULL; pInfo->pUpdatedMap = NULL; - pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, + int32_t funResSize= getMaxFunResSize(pSup, numOfCols); + pInfo->pState->pFileState = streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->aggSup.resultRowSize, funResSize, compareTs, pInfo->pState, pInfo->twAggSup.deleteMark); setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index af318a6bc5..e50eb0b784 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -881,10 +881,6 @@ int32_t setSelectivityValue(SqlFunctionCtx* pCtx, SSDataBlock* pBlock, const STu } pStart += pDstCol->info.bytes; } - - if (pCtx->saveHandle.pState) { - streamFreeVal((void*)p); - } } return TSDB_CODE_SUCCESS; @@ -3121,7 +3117,7 @@ void* serializeTupleData(const SSDataBlock* pSrcBlock, int32_t rowIndex, SSubsid return buf; } -static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, STupleKey* key, +static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, size_t length, SWinKey* key, STuplePos* pPos) { STuplePos p = {0}; if (pHandle->pBuf != NULL) { @@ -3169,7 +3165,7 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos) { prepareBuf(pCtx); - STupleKey key; + SWinKey key; if (pCtx->saveHandle.pBuf == NULL) { SColumnInfoData* pColInfo = taosArrayGet(pSrcBlock->pDataBlock, 0); if (pColInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP) { @@ -3177,7 +3173,6 @@ int32_t saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* key.groupId = pSrcBlock->info.id.groupId; key.ts = skey; - key.exprIdx = pCtx->exprIdx; } } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index fa9bfe6ca5..a607d6af74 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -262,26 +262,30 @@ int32_t streamStateCommit(SStreamState* pState) { #endif } -int32_t streamStateFuncPut(SStreamState* pState, const STupleKey* key, const void* value, int32_t vLen) { +int32_t streamStateFuncPut(SStreamState* pState, const SWinKey* key, const void* value, int32_t vLen) { #ifdef USE_ROCKSDB - return streamStateFuncPut_rocksdb(pState, key, value, vLen); + void* pVal = NULL; + int32_t len = 0; + int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), &pVal, &len); + char* buf = ((SRowBuffPos*)pVal)->pRowBuff; + uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); + memcpy(buf + len - rowSize, value, vLen); + return code; #else return tdbTbUpsert(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), value, vLen, pState->pTdbState->txn); #endif } -int32_t streamStateFuncGet(SStreamState* pState, const STupleKey* key, void** pVal, int32_t* pVLen) { +int32_t streamStateFuncGet(SStreamState* pState, const SWinKey* key, void** ppVal, int32_t* pVLen) { #ifdef USE_ROCKSDB - return streamStateFuncGet_rocksdb(pState, key, pVal, pVLen); + void* pVal = NULL; + int32_t len = 0; + int32_t code = getRowBuff(pState->pFileState, (void*)key, sizeof(SWinKey), (void**)(&pVal), &len); + char* buf = ((SRowBuffPos*)pVal)->pRowBuff; + uint32_t rowSize = streamFileStateGeSelectRowSize(pState->pFileState); + *ppVal = buf + len - rowSize; + return code; #else - return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pVal, pVLen); -#endif -} - -int32_t streamStateFuncDel(SStreamState* pState, const STupleKey* key) { -#ifdef USE_ROCKSDB - return streamStateFuncDel_rocksdb(pState, key); -#else - return tdbTbDelete(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), pState->pTdbState->txn); + return tdbTbGet(pState->pTdbState->pFuncStateDb, key, sizeof(STupleKey), ppVal, pVLen); #endif } diff --git a/source/libs/stream/src/tstreamFileState.c b/source/libs/stream/src/tstreamFileState.c index b7401ec5d9..1b62b80b3d 100644 --- a/source/libs/stream/src/tstreamFileState.c +++ b/source/libs/stream/src/tstreamFileState.c @@ -31,6 +31,7 @@ struct SStreamFileState { SSHashObj* rowBuffMap; void* pFileStore; int32_t rowSize; + int32_t selectivityRowSize; int32_t keyLen; uint64_t preCheckPointVersion; uint64_t checkPointVersion; @@ -44,7 +45,7 @@ struct SStreamFileState { typedef SRowBuffPos SRowBuffInfo; -SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, GetTsFun fp, void* pFile, +SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_t rowSize, uint32_t selectRowSize, GetTsFun fp, void* pFile, TSKEY delMark) { if (memSize <= 0) { memSize = DEFAULT_MAX_STREAM_BUFFER_SIZE; @@ -57,6 +58,7 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ if (!pFileState) { goto _error; } + rowSize += selectRowSize; pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2); pFileState->usedBuffs = tdListNew(POINTER_BYTES); pFileState->freeBuffs = tdListNew(POINTER_BYTES); @@ -68,11 +70,11 @@ SStreamFileState* streamFileStateInit(int64_t memSize, uint32_t keySize, uint32_ } pFileState->keyLen = keySize; pFileState->rowSize = rowSize; + pFileState->selectivityRowSize = selectRowSize; pFileState->preCheckPointVersion = 0; pFileState->checkPointVersion = 1; pFileState->pFileStore = pFile; pFileState->getTs = fp; - pFileState->maxRowCount = TMAX((uint64_t)memSize / rowSize, FLUSH_NUM * 2); pFileState->curRowCount = 0; pFileState->deleteMark = delMark; pFileState->flushMark = INT64_MIN; @@ -440,7 +442,9 @@ int32_t deleteExpiredCheckPoint(SStreamFileState* pFileState, TSKEY mark) { int32_t recoverSnapshot(SStreamFileState* pFileState) { int32_t code = TSDB_CODE_SUCCESS; - deleteExpiredCheckPoint(pFileState, pFileState->maxTs - pFileState->deleteMark); + if (pFileState->maxTs != INT64_MIN) { + deleteExpiredCheckPoint(pFileState, pFileState->maxTs - pFileState->deleteMark); + } void* pStVal = NULL; int32_t len = 0; @@ -475,4 +479,8 @@ int32_t recoverSnapshot(SStreamFileState* pFileState) { streamStateFreeCur(pCur); return TSDB_CODE_SUCCESS; -} \ No newline at end of file +} + +int32_t streamFileStateGeSelectRowSize(SStreamFileState* pFileState) { + return pFileState->selectivityRowSize; +} From 04c8c933c850cb471d0578acae6f1815b402058f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 May 2023 17:15:03 +0800 Subject: [PATCH 09/10] fix(stream): add initial offset check. --- source/dnode/vnode/src/tq/tqRestore.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 8ada268c4d..091b8b9ef8 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -124,7 +124,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); } else { int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); - if (currentVer != pTask->chkInfo.currentVer) { + if (currentVer != -1 && currentVer != pTask->chkInfo.currentVer) { int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit streamMetaReleaseTask(pStreamMeta, pTask); From cbb02458ed4653192484dfe52429cd27ee71b4c5 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 16 May 2023 11:19:11 +0800 Subject: [PATCH 10/10] fix(stream): fix the seek condition. --- source/dnode/vnode/src/tq/tqRestore.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 091b8b9ef8..63d17dd6c5 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -124,7 +124,7 @@ int32_t createStreamRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { tqDebug("vgId:%d s-task:%s wal reader seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.currentVer); } else { int64_t currentVer = walReaderGetCurrentVer(pTask->exec.pWalReader); - if (currentVer != -1 && currentVer != pTask->chkInfo.currentVer) { + if (currentVer == -1) { int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.currentVer); if (code != TSDB_CODE_SUCCESS) { // no data in wal, quit streamMetaReleaseTask(pStreamMeta, pTask);