TD-353
This commit is contained in:
parent
b7b4148b67
commit
472b194a1e
|
@ -83,8 +83,8 @@ typedef struct {
|
||||||
#define tdFreeSchema(s) tfree((s))
|
#define tdFreeSchema(s) tfree((s))
|
||||||
|
|
||||||
STSchema *tdDupSchema(STSchema *pSchema);
|
STSchema *tdDupSchema(STSchema *pSchema);
|
||||||
void * tdEncodeSchema(void *dst, STSchema *pSchema);
|
void * tdEncodeSchema(void *buf, STSchema *pSchema);
|
||||||
STSchema *tdDecodeSchema(void **psrc);
|
void * tdDecodeSchema(void *buf, STSchema **pRSchema);
|
||||||
|
|
||||||
static FORCE_INLINE int comparColId(const void *key1, const void *key2) {
|
static FORCE_INLINE int comparColId(const void *key1, const void *key2) {
|
||||||
if (*(int16_t *)key1 > ((STColumn *)key2)->colId) {
|
if (*(int16_t *)key1 > ((STColumn *)key2)->colId) {
|
||||||
|
|
|
@ -14,6 +14,7 @@
|
||||||
*/
|
*/
|
||||||
#include "tdataformat.h"
|
#include "tdataformat.h"
|
||||||
#include "talgo.h"
|
#include "talgo.h"
|
||||||
|
#include "tcoding.h"
|
||||||
#include "wchar.h"
|
#include "wchar.h"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -33,50 +34,48 @@ STSchema *tdDupSchema(STSchema *pSchema) {
|
||||||
/**
|
/**
|
||||||
* Encode a schema to dst, and return the next pointer
|
* Encode a schema to dst, and return the next pointer
|
||||||
*/
|
*/
|
||||||
void *tdEncodeSchema(void *dst, STSchema *pSchema) {
|
void *tdEncodeSchema(void *buf, STSchema *pSchema) {
|
||||||
|
buf = taosEncodeFixedI32(buf, schemaVersion(pSchema));
|
||||||
|
buf = taosEncodeFixedI32(buf, schemaNCols(pSchema));
|
||||||
|
|
||||||
T_APPEND_MEMBER(dst, pSchema, STSchema, version);
|
|
||||||
T_APPEND_MEMBER(dst, pSchema, STSchema, numOfCols);
|
|
||||||
for (int i = 0; i < schemaNCols(pSchema); i++) {
|
for (int i = 0; i < schemaNCols(pSchema); i++) {
|
||||||
STColumn *pCol = schemaColAt(pSchema, i);
|
STColumn *pCol = schemaColAt(pSchema, i);
|
||||||
T_APPEND_MEMBER(dst, pCol, STColumn, type);
|
buf = taosEncodeFixedI8(buf, colType(pCol));
|
||||||
T_APPEND_MEMBER(dst, pCol, STColumn, colId);
|
buf = taosEncodeFixedI16(buf, colColId(pCol));
|
||||||
T_APPEND_MEMBER(dst, pCol, STColumn, bytes);
|
buf = taosEncodeFixedI32(buf, colBytes(pCol)) :
|
||||||
}
|
}
|
||||||
|
|
||||||
return dst;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Decode a schema from a binary.
|
* Decode a schema from a binary.
|
||||||
*/
|
*/
|
||||||
STSchema *tdDecodeSchema(void **psrc) {
|
void *tdDecodeSchema(void *buf, STSchema **pRSchema) {
|
||||||
int totalCols = 0;
|
|
||||||
int version = 0;
|
int version = 0;
|
||||||
STSchemaBuilder schemaBuilder = {0};
|
int numOfCols = 0;
|
||||||
|
|
||||||
T_READ_MEMBER(*psrc, int, version);
|
buf = taosDecodeFixedI32(buf, &version);
|
||||||
T_READ_MEMBER(*psrc, int, totalCols);
|
buf = taosDecodeFixedI32(buf, &numOfCols);
|
||||||
|
|
||||||
if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL;
|
if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL;
|
||||||
|
|
||||||
for (int i = 0; i < totalCols; i++) {
|
for (int i = 0; i < numOfCols; i++) {
|
||||||
int8_t type = 0;
|
int8_t type = 0;
|
||||||
int16_t colId = 0;
|
int16_t colId = 0;
|
||||||
int32_t bytes = 0;
|
int32_t bytes = 0;
|
||||||
T_READ_MEMBER(*psrc, int8_t, type);
|
buf = taosDecodeFixedI8(buf, &type);
|
||||||
T_READ_MEMBER(*psrc, int16_t, colId);
|
buf = taosDecodeFixedI16(buf, &colId);
|
||||||
T_READ_MEMBER(*psrc, int32_t, bytes);
|
buf = taosDecodeFixedI32(buf, &bytes);
|
||||||
|
|
||||||
if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
|
if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) {
|
||||||
tdDestroyTSchemaBuilder(&schemaBuilder);
|
tdDestroyTSchemaBuilder(&schemaBuilder);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STSchema *pSchema = tdGetSchemaFromBuilder(&schemaBuilder);
|
*pRSchema = tdGetSchemaFromBuilder(&schemaBuilder);
|
||||||
tdDestroyTSchemaBuilder(&schemaBuilder);
|
tdDestroyTSchemaBuilder(&schemaBuilder);
|
||||||
return pSchema;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
|
int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) {
|
||||||
|
@ -613,6 +612,7 @@ void *tdEncodeKVRow(void *buf, SKVRow row) {
|
||||||
|
|
||||||
void *tdDecodeKVRow(void *buf, SKVRow *row) {
|
void *tdDecodeKVRow(void *buf, SKVRow *row) {
|
||||||
*row = tdKVRowDup(buf);
|
*row = tdKVRowDup(buf);
|
||||||
|
if (*row == NULL) return NULL;
|
||||||
return POINTER_SHIFT(buf, kvRowLen(*row));
|
return POINTER_SHIFT(buf, kvRowLen(*row));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -362,7 +362,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
|
||||||
#define TAOS_QTYPE_WAL 2
|
#define TAOS_QTYPE_WAL 2
|
||||||
#define TAOS_QTYPE_CQ 3
|
#define TAOS_QTYPE_CQ 3
|
||||||
|
|
||||||
typedef enum {
|
typedef enum : uint8_t{
|
||||||
TSDB_SUPER_TABLE = 0, // super table
|
TSDB_SUPER_TABLE = 0, // super table
|
||||||
TSDB_CHILD_TABLE = 1, // table created from super table
|
TSDB_CHILD_TABLE = 1, // table created from super table
|
||||||
TSDB_NORMAL_TABLE = 2, // ordinary table
|
TSDB_NORMAL_TABLE = 2, // ordinary table
|
||||||
|
|
|
@ -391,7 +391,6 @@ int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
|
||||||
STSchema *tsdbGetTableSchemaByVersion(STsdbMeta *pMeta, STable *pTable, int16_t version);
|
STSchema *tsdbGetTableSchemaByVersion(STsdbMeta *pMeta, STable *pTable, int16_t version);
|
||||||
STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable);
|
STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable);
|
||||||
|
|
||||||
#define DEFAULT_TAG_INDEX_COLUMN 0 // skip list built based on the first column of tags
|
|
||||||
|
|
||||||
int compFGroupKey(const void *key, const void *fgroup);
|
int compFGroupKey(const void *key, const void *fgroup);
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -21,6 +21,7 @@
|
||||||
#include "tskiplist.h"
|
#include "tskiplist.h"
|
||||||
|
|
||||||
#define TSDB_SUPER_TABLE_SL_LEVEL 5
|
#define TSDB_SUPER_TABLE_SL_LEVEL 5
|
||||||
|
#define DEFAULT_TAG_INDEX_COLUMN 0
|
||||||
|
|
||||||
// ------------------ OUTER FUNCTIONS ------------------
|
// ------------------ OUTER FUNCTIONS ------------------
|
||||||
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
|
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg) {
|
||||||
|
@ -316,11 +317,11 @@ int tsdbCloseMeta(STsdbRepo *pRepo) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
STSchema *tsdbGetTableSchema(STsdbMeta *pMeta, STable *pTable) {
|
STSchema *tsdbGetTableSchema(STable *pTable) {
|
||||||
if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_STREAM_TABLE) {
|
if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_STREAM_TABLE) {
|
||||||
return pTable->schema[pTable->numOfSchemas - 1];
|
return pTable->schema[pTable->numOfSchemas - 1];
|
||||||
} else if (pTable->type == TSDB_CHILD_TABLE) {
|
} else if (pTable->type == TSDB_CHILD_TABLE) {
|
||||||
STable *pSuper = tsdbGetTableByUid(pMeta, pTable->superUid);
|
STable *pSuper = pTable->pSuper;
|
||||||
if (pSuper == NULL) return NULL;
|
if (pSuper == NULL) return NULL;
|
||||||
return pSuper->schema[pSuper->numOfSchemas - 1];
|
return pSuper->schema[pSuper->numOfSchemas - 1];
|
||||||
} else {
|
} else {
|
||||||
|
@ -336,14 +337,9 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) {
|
||||||
return *(STable **)ptr;
|
return *(STable **)ptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
STSchema *tsdbGetTableSchemaByVersion(STsdbMeta *pMeta, STable *pTable, int16_t version) {
|
STSchema *tsdbGetTableSchemaByVersion(STable *pTable, int16_t version) {
|
||||||
STable *pSearchTable = NULL;
|
STable *pSearchTable = (pTable->type == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable;
|
||||||
if (pTable->type == TSDB_CHILD_TABLE) {
|
if (pSearchTable == NULL) return NULL;
|
||||||
pSearchTable = tsdbGetTableByUid(pMeta, pTable->superUid);
|
|
||||||
} else {
|
|
||||||
pSearchTable = pTable;
|
|
||||||
}
|
|
||||||
ASSERT(pSearchTable != NULL);
|
|
||||||
|
|
||||||
void *ptr = taosbsearch(&version, pSearchTable->schema, pSearchTable->numOfSchemas, sizeof(STSchema *),
|
void *ptr = taosbsearch(&version, pSearchTable->schema, pSearchTable->numOfSchemas, sizeof(STSchema *),
|
||||||
tsdbCompareSchemaVersion, TD_EQ);
|
tsdbCompareSchemaVersion, TD_EQ);
|
||||||
|
@ -352,11 +348,11 @@ STSchema *tsdbGetTableSchemaByVersion(STsdbMeta *pMeta, STable *pTable, int16_t
|
||||||
return *(STSchema **)ptr;
|
return *(STSchema **)ptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) {
|
STSchema *tsdbGetTableTagSchema(STable *pTable) {
|
||||||
if (pTable->type == TSDB_SUPER_TABLE) {
|
if (pTable->type == TSDB_SUPER_TABLE) {
|
||||||
return pTable->tagSchema;
|
return pTable->tagSchema;
|
||||||
} else if (pTable->type == TSDB_CHILD_TABLE) {
|
} else if (pTable->type == TSDB_CHILD_TABLE) {
|
||||||
STable *pSuper = tsdbGetTableByUid(pMeta, pTable->superUid);
|
STable *pSuper = pTable->pSuper;
|
||||||
if (pSuper == NULL) return NULL;
|
if (pSuper == NULL) return NULL;
|
||||||
return pSuper->tagSchema;
|
return pSuper->tagSchema;
|
||||||
} else {
|
} else {
|
||||||
|
@ -440,107 +436,11 @@ int tsdbUnlockRepoMeta(STsdbRepo *pRepo) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
// ------------------ LOCAL FUNCTIONS ------------------
|
// ------------------ LOCAL FUNCTIONS ------------------
|
||||||
static void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) {
|
|
||||||
if (pTable == NULL) return;
|
|
||||||
|
|
||||||
void *ptr = buf;
|
|
||||||
T_APPEND_MEMBER(ptr, pTable, STable, type);
|
|
||||||
// Encode name, todo refactor
|
|
||||||
*(int *)ptr = varDataLen(pTable->name);
|
|
||||||
ptr = (char *)ptr + sizeof(int);
|
|
||||||
memcpy(ptr, varDataVal(pTable->name), varDataLen(pTable->name));
|
|
||||||
ptr = (char *)ptr + varDataLen(pTable->name);
|
|
||||||
|
|
||||||
T_APPEND_MEMBER(ptr, &(pTable->tableId), STableId, uid);
|
|
||||||
T_APPEND_MEMBER(ptr, &(pTable->tableId), STableId, tid);
|
|
||||||
T_APPEND_MEMBER(ptr, pTable, STable, superUid);
|
|
||||||
|
|
||||||
if (pTable->type == TSDB_SUPER_TABLE) {
|
|
||||||
T_APPEND_MEMBER(ptr, pTable, STable, numOfSchemas);
|
|
||||||
for (int i = 0; i < pTable->numOfSchemas; i++) {
|
|
||||||
ptr = tdEncodeSchema(ptr, pTable->schema[i]);
|
|
||||||
}
|
|
||||||
ptr = tdEncodeSchema(ptr, pTable->tagSchema);
|
|
||||||
} else if (pTable->type == TSDB_CHILD_TABLE) {
|
|
||||||
ptr = tdEncodeKVRow(ptr, pTable->tagVal);
|
|
||||||
} else {
|
|
||||||
T_APPEND_MEMBER(ptr, pTable, STable, numOfSchemas);
|
|
||||||
for (int i = 0; i < pTable->numOfSchemas; i++) {
|
|
||||||
ptr = tdEncodeSchema(ptr, pTable->schema[i]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTable->type == TSDB_STREAM_TABLE) {
|
|
||||||
ptr = taosEncodeString(ptr, pTable->sql);
|
|
||||||
}
|
|
||||||
|
|
||||||
*contLen = (char *)ptr - buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
static STable *tsdbDecodeTable(void *cont, int contLen) {
|
|
||||||
// TODO
|
|
||||||
STable *pTable = (STable *)calloc(1, sizeof(STable));
|
|
||||||
if (pTable == NULL) return NULL;
|
|
||||||
|
|
||||||
void *ptr = cont;
|
|
||||||
T_READ_MEMBER(ptr, int8_t, pTable->type);
|
|
||||||
if (pTable->type != TSDB_CHILD_TABLE) {
|
|
||||||
pTable->schema = (STSchema **)malloc(sizeof(STSchema *) * TSDB_MAX_TABLE_SCHEMAS);
|
|
||||||
if (pTable->schema == NULL) {
|
|
||||||
free(pTable);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
int len = *(int *)ptr;
|
|
||||||
ptr = (char *)ptr + sizeof(int);
|
|
||||||
pTable->name = calloc(1, len + VARSTR_HEADER_SIZE + 1);
|
|
||||||
if (pTable->name == NULL) return NULL;
|
|
||||||
|
|
||||||
varDataSetLen(pTable->name, len);
|
|
||||||
memcpy(pTable->name->data, ptr, len);
|
|
||||||
|
|
||||||
ptr = (char *)ptr + len;
|
|
||||||
T_READ_MEMBER(ptr, uint64_t, pTable->tableId.uid);
|
|
||||||
T_READ_MEMBER(ptr, int32_t, pTable->tableId.tid);
|
|
||||||
T_READ_MEMBER(ptr, uint64_t, pTable->superUid);
|
|
||||||
|
|
||||||
if (pTable->type == TSDB_SUPER_TABLE) {
|
|
||||||
T_READ_MEMBER(ptr, int16_t, pTable->numOfSchemas);
|
|
||||||
for (int i = 0; i < pTable->numOfSchemas; i++) {
|
|
||||||
pTable->schema[i] = tdDecodeSchema(&ptr);
|
|
||||||
}
|
|
||||||
pTable->tagSchema = tdDecodeSchema(&ptr);
|
|
||||||
} else if (pTable->type == TSDB_CHILD_TABLE) {
|
|
||||||
ptr = tdDecodeKVRow(ptr, &pTable->tagVal);
|
|
||||||
} else {
|
|
||||||
T_READ_MEMBER(ptr, int16_t, pTable->numOfSchemas);
|
|
||||||
for (int i = 0; i < pTable->numOfSchemas; i++) {
|
|
||||||
pTable->schema[i] = tdDecodeSchema(&ptr);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTable->type == TSDB_STREAM_TABLE) {
|
|
||||||
ptr = taosDecodeString(ptr, &(pTable->sql));
|
|
||||||
}
|
|
||||||
|
|
||||||
pTable->lastKey = TSKEY_INITIAL_VAL;
|
|
||||||
|
|
||||||
if (pTable->type == TSDB_SUPER_TABLE) {
|
|
||||||
STColumn *pColSchema = schemaColAt(pTable->tagSchema, 0);
|
|
||||||
pTable->pIndex =
|
|
||||||
tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, pColSchema->type, pColSchema->bytes, 1, 0, 1, getTagIndexKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
return pTable;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
|
static int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
|
||||||
if (*(int16_t *)key1 < (*(STSchema **)key2)->version) {
|
if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) {
|
||||||
return -1;
|
return -1;
|
||||||
} else if (*(int16_t *)key1 > (*(STSchema **)key2)->version) {
|
} else if (*(int16_t *)key1 > schemaVersion(*(STSchema **)key2)) {
|
||||||
return 1;
|
return 1;
|
||||||
} else {
|
} else {
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -550,16 +450,20 @@ static int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
|
||||||
static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
|
static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
|
||||||
STsdbRepo *pRepo = (STsdbRepo *)pHandle;
|
STsdbRepo *pRepo = (STsdbRepo *)pHandle;
|
||||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
|
STable * pTable = NULL;
|
||||||
|
|
||||||
if (!taosCheckChecksumWhole((uint8_t *)cont, contLen)) {
|
if (!taosCheckChecksumWhole((uint8_t *)cont, contLen)) {
|
||||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
STable *pTable = tsdbDecodeTable(cont, contLen);
|
pTable = tsdbDecodeTable(cont, contLen);
|
||||||
if (pTable == NULL) return -1;
|
if (pTable == NULL) return -1;
|
||||||
|
|
||||||
if (tsdbAddTableToMeta(pMeta, pTable, false) < 0) return -1;
|
if (tsdbAddTableToMeta(pMeta, pTable, false) < 0) {
|
||||||
|
tsdbFreeTable(pTable);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
tsdbTrace("vgId:%d table %s tid %d uid %" PRIu64 " is restored from file", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
tsdbTrace("vgId:%d table %s tid %d uid %" PRIu64 " is restored from file", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
||||||
TABLE_TID(pTable), TALBE_UID(pTable));
|
TABLE_TID(pTable), TALBE_UID(pTable));
|
||||||
|
@ -579,11 +483,11 @@ static void tsdbOrgMeta(void *pHandle) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static char *getTagIndexKey(const void *pData) {
|
static char *getTagIndexKey(const void *pData) {
|
||||||
STableIndexElem *elem = (STableIndexElem *)pData;
|
STable *pTable = *(STable **)pData;
|
||||||
|
|
||||||
STSchema *pSchema = tsdbGetTableTagSchema(elem->pMeta, elem->pTable);
|
STSchema *pSchema = tsdbGetTableTagSchema(pTable);
|
||||||
STColumn *pCol = &pSchema->columns[DEFAULT_TAG_INDEX_COLUMN];
|
STColumn *pCol = schemaColAt(DEFAULT_TAG_INDEX_COLUMN);
|
||||||
void * res = tdGetKVRowValOfCol(elem->pTable->tagVal, pCol->colId);
|
void * res = tdGetKVRowValOfCol(pTable->tagVal, pCol->colId);
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -703,11 +607,14 @@ static int tsdbUpdateTableTagSchema(STable *pTable, STSchema *newSchema) {
|
||||||
ASSERT(schemaVersion(pTable->tagSchema) < schemaVersion(newSchema));
|
ASSERT(schemaVersion(pTable->tagSchema) < schemaVersion(newSchema));
|
||||||
STSchema *pOldSchema = pTable->tagSchema;
|
STSchema *pOldSchema = pTable->tagSchema;
|
||||||
STSchema *pNewSchema = tdDupSchema(newSchema);
|
STSchema *pNewSchema = tdDupSchema(newSchema);
|
||||||
if (pNewSchema == NULL) return TSDB_CODE_TDB_OUT_OF_MEMORY;
|
if (pNewSchema == NULL) {
|
||||||
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
pTable->tagSchema = pNewSchema;
|
pTable->tagSchema = pNewSchema;
|
||||||
tdFreeSchema(pOldSchema);
|
tdFreeSchema(pOldSchema);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) {
|
static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) {
|
||||||
|
@ -727,7 +634,6 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pMeta->tables[TABLE_TID(pTable)] = pTable;
|
|
||||||
if (TABLE_TID(pTable) == TSDB_CHILD_TABLE && addIdx) { // add STABLE to the index
|
if (TABLE_TID(pTable) == TSDB_CHILD_TABLE && addIdx) { // add STABLE to the index
|
||||||
if (tsdbAddTableIntoIndex(pMeta, pTable) < 0) {
|
if (tsdbAddTableIntoIndex(pMeta, pTable) < 0) {
|
||||||
tsdbTrace("vgId:%d failed to add table %s to meta while add table to index since %s", REPO_ID(pRepo),
|
tsdbTrace("vgId:%d failed to add table %s to meta while add table to index since %s", REPO_ID(pRepo),
|
||||||
|
@ -735,15 +641,15 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
pMeta->tables[TABLE_TID(pTable)] = pTable;
|
||||||
pMeta->nTables++;
|
pMeta->nTables++;
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosHashPut(pMeta->map, (char *)(&pTable->tableId.uid), sizeof(pTable->tableId.uid), (void *)(&pTable),
|
if (taosHashPut(pMeta->uidMap, (char *)(&pTable->tableId.uid), sizeof(pTable->tableId.uid), (void *)(&pTable),
|
||||||
sizeof(pTable)) < 0) {
|
sizeof(pTable)) < 0) {
|
||||||
return -1;
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
tsdbError("vgId:%d failed to add table %s to meta while put into uid map since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), tstrerror(terrno));
|
||||||
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (addIdx && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
|
if (addIdx && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
|
||||||
|
@ -753,54 +659,50 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) {
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tsdbRemoveTableFromIndex()
|
tsdbRemoveTableFromMeta(pRepo, pTable, false);
|
||||||
if (addIdx) tsdbUnlockRepoMeta(pRepo);
|
if (addIdx) tsdbUnlockRepoMeta(pRepo);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFromIdx) {
|
static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFromIdx) {
|
||||||
if (pTable->type == TSDB_SUPER_TABLE) {
|
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex);
|
SListIter lIter = {0};
|
||||||
while (tSkipListIterNext(pIter)) {
|
SListNode *pNode = NULL;
|
||||||
STableIndexElem *pEle = (STableIndexElem *)SL_GET_NODE_DATA(tSkipListIterGet(pIter));
|
STable * tTable = NULL;
|
||||||
STable *tTable = pEle->pTable;
|
|
||||||
|
|
||||||
ASSERT(tTable != NULL && tTable->type == TSDB_CHILD_TABLE);
|
if (rmFromIdx) tsdbWLockRepoMeta(pRepo);
|
||||||
|
|
||||||
tsdbRemoveTableFromMeta(pMeta, tTable, false);
|
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
||||||
|
tdListInitIter(pMeta->superList, &lIter, TD_LIST_BACKWARD);
|
||||||
|
|
||||||
|
while ((pNode = tdListNext(&lIter)) != NULL) {
|
||||||
|
tdListNodeGetData(pMeta->superList, pNode, (void *)(tTable));
|
||||||
|
if (pTable == tTable) {
|
||||||
|
break;
|
||||||
|
tdListPopNode(pMeta->superList, pNode);
|
||||||
|
free(pNode);
|
||||||
}
|
}
|
||||||
|
|
||||||
tSkipListDestroyIter(pIter);
|
|
||||||
|
|
||||||
if (pTable->prev != NULL) {
|
|
||||||
pTable->prev->next = pTable->next;
|
|
||||||
if (pTable->next != NULL) {
|
|
||||||
pTable->next->prev = pTable->prev;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
pMeta->superList = pTable->next;
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
pMeta->tables[pTable->tableId.tid] = NULL;
|
pMeta->tables[pTable->tableId.tid] = NULL;
|
||||||
if (pTable->type == TSDB_CHILD_TABLE && rmFromIdx) {
|
if (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE && rmFromIdx) {
|
||||||
tsdbRemoveTableFromIndex(pMeta, pTable);
|
tsdbRemoveTableFromIndex(pMeta, pTable);
|
||||||
}
|
}
|
||||||
if (pTable->type == TSDB_STREAM_TABLE && rmFromIdx) {
|
|
||||||
// TODO
|
|
||||||
}
|
|
||||||
|
|
||||||
pMeta->nTables--;
|
pMeta->nTables--;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashRemove(pMeta->map, (char *)(&(pTable->tableId.uid)), sizeof(pTable->tableId.uid));
|
taosHashRemove(pMeta->uid, (char *)(&(TALBE_UID(pTable))), sizeof(TALBE_UID(pTable)));
|
||||||
tsdbFreeTable(pTable);
|
|
||||||
return 0;
|
if (rmFromIdx) tsdbUnlockRepoMeta(pRepo);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
|
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
|
||||||
assert(pTable->type == TSDB_CHILD_TABLE && pTable != NULL);
|
ASSERT(pTable->type == TSDB_CHILD_TABLE && pTable != NULL);
|
||||||
STable* pSTable = tsdbGetTableByUid(pMeta, pTable->superUid);
|
STable *pSTable = tsdbGetTableByUid(pMeta, TABLE_SUID(pTable));
|
||||||
assert(pSTable != NULL);
|
ASSERT(pSTable != NULL);
|
||||||
|
|
||||||
|
pTable->pSuper = pSTable;
|
||||||
|
|
||||||
int32_t level = 0;
|
int32_t level = 0;
|
||||||
int32_t headSize = 0;
|
int32_t headSize = 0;
|
||||||
|
@ -809,39 +711,40 @@ static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
|
||||||
|
|
||||||
// NOTE: do not allocate the space for key, since in each skip list node, only keep the pointer to pTable, not the
|
// NOTE: do not allocate the space for key, since in each skip list node, only keep the pointer to pTable, not the
|
||||||
// actual key value, and the key value will be retrieved during query through the pTable and getTagIndexKey function
|
// actual key value, and the key value will be retrieved during query through the pTable and getTagIndexKey function
|
||||||
SSkipListNode* pNode = calloc(1, headSize + sizeof(STableIndexElem));
|
SSkipListNode *pNode = calloc(1, headSize + sizeof(STable *));
|
||||||
|
if (pNode == NULL) {
|
||||||
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
pNode->level = level;
|
pNode->level = level;
|
||||||
|
|
||||||
SSkipList *list = pSTable->pIndex;
|
SSkipList *list = pSTable->pIndex;
|
||||||
STableIndexElem* elem = (STableIndexElem*) (SL_GET_NODE_DATA(pNode));
|
(STable *)(SL_GET_NODE_DATA(pNode)) = pTable;
|
||||||
|
|
||||||
elem->pTable = pTable;
|
tSkipListPut(pSTable->pIndex, pNode);
|
||||||
elem->pMeta = pMeta;
|
|
||||||
|
|
||||||
tSkipListPut(list, pNode);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
|
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
|
||||||
assert(pTable->type == TSDB_CHILD_TABLE && pTable != NULL);
|
ASSERT(pTable->type == TSDB_CHILD_TABLE && pTable != NULL);
|
||||||
|
|
||||||
STable* pSTable = tsdbGetTableByUid(pMeta, pTable->superUid);
|
STable* pSTable = tsdbGetTableByUid(pMeta, pTable->superUid);
|
||||||
assert(pSTable != NULL);
|
ASSERT(pSTable != NULL);
|
||||||
|
|
||||||
STSchema* pSchema = tsdbGetTableTagSchema(pMeta, pTable);
|
STSchema *pSchema = tsdbGetTableTagSchema(pTable);
|
||||||
STColumn* pCol = &pSchema->columns[DEFAULT_TAG_INDEX_COLUMN];
|
STColumn *pCol = schemaColAt(pSchema, DEFAULT_TAG_INDEX_COLUMN);
|
||||||
|
|
||||||
char* key = tdGetKVRowValOfCol(pTable->tagVal, pCol->colId);
|
char* key = tdGetKVRowValOfCol(pTable->tagVal, pCol->colId);
|
||||||
SArray* res = tSkipListGet(pSTable->pIndex, key);
|
SArray* res = tSkipListGet(pSTable->pIndex, key);
|
||||||
|
|
||||||
size_t size = taosArrayGetSize(res);
|
size_t size = taosArrayGetSize(res);
|
||||||
assert(size > 0);
|
ASSERT(size > 0);
|
||||||
|
|
||||||
for(int32_t i = 0; i < size; ++i) {
|
for(int32_t i = 0; i < size; ++i) {
|
||||||
SSkipListNode* pNode = taosArrayGetP(res, i);
|
SSkipListNode* pNode = taosArrayGetP(res, i);
|
||||||
|
|
||||||
STableIndexElem* pElem = (STableIndexElem*) SL_GET_NODE_DATA(pNode);
|
// STableIndexElem* pElem = (STableIndexElem*) SL_GET_NODE_DATA(pNode);
|
||||||
if (pElem->pTable == pTable) { // this is the exact what we need
|
if ((STable *)SL_GET_NODE_DATA(pNode) == pTable) { // this is the exact what we need
|
||||||
tSkipListRemoveNode(pSTable->pIndex, pNode);
|
tSkipListRemoveNode(pSTable->pIndex, pNode);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -975,3 +878,93 @@ static int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup) {
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void *tsdbEncodeTableName(void *buf, tstr *name) {
|
||||||
|
void *pBuf = buf;
|
||||||
|
|
||||||
|
pBuf = taosEncodeFixedI16(pBuf, name->len);
|
||||||
|
memcpy(pBuf, name->data, name->len);
|
||||||
|
pBuf = POINTER_SHIFT(pBuf, name->len);
|
||||||
|
|
||||||
|
return POINTER_DISTANCE(pBuf, buf);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *tsdbDecodeTableName(void *buf, tstr **name) {
|
||||||
|
VarDataLenT len = 0;
|
||||||
|
|
||||||
|
buf = taosDecodeFixedI16(buf, &len);
|
||||||
|
*name = calloc(1, sizeof(tstr) + len + 1);
|
||||||
|
if (*name == NULL) {
|
||||||
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
(*name)->len = len;
|
||||||
|
memcpy((*name)->data, buf, len);
|
||||||
|
|
||||||
|
buf = POINTER_SHIFT(buf, len);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *tsdbEncodeTable(void *buf, STable *pTable) {
|
||||||
|
ASSERT(pTable != NULL);
|
||||||
|
|
||||||
|
buf = taosEncodeFixedU8(buf, pTable->type);
|
||||||
|
buf = tsdbEncodeTableName(buf, pTable->name);
|
||||||
|
buf = taosEncodeFixedU64(buf, TALBE_UID(pTable));
|
||||||
|
buf = taosEncodeFixedI32(buf, TABLE_TID(pTable));
|
||||||
|
|
||||||
|
if (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) {
|
||||||
|
buf = taosEncodeFixedU64(buf, TABLE_SUID(pTable));
|
||||||
|
buf = tdEncodeKVRow(buf, pTable->tagVal);
|
||||||
|
} else {
|
||||||
|
buf = taosEncodeFixedU8(buf, pTable->numOfSchemas);
|
||||||
|
for (int i = 0; i < pTable->numOfSchemas; i++) {
|
||||||
|
buf = tdEncodeSchema(buf, pTable->schema[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
||||||
|
buf = tdEncodeSchema(buf, pTable->tagSchema);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) {
|
||||||
|
buf = taosEncodeString(buf, pTable->sql);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *tsdbDecodeTable(void *buf, STable **pRTable) {
|
||||||
|
STable *pTable = (STable *)calloc(1, sizeof(STable));
|
||||||
|
if (pTable == NULL) {
|
||||||
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
buf = taosDecodeFixedU8(buf, &(pTable->type));
|
||||||
|
buf = tsdbDecodeTableName(buf, &(pTable->name));
|
||||||
|
buf = taosDecodeFixedU64(buf, &TALBE_UID(pTable));
|
||||||
|
buf = taosDecodeFixedI32(buf, &TABLE_TID(pTable));
|
||||||
|
|
||||||
|
if (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) {
|
||||||
|
buf = taosDecodeFixedU64(buf, &TABLE_SUID(pTable));
|
||||||
|
buf = tdDecodeKVRow(buf, &(pTable->tagVal));
|
||||||
|
} else {
|
||||||
|
buf = taosDecodeFixedU8(buf, &(pTable->numOfSchemas));
|
||||||
|
for (int i = 0; i < pTable->numOfSchemas; i++) {
|
||||||
|
buf = tdDecodeSchema(buf, &(pTable->schema[i]));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
||||||
|
buf = tdDecodeSchema(buf, &(pTable->tagSchema));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) {
|
||||||
|
buf = taosDecodeString(buf, &(pTable->sql));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
*pRTable = pTable;
|
||||||
|
|
||||||
|
return buf;
|
||||||
|
}
|
Loading…
Reference in New Issue