fix: not submit for rsma without rollup

This commit is contained in:
Cary Xu 2022-06-16 15:34:33 +08:00
parent 8ad688d3d2
commit dbef3dfc55
3 changed files with 38 additions and 19 deletions

View File

@ -1583,6 +1583,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
int32_t rowSize = pDataBlock->info.rowSize; int32_t rowSize = pDataBlock->info.rowSize;
int64_t groupId = pDataBlock->info.groupId; int64_t groupId = pDataBlock->info.groupId;
if (colNum <= 1) {
// invalid if only with TS col
continue;
}
if (rb.nCols != colNum) { if (rb.nCols != colNum) {
tdSRowSetTpInfo(&rb, colNum, pTSchema->flen); tdSRowSetTpInfo(&rb, colNum, pTSchema->flen);
} }
@ -1679,6 +1684,7 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
msgLen += pSubmitBlk->dataLen; msgLen += pSubmitBlk->dataLen;
} }
if (numOfBlks > 0) {
(*pReq)->length = msgLen; (*pReq)->length = msgLen;
(*pReq)->header.vgId = htonl(vgId); (*pReq)->header.vgId = htonl(vgId);
@ -1697,6 +1703,10 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
blk->numOfRows = htons(blk->numOfRows); blk->numOfRows = htons(blk->numOfRows);
blk = (SSubmitBlk*)(blk->data + dataLen); blk = (SSubmitBlk*)(blk->data + dataLen);
} }
} else {
// no valid rows
taosMemoryFreeClear(*pReq);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -260,6 +260,7 @@ struct SSma {
#define SMA_CFG(s) (&(s)->pVnode->config) #define SMA_CFG(s) (&(s)->pVnode->config)
#define SMA_TSDB_CFG(s) (&(s)->pVnode->config.tsdbCfg) #define SMA_TSDB_CFG(s) (&(s)->pVnode->config.tsdbCfg)
#define SMA_RETENTION(s) ((SRetention *)&(s)->pVnode->config.tsdbCfg.retentions)
#define SMA_LOCKED(s) ((s)->locked) #define SMA_LOCKED(s) ((s)->locked)
#define SMA_META(s) ((s)->pVnode->pMeta) #define SMA_META(s) ((s)->pVnode->pMeta)
#define SMA_VID(s) TD_VID((s)->pVnode) #define SMA_VID(s) TD_VID((s)->pVnode)

View File

@ -400,22 +400,24 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
} }
if (taosArrayGetSize(pResult) > 0) { if (taosArrayGetSize(pResult) > 0) {
#if 1 #if 0
char flag[10] = {0}; char flag[10] = {0};
snprintf(flag, 10, "level %" PRIi8, level); snprintf(flag, 10, "level %" PRIi8, level);
blockDebugShowData(pResult, flag); blockDebugShowData(pResult, flag);
#endif #endif
STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2); STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pSma->pRSmaTsdb1 : pSma->pRSmaTsdb2);
SSubmitReq *pReq = NULL; SSubmitReq *pReq = NULL;
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) != 0) { if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) {
taosArrayDestroy(pResult); taosArrayDestroy(pResult);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
if (tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) != 0) {
if (pReq && tdProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) < 0) {
taosArrayDestroy(pResult); taosArrayDestroy(pResult);
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
return TSDB_CODE_FAILED; return TSDB_CODE_FAILED;
} }
taosMemoryFreeClear(pReq); taosMemoryFreeClear(pReq);
} else { } else {
smaDebug("vgId:%d, no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), level, tstrerror(terrno)); smaDebug("vgId:%d, no rsma % " PRIi8 " data generated since %s", SMA_VID(pSma), level, tstrerror(terrno));
@ -469,6 +471,12 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SRetention *pRetention = SMA_RETENTION(pSma);
if (!RETENTION_VALID(pRetention + 1)) {
// return directly if retention level 1 is invalid
return TSDB_CODE_SUCCESS;
}
if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) { if (inputType == STREAM_DATA_TYPE_SUBMIT_BLOCK) {
STbUidStore uidStore = {0}; STbUidStore uidStore = {0};
tdFetchSubmitReqSuids(pMsg, &uidStore); tdFetchSubmitReqSuids(pMsg, &uidStore);