diff --git a/src/common/inc/dataformat.h b/src/common/inc/dataformat.h
index 52b2d1e156..a7016061e4 100644
--- a/src/common/inc/dataformat.h
+++ b/src/common/inc/dataformat.h
@@ -132,6 +132,7 @@ void tdResetDataCols(SDataCols *pCols);
 void tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
 void tdFreeDataCols(SDataCols *pCols);
 void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols);
+void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop);
 
 #ifdef __cplusplus
 }
diff --git a/src/common/src/dataformat.c b/src/common/src/dataformat.c
index 8f6a40805f..56565b6e5c 100644
--- a/src/common/src/dataformat.c
+++ b/src/common/src/dataformat.c
@@ -353,6 +353,10 @@ void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols) {
   }
   pCols->numOfPoints++;
 }
+// Pop pointsToPop points from the SDataCols
+void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
+
+}
 
 /**
  * Return the first part length of a data row for a schema
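Note: tdPopDataColsPoints() is declared above but left as an empty stub in this patch. Below is a minimal sketch of one possible implementation, for reference only. It assumes each SDataCol stores fixed-length values and exposes a per-value size in a `bytes` field next to the `colId`/`len`/`pData` fields used elsewhere in this patch; variable-length types (binary/nchar) would need extra handling, and memmove requires <string.h>.

// Sketch only, not part of the patch: drop the first pointsToPop points in place.
void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
  if (pointsToPop <= 0) return;
  if (pointsToPop >= pCols->numOfPoints) {
    tdResetDataCols(pCols);  // nothing is left to keep
    return;
  }
  int pointsLeft = pCols->numOfPoints - pointsToPop;
  for (int i = 0; i < pCols->numOfCols; i++) {
    SDataCol *pCol = &(pCols->cols[i]);
    int tsize = pCol->bytes;  // assumed: fixed size of one value in this column
    // Shift the remaining points to the front of the column buffer
    memmove(pCol->pData, (char *)pCol->pData + pointsToPop * tsize, (size_t)pointsLeft * tsize);
    pCol->len = pointsLeft * tsize;
  }
  pCols->numOfPoints = pointsLeft;
}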
diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h
index c761f8520e..e4592ffa2e 100644
--- a/src/vnode/tsdb/inc/tsdbFile.h
+++ b/src/vnode/tsdb/inc/tsdbFile.h
@@ -134,12 +134,15 @@ typedef struct {
   int64_t  uid;  // For recovery usage
   SCompCol cols[];
 } SCompData;
 
-int tsdbCopyCompBlockToFile(SFile *outFile, SFile *inFile, SCompInfo *pCompInfo, int index, int isOutLast);
+
+int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, SDataCols *pCols);
 
 int tsdbLoadCompIdx(SFileGroup *pGroup, void *buf, int maxTables);
 int tsdbLoadCompBlocks(SFileGroup *pGroup, SCompIdx *pIdx, void *buf);
 int tsdbLoadCompCols(SFile *pFile, SCompBlock *pBlock, void *buf);
 int tsdbLoadColData(SFile *pFile, SCompCol *pCol, int64_t blockBaseOffset, void *buf);
+int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData);
+
 // TODO: need an API to merge all sub-block data into one
 int tsdbWriteBlockToFile(SFileGroup *pGroup, SCompInfo *pCompInfo, SCompIdx *pIdx, int isMerge, SCompBlock *pBlock, SDataCols *pCols);
 
diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c
index f22274531d..c1b7ae2382 100644
--- a/src/vnode/tsdb/src/tsdbFile.c
+++ b/src/vnode/tsdb/src/tsdbFile.c
@@ -103,8 +103,59 @@ int tsdbRemoveFileGroup(STsdbFileH *pFileH, int fid) {
   return 0;
 }
 
-int tsdbCopyCompBlockToFile(SFile *outFile, SFile *inFile, SCompInfo *pCompInfo, int index, int isOutLast) {
-  // TODO
+
+int tsdbLoadDataBlock(SFile *pFile, SCompBlock *pStartBlock, int numOfBlocks, SDataCols *pCols, SCompData *pCompData) {
+  SCompBlock *pBlock = pStartBlock;
+  for (int i = 0; i < numOfBlocks; i++) {
+    if (tsdbLoadCompCols(pFile, pBlock, (void *)pCompData) < 0) return -1;
+    for (int iCol = 0; iCol < pBlock->numOfCols; iCol++) {
+      SCompCol *pCompCol = &(pCompData->cols[iCol]);
+      int k = 0;
+      for (; k < pCols->numOfCols; k++) {
+        if (pCompCol->colId == pCols->cols[k].colId) break;
+      }
+      if (k >= pCols->numOfCols) continue;  // the column is not in the target schema, skip it
+
+      if (tsdbLoadColData(pFile, pCompCol, pBlock->offset,
+                          (void *)((char *)(pCols->cols[k].pData) + pCols->cols[k].len)) < 0)
+        return -1;
+    }
+    pCols->numOfPoints += pBlock->numOfPoints;  // count the points of this block only once
+    pBlock++;
+  }
+  return 0;
+}
+
+int tsdbCopyBlockDataInFile(SFile *pOutFile, SFile *pInFile, SCompInfo *pCompInfo, int idx, int isLast, SDataCols *pCols) {
+  SCompBlock *pSuperBlock = TSDB_COMPBLOCK_AT(pCompInfo, idx);
+  SCompBlock *pStartBlock = NULL;
+  SCompBlock *pBlock = NULL;
+  int numOfBlocks = pSuperBlock->numOfSubBlocks;
+
+  if (numOfBlocks == 1)
+    pStartBlock = pSuperBlock;
+  else
+    pStartBlock = TSDB_COMPBLOCK_AT(pCompInfo, pSuperBlock->offset);
+
+  int maxNumOfCols = 0;
+  pBlock = pStartBlock;
+  for (int i = 0; i < numOfBlocks; i++) {
+    if (pBlock->numOfCols > maxNumOfCols) maxNumOfCols = pBlock->numOfCols;
+    pBlock++;
+  }
+
+  SCompData *pCompData = (SCompData *)malloc(sizeof(SCompData) + sizeof(SCompCol) * maxNumOfCols);
+  if (pCompData == NULL) return -1;
+
+  // Load data from the block
+  if (tsdbLoadDataBlock(pOutFile, pStartBlock, numOfBlocks, pCols, pCompData) < 0) {
+    free(pCompData);
+    return -1;
+  }
+
+  // Write data block to the file
+  {
+    // TODO
+  }
+
+  if (pCompData) free(pCompData);
   return 0;
 }
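Note: tsdbCopyBlockDataInFile() is not wired into the commit path yet; the comment "Copy the last block from old last file to new file" in tsdbCommitToFile() below marks the intended call site. The sketch here only illustrates such a call: the surrounding names (pGroup, lFile, pIdx, pCompInfo, pCols) come from tsdbCommitToFile(), and the argument order (new .l file as pOutFile, old last file as pInFile, last super block index) is an assumption, not something this patch fixes.

// Sketch only, not part of the patch: carry a table's trailing super block
// from the old last file into the newly created .l file during commit.
if (tsdbCopyBlockDataInFile(&lFile, &pGroup->files[TSDB_FILE_TYPE_LAST], pCompInfo,
                            pIdx->numOfSuperBlocks - 1, 1 /* isLast */, pCols) < 0) {
  // TODO: deal with the ERROR here
}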
diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c
index 3945ffabda..599af73813 100644
--- a/src/vnode/tsdb/src/tsdbMain.c
+++ b/src/vnode/tsdb/src/tsdbMain.c
@@ -88,6 +88,8 @@ static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg);
 static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname);
 static void *  tsdbCommitData(void *arg);
 static int     tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols);
+static int     tsdbHasDataInRange(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey);
+static int     tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey);
 
 #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
 #define TSDB_GET_TABLE_BY_NAME(pRepo, name)
@@ -872,13 +874,12 @@ static void *tsdbCommitData(void *arg) {
 }
 
 static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters, SDataCols *pCols) {
-  int hasDataToCommit = 0;
   int isNewLastFile = 0;
 
   STsdbMeta * pMeta = pRepo->tsdbMeta;
   STsdbFileH *pFileH = pRepo->tsdbFileH;
   STsdbCfg *  pCfg = &pRepo->config;
-  SFile       tFile, lFile;
+  SFile       hFile, lFile;
   SFileGroup *pGroup = NULL;
   SCompIdx *  pIndices = NULL;
   SCompInfo * pCompInfo = NULL;
@@ -889,125 +890,181 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters
   TSKEY minKey = 0, maxKey = 0;
   tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
 
-  for (int tid = 0; tid < pCfg->maxTables; tid++) {
-    STable *           pTable = pMeta->tables[tid];
-    SSkipListIterator *pIter = iters[tid];
-    int                isLoadCompBlocks = 0;
-    char               dataDir[128] = "\0";
+  // Check if there are data to commit to this file
+  int hasDataToCommit = tsdbHasDataToCommit(iters, pCfg->maxTables, minKey, maxKey);
+  if (!hasDataToCommit) return 0;  // No data to commit, just return
 
-    if (pIter == NULL) continue;
-    tdInitDataCols(pCols, pTable->schema);
-
-    int numOfWrites = 0;
-    int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;  // We keep 20% of space for merge purpose
-    // Loop to read columns from cache
-    while (tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols)) {
-      if (!hasDataToCommit) {
-        // There are data to commit to this fileId, we need to create/open it for read/write.
-        // At the meantime, we set the flag to prevent further create/open operations
-        tsdbGetDataDirName(pRepo, dataDir);
-
-        if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) {
-          // TODO: deal with the ERROR here
-        }
-        // Open files for commit
-        pGroup = tsdbOpenFilesForCommit(pFileH, fid);
-        if (pGroup == NULL) {
-          // TODO: deal with the ERROR here
-        }
-        // TODO: open .h file and if neccessary, open .l file
-        tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &tFile, 1, 0);
-        if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) {
-          // TODO: make it not to write the last file every time
-          tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0);
-          isNewLastFile = 1;
-        }
-
-        // load the SCompIdx part
-        pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables);
-        if (pIndices == NULL) {  // TODO: deal with the ERROR
-        }
-        if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) {  // TODO: deal with the ERROR here
-        }
-
-        // sendfile those not need to changed table content
-        lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables,
-              SEEK_SET);
-        lseek(tFile.fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, SEEK_SET);
-        for (int ttid = 0; ttid < tid; ttid++) {
-          SCompIdx * tIdx= &pIndices[ttid];
-          if (tIdx->len <= 0) continue;
-          if (isNewLastFile && tIdx->hasLast) {
-            // TODO: Need to load the SCompBlock part and copy to new last file
-            pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, tIdx->len);
-            if (pCompInfo == NULL) { /* TODO */}
-            if (tsdbLoadCompBlocks(pGroup, tIdx, (void *)pCompInfo) < 0) {/* TODO */}
-            SCompBlock *pLastBlock = TSDB_COMPBLOCK_AT(pCompInfo, tIdx->numOfSuperBlocks - 1);
-            int numOfSubBlocks = pLastBlock->numOfSubBlocks;
-            assert(pLastBlock->last);
-            if (tsdbCopyCompBlockToFile(&pGroup->files[TSDB_FILE_TYPE_LAST], &lFile, pCompInfo, tIdx->numOfSuperBlocks, 1 /* isOutLast*/) < 0) {/* TODO */}
-            {
-              if (numOfSubBlocks > 1) tIdx->len -= (sizeof(SCompBlock) * numOfSubBlocks);
-              tIdx->checksum = 0;
-            }
-            write(tFile.fd, (void *)pCompInfo, tIdx->len);
-            tFile.size += tIdx->len;
-          } else {
-            sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, tFile.fd, NULL, tIdx->len);
-            tFile.size += tIdx->len;
-          }
-        }
-
-        hasDataToCommit = 1;
-      }
-
-      SCompIdx *pIdx = &pIndices[tid];
-
-      /* The first time to write to the table, need to decide
-       * if it is neccessary to load the SComplock part. If it
-       * is needed, just load it, or, just use sendfile and
-       * append it.
-       */
-      if (numOfWrites == 0 && pIdx->offset > 0) {
-        if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) {
-          pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len);
-          if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) {
-            // TODO: deal with the ERROR here
-          }
-          if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1;
-        } else {
-          // TODO: sendfile the prefix part
-        }
-      }
-
-      // if (tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols) < 0) {
-      //   // TODO: deal with the ERROR here
-      // }
-
-      // pCompInfo = tsdbMergeBlock(pCompInfo, pBlock);
-
-
-      // if (1 /* the SCompBlock part is not loaded*/) {
-      //   // Append to .data file generate a SCompBlock and record it
-      // } else {
-      // }
-
-      // // TODO: need to reset the pCols
-
-      numOfWrites++;
-    }
-
-    if (pCols->numOfPoints > 0) {
-      // TODO: still has data to commit, commit it
-    }
-
-    if (1/* SCompBlock part is loaded, write it to .head file*/) {
-      // TODO
-    } else {
-      // TODO: use sendfile send the old part and append the newly added part
-    }
+  // Create and open files for commit
+  tsdbGetDataDirName(pRepo, dataDir);
+  if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) {/* TODO */}
+  pGroup = tsdbOpenFilesForCommit(pFileH, fid);
+  if (pGroup == NULL) {/* TODO */}
+  tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &hFile, 1, 0);
+  if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) {
+    // TODO: make it not to write the last file every time
+    tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0);
+    isNewLastFile = 1;
   }
+  // Load the SCompIdx
+  pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables);
+  if (pIndices == NULL) {/* TODO */}
+  if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) {/* TODO */}
+
+  // Loop to commit data in each table
+  for (int tid = 0; tid < pCfg->maxTables; tid++) {
+    STable *           pTable = pMeta->tables[tid];
+    SSkipListIterator *pIter = iters[tid];
+    SCompIdx *         pIdx = &pIndices[tid];
+
+    if (!tsdbHasDataInRange(pIter, minKey, maxKey)) {
+      // This table does not have data in this range, just copy its head part and last
+      // part (if necessary) to the new file
+      if (pIdx->len != 0) {  // has old data
+        if (isNewLastFile && pIdx->hasLast) {
+          // Need to load SCompBlock part and copy to new file
+          if ((pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len)) == NULL) {/* TODO */}
+          if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) {/* TODO */}
+
+          // Copy the last block from old last file to new file
+          // tsdbCopyBlockData()
+        } else {
+          pIdx->offset = lseek(hFile.fd, 0, SEEK_CUR);
+          sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, hFile.fd, NULL, pIdx->len);
+          hFile.size += pIdx->len;
+        }
+      }
+      continue;
+    }
+
+    // while () {
+
+    // }
+  }
+
+  // for (int tid = 0; tid < pCfg->maxTables; tid++) {
+  //   STable *           pTable = pMeta->tables[tid];
+  //   SSkipListIterator *pIter = iters[tid];
+  //   int                isLoadCompBlocks = 0;
+  //   char               dataDir[128] = "\0";
+
+  //   if (pIter == NULL) {
+  //     if (hasDataToCommit && isNewLastFile())
+  //       continue;
+  //   }
+  //   tdInitDataCols(pCols, pTable->schema);
+
+  //   int numOfWrites = 0;
+  //   int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;  // We keep 20% of space for merge purpose
+  //   // Loop to read columns from cache
+  //   while (tsdbReadRowsFromCache(pIter, maxKey, maxRowsToRead, pCols)) {
+  //     if (!hasDataToCommit) {
+  //       // There are data to commit to this fileId, we need to create/open it for read/write.
+  //       // At the meantime, we set the flag to prevent further create/open operations
+  //       tsdbGetDataDirName(pRepo, dataDir);
+
+  //       if (tsdbCreateFGroup(pFileH, dataDir, fid, pCfg->maxTables) < 0) {
+  //         // TODO: deal with the ERROR here
+  //       }
+  //       // Open files for commit
+  //       pGroup = tsdbOpenFilesForCommit(pFileH, fid);
+  //       if (pGroup == NULL) {
+  //         // TODO: deal with the ERROR here
+  //       }
+  //       // TODO: open .h file and if neccessary, open .l file
+  //       tsdbCreateFile(dataDir, fid, ".h", pCfg->maxTables, &tFile, 1, 0);
+  //       if (1 /*pGroup->files[TSDB_FILE_TYPE_LAST].size > TSDB_MAX_LAST_FILE_SIZE*/) {
+  //         // TODO: make it not to write the last file every time
+  //         tsdbCreateFile(dataDir, fid, ".l", pCfg->maxTables, &lFile, 0, 0);
+  //         isNewLastFile = 1;
+  //       }
+
+  //       // load the SCompIdx part
+  //       pIndices = (SCompIdx *)malloc(sizeof(SCompIdx) * pCfg->maxTables);
+  //       if (pIndices == NULL) {  // TODO: deal with the ERROR
+  //       }
+  //       if (tsdbLoadCompIdx(pGroup, (void *)pIndices, pCfg->maxTables) < 0) {  // TODO: deal with the ERROR here
+  //       }
+
+  //       // sendfile those not need to changed table content
+  //       lseek(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables,
+  //             SEEK_SET);
+  //       lseek(tFile.fd, TSDB_FILE_HEAD_SIZE + sizeof(SCompIdx) * pCfg->maxTables, SEEK_SET);
+  //       for (int ttid = 0; ttid < tid; ttid++) {
+  //         SCompIdx * tIdx= &pIndices[ttid];
+  //         if (tIdx->len <= 0) continue;
+  //         if (isNewLastFile && tIdx->hasLast) {
+  //           // TODO: Need to load the SCompBlock part and copy to new last file
+  //           pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, tIdx->len);
+  //           if (pCompInfo == NULL) { /* TODO */}
+  //           if (tsdbLoadCompBlocks(pGroup, tIdx, (void *)pCompInfo) < 0) {/* TODO */}
+  //           SCompBlock *pLastBlock = TSDB_COMPBLOCK_AT(pCompInfo, tIdx->numOfSuperBlocks - 1);
+  //           int numOfSubBlocks = pLastBlock->numOfSubBlocks;
+  //           assert(pLastBlock->last);
+  //           if (tsdbCopyCompBlockToFile(&pGroup->files[TSDB_FILE_TYPE_LAST], &lFile, pCompInfo, tIdx->numOfSuperBlocks, 1 /* isOutLast*/) < 0) {/* TODO */}
+  //           {
+  //             if (numOfSubBlocks > 1) tIdx->len -= (sizeof(SCompBlock) * numOfSubBlocks);
+  //             tIdx->checksum = 0;
+  //           }
+  //           write(tFile.fd, (void *)pCompInfo, tIdx->len);
+  //           tFile.size += tIdx->len;
+  //         } else {
+  //           sendfile(pGroup->files[TSDB_FILE_TYPE_HEAD].fd, tFile.fd, NULL, tIdx->len);
+  //           tFile.size += tIdx->len;
+  //         }
+  //       }
+
+  //       hasDataToCommit = 1;
+  //     }
+
+  //     SCompIdx *pIdx = &pIndices[tid];
+
+  //     /* The first time to write to the table, need to decide
+  //      * if it is neccessary to load the SComplock part. If it
+  //      * is needed, just load it, or, just use sendfile and
+  //      * append it.
+  //      */
+  //     if (numOfWrites == 0 && pIdx->offset > 0) {
+  //       if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) {
+  //         pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len);
+  //         if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) {
+  //           // TODO: deal with the ERROR here
+  //         }
+  //         if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1;
+  //       } else {
+  //         // TODO: sendfile the prefix part
+  //       }
+  //     }
+
+  //     int numOfPointsWritten = tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols);
+  //     if (numOfPointsWritten < 0) {
+  //       // TODO: deal with the ERROR here
+  //     }
+
+  //     // pCompInfo = tsdbMergeBlock(pCompInfo, pBlock);
+
+
+  //     // if (1 /* the SCompBlock part is not loaded*/) {
+  //     //   // Append to .data file generate a SCompBlock and record it
+  //     // } else {
+  //     // }
+
+  //     // // TODO: need to reset the pCols
+
+  //     numOfWrites++;
+  //   }
+
+  //   if (pCols->numOfPoints > 0) {
+  //     // TODO: still has data to commit, commit it
+  //   }
+
+  //   if (1/* SCompBlock part is loaded, write it to .head file*/) {
+  //     // TODO
+  //   } else {
+  //     // TODO: use sendfile send the old part and append the newly added part
+  //   }
+  // }
+ // */ + // if (numOfWrites == 0 && pIdx->offset > 0) { + // if (dataColsKeyFirst(pCols) <= pIdx->maxKey || pIdx->hasLast) { + // pCompInfo = (SCompInfo *)realloc((void *)pCompInfo, pIdx->len); + // if (tsdbLoadCompBlocks(pGroup, pIdx, (void *)pCompInfo) < 0) { + // // TODO: deal with the ERROR here + // } + // if (pCompInfo->uid == pTable->tableId.uid) isLoadCompBlocks = 1; + // } else { + // // TODO: sendfile the prefix part + // } + // } + + // int numOfPointsWritten = tsdbWriteBlockToFile(pGroup, pCompInfo, pIdx, isLoadCompBlocks, pBlock, pCols); + // if (numOfPointsWritten < 0) { + // // TODO: deal with the ERROR here + // } + + // // pCompInfo = tsdbMergeBlock(pCompInfo, pBlock); + + + // // if (1 /* the SCompBlock part is not loaded*/) { + // // // Append to .data file generate a SCompBlock and record it + // // } else { + // // } + + // // // TODO: need to reset the pCols + + // numOfWrites++; + // } + + // if (pCols->numOfPoints > 0) { + // // TODO: still has data to commit, commit it + // } + + // if (1/* SCompBlock part is loaded, write it to .head file*/) { + // // TODO + // } else { + // // TODO: use sendfile send the old part and append the newly added part + // } + // } + // Write the SCompIdx part // Close all files and return @@ -1018,5 +1075,25 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SSkipListIterator **iters if (pIndices) free(pIndices); if (pCompInfo) free(pCompInfo); + return 0; +} + +static int tsdbHasDataInRange(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey) { + if (pIter == NULL) return 0; + + SSkipListNode *node = tSkipListIterGet(pIter); + if (node == NULL) return 0; + + SDataRow row = SL_GET_NODE_DATA(node); + if (dataRowKey(row) >= minKey && dataRowKey(row) <= maxKey) return 1; + + return 0; +} + +static int tsdbHasDataToCommit(SSkipListIterator **iters, int nIters, TSKEY minKey, TSKEY maxKey) { + for (int i = 0; i < nIters; i++) { + SSkipListIterator *pIter = iters[i]; + if (tsdbHasDataInRange(pIter, minKey, maxKey)) return 1; + } return 0; } \ No newline at end of file