catalog cache get stream progress

This commit is contained in:
wangjiaming0909 2024-02-07 11:40:47 +08:00
parent a4d2dfed67
commit e947a2fb0d
19 changed files with 568 additions and 36 deletions

View File

@ -4222,6 +4222,12 @@ typedef struct {
SArray* pTags; // SArray<SSchema>
SArray* pUsedCols; // SArray<SSchema>
char* ast;
int64_t streamUid;
int64_t reqTs;
int64_t rspTs;
int64_t delayDuration;
bool fillHistoryFinished;
} STableTSMAInfo;
typedef struct {
@ -4229,7 +4235,7 @@ typedef struct {
} STableTSMAInfoRsp;
int32_t tSerializeTableTSMAInfoRsp(void* buf, int32_t bufLen, const STableTSMAInfoRsp* pRsp);
int32_t tDeserializeTableTSMAInfoRsp(void* buf, int32_t bufLen, STableTSMAInfoRsp* pReq);
int32_t tDeserializeTableTSMAInfoRsp(void* buf, int32_t bufLen, STableTSMAInfoRsp* pRsp);
int32_t tCloneTbTSMAInfo(STableTSMAInfo* pInfo, STableTSMAInfo** pRes);
void tFreeTableTSMAInfo(void* p);
void tFreeAndClearTableTSMAInfo(void* p);
@ -4240,6 +4246,28 @@ void tFreeTableTSMAInfoRsp(STableTSMAInfoRsp* pRsp);
#define tDeserializeTSMAHbRsp tDeserializeTableTSMAInfoRsp
#define tFreeTSMAHbRsp tFreeTableTSMAInfoRsp
typedef struct SStreamProgressReq {
int64_t streamId;
int32_t vgId;
int32_t fetchIdx;
int32_t subFetchIdx;
} SStreamProgressReq;
int32_t tSerializeStreamProgressReq(void* buf, int32_t bufLen, const SStreamProgressReq* pReq);
int32_t tDeserializeStreamProgressReq(void* buf, int32_t bufLen, SStreamProgressReq* pReq);
typedef struct SStreamProgressRsp {
int64_t streamId;
int32_t vgId;
bool fillHisFinished;
int64_t progressDelay;
int32_t fetchIdx;
int32_t subFetchIdx;
} SStreamProgressRsp;
int32_t tSerializeStreamProgressRsp(void* buf, int32_t bufLen, const SStreamProgressRsp* pRsp);
int32_t tDeserializeSStreamProgressRsp(void* buf, int32_t bufLen, SStreamProgressRsp* pRsp);
#pragma pack(pop)
#ifdef __cplusplus

View File

@ -364,6 +364,7 @@
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_RESET, "vnode-stream-reset", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_TASK_CHECK, "vnode-stream-task-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_STREAM_MAX_MSG, "vnd-stream-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_GET_STREAM_PROGRESS, "vnd-stream-progress", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_VND_STREAM_MSG)
TD_NEW_MSG_SEG(TDMT_VND_TMQ_MSG) //8 << 8

View File

