fix create tsma txn and fix fetch wrong stream progress
This commit is contained in:
parent
9cc33b1296
commit
97498e1844
|
@ -1523,7 +1523,7 @@ static int32_t mndCreateTSMATxnPrepare(SCreateTSMACxt* pCxt) {
|
||||||
mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
|
mndTransSetDbName(pTrans, pCxt->pDb->name, NULL);
|
||||||
if (mndTransCheckConflict(pCxt->pMnode, pTrans) != 0) goto _OVER;
|
if (mndTransCheckConflict(pCxt->pMnode, pTrans) != 0) goto _OVER;
|
||||||
|
|
||||||
mndTransSetParallel(pTrans);
|
mndTransSetSerial(pTrans);
|
||||||
mInfo("trans:%d, used to create tsma:%s stream:%s", pTrans->id, pCxt->pCreateSmaReq->name,
|
mInfo("trans:%d, used to create tsma:%s stream:%s", pTrans->id, pCxt->pCreateSmaReq->name,
|
||||||
pCxt->pCreateStreamReq->name);
|
pCxt->pCreateStreamReq->name);
|
||||||
|
|
||||||
|
|
|
@ -2845,7 +2845,8 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
|
||||||
STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
|
STablesReq* pTbReq = taosArrayGet(pCtx->pNames, pFetch->dbIdx);
|
||||||
SName* pTbName = taosArrayGet(pTbReq->pTables, pFetch->tbIdx);
|
SName* pTbName = taosArrayGet(pTbReq->pTables, pFetch->tbIdx);
|
||||||
|
|
||||||
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
|
if (reqType != TDMT_VND_GET_STREAM_PROGRESS)
|
||||||
|
CTG_ERR_JRET(ctgProcessRspMsg(pMsgCtx->out, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
|
||||||
|
|
||||||
switch (reqType) {
|
switch (reqType) {
|
||||||
case TDMT_MND_GET_TABLE_TSMA: {
|
case TDMT_MND_GET_TABLE_TSMA: {
|
||||||
|
@ -2878,10 +2879,12 @@ int32_t ctgHandleGetTbTSMARsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
case TDMT_VND_GET_STREAM_PROGRESS: {
|
case TDMT_VND_GET_STREAM_PROGRESS: {
|
||||||
|
SStreamProgressRsp rsp = {0};
|
||||||
|
CTG_ERR_JRET(ctgProcessRspMsg(&rsp, reqType, pMsg->pData, pMsg->len, rspCode, pMsgCtx->target));
|
||||||
// update progress into res
|
// update progress into res
|
||||||
STableTSMAInfoRsp* pTsmasRsp = pRes->pRes;
|
STableTSMAInfoRsp* pTsmasRsp = pRes->pRes;
|
||||||
SArray* pTsmas = pTsmasRsp->pTsmas;
|
SArray* pTsmas = pTsmasRsp->pTsmas;
|
||||||
SStreamProgressRsp* pRsp = pMsgCtx->out;
|
SStreamProgressRsp* pRsp = &rsp;
|
||||||
int32_t tsmaIdx = pRsp->subFetchIdx / pFetch->vgNum;
|
int32_t tsmaIdx = pRsp->subFetchIdx / pFetch->vgNum;
|
||||||
STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmas, tsmaIdx);
|
STableTSMAInfo* pTsmaInfo = taosArrayGetP(pTsmas, tsmaIdx);
|
||||||
if (pTsmaInfo->rspTs == 0) pTsmaInfo->fillHistoryFinished = true;
|
if (pTsmaInfo->rspTs == 0) pTsmaInfo->fillHistoryFinished = true;
|
||||||
|
|
Loading…
Reference in New Issue