diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index d03fab0ebc..5c4d8ce250 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -31,6 +31,11 @@ extern "C" { typedef struct SStreamTask SStreamTask; +enum { + STREAM_STATUS__NORMAL = 0, + STREAM_STATUS__RECOVER, +}; + enum { TASK_STATUS__NORMAL = 0, TASK_STATUS__DROPPING, diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 8248a26c28..5e8867fdfb 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -177,7 +177,7 @@ static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pOldStream, SStream taosWLockLatch(&pOldStream->lock); - // TODO handle update + pOldStream->status = pNewStream->status; taosWUnLockLatch(&pOldStream->lock); return 0; @@ -395,6 +395,20 @@ int32_t mndPersistDropStreamLog(SMnode *pMnode, STrans *pTrans, SStreamObj *pStr return 0; } +static int32_t mndSetStreamRecover(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream) { + SStreamObj streamObj = {0}; + memcpy(streamObj.name, pStream->name, TSDB_STREAM_FNAME_LEN); + streamObj.status = STREAM_STATUS__RECOVER; + SSdbRaw *pCommitRaw = mndStreamActionEncode(&streamObj); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("stream trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); + return 0; +} + static int32_t mndCreateStbForStream(SMnode *pMnode, STrans *pTrans, const SStreamObj *pStream, const char *user) { SStbObj *pStb = NULL; SDbObj *pDb = NULL; @@ -492,6 +506,76 @@ static int32_t mndPersistTaskDropReq(STrans *pTrans, SStreamTask *pTask) { return 0; } +static int32_t mndPersistTaskRecoverReq(STrans *pTrans, SStreamTask *pTask) { + SMStreamTaskRecoverReq *pReq = taosMemoryCalloc(1, sizeof(SMStreamTaskRecoverReq)); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pReq->streamId = pTask->streamId; + pReq->taskId = pTask->taskId; + int32_t len; + int32_t code; + tEncodeSize(tEncodeSMStreamTaskRecoverReq, pReq, len, code); + if (code != 0) { + return -1; + } + void *buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); + if (buf == NULL) { + return -1; + } + void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + SEncoder encoder; + tEncoderInit(&encoder, abuf, len); + tEncodeSMStreamTaskRecoverReq(&encoder, pReq); + ((SMsgHead *)buf)->vgId = pTask->nodeId; + + STransAction action = {0}; + memcpy(&action.epSet, &pTask->epSet, sizeof(SEpSet)); + action.pCont = buf; + action.contLen = sizeof(SMsgHead) + len; + action.msgType = TDMT_STREAM_TASK_RECOVER; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(buf); + return -1; + } + return 0; +} + +int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { + if (pStream->isDistributed) { + int32_t lv = taosArrayGetSize(pStream->tasks); + for (int32_t i = 0; i < lv; i++) { + SArray *pTasks = taosArrayGetP(pStream->tasks, i); + int32_t sz = taosArrayGetSize(pTasks); + SStreamTask *pTask = taosArrayGetP(pTasks, 0); + if (!pTask->isDataScan && pTask->execType != TASK_EXEC__NONE) { + ASSERT(sz == 1); + if (mndPersistTaskRecoverReq(pTrans, pTask) < 0) { + return -1; + } + } else { + continue; + } + } + } else { + int32_t lv = taosArrayGetSize(pStream->tasks); + for (int32_t i = 0; i < lv; i++) { + SArray *pTasks = taosArrayGetP(pStream->tasks, i); + int32_t sz = taosArrayGetSize(pTasks); + for (int32_t j = 0; j < sz; j++) { + SStreamTask *pTask = taosArrayGetP(pTasks, j); + if (!pTask->isDataScan) break; + ASSERT(pTask->execType != TASK_EXEC__NONE); + if (mndPersistTaskRecoverReq(pTrans, pTask) < 0) { + return -1; + } + } + } + } + return 0; +} + int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { int32_t lv = taosArrayGetSize(pStream->tasks); for (int32_t i = 0; i < lv; i++) { @@ -712,14 +796,14 @@ static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq) { mDebug("trans:%d, used to drop stream:%s", pTrans->id, recoverReq.name); // broadcast to recover all tasks - if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { + if (mndRecoverStreamTasks(pMnode, pTrans, pStream) < 0) { mError("stream:%s, failed to recover task since %s", recoverReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); return -1; } // update stream status - if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { + if (mndSetStreamRecover(pMnode, pTrans, pStream) < 0) { sdbRelease(pMnode->pSdb, pStream); return -1; } diff --git a/source/dnode/snode/inc/sndInt.h b/source/dnode/snode/inc/sndInt.h index 8916e2a31c..1f0019ef46 100644 --- a/source/dnode/snode/inc/sndInt.h +++ b/source/dnode/snode/inc/sndInt.h @@ -30,15 +30,6 @@ extern "C" { #endif -enum { - STREAM_STATUS__RUNNING = 1, - STREAM_STATUS__STOPPED, - STREAM_STATUS__CREATING, - STREAM_STATUS__STOPING, - STREAM_STATUS__RESTORING, - STREAM_STATUS__DELETING, -}; - typedef struct { SHashObj* pHash; // taskId -> SStreamTask } SStreamMeta; diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index f9f4e62774..d10ea76c83 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -33,6 +33,8 @@ typedef struct { static SStreamGlobalEnv streamEnv; int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb); +int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum); + int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb); int32_t streamDispatchReqToData(const SStreamDispatchReq* pReq, SStreamDataBlock* pData); int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index cffd519d2a..9d22da8662 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -82,6 +82,63 @@ static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock return 0; } +int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) { + ASSERT(pTask->execType != TASK_EXEC__NONE); + + void* exec = pTask->exec.executor; + + while (1) { + SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); + if (pRes == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + int32_t batchCnt = 0; + while (1) { + SSDataBlock* output = NULL; + uint64_t ts = 0; + if (qExecTask(exec, &output, &ts) < 0) { + ASSERT(0); + } + if (output == NULL) break; + + SSDataBlock block = {0}; + assignOneDataBlock(&block, output); + block.info.childId = pTask->selfChildId; + taosArrayPush(pRes, &block); + + if (++batchCnt >= batchNum) break; + } + if (taosArrayGetSize(pRes) == 0) { + taosArrayDestroy(pRes); + break; + } + SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); + if (qRes == NULL) { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + return -1; + } + + qRes->type = STREAM_INPUT__DATA_BLOCK; + qRes->blocks = pRes; + qRes->childId = pTask->selfChildId; + + if (streamTaskOutput(pTask, qRes) < 0) { + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); + taosFreeQitem(qRes); + return -1; + } + + if (pTask->dispatchType != TASK_DISPATCH__NONE) { + ASSERT(pTask->sinkType == TASK_SINK__NONE); + streamDispatch(pTask, pTask->pMsgCb); + } + } + + return 0; +} + static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { int32_t cnt = 0; void* data = NULL; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 4175f60446..87b27daf60 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -125,7 +125,11 @@ int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq* } if (pTask->taskStatus == TASK_STATUS__RECOVERING) { - streamProcessRunReq(pTask); + if (streamPipelineExec(pTask, 10) < 0) { + // set fail + return -1; + } } + return 0; }