@ -10004,6 +10004,11 @@ static int32_t tEncodeTableTSMAInfo(SEncoder* pEncoder, const STableTSMAInfo* pT
}
if (tEncodeCStr(pEncoder, pTsmaInfo->ast) < 0) return -1;
if (tEncodeI64(pEncoder, pTsmaInfo->streamUid) < 0) return -1;
if (tEncodeI64(pEncoder, pTsmaInfo->reqTs) < 0) return -1;
if (tEncodeI64(pEncoder, pTsmaInfo->rspTs) < 0) return -1;
if (tEncodeI64(pEncoder, pTsmaInfo->delayDuration) < 0) return -1;
if (tEncodeI8(pEncoder, pTsmaInfo->fillHistoryFinished) < 0) return -1;
return 0;
}
@ -10056,7 +10061,11 @@ static int32_t tDecodeTableTSMAInfo(SDecoder* pDecoder, STableTSMAInfo* pTsmaInf
}
}
if (tDecodeCStrAlloc(pDecoder, &pTsmaInfo->ast) < 0) return -1;
if (tDecodeI64(pDecoder, &pTsmaInfo->streamUid) < 0) return -1;
if (tDecodeI64(pDecoder, &pTsmaInfo->reqTs) < 0) return -1;
if (tDecodeI64(pDecoder, &pTsmaInfo->rspTs) < 0) return -1;
if (tDecodeI64(pDecoder, &pTsmaInfo->delayDuration) < 0) return -1;
if (tDecodeI8(pDecoder, (int8_t*)&pTsmaInfo->fillHistoryFinished) < 0) return -1;
return 0;
}
@ -10065,7 +10074,7 @@ static int32_t tEncodeTableTSMAInfoRsp(SEncoder *pEncoder, const STableTSMAInfoR
if (tEncodeI32(pEncoder, size) < 0) return -1;
for (int32_t i = 0; i < size; ++i) {
STableTSMAInfo* pInfo = taosArrayGetP(pRsp->pTsmas, i);
if (tEncodeTableTSMAInfo(pEncoder, pInfo) < 0) return -1;
if (tEncodeTableTSMAInfo(pEncoder, pInfo) < 0) return -1;
}
return 0;
}
@ -10166,3 +10175,93 @@ void tFreeTableTSMAInfoRsp(STableTSMAInfoRsp *pRsp) {
taosArrayDestroyP(pRsp->pTsmas, tFreeAndClearTableTSMAInfo);
}
}
static int32_t tEncodeStreamProgressReq(SEncoder *pEncoder, const SStreamProgressReq *pReq) {
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->vgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->fetchIdx) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->subFetchIdx) < 0) return -1;
return 0;
}
int32_t tSerializeStreamProgressReq(void* buf, int32_t bufLen, const SStreamProgressReq* pReq) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeStreamProgressReq(&encoder, pReq) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
static int32_t tDecodeStreamProgressReq(SDecoder* pDecoder, SStreamProgressReq* pReq) {
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->vgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->fetchIdx) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->subFetchIdx) < 0) return -1;
return 0;
}
int32_t tDeserializeStreamProgressReq(void* buf, int32_t bufLen, SStreamProgressReq* pReq) {
SDecoder decoder = {0};
tDecoderInit(&decoder, (char *)buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeStreamProgressReq(&decoder, pReq) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
static int32_t tEncodeStreamProgressRsp(SEncoder* pEncoder, const SStreamProgressRsp* pRsp) {
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->vgId) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->fillHisFinished) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->progressDelay) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->fetchIdx) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->subFetchIdx) < 0) return -1;
return 0;
}
int32_t tSerializeStreamProgressRsp(void* buf, int32_t bufLen, const SStreamProgressRsp* pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeStreamProgressRsp(&encoder, pRsp) < 0) return -1;
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
static int32_t tDecodeStreamProgressRsp(SDecoder* pDecoder, SStreamProgressRsp* pRsp) {
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->vgId) < 0) return -1;
if (tDecodeI8(pDecoder, (int8_t*)&pRsp->fillHisFinished) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->progressDelay) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->fetchIdx) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->subFetchIdx) < 0) return -1;
return 0;
}
int32_t tDeserializeSStreamProgressRsp(void* buf, int32_t bufLen, SStreamProgressRsp* pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeStreamProgressRsp(&decoder, pRsp) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}

View File

@ -91,6 +91,7 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
code = 0;
_OVER:

View File

