diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d826f312e5..9f919fa250 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -845,6 +845,8 @@ typedef struct { int64_t uid; int32_t vgVersion; int32_t vgNum; + int16_t hashPrefix; + int16_t hashSuffix; int8_t hashMethod; SArray* pVgroupInfos; // Array of SVgroupInfo } SUseDbRsp; diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 1fa7dca7dc..44a9e10679 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -116,6 +116,8 @@ typedef struct STableMeta { typedef struct SDBVgInfo { int32_t vgVersion; + int16_t hashPrefix; + int16_t hashSuffix; int8_t hashMethod; int32_t numOfTable; // DB's table num, unit is TSDB_TABLE_NUM_UNIT SHashObj* vgHash; // key:vgId, value:SVgroupInfo diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 56e3527f96..84a827ed78 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -73,6 +73,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog vgInfo->vgVersion = rsp->vgVersion; vgInfo->hashMethod = rsp->hashMethod; + vgInfo->hashPrefix = rsp->hashPrefix; + vgInfo->hashSuffix = rsp->hashSuffix; vgInfo->vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); if (NULL == vgInfo->vgHash) { taosMemoryFree(vgInfo); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index c08817aaf4..ea25094d10 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -2463,6 +2463,8 @@ int32_t tSerializeSUseDbRspImp(SEncoder *pEncoder, const SUseDbRsp *pRsp) { if (tEncodeI64(pEncoder, pRsp->uid) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->vgVersion) < 0) return -1; if (tEncodeI32(pEncoder, pRsp->vgNum) < 0) return -1; + if (tEncodeI16(pEncoder, pRsp->hashPrefix) < 0) return -1; + if (tEncodeI16(pEncoder, pRsp->hashSuffix) < 0) return -1; if (tEncodeI8(pEncoder, pRsp->hashMethod) < 0) return -1; for (int32_t i = 0; i < pRsp->vgNum; ++i) { @@ -2514,6 +2516,8 @@ int32_t tDeserializeSUseDbRspImp(SDecoder *pDecoder, SUseDbRsp *pRsp) { if (tDecodeI64(pDecoder, &pRsp->uid) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->vgVersion) < 0) return -1; if (tDecodeI32(pDecoder, &pRsp->vgNum) < 0) return -1; + if (tDecodeI16(pDecoder, &pRsp->hashPrefix) < 0) return -1; + if (tDecodeI16(pDecoder, &pRsp->hashSuffix) < 0) return -1; if (tDecodeI8(pDecoder, &pRsp->hashMethod) < 0) return -1; if (pRsp->vgNum <= 0) { diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 77c9d0bb79..939439ca56 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1171,6 +1171,8 @@ int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUs pRsp->vgVersion = pDb->vgVersion; pRsp->vgNum = taosArrayGetSize(pRsp->pVgroupInfos); pRsp->hashMethod = pDb->cfg.hashMethod; + pRsp->hashPrefix = pDb->cfg.hashPrefix; + pRsp->hashSuffix = pDb->cfg.hashSuffix; return 0; } @@ -1303,6 +1305,8 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs, usedbRsp.vgVersion = pDb->vgVersion; usedbRsp.vgNum = (int32_t)taosArrayGetSize(usedbRsp.pVgroupInfos); usedbRsp.hashMethod = pDb->cfg.hashMethod; + usedbRsp.hashPrefix = pDb->cfg.hashPrefix; + usedbRsp.hashSuffix = pDb->cfg.hashSuffix; taosArrayPush(batchUseRsp.pArray, &usedbRsp); mndReleaseDb(pMnode, pDb); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 91aacaa328..d45a6f19f0 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -43,14 +43,14 @@ typedef struct STbDataIter STbDataIter; typedef struct SMapData SMapData; typedef struct SBlockIdx SBlockIdx; typedef struct SDataBlk SDataBlk; -typedef struct SSstBlk SSstBlk; +typedef struct SSttBlk SSttBlk; typedef struct SColData SColData; typedef struct SDiskDataHdr SDiskDataHdr; typedef struct SBlockData SBlockData; typedef struct SDelFile SDelFile; typedef struct SHeadFile SHeadFile; typedef struct SDataFile SDataFile; -typedef struct SSstFile SSstFile; +typedef struct SSttFile SSttFile; typedef struct SSmaFile SSmaFile; typedef struct SDFileSet SDFileSet; typedef struct SDataFWriter SDataFWriter; @@ -69,8 +69,8 @@ typedef struct SLDataIter SLDataIter; #define TSDB_FILE_DLMT ((uint32_t)0xF00AFA0F) #define TSDB_MAX_SUBBLOCKS 8 -#define TSDB_MAX_SST_FILE 16 -#define TSDB_DEFAULT_SST_FILE 8 +#define TSDB_MAX_STT_FILE 16 +#define TSDB_DEFAULT_STT_FILE 8 #define TSDB_FHDR_SIZE 512 #define TSDB_DEFAULT_PAGE_SIZE 4096 @@ -130,9 +130,9 @@ int32_t tPutDataBlk(uint8_t *p, void *ph); int32_t tGetDataBlk(uint8_t *p, void *ph); int32_t tDataBlkCmprFn(const void *p1, const void *p2); bool tDataBlkHasSma(SDataBlk *pDataBlk); -// SSstBlk -int32_t tPutSstBlk(uint8_t *p, void *ph); -int32_t tGetSstBlk(uint8_t *p, void *ph); +// SSttBlk +int32_t tPutSttBlk(uint8_t *p, void *ph); +int32_t tGetSttBlk(uint8_t *p, void *ph); // SBlockIdx int32_t tPutBlockIdx(uint8_t *p, void *ph); int32_t tGetBlockIdx(uint8_t *p, void *ph); @@ -228,7 +228,7 @@ bool tsdbDelFileIsSame(SDelFile *pDelFile1, SDelFile *pDelFile2); int32_t tsdbDFileRollback(STsdb *pTsdb, SDFileSet *pSet, EDataFileT ftype); int32_t tPutHeadFile(uint8_t *p, SHeadFile *pHeadFile); int32_t tPutDataFile(uint8_t *p, SDataFile *pDataFile); -int32_t tPutSstFile(uint8_t *p, SSstFile *pSstFile); +int32_t tPutSttFile(uint8_t *p, SSttFile *pSttFile); int32_t tPutSmaFile(uint8_t *p, SSmaFile *pSmaFile); int32_t tPutDelFile(uint8_t *p, SDelFile *pDelFile); int32_t tGetDelFile(uint8_t *p, SDelFile *pDelFile); @@ -237,7 +237,7 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet); void tsdbHeadFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SHeadFile *pHeadF, char fname[]); void tsdbDataFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SDataFile *pDataF, char fname[]); -void tsdbSstFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSstFile *pSstF, char fname[]); +void tsdbSttFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSttFile *pSttF, char fname[]); void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, char fname[]); // SDelFile void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]); @@ -263,7 +263,7 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync); int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter); int32_t tsdbWriteBlockIdx(SDataFWriter *pWriter, SArray *aBlockIdx); int32_t tsdbWriteBlock(SDataFWriter *pWriter, SMapData *pMapData, SBlockIdx *pBlockIdx); -int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk); +int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk); int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlockInfo *pBlkInfo, SSmaInfo *pSmaInfo, int8_t cmprAlg, int8_t toLast); @@ -273,10 +273,10 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS int32_t tsdbDataFReaderClose(SDataFReader **ppReader); int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx); int32_t tsdbReadBlock(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *pMapData); -int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk); +int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk); int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pBlock, SArray *aColumnDataAgg); int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pBlock, SBlockData *pBlockData); -int32_t tsdbReadSstBlock(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData); +int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData); // SDelFWriter int32_t tsdbDelFWriterOpen(SDelFWriter **ppWriter, SDelFile *pFile, STsdb *pTsdb); int32_t tsdbDelFWriterClose(SDelFWriter **ppWriter, int8_t sync); @@ -448,7 +448,7 @@ struct SDataBlk { SSmaInfo smaInfo; }; -struct SSstBlk { +struct SSttBlk { int64_t suid; int64_t minUid; int64_t maxUid; @@ -550,7 +550,7 @@ struct SDataFile { int64_t size; }; -struct SSstFile { +struct SSttFile { volatile int32_t nRef; int64_t commitID; @@ -571,8 +571,8 @@ struct SDFileSet { SHeadFile *pHeadF; SDataFile *pDataF; SSmaFile *pSmaF; - uint8_t nSstF; - SSstFile *aSstF[TSDB_MAX_SST_FILE]; + uint8_t nSttF; + SSttFile *aSttF[TSDB_MAX_STT_FILE]; }; struct SRowIter { @@ -617,12 +617,12 @@ struct SDataFWriter { STsdbFD *pHeadFD; STsdbFD *pDataFD; STsdbFD *pSmaFD; - STsdbFD *pSstFD; + STsdbFD *pSttFD; SHeadFile fHead; SDataFile fData; SSmaFile fSma; - SSstFile fSst[TSDB_MAX_SST_FILE]; + SSttFile fStt[TSDB_MAX_STT_FILE]; uint8_t *aBuf[4]; }; @@ -633,7 +633,7 @@ struct SDataFReader { STsdbFD *pHeadFD; STsdbFD *pDataFD; STsdbFD *pSmaFD; - STsdbFD *aSstFD[TSDB_MAX_SST_FILE]; + STsdbFD *aSttFD[TSDB_MAX_STT_FILE]; uint8_t *aBuf[3]; }; @@ -650,7 +650,7 @@ typedef struct SMergeTree { SLDataIter *pIter; } SMergeTree; -int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t uid, +int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pVerRange); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); bool tMergeTreeNext(SMergeTree *pMTree); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 93f9691675..62aba649a3 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -104,7 +104,7 @@ int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* int metaAlterSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq, SArray* tbUidList); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp); -int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids); +int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t *tbUid); int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); @@ -208,7 +208,7 @@ int32_t tdProcessRSmaCreate(SSma* pSma, SVCreateStbReq* pReq); int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType); int32_t tdProcessRSmaDrop(SSma* pSma, SVDropStbReq* pReq); int32_t tdFetchTbUidList(SSma* pSma, STbUidStore** ppStore, tb_uid_t suid, tb_uid_t uid); -int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore); +int32_t tdUpdateTbUidList(SSma* pSma, STbUidStore* pUidStore, bool isAdd); void tdUidStoreDestory(STbUidStore* pStore); void* tdUidStoreFree(STbUidStore* pStore); diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 583a2e098f..2c11b9bf0f 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -474,7 +474,7 @@ _err: return -1; } -int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) { +int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids, tb_uid_t *tbUid) { void *pData = NULL; int nData = 0; int rc = 0; @@ -496,6 +496,10 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi taosArrayPush(tbUids, &uid); } + if ((type == TSDB_CHILD_TABLE) && tbUid) { + *tbUid = uid; + } + tdbFree(pData); return 0; } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index fce244bfb8..dc7244ef10 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -34,7 +34,7 @@ typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem; typedef struct SRSmaQTaskInfoIter SRSmaQTaskInfoIter; static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid); -static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids); +static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd); static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat *pStat, SRSmaInfo *pRSmaInfo, int8_t idx); static int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int32_t msgSize, int32_t inputType, SRSmaInfo *pInfo, @@ -175,7 +175,7 @@ static FORCE_INLINE int32_t tdUidStoreInit(STbUidStore **pStore) { return TSDB_CODE_SUCCESS; } -static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) { +static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd) { SRSmaInfo *pRSmaInfo = NULL; if (!suid || !tbUids) { @@ -199,7 +199,7 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pRSmaInfo->taskInfo[i]) { - if (((terrno = qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, true)) < 0)) { + if (((terrno = qUpdateQualifiedTableId(pRSmaInfo->taskInfo[i], tbUids, isAdd)) < 0)) { tdReleaseRSmaInfo(pSma, pRSmaInfo); smaError("vgId:%d, update tbUidList failed for uid:%" PRIi64 " level %d since %s", SMA_VID(pSma), *suid, i, terrstr()); @@ -215,12 +215,12 @@ static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids) return TSDB_CODE_SUCCESS; } -int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore) { +int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore, bool isAdd) { if (!pStore || (taosArrayGetSize(pStore->tbUids) == 0)) { return TSDB_CODE_SUCCESS; } - if (tdUpdateTbUidListImpl(pSma, &pStore->suid, pStore->tbUids) != TSDB_CODE_SUCCESS) { + if (tdUpdateTbUidListImpl(pSma, &pStore->suid, pStore->tbUids, isAdd) != TSDB_CODE_SUCCESS) { return TSDB_CODE_FAILED; } @@ -229,7 +229,7 @@ int32_t tdUpdateTbUidList(SSma *pSma, STbUidStore *pStore) { tb_uid_t *pTbSuid = (tb_uid_t *)taosHashGetKey(pIter, NULL); SArray *pTbUids = *(SArray **)pIter; - if (tdUpdateTbUidListImpl(pSma, pTbSuid, pTbUids) != TSDB_CODE_SUCCESS) { + if (tdUpdateTbUidListImpl(pSma, pTbSuid, pTbUids, isAdd) != TSDB_CODE_SUCCESS) { taosHashCancelIterate(pStore->uidHash, pIter); return TSDB_CODE_FAILED; } @@ -1118,7 +1118,7 @@ static int32_t tdRSmaRestoreQTaskInfoInit(SSma *pSma, int64_t *nTables) { goto _err; } - if (tdUpdateTbUidList(pVnode->pSma, &uidStore) < 0) { + if (tdUpdateTbUidList(pVnode->pSma, &uidStore, true) < 0) { smaError("vgId:%d, rsma restore, update tb uid list failed for %" PRIi64 " since %s", TD_VID(pVnode), suid, terrstr()); goto _err; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 64caff1542..9ac62b4b59 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -420,6 +420,7 @@ typedef enum { typedef struct { SFSLASTNEXTROWSTATES state; // [input] STsdb *pTsdb; // [input] + tb_uid_t suid; tb_uid_t uid; int32_t nFileSet; int32_t iFileSet; @@ -454,7 +455,7 @@ static int32_t getNextRowFromFSLast(void *iter, TSDBROW **ppRow) { code = tsdbDataFReaderOpen(&state->pDataFReader, state->pTsdb, pFileSet); if (code) goto _err; - tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->uid, + tMergeTreeOpen(&state->mergeTree, 1, state->pDataFReader, state->suid, state->uid, &(STimeWindow){.skey = TSKEY_MIN, .ekey = TSKEY_MAX}, &(SVersionRange){.minVer = 0, .maxVer = UINT64_MAX}); bool hasVal = tMergeTreeNext(&state->mergeTree); @@ -796,7 +797,7 @@ static bool tsdbKeyDeleted(TSDBKEY *key, SArray *pSkyline, int64_t *iSkyline) { if (key->ts > pItemBack->ts) { return false; } else if (key->ts >= pItemFront->ts && key->ts <= pItemBack->ts) { - if ((key->version <= pItemFront->version || key->ts == pItemBack->ts && key->version <= pItemBack->version)) { + if (key->version <= pItemFront->version || (key->ts == pItemBack->ts && key->version <= pItemBack->version)) { return true; } else { return false; @@ -890,6 +891,7 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs pIter->fsLastState.state = (SFSLASTNEXTROWSTATES)SFSNEXTROW_FS; pIter->fsLastState.pTsdb = pTsdb; pIter->fsLastState.aDFileSet = pIter->pReadSnap->fs.aDFileSet; + pIter->fsLastState.suid = suid; pIter->fsLastState.uid = uid; pIter->fsState.state = SFSNEXTROW_FS; diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index 6f7a78ee46..fb06203605 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -32,12 +32,12 @@ typedef struct { STbDataIter iter; }; // memory data iter struct { - int32_t iSst; - SArray *aSstBlk; - int32_t iSstBlk; + int32_t iStt; + SArray *aSttBlk; + int32_t iSttBlk; SBlockData bData; int32_t iRow; - }; // sst file data iter + }; // stt file data iter }; } SDataIter; @@ -71,13 +71,13 @@ typedef struct { SDataIter *pIter; SRBTree rbt; SDataIter dataIter; - SDataIter aDataIter[TSDB_MAX_SST_FILE]; + SDataIter aDataIter[TSDB_MAX_STT_FILE]; int8_t toLastOnly; }; struct { SDataFWriter *pWriter; SArray *aBlockIdx; // SArray - SArray *aSstBlk; // SArray + SArray *aSttBlk; // SArray SMapData mBlock; // SMapData SBlockData bData; SBlockData bDatal; @@ -428,21 +428,21 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { pCommitter->toLastOnly = 0; SDataFReader *pReader = pCommitter->dReader.pReader; if (pReader) { - if (pReader->pSet->nSstF >= pCommitter->maxLast) { + if (pReader->pSet->nSttF >= pCommitter->maxLast) { int8_t iIter = 0; - for (int32_t iSst = 0; iSst < pReader->pSet->nSstF; iSst++) { + for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) { pIter = &pCommitter->aDataIter[iIter]; pIter->type = LAST_DATA_ITER; - pIter->iSst = iSst; + pIter->iStt = iStt; - code = tsdbReadSstBlk(pCommitter->dReader.pReader, iSst, pIter->aSstBlk); + code = tsdbReadSttBlk(pCommitter->dReader.pReader, iStt, pIter->aSttBlk); if (code) goto _err; - if (taosArrayGetSize(pIter->aSstBlk) == 0) continue; + if (taosArrayGetSize(pIter->aSttBlk) == 0) continue; - pIter->iSstBlk = 0; - SSstBlk *pSstBlk = (SSstBlk *)taosArrayGet(pIter->aSstBlk, 0); - code = tsdbReadSstBlock(pCommitter->dReader.pReader, iSst, pSstBlk, &pIter->bData); + pIter->iSttBlk = 0; + SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, 0); + code = tsdbReadSttBlock(pCommitter->dReader.pReader, iStt, pSttBlk, &pIter->bData); if (code) goto _err; pIter->iRow = 0; @@ -454,9 +454,9 @@ static int32_t tsdbOpenCommitIter(SCommitter *pCommitter) { iIter++; } } else { - for (int32_t iSst = 0; iSst < pReader->pSet->nSstF; iSst++) { - SSstFile *pSstFile = pReader->pSet->aSstF[iSst]; - if (pSstFile->size > pSstFile->offset) { + for (int32_t iStt = 0; iStt < pReader->pSet->nSttF; iStt++) { + SSttFile *pSttFile = pReader->pSet->aSttF[iStt]; + if (pSttFile->size > pSttFile->offset) { pCommitter->toLastOnly = 1; break; } @@ -512,34 +512,34 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) { SHeadFile fHead = {.commitID = pCommitter->commitID}; SDataFile fData = {.commitID = pCommitter->commitID}; SSmaFile fSma = {.commitID = pCommitter->commitID}; - SSstFile fSst = {.commitID = pCommitter->commitID}; + SSttFile fStt = {.commitID = pCommitter->commitID}; SDFileSet wSet = {.fid = pCommitter->commitFid, .pHeadF = &fHead, .pDataF = &fData, .pSmaF = &fSma}; if (pRSet) { - ASSERT(pRSet->nSstF <= pCommitter->maxLast); + ASSERT(pRSet->nSttF <= pCommitter->maxLast); fData = *pRSet->pDataF; fSma = *pRSet->pSmaF; wSet.diskId = pRSet->diskId; - if (pRSet->nSstF < pCommitter->maxLast) { - for (int32_t iSst = 0; iSst < pRSet->nSstF; iSst++) { - wSet.aSstF[iSst] = pRSet->aSstF[iSst]; + if (pRSet->nSttF < pCommitter->maxLast) { + for (int32_t iStt = 0; iStt < pRSet->nSttF; iStt++) { + wSet.aSttF[iStt] = pRSet->aSttF[iStt]; } - wSet.nSstF = pRSet->nSstF + 1; + wSet.nSttF = pRSet->nSttF + 1; } else { - wSet.nSstF = 1; + wSet.nSttF = 1; } } else { SDiskID did = {0}; tfsAllocDisk(pTsdb->pVnode->pTfs, 0, &did); tfsMkdirRecurAt(pTsdb->pVnode->pTfs, pTsdb->path, did); wSet.diskId = did; - wSet.nSstF = 1; + wSet.nSttF = 1; } - wSet.aSstF[wSet.nSstF - 1] = &fSst; + wSet.aSttF[wSet.nSttF - 1] = &fStt; code = tsdbDataFWriterOpen(&pCommitter->dWriter.pWriter, pTsdb, &wSet); if (code) goto _err; taosArrayClear(pCommitter->dWriter.aBlockIdx); - taosArrayClear(pCommitter->dWriter.aSstBlk); + taosArrayClear(pCommitter->dWriter.aSttBlk); tMapDataReset(&pCommitter->dWriter.mBlock); tBlockDataReset(&pCommitter->dWriter.bData); tBlockDataReset(&pCommitter->dWriter.bDatal); @@ -610,7 +610,7 @@ _err: static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { int32_t code = 0; - SSstBlk blockL; + SSttBlk blockL; SBlockData *pBlockData = &pCommitter->dWriter.bDatal; ASSERT(pBlockData->nRow > 0); @@ -635,8 +635,8 @@ static int32_t tsdbCommitLastBlock(SCommitter *pCommitter) { code = tsdbWriteBlockData(pCommitter->dWriter.pWriter, pBlockData, &blockL.bInfo, NULL, pCommitter->cmprAlg, 1); if (code) goto _err; - // push SSstBlk - if (taosArrayPush(pCommitter->dWriter.aSstBlk, &blockL) == NULL) { + // push SSttBlk + if (taosArrayPush(pCommitter->dWriter.aSttBlk, &blockL) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -658,8 +658,8 @@ static int32_t tsdbCommitFileDataEnd(SCommitter *pCommitter) { code = tsdbWriteBlockIdx(pCommitter->dWriter.pWriter, pCommitter->dWriter.aBlockIdx); if (code) goto _err; - // write aSstBlk - code = tsdbWriteSstBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.aSstBlk); + // write aSttBlk + code = tsdbWriteSttBlk(pCommitter->dWriter.pWriter, pCommitter->dWriter.aSttBlk); if (code) goto _err; // update file header @@ -757,7 +757,7 @@ static int32_t tsdbStartCommit(STsdb *pTsdb, SCommitter *pCommitter) { pCommitter->minRow = pTsdb->pVnode->config.tsdbCfg.minRows; pCommitter->maxRow = pTsdb->pVnode->config.tsdbCfg.maxRows; pCommitter->cmprAlg = pTsdb->pVnode->config.tsdbCfg.compression; - pCommitter->maxLast = TSDB_DEFAULT_SST_FILE; // TODO: make it as a config + pCommitter->maxLast = TSDB_DEFAULT_STT_FILE; // TODO: make it as a config pCommitter->aTbDataP = tsdbMemTableGetTbDataArray(pTsdb->imem); if (pCommitter->aTbDataP == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -787,10 +787,10 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { if (code) goto _exit; // merger - for (int32_t iSst = 0; iSst < TSDB_MAX_SST_FILE; iSst++) { - SDataIter *pIter = &pCommitter->aDataIter[iSst]; - pIter->aSstBlk = taosArrayInit(0, sizeof(SSstBlk)); - if (pIter->aSstBlk == NULL) { + for (int32_t iStt = 0; iStt < TSDB_MAX_STT_FILE; iStt++) { + SDataIter *pIter = &pCommitter->aDataIter[iStt]; + pIter->aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); + if (pIter->aSttBlk == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -806,8 +806,8 @@ static int32_t tsdbCommitDataStart(SCommitter *pCommitter) { goto _exit; } - pCommitter->dWriter.aSstBlk = taosArrayInit(0, sizeof(SSstBlk)); - if (pCommitter->dWriter.aSstBlk == NULL) { + pCommitter->dWriter.aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); + if (pCommitter->dWriter.aSttBlk == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -829,15 +829,15 @@ static void tsdbCommitDataEnd(SCommitter *pCommitter) { tBlockDataDestroy(&pCommitter->dReader.bData, 1); // merger - for (int32_t iSst = 0; iSst < TSDB_MAX_SST_FILE; iSst++) { - SDataIter *pIter = &pCommitter->aDataIter[iSst]; - taosArrayDestroy(pIter->aSstBlk); + for (int32_t iStt = 0; iStt < TSDB_MAX_STT_FILE; iStt++) { + SDataIter *pIter = &pCommitter->aDataIter[iStt]; + taosArrayDestroy(pIter->aSttBlk); tBlockDataDestroy(&pIter->bData, 1); } // writer taosArrayDestroy(pCommitter->dWriter.aBlockIdx); - taosArrayDestroy(pCommitter->dWriter.aSstBlk); + taosArrayDestroy(pCommitter->dWriter.aSttBlk); tMapDataClear(&pCommitter->dWriter.mBlock); tBlockDataDestroy(&pCommitter->dWriter.bData, 1); tBlockDataDestroy(&pCommitter->dWriter.bDatal, 1); @@ -1052,11 +1052,11 @@ static int32_t tsdbNextCommitRow(SCommitter *pCommitter) { pIter->r.uid = pIter->bData.uid ? pIter->bData.uid : pIter->bData.aUid[pIter->iRow]; pIter->r.row = tsdbRowFromBlockData(&pIter->bData, pIter->iRow); } else { - pIter->iSstBlk++; - if (pIter->iSstBlk < taosArrayGetSize(pIter->aSstBlk)) { - SSstBlk *pSstBlk = (SSstBlk *)taosArrayGet(pIter->aSstBlk, pIter->iSstBlk); + pIter->iSttBlk++; + if (pIter->iSttBlk < taosArrayGetSize(pIter->aSttBlk)) { + SSttBlk *pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk); - code = tsdbReadSstBlock(pCommitter->dReader.pReader, pIter->iSst, pSstBlk, &pIter->bData); + code = tsdbReadSttBlock(pCommitter->dReader.pReader, pIter->iStt, pSttBlk, &pIter->bData); if (code) goto _exit; pIter->iRow = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 14bc1214a6..8ab733aa56 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -113,7 +113,7 @@ _err: // taosRemoveFile(fname); // } -// // sst +// // stt // if (isSameDisk && pFrom->pLastF->commitID == pTo->pLastF->commitID) { // if (pFrom->pLastF->size > pTo->pLastF->size) { // code = tsdbDFileRollback(pFS->pTsdb, pTo, TSDB_LAST_FILE); @@ -143,7 +143,7 @@ _err: // tsdbDataFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pDataF, fname); // taosRemoveFile(fname); -// // sst +// // stt // tsdbLastFileName(pFS->pTsdb, pFrom->diskId, pFrom->fid, pFrom->pLastF, fname); // taosRemoveFile(fname); @@ -258,8 +258,8 @@ void tsdbFSDestroy(STsdbFS *pFS) { taosMemoryFree(pSet->pHeadF); taosMemoryFree(pSet->pDataF); taosMemoryFree(pSet->pSmaF); - for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) { - taosMemoryFree(pSet->aSstF[iSst]); + for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { + taosMemoryFree(pSet->aSttF[iStt]); } } @@ -328,14 +328,14 @@ static int32_t tsdbScanAndTryFixFS(STsdb *pTsdb) { if (code) goto _err; } - // sst =========== - for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) { - tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iSst], fname); + // stt =========== + for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { + tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname); if (taosStatFile(fname, &size, NULL)) { code = TAOS_SYSTEM_ERROR(errno); goto _err; } - if (size != LOGIC_TO_FILE_SIZE(pSet->aSstF[iSst]->size, TSDB_DEFAULT_PAGE_SIZE)) { + if (size != LOGIC_TO_FILE_SIZE(pSet->aSttF[iStt]->size, TSDB_DEFAULT_PAGE_SIZE)) { code = TSDB_CODE_FILE_CORRUPTED; goto _err; } @@ -519,10 +519,10 @@ int32_t tsdbFSClose(STsdb *pTsdb) { ASSERT(pSet->pSmaF->nRef == 1); taosMemoryFree(pSet->pSmaF); - // sst - for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) { - ASSERT(pSet->aSstF[iSst]->nRef == 1); - taosMemoryFree(pSet->aSstF[iSst]); + // stt + for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { + ASSERT(pSet->aSttF[iStt]->nRef == 1); + taosMemoryFree(pSet->aSttF[iStt]); } } @@ -579,14 +579,14 @@ int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) { } *fSet.pSmaF = *pSet->pSmaF; - // sst - for (fSet.nSstF = 0; fSet.nSstF < pSet->nSstF; fSet.nSstF++) { - fSet.aSstF[fSet.nSstF] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile)); - if (fSet.aSstF[fSet.nSstF] == NULL) { + // stt + for (fSet.nSttF = 0; fSet.nSttF < pSet->nSttF; fSet.nSttF++) { + fSet.aSttF[fSet.nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (fSet.aSttF[fSet.nSttF] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - *fSet.aSstF[fSet.nSstF] = *pSet->aSstF[fSet.nSstF]; + *fSet.aSttF[fSet.nSttF] = *pSet->aSttF[fSet.nSttF]; } if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) { @@ -639,28 +639,28 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) { *pDFileSet->pHeadF = *pSet->pHeadF; *pDFileSet->pDataF = *pSet->pDataF; *pDFileSet->pSmaF = *pSet->pSmaF; - // sst - if (pSet->nSstF > pDFileSet->nSstF) { - ASSERT(pSet->nSstF == pDFileSet->nSstF + 1); + // stt + if (pSet->nSttF > pDFileSet->nSttF) { + ASSERT(pSet->nSttF == pDFileSet->nSttF + 1); - pDFileSet->aSstF[pDFileSet->nSstF] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile)); - if (pDFileSet->aSstF[pDFileSet->nSstF] == NULL) { + pDFileSet->aSttF[pDFileSet->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pDFileSet->aSttF[pDFileSet->nSttF] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - *pDFileSet->aSstF[pDFileSet->nSstF] = *pSet->aSstF[pSet->nSstF - 1]; - pDFileSet->nSstF++; - } else if (pSet->nSstF < pDFileSet->nSstF) { - ASSERT(pSet->nSstF == 1); - for (int32_t iSst = 1; iSst < pDFileSet->nSstF; iSst++) { - taosMemoryFree(pDFileSet->aSstF[iSst]); + *pDFileSet->aSttF[pDFileSet->nSttF] = *pSet->aSttF[pSet->nSttF - 1]; + pDFileSet->nSttF++; + } else if (pSet->nSttF < pDFileSet->nSttF) { + ASSERT(pSet->nSttF == 1); + for (int32_t iStt = 1; iStt < pDFileSet->nSttF; iStt++) { + taosMemoryFree(pDFileSet->aSttF[iStt]); } - *pDFileSet->aSstF[0] = *pSet->aSstF[0]; - pDFileSet->nSstF = 1; + *pDFileSet->aSttF[0] = *pSet->aSttF[0]; + pDFileSet->nSttF = 1; } else { - for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) { - *pDFileSet->aSstF[iSst] = *pSet->aSstF[iSst]; + for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { + *pDFileSet->aSttF[iStt] = *pSet->aSttF[iStt]; } } @@ -668,8 +668,8 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) { } } - ASSERT(pSet->nSstF == 1); - SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid, .nSstF = 1}; + ASSERT(pSet->nSttF == 1); + SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid, .nSttF = 1}; // head fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); @@ -695,13 +695,13 @@ int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) { } *fSet.pSmaF = *pSet->pSmaF; - // sst - fSet.aSstF[0] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile)); - if (fSet.aSstF[0] == NULL) { + // stt + fSet.aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (fSet.aSttF[0] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - *fSet.aSstF[0] = *pSet->aSstF[0]; + *fSet.aSttF[0] = *pSet->aSttF[0]; if (taosArrayInsert(pFS->aDFileSet, idx, &fSet) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -869,81 +869,81 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { pSetOld->pSmaF->size = pSetNew->pSmaF->size; } - // sst + // stt if (sameDisk) { - if (pSetNew->nSstF > pSetOld->nSstF) { - ASSERT(pSetNew->nSstF = pSetOld->nSstF + 1); - pSetOld->aSstF[pSetOld->nSstF] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile)); - if (pSetOld->aSstF[pSetOld->nSstF] == NULL) { + if (pSetNew->nSttF > pSetOld->nSttF) { + ASSERT(pSetNew->nSttF = pSetOld->nSttF + 1); + pSetOld->aSttF[pSetOld->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetOld->aSttF[pSetOld->nSttF] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - *pSetOld->aSstF[pSetOld->nSstF] = *pSetNew->aSstF[pSetOld->nSstF]; - pSetOld->aSstF[pSetOld->nSstF]->nRef = 1; - pSetOld->nSstF++; - } else if (pSetNew->nSstF < pSetOld->nSstF) { - ASSERT(pSetNew->nSstF == 1); - for (int32_t iSst = 0; iSst < pSetOld->nSstF; iSst++) { - SSstFile *pSstFile = pSetOld->aSstF[iSst]; - nRef = atomic_sub_fetch_32(&pSstFile->nRef, 1); + *pSetOld->aSttF[pSetOld->nSttF] = *pSetNew->aSttF[pSetOld->nSttF]; + pSetOld->aSttF[pSetOld->nSttF]->nRef = 1; + pSetOld->nSttF++; + } else if (pSetNew->nSttF < pSetOld->nSttF) { + ASSERT(pSetNew->nSttF == 1); + for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { + SSttFile *pSttFile = pSetOld->aSttF[iStt]; + nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1); if (nRef == 0) { - tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSstFile, fname); + tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname); taosRemoveFile(fname); - taosMemoryFree(pSstFile); + taosMemoryFree(pSttFile); } - pSetOld->aSstF[iSst] = NULL; + pSetOld->aSttF[iStt] = NULL; } - pSetOld->nSstF = 1; - pSetOld->aSstF[0] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile)); - if (pSetOld->aSstF[0] == NULL) { + pSetOld->nSttF = 1; + pSetOld->aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetOld->aSttF[0] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - *pSetOld->aSstF[0] = *pSetNew->aSstF[0]; - pSetOld->aSstF[0]->nRef = 1; + *pSetOld->aSttF[0] = *pSetNew->aSttF[0]; + pSetOld->aSttF[0]->nRef = 1; } else { - for (int32_t iSst = 0; iSst < pSetOld->nSstF; iSst++) { - if (pSetOld->aSstF[iSst]->commitID != pSetNew->aSstF[iSst]->commitID) { - SSstFile *pSstFile = pSetOld->aSstF[iSst]; - nRef = atomic_sub_fetch_32(&pSstFile->nRef, 1); + for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { + if (pSetOld->aSttF[iStt]->commitID != pSetNew->aSttF[iStt]->commitID) { + SSttFile *pSttFile = pSetOld->aSttF[iStt]; + nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1); if (nRef == 0) { - tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSstFile, fname); + tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname); taosRemoveFile(fname); - taosMemoryFree(pSstFile); + taosMemoryFree(pSttFile); } - pSetOld->aSstF[iSst] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile)); - if (pSetOld->aSstF[iSst] == NULL) { + pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetOld->aSttF[iStt] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - *pSetOld->aSstF[iSst] = *pSetNew->aSstF[iSst]; - pSetOld->aSstF[iSst]->nRef = 1; + *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt]; + pSetOld->aSttF[iStt]->nRef = 1; } else { - ASSERT(pSetOld->aSstF[iSst]->size == pSetOld->aSstF[iSst]->size); - ASSERT(pSetOld->aSstF[iSst]->offset == pSetOld->aSstF[iSst]->offset); + ASSERT(pSetOld->aSttF[iStt]->size == pSetOld->aSttF[iStt]->size); + ASSERT(pSetOld->aSttF[iStt]->offset == pSetOld->aSttF[iStt]->offset); } } } } else { - ASSERT(pSetOld->nSstF == pSetNew->nSstF); - for (int32_t iSst = 0; iSst < pSetOld->nSstF; iSst++) { - SSstFile *pSstFile = pSetOld->aSstF[iSst]; - nRef = atomic_sub_fetch_32(&pSstFile->nRef, 1); + ASSERT(pSetOld->nSttF == pSetNew->nSttF); + for (int32_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { + SSttFile *pSttFile = pSetOld->aSttF[iStt]; + nRef = atomic_sub_fetch_32(&pSttFile->nRef, 1); if (nRef == 0) { - tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSstFile, fname); + tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSttFile, fname); taosRemoveFile(fname); - taosMemoryFree(pSstFile); + taosMemoryFree(pSttFile); } - pSetOld->aSstF[iSst] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile)); - if (pSetOld->aSstF[iSst] == NULL) { + pSetOld->aSttF[iStt] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (pSetOld->aSttF[iStt] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - *pSetOld->aSstF[iSst] = *pSetNew->aSstF[iSst]; - pSetOld->aSstF[iSst]->nRef = 1; + *pSetOld->aSttF[iStt] = *pSetNew->aSttF[iStt]; + pSetOld->aSttF[iStt]->nRef = 1; } } @@ -977,12 +977,12 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { taosMemoryFree(pSetOld->pSmaF); } - for (int8_t iSst = 0; iSst < pSetOld->nSstF; iSst++) { - nRef = atomic_sub_fetch_32(&pSetOld->aSstF[iSst]->nRef, 1); + for (int8_t iStt = 0; iStt < pSetOld->nSttF; iStt++) { + nRef = atomic_sub_fetch_32(&pSetOld->aSttF[iStt]->nRef, 1); if (nRef == 0) { - tsdbSstFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->aSstF[iSst], fname); + tsdbSttFileName(pTsdb, pSetOld->diskId, pSetOld->fid, pSetOld->aSttF[iStt], fname); taosRemoveFile(fname); - taosMemoryFree(pSetOld->aSstF[iSst]); + taosMemoryFree(pSetOld->aSttF[iStt]); } } @@ -990,7 +990,7 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { continue; _add_new: - fSet = (SDFileSet){.diskId = pSetNew->diskId, .fid = pSetNew->fid, .nSstF = 1}; + fSet = (SDFileSet){.diskId = pSetNew->diskId, .fid = pSetNew->fid, .nSttF = 1}; // head fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile)); @@ -1019,15 +1019,15 @@ int32_t tsdbFSCommit2(STsdb *pTsdb, STsdbFS *pFSNew) { *fSet.pSmaF = *pSetNew->pSmaF; fSet.pSmaF->nRef = 1; - // sst - ASSERT(pSetNew->nSstF == 1); - fSet.aSstF[0] = (SSstFile *)taosMemoryMalloc(sizeof(SSstFile)); - if (fSet.aSstF[0] == NULL) { + // stt + ASSERT(pSetNew->nSttF == 1); + fSet.aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile)); + if (fSet.aSttF[0] == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - *fSet.aSstF[0] = *pSetNew->aSstF[0]; - fSet.aSstF[0]->nRef = 1; + *fSet.aSttF[0] = *pSetNew->aSttF[0]; + fSet.aSttF[0]->nRef = 1; if (taosArrayInsert(pTsdb->fs.aDFileSet, iOld, &fSet) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -1075,8 +1075,8 @@ int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) { nRef = atomic_fetch_add_32(&pSet->pSmaF->nRef, 1); ASSERT(nRef > 0); - for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) { - nRef = atomic_fetch_add_32(&pSet->aSstF[iSst]->nRef, 1); + for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { + nRef = atomic_fetch_add_32(&pSet->aSttF[iStt]->nRef, 1); ASSERT(nRef > 0); } @@ -1134,14 +1134,14 @@ void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) { taosMemoryFree(pSet->pSmaF); } - // sst - for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) { - nRef = atomic_sub_fetch_32(&pSet->aSstF[iSst]->nRef, 1); + // stt + for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { + nRef = atomic_sub_fetch_32(&pSet->aSttF[iStt]->nRef, 1); ASSERT(nRef >= 0); if (nRef == 0) { - tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iSst], fname); + tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname); taosRemoveFile(fname); - taosMemoryFree(pSet->aSstF[iSst]); + taosMemoryFree(pSet->aSttF[iStt]); /* code */ } } diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c index 2a7966e423..619fc17f5b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile.c @@ -53,22 +53,22 @@ static int32_t tGetDataFile(uint8_t *p, SDataFile *pDataFile) { return n; } -int32_t tPutSstFile(uint8_t *p, SSstFile *pSstFile) { +int32_t tPutSttFile(uint8_t *p, SSttFile *pSttFile) { int32_t n = 0; - n += tPutI64v(p ? p + n : p, pSstFile->commitID); - n += tPutI64v(p ? p + n : p, pSstFile->size); - n += tPutI64v(p ? p + n : p, pSstFile->offset); + n += tPutI64v(p ? p + n : p, pSttFile->commitID); + n += tPutI64v(p ? p + n : p, pSttFile->size); + n += tPutI64v(p ? p + n : p, pSttFile->offset); return n; } -static int32_t tGetSstFile(uint8_t *p, SSstFile *pSstFile) { +static int32_t tGetSttFile(uint8_t *p, SSttFile *pSttFile) { int32_t n = 0; - n += tGetI64v(p + n, &pSstFile->commitID); - n += tGetI64v(p + n, &pSstFile->size); - n += tGetI64v(p + n, &pSstFile->offset); + n += tGetI64v(p + n, &pSttFile->commitID); + n += tGetI64v(p + n, &pSttFile->size); + n += tGetI64v(p + n, &pSttFile->offset); return n; } @@ -102,9 +102,9 @@ void tsdbDataFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SDataFile *pDataF, TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pDataF->commitID, ".data"); } -void tsdbSstFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSstFile *pSstF, char fname[]) { +void tsdbSttFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSttFile *pSttF, char fname[]) { snprintf(fname, TSDB_FILENAME_LEN - 1, "%s%s%s%sv%df%dver%" PRId64 "%s", tfsGetDiskPath(pTsdb->pVnode->pTfs, did), - TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pSstF->commitID, ".sst"); + TD_DIRSEP, pTsdb->path, TD_DIRSEP, TD_VID(pTsdb->pVnode), fid, pSttF->commitID, ".stt"); } void tsdbSmaFileName(STsdb *pTsdb, SDiskID did, int32_t fid, SSmaFile *pSmaF, char fname[]) { @@ -194,10 +194,10 @@ int32_t tPutDFileSet(uint8_t *p, SDFileSet *pSet) { n += tPutDataFile(p ? p + n : p, pSet->pDataF); n += tPutSmaFile(p ? p + n : p, pSet->pSmaF); - // sst - n += tPutU8(p ? p + n : p, pSet->nSstF); - for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) { - n += tPutSstFile(p ? p + n : p, pSet->aSstF[iSst]); + // stt + n += tPutU8(p ? p + n : p, pSet->nSttF); + for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { + n += tPutSttFile(p ? p + n : p, pSet->aSttF[iStt]); } return n; @@ -234,15 +234,15 @@ int32_t tGetDFileSet(uint8_t *p, SDFileSet *pSet) { pSet->pSmaF->nRef = 1; n += tGetSmaFile(p + n, pSet->pSmaF); - // sst - n += tGetU8(p + n, &pSet->nSstF); - for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) { - pSet->aSstF[iSst] = (SSstFile *)taosMemoryCalloc(1, sizeof(SSstFile)); - if (pSet->aSstF[iSst] == NULL) { + // stt + n += tGetU8(p + n, &pSet->nSttF); + for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { + pSet->aSttF[iStt] = (SSttFile *)taosMemoryCalloc(1, sizeof(SSttFile)); + if (pSet->aSttF[iStt] == NULL) { return -1; } - pSet->aSstF[iSst]->nRef = 1; - n += tGetSstFile(p + n, pSet->aSstF[iSst]); + pSet->aSttF[iStt]->nRef = 1; + n += tGetSttFile(p + n, pSet->aSttF[iStt]); } return n; diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 171e32007a..a072f22fa9 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -18,12 +18,12 @@ // SLDataIter ================================================= struct SLDataIter { SRBTreeNode node; - SSstBlk *pSstBlk; + SSttBlk *pSttBlk; SDataFReader *pReader; - int32_t iSst; + int32_t iStt; int8_t backward; - SArray *aSstBlk; - int32_t iSstBlk; + SArray *aSttBlk; + int32_t iSttBlk; SBlockData bData[2]; int32_t loadIndex; int32_t iRow; @@ -40,8 +40,8 @@ static SBlockData *getNextBlock(SLDataIter *pIter) { return getCurrentBlock(pIter); } -int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iSst, int8_t backward, uint64_t uid, - STimeWindow *pTimeWindow, SVersionRange *pRange) { +int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t iStt, int8_t backward, uint64_t suid, + uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pRange) { int32_t code = 0; *pIter = taosMemoryCalloc(1, sizeof(SLDataIter)); if (*pIter == NULL) { @@ -50,13 +50,13 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t } (*pIter)->uid = uid; - (*pIter)->timeWindow = *pTimeWindow; - (*pIter)->verRange = *pRange; (*pIter)->pReader = pReader; - (*pIter)->iSst = iSst; + (*pIter)->iStt = iStt; (*pIter)->backward = backward; - (*pIter)->aSstBlk = taosArrayInit(0, sizeof(SSstBlk)); - if ((*pIter)->aSstBlk == NULL) { + (*pIter)->verRange = *pRange; + (*pIter)->timeWindow = *pTimeWindow; + (*pIter)->aSttBlk = taosArrayInit(0, sizeof(SSttBlk)); + if ((*pIter)->aSttBlk == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -71,18 +71,22 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t goto _exit; } - code = tsdbReadSstBlk(pReader, iSst, (*pIter)->aSstBlk); + code = tsdbReadSttBlk(pReader, iStt, (*pIter)->aSttBlk); if (code) { goto _exit; } - size_t size = taosArrayGetSize((*pIter)->aSstBlk); + size_t size = taosArrayGetSize((*pIter)->aSttBlk); // find the start block int32_t index = -1; if (!backward) { // asc for (int32_t i = 0; i < size; ++i) { - SSstBlk *p = taosArrayGet((*pIter)->aSstBlk, i); + SSttBlk *p = taosArrayGet((*pIter)->aSttBlk, i); + if (p->suid != suid) { + continue; + } + if (p->minUid <= uid && p->maxUid >= uid) { index = i; break; @@ -90,7 +94,11 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t } } else { // desc for (int32_t i = size - 1; i >= 0; --i) { - SSstBlk *p = taosArrayGet((*pIter)->aSstBlk, i); + SSttBlk *p = taosArrayGet((*pIter)->aSttBlk, i); + if (p->suid != suid) { + continue; + } + if (p->minUid <= uid && p->maxUid >= uid) { index = i; break; @@ -98,9 +106,9 @@ int32_t tLDataIterOpen(struct SLDataIter **pIter, SDataFReader *pReader, int32_t } } - (*pIter)->iSstBlk = index; + (*pIter)->iSttBlk = index; if (index != -1) { - (*pIter)->pSstBlk = taosArrayGet((*pIter)->aSstBlk, (*pIter)->iSstBlk); + (*pIter)->pSttBlk = taosArrayGet((*pIter)->aSttBlk, (*pIter)->iSttBlk); } _exit: @@ -110,18 +118,18 @@ _exit: void tLDataIterClose(SLDataIter *pIter) { tBlockDataDestroy(&pIter->bData[0], 1); tBlockDataDestroy(&pIter->bData[1], 1); - taosArrayDestroy(pIter->aSstBlk); + taosArrayDestroy(pIter->aSttBlk); taosMemoryFree(pIter); } void tLDataIterNextBlock(SLDataIter *pIter) { int32_t step = pIter->backward ? -1 : 1; - pIter->iSstBlk += step; + pIter->iSttBlk += step; int32_t index = -1; - size_t size = taosArrayGetSize(pIter->aSstBlk); - for (int32_t i = pIter->iSstBlk; i < size && i >= 0; i += step) { - SSstBlk *p = taosArrayGet(pIter->aSstBlk, i); + size_t size = taosArrayGetSize(pIter->aSttBlk); + for (int32_t i = pIter->iSttBlk; i < size && i >= 0; i += step) { + SSttBlk *p = taosArrayGet(pIter->aSttBlk, i); if ((!pIter->backward) && p->minUid > pIter->uid) { break; } @@ -130,16 +138,38 @@ void tLDataIterNextBlock(SLDataIter *pIter) { break; } + // check uid firstly if (p->minUid <= pIter->uid && p->maxUid >= pIter->uid) { - index = i; - break; + if ((!pIter->backward) && p->minKey > pIter->timeWindow.ekey) { + break; + } + + if (pIter->backward && p->maxKey < pIter->timeWindow.skey) { + break; + } + + // check time range secondly + if (p->minKey <= pIter->timeWindow.ekey && p->maxKey >= pIter->timeWindow.skey) { + if ((!pIter->backward) && p->minVer > pIter->verRange.maxVer) { + break; + } + + if (pIter->backward && p->maxVer < pIter->verRange.minVer) { + break; + } + + if (p->minVer <= pIter->verRange.maxVer && p->maxVer >= pIter->verRange.minVer) { + index = i; + break; + } + } } } if (index == -1) { - pIter->pSstBlk = NULL; + pIter->pSttBlk = NULL; } else { - pIter->pSstBlk = (SSstBlk *)taosArrayGet(pIter->aSstBlk, pIter->iSstBlk); + pIter->pSttBlk = (SSttBlk *)taosArrayGet(pIter->aSttBlk, pIter->iSttBlk); } } @@ -192,14 +222,6 @@ static void findNextValidRow(SLDataIter *pIter) { continue; } - // todo handle delete soon -#if 0 - TSDBKEY k = {.ts = ts, .version = ver}; - if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order)) { - continue; - } -#endif - hasVal = true; break; } @@ -212,16 +234,16 @@ bool tLDataIterNextRow(SLDataIter *pIter) { int32_t step = pIter->backward ? -1 : 1; // no qualified last file block in current file, no need to fetch row - if (pIter->pSstBlk == NULL) { + if (pIter->pSttBlk == NULL) { return false; } - int32_t iBlockL = pIter->iSstBlk; + int32_t iBlockL = pIter->iSttBlk; SBlockData *pBlockData = getCurrentBlock(pIter); - if (pBlockData->nRow == 0 && pIter->pSstBlk != NULL) { // current block not loaded yet + if (pBlockData->nRow == 0 && pIter->pSttBlk != NULL) { // current block not loaded yet pBlockData = getNextBlock(pIter); - code = tsdbReadSstBlock(pIter->pReader, pIter->iSst, pIter->pSstBlk, pBlockData); + code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlockData); if (code != TSDB_CODE_SUCCESS) { goto _exit; } @@ -236,16 +258,16 @@ bool tLDataIterNextRow(SLDataIter *pIter) { if (pIter->iRow >= pBlockData->nRow || pIter->iRow < 0) { tLDataIterNextBlock(pIter); - if (pIter->pSstBlk == NULL) { // no more data + if (pIter->pSttBlk == NULL) { // no more data goto _exit; } } else { break; } - if (iBlockL != pIter->iSstBlk) { + if (iBlockL != pIter->iSttBlk) { pBlockData = getNextBlock(pIter); - code = tsdbReadSstBlock(pIter->pReader, pIter->iSst, pIter->pSstBlk, pBlockData); + code = tsdbReadSttBlock(pIter->pReader, pIter->iStt, pIter->pSttBlk, pBlockData); if (code) { goto _exit; } @@ -262,7 +284,7 @@ _exit: terrno = code; } - return (code == TSDB_CODE_SUCCESS) && (pIter->pSstBlk != NULL); + return (code == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL); } SRowInfo *tLDataIterGet(SLDataIter *pIter) { return &pIter->rInfo; } @@ -290,7 +312,7 @@ static FORCE_INLINE int32_t tLDataIterCmprFn(const void *p1, const void *p2) { } } -int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t uid, +int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFReader, uint64_t suid, uint64_t uid, STimeWindow *pTimeWindow, SVersionRange *pVerRange) { pMTree->backward = backward; pMTree->pIter = NULL; @@ -302,9 +324,9 @@ int32_t tMergeTreeOpen(SMergeTree *pMTree, int8_t backward, SDataFReader *pFRead tRBTreeCreate(&pMTree->rbt, tLDataIterCmprFn); int32_t code = TSDB_CODE_OUT_OF_MEMORY; - struct SLDataIter *pIterList[TSDB_DEFAULT_SST_FILE] = {0}; - for (int32_t i = 0; i < pFReader->pSet->nSstF; ++i) { // open all last file - code = tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, uid, pTimeWindow, pVerRange); + struct SLDataIter *pIterList[TSDB_DEFAULT_STT_FILE] = {0}; + for (int32_t i = 0; i < pFReader->pSet->nSttF; ++i) { // open all last file + code = tLDataIterOpen(&pIterList[i], pFReader, i, pMTree->backward, suid, uid, pTimeWindow, pVerRange); if (code != TSDB_CODE_SUCCESS) { goto _end; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 1828f1e40e..8a51fc73a6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -226,16 +226,13 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK return NULL; } + int32_t step = ASCENDING_TRAVERSE(pTsdbReader->order)? 1:-1; for (int32_t j = 0; j < numOfTables; ++j) { STableBlockScanInfo info = {.lastKey = 0, .uid = idList[j].uid}; if (ASCENDING_TRAVERSE(pTsdbReader->order)) { - if (info.lastKey == INT64_MIN || info.lastKey < pTsdbReader->window.skey) { - info.lastKey = pTsdbReader->window.skey; - } - - ASSERT(info.lastKey >= pTsdbReader->window.skey && info.lastKey <= pTsdbReader->window.ekey); + info.lastKey = pTsdbReader->window.skey - step; } else { - info.lastKey = pTsdbReader->window.skey; + info.lastKey = pTsdbReader->window.ekey - step; } taosHashPut(pTableMap, &info.uid, sizeof(uint64_t), &info, sizeof(info)); @@ -249,7 +246,7 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK return pTableMap; } -static void resetDataBlockScanInfo(SHashObj* pTableMap) { +static void resetDataBlockScanInfo(SHashObj* pTableMap, int64_t ts) { STableBlockScanInfo* p = NULL; while ((p = taosHashIterate(pTableMap, p)) != NULL) { @@ -260,6 +257,7 @@ static void resetDataBlockScanInfo(SHashObj* pTableMap) { } p->delSkyline = taosArrayDestroy(p->delSkyline); + p->lastKey = ts; } } @@ -621,7 +619,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN } } - pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSstF; + pBlockNum->numOfLastFiles = pReader->pFileReader->pSet->nSttF; int32_t total = pBlockNum->numOfLastFiles + pBlockNum->numOfBlocks; double el = (taosGetTimestampUs() - st) / 1000.0; @@ -1163,7 +1161,7 @@ static bool fileBlockShouldLoad(STsdbReader* pReader, SFileDataBlockInfo* pFBloc bool overlapWithlastBlock = false; #if 0 if (taosArrayGetSize(pLastBlockReader->pSstBlk) > 0 && (pLastBlockReader->currentBlockIndex != -1)) { - SSstBlk* pSstBlk = taosArrayGet(pLastBlockReader->pSstBlk, pLastBlockReader->currentBlockIndex); + SSttBlk* pSstBlk = taosArrayGet(pLastBlockReader->pSstBlk, pLastBlockReader->currentBlockIndex); overlapWithlastBlock = !(pBlock->maxKey.ts < pSstBlk->minKey || pBlock->minKey.ts > pSstBlk->maxKey); } #endif @@ -1380,7 +1378,7 @@ static int32_t doMergeFileBlockAndLastBlock(SLastBlockReader* pLastBlockReader, bool mergeBlockData) { SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; // SBlockData* pLastBlockData = &pLastBlockReader->lastBlockData; - int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); + int64_t tsLastBlock = getCurrentKeyInLastBlock(pLastBlockReader); STSRow* pTSRow = NULL; SRowMerger merge = {0}; @@ -1430,6 +1428,10 @@ static int32_t mergeFileBlockAndLastBlock(STsdbReader* pReader, SLastBlockReader tRowMergerInit(&merge, &fRow, pReader->pSchema); doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pReader, &merge); + + TSDBROW fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree); + tRowMerge(&merge, &fRow1); + doMergeRowsInLastBlock(pLastBlockReader, pBlockScanInfo, ts, &merge); int32_t code = tRowMergerGetRow(&merge, &pTSRow); @@ -1779,7 +1781,7 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea pBlockScanInfo->iter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iter.iter) != NULL); tsdbDebug("%p uid:%" PRId64 ", check data in mem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 - "-%" PRId64 " %s", + "-%" PRId64 " %s", pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, d->minKey, d->maxKey, pReader->idStr); } else { tsdbError("%p uid:%" PRId64 ", failed to create iterator for imem, code:%s, %s", pReader, pBlockScanInfo->uid, @@ -1800,7 +1802,7 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea pBlockScanInfo->iiter.hasVal = (tsdbTbDataIterGet(pBlockScanInfo->iiter.iter) != NULL); tsdbDebug("%p uid:%" PRId64 ", check data in imem from skey:%" PRId64 ", order:%d, ts range in buf:%" PRId64 - "-%" PRId64 " %s", + "-%" PRId64 " %s", pReader, pBlockScanInfo->uid, startKey.ts, pReader->order, di->minKey, di->maxKey, pReader->idStr); } else { tsdbError("%p uid:%" PRId64 ", failed to create iterator for mem, code:%s, %s", pReader, pBlockScanInfo->uid, @@ -1850,7 +1852,7 @@ static bool isValidFileBlockRow(SBlockData* pBlockData, SFileBlockDumpInfo* pDum static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo) { - while(1) { + while (1) { bool hasVal = tMergeTreeNext(&pLastBlockReader->mergeTree); if (!hasVal) { return false; @@ -1862,61 +1864,6 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc return true; } } - -#if 0 - *(pLastBlockReader->rowIndex) += step; - - SBlockData* pBlockData = &pLastBlockReader->lastBlockData; - for (int32_t i = *(pLastBlockReader->rowIndex); i < pBlockData->nRow && i >= 0; i += step) { - if (pBlockData->aUid != NULL) { - if (asc) { - if (pBlockData->aUid[i] < pLastBlockReader->uid) { - continue; - } else if (pBlockData->aUid[i] > pLastBlockReader->uid) { - break; - } - } else { - if (pBlockData->aUid[i] > pLastBlockReader->uid) { - continue; - } else if (pBlockData->aUid[i] < pLastBlockReader->uid) { - break; - } - } - } - - int64_t ts = pBlockData->aTSKEY[i]; - if (ts < pLastBlockReader->window.skey) { - continue; - } - - int64_t ver = pBlockData->aVersion[i]; - if (ver < pLastBlockReader->verRange.minVer) { - continue; - } - - // no data any more, todo opt handle desc case - if (ts > pLastBlockReader->window.ekey) { - continue; - } - - // todo opt handle desc case - if (ver > pLastBlockReader->verRange.maxVer) { - continue; - } - - TSDBKEY k = {.ts = ts, .version = ver}; - if (hasBeenDropped(pBlockScanInfo->delSkyline, &pBlockScanInfo->lastBlockDelIndex, &k, pLastBlockReader->order)) { - continue; - } - - *(pLastBlockReader->rowIndex) = i; - return true; - } - - // set all data is consumed in last block - setAllRowsChecked(pLastBlockReader); - return false; -#endif } static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pBlockScanInfo, @@ -1932,9 +1879,18 @@ static bool initLastBlockReader(SLastBlockReader* pLastBlockReader, STableBlockS initMemDataIterator(pBlockScanInfo, pReader); pLastBlockReader->uid = pBlockScanInfo->uid; + + int32_t step = ASCENDING_TRAVERSE(pLastBlockReader->order)? 1:-1; + STimeWindow w = pLastBlockReader->window; + if (ASCENDING_TRAVERSE(pLastBlockReader->order)) { + w.skey = pBlockScanInfo->lastKey + step; + } else { + w.ekey = pBlockScanInfo->lastKey + step; + } + int32_t code = tMergeTreeOpen(&pLastBlockReader->mergeTree, (pLastBlockReader->order == TSDB_ORDER_DESC), pReader->pFileReader, - pBlockScanInfo->uid, &pLastBlockReader->window, &pLastBlockReader->verRange); + pReader->suid, pBlockScanInfo->uid, &w, &pLastBlockReader->verRange); if (code != TSDB_CODE_SUCCESS) { return false; } @@ -2166,7 +2122,7 @@ _err: } static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader) { - TSDBKEY key = {.ts = TSKEY_INITIAL_VAL}; + TSDBKEY key = {.ts = TSKEY_INITIAL_VAL}; TSDBROW* pRow = getValidMemRow(&pScanInfo->iter, pScanInfo->delSkyline, pReader); if (pRow != NULL) { key = TSDBROW_KEY(pRow); @@ -2204,7 +2160,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum) { return code; } - if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSstF > 0) { + if (taosArrayGetSize(pIndexList) > 0 || pReader->pFileReader->pSet->nSttF > 0) { code = doLoadFileBlock(pReader, pIndexList, pBlockNum); if (code != TSDB_CODE_SUCCESS) { taosArrayDestroy(pIndexList); @@ -2314,7 +2270,7 @@ static int32_t doLoadLastBlockSequentially(STsdbReader* pReader) { while (1) { // load the last data block of current table STableBlockScanInfo* pScanInfo = pStatus->pTableIter; - bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); + bool hasVal = initLastBlockReader(pLastBlockReader, pScanInfo, pReader); if (!hasVal) { bool hasNexTable = moveToNextTable(pOrderedCheckInfo, pStatus); if (!hasNexTable) { @@ -2537,22 +2493,24 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { bool hasNext = blockIteratorNext(&pReader->status.blockIter); if (hasNext) { // check for the next block in the block accessed order list initBlockDumpInfo(pReader, pBlockIter); - } else if (hasDataInLastBlock(pReader->status.fileIter.pLastBlockReader)) { - // data blocks in current file are exhausted, let's try the next file now - tBlockDataReset(&pReader->status.fileBlockData); - resetDataBlockIterator(pBlockIter, pReader->order); - goto _begin; } else { - code = initForFirstBlockInFile(pReader, pBlockIter); - - // error happens or all the data files are completely checked - if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { - return code; - } - - // this file does not have blocks, let's start check the last block file - if (pBlockIter->numOfBlocks == 0) { + if (pReader->status.pCurrentFileset->nSttF > 0) { + // data blocks in current file are exhausted, let's try the next file now + tBlockDataReset(&pReader->status.fileBlockData); + resetDataBlockIterator(pBlockIter, pReader->order); goto _begin; + } else { + code = initForFirstBlockInFile(pReader, pBlockIter); + + // error happens or all the data files are completely checked + if ((code != TSDB_CODE_SUCCESS) || (pReader->status.loadFromFile == false)) { + return code; + } + + // this file does not have blocks, let's start check the last block file + if (pBlockIter->numOfBlocks == 0) { + goto _begin; + } } } } @@ -2898,6 +2856,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc int32_t doMergeRowsInLastBlock(SLastBlockReader* pLastBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, SRowMerger* pMerger) { + pScanInfo->lastKey = ts; while (nextRowFromLastBlocks(pLastBlockReader, pScanInfo)) { int64_t next1 = getCurrentKeyInLastBlock(pLastBlockReader); if (next1 == ts) { @@ -3589,11 +3548,12 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { tsdbDataFReaderClose(&pReader->pFileReader); int32_t numOfTables = taosHashGetSize(pReader->status.pTableMap); - tsdbDataFReaderClose(&pReader->pFileReader); initFilesetIterator(&pReader->status.fileIter, pReader->pReadSnap->fs.aDFileSet, pReader); resetDataBlockIterator(&pReader->status.blockIter, pReader->order); - resetDataBlockScanInfo(pReader->status.pTableMap); + + int64_t ts = ASCENDING_TRAVERSE(pReader->order)?pReader->window.skey-1:pReader->window.ekey+1; + resetDataBlockScanInfo(pReader->status.pTableMap, ts); int32_t code = 0; SDataBlockIter* pBlockIter = &pReader->status.blockIter; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 25daec76c6..128bfe37da 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -226,13 +226,13 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS .pHeadF = &pWriter->fHead, .pDataF = &pWriter->fData, .pSmaF = &pWriter->fSma, - .nSstF = pSet->nSstF}; + .nSttF = pSet->nSttF}; pWriter->fHead = *pSet->pHeadF; pWriter->fData = *pSet->pDataF; pWriter->fSma = *pSet->pSmaF; - for (int8_t iSst = 0; iSst < pSet->nSstF; iSst++) { - pWriter->wSet.aSstF[iSst] = &pWriter->fSst[iSst]; - pWriter->fSst[iSst] = *pSet->aSstF[iSst]; + for (int8_t iStt = 0; iStt < pSet->nSttF; iStt++) { + pWriter->wSet.aSttF[iStt] = &pWriter->fStt[iStt]; + pWriter->fStt[iStt] = *pSet->aSttF[iStt]; } // head @@ -276,15 +276,15 @@ int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pS pWriter->fSma.size += TSDB_FHDR_SIZE; } - // sst - ASSERT(pWriter->fSst[pSet->nSstF - 1].size == 0); + // stt + ASSERT(pWriter->fStt[pSet->nSttF - 1].size == 0); flag = TD_FILE_READ | TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC; - tsdbSstFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fSst[pSet->nSstF - 1], fname); - code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSstFD); + tsdbSttFileName(pTsdb, pWriter->wSet.diskId, pWriter->wSet.fid, &pWriter->fStt[pSet->nSttF - 1], fname); + code = tsdbOpenFile(fname, szPage, flag, &pWriter->pSttFD); if (code) goto _err; - code = tsdbWriteFile(pWriter->pSstFD, 0, hdr, TSDB_FHDR_SIZE); + code = tsdbWriteFile(pWriter->pSttFD, 0, hdr, TSDB_FHDR_SIZE); if (code) goto _err; - pWriter->fSst[pWriter->wSet.nSstF - 1].size += TSDB_FHDR_SIZE; + pWriter->fStt[pWriter->wSet.nSttF - 1].size += TSDB_FHDR_SIZE; *ppWriter = pWriter; return code; @@ -312,14 +312,14 @@ int32_t tsdbDataFWriterClose(SDataFWriter **ppWriter, int8_t sync) { code = tsdbFsyncFile((*ppWriter)->pSmaFD); if (code) goto _err; - code = tsdbFsyncFile((*ppWriter)->pSstFD); + code = tsdbFsyncFile((*ppWriter)->pSttFD); if (code) goto _err; } tsdbCloseFile(&(*ppWriter)->pHeadFD); tsdbCloseFile(&(*ppWriter)->pDataFD); tsdbCloseFile(&(*ppWriter)->pSmaFD); - tsdbCloseFile(&(*ppWriter)->pSstFD); + tsdbCloseFile(&(*ppWriter)->pSttFD); for (int32_t iBuf = 0; iBuf < sizeof((*ppWriter)->aBuf) / sizeof(uint8_t *); iBuf++) { tFree((*ppWriter)->aBuf[iBuf]); @@ -357,10 +357,10 @@ int32_t tsdbUpdateDFileSetHeader(SDataFWriter *pWriter) { code = tsdbWriteFile(pWriter->pSmaFD, 0, hdr, TSDB_FHDR_SIZE); if (code) goto _err; - // sst ============== + // stt ============== memset(hdr, 0, TSDB_FHDR_SIZE); - tPutSstFile(hdr, &pWriter->fSst[pWriter->wSet.nSstF - 1]); - code = tsdbWriteFile(pWriter->pSstFD, 0, hdr, TSDB_FHDR_SIZE); + tPutSttFile(hdr, &pWriter->fStt[pWriter->wSet.nSttF - 1]); + code = tsdbWriteFile(pWriter->pSttFD, 0, hdr, TSDB_FHDR_SIZE); if (code) goto _err; return code; @@ -454,22 +454,22 @@ _err: return code; } -int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) { +int32_t tsdbWriteSttBlk(SDataFWriter *pWriter, SArray *aSttBlk) { int32_t code = 0; - SSstFile *pSstFile = &pWriter->fSst[pWriter->wSet.nSstF - 1]; + SSttFile *pSttFile = &pWriter->fStt[pWriter->wSet.nSttF - 1]; int64_t size; int64_t n; // check - if (taosArrayGetSize(aSstBlk) == 0) { - pSstFile->offset = pSstFile->size; + if (taosArrayGetSize(aSttBlk) == 0) { + pSttFile->offset = pSttFile->size; goto _exit; } // size size = 0; - for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSstBlk); iBlockL++) { - size += tPutSstBlk(NULL, taosArrayGet(aSstBlk, iBlockL)); + for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSttBlk); iBlockL++) { + size += tPutSttBlk(NULL, taosArrayGet(aSttBlk, iBlockL)); } // alloc @@ -478,21 +478,21 @@ int32_t tsdbWriteSstBlk(SDataFWriter *pWriter, SArray *aSstBlk) { // encode n = 0; - for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSstBlk); iBlockL++) { - n += tPutSstBlk(pWriter->aBuf[0] + n, taosArrayGet(aSstBlk, iBlockL)); + for (int32_t iBlockL = 0; iBlockL < taosArrayGetSize(aSttBlk); iBlockL++) { + n += tPutSttBlk(pWriter->aBuf[0] + n, taosArrayGet(aSttBlk, iBlockL)); } // write - code = tsdbWriteFile(pWriter->pSstFD, pSstFile->size, pWriter->aBuf[0], size); + code = tsdbWriteFile(pWriter->pSttFD, pSttFile->size, pWriter->aBuf[0], size); if (code) goto _err; // update - pSstFile->offset = pSstFile->size; - pSstFile->size += size; + pSttFile->offset = pSttFile->size; + pSttFile->size += size; _exit: - tsdbTrace("vgId:%d tsdb write sst block, loffset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), - pSstFile->offset, size); + tsdbTrace("vgId:%d tsdb write stt block, loffset:%" PRId64 " size:%" PRId64, TD_VID(pWriter->pTsdb->pVnode), + pSttFile->offset, size); return code; _err: @@ -546,7 +546,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock ASSERT(pBlockData->nRow > 0); if (toLast) { - pBlkInfo->offset = pWriter->fSst[pWriter->wSet.nSstF - 1].size; + pBlkInfo->offset = pWriter->fStt[pWriter->wSet.nSttF - 1].size; } else { pBlkInfo->offset = pWriter->fData.size; } @@ -558,7 +558,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock if (code) goto _err; // write ================= - STsdbFD *pFD = toLast ? pWriter->pSstFD : pWriter->pDataFD; + STsdbFD *pFD = toLast ? pWriter->pSttFD : pWriter->pDataFD; pBlkInfo->szKey = aBufN[3] + aBufN[2]; pBlkInfo->szBlock = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3]; @@ -585,7 +585,7 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, SBlock // update info if (toLast) { - pWriter->fSst[pWriter->wSet.nSstF - 1].size += pBlkInfo->szBlock; + pWriter->fStt[pWriter->wSet.nSttF - 1].size += pBlkInfo->szBlock; } else { pWriter->fData.size += pBlkInfo->szBlock; } @@ -664,9 +664,9 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { taosCloseFile(&pOutFD); taosCloseFile(&PInFD); - // sst - tsdbSstFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->aSstF[0], fNameFrom); - tsdbSstFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->aSstF[0], fNameTo); + // stt + tsdbSttFileName(pTsdb, pSetFrom->diskId, pSetFrom->fid, pSetFrom->aSttF[0], fNameFrom); + tsdbSttFileName(pTsdb, pSetTo->diskId, pSetTo->fid, pSetTo->aSttF[0], fNameTo); pOutFD = taosOpenFile(fNameTo, TD_FILE_WRITE | TD_FILE_CREATE | TD_FILE_TRUNC); if (pOutFD == NULL) { @@ -680,7 +680,7 @@ int32_t tsdbDFileSetCopy(STsdb *pTsdb, SDFileSet *pSetFrom, SDFileSet *pSetTo) { goto _err; } - n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->aSstF[0]->size); + n = taosFSendFile(pOutFD, PInFD, 0, pSetFrom->aSttF[0]->size); if (n < 0) { code = TAOS_SYSTEM_ERROR(errno); goto _err; @@ -750,10 +750,10 @@ int32_t tsdbDataFReaderOpen(SDataFReader **ppReader, STsdb *pTsdb, SDFileSet *pS code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->pSmaFD); if (code) goto _err; - // sst - for (int32_t iSst = 0; iSst < pSet->nSstF; iSst++) { - tsdbSstFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSstF[iSst], fname); - code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->aSstFD[iSst]); + // stt + for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) { + tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname); + code = tsdbOpenFile(fname, szPage, TD_FILE_READ, &pReader->aSttFD[iStt]); if (code) goto _err; } @@ -779,10 +779,10 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader) { // sma tsdbCloseFile(&(*ppReader)->pSmaFD); - // sst - for (int32_t iSst = 0; iSst < TSDB_MAX_SST_FILE; iSst++) { - if ((*ppReader)->aSstFD[iSst]) { - tsdbCloseFile(&(*ppReader)->aSstFD[iSst]); + // stt + for (int32_t iStt = 0; iStt < TSDB_MAX_STT_FILE; iStt++) { + if ((*ppReader)->aSttFD[iStt]) { + tsdbCloseFile(&(*ppReader)->aSttFD[iStt]); } } @@ -835,13 +835,13 @@ _err: return code; } -int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk) { +int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk) { int32_t code = 0; - SSstFile *pSstFile = pReader->pSet->aSstF[iSst]; - int64_t offset = pSstFile->offset; - int64_t size = pSstFile->size - offset; + SSttFile *pSttFile = pReader->pSet->aSttF[iStt]; + int64_t offset = pSttFile->offset; + int64_t size = pSttFile->size - offset; - taosArrayClear(aSstBlk); + taosArrayClear(aSttBlk); if (size == 0) return code; // alloc @@ -849,16 +849,16 @@ int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk) { if (code) goto _err; // read - code = tsdbReadFile(pReader->aSstFD[iSst], offset, pReader->aBuf[0], size); + code = tsdbReadFile(pReader->aSttFD[iStt], offset, pReader->aBuf[0], size); if (code) goto _err; // decode int64_t n = 0; while (n < size) { - SSstBlk sstBlk; - n += tGetSstBlk(pReader->aBuf[0] + n, &sstBlk); + SSttBlk sttBlk; + n += tGetSttBlk(pReader->aBuf[0] + n, &sttBlk); - if (taosArrayPush(aSstBlk, &sstBlk) == NULL) { + if (taosArrayPush(aSttBlk, &sttBlk) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -868,7 +868,7 @@ int32_t tsdbReadSstBlk(SDataFReader *pReader, int32_t iSst, SArray *aSstBlk) { return code; _err: - tsdbError("vgId:%d read sst blk failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d read stt blk failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); return code; } @@ -1107,25 +1107,25 @@ _err: return code; } -int32_t tsdbReadSstBlock(SDataFReader *pReader, int32_t iSst, SSstBlk *pSstBlk, SBlockData *pBlockData) { +int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData) { int32_t code = 0; // alloc - code = tRealloc(&pReader->aBuf[0], pSstBlk->bInfo.szBlock); + code = tRealloc(&pReader->aBuf[0], pSttBlk->bInfo.szBlock); if (code) goto _err; // read - code = tsdbReadFile(pReader->aSstFD[iSst], pSstBlk->bInfo.offset, pReader->aBuf[0], pSstBlk->bInfo.szBlock); + code = tsdbReadFile(pReader->aSttFD[iStt], pSttBlk->bInfo.offset, pReader->aBuf[0], pSttBlk->bInfo.szBlock); if (code) goto _err; // decmpr - code = tDecmprBlockData(pReader->aBuf[0], pSstBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]); + code = tDecmprBlockData(pReader->aBuf[0], pSttBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]); if (code) goto _err; return code; _err: - tsdbError("vgId:%d tsdb read sst block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); + tsdbError("vgId:%d tsdb read stt block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRetention.c b/source/dnode/vnode/src/tsdb/tsdbRetention.c index ee29538a81..d10613e719 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRetention.c +++ b/source/dnode/vnode/src/tsdb/tsdbRetention.c @@ -60,7 +60,7 @@ int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now) { if (expLevel < 0) { taosMemoryFree(pSet->pHeadF); taosMemoryFree(pSet->pDataF); - taosMemoryFree(pSet->aSstF[0]); + taosMemoryFree(pSet->aSttF[0]); taosMemoryFree(pSet->pSmaF); taosArrayRemove(fs.aDFileSet, iSet); iSet--; diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 8d19a2ffb8..9fc5639c5e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -27,9 +27,9 @@ struct STsdbSnapReader { int32_t fid; SDataFReader* pDataFReader; SArray* aBlockIdx; // SArray - SArray* aSstBlk; // SArray + SArray* aSstBlk; // SArray SBlockIdx* pBlockIdx; - SSstBlk* pSstBlk; + SSttBlk* pSstBlk; int32_t iBlockIdx; int32_t iBlockL; @@ -64,7 +64,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { code = tsdbReadBlockIdx(pReader->pDataFReader, pReader->aBlockIdx); if (code) goto _err; - code = tsdbReadSstBlk(pReader->pDataFReader, 0, pReader->aSstBlk); + code = tsdbReadSttBlk(pReader->pDataFReader, 0, pReader->aSstBlk); if (code) goto _err; // init @@ -87,7 +87,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { break; } - pReader->pSstBlk = (SSstBlk*)taosArrayGet(pReader->aSstBlk, pReader->iBlockL); + pReader->pSstBlk = (SSttBlk*)taosArrayGet(pReader->aSstBlk, pReader->iBlockL); if (pReader->pSstBlk->minVer <= pReader->ever && pReader->pSstBlk->maxVer >= pReader->sver) { // TODO break; @@ -151,7 +151,7 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { // next pReader->iBlockL++; if (pReader->iBlockL < taosArrayGetSize(pReader->aSstBlk)) { - pReader->pSstBlk = (SSstBlk*)taosArrayGetSize(pReader->aSstBlk); + pReader->pSstBlk = (SSttBlk*)taosArrayGetSize(pReader->aSstBlk); } else { pReader->pSstBlk = NULL; } @@ -298,7 +298,7 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, int8_t type code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } - pReader->aSstBlk = taosArrayInit(0, sizeof(SSstBlk)); + pReader->aSstBlk = taosArrayInit(0, sizeof(SSttBlk)); if (pReader->aSstBlk == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -431,7 +431,7 @@ struct STsdbSnapWriter { SBlockData* pBlockData; int32_t iRow; SBlockData bDataR; - SArray* aSstBlk; // SArray + SArray* aSstBlk; // SArray int32_t iBlockL; SBlockData lDataR; @@ -443,7 +443,7 @@ struct STsdbSnapWriter { SMapData mBlockW; // SMapData SArray* aBlockIdxW; // SArray - SArray* aBlockLW; // SArray + SArray* aBlockLW; // SArray // for del file SDelFReader* pDelFReader; @@ -845,7 +845,7 @@ static int32_t tsdbSnapWriteDataEnd(STsdbSnapWriter* pWriter) { // write remain stuff if (taosArrayGetSize(pWriter->aBlockLW) > 0) { - code = tsdbWriteSstBlk(pWriter->pDataFWriter, pWriter->aBlockIdxW); + code = tsdbWriteSttBlk(pWriter->pDataFWriter, pWriter->aBlockIdxW); if (code) goto _err; } @@ -911,7 +911,7 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 code = tsdbReadBlockIdx(pWriter->pDataFReader, pWriter->aBlockIdx); if (code) goto _err; - code = tsdbReadSstBlk(pWriter->pDataFReader, 0, pWriter->aSstBlk); + code = tsdbReadSttBlk(pWriter->pDataFReader, 0, pWriter->aSstBlk); if (code) goto _err; } else { ASSERT(pWriter->pDataFReader == NULL); @@ -931,25 +931,25 @@ static int32_t tsdbSnapWriteData(STsdbSnapWriter* pWriter, uint8_t* pData, uint3 // write SHeadFile fHead; SDataFile fData; - SSstFile fLast; + SSttFile fLast; SSmaFile fSma; - SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .aSstF[0] = &fLast, .pSmaF = &fSma}; + SDFileSet wSet = {.pHeadF = &fHead, .pDataF = &fData, .aSttF[0] = &fLast, .pSmaF = &fSma}; if (pSet) { wSet.diskId = pSet->diskId; wSet.fid = fid; - wSet.nSstF = 1; + wSet.nSttF = 1; fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0}; fData = *pSet->pDataF; - fLast = (SSstFile){.commitID = pWriter->commitID, .size = 0}; + fLast = (SSttFile){.commitID = pWriter->commitID, .size = 0}; fSma = *pSet->pSmaF; } else { wSet.diskId = (SDiskID){.level = 0, .id = 0}; wSet.fid = fid; - wSet.nSstF = 1; + wSet.nSttF = 1; fHead = (SHeadFile){.commitID = pWriter->commitID, .offset = 0, .size = 0}; fData = (SDataFile){.commitID = pWriter->commitID, .size = 0}; - fLast = (SSstFile){.commitID = pWriter->commitID, .size = 0, .offset = 0}; + fLast = (SSttFile){.commitID = pWriter->commitID, .size = 0, .offset = 0}; fSma = (SSmaFile){.commitID = pWriter->commitID, .size = 0}; } @@ -1147,7 +1147,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr code = tBlockDataCreate(&pWriter->bDataR); if (code) goto _err; - pWriter->aSstBlk = taosArrayInit(0, sizeof(SSstBlk)); + pWriter->aSstBlk = taosArrayInit(0, sizeof(SSttBlk)); if (pWriter->aSstBlk == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; @@ -1161,7 +1161,7 @@ int32_t tsdbSnapWriterOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapWr code = tBlockDataCreate(&pWriter->bDataW); if (code) goto _err; - pWriter->aBlockLW = taosArrayInit(0, sizeof(SSstBlk)); + pWriter->aBlockLW = taosArrayInit(0, sizeof(SSttBlk)); if (pWriter->aBlockLW == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto _err; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 8509c0c759..caeca45e01 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -214,7 +214,7 @@ int32_t tCmprBlockIdx(void const *lhs, void const *rhs) { int32_t tCmprBlockL(void const *lhs, void const *rhs) { SBlockIdx *lBlockIdx = (SBlockIdx *)lhs; - SSstBlk *rBlockL = (SSstBlk *)rhs; + SSttBlk *rBlockL = (SSttBlk *)rhs; if (lBlockIdx->suid < rBlockL->suid) { return -1; @@ -311,41 +311,41 @@ bool tDataBlkHasSma(SDataBlk *pDataBlk) { return pDataBlk->smaInfo.size > 0; } -// SSstBlk ====================================================== -int32_t tPutSstBlk(uint8_t *p, void *ph) { +// SSttBlk ====================================================== +int32_t tPutSttBlk(uint8_t *p, void *ph) { int32_t n = 0; - SSstBlk *pSstBlk = (SSstBlk *)ph; + SSttBlk *pSttBlk = (SSttBlk *)ph; - n += tPutI64(p ? p + n : p, pSstBlk->suid); - n += tPutI64(p ? p + n : p, pSstBlk->minUid); - n += tPutI64(p ? p + n : p, pSstBlk->maxUid); - n += tPutI64v(p ? p + n : p, pSstBlk->minKey); - n += tPutI64v(p ? p + n : p, pSstBlk->maxKey); - n += tPutI64v(p ? p + n : p, pSstBlk->minVer); - n += tPutI64v(p ? p + n : p, pSstBlk->maxVer); - n += tPutI32v(p ? p + n : p, pSstBlk->nRow); - n += tPutI64v(p ? p + n : p, pSstBlk->bInfo.offset); - n += tPutI32v(p ? p + n : p, pSstBlk->bInfo.szBlock); - n += tPutI32v(p ? p + n : p, pSstBlk->bInfo.szKey); + n += tPutI64(p ? p + n : p, pSttBlk->suid); + n += tPutI64(p ? p + n : p, pSttBlk->minUid); + n += tPutI64(p ? p + n : p, pSttBlk->maxUid); + n += tPutI64v(p ? p + n : p, pSttBlk->minKey); + n += tPutI64v(p ? p + n : p, pSttBlk->maxKey); + n += tPutI64v(p ? p + n : p, pSttBlk->minVer); + n += tPutI64v(p ? p + n : p, pSttBlk->maxVer); + n += tPutI32v(p ? p + n : p, pSttBlk->nRow); + n += tPutI64v(p ? p + n : p, pSttBlk->bInfo.offset); + n += tPutI32v(p ? p + n : p, pSttBlk->bInfo.szBlock); + n += tPutI32v(p ? p + n : p, pSttBlk->bInfo.szKey); return n; } -int32_t tGetSstBlk(uint8_t *p, void *ph) { +int32_t tGetSttBlk(uint8_t *p, void *ph) { int32_t n = 0; - SSstBlk *pSstBlk = (SSstBlk *)ph; + SSttBlk *pSttBlk = (SSttBlk *)ph; - n += tGetI64(p + n, &pSstBlk->suid); - n += tGetI64(p + n, &pSstBlk->minUid); - n += tGetI64(p + n, &pSstBlk->maxUid); - n += tGetI64v(p + n, &pSstBlk->minKey); - n += tGetI64v(p + n, &pSstBlk->maxKey); - n += tGetI64v(p + n, &pSstBlk->minVer); - n += tGetI64v(p + n, &pSstBlk->maxVer); - n += tGetI32v(p + n, &pSstBlk->nRow); - n += tGetI64v(p + n, &pSstBlk->bInfo.offset); - n += tGetI32v(p + n, &pSstBlk->bInfo.szBlock); - n += tGetI32v(p + n, &pSstBlk->bInfo.szKey); + n += tGetI64(p + n, &pSttBlk->suid); + n += tGetI64(p + n, &pSttBlk->minUid); + n += tGetI64(p + n, &pSttBlk->maxUid); + n += tGetI64v(p + n, &pSttBlk->minKey); + n += tGetI64v(p + n, &pSttBlk->maxKey); + n += tGetI64v(p + n, &pSttBlk->minVer); + n += tGetI64v(p + n, &pSttBlk->maxVer); + n += tGetI32v(p + n, &pSttBlk->nRow); + n += tGetI64v(p + n, &pSttBlk->bInfo.offset); + n += tGetI32v(p + n, &pSttBlk->bInfo.szBlock); + n += tGetI32v(p + n, &pSttBlk->bInfo.szKey); return n; } diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 383652531e..0a9fbf92a4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -39,7 +39,7 @@ int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp * SSubmitBlkRsp r = {0}; tGetSubmitMsgNext(&msgIter, &pBlock); if (pBlock == NULL) break; - if (tsdbInsertTableData(pTsdb, version, &msgIter, pBlock, &r) < 0) { + if ((terrno = tsdbInsertTableData(pTsdb, version, &msgIter, pBlock, &r)) < 0) { return -1; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 2724d4cbfd..51d83d8eed 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -534,7 +534,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pR } tqUpdateTbUidList(pVnode->pTq, tbUids, true); - if (tdUpdateTbUidList(pVnode->pSma, pStore) < 0) { + if (tdUpdateTbUidList(pVnode->pSma, pStore, true) < 0) { goto _exit; } tdUidStoreFree(pStore); @@ -692,6 +692,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq SEncoder encoder = {0}; int32_t ret; SArray *tbUids = NULL; + STbUidStore *pStore = NULL; pRsp->msgType = TDMT_VND_DROP_TABLE_RSP; pRsp->pCont = NULL; @@ -715,9 +716,10 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { SVDropTbReq *pDropTbReq = req.pReqs + iReq; SVDropTbRsp dropTbRsp = {0}; + tb_uid_t tbUid = 0; /* code */ - ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids); + ret = metaDropTable(pVnode->pMeta, version, pDropTbReq, tbUids, &tbUid); if (ret < 0) { if (pDropTbReq->igNotExists && terrno == TSDB_CODE_VND_TABLE_NOT_EXIST) { dropTbRsp.code = TSDB_CODE_SUCCESS; @@ -726,15 +728,18 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t version, void *pReq } } else { dropTbRsp.code = TSDB_CODE_SUCCESS; + if (tbUid > 0) tdFetchTbUidList(pVnode->pSma, &pStore, pDropTbReq->suid, tbUid); } taosArrayPush(rsp.pArray, &dropTbRsp); } tqUpdateTbUidList(pVnode->pTq, tbUids, false); + tdUpdateTbUidList(pVnode->pSma, pStore, false); _exit: taosArrayDestroy(tbUids); + tdUidStoreFree(pStore); tDecoderClear(&decoder); tEncodeSize(tEncodeSVDropTbBatchRsp, &rsp, pRsp->contLen, ret); pRsp->pCont = rpcMallocCont(pRsp->contLen); diff --git a/source/libs/catalog/src/ctgDbg.c b/source/libs/catalog/src/ctgDbg.c index bd3402dc39..d215242307 100644 --- a/source/libs/catalog/src/ctgDbg.c +++ b/source/libs/catalog/src/ctgDbg.c @@ -367,18 +367,22 @@ void ctgdShowDBCache(SCatalog* pCtg, SHashObj *dbHash) { int32_t stbNum = dbCache->stbCache ? taosHashGetSize(dbCache->stbCache) : 0; int32_t vgVersion = CTG_DEFAULT_INVALID_VERSION; int32_t hashMethod = -1; + int16_t hashPrefix = 0; + int16_t hashSuffix = 0; int32_t vgNum = 0; if (dbCache->vgCache.vgInfo) { vgVersion = dbCache->vgCache.vgInfo->vgVersion; hashMethod = dbCache->vgCache.vgInfo->hashMethod; + hashPrefix = dbCache->vgCache.vgInfo->hashPrefix; + hashSuffix = dbCache->vgCache.vgInfo->hashSuffix; if (dbCache->vgCache.vgInfo->vgHash) { vgNum = taosHashGetSize(dbCache->vgCache.vgInfo->vgHash); } } - ctgDebug("[%d] db [%.*s][0x%"PRIx64"] %s: metaNum:%d, stbNum:%d, vgVersion:%d, hashMethod:%d, vgNum:%d", - i, (int32_t)len, dbFName, dbCache->dbId, dbCache->deleted?"deleted":"", metaNum, stbNum, vgVersion, hashMethod, vgNum); + ctgDebug("[%d] db [%.*s][0x%"PRIx64"] %s: metaNum:%d, stbNum:%d, vgVersion:%d, hashMethod:%d, prefix:%d, suffix:%d, vgNum:%d", + i, (int32_t)len, dbFName, dbCache->dbId, dbCache->deleted?"deleted":"", metaNum, stbNum, vgVersion, hashMethod, hashPrefix, hashSuffix, vgNum); pIter = taosHashIterate(dbHash, pIter); } diff --git a/source/libs/catalog/src/ctgUtil.c b/source/libs/catalog/src/ctgUtil.c index e28234ab76..296100ce6d 100644 --- a/source/libs/catalog/src/ctgUtil.c +++ b/source/libs/catalog/src/ctgUtil.c @@ -848,15 +848,11 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName CTG_ERR_RET(TSDB_CODE_TSC_DB_NOT_SELECTED); } - tableNameHashFp fp = NULL; SVgroupInfo *vgInfo = NULL; - - CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp)); - char tbFullName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(pTableName, tbFullName); - uint32_t hashValue = (*fp)(tbFullName, (uint32_t)strlen(tbFullName)); + uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod, dbInfo->hashPrefix, dbInfo->hashSuffix); void *pIter = taosHashIterate(dbInfo->vgHash, NULL); while (pIter) { @@ -919,11 +915,7 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); } - tableNameHashFp fp = NULL; SVgroupInfo *vgInfo = NULL; - - CTG_ERR_RET(ctgGetHashFunction(dbInfo->hashMethod, &fp)); - int32_t tbNum = taosArrayGetSize(pNames); if (1 == vgNum) { @@ -975,7 +967,7 @@ int32_t ctgGetVgInfosFromHashValue(SCatalog *pCtg, SCtgTaskReq* tReq, SDBVgInfo tbNameLen = offset + strlen(pName->tname); strcpy(tbFullName + offset, pName->tname); - uint32_t hashValue = (*fp)(tbFullName, (uint32_t)tbNameLen); + uint32_t hashValue = taosGetTbHashVal(tbFullName, (uint32_t)strlen(tbFullName), dbInfo->hashMethod, dbInfo->hashPrefix, dbInfo->hashSuffix); SVgroupInfo **p = taosArraySearch(pVgList, &hashValue, ctgHashValueComp, TD_EQ); diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 0be85333dc..c01c269e64 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -218,6 +218,8 @@ void ctgTestBuildDBVgroup(SDBVgInfo **pdbVgroup) { ctgTestCurrentVgVersion = dbVgroup->vgVersion; dbVgroup->hashMethod = 0; + dbVgroup->hashPrefix = 0; + dbVgroup->hashSuffix = 0; dbVgroup->vgHash = taosHashInit(ctgTestVgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); vgNum = ctgTestGetVgNumFromVgVersion(dbVgroup->vgVersion); diff --git a/source/libs/qcom/src/querymsg.c b/source/libs/qcom/src/querymsg.c index e2d3ac1583..e54937114c 100644 --- a/source/libs/qcom/src/querymsg.c +++ b/source/libs/qcom/src/querymsg.c @@ -38,6 +38,8 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) { pOut->dbVgroup->vgVersion = usedbRsp->vgVersion; pOut->dbVgroup->hashMethod = usedbRsp->hashMethod; + pOut->dbVgroup->hashPrefix = usedbRsp->hashPrefix; + pOut->dbVgroup->hashSuffix = usedbRsp->hashSuffix; qDebug("Got %d vgroup for db %s", usedbRsp->vgNum, usedbRsp->db);