diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 6f2a6126b3..34372dc2ff 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -123,7 +123,7 @@ int32_t qSetSMAInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numOfBlocks, * @param isAdd * @return */ -int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd, SArray* pList); +int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd); /** * Create the exec task object according to task json diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 70b2eb0c3b..e857cce70d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -31,7 +31,6 @@ extern "C" { #ifndef _STREAM_H_ #define _STREAM_H_ -typedef void (*_free_reader_fn_t)(void*); typedef struct SStreamTask SStreamTask; enum { @@ -221,7 +220,6 @@ SStreamDataSubmit2* streamSubmitBlockClone(SStreamDataSubmit2* pSubmit); typedef struct { char* qmsg; void* pExecutor; // not applicable to encoder and decoder - struct STqReader* pTqReader; // not applicable to encoder and decoder struct SWalReader* pWalReader; // not applicable to encoder and decoder } STaskExec; @@ -333,7 +331,6 @@ struct SStreamTask { int64_t checkpointingId; int32_t checkpointAlignCnt; struct SStreamMeta* pMeta; - _free_reader_fn_t freeFp; }; // meta @@ -343,7 +340,6 @@ typedef struct SStreamMeta { TTB* pTaskDb; TTB* pCheckpointDb; SHashObj* pTasks; - SHashObj* pWalReadTasks; void* ahandle; TXN* txn; FTaskExpand* expandFunc; @@ -577,7 +573,6 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamT int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, int64_t checkpointVer, char* msg, int32_t msgLen); int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta); -SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); diff --git a/source/dnode/mgmt/mgmt_snode/src/smInt.c b/source/dnode/mgmt/mgmt_snode/src/smInt.c index 28097311ac..e222349767 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smInt.c +++ b/source/dnode/mgmt/mgmt_snode/src/smInt.c @@ -55,6 +55,7 @@ int32_t smOpen(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { smClose(pMgmt); return -1; } + tmsgReportStartup("snode-impl", "initialized"); if (smStartWorker(pMgmt) != 0) { diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 36521fd778..734f624be0 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -356,6 +356,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { qDestroyQueryPlan(pPlan); return -1; } + pInnerTask->fillHistory = pStream->fillHistory; mndAddTaskToTaskSet(taskInnerLevel, pInnerTask); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index eb4ea284a5..ba3544bbda 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -32,6 +32,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) { tDecoderClear(&decoder); goto FAIL; } + tDecoderClear(&decoder); int32_t taskId = req.taskId; @@ -78,6 +79,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pSnode->msgCb; pTask->chkInfo.version = ver; + pTask->pMeta = pSnode->pMeta; pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1); if (pTask->pState == NULL) { @@ -138,6 +140,7 @@ int32_t sndProcessTaskDeployReq(SSnode *pSnode, char *msg, int32_t msgLen) { if (pTask == NULL) { return -1; } + SDecoder decoder; tDecoderInit(&decoder, (uint8_t *)msg, msgLen); code = tDecodeStreamTask(&decoder, pTask); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 8aeb705d90..ce987ca88e 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -168,7 +168,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pRSmaInfo->taskInfo[i]) { - if ((terrno = qUpdateTableListForStreamScanner(pRSmaInfo->taskInfo[i], tbUids, isAdd, NULL)) < 0) { + if ((terrno = qUpdateTableListForStreamScanner(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0) { tdReleaseRSmaInfo(pSma, pRSmaInfo); smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i, terrstr()); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 940841bf70..60c6c3d7fb 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -567,6 +567,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pTq->pVnode->msgCb; pTask->pMeta = pTq->pStreamMeta; + pTask->chkInfo.version = ver; // expand executor if (pTask->fillHistory) { @@ -628,18 +629,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { } if (pTask->taskLevel == TASK_LEVEL__SOURCE) { - pTask->exec.pTqReader = tqOpenReader(pTq->pVnode); - if (pTask->exec.pTqReader == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, NULL); - - pTask->freeFp = (_free_reader_fn_t)tqCloseReader; - SArray* pList = qGetQueriedTableListInfo(pTask->exec.pExecutor); - tqReaderAddTbUidList(pTask->exec.pTqReader, pList); - taosArrayDestroy(pList); } streamSetupTrigger(pTask); @@ -669,17 +659,22 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { }; SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); - if (pTask && atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL) { - rsp.status = 1; + if (pTask) { + rsp.status = (atomic_load_8(&pTask->status.taskStatus) == TASK_STATUS__NORMAL) ? 1 : 0; + streamMetaReleaseTask(pTq->pStreamMeta, pTask); + + tqDebug("tq recv task check req(reqId:0x%" PRIx64 + ") %d at node %d task status:%d, check req from task %d at node %d, rsp status %d", + rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, pTask->status.taskStatus, rsp.upstreamTaskId, + rsp.upstreamNodeId, rsp.status); } else { rsp.status = 0; + tqDebug("tq recv task check(taskId:%d not built yet) req(reqId:0x%" PRIx64 + ") %d at node %d, check req from task %d at node %d, rsp status %d", + taskId, rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, + rsp.status); } - if (pTask) streamMetaReleaseTask(pTq->pStreamMeta, pTask); - - tqDebug("tq recv task check req(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d", - rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); - SEncoder encoder; int32_t code; int32_t len; @@ -697,13 +692,7 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { tEncodeSStreamTaskCheckRsp(&encoder, &rsp); tEncoderClear(&encoder); - SRpcMsg rspMsg = { - .code = 0, - .pCont = buf, - .contLen = sizeof(SMsgHead) + len, - .info = pMsg->info, - }; - + SRpcMsg rspMsg = { .code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = pMsg->info }; tmsgSendRsp(&rspMsg); return 0; } @@ -719,8 +708,8 @@ int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t sversion, char* msg, int32 tDecoderClear(&decoder); return -1; } - tDecoderClear(&decoder); + tDecoderClear(&decoder); tqDebug("tq recv task check rsp(reqId:0x%" PRIx64 ") %d at node %d check req from task %d at node %d, status %d", rsp.reqId, rsp.downstreamTaskId, rsp.downstreamNodeId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); @@ -774,8 +763,8 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms streamTaskCheckDownstream(pTask, sversion); } - tqDebug("vgId:%d s-task:%s is deployed from mnd, status:%d, total:%d", TD_VID(pTq->pVnode), pTask->id.idStr, - pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta)); + tqDebug("vgId:%d s-task:%s is deployed and add meta from mnd, status:%d, total:%d", TD_VID(pTq->pVnode), + pTask->id.idStr, pTask->status.taskStatus, streamMetaGetNumOfTasks(pTq->pStreamMeta)); return 0; } @@ -1127,7 +1116,7 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { return 0; } - SStreamTask* pTask = streamMetaAcquireTaskEx(pTq->pStreamMeta, taskId); + SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId); if (pTask != NULL) { if (pTask->status.taskStatus == TASK_STATUS__NORMAL) { tqDebug("vgId:%d s-task:%s start to process run req", vgId, pTask->id.idStr); @@ -1141,7 +1130,6 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { } streamMetaReleaseTask(pTq->pStreamMeta, pTask); - tqStartStreamTasks(pTq); return 0; } else { diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 69624f4d10..25ab7209d2 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -973,7 +973,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { STqHandle* pTqHandle = (STqHandle*)pIter; if (pTqHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - int32_t code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList, isAdd, NULL); + int32_t code = qUpdateTableListForStreamScanner(pTqHandle->execHandle.task, tbUidList, isAdd); if (code != 0) { tqError("update qualified table error for %s", pTqHandle->subKey); continue; @@ -1031,18 +1031,11 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { SStreamTask* pTask = *(SStreamTask**)pIter; if (pTask->taskLevel == TASK_LEVEL__SOURCE) { - SArray* pList = NULL; - int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd, pList); + int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd); if (code != 0) { tqError("vgId:%d, s-task:%s update qualified table error for stream task", vgId, pTask->id.idStr); continue; } - - if (isAdd) { // only add qualified tables - tqReaderAddTbUidList(pTask->exec.pTqReader, pList); - } else { - tqReaderRemoveTbUidList(pTask->exec.pTqReader, tbUidList); - } } } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 791bfbe6df..4c37e1052f 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -49,32 +49,6 @@ int32_t tqAddInputBlockNLaunchTask(SStreamTask* pTask, SStreamQueueItem* pQueueI return TSDB_CODE_SUCCESS; } -int32_t launchTaskForWalBlock(SStreamTask* pTask, SFetchRet* pRet, STqOffset* pOffset) { - SStreamDataBlock* pBlocks = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, 0); - if (pBlocks == NULL) { // failed, do nothing - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - pRet->data.info.type = STREAM_NORMAL; - pBlocks->type = STREAM_INPUT__DATA_BLOCK; - pBlocks->sourceVer = pOffset->val.version; - pBlocks->blocks = taosArrayInit(0, sizeof(SSDataBlock)); - taosArrayPush(pBlocks->blocks, &pRet->data); - -// int64_t* ts = (int64_t*)(((SColumnInfoData*)ret.data.pDataBlock->pData)->pData); -// tqDebug("-----------%ld\n", ts[0]); - - int32_t code = tqAddInputBlockNLaunchTask(pTask, (SStreamQueueItem*)pBlocks, pBlocks->sourceVer); - if (code == TSDB_CODE_SUCCESS) { - pOffset->val.version = walReaderGetCurrentVer(pTask->exec.pTqReader->pWalReader); - tqDebug("s-task:%s set the ver:%" PRId64 " from WALReader after extract block from WAL", pTask->id.idStr, - pOffset->val.version); - } - - return 0; -} - void initOffsetForAllRestoreTasks(STQ* pTq) { void* pIter = NULL; @@ -90,8 +64,7 @@ void initOffsetForAllRestoreTasks(STQ* pTq) { } if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { - tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId, - pTask->status.taskStatus); + tqDebug("s-task:%s skip push data, since not ready, status %d", pTask->id.idStr, pTask->status.taskStatus); continue; } @@ -120,7 +93,7 @@ void saveOffsetForAllTasks(STQ* pTq, int64_t ver) { } if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { - tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId, + tqDebug("s-task:%s skip push data, not ready for processing, status %d", pTask->id.idStr, pTask->status.taskStatus); continue; } diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 255949a588..2bc91f0cb3 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -370,7 +370,7 @@ static SArray* filterUnqualifiedTables(const SStreamScanInfo* pScanInfo, const S return qa; } -int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd, SArray* pList) { +int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; const char* id = GET_TASKID(pTaskInfo); int32_t code = 0; @@ -386,11 +386,6 @@ int32_t qUpdateTableListForStreamScanner(qTaskInfo_t tinfo, const SArray* tableI if (isAdd) { // add new table id SArray* qa = filterUnqualifiedTables(pScanInfo, tableIdList, GET_TASKID(pTaskInfo)); int32_t numOfQualifiedTables = taosArrayGetSize(qa); - - if (pList != NULL) { - taosArrayAddAll(pList, qa); - } - qDebug("%d qualified child tables added into stream scanner, %s", numOfQualifiedTables, id); code = tqReaderAddTbUidList(pScanInfo->tqReader, qa); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 0008c8dd8c..fecc01f295 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -57,11 +57,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF goto _err; } - pMeta->pWalReadTasks = taosHashInit(64, fp, true, HASH_ENTRY_LOCK); - if (pMeta->pWalReadTasks == NULL) { - goto _err; - } - if (streamMetaBegin(pMeta) < 0) { goto _err; } @@ -75,7 +70,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF _err: taosMemoryFree(pMeta->path); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); - if (pMeta->pWalReadTasks) taosHashCleanup(pMeta->pWalReadTasks); if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->db) tdbClose(pMeta->db); @@ -112,7 +106,6 @@ void streamMetaClose(SStreamMeta* pMeta) { } taosHashCleanup(pMeta->pTasks); - taosHashCleanup(pMeta->pWalReadTasks); taosMemoryFree(pMeta->path); taosMemoryFree(pMeta); } @@ -190,15 +183,12 @@ int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* return -1; } - pTask->status.taskStatus = STREAM_STATUS__NORMAL; taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, POINTER_BYTES); return 0; } int32_t streamMetaGetNumOfTasks(const SStreamMeta* pMeta) { - int32_t numOfReady = taosHashGetSize(pMeta->pTasks); - int32_t numOfRestoring = taosHashGetSize(pMeta->pWalReadTasks); - return numOfReady + numOfRestoring; + return (int32_t) taosHashGetSize(pMeta->pTasks); } SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId) { @@ -224,37 +214,6 @@ void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask) { } } -SStreamTask* streamMetaAcquireTaskEx(SStreamMeta* pMeta, int32_t taskId) { - taosRLockLatch(&pMeta->lock); - - SStreamTask* pTask = NULL; - int32_t numOfRestored = taosHashGetSize(pMeta->pWalReadTasks); - if (numOfRestored > 0) { - SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pWalReadTasks, &taskId, sizeof(taskId)); - if (p != NULL) { - pTask = *p; - if (pTask != NULL && (atomic_load_8(&(pTask->status.taskStatus)) != TASK_STATUS__DROPPING)) { - atomic_add_fetch_32(&pTask->refCnt, 1); - taosRUnLockLatch(&pMeta->lock); - return pTask; - } - } - } - - SStreamTask** p = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); - if (p != NULL) { - pTask = *p; - if (pTask != NULL && atomic_load_8(&(pTask->status.taskStatus)) != TASK_STATUS__DROPPING) { - atomic_add_fetch_32(&pTask->refCnt, 1); - taosRUnLockLatch(&pMeta->lock); - return pTask; - } - } - - taosRUnLockLatch(&pMeta->lock); - return NULL; -} - void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); if (ppTask) { @@ -344,7 +303,7 @@ int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver) { return -1; } - if (taosHashPut(pMeta->pWalReadTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { + if (taosHashPut(pMeta->pTasks, &pTask->id.taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { tdbFree(pKey); tdbFree(pVal); tdbTbcClose(pCur); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 9962cdfcc0..03afc0692d 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -17,6 +17,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { qDebug("s-task:%s at node %d launch recover", pTask->id.idStr, pTask->nodeId); + if (pTask->taskLevel == TASK_LEVEL__SOURCE) { atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__RECOVER_PREPARE); streamSetParamForRecover(pTask); @@ -33,12 +34,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { memcpy(serializedReq, &req, len); - SRpcMsg rpcMsg = { - .contLen = len, - .pCont = serializedReq, - .msgType = TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE, - }; - + SRpcMsg rpcMsg = { .contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_RECOVER_NONBLOCKING_STAGE }; if (tmsgPutToQueue(pTask->pMsgCb, STREAM_QUEUE, &rpcMsg) < 0) { /*ASSERT(0);*/ } @@ -50,6 +46,7 @@ int32_t streamTaskLaunchRecover(SStreamTask* pTask, int64_t version) { } else if (pTask->taskLevel == TASK_LEVEL__SINK) { atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); } + return 0; } @@ -61,6 +58,7 @@ int32_t streamTaskCheckDownstream(SStreamTask* pTask, int64_t version) { .upstreamNodeId = pTask->nodeId, .childId = pTask->selfChildId, }; + // serialize if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { req.reqId = tGenIdPI64(); @@ -128,6 +126,7 @@ int32_t streamProcessTaskCheckReq(SStreamTask* pTask, const SStreamTaskCheckReq* int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp, int64_t version) { qDebug("task %d at node %d recv check rsp from task %d at node %d: status %d", pRsp->upstreamTaskId, pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status); + if (pRsp->status == 1) { if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { bool found = false; @@ -138,7 +137,11 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* break; } } - if (!found) return -1; + + if (!found) { + return -1; + } + int32_t left = atomic_sub_fetch_32(&pTask->recoverTryingDownstream, 1); ASSERT(left >= 0); if (left == 0) { @@ -147,7 +150,10 @@ int32_t streamProcessTaskCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* streamTaskLaunchRecover(pTask, version); } } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { - if (pRsp->reqId != pTask->checkReqId) return -1; + if (pRsp->reqId != pTask->checkReqId) { + return -1; + } + streamTaskLaunchRecover(pTask, version); } else { ASSERT(0); @@ -167,6 +173,7 @@ int32_t streamRestoreParam(SStreamTask* pTask) { void* exec = pTask->exec.pExecutor; return qStreamRestoreParam(exec); } + int32_t streamSetStatusNormal(SStreamTask* pTask) { atomic_store_8(&pTask->status.taskStatus, TASK_STATUS__NORMAL); return 0; @@ -227,8 +234,8 @@ int32_t streamDispatchRecoverFinishReq(SStreamTask* pTask) { // agg int32_t streamAggRecoverPrepare(SStreamTask* pTask) { - void* exec = pTask->exec.pExecutor; pTask->recoverWaitingUpstream = taosArrayGetSize(pTask->childEpInfo); + qDebug("s-task:%s wait for %d upstreams", pTask->id.idStr, pTask->recoverWaitingUpstream); return 0; } @@ -247,6 +254,7 @@ int32_t streamAggChildrenRecoverFinish(SStreamTask* pTask) { int32_t streamProcessRecoverFinishReq(SStreamTask* pTask, int32_t childId) { if (pTask->taskLevel == TASK_LEVEL__AGG) { int32_t left = atomic_sub_fetch_32(&pTask->recoverWaitingUpstream, 1); + qDebug("s-task:%s remain unfinished child tasks:%d", pTask->id.idStr, left); ASSERT(left >= 0); if (left == 0) { streamAggChildrenRecoverFinish(pTask); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 4783997276..67c60008fd 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -187,11 +187,6 @@ void tFreeStreamTask(SStreamTask* pTask) { pTask->exec.pExecutor = NULL; } - if (pTask->exec.pTqReader != NULL && pTask->freeFp != NULL) { - pTask->freeFp(pTask->exec.pTqReader); - pTask->exec.pTqReader = NULL; - } - if (pTask->exec.pWalReader != NULL) { walCloseReader(pTask->exec.pWalReader); }