simple change
This commit is contained in:
parent
7bbda1b95b
commit
98837549f3
|
@ -208,6 +208,18 @@ typedef struct {
|
|||
} SFileGroupIter;
|
||||
|
||||
// ------------------ tsdbMain.c
|
||||
typedef struct {
|
||||
int32_t totalLen;
|
||||
int32_t len;
|
||||
SDataRow row;
|
||||
} SSubmitBlkIter;
|
||||
|
||||
typedef struct {
|
||||
int32_t totalLen;
|
||||
int32_t len;
|
||||
void * pMsg;
|
||||
} SSubmitMsgIter;
|
||||
|
||||
typedef struct {
|
||||
int8_t state;
|
||||
|
||||
|
@ -430,6 +442,7 @@ void tsdbCloseBufPool(STsdbRepo* pRepo);
|
|||
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
|
||||
|
||||
// ------------------ tsdbMemTable.c
|
||||
int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, TSKEY now, int32_t* affectedrows);
|
||||
int tsdbUpdateRowInMem(STsdbRepo* pRepo, SDataRow row, STable* pTable);
|
||||
int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
|
||||
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
|
||||
|
|
|
@ -32,18 +32,6 @@
|
|||
#define TSDB_DEFAULT_COMPRESSION TWO_STAGE_COMP
|
||||
#define IS_VALID_COMPRESSION(compression) (((compression) >= NO_COMPRESSION) && ((compression) <= TWO_STAGE_COMP))
|
||||
|
||||
typedef struct {
|
||||
int32_t totalLen;
|
||||
int32_t len;
|
||||
SDataRow row;
|
||||
} SSubmitBlkIter;
|
||||
|
||||
typedef struct {
|
||||
int32_t totalLen;
|
||||
int32_t len;
|
||||
void * pMsg;
|
||||
} SSubmitMsgIter;
|
||||
|
||||
static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg);
|
||||
static int32_t tsdbSetRepoEnv(char *rootDir, STsdbCfg *pCfg);
|
||||
static int32_t tsdbUnsetRepoEnv(char *rootDir);
|
||||
|
@ -53,11 +41,8 @@ static char * tsdbGetCfgFname(char *rootDir);
|
|||
static STsdbRepo * tsdbNewRepo(char *rootDir, STsdbAppH *pAppH, STsdbCfg *pCfg);
|
||||
static void tsdbFreeRepo(STsdbRepo *pRepo);
|
||||
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
|
||||
static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows);
|
||||
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock);
|
||||
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
|
||||
static int tsdbRestoreInfo(STsdbRepo *pRepo);
|
||||
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
|
||||
static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression);
|
||||
static int tsdbAlterKeep(STsdbRepo *pRepo, int32_t keep);
|
||||
static int tsdbAlterCacheTotalBlocks(STsdbRepo *pRepo, int totalBlocks);
|
||||
|
@ -748,43 +733,6 @@ static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows) {
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
int64_t points = 0;
|
||||
|
||||
ASSERT(pBlock->tid < pMeta->maxTables);
|
||||
STable *pTable = pMeta->tables[pBlock->tid];
|
||||
ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
|
||||
|
||||
SSubmitBlkIter blkIter = {0};
|
||||
SDataRow row = NULL;
|
||||
|
||||
TSKEY minKey = now - tsMsPerDay[pRepo->config.precision] * pRepo->config.keep;
|
||||
TSKEY maxKey = now + tsMsPerDay[pRepo->config.precision] * pRepo->config.daysPerFile;
|
||||
|
||||
tsdbInitSubmitBlkIter(pBlock, &blkIter);
|
||||
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
|
||||
if (dataRowKey(row) < minKey || dataRowKey(row) > maxKey) {
|
||||
tsdbError("vgId:%d table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
|
||||
" maxKey %" PRId64,
|
||||
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey);
|
||||
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (tsdbUpdateRowInMem(pRepo, row, pTable) < 0) return -1;
|
||||
|
||||
(*affectedrows)++;
|
||||
points++;
|
||||
}
|
||||
|
||||
STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion);
|
||||
pRepo->stat.pointsWritten += points * schemaNCols(pSchema);
|
||||
pRepo->stat.totalStorage += points * schemaVLen(pSchema);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
|
||||
if (pIter->len == 0) {
|
||||
pIter->len += TSDB_SUBMIT_MSG_HEAD_SIZE;
|
||||
|
@ -804,20 +752,6 @@ static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
|
||||
SDataRow row = pIter->row;
|
||||
if (row == NULL) return NULL;
|
||||
|
||||
pIter->len += dataRowLen(row);
|
||||
if (pIter->len >= pIter->totalLen) {
|
||||
pIter->row = NULL;
|
||||
} else {
|
||||
pIter->row = (char *)row + dataRowLen(row);
|
||||
}
|
||||
|
||||
return row;
|
||||
}
|
||||
|
||||
static int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
|
@ -851,14 +785,6 @@ _err:
|
|||
return -1;
|
||||
}
|
||||
|
||||
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
|
||||
if (pBlock->dataLen <= 0) return -1;
|
||||
pIter->totalLen = pBlock->dataLen;
|
||||
pIter->len = 0;
|
||||
pIter->row = (SDataRow)(pBlock->data+pBlock->schemaLen);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void tsdbAlterCompression(STsdbRepo *pRepo, int8_t compression) {
|
||||
int8_t ocompression = pRepo->config.compression;
|
||||
pRepo->config.compression = compression;
|
||||
|
|
|
@ -32,8 +32,55 @@ static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
|
|||
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
|
||||
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
|
||||
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SDataRow row);
|
||||
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
|
||||
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
|
||||
|
||||
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey,
|
||||
TSKEY now);
|
||||
|
||||
// ---------------- INTERNAL FUNCTIONS ----------------
|
||||
int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, TSKEY now, int32_t *affectedrows) {
|
||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||
int64_t points = 0;
|
||||
STable * pTable = NULL;
|
||||
SSubmitBlkIter blkIter = {0};
|
||||
SDataRow row = NULL;
|
||||
TSKEY minKey = 0;
|
||||
TSKEY maxKey = 0;
|
||||
void ** pData = NULL;
|
||||
// int rowCounter = 0;
|
||||
|
||||
ASSERT(pBlock->tid < pMeta->maxTables);
|
||||
pTable = pMeta->tables[pBlock->tid];
|
||||
ASSERT(pTable != NULL && TABLE_UID(pTable) == pBlock->uid);
|
||||
|
||||
minKey = now - tsMsPerDay[pRepo->config.precision] * pRepo->config.keep;
|
||||
maxKey = now + tsMsPerDay[pRepo->config.precision] * pRepo->config.daysPerFile;
|
||||
|
||||
pData = (void **)calloc(pBlock->numOfRows, sizeof(void *));
|
||||
if (pData == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
tsdbInitSubmitBlkIter(pBlock, &blkIter);
|
||||
|
||||
while ((row = tsdbGetSubmitBlkNext(&blkIter)) != NULL) {
|
||||
if (tsdbCheckRowRange(pRepo, pTable, row, minKey, maxKey, now) < 0) return -1;
|
||||
|
||||
if (tsdbUpdateRowInMem(pRepo, row, pTable) < 0) return -1;
|
||||
|
||||
(*affectedrows)++;
|
||||
points++;
|
||||
}
|
||||
|
||||
STSchema *pSchema = tsdbGetTableSchemaByVersion(pTable, pBlock->sversion);
|
||||
pRepo->stat.pointsWritten += points * schemaNCols(pSchema);
|
||||
pRepo->stat.totalStorage += points * schemaVLen(pSchema);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int tsdbUpdateRowInMem(STsdbRepo *pRepo, SDataRow row, STable *pTable) {
|
||||
STsdbCfg * pCfg = &pRepo->config;
|
||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||
|
@ -847,5 +894,41 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema *
|
|||
tdAppendDataRowToDataCol(row, *ppSchema, pCols);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
|
||||
if (pBlock->dataLen <= 0) return -1;
|
||||
pIter->totalLen = pBlock->dataLen;
|
||||
pIter->len = 0;
|
||||
pIter->row = (SDataRow)(pBlock->data+pBlock->schemaLen);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static SDataRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
|
||||
SDataRow row = pIter->row;
|
||||
if (row == NULL) return NULL;
|
||||
|
||||
pIter->len += dataRowLen(row);
|
||||
if (pIter->len >= pIter->totalLen) {
|
||||
pIter->row = NULL;
|
||||
} else {
|
||||
pIter->row = (char *)row + dataRowLen(row);
|
||||
}
|
||||
|
||||
return row;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey,
|
||||
TSKEY now) {
|
||||
if (dataRowKey(row) < minKey || dataRowKey(row) > maxKey) {
|
||||
tsdbError("vgId:%d table %s tid %d uid %" PRIu64 " timestamp is out of range! now %" PRId64 " minKey %" PRId64
|
||||
" maxKey %" PRId64 " row key %" PRId64,
|
||||
REPO_ID(pRepo), TABLE_CHAR_NAME(pTable), TABLE_TID(pTable), TABLE_UID(pTable), now, minKey, maxKey,
|
||||
dataRowKey(row));
|
||||
terrno = TSDB_CODE_TDB_TIMESTAMP_OUT_OF_RANGE;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
Loading…
Reference in New Issue