From d8525123a2b03c54d35564d179b3077061a32f67 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 19 May 2023 11:03:41 +0800 Subject: [PATCH 01/21] enh(stream): add API to retrieve last ts for multi-tables. --- source/dnode/vnode/inc/vnode.h | 1 + source/dnode/vnode/src/tq/tq.c | 3 +- source/dnode/vnode/src/tsdb/tsdbRead.c | 55 ++++++++++++++++++++++++++ source/libs/stream/src/streamRecover.c | 17 ++++---- 4 files changed, 67 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index da5d6f8b3c..18fa893fa4 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -198,6 +198,7 @@ void *tsdbGetIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta); uint64_t tsdbGetReaderMaxVersion(STsdbReader *pReader); void tsdbReaderSetCloseFlag(STsdbReader *pReader); +int64_t tsdbGetLastTimestamp(SVnode* pVnode, void* pTableList, int32_t numOfTables, const char* pIdStr); int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables); int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a8c2a09319..7dc7f999fb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -760,6 +760,7 @@ void freePtr(void *ptr) { int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t vgId = TD_VID(pTq->pVnode); + pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); pTask->refCnt = 1; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; @@ -837,7 +838,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } if (pTask->taskLevel == TASK_LEVEL__SOURCE) { - SWalFilterCond cond = {.deleteMsg = 1}; + SWalFilterCond cond = {.deleteMsg = 1}; // delete msg also extract from wal files pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond); } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 36b18ce9f4..cde2672541 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -5506,3 +5506,58 @@ void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) { } void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; } + +/*-------------todo:refactor the implementation of those APIs in this file to seperate the API into two files------*/ +// opt perf, do NOT create so many readers +int64_t tsdbGetLastTimestamp(SVnode* pVnode, void* pTableList, int32_t numOfTables, const char* pIdStr) { + SQueryTableDataCond cond = {.type = TIMEWINDOW_RANGE_CONTAINED, .numOfCols = 1, .order = TSDB_ORDER_DESC, + .startVersion = -1, .endVersion = -1}; + cond.twindows.skey = INT64_MIN; + cond.twindows.ekey = INT64_MAX; + + cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo)); + cond.pSlotList = taosMemoryMalloc(sizeof(int32_t) * cond.numOfCols); + if (cond.colList == NULL || cond.pSlotList == NULL) { + // todo + } + + cond.colList[0].colId = 1; + cond.colList[0].slotId = 0; + cond.colList[0].type = TSDB_DATA_TYPE_TIMESTAMP; + + cond.pSlotList[0] = 0; + + STableKeyInfo* pTableKeyInfo = pTableList; + STsdbReader* pReader = NULL; + SSDataBlock* pBlock = createDataBlock(); + + SColumnInfoData data = {0}; + data.info = (SColumnInfo) {.type = TSDB_DATA_TYPE_TIMESTAMP, .colId = 1, .bytes = TSDB_KEYSIZE}; + blockDataAppendColInfo(pBlock, &data); + + int64_t key = INT64_MIN; + + for(int32_t i = 0; i < numOfTables; ++i) { + int32_t code = tsdbReaderOpen(pVnode, &cond, &pTableKeyInfo[i], 1, pBlock, &pReader, pIdStr, false, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + bool hasData = false; + code = tsdbNextDataBlock(pReader, &hasData); + if (!hasData || code != TSDB_CODE_SUCCESS) { + continue; + } + + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, 0); + int64_t k = *(int64_t*)pCol->pData; + + if (key < k) { + key = k; + } + + tsdbReaderClose(pReader); + } + + return 0; +} diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index f71d078a3d..7236c6c4b9 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -20,7 +20,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { if (pTask->taskLevel == TASK_LEVEL__SOURCE) { atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE); - qDebug("s-task:%s set task status:%d and start recover", pTask->id.idStr, pTask->status.taskStatus); + qDebug("s-task:%s set task status:%d and start to recover", pTask->id.idStr, pTask->status.taskStatus); streamSetParamForRecover(pTask); streamSourceRecoverPrepareStep1(pTask, version); @@ -46,6 +46,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { streamSetParamForRecover(pTask); streamAggRecoverPrepare(pTask); } else if (pTask->taskLevel == TASK_LEVEL__SINK) { + // sink nodes has no specified operation for fill history atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); } @@ -71,23 +72,23 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; pTask->checkReqId = req.reqId; - qDebug("s-task:%s at node %d check downstream task %d at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, + qDebug("s-task:%s at node %d check downstream task:%d at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t vgSz = taosArrayGetSize(vgInfo); - pTask->recoverTryingDownstream = vgSz; - pTask->checkReqIds = taosArrayInit(vgSz, sizeof(int64_t)); + int32_t numOfVgs = taosArrayGetSize(vgInfo); + pTask->recoverTryingDownstream = numOfVgs; + pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t)); - for (int32_t i = 0; i < vgSz; i++) { + for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); req.reqId = tGenIdPI64(); taosArrayPush(pTask->checkReqIds, &req.reqId); req.downstreamNodeId = pVgInfo->vgId; req.downstreamTaskId = pVgInfo->taskId; - qDebug("s-task:%s at node %d check downstream task %d at node %d (shuffle)", pTask->id.idStr, pTask->nodeId, + qDebug("s-task:%s at node %d check downstream task:%d at node %d (shuffle)", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } @@ -161,7 +162,7 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* taosArrayDestroy(pTask->checkReqIds); pTask->checkReqIds = NULL; - qDebug("s-task:%s all downstream tasks:%d are ready, now enter into recover stage", pTask->id.idStr, numOfReqs); + qDebug("s-task:%s all %d downstream tasks are ready, now enter into recover stage", pTask->id.idStr, numOfReqs); streamTaskLaunchRecover(pTask, version); } } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { From 90687bae1c48cc80d9217fa541bd5b4df9463910 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 19 May 2023 15:30:50 +0800 Subject: [PATCH 02/21] refactor: do some internal refactor. --- source/libs/stream/inc/streamInc.h | 2 +- source/libs/stream/src/stream.c | 35 ++++++++++--------------- source/libs/stream/src/streamData.c | 3 ++- source/libs/stream/src/streamDispatch.c | 2 +- source/libs/stream/src/streamExec.c | 1 + 5 files changed, 19 insertions(+), 24 deletions(-) diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index c471bc2bd8..7e1b4dcf2d 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -34,7 +34,7 @@ typedef struct { static SStreamGlobalEnv streamEnv; int32_t streamDispatch(SStreamTask* pTask); -int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); +int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 26e1c2ab43..1a5ef2e007 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -121,24 +121,22 @@ int32_t streamSchedExec(SStreamTask* pTask) { int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); - int8_t status; - // enqueue data block - if (pData != NULL) { + int8_t status = 0; + if (pData == NULL) { + streamTaskInputFail(pTask); + status = TASK_INPUT_STATUS__FAILED; + qDebug("vgId:%d, s-task:%s failed to received dispatch msg, reason: out of memory", pTask->pMeta->vgId, pTask->id.idStr); + } else { pData->type = STREAM_INPUT__DATA_BLOCK; pData->srcVgId = pReq->dataSrcVgId; - // decode - /*pData->blocks = pReq->data;*/ - /*pBlock->sourceVer = pReq->sourceVer;*/ - streamDispatchReqToData(pReq, pData); + + streamConvertDispatchMsgToData(pReq, pData); if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) { status = TASK_INPUT_STATUS__NORMAL; } else { // input queue is full, upstream is blocked now status = TASK_INPUT_STATUS__BLOCKED; } - } else { - streamTaskInputFail(pTask); - status = TASK_INPUT_STATUS__FAILED; } // rsp by input status @@ -219,7 +217,7 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { } int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { - qDebug("s-task:%s receive dispatch msg from taskId:%d(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId, + qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId, pReq->upstreamNodeId); // todo add the input queue buffer limitation @@ -294,16 +292,13 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); if (type == STREAM_INPUT__DATA_SUBMIT) { - int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; - double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); - SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, - px->submit.msgLen, px->submit.ver, numOfBlocks, size); + px->submit.msgLen, px->submit.ver, total, size); if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) { qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", - pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, + pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total, size); streamDataSubmitDestroy(px); taosFreeQitem(pItem); @@ -312,22 +307,20 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { - int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; - double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", - pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, + pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total, size); return -1; } - qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, numOfBlocks); + qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, total); taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__CHECKPOINT) { taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__GET_RES) { taosWriteQitem(pTask->inputQueue->queue, pItem); + qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); } if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index b70a8c93a9..2233e21f60 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -15,7 +15,7 @@ #include "streamInc.h" -int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) { +int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) { int32_t blockNum = pReq->blockNum; SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum); if (pArray == NULL) { @@ -39,6 +39,7 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock pDataBlock->info.type = pRetrieve->streamBlockType; pDataBlock->info.childId = pReq->upstreamChildId; } + pData->blocks = pArray; return 0; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index a757d39d3f..7e0fbd30b1 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -320,7 +320,7 @@ int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* p msg.pCont = buf; msg.msgType = pTask->dispatchMsgType; - qDebug("dispatch from s-task:%s to taskId:%d vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId); + qDebug("dispatch from s-task:%s to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId); tmsgSendReq(pEpSet, &msg); code = 0; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d14e8102d8..ec2738558a 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -272,6 +272,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (streamTaskShouldPause(&pTask->status)) { return 0; } + SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); if (qItem == NULL) { if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) { From e2ec8d738c2f36f60612d135d8a557ef00cb922b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 19 May 2023 18:08:01 +0800 Subject: [PATCH 03/21] other: add some logs. --- source/dnode/vnode/src/tq/tqRestore.c | 4 ++-- source/libs/stream/src/streamExec.c | 7 ++++--- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 9acf2454af..ff9f95d5fa 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -119,7 +119,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { int32_t status = pTask->status.taskStatus; if (pTask->taskLevel != TASK_LEVEL__SOURCE) { - tqDebug("s-task:%s level:%d not source task, no need to start", pTask->id.idStr, pTask->taskLevel); +// tqTrace("s-task:%s level:%d not source task, no need to start", pTask->id.idStr, pTask->taskLevel); streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -132,7 +132,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { } if (tInputQueueIsFull(pTask)) { - tqDebug("s-task:%s input queue is full, do nothing", pTask->id.idStr); + tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr); streamMetaReleaseTask(pStreamMeta, pTask); continue; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index ec2738558a..6391a02ace 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -105,9 +105,10 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* block.info.childId = pTask->selfChildId; taosArrayPush(pRes, &block); - qDebug("task %d(child %d) processed retrieve, reqId %" PRId64, pTask->id.taskId, pTask->selfChildId, + qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId, pRetrieveBlock->reqId); } + break; } @@ -118,12 +119,12 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* continue; } - qDebug("s-task:%s (child %d) executed and get block", pTask->id.idStr, pTask->selfChildId); - SSDataBlock block = {0}; assignOneDataBlock(&block, output); block.info.childId = pTask->selfChildId; taosArrayPush(pRes, &block); + + qDebug("s-task:%s (child %d) executed and get block, total blocks:%d", pTask->id.idStr, pTask->selfChildId, (int32_t)taosArrayGetSize(pRes)); } return 0; From a9b7b8a5fdf4fec59477161ea753496eba71536a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 19 May 2023 19:26:15 +0800 Subject: [PATCH 04/21] other: add some logs. --- source/libs/stream/src/stream.c | 4 ++-- source/libs/stream/src/streamExec.c | 17 ++++++++++++----- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 1a5ef2e007..128d26bfb1 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -314,13 +314,13 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { return -1; } - qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, total); + qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__CHECKPOINT) { taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__GET_RES) { taosWriteQitem(pTask->inputQueue->queue, pItem); - qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); + qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); } if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 6391a02ace..2d950723bc 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -29,9 +29,10 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus) { return (status == TASK_STATUS__PAUSE); } -static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { +static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes, int64_t* resSize) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; + *resSize = 0; while (pTask->taskLevel == TASK_LEVEL__SOURCE) { int8_t status = atomic_load_8(&pTask->status.taskStatus); @@ -122,9 +123,12 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* SSDataBlock block = {0}; assignOneDataBlock(&block, output); block.info.childId = pTask->selfChildId; + + (*resSize) += blockDataGetSize(output); taosArrayPush(pRes, &block); - qDebug("s-task:%s (child %d) executed and get block, total blocks:%d", pTask->id.idStr, pTask->selfChildId, (int32_t)taosArrayGetSize(pRes)); + qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr, pTask->selfChildId, (int32_t)taosArrayGetSize(pRes), + (*resSize)/1048576.0); } return 0; @@ -330,10 +334,12 @@ int32_t streamExecForAll(SStreamTask* pTask) { continue; } + int64_t resSize = 0; + int64_t st = taosGetTimestampMs(); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize); - streamTaskExecImpl(pTask, pInput, pRes); + streamTaskExecImpl(pTask, pInput, pRes, &resSize); int64_t ckId = 0; int64_t dataVer = 0; @@ -356,10 +362,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { taosWUnLockLatch(&pTask->pMeta->lock); qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr); } - } else { - qDebug("s-task:%s exec end", pTask->id.idStr); } + double el = (taosGetTimestampMs() - st) / 1000.0; + qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB", pTask->id.idStr, el, resSize/1048576.0); + if (taosArrayGetSize(pRes) != 0) { SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); if (qRes == NULL) { From 56e5a483dd3d5dd98432dd2a43b13988cb260326 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 19 May 2023 20:00:32 +0800 Subject: [PATCH 05/21] other: update log. --- source/libs/stream/src/streamExec.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 2d950723bc..d77d6c21ce 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -124,7 +124,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* assignOneDataBlock(&block, output); block.info.childId = pTask->selfChildId; - (*resSize) += blockDataGetSize(output); + (*resSize) += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); taosArrayPush(pRes, &block); qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr, pTask->selfChildId, (int32_t)taosArrayGetSize(pRes), From 8ab065d01ecf6c50aae8e46ddb27a8830c5378dd Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 19 May 2023 23:53:23 +0800 Subject: [PATCH 06/21] other: update some logs. --- source/common/src/tdatablock.c | 3 --- source/libs/stream/src/streamExec.c | 6 ++++-- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 5f7e43668a..ff583baaa6 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2514,9 +2514,6 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { *actualLen = dataLen; *groupId = pBlock->info.id.groupId; ASSERT(dataLen > 0); - - uDebug("build data block, actualLen:%d, rows:%d, cols:%d", dataLen, *rows, *cols); - return dataLen; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d77d6c21ce..e888faf4d1 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -365,9 +365,11 @@ int32_t streamExecForAll(SStreamTask* pTask) { } double el = (taosGetTimestampMs() - st) / 1000.0; - qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB", pTask->id.idStr, el, resSize/1048576.0); - if (taosArrayGetSize(pRes) != 0) { + int32_t numOfBlocks = taosArrayGetSize(pRes); + qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, resSize/1048576.0, numOfBlocks); + + if (numOfBlocks > 0) { SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); if (qRes == NULL) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); From 6ded3284da6a5e6adc6f35b943d32d3b86373d9c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 May 2023 11:37:17 +0800 Subject: [PATCH 07/21] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 2 +- source/libs/stream/src/stream.c | 7 +- source/libs/stream/src/streamData.c | 4 +- source/libs/stream/src/streamExec.c | 261 ++++++++++++++++------------ source/libs/stream/src/streamMeta.c | 2 + 5 files changed, 165 insertions(+), 111 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8f5d6c38f3..7c7039c1c0 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -536,7 +536,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S void streamTaskInputFail(SStreamTask* pTask); int32_t streamTryExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask); -int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock); +int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); bool streamTaskShouldStop(const SStreamStatus* pStatus); bool streamTaskShouldPause(const SStreamStatus* pStatus); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 128d26bfb1..7f19529d11 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -195,7 +195,8 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; } -int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { +// todo add log +int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { int32_t code = 0; if (pTask->outputType == TASK_OUTPUT__TABLE) { pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); @@ -209,10 +210,14 @@ int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); code = taosWriteQitem(pTask->outputQueue->queue, pBlock); if (code != 0) { + taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); + taosFreeQitem(pBlock); return code; } + streamDispatch(pTask); } + return 0; } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 2233e21f60..ca24414db8 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -185,11 +185,13 @@ void streamFreeQitem(SStreamQueueItem* data) { taosFreeQitem(data); } else if (type == STREAM_INPUT__MERGED_SUBMIT) { SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data; - int32_t sz = taosArrayGetSize(pMerge->submits); + + int32_t sz = taosArrayGetSize(pMerge->submits); for (int32_t i = 0; i < sz; i++) { int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i); int32_t ref = atomic_sub_fetch_32(pRef, 1); ASSERT(ref >= 0); + if (ref == 0) { SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i); taosMemoryFree(pSubmit->msgStr); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index e888faf4d1..122dbc47f5 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -19,7 +19,10 @@ #define MAX_STREAM_EXEC_BATCH_NUM 128 #define MIN_STREAM_EXEC_BATCH_NUM 16 -bool streamTaskShouldStop(const SStreamStatus* pStatus) { +static int32_t updateCheckPointInfo (SStreamTask* pTask); +static SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes); + + bool streamTaskShouldStop(const SStreamStatus* pStatus) { int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING); } @@ -29,55 +32,17 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus) { return (status == TASK_STATUS__PAUSE); } -static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes, int64_t* resSize) { +static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; - *resSize = 0; - while (pTask->taskLevel == TASK_LEVEL__SOURCE) { - int8_t status = atomic_load_8(&pTask->status.taskStatus); - if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) { - qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, - atomic_load_8(&pTask->status.taskStatus)); - taosMsleep(2); - } else { - break; - } - } + *totalBlocks = 0; + *totalSize = 0; - // set input - const SStreamQueueItem* pItem = (const SStreamQueueItem*)data; - if (pItem->type == STREAM_INPUT__GET_RES) { - const SStreamTrigger* pTrigger = (const SStreamTrigger*)data; - qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); - } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { - ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); - const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)data; - qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); - qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, - pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); - } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { - const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data; + SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); + int32_t size = 0; + int32_t numOfBlocks = 0; - SArray* pBlockList = pBlock->blocks; - int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); - qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); - } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { - const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)data; - - SArray* pBlockList = pMerged->submits; - int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks); - qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); - } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { - const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)data; - qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK); - } else { - ASSERT(0); - } - - // pExecutor while (1) { if (streamTaskShouldStop(&pTask->status)) { return 0; @@ -98,7 +63,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { SSDataBlock block = {0}; - const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)data; + const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*) pItem; ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1); assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0)); @@ -124,11 +89,49 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* assignOneDataBlock(&block, output); block.info.childId = pTask->selfChildId; - (*resSize) += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); + size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); + numOfBlocks += 1; + taosArrayPush(pRes, &block); - qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr, pTask->selfChildId, (int32_t)taosArrayGetSize(pRes), - (*resSize)/1048576.0); + qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr, + pTask->selfChildId, numOfBlocks, size / 1048576.0); + + // current output should be dispatched to down stream nodes + if (numOfBlocks > 1000) { + code = updateCheckPointInfo(pTask); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + ASSERT(numOfBlocks == taosArrayGetSize(pRes)); + + if (numOfBlocks > 0) { + SStreamDataBlock* pStreamBlocks = createStreamDataBlockFromResults(pItem, pTask, size, pRes); + if (pStreamBlocks == NULL) { + return -1; + } + + qDebug("s-task:%s output exec stream data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, size/1048576.0); + + code = streamTaskOutputResultBlock(pTask, pStreamBlocks); + if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position + taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); + taosFreeQitem(pStreamBlocks); + return -1; + } + } else { + taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); + } + } + + *totalSize += size; + *totalBlocks += numOfBlocks; + + size = 0; + numOfBlocks = 0; + + ASSERT(taosArrayGetSize(pRes) == 0); } return 0; @@ -205,7 +208,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { qRes->type = STREAM_INPUT__DATA_BLOCK; qRes->blocks = pRes; - code = streamTaskOutput(pTask, qRes); + code = streamTaskOutputResultBlock(pTask, qRes); if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosFreeQitem(qRes); @@ -251,7 +254,7 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { if (pTask->taskLevel == TASK_LEVEL__SINK) { ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK); - streamTaskOutput(pTask, (SStreamDataBlock*)pItem); + streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem); } // exec impl @@ -262,6 +265,57 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { } #endif +int32_t updateCheckPointInfo (SStreamTask* pTask) { + int64_t ckId = 0; + int64_t dataVer = 0; + qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); + + SCheckpointInfo* pCkInfo = &pTask->chkInfo; + if (ckId > pCkInfo->id) { // save it since the checkpoint is updated + qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 + ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId); + + pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pCkInfo->currentVer}; + + taosWLockLatch(&pTask->pMeta->lock); + + streamMetaSaveTask(pTask->pMeta, pTask); + if (streamMetaCommit(pTask->pMeta) < 0) { + taosWUnLockLatch(&pTask->pMeta->lock); + qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr()); + return -1; + } else { + taosWUnLockLatch(&pTask->pMeta->lock); + qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr); + } + } + + return TSDB_CODE_SUCCESS; +} + +SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) { + SStreamDataBlock* pStreamBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize); + if (pStreamBlocks == NULL) { + taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); + return NULL; + } + + pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK; + pStreamBlocks->blocks = pRes; + + if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { + SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem; + pStreamBlocks->childId = pTask->selfChildId; + pStreamBlocks->sourceVer = pSubmit->ver; + } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem; + pStreamBlocks->childId = pTask->selfChildId; + pStreamBlocks->sourceVer = pMerged->ver; + } + + return pStreamBlocks; +} + int32_t streamExecForAll(SStreamTask* pTask) { int32_t code = 0; while (1) { @@ -330,79 +384,70 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (pTask->taskLevel == TASK_LEVEL__SINK) { ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK); qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize); - streamTaskOutput(pTask, (SStreamDataBlock*)pInput); + streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput); continue; } - int64_t resSize = 0; + // wait for the task to be ready to go + while (pTask->taskLevel == TASK_LEVEL__SOURCE) { + int8_t status = atomic_load_8(&pTask->status.taskStatus); + if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) { + qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, + atomic_load_8(&pTask->status.taskStatus)); + taosMsleep(2); + } else { + break; + } + } + int64_t st = taosGetTimestampMs(); - SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize); - streamTaskExecImpl(pTask, pInput, pRes, &resSize); + { + // set input + void* pExecutor = pTask->exec.pExecutor; - int64_t ckId = 0; - int64_t dataVer = 0; - qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); - if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated - qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 - ", checkPoint id:%" PRId64 " -> %" PRId64, - pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId); + const SStreamQueueItem* pItem = pInput; + if (pItem->type == STREAM_INPUT__GET_RES) { + const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput; + qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); + } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { + ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); + const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput; + qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); + qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, + pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); + } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { + const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput; - pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer}; + SArray* pBlockList = pBlock->blocks; + int32_t numOfBlocks = taosArrayGetSize(pBlockList); + qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); + qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); + } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { + const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput; - taosWLockLatch(&pTask->pMeta->lock); - - streamMetaSaveTask(pTask->pMeta, pTask); - if (streamMetaCommit(pTask->pMeta) < 0) { - taosWUnLockLatch(&pTask->pMeta->lock); - qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr()); - return -1; + SArray* pBlockList = pMerged->submits; + int32_t numOfBlocks = taosArrayGetSize(pBlockList); + qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks); + qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); + } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { + const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput; + qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK); } else { - taosWUnLockLatch(&pTask->pMeta->lock); - qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr); + ASSERT(0); } } - double el = (taosGetTimestampMs() - st) / 1000.0; + int64_t resSize = 0; + int32_t totalBlocks = 0; + streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks); - int32_t numOfBlocks = taosArrayGetSize(pRes); - qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, resSize/1048576.0, numOfBlocks); - - if (numOfBlocks > 0) { - SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); - if (qRes == NULL) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - streamFreeQitem(pInput); - return -1; - } - - qRes->type = STREAM_INPUT__DATA_BLOCK; - qRes->blocks = pRes; - - if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pInput; - qRes->childId = pTask->selfChildId; - qRes->sourceVer = pSubmit->ver; - } else if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__MERGED_SUBMIT) { - SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pInput; - qRes->childId = pTask->selfChildId; - qRes->sourceVer = pMerged->ver; - } - - code = streamTaskOutput(pTask, qRes); - if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { - // backpressure and record position - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - streamFreeQitem(pInput); - taosFreeQitem(qRes); - return -1; - } - } else { - taosArrayDestroy(pRes); - } + double el = (taosGetTimestampMs() - st) / 1000.0; + qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, resSize / 1048576.0, totalBlocks); streamFreeQitem(pInput); } + return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 682ce08c7f..e091b6864b 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -296,6 +296,7 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) { return 0; } +// todo add error log int32_t streamMetaCommit(SStreamMeta* pMeta) { if (tdbCommit(pMeta->db, pMeta->txn) < 0) { qError("failed to commit stream meta"); @@ -311,6 +312,7 @@ int32_t streamMetaCommit(SStreamMeta* pMeta) { TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } + return 0; } From 5cbad1da3ef4355b861c167cbcff0d1643b5cf95 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 May 2023 12:20:04 +0800 Subject: [PATCH 08/21] refactor: do some internal refactor. --- source/libs/stream/src/streamExec.c | 73 +++++++++++++++++++---------- 1 file changed, 47 insertions(+), 26 deletions(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 122dbc47f5..038bbbd35b 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -18,6 +18,7 @@ // maximum allowed processed block batches. One block may include several submit blocks #define MAX_STREAM_EXEC_BATCH_NUM 128 #define MIN_STREAM_EXEC_BATCH_NUM 16 +#define MAX_STREAM_RESULT_DUMP_THRESHOLD 1000 static int32_t updateCheckPointInfo (SStreamTask* pTask); static SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes); @@ -32,6 +33,39 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus) { return (status == TASK_STATUS__PAUSE); } +static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, + int32_t size, int64_t* totalSize, int32_t* totalBlocks) { + int32_t code = updateCheckPointInfo(pTask); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + int32_t numOfBlocks = taosArrayGetSize(pRes); + if (numOfBlocks > 0) { + SStreamDataBlock* pStreamBlocks = createStreamDataBlockFromResults(pItem, pTask, size, pRes); + if (pStreamBlocks == NULL) { + return -1; + } + + qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, size/1048576.0); + + code = streamTaskOutputResultBlock(pTask, pStreamBlocks); + if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position + taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); + taosFreeQitem(pStreamBlocks); + return -1; + } + } else { + taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); + } + + *totalSize += size; + *totalBlocks += numOfBlocks; + + ASSERT(taosArrayGetSize(pRes) == 0); + return TSDB_CODE_SUCCESS; +} + static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; @@ -98,38 +132,25 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i pTask->selfChildId, numOfBlocks, size / 1048576.0); // current output should be dispatched to down stream nodes - if (numOfBlocks > 1000) { - code = updateCheckPointInfo(pTask); + if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) { + ASSERT(numOfBlocks == taosArrayGetSize(pRes)); + code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); if (code != TSDB_CODE_SUCCESS) { return code; } - ASSERT(numOfBlocks == taosArrayGetSize(pRes)); - - if (numOfBlocks > 0) { - SStreamDataBlock* pStreamBlocks = createStreamDataBlockFromResults(pItem, pTask, size, pRes); - if (pStreamBlocks == NULL) { - return -1; - } - - qDebug("s-task:%s output exec stream data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, size/1048576.0); - - code = streamTaskOutputResultBlock(pTask, pStreamBlocks); - if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position - taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); - taosFreeQitem(pStreamBlocks); - return -1; - } - } else { - taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); - } + size = 0; + numOfBlocks = 0; + ASSERT(taosArrayGetSize(pRes) == 0); } + } - *totalSize += size; - *totalBlocks += numOfBlocks; - - size = 0; - numOfBlocks = 0; + if (numOfBlocks > 0) { + ASSERT(numOfBlocks == taosArrayGetSize(pRes)); + code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); + if (code != TSDB_CODE_SUCCESS) { + return code; + } ASSERT(taosArrayGetSize(pRes) == 0); } From 336102b8c52905d47d455cea4a3352f1fe47d14a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 May 2023 13:34:49 +0800 Subject: [PATCH 09/21] refactor: do some internal refactor. --- source/libs/stream/inc/streamInc.h | 3 +- source/libs/stream/src/stream.c | 14 +++------ source/libs/stream/src/streamDispatch.c | 19 ++++++------ source/libs/stream/src/streamExec.c | 41 ++++++++++++++++--------- 4 files changed, 43 insertions(+), 34 deletions(-) diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 7e1b4dcf2d..bd13d9ec37 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -33,7 +33,8 @@ typedef struct { static SStreamGlobalEnv streamEnv; -int32_t streamDispatch(SStreamTask* pTask); +void destroyStreamDataBlock(SStreamDataBlock* pBlock); +int32_t streamDispatch(SStreamTask* pTask, SStreamDataBlock** pBlock); int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 7f19529d11..80511cd491 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -200,22 +200,16 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock int32_t code = 0; if (pTask->outputType == TASK_OUTPUT__TABLE) { pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); - taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); - taosFreeQitem(pBlock); } else if (pTask->outputType == TASK_OUTPUT__SMA) { pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); - taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); - taosFreeQitem(pBlock); } else { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); code = taosWriteQitem(pTask->outputQueue->queue, pBlock); if (code != 0) { - taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); - taosFreeQitem(pBlock); return code; } - streamDispatch(pTask); + streamDispatch(pTask, NULL); } return 0; @@ -260,8 +254,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i return 0; } - // continue dispatch - streamDispatch(pTask); + SStreamDataBlock* pBlock = NULL; + // continue dispatch one block to down stream in pipeline + streamDispatch(pTask, &pBlock); + destroyStreamDataBlock(pBlock); return 0; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 7e0fbd30b1..82b3f9a2f2 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -501,7 +501,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat return code; } -int32_t streamDispatch(SStreamTask* pTask) { +int32_t streamDispatch(SStreamTask* pTask, SStreamDataBlock** pBlock) { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue); @@ -517,23 +517,24 @@ int32_t streamDispatch(SStreamTask* pTask) { return 0; } - SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); - if (pBlock == NULL) { + SStreamDataBlock* pDispatchedBlock = streamQueueNextItem(pTask->outputQueue); + if (pDispatchedBlock == NULL) { qDebug("s-task:%s stop dispatching since no output in output queue", pTask->id.idStr); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); return 0; } - ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); + ASSERT(pDispatchedBlock->type == STREAM_INPUT__DATA_BLOCK); - int32_t code = 0; - if (streamDispatchAllBlocks(pTask, pBlock) < 0) { - code = -1; + int32_t code = streamDispatchAllBlocks(pTask, *pBlock); + if (code != TSDB_CODE_SUCCESS) { streamQueueProcessFail(pTask->outputQueue); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); } - taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); - taosFreeQitem(pBlock); + if (pBlock != NULL) { + *pBlock = pDispatchedBlock; + } + return code; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 038bbbd35b..18afc367bd 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -23,7 +23,7 @@ static int32_t updateCheckPointInfo (SStreamTask* pTask); static SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes); - bool streamTaskShouldStop(const SStreamStatus* pStatus) { +bool streamTaskShouldStop(const SStreamStatus* pStatus) { int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); return (status == TASK_STATUS__STOP) || (status == TASK_STATUS__DROPPING); } @@ -33,10 +33,11 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus) { return (status == TASK_STATUS__PAUSE); } -static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, - int32_t size, int64_t* totalSize, int32_t* totalBlocks) { +static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize, + int32_t* totalBlocks) { int32_t code = updateCheckPointInfo(pTask); if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return code; } @@ -44,6 +45,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* if (numOfBlocks > 0) { SStreamDataBlock* pStreamBlocks = createStreamDataBlockFromResults(pItem, pTask, size, pRes); if (pStreamBlocks == NULL) { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return -1; } @@ -51,18 +53,19 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* code = streamTaskOutputResultBlock(pTask, pStreamBlocks); if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position - taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); - taosFreeQitem(pStreamBlocks); + destroyStreamDataBlock(pStreamBlocks); return -1; } + + *totalSize += size; + *totalBlocks += numOfBlocks; + + ASSERT(taosArrayGetSize(pRes) == 0); + destroyStreamDataBlock(pStreamBlocks); } else { - taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); } - *totalSize += size; - *totalBlocks += numOfBlocks; - - ASSERT(taosArrayGetSize(pRes) == 0); return TSDB_CODE_SUCCESS; } @@ -73,12 +76,13 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i *totalBlocks = 0; *totalSize = 0; - SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); int32_t size = 0; int32_t numOfBlocks = 0; + SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); while (1) { if (streamTaskShouldStop(&pTask->status)) { + taosArrayDestroy(pRes); // memory leak return 0; } @@ -141,7 +145,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i size = 0; numOfBlocks = 0; - ASSERT(taosArrayGetSize(pRes) == 0); } } @@ -151,8 +154,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i if (code != TSDB_CODE_SUCCESS) { return code; } - - ASSERT(taosArrayGetSize(pRes) == 0); + } else { + taosArrayDestroy(pRes); } return 0; @@ -238,7 +241,10 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt); - streamDispatch(pTask); + + SStreamDataBlock* pBlock = NULL; + streamDispatch(pTask, &pBlock); + destroyStreamDataBlock(pBlock); } if (finished) { @@ -337,6 +343,11 @@ SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStr return pStreamBlocks; } +void destroyStreamDataBlock(SStreamDataBlock* pBlock) { + taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); + taosFreeQitem(pBlock); +} + int32_t streamExecForAll(SStreamTask* pTask) { int32_t code = 0; while (1) { From 4974ac78f7bc95188d5ccf45d53c951948236a26 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 May 2023 13:42:09 +0800 Subject: [PATCH 10/21] fix: fix invalid ref. --- source/libs/stream/src/streamDispatch.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 82b3f9a2f2..62b734da4e 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -526,7 +526,7 @@ int32_t streamDispatch(SStreamTask* pTask, SStreamDataBlock** pBlock) { ASSERT(pDispatchedBlock->type == STREAM_INPUT__DATA_BLOCK); - int32_t code = streamDispatchAllBlocks(pTask, *pBlock); + int32_t code = streamDispatchAllBlocks(pTask, pDispatchedBlock); if (code != TSDB_CODE_SUCCESS) { streamQueueProcessFail(pTask->outputQueue); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); From 484788a60f1cacd700ea21e12d8b3e86c4ad8f70 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 May 2023 14:29:51 +0800 Subject: [PATCH 11/21] fix:add null ptr check. --- source/libs/stream/src/streamDispatch.c | 3 +++ source/libs/stream/src/streamExec.c | 4 ++++ 2 files changed, 7 insertions(+) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 62b734da4e..33e7b949f9 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -503,6 +503,9 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat int32_t streamDispatch(SStreamTask* pTask, SStreamDataBlock** pBlock) { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); + if (pBlock != NULL) { + *pBlock = NULL; + } int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue); if (numOfElems > 0) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 18afc367bd..eedf5fe90a 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -344,6 +344,10 @@ SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStr } void destroyStreamDataBlock(SStreamDataBlock* pBlock) { + if (pBlock == NULL) { + return; + } + taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); taosFreeQitem(pBlock); } From afc384bead58c2db5526415092c9f424c1da1f90 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 May 2023 14:44:34 +0800 Subject: [PATCH 12/21] fix:remove invalid assert. --- source/libs/stream/src/streamExec.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index eedf5fe90a..77bac6cee7 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -60,7 +60,6 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* *totalSize += size; *totalBlocks += numOfBlocks; - ASSERT(taosArrayGetSize(pRes) == 0); destroyStreamDataBlock(pStreamBlocks); } else { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); From 10692fde5e7375b63fcf106d69b9088db84e2684 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 May 2023 14:53:28 +0800 Subject: [PATCH 13/21] other: merge main. --- source/libs/stream/src/streamExec.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 77bac6cee7..a9c8ffca0c 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -77,9 +77,11 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i int32_t size = 0; int32_t numOfBlocks = 0; - SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); + SArray* pRes = NULL; while (1) { + pRes = taosArrayInit(4, sizeof(SSDataBlock)); + if (streamTaskShouldStop(&pTask->status)) { taosArrayDestroy(pRes); // memory leak return 0; From 6932c59510b070e6f053d577f06ace86d2806eb4 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 May 2023 15:09:30 +0800 Subject: [PATCH 14/21] other: merge main. --- source/libs/stream/src/streamExec.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a9c8ffca0c..b325a8a961 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -80,7 +80,9 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i SArray* pRes = NULL; while (1) { - pRes = taosArrayInit(4, sizeof(SSDataBlock)); + if (pRes == NULL) { + pRes = taosArrayInit(4, sizeof(SSDataBlock)); + } if (streamTaskShouldStop(&pTask->status)) { taosArrayDestroy(pRes); // memory leak From 84146e6aaaed7ebdfafd8be87a3e4200789cf6cc Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 May 2023 15:16:51 +0800 Subject: [PATCH 15/21] other: merge main. --- source/libs/stream/src/streamExec.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index b325a8a961..2f019529e4 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -146,6 +146,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i return code; } + pRes = NULL; size = 0; numOfBlocks = 0; } From aa7ea60bca6217d589ca1453959e6cb6f6a63b86 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 May 2023 15:43:48 +0800 Subject: [PATCH 16/21] fix(stream): update the free strategy. --- source/libs/stream/inc/streamInc.h | 2 +- source/libs/stream/src/stream.c | 8 ++++---- source/libs/stream/src/streamDispatch.c | 12 +++--------- source/libs/stream/src/streamExec.c | 7 ++----- 4 files changed, 10 insertions(+), 19 deletions(-) diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index bd13d9ec37..049aac6214 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -34,7 +34,7 @@ typedef struct { static SStreamGlobalEnv streamEnv; void destroyStreamDataBlock(SStreamDataBlock* pBlock); -int32_t streamDispatch(SStreamTask* pTask, SStreamDataBlock** pBlock); +int32_t streamDispatch(SStreamTask* pTask); int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 80511cd491..7987490bf5 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -200,8 +200,10 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock int32_t code = 0; if (pTask->outputType == TASK_OUTPUT__TABLE) { pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); + destroyStreamDataBlock(pBlock); } else if (pTask->outputType == TASK_OUTPUT__SMA) { pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); + destroyStreamDataBlock(pBlock); } else { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); code = taosWriteQitem(pTask->outputQueue->queue, pBlock); @@ -209,7 +211,7 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock return code; } - streamDispatch(pTask, NULL); + streamDispatch(pTask); } return 0; @@ -254,10 +256,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i return 0; } - SStreamDataBlock* pBlock = NULL; // continue dispatch one block to down stream in pipeline - streamDispatch(pTask, &pBlock); - destroyStreamDataBlock(pBlock); + streamDispatch(pTask); return 0; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 33e7b949f9..5723802d34 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -501,12 +501,8 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat return code; } -int32_t streamDispatch(SStreamTask* pTask, SStreamDataBlock** pBlock) { +int32_t streamDispatch(SStreamTask* pTask) { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); - if (pBlock != NULL) { - *pBlock = NULL; - } - int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue); if (numOfElems > 0) { qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr, @@ -535,9 +531,7 @@ int32_t streamDispatch(SStreamTask* pTask, SStreamDataBlock** pBlock) { atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); } - if (pBlock != NULL) { - *pBlock = pDispatchedBlock; - } - + // this block can be freed only when it has been pushed to down stream. + destroyStreamDataBlock(pDispatchedBlock); return code; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 2f019529e4..077d011900 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -60,7 +60,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* *totalSize += size; *totalBlocks += numOfBlocks; - destroyStreamDataBlock(pStreamBlocks); +// destroyStreamDataBlock(pStreamBlocks); } else { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); } @@ -245,10 +245,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt); - - SStreamDataBlock* pBlock = NULL; - streamDispatch(pTask, &pBlock); - destroyStreamDataBlock(pBlock); + streamDispatch(pTask); } if (finished) { From 3cef1e7c42b4b2629445b454649edd24479543c8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 May 2023 15:45:07 +0800 Subject: [PATCH 17/21] fix(stream): add some logs. --- source/libs/stream/src/stream.c | 2 +- source/libs/stream/src/streamExec.c | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 7987490bf5..00ce5aa40e 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -207,7 +207,7 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock } else { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); code = taosWriteQitem(pTask->outputQueue->queue, pBlock); - if (code != 0) { + if (code != 0) { // todo failed to add it into the output queue, free it. return code; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 077d011900..65643af963 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -59,8 +59,6 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* *totalSize += size; *totalBlocks += numOfBlocks; - -// destroyStreamDataBlock(pStreamBlocks); } else { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); } From de44c9160c2ef1b0c7715c4d13676ceabaa55d76 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 May 2023 18:05:39 +0800 Subject: [PATCH 18/21] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 3 +- source/dnode/snode/src/snode.c | 10 ++--- source/dnode/vnode/src/tq/tq.c | 40 ++++++++++------- source/dnode/vnode/src/vnd/vnodeSync.c | 4 +- source/libs/stream/inc/streamInc.h | 7 ++- source/libs/stream/src/stream.c | 44 +++++++++--------- source/libs/stream/src/streamData.c | 50 ++++++++++++++++++--- source/libs/stream/src/streamDispatch.c | 60 +++++++++++++------------ source/libs/stream/src/streamExec.c | 37 +-------------- source/libs/stream/src/streamRecover.c | 20 +++++---- 10 files changed, 149 insertions(+), 126 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7c7039c1c0..8e7dd0bb0d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -372,6 +372,7 @@ typedef struct { int32_t upstreamChildId; int32_t upstreamNodeId; int32_t blockNum; + int64_t totalLen; SArray* dataLen; // SArray SArray* data; // SArray } SStreamDispatchReq; @@ -527,7 +528,7 @@ void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq); int32_t streamSetupTrigger(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask); -int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); +int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 2ff338242f..ec4500aee6 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -43,7 +43,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) { .info = pMsg->info, .code = 0, }; - streamProcessDispatchReq(pTask, &req, &rsp, false); + streamProcessDispatchMsg(pTask, &req, &rsp, false); streamMetaReleaseTask(pSnode->pMeta, pTask); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); @@ -203,17 +203,13 @@ int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) { SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); if (pTask) { - SRpcMsg rsp = { - .info = pMsg->info, - .code = 0, - }; - streamProcessDispatchReq(pTask, &req, &rsp, exec); + SRpcMsg rsp = { .info = pMsg->info, .code = 0 }; + streamProcessDispatchMsg(pTask, &req, &rsp, exec); streamMetaReleaseTask(pSnode->pMeta, pTask); return 0; } else { return -1; } - return 0; } int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7dc7f999fb..c9749906d0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -853,14 +853,17 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { - char* msgStr = pMsg->pCont; - char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + char* msgStr = pMsg->pCont; + char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + SStreamTaskCheckReq req; SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecodeSStreamTaskCheckReq(&decoder, &req); tDecoderClear(&decoder); + int32_t taskId = req.downstreamTaskId; SStreamTaskCheckRsp rsp = { .reqId = req.reqId, @@ -874,18 +877,18 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); - if (pTask) { + if (pTask != NULL) { rsp.status = streamTaskCheckStatus(pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); - tqDebug("tq recv task check req(reqId:0x%" PRIx64 + tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d", - rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus, rsp.upstreamTaskId, - rsp.upstreamNodeId, rsp.status); + pTask->id.idStr, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus, + rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { rsp.status = 0; - tqDebug("tq recv task check(taskId:%d not built yet) req(reqId:0x%" PRIx64 - ") %d at node %d, check req from task %d at node %d, rsp status %d", + tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 + ") %d at node %d, check req from task:0x%x at node %d, rsp status %d", taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } @@ -893,9 +896,10 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { SEncoder encoder; int32_t code; int32_t len; + tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code); if (code < 0) { - tqError("unable to encode rsp %d", __LINE__); + tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId); return -1; } @@ -908,6 +912,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { tEncoderClear(&encoder); SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info}; + tmsgSendRsp(&rspMsg); return 0; } @@ -919,17 +924,20 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32 SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, msgLen); code = tDecodeSStreamTaskCheckRsp(&decoder, &rsp); + if (code < 0) { tDecoderClear(&decoder); return -1; } tDecoderClear(&decoder); - tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d", + tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task:0x%x at node %d, status %d", rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId); if (pTask == NULL) { + tqError("tq failed to locate the stream task:0x%x vgId:%d, it may have been destroyed", rsp.upstreamTaskId, + pTq->pStreamMeta->vgId); return -1; } @@ -1230,15 +1238,17 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { char* msgStr = pMsg->pCont; char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - SStreamDispatchReq req; - SDecoder decoder; + + SStreamDispatchReq req = {0}; + + SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecodeStreamDispatchReq(&decoder, &req); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); if (pTask) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - streamProcessDispatchReq(pTask, &req, &rsp, exec); + streamProcessDispatchMsg(pTask, &req, &rsp, exec); streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } else { @@ -1357,7 +1367,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - streamProcessDispatchReq(pTask, &req, &rsp, false); + streamProcessDispatchMsg(pTask, &req, &rsp, false); streamMetaReleaseTask(pTq->pStreamMeta, pTask); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 4ea5e3c6ec..29f1ddc50f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -551,9 +551,9 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) // start to restore all stream tasks if (tsDisableStream) { - vInfo("vgId:%d, not restore stream tasks, since disabled", pVnode->config.vgId); + vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", pVnode->config.vgId); } else { - vInfo("vgId:%d start to restore stream tasks", pVnode->config.vgId); + vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId); tqStartStreamTasks(pVnode->pTq); } } diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 049aac6214..2c1956998a 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -33,9 +33,12 @@ typedef struct { static SStreamGlobalEnv streamEnv; +int32_t streamDispatchStreamBlock(SStreamTask* pTask); + +SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg); +SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes); void destroyStreamDataBlock(SStreamDataBlock* pBlock); -int32_t streamDispatch(SStreamTask* pTask); -int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); + int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 00ce5aa40e..e33e7845bb 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -120,19 +120,16 @@ int32_t streamSchedExec(SStreamTask* pTask) { } int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { - SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); - int8_t status = 0; - if (pData == NULL) { + + SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->dataSrcVgId); + if (pBlock == NULL) { streamTaskInputFail(pTask); status = TASK_INPUT_STATUS__FAILED; - qDebug("vgId:%d, s-task:%s failed to received dispatch msg, reason: out of memory", pTask->pMeta->vgId, pTask->id.idStr); + qDebug("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId, + pTask->id.idStr); } else { - pData->type = STREAM_INPUT__DATA_BLOCK; - pData->srcVgId = pReq->dataSrcVgId; - - streamConvertDispatchMsgToData(pReq, pData); - if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) { + if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock) == 0) { status = TASK_INPUT_STATUS__NORMAL; } else { // input queue is full, upstream is blocked now status = TASK_INPUT_STATUS__BLOCKED; @@ -142,15 +139,16 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR // rsp by input status void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId); - SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead)); - pCont->inputStatus = status; - pCont->streamId = htobe64(pReq->streamId); - pCont->upstreamNodeId = htonl(pReq->upstreamNodeId); - pCont->upstreamTaskId = htonl(pReq->upstreamTaskId); - pCont->downstreamNodeId = htonl(pTask->nodeId); - pCont->downstreamTaskId = htonl(pTask->id.taskId); - pRsp->pCont = buf; + SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(buf, sizeof(SMsgHead)); + pDispatchRsp->inputStatus = status; + pDispatchRsp->streamId = htobe64(pReq->streamId); + pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId); + pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId); + pDispatchRsp->downstreamNodeId = htonl(pTask->nodeId); + pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId); + + pRsp->pCont = buf; pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); tmsgSendRsp(pRsp); @@ -211,15 +209,15 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock return code; } - streamDispatch(pTask); + streamDispatchStreamBlock(pTask); } return 0; } -int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { - qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId, - pReq->upstreamNodeId); +int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { + qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, + pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); // todo add the input queue buffer limitation streamTaskEnqueueBlocks(pTask, pReq, pRsp); @@ -257,7 +255,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } // continue dispatch one block to down stream in pipeline - streamDispatch(pTask); + streamDispatchStreamBlock(pTask); return 0; } @@ -267,7 +265,7 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { } /*if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/ - /*streamDispatch(pTask);*/ + /*streamDispatchStreamBlock(pTask);*/ /*}*/ return 0; } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index ca24414db8..7c06e7deb3 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -15,20 +15,28 @@ #include "streamInc.h" -int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) { +SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) { + SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen); + if (pData == NULL) { + return NULL; + } + + pData->type = blockType; + pData->srcVgId = srcVg; + int32_t blockNum = pReq->blockNum; SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum); if (pArray == NULL) { - return -1; + return NULL; } - ASSERT(pReq->blockNum == taosArrayGetSize(pReq->data)); - ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen)); + ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen))); for (int32_t i = 0; i < blockNum; i++) { SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i); SSDataBlock* pDataBlock = taosArrayGet(pArray, i); blockDecode(pDataBlock, pRetrieve->data); + // TODO: refactor pDataBlock->info.window.skey = be64toh(pRetrieve->skey); pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey); @@ -41,7 +49,39 @@ int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDa } pData->blocks = pArray; - return 0; + return pData; +} + +SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) { + SStreamDataBlock* pStreamBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize); + if (pStreamBlocks == NULL) { + taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); + return NULL; + } + + pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK; + pStreamBlocks->blocks = pRes; + + if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { + SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem; + pStreamBlocks->childId = pTask->selfChildId; + pStreamBlocks->sourceVer = pSubmit->ver; + } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem; + pStreamBlocks->childId = pTask->selfChildId; + pStreamBlocks->sourceVer = pMerged->ver; + } + + return pStreamBlocks; +} + +void destroyStreamDataBlock(SStreamDataBlock* pBlock) { + if (pBlock == NULL) { + return; + } + + taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); + taosFreeQitem(pBlock); } int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 5723802d34..1473345730 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -24,6 +24,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1; ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum); ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum); for (int32_t i = 0; i < pReq->blockNum; i++) { @@ -45,6 +46,8 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1; + ASSERT(pReq->blockNum > 0); pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*)); pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t)); @@ -178,7 +181,7 @@ CLEAR: return code; } -static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { +static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); void* buf = taosMemoryCalloc(1, dataStrLen); if (buf == NULL) return -1; @@ -205,6 +208,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis taosArrayPush(pReq->dataLen, &actualLen); taosArrayPush(pReq->data, &buf); + pReq->totalLen += dataStrLen; return 0; } @@ -291,7 +295,7 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov return 0; } -int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { +int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { void* buf = NULL; int32_t code = -1; SRpcMsg msg = {0}; @@ -325,6 +329,7 @@ int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* p code = 0; return 0; + FAIL: if (buf) rpcFreeCont(buf); return code; @@ -360,7 +365,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); ASSERT(pVgInfo->vgId > 0); if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { - if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) { + if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { return -1; } if (pReqs[j].blockNum == 0) { @@ -376,9 +381,9 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S } int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { - int32_t code = -1; - int32_t blockNum = taosArrayGetSize(pData->blocks); - ASSERT(blockNum != 0); + int32_t code = 0; + int32_t numOfBlocks = taosArrayGetSize(pData->blocks); + ASSERT(numOfBlocks != 0); if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { SStreamDispatchReq req = { @@ -387,19 +392,23 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat .upstreamTaskId = pTask->id.taskId, .upstreamChildId = pTask->selfChildId, .upstreamNodeId = pTask->nodeId, - .blockNum = blockNum, + .blockNum = numOfBlocks, }; - req.data = taosArrayInit(blockNum, sizeof(void*)); - req.dataLen = taosArrayInit(blockNum, sizeof(int32_t)); + req.data = taosArrayInit(numOfBlocks, sizeof(void*)); + req.dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t)); if (req.data == NULL || req.dataLen == NULL) { - goto FAIL_FIXED_DISPATCH; + taosArrayDestroyP(req.data, taosMemoryFree); + taosArrayDestroy(req.dataLen); + return code; } - for (int32_t i = 0; i < blockNum; i++) { + for (int32_t i = 0; i < numOfBlocks; i++) { SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); - if (streamAddBlockToDispatchMsg(pDataBlock, &req) < 0) { - goto FAIL_FIXED_DISPATCH; + if (streamAddBlockIntoDispatchMsg(pDataBlock, &req) < 0) { + taosArrayDestroyP(req.data, taosMemoryFree); + taosArrayDestroy(req.dataLen); + return code; } } @@ -410,19 +419,14 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat req.taskId = downstreamTaskId; qDebug("s-task:%s (child taskId:%d) fix-dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr, - pTask->selfChildId, blockNum, downstreamTaskId, vgId); + pTask->selfChildId, numOfBlocks, downstreamTaskId, vgId); - if (streamDispatchOneDataReq(pTask, &req, vgId, pEpSet) < 0) { - goto FAIL_FIXED_DISPATCH; + if (doSendDispatchMsg(pTask, &req, vgId, pEpSet) < 0) { + taosArrayDestroyP(req.data, taosMemoryFree); + taosArrayDestroy(req.dataLen); + return code; } - code = 0; - - FAIL_FIXED_DISPATCH: - taosArrayDestroyP(req.data, taosMemoryFree); - taosArrayDestroy(req.dataLen); - return code; - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt); ASSERT(rspCnt == 0); @@ -452,13 +456,13 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat pReqs[i].taskId = pVgInfo->taskId; } - for (int32_t i = 0; i < blockNum; i++) { + for (int32_t i = 0; i < numOfBlocks; i++) { SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); // TODO: do not use broadcast if (pDataBlock->info.type == STREAM_DELETE_RESULT) { for (int32_t j = 0; j < vgSz; j++) { - if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) { + if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { goto FAIL_SHUFFLE_DISPATCH; } if (pReqs[j].blockNum == 0) { @@ -475,7 +479,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat } qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->selfChildId, - blockNum, vgSz); + numOfBlocks, vgSz); for (int32_t i = 0; i < vgSz; i++) { if (pReqs[i].blockNum > 0) { @@ -483,7 +487,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->selfChildId, pReqs[i].blockNum, pVgInfo->vgId); - if (streamDispatchOneDataReq(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) { + if (doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) { goto FAIL_SHUFFLE_DISPATCH; } } @@ -501,7 +505,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat return code; } -int32_t streamDispatch(SStreamTask* pTask) { +int32_t streamDispatchStreamBlock(SStreamTask* pTask) { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue); if (numOfElems > 0) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 65643af963..c7420b0bef 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -21,7 +21,6 @@ #define MAX_STREAM_RESULT_DUMP_THRESHOLD 1000 static int32_t updateCheckPointInfo (SStreamTask* pTask); -static SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes); bool streamTaskShouldStop(const SStreamStatus* pStatus) { int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); @@ -43,7 +42,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* int32_t numOfBlocks = taosArrayGetSize(pRes); if (numOfBlocks > 0) { - SStreamDataBlock* pStreamBlocks = createStreamDataBlockFromResults(pItem, pTask, size, pRes); + SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes); if (pStreamBlocks == NULL) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return -1; @@ -243,7 +242,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt); - streamDispatch(pTask); + streamDispatchStreamBlock(pTask); } if (finished) { @@ -319,38 +318,6 @@ int32_t updateCheckPointInfo (SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) { - SStreamDataBlock* pStreamBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize); - if (pStreamBlocks == NULL) { - taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); - return NULL; - } - - pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK; - pStreamBlocks->blocks = pRes; - - if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem; - pStreamBlocks->childId = pTask->selfChildId; - pStreamBlocks->sourceVer = pSubmit->ver; - } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { - SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem; - pStreamBlocks->childId = pTask->selfChildId; - pStreamBlocks->sourceVer = pMerged->ver; - } - - return pStreamBlocks; -} - -void destroyStreamDataBlock(SStreamDataBlock* pBlock) { - if (pBlock == NULL) { - return; - } - - taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); - taosFreeQitem(pBlock); -} - int32_t streamExecForAll(SStreamTask* pTask) { int32_t code = 0; while (1) { diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 7236c6c4b9..eb2535782e 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -55,7 +55,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { // checkstatus int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { - qDebug("s-taks:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, version); + qDebug("s-task:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, version); SStreamTaskCheckReq req = { .streamId = pTask->id.streamId, @@ -72,7 +72,7 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; pTask->checkReqId = req.reqId; - qDebug("s-task:%s at node %d check downstream task:%d at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, + qDebug("s-task:%s at node %d check downstream task:0x%x at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -88,7 +88,7 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { taosArrayPush(pTask->checkReqIds, &req.reqId); req.downstreamNodeId = pVgInfo->vgId; req.downstreamTaskId = pVgInfo->taskId; - qDebug("s-task:%s at node %d check downstream task:%d at node %d (shuffle)", pTask->id.idStr, pTask->nodeId, + qDebug("s-task:%s at node %d check downstream task:0x%x at node %d (shuffle)", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } @@ -111,15 +111,16 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp .childId = pRsp->childId, }; - qDebug("s-task:%s at node %d check downstream task %d at node %d (recheck)", pTask->id.idStr, pTask->nodeId, + qDebug("s-task:%s at node %d check downstream task:0x%x at node %d (recheck)", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t vgSz = taosArrayGetSize(vgInfo); - for (int32_t i = 0; i < vgSz; i++) { + + int32_t numOfVgs = taosArrayGetSize(vgInfo); + for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); if (pVgInfo->taskId == req.downstreamTaskId) { streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet); @@ -135,7 +136,9 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask) { } int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) { - qDebug("task %d at node %d recv check rsp from task %d at node %d: status %d", pRsp->upstreamTaskId, + ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); + + qDebug("s-task:%s at node %d recv check rsp from task:0x%x at node %d: status %d", pTask->id.idStr, pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status); if (pRsp->status == 1) { @@ -175,9 +178,10 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* ASSERT(0); } } else { // not ready, wait for 100ms and retry - qDebug("s-task:%s downstream taskId:%d (vgId:%d) not ready, wait for 100ms and retry", pTask->id.idStr, + qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", pTask->id.idStr, pRsp->downstreamTaskId, pRsp->downstreamNodeId); taosMsleep(100); + streamRecheckOneDownstream(pTask, pRsp); } From 899e4d3350610d456bf3ae6c60b15272f6e4f989 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 May 2023 18:13:58 +0800 Subject: [PATCH 19/21] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 8 +++++--- source/libs/stream/src/streamMeta.c | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c9749906d0..74361ec319 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -957,6 +957,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms // 1.deserialize msg and build task SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, (int32_t) sizeof(SStreamTask)); return -1; } @@ -974,9 +976,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms // 2.save task, use the newest commit version as the initial start version of stream task. taosWLockLatch(&pTq->pStreamMeta->lock); code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask); + int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta); if (code < 0) { - tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, - streamMetaGetNumOfTasks(pTq->pStreamMeta)); + tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks); taosWUnLockLatch(&pTq->pStreamMeta->lock); return -1; } @@ -989,7 +991,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms } tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr, - pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta)); + pTask->status.taskStatus, numOfTasks); return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index e091b6864b..33375fe921 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -375,7 +375,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { } if (pTask->fillHistory) { - pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; + ASSERT(pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM); streamTaskCheckDownstream(pTask, ver); } } From cb75e5a863dd10b75aff6d645db8f58b86a26e36 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 May 2023 22:07:04 +0800 Subject: [PATCH 20/21] fix(stream): fix memory leak. --- source/libs/stream/src/streamDispatch.c | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 1473345730..722013115f 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -421,12 +421,10 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat qDebug("s-task:%s (child taskId:%d) fix-dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr, pTask->selfChildId, numOfBlocks, downstreamTaskId, vgId); - if (doSendDispatchMsg(pTask, &req, vgId, pEpSet) < 0) { - taosArrayDestroyP(req.data, taosMemoryFree); - taosArrayDestroy(req.dataLen); - return code; - } - + code = doSendDispatchMsg(pTask, &req, vgId, pEpSet); + taosArrayDestroyP(req.data, taosMemoryFree); + taosArrayDestroy(req.dataLen); + return code; } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt); ASSERT(rspCnt == 0); From dcb91082f9aa1c047b36909412ae980ca3e05b4e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 May 2023 23:37:40 +0800 Subject: [PATCH 21/21] fix(stream): fix error. --- source/dnode/snode/src/snode.c | 6 ++---- source/libs/stream/src/stream.c | 4 ++-- source/libs/stream/src/streamDispatch.c | 20 ++++++++------------ source/libs/stream/src/streamExec.c | 7 ++----- 4 files changed, 14 insertions(+), 23 deletions(-) diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index ec4500aee6..e9225f3d6e 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -223,11 +223,9 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { tDecoderClear(&decoder); int32_t taskId = req.dstTaskId; SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); + if (pTask) { - SRpcMsg rsp = { - .info = pMsg->info, - .code = 0, - }; + SRpcMsg rsp = { .info = pMsg->info, .code = 0}; streamProcessRetrieveReq(pTask, &req, &rsp); streamMetaReleaseTask(pSnode->pMeta, pTask); tDeleteStreamRetrieveReq(&req); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index e33e7845bb..8cc1ef1dd3 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -161,7 +161,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, // enqueue if (pData != NULL) { - qDebug("task %d(child %d) recv retrieve req from task %d, reqId %" PRId64, pTask->id.taskId, pTask->selfChildId, + qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x, reqId %" PRId64, pTask->id.idStr, pTask->selfChildId, pReq->srcTaskId, pReq->reqId); pData->type = STREAM_INPUT__DATA_RETRIEVE; @@ -271,7 +271,7 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { } int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { - qDebug("s-task:%s receive retrieve req from node %d taskId:%d", pTask->id.idStr, pReq->srcNodeId, pReq->srcTaskId); + qDebug("s-task:%s receive retrieve req from node %d taskId:0x%x", pTask->id.idStr, pReq->srcNodeId, pReq->srcTaskId); streamTaskEnqueueRetrieve(pTask, pReq, pRsp); ASSERT(pTask->taskLevel != TASK_LEVEL__SINK); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 722013115f..401a8b9e74 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -138,7 +138,6 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i); req.dstNodeId = pEpInfo->nodeId; req.dstTaskId = pEpInfo->taskId; - int32_t code; int32_t len; tEncodeSize(tEncodeStreamRetrieveReq, &req, len, code); if (code < 0) { @@ -158,23 +157,18 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) tEncodeStreamRetrieveReq(&encoder, &req); tEncoderClear(&encoder); - SRpcMsg rpcMsg = { - .code = 0, - .msgType = TDMT_STREAM_RETRIEVE, - .pCont = buf, - .contLen = sizeof(SMsgHead) + len, - }; - + SRpcMsg rpcMsg = { .code = 0, .msgType = TDMT_STREAM_RETRIEVE, .pCont = buf, .contLen = sizeof(SMsgHead) + len }; if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) { ASSERT(0); goto CLEAR; } - buf = NULL; - qDebug("s-task:%s (child %d) send retrieve req to task %d at node %d, reqId %" PRId64, pTask->id.idStr, + buf = NULL; + qDebug("s-task:%s (child %d) send retrieve req to task %d at node %d, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId); } code = 0; + CLEAR: taosMemoryFree(pRetrieve); rpcFreeCont(buf); @@ -400,12 +394,14 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat if (req.data == NULL || req.dataLen == NULL) { taosArrayDestroyP(req.data, taosMemoryFree); taosArrayDestroy(req.dataLen); - return code; + return TSDB_CODE_OUT_OF_MEMORY; } for (int32_t i = 0; i < numOfBlocks; i++) { SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); - if (streamAddBlockIntoDispatchMsg(pDataBlock, &req) < 0) { + code = streamAddBlockIntoDispatchMsg(pDataBlock, &req); + + if (code != TSDB_CODE_SUCCESS) { taosArrayDestroyP(req.data, taosMemoryFree); taosArrayDestroy(req.dataLen); return code; diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index c7420b0bef..55474541ed 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -108,7 +108,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i block.info.type = STREAM_PULL_OVER; block.info.childId = pTask->selfChildId; taosArrayPush(pRes, &block); - + numOfBlocks += 1; qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId, pRetrieveBlock->reqId); } @@ -152,14 +152,11 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i if (numOfBlocks > 0) { ASSERT(numOfBlocks == taosArrayGetSize(pRes)); code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); - if (code != TSDB_CODE_SUCCESS) { - return code; - } } else { taosArrayDestroy(pRes); } - return 0; + return code; } int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {