diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8f5d6c38f3..8e7dd0bb0d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -372,6 +372,7 @@ typedef struct { int32_t upstreamChildId; int32_t upstreamNodeId; int32_t blockNum; + int64_t totalLen; SArray* dataLen; // SArray SArray* data; // SArray } SStreamDispatchReq; @@ -527,7 +528,7 @@ void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq); int32_t streamSetupTrigger(SStreamTask* pTask); int32_t streamProcessRunReq(SStreamTask* pTask); -int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); +int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec); int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code); int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg); @@ -536,7 +537,7 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S void streamTaskInputFail(SStreamTask* pTask); int32_t streamTryExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask); -int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock); +int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock); bool streamTaskShouldStop(const SStreamStatus* pStatus); bool streamTaskShouldPause(const SStreamStatus* pStatus); diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 2a125a5831..42424b6436 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -2513,9 +2513,6 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { *actualLen = dataLen; *groupId = pBlock->info.id.groupId; ASSERT(dataLen > 0); - - uDebug("build data block, actualLen:%d, rows:%d, cols:%d", dataLen, *rows, *cols); - return dataLen; } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 2ff338242f..e9225f3d6e 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -43,7 +43,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) { .info = pMsg->info, .code = 0, }; - streamProcessDispatchReq(pTask, &req, &rsp, false); + streamProcessDispatchMsg(pTask, &req, &rsp, false); streamMetaReleaseTask(pSnode->pMeta, pTask); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); @@ -203,17 +203,13 @@ int32_t sndProcessTaskDispatchReq(SSnode *pSnode, SRpcMsg *pMsg, bool exec) { SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); if (pTask) { - SRpcMsg rsp = { - .info = pMsg->info, - .code = 0, - }; - streamProcessDispatchReq(pTask, &req, &rsp, exec); + SRpcMsg rsp = { .info = pMsg->info, .code = 0 }; + streamProcessDispatchMsg(pTask, &req, &rsp, exec); streamMetaReleaseTask(pSnode->pMeta, pTask); return 0; } else { return -1; } - return 0; } int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { @@ -227,11 +223,9 @@ int32_t sndProcessTaskRetrieveReq(SSnode *pSnode, SRpcMsg *pMsg) { tDecoderClear(&decoder); int32_t taskId = req.dstTaskId; SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, taskId); + if (pTask) { - SRpcMsg rsp = { - .info = pMsg->info, - .code = 0, - }; + SRpcMsg rsp = { .info = pMsg->info, .code = 0}; streamProcessRetrieveReq(pTask, &req, &rsp); streamMetaReleaseTask(pSnode->pMeta, pTask); tDeleteStreamRetrieveReq(&req); diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index da5d6f8b3c..18fa893fa4 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -198,6 +198,7 @@ void *tsdbGetIdx(SMeta *pMeta); void *tsdbGetIvtIdx(SMeta *pMeta); uint64_t tsdbGetReaderMaxVersion(STsdbReader *pReader); void tsdbReaderSetCloseFlag(STsdbReader *pReader); +int64_t tsdbGetLastTimestamp(SVnode* pVnode, void* pTableList, int32_t numOfTables, const char* pIdStr); int32_t tsdbReuseCacherowsReader(void* pReader, void* pTableIdList, int32_t numOfTables); int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a8c2a09319..74361ec319 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -760,6 +760,7 @@ void freePtr(void *ptr) { int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { int32_t vgId = TD_VID(pTq->pVnode); + pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId); pTask->refCnt = 1; pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE; @@ -837,7 +838,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } if (pTask->taskLevel == TASK_LEVEL__SOURCE) { - SWalFilterCond cond = {.deleteMsg = 1}; + SWalFilterCond cond = {.deleteMsg = 1}; // delete msg also extract from wal files pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond); } @@ -852,14 +853,17 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { - char* msgStr = pMsg->pCont; - char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); - int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + char* msgStr = pMsg->pCont; + char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + SStreamTaskCheckReq req; SDecoder decoder; + tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecodeSStreamTaskCheckReq(&decoder, &req); tDecoderClear(&decoder); + int32_t taskId = req.downstreamTaskId; SStreamTaskCheckRsp rsp = { .reqId = req.reqId, @@ -873,18 +877,18 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); - if (pTask) { + if (pTask != NULL) { rsp.status = streamTaskCheckStatus(pTask); streamMetaReleaseTask(pTq->pStreamMeta, pTask); - tqDebug("tq recv task check req(reqId:0x%" PRIx64 + tqDebug("s-task:%s recv task check req(reqId:0x%" PRIx64 ") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d", - rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus, rsp.upstreamTaskId, - rsp.upstreamNodeId, rsp.status); + pTask->id.idStr, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus, + rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { rsp.status = 0; - tqDebug("tq recv task check(taskId:%d not built yet) req(reqId:0x%" PRIx64 - ") %d at node %d, check req from task %d at node %d, rsp status %d", + tqDebug("tq recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 + ") %d at node %d, check req from task:0x%x at node %d, rsp status %d", taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } @@ -892,9 +896,10 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { SEncoder encoder; int32_t code; int32_t len; + tEncodeSize(tEncodeSStreamTaskCheckRsp, &rsp, len, code); if (code < 0) { - tqError("unable to encode rsp %d", __LINE__); + tqError("vgId:%d failed to encode task check rsp, task:0x%x", pTq->pStreamMeta->vgId, taskId); return -1; } @@ -907,6 +912,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { tEncoderClear(&encoder); SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info}; + tmsgSendRsp(&rspMsg); return 0; } @@ -918,17 +924,20 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32 SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, msgLen); code = tDecodeSStreamTaskCheckRsp(&decoder, &rsp); + if (code < 0) { tDecoderClear(&decoder); return -1; } tDecoderClear(&decoder); - tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d", + tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task:0x%x at node %d, status %d", rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, rsp.upstreamTaskId); if (pTask == NULL) { + tqError("tq failed to locate the stream task:0x%x vgId:%d, it may have been destroyed", rsp.upstreamTaskId, + pTq->pStreamMeta->vgId); return -1; } @@ -948,6 +957,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms // 1.deserialize msg and build task SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + tqError("vgId:%d failed to create stream task due to out of memory, alloc size:%d", vgId, (int32_t) sizeof(SStreamTask)); return -1; } @@ -965,9 +976,9 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms // 2.save task, use the newest commit version as the initial start version of stream task. taosWLockLatch(&pTq->pStreamMeta->lock); code = streamMetaAddDeployedTask(pTq->pStreamMeta, sversion, pTask); + int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta); if (code < 0) { - tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, - streamMetaGetNumOfTasks(pTq->pStreamMeta)); + tqError("vgId:%d failed to add s-task:%s, total:%d", vgId, pTask->id.idStr, numOfTasks); taosWUnLockLatch(&pTq->pStreamMeta->lock); return -1; } @@ -980,7 +991,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms } tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", vgId, pTask->id.idStr, - pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta)); + pTask->status.taskStatus, numOfTasks); return 0; } @@ -1229,15 +1240,17 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { char* msgStr = pMsg->pCont; char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); - SStreamDispatchReq req; - SDecoder decoder; + + SStreamDispatchReq req = {0}; + + SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecodeStreamDispatchReq(&decoder, &req); SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.taskId); if (pTask) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - streamProcessDispatchReq(pTask, &req, &rsp, exec); + streamProcessDispatchMsg(pTask, &req, &rsp, exec); streamMetaReleaseTask(pTq->pStreamMeta, pTask); return 0; } else { @@ -1356,7 +1369,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask) { SRpcMsg rsp = {.info = pMsg->info, .code = 0}; - streamProcessDispatchReq(pTask, &req, &rsp, false); + streamProcessDispatchMsg(pTask, &req, &rsp, false); streamMetaReleaseTask(pTq->pStreamMeta, pTask); rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 9acf2454af..ff9f95d5fa 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -119,7 +119,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { int32_t status = pTask->status.taskStatus; if (pTask->taskLevel != TASK_LEVEL__SOURCE) { - tqDebug("s-task:%s level:%d not source task, no need to start", pTask->id.idStr, pTask->taskLevel); +// tqTrace("s-task:%s level:%d not source task, no need to start", pTask->id.idStr, pTask->taskLevel); streamMetaReleaseTask(pStreamMeta, pTask); continue; } @@ -132,7 +132,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { } if (tInputQueueIsFull(pTask)) { - tqDebug("s-task:%s input queue is full, do nothing", pTask->id.idStr); + tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr); streamMetaReleaseTask(pStreamMeta, pTask); continue; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 36b18ce9f4..cde2672541 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -5506,3 +5506,58 @@ void tsdbReaderSetId(STsdbReader* pReader, const char* idstr) { } void tsdbReaderSetCloseFlag(STsdbReader* pReader) { pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED; } + +/*-------------todo:refactor the implementation of those APIs in this file to seperate the API into two files------*/ +// opt perf, do NOT create so many readers +int64_t tsdbGetLastTimestamp(SVnode* pVnode, void* pTableList, int32_t numOfTables, const char* pIdStr) { + SQueryTableDataCond cond = {.type = TIMEWINDOW_RANGE_CONTAINED, .numOfCols = 1, .order = TSDB_ORDER_DESC, + .startVersion = -1, .endVersion = -1}; + cond.twindows.skey = INT64_MIN; + cond.twindows.ekey = INT64_MAX; + + cond.colList = taosMemoryCalloc(1, sizeof(SColumnInfo)); + cond.pSlotList = taosMemoryMalloc(sizeof(int32_t) * cond.numOfCols); + if (cond.colList == NULL || cond.pSlotList == NULL) { + // todo + } + + cond.colList[0].colId = 1; + cond.colList[0].slotId = 0; + cond.colList[0].type = TSDB_DATA_TYPE_TIMESTAMP; + + cond.pSlotList[0] = 0; + + STableKeyInfo* pTableKeyInfo = pTableList; + STsdbReader* pReader = NULL; + SSDataBlock* pBlock = createDataBlock(); + + SColumnInfoData data = {0}; + data.info = (SColumnInfo) {.type = TSDB_DATA_TYPE_TIMESTAMP, .colId = 1, .bytes = TSDB_KEYSIZE}; + blockDataAppendColInfo(pBlock, &data); + + int64_t key = INT64_MIN; + + for(int32_t i = 0; i < numOfTables; ++i) { + int32_t code = tsdbReaderOpen(pVnode, &cond, &pTableKeyInfo[i], 1, pBlock, &pReader, pIdStr, false, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + bool hasData = false; + code = tsdbNextDataBlock(pReader, &hasData); + if (!hasData || code != TSDB_CODE_SUCCESS) { + continue; + } + + SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, 0); + int64_t k = *(int64_t*)pCol->pData; + + if (key < k) { + key = k; + } + + tsdbReaderClose(pReader); + } + + return 0; +} diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 4ea5e3c6ec..29f1ddc50f 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -551,9 +551,9 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx) // start to restore all stream tasks if (tsDisableStream) { - vInfo("vgId:%d, not restore stream tasks, since disabled", pVnode->config.vgId); + vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", pVnode->config.vgId); } else { - vInfo("vgId:%d start to restore stream tasks", pVnode->config.vgId); + vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId); tqStartStreamTasks(pVnode->pTq); } } diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index c471bc2bd8..2c1956998a 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -33,8 +33,12 @@ typedef struct { static SStreamGlobalEnv streamEnv; -int32_t streamDispatch(SStreamTask* pTask); -int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); +int32_t streamDispatchStreamBlock(SStreamTask* pTask); + +SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg); +SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes); +void destroyStreamDataBlock(SStreamDataBlock* pBlock); + int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* data); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 26e1c2ab43..8cc1ef1dd3 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -120,39 +120,35 @@ int32_t streamSchedExec(SStreamTask* pTask) { } int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) { - SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); - int8_t status; + int8_t status = 0; - // enqueue data block - if (pData != NULL) { - pData->type = STREAM_INPUT__DATA_BLOCK; - pData->srcVgId = pReq->dataSrcVgId; - // decode - /*pData->blocks = pReq->data;*/ - /*pBlock->sourceVer = pReq->sourceVer;*/ - streamDispatchReqToData(pReq, pData); - if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pData) == 0) { + SStreamDataBlock* pBlock = createStreamDataFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->dataSrcVgId); + if (pBlock == NULL) { + streamTaskInputFail(pTask); + status = TASK_INPUT_STATUS__FAILED; + qDebug("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId, + pTask->id.idStr); + } else { + if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock) == 0) { status = TASK_INPUT_STATUS__NORMAL; } else { // input queue is full, upstream is blocked now status = TASK_INPUT_STATUS__BLOCKED; } - } else { - streamTaskInputFail(pTask); - status = TASK_INPUT_STATUS__FAILED; } // rsp by input status void* buf = rpcMallocCont(sizeof(SMsgHead) + sizeof(SStreamDispatchRsp)); ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId); - SStreamDispatchRsp* pCont = POINTER_SHIFT(buf, sizeof(SMsgHead)); - pCont->inputStatus = status; - pCont->streamId = htobe64(pReq->streamId); - pCont->upstreamNodeId = htonl(pReq->upstreamNodeId); - pCont->upstreamTaskId = htonl(pReq->upstreamTaskId); - pCont->downstreamNodeId = htonl(pTask->nodeId); - pCont->downstreamTaskId = htonl(pTask->id.taskId); - pRsp->pCont = buf; + SStreamDispatchRsp* pDispatchRsp = POINTER_SHIFT(buf, sizeof(SMsgHead)); + pDispatchRsp->inputStatus = status; + pDispatchRsp->streamId = htobe64(pReq->streamId); + pDispatchRsp->upstreamNodeId = htonl(pReq->upstreamNodeId); + pDispatchRsp->upstreamTaskId = htonl(pReq->upstreamTaskId); + pDispatchRsp->downstreamNodeId = htonl(pTask->nodeId); + pDispatchRsp->downstreamTaskId = htonl(pTask->id.taskId); + + pRsp->pCont = buf; pRsp->contLen = sizeof(SMsgHead) + sizeof(SStreamDispatchRsp); tmsgSendRsp(pRsp); @@ -165,7 +161,7 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, // enqueue if (pData != NULL) { - qDebug("task %d(child %d) recv retrieve req from task %d, reqId %" PRId64, pTask->id.taskId, pTask->selfChildId, + qDebug("s-task:%s (child %d) recv retrieve req from task:0x%x, reqId %" PRId64, pTask->id.idStr, pTask->selfChildId, pReq->srcTaskId, pReq->reqId); pData->type = STREAM_INPUT__DATA_RETRIEVE; @@ -197,30 +193,31 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq, return status == TASK_INPUT_STATUS__NORMAL ? 0 : -1; } -int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { +// todo add log +int32_t streamTaskOutputResultBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { int32_t code = 0; if (pTask->outputType == TASK_OUTPUT__TABLE) { pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); - taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); - taosFreeQitem(pBlock); + destroyStreamDataBlock(pBlock); } else if (pTask->outputType == TASK_OUTPUT__SMA) { pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); - taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); - taosFreeQitem(pBlock); + destroyStreamDataBlock(pBlock); } else { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); code = taosWriteQitem(pTask->outputQueue->queue, pBlock); - if (code != 0) { + if (code != 0) { // todo failed to add it into the output queue, free it. return code; } - streamDispatch(pTask); + + streamDispatchStreamBlock(pTask); } + return 0; } -int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { - qDebug("s-task:%s receive dispatch msg from taskId:%d(vgId:%d)", pTask->id.idStr, pReq->upstreamTaskId, - pReq->upstreamNodeId); +int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp, bool exec) { + qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr, + pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen); // todo add the input queue buffer limitation streamTaskEnqueueBlocks(pTask, pReq, pRsp); @@ -257,8 +254,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i return 0; } - // continue dispatch - streamDispatch(pTask); + // continue dispatch one block to down stream in pipeline + streamDispatchStreamBlock(pTask); return 0; } @@ -268,13 +265,13 @@ int32_t streamProcessRunReq(SStreamTask* pTask) { } /*if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {*/ - /*streamDispatch(pTask);*/ + /*streamDispatchStreamBlock(pTask);*/ /*}*/ return 0; } int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pRsp) { - qDebug("s-task:%s receive retrieve req from node %d taskId:%d", pTask->id.idStr, pReq->srcNodeId, pReq->srcTaskId); + qDebug("s-task:%s receive retrieve req from node %d taskId:0x%x", pTask->id.idStr, pReq->srcNodeId, pReq->srcTaskId); streamTaskEnqueueRetrieve(pTask, pReq, pRsp); ASSERT(pTask->taskLevel != TASK_LEVEL__SINK); @@ -294,16 +291,13 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); if (type == STREAM_INPUT__DATA_SUBMIT) { - int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; - double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); - SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, - px->submit.msgLen, px->submit.ver, numOfBlocks, size); + px->submit.msgLen, px->submit.ver, total, size); if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) { qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", - pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, + pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total, size); streamDataSubmitDestroy(px); taosFreeQitem(pItem); @@ -312,22 +306,20 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { - int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; - double size = QUEUE_MEM_SIZE_IN_MB(pTask->inputQueue->queue); - if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) { qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", - pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks, + pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total, size); return -1; } - qDebug("s-task:%s data block enqueue, total in queue:%d", pTask->id.idStr, numOfBlocks); + qDebug("s-task:%s data block enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__CHECKPOINT) { taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__GET_RES) { taosWriteQitem(pTask->inputQueue->queue, pItem); + qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); } if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index b70a8c93a9..7c06e7deb3 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -15,20 +15,28 @@ #include "streamInc.h" -int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData) { +SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) { + SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen); + if (pData == NULL) { + return NULL; + } + + pData->type = blockType; + pData->srcVgId = srcVg; + int32_t blockNum = pReq->blockNum; SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum); if (pArray == NULL) { - return -1; + return NULL; } - ASSERT(pReq->blockNum == taosArrayGetSize(pReq->data)); - ASSERT(pReq->blockNum == taosArrayGetSize(pReq->dataLen)); + ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen))); for (int32_t i = 0; i < blockNum; i++) { SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i); SSDataBlock* pDataBlock = taosArrayGet(pArray, i); blockDecode(pDataBlock, pRetrieve->data); + // TODO: refactor pDataBlock->info.window.skey = be64toh(pRetrieve->skey); pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey); @@ -39,8 +47,41 @@ int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock pDataBlock->info.type = pRetrieve->streamBlockType; pDataBlock->info.childId = pReq->upstreamChildId; } + pData->blocks = pArray; - return 0; + return pData; +} + +SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize, SArray* pRes) { + SStreamDataBlock* pStreamBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, resultSize); + if (pStreamBlocks == NULL) { + taosArrayClearEx(pRes, (FDelete)blockDataFreeRes); + return NULL; + } + + pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK; + pStreamBlocks->blocks = pRes; + + if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { + SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem; + pStreamBlocks->childId = pTask->selfChildId; + pStreamBlocks->sourceVer = pSubmit->ver; + } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem; + pStreamBlocks->childId = pTask->selfChildId; + pStreamBlocks->sourceVer = pMerged->ver; + } + + return pStreamBlocks; +} + +void destroyStreamDataBlock(SStreamDataBlock* pBlock) { + if (pBlock == NULL) { + return; + } + + taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); + taosFreeQitem(pBlock); } int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData) { @@ -184,11 +225,13 @@ void streamFreeQitem(SStreamQueueItem* data) { taosFreeQitem(data); } else if (type == STREAM_INPUT__MERGED_SUBMIT) { SStreamMergedSubmit* pMerge = (SStreamMergedSubmit*)data; - int32_t sz = taosArrayGetSize(pMerge->submits); + + int32_t sz = taosArrayGetSize(pMerge->submits); for (int32_t i = 0; i < sz; i++) { int32_t* pRef = taosArrayGetP(pMerge->dataRefs, i); int32_t ref = atomic_sub_fetch_32(pRef, 1); ASSERT(ref >= 0); + if (ref == 0) { SPackedData* pSubmit = (SPackedData*)taosArrayGet(pMerge->submits, i); taosMemoryFree(pSubmit->msgStr); diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index a757d39d3f..401a8b9e74 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -24,6 +24,7 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->totalLen) < 0) return -1; ASSERT(taosArrayGetSize(pReq->data) == pReq->blockNum); ASSERT(taosArrayGetSize(pReq->dataLen) == pReq->blockNum); for (int32_t i = 0; i < pReq->blockNum; i++) { @@ -45,6 +46,8 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) { if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->totalLen) < 0) return -1; + ASSERT(pReq->blockNum > 0); pReq->data = taosArrayInit(pReq->blockNum, sizeof(void*)); pReq->dataLen = taosArrayInit(pReq->blockNum, sizeof(int32_t)); @@ -135,7 +138,6 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i); req.dstNodeId = pEpInfo->nodeId; req.dstTaskId = pEpInfo->taskId; - int32_t code; int32_t len; tEncodeSize(tEncodeStreamRetrieveReq, &req, len, code); if (code < 0) { @@ -155,30 +157,25 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) tEncodeStreamRetrieveReq(&encoder, &req); tEncoderClear(&encoder); - SRpcMsg rpcMsg = { - .code = 0, - .msgType = TDMT_STREAM_RETRIEVE, - .pCont = buf, - .contLen = sizeof(SMsgHead) + len, - }; - + SRpcMsg rpcMsg = { .code = 0, .msgType = TDMT_STREAM_RETRIEVE, .pCont = buf, .contLen = sizeof(SMsgHead) + len }; if (tmsgSendReq(&pEpInfo->epSet, &rpcMsg) < 0) { ASSERT(0); goto CLEAR; } - buf = NULL; - qDebug("s-task:%s (child %d) send retrieve req to task %d at node %d, reqId %" PRId64, pTask->id.idStr, + buf = NULL; + qDebug("s-task:%s (child %d) send retrieve req to task %d at node %d, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId, pEpInfo->taskId, pEpInfo->nodeId, req.reqId); } code = 0; + CLEAR: taosMemoryFree(pRetrieve); rpcFreeCont(buf); return code; } -static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { +static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq) { int32_t dataStrLen = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); void* buf = taosMemoryCalloc(1, dataStrLen); if (buf == NULL) return -1; @@ -205,6 +202,7 @@ static int32_t streamAddBlockToDispatchMsg(const SSDataBlock* pBlock, SStreamDis taosArrayPush(pReq->dataLen, &actualLen); taosArrayPush(pReq->data, &buf); + pReq->totalLen += dataStrLen; return 0; } @@ -291,7 +289,7 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov return 0; } -int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { +int32_t doSendDispatchMsg(SStreamTask* pTask, const SStreamDispatchReq* pReq, int32_t vgId, SEpSet* pEpSet) { void* buf = NULL; int32_t code = -1; SRpcMsg msg = {0}; @@ -320,11 +318,12 @@ int32_t streamDispatchOneDataReq(SStreamTask* pTask, const SStreamDispatchReq* p msg.pCont = buf; msg.msgType = pTask->dispatchMsgType; - qDebug("dispatch from s-task:%s to taskId:%d vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId); + qDebug("dispatch from s-task:%s to taskId:0x%x vgId:%d data msg", pTask->id.idStr, pReq->taskId, vgId); tmsgSendReq(pEpSet, &msg); code = 0; return 0; + FAIL: if (buf) rpcFreeCont(buf); return code; @@ -360,7 +359,7 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j); ASSERT(pVgInfo->vgId > 0); if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { - if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) { + if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { return -1; } if (pReqs[j].blockNum == 0) { @@ -376,9 +375,9 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S } int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) { - int32_t code = -1; - int32_t blockNum = taosArrayGetSize(pData->blocks); - ASSERT(blockNum != 0); + int32_t code = 0; + int32_t numOfBlocks = taosArrayGetSize(pData->blocks); + ASSERT(numOfBlocks != 0); if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { SStreamDispatchReq req = { @@ -387,19 +386,25 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat .upstreamTaskId = pTask->id.taskId, .upstreamChildId = pTask->selfChildId, .upstreamNodeId = pTask->nodeId, - .blockNum = blockNum, + .blockNum = numOfBlocks, }; - req.data = taosArrayInit(blockNum, sizeof(void*)); - req.dataLen = taosArrayInit(blockNum, sizeof(int32_t)); + req.data = taosArrayInit(numOfBlocks, sizeof(void*)); + req.dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t)); if (req.data == NULL || req.dataLen == NULL) { - goto FAIL_FIXED_DISPATCH; + taosArrayDestroyP(req.data, taosMemoryFree); + taosArrayDestroy(req.dataLen); + return TSDB_CODE_OUT_OF_MEMORY; } - for (int32_t i = 0; i < blockNum; i++) { + for (int32_t i = 0; i < numOfBlocks; i++) { SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); - if (streamAddBlockToDispatchMsg(pDataBlock, &req) < 0) { - goto FAIL_FIXED_DISPATCH; + code = streamAddBlockIntoDispatchMsg(pDataBlock, &req); + + if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroyP(req.data, taosMemoryFree); + taosArrayDestroy(req.dataLen); + return code; } } @@ -410,19 +415,12 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat req.taskId = downstreamTaskId; qDebug("s-task:%s (child taskId:%d) fix-dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr, - pTask->selfChildId, blockNum, downstreamTaskId, vgId); + pTask->selfChildId, numOfBlocks, downstreamTaskId, vgId); - if (streamDispatchOneDataReq(pTask, &req, vgId, pEpSet) < 0) { - goto FAIL_FIXED_DISPATCH; - } - - code = 0; - - FAIL_FIXED_DISPATCH: + code = doSendDispatchMsg(pTask, &req, vgId, pEpSet); taosArrayDestroyP(req.data, taosMemoryFree); taosArrayDestroy(req.dataLen); return code; - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt); ASSERT(rspCnt == 0); @@ -452,13 +450,13 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat pReqs[i].taskId = pVgInfo->taskId; } - for (int32_t i = 0; i < blockNum; i++) { + for (int32_t i = 0; i < numOfBlocks; i++) { SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i); // TODO: do not use broadcast if (pDataBlock->info.type == STREAM_DELETE_RESULT) { for (int32_t j = 0; j < vgSz; j++) { - if (streamAddBlockToDispatchMsg(pDataBlock, &pReqs[j]) < 0) { + if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) { goto FAIL_SHUFFLE_DISPATCH; } if (pReqs[j].blockNum == 0) { @@ -475,7 +473,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat } qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroups", pTask->id.idStr, pTask->selfChildId, - blockNum, vgSz); + numOfBlocks, vgSz); for (int32_t i = 0; i < vgSz; i++) { if (pReqs[i].blockNum > 0) { @@ -483,7 +481,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat qDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr, pTask->selfChildId, pReqs[i].blockNum, pVgInfo->vgId); - if (streamDispatchOneDataReq(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) { + if (doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet) < 0) { goto FAIL_SHUFFLE_DISPATCH; } } @@ -501,9 +499,8 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat return code; } -int32_t streamDispatch(SStreamTask* pTask) { +int32_t streamDispatchStreamBlock(SStreamTask* pTask) { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); - int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue); if (numOfElems > 0) { qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr, @@ -517,23 +514,22 @@ int32_t streamDispatch(SStreamTask* pTask) { return 0; } - SStreamDataBlock* pBlock = streamQueueNextItem(pTask->outputQueue); - if (pBlock == NULL) { + SStreamDataBlock* pDispatchedBlock = streamQueueNextItem(pTask->outputQueue); + if (pDispatchedBlock == NULL) { qDebug("s-task:%s stop dispatching since no output in output queue", pTask->id.idStr); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); return 0; } - ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); + ASSERT(pDispatchedBlock->type == STREAM_INPUT__DATA_BLOCK); - int32_t code = 0; - if (streamDispatchAllBlocks(pTask, pBlock) < 0) { - code = -1; + int32_t code = streamDispatchAllBlocks(pTask, pDispatchedBlock); + if (code != TSDB_CODE_SUCCESS) { streamQueueProcessFail(pTask->outputQueue); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); } - taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); - taosFreeQitem(pBlock); + // this block can be freed only when it has been pushed to down stream. + destroyStreamDataBlock(pDispatchedBlock); return code; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index d14e8102d8..55474541ed 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -18,6 +18,9 @@ // maximum allowed processed block batches. One block may include several submit blocks #define MAX_STREAM_EXEC_BATCH_NUM 128 #define MIN_STREAM_EXEC_BATCH_NUM 16 +#define MAX_STREAM_RESULT_DUMP_THRESHOLD 1000 + +static int32_t updateCheckPointInfo (SStreamTask* pTask); bool streamTaskShouldStop(const SStreamStatus* pStatus) { int32_t status = atomic_load_8((int8_t*)&pStatus->taskStatus); @@ -29,56 +32,57 @@ bool streamTaskShouldPause(const SStreamStatus* pStatus) { return (status == TASK_STATUS__PAUSE); } -static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* pRes) { +static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* pRes, int32_t size, int64_t* totalSize, + int32_t* totalBlocks) { + int32_t code = updateCheckPointInfo(pTask); + if (code != TSDB_CODE_SUCCESS) { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + return code; + } + + int32_t numOfBlocks = taosArrayGetSize(pRes); + if (numOfBlocks > 0) { + SStreamDataBlock* pStreamBlocks = createStreamBlockFromResults(pItem, pTask, size, pRes); + if (pStreamBlocks == NULL) { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + return -1; + } + + qDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, size/1048576.0); + + code = streamTaskOutputResultBlock(pTask, pStreamBlocks); + if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { // back pressure and record position + destroyStreamDataBlock(pStreamBlocks); + return -1; + } + + *totalSize += size; + *totalBlocks += numOfBlocks; + } else { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) { int32_t code = TSDB_CODE_SUCCESS; void* pExecutor = pTask->exec.pExecutor; - while (pTask->taskLevel == TASK_LEVEL__SOURCE) { - int8_t status = atomic_load_8(&pTask->status.taskStatus); - if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) { - qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, - atomic_load_8(&pTask->status.taskStatus)); - taosMsleep(2); - } else { - break; - } - } + *totalBlocks = 0; + *totalSize = 0; - // set input - const SStreamQueueItem* pItem = (const SStreamQueueItem*)data; - if (pItem->type == STREAM_INPUT__GET_RES) { - const SStreamTrigger* pTrigger = (const SStreamTrigger*)data; - qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); - } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { - ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); - const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)data; - qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); - qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, - pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); - } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { - const SStreamDataBlock* pBlock = (const SStreamDataBlock*)data; + int32_t size = 0; + int32_t numOfBlocks = 0; + SArray* pRes = NULL; - SArray* pBlockList = pBlock->blocks; - int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); - qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); - } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { - const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)data; - - SArray* pBlockList = pMerged->submits; - int32_t numOfBlocks = taosArrayGetSize(pBlockList); - qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks); - qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); - } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { - const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)data; - qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK); - } else { - ASSERT(0); - } - - // pExecutor while (1) { + if (pRes == NULL) { + pRes = taosArrayInit(4, sizeof(SSDataBlock)); + } + if (streamTaskShouldStop(&pTask->status)) { + taosArrayDestroy(pRes); // memory leak return 0; } @@ -97,17 +101,18 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* if (pItem->type == STREAM_INPUT__DATA_RETRIEVE) { SSDataBlock block = {0}; - const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*)data; + const SStreamDataBlock* pRetrieveBlock = (const SStreamDataBlock*) pItem; ASSERT(taosArrayGetSize(pRetrieveBlock->blocks) == 1); assignOneDataBlock(&block, taosArrayGet(pRetrieveBlock->blocks, 0)); block.info.type = STREAM_PULL_OVER; block.info.childId = pTask->selfChildId; taosArrayPush(pRes, &block); - - qDebug("task %d(child %d) processed retrieve, reqId %" PRId64, pTask->id.taskId, pTask->selfChildId, + numOfBlocks += 1; + qDebug("s-task:%s(child %d) processed retrieve, reqId:0x%" PRIx64, pTask->id.idStr, pTask->selfChildId, pRetrieveBlock->reqId); } + break; } @@ -118,15 +123,40 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* continue; } - qDebug("s-task:%s (child %d) executed and get block", pTask->id.idStr, pTask->selfChildId); - SSDataBlock block = {0}; assignOneDataBlock(&block, output); block.info.childId = pTask->selfChildId; + + size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); + numOfBlocks += 1; + taosArrayPush(pRes, &block); + + qDebug("s-task:%s (child %d) executed and get block, total blocks:%d, size:%.2fMiB", pTask->id.idStr, + pTask->selfChildId, numOfBlocks, size / 1048576.0); + + // current output should be dispatched to down stream nodes + if (numOfBlocks >= MAX_STREAM_RESULT_DUMP_THRESHOLD) { + ASSERT(numOfBlocks == taosArrayGetSize(pRes)); + code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + pRes = NULL; + size = 0; + numOfBlocks = 0; + } } - return 0; + if (numOfBlocks > 0) { + ASSERT(numOfBlocks == taosArrayGetSize(pRes)); + code = doDumpResult(pTask, pItem, pRes, size, totalSize, totalBlocks); + } else { + taosArrayDestroy(pRes); + } + + return code; } int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { @@ -200,7 +230,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { qRes->type = STREAM_INPUT__DATA_BLOCK; qRes->blocks = pRes; - code = streamTaskOutput(pTask, qRes); + code = streamTaskOutputResultBlock(pTask, qRes); if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosFreeQitem(qRes); @@ -209,7 +239,7 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) { if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt); - streamDispatch(pTask); + streamDispatchStreamBlock(pTask); } if (finished) { @@ -246,7 +276,7 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { if (pTask->taskLevel == TASK_LEVEL__SINK) { ASSERT(((SStreamQueueItem*)pItem)->type == STREAM_INPUT__DATA_BLOCK); - streamTaskOutput(pTask, (SStreamDataBlock*)pItem); + streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pItem); } // exec impl @@ -257,6 +287,34 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { } #endif +int32_t updateCheckPointInfo (SStreamTask* pTask) { + int64_t ckId = 0; + int64_t dataVer = 0; + qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); + + SCheckpointInfo* pCkInfo = &pTask->chkInfo; + if (ckId > pCkInfo->id) { // save it since the checkpoint is updated + qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 + ", checkPoint id:%" PRId64 " -> %" PRId64, pTask->id.idStr, pCkInfo->version, dataVer, pCkInfo->id, ckId); + + pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pCkInfo->currentVer}; + + taosWLockLatch(&pTask->pMeta->lock); + + streamMetaSaveTask(pTask->pMeta, pTask); + if (streamMetaCommit(pTask->pMeta) < 0) { + taosWUnLockLatch(&pTask->pMeta->lock); + qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr()); + return -1; + } else { + taosWUnLockLatch(&pTask->pMeta->lock); + qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr); + } + } + + return TSDB_CODE_SUCCESS; +} + int32_t streamExecForAll(SStreamTask* pTask) { int32_t code = 0; while (1) { @@ -272,6 +330,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (streamTaskShouldPause(&pTask->status)) { return 0; } + SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); if (qItem == NULL) { if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) { @@ -324,74 +383,70 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (pTask->taskLevel == TASK_LEVEL__SINK) { ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK); qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize); - streamTaskOutput(pTask, (SStreamDataBlock*)pInput); + streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput); continue; } - SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); + // wait for the task to be ready to go + while (pTask->taskLevel == TASK_LEVEL__SOURCE) { + int8_t status = atomic_load_8(&pTask->status.taskStatus); + if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) { + qError("stream task wait for the end of fill history, s-task:%s, status:%d", pTask->id.idStr, + atomic_load_8(&pTask->status.taskStatus)); + taosMsleep(2); + } else { + break; + } + } + + int64_t st = taosGetTimestampMs(); qDebug("s-task:%s start to execute, block batches:%d", pTask->id.idStr, batchSize); - streamTaskExecImpl(pTask, pInput, pRes); + { + // set input + void* pExecutor = pTask->exec.pExecutor; - int64_t ckId = 0; - int64_t dataVer = 0; - qGetCheckpointVersion(pTask->exec.pExecutor, &dataVer, &ckId); - if (ckId > pTask->chkInfo.id) { // save it since the checkpoint is updated - qDebug("s-task:%s exec end, start to update check point, ver from %" PRId64 " to %" PRId64 - ", checkPoint id:%" PRId64 " -> %" PRId64, - pTask->id.idStr, pTask->chkInfo.version, dataVer, pTask->chkInfo.id, ckId); + const SStreamQueueItem* pItem = pInput; + if (pItem->type == STREAM_INPUT__GET_RES) { + const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput; + qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); + } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { + ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); + const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput; + qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); + qDebug("s-task:%s set submit blocks as source block completed, %p %p len:%d ver:%" PRId64, pTask->id.idStr, pSubmit, + pSubmit->submit.msgStr, pSubmit->submit.msgLen, pSubmit->submit.ver); + } else if (pItem->type == STREAM_INPUT__DATA_BLOCK || pItem->type == STREAM_INPUT__DATA_RETRIEVE) { + const SStreamDataBlock* pBlock = (const SStreamDataBlock*)pInput; - pTask->chkInfo = (SCheckpointInfo){.version = dataVer, .id = ckId, .currentVer = pTask->chkInfo.currentVer}; + SArray* pBlockList = pBlock->blocks; + int32_t numOfBlocks = taosArrayGetSize(pBlockList); + qDebug("s-task:%s set sdata blocks as input num:%d, ver:%" PRId64, pTask->id.idStr, numOfBlocks, pBlock->sourceVer); + qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__DATA_BLOCK); + } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { + const SStreamMergedSubmit* pMerged = (const SStreamMergedSubmit*)pInput; - taosWLockLatch(&pTask->pMeta->lock); - - streamMetaSaveTask(pTask->pMeta, pTask); - if (streamMetaCommit(pTask->pMeta) < 0) { - taosWUnLockLatch(&pTask->pMeta->lock); - qError("s-task:%s failed to commit stream meta, since %s", pTask->id.idStr, terrstr()); - return -1; + SArray* pBlockList = pMerged->submits; + int32_t numOfBlocks = taosArrayGetSize(pBlockList); + qDebug("s-task:%s %p set submit input (merged), batch num:%d", pTask->id.idStr, pTask, numOfBlocks); + qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT); + } else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) { + const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput; + qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK); } else { - taosWUnLockLatch(&pTask->pMeta->lock); - qDebug("s-task:%s update checkpoint ver succeed", pTask->id.idStr); + ASSERT(0); } - } else { - qDebug("s-task:%s exec end", pTask->id.idStr); } - if (taosArrayGetSize(pRes) != 0) { - SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); - if (qRes == NULL) { - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - streamFreeQitem(pInput); - return -1; - } + int64_t resSize = 0; + int32_t totalBlocks = 0; + streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks); - qRes->type = STREAM_INPUT__DATA_BLOCK; - qRes->blocks = pRes; - - if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__DATA_SUBMIT) { - SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pInput; - qRes->childId = pTask->selfChildId; - qRes->sourceVer = pSubmit->ver; - } else if (((SStreamQueueItem*)pInput)->type == STREAM_INPUT__MERGED_SUBMIT) { - SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pInput; - qRes->childId = pTask->selfChildId; - qRes->sourceVer = pMerged->ver; - } - - code = streamTaskOutput(pTask, qRes); - if (code == TSDB_CODE_UTIL_QUEUE_OUT_OF_MEMORY) { - // backpressure and record position - taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - streamFreeQitem(pInput); - taosFreeQitem(qRes); - return -1; - } - } else { - taosArrayDestroy(pRes); - } + double el = (taosGetTimestampMs() - st) / 1000.0; + qDebug("s-task:%s exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", pTask->id.idStr, el, resSize / 1048576.0, totalBlocks); streamFreeQitem(pInput); } + return 0; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 682ce08c7f..33375fe921 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -296,6 +296,7 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) { return 0; } +// todo add error log int32_t streamMetaCommit(SStreamMeta* pMeta) { if (tdbCommit(pMeta->db, pMeta->txn) < 0) { qError("failed to commit stream meta"); @@ -311,6 +312,7 @@ int32_t streamMetaCommit(SStreamMeta* pMeta) { TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { return -1; } + return 0; } @@ -373,7 +375,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { } if (pTask->fillHistory) { - pTask->status.taskStatus = TASK_STATUS__WAIT_DOWNSTREAM; + ASSERT(pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM); streamTaskCheckDownstream(pTask, ver); } } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index f71d078a3d..eb2535782e 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -20,7 +20,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { if (pTask->taskLevel == TASK_LEVEL__SOURCE) { atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE); - qDebug("s-task:%s set task status:%d and start recover", pTask->id.idStr, pTask->status.taskStatus); + qDebug("s-task:%s set task status:%d and start to recover", pTask->id.idStr, pTask->status.taskStatus); streamSetParamForRecover(pTask); streamSourceRecoverPrepareStep1(pTask, version); @@ -46,6 +46,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { streamSetParamForRecover(pTask); streamAggRecoverPrepare(pTask); } else if (pTask->taskLevel == TASK_LEVEL__SINK) { + // sink nodes has no specified operation for fill history atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); } @@ -54,7 +55,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { // checkstatus int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { - qDebug("s-taks:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, version); + qDebug("s-task:%s in fill history stage, ver:%"PRId64, pTask->id.idStr, version); SStreamTaskCheckReq req = { .streamId = pTask->id.streamId, @@ -71,23 +72,23 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { req.downstreamTaskId = pTask->fixedEpDispatcher.taskId; pTask->checkReqId = req.reqId; - qDebug("s-task:%s at node %d check downstream task %d at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, + qDebug("s-task:%s at node %d check downstream task:0x%x at node %d", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); streamDispatchCheckMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t vgSz = taosArrayGetSize(vgInfo); - pTask->recoverTryingDownstream = vgSz; - pTask->checkReqIds = taosArrayInit(vgSz, sizeof(int64_t)); + int32_t numOfVgs = taosArrayGetSize(vgInfo); + pTask->recoverTryingDownstream = numOfVgs; + pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t)); - for (int32_t i = 0; i < vgSz; i++) { + for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); req.reqId = tGenIdPI64(); taosArrayPush(pTask->checkReqIds, &req.reqId); req.downstreamNodeId = pVgInfo->vgId; req.downstreamTaskId = pVgInfo->taskId; - qDebug("s-task:%s at node %d check downstream task %d at node %d (shuffle)", pTask->id.idStr, pTask->nodeId, + qDebug("s-task:%s at node %d check downstream task:0x%x at node %d (shuffle)", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); streamDispatchCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } @@ -110,15 +111,16 @@ int32_t streamRecheckOneDownstream(SStreamTask* pTask, const SStreamTaskCheckRsp .childId = pRsp->childId, }; - qDebug("s-task:%s at node %d check downstream task %d at node %d (recheck)", pTask->id.idStr, pTask->nodeId, + qDebug("s-task:%s at node %d check downstream task:0x%x at node %d (recheck)", pTask->id.idStr, pTask->nodeId, req.downstreamTaskId, req.downstreamNodeId); if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pTask->fixedEpDispatcher.epSet); } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t vgSz = taosArrayGetSize(vgInfo); - for (int32_t i = 0; i < vgSz; i++) { + + int32_t numOfVgs = taosArrayGetSize(vgInfo); + for (int32_t i = 0; i < numOfVgs; i++) { SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); if (pVgInfo->taskId == req.downstreamTaskId) { streamDispatchCheckMsg(pTask, &req, pRsp->downstreamNodeId, &pVgInfo->epSet); @@ -134,7 +136,9 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask) { } int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) { - qDebug("task %d at node %d recv check rsp from task %d at node %d: status %d", pRsp->upstreamTaskId, + ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); + + qDebug("s-task:%s at node %d recv check rsp from task:0x%x at node %d: status %d", pTask->id.idStr, pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status); if (pRsp->status == 1) { @@ -161,7 +165,7 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* taosArrayDestroy(pTask->checkReqIds); pTask->checkReqIds = NULL; - qDebug("s-task:%s all downstream tasks:%d are ready, now enter into recover stage", pTask->id.idStr, numOfReqs); + qDebug("s-task:%s all %d downstream tasks are ready, now enter into recover stage", pTask->id.idStr, numOfReqs); streamTaskLaunchRecover(pTask, version); } } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { @@ -174,9 +178,10 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* ASSERT(0); } } else { // not ready, wait for 100ms and retry - qDebug("s-task:%s downstream taskId:%d (vgId:%d) not ready, wait for 100ms and retry", pTask->id.idStr, + qDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, wait for 100ms and retry", pTask->id.idStr, pRsp->downstreamTaskId, pRsp->downstreamNodeId); taosMsleep(100); + streamRecheckOneDownstream(pTask, pRsp); }