TD-353
This commit is contained in:
parent
d5c20d7b53
commit
112ce3b1a4
|
@ -13,11 +13,12 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include "tskiplist.h"
|
|
||||||
#include "tsdb.h"
|
|
||||||
#include "taosdef.h"
|
|
||||||
#include "hash.h"
|
#include "hash.h"
|
||||||
|
#include "taosdef.h"
|
||||||
|
#include "tchecksum.h"
|
||||||
|
#include "tsdb.h"
|
||||||
#include "tsdbMain.h"
|
#include "tsdbMain.h"
|
||||||
|
#include "tskiplist.h"
|
||||||
|
|
||||||
// ------------------ OUTER FUNCTIONS ------------------
|
// ------------------ OUTER FUNCTIONS ------------------
|
||||||
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
|
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
|
||||||
|
@ -201,7 +202,7 @@ _err:
|
||||||
|
|
||||||
// ------------------ INTERNAL FUNCTIONS ------------------
|
// ------------------ INTERNAL FUNCTIONS ------------------
|
||||||
STsdbMeta *tsdbNewMeta(STsdbCfg *pCfg) {
|
STsdbMeta *tsdbNewMeta(STsdbCfg *pCfg) {
|
||||||
STsdbMeta* pMeta = (STsdbMeta*)calloc(1, sizeof(*pMeta));
|
STsdbMeta *pMeta = (STsdbMeta *)calloc(1, sizeof(*pMeta));
|
||||||
if (pMeta == NULL) {
|
if (pMeta == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -209,6 +210,7 @@ STsdbMeta *tsdbNewMeta(STsdbCfg *pCfg) {
|
||||||
|
|
||||||
int code = pthread_rwlock_init(&pMeta->rwLock, NULL);
|
int code = pthread_rwlock_init(&pMeta->rwLock, NULL);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
tsdbError("vgId:%d failed to init TSDB meta r/w lock since %s", pCfg->tsdbId, strerror(code));
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
@ -233,7 +235,7 @@ STsdbMeta *tsdbNewMeta(STsdbCfg *pCfg) {
|
||||||
|
|
||||||
return pMeta;
|
return pMeta;
|
||||||
|
|
||||||
_err;
|
_err:
|
||||||
tsdbFreeMeta(pMeta);
|
tsdbFreeMeta(pMeta);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -249,11 +251,50 @@ void tsdbFreeMeta(STsdbMeta *pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbOpenMeta(STsdbRepo *pRepo) {
|
int tsdbOpenMeta(STsdbRepo *pRepo) {
|
||||||
// TODO
|
char * fname = NULL;
|
||||||
|
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
|
ASSERT(pMeta != NULL);
|
||||||
|
|
||||||
|
fname = tsdbGetMetaFileName(pRepo->rootDir);
|
||||||
|
if (fname == NULL) {
|
||||||
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
pMeta->pStore = tdOpenKVStore(fname, tsdbRestoreTable, tsdbOrgMeta, (void *)pRepo);
|
||||||
|
if (pMeta->pStore == NULL) {
|
||||||
|
tsdbError("vgId:%d failed to open TSDB meta while open the kv store since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbTrace("vgId:%d open TSDB meta succeed", REPO_ID(pRepo));
|
||||||
|
tfree(fname);
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
_err:
|
||||||
|
tfree(fname);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbCloseMeta(STsdbRepo *pRepo) {
|
int tsdbCloseMeta(STsdbRepo *pRepo) {
|
||||||
// TODO
|
STsdbCfg * pCfg = &pRepo->config;
|
||||||
|
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
|
SListNode *pNode = NULL;
|
||||||
|
STable * pTable = NULL;
|
||||||
|
|
||||||
|
if (pMeta == NULL) return 0;
|
||||||
|
tdCloseKVStore(pMeta->pStore);
|
||||||
|
for (int i = 1; i < pCfg->maxTables; i++) {
|
||||||
|
tsdbFreeTable(pMeta->tables[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
while ((pNode = tdListPopHead(pMeta->superList)) != NULL) {
|
||||||
|
tdListNodeGetData(pMeta->superList, pNode, (void *)(&pTable));
|
||||||
|
tsdbFreeTable(pTable);
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbTrace("vgId:%d TSDB meta is closed", REPO_ID(pRepo));
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable) {
|
STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable) {
|
||||||
|
@ -387,6 +428,7 @@ static void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static STable *tsdbDecodeTable(void *cont, int contLen) {
|
static STable *tsdbDecodeTable(void *cont, int contLen) {
|
||||||
|
// TODO
|
||||||
STable *pTable = (STable *)calloc(1, sizeof(STable));
|
STable *pTable = (STable *)calloc(1, sizeof(STable));
|
||||||
if (pTable == NULL) return NULL;
|
if (pTable == NULL) return NULL;
|
||||||
|
|
||||||
|
@ -432,6 +474,13 @@ static STable *tsdbDecodeTable(void *cont, int contLen) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pTable->lastKey = TSKEY_INITIAL_VAL;
|
pTable->lastKey = TSKEY_INITIAL_VAL;
|
||||||
|
|
||||||
|
if (pTable->type == TSDB_SUPER_TABLE) {
|
||||||
|
STColumn *pColSchema = schemaColAt(pTable->tagSchema, 0);
|
||||||
|
pTable->pIndex =
|
||||||
|
tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, pColSchema->type, pColSchema->bytes, 1, 0, 1, getTagIndexKey);
|
||||||
|
}
|
||||||
|
|
||||||
return pTable;
|
return pTable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -446,24 +495,27 @@ static int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
|
static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
|
||||||
STsdbMeta *pMeta = (STsdbMeta *)pHandle;
|
STsdbRepo *pRepo = (STsdbRepo *)pHandle;
|
||||||
|
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
|
|
||||||
|
if (!taosCheckChecksumWhole((uint8_t *)cont, contLen)) {
|
||||||
|
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
STable *pTable = tsdbDecodeTable(cont, contLen);
|
STable *pTable = tsdbDecodeTable(cont, contLen);
|
||||||
if (pTable == NULL) return -1;
|
if (pTable == NULL) return -1;
|
||||||
|
|
||||||
if (pTable->type == TSDB_SUPER_TABLE) {
|
if (tsdbAddTableToMeta(pMeta, pTable, false) < 0) return -1;
|
||||||
STColumn* pColSchema = schemaColAt(pTable->tagSchema, 0);
|
|
||||||
pTable->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, pColSchema->type, pColSchema->bytes,
|
|
||||||
1, 0, 1, getTagIndexKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbAddTableToMeta(pMeta, pTable, false);
|
|
||||||
|
|
||||||
|
tsdbTrace("vgId:%d table %s tid %d uid %" PRIu64 " is restored from file", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
||||||
|
TABLE_TID(pTable), TALBE_UID(pTable));
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbOrgMeta(void *pHandle) {
|
static void tsdbOrgMeta(void *pHandle) {
|
||||||
STsdbMeta *pMeta = (STsdbMeta *)pHandle;
|
STsdbRepo *pRepo = (STsdbRepo *)pHandle;
|
||||||
|
STsdbCfg * pCfg = &pRepo->config;
|
||||||
|
|
||||||
for (int i = 1; i < pMeta->maxTables; i++) {
|
for (int i = 1; i < pMeta->maxTables; i++) {
|
||||||
STable *pTable = pMeta->tables[i];
|
STable *pTable = pMeta->tables[i];
|
||||||
|
|
|
@ -523,13 +523,14 @@ static int tdRestoreKVStore(SKVStore *pStore) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!taosCheckChecksumWhole((uint8_t *)buf, pRecord->size)) {
|
if (pStore->iFunc) {
|
||||||
uError("file %s has checksum error, offset " PRId64 " size %d", pStore->fname, pRecord->offset, pRecord->size);
|
if ((*pStore->iFunc)(pStore->appH, buf, pRecord->size) < 0) {
|
||||||
terrno = TSDB_CODE_COM_FILE_CORRUPTED;
|
uError("failed to restore record uid %" PRIu64 " in kv store %s at offset %" PRId64 " size %" PRId64
|
||||||
goto _err;
|
" since %s",
|
||||||
|
pStore->fname, pRecord->uid, pRecord->offset, pRecord->size, tstrerror(terrno));
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStore->iFunc) (*pStore->iFunc)(pStore->appH, buf, pRecord->size);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStore->aFunc) (*pStore->aFunc)(pStore->appH);
|
if (pStore->aFunc) (*pStore->aFunc)(pStore->appH);
|
||||||
|
|
Loading…
Reference in New Issue