make branch compileable
This commit is contained in:
parent
6d736bd099
commit
6994cd71e0
|
@ -35,6 +35,7 @@ int64_t taosReadImp(int32_t fd, void *buf, int64_t count);
|
||||||
int64_t taosWriteImp(int32_t fd, void *buf, int64_t count);
|
int64_t taosWriteImp(int32_t fd, void *buf, int64_t count);
|
||||||
int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence);
|
int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence);
|
||||||
int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstPath);
|
int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstPath);
|
||||||
|
int64_t taosCopy(char *from, char *to);
|
||||||
|
|
||||||
#define taosRead(fd, buf, count) taosReadImp(fd, buf, count)
|
#define taosRead(fd, buf, count) taosReadImp(fd, buf, count)
|
||||||
#define taosWrite(fd, buf, count) taosWriteImp(fd, buf, count)
|
#define taosWrite(fd, buf, count) taosWriteImp(fd, buf, count)
|
||||||
|
|
|
@ -119,11 +119,11 @@ int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence) {
|
||||||
return (int64_t)tlseek(fd, (long)offset, whence);
|
return (int64_t)tlseek(fd, (long)offset, whence);
|
||||||
}
|
}
|
||||||
|
|
||||||
ssize_t taosTCopy(char *from, char *to) {
|
int64_t taosCopy(char *from, char *to) {
|
||||||
char buffer[4096];
|
char buffer[4096];
|
||||||
int fidto = -1, fidfrom = -1;
|
int fidto = -1, fidfrom = -1;
|
||||||
ssize_t size = 0;
|
int64_t size = 0;
|
||||||
ssize_t bytes;
|
int64_t bytes;
|
||||||
|
|
||||||
fidfrom = open(from, O_RDONLY);
|
fidfrom = open(from, O_RDONLY);
|
||||||
if (fidfrom < 0) goto _err;
|
if (fidfrom < 0) goto _err;
|
||||||
|
|
|
@ -21,6 +21,7 @@ static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TS
|
||||||
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
|
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols);
|
||||||
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
|
static SCommitIter *tsdbCreateCommitIters(STsdbRepo *pRepo);
|
||||||
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
|
static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables);
|
||||||
|
static void tsdbSeekCommitIter(SCommitIter *pIters, int nIters, TSKEY key);
|
||||||
|
|
||||||
void *tsdbCommitData(STsdbRepo *pRepo) {
|
void *tsdbCommitData(STsdbRepo *pRepo) {
|
||||||
SMemTable * pMem = pRepo->imem;
|
SMemTable * pMem = pRepo->imem;
|
||||||
|
@ -42,8 +43,6 @@ void *tsdbCommitData(STsdbRepo *pRepo) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbFitRetention(pRepo);
|
|
||||||
|
|
||||||
tsdbInfo("vgId:%d commit over, succeed", REPO_ID(pRepo));
|
tsdbInfo("vgId:%d commit over, succeed", REPO_ID(pRepo));
|
||||||
tsdbEndCommit(pRepo, TSDB_CODE_SUCCESS);
|
tsdbEndCommit(pRepo, TSDB_CODE_SUCCESS);
|
||||||
|
|
||||||
|
@ -65,9 +64,16 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) {
|
||||||
SCommitIter *iters = NULL;
|
SCommitIter *iters = NULL;
|
||||||
SRWHelper whelper = {0};
|
SRWHelper whelper = {0};
|
||||||
STsdbCfg * pCfg = &(pRepo->config);
|
STsdbCfg * pCfg = &(pRepo->config);
|
||||||
|
SFidGroup fidGroup = {0};
|
||||||
|
TSKEY minKey = 0;
|
||||||
|
TSKEY maxKey = 0;
|
||||||
|
|
||||||
if (pMem->numOfRows <= 0) return 0;
|
if (pMem->numOfRows <= 0) return 0;
|
||||||
|
|
||||||
|
tsdbGetFidGroup(pCfg, &fidGroup);
|
||||||
|
tsdbGetFidKeyRange(pCfg->daysPerFile, pCfg->precision, fidGroup.minFid, &minKey, &maxKey);
|
||||||
|
tsdbRemoveFilesBeyondRetention(pRepo, &fidGroup);
|
||||||
|
|
||||||
iters = tsdbCreateCommitIters(pRepo);
|
iters = tsdbCreateCommitIters(pRepo);
|
||||||
if (iters == NULL) {
|
if (iters == NULL) {
|
||||||
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
|
tsdbError("vgId:%d failed to create commit iterator since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||||
|
@ -89,14 +95,20 @@ static int tsdbCommitTSData(STsdbRepo *pRepo) {
|
||||||
int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision));
|
int sfid = (int)(TSDB_KEY_FILEID(pMem->keyFirst, pCfg->daysPerFile, pCfg->precision));
|
||||||
int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision));
|
int efid = (int)(TSDB_KEY_FILEID(pMem->keyLast, pCfg->daysPerFile, pCfg->precision));
|
||||||
|
|
||||||
|
tsdbSeekCommitIter(iters, pMem->maxTables, minKey);
|
||||||
|
|
||||||
// Loop to commit to each file
|
// Loop to commit to each file
|
||||||
for (int fid = sfid; fid <= efid; fid++) {
|
for (int fid = sfid; fid <= efid; fid++) {
|
||||||
|
if (fid < fidGroup.minFid) continue;
|
||||||
|
|
||||||
if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) {
|
if (tsdbCommitToFile(pRepo, fid, iters, &whelper, pDataCols) < 0) {
|
||||||
tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
|
tsdbError("vgId:%d failed to commit to file %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tsdbApplyRetention(pRepo, &fidGroup);
|
||||||
|
|
||||||
tdFreeDataCols(pDataCols);
|
tdFreeDataCols(pDataCols);
|
||||||
tsdbDestroyCommitIters(iters, pMem->maxTables);
|
tsdbDestroyCommitIters(iters, pMem->maxTables);
|
||||||
tsdbDestroyHelper(&whelper);
|
tsdbDestroyHelper(&whelper);
|
||||||
|
@ -173,7 +185,6 @@ static int tsdbHasDataToCommit(SCommitIter *iters, int nIters, TSKEY minKey, TSK
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) {
|
static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHelper *pHelper, SDataCols *pDataCols) {
|
||||||
char * dataDir = NULL;
|
|
||||||
STsdbCfg * pCfg = &pRepo->config;
|
STsdbCfg * pCfg = &pRepo->config;
|
||||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||||
SFileGroup *pGroup = NULL;
|
SFileGroup *pGroup = NULL;
|
||||||
|
@ -190,15 +201,17 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create and open files for commit
|
if ((pGroup = tsdbSearchFGroup(pFileH, fid, TD_EQ)) == NULL) {
|
||||||
dataDir = tsdbGetDataDirName(pRepo->rootDir);
|
pGroup = tsdbCreateFGroup(pRepo, fid);
|
||||||
if (dataDir == NULL) {
|
if (pGroup == NULL) {
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((pGroup = tsdbCreateFGroupIfNeed(pRepo, dataDir, fid)) == NULL) {
|
// Open files for write/read
|
||||||
tsdbError("vgId:%d failed to create file group %d since %s", REPO_ID(pRepo), fid, tstrerror(terrno));
|
if (tsdbSetAndOpenHelperFile(pHelper, pGroup) < 0) {
|
||||||
|
tsdbError("vgId:%d failed to set helper file since %s", REPO_ID(pRepo), tstrerror(terrno));
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -259,7 +272,6 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(dataDir);
|
|
||||||
tsdbCloseHelperFile(pHelper, 0, pGroup);
|
tsdbCloseHelperFile(pHelper, 0, pGroup);
|
||||||
|
|
||||||
pthread_rwlock_wrlock(&(pFileH->fhlock));
|
pthread_rwlock_wrlock(&(pFileH->fhlock));
|
||||||
|
@ -281,7 +293,6 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
tfree(dataDir);
|
|
||||||
tsdbCloseHelperFile(pHelper, 1, pGroup);
|
tsdbCloseHelperFile(pHelper, 1, pGroup);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -338,3 +349,13 @@ static void tsdbDestroyCommitIters(SCommitIter *iters, int maxTables) {
|
||||||
|
|
||||||
free(iters);
|
free(iters);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void tsdbSeekCommitIter(SCommitIter *pIters, int nIters, TSKEY key) {
|
||||||
|
for (int i = 0; i < nIters; i++) {
|
||||||
|
SCommitIter *pIter = pIters + i;
|
||||||
|
if (pIter->pTable == NULL) continue;
|
||||||
|
if (pIter->pIter == NULL) continue;
|
||||||
|
|
||||||
|
tsdbLoadDataFromCache(pIter->pTable, pIter->pIter, key-1, INT32_MAX, NULL, NULL, 0, true, NULL);
|
||||||
|
}
|
||||||
|
}
|
|
@ -476,7 +476,7 @@ int tsdbApplyRetention(STsdbRepo *pRepo, SFidGroup *pFidGroup) {
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
for (int type = 0; type < TSDB_FILE_TYPE_MAX; type++) {
|
||||||
if (taosTCopy(oFileGroup.files[type].fname, nFileGroup.files[type].fname) < 0) return -1;
|
if (taosCopy(oFileGroup.files[type].fname, nFileGroup.files[type].fname) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_rwlock_wrlock(&(pFileH->fhlock));
|
pthread_rwlock_wrlock(&(pFileH->fhlock));
|
||||||
|
|
|
@ -262,6 +262,9 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey
|
||||||
bool isRowDel = false;
|
bool isRowDel = false;
|
||||||
int filterIter = 0;
|
int filterIter = 0;
|
||||||
SDataRow row = NULL;
|
SDataRow row = NULL;
|
||||||
|
SMergeInfo mInfo;
|
||||||
|
|
||||||
|
if (pMergeInfo == NULL) pMergeInfo = &mInfo;
|
||||||
|
|
||||||
memset(pMergeInfo, 0, sizeof(*pMergeInfo));
|
memset(pMergeInfo, 0, sizeof(*pMergeInfo));
|
||||||
pMergeInfo->keyFirst = INT64_MAX;
|
pMergeInfo->keyFirst = INT64_MAX;
|
||||||
|
|
|
@ -21,8 +21,13 @@
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
#include "dnode.h"
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
|
#include "vnodeCfg.h"
|
||||||
|
#include "vnodeVersion.h"
|
||||||
|
#include "dnodeVWrite.h"
|
||||||
|
#include "dnodeVRead.h"
|
||||||
#include "query.h"
|
#include "query.h"
|
||||||
#include "tpath.h"
|
#include "tpath.h"
|
||||||
#include "tdisk.h"
|
#include "tdisk.h"
|
||||||
|
|
Loading…
Reference in New Issue