diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 404d81465b..103ca6a4f0 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -66,6 +66,25 @@ enum { TASK_OUTPUT_STATUS__BLOCKED, }; +enum { + TASK_TRIGGER_STATUS__INACTIVE = 1, + TASK_TRIGGER_STATUS__ACTIVE, +}; + +enum { + TASK_LEVEL__SOURCE = 1, + TASK_LEVEL__AGG, + TASK_LEVEL__SINK, +}; + +enum { + TASK_OUTPUT__FIXED_DISPATCH = 1, + TASK_OUTPUT__SHUFFLE_DISPATCH, + TASK_OUTPUT__TABLE, + TASK_OUTPUT__SMA, + TASK_OUTPUT__FETCH, +}; + typedef struct { int8_t type; } SStreamQueueItem; @@ -202,29 +221,6 @@ typedef struct { int8_t reserved; } STaskSinkFetch; -enum { - TASK_EXEC__NONE = 1, - TASK_EXEC__PIPE, -}; - -enum { - TASK_DISPATCH__NONE = 1, - TASK_DISPATCH__FIXED, - TASK_DISPATCH__SHUFFLE, -}; - -enum { - TASK_SINK__NONE = 1, - TASK_SINK__TABLE, - TASK_SINK__SMA, - TASK_SINK__FETCH, -}; - -enum { - TASK_TRIGGER_STATUS__IN_ACTIVE = 1, - TASK_TRIGGER_STATUS__ACTIVE, -}; - typedef struct { int32_t nodeId; int32_t childId; @@ -237,11 +233,8 @@ typedef struct { typedef struct SStreamTask { int64_t streamId; int32_t taskId; - int8_t isDataScan; - int8_t execType; - int8_t sinkType; - int8_t dispatchType; - int8_t isStreamDistributed; + int8_t taskLevel; + int8_t outputType; int16_t dispatchMsgType; int8_t taskStatus; @@ -252,13 +245,12 @@ typedef struct SStreamTask { int32_t nodeId; SEpSet epSet; - // used for semi or single task, - // while final task should have processedVer for each child + // used for task source and sink, + // while task agg should have processedVer for each child int64_t recoverSnapVer; int64_t startVer; int64_t checkpointVer; int64_t processedVer; - // int32_t numOfVgroups; // children info SArray* childEpInfo; // SArray @@ -266,19 +258,13 @@ typedef struct SStreamTask { // exec STaskExec exec; - // TODO: unify sink and dispatch - - // local sink - union { - STaskSinkTb tbSink; - STaskSinkSma smaSink; - STaskSinkFetch fetchSink; - }; - - // remote dispatcher + // output union { STaskDispatcherFixedEp fixedEpDispatcher; STaskDispatcherShuffle shuffleDispatcher; + STaskSinkTb tbSink; + STaskSinkSma smaSink; + STaskSinkFetch fetchSink; }; int8_t inputStatus; @@ -292,9 +278,6 @@ typedef struct SStreamTask { int64_t triggerParam; void* timer; - // application storage - // void* ahandle; - // msg handle SMsgCb* pMsgCb; } SStreamTask; @@ -331,7 +314,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem } if (pItem->type != STREAM_INPUT__GET_RES && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { - atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE, TASK_TRIGGER_STATUS__ACTIVE); + atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); } #if 0 @@ -346,18 +329,15 @@ static FORCE_INLINE void streamTaskInputFail(SStreamTask* pTask) { } static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBlock* pBlock) { - if (pTask->sinkType == TASK_SINK__TABLE) { - ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); + if (pTask->outputType == TASK_OUTPUT__TABLE) { pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks); taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); taosFreeQitem(pBlock); - } else if (pTask->sinkType == TASK_SINK__SMA) { - ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE); + } else if (pTask->outputType == TASK_OUTPUT__SMA) { pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks); taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes); taosFreeQitem(pBlock); } else { - ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE); taosWriteQitem(pTask->outputQueue->queue, pBlock); } return 0; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 9d7fa537bb..218f82df18 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -98,13 +98,11 @@ END: } int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) { - pTask->dispatchType = TASK_DISPATCH__NONE; - // sink if (pStream->smaId != 0) { - pTask->sinkType = TASK_SINK__SMA; + pTask->outputType = TASK_OUTPUT__SMA; pTask->smaSink.smaId = pStream->smaId; } else { - pTask->sinkType = TASK_SINK__TABLE; + pTask->outputType = TASK_OUTPUT__TABLE; pTask->tbSink.stbUid = pStream->targetStbUid; memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); @@ -113,8 +111,6 @@ int32_t mndAddSinkToTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask } int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStreamTask* pTask) { - pTask->sinkType = TASK_SINK__NONE; - bool isShuffle = false; if (pStream->fixedSinkVgId == 0) { @@ -122,7 +118,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream ASSERT(pDb); if (pDb->cfg.numOfVgroups > 1) { isShuffle = true; - pTask->dispatchType = TASK_DISPATCH__SHUFFLE; + pTask->outputType = TASK_OUTPUT__SHUFFLE_DISPATCH; pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; if (mndExtractDbInfo(pMnode, pDb, &pTask->shuffleDispatcher.dbInfo, NULL) < 0) { ASSERT(0); @@ -152,7 +148,7 @@ int32_t mndAddDispatcherToInnerTask(SMnode* pMnode, SStreamObj* pStream, SStream } } } else { - pTask->dispatchType = TASK_DISPATCH__FIXED; + pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH; pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; SArray* pArray = taosArrayGetP(pStream->tasks, 0); // one sink only @@ -178,7 +174,6 @@ int32_t mndAssignTaskToVg(SMnode* pMnode, SStreamTask* pTask, SSubplan* plan, co terrno = TSDB_CODE_QRY_INVALID_INPUT; return -1; } - ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE || pTask->sinkType != TASK_SINK__NONE); return 0; } @@ -249,26 +244,20 @@ int32_t mndAddShuffleSinkTasksToStream(SMnode* pMnode, SStreamObj* pStream) { pTask->nodeId = pVgroup->vgId; pTask->epSet = mndGetVgroupEpset(pMnode, pVgroup); - // source - pTask->isDataScan = 0; - - // exec - pTask->execType = TASK_EXEC__NONE; + // type + pTask->taskLevel = TASK_LEVEL__SINK; // sink if (pStream->smaId != 0) { - pTask->sinkType = TASK_SINK__SMA; + pTask->outputType = TASK_OUTPUT__SMA; pTask->smaSink.smaId = pStream->smaId; } else { - pTask->sinkType = TASK_SINK__TABLE; + pTask->outputType = TASK_OUTPUT__TABLE; pTask->tbSink.stbUid = pStream->targetStbUid; memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); ASSERT(pTask->tbSink.pSchemaWrapper); } - - // dispatch - pTask->dispatchType = TASK_DISPATCH__NONE; } return 0; } @@ -295,25 +284,19 @@ int32_t mndAddFixedSinkTaskToStream(SMnode* pMnode, SStreamObj* pStream) { #endif pTask->epSet = mndGetVgroupEpset(pMnode, &pStream->fixedSinkVg); - // source - pTask->isDataScan = 0; - - // exec - pTask->execType = TASK_EXEC__NONE; + pTask->taskLevel = TASK_LEVEL__SINK; // sink if (pStream->smaId != 0) { - pTask->sinkType = TASK_SINK__SMA; + pTask->outputType = TASK_OUTPUT__SMA; pTask->smaSink.smaId = pStream->smaId; } else { - pTask->sinkType = TASK_SINK__TABLE; + pTask->outputType = TASK_OUTPUT__TABLE; pTask->tbSink.stbUid = pStream->targetStbUid; memcpy(pTask->tbSink.stbFullName, pStream->targetSTbName, TSDB_TABLE_FNAME_LEN); pTask->tbSink.pSchemaWrapper = tCloneSSchemaWrapper(&pStream->outputSchema); } - // dispatch - pTask->dispatchType = TASK_DISPATCH__NONE; return 0; } @@ -338,6 +321,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { bool multiTarget = pDbObj->cfg.numOfVgroups > 1; if (totLevel == 2 || externalTargetDB || multiTarget) { + /*if (true) {*/ SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); taosArrayPush(pStream->tasks, &taskOneLevel); // add extra sink @@ -376,8 +360,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { pInnerTask->childEpInfo = taosArrayInit(0, sizeof(void*)); - // source - pInnerTask->isDataScan = 0; + pInnerTask->taskLevel = TASK_LEVEL__AGG; // trigger pInnerTask->triggerParam = pStream->triggerParam; @@ -388,9 +371,6 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { return -1; } - // exec - pInnerTask->execType = TASK_EXEC__PIPE; - #if 0 SDbObj* pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb); ASSERT(pDbObj != NULL); @@ -452,19 +432,16 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { mndAddTaskToTaskSet(taskSourceLevel, pTask); // source - pTask->isDataScan = 1; + pTask->taskLevel = TASK_LEVEL__SOURCE; // add fixed vg dispatch - pTask->sinkType = TASK_SINK__NONE; pTask->dispatchMsgType = TDMT_STREAM_TASK_DISPATCH; - pTask->dispatchType = TASK_DISPATCH__FIXED; + pTask->outputType = TASK_OUTPUT__FIXED_DISPATCH; pTask->fixedEpDispatcher.taskId = pInnerTask->taskId; pTask->fixedEpDispatcher.nodeId = pInnerTask->nodeId; pTask->fixedEpDispatcher.epSet = pInnerTask->epSet; - // exec - pTask->execType = TASK_EXEC__PIPE; if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); @@ -515,7 +492,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { mndAddTaskToTaskSet(taskOneLevel, pTask); // source - pTask->isDataScan = 1; + pTask->taskLevel = TASK_LEVEL__SOURCE; // trigger pTask->triggerParam = pStream->triggerParam; @@ -527,8 +504,6 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { mndAddSinkToTask(pMnode, pStream, pTask); } - // exec - pTask->execType = TASK_EXEC__PIPE; if (mndAssignTaskToVg(pMnode, pTask, plan, pVgroup) < 0) { sdbRelease(pSdb, pVgroup); qDestroyQueryPlan(pPlan); diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index b8501db9fb..902879701c 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -323,8 +323,7 @@ FAIL: } int32_t mndPersistTaskDeployReq(STrans *pTrans, const SStreamTask *pTask) { - ASSERT(pTask->isDataScan == 0 || pTask->isDataScan == 1); - if (pTask->isDataScan == 0 && pTask->sinkType == TASK_SINK__NONE) { + if (pTask->taskLevel == TASK_LEVEL__AGG) { ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0); } SEncoder encoder; @@ -548,7 +547,7 @@ int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea SArray *pTasks = taosArrayGetP(pStream->tasks, i); int32_t sz = taosArrayGetSize(pTasks); SStreamTask *pTask = taosArrayGetP(pTasks, 0); - if (!pTask->isDataScan && pTask->execType != TASK_EXEC__NONE) { + if (pTask->taskLevel == TASK_LEVEL__AGG) { ASSERT(sz == 1); if (mndPersistTaskRecoverReq(pTrans, pTask) < 0) { return -1; @@ -564,8 +563,8 @@ int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea int32_t sz = taosArrayGetSize(pTasks); for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pTasks, j); - if (!pTask->isDataScan) break; - ASSERT(pTask->execType != TASK_EXEC__NONE); + if (pTask->taskLevel != TASK_LEVEL__SOURCE) break; + ASSERT(pTask->taskLevel != TASK_LEVEL__SINK); if (mndPersistTaskRecoverReq(pTrans, pTask) < 0) { return -1; } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 2561031bac..cda4663285 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -110,9 +110,6 @@ static int32_t sndProcessTaskDeployReq(SSnode *pNode, SRpcMsg *pMsg) { pTask->pMsgCb = &pNode->msgCb; - ASSERT(pTask->execType != TASK_EXEC__NONE); - - ASSERT(pTask->isDataScan == 0); pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL); ASSERT(pTask->exec.executor); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 84a1191ef3..62e37f048e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -604,8 +604,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { int32_t code = 0; - ASSERT(pTask->isDataScan == 0 || pTask->isDataScan == 1); - if (pTask->isDataScan == 0 && pTask->sinkType == TASK_SINK__NONE) { + + if (pTask->taskLevel == TASK_LEVEL__AGG) { ASSERT(taosArrayGetSize(pTask->childEpInfo) != 0); } @@ -624,32 +624,30 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { pTask->pMsgCb = &pTq->pVnode->msgCb; - // exec - if (pTask->execType != TASK_EXEC__NONE) { - // expand runners - if (pTask->isDataScan) { - SReadHandle handle = { - .meta = pTq->pVnode->pMeta, - .vnode = pTq->pVnode, - .initTqReader = 1, - }; - pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); - } else { - SReadHandle mgHandle = { - .vnode = NULL, - .numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo), - }; - pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle); - } + // expand executor + if (pTask->taskLevel == TASK_LEVEL__SOURCE) { + SReadHandle handle = { + .meta = pTq->pVnode->pMeta, + .vnode = pTq->pVnode, + .initTqReader = 1, + }; + pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); + ASSERT(pTask->exec.executor); + } else if (pTask->taskLevel == TASK_LEVEL__AGG) { + SReadHandle mgHandle = { + .vnode = NULL, + .numOfVgroups = (int32_t)taosArrayGetSize(pTask->childEpInfo), + }; + pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &mgHandle); ASSERT(pTask->exec.executor); } // sink /*pTask->ahandle = pTq->pVnode;*/ - if (pTask->sinkType == TASK_SINK__SMA) { + if (pTask->outputType == TASK_OUTPUT__SMA) { pTask->smaSink.vnode = pTq->pVnode; pTask->smaSink.smaSink = smaHandleRes; - } else if (pTask->sinkType == TASK_SINK__TABLE) { + } else if (pTask->outputType == TASK_OUTPUT__TABLE) { pTask->tbSink.vnode = pTq->pVnode; pTask->tbSink.tbSinkFunc = tqTableSink; @@ -715,7 +713,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) { pIter = taosHashIterate(pTq->pStreamTasks, pIter); if (pIter == NULL) break; SStreamTask* pTask = *(SStreamTask**)pIter; - if (!pTask->isDataScan) continue; + if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue; qDebug("data submit enqueue stream task: %d, ver: %" PRId64, pTask->taskId, ver); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index f20c7e7e55..5017893853 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -416,7 +416,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { pIter = taosHashIterate(pTq->pStreamTasks, pIter); if (pIter == NULL) break; SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->isDataScan) { + if (pTask->taskLevel == TASK_LEVEL__SOURCE) { int32_t code = qUpdateQualifiedTableId(pTask->exec.executor, tbUidList, isAdd); ASSERT(code == 0); } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 82da396b30..30f0919cee 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -65,7 +65,7 @@ void streamSchedByTimer(void* param, void* tmrId) { } trigger->pBlock->info.type = STREAM_GET_ALL; - atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE); + atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE); streamTaskInput(pTask, (SStreamQueueItem*)trigger); streamSchedExec(pTask); @@ -77,7 +77,7 @@ void streamSchedByTimer(void* param, void* tmrId) { int32_t streamSetupTrigger(SStreamTask* pTask) { if (pTask->triggerParam != 0) { pTask->timer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer); - pTask->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE; + pTask->triggerStatus = TASK_TRIGGER_STATUS__INACTIVE; } return 0; } @@ -186,7 +186,7 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S if (exec) { streamTryExec(pTask); - if (pTask->dispatchType != TASK_DISPATCH__NONE) { + if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { streamDispatch(pTask); } } else { @@ -201,7 +201,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) { qDebug("task %d receive dispatch rsp", pTask->taskId); - if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); qDebug("task %d is shuffle, left waiting rsp %d", pTask->taskId, leftRsp); if (leftRsp > 0) return 0; @@ -222,7 +222,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp) { int32_t streamProcessRunReq(SStreamTask* pTask) { streamTryExec(pTask); - if (pTask->dispatchType != TASK_DISPATCH__NONE) { + if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { streamDispatch(pTask); } return 0; @@ -250,7 +250,7 @@ int32_t streamProcessRecoverRsp(SStreamTask* pTask, SStreamTaskRecoverRsp* pRsp) streamProcessRunReq(pTask); - if (pTask->isDataScan) { + if (pTask->taskLevel == TASK_LEVEL__SOURCE) { // scan data to recover pTask->inputStatus = TASK_INPUT_STATUS__RECOVER; pTask->taskStatus = TASK_STATUS__RECOVERING; @@ -272,12 +272,11 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, S streamTaskEnqueueRetrieve(pTask, pReq, pRsp); - ASSERT(pTask->execType != TASK_EXEC__NONE); + ASSERT(pTask->taskLevel != TASK_LEVEL__SINK); streamSchedExec(pTask); /*streamTryExec(pTask);*/ - /*ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);*/ /*streamDispatch(pTask);*/ return 0; diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index ef7c10c8e1..66e689dd3e 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -242,7 +242,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat int32_t blockNum = taosArrayGetSize(pData->blocks); ASSERT(blockNum != 0); - if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { SStreamDispatchReq req = { .streamId = pTask->streamId, .dataSrcVgId = pData->srcVgId, @@ -282,7 +282,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat taosArrayDestroy(req.dataLen); return code; - } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t rspCnt = atomic_load_32(&pTask->shuffleDispatcher.waitingRspCnt); ASSERT(rspCnt == 0); @@ -393,11 +393,11 @@ int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data, int32_t vgId = 0; int32_t downstreamTaskId = 0; // find ep - if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { vgId = pTask->fixedEpDispatcher.nodeId; *ppEpSet = &pTask->fixedEpDispatcher.epSet; downstreamTaskId = pTask->fixedEpDispatcher.taskId; - } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { // TODO get ctbName for each block SSDataBlock* pBlock = taosArrayGet(data->blocks, 0); char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId); @@ -439,8 +439,7 @@ FAIL: } int32_t streamDispatch(SStreamTask* pTask) { - ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE); - ASSERT(pTask->sinkType == TASK_SINK__NONE); + ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); int8_t old = atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 79c35f2889..e662c18a15 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -24,7 +24,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) SStreamTrigger* pTrigger = (SStreamTrigger*)data; qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { - ASSERT(pTask->isDataScan); + ASSERT(pTask->taskLevel == TASK_LEVEL__SOURCE); SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; qDebug("task %d %p set submit input %p %p %d 1", pTask->taskId, pTask, pSubmit, pSubmit->data, *pSubmit->dataRef); qSetMultiStreamInput(exec, pSubmit->data, 1, STREAM_INPUT__DATA_SUBMIT); @@ -92,7 +92,7 @@ static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock } int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) { - ASSERT(pTask->execType != TASK_EXEC__NONE); + ASSERT(pTask->taskLevel != TASK_LEVEL__SINK); void* exec = pTask->exec.executor; @@ -139,8 +139,7 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) { return -1; } - if (pTask->dispatchType != TASK_DISPATCH__NONE) { - ASSERT(pTask->sinkType == TASK_SINK__NONE); + if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { streamDispatch(pTask); } } @@ -161,7 +160,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (data == NULL) { data = qItem; streamQueueProcessSuccess(pTask->inputQueue); - if (pTask->execType == TASK_EXEC__NONE) { + if (pTask->taskLevel == TASK_LEVEL__SINK) { break; } } else { @@ -187,7 +186,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { break; } - if (pTask->execType == TASK_EXEC__NONE) { + if (pTask->taskLevel == TASK_LEVEL__SINK) { ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK); streamTaskOutput(pTask, data); continue; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index be9dc81c3c..7dfeefb261 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -52,15 +52,16 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; - + return pMeta; _err: - return NULL; } void streamMetaClose(SStreamMeta* pMeta) { - // - return; + tdbCommit(pMeta->db, &pMeta->txn); + tdbTbClose(pMeta->pTaskDb); + tdbTbClose(pMeta->pStateDb); + tdbClose(pMeta->db); } int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) { @@ -123,13 +124,32 @@ int32_t streamMetaCommit(SStreamMeta* pMeta) { if (tdbCommit(pMeta->db, &pMeta->txn) < 0) { return -1; } + memset(&pMeta->txn, 0, sizeof(TXN)); + if (tdbTxnOpen(&pMeta->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { + return -1; + } + if (tdbBegin(pMeta->db, &pMeta->txn) < 0) { + return -1; + } return 0; } -int32_t streamMetaRollBack(SStreamMeta* pMeta) { - // TODO tdb rollback +int32_t streamMetaAbort(SStreamMeta* pMeta) { + if (tdbAbort(pMeta->db, &pMeta->txn) < 0) { + return -1; + } + memset(&pMeta->txn, 0, sizeof(TXN)); + if (tdbTxnOpen(&pMeta->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { + return -1; + } + if (tdbBegin(pMeta->db, &pMeta->txn) < 0) { + return -1; + } return 0; } + int32_t streamRestoreTask(SStreamMeta* pMeta) { TBC* pCur = NULL; if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { @@ -153,6 +173,18 @@ int32_t streamRestoreTask(SStreamMeta* pMeta) { tDecoderInit(&decoder, (uint8_t*)pVal, vLen); tDecodeSStreamTask(&decoder, pTask); tDecoderClear(&decoder); + + if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) { + return -1; + } + + if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { + return -1; + } + } + + if (tdbTbcClose(pCur) < 0) { + return -1; } return 0; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index dec23cd151..3530c05688 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -88,14 +88,15 @@ int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp } int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) { +#if 0 if (pTask->taskStatus != TASK_STATUS__FAIL) { return 0; } if (pTask->isStreamDistributed) { - if (pTask->isDataScan) { + if (pTask->taskType == TASK_TYPE__SOURCE) { pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER; - } else if (pTask->execType != TASK_EXEC__NONE) { + } else if (pTask->taskType != TASK_TYPE__SINK) { pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER; bool hasCheckpoint = false; int32_t childSz = taosArrayGetSize(pTask->childEpInfo); @@ -113,7 +114,7 @@ int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq* } } } else { - if (pTask->isDataScan) { + if (pTask->taskType == TASK_TYPE__SOURCE) { if (pTask->checkpointVer != -1) { // load from checkpoint } else { @@ -133,5 +134,6 @@ int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq* } } +#endif return 0; } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index c4e946e191..3a54981989 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -52,10 +52,8 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { /*if (tStartEncode(pEncoder) < 0) return -1;*/ if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->isDataScan) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->execType) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->sinkType) < 0) return -1; - if (tEncodeI8(pEncoder, pTask->dispatchType) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->taskLevel) < 0) return -1; + if (tEncodeI8(pEncoder, pTask->outputType) < 0) return -1; if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->taskStatus) < 0) return -1; @@ -73,27 +71,23 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1; } - if (pTask->execType != TASK_EXEC__NONE) { + if (pTask->taskLevel != TASK_LEVEL__SINK) { if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1; } - if (pTask->sinkType == TASK_SINK__TABLE) { + if (pTask->outputType == TASK_OUTPUT__TABLE) { if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->tbSink.stbFullName) < 0) return -1; if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; - } else if (pTask->sinkType == TASK_SINK__SMA) { + } else if (pTask->outputType == TASK_OUTPUT__SMA) { if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1; - } else if (pTask->sinkType == TASK_SINK__FETCH) { + } else if (pTask->outputType == TASK_OUTPUT__FETCH) { if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1; - } else { - ASSERT(pTask->sinkType == TASK_SINK__NONE); - } - - if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->fixedEpDispatcher.nodeId) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; - } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; } @@ -107,10 +101,8 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { /*if (tStartDecode(pDecoder) < 0) return -1;*/ if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->isDataScan) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->execType) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->sinkType) < 0) return -1; - if (tDecodeI8(pDecoder, &pTask->dispatchType) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->taskLevel) < 0) return -1; + if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1; if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->taskStatus) < 0) return -1; @@ -131,29 +123,25 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { taosArrayPush(pTask->childEpInfo, &pInfo); } - if (pTask->execType != TASK_EXEC__NONE) { + if (pTask->taskLevel != TASK_LEVEL__SINK) { if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1; } - if (pTask->sinkType == TASK_SINK__TABLE) { + if (pTask->outputType == TASK_OUTPUT__TABLE) { if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTask->tbSink.stbFullName) < 0) return -1; pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper)); if (pTask->tbSink.pSchemaWrapper == NULL) return -1; if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1; - } else if (pTask->sinkType == TASK_SINK__SMA) { + } else if (pTask->outputType == TASK_OUTPUT__SMA) { if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1; - } else if (pTask->sinkType == TASK_SINK__FETCH) { + } else if (pTask->outputType == TASK_OUTPUT__FETCH) { if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1; - } else { - ASSERT(pTask->sinkType == TASK_SINK__NONE); - } - - if (pTask->dispatchType == TASK_DISPATCH__FIXED) { + } else if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->fixedEpDispatcher.nodeId) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->fixedEpDispatcher.epSet) < 0) return -1; - } else if (pTask->dispatchType == TASK_DISPATCH__SHUFFLE) { + } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1; if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1; }