update table schema change code
This commit is contained in:
parent
fb951a3762
commit
b823afbd6e
|
@ -579,9 +579,9 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo
|
||||||
int32_t numOfCols = tscGetNumOfColumns(pTableDataBlock->pTableMeta);
|
int32_t numOfCols = tscGetNumOfColumns(pTableDataBlock->pTableMeta);
|
||||||
for(int32_t j = 0; j < numOfCols; ++j) {
|
for(int32_t j = 0; j < numOfCols; ++j) {
|
||||||
STColumn* pCol = (STColumn*) pDataBlock;
|
STColumn* pCol = (STColumn*) pDataBlock;
|
||||||
pCol->colId = pSchema[j].colId;
|
pCol->colId = htons(pSchema[j].colId);
|
||||||
pCol->type = pSchema[j].type;
|
pCol->type = pSchema[j].type;
|
||||||
pCol->bytes = pSchema[j].bytes;
|
pCol->bytes = htons(pSchema[j].bytes);
|
||||||
pCol->offset = 0;
|
pCol->offset = 0;
|
||||||
|
|
||||||
pDataBlock += sizeof(STColumn);
|
pDataBlock += sizeof(STColumn);
|
||||||
|
|
|
@ -200,6 +200,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_ACTION, 0, 0x060D, "tsdb inval
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CREATE_TB_MSG, 0, 0x060E, "tsdb invalid create table message")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_CREATE_TB_MSG, 0, 0x060E, "tsdb invalid create table message")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, 0, 0x060F, "tsdb no table data in memory skiplist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_NO_TABLE_DATA_IN_MEM, 0, 0x060F, "tsdb no table data in memory skiplist")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "tsdb file already exists")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_FILE_ALREADY_EXISTS, 0, 0x0610, "tsdb file already exists")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TDB_TABLE_RECONFIGURE, 0, 0x0611, "tsdb need to reconfigure table")
|
||||||
|
|
||||||
// query
|
// query
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "query invalid handle")
|
TAOS_DEFINE_ERROR(TSDB_CODE_QRY_INVALID_QHANDLE, 0, 0x0700, "query invalid handle")
|
||||||
|
|
|
@ -203,8 +203,7 @@ typedef struct SSubmitBlk {
|
||||||
typedef struct SSubmitMsg {
|
typedef struct SSubmitMsg {
|
||||||
SMsgHead header;
|
SMsgHead header;
|
||||||
int32_t length;
|
int32_t length;
|
||||||
int32_t compressed : 2;
|
int32_t numOfBlocks;
|
||||||
int32_t numOfBlocks : 30;
|
|
||||||
SSubmitBlk blocks[];
|
SSubmitBlk blocks[];
|
||||||
} SSubmitMsg;
|
} SSubmitMsg;
|
||||||
|
|
||||||
|
|
|
@ -308,6 +308,49 @@ int tsdbRLockRepoMeta(STsdbRepo* pRepo);
|
||||||
int tsdbUnlockRepoMeta(STsdbRepo* pRepo);
|
int tsdbUnlockRepoMeta(STsdbRepo* pRepo);
|
||||||
void tsdbRefTable(STable* pTable);
|
void tsdbRefTable(STable* pTable);
|
||||||
void tsdbUnRefTable(STable* pTable);
|
void tsdbUnRefTable(STable* pTable);
|
||||||
|
void tsdbUpdateTableSchema(STsdbRepo* pRepo, STable* pTable, STSchema* pSchema, bool insertAct);
|
||||||
|
|
||||||
|
static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
|
||||||
|
if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) {
|
||||||
|
return -1;
|
||||||
|
} else if (*(int16_t *)key1 > schemaVersion(*(STSchema **)key2)) {
|
||||||
|
return 1;
|
||||||
|
} else {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t version) {
|
||||||
|
ASSERT(TABLE_TYPE(pTable) != TSDB_SUPER_TABLE);
|
||||||
|
STable* pDTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable;
|
||||||
|
STSchema* pSchema = NULL;
|
||||||
|
STSchema* pTSchema = NULL;
|
||||||
|
|
||||||
|
if (lock) taosRLockLatch(&(pDTable->latch));
|
||||||
|
if (version < 0) { // get the latest version of schema
|
||||||
|
pTSchema = pDTable->schema[pDTable->numOfSchemas - 1];
|
||||||
|
} else { // get the schema with version
|
||||||
|
void* ptr = taosbsearch(&version, pDTable->schema, pDTable->numOfSchemas, sizeof(STSchema*),
|
||||||
|
tsdbCompareSchemaVersion, TD_EQ);
|
||||||
|
if (ptr == NULL) {
|
||||||
|
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
pTSchema = *(STSchema**)ptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pTSchema != NULL);
|
||||||
|
|
||||||
|
if (copy) {
|
||||||
|
if ((pSchema = tdDupSchema(pTSchema)) == NULL) terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
} else {
|
||||||
|
pSchema = pTSchema;
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (lock) taosRUnLockLatch(&(pDTable->latch));
|
||||||
|
return pSchema;
|
||||||
|
}
|
||||||
|
|
||||||
// ------------------ tsdbBuffer.c
|
// ------------------ tsdbBuffer.c
|
||||||
STsdbBufPool* tsdbNewBufPool();
|
STsdbBufPool* tsdbNewBufPool();
|
||||||
|
|
|
@ -43,7 +43,7 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t totalLen;
|
int32_t totalLen;
|
||||||
int32_t len;
|
int32_t len;
|
||||||
SSubmitBlk *pBlock;
|
void * pMsg;
|
||||||
} SSubmitMsgIter;
|
} SSubmitMsgIter;
|
||||||
|
|
||||||
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
|
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
|
||||||
|
@ -56,7 +56,7 @@ static STsdbRepo * tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg);
|
||||||
static void tsdbFreeRepo(STsdbRepo *pRepo);
|
static void tsdbFreeRepo(STsdbRepo *pRepo);
|
||||||
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
|
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
|
||||||
static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows);
|
static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows);
|
||||||
static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter);
|
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock);
|
||||||
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
|
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
|
||||||
static int tsdbRestoreInfo(STsdbRepo *pRepo);
|
static int tsdbRestoreInfo(STsdbRepo *pRepo);
|
||||||
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
|
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
|
||||||
|
@ -68,6 +68,7 @@ static int keyFGroupCompFunc(const void *key, const void *fgroup);
|
||||||
static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg);
|
static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg);
|
||||||
static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg);
|
static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg);
|
||||||
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
|
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
|
||||||
|
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg);
|
||||||
|
|
||||||
// Function declaration
|
// Function declaration
|
||||||
int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
|
int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
|
||||||
|
@ -164,6 +165,15 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *
|
||||||
STsdbRepo * pRepo = (STsdbRepo *)repo;
|
STsdbRepo * pRepo = (STsdbRepo *)repo;
|
||||||
SSubmitMsgIter msgIter = {0};
|
SSubmitMsgIter msgIter = {0};
|
||||||
|
|
||||||
|
if (tsdbScanAndConvertSubmitMsg(pRepo, pMsg) < 0) {
|
||||||
|
if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) {
|
if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) {
|
||||||
tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno));
|
tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -173,12 +183,14 @@ int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *
|
||||||
int32_t affectedrows = 0;
|
int32_t affectedrows = 0;
|
||||||
|
|
||||||
TSKEY now = taosGetTimestamp(pRepo->config.precision);
|
TSKEY now = taosGetTimestamp(pRepo->config.precision);
|
||||||
|
while (true) {
|
||||||
while ((pBlock = tsdbGetSubmitMsgNext(&msgIter)) != NULL) {
|
tsdbGetSubmitMsgNext(&msgIter, &pBlock);
|
||||||
|
if (pBlock == NULL) break;
|
||||||
if (tsdbInsertDataToTable(pRepo, pBlock, now, &affectedrows) < 0) {
|
if (tsdbInsertDataToTable(pRepo, pBlock, now, &affectedrows) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows);
|
if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -263,7 +275,7 @@ void tsdbStartStream(TSDB_REPO_T *repo) {
|
||||||
STable *pTable = pMeta->tables[i];
|
STable *pTable = pMeta->tables[i];
|
||||||
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
|
if (pTable && pTable->type == TSDB_STREAM_TABLE) {
|
||||||
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql,
|
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql,
|
||||||
tsdbGetTableSchema(pTable));
|
tsdbGetTableSchemaImpl(pTable, false, false, -1));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -694,17 +706,12 @@ static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pMsg->length = htonl(pMsg->length);
|
|
||||||
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
|
||||||
pMsg->compressed = htonl(pMsg->compressed);
|
|
||||||
|
|
||||||
pIter->totalLen = pMsg->length;
|
pIter->totalLen = pMsg->length;
|
||||||
pIter->len = TSDB_SUBMIT_MSG_HEAD_SIZE;
|
pIter->len = 0;
|
||||||
|
pIter->pMsg = pMsg;
|
||||||
if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) {
|
if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) {
|
||||||
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
|
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
|
||||||
pIter->pBlock = pMsg->blocks;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -714,26 +721,8 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
|
||||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||||
int64_t points = 0;
|
int64_t points = 0;
|
||||||
|
|
||||||
STable *pTable = tsdbGetTableByUid(pMeta, pBlock->uid);
|
STable *pTable = pMeta->tables[pBlock->tid];
|
||||||
if (pTable == NULL || TABLE_TID(pTable) != pBlock->tid) {
|
ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
|
||||||
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
|
|
||||||
pBlock->tid);
|
|
||||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
|
||||||
tsdbError("vgId:%d invalid action trying to insert a super table %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable));
|
|
||||||
terrno = TSDB_CODE_TDB_INVALID_ACTION;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check schema version and update schema if needed
|
|
||||||
if (tsdbCheckTableSchema(pRepo, pBlock, pTable) < 0) {
|
|
||||||
tsdbError("vgId:%d failed to insert data to table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
|
||||||
tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
SSubmitBlkIter blkIter = {0};
|
SSubmitBlkIter blkIter = {0};
|
||||||
SDataRow row = NULL;
|
SDataRow row = NULL;
|
||||||
|
@ -764,27 +753,23 @@ static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) {
|
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
|
||||||
SSubmitBlk *pBlock = pIter->pBlock;
|
if (pIter->len == 0) {
|
||||||
if (pBlock == NULL) return NULL;
|
pIter->len += TSDB_SUBMIT_MSG_HEAD_SIZE;
|
||||||
|
|
||||||
pBlock->dataLen = htonl(pBlock->dataLen);
|
|
||||||
pBlock->schemaLen = htonl(pBlock->schemaLen);
|
|
||||||
pBlock->numOfRows = htons(pBlock->numOfRows);
|
|
||||||
pBlock->uid = htobe64(pBlock->uid);
|
|
||||||
pBlock->tid = htonl(pBlock->tid);
|
|
||||||
|
|
||||||
pBlock->sversion = htonl(pBlock->sversion);
|
|
||||||
pBlock->padding = htonl(pBlock->padding);
|
|
||||||
|
|
||||||
pIter->len = pIter->len + sizeof(SSubmitBlk) + pBlock->dataLen;
|
|
||||||
if (pIter->len >= pIter->totalLen) {
|
|
||||||
pIter->pBlock = NULL;
|
|
||||||
} else {
|
} else {
|
||||||
pIter->pBlock = (SSubmitBlk *)((char *)pBlock + pBlock->dataLen + sizeof(SSubmitBlk));
|
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
|
||||||
|
pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
return pBlock;
|
if (pIter->len > pIter->totalLen) {
|
||||||
|
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
|
||||||
|
*pPBlock = NULL;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
*pPBlock = (pIter->len == pIter->totalLen) ? NULL : (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
|
||||||
|
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
|
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
|
||||||
|
@ -969,42 +954,64 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) {
|
||||||
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) {
|
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) {
|
||||||
ASSERT(pTable != NULL);
|
ASSERT(pTable != NULL);
|
||||||
|
|
||||||
STSchema *pSchema = tsdbGetTableSchema(pTable);
|
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
|
||||||
int sversion = schemaVersion(pSchema);
|
int sversion = schemaVersion(pSchema);
|
||||||
|
|
||||||
if (pBlock->sversion == sversion) return 0;
|
if (pBlock->sversion == sversion) {
|
||||||
if (pBlock->sversion > sversion) { // need to config
|
return 0;
|
||||||
tsdbDebug("vgId:%d table %s tid %d has version %d smaller than client version %d, try to config", REPO_ID(pRepo),
|
|
||||||
TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), sversion, pBlock->sversion);
|
|
||||||
if (pRepo->appH.configFunc) {
|
|
||||||
void *msg = (*pRepo->appH.configFunc)(REPO_ID(pRepo), TABLE_TID(pTable));
|
|
||||||
if (msg == NULL) {
|
|
||||||
tsdbError("vgId:%d failed to config table %s tid %d since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
|
||||||
TABLE_TID(pTable), tstrerror(terrno));
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
STableCfg *pTableCfg = tsdbCreateTableCfgFromMsg(msg);
|
|
||||||
if (pTableCfg == NULL) {
|
|
||||||
rpcFreeCont(msg);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tsdbUpdateTable(pRepo, (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable, pTableCfg) < 0) {
|
|
||||||
tsdbError("vgId:%d failed to update table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
|
||||||
tstrerror(terrno));
|
|
||||||
tsdbClearTableCfg(pTableCfg);
|
|
||||||
rpcFreeCont(msg);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
tsdbClearTableCfg(pTableCfg);
|
|
||||||
rpcFreeCont(msg);
|
|
||||||
} else {
|
} else {
|
||||||
|
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) { // stream table is not allowed to change schema
|
||||||
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pBlock->sversion > sversion) { // may need to update table schema
|
||||||
|
if (pBlock->schemaLen > 0) {
|
||||||
|
tsdbDebug(
|
||||||
|
"vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, update...",
|
||||||
|
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion);
|
||||||
|
ASSERT(pBlock->schemaLen % sizeof(STColumn) == 0);
|
||||||
|
int numOfCols = pBlock->schemaLen / sizeof(STColumn);
|
||||||
|
STColumn *pTCol = (STColumn *)pBlock->data;
|
||||||
|
|
||||||
|
STSchemaBuilder schemaBuilder = {0};
|
||||||
|
if (tdInitTSchemaBuilder(&schemaBuilder, pBlock->sversion) < 0) {
|
||||||
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
||||||
|
tstrerror(terrno));
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int i = 0; i < numOfCols; i++) {
|
||||||
|
if (tdAddColToSchema(&schemaBuilder, pTCol[i].type, htons(pTCol[i].colId), htons(pTCol[i].bytes)) < 0) {
|
||||||
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
||||||
|
tstrerror(terrno));
|
||||||
|
tdDestroyTSchemaBuilder(&schemaBuilder);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
STSchema *pNSchema = tdGetSchemaFromBuilder(&schemaBuilder);
|
||||||
|
if (pNSchema == NULL) {
|
||||||
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
tdDestroyTSchemaBuilder(&schemaBuilder);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tdDestroyTSchemaBuilder(&schemaBuilder);
|
||||||
|
tsdbUpdateTableSchema(pRepo, pTable, pNSchema, true);
|
||||||
} else {
|
} else {
|
||||||
if (tsdbGetTableSchemaByVersion(pTable, pBlock->sversion) == NULL) {
|
tsdbDebug(
|
||||||
|
"vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, reconfigure...",
|
||||||
|
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion);
|
||||||
|
terrno = TSDB_CODE_TDB_TABLE_RECONFIGURE;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ASSERT(pBlock->sversion >= 0);
|
||||||
|
if (tsdbGetTableSchemaImpl(pTable, false, false, pBlock->sversion) == NULL) {
|
||||||
tsdbError("vgId:%d invalid submit schema version %d to table %s tid %d from client", REPO_ID(pRepo),
|
tsdbError("vgId:%d invalid submit schema version %d to table %s tid %d from client", REPO_ID(pRepo),
|
||||||
pBlock->sversion, TABLE_CHAR_NAME(pTable), TABLE_TID(pTable));
|
pBlock->sversion, TABLE_CHAR_NAME(pTable), TABLE_TID(pTable));
|
||||||
}
|
}
|
||||||
|
@ -1015,6 +1022,63 @@ static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pT
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
|
||||||
|
ASSERT(pMsg != NULL);
|
||||||
|
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||||
|
SSubmitMsgIter msgIter = {0};
|
||||||
|
SSubmitBlk * pBlock = NULL;
|
||||||
|
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
pMsg->length = htonl(pMsg->length);
|
||||||
|
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
||||||
|
|
||||||
|
if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
|
||||||
|
while (true) {
|
||||||
|
if (tsdbGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
|
||||||
|
if (pBlock == NULL) break;
|
||||||
|
|
||||||
|
pBlock->uid = htobe64(pBlock->uid);
|
||||||
|
pBlock->tid = htonl(pBlock->tid);
|
||||||
|
pBlock->sversion = htonl(pBlock->sversion);
|
||||||
|
pBlock->dataLen = htonl(pBlock->dataLen);
|
||||||
|
pBlock->schemaLen = htonl(pBlock->schemaLen);
|
||||||
|
pBlock->numOfRows = htons(pBlock->numOfRows);
|
||||||
|
|
||||||
|
if (pBlock->tid <= 0 || pBlock->tid >= pRepo->config.maxTables) {
|
||||||
|
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
|
||||||
|
pBlock->tid);
|
||||||
|
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
STable *pTable = pMeta->tables[pBlock->tid];
|
||||||
|
if (pTable == NULL || TABLE_UID(pTable) != pBlock->uid) {
|
||||||
|
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
|
||||||
|
pBlock->tid);
|
||||||
|
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
||||||
|
tsdbError("vgId:%d invalid action trying to insert a super table %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable));
|
||||||
|
terrno = TSDB_CODE_TDB_INVALID_ACTION;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check schema version and update schema if needed
|
||||||
|
if (tsdbCheckTableSchema(pRepo, pBlock, pTable) < 0) {
|
||||||
|
if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (terrno != TSDB_CODE_SUCCESS) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) {
|
static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) {
|
||||||
// TODO
|
// TODO
|
||||||
// STsdbCache *pCache = pRepo->tsdbCache;
|
// STsdbCache *pCache = pRepo->tsdbCache;
|
||||||
|
|
|
@ -538,10 +538,12 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
||||||
SCommitIter *pIter = iters + tid;
|
SCommitIter *pIter = iters + tid;
|
||||||
if (pIter->pTable == NULL) continue;
|
if (pIter->pTable == NULL) continue;
|
||||||
|
|
||||||
|
taosRLockLatch(&(pIter->pTable->latch));
|
||||||
|
|
||||||
tsdbSetHelperTable(pHelper, pIter->pTable, pRepo);
|
tsdbSetHelperTable(pHelper, pIter->pTable, pRepo);
|
||||||
|
|
||||||
if (pIter->pIter != NULL) {
|
if (pIter->pIter != NULL) {
|
||||||
tdInitDataCols(pDataCols, tsdbGetTableSchema(pIter->pTable));
|
tdInitDataCols(pDataCols, tsdbGetTableSchemaImpl(pIter->pTable, false, false, -1));
|
||||||
|
|
||||||
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;
|
int maxRowsToRead = pCfg->maxRowsPerFileBlock * 4 / 5;
|
||||||
int nLoop = 0;
|
int nLoop = 0;
|
||||||
|
@ -557,6 +559,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
||||||
int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols);
|
int rowsWritten = tsdbWriteDataBlock(pHelper, pDataCols);
|
||||||
ASSERT(rowsWritten != 0);
|
ASSERT(rowsWritten != 0);
|
||||||
if (rowsWritten < 0) {
|
if (rowsWritten < 0) {
|
||||||
|
taosRUnLockLatch(&(pIter->pTable->latch));
|
||||||
tsdbError("vgId:%d failed to write data block to table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
|
tsdbError("vgId:%d failed to write data block to table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
|
||||||
TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
|
TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
|
||||||
tstrerror(terrno));
|
tstrerror(terrno));
|
||||||
|
@ -571,6 +574,8 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
||||||
ASSERT(pDataCols->numOfRows == 0);
|
ASSERT(pDataCols->numOfRows == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosRUnLockLatch(&(pIter->pTable->latch));
|
||||||
|
|
||||||
// Move the last block to the new .l file if neccessary
|
// Move the last block to the new .l file if neccessary
|
||||||
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
|
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
|
||||||
tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno));
|
tsdbError("vgId:%d, failed to move last block, since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||||
|
@ -680,10 +685,10 @@ static int tsdbReadRowsFromCache(STsdbMeta *pMeta, STable *pTable, SSkipListIter
|
||||||
if (dataRowKey(row) > maxKey) break;
|
if (dataRowKey(row) > maxKey) break;
|
||||||
|
|
||||||
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
|
if (pSchema == NULL || schemaVersion(pSchema) != dataRowVersion(row)) {
|
||||||
pSchema = tsdbGetTableSchemaByVersion(pTable, dataRowVersion(row));
|
pSchema = tsdbGetTableSchemaImpl(pTable, true, false, dataRowVersion(row));
|
||||||
if (pSchema == NULL) {
|
if (pSchema == NULL) {
|
||||||
// TODO: deal with the error here
|
// TODO: deal with the error here
|
||||||
ASSERT(false);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -449,17 +449,7 @@ int tsdbCloseMeta(STsdbRepo *pRepo) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
STSchema *tsdbGetTableSchema(STable *pTable) {
|
STSchema *tsdbGetTableSchema(STable *pTable) { return tsdbGetTableSchemaImpl(pTable, true, true, -1); }
|
||||||
if (pTable->type == TSDB_NORMAL_TABLE || pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_STREAM_TABLE) {
|
|
||||||
return pTable->schema[pTable->numOfSchemas - 1];
|
|
||||||
} else if (pTable->type == TSDB_CHILD_TABLE) {
|
|
||||||
STable *pSuper = pTable->pSuper;
|
|
||||||
if (pSuper == NULL) return NULL;
|
|
||||||
return pSuper->schema[pSuper->numOfSchemas - 1];
|
|
||||||
} else {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) {
|
STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) {
|
||||||
void *ptr = taosHashGet(pMeta->uidMap, (char *)(&uid), sizeof(uid));
|
void *ptr = taosHashGet(pMeta->uidMap, (char *)(&uid), sizeof(uid));
|
||||||
|
@ -470,14 +460,7 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) {
|
||||||
}
|
}
|
||||||
|
|
||||||
STSchema *tsdbGetTableSchemaByVersion(STable *pTable, int16_t version) {
|
STSchema *tsdbGetTableSchemaByVersion(STable *pTable, int16_t version) {
|
||||||
STable *pSearchTable = (pTable->type == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable;
|
return tsdbGetTableSchemaImpl(pTable, true, true, version);
|
||||||
if (pSearchTable == NULL) return NULL;
|
|
||||||
|
|
||||||
void *ptr = taosbsearch(&version, pSearchTable->schema, pSearchTable->numOfSchemas, sizeof(STSchema *),
|
|
||||||
tsdbCompareSchemaVersion, TD_EQ);
|
|
||||||
if (ptr == NULL) return NULL;
|
|
||||||
|
|
||||||
return *(STSchema **)ptr;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
STSchema *tsdbGetTableTagSchema(STable *pTable) {
|
STSchema *tsdbGetTableTagSchema(STable *pTable) {
|
||||||
|
@ -575,7 +558,7 @@ void tsdbRefTable(STable *pTable) {
|
||||||
|
|
||||||
void tsdbUnRefTable(STable *pTable) {
|
void tsdbUnRefTable(STable *pTable) {
|
||||||
int32_t ref = T_REF_DEC(pTable);
|
int32_t ref = T_REF_DEC(pTable);
|
||||||
tsdbTrace("unref table uid:%"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref);
|
tsdbDebug("unref table uid:%"PRIu64", tid:%d, refCount:%d", TABLE_UID(pTable), TABLE_TID(pTable), ref);
|
||||||
|
|
||||||
if (ref == 0) {
|
if (ref == 0) {
|
||||||
// tsdbDebug("destory table name:%s uid:%"PRIu64", tid:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable));
|
// tsdbDebug("destory table name:%s uid:%"PRIu64", tid:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable));
|
||||||
|
@ -587,17 +570,32 @@ void tsdbUnRefTable(STable *pTable) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ------------------ LOCAL FUNCTIONS ------------------
|
void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema, bool insertAct) {
|
||||||
static int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
|
ASSERT(TABLE_TYPE(pTable) != TSDB_STREAM_TABLE && TABLE_TYPE(pTable) != TSDB_SUPER_TABLE);
|
||||||
if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) {
|
|
||||||
return -1;
|
STable *pCTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable;
|
||||||
} else if (*(int16_t *)key1 > schemaVersion(*(STSchema **)key2)) {
|
ASSERT(schemaVersion(pSchema) > schemaVersion(pCTable->schema[pCTable->numOfSchemas - 1]));
|
||||||
return 1;
|
|
||||||
|
taosWLockLatch(&(pCTable->latch));
|
||||||
|
if (pCTable->numOfSchemas < TSDB_MAX_TABLE_SCHEMAS) {
|
||||||
|
pCTable->schema[pCTable->numOfSchemas++] = pSchema;
|
||||||
} else {
|
} else {
|
||||||
return 0;
|
ASSERT(pCTable->numOfSchemas == TSDB_MAX_TABLE_SCHEMAS);
|
||||||
|
tdFreeSchema(pCTable->schema[0]);
|
||||||
|
memmove(pCTable->schema, pCTable->schema + 1, sizeof(STSchema *) * (TSDB_MAX_TABLE_SCHEMAS - 1));
|
||||||
|
pCTable->schema[pCTable->numOfSchemas - 1] = pSchema;
|
||||||
|
}
|
||||||
|
taosWUnLockLatch(&(pCTable->latch));
|
||||||
|
|
||||||
|
if (insertAct) {
|
||||||
|
int tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, pCTable);
|
||||||
|
void *buf = tsdbAllocBytes(pRepo, tlen);
|
||||||
|
ASSERT(buf != NULL);
|
||||||
|
tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ------------------ LOCAL FUNCTIONS ------------------
|
||||||
static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
|
static int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
|
||||||
STsdbRepo *pRepo = (STsdbRepo *)pHandle;
|
STsdbRepo *pRepo = (STsdbRepo *)pHandle;
|
||||||
STable * pTable = NULL;
|
STable * pTable = NULL;
|
||||||
|
@ -809,14 +807,15 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) {
|
if (TABLE_TYPE(pTable) != TSDB_CHILD_TABLE) {
|
||||||
STSchema *pSchema = tsdbGetTableSchema(pTable);
|
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
|
||||||
if (schemaNCols(pSchema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pSchema);
|
if (schemaNCols(pSchema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pSchema);
|
||||||
if (schemaTLen(pSchema) > pMeta->maxRowBytes) pMeta->maxRowBytes = schemaTLen(pSchema);
|
if (schemaTLen(pSchema) > pMeta->maxRowBytes) pMeta->maxRowBytes = schemaTLen(pSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (addIdx && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
|
if (addIdx && tsdbUnlockRepoMeta(pRepo) < 0) return -1;
|
||||||
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) {
|
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE && addIdx) {
|
||||||
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql, tsdbGetTableSchema(pTable));
|
pTable->cqhandle = (*pRepo->appH.cqCreateFunc)(pRepo->appH.cqH, TABLE_UID(pTable), TABLE_TID(pTable), pTable->sql,
|
||||||
|
tsdbGetTableSchemaImpl(pTable, false, false, -1));
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbTrace("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
tsdbTrace("vgId:%d table %s tid %d uid %" PRIu64 " is added to meta", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
||||||
|
@ -836,7 +835,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
|
||||||
STable * tTable = NULL;
|
STable * tTable = NULL;
|
||||||
STsdbCfg * pCfg = &(pRepo->config);
|
STsdbCfg * pCfg = &(pRepo->config);
|
||||||
|
|
||||||
STSchema *pSchema = tsdbGetTableSchema(pTable);
|
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
|
||||||
int maxCols = schemaNCols(pSchema);
|
int maxCols = schemaNCols(pSchema);
|
||||||
int maxRowBytes = schemaTLen(pSchema);
|
int maxRowBytes = schemaTLen(pSchema);
|
||||||
|
|
||||||
|
@ -870,7 +869,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
|
||||||
for (int i = 0; i < pCfg->maxTables; i++) {
|
for (int i = 0; i < pCfg->maxTables; i++) {
|
||||||
STable *pTable = pMeta->tables[i];
|
STable *pTable = pMeta->tables[i];
|
||||||
if (pTable != NULL) {
|
if (pTable != NULL) {
|
||||||
pSchema = tsdbGetTableSchema(pTable);
|
pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
|
||||||
maxCols = MAX(maxCols, schemaNCols(pSchema));
|
maxCols = MAX(maxCols, schemaNCols(pSchema));
|
||||||
maxRowBytes = MAX(maxRowBytes, schemaTLen(pSchema));
|
maxRowBytes = MAX(maxRowBytes, schemaTLen(pSchema));
|
||||||
}
|
}
|
||||||
|
|
|
@ -217,7 +217,7 @@ void tsdbSetHelperTable(SRWHelper *pHelper, STable *pTable, STsdbRepo *pRepo) {
|
||||||
|
|
||||||
pHelper->tableInfo.tid = pTable->tableId.tid;
|
pHelper->tableInfo.tid = pTable->tableId.tid;
|
||||||
pHelper->tableInfo.uid = pTable->tableId.uid;
|
pHelper->tableInfo.uid = pTable->tableId.uid;
|
||||||
STSchema *pSchema = tsdbGetTableSchema(pTable);
|
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
|
||||||
pHelper->tableInfo.sversion = schemaVersion(pSchema);
|
pHelper->tableInfo.sversion = schemaVersion(pSchema);
|
||||||
|
|
||||||
tdInitDataCols(pHelper->pDataCols[0], pSchema);
|
tdInitDataCols(pHelper->pDataCols[0], pSchema);
|
||||||
|
|
Loading…
Reference in New Issue