Merge branch '3.0' into enh/TD-23769-3.0x

This commit is contained in:
kailixu 2023-10-31 16:15:51 +08:00
commit 012d86d9a8
40 changed files with 1253 additions and 856 deletions

View File

@ -46,6 +46,7 @@ extern "C" {
#define SYNC_HEARTBEAT_SLOW_MS 1500
#define SYNC_HEARTBEAT_REPLY_SLOW_MS 1500
#define SYNC_SNAP_RESEND_MS 1000 * 60
#define SYNC_SNAP_TIMEOUT_MS 1000 * 600
#define SYNC_VND_COMMIT_MIN_MS 3000

View File

@ -657,6 +657,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_PAR_CORRESPONDING_STABLE_ERR TAOS_DEF_ERROR_CODE(0, 0x2618)
#define TSDB_CODE_PAR_INVALID_DB_OPTION TAOS_DEF_ERROR_CODE(0, 0x2619)
#define TSDB_CODE_PAR_INVALID_TABLE_OPTION TAOS_DEF_ERROR_CODE(0, 0x261A)
#define TSDB_CODE_PAR_INTER_VALUE_TOO_BIG TAOS_DEF_ERROR_CODE(0, 0x261B)
#define TSDB_CODE_PAR_GROUPBY_WINDOW_COEXIST TAOS_DEF_ERROR_CODE(0, 0x2624)
#define TSDB_CODE_PAR_AGG_FUNC_NESTING TAOS_DEF_ERROR_CODE(0, 0x2627)
#define TSDB_CODE_PAR_INVALID_STATE_WIN_TYPE TAOS_DEF_ERROR_CODE(0, 0x2628)

View File

@ -303,6 +303,8 @@ typedef enum ELogicConditionType {
#define TSDB_SYNC_APPLYQ_SIZE_LIMIT 512
#define TSDB_SYNC_NEGOTIATION_WIN 512
#define TSDB_SYNC_SNAP_BUFFER_SIZE 2048
#define TSDB_TBNAME_COLUMN_INDEX (-1)
#define TSDB_MULTI_TABLEMETA_MAX_NUM 100000 // maximum batch size allowed to load table meta

View File

@ -104,13 +104,21 @@ uint16_t tsAuditPort = 6043;
bool tsEnableAuditCreateTable = true;
// telem
#ifdef TD_ENTERPRISE
bool tsEnableTelem = false;
#else
bool tsEnableTelem = true;
#endif
int32_t tsTelemInterval = 43200;
char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.tdengine.com";
uint16_t tsTelemPort = 80;
char *tsTelemUri = "/report";
#ifdef TD_ENTERPRISE
bool tsEnableCrashReport = false;
#else
bool tsEnableCrashReport = true;
#endif
char *tsClientCrashReportUri = "/ccrashreport";
char *tsSvrCrashReportUri = "/dcrashreport";

View File

@ -309,7 +309,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader *pReader, _query_reseek_func_t reseek, STs
void tsdbUntakeReadSnap2(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive);
// tsdbMerge.c ==============================================================================================
int32_t tsdbMerge(void *arg);
int32_t tsdbSchedMerge(STsdb *tsdb, int32_t fid);
// tsdbDiskData ==============================================================================================
int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder);
@ -371,7 +371,7 @@ struct STsdb {
char *path;
SVnode *pVnode;
STsdbKeepCfg keepCfg;
TdThreadRwlock rwLock;
TdThreadMutex mutex;
SMemTable *mem;
SMemTable *imem;
STsdbFS fs; // old
@ -668,8 +668,8 @@ struct SDelFWriter {
};
#include "tarray2.h"
//#include "tsdbFS2.h"
// struct STFileSet;
// #include "tsdbFS2.h"
// struct STFileSet;
typedef struct STFileSet STFileSet;
typedef TARRAY2(STFileSet *) TFileSetArray;
@ -677,9 +677,9 @@ typedef struct STSnapRange STSnapRange;
typedef TARRAY2(STSnapRange *) TSnapRangeArray; // disjoint snap ranges
// util
int32_t tSerializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR);
int32_t tDeserializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR);
void tsdbSnapRangeArrayDestroy(TSnapRangeArray **ppSnap);
int32_t tSerializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR);
int32_t tDeserializeSnapRangeArray(void *buf, int32_t bufLen, TSnapRangeArray *pSnapR);
void tsdbSnapRangeArrayDestroy(TSnapRangeArray **ppSnap);
SHashObj *tsdbGetSnapRangeHash(TSnapRangeArray *pRanges);
// snap partition list
@ -873,8 +873,8 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree);
void tMergeTreePinSttBlock(SMergeTree* pMTree);
void tMergeTreeUnpinSttBlock(SMergeTree* pMTree);
void tMergeTreePinSttBlock(SMergeTree *pMTree);
void tMergeTreeUnpinSttBlock(SMergeTree *pMTree);
bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree);

View File

