From 9394e3380f6f4d8da734bc0789a2ab0a10dd5648 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 15 Jul 2022 17:48:48 +0800 Subject: [PATCH] feat(stream): recover from failure --- include/common/tcommon.h | 10 +- include/common/tmsg.h | 29 ++++- include/common/tmsgdef.h | 1 + include/libs/executor/executor.h | 2 + include/libs/stream/tstream.h | 49 +++++++- source/common/src/tmsg.c | 39 +++++- source/dnode/mnode/impl/inc/mndDef.h | 18 +++ source/dnode/mnode/impl/inc/mndTrans.h | 1 + source/dnode/mnode/impl/src/mndDef.c | 2 + source/dnode/mnode/impl/src/mndScheduler.c | 1 + source/dnode/mnode/impl/src/mndStream.c | 66 ++++++++++- source/dnode/mnode/sdb/inc/sdb.h | 25 ++-- source/libs/executor/inc/executorimpl.h | 10 ++ source/libs/executor/src/executorMain.c | 9 ++ source/libs/executor/src/scanoperator.c | 33 ++++-- source/libs/stream/src/stream.c | 9 +- source/libs/stream/src/streamData.c | 2 +- source/libs/stream/src/streamExec.c | 27 ++++- source/libs/stream/src/streamRecover.c | 131 +++++++++++++++++++++ source/libs/stream/src/streamTask.c | 2 + 20 files changed, 423 insertions(+), 43 deletions(-) create mode 100644 source/libs/stream/src/streamRecover.c diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 5a7b32d20b..444a17c7f8 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -57,7 +57,7 @@ enum { // STREAM_INPUT__TABLE_SCAN, STREAM_INPUT__TQ_SCAN, STREAM_INPUT__DATA_RETRIEVE, - STREAM_INPUT__TRIGGER, + STREAM_INPUT__GET_RES, STREAM_INPUT__CHECKPOINT, STREAM_INPUT__DROP, }; @@ -155,10 +155,10 @@ typedef struct SQueryTableDataCond { int32_t numOfCols; SColumnInfo* colList; int32_t type; // data block load type: -// int32_t numOfTWindows; - STimeWindow twindows; - int64_t startVersion; - int64_t endVersion; + // int32_t numOfTWindows; + STimeWindow twindows; + int64_t startVersion; + int64_t endVersion; } SQueryTableDataCond; int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index c8e13fce3d..49feba4d3c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1968,7 +1968,7 @@ typedef struct SVCreateTbReq { int8_t type; union { struct { - char* name; // super table name + char* name; // super table name tb_uid_t suid; SArray* tagName; uint8_t* pTag; @@ -2437,9 +2437,6 @@ typedef struct { int8_t igNotExists; } SMDropStreamReq; -int32_t tSerializeSMDropStreamReq(void* buf, int32_t bufLen, const SMDropStreamReq* pReq); -int32_t tDeserializeSMDropStreamReq(void* buf, int32_t bufLen, SMDropStreamReq* pReq); - typedef struct { int8_t reserved; } SMDropStreamRsp; @@ -2454,6 +2451,27 @@ typedef struct { int8_t reserved; } SVDropStreamTaskRsp; +int32_t tSerializeSMDropStreamReq(void* buf, int32_t bufLen, const SMDropStreamReq* pReq); +int32_t tDeserializeSMDropStreamReq(void* buf, int32_t bufLen, SMDropStreamReq* pReq); + +typedef struct { + char name[TSDB_STREAM_FNAME_LEN]; + int8_t igNotExists; +} SMRecoverStreamReq; + +typedef struct { + int8_t reserved; +} SMRecoverStreamRsp; + +typedef struct { + int64_t recoverObjUid; + int32_t taskId; + int32_t hasCheckPoint; +} SMVStreamGatherInfoReq; + +int32_t tSerializeSMRecoverStreamReq(void* buf, int32_t bufLen, const SMRecoverStreamReq* pReq); +int32_t tDeserializeSMRecoverStreamReq(void* buf, int32_t bufLen, SMRecoverStreamReq* pReq); + typedef struct { int64_t leftForVer; int32_t vgId; @@ -2876,7 +2894,8 @@ static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp } static FORCE_INLINE void* tDecodeSMqMetaRsp(const void* buf, SMqMetaRsp* pRsp) { - buf = taosDecodeFixedI64(buf, &pRsp->reqOffset);buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); + buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); + buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); buf = taosDecodeFixedI16(buf, &pRsp->resMsgType); buf = taosDecodeFixedI32(buf, &pRsp->metaRspLen); buf = taosDecodeBinary(buf, &pRsp->metaRsp, pRsp->metaRspLen); diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 488bc6346e..a24eda33bb 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -131,6 +131,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_CREATE_STREAM, "create-stream", SCMCreateStreamReq, SCMCreateStreamRsp) TD_DEF_MSG_TYPE(TDMT_MND_ALTER_STREAM, "alter-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_STREAM, "drop-stream", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_RECOVER_STREAM, "recover-stream", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_CREATE_INDEX, "create-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "drop-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "get-index", NULL, NULL) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index dd64c5bf71..8b0a836ad2 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -192,6 +192,8 @@ int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner); int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem); +int32_t qStreamPrepareRecover(qTaskInfo_t tinfo, int64_t startVer, int64_t endVer); + #ifdef __cplusplus } #endif diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 071c539ff3..d03fab0ebc 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -34,6 +34,10 @@ typedef struct SStreamTask SStreamTask; enum { TASK_STATUS__NORMAL = 0, TASK_STATUS__DROPPING, + TASK_STATUS__FAIL, + TASK_STATUS__STOP, + TASK_STATUS__PREPARE_RECOVER, + TASK_STATUS__RECOVERING, }; enum { @@ -72,6 +76,7 @@ typedef struct { int8_t type; int32_t srcVgId; + int32_t childId; int64_t sourceVer; SArray* blocks; // SArray @@ -222,6 +227,8 @@ typedef struct { int32_t nodeId; int32_t childId; int32_t taskId; + int64_t checkpointVer; + int64_t processedVer; SEpSet epSet; } SStreamChildEpInfo; @@ -232,6 +239,7 @@ typedef struct SStreamTask { int8_t execType; int8_t sinkType; int8_t dispatchType; + int8_t isStreamDistributed; int16_t dispatchMsgType; int8_t taskStatus; @@ -242,6 +250,13 @@ typedef struct SStreamTask { int32_t nodeId; SEpSet epSet; + // used for semi or single task, + // while final task should have processedVer for each child + int64_t recoverSnapVer; + int64_t startVer; + int64_t checkpointVer; + int64_t processedVer; + // children info SArray* childEpInfo; // SArray @@ -316,12 +331,12 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem } else if (pItem->type == STREAM_INPUT__CHECKPOINT) { taosWriteQitem(pTask->inputQueue->queue, pItem); // qStreamInput(pTask->exec.executor, pItem); - } else if (pItem->type == STREAM_INPUT__TRIGGER) { + } else if (pItem->type == STREAM_INPUT__GET_RES) { taosWriteQitem(pTask->inputQueue->queue, pItem); // qStreamInput(pTask->exec.executor, pItem); } - if (pItem->type != STREAM_INPUT__TRIGGER && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { + if (pItem->type != STREAM_INPUT__GET_RES && pItem->type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) { atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE, TASK_TRIGGER_STATUS__ACTIVE); } @@ -420,6 +435,36 @@ typedef struct { int8_t inputStatus; } SStreamTaskRecoverRsp; +int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq); +int32_t tDecodeStreamTaskRecoverReq(SDecoder* pDecoder, SStreamTaskRecoverReq* pReq); + +int32_t tEncodeStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamTaskRecoverRsp* pRsp); +int32_t tDecodeStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamTaskRecoverRsp* pRsp); + +typedef struct { + int64_t streamId; + int32_t taskId; +} SMStreamTaskRecoverReq; + +typedef struct { + int64_t streamId; + int32_t taskId; +} SMStreamTaskRecoverRsp; + +int32_t tEncodeSMStreamTaskRecoverReq(SEncoder* pEncoder, const SMStreamTaskRecoverReq* pReq); +int32_t tDecodeSMStreamTaskRecoverReq(SDecoder* pDecoder, SMStreamTaskRecoverReq* pReq); + +int32_t tEncodeSMStreamTaskRecoverRsp(SEncoder* pEncoder, const SMStreamTaskRecoverRsp* pRsp); +int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp* pRsp); + +typedef struct { + int64_t streamId; +} SPStreamTaskRecoverReq; + +typedef struct { + int8_t reserved; +} SPStreamTaskRecoverRsp; + int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq); int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 9ebfa78b80..788f2cdfba 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4823,6 +4823,35 @@ int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq * return 0; } +int32_t tSerializeSMRecoverStreamReq(void *buf, int32_t bufLen, const SMRecoverStreamReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->name) < 0) return -1; + if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSMRecoverStreamReq(void *buf, int32_t bufLen, SMRecoverStreamReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) { taosMemoryFreeClear(pReq->sql); taosMemoryFreeClear(pReq->ast); @@ -4945,8 +4974,8 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) { if (tEncodeTag(pCoder, (const STag *)pReq->ctb.pTag) < 0) return -1; int32_t len = taosArrayGetSize(pReq->ctb.tagName); if (tEncodeI32(pCoder, len) < 0) return -1; - for (int32_t i = 0; i < len; i++){ - char* name = taosArrayGet(pReq->ctb.tagName, i); + for (int32_t i = 0; i < len; i++) { + char *name = taosArrayGet(pReq->ctb.tagName, i); if (tEncodeCStr(pCoder, name) < 0) return -1; } } else if (pReq->type == TSDB_NORMAL_TABLE) { @@ -4982,9 +5011,9 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) { int32_t len = 0; if (tDecodeI32(pCoder, &len) < 0) return -1; pReq->ctb.tagName = taosArrayInit(len, TSDB_COL_NAME_LEN); - if(pReq->ctb.tagName == NULL) return -1; - for (int32_t i = 0; i < len; i++){ - char name[TSDB_COL_NAME_LEN] = {0}; + if (pReq->ctb.tagName == NULL) return -1; + for (int32_t i = 0; i < len; i++) { + char name[TSDB_COL_NAME_LEN] = {0}; char *tmp = NULL; if (tDecodeCStr(pCoder, &tmp) < 0) return -1; strcpy(name, tmp); diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 7ac991451e..5cf6e26ee8 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -559,6 +559,7 @@ typedef struct { // info int64_t uid; int8_t status; + int8_t isDistributed; // config int8_t igExpired; int8_t trigger; @@ -586,6 +587,23 @@ typedef struct { int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj); int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj); +typedef struct { + char streamName[TSDB_STREAM_FNAME_LEN]; + int64_t uid; + int64_t streamUid; + SArray* childInfo; // SArray +} SStreamCheckpointObj; + +#if 0 +typedef struct { + int64_t uid; + int64_t streamId; + int8_t isDistributed; + int8_t status; + int8_t stage; +} SStreamRecoverObj; +#endif + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 1497bba11c..6e367fbe24 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -27,6 +27,7 @@ typedef enum { TRANS_STOP_FUNC_TEST = 2, TRANS_START_FUNC_MQ_REB = 3, TRANS_STOP_FUNC_MQ_REB = 4, + TRANS_FUNC_RECOVER_STREAM_STEP_NEXT = 5, } ETrnFunc; typedef enum { diff --git a/source/dnode/mnode/impl/src/mndDef.c b/source/dnode/mnode/impl/src/mndDef.c index 727b20ef8a..c26424e049 100644 --- a/source/dnode/mnode/impl/src/mndDef.c +++ b/source/dnode/mnode/impl/src/mndDef.c @@ -27,6 +27,7 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1; if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; + if (tEncodeI8(pEncoder, pObj->isDistributed) < 0) return -1; if (tEncodeI8(pEncoder, pObj->igExpired) < 0) return -1; if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1; @@ -72,6 +73,7 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; + if (tDecodeI8(pDecoder, &pObj->isDistributed) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index aa6e7192fc..ec0ea90f46 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -319,6 +319,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { int32_t totLevel = LIST_LENGTH(pPlan->pSubplans); ASSERT(totLevel <= 2); pStream->tasks = taosArrayInit(totLevel, sizeof(void*)); + pStream->isDistributed = totLevel == 2; bool hasExtraSink = false; bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 5777df4fa6..8248a26c28 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -36,7 +36,7 @@ static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream); static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq); static int32_t mndProcessDropStreamReq(SRpcMsg *pReq); -/*static int32_t mndProcessDropStreamInRsp(SRpcMsg *pRsp);*/ +static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq); static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq); static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); @@ -55,6 +55,7 @@ int32_t mndInitStream(SMnode *pMnode) { mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq); + mndSetMsgHandle(pMnode, TDMT_MND_RECOVER_STREAM, mndProcessRecoverStreamReq); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp); @@ -672,6 +673,69 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { return TSDB_CODE_ACTION_IN_PROGRESS; } +static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + SStreamObj *pStream = NULL; + /*SDbObj *pDb = NULL;*/ + /*SUserObj *pUser = NULL;*/ + + SMRecoverStreamReq recoverReq = {0}; + if (tDeserializeSMRecoverStreamReq(pReq->pCont, pReq->contLen, &recoverReq) < 0) { + ASSERT(0); + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + pStream = mndAcquireStream(pMnode, recoverReq.name); + + if (pStream == NULL) { + if (recoverReq.igNotExists) { + mDebug("stream:%s, not exist, ignore not exist is set", recoverReq.name); + sdbRelease(pMnode->pSdb, pStream); + return 0; + } else { + terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; + return -1; + } + } + + if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) { + return -1; + } + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq); + if (pTrans == NULL) { + mError("stream:%s, failed to recover since %s", recoverReq.name, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + mDebug("trans:%d, used to drop stream:%s", pTrans->id, recoverReq.name); + + // broadcast to recover all tasks + if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) { + mError("stream:%s, failed to recover task since %s", recoverReq.name, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + + // update stream status + if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { + sdbRelease(pMnode->pSdb, pStream); + return -1; + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare recover stream trans since %s", pTrans->id, terrstr()); + sdbRelease(pMnode->pSdb, pStream); + mndTransDrop(pTrans); + return -1; + } + + sdbRelease(pMnode->pSdb, pStream); + + return TSDB_CODE_ACTION_IN_PROGRESS; +} + int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index be56d901de..4f2486d551 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -137,17 +137,18 @@ typedef enum { SDB_USER = 7, SDB_AUTH = 8, SDB_ACCT = 9, - SDB_STREAM = 10, - SDB_OFFSET = 11, - SDB_SUBSCRIBE = 12, - SDB_CONSUMER = 13, - SDB_TOPIC = 14, - SDB_VGROUP = 15, - SDB_SMA = 16, - SDB_STB = 17, - SDB_DB = 18, - SDB_FUNC = 19, - SDB_MAX = 20 + SDB_STREAM_CK = 10, + SDB_STREAM = 11, + SDB_OFFSET = 12, + SDB_SUBSCRIBE = 13, + SDB_CONSUMER = 14, + SDB_TOPIC = 15, + SDB_VGROUP = 16, + SDB_SMA = 17, + SDB_STB = 18, + SDB_DB = 19, + SDB_FUNC = 20, + SDB_MAX = 21 } ESdbType; typedef struct SSdbRaw { @@ -309,7 +310,7 @@ void sdbRelease(SSdb *pSdb, void *pObj); * @return void* The next iterator of the table. */ void *sdbFetch(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj); -void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStatus *status) ; +void *sdbFetchAll(SSdb *pSdb, ESdbType type, void *pIter, void **ppObj, ESdbStatus *status); /** * @brief Cancel a traversal diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 1b11941849..99845713a9 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -139,6 +139,12 @@ typedef struct STaskIdInfo { char* str; } STaskIdInfo; +enum { + STREAM_RECOVER_STEP__NONE = 0, + STREAM_RECOVER_STEP__PREPARE, + STREAM_RECOVER_STEP__SCAN, +}; + typedef struct { //TODO remove prepareStatus STqOffsetVal prepareStatus; // for tmq @@ -147,6 +153,10 @@ typedef struct { SSDataBlock* pullOverBlk; // for streaming SWalFilterCond cond; int64_t lastScanUid; + int8_t recoverStep; + SQueryTableDataCond tableCond; + int64_t recoverStartVer; + int64_t recoverEndVer; } SStreamTaskInfo; typedef struct SExecTaskInfo { diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index d910b8be34..37d2521e5d 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -261,6 +261,15 @@ int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) { } #endif +int32_t qStreamPrepareRecover(qTaskInfo_t tinfo, int64_t startVer, int64_t endVer) { + SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; + ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM); + pTaskInfo->streamInfo.recoverStartVer = startVer; + pTaskInfo->streamInfo.recoverEndVer = endVer; + pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE; + return 0; +} + void* qExtractReaderFromStreamScanner(void* scanner) { SStreamScanInfo* pInfo = scanner; return (void*)pInfo->tqReader; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f5b6b0c7a6..dcb34c621d 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -517,14 +517,12 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, pInfo->currentGroupId); - int32_t code; - if (pInfo->dataReader != NULL) { - code = tsdbReaderReset(pInfo->dataReader, &pInfo->cond); - ASSERT(code == 0); - } else { - code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader, - GET_TASKID(pTaskInfo)); - ASSERT(code == 0); + tsdbReaderClose(pInfo->dataReader); + + int32_t code = tsdbReaderOpen(pInfo->readHandle.vnode, &pInfo->cond, tableList, (STsdbReader**)&pInfo->dataReader, + GET_TASKID(pTaskInfo)); + if (code != 0) { + // TODO } } @@ -1264,6 +1262,24 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { return NULL; } + if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__PREPARE) { + STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; + memcpy(&pTSInfo->cond, &pTaskInfo->streamInfo.tableCond, sizeof(SQueryTableDataCond)); + pTSInfo->scanTimes = 0; + pTSInfo->currentGroupId = -1; + pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__SCAN; + } + + if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) { + SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp); + if (pBlock != NULL) { + return pBlock; + } + // TODO fill in bloom filter + pTaskInfo->streamInfo.recoverStep = STREAM_RECOVER_STEP__NONE; + return NULL; + } + size_t total = taosArrayGetSize(pInfo->pBlockLists); // TODO: refactor if (pInfo->blockType == STREAM_INPUT__DATA_BLOCK) { @@ -1556,6 +1572,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys goto _error; } taosArrayDestroy(tableIdList); + memcpy(&pTaskInfo->streamInfo.tableCond, &pTSInfo->cond, sizeof(SQueryTableDataCond)); } // create the pseduo columns info diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 566d9209a8..2a96db3bfc 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -57,7 +57,7 @@ void streamTriggerByTimer(void* param, void* tmrId) { if (atomic_load_8(&pTask->triggerStatus) == TASK_TRIGGER_STATUS__ACTIVE) { SStreamTrigger* trigger = taosAllocateQitem(sizeof(SStreamTrigger), DEF_QITEM); if (trigger == NULL) return; - trigger->type = STREAM_INPUT__TRIGGER; + trigger->type = STREAM_INPUT__GET_RES; trigger->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); if (trigger->pBlock == NULL) { taosFreeQitem(trigger); @@ -183,8 +183,11 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S // 2.1. idle: exec // 2.2. executing: return // 2.3. closing: keep trying +#if 0 if (pTask->execType != TASK_EXEC__NONE) { - streamExec(pTask, pTask->pMsgCb); +#endif + streamExec(pTask, pTask->pMsgCb); +#if 0 } else { ASSERT(pTask->sinkType != TASK_SINK__NONE); while (1) { @@ -195,11 +198,13 @@ int32_t streamProcessDispatchReq(SStreamTask* pTask, SStreamDispatchReq* pReq, S } } } +#endif // 3. handle output // 3.1 check and set status // 3.2 dispatch / sink if (pTask->dispatchType != TASK_DISPATCH__NONE) { + ASSERT(pTask->sinkType == TASK_SINK__NONE); streamDispatch(pTask, pTask->pMsgCb); } diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 8e16b23e56..dbf6350c93 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -112,7 +112,7 @@ int32_t streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) { void streamFreeQitem(SStreamQueueItem* data) { int8_t type = data->type; - if (type == STREAM_INPUT__TRIGGER) { + if (type == STREAM_INPUT__GET_RES) { blockDataDestroy(((SStreamTrigger*)data)->pBlock); taosFreeQitem(data); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 1e46a9622b..cffd519d2a 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -20,7 +20,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) // set input SStreamQueueItem* pItem = (SStreamQueueItem*)data; - if (pItem->type == STREAM_INPUT__TRIGGER) { + if (pItem->type == STREAM_INPUT__GET_RES) { SStreamTrigger* pTrigger = (SStreamTrigger*)data; qSetMultiStreamInput(exec, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK, false); } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { @@ -73,6 +73,15 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) return 0; } +static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock* pBlock) { + ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); + int32_t childId = pBlock->childId; + int64_t ver = pBlock->sourceVer; + SStreamChildEpInfo* pChildInfo = taosArrayGetP(pTask->childEpInfo, childId); + pChildInfo->processedVer = ver; + return 0; +} + static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { int32_t cnt = 0; void* data = NULL; @@ -84,14 +93,17 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { } if (data == NULL) { data = qItem; + if (qItem->type == STREAM_INPUT__DATA_BLOCK) { + /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ + } streamQueueProcessSuccess(pTask->inputQueue); - continue; } else { if (streamAppendQueueItem(data, qItem) < 0) { streamQueueProcessFail(pTask->inputQueue); break; } else { cnt++; + /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ streamQueueProcessSuccess(pTask->inputQueue); taosArrayDestroy(((SStreamDataBlock*)qItem)->blocks); taosFreeQitem(qItem); @@ -106,6 +118,12 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { if (data == NULL) return pRes; + if (pTask->execType == TASK_EXEC__NONE) { + ASSERT(((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_BLOCK); + streamTaskOutput(pTask, data); + return pRes; + } + qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt); streamTaskExecImpl(pTask, data, pRes); qDebug("stream task %d exec end", pTask->taskId); @@ -125,6 +143,11 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { taosFreeQitem(qRes); return NULL; } + if (((SStreamQueueItem*)data)->type == STREAM_INPUT__DATA_SUBMIT) { + SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; + qRes->childId = pTask->selfChildId; + qRes->sourceVer = pSubmit->ver; + } /*streamQueueProcessSuccess(pTask->inputQueue);*/ pRes = taosArrayInit(0, sizeof(SSDataBlock)); } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c new file mode 100644 index 0000000000..4175f60446 --- /dev/null +++ b/source/libs/stream/src/streamRecover.c @@ -0,0 +1,131 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "streamInc.h" + +int32_t tEncodeStreamTaskRecoverReq(SEncoder* pEncoder, const SStreamTaskRecoverReq* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->sourceTaskId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->sourceVg) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeStreamTaskRecoverReq(SDecoder* pDecoder, SStreamTaskRecoverReq* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->sourceTaskId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->sourceVg) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + +int32_t tEncodeStreamTaskRecoverRsp(SEncoder* pEncoder, const SStreamTaskRecoverRsp* pRsp) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1; + if (tEncodeI8(pEncoder, pRsp->inputStatus) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamTaskRecoverRsp* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + if (tDecodeI8(pDecoder, &pReq->inputStatus) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + +int32_t tEncodeSMStreamTaskRecoverReq(SEncoder* pEncoder, const SMStreamTaskRecoverReq* pReq) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeSMStreamTaskRecoverReq(SDecoder* pDecoder, SMStreamTaskRecoverReq* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + +int32_t tEncodeSMStreamTaskRecoverRsp(SEncoder* pEncoder, const SMStreamTaskRecoverRsp* pRsp) { + if (tStartEncode(pEncoder) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1; + tEndEncode(pEncoder); + return pEncoder->pos; +} + +int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp* pReq) { + if (tStartDecode(pDecoder) < 0) return -1; + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; + tEndDecode(pDecoder); + return 0; +} + +int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) { + if (pTask->taskStatus != TASK_STATUS__FAIL) { + return 0; + } + + if (pTask->isStreamDistributed) { + if (pTask->isDataScan) { + pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER; + } else if (pTask->execType != TASK_EXEC__NONE) { + pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER; + bool hasCheckpoint = false; + int32_t childSz = taosArrayGetSize(pTask->childEpInfo); + for (int32_t i = 0; i < childSz; i++) { + SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i); + if (pEpInfo->checkpointVer == -1) { + hasCheckpoint = true; + break; + } + } + if (hasCheckpoint) { + // load from checkpoint + } else { + // recover child + } + } + } else { + if (pTask->isDataScan) { + if (pTask->checkpointVer != -1) { + // load from checkpoint + } else { + // reset stream query task info + // TODO get snapshot ver + pTask->recoverSnapVer = -1; + qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer); + pTask->taskStatus = TASK_STATUS__RECOVERING; + } + } + } + + if (pTask->taskStatus == TASK_STATUS__RECOVERING) { + streamProcessRunReq(pTask); + } + return 0; +} diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 7488c009bd..2d15c31bf1 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -34,6 +34,7 @@ int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) if (tEncodeI32(pEncoder, pInfo->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pInfo->nodeId) < 0) return -1; if (tEncodeI32(pEncoder, pInfo->childId) < 0) return -1; + if (tEncodeI64(pEncoder, pInfo->processedVer) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0) return -1; return 0; } @@ -42,6 +43,7 @@ int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) { if (tDecodeI32(pDecoder, &pInfo->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pInfo->nodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pInfo->childId) < 0) return -1; + if (tDecodeI64(pDecoder, &pInfo->processedVer) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0) return -1; return 0; }