TD-100
This commit is contained in:
parent
a85c04c0fb
commit
3f5b1415be
|
@ -710,11 +710,13 @@ void *tcalloc(size_t nmemb, size_t size) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t tsizeof(void *ptr) { return *(size_t *)((char *)ptr - sizeof(size_t)); }
|
size_t tsizeof(void *ptr) { return (ptr) ? (*(size_t *)((char *)ptr - sizeof(size_t))) : 0; }
|
||||||
|
|
||||||
void tmemset(void *ptr, int c) { memset(ptr, c, tsizeof(ptr)); }
|
void tmemset(void *ptr, int c) { memset(ptr, c, tsizeof(ptr)); }
|
||||||
|
|
||||||
void * trealloc(void *ptr, size_t size) {
|
void * trealloc(void *ptr, size_t size) {
|
||||||
|
if (ptr == NULL) return tmalloc(size);
|
||||||
|
|
||||||
if (size <= tsizeof(ptr)) return ptr;
|
if (size <= tsizeof(ptr)) return ptr;
|
||||||
|
|
||||||
void * tptr = (void *)((char *)ptr - sizeof(size_t));
|
void * tptr = (void *)((char *)ptr - sizeof(size_t));
|
||||||
|
@ -727,4 +729,8 @@ void * trealloc(void *ptr, size_t size) {
|
||||||
return (void *)((char *)tptr + sizeof(size_t));
|
return (void *)((char *)tptr + sizeof(size_t));
|
||||||
}
|
}
|
||||||
|
|
||||||
void tzfree(void *ptr) { free((void *)((char *)ptr - sizeof(size_t))); }
|
void tzfree(void *ptr) {
|
||||||
|
if (ptr) {
|
||||||
|
free((void *)((char *)ptr - sizeof(size_t)));
|
||||||
|
}
|
||||||
|
}
|
|
@ -388,17 +388,17 @@ typedef struct {
|
||||||
// For file set usage
|
// For file set usage
|
||||||
SHelperFile files;
|
SHelperFile files;
|
||||||
SCompIdx * pCompIdx;
|
SCompIdx * pCompIdx;
|
||||||
size_t compIdxSize;
|
// size_t compIdxSize;
|
||||||
|
|
||||||
// For table set usage
|
// For table set usage
|
||||||
SHelperTable tableInfo;
|
SHelperTable tableInfo;
|
||||||
SCompInfo * pCompInfo;
|
SCompInfo * pCompInfo;
|
||||||
size_t compInfoSize;
|
// size_t compInfoSize;
|
||||||
bool hasOldLastBlock;
|
bool hasOldLastBlock;
|
||||||
|
|
||||||
// For block set usage
|
// For block set usage
|
||||||
SCompData *pCompData;
|
SCompData *pCompData;
|
||||||
size_t compDataSize;
|
// size_t compDataSize;
|
||||||
SDataCols *pDataCols[2];
|
SDataCols *pDataCols[2];
|
||||||
|
|
||||||
} SRWHelper;
|
} SRWHelper;
|
||||||
|
|
|
@ -52,8 +52,9 @@ static void tsdbResetHelperFileImpl(SRWHelper *pHelper) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbInitHelperFile(SRWHelper *pHelper) {
|
static int tsdbInitHelperFile(SRWHelper *pHelper) {
|
||||||
pHelper->compIdxSize = sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM);
|
// pHelper->compIdxSize = sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM);
|
||||||
pHelper->pCompIdx = (SCompIdx *)malloc(pHelper->compIdxSize);
|
size_t tsize = sizeof(SCompIdx) * pHelper->config.maxTables + sizeof(TSCKSUM);
|
||||||
|
pHelper->pCompIdx = (SCompIdx *)tmalloc(tsize);
|
||||||
if (pHelper->pCompIdx == NULL) return -1;
|
if (pHelper->pCompIdx == NULL) return -1;
|
||||||
|
|
||||||
tsdbResetHelperFileImpl(pHelper);
|
tsdbResetHelperFileImpl(pHelper);
|
||||||
|
@ -62,7 +63,7 @@ static int tsdbInitHelperFile(SRWHelper *pHelper) {
|
||||||
|
|
||||||
static void tsdbDestroyHelperFile(SRWHelper *pHelper) {
|
static void tsdbDestroyHelperFile(SRWHelper *pHelper) {
|
||||||
tsdbCloseHelperFile(pHelper, false);
|
tsdbCloseHelperFile(pHelper, false);
|
||||||
tfree(pHelper->pCompIdx);
|
tzfree(pHelper->pCompIdx);
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------- Operations on Helper Table part
|
// ---------- Operations on Helper Table part
|
||||||
|
@ -75,7 +76,7 @@ static void tsdbInitHelperTable(SRWHelper *pHelper) {
|
||||||
tsdbResetHelperTableImpl(pHelper);
|
tsdbResetHelperTableImpl(pHelper);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbDestroyHelperTable(SRWHelper *pHelper) { return; }
|
static void tsdbDestroyHelperTable(SRWHelper *pHelper) { tzfree((void *)pHelper->pCompInfo); }
|
||||||
|
|
||||||
// ---------- Operations on Helper Block part
|
// ---------- Operations on Helper Block part
|
||||||
static void tsdbResetHelperBlockImpl(SRWHelper *pHelper) {
|
static void tsdbResetHelperBlockImpl(SRWHelper *pHelper) {
|
||||||
|
@ -94,6 +95,7 @@ static int tsdbInitHelperBlock(SRWHelper *pHelper) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbDestroyHelperBlock(SRWHelper *pHelper) {
|
static void tsdbDestroyHelperBlock(SRWHelper *pHelper) {
|
||||||
|
tzfree(pHelper->pCompData);
|
||||||
tdFreeDataCols(pHelper->pDataCols[0]);
|
tdFreeDataCols(pHelper->pDataCols[0]);
|
||||||
tdFreeDataCols(pHelper->pDataCols[1]);
|
tdFreeDataCols(pHelper->pDataCols[1]);
|
||||||
}
|
}
|
||||||
|
@ -377,7 +379,7 @@ int tsdbWriteCompInfo(SRWHelper *pHelper) {
|
||||||
int tsdbWriteCompIdx(SRWHelper *pHelper) {
|
int tsdbWriteCompIdx(SRWHelper *pHelper) {
|
||||||
if (lseek(pHelper->files.nHeadF.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
|
if (lseek(pHelper->files.nHeadF.fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
|
||||||
|
|
||||||
if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pCompIdx, pHelper->compIdxSize) < pHelper->compIdxSize)
|
if (twrite(pHelper->files.nHeadF.fd, (void *)pHelper->pCompIdx, tsizeof(pHelper->pCompIdx)) < tsizeof(pHelper->pCompIdx))
|
||||||
return -1;
|
return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -390,13 +392,13 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) {
|
||||||
int fd = pHelper->files.headF.fd;
|
int fd = pHelper->files.headF.fd;
|
||||||
|
|
||||||
if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
|
if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) return -1;
|
||||||
if (tread(fd, (void *)(pHelper->pCompIdx), pHelper->compIdxSize) < pHelper->compIdxSize) return -1;
|
if (tread(fd, (void *)(pHelper->pCompIdx), tsizeof(pHelper->pCompIdx)) < tsizeof(pHelper->pCompIdx)) return -1;
|
||||||
// TODO: check the correctness of the part
|
// TODO: check the correctness of the part
|
||||||
}
|
}
|
||||||
helperSetState(pHelper, TSDB_HELPER_IDX_LOAD);
|
helperSetState(pHelper, TSDB_HELPER_IDX_LOAD);
|
||||||
|
|
||||||
// Copy the memory for outside usage
|
// Copy the memory for outside usage
|
||||||
if (target) memcpy(target, pHelper->pCompIdx, pHelper->compIdxSize);
|
if (target) memcpy(target, pHelper->pCompIdx, tsizeof(pHelper->pCompIdx));
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -415,7 +417,8 @@ int tsdbLoadCompInfo(SRWHelper *pHelper, void *target) {
|
||||||
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
|
if (!helperHasState(pHelper, TSDB_HELPER_INFO_LOAD)) {
|
||||||
if (lseek(fd, pIdx->offset, SEEK_SET) < 0) return -1;
|
if (lseek(fd, pIdx->offset, SEEK_SET) < 0) return -1;
|
||||||
|
|
||||||
adjustMem(pHelper->pCompInfo, pHelper->compInfoSize, pIdx->len);
|
// adjustMem(pHelper->pCompInfo, pHelper->compInfoSize, pIdx->len);
|
||||||
|
pHelper->pCompInfo = trealloc((void *)pHelper->pCompInfo, pIdx->len);
|
||||||
if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1;
|
if (tread(fd, (void *)(pHelper->pCompInfo), pIdx->len) < pIdx->len) return -1;
|
||||||
// TODO: check the checksum
|
// TODO: check the checksum
|
||||||
|
|
||||||
|
@ -433,9 +436,9 @@ int tsdbLoadCompData(SRWHelper *pHelper, SCompBlock *pCompBlock, void *target) {
|
||||||
|
|
||||||
if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) return -1;
|
if (lseek(fd, pCompBlock->offset, SEEK_SET) < 0) return -1;
|
||||||
|
|
||||||
size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols;
|
size_t tsize = sizeof(SCompData) + sizeof(SCompCol) * pCompBlock->numOfCols + sizeof(TSCKSUM);
|
||||||
adjustMem(pHelper->pCompData, pHelper->compDataSize, tsize);
|
pHelper->pCompData = trealloc((void *)pHelper->pCompData, tsize);
|
||||||
|
if (pHelper->pCompData == NULL) return -1;
|
||||||
if (tread(fd, (void *)pHelper->pCompData, tsize) < tsize) return -1;
|
if (tread(fd, (void *)pHelper->pCompData, tsize) < tsize) return -1;
|
||||||
|
|
||||||
ASSERT(pCompBlock->numOfCols == pHelper->pCompData->numOfCols);
|
ASSERT(pCompBlock->numOfCols == pHelper->pCompData->numOfCols);
|
||||||
|
@ -860,73 +863,16 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
||||||
|
|
||||||
static int compTSKEY(const void *key1, const void *key2) { return ((TSKEY *)key1 - (TSKEY *)key2); }
|
static int compTSKEY(const void *key1, const void *key2) { return ((TSKEY *)key1 - (TSKEY *)key2); }
|
||||||
|
|
||||||
// Get the number of rows the data can be merged into the block
|
|
||||||
// static int tsdbGetRowsCanBeMergedWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDataCols) {
|
|
||||||
// int rowsCanMerge = 0;
|
|
||||||
// TSKEY keyFirst = dataColsKeyFirst(pDataCols);
|
|
||||||
|
|
||||||
// SCompIdx * pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
|
||||||
// SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
|
|
||||||
|
|
||||||
// ASSERT(blkIdx < pIdx->numOfSuperBlocks);
|
|
||||||
|
|
||||||
// TSKEY keyMax = (blkIdx < pIdx->numOfSuperBlocks + 1) ? (pCompBlock + 1)->keyFirst - 1 : pHelper->files.maxKey;
|
|
||||||
|
|
||||||
// if (keyFirst > pCompBlock->keyLast) {
|
|
||||||
// void *ptr = taosbsearch((void *)(&keyMax), pDataCols->cols[0].pData, pDataCols->numOfPoints, sizeof(TSKEY),
|
|
||||||
// compTSKEY, TD_LE);
|
|
||||||
// ASSERT(ptr != NULL);
|
|
||||||
|
|
||||||
// rowsCanMerge =
|
|
||||||
// MIN((TSKEY *)ptr - (TSKEY *)pDataCols->cols[0].pData, pHelper->config.minRowsPerFileBlock - pCompBlock->numOfPoints);
|
|
||||||
|
|
||||||
// } else {
|
|
||||||
// int32_t colId[1] = {0};
|
|
||||||
// if (tsdbLoadBlockDataCols(pHelper, NULL, blkIdx, colId, 1) < 0) goto _err;
|
|
||||||
|
|
||||||
// int iter1 = 0; // For pDataCols
|
|
||||||
// int iter2 = 0; // For loaded data cols
|
|
||||||
|
|
||||||
// while (1) {
|
|
||||||
// if (iter1 >= pDataCols->numOfPoints || iter2 >= pHelper->pDataCols[0]->numOfPoints) break;
|
|
||||||
// if (pCompBlock->numOfPoints + rowsCanMerge >= pHelper->config.maxRowsPerFileBlock) break;
|
|
||||||
|
|
||||||
// TSKEY key1 = dataColsKeyAt(pDataCols, iter1);
|
|
||||||
// TSKEY key2 = dataColsKeyAt(pHelper->pDataCols[0], iter2);
|
|
||||||
|
|
||||||
// if (key1 > keyMax) break;
|
|
||||||
|
|
||||||
// if (key1 < key2) {
|
|
||||||
// iter1++;
|
|
||||||
// } else if (key1 == key2) {
|
|
||||||
// iter1++;
|
|
||||||
// iter2++;
|
|
||||||
// } else {
|
|
||||||
// iter2++;
|
|
||||||
// rowsCanMerge++;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// return rowsCanMerge;
|
|
||||||
|
|
||||||
// _err:
|
|
||||||
// return -1;
|
|
||||||
// }
|
|
||||||
|
|
||||||
static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t spaceNeeded) {
|
static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t spaceNeeded) {
|
||||||
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
||||||
|
|
||||||
size_t spaceLeft = pHelper->compInfoSize - pIdx->len;
|
size_t spaceLeft = tsizeof((void *)pHelper->pCompInfo) - pIdx->len;
|
||||||
ASSERT(spaceLeft >= 0);
|
ASSERT(spaceLeft >= 0);
|
||||||
if (spaceLeft < spaceNeeded) {
|
if (spaceLeft < spaceNeeded) {
|
||||||
size_t tsize = pHelper->compInfoSize + sizeof(SCompBlock) * 16;
|
size_t tsize = tsizeof(pHelper->pCompInfo) + sizeof(SCompBlock) * 16;
|
||||||
if (pHelper->compInfoSize == 0) tsize += sizeof(SCompInfo);
|
if (tsizeof(pHelper->pCompInfo) == 0) tsize += sizeof(SCompInfo);
|
||||||
|
|
||||||
pHelper->pCompInfo = (SCompInfo *)realloc((void *)(pHelper->pCompInfo), tsize);
|
pHelper->pCompInfo = (SCompInfo *)trealloc(pHelper->pCompInfo, tsize);
|
||||||
if (pHelper->pCompInfo == NULL) return -1;
|
|
||||||
|
|
||||||
pHelper->compInfoSize = tsize;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
Loading…
Reference in New Issue