diff --git a/include/util/tutil.h b/include/util/tutil.h index a0c7b3d7ad..32a88b37ec 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -69,7 +69,18 @@ static FORCE_INLINE void taosEncryptPass_c(uint8_t *inBuf, size_t len, char *tar memcpy(target, buf, TSDB_PASSWORD_LEN); } -#define taosGetTbHashVal(tbname, tblen, method, prefix, suffix) MurmurHash3_32((tbname), (tblen)) +static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, int32_t method, int32_t prefix, + int32_t suffix) { + if (prefix == 0 && suffix == 0) { + return MurmurHash3_32(tbname, tblen); + } else { + if (tblen <= (prefix + suffix)) { + return MurmurHash3_32(tbname, tblen); + } else { + return MurmurHash3_32(tbname + prefix, tblen - prefix - suffix); + } + } +} #ifdef __cplusplus } diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 61fc530f57..f77320c5ac 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -103,6 +103,8 @@ static const SSysDbTableSchema userDBSchema[] = { {.name = "wal_roll_period", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = true}, {.name = "wal_segment_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT, .sysInfo = true}, {.name = "sst_trigger", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, + {.name = "table_prefix", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, + {.name = "table_suffix", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT, .sysInfo = true}, }; static const SSysDbTableSchema userFuncSchema[] = { diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 84bca239d2..9e58b74017 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -513,6 +513,12 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, return -1; } + if (dbObj.cfg.hashPrefix > 0) { + int32_t dbLen = strlen(dbObj.name) + 1; + mInfo("db:%s, hashPrefix adjust from %d to %d", dbObj.name, dbObj.cfg.hashPrefix, dbObj.cfg.hashPrefix + dbLen); + dbObj.cfg.hashPrefix += dbLen; + } + SVgObj *pVgroups = NULL; if (mndAllocVgroup(pMnode, &dbObj, &pVgroups) != 0) { mError("db:%s, failed to create since %s", pCreate->db, terrstr()); @@ -1710,6 +1716,16 @@ static void mndDumpDbInfoData(SMnode *pMnode, SSDataBlock *pBlock, SDbObj *pDb, pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.sstTrigger, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + int16_t hashPrefix = pDb->cfg.hashPrefix; + if (hashPrefix > 0) { + hashPrefix = pDb->cfg.hashPrefix - strlen(pDb->name) - 1; + } + colDataAppend(pColInfo, rows, (const char *)&hashPrefix, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.hashSuffix, false); } taosMemoryFree(buf); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 7eb907c5a6..98282605d0 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -82,6 +82,8 @@ typedef struct SLDataIter SLDataIter; #define TSDBKEY_MIN ((TSDBKEY){.ts = TSKEY_MIN, .version = VERSION_MIN}) #define TSDBKEY_MAX ((TSDBKEY){.ts = TSKEY_MAX, .version = VERSION_MAX}) +#define TABLE_SAME_SCHEMA(SUID1, UID1, SUID2, UID2) ((SUID1) ? (SUID1) == (SUID2) : (UID1) == (UID2)) + #define PAGE_CONTENT_SIZE(PAGE) ((PAGE) - sizeof(TSCKSUM)) #define LOGIC_TO_FILE_OFFSET(LOFFSET, PAGE) \ ((LOFFSET) / PAGE_CONTENT_SIZE(PAGE) * (PAGE) + (LOFFSET) % PAGE_CONTENT_SIZE(PAGE)) @@ -260,7 +262,7 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync); int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter); int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx); -int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, SBlockIdx *pBlockIdx); +int32_t tsdbWriteDataBlk(SDataFWriter *pWriter, SMapData *mDataBlk, SBlockIdx *pBlockIdx); int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk); int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, int8_t cmprAlg, int8_t toLast); @@ -270,7 +272,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo); int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFReaderClose(SDataFReader **ppReader); int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx); -int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData); +int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk); int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk); int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pBlock, SArray *aColumnDataAgg); int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pBlock, SBlockData *pBlockData); @@ -660,6 +662,12 @@ typedef struct SMergeTree { const char *idStr; } SMergeTree; +typedef struct { + int64_t suid; + int64_t uid; + STSchema *pTSchema; +} SSkmInfo; + int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pVerRange, void *pLoadInfo, const char *idStr); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 62aba649a3..0e85e7bfb6 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -104,7 +104,7 @@ int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp); -int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t *tbUid); +int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid); int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 3ddba8e656..8da783a5bd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -589,7 +589,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow) { } tMapDataReset(&state->blockMap); - code = tsdbReadBlock(state->pDataFReader, state->pBlockIdx, &state->blockMap); + code = tsdbReadDataBlk(state->pDataFReader, state->pBlockIdx, &state->blockMap); if (code) goto _err; state->nBlock = state->blockMap.nItem; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 4b5df2d7a0..a619b9f2e4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -14,13 +14,8 @@ */ #include "tsdb.h" -typedef struct { - int64_t suid; - int64_t uid; - STSchema *pTSchema; -} SSkmInfo; -typedef enum { MEMORY_DATA_ITER = 0, LAST_DATA_ITER } EDataIterT; +typedef enum { MEMORY_DATA_ITER = 0, STT_DATA_ITER } EDataIterT; typedef struct { SRBTreeNode n; @@ -99,7 +94,7 @@ static int32_t tsdbCommitCache(SCommitter *pCommitter); static int32_t tsdbEndCommit(SCommitter *pCommitter, int32_t eno); static int32_t tsdbNextCommitRow(SCommitter *pCommitter); -static int32_t tRowInfoCmprFn(const void *p1, const void *p2) { +int32_t tRowInfoCmprFn(const void *p1, const void *p2) { SRowInfo *pInfo1 = (SRowInfo *)p1; SRowInfo *pInfo2 = (SRowInfo *)p2; @@ -325,22 +320,22 @@ _err: return code; } -static int32_t tsdbCommitterUpdateTableSchema(SCommitter *pCommitter, int64_t suid, int64_t uid) { +int32_t tsdbUpdateTableSchema(SMeta *pMeta, int64_t suid, int64_t uid, SSkmInfo *pSkmInfo) { int32_t code = 0; if (suid) { - if (pCommitter->skmTable.suid == suid) { - pCommitter->skmTable.uid = uid; + if (pSkmInfo->suid == suid) { + pSkmInfo->uid = uid; goto _exit; } } else { - if (pCommitter->skmTable.uid == uid) goto _exit; + if (pSkmInfo->uid == uid) goto _exit; } - pCommitter->skmTable.suid = suid; - pCommitter->skmTable.uid = uid; - tTSchemaDestroy(pCommitter->skmTable.pTSchema); - code = metaGetTbTSchemaEx(pCommitter->pTsdb->pVnode->pMeta, suid, uid, -1, &pCommitter->skmTable.pTSchema); + pSkmInfo->suid = suid; + pSkmInfo->uid = uid; + tTSchemaDestroy(pSkmInfo->pTSchema); + code = metaGetTbTSchemaEx(pMeta, suid, uid, -1, &pSkmInfo->pTSchema); if (code) goto _exit; _exit: @@ -382,7 +377,7 @@ static int32_t tsdbCommitterNextTableData(SCommitter *pCommitter) { pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, pCommitter->dReader.iBlockIdx); - code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); + code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); if (code) goto _exit; ASSERT(pCommitter->dReader.mBlock.nItem > 0); @@ -432,7 +427,7 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { int8_t iIter = 0; for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) { pIter = &pCommitter->aDataIter[iIter]; - pIter->type = LAST_DATA_ITER; + pIter->type = STT_DATA_ITER; pIter->iStt = iStt; code = tsdbReadSttBlk(pCommitter->dReader.pReader, iStt, pIter->aSttBlk); @@ -498,7 +493,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { pCommitter->dReader.iBlockIdx = 0; if (taosArrayGetSize(pCommitter->dReader.aBlockIdx) > 0) { pCommitter->dReader.pBlockIdx = (SBlockIdx *)taosArrayGet(pCommitter->dReader.aBlockIdx, 0); - code = tsdbReadBlock(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); + code = tsdbReadDataBlk(pCommitter->dReader.pReader, pCommitter->dReader.pBlockIdx, &pCommitter->dReader.mBlock); if (code) goto _err; } else { pCommitter->dReader.pBlockIdx = NULL; @@ -556,46 +551,45 @@ _err: return code; } -static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) { - int32_t code = 0; - SBlockData *pBlockData = &pCommitter->dWriter.bData; - SDataBlk block; +int32_t tsdbWriteDataBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SMapData *mDataBlk, int8_t cmprAlg) { + int32_t code = 0; - ASSERT(pBlockData->nRow > 0); + if (pBlockData->nRow == 0) return code; - tDataBlkReset(&block); + SDataBlk dataBlk; + tDataBlkReset(&dataBlk); // info - block.nRow += pBlockData->nRow; + dataBlk.nRow += pBlockData->nRow; for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { TSDBKEY key = {.ts = pBlockData->aTSKEY[iRow], .version = pBlockData->aVersion[iRow]}; if (iRow == 0) { - if (tsdbKeyCmprFn(&block.minKey, &key) > 0) { - block.minKey = key; + if (tsdbKeyCmprFn(&dataBlk.minKey, &key) > 0) { + dataBlk.minKey = key; } } else { if (pBlockData->aTSKEY[iRow] == pBlockData->aTSKEY[iRow - 1]) { - block.hasDup = 1; + dataBlk.hasDup = 1; } } - if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&block.maxKey, &key) < 0) { - block.maxKey = key; + if (iRow == pBlockData->nRow - 1 && tsdbKeyCmprFn(&dataBlk.maxKey, &key) < 0) { + dataBlk.maxKey = key; } - block.minVer = TMIN(block.minVer, key.version); - block.maxVer = TMAX(block.maxVer, key.version); + dataBlk.minVer = TMIN(dataBlk.minVer, key.version); + dataBlk.maxVer = TMAX(dataBlk.maxVer, key.version); } // write - block.nSubBlock++; - code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &block.aSubBlock[block.nSubBlock - 1], - ((block.nSubBlock == 1) && !block.hasDup) ? &block.smaInfo : NULL, pCommitter->cmprAlg, 0); + dataBlk.nSubBlock++; + code = tsdbWriteBlockData(pWriter, pBlockData, &dataBlk.aSubBlock[dataBlk.nSubBlock - 1], + ((dataBlk.nSubBlock == 1) && !dataBlk.hasDup) ? &dataBlk.smaInfo : NULL, cmprAlg, 0); if (code) goto _err; // put SDataBlk - code = tMapDataPutItem(&pCommitter->dWriter.mBlock, &block, tPutDataBlk); + code = tMapDataPutItem(mDataBlk, &dataBlk, tPutDataBlk); if (code) goto _err; // clear @@ -604,39 +598,38 @@ static int32_t tsdbCommitDataBlock(SCommitter *pCommitter) { return code; _err: - tsdbError("vgId:%d tsdb commit data block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d tsdb commit data block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); return code; } -static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { - int32_t code = 0; - SSttBlk blockL; - SBlockData *pBlockData = &pCommitter->dWriter.bDatal; +int32_t tsdbWriteSttBlock(SDataFWriter *pWriter, SBlockData *pBlockData, SArray *aSttBlk, int8_t cmprAlg) { + int32_t code = 0; + SSttBlk sstBlk; - ASSERT(pBlockData->nRow > 0); + if (pBlockData->nRow == 0) return code; // info - blockL.suid = pBlockData->suid; - blockL.nRow = pBlockData->nRow; - blockL.minKey = TSKEY_MAX; - blockL.maxKey = TSKEY_MIN; - blockL.minVer = VERSION_MAX; - blockL.maxVer = VERSION_MIN; + sstBlk.suid = pBlockData->suid; + sstBlk.nRow = pBlockData->nRow; + sstBlk.minKey = TSKEY_MAX; + sstBlk.maxKey = TSKEY_MIN; + sstBlk.minVer = VERSION_MAX; + sstBlk.maxVer = VERSION_MIN; for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { - blockL.minKey = TMIN(blockL.minKey, pBlockData->aTSKEY[iRow]); - blockL.maxKey = TMAX(blockL.maxKey, pBlockData->aTSKEY[iRow]); - blockL.minVer = TMIN(blockL.minVer, pBlockData->aVersion[iRow]); - blockL.maxVer = TMAX(blockL.maxVer, pBlockData->aVersion[iRow]); + sstBlk.minKey = TMIN(sstBlk.minKey, pBlockData->aTSKEY[iRow]); + sstBlk.maxKey = TMAX(sstBlk.maxKey, pBlockData->aTSKEY[iRow]); + sstBlk.minVer = TMIN(sstBlk.minVer, pBlockData->aVersion[iRow]); + sstBlk.maxVer = TMAX(sstBlk.maxVer, pBlockData->aVersion[iRow]); } - blockL.minUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[0]; - blockL.maxUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1]; + sstBlk.minUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[0]; + sstBlk.maxUid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[pBlockData->nRow - 1]; // write - code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &blockL.bInfo, NULL, pCommitter->cmprAlg, 1); + code = tsdbWriteBlockData(pWriter, pBlockData, &sstBlk.bInfo, NULL, cmprAlg, 1); if (code) goto _err; // push SSttBlk - if (taosArrayPush(pCommitter->dWriter.aSttBlk, &blockL) == NULL) { + if (taosArrayPush(aSttBlk, &sstBlk) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -647,7 +640,7 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { return code; _err: - tsdbError("vgId:%d tsdb commit last block failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d tsdb commit last block failed since %s", TD_VID(pWriter->pTsdb->pVnode), tstrerror(code)); return code; } @@ -692,7 +685,7 @@ static int32_t tsdbMoveCommitData(SCommitter *pCommitter, TABLEID toTable) { while (pCommitter->dReader.pBlockIdx && tTABLEIDCmprFn(pCommitter->dReader.pBlockIdx, &toTable) < 0) { SBlockIdx blockIdx = *pCommitter->dReader.pBlockIdx; - code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx); + code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dReader.mBlock, &blockIdx); if (code) goto _err; if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) { @@ -1046,7 +1039,7 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) { break; } } - } else if (pCommitter->pIter->type == LAST_DATA_ITER) { // last file + } else if (pCommitter->pIter->type == STT_DATA_ITER) { // last file pIter->iRow++; if (pIter->iRow < pIter->bData.nRow) { pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow]; @@ -1124,15 +1117,14 @@ static int32_t tsdbCommitAheadBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) } if (pBlockData->nRow >= pCommitter->maxRow) { - code = tsdbCommitDataBlock(pCommitter); + code = + tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); if (code) goto _err; } } - if (pBlockData->nRow) { - code = tsdbCommitDataBlock(pCommitter); - if (code) goto _err; - } + code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBlockData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); + if (code) goto _err; return code; @@ -1193,7 +1185,7 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) } if (pBDataW->nRow >= pCommitter->maxRow) { - code = tsdbCommitDataBlock(pCommitter); + code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); if (code) goto _err; } } @@ -1210,15 +1202,13 @@ static int32_t tsdbCommitMergeBlock(SCommitter *pCommitter, SDataBlk *pDataBlk) } if (pBDataW->nRow >= pCommitter->maxRow) { - code = tsdbCommitDataBlock(pCommitter); + code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); if (code) goto _err; } } - if (pBDataW->nRow) { - code = tsdbCommitDataBlock(pCommitter); - if (code) goto _err; - } + code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBDataW, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); + if (code) goto _err; return code; @@ -1306,10 +1296,8 @@ static int32_t tsdbInitLastBlockIfNeed(SCommitter *pCommitter, TABLEID id) { SBlockData *pBDatal = &pCommitter->dWriter.bDatal; if (pBDatal->suid || pBDatal->uid) { if ((pBDatal->suid != id.suid) || (id.suid == 0)) { - if (pBDatal->nRow) { - code = tsdbCommitLastBlock(pCommitter); - if (code) goto _exit; - } + code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); + if (code) goto _exit; tBlockDataReset(pBDatal); } } @@ -1341,7 +1329,7 @@ static int32_t tsdbAppendLastBlock(SCommitter *pCommitter) { if (code) goto _err; if (pBDatal->nRow >= pCommitter->maxRow) { - code = tsdbCommitLastBlock(pCommitter); + code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBDatal, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); if (code) goto _err; } } @@ -1393,10 +1381,11 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { if (pBData->nRow >= pCommitter->maxRow) { if (pCommitter->toLastOnly) { - code = tsdbCommitLastBlock(pCommitter); + code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, pBData, pCommitter->dWriter.aSttBlk, pCommitter->cmprAlg); if (code) goto _err; } else { - code = tsdbCommitDataBlock(pCommitter); + code = + tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); if (code) goto _err; } } @@ -1404,7 +1393,7 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, TABLEID id) { if (!pCommitter->toLastOnly && pBData->nRow) { if (pBData->nRow > pCommitter->minRow) { - code = tsdbCommitDataBlock(pCommitter); + code = tsdbWriteDataBlock(pCommitter->dWriter.pWriter, pBData, &pCommitter->dWriter.mBlock, pCommitter->cmprAlg); if (code) goto _err; } else { code = tsdbAppendLastBlock(pCommitter); @@ -1437,7 +1426,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { tMapDataReset(&pCommitter->dWriter.mBlock); // impl - code = tsdbCommitterUpdateTableSchema(pCommitter, id.suid, id.uid); + code = tsdbUpdateTableSchema(pCommitter->pTsdb->pVnode->pMeta, id.suid, id.uid, &pCommitter->skmTable); if (code) goto _err; code = tBlockDataInit(&pCommitter->dReader.bData, id.suid, id.uid, pCommitter->skmTable.pTSchema); if (code) goto _err; @@ -1455,7 +1444,7 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { // end if (pCommitter->dWriter.mBlock.nItem > 0) { SBlockIdx blockIdx = {.suid = id.suid, .uid = id.uid}; - code = tsdbWriteBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx); + code = tsdbWriteDataBlk(pCommitter->dWriter.pWriter, &pCommitter->dWriter.mBlock, &blockIdx); if (code) goto _err; if (taosArrayPush(pCommitter->dWriter.aBlockIdx, &blockIdx) == NULL) { @@ -1470,10 +1459,9 @@ static int32_t tsdbCommitFileDataImpl(SCommitter *pCommitter) { code = tsdbMoveCommitData(pCommitter, id); if (code) goto _err; - if (pCommitter->dWriter.bDatal.nRow > 0) { - code = tsdbCommitLastBlock(pCommitter); - if (code) goto _err; - } + code = tsdbWriteSttBlock(pCommitter->dWriter.pWriter, &pCommitter->dWriter.bDatal, pCommitter->dWriter.aSttBlk, + pCommitter->cmprAlg); + if (code) goto _err; return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 997f903045..9956339093 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -16,7 +16,7 @@ #include "osDef.h" #include "tsdb.h" -#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) +#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) typedef enum { EXTERNAL_ROWS_PREV = 0x1, @@ -83,11 +83,11 @@ typedef struct SBlockLoadSuppInfo { } SBlockLoadSuppInfo; typedef struct SLastBlockReader { - STimeWindow window; - SVersionRange verRange; - int32_t order; - uint64_t uid; - SMergeTree mergeTree; + STimeWindow window; + SVersionRange verRange; + int32_t order; + uint64_t uid; + SMergeTree mergeTree; SSttBlockLoadInfo* pInfo; } SLastBlockReader; @@ -231,10 +231,10 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid}; if (ASCENDING_TRAVERSE(pTsdbReader->order)) { int64_t skey = pTsdbReader->window.skey; - info.lastKey = (skey > INT64_MIN)? (skey - 1):skey; + info.lastKey = (skey > INT64_MIN) ? (skey - 1) : skey; } else { int64_t ekey = pTsdbReader->window.ekey; - info.lastKey = (ekey < INT64_MAX)? (ekey + 1):ekey; + info.lastKey = (ekey < INT64_MAX) ? (ekey + 1) : ekey; } taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); @@ -601,7 +601,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN STableBlockScanInfo* pScanInfo = taosHashGet(pReader->status.pTableMap, &pBlockIdx->uid, sizeof(int64_t)); tMapDataReset(&pScanInfo->mapData); - tsdbReadBlock(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); + tsdbReadDataBlk(pReader->pFileReader, pBlockIdx, &pScanInfo->mapData); sizeInDisk += pScanInfo->mapData.nData; for (int32_t j = 0; j < pScanInfo->mapData.nItem; ++j) { @@ -1933,7 +1933,7 @@ static bool initLastBlockReader(SLastBlockReader* pLBlockReader, STableBlockScan initMemDataIterator(pScanInfo, pReader); pLBlockReader->uid = pScanInfo->uid; - int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order)? 1:-1; + int32_t step = ASCENDING_TRAVERSE(pLBlockReader->order) ? 1 : -1; STimeWindow w = pLBlockReader->window; if (ASCENDING_TRAVERSE(pLBlockReader->order)) { w.skey = pScanInfo->lastKey + step; @@ -3621,7 +3621,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); resetDataBlockIterator(&pReader->status.blockIter, pReader->order); - int64_t ts = ASCENDING_TRAVERSE(pReader->order)?pReader->window.skey-1:pReader->window.ekey+1; + int64_t ts = ASCENDING_TRAVERSE(pReader->order) ? pReader->window.skey - 1 : pReader->window.ekey + 1; resetDataBlockScanInfo(pReader->status.pTableMap, ts); int32_t code = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index b171d52dfc..2518811d94 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -418,21 +418,21 @@ _err: return code; } -int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBlockIdx) { +int32_t tsdbWriteDataBlk(SDataFWriter *pWriter, SMapData *mDataBlk, SBlockIdx *pBlockIdx) { int32_t code = 0; SHeadFile *pHeadFile = &pWriter->fHead; int64_t size; int64_t n; - ASSERT(mBlock->nItem > 0); + ASSERT(mDataBlk->nItem > 0); // alloc - size = tPutMapData(NULL, mBlock); + size = tPutMapData(NULL, mDataBlk); code = tRealloc(&pWriter->aBuf[0], size); if (code) goto _err; // build - n = tPutMapData(pWriter->aBuf[0], mBlock); + n = tPutMapData(pWriter->aBuf[0], mDataBlk); // write code = tsdbWriteFile(pWriter->pHeadFD, pHeadFile->size, pWriter->aBuf[0], size); @@ -446,7 +446,7 @@ int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *mBlock, SBlockIdx *pBloc tsdbTrace("vgId:%d, write block, file ID:%d commit ID:%d suid:%" PRId64 " uid:%" PRId64 " offset:%" PRId64 " size:%" PRId64 " nItem:%d", TD_VID(pWriter->pTsdb->pVnode), pWriter->wSet.fid, pHeadFile->commitID, pBlockIdx->suid, pBlockIdx->uid, - pBlockIdx->offset, pBlockIdx->size, mBlock->nItem); + pBlockIdx->offset, pBlockIdx->size, mDataBlk->nItem); return code; _err: @@ -872,7 +872,7 @@ _err: return code; } -int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBlock) { +int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk) { int32_t code = 0; int64_t offset = pBlockIdx->offset; int64_t size = pBlockIdx->size; @@ -886,7 +886,7 @@ int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mBl if (code) goto _err; // decode - int64_t n = tGetMapData(pReader->aBuf[0], mBlock); + int64_t n = tGetMapData(pReader->aBuf[0], mDataBlk); if (n < 0) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -1053,6 +1053,29 @@ _err: return code; } +int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) { + int32_t code = 0; + SBlockInfo *pBlockInfo = &pDataBlk->aSubBlock[0]; + + // alloc + code = tRealloc(&pReader->aBuf[0], pBlockInfo->szBlock); + if (code) goto _err; + + // read + code = tsdbReadFile(pReader->pDataFD, pBlockInfo->offset, pReader->aBuf[0], pBlockInfo->szBlock); + if (code) goto _err; + + // decmpr + code = tDecmprBlockData(pReader->aBuf[0], pBlockInfo->szBlock, pBlockData, &pReader->aBuf[1]); + if (code) goto _err; + + return code; + +_err: + tsdbError("vgId:%d tsdb read data block ex failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + return code; +} + int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) { int32_t code = 0; @@ -1147,8 +1170,8 @@ int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb pDelFWriter->fDel = *pFile; tsdbDelFileName(pTsdb, pFile, fname); - code = - tsdbOpenFile(fname, TSDB_DEFAULT_PAGE_SIZE, TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE, &pDelFWriter->pWriteH); + int32_t flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE; + code = tsdbOpenFile(fname, TSDB_DEFAULT_PAGE_SIZE, flag, &pDelFWriter->pWriteH); if (code) goto _err; // update header diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 9fc5639c5e..502d227121 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -16,6 +16,29 @@ #include "tsdb.h" // STsdbSnapReader ======================================== +typedef enum { SNAP_DATA_FILE_ITER = 0, SNAP_STT_FILE_ITER } EFIterT; +typedef struct { + SRBTreeNode n; + SRowInfo rInfo; + EFIterT type; + union { + struct { + SArray* aBlockIdx; + int32_t iBlockIdx; + SBlockIdx* pBlockIdx; + SMapData mBlock; + int32_t iBlock; + }; // .data file + struct { + int32_t iStt; + SArray* aSttBlk; + int32_t iSttBlk; + }; // .stt file + }; + SBlockData bData; + int32_t iRow; +} SFDataIter; + struct STsdbSnapReader { STsdb* pTsdb; int64_t sver; @@ -26,146 +49,301 @@ struct STsdbSnapReader { int8_t dataDone; int32_t fid; SDataFReader* pDataFReader; - SArray* aBlockIdx; // SArray - SArray* aSstBlk; // SArray - SBlockIdx* pBlockIdx; - SSttBlk* pSstBlk; - - int32_t iBlockIdx; - int32_t iBlockL; - SMapData mBlock; // SMapData - int32_t iBlock; - SBlockData oBlockData; - SBlockData nBlockData; + SFDataIter* pIter; + SRBTree rbt; + SFDataIter aFDataIter[TSDB_MAX_STT_FILE + 1]; + SBlockData bData; + SSkmInfo skmTable; // for del file int8_t delDone; SDelFReader* pDelFReader; SArray* aDelIdx; // SArray int32_t iDelIdx; SArray* aDelData; // SArray + uint8_t* aBuf[5]; }; +extern int32_t tRowInfoCmprFn(const void* p1, const void* p2); +extern int32_t tsdbReadDataBlockEx(SDataFReader* pReader, SDataBlk* pDataBlk, SBlockData* pBlockData); +extern int32_t tsdbUpdateTableSchema(SMeta* pMeta, int64_t suid, int64_t uid, SSkmInfo* pSkmInfo); + +static int32_t tsdbSnapReadOpenFile(STsdbSnapReader* pReader) { + int32_t code = 0; + + SDFileSet dFileSet = {.fid = pReader->fid}; + SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &dFileSet, tDFileSetCmprFn, TD_GT); + if (pSet == NULL) return code; + + pReader->fid = pSet->fid; + code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet); + if (code) goto _err; + + pReader->pIter = NULL; + tRBTreeCreate(&pReader->rbt, tRowInfoCmprFn); + + // .data file + SFDataIter* pIter = &pReader->aFDataIter[0]; + pIter->type = SNAP_DATA_FILE_ITER; + + code = tsdbReadBlockIdx(pReader->pDataFReader, pIter->aBlockIdx); + if (code) goto _err; + + for (pIter->iBlockIdx = 0; pIter->iBlockIdx < taosArrayGetSize(pIter->aBlockIdx); pIter->iBlockIdx++) { + pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx); + + code = tsdbReadDataBlk(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock); + if (code) goto _err; + + for (pIter->iBlock = 0; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) { + SDataBlk dataBlk; + tMapDataGetItemByIdx(&pIter->mBlock, pIter->iBlock, &dataBlk, tGetDataBlk); + + if (dataBlk.minVer > pReader->ever || dataBlk.maxVer < pReader->sver) continue; + + code = tsdbReadDataBlockEx(pReader->pDataFReader, &dataBlk, &pIter->bData); + if (code) goto _err; + + ASSERT(pIter->pBlockIdx->suid == pIter->bData.suid); + ASSERT(pIter->pBlockIdx->uid == pIter->bData.uid); + + for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) { + int64_t rowVer = pIter->bData.aVersion[pIter->iRow]; + + if (rowVer >= pReader->sver && rowVer <= pReader->ever) { + pIter->rInfo.suid = pIter->pBlockIdx->suid; + pIter->rInfo.uid = pIter->pBlockIdx->uid; + pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); + goto _add_iter_and_break; + } + } + } + + continue; + + _add_iter_and_break: + tRBTreePut(&pReader->rbt, (SRBTreeNode*)pIter); + break; + } + + // .stt file + pIter = &pReader->aFDataIter[1]; + for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { + pIter->type = SNAP_STT_FILE_ITER; + pIter->iStt = iStt; + + code = tsdbReadSttBlk(pReader->pDataFReader, iStt, pIter->aSttBlk); + if (code) goto _err; + + for (pIter->iSttBlk = 0; pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk); pIter->iSttBlk++) { + SSttBlk* pSttBlk = (SSttBlk*)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk); + + if (pSttBlk->minVer > pReader->ever) continue; + if (pSttBlk->maxVer < pReader->sver) continue; + + code = tsdbReadSttBlock(pReader->pDataFReader, iStt, pSttBlk, &pIter->bData); + if (code) goto _err; + + for (pIter->iRow = 0; pIter->iRow < pIter->bData.nRow; pIter->iRow++) { + int64_t rowVer = pIter->bData.aVersion[pIter->iRow]; + + if (rowVer >= pReader->sver && rowVer <= pReader->ever) { + pIter->rInfo.suid = pIter->bData.suid; + pIter->rInfo.uid = pIter->bData.uid; + pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); + goto _add_iter; + } + } + } + + continue; + + _add_iter: + tRBTreePut(&pReader->rbt, (SRBTreeNode*)pIter); + pIter++; + } + + tsdbInfo("vgId:%d, vnode snapshot tsdb open data file to read for %s, fid:%d", TD_VID(pReader->pTsdb->pVnode), + pReader->pTsdb->path, pReader->fid); + return code; + +_err: + tsdbError("vgId:%d vnode snapshot tsdb snap read open file failed since %s", TD_VID(pReader->pTsdb->pVnode), + tstrerror(code)); + return code; +} + +static SRowInfo* tsdbSnapGetRow(STsdbSnapReader* pReader) { return pReader->pIter ? &pReader->pIter->rInfo : NULL; } + +static int32_t tsdbSnapNextRow(STsdbSnapReader* pReader) { + int32_t code = 0; + + if (pReader->pIter) { + SFDataIter* pIter = pReader->pIter; + + while (true) { + _find_row: + for (pIter->iRow++; pIter->iRow < pIter->bData.nRow; pIter->iRow++) { + int64_t rowVer = pIter->bData.aVersion[pIter->iRow]; + + if (rowVer >= pReader->sver && rowVer <= pReader->ever) { + pIter->rInfo.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow]; + pIter->rInfo.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); + goto _out; + } + } + + if (pIter->type == SNAP_DATA_FILE_ITER) { + while (true) { + for (pIter->iBlock++; pIter->iBlock < pIter->mBlock.nItem; pIter->iBlock++) { + SDataBlk dataBlk; + tMapDataGetItemByIdx(&pIter->mBlock, pIter->iBlock, &dataBlk, tGetDataBlk); + + if (dataBlk.minVer > pReader->ever || dataBlk.maxVer < pReader->sver) continue; + + code = tsdbReadDataBlockEx(pReader->pDataFReader, &dataBlk, &pIter->bData); + if (code) goto _err; + + pIter->iRow = -1; + goto _find_row; + } + + pIter->iBlockIdx++; + if (pIter->iBlockIdx >= taosArrayGetSize(pIter->aBlockIdx)) break; + + pIter->pBlockIdx = (SBlockIdx*)taosArrayGet(pIter->aBlockIdx, pIter->iBlockIdx); + code = tsdbReadDataBlk(pReader->pDataFReader, pIter->pBlockIdx, &pIter->mBlock); + if (code) goto _err; + pIter->iBlock = -1; + } + + pReader->pIter = NULL; + } else if (pIter->type == SNAP_STT_FILE_ITER) { + for (pIter->iSttBlk++; pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk); pIter->iSttBlk++) { + SSttBlk* pSttBlk = (SSttBlk*)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk); + + if (pSttBlk->minVer > pReader->ever || pSttBlk->maxVer < pReader->sver) continue; + + code = tsdbReadSttBlock(pReader->pDataFReader, pIter->iStt, pSttBlk, &pIter->bData); + if (code) goto _err; + + pIter->iRow = -1; + goto _find_row; + } + + pReader->pIter = NULL; + } else { + ASSERT(0); + } + } + + _out: + pIter = (SFDataIter*)tRBTreeMin(&pReader->rbt); + if (pReader->pIter && pIter) { + int32_t c = tRowInfoCmprFn(&pReader->pIter->rInfo, &pIter->rInfo); + if (c > 0) { + tRBTreePut(&pReader->rbt, (SRBTreeNode*)pReader->pIter); + pReader->pIter = NULL; + } else { + ASSERT(c); + } + } + } + + if (pReader->pIter == NULL) { + pReader->pIter = (SFDataIter*)tRBTreeMin(&pReader->rbt); + if (pReader->pIter) { + tRBTreeDrop(&pReader->rbt, (SRBTreeNode*)pReader->pIter); + } + } + + return code; + +_err: + return code; +} + +static int32_t tsdbSnapCmprData(STsdbSnapReader* pReader, uint8_t** ppData) { + int32_t code = 0; + + ASSERT(pReader->bData.nRow); + + int32_t aBufN[5] = {0}; + code = tCmprBlockData(&pReader->bData, TWO_STAGE_COMP, NULL, NULL, pReader->aBuf, aBufN); + if (code) goto _exit; + + int32_t size = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3]; + *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size); + if (*ppData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + SSnapDataHdr* pHdr = (SSnapDataHdr*)*ppData; + pHdr->type = SNAP_DATA_TSDB; + pHdr->size = size; + + memcpy(pHdr->data, pReader->aBuf[3], aBufN[3]); + memcpy(pHdr->data + aBufN[3], pReader->aBuf[2], aBufN[2]); + if (aBufN[1]) { + memcpy(pHdr->data + aBufN[3] + aBufN[2], pReader->aBuf[1], aBufN[1]); + } + if (aBufN[0]) { + memcpy(pHdr->data + aBufN[3] + aBufN[2] + aBufN[1], pReader->aBuf[0], aBufN[0]); + } + +_exit: + return code; +} + static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { int32_t code = 0; STsdb* pTsdb = pReader->pTsdb; while (true) { if (pReader->pDataFReader == NULL) { - // next - SDFileSet dFileSet = {.fid = pReader->fid}; - SDFileSet* pSet = taosArraySearch(pReader->fs.aDFileSet, &dFileSet, tDFileSetCmprFn, TD_GT); - if (pSet == NULL) goto _exit; - pReader->fid = pSet->fid; - - // load - code = tsdbDataFReaderOpen(&pReader->pDataFReader, pTsdb, pSet); + code = tsdbSnapReadOpenFile(pReader); if (code) goto _err; - - code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx); - if (code) goto _err; - - code = tsdbReadSttBlk(pReader->pDataFReader, 0, pReader->aSstBlk); - if (code) goto _err; - - // init - pReader->iBlockIdx = 0; - if (pReader->iBlockIdx < taosArrayGetSize(pReader->aBlockIdx)) { - pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx); - - code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock); - if (code) goto _err; - - pReader->iBlock = 0; - } else { - pReader->pBlockIdx = NULL; - } - - pReader->iBlockL = 0; - while (true) { - if (pReader->iBlockL >= taosArrayGetSize(pReader->aSstBlk)) { - pReader->pSstBlk = NULL; - break; - } - - pReader->pSstBlk = (SSttBlk*)taosArrayGet(pReader->aSstBlk, pReader->iBlockL); - if (pReader->pSstBlk->minVer <= pReader->ever && pReader->pSstBlk->maxVer >= pReader->sver) { - // TODO - break; - } - - pReader->iBlockL++; - } - - tsdbInfo("vgId:%d, vnode snapshot tsdb open data file to read for %s, fid:%d", TD_VID(pTsdb->pVnode), pTsdb->path, - pReader->fid); } - while (true) { - if (pReader->pBlockIdx && pReader->pSstBlk) { - TABLEID id = {.suid = pReader->pSstBlk->suid, .uid = pReader->pSstBlk->minUid}; + if (pReader->pDataFReader == NULL) break; - ASSERT(0); + SRowInfo* pRowInfo = tsdbSnapGetRow(pReader); + if (pRowInfo == NULL) { + tsdbDataFReaderClose(&pReader->pDataFReader); + continue; + } - // if (tTABLEIDCmprFn(pReader->pBlockIdx, &minId) < 0) { - // // TODO - // } else if (tTABLEIDCmprFn(pReader->pBlockIdx, &maxId) < 0) { - // // TODO - // } else { - // // TODO - // } - } else if (pReader->pBlockIdx) { - while (pReader->iBlock < pReader->mBlock.nItem) { - SDataBlk block; - tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, &block, tGetDataBlk); + TABLEID id = {.suid = pRowInfo->suid, .uid = pRowInfo->uid}; + SBlockData* pBlockData = &pReader->bData; - if (block.minVer <= pReader->ever && block.maxVer >= pReader->sver) { - // load data (todo) - } + code = tsdbUpdateTableSchema(pTsdb->pVnode->pMeta, id.suid, id.uid, &pReader->skmTable); + if (code) goto _err; - // next - pReader->iBlock++; - if (*ppData) break; - } + code = tBlockDataInit(pBlockData, id.suid, id.uid, pReader->skmTable.pTSchema); + if (code) goto _err; - if (pReader->iBlock >= pReader->mBlock.nItem) { - pReader->iBlockIdx++; - if (pReader->iBlockIdx < taosArrayGetSize(pReader->aBlockIdx)) { - pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx); + while (pRowInfo->suid == id.suid && pRowInfo->uid == id.uid) { + code = tBlockDataAppendRow(pBlockData, &pRowInfo->row, NULL, pRowInfo->uid); + if (code) goto _err; - code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock); - if (code) goto _err; + code = tsdbSnapNextRow(pReader); + if (code) goto _err; - pReader->iBlock = 0; - } else { - pReader->pBlockIdx = NULL; - } - } - - if (*ppData) goto _exit; - } else if (pReader->pSstBlk) { - while (pReader->pSstBlk) { - if (pReader->pSstBlk->minVer <= pReader->ever && pReader->pSstBlk->maxVer >= pReader->sver) { - // load data (todo) - } - - // next - pReader->iBlockL++; - if (pReader->iBlockL < taosArrayGetSize(pReader->aSstBlk)) { - pReader->pSstBlk = (SSttBlk*)taosArrayGetSize(pReader->aSstBlk); - } else { - pReader->pSstBlk = NULL; - } - - if (*ppData) goto _exit; - } - } else { + pRowInfo = tsdbSnapGetRow(pReader); + if (pRowInfo == NULL) { tsdbDataFReaderClose(&pReader->pDataFReader); break; } + + if (pBlockData->nRow >= 4096) break; } + + code = tsdbSnapCmprData(pReader, ppData); + if (code) goto _err; + + break; } -_exit: return code; _err: @@ -216,7 +394,6 @@ static int32_t tsdbSnapReadDel(STsdbSnapReader* pReader, uint8_t** ppData) { size += tPutDelData(NULL, pDelData); } } - if (size == 0) continue; // org data @@ -292,23 +469,33 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type goto _err; } + // data pReader->fid = INT32_MIN; - pReader->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - if (pReader->aBlockIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; + for (int32_t iIter = 0; iIter < sizeof(pReader->aFDataIter) / sizeof(pReader->aFDataIter[0]); iIter++) { + SFDataIter* pIter = &pReader->aFDataIter[iIter]; + + if (iIter == 0) { + pIter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); + if (pIter->aBlockIdx == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + } else { + pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); + if (pIter->aSttBlk == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + } + + code = tBlockDataCreate(&pIter->bData); + if (code) goto _err; } - pReader->aSstBlk = taosArrayInit(0, sizeof(SSttBlk)); - if (pReader->aSstBlk == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - pReader->mBlock = tMapDataInit(); - code = tBlockDataCreate(&pReader->oBlockData); - if (code) goto _err; - code = tBlockDataCreate(&pReader->nBlockData); + + code = tBlockDataCreate(&pReader->bData); if (code) goto _err; + // del pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx)); if (pReader->aDelIdx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -335,18 +522,26 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { int32_t code = 0; STsdbSnapReader* pReader = *ppReader; - if (pReader->pDataFReader) { - tsdbDataFReaderClose(&pReader->pDataFReader); - } - taosArrayDestroy(pReader->aSstBlk); - taosArrayDestroy(pReader->aBlockIdx); - tMapDataClear(&pReader->mBlock); - tBlockDataDestroy(&pReader->oBlockData, 1); - tBlockDataDestroy(&pReader->nBlockData, 1); + // data + if (pReader->pDataFReader) tsdbDataFReaderClose(&pReader->pDataFReader); + for (int32_t iIter = 0; iIter < sizeof(pReader->aFDataIter) / sizeof(pReader->aFDataIter[0]); iIter++) { + SFDataIter* pIter = &pReader->aFDataIter[iIter]; - if (pReader->pDelFReader) { - tsdbDelFReaderClose(&pReader->pDelFReader); + if (iIter == 0) { + taosArrayDestroy(pIter->aBlockIdx); + tMapDataClear(&pIter->mBlock); + } else { + taosArrayDestroy(pIter->aSttBlk); + } + + tBlockDataDestroy(&pIter->bData, 1); } + + tBlockDataDestroy(&pReader->bData, 1); + tTSchemaDestroy(pReader->skmTable.pTSchema); + + // del + if (pReader->pDelFReader) tsdbDelFReaderClose(&pReader->pDelFReader); taosArrayDestroy(pReader->aDelIdx); taosArrayDestroy(pReader->aDelData); @@ -354,6 +549,10 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { tsdbInfo("vgId:%d, vnode snapshot tsdb reader closed for %s", TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path); + for (int32_t iBuf = 0; iBuf < sizeof(pReader->aBuf) / sizeof(pReader->aBuf[0]); iBuf++) { + tFree(pReader->aBuf[iBuf]); + } + taosMemoryFree(pReader); *ppReader = NULL; return code; @@ -410,40 +609,37 @@ struct STsdbSnapWriter { STsdbFS fs; // config - int32_t minutes; - int8_t precision; - int32_t minRow; - int32_t maxRow; - int8_t cmprAlg; - int64_t commitID; - + int32_t minutes; + int8_t precision; + int32_t minRow; + int32_t maxRow; + int8_t cmprAlg; + int64_t commitID; uint8_t* aBuf[5]; + // for data file SBlockData bData; - - int32_t fid; - SDataFReader* pDataFReader; - SArray* aBlockIdx; // SArray - int32_t iBlockIdx; - SBlockIdx* pBlockIdx; - SMapData mBlock; // SMapData - int32_t iBlock; - SBlockData* pBlockData; - int32_t iRow; - SBlockData bDataR; - SArray* aSstBlk; // SArray - int32_t iBlockL; - SBlockData lDataR; - - SDataFWriter* pDataFWriter; - SBlockIdx* pBlockIdxW; // NULL when no committing table - SDataBlk blockW; - SBlockData bDataW; - SBlockIdx blockIdxW; - - SMapData mBlockW; // SMapData - SArray* aBlockIdxW; // SArray - SArray* aBlockLW; // SArray + int32_t fid; + TABLEID id; + SSkmInfo skmTable; + struct { + SDataFReader* pReader; + SArray* aBlockIdx; + int32_t iBlockIdx; + SBlockIdx* pBlockIdx; + SMapData mDataBlk; + int32_t iDataBlk; + SBlockData bData; + int32_t iRow; + } dReader; + struct { + SDataFWriter* pWriter; + SArray* aBlockIdx; + SMapData mDataBlk; + SArray* aSttBlk; + SBlockData bData; + SBlockData sData; + } dWriter; // for del file SDelFReader* pDelFReader; @@ -454,520 +650,447 @@ struct STsdbSnapWriter { SArray* aDelIdxW; }; +// SNAP_DATA_TSDB +extern int32_t tsdbWriteDataBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SMapData* mDataBlk, int8_t cmprAlg); +extern int32_t tsdbWriteSttBlock(SDataFWriter* pWriter, SBlockData* pBlockData, SArray* aSttBlk, int8_t cmprAlg); + +static int32_t tsdbSnapNextTableData(STsdbSnapWriter* pWriter) { + int32_t code = 0; + + ASSERT(pWriter->dReader.iRow >= pWriter->dReader.bData.nRow); + + if (pWriter->dReader.iBlockIdx < taosArrayGetSize(pWriter->dReader.aBlockIdx)) { + pWriter->dReader.pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->dReader.aBlockIdx, pWriter->dReader.iBlockIdx); + + code = tsdbReadDataBlk(pWriter->dReader.pReader, pWriter->dReader.pBlockIdx, &pWriter->dReader.mDataBlk); + if (code) goto _exit; + + pWriter->dReader.iBlockIdx++; + } else { + pWriter->dReader.pBlockIdx = NULL; + tMapDataReset(&pWriter->dReader.mDataBlk); + } + pWriter->dReader.iDataBlk = 0; // point to the next one + tBlockDataReset(&pWriter->dReader.bData); + pWriter->dReader.iRow = 0; + +_exit: + return code; +} + +static int32_t tsdbSnapWriteCopyData(STsdbSnapWriter* pWriter, TABLEID* pId) { + int32_t code = 0; + + while (true) { + if (pWriter->dReader.pBlockIdx == NULL) break; + if (tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, pId) >= 0) break; + + SBlockIdx blkIdx = *pWriter->dReader.pBlockIdx; + code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dReader.mDataBlk, &blkIdx); + if (code) goto _exit; + + if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blkIdx) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + code = tsdbSnapNextTableData(pWriter); + if (code) goto _exit; + } + +_exit: + return code; +} + +static int32_t tsdbSnapWriteTableDataStart(STsdbSnapWriter* pWriter, TABLEID* pId) { + int32_t code = 0; + + code = tsdbSnapWriteCopyData(pWriter, pId); + if (code) goto _err; + + pWriter->id.suid = pId->suid; + pWriter->id.uid = pId->uid; + + code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pId->suid, pId->uid, &pWriter->skmTable); + if (code) goto _err; + + tMapDataReset(&pWriter->dWriter.mDataBlk); + code = tBlockDataInit(&pWriter->dWriter.bData, pId->suid, pId->uid, pWriter->skmTable.pTSchema); + if (code) goto _err; + + return code; + +_err: + tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code)); + return code; +} + static int32_t tsdbSnapWriteTableDataEnd(STsdbSnapWriter* pWriter) { int32_t code = 0; - ASSERT(pWriter->pDataFWriter); + if (pWriter->id.suid == 0 && pWriter->id.uid == 0) return code; - if (pWriter->pBlockIdxW == NULL) goto _exit; + int32_t c = 1; + if (pWriter->dReader.pBlockIdx) { + c = tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &pWriter->id); + ASSERT(c >= 0); + } - // consume remain rows - if (pWriter->pBlockData) { - ASSERT(pWriter->iRow < pWriter->pBlockData->nRow); - while (pWriter->iRow < pWriter->pBlockData->nRow) { - code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), NULL, - 0); // todo + if (c == 0) { + SBlockData* pBData = &pWriter->dWriter.bData; + + for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) { + TSDBROW row = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow); + + code = tBlockDataAppendRow(pBData, &row, NULL, pWriter->id.uid); if (code) goto _err; - if (pWriter->bDataW.nRow >= pWriter->maxRow * 4 / 5) { - // pWriter->blockW.last = 0; - // code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, - // &pWriter->blockW, pWriter->cmprAlg); + if (pBData->nRow >= pWriter->maxRow) { + code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, pBData, &pWriter->dWriter.mDataBlk, pWriter->cmprAlg); if (code) goto _err; - - code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); - if (code) goto _err; - - tDataBlkReset(&pWriter->blockW); - tBlockDataClear(&pWriter->bDataW); - } - - pWriter->iRow++; - } - } - - // write remain data if has - if (pWriter->bDataW.nRow > 0) { - // pWriter->blockW.last = 0; - if (pWriter->bDataW.nRow < pWriter->minRow) { - if (pWriter->iBlock > pWriter->mBlock.nItem) { - // pWriter->blockW.last = 1; } } - // code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, - // &pWriter->blockW, pWriter->cmprAlg); - // if (code) goto _err; - - code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); - if (code) goto _err; - } - - while (true) { - if (pWriter->iBlock >= pWriter->mBlock.nItem) break; - - SDataBlk block; - tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetDataBlk); - - // if (block.last) { - // code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, &pWriter->bDataR, NULL, NULL); - // if (code) goto _err; - - // tBlockReset(&block); - // block.last = 1; - // code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pWriter->pBlockIdxW, &block, - // pWriter->cmprAlg); - // if (code) goto _err; - // } - - code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk); + code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, pBData, &pWriter->dWriter.mDataBlk, pWriter->cmprAlg); if (code) goto _err; - pWriter->iBlock++; - } + for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) { + SDataBlk dataBlk; + tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk); - // SDataBlk - // code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, NULL, pWriter->pBlockIdxW); - // if (code) goto _err; - - // SBlockIdx - if (taosArrayPush(pWriter->aBlockIdxW, pWriter->pBlockIdxW) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - -_exit: - tsdbInfo("vgId:%d, tsdb snapshot write table data end for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); - return code; - -_err: - tsdbError("vgId:%d, tsdb snapshot write table data end for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), - pWriter->pTsdb->path, tstrerror(code)); - return code; -} - -static int32_t tsdbSnapMoveWriteTableData(STsdbSnapWriter* pWriter, SBlockIdx* pBlockIdx) { - int32_t code = 0; - - code = tsdbReadBlock(pWriter->pDataFReader, pBlockIdx, &pWriter->mBlock); - if (code) goto _err; - - // SBlockData - SDataBlk block; - tMapDataReset(&pWriter->mBlockW); - for (int32_t iBlock = 0; iBlock < pWriter->mBlock.nItem; iBlock++) { - tMapDataGetItemByIdx(&pWriter->mBlock, iBlock, &block, tGetDataBlk); - - // if (block.last) { - // code = tsdbReadBlockData(pWriter->pDataFReader, pBlockIdx, &block, &pWriter->bDataR, NULL, NULL); - // if (code) goto _err; - - // tBlockReset(&block); - // block.last = 1; - // code = - // tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataR, NULL, NULL, pBlockIdx, &block, - // pWriter->cmprAlg); - // if (code) goto _err; - // } - - code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk); - if (code) goto _err; - } - - // SDataBlk - SBlockIdx blockIdx = {.suid = pBlockIdx->suid, .uid = pBlockIdx->uid}; - code = tsdbWriteBlock(pWriter->pDataFWriter, &pWriter->mBlockW, &blockIdx); - if (code) goto _err; - - // SBlockIdx - if (taosArrayPush(pWriter->aBlockIdxW, &blockIdx) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - -_exit: - return code; - -_err: - tsdbError("vgId:%d, tsdb snapshot move write table data for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), - pWriter->pTsdb->path, tstrerror(code)); - return code; -} - -static int32_t tsdbSnapWriteTableDataImpl(STsdbSnapWriter* pWriter) { - int32_t code = 0; - SBlockData* pBlockData = &pWriter->bData; - int32_t iRow = 0; - TSDBROW row; - TSDBROW* pRow = &row; - - // // correct schema - // code = tBlockDataCorrectSchema(&pWriter->bDataW, pBlockData); - // if (code) goto _err; - - // loop to merge - *pRow = tsdbRowFromBlockData(pBlockData, iRow); - while (true) { - if (pRow == NULL) break; - - if (pWriter->pBlockData) { - ASSERT(pWriter->iRow < pWriter->pBlockData->nRow); - - int32_t c = tsdbRowCmprFn(pRow, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow)); - - ASSERT(c); - - if (c < 0) { - // code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL); - // if (code) goto _err; - - iRow++; - if (iRow < pWriter->pBlockData->nRow) { - *pRow = tsdbRowFromBlockData(pBlockData, iRow); - } else { - pRow = NULL; - } - } else if (c > 0) { - // code = tBlockDataAppendRow(&pWriter->bDataW, &tsdbRowFromBlockData(pWriter->pBlockData, pWriter->iRow), - // NULL); if (code) goto _err; - - pWriter->iRow++; - if (pWriter->iRow >= pWriter->pBlockData->nRow) { - pWriter->pBlockData = NULL; - } - } - } else { - TSDBKEY key = TSDBROW_KEY(pRow); - - while (true) { - if (pWriter->iBlock >= pWriter->mBlock.nItem) break; - - SDataBlk block; - int32_t c; - - tMapDataGetItemByIdx(&pWriter->mBlock, pWriter->iBlock, &block, tGetDataBlk); - - // if (block.last) { - // pWriter->pBlockData = &pWriter->bDataR; - - // code = tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, - // NULL); if (code) goto _err; pWriter->iRow = 0; - - // pWriter->iBlock++; - // break; - // } - - c = tsdbKeyCmprFn(&block.maxKey, &key); - - ASSERT(c); - - if (c < 0) { - if (pWriter->bDataW.nRow) { - // pWriter->blockW.last = 0; - // code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, - // &pWriter->blockW, pWriter->cmprAlg); - // if (code) goto _err; - - code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); - if (code) goto _err; - - tDataBlkReset(&pWriter->blockW); - tBlockDataClear(&pWriter->bDataW); - } - - code = tMapDataPutItem(&pWriter->mBlockW, &block, tPutDataBlk); - if (code) goto _err; - - pWriter->iBlock++; - } else { - c = tsdbKeyCmprFn(&tBlockDataLastKey(pBlockData), &block.minKey); - - ASSERT(c); - - if (c > 0) { - pWriter->pBlockData = &pWriter->bDataR; - // code = - // tsdbReadBlockData(pWriter->pDataFReader, pWriter->pBlockIdx, &block, pWriter->pBlockData, NULL, - // NULL); - // if (code) goto _err; - pWriter->iRow = 0; - - pWriter->iBlock++; - } - break; - } - } - - if (pWriter->pBlockData) continue; - - // code = tBlockDataAppendRow(&pWriter->bDataW, pRow, NULL); - // if (code) goto _err; - - iRow++; - if (iRow < pBlockData->nRow) { - *pRow = tsdbRowFromBlockData(pBlockData, iRow); - } else { - pRow = NULL; - } - } - - _check_write: - if (pWriter->bDataW.nRow < pWriter->maxRow * 4 / 5) continue; - - _write_block: - // code = tsdbWriteBlockData(pWriter->pDataFWriter, &pWriter->bDataW, NULL, NULL, pWriter->pBlockIdxW, - // &pWriter->blockW, pWriter->cmprAlg); - // if (code) goto _err; - - code = tMapDataPutItem(&pWriter->mBlockW, &pWriter->blockW, tPutDataBlk); - if (code) goto _err; - - tDataBlkReset(&pWriter->blockW); - tBlockDataClear(&pWriter->bDataW); - } - - return code; - -_err: - tsdbError("vgId:%d, vnode snapshot tsdb write table data impl for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), - pWriter->pTsdb->path, tstrerror(code)); - return code; -} - -static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) { - int32_t code = 0; - SBlockData* pBlockData = &pWriter->bData; - TSDBKEY keyFirst = tBlockDataFirstKey(pBlockData); - TSDBKEY keyLast = tBlockDataLastKey(pBlockData); - - // end last table write if should - if (pWriter->pBlockIdxW) { - int32_t c = tTABLEIDCmprFn(pWriter->pBlockIdxW, &id); - if (c < 0) { - // end - code = tsdbSnapWriteTableDataEnd(pWriter); + code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk); if (code) goto _err; + } - // reset - pWriter->pBlockIdxW = NULL; - } else if (c > 0) { - ASSERT(0); + code = tsdbSnapNextTableData(pWriter); + if (code) goto _err; + } + + if (pWriter->dWriter.mDataBlk.nItem) { + SBlockIdx blockIdx = {.suid = pWriter->id.suid, .uid = pWriter->id.uid}; + code = tsdbWriteDataBlk(pWriter->dWriter.pWriter, &pWriter->dWriter.mDataBlk, &blockIdx); + + if (taosArrayPush(pWriter->dWriter.aBlockIdx, &blockIdx) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; } } - // start new table data write if need - if (pWriter->pBlockIdxW == NULL) { - // write table data ahead - while (true) { - if (pWriter->iBlockIdx >= taosArrayGetSize(pWriter->aBlockIdx)) break; + pWriter->id.suid = 0; + pWriter->id.uid = 0; - SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx); - int32_t c = tTABLEIDCmprFn(pBlockIdx, &id); - - if (c >= 0) break; - - code = tsdbSnapMoveWriteTableData(pWriter, pBlockIdx); - if (code) goto _err; - - pWriter->iBlockIdx++; - } - - // reader - pWriter->pBlockIdx = NULL; - if (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) { - ASSERT(pWriter->pDataFReader); - - SBlockIdx* pBlockIdx = (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx); - int32_t c = tTABLEIDCmprFn(pBlockIdx, &id); - - ASSERT(c >= 0); - - if (c == 0) { - pWriter->pBlockIdx = pBlockIdx; - pWriter->iBlockIdx++; - } - } - - if (pWriter->pBlockIdx) { - code = tsdbReadBlock(pWriter->pDataFReader, pWriter->pBlockIdx, &pWriter->mBlock); - if (code) goto _err; - } else { - tMapDataReset(&pWriter->mBlock); - } - pWriter->iBlock = 0; - pWriter->pBlockData = NULL; - pWriter->iRow = 0; - - // writer - pWriter->pBlockIdxW = &pWriter->blockIdxW; - pWriter->pBlockIdxW->suid = id.suid; - pWriter->pBlockIdxW->uid = id.uid; - - tDataBlkReset(&pWriter->blockW); - tBlockDataReset(&pWriter->bDataW); - tMapDataReset(&pWriter->mBlockW); - } - - ASSERT(pWriter->pBlockIdxW && pWriter->pBlockIdxW->suid == id.suid && pWriter->pBlockIdxW->uid == id.uid); - ASSERT(pWriter->pBlockIdx == NULL || (pWriter->pBlockIdx->suid == id.suid && pWriter->pBlockIdx->uid == id.uid)); - - code = tsdbSnapWriteTableDataImpl(pWriter); - if (code) goto _err; - -_exit: - tsdbDebug("vgId:%d, vnode snapshot tsdb write data impl for %s", TD_VID(pWriter->pTsdb->pVnode), - pWriter->pTsdb->path); return code; _err: - tsdbError("vgId:%d, vnode snapshot tsdb write data impl for %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), - pWriter->pTsdb->path, tstrerror(code)); return code; } -static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) { +static int32_t tsdbSnapWriteOpenFile(STsdbSnapWriter* pWriter, int32_t fid) { int32_t code = 0; STsdb* pTsdb = pWriter->pTsdb; - if (pWriter->pDataFWriter == NULL) goto _exit; + ASSERT(pWriter->dWriter.pWriter == NULL); + + pWriter->fid = fid; + pWriter->id = (TABLEID){0}; + SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ); + + // Reader + if (pSet) { + code = tsdbDataFReaderOpen(&pWriter->dReader.pReader, pWriter->pTsdb, pSet); + if (code) goto _err; + + code = tsdbReadBlockIdx(pWriter->dReader.pReader, pWriter->dReader.aBlockIdx); + if (code) goto _err; + } else { + ASSERT(pWriter->dReader.pReader == NULL); + taosArrayClear(pWriter->dReader.aBlockIdx); + } + pWriter->dReader.iBlockIdx = 0; // point to the next one + code = tsdbSnapNextTableData(pWriter); + if (code) goto _err; + + // Writer + SHeadFile fHead = {.commitID = pWriter->commitID}; + SDataFile fData = {.commitID = pWriter->commitID}; + SSmaFile fSma = {.commitID = pWriter->commitID}; + SSttFile fStt = {.commitID = pWriter->commitID}; + SDFileSet wSet = {.fid = pWriter->fid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma}; + if (pSet) { + wSet.diskId = pSet->diskId; + fData = *pSet->pDataF; + fSma = *pSet->pSmaF; + for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { + wSet.aSttF[iStt] = pSet->aSttF[iStt]; + } + wSet.nSttF = pSet->nSttF + 1; // TODO: fix pSet->nSttF == pTsdb->maxFile + } else { + SDiskID did = {0}; + tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did); + tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did); + wSet.diskId = did; + wSet.nSttF = 1; + } + wSet.aSttF[wSet.nSttF - 1] = &fStt; + + code = tsdbDataFWriterOpen(&pWriter->dWriter.pWriter, pWriter->pTsdb, &wSet); + if (code) goto _err; + taosArrayClear(pWriter->dWriter.aBlockIdx); + tMapDataReset(&pWriter->dWriter.mDataBlk); + taosArrayClear(pWriter->dWriter.aSttBlk); + tBlockDataReset(&pWriter->dWriter.bData); + tBlockDataReset(&pWriter->dWriter.sData); + + return code; + +_err: + return code; +} + +static int32_t tsdbSnapWriteCloseFile(STsdbSnapWriter* pWriter) { + int32_t code = 0; + + ASSERT(pWriter->dWriter.pWriter); - // finish current table code = tsdbSnapWriteTableDataEnd(pWriter); if (code) goto _err; - // move remain table - while (pWriter->iBlockIdx < taosArrayGetSize(pWriter->aBlockIdx)) { - code = tsdbSnapMoveWriteTableData(pWriter, (SBlockIdx*)taosArrayGet(pWriter->aBlockIdx, pWriter->iBlockIdx)); - if (code) goto _err; - - pWriter->iBlockIdx++; - } - - // write remain stuff - if (taosArrayGetSize(pWriter->aBlockLW) > 0) { - code = tsdbWriteSttBlk(pWriter->pDataFWriter, pWriter->aBlockIdxW); - if (code) goto _err; - } - - if (taosArrayGetSize(pWriter->aBlockIdx) > 0) { - code = tsdbWriteBlockIdx(pWriter->pDataFWriter, pWriter->aBlockIdxW); - if (code) goto _err; - } - - code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->pDataFWriter->wSet); + // copy remain table data + TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX}; + code = tsdbSnapWriteCopyData(pWriter, &id); if (code) goto _err; - code = tsdbDataFWriterClose(&pWriter->pDataFWriter, 1); + code = + tsdbWriteSttBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.sData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg); if (code) goto _err; - if (pWriter->pDataFReader) { - code = tsdbDataFReaderClose(&pWriter->pDataFReader); + // Indices + code = tsdbWriteBlockIdx(pWriter->dWriter.pWriter, pWriter->dWriter.aBlockIdx); + if (code) goto _err; + + code = tsdbWriteSttBlk(pWriter->dWriter.pWriter, pWriter->dWriter.aSttBlk); + if (code) goto _err; + + code = tsdbUpdateDFileSetHeader(pWriter->dWriter.pWriter); + if (code) goto _err; + + code = tsdbFSUpsertFSet(&pWriter->fs, &pWriter->dWriter.pWriter->wSet); + if (code) goto _err; + + code = tsdbDataFWriterClose(&pWriter->dWriter.pWriter, 1); + if (code) goto _err; + + if (pWriter->dReader.pReader) { + code = tsdbDataFReaderClose(&pWriter->dReader.pReader); if (code) goto _err; } _exit: - tsdbInfo("vgId:%d, vnode snapshot tsdb writer data end for %s", TD_VID(pTsdb->pVnode), pTsdb->path); return code; _err: - tsdbError("vgId:%d, vnode snapshot tsdb writer data end for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, - tstrerror(code)); + return code; +} + +static int32_t tsdbSnapWriteToDataFile(STsdbSnapWriter* pWriter, int32_t iRow, int8_t* done) { + int32_t code = 0; + + SBlockData* pBData = &pWriter->bData; + TABLEID id = {.suid = pBData->suid, .uid = pBData->uid ? pBData->uid : pBData->aUid[iRow]}; + TSDBROW row = tsdbRowFromBlockData(pBData, iRow); + TSDBKEY key = TSDBROW_KEY(&row); + + *done = 0; + while (pWriter->dReader.iRow < pWriter->dReader.bData.nRow || + pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem) { + // Merge row by row + for (; pWriter->dReader.iRow < pWriter->dReader.bData.nRow; pWriter->dReader.iRow++) { + TSDBROW trow = tsdbRowFromBlockData(&pWriter->dReader.bData, pWriter->dReader.iRow); + TSDBKEY tKey = TSDBROW_KEY(&trow); + + ASSERT(pWriter->dReader.bData.suid == id.suid && pWriter->dReader.bData.uid == id.uid); + + int32_t c = tsdbKeyCmprFn(&key, &tKey); + if (c < 0) { + code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid); + if (code) goto _err; + } else if (c > 0) { + code = tBlockDataAppendRow(&pWriter->dWriter.bData, &trow, NULL, id.uid); + if (code) goto _err; + } else { + ASSERT(0); + } + + if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) { + code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk, + pWriter->cmprAlg); + if (code) goto _err; + } + + if (c < 0) { + *done = 1; + goto _exit; + } + } + + // Merge row by block + SDataBlk tDataBlk = {.minKey = key, .maxKey = key}; + for (; pWriter->dReader.iDataBlk < pWriter->dReader.mDataBlk.nItem; pWriter->dReader.iDataBlk++) { + SDataBlk dataBlk; + tMapDataGetItemByIdx(&pWriter->dReader.mDataBlk, pWriter->dReader.iDataBlk, &dataBlk, tGetDataBlk); + + int32_t c = tDataBlkCmprFn(&dataBlk, &tDataBlk); + if (c < 0) { + code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk, + pWriter->cmprAlg); + if (code) goto _err; + + code = tMapDataPutItem(&pWriter->dWriter.mDataBlk, &dataBlk, tPutDataBlk); + if (code) goto _err; + } else if (c > 0) { + code = tBlockDataAppendRow(&pWriter->dWriter.bData, &row, NULL, id.uid); + if (code) goto _err; + + if (pWriter->dWriter.bData.nRow >= pWriter->maxRow) { + code = tsdbWriteDataBlock(pWriter->dWriter.pWriter, &pWriter->dWriter.bData, &pWriter->dWriter.mDataBlk, + pWriter->cmprAlg); + if (code) goto _err; + } + + *done = 1; + goto _exit; + } else { + code = tsdbReadDataBlockEx(pWriter->dReader.pReader, &dataBlk, &pWriter->dReader.bData); + if (code) goto _err; + pWriter->dReader.iRow = 0; + + pWriter->dReader.iDataBlk++; + break; + } + } + } + +_exit: + return code; + +_err: + tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code)); + return code; +} + +static int32_t tsdbSnapWriteToSttFile(STsdbSnapWriter* pWriter, int32_t iRow) { + int32_t code = 0; + + TABLEID id = {.suid = pWriter->bData.suid, + .uid = pWriter->bData.uid ? pWriter->bData.uid : pWriter->bData.aUid[iRow]}; + TSDBROW row = tsdbRowFromBlockData(&pWriter->bData, iRow); + SBlockData* pBData = &pWriter->dWriter.sData; + + if (pBData->suid || pBData->uid) { + if (!TABLE_SAME_SCHEMA(pBData->suid, pBData->uid, id.suid, id.uid)) { + code = tsdbWriteSttBlock(pWriter->dWriter.pWriter, pBData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg); + if (code) goto _err; + + pBData->suid = 0; + pBData->uid = 0; + } + } + + if (pBData->suid == 0 && pBData->uid == 0) { + code = tsdbUpdateTableSchema(pWriter->pTsdb->pVnode->pMeta, pWriter->id.suid, pWriter->id.uid, &pWriter->skmTable); + if (code) goto _err; + + code = tBlockDataInit(pBData, pWriter->id.suid, pWriter->id.suid ? 0 : pWriter->id.uid, pWriter->skmTable.pTSchema); + if (code) goto _err; + } + + code = tBlockDataAppendRow(pBData, &row, NULL, id.uid); + if (code) goto _err; + + if (pBData->nRow >= pWriter->maxRow) { + code = tsdbWriteSttBlock(pWriter->dWriter.pWriter, pBData, pWriter->dWriter.aSttBlk, pWriter->cmprAlg); + if (code) goto _err; + } + +_exit: + return code; + +_err: + return code; +} + +static int32_t tsdbSnapWriteRowData(STsdbSnapWriter* pWriter, int32_t iRow) { + int32_t code = 0; + + SBlockData* pBlockData = &pWriter->bData; + TABLEID id = {.suid = pBlockData->suid, .uid = pBlockData->uid ? pBlockData->uid : pBlockData->aUid[iRow]}; + + // End last table data write if need + if (tTABLEIDCmprFn(&pWriter->id, &id) != 0) { + code = tsdbSnapWriteTableDataEnd(pWriter); + if (code) goto _err; + } + + // Start new table data write if need + if (pWriter->id.suid == 0 && pWriter->id.uid == 0) { + code = tsdbSnapWriteTableDataStart(pWriter, &id); + if (code) goto _err; + } + + // Merge with .data file data + int8_t done = 0; + if (pWriter->dReader.pBlockIdx && tTABLEIDCmprFn(pWriter->dReader.pBlockIdx, &id) == 0) { + code = tsdbSnapWriteToDataFile(pWriter, iRow, &done); + if (code) goto _err; + } + + // Append to the .stt data block (todo: check if need to set/reload sst block) + if (!done) { + code = tsdbSnapWriteToSttFile(pWriter, iRow); + if (code) goto _err; + } + +_exit: + return code; + +_err: + tsdbError("vgId:%d %s failed since %s", TD_VID(pWriter->pTsdb->pVnode), __func__, tstrerror(code)); return code; } static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { - int32_t code = 0; - STsdb* pTsdb = pWriter->pTsdb; - SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; - TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr)); - int64_t n; - - // decode + int32_t code = 0; + STsdb* pTsdb = pWriter->pTsdb; SBlockData* pBlockData = &pWriter->bData; - code = tDecmprBlockData(pData + sizeof(SSnapDataHdr) + sizeof(TABLEID), pHdr->size - sizeof(TABLEID), pBlockData, - pWriter->aBuf); + + // Decode data + SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; + code = tDecmprBlockData(pHdr->data, pHdr->size, pBlockData, pWriter->aBuf); if (code) goto _err; - // open file - TSDBKEY keyFirst = {.version = pBlockData->aVersion[0], .ts = pBlockData->aTSKEY[0]}; - TSDBKEY keyLast = {.version = pBlockData->aVersion[pBlockData->nRow - 1], - .ts = pBlockData->aTSKEY[pBlockData->nRow - 1]}; + ASSERT(pBlockData->nRow > 0); - int32_t fid = tsdbKeyFid(keyFirst.ts, pWriter->minutes, pWriter->precision); - ASSERT(fid == tsdbKeyFid(keyLast.ts, pWriter->minutes, pWriter->precision)); - if (pWriter->pDataFWriter == NULL || pWriter->fid != fid) { - // end last file data write if need - code = tsdbSnapWriteDataEnd(pWriter); - if (code) goto _err; + // Loop to handle each row + for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) { + TSKEY ts = pBlockData->aTSKEY[iRow]; + int32_t fid = tsdbKeyFid(ts, pWriter->minutes, pWriter->precision); - pWriter->fid = fid; + if (pWriter->dWriter.pWriter == NULL || pWriter->fid != fid) { + if (pWriter->dWriter.pWriter) { + ASSERT(fid > pWriter->fid); - // read - SDFileSet* pSet = taosArraySearch(pWriter->fs.aDFileSet, &(SDFileSet){.fid = fid}, tDFileSetCmprFn, TD_EQ); - if (pSet) { - code = tsdbDataFReaderOpen(&pWriter->pDataFReader, pTsdb, pSet); + code = tsdbSnapWriteCloseFile(pWriter); + if (code) goto _err; + } + + code = tsdbSnapWriteOpenFile(pWriter, fid); if (code) goto _err; - - code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx); - if (code) goto _err; - - code = tsdbReadSttBlk(pWriter->pDataFReader, 0, pWriter->aSstBlk); - if (code) goto _err; - } else { - ASSERT(pWriter->pDataFReader == NULL); - taosArrayClear(pWriter->aBlockIdx); - taosArrayClear(pWriter->aSstBlk); - } - pWriter->iBlockIdx = 0; - pWriter->pBlockIdx = NULL; - tMapDataReset(&pWriter->mBlock); - pWriter->iBlock = 0; - pWriter->pBlockData = NULL; - pWriter->iRow = 0; - pWriter->iBlockL = 0; - tBlockDataReset(&pWriter->bDataR); - tBlockDataReset(&pWriter->lDataR); - - // write - SHeadFile fHead; - SDataFile fData; - SSttFile fLast; - SSmaFile fSma; - SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .aSttF[0] = &fLast, .pSmaF = &fSma}; - - if (pSet) { - wSet.diskId = pSet->diskId; - wSet.fid = fid; - wSet.nSttF = 1; - fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0}; - fData = *pSet->pDataF; - fLast = (SSttFile){.commitID = pWriter->commitID, .size = 0}; - fSma = *pSet->pSmaF; - } else { - wSet.diskId = (SDiskID){.level = 0, .id = 0}; - wSet.fid = fid; - wSet.nSttF = 1; - fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0}; - fData = (SDataFile){.commitID = pWriter->commitID, .size = 0}; - fLast = (SSttFile){.commitID = pWriter->commitID, .size = 0, .offset = 0}; - fSma = (SSmaFile){.commitID = pWriter->commitID, .size = 0}; } - code = tsdbDataFWriterOpen(&pWriter->pDataFWriter, pTsdb, &wSet); + code = tsdbSnapWriteRowData(pWriter, iRow); if (code) goto _err; - - taosArrayClear(pWriter->aBlockIdxW); - taosArrayClear(pWriter->aBlockLW); - tMapDataReset(&pWriter->mBlockW); - pWriter->pBlockIdxW = NULL; - tBlockDataReset(&pWriter->bDataW); } - code = tsdbSnapWriteTableData(pWriter, id); - if (code) goto _err; - - tsdbInfo("vgId:%d, vnode snapshot tsdb write data for %s, fid:%d suid:%" PRId64 " uid:%" PRId64 " nRow:%d", - TD_VID(pTsdb->pVnode), pTsdb->path, fid, id.suid, id.suid, pBlockData->nRow); return code; _err: @@ -976,10 +1099,41 @@ _err: return code; } +// SNAP_DATA_DEL +static int32_t tsdbSnapMoveWriteDelData(STsdbSnapWriter* pWriter, TABLEID* pId) { + int32_t code = 0; + + while (true) { + if (pWriter->iDelIdx >= taosArrayGetSize(pWriter->aDelIdxR)) break; + + SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx); + + if (tTABLEIDCmprFn(pDelIdx, pId) >= 0) break; + + code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData); + if (code) goto _exit; + + SDelIdx delIdx = *pDelIdx; + code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx); + if (code) goto _exit; + + if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + pWriter->iDelIdx++; + } + +_exit: + return code; +} + static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) { int32_t code = 0; STsdb* pTsdb = pWriter->pTsdb; + // Open del file if not opened yet if (pWriter->pDelFWriter == NULL) { SDelFile* pDelFile = pWriter->fs.pDelFile; @@ -990,38 +1144,28 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32 code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR); if (code) goto _err; + } else { + taosArrayClear(pWriter->aDelIdxR); } + pWriter->iDelIdx = 0; // writer - SDelFile delFile = {.commitID = pWriter->commitID, .offset = 0, .size = 0}; + SDelFile delFile = {.commitID = pWriter->commitID}; code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb); if (code) goto _err; + taosArrayClear(pWriter->aDelIdxW); } - // process the del data - TABLEID id = *(TABLEID*)(pData + sizeof(SSnapDataHdr)); + SSnapDataHdr* pHdr = (SSnapDataHdr*)pData; + TABLEID id = *(TABLEID*)pHdr->data; - while (true) { - if (pWriter->iDelIdx >= taosArrayGetSize(pWriter->aDelIdxR)) break; - if (tTABLEIDCmprFn(taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx), &id) >= 0) break; + ASSERT(pHdr->size + sizeof(SSnapDataHdr) == nData); - SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx); - - code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData); - if (code) goto _err; - - SDelIdx delIdx = *pDelIdx; - code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx); - if (code) goto _err; - - if (taosArrayPush(pWriter->aDelIdxW, &delIdx) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - - pWriter->iDelIdx++; - } + // Move write data < id + code = tsdbSnapMoveWriteDelData(pWriter, &id); + if (code) goto _err; + // Merge incoming data with current if (pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR) && tTABLEIDCmprFn(taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx), &id) == 0) { SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx); @@ -1055,7 +1199,6 @@ static int32_t tsdbSnapWriteDel(STsdbSnapWriter* pWriter, uint8_t* pData, uint32 goto _err; } -_exit: return code; _err: @@ -1068,23 +1211,14 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { int32_t code = 0; STsdb* pTsdb = pWriter->pTsdb; - if (pWriter->pDelFWriter == NULL) goto _exit; + if (pWriter->pDelFWriter == NULL) return code; - for (; pWriter->iDelIdx < taosArrayGetSize(pWriter->aDelIdxR); pWriter->iDelIdx++) { - SDelIdx* pDelIdx = (SDelIdx*)taosArrayGet(pWriter->aDelIdxR, pWriter->iDelIdx); + TABLEID id = {.suid = INT64_MAX, .uid = INT64_MAX}; + code = tsdbSnapMoveWriteDelData(pWriter, &id); + if (code) goto _err; - code = tsdbReadDelData(pWriter->pDelFReader, pDelIdx, pWriter->aDelData); - if (code) goto _err; - - SDelIdx delIdx = *pDelIdx; - code = tsdbWriteDelData(pWriter->pDelFWriter, pWriter->aDelData, &delIdx); - if (code) goto _err; - - if (taosArrayPush(pWriter->aDelIdxR, &delIdx) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - } + code = tsdbWriteDelIdx(pWriter->pDelFWriter, pWriter->aDelIdxW); + if (code) goto _err; code = tsdbUpdateDelFileHdr(pWriter->pDelFWriter); if (code) goto _err; @@ -1100,7 +1234,6 @@ static int32_t tsdbSnapWriteDelEnd(STsdbSnapWriter* pWriter) { if (code) goto _err; } -_exit: tsdbInfo("vgId:%d, vnode snapshot tsdb write del for %s end", TD_VID(pTsdb->pVnode), pTsdb->path); return code; @@ -1110,6 +1243,7 @@ _err: return code; } +// APIs int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWriter** ppWriter) { int32_t code = 0; STsdbSnapWriter* pWriter = NULL; @@ -1135,39 +1269,38 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr pWriter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; pWriter->commitID = pTsdb->pVnode->state.commitID; - // for data file + // SNAP_DATA_TSDB code = tBlockDataCreate(&pWriter->bData); - - if (code) goto _err; - pWriter->aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); - if (pWriter->aBlockIdx == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - code = tBlockDataCreate(&pWriter->bDataR); if (code) goto _err; - pWriter->aSstBlk = taosArrayInit(0, sizeof(SSttBlk)); - if (pWriter->aSstBlk == NULL) { + pWriter->fid = INT32_MIN; + pWriter->id = (TABLEID){0}; + // Reader + pWriter->dReader.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); + if (pWriter->dReader.aBlockIdx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - - pWriter->aBlockIdxW = taosArrayInit(0, sizeof(SBlockIdx)); - if (pWriter->aBlockIdxW == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - code = tBlockDataCreate(&pWriter->bDataW); + code = tBlockDataCreate(&pWriter->dReader.bData); if (code) goto _err; - pWriter->aBlockLW = taosArrayInit(0, sizeof(SSttBlk)); - if (pWriter->aBlockLW == NULL) { + // Writer + pWriter->dWriter.aBlockIdx = taosArrayInit(0, sizeof(SBlockIdx)); + if (pWriter->dWriter.aBlockIdx == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } + pWriter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); + if (pWriter->dWriter.aSttBlk == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + code = tBlockDataCreate(&pWriter->dWriter.bData); + if (code) goto _err; + code = tBlockDataCreate(&pWriter->dWriter.sData); + if (code) goto _err; - // for del file + // SNAP_DATA_DEL pWriter->aDelIdxR = taosArrayInit(0, sizeof(SDelIdx)); if (pWriter->aDelIdxR == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -1188,6 +1321,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr tsdbInfo("vgId:%d, tsdb snapshot writer open for %s succeed", TD_VID(pTsdb->pVnode), pTsdb->path); return code; + _err: tsdbError("vgId:%d, tsdb snapshot writer open for %s failed since %s", TD_VID(pTsdb->pVnode), pTsdb->path, tstrerror(code)); @@ -1198,14 +1332,17 @@ _err: int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { int32_t code = 0; STsdbSnapWriter* pWriter = *ppWriter; + STsdb* pTsdb = pWriter->pTsdb; if (rollback) { ASSERT(0); // code = tsdbFSRollback(pWriter->pTsdb->pFS); // if (code) goto _err; } else { - code = tsdbSnapWriteDataEnd(pWriter); - if (code) goto _err; + if (pWriter->dWriter.pWriter) { + code = tsdbSnapWriteCloseFile(pWriter); + if (code) goto _err; + } code = tsdbSnapWriteDelEnd(pWriter); if (code) goto _err; @@ -1213,14 +1350,44 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** ppWriter, int8_t rollback) { code = tsdbFSCommit1(pWriter->pTsdb, &pWriter->fs); if (code) goto _err; + // lock + taosThreadRwlockWrlock(&pTsdb->rwLock); + code = tsdbFSCommit2(pWriter->pTsdb, &pWriter->fs); - if (code) goto _err; + if (code) { + taosThreadRwlockUnlock(&pTsdb->rwLock); + goto _err; + } + + // unlock + taosThreadRwlockUnlock(&pTsdb->rwLock); } + // SNAP_DATA_DEL + taosArrayDestroy(pWriter->aDelIdxW); + taosArrayDestroy(pWriter->aDelData); + taosArrayDestroy(pWriter->aDelIdxR); + + // SNAP_DATA_TSDB + + // Writer + tBlockDataDestroy(&pWriter->dWriter.sData, 1); + tBlockDataDestroy(&pWriter->dWriter.bData, 1); + taosArrayDestroy(pWriter->dWriter.aSttBlk); + tMapDataClear(&pWriter->dWriter.mDataBlk); + taosArrayDestroy(pWriter->dWriter.aBlockIdx); + + // Reader + tBlockDataDestroy(&pWriter->dReader.bData, 1); + tMapDataClear(&pWriter->dReader.mDataBlk); + taosArrayDestroy(pWriter->dReader.aBlockIdx); + + tBlockDataDestroy(&pWriter->bData, 1); + tTSchemaDestroy(pWriter->skmTable.pTSchema); + for (int32_t iBuf = 0; iBuf < sizeof(pWriter->aBuf) / sizeof(uint8_t*); iBuf++) { tFree(pWriter->aBuf[iBuf]); } - tsdbInfo("vgId:%d, vnode snapshot tsdb writer close for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); taosMemoryFree(pWriter); *ppWriter = NULL; @@ -1245,8 +1412,8 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) goto _exit; } else { - if (pWriter->pDataFWriter) { - code = tsdbSnapWriteDataEnd(pWriter); + if (pWriter->dWriter.pWriter) { + code = tsdbSnapWriteCloseFile(pWriter); if (code) goto _err; } } @@ -1259,7 +1426,6 @@ int32_t tsdbSnapWrite(STsdbSnapWriter* pWriter, uint8_t* pData, uint32_t nData) _exit: tsdbDebug("vgId:%d, tsdb snapshot write for %s succeed", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); - return code; _err: diff --git a/source/dnode/vnode/src/vnd/vnodeSnapshot.c b/source/dnode/vnode/src/vnd/vnodeSnapshot.c index 03ab5e8285..08c3a34699 100644 --- a/source/dnode/vnode/src/vnd/vnodeSnapshot.c +++ b/source/dnode/vnode/src/vnd/vnodeSnapshot.c @@ -354,7 +354,8 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) { code = metaSnapWrite(pWriter->pMetaSnapWriter, pData, nData); if (code) goto _err; } break; - case SNAP_DATA_TSDB: { + case SNAP_DATA_TSDB: + case SNAP_DATA_DEL: { // tsdb if (pWriter->pTsdbSnapWriter == NULL) { code = tsdbSnapWriterOpen(pVnode->pTsdb, pWriter->sver, pWriter->ever, &pWriter->pTsdbSnapWriter); diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 57cf477bf8..161c878440 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -221,6 +221,7 @@ ./test.sh -f tsim/table/describe.sim ./test.sh -f tsim/table/double.sim ./test.sh -f tsim/table/float.sim +./test.sh -f tsim/table/hash.sim ./test.sh -f tsim/table/int.sim ./test.sh -f tsim/table/limit.sim ./test.sh -f tsim/table/smallint.sim diff --git a/tests/script/sh/abs_max.c b/tests/script/sh/abs_max.c deleted file mode 100644 index d623adacf9..0000000000 --- a/tests/script/sh/abs_max.c +++ /dev/null @@ -1,88 +0,0 @@ -#include -#include -#include - -typedef struct SUdfInit{ - int maybe_null; /* 1 if function can return NULL */ - int decimals; /* for real functions */ - long long length; /* For string functions */ - char *ptr; /* free pointer for function data */ - int const_item; /* 0 if result is independent of arguments */ -} SUdfInit; - - -#define TSDB_DATA_INT_NULL 0x80000000LL -#define TSDB_DATA_BIGINT_NULL 0x8000000000000000LL - -void abs_max(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput, - int* numOfOutput, short otype, short obytes, SUdfInit* buf) { - int i; - int r = 0; - printf("abs_max input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf); - if (itype == 5) { - r=*(long *)dataOutput; - *numOfOutput=0; - - for(i=0;i r) { - r = v; - } - } - - *(long *)dataOutput=r; - - printf("abs_max out, dataoutput:%ld, numOfOutput:%d\n", *(long *)dataOutput, *numOfOutput); - } -} - - - -void abs_max_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf) { - int i; - int r = 0; - printf("abs_max_finalize dataoutput:%p:%d, numOfOutput:%d, buf:%p\n", dataOutput, *dataOutput, *numOfOutput, buf); - *numOfOutput=1; - printf("abs_max finalize, dataoutput:%ld, numOfOutput:%d\n", *(long *)dataOutput, *numOfOutput); -} - -void abs_max_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf) { - int r = 0; - - if (numOfRows > 0) { - r = *((long *)data); - } - printf("abs_max_merge numOfRows:%d, dataoutput:%p, buf:%p\n", numOfRows, dataOutput, buf); - for (int i = 1; i < numOfRows; ++i) { - printf("abs_max_merge %d - %ld\n", i, *((long *)data + i)); - if (*((long*)data + i) > r) { - r= *((long*)data + i); - } - } - - *(long*)dataOutput=r; - if (numOfRows > 0) { - *numOfOutput=1; - } else { - *numOfOutput=0; - } - - printf("abs_max_merge, dataoutput:%ld, numOfOutput:%d\n", *(long *)dataOutput, *numOfOutput); -} - - -int abs_max_init(SUdfInit* buf) { - printf("abs_max init\n"); - return 0; -} - - -void abs_max_destroy(SUdfInit* buf) { - printf("abs_max destroy\n"); -} - diff --git a/tests/script/sh/add_one.c b/tests/script/sh/add_one.c deleted file mode 100644 index e12cf8f26f..0000000000 --- a/tests/script/sh/add_one.c +++ /dev/null @@ -1,33 +0,0 @@ -#include -#include -#include - -typedef struct SUdfInit{ - int maybe_null; /* 1 if function can return NULL */ - int decimals; /* for real functions */ - long long length; /* For string functions */ - char *ptr; /* free pointer for function data */ - int const_item; /* 0 if result is independent of arguments */ -} SUdfInit; - -void add_one(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBUf, char* tsOutput, - int* numOfOutput, short otype, short obytes, SUdfInit* buf) { - int i; - int r = 0; - printf("add_one input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf); - if (itype == 4) { - for(i=0;i -#include -#include - -typedef struct SUdfInit{ - int maybe_null; /* 1 if function can return NULL */ - int decimals; /* for real functions */ - long long length; /* For string functions */ - char *ptr; /* free pointer for function data */ - int const_item; /* 0 if result is independent of arguments */ -} SUdfInit; - -typedef struct SDemo{ - double sum; - int num; - short otype; -}SDemo; - -#define FLOAT_NULL 0x7FF00000 // it is an NAN -#define DOUBLE_NULL 0x7FFFFF0000000000LL // it is an NAN - - -void demo(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput, - int* numOfOutput, short otype, short obytes, SUdfInit* buf) { - int i; - double r = 0; - SDemo *p = (SDemo *)interBuf; - SDemo *q = (SDemo *)dataOutput; - printf("demo input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, interBUf:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, interBuf, tsOutput, numOfOutput, buf); - - for(i=0;isum += r*r; - } - - p->otype = otype; - p->num += numOfRows; - - q->sum = p->sum; - q->num = p->num; - q->otype = p->otype; - - *numOfOutput=1; - - printf("demo out, sum:%f, num:%d, numOfOutput:%d\n", p->sum, p->num, *numOfOutput); -} - - -void demo_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf) { - int i; - SDemo *p = (SDemo *)data; - SDemo res = {0}; - printf("demo_merge input data:%p, rows:%d, dataoutput:%p, numOfOutput:%p, buf:%p\n", data, numOfRows, dataOutput, numOfOutput, buf); - - for(i=0;isum * p->sum; - res.num += p->num; - p++; - } - - p->sum = res.sum; - p->num = res.num; - - *numOfOutput=1; - - printf("demo out, sum:%f, num:%d, numOfOutput:%d\n", p->sum, p->num, *numOfOutput); -} - - - -void demo_finalize(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf) { - SDemo *p = (SDemo *)interBuf; - printf("demo_finalize interbuf:%p, numOfOutput:%p, buf:%p, sum:%f, num:%d\n", interBuf, numOfOutput, buf, p->sum, p->num); - if (p->otype == 6) { - if (p->num != 30000) { - *(unsigned int *)dataOutput = FLOAT_NULL; - } else { - *(float *)dataOutput = (float)(p->sum / p->num); - } - printf("finalize values:%f\n", *(float *)dataOutput); - } else if (p->otype == 7) { - if (p->num != 30000) { - *(unsigned long long *)dataOutput = DOUBLE_NULL; - } else { - *(double *)dataOutput = (double)(p->sum / p->num); - } - printf("finalize values:%f\n", *(double *)dataOutput); - } - - *numOfOutput=1; - - printf("demo finalize, numOfOutput:%d\n", *numOfOutput); -} - - -int demo_init(SUdfInit* buf) { - printf("demo init\n"); - return 0; -} - - -void demo_destroy(SUdfInit* buf) { - printf("demo destroy\n"); -} - diff --git a/tests/script/sh/demo.lua b/tests/script/sh/demo.lua deleted file mode 100644 index c5e5582fc3..0000000000 --- a/tests/script/sh/demo.lua +++ /dev/null @@ -1,43 +0,0 @@ -funcName = "test" - -global = {} - -function test_init() - return global -end - -function test_add(rows, ans, key) - t = {} - t["sum"] = 0.0 - t["num"] = 0 - for i=1, #rows do - t["sum"] = t["sum"] + rows[i] * rows[i] - end - t["num"] = #rows - - - if (ans[key] ~= nil) - then - ans[key]["sum"] = ans[key]["sum"] + t["sum"] - ans[key]["num"] = ans[key]["num"] + t["num"] - else - ans[key] = t - end - - return ans; -end - -function test_finalize(ans, key) - local ret = 0.0 - - if (ans[key] ~= nil and ans[key]["num"] == 30000) - then - ret = ans[key]["sum"]/ans[key]["num"] - ans[key]["sum"] = 0.0 - ans[key]["num"] = 0 - else - ret = inf - end - - return ret, ans -end diff --git a/tests/script/sh/sum_double.c b/tests/script/sh/sum_double.c deleted file mode 100644 index d6eea5d291..0000000000 --- a/tests/script/sh/sum_double.c +++ /dev/null @@ -1,84 +0,0 @@ -#include -#include -#include - -typedef struct SUdfInit{ - int maybe_null; /* 1 if function can return NULL */ - int decimals; /* for real functions */ - long long length; /* For string functions */ - char *ptr; /* free pointer for function data */ - int const_item; /* 0 if result is independent of arguments */ -} SUdfInit; - -#define TSDB_DATA_INT_NULL 0x80000000LL - - -void sum_double(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput, - int* numOfOutput, short otype, short obytes, SUdfInit* buf) { - int i; - int r = 0; - printf("sum_double input data:%p, type:%d, rows:%d, ts:%p,%lld, dataoutput:%p, tsOutput:%p, numOfOutput:%p, buf:%p\n", data, itype, numOfRows, ts, *ts, dataOutput, tsOutput, numOfOutput, buf); - if (itype == 4) { - r=*(int *)dataOutput; - *numOfOutput=0; - - for(i=0;iptr)=*(int*)dataOutput*2; - *(int*)dataOutput=*(int*)(buf->ptr); - printf("sum_double finalize, dataoutput:%d, numOfOutput:%d\n", *(int *)dataOutput, *numOfOutput); -} - -void sum_double_merge(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf) { - int r = 0; - int sum = 0; - - printf("sum_double_merge numOfRows:%d, dataoutput:%p, buf:%p\n", numOfRows, dataOutput, buf); - for (int i = 0; i < numOfRows; ++i) { - printf("sum_double_merge %d - %d\n", i, *((int*)data + i)); - sum +=*((int*)data + i); - } - - *(int*)dataOutput+=sum; - if (numOfRows > 0) { - *numOfOutput=1; - } else { - *numOfOutput=0; - } - - printf("sum_double_merge, dataoutput:%d, numOfOutput:%d\n", *(int *)dataOutput, *numOfOutput); -} - - -int sum_double_init(SUdfInit* buf) { - buf->maybe_null=1; - buf->ptr = taosMemoryMalloc(sizeof(int)); - printf("sum_double init\n"); - return 0; -} - - -void sum_double_destroy(SUdfInit* buf) { - taosMemoryFree(buf->ptr); - printf("sum_double destroy\n"); -} - diff --git a/tests/script/tsim/table/hash.sim b/tests/script/tsim/table/hash.sim new file mode 100644 index 0000000000..664f867137 --- /dev/null +++ b/tests/script/tsim/table/hash.sim @@ -0,0 +1,84 @@ +system sh/stop_dnodes.sh +system sh/deploy.sh -n dnode1 -i 1 +system sh/exec.sh -n dnode1 -s start +sql connect + +#=========== prepare +#sql create database d1 vgroups 2 +sql create database d1 vgroups 2 table_prefix 3 table_suffix 2 +sql select * from information_schema.ins_databases +print $data(d1)[27] $data(d1)[28] +if $data(d1)[27] != 3 then + return -1 +endi +if $data(d1)[28] != 2 then + return -1 +endi + +sql use d1; +sql create table st (ts timestamp, i int) tags (j int); +sql create table st_ct_1 using st tags(3) st_ct_2 using st tags(4) st_ct_3 using st tags(5) st_ct_4 using st tags(6) st_ct_5 using st tags(7) +sql insert into st_ct_1 values(now+1s, 1) +sql insert into st_ct_1 values(now+2s, 2) +sql insert into st_ct_1 values(now+3s, 3) +sql insert into st_ct_2 values(now+1s, 1) +sql insert into st_ct_2 values(now+2s, 2) +sql insert into st_ct_2 values(now+3s, 3) +sql insert into st_ct_3 values(now+1s, 1) +sql insert into st_ct_3 values(now+2s, 2) +sql insert into st_ct_3 values(now+3s, 2) +sql insert into st_ct_4 values(now+1s, 1) +sql insert into st_ct_4 values(now+2s, 2) +sql insert into st_ct_4 values(now+3s, 2) +sql insert into st_ct_5 values(now+1s, 1) +sql insert into st_ct_5 values(now+2s, 2) +sql insert into st_ct_5 values(now+3s, 2) + +# check query +sql select * from st +if $rows != 15 then + return -1 +endi + +# check table vgroup +sql select * from information_schema.ins_tables where db_name = 'd1' +if $data(st_ct_1)[6] != 2 then + return -1 +endi +if $data(st_ct_2)[6] != 2 then + return -1 +endi +if $data(st_ct_3)[6] != 2 then + return -1 +endi +if $data(st_ct_4)[6] != 2 then + return -1 +endi +if $data(st_ct_5)[6] != 2 then + return -1 +endi + +# check invalid table name +sql create table c1 using st tags(3) +sql create table c12 using st tags(3) +sql create table c123 using st tags(3) +sql create table c1234 using st tags(3) +sql create table c12345 using st tags(3) +sql select * from information_schema.ins_tables where db_name = 'd1' +if $data(c1)[6] != 2 then + return -1 +endi +if $data(c12)[6] != 3 then + return -1 +endi +if $data(c123)[6] != 2 then + return -1 +endi +if $data(c1234)[6] != 3 then + return -1 +endi +if $data(c12345)[6] != 3 then + return -1 +endi + +system sh/exec.sh -n dnode1 -s stop -x SIGINT