fix: drop table after commit
This commit is contained in:
parent
82ee169d61
commit
ac2853f28c
|
@ -37,7 +37,6 @@ typedef struct {
|
||||||
int64_t cid;
|
int64_t cid;
|
||||||
int64_t now;
|
int64_t now;
|
||||||
TSKEY nextKey;
|
TSKEY nextKey;
|
||||||
TSKEY maxDelKey;
|
|
||||||
int32_t fid;
|
int32_t fid;
|
||||||
int32_t expLevel;
|
int32_t expLevel;
|
||||||
SDiskID did;
|
SDiskID did;
|
||||||
|
@ -46,7 +45,6 @@ typedef struct {
|
||||||
STFileSet *fset;
|
STFileSet *fset;
|
||||||
TABLEID tbid[1];
|
TABLEID tbid[1];
|
||||||
bool hasTSData;
|
bool hasTSData;
|
||||||
bool skipTsRow;
|
|
||||||
} ctx[1];
|
} ctx[1];
|
||||||
|
|
||||||
// reader
|
// reader
|
||||||
|
@ -128,21 +126,8 @@ static int32_t tsdbCommitTSData(SCommitter2 *committer) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
extern int8_t tsS3Enabled;
|
|
||||||
|
|
||||||
int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs);
|
|
||||||
committer->ctx->skipTsRow = false;
|
|
||||||
if (tsS3Enabled && nlevel > 1 && committer->ctx->did.level == nlevel - 1) {
|
|
||||||
committer->ctx->skipTsRow = true;
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
int64_t ts = TSDBROW_TS(&row->row);
|
int64_t ts = TSDBROW_TS(&row->row);
|
||||||
|
|
||||||
if (committer->ctx->skipTsRow && ts <= committer->ctx->maxKey) {
|
|
||||||
ts = committer->ctx->maxKey + 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ts > committer->ctx->maxKey) {
|
if (ts > committer->ctx->maxKey) {
|
||||||
committer->ctx->nextKey = TMIN(committer->ctx->nextKey, ts);
|
committer->ctx->nextKey = TMIN(committer->ctx->nextKey, ts);
|
||||||
code = tsdbIterMergerSkipTableData(committer->dataIterMerger, committer->ctx->tbid);
|
code = tsdbIterMergerSkipTableData(committer->dataIterMerger, committer->ctx->tbid);
|
||||||
|
@ -175,15 +160,13 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) {
|
||||||
int64_t numRecord = 0;
|
int64_t numRecord = 0;
|
||||||
SMetaInfo info;
|
SMetaInfo info;
|
||||||
|
|
||||||
if (committer->ctx->fset == NULL && !committer->ctx->hasTSData) {
|
if (committer->tsdb->imem->nDel == 0) {
|
||||||
if (committer->ctx->maxKey < committer->ctx->maxDelKey) {
|
goto _exit;
|
||||||
committer->ctx->nextKey = committer->ctx->maxKey + 1;
|
|
||||||
} else {
|
|
||||||
committer->ctx->nextKey = TSKEY_MAX;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// do not need to write tomb data if there is no ts data
|
||||||
|
bool skip = (committer->ctx->fset == NULL && !committer->ctx->hasTSData);
|
||||||
|
|
||||||
committer->ctx->tbid->suid = 0;
|
committer->ctx->tbid->suid = 0;
|
||||||
committer->ctx->tbid->uid = 0;
|
committer->ctx->tbid->uid = 0;
|
||||||
for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) {
|
for (STombRecord *record; (record = tsdbIterMergerGetTombRecord(committer->tombIterMerger));) {
|
||||||
|
@ -210,9 +193,11 @@ static int32_t tsdbCommitTombData(SCommitter2 *committer) {
|
||||||
record->skey = TMAX(record->skey, committer->ctx->minKey);
|
record->skey = TMAX(record->skey, committer->ctx->minKey);
|
||||||
record->ekey = TMIN(record->ekey, committer->ctx->maxKey);
|
record->ekey = TMIN(record->ekey, committer->ctx->maxKey);
|
||||||
|
|
||||||
numRecord++;
|
if (!skip) {
|
||||||
code = tsdbFSetWriteTombRecord(committer->writer, record);
|
numRecord++;
|
||||||
TSDB_CHECK_CODE(code, lino, _exit);
|
code = tsdbFSetWriteTombRecord(committer->writer, record);
|
||||||
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = tsdbIterMergerNext(committer->tombIterMerger);
|
code = tsdbIterMergerNext(committer->tombIterMerger);
|
||||||
|
@ -406,31 +391,6 @@ static int32_t tsdbCommitFileSetBegin(SCommitter2 *committer) {
|
||||||
// reset nextKey
|
// reset nextKey
|
||||||
committer->ctx->nextKey = TSKEY_MAX;
|
committer->ctx->nextKey = TSKEY_MAX;
|
||||||
|
|
||||||
committer->ctx->skipTsRow = false;
|
|
||||||
|
|
||||||
extern int8_t tsS3Enabled;
|
|
||||||
extern int32_t tsS3UploadDelaySec;
|
|
||||||
long s3Size(const char *object_name);
|
|
||||||
int32_t nlevel = tfsGetLevel(committer->tsdb->pVnode->pTfs);
|
|
||||||
if (tsS3Enabled && nlevel > 1 && committer->ctx->fset) {
|
|
||||||
STFileObj *fobj = committer->ctx->fset->farr[TSDB_FTYPE_DATA];
|
|
||||||
if (fobj && fobj->f->did.level == nlevel - 1) {
|
|
||||||
// if exists on s3 or local mtime < committer->ctx->now - tsS3UploadDelay
|
|
||||||
const char *object_name = taosDirEntryBaseName((char *)fobj->fname);
|
|
||||||
|
|
||||||
if (taosCheckExistFile(fobj->fname)) {
|
|
||||||
int32_t mtime = 0;
|
|
||||||
taosStatFile(fobj->fname, NULL, &mtime, NULL);
|
|
||||||
if (mtime < committer->ctx->now - tsS3UploadDelaySec) {
|
|
||||||
committer->ctx->skipTsRow = true;
|
|
||||||
}
|
|
||||||
} else /*if (s3Size(object_name) > 0) */ {
|
|
||||||
committer->ctx->skipTsRow = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// new fset can be written with ts data
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||||
|
@ -519,28 +479,11 @@ static int32_t tsdbOpenCommitter(STsdb *tsdb, SCommitInfo *info, SCommitter2 *co
|
||||||
STbData *tbData = TCONTAINER_OF(node, STbData, rbtn);
|
STbData *tbData = TCONTAINER_OF(node, STbData, rbtn);
|
||||||
|
|
||||||
for (SDelData *delData = tbData->pHead; delData; delData = delData->pNext) {
|
for (SDelData *delData = tbData->pHead; delData; delData = delData->pNext) {
|
||||||
if (delData->sKey < committer->ctx->nextKey) {
|
committer->ctx->nextKey = TMIN(committer->ctx->nextKey, delData->sKey);
|
||||||
committer->ctx->nextKey = delData->sKey;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
committer->ctx->maxDelKey = TSKEY_MIN;
|
|
||||||
TSKEY minKey = TSKEY_MAX;
|
|
||||||
TSKEY maxKey = TSKEY_MIN;
|
|
||||||
if (TARRAY2_SIZE(committer->fsetArr) > 0) {
|
|
||||||
STFileSet *fset = TARRAY2_LAST(committer->fsetArr);
|
|
||||||
tsdbFidKeyRange(fset->fid, committer->minutes, committer->precision, &minKey, &committer->ctx->maxDelKey);
|
|
||||||
|
|
||||||
fset = TARRAY2_FIRST(committer->fsetArr);
|
|
||||||
tsdbFidKeyRange(fset->fid, committer->minutes, committer->precision, &minKey, &maxKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (committer->ctx->nextKey < TMIN(tsdb->imem->minKey, minKey)) {
|
|
||||||
committer->ctx->nextKey = TMIN(tsdb->imem->minKey, minKey);
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
|
||||||
|
|
Loading…
Reference in New Issue