more integration

This commit is contained in:
Hongze Cheng 2022-01-06 10:11:28 +00:00
parent 0cffefc835
commit ae426947d3
12 changed files with 137 additions and 205 deletions

View File

@ -38,7 +38,6 @@ extern "C" {
memcpy(varDataVal(x), (str), __len); \ memcpy(varDataVal(x), (str), __len); \
} while (0); } while (0);
#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \ #define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \
do { \ do { \
char *_e = stpncpy(varDataVal(x), (str), (_maxs)-VARSTR_HEADER_SIZE); \ char *_e = stpncpy(varDataVal(x), (str), (_maxs)-VARSTR_HEADER_SIZE); \
@ -73,7 +72,8 @@ typedef struct {
typedef struct { typedef struct {
int version; // version int version; // version
int numOfCols; // Number of columns appended int numOfCols; // Number of columns appended
int tlen; // maximum length of a SDataRow without the header part (sizeof(VarDataOffsetT) + sizeof(VarDataLenT) + (bytes)) int tlen; // maximum length of a SDataRow without the header part (sizeof(VarDataOffsetT) + sizeof(VarDataLenT) +
// (bytes))
uint16_t flen; // First part length in a SDataRow after the header part uint16_t flen; // First part length in a SDataRow after the header part
uint16_t vlen; // pure value part length, excluded the overhead (bytes only) uint16_t vlen; // pure value part length, excluded the overhead (bytes only)
STColumn columns[]; STColumn columns[];
@ -202,7 +202,6 @@ void tdFreeDataRow(SDataRow row);
void tdInitDataRow(SDataRow row, STSchema *pSchema); void tdInitDataRow(SDataRow row, STSchema *pSchema);
SDataRow tdDataRowDup(SDataRow row); SDataRow tdDataRowDup(SDataRow row);
// offset here not include dataRow header length // offset here not include dataRow header length
static FORCE_INLINE int tdAppendDataColVal(SDataRow row, const void *value, bool isCopyVarData, int8_t type, static FORCE_INLINE int tdAppendDataColVal(SDataRow row, const void *value, bool isCopyVarData, int8_t type,
int32_t offset) { int32_t offset) {
@ -228,7 +227,6 @@ static FORCE_INLINE int tdAppendDataColVal(SDataRow row, const void *value, bool
return 0; return 0;
} }
// offset here not include dataRow header length // offset here not include dataRow header length
static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t type, int32_t offset) { static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t type, int32_t offset) {
return tdAppendDataColVal(row, value, true, type, offset); return tdAppendDataColVal(row, value, true, type, offset);
@ -269,7 +267,8 @@ static FORCE_INLINE void tdSetColOfRowNullBySchema(SDataRow row, STSchema *pSche
setNull(tdGetRowDataOfCol(row, type, offset), type, bytes); setNull(tdGetRowDataOfCol(row, type, offset), type, bytes);
} }
static FORCE_INLINE void tdCopyColOfRowBySchema(SDataRow dst, STSchema *pDstSchema, int dstIdx, SDataRow src, STSchema *pSrcSchema, int srcIdx) { static FORCE_INLINE void tdCopyColOfRowBySchema(SDataRow dst, STSchema *pDstSchema, int dstIdx, SDataRow src,
STSchema *pSrcSchema, int srcIdx) {
int8_t type = pDstSchema->columns[dstIdx].type; int8_t type = pDstSchema->columns[dstIdx].type;
assert(type == pSrcSchema->columns[srcIdx].type); assert(type == pSrcSchema->columns[srcIdx].type);
void *pData = tdGetPtrToCol(dst, pDstSchema, dstIdx); void *pData = tdGetPtrToCol(dst, pDstSchema, dstIdx);
@ -319,7 +318,6 @@ static FORCE_INLINE void tdCopyColOfRowBySchema(SDataRow dst, STSchema *pDstSche
} }
} }
// ----------------- Data column structure // ----------------- Data column structure
typedef struct SDataCol { typedef struct SDataCol {
int8_t type; // column type int8_t type; // column type
@ -548,7 +546,7 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder);
static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, const void *value) { static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, const void *value) {
if (pBuilder->nCols >= pBuilder->tCols) { if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2; pBuilder->tCols *= 2;
SColIdx* pColIdx = (SColIdx *)realloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols); SColIdx *pColIdx = (SColIdx *)realloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols);
if (pColIdx == NULL) return -1; if (pColIdx == NULL) return -1;
pBuilder->pColIdx = pColIdx; pBuilder->pColIdx = pColIdx;
} }
@ -563,7 +561,7 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId,
while (tlen > pBuilder->alloc - pBuilder->size) { while (tlen > pBuilder->alloc - pBuilder->size) {
pBuilder->alloc *= 2; pBuilder->alloc *= 2;
} }
void* buf = realloc(pBuilder->buf, pBuilder->alloc); void *buf = realloc(pBuilder->buf, pBuilder->alloc);
if (buf == NULL) return -1; if (buf == NULL) return -1;
pBuilder->buf = buf; pBuilder->buf = buf;
} }
@ -752,10 +750,10 @@ static FORCE_INLINE void tdGetColAppendDeltaLen(const void *value, int8_t colTyp
typedef struct { typedef struct {
int16_t colId; int16_t colId;
uint8_t colType; uint8_t colType;
char* colVal; char * colVal;
} SColInfo; } SColInfo;
static FORCE_INLINE void setSColInfo(SColInfo* colInfo, int16_t colId, uint8_t colType, char* colVal) { static FORCE_INLINE void setSColInfo(SColInfo *colInfo, int16_t colId, uint8_t colType, char *colVal) {
colInfo->colId = colId; colInfo->colId = colId;
colInfo->colType = colType; colInfo->colType = colType;
colInfo->colVal = colVal; colInfo->colVal = colVal;

View File

@ -73,6 +73,12 @@ extern "C" {
#endif #endif
#ifndef WINDOWS
#ifndef O_BINARY
#define O_BINARY 0
#endif
#endif
#define POINTER_SHIFT(p, b) ((void *)((char *)(p) + (b))) #define POINTER_SHIFT(p, b) ((void *)((char *)(p) + (b)))
#define POINTER_DISTANCE(p1, p2) ((char *)(p1) - (char *)(p2)) #define POINTER_DISTANCE(p1, p2) ((char *)(p1) - (char *)(p2))

View File

@ -13,16 +13,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#ifndef _TD_TSDB_HEALTH_H_ // #ifndef _TD_TSDB_HEALTH_H_
#define _TD_TSDB_HEALTH_H_ // #define _TD_TSDB_HEALTH_H_
#include "os.h" // #include "os.h"
#include "tsdb.h" // #include "tsdb.h"
bool tsdbUrgeQueryFree(STsdbRepo* pRepo); // bool tsdbUrgeQueryFree(STsdbRepo* pRepo);
int32_t tsdbInsertNewBlock(STsdbRepo* pRepo); // int32_t tsdbInsertNewBlock(STsdbRepo* pRepo);
bool tsdbIdleMemEnough(); // bool tsdbIdleMemEnough();
bool tsdbAllowNewBlock(STsdbRepo* pRepo); // bool tsdbAllowNewBlock(STsdbRepo* pRepo);
#endif /* _TD_TSDB_BUFFER_H_ */ // #endif /* _TD_TSDB_BUFFER_H_ */

View File

@ -120,17 +120,17 @@ static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) {
return pBufBlock; return pBufBlock;
} }
static FORCE_INLINE int tsdbGetNextMaxTables(int tid) { // static FORCE_INLINE int tsdbGetNextMaxTables(int tid) {
ASSERT(tid >= 1 && tid <= TSDB_MAX_TABLES); // ASSERT(tid >= 1 && tid <= TSDB_MAX_TABLES);
int maxTables = TSDB_INIT_NTABLES; // int maxTables = TSDB_INIT_NTABLES;
while (true) { // while (true) {
maxTables = MIN(maxTables, TSDB_MAX_TABLES); // maxTables = MIN(maxTables, TSDB_MAX_TABLES);
if (tid <= maxTables) break; // if (tid <= maxTables) break;
maxTables *= 2; // maxTables *= 2;
} // }
return maxTables + 1; // return maxTables + 1;
} // }
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "tsdbint.h"
#include "tsdbHealth.h" #include "tsdbHealth.h"
#include "tsdbint.h"
#define POOL_IS_EMPTY(b) (listNEles((b)->bufBlockList) == 0) #define POOL_IS_EMPTY(b) (listNEles((b)->bufBlockList) == 0)
@ -118,15 +118,15 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
STsdbBufPool *pBufPool = pRepo->pPool; STsdbBufPool *pBufPool = pRepo->pPool;
while (POOL_IS_EMPTY(pBufPool)) { while (POOL_IS_EMPTY(pBufPool)) {
if(tsDeadLockKillQuery) { if (tsDeadLockKillQuery) {
// supply new Block // supply new Block
if(tsdbInsertNewBlock(pRepo) > 0) { if (tsdbInsertNewBlock(pRepo) > 0) {
tsdbWarn("vgId:%d add new elastic block . elasticBlocks=%d cur free Blocks=%d", REPO_ID(pRepo), pBufPool->nElasticBlocks, pBufPool->bufBlockList->numOfEles); tsdbWarn("vgId:%d add new elastic block . elasticBlocks=%d cur free Blocks=%d", REPO_ID(pRepo),
pBufPool->nElasticBlocks, TD_DLIST_NELES(pBufPool->bufBlockList));
break; break;
} else { } else {
// no newBlock, kill query free // no newBlock, kill query free
if(!tsdbUrgeQueryFree(pRepo)) if (!tsdbUrgeQueryFree(pRepo)) tsdbWarn("vgId:%d Urge query free thread start failed.", REPO_ID(pRepo));
tsdbWarn("vgId:%d Urge query free thread start failed.", REPO_ID(pRepo));
} }
} }
@ -135,7 +135,7 @@ SListNode *tsdbAllocBufBlockFromPool(STsdbRepo *pRepo) {
pRepo->repoLocked = true; pRepo->repoLocked = true;
} }
SListNode * pNode = tdListPopHead(pBufPool->bufBlockList); SListNode *pNode = tdListPopHead(pBufPool->bufBlockList);
ASSERT(pNode != NULL); ASSERT(pNode != NULL);
STsdbBufBlock *pBufBlock = NULL; STsdbBufBlock *pBufBlock = NULL;
tdListNodeGetData(pBufPool->bufBlockList, pNode, (void *)(&pBufBlock)); tdListNodeGetData(pBufPool->bufBlockList, pNode, (void *)(&pBufBlock));
@ -163,9 +163,9 @@ STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize) {
return pBufBlock; return pBufBlock;
} }
void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); } void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock) { tfree(pBufBlock); }
int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) { int tsdbExpandPool(STsdbRepo *pRepo, int32_t oldTotalBlocks) {
if (oldTotalBlocks == pRepo->config.totalBlocks) { if (oldTotalBlocks == pRepo->config.totalBlocks) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -173,7 +173,7 @@ int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks) {
int err = TSDB_CODE_SUCCESS; int err = TSDB_CODE_SUCCESS;
if (tsdbLockRepo(pRepo) < 0) return terrno; if (tsdbLockRepo(pRepo) < 0) return terrno;
STsdbBufPool* pPool = pRepo->pPool; STsdbBufPool *pPool = pRepo->pPool;
if (pRepo->config.totalBlocks > oldTotalBlocks) { if (pRepo->config.totalBlocks > oldTotalBlocks) {
for (int i = 0; i < pRepo->config.totalBlocks - oldTotalBlocks; i++) { for (int i = 0; i < pRepo->config.totalBlocks - oldTotalBlocks; i++) {
@ -199,16 +199,15 @@ err:
return err; return err;
} }
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic) { void tsdbRecycleBufferBlock(STsdbBufPool *pPool, SListNode *pNode, bool bELastic) {
STsdbBufBlock *pBufBlock = NULL; STsdbBufBlock *pBufBlock = NULL;
tdListNodeGetData(pPool->bufBlockList, pNode, (void *)(&pBufBlock)); tdListNodeGetData(pPool->bufBlockList, pNode, (void *)(&pBufBlock));
tsdbFreeBufBlock(pBufBlock); tsdbFreeBufBlock(pBufBlock);
free(pNode); free(pNode);
if(bELastic) if (bELastic) {
{
pPool->nElasticBlocks--; pPool->nElasticBlocks--;
tsdbWarn("pPool=%p elastic block reduce one . nElasticBlocks=%d cur free Blocks=%d", pPool, pPool->nElasticBlocks, pPool->bufBlockList->numOfEles); tsdbWarn("pPool=%p elastic block reduce one . nElasticBlocks=%d cur free Blocks=%d", pPool, pPool->nElasticBlocks,
} TD_DLIST_NELES(pPool->bufBlockList));
else } else
pPool->nBufBlocks--; pPool->nBufBlocks--;
} }

View File

@ -265,10 +265,9 @@ int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf) {
return 0; return 0;
} }
// =================== Commit Meta Data // =================== Commit Meta Data
static int tsdbInitCommitMetaFile(STsdbRepo *pRepo, SMFile* pMf, bool open) { static int tsdbInitCommitMetaFile(STsdbRepo *pRepo, SMFile *pMf, bool open) {
STsdbFS * pfs = REPO_FS(pRepo); STsdbFS *pfs = REPO_FS(pRepo);
SMFile * pOMFile = pfs->cstatus->pmf; SMFile * pOMFile = pfs->cstatus->pmf;
SDiskID did; SDiskID did;
@ -432,7 +431,7 @@ static int tsdbUpdateMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid, void
tsdbUpdateMFileMagic(pMFile, POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM))); tsdbUpdateMFileMagic(pMFile, POINTER_SHIFT(cont, contLen - sizeof(TSCKSUM)));
SHashObj* cache = compact ? pfs->metaCacheComp : pfs->metaCache; SHashObj *cache = compact ? pfs->metaCacheComp : pfs->metaCache;
pMFile->info.nRecords++; pMFile->info.nRecords++;
@ -480,7 +479,7 @@ static int tsdbDropMetaRecord(STsdbFS *pfs, SMFile *pMFile, uint64_t uid) {
static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) { static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) {
float delPercent = (float)(pMFile->info.nDels) / (float)(pMFile->info.nRecords); float delPercent = (float)(pMFile->info.nDels) / (float)(pMFile->info.nRecords);
float tombPercent = (float)(pMFile->info.tombSize) / (float)(pMFile->info.size); float tombPercent = (float)(pMFile->info.tombSize) / (float)(pMFile->info.size);
float compactRatio = (float)(tsTsdbMetaCompactRatio)/100; float compactRatio = (float)(tsTsdbMetaCompactRatio) / 100;
if (delPercent < compactRatio && tombPercent < compactRatio) { if (delPercent < compactRatio && tombPercent < compactRatio) {
return 0; return 0;
@ -491,8 +490,9 @@ static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) {
return -1; return -1;
} }
tsdbInfo("begin compact tsdb meta file, ratio:%d, nDels:%" PRId64 ",nRecords:%" PRId64 ",tombSize:%" PRId64 ",size:%" PRId64, tsdbInfo("begin compact tsdb meta file, ratio:%d, nDels:%" PRId64 ",nRecords:%" PRId64 ",tombSize:%" PRId64
tsTsdbMetaCompactRatio, pMFile->info.nDels,pMFile->info.nRecords,pMFile->info.tombSize,pMFile->info.size); ",size:%" PRId64,
tsTsdbMetaCompactRatio, pMFile->info.nDels, pMFile->info.nRecords, pMFile->info.tombSize, pMFile->info.size);
SMFile mf; SMFile mf;
SDiskID did; SDiskID did;
@ -513,7 +513,7 @@ static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) {
int code = -1; int code = -1;
int64_t maxBufSize = 1024; int64_t maxBufSize = 1024;
SKVRecord *pRecord; SKVRecord *pRecord;
void *pBuf = NULL; void * pBuf = NULL;
pBuf = malloc((size_t)maxBufSize); pBuf = malloc((size_t)maxBufSize);
if (pBuf == NULL) { if (pBuf == NULL) {
@ -536,7 +536,7 @@ static int tsdbCompactMetaFile(STsdbRepo *pRepo, STsdbFS *pfs, SMFile *pMFile) {
} }
if (pRecord->size > maxBufSize) { if (pRecord->size > maxBufSize) {
maxBufSize = pRecord->size; maxBufSize = pRecord->size;
void* tmp = realloc(pBuf, (size_t)maxBufSize); void *tmp = realloc(pBuf, (size_t)maxBufSize);
if (tmp == NULL) { if (tmp == NULL) {
goto _err; goto _err;
} }
@ -572,8 +572,9 @@ _err:
if (code == 0) { if (code == 0) {
// rename meta.tmp -> meta // rename meta.tmp -> meta
tsdbInfo("vgId:%d meta file rename %s -> %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf), TSDB_FILE_FULL_NAME(pMFile)); tsdbInfo("vgId:%d meta file rename %s -> %s", REPO_ID(pRepo), TSDB_FILE_FULL_NAME(&mf),
taosRename(mf.f.aname,pMFile->f.aname); TSDB_FILE_FULL_NAME(pMFile));
taosRename(mf.f.aname, pMFile->f.aname);
tstrncpy(mf.f.aname, pMFile->f.aname, TSDB_FILENAME_LEN); tstrncpy(mf.f.aname, pMFile->f.aname, TSDB_FILENAME_LEN);
tstrncpy(mf.f.rname, pMFile->f.rname, TSDB_FILENAME_LEN); tstrncpy(mf.f.rname, pMFile->f.rname, TSDB_FILENAME_LEN);
// update current meta file info // update current meta file info
@ -595,8 +596,8 @@ _err:
ASSERT(mf.info.nDels == 0); ASSERT(mf.info.nDels == 0);
ASSERT(mf.info.tombSize == 0); ASSERT(mf.info.tombSize == 0);
tsdbInfo("end compact tsdb meta file,code:%d,nRecords:%" PRId64 ",size:%" PRId64, tsdbInfo("end compact tsdb meta file,code:%d,nRecords:%" PRId64 ",size:%" PRId64, code, mf.info.nRecords,
code,mf.info.nRecords,mf.info.size); mf.info.size);
return code; return code;
} }
@ -1059,7 +1060,7 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols, int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile *pDFileAggr, SDataCols *pDataCols,
SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) { SBlock *pBlock, bool isLast, bool isSuper, void **ppBuf, void **ppCBuf, void **ppExBuf) {
STsdbCfg * pCfg = REPO_CFG(pRepo); STsdbCfg * pCfg = REPO_CFG(pRepo);
SBlockData *pBlockData; SBlockData * pBlockData;
SAggrBlkData *pAggrBlkData = NULL; SAggrBlkData *pAggrBlkData = NULL;
int64_t offset = 0, offsetAggr = 0; int64_t offset = 0, offsetAggr = 0;
int rowsToWrite = pDataCols->numOfRows; int rowsToWrite = pDataCols->numOfRows;
@ -1141,8 +1142,7 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
pBlockCol = pBlockData->cols + tcol; pBlockCol = pBlockData->cols + tcol;
tptr = POINTER_SHIFT(pBlockData, lsize); tptr = POINTER_SHIFT(pBlockData, lsize);
if (pCfg->compression == TWO_STAGE_COMP && if (pCfg->compression == TWO_STAGE_COMP && tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) {
tsdbMakeRoom(ppCBuf, tlen + COMP_OVERFLOW_BYTES) < 0) {
return -1; return -1;
} }
@ -1188,7 +1188,6 @@ int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDFile
uint32_t aggrStatus = nColsNotAllNull > 0 ? 1 : 0; uint32_t aggrStatus = nColsNotAllNull > 0 ? 1 : 0;
if (aggrStatus > 0) { if (aggrStatus > 0) {
taosCalcChecksumAppend(0, (uint8_t *)pAggrBlkData, tsizeAggr); taosCalcChecksumAppend(0, (uint8_t *)pAggrBlkData, tsizeAggr);
tsdbUpdateDFileMagic(pDFileAggr, POINTER_SHIFT(pAggrBlkData, tsizeAggr - sizeof(TSCKSUM))); tsdbUpdateDFileMagic(pDFileAggr, POINTER_SHIFT(pAggrBlkData, tsizeAggr - sizeof(TSCKSUM)));
@ -1409,7 +1408,8 @@ static int tsdbCommitAddBlock(SCommitH *pCommith, const SBlock *pSupBlock, const
return 0; return 0;
} }
static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit, bool isLastOneBlock) { static int tsdbMergeBlockData(SCommitH *pCommith, SCommitIter *pIter, SDataCols *pDataCols, TSKEY keyLimit,
bool isLastOneBlock) {
STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith); STsdbRepo *pRepo = TSDB_COMMIT_REPO(pCommith);
STsdbCfg * pCfg = REPO_CFG(pRepo); STsdbCfg * pCfg = REPO_CFG(pRepo);
SBlock block; SBlock block;
@ -1466,9 +1466,9 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
if (key1 < key2) { if (key1 < key2) {
for (int i = 0; i < pDataCols->numOfCols; i++) { for (int i = 0; i < pDataCols->numOfCols; i++) {
//TODO: dataColAppendVal may fail // TODO: dataColAppendVal may fail
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
pTarget->maxPoints, 0); pTarget->maxPoints);
} }
pTarget->numOfRows++; pTarget->numOfRows++;
@ -1480,30 +1480,29 @@ static void tsdbLoadAndMergeFromCache(SDataCols *pDataCols, int *iter, SCommitIt
ASSERT(pSchema != NULL); ASSERT(pSchema != NULL);
} }
tdAppendMemRowToDataCol(row, pSchema, pTarget, true, 0); tdAppendMemRowToDataCol(row, pSchema, pTarget, true);
tSkipListIterNext(pCommitIter->pIter); tSkipListIterNext(pCommitIter->pIter);
} else { } else {
if (update != TD_ROW_OVERWRITE_UPDATE) { if (update != TD_ROW_OVERWRITE_UPDATE) {
//copy disk data // copy disk data
for (int i = 0; i < pDataCols->numOfCols; i++) { for (int i = 0; i < pDataCols->numOfCols; i++) {
//TODO: dataColAppendVal may fail // TODO: dataColAppendVal may fail
dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows, dataColAppendVal(pTarget->cols + i, tdGetColDataOfRow(pDataCols->cols + i, *iter), pTarget->numOfRows,
pTarget->maxPoints, 0); pTarget->maxPoints);
} }
if(update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++; if (update == TD_ROW_DISCARD_UPDATE) pTarget->numOfRows++;
} }
if (update != TD_ROW_DISCARD_UPDATE) { if (update != TD_ROW_DISCARD_UPDATE) {
//copy mem data // copy mem data
if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) { if (pSchema == NULL || schemaVersion(pSchema) != memRowVersion(row)) {
pSchema = pSchema =
tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row), (int8_t)memRowType(row)); tsdbGetTableSchemaImpl(pCommitIter->pTable, false, false, memRowVersion(row), (int8_t)memRowType(row));
ASSERT(pSchema != NULL); ASSERT(pSchema != NULL);
} }
tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE, tdAppendMemRowToDataCol(row, pSchema, pTarget, update == TD_ROW_OVERWRITE_UPDATE);
update != TD_ROW_PARTIAL_UPDATE ? 0 : -1);
} }
(*iter)++; (*iter)++;
tSkipListIterNext(pCommitIter->pIter); tSkipListIterNext(pCommitIter->pIter);

View File

@ -223,9 +223,9 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
} }
static bool tsdbShouldCompact(SCompactH *pComph) { static bool tsdbShouldCompact(SCompactH *pComph) {
if (tsdbForceCompactFile) { // if (tsdbForceCompactFile) {
return true; // return true;
} // }
STsdbRepo * pRepo = TSDB_COMPACT_REPO(pComph); STsdbRepo * pRepo = TSDB_COMPACT_REPO(pComph);
STsdbCfg * pCfg = REPO_CFG(pRepo); STsdbCfg * pCfg = REPO_CFG(pRepo);
SReadH * pReadh = &(pComph->readh); SReadH * pReadh = &(pComph->readh);

View File

@ -471,7 +471,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) {
} }
// fsync, close and rename // fsync, close and rename
if (taosFsync(fd) < 0) { if (taosFsyncFile(fd) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
close(fd); close(fd);
remove(tfname); remove(tfname);
@ -480,7 +480,7 @@ static int tsdbSaveFSStatus(SFSStatus *pStatus, int vid) {
} }
(void)close(fd); (void)close(fd);
(void)taosRename(tfname, cfname); (void)taosRenameFile(tfname, cfname);
taosTZfree(pBuf); taosTZfree(pBuf);
return 0; return 0;
@ -838,7 +838,7 @@ int tsdbLoadMetaCache(STsdbRepo *pRepo, bool recoverMeta) {
if (taosHashPut(pfs->metaCache, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) { if (taosHashPut(pfs->metaCache, (void *)(&rInfo.uid), sizeof(rInfo.uid), &rInfo, sizeof(rInfo)) < 0) {
tsdbError("vgId:%d failed to load meta cache from file %s since OOM", REPO_ID(pRepo), tsdbError("vgId:%d failed to load meta cache from file %s since OOM", REPO_ID(pRepo),
TSDB_FILE_FULL_NAME(pMFile)); TSDB_FILE_FULL_NAME(pMFile));
terrno = TSDB_CODE_COM_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tsdbCloseMFile(pMFile); tsdbCloseMFile(pMFile);
return -1; return -1;
} }
@ -1256,7 +1256,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
isOneFSetFinish = true; isOneFSetFinish = true;
} else { } else {
// return error in case of removing uncomplete DFileSets // return error in case of removing uncomplete DFileSets
terrno = TSDB_CODE_TDB_INCOMPLETE_DFILESET; // terrno = TSDB_CODE_TDB_INCOMPLETE_DFILESET;
tsdbError("vgId:%d incomplete DFileSet, fid:%d, nDFiles=%" PRIu8, REPO_ID(pRepo), fset.fid, nDFiles); tsdbError("vgId:%d incomplete DFileSet, fid:%d, nDFiles=%" PRIu8, REPO_ID(pRepo), fset.fid, nDFiles);
taosArrayDestroy(&fArray); taosArrayDestroy(&fArray);
return -1; return -1;
@ -1269,7 +1269,7 @@ static int tsdbRestoreDFileSet(STsdbRepo *pRepo) {
isOneFSetFinish = true; isOneFSetFinish = true;
} else { } else {
// return error in case of removing uncomplete DFileSets // return error in case of removing uncomplete DFileSets
terrno = TSDB_CODE_TDB_INCOMPLETE_DFILESET; // terrno = TSDB_CODE_TDB_INCOMPLETE_DFILESET;
tsdbError("vgId:%d incomplete DFileSet, fid:%d, nDFiles=%" PRIu8, REPO_ID(pRepo), fset.fid, nDFiles); tsdbError("vgId:%d incomplete DFileSet, fid:%d, nDFiles=%" PRIu8, REPO_ID(pRepo), fset.fid, nDFiles);
taosArrayDestroy(&fArray); taosArrayDestroy(&fArray);
return -1; return -1;

View File

@ -13,6 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#if 0
#include "tsdbHealth.h" #include "tsdbHealth.h"
#include "os.h" #include "os.h"
#include "query.h" #include "query.h"
@ -96,3 +98,5 @@ bool tsdbNoProblem(STsdbRepo* pRepo) {
if (listNEles(pRepo->pPool->bufBlockList) == 0) return false; if (listNEles(pRepo->pPool->bufBlockList) == 0) return false;
return true; return true;
} }
#endif

View File

@ -185,13 +185,13 @@ int tsdbUnlockRepo(STsdbRepo *pRepo) {
return 0; return 0;
} }
int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize) { // MB // int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize) { // MB
STsdbCfg *pCfg = &(pRepo->config); // STsdbCfg *pCfg = &(pRepo->config);
if ((walSize > tsdbWalFlushSize) && (walSize > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) { // if ((walSize > tsdbWalFlushSize) && (walSize > (pCfg->totalBlocks / 2 * pCfg->cacheBlockSize))) {
if (tsdbAsyncCommit(pRepo) < 0) return -1; // if (tsdbAsyncCommit(pRepo) < 0) return -1;
} // }
return 0; // return 0;
} // }
int tsdbCheckCommit(STsdbRepo *pRepo) { int tsdbCheckCommit(STsdbRepo *pRepo) {
ASSERT(pRepo->mem != NULL); ASSERT(pRepo->mem != NULL);
@ -740,7 +740,7 @@ static int tsdbRestoreLastColumns(STsdbRepo *pRepo, STable *pTable, SReadH* pRea
// file block with sub-blocks has no statistics data // file block with sub-blocks has no statistics data
if (pBlock->numOfSubBlocks <= 1) { if (pBlock->numOfSubBlocks <= 1) {
if (tsdbLoadBlockStatis(pReadh, pBlock) == TSDB_STATIS_OK) { if (tsdbLoadBlockStatis(pReadh, pBlock) == 0) {
tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns, pBlock); tsdbGetBlockStatis(pReadh, pBlockStatis, (int)numColumns, pBlock);
loadStatisData = true; loadStatisData = true;
} }

View File

@ -18,22 +18,11 @@
#include "tsdbint.h" #include "tsdbint.h"
#include "tskiplist.h" #include "tskiplist.h"
#include "tsdbRowMergeBuf.h" #include "tsdbRowMergeBuf.h"
#include "ttime.h"
#define TSDB_DATA_SKIPLIST_LEVEL 5 #define TSDB_DATA_SKIPLIST_LEVEL 5
#define TSDB_MAX_INSERT_BATCH 512 #define TSDB_MAX_INSERT_BATCH 512
typedef struct {
int32_t totalLen;
int32_t len;
SMemRow row;
} SSubmitBlkIter;
typedef struct {
int32_t totalLen;
int32_t len;
void * pMsg;
} SSubmitMsgIter;
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo); static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
static void tsdbFreeMemTable(SMemTable *pMemTable); static void tsdbFreeMemTable(SMemTable *pMemTable);
static STableData* tsdbNewTableData(STsdbCfg *pCfg, STable *pTable); static STableData* tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
@ -41,12 +30,8 @@ static void tsdbFreeTableData(STableData *pTableData);
static char * tsdbGetTsTupleKey(const void *data); static char * tsdbGetTsTupleKey(const void *data);
static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables); static int tsdbAdjustMemMaxTables(SMemTable *pMemTable, int maxTables);
static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row); static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema **ppSchema, SMemRow row);
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter);
static SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter);
static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg); static int tsdbScanAndConvertSubmitMsg(STsdbRepo *pRepo, SSubmitMsg *pMsg);
static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows); static int tsdbInsertDataToTable(STsdbRepo *pRepo, SSubmitBlk *pBlock, int32_t *affectedrows);
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter);
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock);
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable); static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow row); static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SMemRow row);
@ -256,7 +241,7 @@ void *tsdbAllocBytes(STsdbRepo *pRepo, int bytes) {
return NULL; return NULL;
} }
pNode->next = pNode->prev = NULL; TD_DLIST_NODE_NEXT(pNode) = TD_DLIST_NODE_PREV(pNode) = NULL;
tdListAppendNode(pRepo->mem->extraBuffList, pNode); tdListAppendNode(pRepo->mem->extraBuffList, pNode);
ptr = (void *)(pNode->data); ptr = (void *)(pNode->data);
tsdbTrace("vgId:%d allocate %d bytes from SYSTEM buffer block", REPO_ID(pRepo), bytes); tsdbTrace("vgId:%d allocate %d bytes from SYSTEM buffer block", REPO_ID(pRepo), bytes);
@ -598,34 +583,12 @@ static int tsdbAppendTableRowToCols(STable *pTable, SDataCols *pCols, STSchema *
} }
} }
tdAppendMemRowToDataCol(row, *ppSchema, pCols, true, 0); tdAppendMemRowToDataCol(row, *ppSchema, pCols, true);
} }
return 0; return 0;
} }
static int tsdbInitSubmitBlkIter(SSubmitBlk *pBlock, SSubmitBlkIter *pIter) {
if (pBlock->dataLen <= 0) return -1;
pIter->totalLen = pBlock->dataLen;
pIter->len = 0;
pIter->row = (SMemRow)(pBlock->data + pBlock->schemaLen);
return 0;
}
static SMemRow tsdbGetSubmitBlkNext(SSubmitBlkIter *pIter) {
SMemRow row = pIter->row; // firstly, get current row
if (row == NULL) return NULL;
pIter->len += memRowTLen(row);
if (pIter->len >= pIter->totalLen) { // reach the end
pIter->row = NULL;
} else {
pIter->row = (char *)row + memRowTLen(row); // secondly, move to next row
}
return row;
}
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey, static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SMemRow row, TSKEY minKey, TSKEY maxKey,
TSKEY now) { TSKEY now) {
TSKEY rowKey = memRowKey(row); TSKEY rowKey = memRowKey(row);
@ -841,7 +804,7 @@ static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *
SMemRow lastRow = NULL; SMemRow lastRow = NULL;
int64_t osize = SL_SIZE(pTableData->pData); int64_t osize = SL_SIZE(pTableData->pData);
tsdbSetupSkipListHookFns(pTableData->pData, pRepo, pTable, &points, &lastRow); tsdbSetupSkipListHookFns(pTableData->pData, pRepo, pTable, &points, &lastRow);
tSkipListPutBatchByIter(pTableData->pData, &blkIter, (iter_next_fn_t)tsdbGetSubmitBlkNext); tSkipListPutBatchByIter(pTableData->pData, &blkIter, (iter_next_fn_t)tGetSubmitBlkNext);
int64_t dsize = SL_SIZE(pTableData->pData) - osize; int64_t dsize = SL_SIZE(pTableData->pData) - osize;
(*pAffectedRows) += points; (*pAffectedRows) += points;
@ -866,43 +829,6 @@ static int tsdbInsertDataToTable(STsdbRepo* pRepo, SSubmitBlk* pBlock, int32_t *
return 0; return 0;
} }
static int tsdbInitSubmitMsgIter(SSubmitMsg *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
}
pIter->totalLen = pMsg->length;
pIter->len = 0;
pIter->pMsg = pMsg;
if (pMsg->length <= TSDB_SUBMIT_MSG_HEAD_SIZE) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
return -1;
}
return 0;
}
static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPBlock) {
if (pIter->len == 0) {
pIter->len += TSDB_SUBMIT_MSG_HEAD_SIZE;
} else {
SSubmitBlk *pSubmitBlk = (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
pIter->len += (sizeof(SSubmitBlk) + pSubmitBlk->dataLen + pSubmitBlk->schemaLen);
}
if (pIter->len > pIter->totalLen) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
*pPBlock = NULL;
return -1;
}
*pPBlock = (pIter->len == pIter->totalLen) ? NULL : (SSubmitBlk *)POINTER_SHIFT(pIter->pMsg, pIter->len);
return 0;
}
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) { static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable) {
ASSERT(pTable != NULL); ASSERT(pTable != NULL);

View File

@ -505,7 +505,7 @@ int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock) {
if (pBlock->aggrStat) { if (pBlock->aggrStat) {
return tsdbLoadBlockStatisFromAggr(pReadh, pBlock); return tsdbLoadBlockStatisFromAggr(pReadh, pBlock);
} }
return TSDB_STATIS_NONE; return 1;
} }
return tsdbLoadBlockStatisFromDFile(pReadh, pBlock); return tsdbLoadBlockStatisFromDFile(pReadh, pBlock);
} }