commit
e57f38cc64
|
@ -293,7 +293,7 @@ void tsDataSwap(void *pLeft, void *pRight, int32_t type, int32_t size);
|
||||||
|
|
||||||
#define TSDB_MIN_TOTAL_BLOCKS 2
|
#define TSDB_MIN_TOTAL_BLOCKS 2
|
||||||
#define TSDB_MAX_TOTAL_BLOCKS 10000
|
#define TSDB_MAX_TOTAL_BLOCKS 10000
|
||||||
#define TSDB_DEFAULT_TOTAL_BLOCKS 4
|
#define TSDB_DEFAULT_TOTAL_BLOCKS 6
|
||||||
|
|
||||||
#define TSDB_MIN_TABLES 4
|
#define TSDB_MIN_TABLES 4
|
||||||
#define TSDB_MAX_TABLES 10000000
|
#define TSDB_MAX_TABLES 10000000
|
||||||
|
|
|
@ -46,7 +46,6 @@ typedef struct {
|
||||||
int (*eventCallBack)(void *);
|
int (*eventCallBack)(void *);
|
||||||
void *(*cqCreateFunc)(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema);
|
void *(*cqCreateFunc)(void *handle, uint64_t uid, int sid, char *sqlStr, STSchema *pSchema);
|
||||||
void (*cqDropFunc)(void *handle);
|
void (*cqDropFunc)(void *handle);
|
||||||
void *(*configFunc)(int32_t vgId, int32_t sid);
|
|
||||||
} STsdbAppH;
|
} STsdbAppH;
|
||||||
|
|
||||||
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION
|
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION
|
||||||
|
|
|
@ -172,7 +172,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
|
||||||
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
|
STsdbBufBlock *pBufBlock = tsdbGetCurrBufBlock(pRepo);
|
||||||
|
|
||||||
if (pBufBlock != NULL && pBufBlock->remain < bytes) {
|
if (pBufBlock != NULL && pBufBlock->remain < bytes) {
|
||||||
if (listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 2) { // need to commit mem
|
if (listNEles(pRepo->mem->bufBlockList) >= pCfg->totalBlocks / 3) { // need to commit mem
|
||||||
if (tsdbAsyncCommit(pRepo) < 0) return NULL;
|
if (tsdbAsyncCommit(pRepo) < 0) return NULL;
|
||||||
} else {
|
} else {
|
||||||
if (tsdbLockRepo(pRepo) < 0) return NULL;
|
if (tsdbLockRepo(pRepo) < 0) return NULL;
|
||||||
|
@ -204,6 +204,9 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
|
||||||
pBufBlock->offset += bytes;
|
pBufBlock->offset += bytes;
|
||||||
pBufBlock->remain -= bytes;
|
pBufBlock->remain -= bytes;
|
||||||
|
|
||||||
|
tsdbTrace("vgId:%d allocate %d bytes from buffer block, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
|
||||||
|
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
|
||||||
|
|
||||||
return ptr;
|
return ptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -324,6 +327,8 @@ static void tsdbFreeBytes(STsdbRepo *pRepo, void *ptr, int bytes) {
|
||||||
pBufBlock->offset -= bytes;
|
pBufBlock->offset -= bytes;
|
||||||
pBufBlock->remain += bytes;
|
pBufBlock->remain += bytes;
|
||||||
ASSERT(ptr == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset));
|
ASSERT(ptr == POINTER_SHIFT(pBufBlock->data, pBufBlock->offset));
|
||||||
|
tsdbTrace("vgId:%d return %d bytes to buffer block, nBlocks %d offset %d remain %d", REPO_ID(pRepo), bytes,
|
||||||
|
listNEles(pRepo->mem->bufBlockList), pBufBlock->offset, pBufBlock->remain);
|
||||||
}
|
}
|
||||||
|
|
||||||
static SMemTable* tsdbNewMemTable(STsdbCfg* pCfg) {
|
static SMemTable* tsdbNewMemTable(STsdbCfg* pCfg) {
|
||||||
|
|
|
@ -592,7 +592,7 @@ void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema,
|
||||||
int tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, pCTable);
|
int tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, pCTable);
|
||||||
void *buf = tsdbAllocBytes(pRepo, tlen);
|
void *buf = tsdbAllocBytes(pRepo, tlen);
|
||||||
ASSERT(buf != NULL);
|
ASSERT(buf != NULL);
|
||||||
tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pTable);
|
tsdbInsertTableAct(pRepo, TSDB_UPDATE_META, buf, pCTable);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -589,20 +589,25 @@ void tsdbGetDataStatis(SRWHelper *pHelper, SDataStatis *pStatis, int numOfCols)
|
||||||
|
|
||||||
int tsdbLoadBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, SCompInfo *pCompInfo, int16_t *colIds, int numOfColIds) {
|
int tsdbLoadBlockDataCols(SRWHelper *pHelper, SCompBlock *pCompBlock, SCompInfo *pCompInfo, int16_t *colIds, int numOfColIds) {
|
||||||
ASSERT(pCompBlock->numOfSubBlocks >= 1); // Must be super block
|
ASSERT(pCompBlock->numOfSubBlocks >= 1); // Must be super block
|
||||||
|
SCompBlock *pTCompBlock = pCompBlock;
|
||||||
|
|
||||||
int numOfSubBlocks = pCompBlock->numOfSubBlocks;
|
int numOfSubBlocks = pCompBlock->numOfSubBlocks;
|
||||||
if (numOfSubBlocks > 1)
|
if (numOfSubBlocks > 1)
|
||||||
pCompBlock = (SCompBlock *)POINTER_SHIFT((pCompInfo == NULL) ? pHelper->pCompInfo : pCompInfo, pCompBlock->offset);
|
pTCompBlock = (SCompBlock *)POINTER_SHIFT((pCompInfo == NULL) ? pHelper->pCompInfo : pCompInfo, pCompBlock->offset);
|
||||||
|
|
||||||
tdResetDataCols(pHelper->pDataCols[0]);
|
tdResetDataCols(pHelper->pDataCols[0]);
|
||||||
if (tsdbLoadBlockDataColsImpl(pHelper, pCompBlock, pHelper->pDataCols[0], colIds, numOfColIds) < 0) goto _err;
|
if (tsdbLoadBlockDataColsImpl(pHelper, pTCompBlock, pHelper->pDataCols[0], colIds, numOfColIds) < 0) goto _err;
|
||||||
for (int i = 1; i < numOfSubBlocks; i++) {
|
for (int i = 1; i < numOfSubBlocks; i++) {
|
||||||
tdResetDataCols(pHelper->pDataCols[1]);
|
tdResetDataCols(pHelper->pDataCols[1]);
|
||||||
pCompBlock++;
|
pTCompBlock++;
|
||||||
if (tsdbLoadBlockDataColsImpl(pHelper, pCompBlock, pHelper->pDataCols[1], colIds, numOfColIds) < 0) goto _err;
|
if (tsdbLoadBlockDataColsImpl(pHelper, pTCompBlock, pHelper->pDataCols[1], colIds, numOfColIds) < 0) goto _err;
|
||||||
if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) goto _err;
|
if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows &&
|
||||||
|
dataColsKeyFirst(pHelper->pDataCols[0]) == pCompBlock->keyFirst &&
|
||||||
|
dataColsKeyLast(pHelper->pDataCols[0]) == pCompBlock->keyLast);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
@ -610,19 +615,25 @@ _err:
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SCompInfo *pCompInfo) {
|
int tsdbLoadBlockData(SRWHelper *pHelper, SCompBlock *pCompBlock, SCompInfo *pCompInfo) {
|
||||||
|
SCompBlock *pTCompBlock = pCompBlock;
|
||||||
|
|
||||||
int numOfSubBlock = pCompBlock->numOfSubBlocks;
|
int numOfSubBlock = pCompBlock->numOfSubBlocks;
|
||||||
if (numOfSubBlock > 1)
|
if (numOfSubBlock > 1)
|
||||||
pCompBlock = (SCompBlock *)POINTER_SHIFT((pCompInfo == NULL) ? pHelper->pCompInfo : pCompInfo, pCompBlock->offset);
|
pTCompBlock = (SCompBlock *)POINTER_SHIFT((pCompInfo == NULL) ? pHelper->pCompInfo : pCompInfo, pCompBlock->offset);
|
||||||
|
|
||||||
tdResetDataCols(pHelper->pDataCols[0]);
|
tdResetDataCols(pHelper->pDataCols[0]);
|
||||||
if (tsdbLoadBlockDataImpl(pHelper, pCompBlock, pHelper->pDataCols[0]) < 0) goto _err;
|
if (tsdbLoadBlockDataImpl(pHelper, pTCompBlock, pHelper->pDataCols[0]) < 0) goto _err;
|
||||||
for (int i = 1; i < numOfSubBlock; i++) {
|
for (int i = 1; i < numOfSubBlock; i++) {
|
||||||
tdResetDataCols(pHelper->pDataCols[1]);
|
tdResetDataCols(pHelper->pDataCols[1]);
|
||||||
pCompBlock++;
|
pTCompBlock++;
|
||||||
if (tsdbLoadBlockDataImpl(pHelper, pCompBlock, pHelper->pDataCols[1]) < 0) goto _err;
|
if (tsdbLoadBlockDataImpl(pHelper, pTCompBlock, pHelper->pDataCols[1]) < 0) goto _err;
|
||||||
if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) goto _err;
|
if (tdMergeDataCols(pHelper->pDataCols[0], pHelper->pDataCols[1], pHelper->pDataCols[1]->numOfRows) < 0) goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ASSERT(pHelper->pDataCols[0]->numOfRows == pCompBlock->numOfRows &&
|
||||||
|
dataColsKeyFirst(pHelper->pDataCols[0]) == pCompBlock->keyFirst &&
|
||||||
|
dataColsKeyLast(pHelper->pDataCols[0]) == pCompBlock->keyLast);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
@ -1227,7 +1238,6 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
ASSERT(pCompData->numOfCols == pCompBlock->numOfCols);
|
|
||||||
|
|
||||||
int32_t tsize = TSDB_GET_COMPCOL_LEN(pCompBlock->numOfCols);
|
int32_t tsize = TSDB_GET_COMPCOL_LEN(pCompBlock->numOfCols);
|
||||||
if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) {
|
if (!taosCheckChecksumWhole((uint8_t *)pCompData, tsize)) {
|
||||||
|
@ -1236,6 +1246,7 @@ static int tsdbLoadBlockDataImpl(SRWHelper *pHelper, SCompBlock *pCompBlock, SDa
|
||||||
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
terrno = TSDB_CODE_TDB_FILE_CORRUPTED;
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
ASSERT(pCompData->numOfCols == pCompBlock->numOfCols);
|
||||||
|
|
||||||
pDataCols->numOfRows = pCompBlock->numOfRows;
|
pDataCols->numOfRows = pCompBlock->numOfRows;
|
||||||
|
|
||||||
|
|
|
@ -39,7 +39,9 @@ static int insertData(SInsertInfo *pInfo) {
|
||||||
pBlock->uid = pInfo->uid;
|
pBlock->uid = pInfo->uid;
|
||||||
pBlock->tid = pInfo->tid;
|
pBlock->tid = pInfo->tid;
|
||||||
pBlock->sversion = pInfo->sversion;
|
pBlock->sversion = pInfo->sversion;
|
||||||
pBlock->len = 0;
|
pBlock->dataLen = 0;
|
||||||
|
pBlock->schemaLen = 0;
|
||||||
|
pBlock->numOfRows = 0;
|
||||||
for (int i = 0; i < pInfo->rowsPerSubmit; i++) {
|
for (int i = 0; i < pInfo->rowsPerSubmit; i++) {
|
||||||
// start_time += 1000;
|
// start_time += 1000;
|
||||||
if (pInfo->isAscend) {
|
if (pInfo->isAscend) {
|
||||||
|
@ -47,7 +49,7 @@ static int insertData(SInsertInfo *pInfo) {
|
||||||
} else {
|
} else {
|
||||||
start_time -= pInfo->interval;
|
start_time -= pInfo->interval;
|
||||||
}
|
}
|
||||||
SDataRow row = (SDataRow)(pBlock->data + pBlock->len);
|
SDataRow row = (SDataRow)(pBlock->data + pBlock->dataLen);
|
||||||
tdInitDataRow(row, pInfo->pSchema);
|
tdInitDataRow(row, pInfo->pSchema);
|
||||||
|
|
||||||
for (int j = 0; j < schemaNCols(pInfo->pSchema); j++) {
|
for (int j = 0; j < schemaNCols(pInfo->pSchema); j++) {
|
||||||
|
@ -59,13 +61,15 @@ static int insertData(SInsertInfo *pInfo) {
|
||||||
tdAppendColVal(row, (void *)(&val), pTCol->type, pTCol->bytes, pTCol->offset);
|
tdAppendColVal(row, (void *)(&val), pTCol->type, pTCol->bytes, pTCol->offset);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pBlock->len += dataRowLen(row);
|
pBlock->dataLen += dataRowLen(row);
|
||||||
|
pBlock->numOfRows++;
|
||||||
}
|
}
|
||||||
pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len;
|
pMsg->length = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + pBlock->dataLen;
|
||||||
pMsg->numOfBlocks = 1;
|
pMsg->numOfBlocks = 1;
|
||||||
|
|
||||||
pBlock->len = htonl(pBlock->len);
|
pBlock->dataLen = htonl(pBlock->dataLen);
|
||||||
pBlock->numOfRows = htonl(pBlock->numOfRows);
|
pBlock->numOfRows = htonl(pBlock->numOfRows);
|
||||||
|
pBlock->schemaLen = htonl(pBlock->schemaLen);
|
||||||
pBlock->uid = htobe64(pBlock->uid);
|
pBlock->uid = htobe64(pBlock->uid);
|
||||||
pBlock->tid = htonl(pBlock->tid);
|
pBlock->tid = htonl(pBlock->tid);
|
||||||
|
|
||||||
|
@ -74,7 +78,6 @@ static int insertData(SInsertInfo *pInfo) {
|
||||||
|
|
||||||
pMsg->length = htonl(pMsg->length);
|
pMsg->length = htonl(pMsg->length);
|
||||||
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
||||||
pMsg->compressed = htonl(pMsg->numOfBlocks);
|
|
||||||
|
|
||||||
if (tsdbInsertData(pInfo->pRepo, pMsg, NULL) < 0) {
|
if (tsdbInsertData(pInfo->pRepo, pMsg, NULL) < 0) {
|
||||||
tfree(pMsg);
|
tfree(pMsg);
|
||||||
|
|
|
@ -259,7 +259,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
appH.cqH = pVnode->cq;
|
appH.cqH = pVnode->cq;
|
||||||
appH.cqCreateFunc = cqCreate;
|
appH.cqCreateFunc = cqCreate;
|
||||||
appH.cqDropFunc = cqDrop;
|
appH.cqDropFunc = cqDrop;
|
||||||
appH.configFunc = dnodeSendCfgTableToRecv;
|
|
||||||
sprintf(temp, "%s/tsdb", rootDir);
|
sprintf(temp, "%s/tsdb", rootDir);
|
||||||
pVnode->tsdb = tsdbOpenRepo(temp, &appH);
|
pVnode->tsdb = tsdbOpenRepo(temp, &appH);
|
||||||
if (pVnode->tsdb == NULL) {
|
if (pVnode->tsdb == NULL) {
|
||||||
|
@ -588,7 +587,6 @@ static int vnodeResetTsdb(SVnodeObj *pVnode)
|
||||||
appH.cqH = pVnode->cq;
|
appH.cqH = pVnode->cq;
|
||||||
appH.cqCreateFunc = cqCreate;
|
appH.cqCreateFunc = cqCreate;
|
||||||
appH.cqDropFunc = cqDrop;
|
appH.cqDropFunc = cqDrop;
|
||||||
appH.configFunc = dnodeSendCfgTableToRecv;
|
|
||||||
pVnode->tsdb = tsdbOpenRepo(rootDir, &appH);
|
pVnode->tsdb = tsdbOpenRepo(rootDir, &appH);
|
||||||
|
|
||||||
pVnode->status = TAOS_VN_STATUS_READY;
|
pVnode->status = TAOS_VN_STATUS_READY;
|
||||||
|
|
|
@ -68,7 +68,7 @@ endi
|
||||||
if $data01 != 1 then
|
if $data01 != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data02 != NULL then
|
if $data02 != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
@ -80,7 +80,7 @@ endi
|
||||||
if $data01 != 1 then
|
if $data01 != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data02 != NULL then
|
if $data02 != 1 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue