more work
This commit is contained in:
parent
6843fd6e31
commit
bad9d55119
|
@ -15,6 +15,7 @@
|
|||
#include "tsdbMain.h"
|
||||
|
||||
#define TSDB_IVLD_FID INT_MIN
|
||||
#define TSDB_MAX_SUBBLOCKS 8
|
||||
|
||||
typedef struct {
|
||||
int minFid;
|
||||
|
@ -495,14 +496,7 @@ static int tsdbCommitToTable(SCommitH *pch, int tid) {
|
|||
pBlock = NULL;
|
||||
}
|
||||
} else if ((cidx < nBlocks) && (pBlock->last || tsdbComparKeyBlock((void *)(&nextKey), pBlock) == 0)) {
|
||||
TSKEY keyLimit;
|
||||
if (cidx == nBlocks - 1) {
|
||||
keyLimit = pch->maxKey;
|
||||
} else {
|
||||
keyLimit = pBlock[1].keyFirst - 1;
|
||||
}
|
||||
|
||||
if (tsdbMergeMemData(pch, pIter, pBlock, keyLimit) < 0) {
|
||||
if (tsdbMergeMemData(pch, pIter, cidx) < 0) {
|
||||
TSDB_RUNLOCK_TABLE(pIter->pTable);
|
||||
return -1;
|
||||
}
|
||||
|
@ -874,8 +868,74 @@ static int tsdbCommitMemData(SCommitH *pCommith, SCommitIter *pIter, TSKEY keyLi
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, SBlock *pBlock, TSKEY keyLimit) {
|
||||
// TODO
|
||||
static int tsdbMergeMemData(SCommitH *pCommith, SCommitIter *pIter, int bidx) {
|
||||
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
|
||||
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
||||
int nBlocks = pCommith->readh.pBlkIdx->numOfBlocks;
|
||||
SBlock * pBlock = pCommith->readh.pBlkInfo->blocks + bidx;
|
||||
TSKEY keyLimit;
|
||||
int16_t colId = 0;
|
||||
SMergeInfo mInfo;
|
||||
SBlock subBlocks[TSDB_MAX_SUBBLOCKS];
|
||||
SBlock block, supBlock;
|
||||
SDFile * pDFile;
|
||||
|
||||
if (bidx == nBlocks - 1) {
|
||||
keyLimit = pCommith->maxKey;
|
||||
} else {
|
||||
keyLimit = pBlock[1].keyFirst - 1;
|
||||
}
|
||||
|
||||
SSkipListIterator titer = *(pIter->pIter);
|
||||
if (tsdbLoadBlockDataCols(&(pCommith->readh), pBlock, NULL, &colId, 1) < 0) return -1;
|
||||
|
||||
tsdbLoadDataFromCache(pIter->pTable, &titer, keyLimit, INT32_MAX, NULL, pCommith->readh.pDCols[0]->cols[0].pData,
|
||||
pCommith->readh.pDCols[0]->numOfRows, pCfg->update, &mInfo);
|
||||
|
||||
if (mInfo.nOperations == 0) {
|
||||
// no new data to insert (all updates denied)
|
||||
if (tsdbMoveBlock(pCommith, bidx) < 0) {
|
||||
return -1;
|
||||
}
|
||||
*(pIter->pIter) = titer;
|
||||
} else if (pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed == 0) {
|
||||
// Ignore the block
|
||||
ASSERT(0);
|
||||
*(pIter->pIter) = titer;
|
||||
} else if (tsdbCanAddSubBlock()) {
|
||||
// Add a sub-block
|
||||
tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, keyLimit, INT32_MAX, pCommith->pDataCols,
|
||||
pCommith->readh.pDCols[0]->cols[0].pData, pCommith->readh.pDCols[0]->numOfRows, pCfg->update,
|
||||
&mInfo);
|
||||
if (pBlock->last) {
|
||||
pDFile = TSDB_COMMIT_LAST_FILE(pCommith);
|
||||
} else {
|
||||
pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
|
||||
}
|
||||
|
||||
if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, pBlock->last, false) < 0) return -1;
|
||||
|
||||
if (pBlock->numOfSubBlocks == 1) {
|
||||
subBlocks[0] = *pBlock;
|
||||
subBlocks[0].numOfSubBlocks = 0;
|
||||
} else {
|
||||
memcpy(subBlocks, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset),
|
||||
sizeof(SBlock) * pBlock->numOfSubBlocks);
|
||||
}
|
||||
subBlocks[pBlock->numOfSubBlocks] = block;
|
||||
supBlock = *pBlock;
|
||||
supBlock.keyFirst = mInfo.keyFirst;
|
||||
supBlock.keyLast = mInfo.keyLast;
|
||||
supBlock.numOfSubBlocks++;
|
||||
supBlock.numOfRows = pBlock->numOfRows + mInfo.rowsInserted - mInfo.rowsDeleteSucceed;
|
||||
supBlock.offset = taosArrayGetSize(pCommith->aSubBlk) * sizeof(SBlock);
|
||||
|
||||
if (tsdbCommitAddBlock(pCommith, &supBlock, subBlocks, pBlock->numOfSubBlocks + 1) < 0) return -1;
|
||||
} else {
|
||||
if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1;
|
||||
if (tsdbMergeBlockData(pCommith, pIter, pCommith->readh.pDCols[0], keyLimit, bidx == (nBlocks - 1)) < 0) return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -887,32 +947,141 @@ static int tsdbMoveBlock(SCommitH *pCommith, int bidx) {
|
|||
|
||||
if (tfsIsSameFile(&(pCommitF->f), &(pReadF->f))) {
|
||||
if (pBlock->numOfSubBlocks == 1) {
|
||||
if (taosArrayPush(pCommith->aSupBlk, (void *)pBlock) == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
if (tsdbCommitAddBlock(pCommith, pBlock, NULL, 0) < 0) return -1;
|
||||
} else {
|
||||
block = *pBlock;
|
||||
block.offset = sizeof(SBlock) * taosArrayGetSize(pCommith->aSupBlock);
|
||||
|
||||
if (taosArrayPush(pCommith->aSupBlk, (void *)(&block)) == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (taosArrayPushBatch(pCommith->aSubBlk, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset), pBlock->numOfSubBlocks) == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
if (tsdbCommitAddBlock(pCommith, &block, POINTER_SHIFT(pCommith->readh.pBlkInfo, pBlock->offset),
|
||||
pBlock->numOfSubBlocks) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if (tsdbLoadBlockData(&(pCommith->readh), pBlock, NULL) < 0) return -1;
|
||||
if (tsdbWriteBlock(pCommith, pCommitF, pCommith->readh.pDCols[0], &block, pBlock->last, true) < 0) return -1;
|
||||
if (taosArrayPush(pCommith->aSupBlk, (void *)(&block)) == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const SBlock *pSubBlocks, int nSubBlocks) {
|
||||
ASSERT(pSupBlock != NULL);
|
||||
|
||||
if (taosArrayPush(pCommith->aSupBlk, pSupBlock) < 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pSubBlocks && taosArrayPushBatch(pCommith->aSupBlk, pSubBlocks, nSubBlocks) < 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, bool isLastOneBlock) {
|
||||
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
|
||||
STsdbCfg * pCfg = REPO_CFG(pRepo);
|
||||
SBlock block;
|
||||
SDFile * pDFile;
|
||||
bool isLast;
|
||||
int32_t defaultRows = TSDB_COMMIT_DEFAULT_ROWS(pCommith);
|
||||
|
||||
int biter = 0;
|
||||
while (true) {
|
||||
tsdbLoadAndMergeFromCache(pCommith->readh.pDCols[0], &biter, pIter, pCommith->pDataCols, keyLimit, defaultRows,
|
||||
pCfg->update);
|
||||
|
||||
if (pCommith->pDataCols->numOfRows == 0) break;
|
||||
|
||||
if (isLastOneBlock) {
|
||||
if (pCommith->pDataCols->numOfRows < pCfg->minRowsPerFileBlock) {
|
||||
pDFile = TSDB_COMMIT_LAST_FILE(pCommith);
|
||||
isLast = true;
|
||||
} else {
|
||||
pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
|
||||
isLast = false;
|
||||
}
|
||||
} else {
|
||||
pDFile = TSDB_COMMIT_DATA_FILE(pCommith);
|
||||
isLast = false;
|
||||
}
|
||||
|
||||
if (tsdbWriteBlock(pCommith, pDFile, pCommith->pDataCols, &block, isLast, true) < 0) return -1;
|
||||
if (tsdbCommitAddBlock(pCommith, &block, NULL, 0) < 0) return -1;
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIter *pCommitIter, SDataCols *pTarget,
|
||||
TSKEY maxKey, int maxRows, int8_t update) {
|
||||
TSKEY key1 = INT64_MAX;
|
||||
TSKEY key2 = INT64_MAX;
|
||||
STSchema *pSchema = NULL;
|
||||
|
||||
ASSERT(maxRows > 0 && dataColsKeyLast(pDataCols) <= maxKey);
|
||||
tdResetDataCols(pTarget);
|
||||
|
||||
while (true) {
|
||||
key1 = (*iter >= pDataCols->numOfRows) ? INT64_MAX : dataColsKeyAt(pDataCols, *iter);
|
||||
bool isRowDel = false;
|
||||
SDataRow row = tsdbNextIterRow(pCommitIter->pIter);
|
||||
if (row == NULL || dataRowKey(row) > maxKey) {
|
||||
key2 = INT64_MAX;
|
||||
} else {
|
||||
key2 = dataRowKey(row);
|
||||
isRowDel = dataRowDeleted(row);
|
||||
}
|
||||
|
||||
if (key1 == INT64_MAX && key2 == INT64_MAX) break;
|
||||
|
||||
if (key1 < key2) {
|
||||
for (int i = 0; i < pDataCols->numOfCols; i++) {
|
||||
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
|
||||
pTarget->maxPoints);
|
||||
}
|
||||
|
||||
pTarget->numOfRows++;
|
||||
(*iter)++;
|
||||
} else if (key1 > key2) {
|
||||
if (!isRowDel) {
|
||||
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
|
||||
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row));
|
||||
ASSERT(pSchema != NULL);
|
||||
}
|
||||
|
||||
tdAppendDataRowToDataCol(row, pSchema, pTarget);
|
||||
}
|
||||
|
||||
tSkipListIterNext(pCommitIter->pIter);
|
||||
} else {
|
||||
if (update) {
|
||||
if (!isRowDel) {
|
||||
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
|
||||
pSchema = tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, dataRowVersion(row));
|
||||
ASSERT(pSchema != NULL);
|
||||
}
|
||||
|
||||
tdAppendDataRowToDataCol(row, pSchema, pTarget);
|
||||
}
|
||||
} else {
|
||||
ASSERT(!isRowDel);
|
||||
|
||||
for (int i = 0; i < pDataCols->numOfCols; i++) {
|
||||
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
|
||||
pTarget->maxPoints);
|
||||
}
|
||||
|
||||
pTarget->numOfRows++;
|
||||
}
|
||||
(*iter)++;
|
||||
tSkipListIterNext(pCommitIter->pIter);
|
||||
}
|
||||
|
||||
if (pTarget->numOfRows >= maxRows) break;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue