Merge branch 'develop' of https://github.com/taosdata/TDengine into develop
commit b4ce336cff
@@ -146,6 +146,7 @@ class CTaosInterface(object):
     libtaos.taos_errstr.restype = ctypes.c_char_p
     libtaos.taos_subscribe.restype = ctypes.c_void_p
     libtaos.taos_consume.restype = ctypes.c_void_p
+    libtaos.taos_fetch_lengths.restype = ctypes.c_void_p

     def __init__(self, config=None):
         '''
@@ -314,6 +315,8 @@ class CTaosInterface(object):

         isMicro = (CTaosInterface.libtaos.taos_result_precision(result) == FieldType.C_TIMESTAMP_MICRO)
         blocks = [None] * len(fields)
+        fieldL = CTaosInterface.libtaos.taos_fetch_lengths(result)
+        fieldLen = [ele for ele in ctypes.cast(fieldL, ctypes.POINTER(ctypes.c_int))[:len(fields)]]
         for i in range(len(fields)):
             data = ctypes.cast(pblock, ctypes.POINTER(ctypes.c_void_p))[i]
             if data == None:
@@ -323,7 +326,7 @@ class CTaosInterface(object):
             if fields[i]['type'] not in _CONVERT_FUNC:
                 raise DatabaseError("Invalid data type returned from database")

-            blocks[i] = _CONVERT_FUNC[fields[i]['type']](data, num_of_rows, fields[i]['bytes'], isMicro)
+            blocks[i] = _CONVERT_FUNC[fields[i]['type']](data, num_of_rows, fieldLen[i], isMicro)

         return blocks, abs(num_of_rows)
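The connector change above follows the C client convention that taos_fetch_lengths yields one int per column, so variable-length BINARY/NCHAR cells are sliced by their actual byte count instead of the declared column width. A minimal standalone C sketch of that slicing idea (mock buffers only, no TDengine dependency; the names here are illustrative):

#include <stdio.h>
#include <string.h>

/* Hypothetical stand-ins for what the client library would hand back:
 * one raw column buffer plus a per-column byte length, as a lengths array
 * like the one returned by taos_fetch_lengths would report. */
int main(void) {
    const char raw_binary_col[16] = "abc"; /* declared width 16, actual payload 3 bytes */
    const int  lengths[1] = {3};           /* per-column length as reported by the library */

    char value[17];
    memcpy(value, raw_binary_col, (size_t)lengths[0]); /* copy only the reported bytes */
    value[lengths[0]] = '\0';                          /* terminate explicitly */

    printf("len=%d value=%s\n", lengths[0], value);
    return 0;
}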
@@ -121,10 +121,11 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,

   for (int k = 0; k < numOfRows; ++k) {
     TAOS_ROW row = taos_fetch_row(result);
+    int32_t* length = taos_fetch_lengths(result);

     // for group by
     if (groupFields != -1) {
-      char target[HTTP_GC_TARGET_SIZE];
+      char target[HTTP_GC_TARGET_SIZE] = {0};
       int len;
       len = snprintf(target,HTTP_GC_TARGET_SIZE,"%s{",aliasBuffer);
       for (int i = dataFields + 1; i<num_fields; i++){
@@ -150,7 +151,11 @@ bool gcBuildQueryJson(HttpContext *pContext, HttpSqlCmd *cmd, TAOS_RES *result,
           break;
         case TSDB_DATA_TYPE_BINARY:
         case TSDB_DATA_TYPE_NCHAR:
-          len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%s", fields[i].name, (char *)row[i]);
+          if (row[i]!= NULL){
+            len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:", fields[i].name);
+            memcpy(target + len, (char *) row[i], length[i]);
+            len = strlen(target);
+          }
           break;
         default:
           len += snprintf(target + len, HTTP_GC_TARGET_SIZE - len, "%s:%s", fields[i].name, "-");
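The BINARY/NCHAR branch above stops formatting row[i] with %s and instead copies exactly length[i] bytes into a zero-initialized target, so the copy no longer relies on the cell carrying a terminating NUL. A self-contained sketch of that bounded-copy pattern (buffer size and field name are placeholders, not the httpd code):

#include <stdio.h>
#include <string.h>

#define TARGET_SIZE 64 /* stand-in for HTTP_GC_TARGET_SIZE */

int main(void) {
    /* A "cell" that is NOT NUL-terminated: only the first 5 bytes are valid. */
    const char cell[8] = {'t', 'a', 'o', 's', 'd', 'X', 'X', 'X'};
    const int  cell_len = 5; /* what a per-row length array would report */

    char target[TARGET_SIZE] = {0};            /* zero-init so strlen() below is safe */
    int  len = snprintf(target, TARGET_SIZE, "%s:", "devid");

    /* Copy only the reported bytes, guarding against overflowing the target. */
    int room = TARGET_SIZE - 1 - len;
    int copy = cell_len < room ? cell_len : room;
    memcpy(target + len, cell, (size_t)copy);
    len = (int)strlen(target);

    printf("%s (len=%d)\n", target, len);
    return 0;
}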
@@ -95,6 +95,7 @@ typedef struct STable {
   void * streamHandler; // TODO
   TSKEY lastKey; // lastkey inserted in this table, initialized as 0, TODO: make a structure
   struct STable *next; // TODO: remove the next
+  struct STable *prev;
 } STable;

 #define TSDB_GET_TABLE_LAST_KEY(pTable) ((pTable)->lastKey)
@@ -90,6 +90,7 @@ void tsdbFreeCfg(STsdbCfg *pCfg) {
 int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO */) {

   if (mkdir(rootDir, 0755) != 0) {
+    tsdbError("id %d: failed to create rootDir! rootDir %s, reason %s", pCfg->tsdbId, rootDir, strerror(errno));
     if (errno == EACCES) {
       return TSDB_CODE_NO_DISK_PERMISSIONS;
     } else if (errno == ENOSPC) {
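The added tsdbError call logs strerror(errno) before the existing errno-to-code mapping runs. A compilable sketch of that create-directory pattern outside the engine (the return codes and the log text below are stand-ins, not the real TSDB_CODE_* values):

#include <errno.h>
#include <stdio.h>
#include <string.h>
#include <sys/types.h>
#include <sys/stat.h>

/* Illustrative return codes; the real TSDB_CODE_* values live in the engine headers. */
enum { CODE_OK = 0, CODE_NO_PERMISSION = -2, CODE_NO_SPACE = -3, CODE_OTHER = -1 };

static int create_repo_dir(const char *root_dir) {
    if (mkdir(root_dir, 0755) != 0) {
        /* Log first, while errno still describes the mkdir failure. */
        fprintf(stderr, "failed to create rootDir! rootDir %s, reason %s\n",
                root_dir, strerror(errno));
        if (errno == EACCES) return CODE_NO_PERMISSION;
        if (errno == ENOSPC) return CODE_NO_SPACE;
        return CODE_OTHER;
    }
    return CODE_OK;
}

int main(void) {
    return create_repo_dir("/tmp/tsdb_demo_repo") == CODE_OK ? 0 : 1;
}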
@@ -611,14 +612,20 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
   if (pCfg->precision == -1) {
     pCfg->precision = TSDB_DEFAULT_PRECISION;
   } else {
-    if (!IS_VALID_PRECISION(pCfg->precision)) return -1;
+    if (!IS_VALID_PRECISION(pCfg->precision)) {
+      tsdbError("id %d: invalid precision configuration! precision %d", pCfg->tsdbId, pCfg->precision);
+      return -1;
+    }
   }

   // Check compression
   if (pCfg->compression == -1) {
     pCfg->compression = TSDB_DEFAULT_COMPRESSION;
   } else {
-    if (!IS_VALID_COMPRESSION(pCfg->compression)) return -1;
+    if (!IS_VALID_COMPRESSION(pCfg->compression)) {
+      tsdbError("id %d: invalid compression configuration! compression %d", pCfg->tsdbId, pCfg->precision);
+      return -1;
+    }
   }

   // Check tsdbId
@@ -628,30 +635,50 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
   if (pCfg->maxTables == -1) {
     pCfg->maxTables = TSDB_DEFAULT_TABLES;
   } else {
-    if (pCfg->maxTables < TSDB_MIN_TABLES || pCfg->maxTables > TSDB_MAX_TABLES) return -1;
+    if (pCfg->maxTables < TSDB_MIN_TABLES || pCfg->maxTables > TSDB_MAX_TABLES) {
+      tsdbError("id %d: invalid maxTables configuration! maxTables %d TSDB_MIN_TABLES %d TSDB_MAX_TABLES %d",
+                pCfg->tsdbId, pCfg->maxTables, TSDB_MIN_TABLES, TSDB_MAX_TABLES);
+      return -1;
+    }
   }

   // Check daysPerFile
   if (pCfg->daysPerFile == -1) {
     pCfg->daysPerFile = TSDB_DEFAULT_DAYS_PER_FILE;
   } else {
-    if (pCfg->daysPerFile < TSDB_MIN_DAYS_PER_FILE || pCfg->daysPerFile > TSDB_MAX_DAYS_PER_FILE) return -1;
+    if (pCfg->daysPerFile < TSDB_MIN_DAYS_PER_FILE || pCfg->daysPerFile > TSDB_MAX_DAYS_PER_FILE) {
+      tsdbError(
+          "id %d: invalid daysPerFile configuration! daysPerFile %d TSDB_MIN_DAYS_PER_FILE %d TSDB_MAX_DAYS_PER_FILE "
+          "%d",
+          pCfg->tsdbId, pCfg->daysPerFile, TSDB_MIN_DAYS_PER_FILE, TSDB_MAX_DAYS_PER_FILE);
+      return -1;
+    }
   }

   // Check minRowsPerFileBlock and maxRowsPerFileBlock
   if (pCfg->minRowsPerFileBlock == -1) {
     pCfg->minRowsPerFileBlock = TSDB_DEFAULT_MIN_ROW_FBLOCK;
   } else {
-    if (pCfg->minRowsPerFileBlock < TSDB_MIN_MIN_ROW_FBLOCK || pCfg->minRowsPerFileBlock > TSDB_MAX_MIN_ROW_FBLOCK)
+    if (pCfg->minRowsPerFileBlock < TSDB_MIN_MIN_ROW_FBLOCK || pCfg->minRowsPerFileBlock > TSDB_MAX_MIN_ROW_FBLOCK) {
+      tsdbError(
+          "id %d: invalid minRowsPerFileBlock configuration! minRowsPerFileBlock %d TSDB_MIN_MIN_ROW_FBLOCK %d "
+          "TSDB_MAX_MIN_ROW_FBLOCK %d",
+          pCfg->tsdbId, pCfg->minRowsPerFileBlock, TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK);
       return -1;
     }
+  }

   if (pCfg->maxRowsPerFileBlock == -1) {
     pCfg->maxRowsPerFileBlock = TSDB_DEFAULT_MAX_ROW_FBLOCK;
   } else {
-    if (pCfg->maxRowsPerFileBlock < TSDB_MIN_MAX_ROW_FBLOCK || pCfg->maxRowsPerFileBlock > TSDB_MAX_MAX_ROW_FBLOCK)
+    if (pCfg->maxRowsPerFileBlock < TSDB_MIN_MAX_ROW_FBLOCK || pCfg->maxRowsPerFileBlock > TSDB_MAX_MAX_ROW_FBLOCK) {
+      tsdbError(
+          "id %d: invalid maxRowsPerFileBlock configuration! maxRowsPerFileBlock %d TSDB_MIN_MAX_ROW_FBLOCK %d "
+          "TSDB_MAX_MAX_ROW_FBLOCK %d",
+          pCfg->tsdbId, pCfg->maxRowsPerFileBlock, TSDB_MIN_MIN_ROW_FBLOCK, TSDB_MAX_MIN_ROW_FBLOCK);
       return -1;
     }
+  }

   if (pCfg->minRowsPerFileBlock > pCfg->maxRowsPerFileBlock) return -1;
@@ -659,7 +686,13 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
   if (pCfg->keep == -1) {
     pCfg->keep = TSDB_DEFAULT_KEEP;
   } else {
-    if (pCfg->keep < TSDB_MIN_KEEP || pCfg->keep > TSDB_MAX_KEEP) return -1;
+    if (pCfg->keep < TSDB_MIN_KEEP || pCfg->keep > TSDB_MAX_KEEP) {
+      tsdbError(
+          "id %d: invalid keep configuration! keep %d TSDB_MIN_KEEP %d "
+          "TSDB_MAX_KEEP %d",
+          pCfg->tsdbId, pCfg->keep, TSDB_MIN_KEEP, TSDB_MAX_KEEP);
+      return -1;
+    }
   }

   return 0;
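Every configuration check above now follows the same shape: a value of -1 means apply the default, anything else is validated against a [min, max] range and rejected with a log line that includes the offending value and its bounds. A generic, self-contained sketch of that shape (the helper name and log format are illustrative, not engine API):

#include <stdio.h>

/* Illustrative helper: apply a default for -1, otherwise enforce [min, max]
 * and log the rejected value together with its bounds. */
static int check_or_default(const char *name, int *value, int def, int min, int max) {
    if (*value == -1) {
        *value = def;
        return 0;
    }
    if (*value < min || *value > max) {
        fprintf(stderr, "invalid %s configuration! %s %d min %d max %d\n",
                name, name, *value, min, max);
        return -1;
    }
    return 0;
}

int main(void) {
    int days_per_file = 45; /* pretend this came from a config struct */
    if (check_or_default("daysPerFile", &days_per_file, 10, 1, 30) < 0) {
        return 1; /* reject the configuration, as the engine does */
    }
    printf("daysPerFile = %d\n", days_per_file);
    return 0;
}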
@@ -716,15 +749,22 @@ static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname) {
 }

 static int32_t tsdbSetRepoEnv(STsdbRepo *pRepo) {
+  STsdbCfg *pCfg = &pRepo->config;
   if (tsdbSaveConfig(pRepo) < 0) return -1;

   char dirName[128] = "\0";
   if (tsdbGetDataDirName(pRepo, dirName) < 0) return -1;

   if (mkdir(dirName, 0755) < 0) {
+    tsdbError("id %d: failed to create repository directory! reason %s", pRepo->config.tsdbId, strerror(errno));
     return -1;
   }

+  tsdbError(
+      "id %d: set up tsdb environment succeed! cacheBlockSize %d, totalBlocks %d, maxTables %d, daysPerFile %d, keep "
+      "%d, minRowsPerFileBlock %d, maxRowsPerFileBlock %d, precision %d, compression%d",
+      pRepo->config.tsdbId, pCfg->cacheBlockSize, pCfg->totalBlocks, pCfg->maxTables, pCfg->daysPerFile, pCfg->keep,
+      pCfg->minRowsPerFileBlock, pCfg->maxRowsPerFileBlock, pCfg->precision, pCfg->compression);
   return 0;
 }
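The multi-line tsdbError(...) calls added throughout this commit rely on C's adjacent string literals being merged by the compiler into a single format string. A tiny standalone reminder of that idiom:

#include <stdio.h>

int main(void) {
    /* Adjacent string literals are concatenated into one format string at compile time. */
    printf("id %d: set up environment, "
           "maxTables %d, daysPerFile %d\n",
           1, 1000, 10);
    return 0;
}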
@@ -811,7 +851,8 @@ static int32_t tsdbInsertDataToTable(TsdbRepoT *repo, SSubmitBlk *pBlock, TSKEY
   STableId tableId = {.uid = pBlock->uid, .tid = pBlock->tid};
   STable *pTable = tsdbIsValidTableToInsert(pRepo->tsdbMeta, tableId);
   if (pTable == NULL) {
-    tsdbError("failed to get table for insert, uid:%" PRIu64 ", tid:%d", tableId.uid, tableId.tid);
+    tsdbError("id %d: failed to get table for insert, uid:%" PRIu64 ", tid:%d", pRepo->config.tsdbId, pBlock->uid,
+              pBlock->tid);
     return TSDB_CODE_INVALID_TABLE_ID;
   }
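The reworked log line keeps the %" PRIu64 " splice because the uid is a 64-bit unsigned value; PRIu64 comes from <inttypes.h> and expands to the correct conversion specifier for the platform. A minimal standalone example of that idiom:

#include <inttypes.h>
#include <stdio.h>

int main(void) {
    uint64_t uid = 18446744073709551615ULL; /* largest uint64_t, used only as sample data */
    int      tid = 42;

    /* PRIu64 is stitched into the format string between adjacent string literals. */
    printf("failed to get table for insert, uid:%" PRIu64 ", tid:%d\n", uid, tid);
    return 0;
}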
@@ -13,10 +13,10 @@
 static int tsdbFreeTable(STable *pTable);
 static int32_t tsdbCheckTableCfg(STableCfg *pCfg);
 static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx);
-static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable);
 static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
 static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
 static int tsdbEstimateTableEncodeSize(STable *pTable);
+static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable);

 /**
  * Encode a TSDB table object as a binary content
@@ -375,21 +375,9 @@ int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) {
   STable *pTable = tsdbGetTableByUid(pMeta, tableId.uid);
   if (pTable == NULL) return -1;

-  if (pTable->type == TSDB_SUPER_TABLE) {
-    // TODO: implement drop super table
-    return -1;
-  } else {
-    pMeta->tables[pTable->tableId.tid] = NULL;
-    pMeta->nTables--;
-    assert(pMeta->nTables >= 0);
-    if (pTable->type == TSDB_CHILD_TABLE) {
-      tsdbRemoveTableFromIndex(pMeta, pTable);
-    }
-
-    tsdbFreeTable(pTable);
-  }
+  if (tsdbRemoveTableFromMeta(pMeta, pTable) < 0) return -1;

   return 0;
 }

 // int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable) {
@@ -445,10 +433,12 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
     if (pMeta->superList == NULL) {
       pMeta->superList = pTable;
       pTable->next = NULL;
+      pTable->prev = NULL;
     } else {
-      STable *pTemp = pMeta->superList;
+      pTable->next = pMeta->superList;
+      pTable->prev = NULL;
+      pTable->next->prev = pTable;
       pMeta->superList = pTable;
-      pTable->next = pTemp;
     }
   } else {
     // add non-super table to the array
@@ -467,22 +457,50 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
     if (bytes > pMeta->maxRowBytes) pMeta->maxRowBytes = bytes;
   }

-  return tsdbAddTableIntoMap(pMeta, pTable);
-}
-
-// static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) {
-//   // TODO
-//   return 0;
-// }
-static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable) {
-  // TODO: add the table to the map
-  int64_t uid = pTable->tableId.uid;
-  if (taosHashPut(pMeta->map, (char *)(&uid), sizeof(uid), (void *)(&pTable), sizeof(pTable)) < 0) {
+  if (taosHashPut(pMeta->map, (char *)(&pTable->tableId.uid), sizeof(pTable->tableId.uid), (void *)(&pTable), sizeof(pTable)) < 0) {
     return -1;
   }
   return 0;
 }

+static int tsdbRemoveTableFromMeta(STsdbMeta *pMeta, STable *pTable) {
+  if (pTable->type == TSDB_SUPER_TABLE) {
+    SSkipListIterator *pIter = tSkipListCreateIter(pTable->pIndex);
+    while (tSkipListIterNext(pIter)) {
+      STable *tTable = *(STable **)SL_GET_NODE_DATA(tSkipListIterGet(pIter));
+      ASSERT(tTable != NULL && tTable->type == TSDB_CHILD_TABLE);
+
+      pMeta->tables[tTable->tableId.tid] = NULL;
+      taosHashRemove(pMeta->map, (char *)(&(pTable->tableId.uid)), sizeof(pTable->tableId.uid));
+      pMeta->nTables--;
+      tsdbFreeTable(tTable);
+    }
+
+    tSkipListDestroyIter(pIter);
+
+    // TODO: Remove the table from the list
+    if (pTable->prev != NULL) {
+      pTable->prev->next = pTable->next;
+      if (pTable->next != NULL) {
+        pTable->next->prev = pTable->prev;
+      }
+    } else {
+      pMeta->superList = pTable->next;
+    }
+  } else {
+    pMeta->tables[pTable->tableId.tid] = NULL;
+    if (pTable->type == TSDB_CHILD_TABLE) {
+      tsdbRemoveTableFromIndex(pMeta, pTable);
+    }
+
+    pMeta->nTables--;
+  }
+
+  tsdbFreeTable(pTable);
+  taosHashRemove(pMeta->map, (char *)(&(pTable->tableId.uid)), sizeof(pTable->tableId.uid));
+  return 0;
+}
+
 static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable) {
   assert(pTable->type == TSDB_CHILD_TABLE && pTable != NULL);
   STable* pSTable = tsdbGetTableByUid(pMeta, pTable->superUid);
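The new prev pointer and tsdbRemoveTableFromMeta above turn the super-table list into a doubly linked list, so a table can be unlinked without walking from the head. A standalone sketch of the insert-at-head / unlink pair (a generic node type, not the STable struct; this version updates both neighbours unconditionally):

#include <stdio.h>

/* Generic doubly linked node standing in for the super-table list entries. */
typedef struct Node {
    int          id;
    struct Node *prev;
    struct Node *next;
} Node;

static void push_head(Node **head, Node *n) {
    n->prev = NULL;
    n->next = *head;
    if (n->next != NULL) n->next->prev = n; /* old head now points back at n */
    *head = n;
}

static void unlink_node(Node **head, Node *n) {
    if (n->prev != NULL) {
        n->prev->next = n->next;            /* bypass n from the left */
    } else {
        *head = n->next;                    /* n was the head */
    }
    if (n->next != NULL) n->next->prev = n->prev; /* bypass n from the right */
}

int main(void) {
    Node *head = NULL;
    Node a = {.id = 1}, b = {.id = 2}, c = {.id = 3};
    push_head(&head, &a);
    push_head(&head, &b);
    push_head(&head, &c);   /* list: 3 -> 2 -> 1 */

    unlink_node(&head, &b); /* list: 3 -> 1 */
    for (Node *p = head; p != NULL; p = p->next) printf("%d ", p->id);
    printf("\n");
    return 0;
}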