From de44c9160c2ef1b0c7715c4d13676ceabaa55d76 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 20 May 2023 18:05:39 +0800 Subject: [PATCH] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 3 +- source/dnode/snode/src/snode.c | 10 ++--- source/dnode/vnode/src/tq/tq.c | 40 ++++++++++------- source/dnode/vnode/src/vnd/vnodeSync.c | 4 +- source/libs/stream/inc/streamInc.h | 7 ++- source/libs/stream/src/stream.c | 44 +++++++++--------- source/libs/stream/src/streamData.c | 50 ++++++++++++++++++--- source/libs/stream/src/streamDispatch.c | 60 +++++++++++++------------ source/libs/stream/src/streamExec.c | 37 +-------------- source/libs/stream/src/streamRecover.c | 20 +++++---- 10 files changed, 149 insertions(+), 126 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7c7039c1c0..8e7dd0bb0d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -372,6 +372,7 @@ typedef struct { int32_t upstreamChildId; int32_t upstreamNodeId; int32_t blockNum; + int64_t totalLen; SArray* dataLen; // SArray SArray* data; // SArray } SStreamDispatchReq; @@ -527,7 +528,7 @@ void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq); int32_t streamSetupTrigger(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask); -int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); +int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 2ff338242f..ec4500aee6 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -43,7 +43,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) { .info = pMsg->info, .code = 0, }; - streamProcessDispatchReq(pTask, &req, &rsp, false); + streamProcessDispatchMsg(pTask, &req, &rsp, false); streamMetaReleaseTask(pSnode->pMeta, pTask); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); @@ -203,17 +203,13 @@ int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) { SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); if (pTask) { - SRpcMsg rsp = { - .info = pMsg->info, - .code = 0, - }; - streamProcessDispatchReq(pTask, &req, &rsp, exec); + SRpcMsg rsp = { .info = pMsg->info, .code = 0 }; + streamProcessDispatchMsg(pTask, &req, &rsp, exec); streamMetaReleaseTask(pSnode->pMeta, pTask); return 0; } else { return -1; } - return 0; } int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7dc7f999fb..c9749906d0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -853,14 +853,17 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { - char* msgStr = pMsg->pCont; - char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + char* msgStr = pMsg->pCont; + char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + SStreamTaskCheckReq req; SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecodeSStreamTaskCheckReq(&decoder, &req); tDecoderClear(&decoder); + int32_t taskId = req.downstreamTaskId; SStreamTaskCheckRsp rsp = { .reqId = req.reqId, @@ -874,18 +877,18 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); - if (pTask) { + if (pTask != NULL) { rsp.status = streamTaskCheckStatus(pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); - tqDebug("tq recv task check req(reqId:0x%" PRIx64 + tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d", - rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus, rsp.upstreamTaskId, - rsp.upstreamNodeId, rsp.status); + pTask->id.idStr, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus, + rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { rsp.status = 0; - tqDebug("tq recv task check(taskId:%d not built yet) req(reqId:0x%" PRIx64 - ") %d at node %d, check req from task %d at node %d, rsp status %d", + tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 + ") %d at node %d, check req from task:0x%x at node %d, rsp status %d", taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } @@ -893,9 +896,10 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { SEncoder encoder; int32_t code; int32_t len; + tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code); if (code < 0) { - tqError("unable to encode rsp %d", __LINE__); + tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId); return -1; } @@ -908,6 +912,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { tEncoderClear(&encoder); SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info}; + tmsgSendRsp(&rspMsg); return 0; } @@ -919,17 +924,20 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32 SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, msgLen); code = tDecodeSStreamTaskCheckRsp(&decoder, &rsp); + if (code < 0) { tDecoderClear(&decoder); return -1; } tDecoderClear(&decoder); - tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d", + tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task:0x%x at node %d, status %d", rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId); if (pTask == NULL) { + tqError("tq failed to locate the stream task:0x%x vgId:%d, it may have been destroyed", rsp.upstreamTaskId, + pTq->pStreamMeta->vgId); return -1; } @@ -1230,15 +1238,17 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { char* msgStr = pMsg->pCont; char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - SStreamDispatchReq req; - SDecoder decoder; + + SStreamDispatchReq req = {0}; + + SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecodeStreamDispatchReq(&decoder, &req); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); if (pTask) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - streamProcessDispatchReq(pTask, &req, &rsp, exec); + streamProcessDispatchMsg(pTask, &req, &rsp, exec); streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } else { @@ -1357,7 +1367,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - streamProcessDispatchReq(pTask, &req, &rsp, false); + streamProcessDispatchMsg(pTask, &req, &rsp, false); streamMetaReleaseTask(pTq->pStreamMeta, pTask); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 4ea5e3c6ec..29f1ddc50f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -551,9 +551,9 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) // start to restore all stream tasks if (tsDisableStream) { - vInfo("vgId:%d, not restore stream tasks, since disabled", pVnode->config.vgId); + vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", pVnode->config.vgId); } else { - vInfo("vgId:%d start to restore stream tasks", pVnode->config.vgId); + vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId); tqStartStreamTasks(pVnode->pTq); } } diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 049aac6214..2c1956998a 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -33,9 +33,12 @@ typedef struct { static SStreamGlobalEnv streamEnv; +int32_t streamDispatchStreamBlock(SStreamTask* pTask); + +SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg); +SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes); void destroyStreamDataBlock(SStreamDataBlock* pBlock); -int32_t streamDispatch(SStreamTask* pTask); -int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); + int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 00ce5aa40e..e33e7845bb 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -120,19 +120,16 @@ int32_t streamSchedExec(SStreamTask* pTask) { } int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { - SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); - int8_t status = 0; - if (pData == NULL) { + + SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->dataSrcVgId); + if (pBlock == NULL) { streamTaskInputFail(pTask); status = TASK_INPUT_STATUS__FAILED; - qDebug("vgId:%d, s-task:%s failed to received dispatch msg, reason: out of memory", pTask->pMeta->vgId, pTask->id.idStr); + qDebug("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId, + pTask->id.idStr); } else { - pData->type = STREAM_INPUT__DATA_BLOCK; - pData->srcVgId = pReq->dataSrcVgId; - - streamConvertDispatchMsgToData(pReq, pData); - if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) { + if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock) == 0) { status = TASK_INPUT_STATUS__NORMAL; } else { // input queue is full, upstream is blocked now status = TASK_INPUT_STATUS__BLOCKED; @@ -142,15 +139,16 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR // rsp by input status void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId); - SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead)); - pCont->inputStatus = status; - pCont->streamId = htobe64(pReq->streamId); - pCont->upstreamNodeId = htonl(pReq->upstreamNodeId); - pCont->upstreamTaskId = htonl(pReq->upstreamTaskId); - pCont->downstreamNodeId = htonl(pTask->nodeId); - pCont->downstreamTaskId = htonl(pTask->id.taskId); - pRsp->pCont = buf; + SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(buf, sizeof(SMsgHead)); + pDispatchRsp->inputStatus = status; + pDispatchRsp->streamId = htobe64(pReq->streamId); + pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId); + pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId); + pDispatchRsp->downstreamNodeId = htonl(pTask->nodeId); + pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId); + + pRsp->pCont = buf; pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); tmsgSendRsp(pRsp); @@ -211,15 +209,15 @@ int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock return code; } - streamDispatch(pTask); + streamDispatchStreamBlock(pTask); } return 0; } -int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { - qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId, - pReq->upstreamNodeId); +int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { + qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, + pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); // todo add the input queue buffer limitation streamTaskEnqueueBlocks(pTask, pReq, pRsp); @@ -257,7 +255,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i } // continue dispatch one block to down stream in pipeline - streamDispatch(pTask); + streamDispatchStreamBlock(pTask); return 0; } @@ -267,7 +265,7 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { } /*if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/ - /*streamDispatch(pTask);*/ + /*streamDispatchStreamBlock(pTask);*/ /*}*/ return 0; } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index ca24414db8..7c06e7deb3 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -15,20 +15,28 @@ #include "streamInc.h" -int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) { +SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) { + SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen); + if (pData == NULL) { + return NULL; + } + + pData->type = blockType; + pData->srcVgId = srcVg; + int32_t blockNum = pReq->blockNum; SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum); if (pArray == NULL) { - return -1; + return NULL; } - ASSERT(pReq->blockNum == taosArrayGetSize(pReq->data)); - ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen)); + ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen))); for (int32_t i = 0; i < blockNum; i++) { SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i); SSDataBlock* pDataBlock = taosArrayGet(pArray, i); blockDecode(pDataBlock, pRetrieve->data); + // TODO: refactor pDataBlock->info.window.skey = be64toh(pRetrieve->skey); pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey); @@ -41,7 +49,39 @@ int32_t streamConvertDispatchMsgToData(const SStreamDispatchReq* pReq, SStreamDa } pData->blocks = pArray; - return 0; + return pData; +} + +SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) { + SStreamDataBlock* pStreamBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize); + if (pStreamBlocks == NULL) { + taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); + return NULL; + } + + pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK; + pStreamBlocks->blocks = pRes; + + if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { + SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem; + pStreamBlocks->childId = pTask->selfChildId; + pStreamBlocks->sourceVer = pSubmit->ver; + } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem; + pStreamBlocks->childId = pTask->selfChildId; + pStreamBlocks->sourceVer = pMerged->ver; + } + + return pStreamBlocks; +} + +void destroyStreamDataBlock(SStreamDataBlock* pBlock) { + if (pBlock == NULL) { + return; + } + + taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); + taosFreeQitem(pBlock); } int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 5723802d34..1473345730 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -24,6 +24,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1; ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum); ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum); for (int32_t i = 0; i < pReq->blockNum; i++) { @@ -45,6 +46,8 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1; + ASSERT(pReq->blockNum > 0); pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*)); pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t)); @@ -178,7 +181,7 @@ CLEAR: return code; } -static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { +static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); void* buf = taosMemoryCalloc(1, dataStrLen); if (buf == NULL) return -1; @@ -205,6 +208,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis taosArrayPush(pReq->dataLen, &actualLen); taosArrayPush(pReq->data, &buf); + pReq->totalLen += dataStrLen; return 0; } @@ -291,7 +295,7 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov return 0; } -int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { +int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { void* buf = NULL; int32_t code = -1; SRpcMsg msg = {0}; @@ -325,6 +329,7 @@ int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* p code = 0; return 0; + FAIL: if (buf) rpcFreeCont(buf); return code; @@ -360,7 +365,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); ASSERT(pVgInfo->vgId > 0); if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { - if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) { + if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { return -1; } if (pReqs[j].blockNum == 0) { @@ -376,9 +381,9 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S } int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { - int32_t code = -1; - int32_t blockNum = taosArrayGetSize(pData->blocks); - ASSERT(blockNum != 0); + int32_t code = 0; + int32_t numOfBlocks = taosArrayGetSize(pData->blocks); + ASSERT(numOfBlocks != 0); if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { SStreamDispatchReq req = { @@ -387,19 +392,23 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat .upstreamTaskId = pTask->id.taskId, .upstreamChildId = pTask->selfChildId, .upstreamNodeId = pTask->nodeId, - .blockNum = blockNum, + .blockNum = numOfBlocks, }; - req.data = taosArrayInit(blockNum, sizeof(void*)); - req.dataLen = taosArrayInit(blockNum, sizeof(int32_t)); + req.data = taosArrayInit(numOfBlocks, sizeof(void*)); + req.dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t)); if (req.data == NULL || req.dataLen == NULL) { - goto FAIL_FIXED_DISPATCH; + taosArrayDestroyP(req.data, taosMemoryFree); + taosArrayDestroy(req.dataLen); + return code; } - for (int32_t i = 0; i < blockNum; i++) { + for (int32_t i = 0; i < numOfBlocks; i++) { SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); - if (streamAddBlockToDispatchMsg(pDataBlock, &req) < 0) { - goto FAIL_FIXED_DISPATCH; + if (streamAddBlockIntoDispatchMsg(pDataBlock, &req) < 0) { + taosArrayDestroyP(req.data, taosMemoryFree); + taosArrayDestroy(req.dataLen); + return code; } } @@ -410,19 +419,14 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat req.taskId = downstreamTaskId; qDebug("s-task:%s (child taskId:%d) fix-dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr, - pTask->selfChildId, blockNum, downstreamTaskId, vgId); + pTask->selfChildId, numOfBlocks, downstreamTaskId, vgId); - if (streamDispatchOneDataReq(pTask, &req, vgId, pEpSet) < 0) { - goto FAIL_FIXED_DISPATCH; + if (doSendDispatchMsg(pTask, &req, vgId, pEpSet) < 0) { + taosArrayDestroyP(req.data, taosMemoryFree); + taosArrayDestroy(req.dataLen); + return code; } - code = 0; - - FAIL_FIXED_DISPATCH: - taosArrayDestroyP(req.data, taosMemoryFree); - taosArrayDestroy(req.dataLen); - return code; - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt); ASSERT(rspCnt == 0); @@ -452,13 +456,13 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat pReqs[i].taskId = pVgInfo->taskId; } - for (int32_t i = 0; i < blockNum; i++) { + for (int32_t i = 0; i < numOfBlocks; i++) { SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); // TODO: do not use broadcast if (pDataBlock->info.type == STREAM_DELETE_RESULT) { for (int32_t j = 0; j < vgSz; j++) { - if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) { + if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { goto FAIL_SHUFFLE_DISPATCH; } if (pReqs[j].blockNum == 0) { @@ -475,7 +479,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat } qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->selfChildId, - blockNum, vgSz); + numOfBlocks, vgSz); for (int32_t i = 0; i < vgSz; i++) { if (pReqs[i].blockNum > 0) { @@ -483,7 +487,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->selfChildId, pReqs[i].blockNum, pVgInfo->vgId); - if (streamDispatchOneDataReq(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) { + if (doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) { goto FAIL_SHUFFLE_DISPATCH; } } @@ -501,7 +505,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat return code; } -int32_t streamDispatch(SStreamTask* pTask) { +int32_t streamDispatchStreamBlock(SStreamTask* pTask) { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue); if (numOfElems > 0) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 65643af963..c7420b0bef 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -21,7 +21,6 @@ #define MAX_STREAM_RESULT_DUMP_THRESHOLD 1000 static int32_t updateCheckPointInfo (SStreamTask* pTask); -static SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes); bool streamTaskShouldStop(const SStreamStatus* pStatus) { int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); @@ -43,7 +42,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* int32_t numOfBlocks = taosArrayGetSize(pRes); if (numOfBlocks > 0) { - SStreamDataBlock* pStreamBlocks = createStreamDataBlockFromResults(pItem, pTask, size, pRes); + SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes); if (pStreamBlocks == NULL) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); return -1; @@ -243,7 +242,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt); - streamDispatch(pTask); + streamDispatchStreamBlock(pTask); } if (finished) { @@ -319,38 +318,6 @@ int32_t updateCheckPointInfo (SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } -SStreamDataBlock* createStreamDataBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) { - SStreamDataBlock* pStreamBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize); - if (pStreamBlocks == NULL) { - taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); - return NULL; - } - - pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK; - pStreamBlocks->blocks = pRes; - - if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem; - pStreamBlocks->childId = pTask->selfChildId; - pStreamBlocks->sourceVer = pSubmit->ver; - } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { - SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem; - pStreamBlocks->childId = pTask->selfChildId; - pStreamBlocks->sourceVer = pMerged->ver; - } - - return pStreamBlocks; -} - -void destroyStreamDataBlock(SStreamDataBlock* pBlock) { - if (pBlock == NULL) { - return; - } - - taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); - taosFreeQitem(pBlock); -} - int32_t streamExecForAll(SStreamTask* pTask) { int32_t code = 0; while (1) { diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 7236c6c4b9..eb2535782e 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -55,7 +55,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { // checkstatus int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { - qDebug("s-taks:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, version); + qDebug("s-task:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, version); SStreamTaskCheckReq req = { .streamId = pTask->id.streamId, @@ -72,7 +72,7 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; pTask->checkReqId = req.reqId; - qDebug("s-task:%s at node %d check downstream task:%d at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, + qDebug("s-task:%s at node %d check downstream task:0x%x at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -88,7 +88,7 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { taosArrayPush(pTask->checkReqIds, &req.reqId); req.downstreamNodeId = pVgInfo->vgId; req.downstreamTaskId = pVgInfo->taskId; - qDebug("s-task:%s at node %d check downstream task:%d at node %d (shuffle)", pTask->id.idStr, pTask->nodeId, + qDebug("s-task:%s at node %d check downstream task:0x%x at node %d (shuffle)", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } @@ -111,15 +111,16 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp .childId = pRsp->childId, }; - qDebug("s-task:%s at node %d check downstream task %d at node %d (recheck)", pTask->id.idStr, pTask->nodeId, + qDebug("s-task:%s at node %d check downstream task:0x%x at node %d (recheck)", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t vgSz = taosArrayGetSize(vgInfo); - for (int32_t i = 0; i < vgSz; i++) { + + int32_t numOfVgs = taosArrayGetSize(vgInfo); + for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); if (pVgInfo->taskId == req.downstreamTaskId) { streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet); @@ -135,7 +136,9 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask) { } int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) { - qDebug("task %d at node %d recv check rsp from task %d at node %d: status %d", pRsp->upstreamTaskId, + ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); + + qDebug("s-task:%s at node %d recv check rsp from task:0x%x at node %d: status %d", pTask->id.idStr, pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status); if (pRsp->status == 1) { @@ -175,9 +178,10 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* ASSERT(0); } } else { // not ready, wait for 100ms and retry - qDebug("s-task:%s downstream taskId:%d (vgId:%d) not ready, wait for 100ms and retry", pTask->id.idStr, + qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", pTask->id.idStr, pRsp->downstreamTaskId, pRsp->downstreamNodeId); taosMsleep(100); + streamRecheckOneDownstream(pTask, pRsp); }