From 5b7117884447e96964c34f12d300b4da328c956d Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Mon, 2 May 2022 09:37:24 +0800 Subject: [PATCH] feat: rollup data storage --- include/common/tdatablock.h | 2 +- source/common/src/tdatablock.c | 8 +++++++- source/dnode/vnode/src/tsdb/tsdbFile.c | 4 ++-- source/dnode/vnode/src/tsdb/tsdbSma.c | 25 +++++++++++++++++-------- 4 files changed, 27 insertions(+), 12 deletions(-) diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index d86586a378..3a98102661 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -225,7 +225,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData); void blockDebugShowData(const SArray* dataBlocks); -void buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, +int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId, tb_uid_t uid, tb_uid_t suid); static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index e4f344b88f..dcea167e81 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1476,7 +1476,7 @@ void blockDebugShowData(const SArray* dataBlocks) { * * TODO: colId should be set */ -void buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema *pTSchema, int32_t vgId, tb_uid_t uid, +int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema *pTSchema, int32_t vgId, tb_uid_t uid, tb_uid_t suid) { int32_t sz = taosArrayGetSize(pDataBlocks); int32_t bufSize = sizeof(SSubmitReq); @@ -1489,6 +1489,10 @@ void buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, S ASSERT(bufSize < 3 * 1024 * 1024); *pReq = taosMemoryCalloc(1, bufSize); + if(!(*pReq)) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } void* pDataBuf = *pReq; int32_t msgLen = sizeof(SSubmitReq); @@ -1582,4 +1586,6 @@ void buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, S blk->numOfRows = htons(blk->numOfRows); blk = (SSubmitBlk*)(blk->data + dataLen); } + + return TSDB_CODE_SUCCESS; } diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c index 25566d6dad..2f40698e49 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile.c @@ -27,7 +27,7 @@ static const char *TSDB_FNAME_SUFFIX[] = { "rsma", // TSDB_FILE_RSMA }; -static const char *TSDB_DIR_NAME[] = { +static const char *TSDB_LEVEL_DNAME[] = { "tsdb", "rsma1", "rsma2", @@ -51,7 +51,7 @@ void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t pDFile->info.magic = TSDB_FILE_INIT_MAGIC; pDFile->info.fver = tsdbGetDFSVersion(ftype); - tsdbGetFilename(REPO_ID(pRepo), fid, ver, ftype, TSDB_DIR_NAME[pRepo->level], fname); + tsdbGetFilename(REPO_ID(pRepo), fid, ver, ftype, TSDB_LEVEL_DNAME[pRepo->level], fname); tfsInitFile(REPO_TFS(pRepo), &(pDFile->f), did, fname); } diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 0e38deb17d..362e31ff61 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -175,7 +175,8 @@ static int32_t tsdbInsertRSmaDataImpl(STsdb *pTsdb, const char *msg); static FORCE_INLINE int32_t tsdbUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); static FORCE_INLINE int32_t tsdbUpdateTbUidListImpl(STsdb *pTsdb, tb_uid_t *suid, SArray *tbUids); static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, int32_t inputType, - qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid, int8_t level); + qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid, + int8_t level); // mgmt interface static int32_t tsdbDropTSmaDataImpl(STsdb *pTsdb, int64_t indexUid); @@ -1976,7 +1977,7 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { if (!pBlock) break; tsdbUidStorePut(pStore, msgIter.suid, NULL); - pStore->uid = msgIter.uid; // TODO: remove, just for debugging + pStore->uid = msgIter.uid; // TODO: remove, just for debugging } if (terrno != TSDB_CODE_SUCCESS) return -1; @@ -1984,8 +1985,8 @@ static int32_t tsdbFetchSubmitReqSuids(SSubmitReq *pMsg, STbUidStore *pStore) { } static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, int32_t inputType, - qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid, - tb_uid_t uid, int8_t level) { + qTaskInfo_t *taskInfo, STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid, + int8_t level) { SArray *pResult = NULL; tsdbDebug("vgId:%d execute rsma %" PRIi8 " task for qTaskInfo:%p suid:%" PRIu64, REPO_ID(pTsdb), level, taskInfo, suid); @@ -2013,10 +2014,18 @@ static FORCE_INLINE int32_t tsdbExecuteRSmaImpl(STsdb *pTsdb, const void *pMsg, if (taosArrayGetSize(pResult) > 0) { blockDebugShowData(pResult); - STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pTsdb->pVnode->pRSma1 : pTsdb->pVnode->pRSma2); + STsdb *sinkTsdb = (level == TSDB_RETENTION_L1 ? pTsdb->pVnode->pRSma1 : pTsdb->pVnode->pRSma2); SSubmitReq *pReq = NULL; - buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, TD_VID(pTsdb->pVnode),uid, suid); - tsdbProcessSubmitReq(sinkTsdb, INT64_MAX, pReq); + if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, TD_VID(pTsdb->pVnode), uid, suid) != 0) { + taosArrayDestroy(pResult); + return TSDB_CODE_FAILED; + } + if (tsdbProcessSubmitReq(sinkTsdb, INT64_MAX, pReq) != 0) { + taosArrayDestroy(pResult); + taosMemoryFreeClear(pReq); + return TSDB_CODE_FAILED; + } + taosMemoryFreeClear(pReq); } else { tsdbWarn("vgId:%d no rsma % " PRIi8 " data generated since %s", REPO_ID(pTsdb), level, tstrerror(terrno)); } @@ -2033,7 +2042,7 @@ static int32_t tsdbExecuteRSma(STsdb *pTsdb, const void *pMsg, int32_t inputType return TSDB_CODE_SUCCESS; } - ASSERT(uid != 0); // TODO: remove later + ASSERT(uid != 0); // TODO: remove later SSmaStat *pStat = SMA_ENV_STAT(pEnv); SRSmaInfo *pRSmaInfo = NULL;