From e947a2fb0de9881739b5241eddab4c676b18ba10 Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Wed, 7 Feb 2024 11:40:47 +0800 Subject: [PATCH] catalog cache get stream progress --- include/common/tmsg.h | 30 +++- include/common/tmsgdef.h | 1 + source/common/src/tmsg.c | 103 +++++++++++- source/dnode/mgmt/mgmt_snode/src/smHandle.c | 1 + source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 1 + source/dnode/mnode/impl/src/mndSma.c | 18 ++- source/dnode/vnode/src/inc/vnd.h | 1 + source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/tq/tq.c | 38 +++++ source/dnode/vnode/src/tq/tqUtil.c | 4 +- source/dnode/vnode/src/vnd/vnodeQuery.c | 54 +++++++ source/dnode/vnode/src/vnd/vnodeSvr.c | 2 + source/libs/catalog/inc/catalogInt.h | 25 ++- source/libs/catalog/src/ctgAsync.c | 167 +++++++++++++++++--- source/libs/catalog/src/ctgCache.c | 10 +- source/libs/catalog/src/ctgRemote.c | 84 ++++++++++ source/libs/catalog/src/ctgUtil.c | 33 ++++ source/libs/qcom/src/querymsg.c | 29 ++++ tests/system-test/2-query/tsma.py | 2 +- 19 files changed, 568 insertions(+), 36 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f649b93ac2..70d16d9e1c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -4222,6 +4222,12 @@ typedef struct { SArray* pTags; // SArray SArray* pUsedCols; // SArray char* ast; + + int64_t streamUid; + int64_t reqTs; + int64_t rspTs; + int64_t delayDuration; + bool fillHistoryFinished; } STableTSMAInfo; typedef struct { @@ -4229,7 +4235,7 @@ typedef struct { } STableTSMAInfoRsp; int32_t tSerializeTableTSMAInfoRsp(void* buf, int32_t bufLen, const STableTSMAInfoRsp* pRsp); -int32_t tDeserializeTableTSMAInfoRsp(void* buf, int32_t bufLen, STableTSMAInfoRsp* pReq); +int32_t tDeserializeTableTSMAInfoRsp(void* buf, int32_t bufLen, STableTSMAInfoRsp* pRsp); int32_t tCloneTbTSMAInfo(STableTSMAInfo* pInfo, STableTSMAInfo** pRes); void tFreeTableTSMAInfo(void* p); void tFreeAndClearTableTSMAInfo(void* p); @@ -4240,6 +4246,28 @@ void tFreeTableTSMAInfoRsp(STableTSMAInfoRsp* pRsp); #define tDeserializeTSMAHbRsp tDeserializeTableTSMAInfoRsp #define tFreeTSMAHbRsp tFreeTableTSMAInfoRsp +typedef struct SStreamProgressReq { + int64_t streamId; + int32_t vgId; + int32_t fetchIdx; + int32_t subFetchIdx; +} SStreamProgressReq; + +int32_t tSerializeStreamProgressReq(void* buf, int32_t bufLen, const SStreamProgressReq* pReq); +int32_t tDeserializeStreamProgressReq(void* buf, int32_t bufLen, SStreamProgressReq* pReq); + +typedef struct SStreamProgressRsp { + int64_t streamId; + int32_t vgId; + bool fillHisFinished; + int64_t progressDelay; + int32_t fetchIdx; + int32_t subFetchIdx; +} SStreamProgressRsp; + +int32_t tSerializeStreamProgressRsp(void* buf, int32_t bufLen, const SStreamProgressRsp* pRsp); +int32_t tDeserializeSStreamProgressRsp(void* buf, int32_t bufLen, SStreamProgressRsp* pRsp); + #pragma pack(pop) #ifdef __cplusplus diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index b95693e0a5..0854d53699 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -364,6 +364,7 @@ TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_RESET, "vnode-stream-reset", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_GET_STREAM_PROGRESS, "vnd-stream-progress", NULL, NULL) TD_CLOSE_MSG_SEG(TDMT_END_VND_STREAM_MSG) TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) //8 << 8 diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 5d25d8e330..cca094670d 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -10004,6 +10004,11 @@ static int32_t tEncodeTableTSMAInfo(SEncoder* pEncoder, const STableTSMAInfo* pT } if (tEncodeCStr(pEncoder, pTsmaInfo->ast) < 0) return -1; + if (tEncodeI64(pEncoder, pTsmaInfo->streamUid) < 0) return -1; + if (tEncodeI64(pEncoder, pTsmaInfo->reqTs) < 0) return -1; + if (tEncodeI64(pEncoder, pTsmaInfo->rspTs) < 0) return -1; + if (tEncodeI64(pEncoder, pTsmaInfo->delayDuration) < 0) return -1; + if (tEncodeI8(pEncoder, pTsmaInfo->fillHistoryFinished) < 0) return -1; return 0; } @@ -10056,7 +10061,11 @@ static int32_t tDecodeTableTSMAInfo(SDecoder* pDecoder, STableTSMAInfo* pTsmaInf } } if (tDecodeCStrAlloc(pDecoder, &pTsmaInfo->ast) < 0) return -1; - + if (tDecodeI64(pDecoder, &pTsmaInfo->streamUid) < 0) return -1; + if (tDecodeI64(pDecoder, &pTsmaInfo->reqTs) < 0) return -1; + if (tDecodeI64(pDecoder, &pTsmaInfo->rspTs) < 0) return -1; + if (tDecodeI64(pDecoder, &pTsmaInfo->delayDuration) < 0) return -1; + if (tDecodeI8(pDecoder, (int8_t*)&pTsmaInfo->fillHistoryFinished) < 0) return -1; return 0; } @@ -10065,7 +10074,7 @@ static int32_t tEncodeTableTSMAInfoRsp(SEncoder *pEncoder, const STableTSMAInfoR if (tEncodeI32(pEncoder, size) < 0) return -1; for (int32_t i = 0; i < size; ++i) { STableTSMAInfo* pInfo = taosArrayGetP(pRsp->pTsmas, i); - if (tEncodeTableTSMAInfo(pEncoder, pInfo) < 0) return -1; + if (tEncodeTableTSMAInfo(pEncoder, pInfo) < 0) return -1; } return 0; } @@ -10166,3 +10175,93 @@ void tFreeTableTSMAInfoRsp(STableTSMAInfoRsp *pRsp) { taosArrayDestroyP(pRsp->pTsmas, tFreeAndClearTableTSMAInfo); } } + +static int32_t tEncodeStreamProgressReq(SEncoder *pEncoder, const SStreamProgressReq *pReq) { + if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->fetchIdx) < 0) return -1; + if (tEncodeI32(pEncoder, pReq->subFetchIdx) < 0) return -1; + return 0; +} + +int32_t tSerializeStreamProgressReq(void* buf, int32_t bufLen, const SStreamProgressReq* pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeStreamProgressReq(&encoder, pReq) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +static int32_t tDecodeStreamProgressReq(SDecoder* pDecoder, SStreamProgressReq* pReq) { + if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->fetchIdx) < 0) return -1; + if (tDecodeI32(pDecoder, &pReq->subFetchIdx) < 0) return -1; + return 0; +} + +int32_t tDeserializeStreamProgressReq(void* buf, int32_t bufLen, SStreamProgressReq* pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, (char *)buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeStreamProgressReq(&decoder, pReq) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + +static int32_t tEncodeStreamProgressRsp(SEncoder* pEncoder, const SStreamProgressRsp* pRsp) { + if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->vgId) < 0) return -1; + if (tEncodeI8(pEncoder, pRsp->fillHisFinished) < 0) return -1; + if (tEncodeI64(pEncoder, pRsp->progressDelay) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->fetchIdx) < 0) return -1; + if (tEncodeI32(pEncoder, pRsp->subFetchIdx) < 0) return -1; + return 0; +} + +int32_t tSerializeStreamProgressRsp(void* buf, int32_t bufLen, const SStreamProgressRsp* pRsp) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeStreamProgressRsp(&encoder, pRsp) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +static int32_t tDecodeStreamProgressRsp(SDecoder* pDecoder, SStreamProgressRsp* pRsp) { + if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->vgId) < 0) return -1; + if (tDecodeI8(pDecoder, (int8_t*)&pRsp->fillHisFinished) < 0) return -1; + if (tDecodeI64(pDecoder, &pRsp->progressDelay) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->fetchIdx) < 0) return -1; + if (tDecodeI32(pDecoder, &pRsp->subFetchIdx) < 0) return -1; + return 0; +} + +int32_t tDeserializeSStreamProgressRsp(void* buf, int32_t bufLen, SStreamProgressRsp* pRsp) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeStreamProgressRsp(&decoder, pRsp) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} diff --git a/source/dnode/mgmt/mgmt_snode/src/smHandle.c b/source/dnode/mgmt/mgmt_snode/src/smHandle.c index 1b1dcc9b54..880e96adfb 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smHandle.c +++ b/source/dnode/mgmt/mgmt_snode/src/smHandle.c @@ -91,6 +91,7 @@ SArray *smGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER; code = 0; _OVER: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index c9c1cad643..a83e0ecad3 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -930,6 +930,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index 0aa57da20d..51d2b8a607 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -2035,8 +2035,8 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp SSmaObj * pSma = NULL; SSdb * pSdb = pMnode->pSdb; void * pIter = NULL; - - SStbObj *pStb = NULL; + SStreamObj * pStreamObj = NULL; + SStbObj * pStb = NULL; /* SStbObj *pStb = mndAcquireStb(pMnode, tbFName); if (NULL == pStb) { @@ -2060,6 +2060,19 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp continue; } + SName smaName; + char streamName[TSDB_TABLE_FNAME_LEN] = {0}; + tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE); + sprintf(streamName, "%d.%s", smaName.acctId, smaName.tname); + pStreamObj = mndAcquireStream(pMnode, streamName); + if (!pStreamObj) { + sdbRelease(pSdb, pSma); + continue; + } + + int64_t streamId = pStreamObj->uid; + mndReleaseStream(pMnode, pStreamObj); + STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo)); if (!pTsma) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -2068,6 +2081,7 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp return code; } terrno = dumpTSMAInfoFromSmaObj(pSma, pStb, pTsma); + pTsma->streamUid = streamId; mndReleaseStb(pMnode, pStb); sdbRelease(pSdb, pSma); if (terrno) return code; diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index f9f4b603fd..304716744c 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -120,6 +120,7 @@ void vnodeQueryClose(SVnode* pVnode); int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct); int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct); int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg); +int32_t vnodeGetStreamProgress(SVnode* pVnode, SRpcMsg* pMsg, bool direct); // vnodeCommit.c int32_t vnodeBegin(SVnode* pVnode); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 13a13d81ca..d79565813b 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -294,6 +294,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg); +int32_t tqStreamProgressRetrieveReq(STQ* pTq, SRpcMsg* pMsg); // sma int32_t smaInit(); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 30ca4c7a36..628ccd2d6d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1031,6 +1031,44 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { return 0; } +int32_t tqStreamProgressRetrieveReq(STQ *pTq, SRpcMsg *pMsg) { + char* msgStr = pMsg->pCont; + char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead)); + int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); + int32_t code = 0; + SStreamProgressReq req; + char* pRspBuf = taosMemoryCalloc(1, sizeof(SMsgHead) + sizeof(SStreamProgressRsp)); + SStreamProgressRsp* pRsp = POINTER_SHIFT(pRspBuf, sizeof(SMsgHead)); + if (!pRspBuf) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = -1; + goto _OVER; + } + + code = tDeserializeStreamProgressReq(msgBody, msgLen, &req); + if (code == TSDB_CODE_SUCCESS) { + code = tqGetStreamExecInfo(pTq->pVnode, req.streamId, &pRsp->progressDelay, &pRsp->fillHisFinished); + } + if (code == TSDB_CODE_SUCCESS) { + pRsp->fetchIdx = req.fetchIdx; + pRsp->subFetchIdx = req.subFetchIdx; + pRsp->vgId = req.vgId; + pRsp->streamId = req.streamId; + tSerializeStreamProgressRsp(pRsp, sizeof(SStreamProgressRsp) + sizeof(SMsgHead), pRsp); + SRpcMsg rsp = {.info = pMsg->info, .code = 0}; + rsp.pCont = pRspBuf; + pRspBuf = NULL; + rsp.contLen = sizeof(SMsgHead) + sizeof(SStreamProgressRsp); + tmsgSendRsp(&rsp); + } + +_OVER: + if (pRspBuf) { + taosMemoryFree(pRspBuf); + } + return code; +} + int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) { int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 5e5c77265b..3e6a3d34fa 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -545,12 +545,12 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b int64_t latest = 0; code = walFetchHead(pReader, ver); - if (code != TSDB_CODE_SUCCESS) { + if (code == TSDB_CODE_SUCCESS) { cur = pReader->pHead->head.ingestTs; } code = walFetchHead(pReader, verRange.maxVer); - if (code != TSDB_CODE_SUCCESS) { + if (code == TSDB_CODE_SUCCESS) { latest = pReader->pHead->head.ingestTs; } diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index a85192546f..9a510ed362 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -323,6 +323,9 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) { case TDMT_VND_TABLE_CFG: vnodeGetTableCfg(pVnode, &reqMsg, false); break; + case TDMT_VND_GET_STREAM_PROGRESS: + vnodeGetStreamProgress(pVnode, &reqMsg, false); + break; default: qError("invalid req msgType %d", req->msgType); reqMsg.code = TSDB_CODE_INVALID_MSG; @@ -711,3 +714,54 @@ void *vnodeGetIvtIdx(void *pVnode) { int32_t vnodeGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid) { return tsdbGetTableSchema(((SVnode *)pVnode)->pMeta, uid, pSchema, suid); } + +int32_t vnodeGetStreamProgress(SVnode* pVnode, SRpcMsg* pMsg, bool direct) { + int32_t code = 0; + SStreamProgressReq req; + SStreamProgressRsp rsp = {0}; + SRpcMsg rpcMsg = {.info = pMsg->info, .code = 0}; + char * buf = NULL; + int32_t rspLen = 0; + code = tDeserializeStreamProgressReq(pMsg->pCont, pMsg->contLen, &req); + + if (code == TSDB_CODE_SUCCESS) { + rsp.fetchIdx = req.fetchIdx; + rsp.subFetchIdx = req.subFetchIdx; + rsp.vgId = req.vgId; + rsp.streamId = req.streamId; + rspLen = tSerializeStreamProgressRsp(0, 0, &rsp); + if (direct) { + buf = rpcMallocCont(rspLen); + } else { + buf = taosMemoryCalloc(1, rspLen); + } + if (!buf) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = -1; + goto _OVER; + } + } + + if (code == TSDB_CODE_SUCCESS) { + code = tqGetStreamExecInfo(pVnode, req.streamId, &rsp.progressDelay, &rsp.fillHisFinished); + } + if (code == TSDB_CODE_SUCCESS) { + tSerializeStreamProgressRsp(buf, rspLen, &rsp); + rpcMsg.pCont = buf; + buf = NULL; + rpcMsg.contLen = rspLen; + rpcMsg.code = code; + rpcMsg.msgType = pMsg->msgType; + if (direct) { + tmsgSendRsp(&rpcMsg); + } else { + *pMsg = rpcMsg; + } + } + +_OVER: + if (buf) { + taosMemoryFree(buf); + } + return code; +} diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 7a6b1e69d9..582026ccef 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -838,6 +838,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg); case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP: return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg); + case TDMT_VND_GET_STREAM_PROGRESS: + return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in stream queue", pMsg->msgType); return TSDB_CODE_APP_ERROR; diff --git a/source/libs/catalog/inc/catalogInt.h b/source/libs/catalog/inc/catalogInt.h index dbaf4b4aa7..819ba6362a 100644 --- a/source/libs/catalog/inc/catalogInt.h +++ b/source/libs/catalog/inc/catalogInt.h @@ -262,9 +262,25 @@ typedef struct SCtgViewsCtx { SArray* pFetchs; } SCtgViewsCtx; +typedef enum { + FETCH_TSMA_FOR_TB, + FETCH_PROGRESS_FOR_TSMA, +} CTG_TSMA_FETCH_TYPE; + +typedef struct SCtgTSMAFetch { + CTG_TSMA_FETCH_TYPE fetchType; + int32_t dbIdx; + int32_t tbIdx; + int32_t fetchIdx; + int32_t resIdx; + int32_t subFetchNum; + int32_t finishedSubFetchNum; + int32_t vgNum; +} SCtgTSMAFetch; + typedef struct SCtgTbTSMACtx { int32_t fetchNum; - SArray* pNames; + SArray* pNames; // SArray SArray* pResList; SArray* pFetches; } SCtgTbTSMACtx; @@ -1128,6 +1144,13 @@ int32_t ctgOpDropTbTSMA(SCtgCacheOperation* operation); int32_t ctgOpUpdateTbTSMA(SCtgCacheOperation* operation); uint64_t ctgGetTbTSMACacheSize(STSMACache* pTsmaInfo); void ctgFreeTbTSMAInfo(void* p); +bool hasOutOfDateTSMACache(SArray* pTsmas); +bool isCtgTSMACacheOutOfDate(STSMACache* pTsmaCache); +int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTbName, + SVgroupInfo* vgroupInfo, SStreamProgressRsp* out, SCtgTaskReq* tReq, + void* bInput); +int32_t ctgAddTSMAFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t* fetchIdx, int32_t resIdx, + int32_t flag); extern SCatalogMgmt gCtgMgmt; extern SCtgDebug gCTGDebug; diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 0f772d0eba..110f5ce308 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -2622,6 +2622,31 @@ int32_t ctgLaunchGetViewsTask(SCtgTask* pTask) { return TSDB_CODE_SUCCESS; } +int32_t ctgAsyncRefreshTbTsma(SCtgTaskReq* pReq, const SCtgTSMAFetch* pFetch) { + int32_t code = 0; + SCtgTask* pTask = pReq->pTask; + SCatalog* pCtg = pTask->pJob->pCtg; + SRequestConnInfo* pConn = &pTask->pJob->conn; + SCtgTbTSMACtx* pTaskCtx = pTask->taskCtx; + + SCtgDBCache* pDbCache = NULL; + STablesReq* pTbReq = taosArrayGet(pTaskCtx->pNames, pFetch->dbIdx); + + ctgAcquireVgInfoFromCache(pCtg, pTbReq->dbFName, &pDbCache); + if (pDbCache) { + + ctgReleaseVgInfoToCache(pCtg, pDbCache); + } else { + SBuildUseDBInput input = {0}; + tstrncpy(input.db, pTbReq->dbFName, tListLen(input.db)); + input.vgVersion = CTG_DEFAULT_INVALID_VERSION; + + CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, pReq)); + } +_return: + return code; +} + int32_t ctgLaunchGetTbTSMATask(SCtgTask* pTask) { SCatalog* pCtg = pTask->pJob->pCtg; SCtgTbTSMACtx* pCtx = (SCtgTbTSMACtx*)pTask->taskCtx; @@ -2646,9 +2671,10 @@ int32_t ctgLaunchGetTbTSMATask(SCtgTask* pTask) { pTask->msgCtxs = taosArrayInit_s(sizeof(SCtgMsgCtx), pCtx->fetchNum); for (int32_t i = 0; i < pCtx->fetchNum; ++i) { - SCtgFetch* pFetch = taosArrayGet(pCtx->pFetches, i); - SName* pName = ctgGetFetchName(pCtx->pNames, pFetch); - SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i); + SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, i); + STablesReq* pReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); + SName* pName = taosArrayGet(pReq->pTables, pFetch->tbIdx); + SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i); if (!pMsgCtx->pBatchs) pMsgCtx->pBatchs = pJob->pBatchs; SCtgTaskReq tReq; @@ -2749,39 +2775,128 @@ _return: } int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) { + bool taskDone = false; + bool hasSubFetch = false; int32_t code = 0; SCtgTask* pTask = tReq->pTask; SCatalog* pCtg = pTask->pJob->pCtg; SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx); SCtgTbTSMACtx* pCtx = pTask->taskCtx; - SCtgFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx); - SMetaRes* pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx); - SName* pName = ctgGetFetchName(pCtx->pNames, pFetch); + SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx); SArray* pTsmas = NULL; - bool taskDone = false; + SMetaRes* pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx); + SCtgDBCache* pDbCache = NULL; STableTSMAInfo* pTsma = NULL; + SRequestConnInfo* pConn = &pTask->pJob->conn; CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target)); - STableTSMAInfoRsp* pOut = pMsgCtx->out; - if (pOut->pTsmas && taosArrayGetSize(pOut->pTsmas) > 0) { - for (int32_t i = 0; i < taosArrayGetSize(pOut->pTsmas); ++i) { - STableTSMAInfo* pInfo = taosArrayGetP(pOut->pTsmas, i); - CTG_ERR_JRET(tCloneTbTSMAInfo(pInfo, &pTsma)); - CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pTask->pJob->pCtg, &pTsma, false)); - } - } + switch (reqType) { + case TDMT_MND_GET_TABLE_TSMA: { + STableTSMAInfoRsp* pOut = pMsgCtx->out; + pRes->pRes = pOut; + pMsgCtx->out = NULL; - pRes->code = 0; - pRes->pRes = pOut; - pMsgCtx->out = NULL; - if (atomic_sub_fetch_32(&pCtx->fetchNum, 1) == 0) { - TSWAP(pTask->res, pCtx->pResList); - taskDone = true; + if (pOut->pTsmas && taosArrayGetSize(pOut->pTsmas) > 0) { + // fetch progress + STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); + const SName* pTbName = taosArrayGet(pTbReq->pTables, pFetch->fetchIdx); + ctgAcquireVgInfoFromCache(pCtg, pTbReq->dbFName, &pDbCache); + if (!pDbCache) { + // do not know which vnodes to fetch, fetch vnode list first + SBuildUseDBInput input = {0}; + tstrncpy(input.db, pTbReq->dbFName, tListLen(input.db)); + input.vgVersion = CTG_DEFAULT_INVALID_VERSION; + CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, tReq)); + } else { + // fetch progress from every vnode + int32_t subFetchIdx = 0; + pFetch->vgNum = taosHashGetSize(pDbCache->vgCache.vgInfo->vgHash); + for (int32_t i = 0; i < taosArrayGetSize(pOut->pTsmas); ++i) { + STableTSMAInfo* pTsmaInfo = taosArrayGetP(pOut->pTsmas, i); + pTsmaInfo->reqTs = taosGetTimestampMs(); + SVgroupInfo* pVgInfo = taosHashIterate(pDbCache->vgCache.vgInfo->vgHash, NULL); + while (pVgInfo) { + // make StreamProgressReq, send it + SStreamProgressReq req = {.fetchIdx = pFetch->fetchIdx, + .streamId = pTsmaInfo->streamUid, + .subFetchIdx = subFetchIdx++, + .vgId = pVgInfo->vgId}; + CTG_ERR_JRET(ctgGetStreamProgressFromVnode(pCtg, pConn, pTbName, pVgInfo, NULL, tReq, &req)); + pFetch->subFetchNum++; + hasSubFetch = true; + pVgInfo = taosHashIterate(pDbCache->vgCache.vgInfo->vgHash, pVgInfo); + } + } + ctgReleaseVgInfoToCache(pCtg, pDbCache); + pDbCache = NULL; + } + } else { + // no tsmas + if (atomic_sub_fetch_32(&pCtx->fetchNum, 1) == 0) { + TSWAP(pTask->res, pCtx->pResList); + taskDone = true; + } + } + } break; + case TDMT_VND_GET_STREAM_PROGRESS: { + // update progress into res + // TODO pack all streams into one req, and handle all stream rsps together + STableTSMAInfoRsp* pTsmasRsp = pRes->pRes; + SArray* pTsmas = pTsmasRsp->pTsmas; + SStreamProgressRsp* pRsp = pMsgCtx->out; + int32_t tsmaIdx = pRsp->subFetchIdx / pFetch->vgNum; + STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmas, tsmaIdx); + if (pTsmaInfo->rspTs == 0) pTsmaInfo->fillHistoryFinished = true; + pTsmaInfo->rspTs = taosGetTimestampMs(); + pTsmaInfo->delayDuration = MAX(pRsp->progressDelay, pTsmaInfo->delayDuration); + pTsmaInfo->fillHistoryFinished = pTsmaInfo->fillHistoryFinished && pRsp->fillHisFinished; + + if (atomic_add_fetch_32(&pFetch->finishedSubFetchNum, 1) == pFetch->subFetchNum) { + // subfetch all finished + for (int32_t i = 0; i < taosArrayGetSize(pTsmas); ++i) { + STableTSMAInfo* pInfo = taosArrayGetP(pTsmas, i); + CTG_ERR_JRET(tCloneTbTSMAInfo(pInfo, &pTsma)); + CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, &pTsma, false)); + } + + if (atomic_sub_fetch_32(&pCtx->fetchNum, 1) == 0) { + TSWAP(pTask->res, pCtx->pResList); + taskDone = true; + } + } + } break; + case TDMT_MND_USE_DB: { + STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); + SName* pTbName = taosArrayGet(pTbReq->pTables, pFetch->fetchIdx); + SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out; + STableTSMAInfoRsp* pTsmas = pRes->pRes; + int32_t subFetchIdx = 0; + pFetch->vgNum = taosHashGetSize(pOut->dbVgroup->vgHash); + for (int32_t i = 0; i < taosArrayGetSize(pTsmas->pTsmas); ++i) { + STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmas->pTsmas, i); + SVgroupInfo* pVgInfo = taosHashIterate(pOut->dbVgroup->vgHash, NULL); + while (pVgInfo) { + // make StreamProgressReq, send it + SStreamProgressReq req = {.fetchIdx = pFetch->fetchIdx, + .streamId = pTsma->streamUid, + .subFetchIdx = subFetchIdx++, + .vgId = pVgInfo->vgId}; + CTG_ERR_JRET(ctgGetStreamProgressFromVnode(pCtg, pConn, pTbName, pVgInfo, NULL, tReq, &req)); + pFetch->subFetchNum++; + hasSubFetch = true; + pVgInfo = taosHashIterate(pOut->dbVgroup->vgHash, pVgInfo); + } + } + } break; + default: + ASSERT(0); } _return: - + if (pDbCache) { + ctgReleaseVgInfoToCache(pCtg, pDbCache); + } if (pTsma) { tFreeTableTSMAInfo(pTsma); pTsma = NULL; @@ -2793,10 +2908,16 @@ _return: if (TSDB_CODE_MND_SMA_NOT_EXIST == code) { code = TSDB_CODE_SUCCESS; } else { + STablesReq* pReq = (STablesReq*)taosArrayGet(pCtx->pNames, pFetch->dbIdx); + SName* pName = taosArrayGet(pReq->pTables, pFetch->tbIdx); ctgTaskError("Get tsma for %d.%s.%s faield with err: %s", pName->acctId, pName->dbname, pName->tname, tstrerror(code)); } - if (0 == atomic_sub_fetch_32(&pCtx->fetchNum, 1)) { + bool allSubFetchFinished = false; + if (reqType == TDMT_VND_GET_STREAM_PROGRESS) { + allSubFetchFinished = atomic_add_fetch_32(&pFetch->finishedSubFetchNum, 1) >= pFetch->subFetchNum; + } + if ((allSubFetchFinished || !hasSubFetch) && 0 == atomic_sub_fetch_32(&pCtx->fetchNum, 1)) { TSWAP(pTask->res, pCtx->pResList); taskDone = true; } diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index eb6ecd903c..eb1ce8d074 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -3246,8 +3246,10 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx CTG_ERR_RET(ctgAcquireDBCache(pCtg, dbFName, &dbCache)); if (!dbCache) { ctgDebug("DB %s not in cache", dbFName); + // TODO test no db cache, select from another db for (int32_t i = 0; i < tbNum; ++i) { - ctgAddFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag); + ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag); + //ctgAddTSMAFetch(); taosArrayPush(pCtx->pResList, &(SMetaData){0}); } return TSDB_CODE_SUCCESS; @@ -3258,18 +3260,18 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx pCache = taosHashAcquire(dbCache->tsmaCache, pName->tname, strlen(pName->tname)); if (!pCache) { ctgDebug("tsma for tb: %s.%s not in cache", dbFName, pName->tname); - ctgAddFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag); + ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag); taosArrayPush(pCtx->pResList, &(SMetaRes){0}); CTG_CACHE_NHIT_INC(CTG_CI_TBL_SMA, 1); continue; } CTG_LOCK(CTG_READ, &pCache->tsmaLock); - if (!pCache->pTsmas || pCache->pTsmas->size == 0) { + if (!pCache->pTsmas || pCache->pTsmas->size == 0 || hasOutOfDateTSMACache(pCache->pTsmas)) { CTG_UNLOCK(CTG_READ, &pCache->tsmaLock); taosHashRelease(dbCache->tsmaCache, pCache); ctgDebug("tsma for tb: %s.%s not in cache", pName->tname, dbFName); - ctgAddFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag); + ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag); taosArrayPush(pCtx->pResList, &(SMetaRes){0}); CTG_CACHE_NHIT_INC(CTG_CI_TBL_TSMA, 1); continue; diff --git a/source/libs/catalog/src/ctgRemote.c b/source/libs/catalog/src/ctgRemote.c index 804f47d2d7..6dd38cac48 100644 --- a/source/libs/catalog/src/ctgRemote.c +++ b/source/libs/catalog/src/ctgRemote.c @@ -355,6 +355,17 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize, qDebug("Got table tsma from mnode, tbFName:%s", target); break; } + case TDMT_VND_GET_STREAM_PROGRESS: { + if (TSDB_CODE_SUCCESS != rspCode) { + CTG_ERR_RET(rspCode); + } + code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize); + if (code) { + qError("Process get stream progress rsp failed, err: %s, tbFName: %s", tstrerror(code), target); + CTG_ERR_RET(code); + } + break; + } default: if (TSDB_CODE_SUCCESS != rspCode) { qError("Got error rsp, error:%s", tstrerror(rspCode)); @@ -550,6 +561,11 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; pName = ctx->pName; } + } else if (TDMT_VND_GET_STREAM_PROGRESS == msgType) { + SCtgTbTSMACtx* pCtx = pTask->taskCtx; + SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx); + STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); + pName = taosArrayGet(pTbReq->pTables, pFetch->tbIdx); } else { ctgError("invalid vnode msgType %d", msgType); CTG_ERR_JRET(TSDB_CODE_APP_ERROR); @@ -600,6 +616,11 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; pName = ctx->pName; } + } else if (TDMT_VND_GET_STREAM_PROGRESS == msgType) { + SCtgTbTSMACtx* pCtx = pTask->taskCtx; + SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx); + STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx); + pName = taosArrayGet(pTbReq->pTables, pFetch->tbIdx); } else { ctgError("invalid vnode msgType %d", msgType); CTG_ERR_JRET(TSDB_CODE_APP_ERROR); @@ -1535,3 +1556,66 @@ int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa return TSDB_CODE_SUCCESS; } +int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTbName, + SVgroupInfo* vgroupInfo, SStreamProgressRsp* out, SCtgTaskReq* tReq, + void* bInput) { + char* msg = NULL; + int32_t msgLen = 0; + int32_t reqType = TDMT_VND_GET_STREAM_PROGRESS; + char tbFName[TSDB_TABLE_FNAME_LEN]; + tNameExtractFullName(pTbName, tbFName); + SCtgTask* pTask = tReq ? tReq->pTask : NULL; + void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont; + + SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse]; + ctgDebug("try to get stream progress from vnode, vgId:%d, ep num:%d, ep %s:%d, target:%s", vgroupInfo->vgId, + vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName); + + int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](bInput, &msg, 0, &msgLen, mallocFp); + if (code) { + ctgError("Build get stream progress failed, code:%s, tbFName:%s", tstrerror(code), tbFName); + CTG_ERR_RET(code); + } + + if (pTask) { + SStreamProgressRsp* pOut = taosMemoryCalloc(1, sizeof(SStreamProgressRsp)); + if (!pOut) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)tbFName)); + + SRequestConnInfo vConn = {.pTrans = pConn->pTrans, + .requestId = pConn->requestId, + .requestObjRefId = pConn->requestObjRefId, + .mgmtEps = vgroupInfo->epSet}; +#if CTG_BATCH_FETCH + CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, tReq, reqType, msg, msgLen)); +#else + char dbFName[TSDB_DB_FNAME_LEN]; + tNameGetFullDbName(pTbName, dbFName); + SArray* pTaskId = taosArrayInit(1, sizeof(int32_t)); + if (NULL == pTaskId) { + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + taosArrayPush(pTaskId, &pTask->taskId); + + CTG_RET( + ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, vgroupInfo->vgId, reqType, msg, msgLen)); +#endif + } + + SRpcMsg rpcMsg = { + .msgType = reqType, + .pCont = msg, + .contLen = msgLen, + }; + + SRpcMsg rpcRsp = {0}; + rpcSendRecv(pConn->pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp); + + CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName)); + + rpcFreeCont(rpcRsp.pCont); + + return TSDB_CODE_SUCCESS; +} diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index 211a21dd4a..ee85880d38 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -2392,3 +2392,36 @@ uint64_t ctgGetTbTSMACacheSize(STableTSMAInfo* pTsmaInfo) { //TODO return 0; } + +bool hasOutOfDateTSMACache(SArray* pTsmas) { + if (!pTsmas || pTsmas->size == 0) { + return false; + } + for (int32_t i = 0; i < pTsmas->size; ++i) { + STSMACache* pTsmaInfo = taosArrayGetP(pTsmas, i); + if (isCtgTSMACacheOutOfDate(pTsmaInfo)) return true; + } + return false; +} + +bool isCtgTSMACacheOutOfDate(STSMACache* pTsmaCache) { + int64_t now = taosGetTimestampMs(); + return !pTsmaCache->fillHistoryFinished || (30 * 1000 - pTsmaCache->delayDuration) < (now - pTsmaCache->reqTs); +} + +int32_t ctgAddTSMAFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t* fetchIdx, int32_t resIdx, + int32_t flag) { + if (NULL == (*pFetchs)) { + *pFetchs = taosArrayInit(CTG_DEFAULT_FETCH_NUM, sizeof(SCtgTSMAFetch)); + } + + SCtgTSMAFetch fetch = {0}; + fetch.dbIdx = dbIdx; + fetch.tbIdx = tbIdx; + fetch.fetchIdx = (*fetchIdx)++; + fetch.resIdx = resIdx; + + taosArrayPush(*pFetchs, &fetch); + + return TSDB_CODE_SUCCESS; +} diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index 84b009bf2e..04cef3e644 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -341,6 +341,21 @@ int32_t queryBuildGetTSMAMsg(void *input, char **msg, int32_t msgSize, int32_t * return TSDB_CODE_SUCCESS; } +int32_t queryBuildGetStreamProgressMsg(void* input, char** msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int64_t)) { + if (!msg || !msgLen) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + int32_t len = tSerializeStreamProgressReq(NULL, 0, input); + void* pBuf = (*mallcFp)(len); + + tSerializeStreamProgressReq(pBuf, len, input); + + *msg = pBuf; + *msgLen = len; + return TSDB_CODE_SUCCESS; +} + int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) { SUseDbOutput *pOut = output; SUseDbRsp usedbRsp = {0}; @@ -733,6 +748,18 @@ int32_t queryProcessGetTbTSMARsp(void* output, char* msg, int32_t msgSize) { return TSDB_CODE_SUCCESS; } +int32_t queryProcessStreamProgressRsp(void* output, char* msg, int32_t msgSize) { + if (!output || !msg || msgSize <= 0) { + return TSDB_CODE_TSC_INVALID_INPUT; + } + + if (tDeserializeSStreamProgressRsp(msg, msgSize, output) != 0) { + qError("tDeserializeStreamProgressRsp failed, msgSize: %d", msgSize); + return TSDB_CODE_INVALID_MSG; + } + return TSDB_CODE_SUCCESS; +} + void initQueryModuleMsgHandle() { queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg; @@ -751,6 +778,7 @@ void initQueryModuleMsgHandle() { queryBuildMsg[TMSG_INDEX(TDMT_MND_VIEW_META)] = queryBuildGetViewMetaMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TABLE_TSMA)] = queryBuildGetTableTSMAMsg; queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TSMA)] = queryBuildGetTSMAMsg; + queryBuildMsg[TMSG_INDEX(TDMT_VND_GET_STREAM_PROGRESS)] = queryBuildGetStreamProgressMsg; queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp; @@ -768,6 +796,7 @@ void initQueryModuleMsgHandle() { queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_VIEW_META)] = queryProcessGetViewMetaRsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_TSMA)] = queryProcessGetTbTSMARsp; queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TSMA)] = queryProcessGetTbTSMARsp; + queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_GET_STREAM_PROGRESS)] = queryProcessStreamProgressRsp; } #pragma GCC diagnostic pop diff --git a/tests/system-test/2-query/tsma.py b/tests/system-test/2-query/tsma.py index 9b832372cd..a44d728c38 100644 --- a/tests/system-test/2-query/tsma.py +++ b/tests/system-test/2-query/tsma.py @@ -446,7 +446,7 @@ class TDTestCase: self.create_tsma('tsma5', 'test', 'norm_tb', ['avg(c1)', 'avg(c2)'], '10m') ## why need 5s, calculation not finished yet. time.sleep(5) - time.sleep(9999999) + #time.sleep(9999999) self.test_query_with_tsma_interval() self.test_query_with_tsma_agg()