Merge remote-tracking branch 'origin/feat/tsdb_refact' into feat/tsdb_refact
This commit is contained in:
commit
f02dd0ef88
|
@ -138,7 +138,6 @@ void tColDataClear(void *ph);
|
|||
int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal);
|
||||
int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest);
|
||||
int32_t tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal);
|
||||
int32_t tColDataPCmprFn(const void *p1, const void *p2);
|
||||
// SBlockData
|
||||
#define tBlockDataFirstRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, 0)
|
||||
#define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1)
|
||||
|
|
|
@ -415,10 +415,6 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
|
|||
TSDBROW *pRow1;
|
||||
TSDBROW row2;
|
||||
TSDBROW *pRow2 = &row2;
|
||||
TSDBROW row;
|
||||
TSDBROW *pRow = &row;
|
||||
int32_t c = 0;
|
||||
TSKEY lastKey;
|
||||
|
||||
// read SBlockData
|
||||
code = tsdbReadBlockData(pCommitter->pReader, pBlockIdx, pBlockMerge, pBlockDataMerge, NULL, NULL);
|
||||
|
@ -432,7 +428,6 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
|
|||
code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
|
||||
if (code) goto _err;
|
||||
|
||||
lastKey = TSKEY_MIN;
|
||||
tBlockReset(pBlock);
|
||||
tBlockDataReset(pBlockData);
|
||||
while (true) {
|
||||
|
@ -445,68 +440,56 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
|
|||
}
|
||||
|
||||
if (pRow1 && pRow2) {
|
||||
if (tsdbRowCmprFn(pRow1, pRow2) < 0) {
|
||||
*pRow = *pRow1;
|
||||
|
||||
tsdbTbDataIterNext(pIter);
|
||||
pRow1 = tsdbTbDataIterGet(pIter);
|
||||
|
||||
if (pRow1) {
|
||||
if (tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0) {
|
||||
code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_VERSION(pRow1));
|
||||
if (code) goto _err;
|
||||
} else {
|
||||
pRow1 = NULL;
|
||||
}
|
||||
}
|
||||
} else if (tsdbRowCmprFn(pRow1, pRow2) > 0) {
|
||||
*pRow = *pRow2;
|
||||
|
||||
if (pRow2->iRow + 1 < pBlockDataMerge->nRow) {
|
||||
*pRow2 = tsdbRowFromBlockData(pBlockDataMerge, pRow2->iRow + 1);
|
||||
} else {
|
||||
pRow2 = NULL;
|
||||
}
|
||||
int32_t c = tsdbRowCmprFn(pRow1, pRow2);
|
||||
if (c < 0) {
|
||||
goto _append_mem_row;
|
||||
} else if (c > 0) {
|
||||
goto _append_block_row;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
} else if (pRow1) {
|
||||
*pRow = *pRow1;
|
||||
|
||||
tsdbTbDataIterNext(pIter);
|
||||
pRow1 = tsdbTbDataIterGet(pIter);
|
||||
if (pRow1) {
|
||||
if (tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0) {
|
||||
code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_VERSION(pRow1));
|
||||
if (code) goto _err;
|
||||
} else {
|
||||
pRow1 = NULL;
|
||||
}
|
||||
}
|
||||
goto _append_mem_row;
|
||||
} else {
|
||||
*pRow = *pRow2;
|
||||
|
||||
if (pRow2->iRow + 1 < pBlockDataMerge->nRow) {
|
||||
*pRow2 = tsdbRowFromBlockData(pBlockDataMerge, pRow2->iRow + 1);
|
||||
} else {
|
||||
pRow2 = NULL;
|
||||
}
|
||||
goto _append_block_row;
|
||||
}
|
||||
|
||||
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema);
|
||||
_append_mem_row:
|
||||
code = tBlockDataAppendRow(pBlockData, pRow1, pCommitter->pTSchema);
|
||||
if (code) goto _err;
|
||||
|
||||
pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow));
|
||||
pBlock->maxVersion = TMAX(pBlock->maxVersion, TSDBROW_VERSION(pRow));
|
||||
pBlock->nRow++;
|
||||
if (lastKey == TSDBROW_TS(pRow)) {
|
||||
pBlock->hasDup = 1;
|
||||
} else {
|
||||
lastKey = TSDBROW_TS(pRow);
|
||||
tsdbTbDataIterNext(pIter);
|
||||
pRow1 = tsdbTbDataIterGet(pIter);
|
||||
if (pRow1) {
|
||||
if (tsdbKeyCmprFn(&TSDBROW_KEY(pRow1), &toKey) < 0) {
|
||||
code = tsdbCommitterUpdateSchema(pCommitter, pBlockIdx->suid, pBlockIdx->uid, TSDBROW_SVERSION(pRow1));
|
||||
if (code) goto _err;
|
||||
} else {
|
||||
pRow1 = NULL;
|
||||
}
|
||||
}
|
||||
|
||||
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) goto _write_block;
|
||||
continue;
|
||||
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
|
||||
goto _write_block;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
|
||||
_append_block_row:
|
||||
code = tBlockDataAppendRow(pBlockData, pRow2, NULL);
|
||||
if (code) goto _err;
|
||||
|
||||
if (pRow2->iRow + 1 < pBlockDataMerge->nRow) {
|
||||
*pRow2 = tsdbRowFromBlockData(pBlockDataMerge, pRow2->iRow + 1);
|
||||
} else {
|
||||
pRow2 = NULL;
|
||||
}
|
||||
|
||||
if (pBlockData->nRow >= pCommitter->maxRow * 4 / 5) {
|
||||
goto _write_block;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
|
||||
_write_block:
|
||||
if (!toDataOnly && pBlockData->nRow < pCommitter->minRow) {
|
||||
|
@ -521,7 +504,6 @@ static int32_t tsdbMergeTableData(SCommitter *pCommitter, STbDataIter *pIter, SB
|
|||
code = tMapDataPutItem(&pCommitter->nBlockMap, pBlock, tPutBlock);
|
||||
if (code) goto _err;
|
||||
|
||||
lastKey = TSKEY_MIN;
|
||||
tBlockReset(pBlock);
|
||||
tBlockDataReset(pBlockData);
|
||||
}
|
||||
|
@ -538,7 +520,6 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
|
|||
TSDBROW *pRow;
|
||||
SBlock *pBlock = &pCommitter->nBlock;
|
||||
SBlockData *pBlockData = &pCommitter->nBlockData;
|
||||
TSKEY lastKey = TSKEY_MIN;
|
||||
int64_t suid = pIter->pTbData->suid;
|
||||
int64_t uid = pIter->pTbData->uid;
|
||||
|
||||
|
@ -563,16 +544,6 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
|
|||
code = tBlockDataAppendRow(pBlockData, pRow, pCommitter->pTSchema);
|
||||
if (code) goto _err;
|
||||
|
||||
// update
|
||||
pBlock->minVersion = TMIN(pBlock->minVersion, TSDBROW_VERSION(pRow));
|
||||
pBlock->maxVersion = TMIN(pBlock->maxVersion, TSDBROW_VERSION(pRow));
|
||||
pBlock->nRow++;
|
||||
if (TSDBROW_TS(pRow) == lastKey) {
|
||||
pBlock->hasDup = 1;
|
||||
} else {
|
||||
lastKey = TSDBROW_TS(pRow);
|
||||
}
|
||||
|
||||
tsdbTbDataIterNext(pIter);
|
||||
pRow = tsdbTbDataIterGet(pIter);
|
||||
if (pRow && tsdbKeyCmprFn(&TSDBROW_KEY(pRow), &toKey) >= 0) pRow = NULL;
|
||||
|
@ -596,7 +567,6 @@ static int32_t tsdbCommitTableMemData(SCommitter *pCommitter, STbDataIter *pIter
|
|||
|
||||
tBlockReset(pBlock);
|
||||
tBlockDataReset(pBlockData);
|
||||
lastKey = TSKEY_MIN;
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -614,12 +584,7 @@ static int32_t tsdbCommitTableDiskData(SCommitter *pCommitter, SBlock *pBlock, S
|
|||
if (code) goto _err;
|
||||
|
||||
tBlockReset(&pCommitter->nBlock);
|
||||
pCommitter->nBlock.minKey = pBlock->minKey;
|
||||
pCommitter->nBlock.maxKey = pBlock->maxKey;
|
||||
pCommitter->nBlock.minVersion = pBlock->minVersion;
|
||||
pCommitter->nBlock.nRow = pBlock->nRow;
|
||||
pCommitter->nBlock.last = pBlock->last;
|
||||
pCommitter->nBlock.hasDup = pBlock->hasDup;
|
||||
code = tsdbWriteBlockData(pCommitter->pWriter, &pCommitter->oBlockData, NULL, NULL, pBlockIdx, &pCommitter->nBlock,
|
||||
pCommitter->cmprAlg);
|
||||
if (code) goto _err;
|
||||
|
|
|
@ -176,7 +176,8 @@ static int32_t tsdbApplyDFileSetChange(STsdbFS *pFS, SDFileSet *pFrom, SDFileSet
|
|||
if (pFrom && pTo) {
|
||||
// head
|
||||
if (tsdbFileIsSame(pFrom, pTo, TSDB_HEAD_FILE)) {
|
||||
ASSERT(0);
|
||||
ASSERT(pFrom->fHead.size == pTo->fHead.size);
|
||||
ASSERT(pFrom->fHead.offset == pTo->fHead.offset);
|
||||
} else {
|
||||
tsdbDataFileName(pFS->pTsdb, pFrom, TSDB_HEAD_FILE, fname);
|
||||
taosRemoveFile(fname);
|
||||
|
|
|
@ -107,7 +107,7 @@ int32_t tsdbWriteDelData(SDelFWriter *pWriter, SArray *aDelData, uint8_t **ppBuf
|
|||
*(SBlockDataHdr *)(*ppBuf) = hdr;
|
||||
n += sizeof(hdr);
|
||||
for (int32_t iDelData = 0; iDelData < taosArrayGetSize(aDelData); iDelData++) {
|
||||
size += tPutDelData(*ppBuf + n, taosArrayGet(aDelData, iDelData));
|
||||
n += tPutDelData(*ppBuf + n, taosArrayGet(aDelData, iDelData));
|
||||
}
|
||||
taosCalcChecksumAppend(0, *ppBuf, size);
|
||||
|
||||
|
@ -1220,8 +1220,25 @@ int32_t tsdbWriteBlockData(SDataFWriter *pWriter, SBlockData *pBlockData, uint8_
|
|||
if (!ppBuf1) ppBuf1 = &pBuf1;
|
||||
if (!ppBuf2) ppBuf2 = &pBuf2;
|
||||
|
||||
pBlock->minKey = MIN_TSDBKEY(pBlock->minKey, tBlockDataFirstKey(pBlockData));
|
||||
pBlock->maxKey = MAX_TSDBKEY(pBlock->maxKey, tBlockDataLastKey(pBlockData));
|
||||
TSKEY lastKey = TSKEY_MIN;
|
||||
for (int32_t iRow = 0; iRow < pBlockData->nRow; iRow++) {
|
||||
TSDBKEY key = TSDBROW_KEY(&tsdbRowFromBlockData(pBlockData, iRow));
|
||||
if (iRow == 0) {
|
||||
pBlock->minKey = MIN_TSDBKEY(pBlock->minKey, key);
|
||||
}
|
||||
|
||||
if (iRow == pBlockData->nRow - 1) {
|
||||
pBlock->maxKey = MAX_TSDBKEY(pBlock->maxKey, key);
|
||||
}
|
||||
|
||||
pBlock->minVersion = TMIN(pBlock->minVersion, key.version);
|
||||
pBlock->maxVersion = TMAX(pBlock->maxVersion, key.version);
|
||||
if (key.ts == lastKey) {
|
||||
pBlock->hasDup = 1;
|
||||
}
|
||||
lastKey = key.ts;
|
||||
}
|
||||
pBlock->nRow += pBlockData->nRow;
|
||||
|
||||
pSubBlock->nRow = pBlockData->nRow;
|
||||
pSubBlock->cmprAlg = cmprAlg;
|
||||
|
|
|
@ -1085,7 +1085,7 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tColDataPCmprFn(const void *p1, const void *p2) {
|
||||
static FORCE_INLINE int32_t tColDataPCmprFn(const void *p1, const void *p2) {
|
||||
SColData *pColData1 = *(SColData **)p1;
|
||||
SColData *pColData2 = *(SColData **)p2;
|
||||
|
||||
|
|
Loading…
Reference in New Issue