diff --git a/src/common/inc/dataformat.h b/src/common/inc/dataformat.h index aff239712b..1f46c68abc 100644 --- a/src/common/inc/dataformat.h +++ b/src/common/inc/dataformat.h @@ -81,11 +81,13 @@ STSchema *tdDecodeSchema(void **psrc); */ typedef void *SDataRow; + #define TD_DATA_ROW_HEAD_SIZE (2 * sizeof(int32_t)) #define dataRowLen(r) (*(int32_t *)(r)) #define dataRowFLen(r) (*(int32_t *)((char *)(r) + sizeof(int32_t))) #define dataRowTuple(r) ((char *)(r) + TD_DATA_ROW_HEAD_SIZE) +#define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r))) #define dataRowSetLen(r, l) (dataRowLen(r) = (l)) #define dataRowSetFLen(r, l) (dataRowFLen(r) = (l)) #define dataRowIdx(r, i) ((char *)(r) + i) diff --git a/src/vnode/tsdb/inc/tsdbCache.h b/src/vnode/tsdb/inc/tsdbCache.h index 3bffa1c6a9..2676829c75 100644 --- a/src/vnode/tsdb/inc/tsdbCache.h +++ b/src/vnode/tsdb/inc/tsdbCache.h @@ -17,6 +17,7 @@ #include +#include "taosdef.h" #include "tlist.h" #ifdef __cplusplus @@ -38,18 +39,25 @@ typedef struct { SList * memPool; } STsdbCachePool; +typedef struct { + TSKEY keyFirst; + TSKEY keyLast; + int64_t numOfPoints; + SList * list; +} SCacheMem; + typedef struct { int maxBytes; int cacheBlockSize; STsdbCachePool pool; STsdbCacheBlock *curBlock; - SList * mem; - SList * imem; + SCacheMem * mem; + SCacheMem * imem; } STsdbCache; STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize); void tsdbFreeCache(STsdbCache *pCache); -void * tsdbAllocFromCache(STsdbCache *pCache, int bytes); +void * tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key); #ifdef __cplusplus } diff --git a/src/vnode/tsdb/inc/tsdbFile.h b/src/vnode/tsdb/inc/tsdbFile.h index 89159a06e7..2324ead451 100644 --- a/src/vnode/tsdb/inc/tsdbFile.h +++ b/src/vnode/tsdb/inc/tsdbFile.h @@ -18,11 +18,15 @@ #include #include "taosdef.h" +#include "tglobalcfg.h" #ifdef __cplusplus extern "C" { #endif +#define tsdbGetKeyFileId(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) +#define tsdbGetMaxNumOfFiles(keep, daysPerFile) ((keep) / (daysPerFile) + 3) + typedef enum { TSDB_FILE_TYPE_HEAD = 0, // .head file type TSDB_FILE_TYPE_DATA, // .data file type @@ -66,6 +70,7 @@ STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32 void tsdbCloseFile(STsdbFileH *pFileH); int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables); +void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey); #ifdef __cplusplus } #endif diff --git a/src/vnode/tsdb/inc/tsdbMeta.h b/src/vnode/tsdb/inc/tsdbMeta.h index 38f0818dfb..aba506f30d 100644 --- a/src/vnode/tsdb/inc/tsdbMeta.h +++ b/src/vnode/tsdb/inc/tsdbMeta.h @@ -33,20 +33,25 @@ extern "C" { #define IS_CREATE_STABLE(pCfg) ((pCfg)->tagValues != NULL) +typedef struct { + TSKEY keyFirst; + TSKEY keyLast; + int32_t numOfPoints; + void * pData; +} SMemTable; + // ---------- TSDB TABLE DEFINITION typedef struct STable { - int8_t type; - STableId tableId; - int32_t superUid; // Super table UID - int32_t sversion; - STSchema *schema; - STSchema *tagSchema; - SDataRow tagVal; - union { - void *pData; // For TSDB_NORMAL_TABLE and TSDB_CHILD_TABLE, it is the skiplist for cache data - void *pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index - } content; - void * iData; // Skiplist to commit + int8_t type; + STableId tableId; + int32_t superUid; // Super table UID + int32_t sversion; + STSchema * schema; + STSchema * tagSchema; + SDataRow tagVal; + SMemTable * mem; + SMemTable * imem; + void * pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index void * eventHandler; // TODO void * streamHandler; // TODO struct STable *next; // TODO: remove the next @@ -69,6 +74,8 @@ typedef struct { void *map; // table map of (uid ===> table) SMetaFile *mfh; // meta file handle + int maxRowBytes; + int maxCols; } STsdbMeta; STsdbMeta *tsdbInitMeta(const char *rootDir, int32_t maxTables); @@ -94,8 +101,9 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta); int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg); int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId); STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId); -int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable); +// int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable); STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid); +char *getTupleKey(const void * data); #ifdef __cplusplus } diff --git a/src/vnode/tsdb/src/tsdbCache.c b/src/vnode/tsdb/src/tsdbCache.c index 6a0741dced..3b1d44e6c7 100644 --- a/src/vnode/tsdb/src/tsdbCache.c +++ b/src/vnode/tsdb/src/tsdbCache.c @@ -17,7 +17,7 @@ #include "tsdbCache.h" static int tsdbAllocBlockFromPool(STsdbCache *pCache); -static void tsdbFreeBlockList(SList *list); +static void tsdbFreeBlockList(SCacheMem *mem); STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) { STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache)); @@ -46,11 +46,8 @@ STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) { tdListAppend(pPool->memPool, (void *)(&pBlock)); } - pCache->mem = tdListNew(sizeof(STsdbCacheBlock *)); - if (pCache->mem == NULL) goto _err; - - pCache->imem = tdListNew(sizeof(STsdbCacheBlock *)); - if (pCache->imem == NULL) goto _err; + pCache->mem = NULL; + pCache->imem = NULL; return pCache; @@ -66,11 +63,20 @@ void tsdbFreeCache(STsdbCache *pCache) { free(pCache); } -void *tsdbAllocFromCache(STsdbCache *pCache, int bytes) { +void *tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key) { if (pCache == NULL) return NULL; if (bytes > pCache->cacheBlockSize) return NULL; - if (isListEmpty(pCache->mem)) { + if (pCache->mem == NULL) { // Create a new one + pCache->mem = (SCacheMem *)malloc(sizeof(SCacheMem)); + if (pCache->mem == NULL) return NULL; + pCache->mem->keyFirst = INT64_MAX; + pCache->mem->keyLast = 0; + pCache->mem->numOfPoints = 0; + pCache->mem->list = tdListNew(sizeof(STsdbCacheBlock *)); + } + + if (isListEmpty(pCache->mem->list)) { if (tsdbAllocBlockFromPool(pCache) < 0) { // TODO: deal with the error } @@ -86,12 +92,16 @@ void *tsdbAllocFromCache(STsdbCache *pCache, int bytes) { pCache->curBlock->offset += bytes; pCache->curBlock->remain -= bytes; memset(ptr, 0, bytes); + if (key < pCache->mem->keyFirst) pCache->mem->keyFirst = key; + if (key > pCache->mem->keyLast) pCache->mem->keyLast = key; + pCache->mem->numOfPoints++; return ptr; } -static void tsdbFreeBlockList(SList *list) { - if (list == NULL) return; +static void tsdbFreeBlockList(SCacheMem *mem) { + if (mem == NULL) return; + SList * list = mem->list; SListNode * node = NULL; STsdbCacheBlock *pBlock = NULL; while ((node = tdListPopHead(list)) != NULL) { @@ -100,6 +110,7 @@ static void tsdbFreeBlockList(SList *list) { listNodeFree(node); } tdListFree(list); + free(mem); } static int tsdbAllocBlockFromPool(STsdbCache *pCache) { @@ -114,7 +125,7 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) { pBlock->offset = 0; pBlock->remain = pCache->cacheBlockSize; - tdListAppendNode(pCache->mem, node); + tdListAppendNode(pCache->mem->list, node); pCache->curBlock = pBlock; return 0; diff --git a/src/vnode/tsdb/src/tsdbFile.c b/src/vnode/tsdb/src/tsdbFile.c index 8a7e40cabd..1c91c03b44 100644 --- a/src/vnode/tsdb/src/tsdbFile.c +++ b/src/vnode/tsdb/src/tsdbFile.c @@ -22,15 +22,11 @@ #include #include -#include "tglobalcfg.h" #include "tsdbFile.h" #define TSDB_FILE_HEAD_SIZE 512 #define TSDB_FILE_DELIMITER 0xF00AFA0F -#define tsdbGetKeyFileId(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) -#define tsdbGetMaxNumOfFiles(keep, daysPerFile) ((keep) / (daysPerFile) + 3) - typedef struct { int32_t len; int32_t padding; // For padding purpose @@ -228,7 +224,7 @@ STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32 return pTsdbFileH; } -static void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, +void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey) { *minKey = fileId * daysPerFile * tsMsPerDay[precision]; *maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1; diff --git a/src/vnode/tsdb/src/tsdbMain.c b/src/vnode/tsdb/src/tsdbMain.c index ed95eac5bc..3a4e427798 100644 --- a/src/vnode/tsdb/src/tsdbMain.c +++ b/src/vnode/tsdb/src/tsdbMain.c @@ -44,6 +44,7 @@ #define TSDB_CFG_FILE_NAME "CONFIG" #define TSDB_DATA_DIR_NAME "data" +#define TSDB_DEFAULT_FILE_BLOCK_ROW_OPTION 0.7 enum { TSDB_REPO_STATE_ACTIVE, TSDB_REPO_STATE_CLOSED, TSDB_REPO_STATE_CONFIGURING }; @@ -311,14 +312,14 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) { // Loop to move pData to iData for (int i = 0; i < pRepo->config.maxTables; i++) { STable *pTable = pRepo->tsdbMeta->tables[i]; - if (pTable != NULL) { - void *pData = pTable->content.pData; - pTable->content.pData = NULL; - pTable->iData = pData; + if (pTable != NULL && pTable->mem != NULL) { + pTable->imem = pTable->mem; + pTable->mem = NULL; } } - // Loop to move mem to imem - tdListMove(pRepo->tsdbCache->mem, pRepo->tsdbCache->imem); + // TODO: Loop to move mem to imem + pRepo->tsdbCache->imem = pRepo->tsdbCache->mem; + pRepo->tsdbCache->mem = NULL; pthread_create(&(pRepo->commitThread), NULL, tsdbCommitToFile, (void *)repo); pthread_mutex_unlock(&(pRepo->mutex)); @@ -669,10 +670,19 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable int32_t level = 0; int32_t headSize = 0; - tSkipListRandNodeInfo(pTable->content.pData, &level, &headSize); + if (pTable->mem == NULL) { + pTable->mem = (SMemTable *)calloc(1, sizeof(SMemTable)); + if (pTable->mem == NULL) return -1; + pTable->mem->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey); + pTable->mem->keyFirst = INT64_MAX; + pTable->mem->keyLast = 0; + } + tSkipListRandNodeInfo(pTable->mem->pData, &level, &headSize); + + TSKEY key = dataRowKey(row); // Copy row into the memory - SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row)); + SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row), key); if (pNode == NULL) { // TODO: deal with allocate failure } @@ -681,7 +691,10 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable dataRowCpy(SL_GET_NODE_DATA(pNode), row); // Insert the skiplist node into the data - tsdbInsertRowToTableImpl(pNode, pTable); + tSkipListPut(pTable->mem->pData, pNode); + if (key > pTable->mem->keyLast) pTable->mem->keyLast = key; + if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key; + pTable->mem->numOfPoints++; return 0; } @@ -705,21 +718,60 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) { return 0; } +static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey, int maxRowsToRead, void *dst) { + int numOfRows = 0; + do { + SSkipListNode *node = tSkiplistIterGet(pIter); + SDataRow row = SL_GET_NODE_DATA(node); + if (dataRowKey(row) > maxKey) break; + } while (tSkipListIterNext(pIter)); + return numOfRows; +} + +// Commit to file static void *tsdbCommitToFile(void *arg) { // TODO - STsdbRepo *pRepo = (STsdbRepo *)arg; - STsdbMeta *pMeta = pRepo->tsdbMeta; - for (int i = 0; i < pRepo->config.maxTables; i++) { - STable *pTable = pMeta->tables[i]; - if (pTable == NULL) continue; - SSkipListIterator *pIter = tSkipListCreateIter(pTable->iData); - while (tSkipListIterNext(pIter)) { - SSkipListNode *node = tSkipListIterGet(pIter); - SDataRow row = SL_GET_NODE_DATA(node); - int k = 0; + STsdbRepo * pRepo = (STsdbRepo *)arg; + STsdbMeta * pMeta = pRepo->tsdbMeta; + STsdbCache *pCache = pRepo->tsdbCache; + STsdbRepo * pCfg = &(pRepo->config); + if (pCache->imem == NULL) return; + int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst); + int efid = tsdbGetKeyFileId(pCache->imem->keyLast); + SSkipListIterator **iters = (SSkipListIterator **)calloc(pCfg->maxTables, sizeof(SSkipListIterator *)); + if (iters == NULL) { + // TODO: deal with the error + return NULL; + } + + for (int fid = sfid; fid <= efid; fid++) { + TSKEY minKey = 0, maxKey = 0; + tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey); + + for (int tid = 0; tid < pCfg->maxTables; tid++) { + STable *pTable = pMeta->tables[tid]; + if (pTable == NULL || pTable->imem == NULL) continue; + if (iters[tid] == NULL) { // create table iterator + iters[tid] = tSkipListCreateIter(pTable->imem); + // TODO: deal with the error + if (iters[tid] == NULL) break; + if (!tSkipListIterNext(iters[tid])) { + // assert(0); + } + } + + // Loop the iterator + // tsdbReadRowsFromCache(); } } + // Free the iterator + for (int tid = 0; tid < pCfg->maxTables; tid++) { + if (iters[tid] != NULL) tSkipListDestroyIter(iters[tid]); + } + + free(iters); + return NULL; } \ No newline at end of file diff --git a/src/vnode/tsdb/src/tsdbMeta.c b/src/vnode/tsdb/src/tsdbMeta.c index 98dcd45bed..b8b5450d23 100644 --- a/src/vnode/tsdb/src/tsdbMeta.c +++ b/src/vnode/tsdb/src/tsdbMeta.c @@ -18,7 +18,6 @@ static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable); static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable); static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable); static int tsdbEstimateTableEncodeSize(STable *pTable); -static char * getTupleKey(const void *data); /** * Encode a TSDB table object as a binary content @@ -102,12 +101,9 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) { if (pTable == NULL) return -1; if (pTable->type == TSDB_SUPER_TABLE) { - pTable->content.pIndex = + pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, 0, getTupleKey); - } else { - pTable->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, - TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey); - } + } tsdbAddTableToMeta(pMeta, pTable, false); @@ -137,6 +133,8 @@ STsdbMeta *tsdbInitMeta(const char *rootDir, int32_t maxTables) { pMeta->nTables = 0; pMeta->superList = NULL; pMeta->tables = (STable **)calloc(maxTables, sizeof(STable *)); + pMeta->maxRowBytes = 0; + pMeta->maxCols = 0; if (pMeta->tables == NULL) { free(pMeta); return NULL; @@ -208,10 +206,10 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { super->schema = tdDupSchema(pCfg->schema); super->tagSchema = tdDupSchema(pCfg->tagSchema); super->tagVal = tdDataRowDup(pCfg->tagValues); - super->content.pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, + super->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, 0, getTupleKey); // Allow duplicate key, no lock - if (super->content.pIndex == NULL) { + if (super->pIndex == NULL) { tdFreeSchema(super->schema); tdFreeSchema(super->tagSchema); tdFreeDataRow(super->tagVal); @@ -223,7 +221,7 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { } } - STable *table = (STable *)malloc(sizeof(STable)); + STable *table = (STable *)calloc(1, sizeof(STable)); if (table == NULL) { if (newSuper) tsdbFreeTable(super); return -1; @@ -239,7 +237,6 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) { table->superUid = -1; table->schema = tdDupSchema(pCfg->schema); } - table->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey); // Register to meta if (newSuper) tsdbAddTableToMeta(pMeta, super, true); @@ -299,10 +296,10 @@ int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) { return 0; } -int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable) { - tSkipListPut(pTable->content.pData, pNode); - return 0; -} +// int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable) { +// tSkipListPut(pTable->mem->pData, pNode); +// return 0; +// } static int tsdbFreeTable(STable *pTable) { // TODO: finish this function @@ -314,10 +311,8 @@ static int tsdbFreeTable(STable *pTable) { // Free content if (TSDB_TABLE_IS_SUPER_TABLE(pTable)) { - tSkipListDestroy(pTable->content.pIndex); - } else { - tSkipListDestroy(pTable->content.pData); - } + tSkipListDestroy(pTable->pIndex); + } free(pTable); return 0; @@ -404,7 +399,7 @@ static int tsdbEstimateTableEncodeSize(STable *pTable) { return size; } -static char *getTupleKey(const void * data) { +char *getTupleKey(const void * data) { SDataRow row = (SDataRow)data; return dataRowAt(row, TD_DATA_ROW_HEAD_SIZE); diff --git a/src/vnode/tsdb/tests/tsdbTests.cpp b/src/vnode/tsdb/tests/tsdbTests.cpp index 42a22553c7..de58d6337c 100644 --- a/src/vnode/tsdb/tests/tsdbTests.cpp +++ b/src/vnode/tsdb/tests/tsdbTests.cpp @@ -40,7 +40,6 @@ TEST(TsdbTest, tableEncodeDecode) { ASSERT_EQ(pTable->superUid, tTable->superUid); ASSERT_EQ(pTable->sversion, tTable->sversion); ASSERT_EQ(memcmp(pTable->schema, tTable->schema, sizeof(STSchema) + sizeof(STColumn) * nCols), 0); - ASSERT_EQ(tTable->content.pData, nullptr); } TEST(TsdbTest, createRepo) { @@ -72,7 +71,7 @@ TEST(TsdbTest, createRepo) { tsdbCreateTable(pRepo, &tCfg); // // 3. Loop to write some simple data - int nRows = 100; + int nRows = 1000; int rowsPerSubmit = 10; int64_t start_time = 1584081000000;