partial work

This commit is contained in:
Hongze Cheng 2021-01-07 23:12:17 +08:00
parent 4c70e6a394
commit 70221b1098
1 changed files with 78 additions and 25 deletions

View File

@ -280,6 +280,8 @@ static int tsdbCommitToFile(SCommitH *pch, SDFileSet *pOldSet, int fid) {
for (size_t tid = 0; tid < pMem->maxTables; tid++) { for (size_t tid = 0; tid < pMem->maxTables; tid++) {
SCommitIter *pIter = pch->iters + tid; SCommitIter *pIter = pch->iters + tid;
// No table exists, continue
if (pIter->pTable == NULL) continue; if (pIter->pTable == NULL) continue;
if (tsdbCommitToTable(pch, tid) < 0) { if (tsdbCommitToTable(pch, tid) < 0) {
@ -452,45 +454,96 @@ static int tsdbCommitToTable(SCommitH *pch, int tid) {
tsdbSetCommitTable(pch, pIter->pTable); tsdbSetCommitTable(pch, pIter->pTable);
// No memory data and no disk data, just return // No memory data and no disk data, just return
if (pIter->pIter == NULL && pch->readh.pBlockIdx == NULL) { if (pIter->pIter == NULL && pch->readh.pBlkIdx == NULL) {
TSDB_RUNLOCK_TABLE(pIter->pTable); TSDB_RUNLOCK_TABLE(pIter->pTable);
return 0; return 0;
} }
tsdbLoadBlockInfo(&(pch->readh), NULL); if (tsdbLoadBlockInfo(&(pch->readh), NULL) < 0) {
TSDB_RUNLOCK_TABLE(pIter->pTable);
return -1;
}
if (pIter->pIter == NULL) { // Process merge commit
// No memory data but has disk data int nBlocks = (pch->readh.pBlkIdx == NULL) ? 0 : pch->readh.pBlkIdx->numOfBlocks;
// TODO
} else {
TSKEY nextKey = tsdbNextIterKey(pIter->pIter); TSKEY nextKey = tsdbNextIterKey(pIter->pIter);
int cidx = 0; int cidx = 0;
void * ptr = NULL;
SBlock *pBlock = NULL; SBlock *pBlock = NULL;
void *ptr = taosbsearch((void *)(&nextKey), pch->readh.pBlkInfo->blocks, pch->readh.pBlockIdx->numOfBlocks,
sizeof(SBlock), tsdbComparKeyBlock, TD_GE);
while (true) { while (true) {
if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pch->maxKey) && (cidx >= pch->readh.pBlockIdx->numOfBlocks)) if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pch->maxKey) && (cidx >= nBlocks)) break;
break;
if (tsdbComparKeyBlock((void *)(&nextKey), pBlock) < 0) { if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pch->maxKey) ||
((cidx < nBlocks) && (!pBlock->last) && tsdbComparKeyBlock((void *)(&nextKey), pBlock) > 0)) {
// TODO: move the block
cidx++;
} else if ((cidx < nBlocks) && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) {
// TODO: process merge commit
cidx++;
} else {
if (pBlock == NULL) {
// commit memory data until pch->maxKey and write to the appropriate file
} else {
// commit memory data until pBlock->keyFirst and write to only data file
}
}
#if 0
if (/* Key end */) {
tsdbMoveBlock(); =============
} else {
if (/*block end*/) {
// process append commit until pch->maxKey >>>>>>>
} else {
if (pBlock->last) { if (pBlock->last) {
// merge with the last block // TODO: merge the block ||||||||||||||||||||||
} else { } else {
// Commit until pch->maxKey or (pBlock[1].keyFirst-1) if (pBlock > nextKey) {
} // process append commit until pBlock->keyFirst-1 >>>>>>
} else if (tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0) { // merge the block } else if (pBlock < nextKey) {
// tsdbMoveBlock() ============
} else { } else {
// merge the block ||||||||||||
}
}
}
}
#endif
}
} // if (pIter->pIter == NULL) {
} // // No memory data but has disk data
} // // TODO
// } else {
// TSKEY nextKey = tsdbNextIterKey(pIter->pIter);
// int cidx = 0;
// SBlock *pBlock = NULL;
// void *ptr = taosbsearch((void *)(&nextKey), pch->readh.pBlkInfo->blocks, pch->readh.pBlkIdx->numOfBlocks,
// sizeof(SBlock), tsdbComparKeyBlock, TD_GE);
// while (true) {
// if ((nextKey == TSDB_DATA_TIMESTAMP_NULL || nextKey > pch->maxKey) && (cidx >= pch->readh.pBlkIdx->numOfBlocks))
// break;
// if (tsdbComparKeyBlock((void *)(&nextKey), pBlock) < 0) {
// if (pBlock->last) {
// // merge with the last block
// } else {
// // Commit until pch->maxKey or (pBlock[1].keyFirst-1)
// }
// } else if (tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0) { // merge the block
// } else {
// }
// }
// }
TSDB_RUNLOCK_TABLE(pIter->pTable); TSDB_RUNLOCK_TABLE(pIter->pTable);
tsdbWriteBlockInfo(pch); if (tsdbWriteBlockInfo(pch) < 0) return -1;
return 0; return 0;
} }