commit
47d6e311c5
|
@ -208,6 +208,18 @@ typedef struct {
|
|||
} SFileGroupIter;
|
||||
|
||||
// ------------------ tsdbMain.c
|
||||
typedef struct {
|
||||
int32_t totalLen;
|
||||
int32_t len;
|
||||
SDataRow row;
|
||||
} SSubmitBlkIter;
|
||||
|
||||
typedef struct {
|
||||
int32_t totalLen;
|
||||
int32_t len;
|
||||
void * pMsg;
|
||||
} SSubmitMsgIter;
|
||||
|
||||
typedef struct {
|
||||
int8_t state;
|
||||
|
||||
|
@ -430,7 +442,6 @@ void tsdbCloseBufPool(STsdbRepo* pRepo);
|
|||
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
|
||||
|
||||
// ------------------ tsdbMemTable.c
|
||||
int tsdbUpdateRowInMem(STsdbRepo* pRepo, SDataRow row, STable* pTable);
|
||||
int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
|
||||
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
|
||||
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
|
||||
|
|
|
@ -516,7 +516,7 @@ void tsdbGetFileInfoImpl(char *fname, uint32_t *magic, int64_t *size) {
|
|||
SFile file;
|
||||
SFile * pFile = &file;
|
||||
|
||||
strncpy(pFile->fname, fname, TSDB_FILENAME_LEN);
|
||||
strncpy(pFile->fname, fname, TSDB_FILENAME_LEN - 1);
|
||||
pFile->fd = -1;
|
||||
|
||||
if (tsdbOpenFile(pFile, O_RDONLY) < 0) goto _err;
|
||||
|
|
|
@ -32,18 +32,6 @@
|
|||
#define TSDB_DEFAULT_COMPRESSION TWO_STAGE_COMP
|
||||
#define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP))
|
||||
|
||||
typedef struct {
|
||||
int32_t totalLen;
|
||||
int32_t len;
|
||||
SDataRow row;
|
||||
} SSubmitBlkIter;
|
||||
|
||||
typedef struct {
|
||||
int32_t totalLen;
|
||||
int32_t len;
|
||||
void * pMsg;
|
||||
} SSubmitMsgIter;
|
||||
|
||||
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
|
||||
static int32_t tsdbSetRepoEnv(char *rootDir, STsdbCfg *pCfg);
|
||||
static int32_t tsdbUnsetRepoEnv(char *rootDir);
|
||||
|
@ -52,20 +40,13 @@ static int tsdbLoadConfig(char *rootDir, STsdbCfg *pCfg);
|
|||
static char * tsdbGetCfgFname(char *rootDir);
|
||||
static STsdbRepo * tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg);
|
||||
static void tsdbFreeRepo(STsdbRepo *pRepo);
|
||||
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
|
||||
static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows);
|
||||
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock);
|
||||
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
|
||||
static int tsdbRestoreInfo(STsdbRepo *pRepo);
|
||||
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
|
||||
static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression);
|
||||
static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep);
|
||||
static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
|
||||
static int keyFGroupCompFunc(const void *key, const void *fgroup);
|
||||
static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg);
|
||||
static void * tsdbDecodeCfg(void *buf, STsdbCfg *pCfg);
|
||||
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
|
||||
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg);
|
||||
static void tsdbStartStream(STsdbRepo *pRepo);
|
||||
static void tsdbStopStream(STsdbRepo *pRepo);
|
||||
|
||||
|
@ -177,40 +158,6 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
|
|||
tsdbDebug("vgId:%d repository is closed", vgId);
|
||||
}
|
||||
|
||||
int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) {
|
||||
STsdbRepo * pRepo = (STsdbRepo *)repo;
|
||||
SSubmitMsgIter msgIter = {0};
|
||||
|
||||
if (tsdbScanAndConvertSubmitMsg(pRepo, pMsg) < 0) {
|
||||
if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) {
|
||||
tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) {
|
||||
tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
SSubmitBlk *pBlock = NULL;
|
||||
int32_t affectedrows = 0;
|
||||
|
||||
TSKEY now = taosGetTimestamp(pRepo->config.precision);
|
||||
while (true) {
|
||||
tsdbGetSubmitMsgNext(&msgIter, &pBlock);
|
||||
if (pBlock == NULL) break;
|
||||
if (tsdbInsertDataToTable(pRepo, pBlock, now, &affectedrows) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows);
|
||||
|
||||
if (tsdbCheckCommit(pRepo) < 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size) {
|
||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||
// STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
|
@ -735,93 +682,6 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) {
|
|||
}
|
||||
}
|
||||
|
||||
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
|
||||
if (pMsg == NULL) {
|
||||
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pIter->totalLen = pMsg->length;
|
||||
pIter->len = 0;
|
||||
pIter->pMsg = pMsg;
|
||||
if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) {
|
||||
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows) {
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
int64_t points = 0;
|
||||
|
||||
ASSERT(pBlock->tid < pMeta->maxTables);
|
||||
STable *pTable = pMeta->tables[pBlock->tid];
|
||||
ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
|
||||
|
||||
SSubmitBlkIter blkIter = {0};
|
||||
SDataRow row = NULL;
|
||||
|
||||
TSKEY minKey = now - tsMsPerDay[pRepo->config.precision] * pRepo->config.keep;
|
||||
TSKEY maxKey = now + tsMsPerDay[pRepo->config.precision] * pRepo->config.daysPerFile;
|
||||
|
||||
tsdbInitSubmitBlkIter(pBlock, &blkIter);
|
||||
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
|
||||
if (dataRowKey(row) < minKey || dataRowKey(row) > maxKey) {
|
||||
tsdbError("vgId:%d table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
|
||||
" maxKey %" PRId64,
|
||||
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey);
|
||||
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tsdbUpdateRowInMem(pRepo, row, pTable) < 0) return -1;
|
||||
|
||||
(*affectedrows)++;
|
||||
points++;
|
||||
}
|
||||
|
||||
STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion);
|
||||
pRepo->stat.pointsWritten += points * schemaNCols(pSchema);
|
||||
pRepo->stat.totalStorage += points * schemaVLen(pSchema);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
|
||||
if (pIter->len == 0) {
|
||||
pIter->len += TSDB_SUBMIT_MSG_HEAD_SIZE;
|
||||
} else {
|
||||
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
|
||||
pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen);
|
||||
}
|
||||
|
||||
if (pIter->len > pIter->totalLen) {
|
||||
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
|
||||
*pPBlock = NULL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
*pPBlock = (pIter->len == pIter->totalLen) ? NULL : (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
|
||||
SDataRow row = pIter->row;
|
||||
if (row == NULL) return NULL;
|
||||
|
||||
pIter->len += dataRowLen(row);
|
||||
if (pIter->len >= pIter->totalLen) {
|
||||
pIter->row = NULL;
|
||||
} else {
|
||||
pIter->row = (char *)row + dataRowLen(row);
|
||||
}
|
||||
|
||||
return row;
|
||||
}
|
||||
|
||||
static int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
|
@ -855,14 +715,6 @@ _err:
|
|||
return -1;
|
||||
}
|
||||
|
||||
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
|
||||
if (pBlock->dataLen <= 0) return -1;
|
||||
pIter->totalLen = pBlock->dataLen;
|
||||
pIter->len = 0;
|
||||
pIter->row = (SDataRow)(pBlock->data+pBlock->schemaLen);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression) {
|
||||
int8_t ocompression = pRepo->config.compression;
|
||||
pRepo->config.compression = compression;
|
||||
|
@ -959,134 +811,6 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) {
|
|||
return buf;
|
||||
}
|
||||
|
||||
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) {
|
||||
ASSERT(pTable != NULL);
|
||||
|
||||
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
|
||||
int sversion = schemaVersion(pSchema);
|
||||
|
||||
if (pBlock->sversion == sversion) {
|
||||
return 0;
|
||||
} else {
|
||||
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) { // stream table is not allowed to change schema
|
||||
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (pBlock->sversion > sversion) { // may need to update table schema
|
||||
if (pBlock->schemaLen > 0) {
|
||||
tsdbDebug(
|
||||
"vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, update...",
|
||||
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion);
|
||||
ASSERT(pBlock->schemaLen % sizeof(STColumn) == 0);
|
||||
int numOfCols = pBlock->schemaLen / sizeof(STColumn);
|
||||
STColumn *pTCol = (STColumn *)pBlock->data;
|
||||
|
||||
STSchemaBuilder schemaBuilder = {0};
|
||||
if (tdInitTSchemaBuilder(&schemaBuilder, pBlock->sversion) < 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
||||
tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int i = 0; i < numOfCols; i++) {
|
||||
if (tdAddColToSchema(&schemaBuilder, pTCol[i].type, htons(pTCol[i].colId), htons(pTCol[i].bytes)) < 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
||||
tstrerror(terrno));
|
||||
tdDestroyTSchemaBuilder(&schemaBuilder);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
STSchema *pNSchema = tdGetSchemaFromBuilder(&schemaBuilder);
|
||||
if (pNSchema == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
tdDestroyTSchemaBuilder(&schemaBuilder);
|
||||
return -1;
|
||||
}
|
||||
|
||||
tdDestroyTSchemaBuilder(&schemaBuilder);
|
||||
tsdbUpdateTableSchema(pRepo, pTable, pNSchema, true);
|
||||
} else {
|
||||
tsdbDebug(
|
||||
"vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, reconfigure...",
|
||||
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion);
|
||||
terrno = TSDB_CODE_TDB_TABLE_RECONFIGURE;
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
ASSERT(pBlock->sversion >= 0);
|
||||
if (tsdbGetTableSchemaImpl(pTable, false, false, pBlock->sversion) == NULL) {
|
||||
tsdbError("vgId:%d invalid submit schema version %d to table %s tid %d from client", REPO_ID(pRepo),
|
||||
pBlock->sversion, TABLE_CHAR_NAME(pTable), TABLE_TID(pTable));
|
||||
}
|
||||
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
|
||||
ASSERT(pMsg != NULL);
|
||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||
SSubmitMsgIter msgIter = {0};
|
||||
SSubmitBlk * pBlock = NULL;
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
pMsg->length = htonl(pMsg->length);
|
||||
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
||||
|
||||
if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
|
||||
while (true) {
|
||||
if (tsdbGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
|
||||
if (pBlock == NULL) break;
|
||||
|
||||
pBlock->uid = htobe64(pBlock->uid);
|
||||
pBlock->tid = htonl(pBlock->tid);
|
||||
pBlock->sversion = htonl(pBlock->sversion);
|
||||
pBlock->dataLen = htonl(pBlock->dataLen);
|
||||
pBlock->schemaLen = htonl(pBlock->schemaLen);
|
||||
pBlock->numOfRows = htons(pBlock->numOfRows);
|
||||
|
||||
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {
|
||||
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
|
||||
pBlock->tid);
|
||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||
return -1;
|
||||
}
|
||||
|
||||
STable *pTable = pMeta->tables[pBlock->tid];
|
||||
if (pTable == NULL || TABLE_UID(pTable) != pBlock->uid) {
|
||||
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
|
||||
pBlock->tid);
|
||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
||||
tsdbError("vgId:%d invalid action trying to insert a super table %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable));
|
||||
terrno = TSDB_CODE_TDB_INVALID_ACTION;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Check schema version and update schema if needed
|
||||
if (tsdbCheckTableSchema(pRepo, pBlock, pTable) < 0) {
|
||||
if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
|
||||
continue;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (terrno != TSDB_CODE_SUCCESS) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks) {
|
||||
// TODO
|
||||
// STsdbCache *pCache = pRepo->tsdbCache;
|
||||
|
|
|
@ -18,7 +18,6 @@
|
|||
|
||||
#define TSDB_DATA_SKIPLIST_LEVEL 5
|
||||
|
||||
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes);
|
||||
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
|
||||
static void tsdbFreeMemTable(SMemTable *pMemTable);
|
||||
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
|
||||
|
@ -32,103 +31,49 @@ static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
|
|||
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
|
||||
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
|
||||
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SDataRow row);
|
||||
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
|
||||
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
|
||||
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg);
|
||||
static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows);
|
||||
static int tsdbCopyRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable, void **ppRow);
|
||||
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
|
||||
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock);
|
||||
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
|
||||
static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter);
|
||||
static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter);
|
||||
|
||||
// ---------------- INTERNAL FUNCTIONS ----------------
|
||||
int tsdbUpdateRowInMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
||||
STsdbCfg * pCfg = &pRepo->config;
|
||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||
TKEY tkey = dataRowTKey(row);
|
||||
TSKEY key = dataRowKey(row);
|
||||
SMemTable * pMemTable = pRepo->mem;
|
||||
STableData *pTableData = NULL;
|
||||
bool isRowDelete = TKEY_IS_DELETED(tkey);
|
||||
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey,
|
||||
TSKEY now);
|
||||
|
||||
if (isRowDelete) {
|
||||
if (!pCfg->update) {
|
||||
tsdbWarn("vgId:%d vnode is not allowed to update but try to delete a data row", REPO_ID(pRepo));
|
||||
terrno = TSDB_CODE_TDB_INVALID_ACTION;
|
||||
return -1;
|
||||
int32_t tsdbInsertData(TSDB_REPO_T *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp) {
|
||||
STsdbRepo * pRepo = (STsdbRepo *)repo;
|
||||
SSubmitMsgIter msgIter = {0};
|
||||
SSubmitBlk * pBlock = NULL;
|
||||
int32_t affectedrows = 0;
|
||||
|
||||
if (tsdbScanAndConvertSubmitMsg(pRepo, pMsg) < 0) {
|
||||
if (terrno != TSDB_CODE_TDB_TABLE_RECONFIGURE) {
|
||||
tsdbError("vgId:%d failed to insert data since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||
}
|
||||
|
||||
if (key > TABLE_LASTKEY(pTable)) {
|
||||
tsdbTrace("vgId:%d skip to delete row key %" PRId64 " which is larger than table lastKey %" PRId64,
|
||||
REPO_ID(pRepo), key, TABLE_LASTKEY(pTable));
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row));
|
||||
if (pRow == NULL) {
|
||||
tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s",
|
||||
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
dataRowCpy(pRow, row);
|
||||
|
||||
// Operations above may change pRepo->mem, retake those values
|
||||
ASSERT(pRepo->mem != NULL);
|
||||
pMemTable = pRepo->mem;
|
||||
|
||||
if (TABLE_TID(pTable) >= pMemTable->maxTables) {
|
||||
if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) {
|
||||
tsdbFreeBytes(pRepo, pRow, dataRowLen(row));
|
||||
tsdbInitSubmitMsgIter(pMsg, &msgIter);
|
||||
while (true) {
|
||||
tsdbGetSubmitMsgNext(&msgIter, &pBlock);
|
||||
if (pBlock == NULL) break;
|
||||
if (tsdbInsertDataToTable(pRepo, pBlock, &affectedrows) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
pTableData = pMemTable->tData[TABLE_TID(pTable)];
|
||||
|
||||
if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
|
||||
if (pTableData != NULL) {
|
||||
taosWLockLatch(&(pMemTable->latch));
|
||||
pMemTable->tData[TABLE_TID(pTable)] = NULL;
|
||||
tsdbFreeTableData(pTableData);
|
||||
taosWUnLockLatch(&(pMemTable->latch));
|
||||
}
|
||||
|
||||
pTableData = tsdbNewTableData(pCfg, pTable);
|
||||
if (pTableData == NULL) {
|
||||
tsdbError("vgId:%d failed to insert row with key %" PRId64
|
||||
" to table %s while create new table data object since %s",
|
||||
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), tstrerror(terrno));
|
||||
tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row));
|
||||
return -1;
|
||||
}
|
||||
|
||||
pRepo->mem->tData[TABLE_TID(pTable)] = pTableData;
|
||||
}
|
||||
|
||||
ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable));
|
||||
|
||||
int64_t oldSize = SL_SIZE(pTableData->pData);
|
||||
if (tSkipListPut(pTableData->pData, pRow) == NULL) {
|
||||
tsdbFreeBytes(pRepo, (void *)pRow, dataRowLen(row));
|
||||
} else {
|
||||
int64_t deltaSize = SL_SIZE(pTableData->pData) - oldSize;
|
||||
if (isRowDelete) {
|
||||
if (TABLE_LASTKEY(pTable) == key) {
|
||||
// TODO: need to update table last key here (may from file)
|
||||
}
|
||||
} else {
|
||||
if (TABLE_LASTKEY(pTable) < key) TABLE_LASTKEY(pTable) = key;
|
||||
}
|
||||
|
||||
if (pMemTable->keyFirst > key) pMemTable->keyFirst = key;
|
||||
if (pMemTable->keyLast < key) pMemTable->keyLast = key;
|
||||
pMemTable->numOfRows += deltaSize;
|
||||
|
||||
if (pTableData->keyFirst > key) pTableData->keyFirst = key;
|
||||
if (pTableData->keyLast < key) pTableData->keyLast = key;
|
||||
pTableData->numOfRows += deltaSize;
|
||||
}
|
||||
|
||||
tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo),
|
||||
isRowDelete ? "deleted from" : "updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable),
|
||||
key);
|
||||
if (pRsp != NULL) pRsp->affectedRows = htonl(affectedrows);
|
||||
|
||||
if (tsdbCheckCommit(pRepo) < 0) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
// ---------------- INTERNAL FUNCTIONS ----------------
|
||||
int tsdbRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
|
||||
if (pMemTable == NULL) return 0;
|
||||
int ref = T_REF_INC(pMemTable);
|
||||
|
@ -152,7 +97,7 @@ int tsdbUnRefMemTable(STsdbRepo *pRepo, SMemTable *pMemTable) {
|
|||
}
|
||||
int code = pthread_cond_signal(&pBufPool->poolNotEmpty);
|
||||
if (code != 0) {
|
||||
tsdbUnlockRepo(pRepo);
|
||||
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
||||
tsdbError("vgId:%d failed to signal pool not empty since %s", REPO_ID(pRepo), strerror(code));
|
||||
terrno = TAOS_SYSTEM_ERROR(code);
|
||||
return -1;
|
||||
|
@ -189,6 +134,8 @@ int tsdbTakeMemSnapshot(STsdbRepo *pRepo, SMemTable **pMem, SMemTable **pIMem) {
|
|||
}
|
||||
|
||||
void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem) {
|
||||
tsdbDebug("vgId:%d untake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pMem, pIMem);
|
||||
|
||||
if (pMem != NULL) {
|
||||
taosRUnLockLatch(&(pMem->latch));
|
||||
tsdbUnRefMemTable(pRepo, pMem);
|
||||
|
@ -197,8 +144,6 @@ void tsdbUnTakeMemSnapShot(STsdbRepo *pRepo, SMemTable *pMem, SMemTable *pIMem)
|
|||
if (pIMem != NULL) {
|
||||
tsdbUnRefMemTable(pRepo, pIMem);
|
||||
}
|
||||
|
||||
tsdbDebug("vgId:%d untake memory snapshot, pMem %p pIMem %p", REPO_ID(pRepo), pMem, pIMem);
|
||||
}
|
||||
|
||||
void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
|
||||
|
@ -230,6 +175,10 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
|
|||
ASSERT(pRepo->mem->extraBuffList != NULL);
|
||||
SListNode *pNode = (SListNode *)malloc(sizeof(SListNode) + bytes);
|
||||
if (pNode == NULL) {
|
||||
if (listNEles(pRepo->mem->extraBuffList) == 0) {
|
||||
tdListFree(pRepo->mem->extraBuffList);
|
||||
pRepo->mem->extraBuffList = NULL;
|
||||
}
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
@ -260,18 +209,18 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
|
|||
}
|
||||
|
||||
int tsdbAsyncCommit(STsdbRepo *pRepo) {
|
||||
if (pRepo->mem == NULL) return 0;
|
||||
|
||||
SMemTable *pIMem = pRepo->imem;
|
||||
|
||||
if (pRepo->mem != NULL) {
|
||||
sem_wait(&(pRepo->readyToCommit));
|
||||
sem_wait(&(pRepo->readyToCommit));
|
||||
|
||||
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
|
||||
if (tsdbLockRepo(pRepo) < 0) return -1;
|
||||
pRepo->imem = pRepo->mem;
|
||||
pRepo->mem = NULL;
|
||||
tsdbScheduleCommit(pRepo);
|
||||
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
||||
}
|
||||
if (pRepo->appH.notifyStatus) pRepo->appH.notifyStatus(pRepo->appH.appH, TSDB_STATUS_COMMIT_START);
|
||||
if (tsdbLockRepo(pRepo) < 0) return -1;
|
||||
pRepo->imem = pRepo->mem;
|
||||
pRepo->mem = NULL;
|
||||
tsdbScheduleCommit(pRepo);
|
||||
if (tsdbUnlockRepo(pRepo) < 0) return -1;
|
||||
|
||||
if (tsdbUnRefMemTable(pRepo, pIMem) < 0) return -1;
|
||||
|
||||
|
@ -469,25 +418,6 @@ _exit:
|
|||
}
|
||||
|
||||
// ---------------- LOCAL FUNCTIONS ----------------
|
||||
static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) {
|
||||
ASSERT(pRepo->mem != NULL);
|
||||
if (pRepo->mem->extraBuffList == NULL) {
|
||||
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
|
||||
ASSERT(pBufBlock != NULL);
|
||||
pBufBlock->offset -= bytes;
|
||||
pBufBlock->remain += bytes;
|
||||
ASSERT(ptr == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset));
|
||||
tsdbTrace("vgId:%d free %d bytes to TSDB buffer pool, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
|
||||
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
|
||||
} else {
|
||||
SListNode *pNode = (SListNode *)POINTER_SHIFT(ptr, -(int)(sizeof(SListNode)));
|
||||
ASSERT(listTail(pRepo->mem->extraBuffList) == pNode);
|
||||
tdListPopNode(pRepo->mem->extraBuffList, pNode);
|
||||
free(pNode);
|
||||
tsdbTrace("vgId:%d free %d bytes to SYSTEM buffer pool", REPO_ID(pRepo), bytes);
|
||||
}
|
||||
}
|
||||
|
||||
static SMemTable* tsdbNewMemTable(STsdbRepo *pRepo) {
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
|
||||
|
@ -848,4 +778,400 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema *
|
|||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
|
||||
if (pBlock->dataLen <= 0) return -1;
|
||||
pIter->totalLen = pBlock->dataLen;
|
||||
pIter->len = 0;
|
||||
pIter->row = (SDataRow)(pBlock->data+pBlock->schemaLen);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
|
||||
SDataRow row = pIter->row;
|
||||
if (row == NULL) return NULL;
|
||||
|
||||
pIter->len += dataRowLen(row);
|
||||
if (pIter->len >= pIter->totalLen) {
|
||||
pIter->row = NULL;
|
||||
} else {
|
||||
pIter->row = (char *)row + dataRowLen(row);
|
||||
}
|
||||
|
||||
return row;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey,
|
||||
TSKEY now) {
|
||||
if (dataRowKey(row) < minKey || dataRowKey(row) > maxKey) {
|
||||
tsdbError("vgId:%d table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
|
||||
" maxKey %" PRId64 " row key %" PRId64,
|
||||
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey,
|
||||
dataRowKey(row));
|
||||
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg) {
|
||||
ASSERT(pMsg != NULL);
|
||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||
SSubmitMsgIter msgIter = {0};
|
||||
SSubmitBlk * pBlock = NULL;
|
||||
SSubmitBlkIter blkIter = {0};
|
||||
SDataRow row = NULL;
|
||||
TSKEY now = taosGetTimestamp(pRepo->config.precision);
|
||||
TSKEY minKey = now - tsMsPerDay[pRepo->config.precision] * pRepo->config.keep;
|
||||
TSKEY maxKey = now + tsMsPerDay[pRepo->config.precision] * pRepo->config.daysPerFile;
|
||||
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
pMsg->length = htonl(pMsg->length);
|
||||
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
||||
|
||||
if (tsdbInitSubmitMsgIter(pMsg, &msgIter) < 0) return -1;
|
||||
while (true) {
|
||||
if (tsdbGetSubmitMsgNext(&msgIter, &pBlock) < 0) return -1;
|
||||
if (pBlock == NULL) break;
|
||||
|
||||
pBlock->uid = htobe64(pBlock->uid);
|
||||
pBlock->tid = htonl(pBlock->tid);
|
||||
pBlock->sversion = htonl(pBlock->sversion);
|
||||
pBlock->dataLen = htonl(pBlock->dataLen);
|
||||
pBlock->schemaLen = htonl(pBlock->schemaLen);
|
||||
pBlock->numOfRows = htons(pBlock->numOfRows);
|
||||
|
||||
if (pBlock->tid <= 0 || pBlock->tid >= pMeta->maxTables) {
|
||||
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
|
||||
pBlock->tid);
|
||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||
return -1;
|
||||
}
|
||||
|
||||
STable *pTable = pMeta->tables[pBlock->tid];
|
||||
if (pTable == NULL || TABLE_UID(pTable) != pBlock->uid) {
|
||||
tsdbError("vgId:%d failed to get table to insert data, uid %" PRIu64 " tid %d", REPO_ID(pRepo), pBlock->uid,
|
||||
pBlock->tid);
|
||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (TABLE_TYPE(pTable) == TSDB_SUPER_TABLE) {
|
||||
tsdbError("vgId:%d invalid action trying to insert a super table %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable));
|
||||
terrno = TSDB_CODE_TDB_INVALID_ACTION;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Check schema version and update schema if needed
|
||||
if (tsdbCheckTableSchema(pRepo, pBlock, pTable) < 0) {
|
||||
if (terrno == TSDB_CODE_TDB_TABLE_RECONFIGURE) {
|
||||
continue;
|
||||
} else {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
tsdbInitSubmitBlkIter(pBlock, &blkIter);
|
||||
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
|
||||
if (tsdbCheckRowRange(pRepo, pTable, row, minKey, maxKey, now) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (terrno != TSDB_CODE_SUCCESS) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows) {
|
||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||
int64_t points = 0;
|
||||
STable * pTable = NULL;
|
||||
SSubmitBlkIter blkIter = {0};
|
||||
SDataRow row = NULL;
|
||||
void ** rows = NULL;
|
||||
int rowCounter = 0;
|
||||
|
||||
ASSERT(pBlock->tid < pMeta->maxTables);
|
||||
pTable = pMeta->tables[pBlock->tid];
|
||||
ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
|
||||
|
||||
rows = (void **)calloc(pBlock->numOfRows, sizeof(void *));
|
||||
if (rows == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
tsdbInitSubmitBlkIter(pBlock, &blkIter);
|
||||
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
|
||||
if (tsdbCopyRowToMem(pRepo, row, pTable, &(rows[rowCounter])) < 0) {
|
||||
tsdbFreeRows(pRepo, rows, rowCounter);
|
||||
goto _err;
|
||||
}
|
||||
|
||||
(*affectedrows)++;
|
||||
points++;
|
||||
|
||||
if (rows[rowCounter] != NULL) {
|
||||
rowCounter++;
|
||||
}
|
||||
}
|
||||
|
||||
if (tsdbInsertDataToTableImpl(pRepo, pTable, rows, rowCounter) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion);
|
||||
pRepo->stat.pointsWritten += points * schemaNCols(pSchema);
|
||||
pRepo->stat.totalStorage += points * schemaVLen(pSchema);
|
||||
|
||||
free(rows);
|
||||
return 0;
|
||||
|
||||
_err:
|
||||
free(rows);
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int tsdbCopyRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable, void **ppRow) {
|
||||
STsdbCfg * pCfg = &pRepo->config;
|
||||
TKEY tkey = dataRowTKey(row);
|
||||
TSKEY key = dataRowKey(row);
|
||||
bool isRowDelete = TKEY_IS_DELETED(tkey);
|
||||
|
||||
if (isRowDelete) {
|
||||
if (!pCfg->update) {
|
||||
tsdbWarn("vgId:%d vnode is not allowed to update but try to delete a data row", REPO_ID(pRepo));
|
||||
terrno = TSDB_CODE_TDB_INVALID_ACTION;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (key > TABLE_LASTKEY(pTable)) {
|
||||
tsdbTrace("vgId:%d skip to delete row key %" PRId64 " which is larger than table lastKey %" PRId64,
|
||||
REPO_ID(pRepo), key, TABLE_LASTKEY(pTable));
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
void *pRow = tsdbAllocBytes(pRepo, dataRowLen(row));
|
||||
if (pRow == NULL) {
|
||||
tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s",
|
||||
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), dataRowLen(row), tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
dataRowCpy(pRow, row);
|
||||
ppRow[0] = pRow;
|
||||
|
||||
tsdbTrace("vgId:%d a row is %s table %s tid %d uid %" PRIu64 " key %" PRIu64, REPO_ID(pRepo),
|
||||
isRowDelete ? "deleted from" : "updated in", TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable),
|
||||
key);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
|
||||
if (pMsg == NULL) {
|
||||
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
|
||||
return -1;
|
||||
}
|
||||
|
||||
pIter->totalLen = pMsg->length;
|
||||
pIter->len = 0;
|
||||
pIter->pMsg = pMsg;
|
||||
if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) {
|
||||
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
|
||||
if (pIter->len == 0) {
|
||||
pIter->len += TSDB_SUBMIT_MSG_HEAD_SIZE;
|
||||
} else {
|
||||
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
|
||||
pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen);
|
||||
}
|
||||
|
||||
if (pIter->len > pIter->totalLen) {
|
||||
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
|
||||
*pPBlock = NULL;
|
||||
return -1;
|
||||
}
|
||||
|
||||
*pPBlock = (pIter->len == pIter->totalLen) ? NULL : (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) {
|
||||
ASSERT(pTable != NULL);
|
||||
|
||||
STSchema *pSchema = tsdbGetTableSchemaImpl(pTable, false, false, -1);
|
||||
int sversion = schemaVersion(pSchema);
|
||||
|
||||
if (pBlock->sversion == sversion) {
|
||||
return 0;
|
||||
} else {
|
||||
if (TABLE_TYPE(pTable) == TSDB_STREAM_TABLE) { // stream table is not allowed to change schema
|
||||
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
if (pBlock->sversion > sversion) { // may need to update table schema
|
||||
if (pBlock->schemaLen > 0) {
|
||||
tsdbDebug(
|
||||
"vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, update...",
|
||||
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion);
|
||||
ASSERT(pBlock->schemaLen % sizeof(STColumn) == 0);
|
||||
int numOfCols = pBlock->schemaLen / sizeof(STColumn);
|
||||
STColumn *pTCol = (STColumn *)pBlock->data;
|
||||
|
||||
STSchemaBuilder schemaBuilder = {0};
|
||||
if (tdInitTSchemaBuilder(&schemaBuilder, pBlock->sversion) < 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
||||
tstrerror(terrno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int i = 0; i < numOfCols; i++) {
|
||||
if (tdAddColToSchema(&schemaBuilder, pTCol[i].type, htons(pTCol[i].colId), htons(pTCol[i].bytes)) < 0) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
tsdbError("vgId:%d failed to update schema of table %s since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
||||
tstrerror(terrno));
|
||||
tdDestroyTSchemaBuilder(&schemaBuilder);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
STSchema *pNSchema = tdGetSchemaFromBuilder(&schemaBuilder);
|
||||
if (pNSchema == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
tdDestroyTSchemaBuilder(&schemaBuilder);
|
||||
return -1;
|
||||
}
|
||||
|
||||
tdDestroyTSchemaBuilder(&schemaBuilder);
|
||||
tsdbUpdateTableSchema(pRepo, pTable, pNSchema, true);
|
||||
} else {
|
||||
tsdbDebug(
|
||||
"vgId:%d table %s tid %d uid %" PRIu64 " schema version %d is out of data, client version %d, reconfigure...",
|
||||
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), sversion, pBlock->sversion);
|
||||
terrno = TSDB_CODE_TDB_TABLE_RECONFIGURE;
|
||||
return -1;
|
||||
}
|
||||
} else {
|
||||
ASSERT(pBlock->sversion >= 0);
|
||||
if (tsdbGetTableSchemaImpl(pTable, false, false, pBlock->sversion) == NULL) {
|
||||
tsdbError("vgId:%d invalid submit schema version %d to table %s tid %d from client", REPO_ID(pRepo),
|
||||
pBlock->sversion, TABLE_CHAR_NAME(pTable), TABLE_TID(pTable));
|
||||
}
|
||||
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter) {
|
||||
if (rowCounter < 1) return 0;
|
||||
|
||||
SMemTable * pMemTable = NULL;
|
||||
STableData *pTableData = NULL;
|
||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||
STsdbCfg * pCfg = &(pRepo->config);
|
||||
|
||||
ASSERT(pRepo->mem != NULL);
|
||||
pMemTable = pRepo->mem;
|
||||
|
||||
if (TABLE_TID(pTable) >= pMemTable->maxTables) {
|
||||
if (tsdbAdjustMemMaxTables(pMemTable, pMeta->maxTables) < 0) {
|
||||
tsdbFreeRows(pRepo, rows, rowCounter);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
pTableData = pMemTable->tData[TABLE_TID(pTable)];
|
||||
|
||||
if (pTableData == NULL || pTableData->uid != TABLE_UID(pTable)) {
|
||||
if (pTableData != NULL) {
|
||||
taosWLockLatch(&(pMemTable->latch));
|
||||
pMemTable->tData[TABLE_TID(pTable)] = NULL;
|
||||
tsdbFreeTableData(pTableData);
|
||||
taosWUnLockLatch(&(pMemTable->latch));
|
||||
}
|
||||
|
||||
pTableData = tsdbNewTableData(pCfg, pTable);
|
||||
if (pTableData == NULL) {
|
||||
tsdbError("vgId:%d failed to insert data to table %s uid %" PRId64 " tid %d since %s", REPO_ID(pRepo),
|
||||
TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable), tstrerror(terrno));
|
||||
tsdbFreeRows(pRepo, rows, rowCounter);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pRepo->mem->tData[TABLE_TID(pTable)] = pTableData;
|
||||
}
|
||||
|
||||
ASSERT((pTableData != NULL) && pTableData->uid == TABLE_UID(pTable));
|
||||
|
||||
int64_t osize = SL_SIZE(pTableData->pData);
|
||||
tSkipListPutBatch(pTableData->pData, rows, rowCounter);
|
||||
int64_t dsize = SL_SIZE(pTableData->pData) - osize;
|
||||
|
||||
if (pMemTable->keyFirst > dataRowKey(rows[0])) pMemTable->keyFirst = dataRowKey(rows[0]);
|
||||
if (pMemTable->keyLast < dataRowKey(rows[rowCounter - 1])) pMemTable->keyLast = dataRowKey(rows[rowCounter - 1]);
|
||||
pMemTable->numOfRows += dsize;
|
||||
|
||||
if (pTableData->keyFirst > dataRowKey(rows[0])) pTableData->keyFirst = dataRowKey(rows[0]);
|
||||
if (pTableData->keyLast < dataRowKey(rows[rowCounter - 1])) pTableData->keyLast = dataRowKey(rows[rowCounter - 1]);
|
||||
pTableData->numOfRows += dsize;
|
||||
|
||||
// TODO: impl delete row thing
|
||||
if (TABLE_LASTKEY(pTable) < dataRowKey(rows[rowCounter-1])) TABLE_LASTKEY(pTable) = dataRowKey(rows[rowCounter-1]);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
|
||||
ASSERT(pRepo->mem != NULL);
|
||||
STsdbBufPool *pBufPool = pRepo->pPool;
|
||||
|
||||
for (int i = rowCounter - 1; i >= 0; --i) {
|
||||
SDataRow row = (SDataRow)rows[i];
|
||||
int bytes = (int)dataRowLen(row);
|
||||
|
||||
if (pRepo->mem->extraBuffList == NULL) {
|
||||
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
|
||||
ASSERT(pBufBlock != NULL && pBufBlock->offset >= bytes);
|
||||
|
||||
pBufBlock->offset -= bytes;
|
||||
pBufBlock->remain += bytes;
|
||||
ASSERT(row == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset));
|
||||
tsdbTrace("vgId:%d free %d bytes to TSDB buffer pool, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
|
||||
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
|
||||
|
||||
if (pBufBlock->offset == 0) { // return the block to buffer pool
|
||||
tsdbLockRepo(pRepo);
|
||||
SListNode *pNode = tdListPopTail(pRepo->mem->bufBlockList);
|
||||
tdListPrependNode(pBufPool->bufBlockList, pNode);
|
||||
tsdbUnlockRepo(pRepo);
|
||||
}
|
||||
} else {
|
||||
ASSERT(listNEles(pRepo->mem->extraBuffList) > 0);
|
||||
SListNode *pNode = tdListPopTail(pRepo->mem->extraBuffList);
|
||||
ASSERT(row == pNode->data);
|
||||
free(pNode);
|
||||
tsdbTrace("vgId:%d free %d bytes to SYSTEM buffer pool", REPO_ID(pRepo), bytes);
|
||||
|
||||
if (listNEles(pRepo->mem->extraBuffList) == 0) {
|
||||
tdListFree(pRepo->mem->extraBuffList);
|
||||
pRepo->mem->extraBuffList = NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -1595,7 +1595,7 @@ static int tsdbProcessMergeCommit(SRWHelper *pHelper, SCommitIter *pCommitIter,
|
|||
tblkIdx++;
|
||||
} else if (oBlock.numOfRows + pMergeInfo->rowsInserted - pMergeInfo->rowsDeleteSucceed == 0) {
|
||||
// Delete the block and do some stuff
|
||||
ASSERT(pMergeInfo->keyFirst == INT64_MAX && pMergeInfo->keyFirst == INT64_MIN);
|
||||
// ASSERT(pMergeInfo->keyFirst == INT64_MAX && pMergeInfo->keyFirst == INT64_MIN);
|
||||
if (tsdbDeleteSuperBlock(pHelper, tblkIdx) < 0) return -1;
|
||||
*pCommitIter->pIter = slIter;
|
||||
if (oBlock.last && pHelper->hasOldLastBlock) pHelper->hasOldLastBlock = false;
|
||||
|
|
|
@ -131,6 +131,7 @@ SSkipList *tSkipListCreate(uint8_t maxLevel, uint8_t keyType, uint16_t keyLen, _
|
|||
__sl_key_fn_t fn);
|
||||
void tSkipListDestroy(SSkipList *pSkipList);
|
||||
SSkipListNode * tSkipListPut(SSkipList *pSkipList, void *pData);
|
||||
void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata);
|
||||
SArray * tSkipListGet(SSkipList *pSkipList, SSkipListKey pKey);
|
||||
void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel);
|
||||
SSkipListIterator *tSkipListCreateIter(SSkipList *pSkipList);
|
||||
|
|
|
@ -24,10 +24,12 @@ static SSkipListNode * getPriorNode(SSkipList *pSkipList, const char *val, in
|
|||
static void tSkipListRemoveNodeImpl(SSkipList *pSkipList, SSkipListNode *pNode);
|
||||
static void tSkipListCorrectLevel(SSkipList *pSkipList);
|
||||
static SSkipListIterator *doCreateSkipListIterator(SSkipList *pSkipList, int32_t order);
|
||||
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **backward, SSkipListNode *pNode);
|
||||
static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData);
|
||||
static SSkipListNode * tSkipListNewNode(uint8_t level);
|
||||
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **direction, SSkipListNode *pNode, bool isForward);
|
||||
static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward, void *pData);
|
||||
static SSkipListNode *tSkipListNewNode(uint8_t level);
|
||||
#define tSkipListFreeNode(n) tfree((n))
|
||||
static SSkipListNode *tSkipListPutImpl(SSkipList *pSkipList, void *pData, SSkipListNode **direction, bool isForward,
|
||||
bool hasDup);
|
||||
|
||||
static FORCE_INLINE int tSkipListWLock(SSkipList *pSkipList);
|
||||
static FORCE_INLINE int tSkipListRLock(SSkipList *pSkipList);
|
||||
|
@ -109,32 +111,87 @@ SSkipListNode *tSkipListPut(SSkipList *pSkipList, void *pData) {
|
|||
if (pSkipList == NULL || pData == NULL) return NULL;
|
||||
|
||||
SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0};
|
||||
uint8_t dupMode = SL_DUP_MODE(pSkipList);
|
||||
SSkipListNode *pNode = NULL;
|
||||
|
||||
tSkipListWLock(pSkipList);
|
||||
|
||||
bool hasDup = tSkipListGetPosToPut(pSkipList, backward, pData);
|
||||
|
||||
if (hasDup && (dupMode == SL_DISCARD_DUP_KEY || dupMode == SL_UPDATE_DUP_KEY)) {
|
||||
if (dupMode == SL_UPDATE_DUP_KEY) {
|
||||
pNode = SL_NODE_GET_BACKWARD_POINTER(backward[0], 0);
|
||||
atomic_store_ptr(&(pNode->pData), pData);
|
||||
}
|
||||
} else {
|
||||
pNode = tSkipListNewNode(getSkipListRandLevel(pSkipList));
|
||||
if (pNode != NULL) {
|
||||
pNode->pData = pData;
|
||||
|
||||
tSkipListDoInsert(pSkipList, backward, pNode);
|
||||
}
|
||||
}
|
||||
pNode = tSkipListPutImpl(pSkipList, pData, backward, false, hasDup);
|
||||
|
||||
tSkipListUnlock(pSkipList);
|
||||
|
||||
return pNode;
|
||||
}
|
||||
|
||||
// Put a batch of data into skiplist. The batch of data must be in ascending order
|
||||
void tSkipListPutBatch(SSkipList *pSkipList, void **ppData, int ndata) {
|
||||
SSkipListNode *backward[MAX_SKIP_LIST_LEVEL] = {0};
|
||||
SSkipListNode *forward[MAX_SKIP_LIST_LEVEL] = {0};
|
||||
bool hasDup = false;
|
||||
char * pKey = NULL;
|
||||
char * pDataKey = NULL;
|
||||
int compare = 0;
|
||||
|
||||
tSkipListWLock(pSkipList);
|
||||
|
||||
// backward to put the first data
|
||||
hasDup = tSkipListGetPosToPut(pSkipList, backward, ppData[0]);
|
||||
tSkipListPutImpl(pSkipList, ppData[0], backward, false, hasDup);
|
||||
|
||||
for (int level = 0; level < pSkipList->maxLevel; level++) {
|
||||
forward[level] = SL_NODE_GET_BACKWARD_POINTER(backward[level], level);
|
||||
}
|
||||
|
||||
// forward to put the rest of data
|
||||
for (int idata = 1; idata < ndata; idata++) {
|
||||
pDataKey = pSkipList->keyFn(ppData[idata]);
|
||||
|
||||
// Compare max key
|
||||
pKey = SL_GET_MAX_KEY(pSkipList);
|
||||
compare = pSkipList->comparFn(pDataKey, pKey);
|
||||
if (compare > 0) {
|
||||
for (int i = 0; i < pSkipList->maxLevel; i++) {
|
||||
forward[i] = SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i);
|
||||
}
|
||||
|
||||
hasDup = false;
|
||||
} else {
|
||||
SSkipListNode *px = pSkipList->pHead;
|
||||
for (int i = pSkipList->maxLevel - 1; i >= 0; --i) {
|
||||
if (i < pSkipList->level) {
|
||||
// set new px
|
||||
if (forward[i] != pSkipList->pHead) {
|
||||
if (px == pSkipList->pHead ||
|
||||
pSkipList->comparFn(SL_GET_NODE_KEY(pSkipList, forward[i]), SL_GET_NODE_KEY(pSkipList, px)) > 0) {
|
||||
px = forward[i];
|
||||
}
|
||||
}
|
||||
|
||||
SSkipListNode *p = SL_NODE_GET_FORWARD_POINTER(px, i);
|
||||
while (p != pSkipList->pTail) {
|
||||
pKey = SL_GET_NODE_KEY(pSkipList, p);
|
||||
|
||||
compare = pSkipList->comparFn(pKey, pDataKey);
|
||||
if (compare >= 0) {
|
||||
if (compare == 0) hasDup = true;
|
||||
break;
|
||||
} else {
|
||||
px = p;
|
||||
p = SL_NODE_GET_FORWARD_POINTER(px, i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
forward[i] = px;
|
||||
}
|
||||
}
|
||||
|
||||
tSkipListPutImpl(pSkipList, ppData[idata], forward, true, hasDup);
|
||||
}
|
||||
|
||||
tSkipListUnlock(pSkipList);
|
||||
}
|
||||
|
||||
uint32_t tSkipListRemove(SSkipList *pSkipList, SSkipListKey key) {
|
||||
uint32_t count = 0;
|
||||
|
||||
|
@ -310,22 +367,25 @@ void tSkipListPrint(SSkipList *pSkipList, int16_t nlevel) {
|
|||
}
|
||||
}
|
||||
|
||||
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **backward, SSkipListNode *pNode) {
|
||||
static void tSkipListDoInsert(SSkipList *pSkipList, SSkipListNode **direction, SSkipListNode *pNode, bool isForward) {
|
||||
for (int32_t i = 0; i < pNode->level; ++i) {
|
||||
if (i >= pSkipList->level) {
|
||||
SL_NODE_GET_FORWARD_POINTER(pNode, i) = pSkipList->pTail;
|
||||
SL_NODE_GET_BACKWARD_POINTER(pNode, i) = pSkipList->pHead;
|
||||
SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, i) = pNode;
|
||||
SL_NODE_GET_BACKWARD_POINTER(pSkipList->pTail, i) = pNode;
|
||||
SSkipListNode *x = direction[i];
|
||||
if (isForward) {
|
||||
SL_NODE_GET_BACKWARD_POINTER(pNode, i) = x;
|
||||
|
||||
SSkipListNode *next = SL_NODE_GET_FORWARD_POINTER(x, i);
|
||||
SL_NODE_GET_BACKWARD_POINTER(next, i) = pNode;
|
||||
|
||||
SL_NODE_GET_FORWARD_POINTER(pNode, i) = next;
|
||||
SL_NODE_GET_FORWARD_POINTER(x, i) = pNode;
|
||||
} else {
|
||||
SSkipListNode *x = backward[i];
|
||||
SL_NODE_GET_FORWARD_POINTER(pNode, i) = x;
|
||||
|
||||
SSkipListNode *prev = SL_NODE_GET_BACKWARD_POINTER(x, i);
|
||||
SL_NODE_GET_FORWARD_POINTER(prev, i) = pNode;
|
||||
|
||||
SL_NODE_GET_BACKWARD_POINTER(x, i) = pNode;
|
||||
SL_NODE_GET_BACKWARD_POINTER(pNode, i) = prev;
|
||||
SL_NODE_GET_BACKWARD_POINTER(x, i) = pNode;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -377,7 +437,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward,
|
|||
char * pDataKey = pSkipList->keyFn(pData);
|
||||
|
||||
if (pSkipList->size == 0) {
|
||||
for (int i = 0; i < pSkipList->level; i++) {
|
||||
for (int i = 0; i < pSkipList->maxLevel; i++) {
|
||||
backward[i] = pSkipList->pTail;
|
||||
}
|
||||
} else {
|
||||
|
@ -387,7 +447,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward,
|
|||
pKey = SL_GET_MAX_KEY(pSkipList);
|
||||
compare = pSkipList->comparFn(pDataKey, pKey);
|
||||
if (compare >= 0) {
|
||||
for (int i = 0; i < pSkipList->level; i++) {
|
||||
for (int i = 0; i < pSkipList->maxLevel; i++) {
|
||||
backward[i] = pSkipList->pTail;
|
||||
}
|
||||
|
||||
|
@ -398,7 +458,7 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward,
|
|||
pKey = SL_GET_MIN_KEY(pSkipList);
|
||||
compare = pSkipList->comparFn(pDataKey, pKey);
|
||||
if (compare < 0) {
|
||||
for (int i = 0; i < pSkipList->level; i++) {
|
||||
for (int i = 0; i < pSkipList->maxLevel; i++) {
|
||||
backward[i] = SL_NODE_GET_FORWARD_POINTER(pSkipList->pHead, i);
|
||||
}
|
||||
|
||||
|
@ -406,18 +466,20 @@ static bool tSkipListGetPosToPut(SSkipList *pSkipList, SSkipListNode **backward,
|
|||
}
|
||||
|
||||
SSkipListNode *px = pSkipList->pTail;
|
||||
for (int i = pSkipList->level - 1; i >= 0; --i) {
|
||||
SSkipListNode *p = SL_NODE_GET_BACKWARD_POINTER(px, i);
|
||||
while (p != pSkipList->pHead) {
|
||||
pKey = SL_GET_NODE_KEY(pSkipList, p);
|
||||
for (int i = pSkipList->maxLevel - 1; i >= 0; --i) {
|
||||
if (i < pSkipList->level) {
|
||||
SSkipListNode *p = SL_NODE_GET_BACKWARD_POINTER(px, i);
|
||||
while (p != pSkipList->pHead) {
|
||||
pKey = SL_GET_NODE_KEY(pSkipList, p);
|
||||
|
||||
compare = pSkipList->comparFn(pKey, pDataKey);
|
||||
if (compare <= 0) {
|
||||
if (compare == 0 && !hasDupKey) hasDupKey = true;
|
||||
break;
|
||||
} else {
|
||||
px = p;
|
||||
p = SL_NODE_GET_BACKWARD_POINTER(px, i);
|
||||
compare = pSkipList->comparFn(pKey, pDataKey);
|
||||
if (compare <= 0) {
|
||||
if (compare == 0 && !hasDupKey) hasDupKey = true;
|
||||
break;
|
||||
} else {
|
||||
px = p;
|
||||
p = SL_NODE_GET_BACKWARD_POINTER(px, i);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -579,6 +641,32 @@ static SSkipListNode *tSkipListNewNode(uint8_t level) {
|
|||
return pNode;
|
||||
}
|
||||
|
||||
static SSkipListNode *tSkipListPutImpl(SSkipList *pSkipList, void *pData, SSkipListNode **direction, bool isForward,
|
||||
bool hasDup) {
|
||||
uint8_t dupMode = SL_DUP_MODE(pSkipList);
|
||||
SSkipListNode *pNode = NULL;
|
||||
|
||||
if (hasDup && (dupMode == SL_DISCARD_DUP_KEY || dupMode == SL_UPDATE_DUP_KEY)) {
|
||||
if (dupMode == SL_UPDATE_DUP_KEY) {
|
||||
if (isForward) {
|
||||
pNode = SL_NODE_GET_FORWARD_POINTER(direction[0], 0);
|
||||
} else {
|
||||
pNode = SL_NODE_GET_BACKWARD_POINTER(direction[0], 0);
|
||||
}
|
||||
atomic_store_ptr(&(pNode->pData), pData);
|
||||
}
|
||||
} else {
|
||||
pNode = tSkipListNewNode(getSkipListRandLevel(pSkipList));
|
||||
if (pNode != NULL) {
|
||||
pNode->pData = pData;
|
||||
|
||||
tSkipListDoInsert(pSkipList, direction, pNode, isForward);
|
||||
}
|
||||
}
|
||||
|
||||
return pNode;
|
||||
}
|
||||
|
||||
// static int32_t tSkipListEndParQuery(SSkipList *pSkipList, SSkipListNode *pStartNode, SSkipListKey *pEndKey,
|
||||
// int32_t cond, SSkipListNode ***pRes) {
|
||||
// pthread_rwlock_rdlock(&pSkipList->lock);
|
||||
|
|
Loading…
Reference in New Issue