@ -930,6 +930,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -2035,8 +2035,8 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp
SSmaObj * pSma = NULL;
SSdb * pSdb = pMnode->pSdb;
void * pIter = NULL;
SStbObj *pStb = NULL;
SStreamObj * pStreamObj = NULL;
SStbObj * pStb = NULL;
/*
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
if (NULL == pStb) {
@ -2060,6 +2060,19 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp
continue;
}
SName smaName;
char streamName[TSDB_TABLE_FNAME_LEN] = {0};
tNameFromString(&smaName, pSma->name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
sprintf(streamName, "%d.%s", smaName.acctId, smaName.tname);
pStreamObj = mndAcquireStream(pMnode, streamName);
if (!pStreamObj) {
sdbRelease(pSdb, pSma);
continue;
}
int64_t streamId = pStreamObj->uid;
mndReleaseStream(pMnode, pStreamObj);
STableTSMAInfo *pTsma = taosMemoryCalloc(1, sizeof(STableTSMAInfo));
if (!pTsma) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -2068,6 +2081,7 @@ static int32_t mndGetTableTSMA(SMnode *pMnode, char *tbFName, STableTSMAInfoRsp
return code;
}
terrno = dumpTSMAInfoFromSmaObj(pSma, pStb, pTsma);
pTsma->streamUid = streamId;
mndReleaseStb(pMnode, pStb);
sdbRelease(pSdb, pSma);
if (terrno) return code;

View File

@ -120,6 +120,7 @@ void vnodeQueryClose(SVnode* pVnode);
int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
int32_t vnodeGetBatchMeta(SVnode* pVnode, SRpcMsg* pMsg);
int32_t vnodeGetStreamProgress(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
// vnodeCommit.c
int32_t vnodeBegin(SVnode* pVnode);

View File

@ -294,6 +294,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
int32_t tqStreamProgressRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
// sma
int32_t smaInit();

View File

@ -1031,6 +1031,44 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
int32_t tqStreamProgressRetrieveReq(STQ *pTq, SRpcMsg *pMsg) {
char* msgStr = pMsg->pCont;
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0;
SStreamProgressReq req;
char* pRspBuf = taosMemoryCalloc(1, sizeof(SMsgHead) + sizeof(SStreamProgressRsp));
SStreamProgressRsp* pRsp = POINTER_SHIFT(pRspBuf, sizeof(SMsgHead));
if (!pRspBuf) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto _OVER;
}
code = tDeserializeStreamProgressReq(msgBody, msgLen, &req);
if (code == TSDB_CODE_SUCCESS) {
code = tqGetStreamExecInfo(pTq->pVnode, req.streamId, &pRsp->progressDelay, &pRsp->fillHisFinished);
}
if (code == TSDB_CODE_SUCCESS) {
pRsp->fetchIdx = req.fetchIdx;
pRsp->subFetchIdx = req.subFetchIdx;
pRsp->vgId = req.vgId;
pRsp->streamId = req.streamId;
tSerializeStreamProgressRsp(pRsp, sizeof(SStreamProgressRsp) + sizeof(SMsgHead), pRsp);
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
rsp.pCont = pRspBuf;
pRspBuf = NULL;
rsp.contLen = sizeof(SMsgHead) + sizeof(SStreamProgressRsp);
tmsgSendRsp(&rsp);
}
_OVER:
if (pRspBuf) {
taosMemoryFree(pRspBuf);
}
return code;
}
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;

View File

@ -545,12 +545,12 @@ int32_t tqGetStreamExecInfo(SVnode* pVnode, int64_t streamId, int64_t* pDelay, b
int64_t latest = 0;
code = walFetchHead(pReader, ver);
if (code != TSDB_CODE_SUCCESS) {
if (code == TSDB_CODE_SUCCESS) {
cur = pReader->pHead->head.ingestTs;
}
code = walFetchHead(pReader, verRange.maxVer);
if (code != TSDB_CODE_SUCCESS) {
if (code == TSDB_CODE_SUCCESS) {
latest = pReader->pHead->head.ingestTs;
}

View File

@ -323,6 +323,9 @@ int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
case TDMT_VND_TABLE_CFG:
vnodeGetTableCfg(pVnode, &reqMsg, false);
break;
case TDMT_VND_GET_STREAM_PROGRESS:
vnodeGetStreamProgress(pVnode, &reqMsg, false);
break;
default:
qError("invalid req msgType %d", req->msgType);
reqMsg.code = TSDB_CODE_INVALID_MSG;
@ -711,3 +714,54 @@ void *vnodeGetIvtIdx(void *pVnode) {
int32_t vnodeGetTableSchema(void *pVnode, int64_t uid, STSchema **pSchema, int64_t *suid) {
return tsdbGetTableSchema(((SVnode *)pVnode)->pMeta, uid, pSchema, suid);
}
int32_t vnodeGetStreamProgress(SVnode* pVnode, SRpcMsg* pMsg, bool direct) {
int32_t code = 0;
SStreamProgressReq req;
SStreamProgressRsp rsp = {0};
SRpcMsg rpcMsg = {.info = pMsg->info, .code = 0};
char * buf = NULL;
int32_t rspLen = 0;
code = tDeserializeStreamProgressReq(pMsg->pCont, pMsg->contLen, &req);
if (code == TSDB_CODE_SUCCESS) {
rsp.fetchIdx = req.fetchIdx;
rsp.subFetchIdx = req.subFetchIdx;
rsp.vgId = req.vgId;
rsp.streamId = req.streamId;
rspLen = tSerializeStreamProgressRsp(0, 0, &rsp);
if (direct) {
buf = rpcMallocCont(rspLen);
} else {
buf = taosMemoryCalloc(1, rspLen);
}
if (!buf) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto _OVER;
}
}
if (code == TSDB_CODE_SUCCESS) {
code = tqGetStreamExecInfo(pVnode, req.streamId, &rsp.progressDelay, &rsp.fillHisFinished);
}
if (code == TSDB_CODE_SUCCESS) {
tSerializeStreamProgressRsp(buf, rspLen, &rsp);
rpcMsg.pCont = buf;
buf = NULL;
rpcMsg.contLen = rspLen;
rpcMsg.code = code;
rpcMsg.msgType = pMsg->msgType;
if (direct) {
tmsgSendRsp(&rpcMsg);
} else {
*pMsg = rpcMsg;
}
}
_OVER:
if (buf) {
taosMemoryFree(buf);
}
return code;
}

View File

@ -838,6 +838,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessStreamReqCheckpointRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP:
return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg);
case TDMT_VND_GET_STREAM_PROGRESS:
return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg);
default:
vError("unknown msg type:%d in stream queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR;

View File

@ -262,9 +262,25 @@ typedef struct SCtgViewsCtx {
SArray* pFetchs;
} SCtgViewsCtx;
typedef enum {
FETCH_TSMA_FOR_TB,
FETCH_PROGRESS_FOR_TSMA,
} CTG_TSMA_FETCH_TYPE;
typedef struct SCtgTSMAFetch {
CTG_TSMA_FETCH_TYPE fetchType;
int32_t dbIdx;
int32_t tbIdx;
int32_t fetchIdx;
int32_t resIdx;
int32_t subFetchNum;
int32_t finishedSubFetchNum;
int32_t vgNum;
} SCtgTSMAFetch;
typedef struct SCtgTbTSMACtx {
int32_t fetchNum;
SArray* pNames;
SArray* pNames; // SArray<STablesReq>
SArray* pResList;
SArray* pFetches;
} SCtgTbTSMACtx;
@ -1128,6 +1144,13 @@ int32_t ctgOpDropTbTSMA(SCtgCacheOperation* operation);
int32_t ctgOpUpdateTbTSMA(SCtgCacheOperation* operation);
uint64_t ctgGetTbTSMACacheSize(STSMACache* pTsmaInfo);
void ctgFreeTbTSMAInfo(void* p);
bool hasOutOfDateTSMACache(SArray* pTsmas);
bool isCtgTSMACacheOutOfDate(STSMACache* pTsmaCache);
int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTbName,
SVgroupInfo* vgroupInfo, SStreamProgressRsp* out, SCtgTaskReq* tReq,
void* bInput);
int32_t ctgAddTSMAFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t* fetchIdx, int32_t resIdx,
int32_t flag);
extern SCatalogMgmt gCtgMgmt;
extern SCtgDebug gCTGDebug;

View File

@ -2622,6 +2622,31 @@ int32_t ctgLaunchGetViewsTask(SCtgTask* pTask) {
return TSDB_CODE_SUCCESS;
}
int32_t ctgAsyncRefreshTbTsma(SCtgTaskReq* pReq, const SCtgTSMAFetch* pFetch) {
int32_t code = 0;
SCtgTask* pTask = pReq->pTask;
SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbTSMACtx* pTaskCtx = pTask->taskCtx;
SCtgDBCache* pDbCache = NULL;
STablesReq* pTbReq = taosArrayGet(pTaskCtx->pNames, pFetch->dbIdx);
ctgAcquireVgInfoFromCache(pCtg, pTbReq->dbFName, &pDbCache);
if (pDbCache) {
ctgReleaseVgInfoToCache(pCtg, pDbCache);
} else {
SBuildUseDBInput input = {0};
tstrncpy(input.db, pTbReq->dbFName, tListLen(input.db));
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, pReq));
}
_return:
return code;
}
int32_t ctgLaunchGetTbTSMATask(SCtgTask* pTask) {
SCatalog* pCtg = pTask->pJob->pCtg;
SCtgTbTSMACtx* pCtx = (SCtgTbTSMACtx*)pTask->taskCtx;
@ -2646,9 +2671,10 @@ int32_t ctgLaunchGetTbTSMATask(SCtgTask* pTask) {
pTask->msgCtxs = taosArrayInit_s(sizeof(SCtgMsgCtx), pCtx->fetchNum);
for (int32_t i = 0; i < pCtx->fetchNum; ++i) {
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetches, i);
SName* pName = ctgGetFetchName(pCtx->pNames, pFetch);
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i);
SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, i);
STablesReq* pReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
SName* pName = taosArrayGet(pReq->pTables, pFetch->tbIdx);
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, i);
if (!pMsgCtx->pBatchs) pMsgCtx->pBatchs = pJob->pBatchs;
SCtgTaskReq tReq;
@ -2749,39 +2775,128 @@ _return:
}
int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
bool taskDone = false;
bool hasSubFetch = false;
int32_t code = 0;
SCtgTask* pTask = tReq->pTask;
SCatalog* pCtg = pTask->pJob->pCtg;
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, tReq->msgIdx);
SCtgTbTSMACtx* pCtx = pTask->taskCtx;
SCtgFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx);
SMetaRes* pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx);
SName* pName = ctgGetFetchName(pCtx->pNames, pFetch);
SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx);
SArray* pTsmas = NULL;
bool taskDone = false;
SMetaRes* pRes = taosArrayGet(pCtx->pResList, pFetch->resIdx);
SCtgDBCache* pDbCache = NULL;
STableTSMAInfo* pTsma = NULL;
SRequestConnInfo* pConn = &pTask->pJob->conn;
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
STableTSMAInfoRsp* pOut = pMsgCtx->out;
if (pOut->pTsmas && taosArrayGetSize(pOut->pTsmas) > 0) {
for (int32_t i = 0; i < taosArrayGetSize(pOut->pTsmas); ++i) {
STableTSMAInfo* pInfo = taosArrayGetP(pOut->pTsmas, i);
CTG_ERR_JRET(tCloneTbTSMAInfo(pInfo, &pTsma));
CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pTask->pJob->pCtg, &pTsma, false));
}
}
switch (reqType) {
case TDMT_MND_GET_TABLE_TSMA: {
STableTSMAInfoRsp* pOut = pMsgCtx->out;
pRes->pRes = pOut;
pMsgCtx->out = NULL;
pRes->code = 0;
pRes->pRes = pOut;
pMsgCtx->out = NULL;
if (atomic_sub_fetch_32(&pCtx->fetchNum, 1) == 0) {
TSWAP(pTask->res, pCtx->pResList);
taskDone = true;
if (pOut->pTsmas && taosArrayGetSize(pOut->pTsmas) > 0) {
// fetch progress
STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
const SName* pTbName = taosArrayGet(pTbReq->pTables, pFetch->fetchIdx);
ctgAcquireVgInfoFromCache(pCtg, pTbReq->dbFName, &pDbCache);
if (!pDbCache) {
// do not know which vnodes to fetch, fetch vnode list first
SBuildUseDBInput input = {0};
tstrncpy(input.db, pTbReq->dbFName, tListLen(input.db));
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
CTG_ERR_JRET(ctgGetDBVgInfoFromMnode(pCtg, pConn, &input, NULL, tReq));
} else {
// fetch progress from every vnode
int32_t subFetchIdx = 0;
pFetch->vgNum = taosHashGetSize(pDbCache->vgCache.vgInfo->vgHash);
for (int32_t i = 0; i < taosArrayGetSize(pOut->pTsmas); ++i) {
STableTSMAInfo* pTsmaInfo = taosArrayGetP(pOut->pTsmas, i);
pTsmaInfo->reqTs = taosGetTimestampMs();
SVgroupInfo* pVgInfo = taosHashIterate(pDbCache->vgCache.vgInfo->vgHash, NULL);
while (pVgInfo) {
// make StreamProgressReq, send it
SStreamProgressReq req = {.fetchIdx = pFetch->fetchIdx,
.streamId = pTsmaInfo->streamUid,
.subFetchIdx = subFetchIdx++,
.vgId = pVgInfo->vgId};
CTG_ERR_JRET(ctgGetStreamProgressFromVnode(pCtg, pConn, pTbName, pVgInfo, NULL, tReq, &req));
pFetch->subFetchNum++;
hasSubFetch = true;
pVgInfo = taosHashIterate(pDbCache->vgCache.vgInfo->vgHash, pVgInfo);
}
}
ctgReleaseVgInfoToCache(pCtg, pDbCache);
pDbCache = NULL;
}
} else {
// no tsmas
if (atomic_sub_fetch_32(&pCtx->fetchNum, 1) == 0) {
TSWAP(pTask->res, pCtx->pResList);
taskDone = true;
}
}
} break;
case TDMT_VND_GET_STREAM_PROGRESS: {
// update progress into res
// TODO pack all streams into one req, and handle all stream rsps together
STableTSMAInfoRsp* pTsmasRsp = pRes->pRes;
SArray* pTsmas = pTsmasRsp->pTsmas;
SStreamProgressRsp* pRsp = pMsgCtx->out;
int32_t tsmaIdx = pRsp->subFetchIdx / pFetch->vgNum;
STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmas, tsmaIdx);
if (pTsmaInfo->rspTs == 0) pTsmaInfo->fillHistoryFinished = true;
pTsmaInfo->rspTs = taosGetTimestampMs();
pTsmaInfo->delayDuration = MAX(pRsp->progressDelay, pTsmaInfo->delayDuration);
pTsmaInfo->fillHistoryFinished = pTsmaInfo->fillHistoryFinished && pRsp->fillHisFinished;
if (atomic_add_fetch_32(&pFetch->finishedSubFetchNum, 1) == pFetch->subFetchNum) {
// subfetch all finished
for (int32_t i = 0; i < taosArrayGetSize(pTsmas); ++i) {
STableTSMAInfo* pInfo = taosArrayGetP(pTsmas, i);
CTG_ERR_JRET(tCloneTbTSMAInfo(pInfo, &pTsma));
CTG_ERR_JRET(ctgUpdateTbTSMAEnqueue(pCtg, &pTsma, false));
}
if (atomic_sub_fetch_32(&pCtx->fetchNum, 1) == 0) {
TSWAP(pTask->res, pCtx->pResList);
taskDone = true;
}
}
} break;
case TDMT_MND_USE_DB: {
STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
SName* pTbName = taosArrayGet(pTbReq->pTables, pFetch->fetchIdx);
SUseDbOutput* pOut = (SUseDbOutput*)pMsgCtx->out;
STableTSMAInfoRsp* pTsmas = pRes->pRes;
int32_t subFetchIdx = 0;
pFetch->vgNum = taosHashGetSize(pOut->dbVgroup->vgHash);
for (int32_t i = 0; i < taosArrayGetSize(pTsmas->pTsmas); ++i) {
STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmas->pTsmas, i);
SVgroupInfo* pVgInfo = taosHashIterate(pOut->dbVgroup->vgHash, NULL);
while (pVgInfo) {
// make StreamProgressReq, send it
SStreamProgressReq req = {.fetchIdx = pFetch->fetchIdx,
.streamId = pTsma->streamUid,
.subFetchIdx = subFetchIdx++,
.vgId = pVgInfo->vgId};
CTG_ERR_JRET(ctgGetStreamProgressFromVnode(pCtg, pConn, pTbName, pVgInfo, NULL, tReq, &req));
pFetch->subFetchNum++;
hasSubFetch = true;
pVgInfo = taosHashIterate(pOut->dbVgroup->vgHash, pVgInfo);
}
}
} break;
default:
ASSERT(0);
}
_return:
if (pDbCache) {
ctgReleaseVgInfoToCache(pCtg, pDbCache);
}
if (pTsma) {
tFreeTableTSMAInfo(pTsma);
pTsma = NULL;
@ -2793,10 +2908,16 @@ _return:
if (TSDB_CODE_MND_SMA_NOT_EXIST == code) {
code = TSDB_CODE_SUCCESS;
} else {
STablesReq* pReq = (STablesReq*)taosArrayGet(pCtx->pNames, pFetch->dbIdx);
SName* pName = taosArrayGet(pReq->pTables, pFetch->tbIdx);
ctgTaskError("Get tsma for %d.%s.%s faield with err: %s", pName->acctId, pName->dbname, pName->tname,
tstrerror(code));
}
if (0 == atomic_sub_fetch_32(&pCtx->fetchNum, 1)) {
bool allSubFetchFinished = false;
if (reqType == TDMT_VND_GET_STREAM_PROGRESS) {
allSubFetchFinished = atomic_add_fetch_32(&pFetch->finishedSubFetchNum, 1) >= pFetch->subFetchNum;
}
if ((allSubFetchFinished || !hasSubFetch) && 0 == atomic_sub_fetch_32(&pCtx->fetchNum, 1)) {
TSWAP(pTask->res, pCtx->pResList);
taskDone = true;
}

View File

@ -3246,8 +3246,10 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
CTG_ERR_RET(ctgAcquireDBCache(pCtg, dbFName, &dbCache));
if (!dbCache) {
ctgDebug("DB %s not in cache", dbFName);
// TODO test no db cache, select from another db
for (int32_t i = 0; i < tbNum; ++i) {
ctgAddFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag);
ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag);
//ctgAddTSMAFetch();
taosArrayPush(pCtx->pResList, &(SMetaData){0});
}
return TSDB_CODE_SUCCESS;
@ -3258,18 +3260,18 @@ int32_t ctgGetTbTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, int32_t dbIdx
pCache = taosHashAcquire(dbCache->tsmaCache, pName->tname, strlen(pName->tname));
if (!pCache) {
ctgDebug("tsma for tb: %s.%s not in cache", dbFName, pName->tname);
ctgAddFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag);
ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArrayPush(pCtx->pResList, &(SMetaRes){0});
CTG_CACHE_NHIT_INC(CTG_CI_TBL_SMA, 1);
continue;
}
CTG_LOCK(CTG_READ, &pCache->tsmaLock);
if (!pCache->pTsmas || pCache->pTsmas->size == 0) {
if (!pCache->pTsmas || pCache->pTsmas->size == 0 || hasOutOfDateTSMACache(pCache->pTsmas)) {
CTG_UNLOCK(CTG_READ, &pCache->tsmaLock);
taosHashRelease(dbCache->tsmaCache, pCache);
ctgDebug("tsma for tb: %s.%s not in cache", pName->tname, dbFName);
ctgAddFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag);
ctgAddTSMAFetch(&pCtx->pFetches, dbIdx, i, fetchIdx, baseResIdx + i, flag);
taosArrayPush(pCtx->pResList, &(SMetaRes){0});
CTG_CACHE_NHIT_INC(CTG_CI_TBL_TSMA, 1);
continue;

View File

@ -355,6 +355,17 @@ int32_t ctgProcessRspMsg(void* out, int32_t reqType, char* msg, int32_t msgSize,
qDebug("Got table tsma from mnode, tbFName:%s", target);
break;
}
case TDMT_VND_GET_STREAM_PROGRESS: {
if (TSDB_CODE_SUCCESS != rspCode) {
CTG_ERR_RET(rspCode);
}
code = queryProcessMsgRsp[TMSG_INDEX(reqType)](out, msg, msgSize);
if (code) {
qError("Process get stream progress rsp failed, err: %s, tbFName: %s", tstrerror(code), target);
CTG_ERR_RET(code);
}
break;
}
default:
if (TSDB_CODE_SUCCESS != rspCode) {
qError("Got error rsp, error:%s", tstrerror(rspCode));
@ -550,6 +561,11 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
pName = ctx->pName;
}
} else if (TDMT_VND_GET_STREAM_PROGRESS == msgType) {
SCtgTbTSMACtx* pCtx = pTask->taskCtx;
SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx);
STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
pName = taosArrayGet(pTbReq->pTables, pFetch->tbIdx);
} else {
ctgError("invalid vnode msgType %d", msgType);
CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
@ -600,6 +616,11 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
pName = ctx->pName;
}
} else if (TDMT_VND_GET_STREAM_PROGRESS == msgType) {
SCtgTbTSMACtx* pCtx = pTask->taskCtx;
SCtgTSMAFetch* pFetch = taosArrayGet(pCtx->pFetches, tReq->msgIdx);
STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
pName = taosArrayGet(pTbReq->pTables, pFetch->tbIdx);
} else {
ctgError("invalid vnode msgType %d", msgType);
CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
@ -1535,3 +1556,66 @@ int32_t ctgGetTbTSMAFromMnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SNa
return TSDB_CODE_SUCCESS;
}
int32_t ctgGetStreamProgressFromVnode(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTbName,
SVgroupInfo* vgroupInfo, SStreamProgressRsp* out, SCtgTaskReq* tReq,
void* bInput) {
char* msg = NULL;
int32_t msgLen = 0;
int32_t reqType = TDMT_VND_GET_STREAM_PROGRESS;
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(pTbName, tbFName);
SCtgTask* pTask = tReq ? tReq->pTask : NULL;
void* (*mallocFp)(int64_t) = pTask ? (MallocType)taosMemoryMalloc : (MallocType)rpcMallocCont;
SEp* pEp = &vgroupInfo->epSet.eps[vgroupInfo->epSet.inUse];
ctgDebug("try to get stream progress from vnode, vgId:%d, ep num:%d, ep %s:%d, target:%s", vgroupInfo->vgId,
vgroupInfo->epSet.numOfEps, pEp->fqdn, pEp->port, tbFName);
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](bInput, &msg, 0, &msgLen, mallocFp);
if (code) {
ctgError("Build get stream progress failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
CTG_ERR_RET(code);
}
if (pTask) {
SStreamProgressRsp* pOut = taosMemoryCalloc(1, sizeof(SStreamProgressRsp));
if (!pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
CTG_ERR_RET(ctgUpdateMsgCtx(CTG_GET_TASK_MSGCTX(pTask, -1), reqType, pOut, (char*)tbFName));
SRequestConnInfo vConn = {.pTrans = pConn->pTrans,
.requestId = pConn->requestId,
.requestObjRefId = pConn->requestObjRefId,
.mgmtEps = vgroupInfo->epSet};
#if CTG_BATCH_FETCH
CTG_RET(ctgAddBatch(pCtg, vgroupInfo->vgId, &vConn, tReq, reqType, msg, msgLen));
#else
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(pTbName, dbFName);
SArray* pTaskId = taosArrayInit(1, sizeof(int32_t));
if (NULL == pTaskId) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
taosArrayPush(pTaskId, &pTask->taskId);
CTG_RET(
ctgAsyncSendMsg(pCtg, &vConn, pTask->pJob, pTaskId, -1, NULL, dbFName, vgroupInfo->vgId, reqType, msg, msgLen));
#endif
}
SRpcMsg rpcMsg = {
.msgType = reqType,
.pCont = msg,
.contLen = msgLen,
};
SRpcMsg rpcRsp = {0};
rpcSendRecv(pConn->pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp);
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
rpcFreeCont(rpcRsp.pCont);
return TSDB_CODE_SUCCESS;
}

View File

@ -2392,3 +2392,36 @@ uint64_t ctgGetTbTSMACacheSize(STableTSMAInfo* pTsmaInfo) {
//TODO
return 0;
}
bool hasOutOfDateTSMACache(SArray* pTsmas) {
if (!pTsmas || pTsmas->size == 0) {
return false;
}
for (int32_t i = 0; i < pTsmas->size; ++i) {
STSMACache* pTsmaInfo = taosArrayGetP(pTsmas, i);
if (isCtgTSMACacheOutOfDate(pTsmaInfo)) return true;
}
return false;
}
bool isCtgTSMACacheOutOfDate(STSMACache* pTsmaCache) {
int64_t now = taosGetTimestampMs();
return !pTsmaCache->fillHistoryFinished || (30 * 1000 - pTsmaCache->delayDuration) < (now - pTsmaCache->reqTs);
}
int32_t ctgAddTSMAFetch(SArray** pFetchs, int32_t dbIdx, int32_t tbIdx, int32_t* fetchIdx, int32_t resIdx,
int32_t flag) {
if (NULL == (*pFetchs)) {
*pFetchs = taosArrayInit(CTG_DEFAULT_FETCH_NUM, sizeof(SCtgTSMAFetch));
}
SCtgTSMAFetch fetch = {0};
fetch.dbIdx = dbIdx;
fetch.tbIdx = tbIdx;
fetch.fetchIdx = (*fetchIdx)++;
fetch.resIdx = resIdx;
taosArrayPush(*pFetchs, &fetch);
return TSDB_CODE_SUCCESS;
}

View File

@ -341,6 +341,21 @@ int32_t queryBuildGetTSMAMsg(void *input, char **msg, int32_t msgSize, int32_t *
return TSDB_CODE_SUCCESS;
}
int32_t queryBuildGetStreamProgressMsg(void* input, char** msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int64_t)) {
if (!msg || !msgLen) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
int32_t len = tSerializeStreamProgressReq(NULL, 0, input);
void* pBuf = (*mallcFp)(len);
tSerializeStreamProgressReq(pBuf, len, input);
*msg = pBuf;
*msgLen = len;
return TSDB_CODE_SUCCESS;
}
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
SUseDbOutput *pOut = output;
SUseDbRsp usedbRsp = {0};
@ -733,6 +748,18 @@ int32_t queryProcessGetTbTSMARsp(void* output, char* msg, int32_t msgSize) {
return TSDB_CODE_SUCCESS;
}
int32_t queryProcessStreamProgressRsp(void* output, char* msg, int32_t msgSize) {
if (!output || !msg || msgSize <= 0) {
return TSDB_CODE_TSC_INVALID_INPUT;
}
if (tDeserializeSStreamProgressRsp(msg, msgSize, output) != 0) {
qError("tDeserializeStreamProgressRsp failed, msgSize: %d", msgSize);
return TSDB_CODE_INVALID_MSG;
}
return TSDB_CODE_SUCCESS;
}
void initQueryModuleMsgHandle() {
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
@ -751,6 +778,7 @@ void initQueryModuleMsgHandle() {
queryBuildMsg[TMSG_INDEX(TDMT_MND_VIEW_META)] = queryBuildGetViewMetaMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TABLE_TSMA)] = queryBuildGetTableTSMAMsg;
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TSMA)] = queryBuildGetTSMAMsg;
queryBuildMsg[TMSG_INDEX(TDMT_VND_GET_STREAM_PROGRESS)] = queryBuildGetStreamProgressMsg;
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
@ -768,6 +796,7 @@ void initQueryModuleMsgHandle() {
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_VIEW_META)] = queryProcessGetViewMetaRsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_TSMA)] = queryProcessGetTbTSMARsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TSMA)] = queryProcessGetTbTSMARsp;
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_GET_STREAM_PROGRESS)] = queryProcessStreamProgressRsp;
}
#pragma GCC diagnostic pop

View File

@ -446,7 +446,7 @@ class TDTestCase:
self.create_tsma('tsma5', 'test', 'norm_tb', ['avg(c1)', 'avg(c2)'], '10m')
## why need 5s, calculation not finished yet.
time.sleep(5)
time.sleep(9999999)
#time.sleep(9999999)
self.test_query_with_tsma_interval()
self.test_query_with_tsma_agg()