@ -31,8 +31,6 @@ SSmaMgmt smaMgmt = {
typedef struct SRSmaQTaskInfoItem SRSmaQTaskInfoItem;
extern int32_t tsdbDoRetention(STsdb *pTsdb, int64_t now);
static int32_t tdUidStorePut(STbUidStore *pStore, tb_uid_t suid, tb_uid_t *uid);
static void tdUidStoreDestory(STbUidStore *pStore);
static int32_t tdUpdateTbUidListImpl(SSma *pSma, tb_uid_t *suid, SArray *tbUids, bool isAdd);

View File

@ -131,7 +131,7 @@ int32_t tsdbBegin(STsdb *pTsdb) {
TSDB_CHECK_CODE(code, lino, _exit);
// lock
if ((code = taosThreadRwlockWrlock(&pTsdb->rwLock))) {
if ((code = taosThreadMutexLock(&pTsdb->mutex))) {
code = TAOS_SYSTEM_ERROR(code);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -139,7 +139,7 @@ int32_t tsdbBegin(STsdb *pTsdb) {
pTsdb->mem = pMemTable;
// unlock
if ((code = taosThreadRwlockUnlock(&pTsdb->rwLock))) {
if ((code = taosThreadMutexUnlock(&pTsdb->mutex))) {
code = TAOS_SYSTEM_ERROR(code);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -152,11 +152,11 @@ _exit:
}
int32_t tsdbPrepareCommit(STsdb *pTsdb) {
taosThreadRwlockWrlock(&pTsdb->rwLock);
taosThreadMutexLock(&pTsdb->mutex);
ASSERT(pTsdb->imem == NULL);
pTsdb->imem = pTsdb->mem;
pTsdb->mem = NULL;
taosThreadRwlockUnlock(&pTsdb->rwLock);
taosThreadMutexUnlock(&pTsdb->mutex);
return 0;
}
@ -171,9 +171,9 @@ int32_t tsdbCommit(STsdb *pTsdb, SCommitInfo *pInfo) {
// check
if (pMemTable->nRow == 0 && pMemTable->nDel == 0) {
taosThreadRwlockWrlock(&pTsdb->rwLock);
taosThreadMutexLock(&pTsdb->mutex);
pTsdb->imem = NULL;
taosThreadRwlockUnlock(&pTsdb->rwLock);
taosThreadMutexUnlock(&pTsdb->mutex);
tsdbUnrefMemTable(pMemTable, NULL, true);
goto _exit;
@ -501,6 +501,7 @@ static int32_t tsdbCommitFileDataStart(SCommitter *pCommitter) {
int32_t lino = 0;
STsdb *pTsdb = pCommitter->pTsdb;
SDFileSet *pRSet = NULL;
// memory
pCommitter->commitFid = tsdbKeyFid(pCommitter->nextKey, pCommitter->minutes, pCommitter->precision);
pCommitter->expLevel = tsdbFidLevel(pCommitter->commitFid, &pCommitter->pTsdb->keepCfg, taosGetTimestampSec());
@ -798,6 +799,7 @@ static int32_t tsdbCommitFileData(SCommitter *pCommitter) {
int32_t lino = 0;
STsdb *pTsdb = pCommitter->pTsdb;
SMemTable *pMemTable = pTsdb->imem;
// commit file data start
code = tsdbCommitFileDataStart(pCommitter);
TSDB_CHECK_CODE(code, lino, _exit);
@ -1650,18 +1652,18 @@ int32_t tsdbFinishCommit(STsdb *pTsdb) {
SMemTable *pMemTable = pTsdb->imem;
// lock
taosThreadRwlockWrlock(&pTsdb->rwLock);
taosThreadMutexLock(&pTsdb->mutex);
code = tsdbFSCommit(pTsdb);
if (code) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
taosThreadMutexUnlock(&pTsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit);
}
pTsdb->imem = NULL;
// unlock
taosThreadRwlockUnlock(&pTsdb->rwLock);
taosThreadMutexUnlock(&pTsdb->mutex);
if (pMemTable) {
tsdbUnrefMemTable(pMemTable, NULL, true);
}

View File

@ -367,7 +367,12 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
int32_t lino = 0;
STsdb *tsdb = committer->tsdb;
committer->ctx->fid = tsdbKeyFid(committer->ctx->nextKey, committer->minutes, committer->precision);
int32_t fid = tsdbKeyFid(committer->ctx->nextKey, committer->minutes, committer->precision);
// check if can commit
tsdbFSCheckCommit(tsdb, fid);
committer->ctx->fid = fid;
committer->ctx->expLevel = tsdbFidLevel(committer->ctx->fid, &tsdb->keepCfg, committer->ctx->now);
tsdbFidKeyRange(committer->ctx->fid, committer->minutes, committer->precision, &committer->ctx->minKey,
&committer->ctx->maxKey);
@ -549,11 +554,11 @@ _exit:
}
int32_t tsdbPreCommit(STsdb *tsdb) {
taosThreadRwlockWrlock(&tsdb->rwLock);
taosThreadMutexLock(&tsdb->mutex);
ASSERT(tsdb->imem == NULL);
tsdb->imem = tsdb->mem;
tsdb->mem = NULL;
taosThreadRwlockUnlock(&tsdb->rwLock);
taosThreadMutexUnlock(&tsdb->mutex);
return 0;
}
@ -568,15 +573,13 @@ int32_t tsdbCommitBegin(STsdb *tsdb, SCommitInfo *info) {
int64_t nDel = imem->nDel;
if (nRow == 0 && nDel == 0) {
taosThreadRwlockWrlock(&tsdb->rwLock);
taosThreadMutexLock(&tsdb->mutex);
tsdb->imem = NULL;
taosThreadRwlockUnlock(&tsdb->rwLock);
taosThreadMutexUnlock(&tsdb->mutex);
tsdbUnrefMemTable(imem, NULL, true);
} else {
SCommitter2 committer[1];
tsdbFSCheckCommit(tsdb->pFS);
code = tsdbOpenCommitter(tsdb, info, committer);
TSDB_CHECK_CODE(code, lino, _exit);
@ -605,14 +608,14 @@ int32_t tsdbCommitCommit(STsdb *tsdb) {
if (tsdb->imem == NULL) goto _exit;
SMemTable *pMemTable = tsdb->imem;
taosThreadRwlockWrlock(&tsdb->rwLock);
taosThreadMutexLock(&tsdb->mutex);
code = tsdbFSEditCommit(tsdb->pFS);
if (code) {
taosThreadRwlockUnlock(&tsdb->rwLock);
taosThreadMutexUnlock(&tsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit);
}
tsdb->imem = NULL;
taosThreadRwlockUnlock(&tsdb->rwLock);
taosThreadMutexUnlock(&tsdb->mutex);
tsdbUnrefMemTable(pMemTable, NULL, true);
_exit:

View File

@ -55,25 +55,11 @@ static int32_t create_fs(STsdb *pTsdb, STFileSystem **fs) {
TARRAY2_INIT(fs[0]->fSetArr);
TARRAY2_INIT(fs[0]->fSetArrTmp);
// background task queue
taosThreadMutexInit(fs[0]->mutex, NULL);
fs[0]->bgTaskQueue->next = fs[0]->bgTaskQueue;
fs[0]->bgTaskQueue->prev = fs[0]->bgTaskQueue;
taosThreadMutexInit(&fs[0]->commitMutex, NULL);
taosThreadCondInit(&fs[0]->canCommit, NULL);
fs[0]->blockCommit = false;
return 0;
}
static int32_t destroy_fs(STFileSystem **fs) {
if (fs[0] == NULL) return 0;
taosThreadMutexDestroy(&fs[0]->commitMutex);
taosThreadCondDestroy(&fs[0]->canCommit);
taosThreadMutexDestroy(fs[0]->mutex);
ASSERT(fs[0]->bgTaskNum == 0);
TARRAY2_DESTROY(fs[0]->fSetArr, NULL);
TARRAY2_DESTROY(fs[0]->fSetArrTmp, NULL);
@ -264,10 +250,11 @@ static int32_t apply_commit(STFileSystem *fs) {
if (fset1 && fset2) {
if (fset1->fid < fset2->fid) {
// delete fset1
TARRAY2_REMOVE(fsetArray1, i1, tsdbTFileSetRemove);
tsdbTFileSetRemove(fset1);
i1++;
} else if (fset1->fid > fset2->fid) {
// create new file set with fid of fset2->fid
code = tsdbTFileSetInitDup(fs->tsdb, fset2, &fset1);
code = tsdbTFileSetInitCopy(fs->tsdb, fset2, &fset1);
if (code) return code;
code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn);
if (code) return code;
@ -282,10 +269,11 @@ static int32_t apply_commit(STFileSystem *fs) {
}
} else if (fset1) {
// delete fset1
TARRAY2_REMOVE(fsetArray1, i1, tsdbTFileSetRemove);
tsdbTFileSetRemove(fset1);
i1++;
} else {
// create new file set with fid of fset2->fid
code = tsdbTFileSetInitDup(fs->tsdb, fset2, &fset1);
code = tsdbTFileSetInitCopy(fs->tsdb, fset2, &fset1);
if (code) return code;
code = TARRAY2_SORT_INSERT(fsetArray1, fset1, tsdbTFileSetCmprFn);
if (code) return code;
@ -512,7 +500,8 @@ static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) {
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
code = tsdbFSDoScanAndFixFile(fs, fobj);
if (code) {
fset->maxVerValid = (fobj->f->minVer <= fobj->f->maxVer) ? TMIN(fset->maxVerValid, fobj->f->minVer - 1) : -1;
fset->maxVerValid =
(fobj->f->minVer <= fobj->f->maxVer) ? TMIN(fset->maxVerValid, fobj->f->minVer - 1) : -1;
corrupt = true;
}
}
@ -592,7 +581,7 @@ static int32_t tsdbFSDupState(STFileSystem *fs) {
const STFileSet *fset1;
TARRAY2_FOREACH(src, fset1) {
STFileSet *fset2;
code = tsdbTFileSetInitDup(fs->tsdb, fset1, &fset2);
code = tsdbTFileSetInitCopy(fs->tsdb, fset1, &fset2);
if (code) return code;
code = TARRAY2_APPEND(dst, fset2);
if (code) return code;
@ -665,12 +654,6 @@ static int32_t close_file_system(STFileSystem *fs) {
return 0;
}
static int32_t apply_edit(STFileSystem *pFS) {
int32_t code = 0;
ASSERTS(0, "TODO: Not implemented yet");
return code;
}
static int32_t fset_cmpr_fn(const struct STFileSet *pSet1, const struct STFileSet *pSet2) {
if (pSet1->fid < pSet2->fid) {
return -1;
@ -710,10 +693,23 @@ static int32_t edit_fs(STFileSystem *fs, const TFileOpArray *opArray) {
TSDB_CHECK_CODE(code, lino, _exit);
}
// remove empty file set
// remove empty empty stt level and empty file set
int32_t i = 0;
while (i < TARRAY2_SIZE(fsetArray)) {
fset = TARRAY2_GET(fsetArray, i);
SSttLvl *lvl;
int32_t j = 0;
while (j < TARRAY2_SIZE(fset->lvlArr)) {
lvl = TARRAY2_GET(fset->lvlArr, j);
if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
TARRAY2_REMOVE(fset->lvlArr, j, tsdbSttLvlClear);
} else {
j++;
}
}
if (tsdbTFileSetIsEmpty(fset)) {
TARRAY2_REMOVE(fsetArray, i, tsdbTFileSetClear);
} else {
@ -753,13 +749,13 @@ _exit:
static void tsdbDoWaitBgTask(STFileSystem *fs, STFSBgTask *task) {
task->numWait++;
taosThreadCondWait(task->done, fs->mutex);
taosThreadCondWait(task->done, &fs->tsdb->mutex);
task->numWait--;
if (task->numWait == 0) {
taosThreadCondDestroy(task->done);
if (task->free) {
task->free(task->arg);
if (task->destroy) {
task->destroy(task->arg);
}
taosMemoryFree(task);
}
@ -770,8 +766,8 @@ static void tsdbDoDoneBgTask(STFileSystem *fs, STFSBgTask *task) {
taosThreadCondBroadcast(task->done);
} else {
taosThreadCondDestroy(task->done);
if (task->free) {
task->free(task->arg);
if (task->destroy) {
task->destroy(task->arg);
}
taosMemoryFree(task);
}
@ -780,23 +776,16 @@ static void tsdbDoDoneBgTask(STFileSystem *fs, STFSBgTask *task) {
int32_t tsdbCloseFS(STFileSystem **fs) {
if (fs[0] == NULL) return 0;
taosThreadMutexLock(fs[0]->mutex);
fs[0]->stop = true;
if (fs[0]->bgTaskRunning) {
tsdbDoWaitBgTask(fs[0], fs[0]->bgTaskRunning);
}
taosThreadMutexUnlock(fs[0]->mutex);
tsdbFSDisableBgTask(fs[0]);
close_file_system(fs[0]);
destroy_fs(fs);
return 0;
}
int64_t tsdbFSAllocEid(STFileSystem *fs) {
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
taosThreadMutexLock(&fs->tsdb->mutex);
int64_t cid = ++fs->neid;
taosThreadRwlockUnlock(&fs->tsdb->rwLock);
taosThreadMutexUnlock(&fs->tsdb->mutex);
return cid;
}
@ -837,27 +826,34 @@ _exit:
return code;
}
static int32_t tsdbFSSetBlockCommit(STFileSystem *fs, bool block) {
taosThreadMutexLock(&fs->commitMutex);
static int32_t tsdbFSSetBlockCommit(STFileSet *fset, bool block) {
if (block) {
fs->blockCommit = true;
fset->blockCommit = true;
} else {
fs->blockCommit = false;
taosThreadCondSignal(&fs->canCommit);
fset->blockCommit = false;
if (fset->numWaitCommit > 0) {
taosThreadCondSignal(&fset->canCommit);
}
}
taosThreadMutexUnlock(&fs->commitMutex);
return 0;
}
int32_t tsdbFSCheckCommit(STFileSystem *fs) {
taosThreadMutexLock(&fs->commitMutex);
while (fs->blockCommit) {
taosThreadCondWait(&fs->canCommit, &fs->commitMutex);
int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid) {
taosThreadMutexLock(&tsdb->mutex);
STFileSet *fset;
tsdbFSGetFSet(tsdb->pFS, fid, &fset);
if (fset) {
while (fset->blockCommit) {
fset->numWaitCommit++;
taosThreadCondWait(&fset->canCommit, &tsdb->mutex);
fset->numWaitCommit--;
}
}
taosThreadMutexUnlock(&fs->commitMutex);
taosThreadMutexUnlock(&tsdb->mutex);
return 0;
}
// IMPORTANT: the caller must hold fs->tsdb->mutex
int32_t tsdbFSEditCommit(STFileSystem *fs) {
int32_t code = 0;
int32_t lino = 0;
@ -867,36 +863,57 @@ int32_t tsdbFSEditCommit(STFileSystem *fs) {
TSDB_CHECK_CODE(code, lino, _exit);
// schedule merge
if (fs->tsdb->pVnode->config.sttTrigger > 1) {
int32_t sttTrigger = fs->tsdb->pVnode->config.sttTrigger;
if (sttTrigger > 1) {
STFileSet *fset;
int32_t sttTrigger = fs->tsdb->pVnode->config.sttTrigger;
bool schedMerge = false;
bool blockCommit = false;
TARRAY2_FOREACH_REVERSE(fs->fSetArr, fset) {
if (TARRAY2_SIZE(fset->lvlArr) == 0) continue;
if (TARRAY2_SIZE(fset->lvlArr) == 0) {
tsdbFSSetBlockCommit(fset, false);
continue;
}
SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr);
if (lvl->level != 0) continue;
if (lvl->level != 0) {
tsdbFSSetBlockCommit(fset, false);
continue;
}
int32_t numFile = TARRAY2_SIZE(lvl->fobjArr);
if (numFile >= sttTrigger) {
schedMerge = true;
// launch merge
code = tsdbSchedMerge(fs->tsdb, fset->fid);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (numFile >= sttTrigger * BLOCK_COMMIT_FACTOR) {
blockCommit = true;
tsdbFSSetBlockCommit(fset, true);
} else {
tsdbFSSetBlockCommit(fset, false);
}
}
}
if (schedMerge && blockCommit) break;
// clear empty level and fset
int32_t i = 0;
while (i < TARRAY2_SIZE(fs->fSetArr)) {
STFileSet *fset = TARRAY2_GET(fs->fSetArr, i);
int32_t j = 0;
while (j < TARRAY2_SIZE(fset->lvlArr)) {
SSttLvl *lvl = TARRAY2_GET(fset->lvlArr, j);
if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
TARRAY2_REMOVE(fset->lvlArr, j, tsdbSttLvlClear);
} else {
j++;
}
}
if (schedMerge) {
code = tsdbFSScheduleBgTask(fs, TSDB_BG_TASK_MERGER, tsdbMerge, NULL, fs->tsdb, NULL);
TSDB_CHECK_CODE(code, lino, _exit);
if (tsdbTFileSetIsEmpty(fset) && fset->bgTaskRunning == NULL) {
TARRAY2_REMOVE(fs->fSetArr, i, tsdbTFileSetClear);
} else {
i++;
}
tsdbFSSetBlockCommit(fs, blockCommit);
}
_exit:
@ -933,15 +950,15 @@ int32_t tsdbFSCreateCopySnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
TARRAY2_INIT(fsetArr[0]);
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
taosThreadMutexLock(&fs->tsdb->mutex);
TARRAY2_FOREACH(fs->fSetArr, fset) {
code = tsdbTFileSetInitDup(fs->tsdb, fset, &fset1);
code = tsdbTFileSetInitCopy(fs->tsdb, fset, &fset1);
if (code) break;
code = TARRAY2_APPEND(fsetArr[0], fset1);
if (code) break;
}
taosThreadRwlockUnlock(&fs->tsdb->rwLock);
taosThreadMutexUnlock(&fs->tsdb->mutex);
if (code) {
TARRAY2_DESTROY(fsetArr[0], tsdbTFileSetClear);
@ -961,9 +978,9 @@ int32_t tsdbFSDestroyCopySnapshot(TFileSetArray **fsetArr) {
}
int32_t tsdbFSCreateRefSnapshot(STFileSystem *fs, TFileSetArray **fsetArr) {
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
taosThreadMutexLock(&fs->tsdb->mutex);
int32_t code = tsdbFSCreateRefSnapshotWithoutLock(fs, fsetArr);
taosThreadRwlockUnlock(&fs->tsdb->rwLock);
taosThreadMutexUnlock(&fs->tsdb->mutex);
return code;
}
@ -1017,7 +1034,7 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TSnapRangeArray *pRange
}
}
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
taosThreadMutexLock(&fs->tsdb->mutex);
TARRAY2_FOREACH(fs->fSetArr, fset) {
int64_t ever = VERSION_MAX;
if (pHash) {
@ -1034,7 +1051,7 @@ int32_t tsdbFSCreateCopyRangedSnapshot(STFileSystem *fs, TSnapRangeArray *pRange
code = TARRAY2_APPEND(fsetArr[0], fset1);
if (code) break;
}
taosThreadRwlockUnlock(&fs->tsdb->rwLock);
taosThreadMutexUnlock(&fs->tsdb->mutex);
_out:
if (code) {
@ -1089,7 +1106,7 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
}
}
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
taosThreadMutexLock(&fs->tsdb->mutex);
TARRAY2_FOREACH(fs->fSetArr, fset) {
int64_t sver1 = sver;
int64_t ever1 = ever;
@ -1118,7 +1135,7 @@ int32_t tsdbFSCreateRefRangedSnapshot(STFileSystem *fs, int64_t sver, int64_t ev
fsr1 = NULL;
}
taosThreadRwlockUnlock(&fs->tsdb->rwLock);
taosThreadMutexUnlock(&fs->tsdb->mutex);
if (code) {
tsdbTSnapRangeClear(&fsr1);
@ -1137,59 +1154,69 @@ _out:
const char *gFSBgTaskName[] = {NULL, "MERGE", "RETENTION", "COMPACT"};
static int32_t tsdbFSRunBgTask(void *arg) {
STFileSystem *fs = (STFileSystem *)arg;
STFSBgTask *task = (STFSBgTask *)arg;
STFileSystem *fs = task->fs;
STFileSet *fset;
ASSERT(fs->bgTaskRunning != NULL);
tsdbFSGetFSet(fs, task->fid, &fset);
fs->bgTaskRunning->launchTime = taosGetTimestampMs();
fs->bgTaskRunning->run(fs->bgTaskRunning->arg);
fs->bgTaskRunning->finishTime = taosGetTimestampMs();
ASSERT(fset != NULL && fset->bgTaskRunning == task);
task->launchTime = taosGetTimestampMs();
task->run(task->arg);
task->finishTime = taosGetTimestampMs();
tsdbDebug("vgId:%d bg task:%s task id:%" PRId64 " finished, schedule time:%" PRId64 " launch time:%" PRId64
" finish time:%" PRId64,
TD_VID(fs->tsdb->pVnode), gFSBgTaskName[fs->bgTaskRunning->type], fs->bgTaskRunning->taskid,
fs->bgTaskRunning->scheduleTime, fs->bgTaskRunning->launchTime, fs->bgTaskRunning->finishTime);
TD_VID(fs->tsdb->pVnode), gFSBgTaskName[task->type], task->taskid, task->scheduleTime, task->launchTime,
task->finishTime);
taosThreadMutexLock(fs->mutex);
taosThreadMutexLock(&fs->tsdb->mutex);
// free last
tsdbDoDoneBgTask(fs, fs->bgTaskRunning);
fs->bgTaskRunning = NULL;
tsdbDoDoneBgTask(fs, task);
fset->bgTaskRunning = NULL;
// schedule next
if (fs->bgTaskNum > 0) {
if (fset->bgTaskNum > 0) {
if (fs->stop) {
while (fs->bgTaskNum > 0) {
STFSBgTask *task = fs->bgTaskQueue->next;
task->prev->next = task->next;
task->next->prev = task->prev;
fs->bgTaskNum--;
tsdbDoDoneBgTask(fs, task);
while (fset->bgTaskNum > 0) {
STFSBgTask *nextTask = fset->bgTaskQueue->next;
nextTask->prev->next = nextTask->next;
nextTask->next->prev = nextTask->prev;
fset->bgTaskNum--;
tsdbDoDoneBgTask(fs, nextTask);
}
} else {
// pop task from head
fs->bgTaskRunning = fs->bgTaskQueue->next;
fs->bgTaskRunning->prev->next = fs->bgTaskRunning->next;
fs->bgTaskRunning->next->prev = fs->bgTaskRunning->prev;
fs->bgTaskNum--;
vnodeScheduleTaskEx(1, tsdbFSRunBgTask, arg);
fset->bgTaskRunning = fset->bgTaskQueue->next;
fset->bgTaskRunning->prev->next = fset->bgTaskRunning->next;
fset->bgTaskRunning->next->prev = fset->bgTaskRunning->prev;
fset->bgTaskNum--;
vnodeScheduleTaskEx(1, tsdbFSRunBgTask, fset->bgTaskRunning);
}
}
taosThreadMutexUnlock(fs->mutex);
taosThreadMutexUnlock(&fs->tsdb->mutex);
return 0;
}
static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *),
void (*destroy)(void *), void *arg, int64_t *taskid) {
// IMPORTANT: the caller must hold the fs->tsdb->mutex
int32_t tsdbFSScheduleBgTask(STFileSystem *fs, int32_t fid, EFSBgTaskT type, int32_t (*run)(void *),
void (*destroy)(void *), void *arg, int64_t *taskid) {
if (fs->stop) {
if (destroy) {
destroy(arg);
}
return 0; // TODO: use a better error code
return 0;
}
for (STFSBgTask *task = fs->bgTaskQueue->next; task != fs->bgTaskQueue; task = task->next) {
STFileSet *fset;
tsdbFSGetFSet(fs, fid, &fset);
ASSERT(fset != NULL);
for (STFSBgTask *task = fset->bgTaskQueue->next; task != fset->bgTaskQueue; task = task->next) {
if (task->type == type) {
if (destroy) {
destroy(arg);
@ -1203,22 +1230,24 @@ static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int
if (task == NULL) return TSDB_CODE_OUT_OF_MEMORY;
taosThreadCondInit(task->done, NULL);
task->fs = fs;
task->fid = fid;
task->type = type;
task->run = run;
task->free = destroy;
task->destroy = destroy;
task->arg = arg;
task->scheduleTime = taosGetTimestampMs();
task->taskid = ++fs->taskid;
if (fs->bgTaskRunning == NULL && fs->bgTaskNum == 0) {
if (fset->bgTaskRunning == NULL && fset->bgTaskNum == 0) {
// launch task directly
fs->bgTaskRunning = task;
vnodeScheduleTaskEx(1, tsdbFSRunBgTask, fs);
fset->bgTaskRunning = task;
vnodeScheduleTaskEx(1, tsdbFSRunBgTask, task);
} else {
// add to the queue tail
fs->bgTaskNum++;
task->next = fs->bgTaskQueue;
task->prev = fs->bgTaskQueue->prev;
fset->bgTaskNum++;
task->next = fset->bgTaskQueue;
task->prev = fset->bgTaskQueue->prev;
task->prev->next = task;
task->next->prev = task;
}
@ -1227,68 +1256,30 @@ static int32_t tsdbFSScheduleBgTaskImpl(STFileSystem *fs, EFSBgTaskT type, int
return 0;
}
int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void (*free)(void *), void *arg,
int64_t *taskid) {
taosThreadMutexLock(fs->mutex);
int32_t code = tsdbFSScheduleBgTaskImpl(fs, type, run, free, arg, taskid);
taosThreadMutexUnlock(fs->mutex);
return code;
}
int32_t tsdbFSDisableBgTask(STFileSystem *fs) {
taosThreadMutexLock(&fs->tsdb->mutex);
for (;;) {
fs->stop = true;
bool done = true;
int32_t tsdbFSWaitBgTask(STFileSystem *fs, int64_t taskid) {
STFSBgTask *task = NULL;
taosThreadMutexLock(fs->mutex);
if (fs->bgTaskRunning && fs->bgTaskRunning->taskid == taskid) {
task = fs->bgTaskRunning;
} else {
for (STFSBgTask *taskt = fs->bgTaskQueue->next; taskt != fs->bgTaskQueue; taskt = taskt->next) {
if (taskt->taskid == taskid) {
task = taskt;
STFileSet *fset;
TARRAY2_FOREACH(fs->fSetArr, fset) {
if (fset->bgTaskRunning) {
tsdbDoWaitBgTask(fs, fset->bgTaskRunning);
done = false;
break;
}
}
}
if (task) {
tsdbDoWaitBgTask(fs, task);
if (done) break;
}
taosThreadMutexUnlock(fs->mutex);
taosThreadMutexUnlock(&fs->tsdb->mutex);
return 0;
}
int32_t tsdbFSWaitAllBgTask(STFileSystem *fs) {
taosThreadMutexLock(fs->mutex);
while (fs->bgTaskRunning) {
taosThreadCondWait(fs->bgTaskRunning->done, fs->mutex);
}
taosThreadMutexUnlock(fs->mutex);
return 0;
}
static int32_t tsdbFSDoDisableBgTask(STFileSystem *fs) {
fs->stop = true;
if (fs->bgTaskRunning) {
tsdbDoWaitBgTask(fs, fs->bgTaskRunning);
}
return 0;
}
int32_t tsdbFSDisableBgTask(STFileSystem *fs) {
taosThreadMutexLock(fs->mutex);
int32_t code = tsdbFSDoDisableBgTask(fs);
taosThreadMutexUnlock(fs->mutex);
return code;
}
int32_t tsdbFSEnableBgTask(STFileSystem *fs) {
taosThreadMutexLock(fs->mutex);
taosThreadMutexLock(&fs->tsdb->mutex);
fs->stop = false;
taosThreadMutexUnlock(fs->mutex);
taosThreadMutexUnlock(&fs->tsdb->mutex);
return 0;
}

View File

@ -22,22 +22,11 @@
extern "C" {
#endif
/* Exposed Handle */
typedef struct STFileSystem STFileSystem;
typedef struct STFSBgTask STFSBgTask;
// typedef TARRAY2(STFileSet *) TFileSetArray;
typedef enum {
TSDB_FEDIT_COMMIT = 1, //
TSDB_FEDIT_MERGE
} EFEditT;
typedef enum {
TSDB_BG_TASK_MERGER = 1,
TSDB_BG_TASK_RETENTION,
TSDB_BG_TASK_COMPACT,
} EFSBgTaskT;
typedef enum {
TSDB_FCURRENT = 1,
TSDB_FCURRENT_C, // for commit
@ -67,37 +56,17 @@ int32_t tsdbFSEditBegin(STFileSystem *fs, const TFileOpArray *opArray, EFEditT e
int32_t tsdbFSEditCommit(STFileSystem *fs);
int32_t tsdbFSEditAbort(STFileSystem *fs);
// background task
int32_t tsdbFSScheduleBgTask(STFileSystem *fs, EFSBgTaskT type, int32_t (*run)(void *), void (*free)(void *), void *arg,
int64_t *taskid);
int32_t tsdbFSWaitBgTask(STFileSystem *fs, int64_t taskid);
int32_t tsdbFSWaitAllBgTask(STFileSystem *fs);
int32_t tsdbFSScheduleBgTask(STFileSystem *fs, int32_t fid, EFSBgTaskT type, int32_t (*run)(void *),
void (*destroy)(void *), void *arg, int64_t *taskid);
int32_t tsdbFSDisableBgTask(STFileSystem *fs);
int32_t tsdbFSEnableBgTask(STFileSystem *fs);
// other
int32_t tsdbFSGetFSet(STFileSystem *fs, int32_t fid, STFileSet **fset);
int32_t tsdbFSCheckCommit(STFileSystem *fs);
int32_t tsdbFSCheckCommit(STsdb *tsdb, int32_t fid);
// utils
int32_t save_fs(const TFileSetArray *arr, const char *fname);
int32_t current_fname(STsdb *pTsdb, char *fname, EFCurrentT ftype);
struct STFSBgTask {
EFSBgTaskT type;
int32_t (*run)(void *arg);
void (*free)(void *arg);
void *arg;
TdThreadCond done[1];
int32_t numWait;
int64_t taskid;
int64_t scheduleTime;
int64_t launchTime;
int64_t finishTime;
struct STFSBgTask *prev;
struct STFSBgTask *next;
};
/* Exposed Structs */
struct STFileSystem {
STsdb *tsdb;
@ -109,17 +78,8 @@ struct STFileSystem {
TFileSetArray fSetArrTmp[1];
// background task queue
TdThreadMutex mutex[1];
bool stop;
int64_t taskid;
int32_t bgTaskNum;
STFSBgTask bgTaskQueue[1];
STFSBgTask *bgTaskRunning;
// block commit variables
TdThreadMutex commitMutex;
TdThreadCond canCommit;
bool blockCommit;
bool stop;
int64_t taskid;
};
#ifdef __cplusplus

View File

@ -342,11 +342,6 @@ int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op) {
int32_t idx = TARRAY2_SEARCH_IDX(lvl->fobjArr, &tfobjp, tsdbTFileObjCmpr, TD_EQ);
ASSERT(idx >= 0);
TARRAY2_REMOVE(lvl->fobjArr, idx, tsdbSttLvlClearFObj);
if (TARRAY2_SIZE(lvl->fobjArr) == 0) {
// TODO: remove the stt level if no file exists anymore
// TARRAY2_REMOVE(&fset->lvlArr, lvl - fset->lvlArr.data, tsdbSttLvlClear);
}
} else {
ASSERT(tsdbIsSameTFile(&op->of, fset->farr[op->of.type]->f));
tsdbTFileObjUnref(fset->farr[op->of.type]);
@ -454,10 +449,22 @@ int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset) {
fset[0]->fid = fid;
fset[0]->maxVerValid = VERSION_MAX;
TARRAY2_INIT(fset[0]->lvlArr);
// background task queue
fset[0]->bgTaskNum = 0;
fset[0]->bgTaskQueue->next = fset[0]->bgTaskQueue;
fset[0]->bgTaskQueue->prev = fset[0]->bgTaskQueue;
fset[0]->bgTaskRunning = NULL;
// block commit variables
taosThreadCondInit(&fset[0]->canCommit, NULL);
fset[0]->numWaitCommit = 0;
fset[0]->blockCommit = false;
return 0;
}
int32_t tsdbTFileSetInitDup(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset) {
int32_t tsdbTFileSetInitCopy(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset) {
int32_t code = tsdbTFileSetInit(fset1->fid, fset);
if (code) return code;
@ -588,21 +595,23 @@ int32_t tsdbTFileSetClear(STFileSet **fset) {
TARRAY2_DESTROY(fset[0]->lvlArr, tsdbSttLvlClear);
taosThreadCondDestroy(&fset[0]->canCommit);
taosMemoryFree(fset[0]);
fset[0] = NULL;
return 0;
}
int32_t tsdbTFileSetRemove(STFileSet **fset) {
int32_t tsdbTFileSetRemove(STFileSet *fset) {
if (fset == NULL) return 0;
for (tsdb_ftype_t ftype = TSDB_FTYPE_MIN; ftype < TSDB_FTYPE_MAX; ++ftype) {
if (fset[0]->farr[ftype] == NULL) continue;
tsdbTFileObjRemove(fset[0]->farr[ftype]);
if (fset->farr[ftype] == NULL) continue;
tsdbTFileObjRemove(fset->farr[ftype]);
}
TARRAY2_DESTROY(fset[0]->lvlArr, tsdbSttLvlRemove);
taosMemoryFree(fset[0]);
fset[0] = NULL;
TARRAY2_DESTROY(fset->lvlArr, tsdbSttLvlRemove);
return 0;
}

View File

@ -28,6 +28,8 @@ typedef struct SSttLvl SSttLvl;
typedef TARRAY2(STFileObj *) TFileObjArray;
typedef TARRAY2(SSttLvl *) TSttLvlArray;
typedef TARRAY2(STFileOp) TFileOpArray;
typedef struct STFileSystem STFileSystem;
typedef struct STFSBgTask STFSBgTask;
typedef enum {
TSDB_FOP_NONE = 0,
@ -41,10 +43,10 @@ typedef enum {
// init/clear
int32_t tsdbTFileSetInit(int32_t fid, STFileSet **fset);
int32_t tsdbTFileSetInitDup(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset);
int32_t tsdbTFileSetInitCopy(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset);
int32_t tsdbTFileSetInitRef(STsdb *pTsdb, const STFileSet *fset1, STFileSet **fset);
int32_t tsdbTFileSetClear(STFileSet **fset);
int32_t tsdbTFileSetRemove(STFileSet **fset);
int32_t tsdbTFileSetRemove(STFileSet *fset);
int32_t tsdbTFileSetFilteredInitDup(STsdb *pTsdb, const STFileSet *fset1, int64_t ever, STFileSet **fset,
TFileOpArray *fopArr);
@ -58,6 +60,7 @@ int32_t tsdbJsonToTFileSet(STsdb *pTsdb, const cJSON *json, STFileSet **fset);
// cmpr
int32_t tsdbTFileSetCmprFn(const STFileSet **fset1, const STFileSet **fset2);
// edit
int32_t tsdbSttLvlClear(SSttLvl **lvl);
int32_t tsdbTFileSetEdit(STsdb *pTsdb, STFileSet *fset, const STFileOp *op);
int32_t tsdbTFileSetApplyEdit(STsdb *pTsdb, const STFileSet *fset1, STFileSet *fset);
// max commit id
@ -70,6 +73,33 @@ bool tsdbTFileSetIsEmpty(const STFileSet *fset);
int32_t tsdbSttLvlInit(int32_t level, SSttLvl **lvl);
int32_t tsdbSttLvlClear(SSttLvl **lvl);
typedef enum {
TSDB_BG_TASK_MERGER = 1,
TSDB_BG_TASK_RETENTION,
TSDB_BG_TASK_COMPACT,
} EFSBgTaskT;
struct STFSBgTask {
STFileSystem *fs;
int32_t fid;
EFSBgTaskT type;
int32_t (*run)(void *arg);
void (*destroy)(void *arg);
void *arg;
TdThreadCond done[1];
int32_t numWait;
int64_t taskid;
int64_t scheduleTime;
int64_t launchTime;
int64_t finishTime;
struct STFSBgTask *prev;
struct STFSBgTask *next;
};
struct STFileOp {
tsdb_fop_t optype;
int32_t fid;
@ -87,6 +117,16 @@ struct STFileSet {
int64_t maxVerValid;
STFileObj *farr[TSDB_FTYPE_MAX]; // file array
TSttLvlArray lvlArr[1]; // level array
// background task queue
int32_t bgTaskNum;
STFSBgTask bgTaskQueue[1];
STFSBgTask *bgTaskRunning;
// block commit variables
TdThreadCond canCommit;
int32_t numWaitCommit;
bool blockCommit;
};
struct STSnapRange {

View File

@ -191,7 +191,7 @@ int32_t tsdbDeleteTableData(STsdb *pTsdb, int64_t version, tb_uid_t suid, tb_uid
pMemTable->nDel++;
pMemTable->minVer = TMIN(pMemTable->minVer, version);
pMemTable->maxVer = TMIN(pMemTable->maxVer, version);
pMemTable->maxVer = TMAX(pMemTable->maxVer, version);
/*
if (TSDB_CACHE_LAST_ROW(pMemTable->pTsdb->pVnode->config) && tsdbKeyCmprFn(&lastKey, &pTbData->maxKey) >= 0) {
tsdbCacheDeleteLastrow(pTsdb->lruCache, pTbData->uid, eKey);

View File

@ -15,11 +15,17 @@
#include "tsdbMerge.h"
#define TSDB_MAX_LEVEL 6 // means max level is 7
#define TSDB_MAX_LEVEL 2 // means max level is 3
typedef struct {
STsdb *tsdb;
TFileSetArray *fsetArr;
STsdb *tsdb;
int32_t fid;
} SMergeArg;
typedef struct {
STsdb *tsdb;
int32_t fid;
STFileSet *fset;
int32_t sttTrigger;
int32_t maxRow;
@ -313,7 +319,6 @@ static int32_t tsdbMergeFileSetBeginOpenWriter(SMerger *merger) {
if (merger->ctx->fset->farr[ftype]) {
config.files[ftype].exist = true;
config.files[ftype].file = merger->ctx->fset->farr[ftype]->f[0];
} else {
config.files[ftype].exist = false;
}
@ -397,13 +402,13 @@ static int32_t tsdbMergeFileSetEnd(SMerger *merger) {
code = tsdbFSEditBegin(merger->tsdb->pFS, merger->fopArr, TSDB_FEDIT_MERGE);
TSDB_CHECK_CODE(code, lino, _exit);
taosThreadRwlockWrlock(&merger->tsdb->rwLock);
taosThreadMutexLock(&merger->tsdb->mutex);
code = tsdbFSEditCommit(merger->tsdb->pFS);
if (code) {
taosThreadRwlockUnlock(&merger->tsdb->rwLock);
taosThreadMutexUnlock(&merger->tsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosThreadRwlockUnlock(&merger->tsdb->rwLock);
taosThreadMutexUnlock(&merger->tsdb->mutex);
_exit:
if (code) {
@ -478,30 +483,21 @@ _exit:
}
static int32_t tsdbDoMerge(SMerger *merger) {
int32_t code = 0;
int32_t lino = 0;
int32_t code = 0;
int32_t lino = 0;
SSttLvl *lvl = TARRAY2_FIRST(merger->fset->lvlArr);
STFileSet *fset;
TARRAY2_FOREACH(merger->fsetArr, fset) {
if (TARRAY2_SIZE(fset->lvlArr) == 0) continue;
if (TARRAY2_SIZE(merger->fset->lvlArr) == 0) return 0;
if (lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) < merger->sttTrigger) return 0;
SSttLvl *lvl = TARRAY2_FIRST(fset->lvlArr);
code = tsdbMergerOpen(merger);
TSDB_CHECK_CODE(code, lino, _exit);
if (lvl->level != 0 || TARRAY2_SIZE(lvl->fobjArr) < merger->sttTrigger) continue;
code = tsdbMergeFileSet(merger, merger->fset);
TSDB_CHECK_CODE(code, lino, _exit);
if (!merger->ctx->opened) {
code = tsdbMergerOpen(merger);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbMergeFileSet(merger, fset);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (merger->ctx->opened) {
code = tsdbMergerClose(merger);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbMergerClose(merger);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
@ -512,36 +508,73 @@ _exit:
return code;
}
int32_t tsdbMerge(void *arg) {
int32_t code = 0;
int32_t lino = 0;
STsdb *tsdb = (STsdb *)arg;
static int32_t tsdbMergeGetFSet(SMerger *merger) {
STFileSet *fset;
SMerger merger[1] = {{
.tsdb = tsdb,
.sttTrigger = tsdb->pVnode->config.sttTrigger,
}};
if (merger->sttTrigger <= 1) {
taosThreadMutexLock(&merger->tsdb->mutex);
tsdbFSGetFSet(merger->tsdb->pFS, merger->fid, &fset);
if (fset == NULL) {
taosThreadMutexUnlock(&merger->tsdb->mutex);
return 0;
}
code = tsdbFSCreateCopySnapshot(tsdb->pFS, &merger->fsetArr);
int32_t code = tsdbTFileSetInitCopy(merger->tsdb, fset, &merger->fset);
if (code) {
taosThreadMutexUnlock(&merger->tsdb->mutex);
return code;
}
taosThreadMutexUnlock(&merger->tsdb->mutex);
return 0;
}
static int32_t tsdbMerge(void *arg) {
int32_t code = 0;
int32_t lino = 0;
SMergeArg *mergeArg = (SMergeArg *)arg;
STsdb *tsdb = mergeArg->tsdb;
SMerger merger[1] = {{
.tsdb = tsdb,
.fid = mergeArg->fid,
.sttTrigger = tsdb->pVnode->config.sttTrigger,
}};
if (merger->sttTrigger <= 1) return 0;
// copy snapshot
code = tsdbMergeGetFSet(merger);
TSDB_CHECK_CODE(code, lino, _exit);
if (merger->fset == NULL) return 0;
// do merge
tsdbDebug("vgId:%d merge begin, fid:%d", TD_VID(tsdb->pVnode), merger->fid);
code = tsdbDoMerge(merger);
tsdbDebug("vgId:%d merge done, fid:%d", TD_VID(tsdb->pVnode), mergeArg->fid);
TSDB_CHECK_CODE(code, lino, _exit);
tsdbFSDestroyCopySnapshot(&merger->fsetArr);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
tsdbFatal("vgId:%d, failed to merge stt files since %s. code:%d", TD_VID(tsdb->pVnode), terrstr(), code);
taosMsleep(100);
exit(EXIT_FAILURE);
} else if (merger->ctx->opened) {
tsdbDebug("vgId:%d %s done", TD_VID(tsdb->pVnode), __func__);
}
tsdbTFileSetClear(&merger->fset);
return code;
}
int32_t tsdbSchedMerge(STsdb *tsdb, int32_t fid) {
SMergeArg *arg = taosMemoryMalloc(sizeof(*arg));
if (arg == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
arg->tsdb = tsdb;
arg->fid = fid;
int32_t code = tsdbFSScheduleBgTask(tsdb->pFS, fid, TSDB_BG_TASK_MERGER, tsdbMerge, taosMemoryFree, arg, NULL);
if (code) taosMemoryFree(arg);
return code;
}

View File

@ -53,7 +53,7 @@ int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb, const char *dir, STsdbKeepCfg *pKee
snprintf(pTsdb->path, TD_PATH_MAX, "%s%s%s", pVnode->path, TD_DIRSEP, dir);
// taosRealPath(pTsdb->path, NULL, slen);
pTsdb->pVnode = pVnode;
taosThreadRwlockInit(&pTsdb->rwLock, NULL);
taosThreadMutexInit(&pTsdb->mutex, NULL);
if (!pKeepCfg) {
tsdbSetKeepCfg(pTsdb, &pVnode->config.tsdbCfg);
} else {
@ -99,15 +99,14 @@ int tsdbClose(STsdb **pTsdb) {
tsdbDebug("vgId:%d, tsdb is close at %s, days:%d, keep:%d,%d,%d, keepTimeOffset:%d", TD_VID(pdb->pVnode), pdb->path,
pdb->keepCfg.days, pdb->keepCfg.keep0, pdb->keepCfg.keep1, pdb->keepCfg.keep2,
pdb->keepCfg.keepTimeOffset);
taosThreadRwlockWrlock(&(*pTsdb)->rwLock);
taosThreadMutexLock(&(*pTsdb)->mutex);
tsdbMemTableDestroy((*pTsdb)->mem, true);
(*pTsdb)->mem = NULL;
taosThreadRwlockUnlock(&(*pTsdb)->rwLock);
taosThreadRwlockDestroy(&(*pTsdb)->rwLock);
taosThreadMutexUnlock(&(*pTsdb)->mutex);
tsdbCloseFS(&(*pTsdb)->pFS);
tsdbCloseCache(*pTsdb);
taosThreadMutexDestroy(&(*pTsdb)->mutex);
taosMemoryFreeClear(*pTsdb);
}
return 0;

View File

@ -1105,8 +1105,9 @@ static int32_t dataBlockPartiallyRequired(STimeWindow* pWindow, SVersionRange* p
(pVerRange->maxVer < pBlock->record.maxVer && pVerRange->maxVer >= pBlock->record.minVer);
}
static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pTableBlockScanInfo,
int32_t* nextIndex, int32_t order, SBrinRecord* pRecord) {
static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlockInfo* pBlockInfo,
STableBlockScanInfo* pTableBlockScanInfo, int32_t* nextIndex, int32_t order,
SBrinRecord* pRecord) {
bool asc = ASCENDING_TRAVERSE(order);
if (asc && pBlockInfo->tbBlockIdx >= taosArrayGetSize(pTableBlockScanInfo->pBlockIdxList) - 1) {
return false;
@ -1119,7 +1120,8 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo
int32_t step = asc ? 1 : -1;
// *nextIndex = pBlockInfo->tbBlockIdx + step;
// *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step);
STableDataBlockIdx* pTableDataBlockIdx =
taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step);
SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex);
memcpy(pRecord, &p->record, sizeof(SBrinRecord));
@ -1145,7 +1147,8 @@ static int32_t findFileBlockInfoIndex(SDataBlockIter* pBlockIter, SFileDataBlock
return -1;
}
static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t index, int32_t step) {
static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t index,
int32_t step) {
if (index < 0 || index >= pBlockIter->numOfBlocks) {
return -1;
}
@ -1158,7 +1161,8 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte
for (int32_t i = index - 1; i >= pBlockIter->index; --i) {
SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, i);
STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
STableBlockScanInfo* pBlockScanInfo =
getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx);
pTableDataBlockIdx->globalIndex = i + 1;
@ -1168,13 +1172,13 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte
for (int32_t i = index + 1; i <= pBlockIter->index; ++i) {
SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, i);
STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
STableBlockScanInfo* pBlockScanInfo =
getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr);
STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx);
pTableDataBlockIdx->globalIndex = i - 1;
taosArraySet(pBlockIter->blockList, i - 1, pBlockInfo);
}
}
taosArraySet(pBlockIter->blockList, pBlockIter->index, &fblock);
@ -1286,7 +1290,8 @@ static void getBlockToLoadInfo(SDataBlockToLoadInfo* pInfo, SFileDataBlockInfo*
int32_t neighborIndex = 0;
SBrinRecord rec = {0};
bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex, pReader->info.order, &rec);
bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pScanInfo, &neighborIndex,
pReader->info.order, &rec);
// overlap with neighbor
if (hasNeighbor) {
@ -1420,9 +1425,7 @@ static bool nextRowFromLastBlocks(SLastBlockReader* pLastBlockReader, STableBloc
}
}
static void doPinSttBlock(SLastBlockReader* pLastBlockReader) {
tMergeTreePinSttBlock(&pLastBlockReader->mergeTree);
}
static void doPinSttBlock(SLastBlockReader* pLastBlockReader) { tMergeTreePinSttBlock(&pLastBlockReader->mergeTree); }
static void doUnpinSttBlock(SLastBlockReader* pLastBlockReader) {
tMergeTreeUnpinSttBlock(&pLastBlockReader->mergeTree);
@ -1568,7 +1571,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == tsLast) {
TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL);
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -1618,7 +1621,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (minKey == tsLast) {
TSDBROW* fRow1 = tMergeTreeGetRow(&pLastBlockReader->mergeTree);
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL);
int32_t code = tsdbRowMergerAdd(pMerger, fRow1, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -1826,8 +1829,8 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
int64_t key = hasDataInFileBlock(pBlockData, pDumpInfo) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] : INT64_MIN;
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
STSchema* pSchema = NULL;
if (pRow->type == TSDBROW_ROW_FMT) {
@ -2219,8 +2222,9 @@ static int32_t buildComposedDataBlockImpl(STsdbReader* pReader, STableBlockScanI
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
TSDBROW *pRow = NULL, *piRow = NULL;
int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped)) ? pBlockData->aTSKEY[pDumpInfo->rowIndex] :
(ASCENDING_TRAVERSE(pReader->info.order) ? INT64_MAX : INT64_MIN);
int64_t key = (pBlockData->nRow > 0 && (!pDumpInfo->allDumped))
? pBlockData->aTSKEY[pDumpInfo->rowIndex]
: (ASCENDING_TRAVERSE(pReader->info.order) ? INT64_MAX : INT64_MIN);
if (pBlockScanInfo->iter.hasVal) {
pRow = getValidMemRow(&pBlockScanInfo->iter, pBlockScanInfo->delSkyline, pReader);
}
@ -2257,7 +2261,8 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock
*loadNeighbor = false;
SBrinRecord rec = {0};
bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pBlockScanInfo, &nextIndex, pReader->info.order, &rec);
bool hasNeighbor = getNeighborBlockOfSameTable(&pReader->status.blockIter, pBlockInfo, pBlockScanInfo, &nextIndex,
pReader->info.order, &rec);
if (!hasNeighbor) { // do nothing
return code;
}
@ -2268,7 +2273,7 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock
// 1. find the next neighbor block in the scan block list
STableDataBlockIdx* tableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, nextIndex);
int32_t neighborIndex = tableDataBlockIdx->globalIndex;
int32_t neighborIndex = tableDataBlockIdx->globalIndex;
// 2. remove it from the scan block list
setFileBlockActiveInBlockIter(pReader, pBlockIter, neighborIndex, step);
@ -2704,7 +2709,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
(ASCENDING_TRAVERSE(pReader->info.order)) ? pBlockInfo->record.firstKey : pBlockInfo->record.lastKey;
code = buildDataBlockFromBuf(pReader, pScanInfo, endKey);
} else {
bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader);
bool bHasDataInLastBlock = hasDataInLastBlock(pLastBlockReader);
int64_t tsLast = bHasDataInLastBlock ? getCurrentKeyInLastBlock(pLastBlockReader) : INT64_MIN;
if (!bHasDataInLastBlock ||
((asc && pBlockInfo->record.lastKey < tsLast) || (!asc && pBlockInfo->record.firstKey > tsLast))) {
@ -3479,7 +3484,7 @@ int32_t doMergeMemTableMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo* pIter,
// start to merge duplicated rows
STSchema* pTSchema = NULL;
if (current.type == TSDBROW_ROW_FMT) { // get the correct schema for row-wise data in memory
if (current.type == TSDBROW_ROW_FMT) { // get the correct schema for row-wise data in memory
pTSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(&current), pReader, uid);
if (pTSchema == NULL) {
return terrno;
@ -3525,8 +3530,8 @@ int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* p
SRow** pTSRow) {
SRowMerger* pMerger = &pReader->status.merger;
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
TSDBKEY k = TSDBROW_KEY(pRow);
TSDBKEY ik = TSDBROW_KEY(piRow);
STSchema* pSchema = NULL;
if (pRow->type == TSDBROW_ROW_FMT) {
@ -4907,12 +4912,12 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
SVersionRange* pRange = &pReader->info.verRange;
// lock
taosThreadRwlockRdlock(&pTsdb->rwLock);
taosThreadMutexLock(&pTsdb->mutex);
// alloc
STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap));
if (pSnap == NULL) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
taosThreadMutexUnlock(&pTsdb->mutex);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
@ -4922,7 +4927,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
pSnap->pMem = pTsdb->mem;
pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode));
if (pSnap->pNode == NULL) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
taosThreadMutexUnlock(&pTsdb->mutex);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
@ -4937,7 +4942,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
pSnap->pIMem = pTsdb->imem;
pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode));
if (pSnap->pINode == NULL) {
taosThreadRwlockUnlock(&pTsdb->rwLock);
taosThreadMutexUnlock(&pTsdb->mutex);
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
@ -4952,7 +4957,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs
code = tsdbFSCreateRefSnapshotWithoutLock(pTsdb->pFS, &pSnap->pfSetArray);
// unlock
taosThreadRwlockUnlock(&pTsdb->rwLock);
taosThreadMutexUnlock(&pTsdb->mutex);
if (code == TSDB_CODE_SUCCESS) {
tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode));
@ -5005,4 +5010,5 @@ void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) {
pReader->status.fileIter.pLastBlockReader->mergeTree.idStr = pReader->idStr;
}
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ }
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/
}

View File

@ -25,11 +25,6 @@ typedef struct {
TFileSetArray *fsetArr;
TFileOpArray fopArr[1];
struct {
int32_t fsetArrIdx;
STFileSet *fset;
} ctx[1];
} SRTNer;
static int32_t tsdbDoRemoveFileObject(SRTNer *rtner, const STFileObj *fobj) {
@ -227,8 +222,8 @@ _exit:
typedef struct {
STsdb *tsdb;
int32_t sync;
int64_t now;
int32_t fid;
} SRtnArg;
static int32_t tsdbDoRetentionBegin(SRtnArg *arg, SRTNer *rtner) {
@ -263,15 +258,15 @@ static int32_t tsdbDoRetentionEnd(SRTNer *rtner) {
code = tsdbFSEditBegin(rtner->tsdb->pFS, rtner->fopArr, TSDB_FEDIT_MERGE);
TSDB_CHECK_CODE(code, lino, _exit);
taosThreadRwlockWrlock(&rtner->tsdb->rwLock);
taosThreadMutexLock(&rtner->tsdb->mutex);
code = tsdbFSEditCommit(rtner->tsdb->pFS);
if (code) {
taosThreadRwlockUnlock(&rtner->tsdb->rwLock);
taosThreadMutexUnlock(&rtner->tsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosThreadRwlockUnlock(&rtner->tsdb->rwLock);
taosThreadMutexUnlock(&rtner->tsdb->mutex);
TARRAY2_DESTROY(rtner->fopArr, NULL);
@ -285,95 +280,83 @@ _exit:
return code;
}
static int32_t tsdbDoRetention2(void *arg) {
int32_t code = 0;
int32_t lino = 0;
SRTNer rtner[1] = {0};
static int32_t tsdbDoRetentionOnFileSet(SRTNer *rtner, STFileSet *fset) {
int32_t code = 0;
int32_t lino = 0;
STFileObj *fobj = NULL;
int32_t expLevel = tsdbFidLevel(fset->fid, &rtner->tsdb->keepCfg, rtner->now);
code = tsdbDoRetentionBegin(arg, rtner);
TSDB_CHECK_CODE(code, lino, _exit);
if (expLevel < 0) { // remove the fileset
for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = fset->farr[ftype], 1); ++ftype) {
if (fobj == NULL) continue;
for (rtner->ctx->fsetArrIdx = 0; rtner->ctx->fsetArrIdx < TARRAY2_SIZE(rtner->fsetArr); rtner->ctx->fsetArrIdx++) {
rtner->ctx->fset = TARRAY2_GET(rtner->fsetArr, rtner->ctx->fsetArrIdx);
STFileObj *fobj;
int32_t expLevel = tsdbFidLevel(rtner->ctx->fset->fid, &rtner->tsdb->keepCfg, rtner->now);
if (expLevel < 0) { // remove the file set
for (int32_t ftype = 0; (ftype < TSDB_FTYPE_MAX) && (fobj = rtner->ctx->fset->farr[ftype], 1); ++ftype) {
if (fobj == NULL) continue;
int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs);
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1) {
code = tsdbRemoveFileObjectS3(rtner, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbDoRemoveFileObject(rtner, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
SSttLvl *lvl;
TARRAY2_FOREACH(rtner->ctx->fset->lvlArr, lvl) {
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
code = tsdbDoRemoveFileObject(rtner, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
} else if (expLevel == 0) {
continue;
} else {
SDiskID did;
if (tfsAllocDisk(rtner->tsdb->pVnode->pTfs, expLevel, &did) < 0) {
code = terrno;
int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs);
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && fobj->f->did.level == nlevel - 1) {
code = tsdbRemoveFileObjectS3(rtner, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
code = tsdbDoRemoveFileObject(rtner, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
}
tfsMkdirRecurAt(rtner->tsdb->pVnode->pTfs, rtner->tsdb->path, did);
}
// data
for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = rtner->ctx->fset->farr[ftype], 1); ++ftype) {
if (fobj == NULL) continue;
SSttLvl *lvl;
TARRAY2_FOREACH(fset->lvlArr, lvl) {
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
code = tsdbDoRemoveFileObject(rtner, fobj);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
} else if (expLevel == 0) { // only migrate to upper level
return 0;
} else { // migrate
SDiskID did;
if (tfsAllocDisk(rtner->tsdb->pVnode->pTfs, expLevel, &did) < 0) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
tfsMkdirRecurAt(rtner->tsdb->pVnode->pTfs, rtner->tsdb->path, did);
// data
for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX && (fobj = fset->farr[ftype], 1); ++ftype) {
if (fobj == NULL) continue;
if (fobj->f->did.level == did.level) continue;
int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs);
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1) {
code = tsdbMigrateDataFileS3(rtner, fobj, &did);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
if (tsS3Enabled) {
int64_t fsize = 0;
if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(terrno);
tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(rtner->tsdb->pVnode), __func__,
fobj->fname, tstrerror(code));
TSDB_CHECK_CODE(code, lino, _exit);
}
s3EvictCache(fobj->fname, fsize * 2);
}
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
// stt
SSttLvl *lvl;
TARRAY2_FOREACH(fset->lvlArr, lvl) {
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
if (fobj->f->did.level == did.level) continue;
int32_t nlevel = tfsGetLevel(rtner->tsdb->pVnode->pTfs);
if (tsS3Enabled && nlevel > 1 && TSDB_FTYPE_DATA == ftype && did.level == nlevel - 1) {
code = tsdbMigrateDataFileS3(rtner, fobj, &did);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
if (tsS3Enabled) {
int64_t fsize = 0;
if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) {
code = TAOS_SYSTEM_ERROR(terrno);
tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(rtner->tsdb->pVnode), __func__,
fobj->fname, tstrerror(code));
TSDB_CHECK_CODE(code, lino, _exit);
}
s3EvictCache(fobj->fname, fsize * 2);
}
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
// stt
SSttLvl *lvl;
TARRAY2_FOREACH(rtner->ctx->fset->lvlArr, lvl) {
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
if (fobj->f->did.level == did.level) continue;
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbDoMigrateFileObj(rtner, fobj, &did);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
}
code = tsdbDoRetentionEnd(rtner);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
if (TARRAY2_DATA(rtner->fopArr)) {
@ -389,30 +372,105 @@ _exit:
return code;
}
static void tsdbFreeRtnArg(void *arg) {
SRtnArg *rArg = (SRtnArg *)arg;
if (rArg->sync) {
tsem_post(&rArg->tsdb->pVnode->canCommit);
static int32_t tsdbDoRetentionSync(void *arg) {
int32_t code = 0;
int32_t lino = 0;
SRTNer rtner[1] = {0};
code = tsdbDoRetentionBegin(arg, rtner);
TSDB_CHECK_CODE(code, lino, _exit);
STFileSet *fset;
TARRAY2_FOREACH(rtner->fsetArr, fset) {
code = tsdbDoRetentionOnFileSet(rtner, fset);
TSDB_CHECK_CODE(code, lino, _exit);
}
taosMemoryFree(arg);
code = tsdbDoRetentionEnd(rtner);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
}
tsem_post(&((SRtnArg *)arg)->tsdb->pVnode->canCommit);
return code;
}
int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) {
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
if (arg == NULL) return TSDB_CODE_OUT_OF_MEMORY;
arg->tsdb = tsdb;
arg->sync = sync;
arg->now = now;
static int32_t tsdbDoRetentionAsync(void *arg) {
int32_t code = 0;
int32_t lino = 0;
SRTNer rtner[1] = {0};
if (sync) {
tsem_wait(&tsdb->pVnode->canCommit);
code = tsdbDoRetentionBegin(arg, rtner);
TSDB_CHECK_CODE(code, lino, _exit);
STFileSet *fset;
TARRAY2_FOREACH(rtner->fsetArr, fset) {
if (fset->fid != ((SRtnArg *)arg)->fid) continue;
code = tsdbDoRetentionOnFileSet(rtner, fset);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t taskid;
int32_t code =
tsdbFSScheduleBgTask(tsdb->pFS, TSDB_BG_TASK_RETENTION, tsdbDoRetention2, tsdbFreeRtnArg, arg, &taskid);
code = tsdbDoRetentionEnd(rtner);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbFreeRtnArg(arg);
TSDB_ERROR_LOG(TD_VID(rtner->tsdb->pVnode), lino, code);
}
return code;
}
static void tsdbFreeRtnArg(void *arg) { taosMemoryFree(arg); }
int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync) {
int32_t code = 0;
if (sync) { // sync retention
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
if (arg == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
arg->tsdb = tsdb;
arg->now = now;
arg->fid = INT32_MAX;
tsem_wait(&tsdb->pVnode->canCommit);
code = vnodeScheduleTask(tsdbDoRetentionSync, arg);
if (code) {
tsem_post(&tsdb->pVnode->canCommit);
taosMemoryFree(arg);
return code;
}
} else { // async retention
taosThreadMutexLock(&tsdb->mutex);
STFileSet *fset;
TARRAY2_FOREACH(tsdb->pFS->fSetArr, fset) {
SRtnArg *arg = taosMemoryMalloc(sizeof(*arg));
if (arg == NULL) {
taosThreadMutexUnlock(&tsdb->mutex);
return TSDB_CODE_OUT_OF_MEMORY;
}
arg->tsdb = tsdb;
arg->now = now;
arg->fid = fset->fid;
code = tsdbFSScheduleBgTask(tsdb->pFS, fset->fid, TSDB_BG_TASK_RETENTION, tsdbDoRetentionAsync, tsdbFreeRtnArg,
arg, NULL);
if (code) {
tsdbFreeRtnArg(arg);
taosThreadMutexUnlock(&tsdb->mutex);
return code;
}
}
taosThreadMutexUnlock(&tsdb->mutex);
}
return code;
}

View File

@ -38,8 +38,8 @@ struct STsdbSnapReader {
struct {
int32_t fsrArrIdx;
STSnapRange* fsr;
bool isDataDone;
bool isTombDone;
bool isDataDone;
bool isTombDone;
} ctx[1];
// reader
@ -1095,17 +1095,17 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) {
code = tsdbFSEditAbort(writer[0]->tsdb->pFS);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
taosThreadRwlockWrlock(&writer[0]->tsdb->rwLock);
taosThreadMutexLock(&writer[0]->tsdb->mutex);
code = tsdbFSEditCommit(writer[0]->tsdb->pFS);
if (code) {
taosThreadRwlockUnlock(&writer[0]->tsdb->rwLock);
taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
TSDB_CHECK_CODE(code, lino, _exit);
}
writer[0]->tsdb->pFS->fsstate = TSDB_FS_STATE_NORMAL;
taosThreadRwlockUnlock(&writer[0]->tsdb->rwLock);
taosThreadMutexUnlock(&writer[0]->tsdb->mutex);
}
tsdbFSEnableBgTask(tsdb->pFS);
@ -1236,7 +1236,7 @@ static int32_t tsdbTFileSetToSnapPart(STFileSet* fset, STsdbSnapPartition** ppSP
if (fset->farr[ftype] == NULL) continue;
typ = tsdbFTypeToSRangeTyp(ftype);
ASSERT(typ < TSDB_SNAP_RANGE_TYP_MAX);
STFile* f = fset->farr[ftype]->f;
STFile* f = fset->farr[ftype]->f;
if (f->maxVer > fset->maxVerValid) {
corrupt = true;
tsdbError("skip incomplete data file: fid:%d, maxVerValid:%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64
@ -1255,7 +1255,7 @@ static int32_t tsdbTFileSetToSnapPart(STFileSet* fset, STsdbSnapPartition** ppSP
TARRAY2_FOREACH(fset->lvlArr, lvl) {
STFileObj* fobj;
TARRAY2_FOREACH(lvl->fobjArr, fobj) {
STFile* f = fobj->f;
STFile* f = fobj->f;
if (f->maxVer > fset->maxVerValid) {
corrupt = true;
tsdbError("skip incomplete stt file.fid:%d, maxVerValid:%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64
@ -1299,7 +1299,7 @@ static STsdbSnapPartList* tsdbGetSnapPartList(STFileSystem* fs) {
}
int32_t code = 0;
taosThreadRwlockRdlock(&fs->tsdb->rwLock);
taosThreadMutexLock(&fs->tsdb->mutex);
STFileSet* fset;
TARRAY2_FOREACH(fs->fSetArr, fset) {
STsdbSnapPartition* pItem = NULL;
@ -1311,7 +1311,7 @@ static STsdbSnapPartList* tsdbGetSnapPartList(STFileSystem* fs) {
code = TARRAY2_SORT_INSERT(pList, pItem, tsdbSnapPartCmprFn);
ASSERT(code == 0);
}
taosThreadRwlockUnlock(&fs->tsdb->rwLock);
taosThreadMutexUnlock(&fs->tsdb->mutex);
if (code) {
TARRAY2_DESTROY(pList, tsdbSnapPartitionClear);

View File

@ -584,6 +584,7 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
// commit json
if (!rollback) {
ASSERT(pVnode->config.vgId == pWriter->info.config.vgId);
pWriter->info.state.committed = pWriter->ever;
pVnode->config = pWriter->info.config;
pVnode->state = (SVState){.committed = pWriter->info.state.committed,

View File

@ -20,6 +20,7 @@
#include "vnode.h"
#include "vnodeInt.h"
#include "audit.h"
#include "tstrbuild.h"
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessAlterStbReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
@ -886,6 +887,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
char tbName[TSDB_TABLE_FNAME_LEN];
STbUidStore *pStore = NULL;
SArray *tbUids = NULL;
SArray *tbNames = NULL;
pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP;
pRsp->code = TSDB_CODE_SUCCESS;
@ -902,7 +904,8 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp));
tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
if (rsp.pArray == NULL || tbUids == NULL) {
tbNames = taosArrayInit(req.nReqs, sizeof(char*));
if (rsp.pArray == NULL || tbUids == NULL || tbNames == NULL) {
rcode = -1;
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
@ -948,12 +951,9 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
taosArrayPush(rsp.pArray, &cRsp);
if(tsEnableAuditCreateTable){
int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
SName name = {0};
tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB);
auditRecord(NULL, clusterId, "createTable", name.dbname, "", pCreateReq->name, strlen(pCreateReq->name));
char* str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
strcpy(str, pCreateReq->name);
taosArrayPush(tbNames, &str);
}
}
@ -976,6 +976,30 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq,
tEncoderInit(&encoder, pRsp->pCont, pRsp->contLen);
tEncodeSVCreateTbBatchRsp(&encoder, &rsp);
if(tsEnableAuditCreateTable){
int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
SName name = {0};
tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB);
SStringBuilder sb = {0};
for(int32_t iReq = 0; iReq < req.nReqs; iReq++){
char** key = (char**)taosArrayGet(tbNames, iReq);
taosStringBuilderAppendStringLen(&sb, *key, strlen(*key));
if(iReq < req.nReqs - 1){
taosStringBuilderAppendChar(&sb, ',');
}
taosMemoryFreeClear(*key);
}
size_t len = 0;
char* keyJoined = taosStringBuilderGetResult(&sb, &len);
auditRecord(NULL, clusterId, "createTable", name.dbname, "", keyJoined, len);
taosStringBuilderDestroy(&sb);
}
_exit:
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
@ -987,6 +1011,7 @@ _exit:
taosArrayDestroy(tbUids);
tDecoderClear(&decoder);
tEncoderClear(&encoder);
taosArrayDestroy(tbNames);
return rcode;
}
@ -1120,6 +1145,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
int32_t ret;
SArray *tbUids = NULL;
STbUidStore *pStore = NULL;
SArray *tbNames = NULL;
pRsp->msgType = TDMT_VND_DROP_TABLE_RSP;
pRsp->pCont = NULL;
@ -1138,7 +1164,8 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
// process req
tbUids = taosArrayInit(req.nReqs, sizeof(int64_t));
rsp.pArray = taosArrayInit(req.nReqs, sizeof(SVDropTbRsp));
if (tbUids == NULL || rsp.pArray == NULL) goto _exit;
tbNames = taosArrayInit(req.nReqs, sizeof(char*));
if (tbUids == NULL || rsp.pArray == NULL || tbNames == NULL) goto _exit;
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
SVDropTbReq *pDropTbReq = req.pReqs + iReq;
@ -1159,11 +1186,41 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in
}
taosArrayPush(rsp.pArray, &dropTbRsp);
if(tsEnableAuditCreateTable){
char* str = taosMemoryCalloc(1, TSDB_TABLE_FNAME_LEN);
strcpy(str, pDropTbReq->name);
taosArrayPush(tbNames, &str);
}
}
tqUpdateTbUidList(pVnode->pTq, tbUids, false);
tdUpdateTbUidList(pVnode->pSma, pStore, false);
if(tsEnableAuditCreateTable){
int64_t clusterId = pVnode->config.syncCfg.nodeInfo[0].clusterId;
SName name = {0};
tNameFromString(&name, pVnode->config.dbname, T_NAME_ACCT | T_NAME_DB);
SStringBuilder sb = {0};
for(int32_t iReq = 0; iReq < req.nReqs; iReq++){
char** key = (char**)taosArrayGet(tbNames, iReq);
taosStringBuilderAppendStringLen(&sb, *key, strlen(*key));
if(iReq < req.nReqs - 1){
taosStringBuilderAppendChar(&sb, ',');
}
taosMemoryFreeClear(*key);
}
size_t len = 0;
char* keyJoined = taosStringBuilderGetResult(&sb, &len);
auditRecord(NULL, clusterId, "dropTable", name.dbname, "", keyJoined, len);
taosStringBuilderDestroy(&sb);
}
_exit:
taosArrayDestroy(tbUids);
tdUidStoreFree(pStore);
@ -1174,6 +1231,7 @@ _exit:
tEncodeSVDropTbBatchRsp(&encoder, &rsp);
tEncoderClear(&encoder);
taosArrayDestroy(rsp.pArray);
taosArrayDestroy(tbNames);
return 0;
}

View File

@ -516,7 +516,10 @@ static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool
pVnode->config.vgId, isApply, pSnapshot->lastApplyIndex, pSnapshot->lastApplyTerm, pSnapshot->lastConfigIndex);
int32_t code = vnodeSnapWriterClose(pWriter, !isApply, pSnapshot);
vInfo("vgId:%d, apply vnode snapshot finished, code:0x%x", pVnode->config.vgId, code);
if (code != 0) {
vError("vgId:%d, failed to finish applying vnode snapshot since %s, code:0x%x", pVnode->config.vgId, terrstr(),
code);
}
return code;
}

View File

@ -1030,7 +1030,11 @@ SNode* createSelectStmt(SAstCreateContext* pCxt, bool isDistinct, SNodeList* pPr
SNode* setSelectStmtTagMode(SAstCreateContext* pCxt, SNode* pStmt, bool bSelectTags) {
if (pStmt && QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
((SSelectStmt*)pStmt)->tagScan = bSelectTags;
if (pCxt->pQueryCxt->biMode) {
((SSelectStmt*)pStmt)->tagScan = true;
} else {
((SSelectStmt*)pStmt)->tagScan = bSelectTags;
}
}
return pStmt;
}

View File

@ -3499,6 +3499,20 @@ static const char* getPrecisionStr(uint8_t precision) {
return "unknown";
}
static int64_t getPrecisionMultiple(uint8_t precision) {
switch (precision) {
case TSDB_TIME_PRECISION_MILLI:
return 1;
case TSDB_TIME_PRECISION_MICRO:
return 1000;
case TSDB_TIME_PRECISION_NANO:
return 1000000;
default:
break;
}
return 1;
}
static void convertVarDuration(SValueNode* pOffset, uint8_t precision) {
const int64_t factors[3] = {NANOSECOND_PER_MSEC, NANOSECOND_PER_USEC, 1};
const int8_t units[3] = {TIME_UNIT_MILLISECOND, TIME_UNIT_MICROSECOND, TIME_UNIT_NANOSECOND};
@ -3512,6 +3526,7 @@ static void convertVarDuration(SValueNode* pOffset, uint8_t precision) {
pOffset->unit = units[precision];
}
static const int64_t tsdbMaxKeepMS = (int64_t)60 * 1000 * TSDB_MAX_KEEP;
static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode* pInterval) {
uint8_t precision = ((SColumnNode*)pInterval->pCol)->node.resType.precision;
@ -3520,6 +3535,8 @@ static int32_t checkIntervalWindow(STranslateContext* pCxt, SIntervalWindowNode*
if (pInter->datum.i <= 0 || (!valInter && pInter->datum.i < tsMinIntervalTime)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_VALUE_TOO_SMALL, tsMinIntervalTime,
getPrecisionStr(precision));
} else if (pInter->datum.i / getPrecisionMultiple(precision) > tsdbMaxKeepMS) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INTER_VALUE_TOO_BIG, 1000, "years");
}
if (NULL != pInterval->pOffset) {

View File

@ -65,6 +65,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "This statement is no longer supported";
case TSDB_CODE_PAR_INTER_VALUE_TOO_SMALL:
return "Interval cannot be less than %d %s";
case TSDB_CODE_PAR_INTER_VALUE_TOO_BIG:
return "Interval cannot be more than %d %s";
case TSDB_CODE_PAR_DB_NOT_SPECIFIED:
return "Database not specified";
case TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME:

View File

@ -249,8 +249,8 @@ int32_t syncNodeOnRequestVote(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnRequestVoteReply(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntries(SSyncNode* pNode, const SRpcMsg* pMsg);
int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshot(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshotRsp(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnSnapshot(SSyncNode* ths, SRpcMsg* pMsg);
int32_t syncNodeOnSnapshotRsp(SSyncNode* ths, SRpcMsg* pMsg);
int32_t syncNodeOnHeartbeat(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, const SRpcMsg* pMsg);
int32_t syncNodeOnLocalCmd(SSyncNode* ths, const SRpcMsg* pMsg);

View File

@ -22,21 +22,41 @@ extern "C" {
#include "syncInt.h"
#define SYNC_SNAPSHOT_SEQ_INVALID -2
#define SYNC_SNAPSHOT_SEQ_FORCE_CLOSE -3
#define SYNC_SNAPSHOT_SEQ_PREP_SNAPSHOT -1
#define SYNC_SNAPSHOT_SEQ_INVALID -2
#define SYNC_SNAPSHOT_SEQ_PREP -1
#define SYNC_SNAPSHOT_SEQ_BEGIN 0
#define SYNC_SNAPSHOT_SEQ_END 0x7FFFFFFF
#define SYNC_SNAPSHOT_RETRY_MS 5000
typedef struct SSyncSnapBuffer {
void *entries[TSDB_SYNC_SNAP_BUFFER_SIZE];
int64_t start;
int64_t cursor;
int64_t end;
int64_t size;
TdThreadMutex mutex;
void (*entryDeleteCb)(void *ptr);
} SSyncSnapBuffer;
typedef struct SyncSnapBlock {
int32_t seq;
int8_t acked;
int64_t sendTimeMs;
int16_t blockType;
void *pBlock;
int32_t blockLen;
} SyncSnapBlock;
void syncSnapBlockDestroy(void *ptr);
typedef struct SSyncSnapshotSender {
int8_t start;
int32_t seq;
int32_t ack;
void *pReader;
void *pCurrentBlock;
int32_t blockLen;
SSnapshotParam snapshotParam;
SSnapshot snapshot;
SSyncCfg lastConfig;
@ -47,6 +67,9 @@ typedef struct SSyncSnapshotSender {
int64_t lastSendTime;
bool finish;
// ring buffer for ack
SSyncSnapBuffer *pSndBuf;
// init when create
SSyncNode *pSyncNode;
int32_t replicaIndex;
@ -72,6 +95,9 @@ typedef struct SSyncSnapshotReceiver {
SSnapshotParam snapshotParam;
SSnapshot snapshot;
// buffer
SSyncSnapBuffer *pRcvBuf;
// init when create
SSyncNode *pSyncNode;
} SSyncSnapshotReceiver;
@ -83,8 +109,8 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
// on message
int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg);
int32_t syncNodeOnSnapshotRsp(SSyncNode *ths, const SRpcMsg *pMsg);
// int32_t syncNodeOnSnapshot(SSyncNode *ths, const SRpcMsg *pMsg);
// int32_t syncNodeOnSnapshotRsp(SSyncNode *ths, const SRpcMsg *pMsg);
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode *pSyncNode, SyncIndex snapshotLastApplyIndex);

View File

@ -818,7 +818,8 @@ SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion) {
if (!taosCheckExistFile(pSyncNode->configPath)) {
// create a new raft config file
sInfo("vgId:%d, create a new raft config file", pSyncNode->vgId);
sInfo("vgId:%d, create a new raft config file", pSyncInfo->vgId);
pSyncNode->vgId = pSyncInfo->vgId;
pSyncNode->raftCfg.isStandBy = pSyncInfo->isStandBy;
pSyncNode->raftCfg.snapshotStrategy = pSyncInfo->snapshotStrategy;
pSyncNode->raftCfg.lastConfigIndex = pSyncInfo->syncCfg.lastIndex;

View File

@ -797,7 +797,7 @@ _out:
pMgr->retryBackoff = syncLogReplGetNextRetryBackoff(pMgr);
SSyncLogBuffer* pBuf = pNode->pLogBuf;
sInfo("vgId:%d, resend %d sync log entries. dest:%" PRIx64 ", indexes:%" PRId64 " ..., terms: ... %" PRId64
", retryWaitMs:%" PRId64 ", mgr: [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64
", retryWaitMs:%" PRId64 ", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64
" %" PRId64 ", %" PRId64 ")",
pNode->vgId, count, pDestId->addr, firstIndex, term, retryWaitMs, pMgr->startIndex, pMgr->matchIndex,
pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
@ -815,9 +815,9 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
ASSERT(pMgr->matchIndex == 0);
if (pMsg->matchIndex < 0) {
pMgr->restored = true;
sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64
", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, DID(&destId), destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64
"), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
return 0;
}
@ -832,9 +832,9 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
if (pMsg->success && pMsg->matchIndex == pMsg->lastSendIndex) {
pMgr->matchIndex = pMsg->matchIndex;
pMgr->restored = true;
sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), mgr: rs(%d) [%" PRId64 " %" PRId64
", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, DID(&destId), destId.addr, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
sInfo("vgId:%d, sync log repl restored. peer: dnode:%d (%" PRIx64 "), repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64
"), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, DID(&destId), destId.addr, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
return 0;
}
@ -958,10 +958,10 @@ int32_t syncLogReplProbe(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncIndex inde
pMgr->endIndex = index + 1;
SSyncLogBuffer* pBuf = pNode->pLogBuf;
sTrace("vgId:%d, probe peer:%" PRIx64 " with msg of index:%" PRId64 " term:%" PRId64 ". mgr (rs:%d): [%" PRId64
" %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pDestId->addr, index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
sTrace("vgId:%d, probe peer:%" PRIx64 " with msg of index:%" PRId64 " term:%" PRId64 ". repl-mgr:[%" PRId64
" %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, pDestId->addr, index, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex, pBuf->startIndex,
pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
return 0;
}
@ -1002,9 +1002,9 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
pMgr->endIndex = index + 1;
if (barrier) {
sInfo("vgId:%d, replicated sync barrier to dnode:%d. index:%" PRId64 ", term:%" PRId64
", repl mgr: rs(%d) [%" PRId64 " %" PRId64 ", %" PRId64 ")",
pNode->vgId, DID(pDestId), index, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex);
sInfo("vgId:%d, replicated sync barrier to dnode:%d. index:%" PRId64 ", term:%" PRId64 ", repl-mgr:[%" PRId64
" %" PRId64 ", %" PRId64 ")",
pNode->vgId, DID(pDestId), index, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex);
break;
}
}
@ -1013,10 +1013,10 @@ int32_t syncLogReplAttempt(SSyncLogReplMgr* pMgr, SSyncNode* pNode) {
SSyncLogBuffer* pBuf = pNode->pLogBuf;
sTrace("vgId:%d, replicated %d msgs to peer:%" PRIx64 ". indexes:%" PRId64 "..., terms: ...%" PRId64
", mgr: (rs:%d) [%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
", repl-mgr:[%" PRId64 " %" PRId64 ", %" PRId64 "), buffer: [%" PRId64 " %" PRId64 " %" PRId64 ", %" PRId64
")",
pNode->vgId, count, pDestId->addr, firstIndex, term, pMgr->restored, pMgr->startIndex, pMgr->matchIndex,
pMgr->endIndex, pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
pNode->vgId, count, pDestId->addr, firstIndex, term, pMgr->startIndex, pMgr->matchIndex, pMgr->endIndex,
pBuf->startIndex, pBuf->commitIndex, pBuf->matchIndex, pBuf->endIndex);
return 0;
}

File diff suppressed because it is too large Load Diff

View File

@ -77,12 +77,19 @@ static int32_t syncNodeTimerRoutine(SSyncNode* ths) {
for (int i = 0; i < ths->peersNum; ++i) {
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(ths->peersId[i]));
if (pSender != NULL) {
if (ths->isStart && ths->state == TAOS_SYNC_STATE_LEADER && pSender->start &&
timeNow - pSender->lastSendTime > SYNC_SNAP_RESEND_MS) {
snapshotReSend(pSender);
} else {
sTrace("vgId:%d, do not resend: nstart%d, now:%" PRId64 ", lstsend:%" PRId64 ", diff:%" PRId64, ths->vgId,
ths->isStart, timeNow, pSender->lastSendTime, timeNow - pSender->lastSendTime);
if (ths->isStart && ths->state == TAOS_SYNC_STATE_LEADER && pSender->start) {
int64_t elapsedMs = timeNow - pSender->lastSendTime;
if (elapsedMs < SYNC_SNAP_RESEND_MS) {
continue;
}
if (elapsedMs > SYNC_SNAP_TIMEOUT_MS) {
sSError(pSender, "snap replication timeout, terminate.");
snapshotSenderStop(pSender, false);
} else {
sSWarn(pSender, "snap replication resend.");
snapshotReSend(pSender);
}
}
}
}

View File

@ -267,21 +267,23 @@ void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dfla
va_end(argpointer);
taosPrintLog(flags, level, dflag,
"vgId:%d, %s, sync:%s, snap-sender:{%p start:%" PRId64 " end:%" PRId64 " last-index:%" PRId64
" last-term:%" PRIu64 " last-cfg:%" PRId64
", seq:%d ack:%d finish:%d, as:%d dnode:%d}"
"vgId:%d, %s, sync:%s, snap-sender:%p signature:(%" PRId64 ", %" PRId64 "), {start:%" PRId64
" end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64
", seq:%d, ack:%d, "
" buf:[%" PRId64 " %" PRId64 ", %" PRId64
"), finish:%d, as:%d, to-dnode:%d}"
", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64
", min-match:%" PRId64 ", snap:{last-index:%" PRId64 ", term:%" PRIu64
"}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
", chging:%d, restore:%d, quorum:%d, lc-timer:{elect:%" PRId64 ", hb:%" PRId64 "}, peer:%s, cfg:%s",
pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->snapshotParam.start,
pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack, pSender->finish, pSender->replicaIndex,
DID(&pNode->replicasId[pSender->replicaIndex]), raftStoreGetTerm(pNode), pNode->commitIndex,
logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex, snapshot.lastApplyTerm,
pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum, pNode->raftCfg.lastConfigIndex,
pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock,
pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s",
pNode->vgId, eventLog, syncStr(pNode->state), pSender, pSender->term, pSender->startTime,
pSender->snapshotParam.start, pSender->snapshotParam.end, pSender->snapshot.lastApplyIndex,
pSender->snapshot.lastApplyTerm, pSender->snapshot.lastConfigIndex, pSender->seq, pSender->ack,
pSender->pSndBuf->start, pSender->pSndBuf->cursor, pSender->pSndBuf->end, pSender->finish,
pSender->replicaIndex, DID(&pNode->replicasId[pSender->replicaIndex]), raftStoreGetTerm(pNode),
pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex,
snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize, pNode->replicaNum,
pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, pNode->quorum, peerStr, cfgStr);
}
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,
@ -316,19 +318,21 @@ void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t df
taosPrintLog(
flags, level, dflag,
"vgId:%d, %s, sync:%s,"
" snap-receiver:{%p started:%d acked:%d term:%" PRIu64 " start-time:%" PRId64 " from-dnode:%d, start:%" PRId64
" end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64
" snap-receiver:%p signature:(%" PRId64 ", %" PRId64 "), {start:%d ack:%d buf:[%" PRId64 " %" PRId64 ", %" PRId64
")"
" from-dnode:%d, start:%" PRId64 " end:%" PRId64 " last-index:%" PRId64 " last-term:%" PRIu64 " last-cfg:%" PRId64
"}"
", term:%" PRIu64 ", commit-index:%" PRId64 ", firstver:%" PRId64 ", lastver:%" PRId64 ", min-match:%" PRId64
", snap:{last-index:%" PRId64 ", last-term:%" PRIu64 "}, standby:%d, batch-sz:%d, replicas:%d, last-cfg:%" PRId64
", chging:%d, restore:%d, quorum:%d, lc-timers:{elect:%" PRId64 ", hb:%" PRId64 "}, peer:%s, cfg:%s",
pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->start, pReceiver->ack, pReceiver->term,
pReceiver->startTime, DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->snapshotParam.end,
", chging:%d, restore:%d, quorum:%d, peer:%s, cfg:%s",
pNode->vgId, eventLog, syncStr(pNode->state), pReceiver, pReceiver->term, pReceiver->startTime, pReceiver->start,
pReceiver->ack, pReceiver->pRcvBuf->start, pReceiver->pRcvBuf->cursor, pReceiver->pRcvBuf->end,
DID(&pReceiver->fromId), pReceiver->snapshotParam.start, pReceiver->snapshotParam.end,
pReceiver->snapshot.lastApplyIndex, pReceiver->snapshot.lastApplyTerm, pReceiver->snapshot.lastConfigIndex,
raftStoreGetTerm(pNode), pNode->commitIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex,
snapshot.lastApplyIndex, snapshot.lastApplyTerm, pNode->raftCfg.isStandBy, pNode->raftCfg.batchSize,
pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish,
syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, peerStr, cfgStr);
pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, pNode->quorum, peerStr,
cfgStr);
}
void syncLogRecvTimer(SSyncNode* pSyncNode, const SyncTimeout* pMsg, const char* s) {

View File

@ -110,7 +110,9 @@ int32_t tfsAllocDiskOnTier(STfsTier *pTier) {
}
int32_t retId = -1;
int64_t avail = 0;
for (int32_t id = 0; id < TFS_MAX_DISKS_PER_TIER; ++id) {
#if 0 // round-robin
int32_t diskId = (pTier->nextid + id) % pTier->ndisk;
STfsDisk *pDisk = pTier->disks[diskId];
@ -126,6 +128,18 @@ int32_t tfsAllocDiskOnTier(STfsTier *pTier) {
terrno = 0;
pTier->nextid = (diskId + 1) % pTier->ndisk;
break;
#else // select the disk with the most available space
STfsDisk *pDisk = pTier->disks[id];
if (pDisk == NULL) continue;
if (pDisk->size.avail < tsMinDiskFreeSize) continue;
if (pDisk->size.avail > avail) {
avail = pDisk->size.avail;
retId = id;
terrno = 0;
}
#endif
}
tfsUnLockTier(pTier);

View File

@ -8,7 +8,7 @@ target_link_libraries(
PUBLIC gtest_main
)
add_test(
NAME tfs_test
COMMAND tfs_test
)
# add_test(
# NAME tfs_test
# COMMAND tfs_test
# )

View File

@ -477,47 +477,38 @@ struct tm *taosLocalTime(const time_t *timep, struct tm *result, char *buf) {
return res;
}
#ifdef WINDOWS
if (*timep < 0) {
if (*timep < -2208988800LL) {
if (buf != NULL) {
sprintf(buf, "NaN");
}
return NULL;
}
SYSTEMTIME s;
FILETIME f;
LARGE_INTEGER offset;
struct tm tm1;
time_t tt = 0;
if (localtime_s(&tm1, &tt) != 0 ) {
if (buf != NULL) {
sprintf(buf, "NaN");
}
return NULL;
}
offset.QuadPart = TIMEEPOCH1900;
offset.QuadPart += *timep * 10000000;
f.dwLowDateTime = offset.QuadPart & 0xffffffff;
f.dwHighDateTime = (offset.QuadPart >> 32) & 0xffffffff;
FileTimeToSystemTime(&f, &s);
result->tm_sec = s.wSecond;
result->tm_min = s.wMinute;
result->tm_hour = s.wHour;
result->tm_mday = s.wDay;
result->tm_mon = s.wMonth - 1;
result->tm_year = s.wYear - 1900;
result->tm_wday = s.wDayOfWeek;
result->tm_yday = 0;
result->tm_isdst = 0;
} else {
if (localtime_s(result, timep) != 0) {
if (buf != NULL) {
sprintf(buf, "NaN");
}
return NULL;
if (*timep < -2208988800LL) {
if (buf != NULL) {
sprintf(buf, "NaN");
}
return NULL;
}
SYSTEMTIME s;
FILETIME f;
LARGE_INTEGER offset;
struct tm tm1;
time_t tt = 0;
if (localtime_s(&tm1, &tt) != 0) {
if (buf != NULL) {
sprintf(buf, "NaN");
}
return NULL;
}
offset.QuadPart = TIMEEPOCH1900;
offset.QuadPart += *timep * 10000000;
f.dwLowDateTime = offset.QuadPart & 0xffffffff;
f.dwHighDateTime = (offset.QuadPart >> 32) & 0xffffffff;
FileTimeToSystemTime(&f, &s);
result->tm_sec = s.wSecond;
result->tm_min = s.wMinute;
result->tm_hour = s.wHour;
result->tm_mday = s.wDay;
result->tm_mon = s.wMonth - 1;
result->tm_year = s.wYear - 1900;
result->tm_wday = s.wDayOfWeek;
result->tm_yday = 0;
result->tm_isdst = 0;
#else
res = localtime_r(timep, result);
if (res == NULL && buf != NULL) {

View File

@ -114,10 +114,6 @@ TEST(osTimeTests, taosLocalTime) {
ASSERT_EQ(local_time->tm_min, 0);
ASSERT_EQ(local_time->tm_sec, 0);
time_t over_timep = 6406301441633558;
local_time = taosLocalTime(&over_timep, &result, NULL);
ASSERT_EQ(local_time, nullptr);
time_t neg_timep3 = -78115158887;
local_time = taosLocalTime(&neg_timep3, &result, NULL);
ASSERT_EQ(local_time, nullptr);

View File

@ -518,6 +518,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_PORT, "Port should be an in
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ENDPOINT, "Endpoint should be in the format of 'fqdn:port'")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_EXPRIE_STATEMENT, "This statement is no longer supported")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_VALUE_TOO_SMALL, "Interval too small")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTER_VALUE_TOO_BIG, "Interval too big")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_DB_NOT_SPECIFIED, "Database not specified")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_IDENTIFIER_NAME, "Invalid identifier name")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_CORRESPONDING_STABLE_ERR, "Corresponding super table not in this db")

View File

@ -1048,6 +1048,7 @@ e
,,y,script,./test.sh -f tsim/query/tableCount.sim
,,y,script,./test.sh -f tsim/query/show_db_table_kind.sim
,,y,script,./test.sh -f tsim/query/bi_star_table.sim
,,y,script,./test.sh -f tsim/query/bi_tag_scan.sim
,,y,script,./test.sh -f tsim/query/tag_scan.sim
,,y,script,./test.sh -f tsim/query/nullColSma.sim
,,y,script,./test.sh -f tsim/query/bug3398.sim

View File

@ -69,7 +69,7 @@ docker run \
-v ${REP_REAL_PATH}/community/contrib/libuv/:${REP_DIR}/community/contrib/libuv \
-v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \
-v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=0;make -j 10|| exit 1"
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=false -DJEMALLOC_ENABLED=0;make -j 10|| exit 1"
# -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
if [[ -d ${WORKDIR}/debugNoSan ]] ;then
@ -99,7 +99,7 @@ docker run \
-v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \
-v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
-v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=0;make -j 10|| exit 1 "
--rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_SANITIZER=1 -DTOOLS_SANITIZE=true -DTOOLS_BUILD_TYPE=Debug -DBUILD_TAOSX=false -DJEMALLOC_ENABLED=0;make -j 10|| exit 1 "
mv ${REP_REAL_PATH}/debug ${WORKDIR}/debugSan

View File

@ -0,0 +1,31 @@
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/exec.sh -n dnode1 -s start
sql connect
sql drop database if exists db1;
sql create database db1 vgroups 3;
sql create database db1;
sql use db1;
sql create stable sta (ts timestamp, f1 int, f2 binary(200)) tags(t1 int, t2 int, t3 int);
sql create stable stb (ts timestamp, f1 int, f2 binary(200)) tags(t1 int, t2 int, t3 int);
sql create table tba1 using sta tags(1, 1, 1);
sql create table tba2 using sta tags(2, 2, 2);
sql insert into tba1 values(now, 1, "1")(now+3s, 3, "3")(now+5s, 5, "5");
sql insert into tba2 values(now + 1s, 2, "2")(now+2s, 2, "2")(now+4s, 4, "4");
sql create table tbn1 (ts timestamp, f1 int);
set_bi_mode 1
sql select t1,t2,t3 from db1.sta order by t1;
print $rows $data00 $data10
if $rows != 2 then
return -1
endi
if $data00 != 1 then
return -1
endi
if $data10 != 2 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -21,6 +21,7 @@ sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 in
sql create stream streams2 trigger at_once watermark 1d IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s) sliding (5s);
sql create stream stream_t1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamtST as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s) sliding (5s);
sql create stream stream_t2 trigger at_once watermark 1d IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamtST2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s) sliding (5s);
sleep 1000
sql insert into t1 values(1648791210000,1,2,3,1.0);
sql insert into t1 values(1648791216000,2,2,3,1.1);
@ -311,6 +312,7 @@ sql create table t2 using st tags(2,2,2);
sql create stream streams11 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s, 5s);
sql create stream streams12 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt2 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s);
sleep 1000
sql insert into t1 values(1648791213000,1,2,3,1.0);
sql insert into t1 values(1648791223001,2,2,3,1.1);
@ -444,6 +446,7 @@ sql create table t2 using st tags(2,2,2);
sql create stream streams21 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt21 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from t1 interval(10s, 5s);
sql create stream streams22 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt22 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(10s, 5s);
sleep 1000
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791223001,2,2,2,1.1);
@ -582,6 +585,7 @@ sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams23 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt23 as select _wstart, count(*) c1, sum(a) c3 , max(b) c4, min(c) c5 from st interval(20s) sliding(10s);
sleep 1000
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791223001,2,2,2,1.1);
@ -706,6 +710,7 @@ sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql create stream streams4 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt4 as select _wstart as ts, count(*),min(a) c1 from st interval(10s) sliding(5s);
sleep 1000
sql insert into t1 values(1648791213000,1,1,1,1.0);
sql insert into t1 values(1648791243000,2,1,1,1.0);