From a60fc0f7ce8cece41e266ae5d18b2c793c10df31 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 18 Jun 2020 08:00:03 +0000 Subject: [PATCH] TD-353 --- src/tsdb/inc/tsdbMain.h | 2 +- src/tsdb/src/tsdbBuffer.c | 3 +- src/tsdb/src/tsdbMain.c | 103 +++++++++++++++++++++++--------------- src/tsdb/src/tsdbMeta.c | 38 ++++++++------ src/util/src/hash.c | 4 +- src/util/src/tkvstore.c | 12 ++--- src/util/src/tlist.c | 6 ++- 7 files changed, 97 insertions(+), 71 deletions(-) diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 5b17e53904..0a55b9050c 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -298,7 +298,7 @@ STSchema* tsdbGetTableSchema(STable* pTable); STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid); STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t version); STSchema* tsdbGetTableTagSchema(STable* pTable); -int tsdbUpdateTable(STsdbMeta* pMeta, STable* pTable, STableCfg* pCfg); +int tsdbUpdateTable(STsdbRepo* pRepo, STable* pTable, STableCfg* pCfg); int tsdbWLockRepoMeta(STsdbRepo* pRepo); int tsdbRLockRepoMeta(STsdbRepo* pRepo); int tsdbUnlockRepoMeta(STsdbRepo* pRepo); diff --git a/src/tsdb/src/tsdbBuffer.c b/src/tsdb/src/tsdbBuffer.c index 0da0df9aa0..686fc4bbec 100644 --- a/src/tsdb/src/tsdbBuffer.c +++ b/src/tsdb/src/tsdbBuffer.c @@ -67,7 +67,7 @@ int tsdbOpenBufPool(STsdbRepo *pRepo) { ASSERT(pPool != NULL); - pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024; + pPool->bufBlockSize = pCfg->cacheBlockSize * 1024 * 1024; // MB pPool->tBufBlocks = pCfg->totalBlocks; pPool->nBufBlocks = 0; pPool->index = 0; @@ -106,6 +106,7 @@ void tsdbCloseBufPool(STsdbRepo *pRepo) { while ((pNode = tdListPopHead(pBufPool->bufBlockList)) != NULL) { tdListNodeGetData(pBufPool->bufBlockList, pNode, (void *)(&pBufBlock)); tsdbFreeBufBlock(pBufBlock); + free(pNode); } } diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 6a6a78cd4e..faf69d87d3 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -67,6 +67,7 @@ static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks); static int keyFGroupCompFunc(const void *key, const void *fgroup); static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg); static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg); +static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); // Function declaration int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) { @@ -81,7 +82,7 @@ int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) { if (tsdbSetRepoEnv(rootDir, pCfg) < 0) return -1; tsdbTrace( - "vgId%d tsdb env create succeed! cacheBlockSize %d totalBlocks %d maxTables %d daysPerFile %d keep " + "vgId:%d tsdb env create succeed! cacheBlockSize %d totalBlocks %d maxTables %d daysPerFile %d keep " "%d minRowsPerFileBlock %d maxRowsPerFileBlock %d precision %d compression %d", pCfg->tsdbId, pCfg->cacheBlockSize, pCfg->totalBlocks, pCfg->maxTables, pCfg->daysPerFile, pCfg->keep, pCfg->minRowsPerFileBlock, pCfg->maxRowsPerFileBlock, pCfg->precision, pCfg->compression); @@ -142,6 +143,7 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { if (repo == NULL) return; STsdbRepo *pRepo = (STsdbRepo *)repo; + int vgId = REPO_ID(pRepo); if (toCommit) { tsdbAsyncCommit(pRepo); @@ -151,7 +153,8 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) { tsdbCloseFileH(pRepo); tsdbCloseBufPool(pRepo); tsdbCloseMeta(pRepo); - tsdbTrace("vgId:%d repository is closed", REPO_ID(pRepo)); + tsdbFreeRepo(pRepo); + tsdbTrace("vgId:%d repository is closed", vgId); } int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) { @@ -170,7 +173,6 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg * while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) { if (tsdbInsertDataToTable(pRepo, pBlock, now, &affectedrows) < 0) { - pRsp->affectedRows = htonl(affectedrows); return -1; } } @@ -534,7 +536,7 @@ static int32_t tsdbSaveConfig(char *rootDir, STsdbCfg *pCfg) { } int tlen = tsdbEncodeCfg((void *)(&pBuf), pCfg); - ASSERT(tlen + sizeof(TSCKSUM) <= TSDB_FILE_HEAD_SIZE); + ASSERT((tlen + sizeof(TSCKSUM) <= TSDB_FILE_HEAD_SIZE) && (POINTER_DISTANCE(pBuf, buf) == tlen)); taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE); @@ -718,42 +720,11 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY return -1; } - // Check schema version - int32_t tversion = pBlock->sversion; - STSchema *pSchema = tsdbGetTableSchema(pTable); - ASSERT(pSchema != NULL); - int16_t nversion = schemaVersion(pSchema); - if (tversion > nversion) { - tsdbTrace("vgId:%d table %s tid %d server schema version %d is older than clien version %d, try to config.", - REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), nversion, tversion); - void *msg = (*pRepo->appH.configFunc)(REPO_ID(pRepo), TABLE_TID(pTable)); - if (msg == NULL) return -1; - - // TODO: Deal with error her - STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(msg); - STable * pTableUpdate = NULL; - if (pTable->type == TSDB_CHILD_TABLE) { - pTableUpdate = tsdbGetTableByUid(pMeta, pTableCfg->superUid); - } else { - pTableUpdate = pTable; - } - - int32_t code = tsdbUpdateTable(pMeta, pTableUpdate, pTableCfg); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - tsdbClearTableCfg(pTableCfg); - rpcFreeCont(msg); - - pSchema = tsdbGetTableSchemaByVersion(pTable, tversion); - } else if (tversion < nversion) { - pSchema = tsdbGetTableSchemaByVersion(pTable, tversion); - if (pSchema == NULL) { - tsdbError("vgId:%d table %s tid %d invalid schema version %d from client", REPO_ID(pRepo), - TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), tversion); - terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; - return -1; - } + // Check schema version and update schema if needed + if (tsdbCheckTableSchema(pRepo, pBlock, pTable) < 0) { + tsdbError("vgId:%d failed to insert data to table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), + tstrerror(terrno)); + return -1; } SSubmitBlkIter blkIter = {0}; @@ -777,6 +748,8 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY (*affectedrows)++; points++; } + + STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion); pRepo->stat.pointsWritten += points * schemaNCols(pSchema); pRepo->stat.totalStorage += points * schemaVLen(pSchema); @@ -820,6 +793,7 @@ static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) { } static int tsdbRestoreInfo(STsdbRepo *pRepo) { + // TODO STsdbMeta * pMeta = pRepo->tsdbMeta; STsdbFileH *pFileH = pRepo->tsdbFileH; SFileGroup *pFGroup = NULL; @@ -943,6 +917,55 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) { return buf; } +static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) { + ASSERT(pTable != NULL); + + STSchema *pSchema = tsdbGetTableSchema(pTable); + int sversion = schemaVersion(pSchema); + + if (pBlock->sversion == sversion) return 0; + if (pBlock->sversion > sversion) { // need to config + tsdbTrace("vgId:%d table %s tid %d has version %d smaller than client version %d, try to config", REPO_ID(pRepo), + TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), sversion, pBlock->sversion); + if (pRepo->appH.configFunc) { + void *msg = (*pRepo->appH.configFunc)(REPO_ID(pRepo), TABLE_TID(pTable)); + if (msg == NULL) { + tsdbError("vgId:%d failed to config table %s tid %d since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), + TABLE_TID(pTable), tstrerror(terrno)); + return -1; + } + + STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(msg); + if (pTableCfg == NULL) { + rpcFreeCont(msg); + return -1; + } + + if (tsdbUpdateTable(pRepo, (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable, pTableCfg) < 0) { + tsdbError("vgId:%d failed to update table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), + tstrerror(terrno)); + tsdbClearTableCfg(pTableCfg); + rpcFreeCont(msg); + return -1; + } + tsdbClearTableCfg(pTableCfg); + rpcFreeCont(msg); + } else { + terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; + return -1; + } + } else { + if (tsdbGetTableSchemaByVersion(pTable, pBlock->sversion) == NULL) { + tsdbError("vgId:%d invalid submit schema version %d to table %s tid %d from client", REPO_ID(pRepo), + pBlock->sversion, TABLE_CHAR_NAME(pTable), TABLE_TID(pTable)); + } + terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION; + return -1; + } + + return 0; + } + static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) { // TODO // STsdbCache *pCache = pRepo->tsdbCache; diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 1bcc444628..b1e17fb259 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -74,7 +74,7 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) { // TODO if (super->type != TSDB_SUPER_TABLE) return -1; if (super->tableId.uid != pCfg->superUid) return -1; - tsdbUpdateTable(pMeta, super, pCfg); + tsdbUpdateTable(pRepo, super, pCfg); } } @@ -279,7 +279,7 @@ int tsdbUpdateTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) { STable * super = tsdbGetTableByUid(pMeta, pTableCfg->superUid); ASSERT(super != NULL); - int32_t code = tsdbUpdateTable(pMeta, super, pTableCfg); + int32_t code = tsdbUpdateTable(pRepo, super, pTableCfg); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -447,16 +447,21 @@ STSchema *tsdbGetTableTagSchema(STable *pTable) { } } -int tsdbUpdateTable(STsdbMeta *pMeta, STable *pTable, STableCfg *pCfg) { - ASSERT(pTable->type != TSDB_CHILD_TABLE); - bool isChanged = false; +int tsdbUpdateTable(STsdbRepo *pRepo, STable *pTable, STableCfg *pCfg) { + // TODO: this function can only be called when there is no query and commit on this table + ASSERT(TABLE_TYPE(pTable) != TSDB_CHILD_TABLE); + bool changed = false; + STsdbMeta *pMeta = pRepo->tsdbMeta; if (pTable->type == TSDB_SUPER_TABLE) { if (schemaVersion(pTable->tagSchema) < schemaVersion(pCfg->tagSchema)) { - int32_t code = tsdbUpdateTableTagSchema(pTable, pCfg->tagSchema); - if (code != TSDB_CODE_SUCCESS) return code; + if (tsdbUpdateTableTagSchema(pTable, pCfg->tagSchema) < 0) { + tsdbError("vgId:%d failed to update table %s tag schema since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), + tstrerror(terrno)); + return -1; + } } - isChanged = true; + changed = true; } STSchema *pTSchema = tsdbGetTableSchema(pTable); @@ -471,18 +476,19 @@ int tsdbUpdateTable(STsdbMeta *pMeta, STable *pTable, STableCfg *pCfg) { pTable->schema[pTable->numOfSchemas - 1] = tSchema; } - isChanged = true; + pMeta->maxRowBytes = MAX(pMeta->maxRowBytes, dataRowMaxBytesFromSchema(pCfg->schema)); + pMeta->maxCols = MAX(pMeta->maxCols, schemaNCols(pCfg->schema)); + + changed = true; } - if (isChanged) { - // TODO - // char *buf = malloc(1024 * 1024); - // tsdbEncodeTable(buf, pTable); - // // tsdbInsertMetaRecord(pMeta->mfh, pTable->tableId.uid, buf, bufLen); - // free(buf); + if (changed) { + int tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, pTable); + void *buf = tsdbAllocBytes(pRepo, tlen); + tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable); } - return TSDB_CODE_SUCCESS; + return 0; } int tsdbWLockRepoMeta(STsdbRepo *pRepo) { diff --git a/src/util/src/hash.c b/src/util/src/hash.c index 95d0ce7081..1de013c416 100644 --- a/src/util/src/hash.c +++ b/src/util/src/hash.c @@ -309,9 +309,7 @@ void taosHashRemove(SHashObj *pHashObj, const void *key, size_t keyLen) { } void taosHashCleanup(SHashObj *pHashObj) { - if (pHashObj == NULL || pHashObj->capacity <= 0) { - return; - } + if (pHashObj == NULL) return; SHashNode *pNode, *pNext; diff --git a/src/util/src/tkvstore.c b/src/util/src/tkvstore.c index a84a5ac011..6eca141375 100644 --- a/src/util/src/tkvstore.c +++ b/src/util/src/tkvstore.c @@ -58,25 +58,21 @@ int tdCreateKVStore(char *fname) { if (fd < 0) { uError("failed to open file %s since %s", fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); - goto _err; - } - - if (tdInitKVStoreHeader(fd, fname) < 0) { - close(fd); return -1; } + if (tdInitKVStoreHeader(fd, fname) < 0) goto _err; + if (fsync(fd) < 0) { uError("failed to fsync file %s since %s", fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); - close(fd); - return -1; + goto _err; } if (close(fd) < 0) { uError("failed to close file %s since %s", fname, strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); - return -1; + goto _err; } return 0; diff --git a/src/util/src/tlist.c b/src/util/src/tlist.c index 0c1b8bd59a..9e90815f61 100644 --- a/src/util/src/tlist.c +++ b/src/util/src/tlist.c @@ -39,8 +39,10 @@ void tdListEmpty(SList *list) { } void tdListFree(SList *list) { - tdListEmpty(list); - free(list); + if (list) { + tdListEmpty(list); + free(list); + } } void tdListPrependNode(SList *list, SListNode *node) {