From 3d00f6ce547f00a7a857b72cec7af5c6241a6aef Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 14 May 2023 22:08:17 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tqSink.c | 39 +++++++++++++++--------------- source/libs/stream/src/stream.c | 10 ++++---- 2 files changed, 25 insertions(+), 24 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 4a9e3dcee7..e2f9089730 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -19,10 +19,10 @@ #define MAX_CATCH_NUM 10240 -typedef struct STblInfo { +typedef struct STableSinkInfo { uint64_t uid; char tbName[TSDB_TABLE_NAME_LEN]; -} STblInfo; +} STableSinkInfo; int32_t tqBuildDeleteReq(const char* stbFullName, const SSDataBlock* pDataBlock, SBatchDeleteReq* deleteReq, const char* pIdStr) { @@ -97,16 +97,17 @@ end: return ret; } -int32_t tqGetTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STblInfo** pTbl) { - void* pVal = tSimpleHashGet(tblInfo, &groupId, sizeof(uint64_t)); +static int32_t tqGetTableInfo(SSHashObj* pTableInfoMap,uint64_t groupId, STableSinkInfo** pInfo) { + void* pVal = tSimpleHashGet(pTableInfoMap, &groupId, sizeof(uint64_t)); if (pVal) { - *pTbl = *(STblInfo**)pVal; + *pInfo = *(STableSinkInfo**)pVal; return TSDB_CODE_SUCCESS; } + return TSDB_CODE_FAILED; } -int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STblInfo* pTbl) { +int32_t tqPutTableInfo(SSHashObj* tblInfo ,uint64_t groupId, STableSinkInfo* pTbl) { if (tSimpleHashGetSize(tblInfo) > MAX_CATCH_NUM) { return TSDB_CODE_SUCCESS; } @@ -274,7 +275,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d crTblArray = NULL; } else { SSubmitTbData tbData = {0}; - tqDebug("tq sink pipe, convert block1 %d, rows: %d", i, rows); + tqDebug("tq sink pipe, convert block:%d, rows:%d", i, rows); if (!(tbData.aRowP = taosArrayInit(rows, sizeof(SRow*)))) { goto _end; @@ -283,35 +284,35 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d tbData.suid = suid; tbData.uid = 0; // uid is assigned by vnode tbData.sver = pTSchema->version; - STblInfo* pTblMeta = NULL; - int32_t res = tqGetTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, &pTblMeta); + STableSinkInfo* pTableSinkInfo = NULL; + int32_t res = tqGetTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, &pTableSinkInfo); if (res != TSDB_CODE_SUCCESS) { - pTblMeta = taosMemoryCalloc(1, sizeof(STblInfo)); + pTableSinkInfo = taosMemoryCalloc(1, sizeof(STableSinkInfo)); } char* ctbName = pDataBlock->info.parTbName; if (!ctbName[0]) { if (res == TSDB_CODE_SUCCESS) { - memcpy(ctbName, pTblMeta->tbName, strlen(pTblMeta->tbName)); + memcpy(ctbName, pTableSinkInfo->tbName, strlen(pTableSinkInfo->tbName)); } else { char* tmp = buildCtbNameByGroupId(stbFullName, pDataBlock->info.id.groupId); memcpy(ctbName, tmp, strlen(tmp)); - memcpy(pTblMeta->tbName, tmp, strlen(tmp)); + memcpy(pTableSinkInfo->tbName, tmp, strlen(tmp)); taosMemoryFree(tmp); - tqDebug("vgId:%d, gropuid:%" PRIu64 " datablock tabel name is null", TD_VID(pVnode), + tqDebug("vgId:%d, gropuId:%" PRIu64 " datablock table name is null", TD_VID(pVnode), pDataBlock->info.id.groupId); } } if (res == TSDB_CODE_SUCCESS) { - tbData.uid = pTblMeta->uid; + tbData.uid = pTableSinkInfo->uid; } else { SMetaReader mr = {0}; metaReaderInit(&mr, pVnode->pMeta, 0); if (metaGetTableEntryByName(&mr, ctbName) < 0) { metaReaderClear(&mr); - taosMemoryFree(pTblMeta); + taosMemoryFree(pTableSinkInfo); tqDebug("vgId:%d, stream write into %s, table auto created", TD_VID(pVnode), ctbName); SVCreateTbReq* pCreateTbReq = NULL; @@ -371,7 +372,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d tqError("vgId:%d, failed to write into %s, since table type incorrect, type %d", TD_VID(pVnode), ctbName, mr.me.type); metaReaderClear(&mr); - taosMemoryFree(pTblMeta); + taosMemoryFree(pTableSinkInfo); continue; } @@ -380,13 +381,13 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d ", actual suid %" PRId64 "", TD_VID(pVnode), ctbName, suid, mr.me.ctbEntry.suid); metaReaderClear(&mr); - taosMemoryFree(pTblMeta); + taosMemoryFree(pTableSinkInfo); continue; } tbData.uid = mr.me.uid; - pTblMeta->uid = mr.me.uid; - tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTblMeta); + pTableSinkInfo->uid = mr.me.uid; + tqPutTableInfo(pTask->tbSink.pTblInfo, pDataBlock->info.id.groupId, pTableSinkInfo); metaReaderClear(&mr); } } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 90f4bac242..6047f74ab2 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -284,7 +284,9 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S } bool tInputQueueIsFull(const SStreamTask* pTask) { - return taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUEU_CAPACITY; + bool isFull = taosQueueItemSize((pTask->inputQueue->queue)) >= STREAM_TASK_INPUT_QUEUEU_CAPACITY; + double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); + return (isFull || size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE); } int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { @@ -298,8 +300,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, px->submit.msgLen, px->submit.ver, numOfBlocks, size); - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && - (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) { qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, size); @@ -320,8 +321,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && - (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) { + if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, size);