more work
This commit is contained in:
parent
6f81c3f3a2
commit
89026d754f
|
@ -121,6 +121,7 @@ int32_t tGetBlockCol(uint8_t *p, void *ph);
|
||||||
#define tBlockInit() ((SBlock){0})
|
#define tBlockInit() ((SBlock){0})
|
||||||
void tBlockReset(SBlock *pBlock);
|
void tBlockReset(SBlock *pBlock);
|
||||||
void tBlockClear(SBlock *pBlock);
|
void tBlockClear(SBlock *pBlock);
|
||||||
|
int32_t tBlockCopy(SBlock *pBlockSrc, SBlock *pBlockDest);
|
||||||
int32_t tPutBlock(uint8_t *p, void *ph);
|
int32_t tPutBlock(uint8_t *p, void *ph);
|
||||||
int32_t tGetBlock(uint8_t *p, void *ph);
|
int32_t tGetBlock(uint8_t *p, void *ph);
|
||||||
int32_t tBlockCmprFn(const void *p1, const void *p2);
|
int32_t tBlockCmprFn(const void *p1, const void *p2);
|
||||||
|
@ -164,6 +165,7 @@ void tsdbFree(uint8_t *pBuf);
|
||||||
#define tMapDataInit() ((SMapData){0})
|
#define tMapDataInit() ((SMapData){0})
|
||||||
void tMapDataReset(SMapData *pMapData);
|
void tMapDataReset(SMapData *pMapData);
|
||||||
void tMapDataClear(SMapData *pMapData);
|
void tMapDataClear(SMapData *pMapData);
|
||||||
|
int32_t tMapDataCopy(SMapData *pMapDataSrc, SMapData *pMapDataDest);
|
||||||
int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *));
|
int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *));
|
||||||
int32_t tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *));
|
int32_t tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *));
|
||||||
int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *),
|
int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *),
|
||||||
|
|
|
@ -342,33 +342,6 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// static int32_t tsdbGetOverlapRowNumber(STbDataIter *pIter, SBlock *pBlock) {
|
|
||||||
// int32_t nRow = 0;
|
|
||||||
// TSDBROW *pRow;
|
|
||||||
// TSDBKEY key;
|
|
||||||
// int32_t c = 0;
|
|
||||||
// STbDataIter iter = *pIter;
|
|
||||||
|
|
||||||
// iter.pRow = NULL;
|
|
||||||
// while (true) {
|
|
||||||
// pRow = tsdbTbDataIterGet(pIter);
|
|
||||||
|
|
||||||
// if (pRow == NULL) break;
|
|
||||||
// key = tsdbRowKey(pRow);
|
|
||||||
|
|
||||||
// c = tBlockCmprFn(&(SBlock){.info.maxKey = key, .info.minKey = key}, pBlock);
|
|
||||||
// if (c == 0) {
|
|
||||||
// nRow++;
|
|
||||||
// } else if (c > 0) {
|
|
||||||
// break;
|
|
||||||
// } else {
|
|
||||||
// ASSERT(0);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// return nRow;
|
|
||||||
// }
|
|
||||||
|
|
||||||
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
|
static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STsdb *pTsdb = pCommitter->pTsdb;
|
STsdb *pTsdb = pCommitter->pTsdb;
|
||||||
|
@ -981,6 +954,82 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbGetOvlpNRow(STbDataIter *pIter, SBlock *pBlock) {
|
||||||
|
int32_t nRow = 0;
|
||||||
|
TSDBROW *pRow;
|
||||||
|
TSDBKEY key;
|
||||||
|
int32_t c = 0;
|
||||||
|
STbDataIter iter = *pIter;
|
||||||
|
|
||||||
|
iter.pRow = NULL;
|
||||||
|
while (true) {
|
||||||
|
pRow = tsdbTbDataIterGet(pIter);
|
||||||
|
|
||||||
|
if (pRow == NULL) break;
|
||||||
|
key = TSDBROW_KEY(pRow);
|
||||||
|
|
||||||
|
c = tBlockCmprFn(&(SBlock){.maxKey = key, .minKey = key}, pBlock);
|
||||||
|
if (c == 0) {
|
||||||
|
nRow++;
|
||||||
|
} else if (c > 0) {
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nRow;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t tsdbMergeAsSubBlock(SCommitter *pCommitter, STbDataIter *pIter, SBlock *pBlock) {
|
||||||
|
int32_t code = 0;
|
||||||
|
SBlockData *pBlockData = &pCommitter->nBlockData;
|
||||||
|
SBlockIdx *pBlockIdx = &(SBlockIdx){.suid = pIter->pTbData->suid, .uid = pIter->pTbData->uid};
|
||||||
|
TSDBROW *pRow;
|
||||||
|
|
||||||
|
tBlockDataReset(pBlockData);
|
||||||
|
pRow = tsdbTbDataIterGet(pIter);
|
||||||
|
code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow));
|
||||||
|
if (code) goto _err;
|
||||||
|
while (true) {
|
||||||
|
if (pRow) break;
|
||||||
|
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
tsdbTbDataIterNext(pIter);
|
||||||
|
pRow = tsdbTbDataIterGet(pIter);
|
||||||
|
if (pRow) {
|
||||||
|
int32_t c = tBlockCmprFn(&(SBlock){}, pBlock);
|
||||||
|
|
||||||
|
if (c == 0) {
|
||||||
|
code = tsdbCommitterUpdateSchema(pCommitter, pIter->pTbData->suid, pIter->pTbData->uid, TSDBROW_SVERSION(pRow));
|
||||||
|
if (code) goto _err;
|
||||||
|
} else if (c > 0) {
|
||||||
|
pRow = NULL;
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// write as a subblock
|
||||||
|
code = tBlockCopy(pBlock, &pCommitter->nBlock);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
code = tsdbWriteBlockData(pCommitter->pWriter, pBlockData, NULL, NULL, pBlockIdx, &pCommitter->nBlock,
|
||||||
|
pCommitter->cmprAlg);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
code = tMapDataPutItem(&pCommitter->nBlockMap, &pCommitter->nBlock, tPutBlock);
|
||||||
|
if (code) goto _err;
|
||||||
|
|
||||||
|
return code;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tsdbError("vgId:%d tsdb merge as subblock failed since %s", TD_VID(pCommitter->pTsdb->pVnode), tstrerror(code));
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
|
static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBlockIdx *pBlockIdx) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
STbDataIter *pIter = &(STbDataIter){0};
|
STbDataIter *pIter = &(STbDataIter){0};
|
||||||
|
@ -1070,19 +1119,26 @@ static int32_t tsdbCommitTableData(SCommitter *pCommitter, STbData *pTbData, SBl
|
||||||
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
|
if (pRow && TSDBROW_TS(pRow) > pCommitter->maxKey) pRow = NULL;
|
||||||
} else {
|
} else {
|
||||||
// merge memory and disk
|
// merge memory and disk
|
||||||
int64_t nOvlp = 0; // (todo)
|
int32_t nOvlp = tsdbGetOvlpNRow(pIter, pBlock);
|
||||||
|
ASSERT(nOvlp);
|
||||||
if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) {
|
if (pBlock->nRow + nOvlp <= pCommitter->maxRow && pBlock->nSubBlock < TSDB_MAX_SUBBLOCKS) {
|
||||||
// add as a subblock
|
code = tsdbMergeAsSubBlock(pCommitter, pIter, pBlock);
|
||||||
|
if (code) goto _err;
|
||||||
} else {
|
} else {
|
||||||
if (iBlock == nBlock - 1) {
|
TSDBKEY toKey = {.ts = pCommitter->maxKey + 1, .version = VERSION_MIN};
|
||||||
code = tsdbMergeTableData(pCommitter, pIter, pBlock,
|
int8_t toDataOnly = 0;
|
||||||
(TSDBKEY){.ts = pCommitter->maxKey + 1, .version = VERSION_MIN}, 0);
|
|
||||||
|
|
||||||
if (code) goto _err;
|
if (iBlock < nBlock - 1) {
|
||||||
} else {
|
toDataOnly = 1;
|
||||||
// code = tsdbMergeTableData(pCommitter, pIter, pBlock, pBlock[1].minKey, 1);
|
|
||||||
if (code) goto _err;
|
SBlock nextBlock = {0};
|
||||||
|
tBlockReset(&nextBlock);
|
||||||
|
tMapDataGetItemByIdx(&pCommitter->oBlockMap, iBlock + 1, &nextBlock, tGetBlock);
|
||||||
|
toKey = nextBlock.minKey;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
code = tsdbMergeTableData(pCommitter, pIter, pBlock, toKey, toDataOnly);
|
||||||
|
if (code) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
pRow = tsdbTbDataIterGet(pIter);
|
pRow = tsdbTbDataIterGet(pIter);
|
||||||
|
|
|
@ -35,6 +35,39 @@ void tMapDataClear(SMapData *pMapData) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tMapDataCopy(SMapData *pMapDataSrc, SMapData *pMapDataDest) {
|
||||||
|
int32_t code = 0;
|
||||||
|
int32_t size;
|
||||||
|
|
||||||
|
pMapDataDest->nItem = pMapDataSrc->nItem;
|
||||||
|
pMapDataDest->flag = pMapDataSrc->flag;
|
||||||
|
|
||||||
|
switch (pMapDataDest->flag) {
|
||||||
|
case TSDB_OFFSET_I32:
|
||||||
|
size = sizeof(int32_t) * pMapDataDest->nItem;
|
||||||
|
break;
|
||||||
|
case TSDB_OFFSET_I16:
|
||||||
|
size = sizeof(int16_t) * pMapDataDest->nItem;
|
||||||
|
break;
|
||||||
|
case TSDB_OFFSET_I8:
|
||||||
|
size = sizeof(int8_t) * pMapDataDest->nItem;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
code = tsdbRealloc(&pMapDataDest->pOfst, size);
|
||||||
|
if (code) goto _exit;
|
||||||
|
memcpy(pMapDataDest->pOfst, pMapDataSrc->pOfst, size);
|
||||||
|
|
||||||
|
pMapDataDest->nData = pMapDataSrc->nData;
|
||||||
|
code = tsdbRealloc(&pMapDataDest->pData, pMapDataDest->nData);
|
||||||
|
if (code) goto _exit;
|
||||||
|
memcpy(pMapDataDest->pData, pMapDataSrc->pData, pMapDataDest->nData);
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) {
|
int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t offset = pMapData->nData;
|
int32_t offset = pMapData->nData;
|
||||||
|
@ -369,6 +402,32 @@ void tBlockClear(SBlock *pBlock) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tBlockCopy(SBlock *pBlockSrc, SBlock *pBlockDest) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
pBlockDest->minKey = pBlockSrc->minKey;
|
||||||
|
pBlockDest->maxKey = pBlockSrc->maxKey;
|
||||||
|
pBlockDest->minVersion = pBlockSrc->minVersion;
|
||||||
|
pBlockDest->maxVersion = pBlockSrc->maxVersion;
|
||||||
|
pBlockDest->nRow = pBlockSrc->nRow;
|
||||||
|
pBlockDest->last = pBlockSrc->last;
|
||||||
|
pBlockDest->hasDup = pBlockSrc->hasDup;
|
||||||
|
pBlockDest->nSubBlock = pBlockSrc->nSubBlock;
|
||||||
|
for (int32_t iSubBlock = 0; iSubBlock < pBlockSrc->nSubBlock; iSubBlock++) {
|
||||||
|
pBlockDest->aSubBlock[iSubBlock].nRow = pBlockSrc->aSubBlock[iSubBlock].nRow;
|
||||||
|
pBlockDest->aSubBlock[iSubBlock].cmprAlg = pBlockSrc->aSubBlock[iSubBlock].cmprAlg;
|
||||||
|
pBlockDest->aSubBlock[iSubBlock].offset = pBlockSrc->aSubBlock[iSubBlock].offset;
|
||||||
|
pBlockDest->aSubBlock[iSubBlock].vsize = pBlockSrc->aSubBlock[iSubBlock].vsize;
|
||||||
|
pBlockDest->aSubBlock[iSubBlock].ksize = pBlockSrc->aSubBlock[iSubBlock].ksize;
|
||||||
|
pBlockDest->aSubBlock[iSubBlock].bsize = pBlockSrc->aSubBlock[iSubBlock].bsize;
|
||||||
|
code = tMapDataCopy(&pBlockSrc->aSubBlock[iSubBlock].mBlockCol, &pBlockDest->aSubBlock[iSubBlock].mBlockCol);
|
||||||
|
if (code) goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tPutBlock(uint8_t *p, void *ph) {
|
int32_t tPutBlock(uint8_t *p, void *ph) {
|
||||||
int32_t n = 0;
|
int32_t n = 0;
|
||||||
SBlock *pBlock = (SBlock *)ph;
|
SBlock *pBlock = (SBlock *)ph;
|
||||||
|
|
Loading…
Reference in New Issue