From e457d94ac6a97c261262d0f23b9acccb7e760af2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 12 Sep 2023 18:09:02 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 6 ++-- source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tq/tqSink.c | 2 +- source/dnode/vnode/src/tq/tqStreamTask.c | 2 +- source/libs/stream/src/stream.c | 2 +- source/libs/stream/src/streamData.c | 38 +++++------------------- source/libs/stream/src/streamDispatch.c | 5 ++-- source/libs/stream/src/streamExec.c | 18 ++++------- source/libs/stream/src/streamQueue.c | 21 +++++++++++-- source/libs/stream/src/streamRecover.c | 4 --- 10 files changed, 41 insertions(+), 59 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 30c60bcf0d..ac067823f8 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -111,14 +111,12 @@ typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver); typedef struct { int8_t type; int64_t ver; - int32_t* dataRef; SPackedData submit; } SStreamDataSubmit; typedef struct { int8_t type; int64_t ver; - SArray* dataRefs; // SArray SArray* submits; // SArray } SStreamMergedSubmit; @@ -672,7 +670,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); -int32_t streamTaskGetInputQItems(const SStreamTask* pTask); +int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); // common int32_t streamRestoreParam(SStreamTask* pTask); @@ -696,7 +694,7 @@ void streamTaskOpenAllUpstreamInput(SStreamTask* pTask); // source level int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow); -int32_t streamSourceScanHistoryData(SStreamTask* pTask); +int32_t streamScanHistoryData(SStreamTask* pTask); int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask); // agg level diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e2888e8e9a..540eaed9ba 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1048,7 +1048,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { ASSERT(pTask->status.pauseAllowed == true); } - streamSourceScanHistoryData(pTask); + streamScanHistoryData(pTask); if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { double el = (taosGetTimestampMs() - pTask->tsInfo.step1Start) / 1000.0; tqDebug("s-task:%s is paused in the step1, elapsed time:%.2fs, sched-status:%d", pTask->id.idStr, el, diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 7f52750348..0925573248 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -537,7 +537,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, STSchema* pTSchema, SSDataBlock if (k == 0) { SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); void* colData = colDataGetData(pColData, j); - tqDebug("s-task:%s tq sink pipe2, row %d, col %d ts %" PRId64, id, j, k, *(int64_t*)colData); + tqDebug("s-task:%s sink row %d, col %d ts %" PRId64, id, j, k, *(int64_t*)colData); } if (IS_SET_NULL(pCol)) { diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 9bae6d6e33..14135b14dc 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -396,7 +396,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - int32_t numOfItems = streamTaskGetInputQItems(pTask); + int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputInfo.queue); int64_t maxVer = (pTask->info.fillHistory == 1) ? pTask->dataRange.range.maxVer : INT64_MAX; taosThreadMutexLock(&pTask->lock); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 75ac86b462..102787f019 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -177,7 +177,7 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp } int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { - SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); + SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SStreamDataBlock)); int8_t status = TASK_INPUT_STATUS__NORMAL; // enqueue diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index ea3e4f5985..00bf631d74 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -115,28 +115,16 @@ SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type) { return NULL; } - pDataSubmit->dataRef = (int32_t*)taosMemoryMalloc(sizeof(int32_t)); - if (pDataSubmit->dataRef == NULL) { - taosFreeQitem(pDataSubmit); - return NULL; - } - pDataSubmit->ver = pData->ver; pDataSubmit->submit = *pData; - *pDataSubmit->dataRef = 1; // initialize the reference count to be 1 pDataSubmit->type = type; return pDataSubmit; } void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit) { - int32_t ref = atomic_sub_fetch_32(pDataSubmit->dataRef, 1); - ASSERT(ref >= 0 && pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT); - - if (ref == 0) { - taosMemoryFree(pDataSubmit->submit.msgStr); - taosMemoryFree(pDataSubmit->dataRef); - } + ASSERT(pDataSubmit->type == STREAM_INPUT__DATA_SUBMIT); + taosMemoryFree(pDataSubmit->submit.msgStr); } SStreamMergedSubmit* streamMergedSubmitNew() { @@ -146,11 +134,8 @@ SStreamMergedSubmit* streamMergedSubmitNew() { } pMerged->submits = taosArrayInit(0, sizeof(SPackedData)); - pMerged->dataRefs = taosArrayInit(0, sizeof(void*)); - - if (pMerged->dataRefs == NULL || pMerged->submits == NULL) { + if (pMerged->submits == NULL) { taosArrayDestroy(pMerged->submits); - taosArrayDestroy(pMerged->dataRefs); taosFreeQitem(pMerged); return NULL; } @@ -160,9 +145,10 @@ SStreamMergedSubmit* streamMergedSubmitNew() { } int32_t streamMergeSubmit(SStreamMergedSubmit* pMerged, SStreamDataSubmit* pSubmit) { - taosArrayPush(pMerged->dataRefs, &pSubmit->dataRef); taosArrayPush(pMerged->submits, &pSubmit->submit); - pMerged->ver = pSubmit->ver; + if (pSubmit->ver > pMerged->ver) { + pMerged->ver = pSubmit->ver; + } return 0; } @@ -222,18 +208,10 @@ void streamFreeQitem(SStreamQueueItem* data) { int32_t sz = taosArrayGetSize(pMerge->submits); for (int32_t i = 0; i < sz; i++) { - int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i); - int32_t ref = atomic_sub_fetch_32(pRef, 1); - ASSERT(ref >= 0); - - if (ref == 0) { - SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i); - taosMemoryFree(pSubmit->msgStr); - taosMemoryFree(pRef); - } + SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i); + taosMemoryFree(pSubmit->msgStr); } taosArrayDestroy(pMerge->submits); - taosArrayDestroy(pMerge->dataRefs); taosFreeQitem(pMerge); } else if (type == STREAM_INPUT__REF_DATA_BLOCK) { SStreamRefDataBlock* pRefBlock = (SStreamRefDataBlock*)data; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index d04f55628c..89f067baeb 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -498,9 +498,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH)); const char* id = pTask->id.idStr; - int32_t numOfElems = taosQueueItemSize(pTask->outputInfo.queue->pQueue); + int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputInfo.queue); + int32_t size = SIZE_IN_MB(taosQueueMemorySize(pTask->outputInfo.queue->pQueue)); if (numOfElems > 0) { - qDebug("s-task:%s try to dispatch intermediate block to downstream, elem in outputQ:%d", id, numOfElems); + qDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id, numOfElems, size); } // to make sure only one dispatch is running diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 8d282696c1..9595d3bf02 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -164,11 +164,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i return code; } -int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) { +int32_t streamScanHistoryData(SStreamTask* pTask) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); + int32_t code = TSDB_CODE_SUCCESS; void* exec = pTask->exec.pExecutor; bool finished = false; + int32_t outputBatchSize = 100; qSetStreamOpOpen(exec); @@ -217,8 +219,8 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) { block.info.childId = pTask->info.selfChildId; taosArrayPush(pRes, &block); - if ((++numOfBlocks) >= batchSize) { - qDebug("s-task:%s scan exec numOfBlocks:%d, output limit:%d reached", pTask->id.idStr, numOfBlocks, batchSize); + if ((++numOfBlocks) >= outputBatchSize) { + qDebug("s-task:%s scan exec numOfBlocks:%d, output limit:%d reached", pTask->id.idStr, numOfBlocks, outputBatchSize); break; } } @@ -248,13 +250,6 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSize) { return 0; } -int32_t streamTaskGetInputQItems(const SStreamTask* pTask) { - int32_t numOfItems1 = taosQueueItemSize(pTask->inputInfo.queue->pQueue); - int32_t numOfItems2 = taosQallItemSize(pTask->inputInfo.queue->qall); - - return numOfItems1 + numOfItems2; -} - // wait for the stream task to be idle static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { const char* id = pTask->id.idStr; @@ -576,7 +571,6 @@ int32_t streamExecForAll(SStreamTask* pTask) { // todo other thread may change the status // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. if (type == STREAM_INPUT__CHECKPOINT) { -// ASSERT(pTask->status.taskStatus == TASK_STATUS__CK); qDebug("s-task:%s checkpoint block received, set the status:%s", pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus)); streamTaskBuildCheckpoint(pTask); @@ -608,8 +602,6 @@ int32_t streamTryExec(SStreamTask* pTask) { return -1; } -// streamTaskBuildCheckpoint(pTask); - atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 510c8dc9b1..a38095ea58 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -165,6 +165,13 @@ bool streamQueueIsFull(const STaosQueue* pQueue) { return (isFull || size >= STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE); } +int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) { + int32_t numOfItems1 = taosQueueItemSize(pQueue->pQueue); + int32_t numOfItems2 = taosQallItemSize(pQueue->qall); + + return numOfItems1 + numOfItems2; +} + int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) { int32_t retryTimes = 0; int32_t MAX_RETRY_TIMES = 5; @@ -269,11 +276,11 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) int8_t type = pItem->type; STaosQueue* pQueue = pTask->inputInfo.queue->pQueue; int32_t total = taosQueueItemSize(pQueue) + 1; - double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); if (type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue)) { + double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); qTrace( "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); @@ -292,24 +299,30 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) return code; } + double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); + // use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already. qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, msgLen, ver, total, size + SIZE_IN_MB(msgLen)); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { if (streamQueueIsFull(pQueue)) { + double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); + qTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_INPUT_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); destroyStreamDataBlock((SStreamDataBlock*)pItem); return -1; } - qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); int32_t code = taosWriteQitem(pQueue, pItem); if (code != TSDB_CODE_SUCCESS) { destroyStreamDataBlock((SStreamDataBlock*)pItem); return code; } + + double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); + qDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE) { int32_t code = taosWriteQitem(pQueue, pItem); @@ -317,6 +330,8 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) taosFreeQitem(pItem); return code; } + + double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); qDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size); } else if (type == STREAM_INPUT__GET_RES) { @@ -326,6 +341,8 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) taosFreeQitem(pItem); return code; } + + double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); } else { ASSERT(0); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 0693495b14..54d5957900 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -371,10 +371,6 @@ int32_t initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8 return 0; } -int32_t streamSourceScanHistoryData(SStreamTask* pTask) { - return streamScanExec(pTask, 100); -} - int32_t streamTaskPutTranstateIntoInputQ(SStreamTask* pTask) { SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); if (pTranstate == NULL) {