fix tsdb save

This commit is contained in:
Hongze Cheng 2022-06-07 05:24:25 +00:00
parent 933de38cd0
commit 892021b089
4 changed files with 68 additions and 43 deletions

View File

@ -171,15 +171,12 @@ struct STsdb {
#if 1 // ======================================
struct STable {
uint64_t tid;
uint64_t suid;
uint64_t uid;
STSchema *pSchema; // latest schema
STSchema *pCacheSchema; // cached cache
};
#define TABLE_TID(t) (t)->tid
#define TABLE_UID(t) (t)->uid
// int tsdbPrepareCommit(STsdb *pTsdb);
typedef enum {
TSDB_FILE_HEAD = 0, // .head
@ -387,7 +384,7 @@ typedef struct {
typedef struct {
int32_t delimiter; // For recovery usage
int32_t tid;
uint64_t suid;
uint64_t uid;
SBlock blocks[];
} SBlockInfo;

View File

@ -400,7 +400,7 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
break;
}
if (pIter && pIter->pTable && (!pIdx || (pIter->pTable->uid <= pIdx->uid))) {
if (pIter && pIter->pTable && (!pIdx || (pIter->pTable->suid <= pIdx->suid || pIter->pTable->uid <= pIdx->uid))) {
if (tsdbCommitToTable(pCommith, mIter) < 0) {
tsdbCloseCommitFile(pCommith, true);
// revert the file change
@ -478,7 +478,7 @@ static int32_t tsdbCreateCommitIters(SCommitH *pCommith) {
pCommitIter->pTable = (STable *)taosMemoryMalloc(sizeof(STable));
pCommitIter->pTable->uid = pTbData->uid;
pCommitIter->pTable->tid = pTbData->uid;
pCommitIter->pTable->suid = pTbData->suid;
pCommitIter->pTable->pSchema = pTSchema;
pCommitIter->pTable->pCacheSchema = NULL;
}
@ -734,8 +734,8 @@ static int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA,
pBlkInfo = *ppBuf;
pBlkInfo->delimiter = TSDB_FILE_DELIMITER;
pBlkInfo->tid = TABLE_TID(pTable);
pBlkInfo->uid = TABLE_UID(pTable);
pBlkInfo->suid = pTable->suid;
pBlkInfo->uid = pTable->uid;
memcpy((void *)(pBlkInfo->blocks), taosArrayGet(pSupA, 0), nSupBlocks * sizeof(SBlock));
if (nSubBlocks > 0) {
@ -761,7 +761,8 @@ static int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA,
// Set pIdx
pBlock = taosArrayGetLast(pSupA);
pIdx->uid = TABLE_UID(pTable);
pIdx->suid = pTable->suid;
pIdx->uid = pTable->uid;
pIdx->hasLast = pBlock->last ? 1 : 0;
pIdx->maxKey = pBlock->maxKey;
pIdx->numOfBlocks = (uint32_t)nSupBlocks;
@ -916,7 +917,7 @@ static int tsdbMoveBlkIdx(SCommitH *pCommith, SBlockIdx *pIdx) {
return -1;
}
STable table = {.tid = pIdx->uid, .uid = pIdx->uid, .pSchema = NULL};
STable table = {.suid = pIdx->suid, .uid = pIdx->uid, .pSchema = NULL};
pCommith->pTable = &table;
while (bidx < nBlocks) {
@ -1177,7 +1178,7 @@ static int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFi
}
pBlockData->delimiter = TSDB_FILE_DELIMITER;
pBlockData->uid = TABLE_UID(pTable);
pBlockData->uid = pTable->uid;
pBlockData->numOfCols = nColsNotAllNull;
taosCalcChecksumAppend(0, (uint8_t *)pBlockData, tsize);
@ -1217,7 +1218,7 @@ static int tsdbWriteBlockImpl(STsdb *pRepo, STable *pTable, SDFile *pDFile, SDFi
tsdbDebug("vgId:%d, uid:%" PRId64 " a block of data is written to file %s, offset %" PRId64
" numOfRows %d len %d numOfCols %" PRId16 " keyFirst %" PRId64 " keyLast %" PRId64,
REPO_ID(pRepo), TABLE_UID(pTable), TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len,
REPO_ID(pRepo), pTable->uid, TSDB_FILE_FULL_NAME(pDFile), offset, rowsToWrite, pBlock->len,
pBlock->numOfCols, pBlock->minKey.ts, pBlock->maxKey.ts);
return 0;

View File

@ -1109,7 +1109,7 @@ static int32_t loadBlockInfo(STsdbReadHandle* pTsdbReadHandle, int32_t index, in
STableCheckInfo* pCheckInfo = taosArrayGet(pTsdbReadHandle->pTableCheckInfo, index);
pCheckInfo->numOfBlocks = 0;
STable table = {.uid = pCheckInfo->tableId, .tid = pCheckInfo->tableId};
STable table = {.uid = pCheckInfo->tableId, .suid = pCheckInfo->suid};
table.pSchema = pTsdbReadHandle->pSchema;
if (tsdbSetReadTable(&pTsdbReadHandle->rhelper, &table) != TSDB_CODE_SUCCESS) {

View File

@ -156,6 +156,24 @@ int tsdbLoadBlockIdx(SReadH *pReadh) {
return 0;
}
static int32_t tsdbBlockIdxCmprFn(const void *p1, const void *p2) {
SBlockIdx *pBlockIdx1 = (SBlockIdx *)p1;
SBlockIdx *pBlockIdx2 = (SBlockIdx *)p2;
if (pBlockIdx1->suid < pBlockIdx2->suid) {
return -1;
} else if (pBlockIdx1->suid > pBlockIdx2->suid) {
return 1;
}
if (pBlockIdx1->uid < pBlockIdx2->uid) {
return -1;
} else if (pBlockIdx1->uid > pBlockIdx2->uid) {
return 1;
}
return 0;
}
int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
STSchema *pSchema = tsdbGetTableSchemaImpl(TSDB_READ_REPO(pReadh), pTable, false, false, -1);
@ -171,33 +189,40 @@ int tsdbSetReadTable(SReadH *pReadh, STable *pTable) {
return -1;
}
size_t size = taosArrayGetSize(pReadh->aBlkIdx);
if (size > 0) {
while (true) {
if (pReadh->cidx >= size) {
pReadh->pBlkIdx = NULL;
break;
}
SBlockIdx *pBlkIdx = taosArrayGet(pReadh->aBlkIdx, pReadh->cidx);
if (pBlkIdx->uid == TABLE_TID(pTable)) {
if (pBlkIdx->uid == TABLE_UID(pTable)) {
pReadh->pBlkIdx = pBlkIdx;
} else {
pReadh->pBlkIdx = NULL;
}
pReadh->cidx++;
break;
} else if (pBlkIdx->uid > TABLE_TID(pTable)) {
pReadh->pBlkIdx = NULL;
break;
} else {
pReadh->cidx++;
}
}
} else {
uint8_t *p = taosArraySearch(pReadh->aBlkIdx, &(SBlockIdx){.suid = pTable->suid, .uid = pTable->uid},
tsdbBlockIdxCmprFn, TD_EQ);
if (p == NULL) {
pReadh->pBlkIdx = NULL;
} else {
pReadh->pBlkIdx = (SBlockIdx *)p;
}
// size_t size = taosArrayGetSize(pReadh->aBlkIdx);
// if (size > 0) {
// while (true) {
// if (pReadh->cidx >= size) {
// pReadh->pBlkIdx = NULL;
// break;
// }
// SBlockIdx *pBlkIdx = taosArrayGet(pReadh->aBlkIdx, pReadh->cidx);
// if (pBlkIdx->uid == TABLE_TID(pTable)) {
// if (pBlkIdx->uid == TABLE_UID(pTable)) {
// pReadh->pBlkIdx = pBlkIdx;
// } else {
// pReadh->pBlkIdx = NULL;
// }
// pReadh->cidx++;
// break;
// } else if (pBlkIdx->uid > TABLE_TID(pTable)) {
// pReadh->pBlkIdx = NULL;
// break;
// } else {
// pReadh->cidx++;
// }
// }
// } else {
// pReadh->pBlkIdx = NULL;
// }
return 0;
}
@ -553,12 +578,12 @@ static int tsdbLoadBlockOffset(SReadH *pReadh, SBlock *pBlock) {
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx) {
int tlen = 0;
// tlen += taosEncodeVariantI32(buf, pIdx->tid);
tlen += taosEncodeFixedU64(buf, pIdx->suid);
tlen += taosEncodeFixedU64(buf, pIdx->uid);
tlen += taosEncodeVariantU32(buf, pIdx->len);
tlen += taosEncodeVariantU32(buf, pIdx->offset);
tlen += taosEncodeFixedU8(buf, pIdx->hasLast);
tlen += taosEncodeVariantU32(buf, pIdx->numOfBlocks);
tlen += taosEncodeFixedU64(buf, pIdx->uid);
tlen += taosEncodeFixedU64(buf, pIdx->maxKey.ts);
return tlen;
@ -570,6 +595,10 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
uint64_t value = 0;
// if ((buf = taosDecodeVariantI32(buf, &(pIdx->tid))) == NULL) return NULL;
if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL;
pIdx->suid = (int64_t)value;
if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL;
pIdx->uid = (int64_t)value;
if ((buf = taosDecodeVariantU32(buf, &(pIdx->len))) == NULL) return NULL;
if ((buf = taosDecodeVariantU32(buf, &(pIdx->offset))) == NULL) return NULL;
if ((buf = taosDecodeFixedU8(buf, &(hasLast))) == NULL) return NULL;
@ -577,8 +606,6 @@ void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx) {
if ((buf = taosDecodeVariantU32(buf, &(numOfBlocks))) == NULL) return NULL;
pIdx->numOfBlocks = numOfBlocks;
if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL;
pIdx->uid = (int64_t)value;
if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL;
pIdx->maxKey.ts = (TSKEY)value;
return buf;