Merge branch '2.0' into liaohj_2
This commit is contained in:
commit
db171b36e9
|
@ -100,6 +100,8 @@ SListNode *tdListPopHead(SList *list) {
|
|||
list->head = node->next;
|
||||
}
|
||||
list->numOfEles--;
|
||||
node->next = NULL;
|
||||
node->prev = NULL;
|
||||
return node;
|
||||
}
|
||||
|
||||
|
@ -113,6 +115,7 @@ SListNode *tdListPopTail(SList *list) {
|
|||
list->tail = node->prev;
|
||||
}
|
||||
list->numOfEles--;
|
||||
node->next = node->prev = NULL;
|
||||
return node;
|
||||
}
|
||||
|
||||
|
@ -131,6 +134,7 @@ SListNode *tdListPopNode(SList *list, SListNode *node) {
|
|||
node->next->prev = node->prev;
|
||||
}
|
||||
list->numOfEles--;
|
||||
node->next = node->prev = NULL;
|
||||
|
||||
return node;
|
||||
}
|
||||
|
|
|
@ -157,22 +157,22 @@ TEST(testCase, string_strnchr_test) {
|
|||
EXPECT_TRUE(strnchr(a10, '.', strlen(a10), true) == NULL);
|
||||
}
|
||||
|
||||
TEST(testCase, cache_resize_test) {
|
||||
char a11[] = "abc'.'";
|
||||
EXPECT_TRUE(strnchr(a11, '.', strlen(a11), false) != NULL);
|
||||
// TEST(testCase, cache_resize_test) {
|
||||
// char a11[] = "abc'.'";
|
||||
// EXPECT_TRUE(strnchr(a11, '.', strlen(a11), false) != NULL);
|
||||
|
||||
char a12[] = "abc'-'";
|
||||
EXPECT_TRUE(strnchr(a12, '-', strlen(a12), false) != NULL);
|
||||
// char a12[] = "abc'-'";
|
||||
// EXPECT_TRUE(strnchr(a12, '-', strlen(a12), false) != NULL);
|
||||
|
||||
char a15[] = "abc'-'";
|
||||
EXPECT_TRUE(strnchr(a15, '-', strlen(a15), true) == NULL);
|
||||
// char a15[] = "abc'-'";
|
||||
// EXPECT_TRUE(strnchr(a15, '-', strlen(a15), true) == NULL);
|
||||
|
||||
char a13[] = "'-'";
|
||||
EXPECT_TRUE(strnchr(a13, '-', strlen(a13), false) != NULL);
|
||||
// char a13[] = "'-'";
|
||||
// EXPECT_TRUE(strnchr(a13, '-', strlen(a13), false) != NULL);
|
||||
|
||||
char a14[] = "'-'";
|
||||
EXPECT_TRUE(strnchr(a14, '-', strlen(a14), true) == NULL);
|
||||
// char a14[] = "'-'";
|
||||
// EXPECT_TRUE(strnchr(a14, '-', strlen(a14), true) == NULL);
|
||||
|
||||
char a16[] = "'-'.";
|
||||
EXPECT_TRUE(strnchr(a16, '.', strlen(a16), true) != NULL);
|
||||
}
|
||||
// char a16[] = "'-'.";
|
||||
// EXPECT_TRUE(strnchr(a16, '.', strlen(a16), true) != NULL);
|
||||
// }
|
|
@ -345,12 +345,12 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) {
|
|||
|
||||
int32_t tsdbLockRepo(tsdb_repo_t *repo) {
|
||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||
return pthread_mutex_lock(repo);
|
||||
return pthread_mutex_lock(&(pRepo->mutex));
|
||||
}
|
||||
|
||||
int32_t tsdbUnLockRepo(tsdb_repo_t *repo) {
|
||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||
return pthread_mutex_unlock(repo);
|
||||
return pthread_mutex_unlock(&(pRepo->mutex));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -905,6 +905,9 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
|
|||
int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey);
|
||||
if (!hasDataToCommit) return 0; // No data to commit, just return
|
||||
|
||||
// TODO: make it more flexible
|
||||
pCompInfo = (SCompInfo *)malloc(sizeof(SCompInfo) + sizeof(SCompBlock) * 1000);
|
||||
|
||||
// Create and open files for commit
|
||||
tsdbGetDataDirName(pRepo, dataDir);
|
||||
if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) { /* TODO */
|
||||
|
@ -913,7 +916,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
|
|||
if (pGroup == NULL) { /* TODO */
|
||||
}
|
||||
tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &hFile, 1, 0);
|
||||
if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) {
|
||||
if (0 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) {
|
||||
// TODO: make it not to write the last file every time
|
||||
tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0);
|
||||
isNewLastFile = 1;
|
||||
|
@ -934,6 +937,8 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
|
|||
SSkipListIterator *pIter = iters[tid];
|
||||
SCompIdx * pIdx = &pIndices[tid];
|
||||
|
||||
int nNewBlocks = 0;
|
||||
|
||||
if (pTable == NULL || pIter == NULL) continue;
|
||||
|
||||
/* If no new data to write for this table, just write the old data to new file
|
||||
|
@ -941,8 +946,10 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
|
|||
*/
|
||||
if (!tsdbHasDataInRange(pIter, minKey, maxKey)) {
|
||||
// has old data
|
||||
if (pIdx->offset > 0) {
|
||||
if (isNewLastFile && pIdx->hasLast) {
|
||||
if (pIdx->len > 0) {
|
||||
goto _table_over;
|
||||
// if (isNewLastFile && pIdx->hasLast) {
|
||||
if (0) {
|
||||
// need to move the last block to new file
|
||||
if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) { /* TODO */
|
||||
}
|
||||
|
@ -981,9 +988,14 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
|
|||
continue;
|
||||
}
|
||||
|
||||
pCompInfo->delimiter = TSDB_FILE_DELIMITER;
|
||||
pCompInfo->checksum = 0;
|
||||
pCompInfo->uid = pTable->tableId.uid;
|
||||
|
||||
// Load SCompBlock part if neccessary
|
||||
int isCompBlockLoaded = 0;
|
||||
if (pIdx->offset > 0) {
|
||||
if (0) {
|
||||
// if (pIdx->offset > 0) {
|
||||
if (pIdx->hasLast || tsdbHasDataInRange(pIter, minKey, pIdx->maxKey)) {
|
||||
// has last block || cache key overlap with commit key
|
||||
pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len + sizeof(SCompBlock) * 100);
|
||||
|
@ -1003,34 +1015,50 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
|
|||
tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols);
|
||||
if (pCols->numOfPoints == 0) break;
|
||||
|
||||
int pointsWritten = 0;
|
||||
// { // TODO : try to write the block data to file
|
||||
// if (!isCompBlockLoaded) { // Just append
|
||||
// if (pCols->numOfPoints > pCfg->minRowsPerFileBlock) { // write directly to .data file
|
||||
// lseek(pGroup->files[TSDB_FILE_TYPE_DATA], 0, SEEK_END);
|
||||
int pointsWritten = pCols->numOfPoints;
|
||||
// TODO: all write to the end of .data file
|
||||
int64_t toffset = 0;
|
||||
int32_t tlen = 0;
|
||||
tsdbWriteBlockToFileImpl(&pGroup->files[TSDB_FILE_TYPE_DATA], pCols, pCols->numOfPoints, &toffset, &tlen, pTable->tableId.uid);
|
||||
|
||||
// } else {
|
||||
// if (isNewLastFile) { // write directly to .l file
|
||||
// Make the compBlock
|
||||
SCompBlock *pTBlock = pCompInfo->blocks + nNewBlocks++;
|
||||
pTBlock->offset = toffset;
|
||||
pTBlock->len = tlen;
|
||||
pTBlock->keyFirst = dataColsKeyFirst(pCols);
|
||||
pTBlock->keyLast = dataColsKeyLast(pCols);
|
||||
pTBlock->last = 0;
|
||||
pTBlock->algorithm = 0;
|
||||
pTBlock->numOfPoints = pCols->numOfPoints;
|
||||
pTBlock->sversion = pTable->sversion;
|
||||
pTBlock->numOfSubBlocks = 1;
|
||||
|
||||
// } else { // write directly to .last file
|
||||
if (dataColsKeyLast(pCols) > pIdx->maxKey) pIdx->maxKey = dataColsKeyLast(pCols);
|
||||
|
||||
// }
|
||||
// }
|
||||
// } else { // Need to append
|
||||
// // SCompBlock *pTBlock = NULL;
|
||||
// }
|
||||
// }
|
||||
// pointsWritten = pCols->numOfPoints;
|
||||
tdPopDataColsPoints(pCols, pointsWritten);
|
||||
maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5 - pCols->numOfPoints;
|
||||
}
|
||||
|
||||
|
||||
_table_over:
|
||||
// Write the SCompBlock part
|
||||
if (isCompBlockLoaded) {
|
||||
// merge the block into old and update pIdx
|
||||
pIdx->offset = lseek(hFile.fd, 0, SEEK_END);
|
||||
if (pIdx->len > 0) {
|
||||
sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len);
|
||||
if (nNewBlocks > 0) {
|
||||
write(hFile.fd, (void *)(pCompInfo->blocks), sizeof(SCompBlock) * nNewBlocks);
|
||||
pIdx->len += (sizeof(SCompBlock) * nNewBlocks);
|
||||
}
|
||||
} else {
|
||||
// sendfile the SCompBlock part and update the pIdx
|
||||
if (nNewBlocks > 0) {
|
||||
write(hFile.fd, (void *)pCompInfo, sizeof(SCompInfo) + sizeof(SCompBlock) * nNewBlocks);
|
||||
pIdx->len += sizeof(SCompInfo) + sizeof(SCompBlock) * nNewBlocks;
|
||||
}
|
||||
}
|
||||
|
||||
pIdx->checksum = 0;
|
||||
pIdx->numOfSuperBlocks += nNewBlocks;
|
||||
pIdx->hasLast = 0;
|
||||
}
|
||||
|
||||
// Write the SCompIdx part
|
||||
|
|
Loading…
Reference in New Issue