Merge pull request #21660 from luckeverda/feat/TD-23739

feat/TD-23739: enhance ttl, deletetime modified by writing-op
This commit is contained in:
wade zhang 2023-06-25 09:06:57 +08:00 committed by GitHub
commit 254a20bdf3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 973 additions and 203 deletions

View File

@ -120,6 +120,7 @@ extern bool tsQueryUseNodeAllocator;
extern bool tsKeepColumnName; extern bool tsKeepColumnName;
extern bool tsEnableQueryHb; extern bool tsEnableQueryHb;
extern bool tsEnableScience; extern bool tsEnableScience;
extern bool tsTtlChangeOnWrite;
extern int32_t tsRedirectPeriod; extern int32_t tsRedirectPeriod;
extern int32_t tsRedirectFactor; extern int32_t tsRedirectFactor;
extern int32_t tsRedirectMaxPeriod; extern int32_t tsRedirectMaxPeriod;

View File

@ -945,7 +945,7 @@ int32_t tSerializeSVTrimDbReq(void* buf, int32_t bufLen, SVTrimDbReq* pReq);
int32_t tDeserializeSVTrimDbReq(void* buf, int32_t bufLen, SVTrimDbReq* pReq); int32_t tDeserializeSVTrimDbReq(void* buf, int32_t bufLen, SVTrimDbReq* pReq);
typedef struct { typedef struct {
int32_t timestamp; int32_t timestampSec;
} SVDropTtlTableReq; } SVDropTtlTableReq;
int32_t tSerializeSVDropTtlTableReq(void* buf, int32_t bufLen, SVDropTtlTableReq* pReq); int32_t tSerializeSVDropTtlTableReq(void* buf, int32_t bufLen, SVDropTtlTableReq* pReq);
@ -2278,7 +2278,7 @@ typedef struct SVCreateTbReq {
int32_t flags; int32_t flags;
char* name; char* name;
tb_uid_t uid; tb_uid_t uid;
int64_t ctime; int64_t btime;
int32_t ttl; int32_t ttl;
int32_t commentLen; int32_t commentLen;
char* comment; char* comment;
@ -2415,10 +2415,12 @@ typedef struct {
int32_t newTTL; int32_t newTTL;
int32_t newCommentLen; int32_t newCommentLen;
char* newComment; char* newComment;
int64_t ctimeMs; // fill by vnode
} SVAlterTbReq; } SVAlterTbReq;
int32_t tEncodeSVAlterTbReq(SEncoder* pEncoder, const SVAlterTbReq* pReq); int32_t tEncodeSVAlterTbReq(SEncoder* pEncoder, const SVAlterTbReq* pReq);
int32_t tDecodeSVAlterTbReq(SDecoder* pDecoder, SVAlterTbReq* pReq); int32_t tDecodeSVAlterTbReq(SDecoder* pDecoder, SVAlterTbReq* pReq);
int32_t tDecodeSVAlterTbReqSetCtime(SDecoder* pDecoder, SVAlterTbReq* pReq, int64_t ctimeMs);
typedef struct { typedef struct {
int32_t code; int32_t code;
@ -3436,6 +3438,7 @@ typedef struct SDeleteRes {
int64_t affectedRows; int64_t affectedRows;
char tableFName[TSDB_TABLE_NAME_LEN]; char tableFName[TSDB_TABLE_NAME_LEN];
char tsColName[TSDB_COL_NAME_LEN]; char tsColName[TSDB_COL_NAME_LEN];
int64_t ctimeMs; // fill by vnode
} SDeleteRes; } SDeleteRes;
int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes); int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes);
@ -3454,10 +3457,12 @@ int32_t tDecodeSSingleDeleteReq(SDecoder* pCoder, SSingleDeleteReq* pReq);
typedef struct { typedef struct {
int64_t suid; int64_t suid;
SArray* deleteReqs; // SArray<SSingleDeleteReq> SArray* deleteReqs; // SArray<SSingleDeleteReq>
int64_t ctimeMs; // fill by vnode
} SBatchDeleteReq; } SBatchDeleteReq;
int32_t tEncodeSBatchDeleteReq(SEncoder* pCoder, const SBatchDeleteReq* pReq); int32_t tEncodeSBatchDeleteReq(SEncoder* pCoder, const SBatchDeleteReq* pReq);
int32_t tDecodeSBatchDeleteReq(SDecoder* pCoder, SBatchDeleteReq* pReq); int32_t tDecodeSBatchDeleteReq(SDecoder* pCoder, SBatchDeleteReq* pReq);
int32_t tDecodeSBatchDeleteReqSetCtime(SDecoder* pDecoder, SBatchDeleteReq* pReq, int64_t ctimeMs);
typedef struct { typedef struct {
int32_t msgIdx; int32_t msgIdx;
@ -3525,6 +3530,7 @@ typedef struct {
SArray* aRowP; SArray* aRowP;
SArray* aCol; SArray* aCol;
}; };
int64_t ctimeMs;
} SSubmitTbData; } SSubmitTbData;
typedef struct { typedef struct {

View File

@ -54,7 +54,7 @@ typedef struct SMetaEntry {
SRSmaParam rsmaParam; SRSmaParam rsmaParam;
} stbEntry; } stbEntry;
struct { struct {
int64_t ctime; int64_t btime;
int32_t ttlDays; int32_t ttlDays;
int32_t commentLen; int32_t commentLen;
char* comment; char* comment;
@ -62,7 +62,7 @@ typedef struct SMetaEntry {
uint8_t* pTags; uint8_t* pTags;
} ctbEntry; } ctbEntry;
struct { struct {
int64_t ctime; int64_t btime;
int32_t ttlDays; int32_t ttlDays;
int32_t commentLen; int32_t commentLen;
char* comment; char* comment;

View File

@ -110,6 +110,7 @@ int32_t tsQueryRspPolicy = 0;
int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT
bool tsEnableQueryHb = false; bool tsEnableQueryHb = false;
bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true
bool tsTtlChangeOnWrite = false; // ttl delete time changes on last write if true
int32_t tsQuerySmaOptimize = 0; int32_t tsQuerySmaOptimize = 0;
int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data. int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data.
bool tsQueryPlannerTrace = false; bool tsQueryPlannerTrace = false;
@ -513,6 +514,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, 1) != 0) return -1; if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 100000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 100000, 1) != 0) return -1;
if (cfgAddBool(pCfg, "ttlChangeOnWrite", tsTtlChangeOnWrite, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, 0) != 0) return -1;
@ -873,6 +875,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval; tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval;
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval; tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
tsTtlChangeOnWrite = cfgGetItem(pCfg, "ttlChangeOnWrite")->bval;
tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32; tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32;
tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN); tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN);
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32; tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
@ -978,6 +981,8 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
taosSetCoreDump(enableCore); taosSetCoreDump(enableCore);
} else if (strcasecmp("enableQueryHb", name) == 0) { } else if (strcasecmp("enableQueryHb", name) == 0) {
tsEnableQueryHb = cfgGetItem(pCfg, "enableQueryHb")->bval; tsEnableQueryHb = cfgGetItem(pCfg, "enableQueryHb")->bval;
} else if (strcasecmp("ttlChangeOnWrite", name) == 0) {
tsTtlChangeOnWrite = cfgGetItem(pCfg, "ttlChangeOnWrite")->bval;
} }
break; break;
} }

View File

@ -30,6 +30,9 @@
#include "tlog.h" #include "tlog.h"
static int32_t tDecodeSVAlterTbReqCommon(SDecoder *pDecoder, SVAlterTbReq *pReq);
static int32_t tDecodeSBatchDeleteReqCommon(SDecoder *pDecoder, SBatchDeleteReq *pReq);
int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) { if (pMsg == NULL) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
@ -1725,7 +1728,7 @@ int32_t tDeserializeSDropDnodeReq(void *buf, int32_t bufLen, SDropDnodeReq *pReq
} else { } else {
pReq->unsafe = false; pReq->unsafe = false;
} }
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
@ -3161,7 +3164,7 @@ int32_t tSerializeSVDropTtlTableReq(void *buf, int32_t bufLen, SVDropTtlTableReq
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->timestamp) < 0) return -1; if (tEncodeI32(&encoder, pReq->timestampSec) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
@ -3174,7 +3177,7 @@ int32_t tDeserializeSVDropTtlTableReq(void *buf, int32_t bufLen, SVDropTtlTableR
tDecoderInit(&decoder, buf, bufLen); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->timestamp) < 0) return -1; if (tDecodeI32(&decoder, &pReq->timestampSec) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
@ -4671,7 +4674,7 @@ int32_t tDeserializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnode
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1; if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
} }
} }
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
@ -6409,7 +6412,7 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
if (tEncodeI32v(pCoder, pReq->flags) < 0) return -1; if (tEncodeI32v(pCoder, pReq->flags) < 0) return -1;
if (tEncodeCStr(pCoder, pReq->name) < 0) return -1; if (tEncodeCStr(pCoder, pReq->name) < 0) return -1;
if (tEncodeI64(pCoder, pReq->uid) < 0) return -1; if (tEncodeI64(pCoder, pReq->uid) < 0) return -1;
if (tEncodeI64(pCoder, pReq->ctime) < 0) return -1; if (tEncodeI64(pCoder, pReq->btime) < 0) return -1;
if (tEncodeI32(pCoder, pReq->ttl) < 0) return -1; if (tEncodeI32(pCoder, pReq->ttl) < 0) return -1;
if (tEncodeI8(pCoder, pReq->type) < 0) return -1; if (tEncodeI8(pCoder, pReq->type) < 0) return -1;
if (tEncodeI32(pCoder, pReq->commentLen) < 0) return -1; if (tEncodeI32(pCoder, pReq->commentLen) < 0) return -1;
@ -6444,7 +6447,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
if (tDecodeI32v(pCoder, &pReq->flags) < 0) return -1; if (tDecodeI32v(pCoder, &pReq->flags) < 0) return -1;
if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1; if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->uid) < 0) return -1; if (tDecodeI64(pCoder, &pReq->uid) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->ctime) < 0) return -1; if (tDecodeI64(pCoder, &pReq->btime) < 0) return -1;
if (tDecodeI32(pCoder, &pReq->ttl) < 0) return -1; if (tDecodeI32(pCoder, &pReq->ttl) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->type) < 0) return -1; if (tDecodeI8(pCoder, &pReq->type) < 0) return -1;
if (tDecodeI32(pCoder, &pReq->commentLen) < 0) return -1; if (tDecodeI32(pCoder, &pReq->commentLen) < 0) return -1;
@ -6909,14 +6912,13 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) {
default: default:
break; break;
} }
if (tEncodeI64(pEncoder, pReq->ctimeMs) < 0) return -1;
tEndEncode(pEncoder); tEndEncode(pEncoder);
return 0; return 0;
} }
int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) { static int32_t tDecodeSVAlterTbReqCommon(SDecoder *pDecoder, SVAlterTbReq *pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeCStr(pDecoder, &pReq->tbName) < 0) return -1; if (tDecodeCStr(pDecoder, &pReq->tbName) < 0) return -1;
if (tDecodeI8(pDecoder, &pReq->action) < 0) return -1; if (tDecodeI8(pDecoder, &pReq->action) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->colId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->colId) < 0) return -1;
@ -6960,6 +6962,28 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
default: default:
break; break;
} }
return 0;
}
int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeSVAlterTbReqCommon(pDecoder, pReq) < 0) return -1;
pReq->ctimeMs = 0;
if (!tDecodeIsEnd(pDecoder)) {
if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1;
}
tEndDecode(pDecoder);
return 0;
}
int32_t tDecodeSVAlterTbReqSetCtime(SDecoder* pDecoder, SVAlterTbReq* pReq, int64_t ctimeMs) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeSVAlterTbReqCommon(pDecoder, pReq) < 0) return -1;
*(int64_t *)(pDecoder->data + pDecoder->pos) = ctimeMs;
if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1;
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;
@ -7238,6 +7262,7 @@ int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) {
if (tEncodeCStr(pCoder, pRes->tableFName) < 0) return -1; if (tEncodeCStr(pCoder, pRes->tableFName) < 0) return -1;
if (tEncodeCStr(pCoder, pRes->tsColName) < 0) return -1; if (tEncodeCStr(pCoder, pRes->tsColName) < 0) return -1;
if (tEncodeI64(pCoder, pRes->ctimeMs) < 0) return -1;
return 0; return 0;
} }
@ -7257,6 +7282,11 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) {
if (tDecodeCStrTo(pCoder, pRes->tableFName) < 0) return -1; if (tDecodeCStrTo(pCoder, pRes->tableFName) < 0) return -1;
if (tDecodeCStrTo(pCoder, pRes->tsColName) < 0) return -1; if (tDecodeCStrTo(pCoder, pRes->tsColName) < 0) return -1;
pRes->ctimeMs = 0;
if (!tDecodeIsEnd(pCoder)) {
if (tDecodeI64(pCoder, &pRes->ctimeMs) < 0) return -1;
}
return 0; return 0;
} }
@ -7480,10 +7510,11 @@ int32_t tEncodeSBatchDeleteReq(SEncoder *pEncoder, const SBatchDeleteReq *pReq)
SSingleDeleteReq *pOneReq = taosArrayGet(pReq->deleteReqs, i); SSingleDeleteReq *pOneReq = taosArrayGet(pReq->deleteReqs, i);
if (tEncodeSSingleDeleteReq(pEncoder, pOneReq) < 0) return -1; if (tEncodeSSingleDeleteReq(pEncoder, pOneReq) < 0) return -1;
} }
if (tEncodeI64(pEncoder, pReq->ctimeMs) < 0) return -1;
return 0; return 0;
} }
int32_t tDecodeSBatchDeleteReq(SDecoder *pDecoder, SBatchDeleteReq *pReq) { static int32_t tDecodeSBatchDeleteReqCommon(SDecoder *pDecoder, SBatchDeleteReq *pReq) {
if (tDecodeI64(pDecoder, &pReq->suid) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->suid) < 0) return -1;
int32_t sz; int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) return -1; if (tDecodeI32(pDecoder, &sz) < 0) return -1;
@ -7497,6 +7528,24 @@ int32_t tDecodeSBatchDeleteReq(SDecoder *pDecoder, SBatchDeleteReq *pReq) {
return 0; return 0;
} }
int32_t tDecodeSBatchDeleteReq(SDecoder *pDecoder, SBatchDeleteReq *pReq) {
if (tDecodeSBatchDeleteReqCommon(pDecoder, pReq)) return -1;
pReq->ctimeMs = 0;
if (!tDecodeIsEnd(pDecoder)) {
if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1;
}
return 0;
}
int32_t tDecodeSBatchDeleteReqSetCtime(SDecoder *pDecoder, SBatchDeleteReq *pReq, int64_t ctimeMs) {
if (tDecodeSBatchDeleteReqCommon(pDecoder, pReq)) return -1;
*(int64_t *)(pDecoder->data + pDecoder->pos) = ctimeMs;
if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1;
return 0;
}
static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubmitTbData) { static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubmitTbData) {
if (tStartEncode(pCoder) < 0) return -1; if (tStartEncode(pCoder) < 0) return -1;
@ -7531,6 +7580,7 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm
pCoder->pos += rows[iRow]->len; pCoder->pos += rows[iRow]->len;
} }
} }
if (tEncodeI64(pCoder, pSubmitTbData->ctimeMs) < 0) return -1;
tEndEncode(pCoder); tEndEncode(pCoder);
return 0; return 0;
@ -7611,6 +7661,14 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
} }
} }
pSubmitTbData->ctimeMs = 0;
if (!tDecodeIsEnd(pCoder)) {
if (tDecodeI64(pCoder, &pSubmitTbData->ctimeMs) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
}
tEndDecode(pCoder); tEndDecode(pCoder);
_exit: _exit:

View File

@ -888,7 +888,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
void *pIter = NULL; void *pIter = NULL;
SVDropTtlTableReq ttlReq = {.timestamp = taosGetTimestampSec()}; SVDropTtlTableReq ttlReq = {.timestampSec = taosGetTimestampSec()};
int32_t reqLen = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq); int32_t reqLen = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq);
int32_t contLen = reqLen + sizeof(SMsgHead); int32_t contLen = reqLen + sizeof(SMsgHead);
@ -914,7 +914,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
if (code != 0) { if (code != 0) {
mError("vgId:%d, failed to send drop ttl table request to vnode since 0x%x", pVgroup->vgId, code); mError("vgId:%d, failed to send drop ttl table request to vnode since 0x%x", pVgroup->vgId, code);
} else { } else {
mInfo("vgId:%d, send drop ttl table request to vnode, time:%d", pVgroup->vgId, ttlReq.timestamp); mInfo("vgId:%d, send drop ttl table request to vnode, time:%" PRId32, pVgroup->vgId, ttlReq.timestampSec);
} }
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
} }
@ -1188,7 +1188,7 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p
if (mndAllocStbSchemas(pOld, pNew) != 0) { if (mndAllocStbSchemas(pOld, pNew) != 0) {
return -1; return -1;
} }
if(pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ntags){ if(pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ntags){
terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW; terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW;
return -1; return -1;

View File

@ -27,6 +27,7 @@ target_sources(
"src/meta/metaEntry.c" "src/meta/metaEntry.c"
"src/meta/metaSnapshot.c" "src/meta/metaSnapshot.c"
"src/meta/metaCache.c" "src/meta/metaCache.c"
"src/meta/metaTtl.c"
# sma # sma
"src/sma/smaEnv.c" "src/sma/smaEnv.c"

View File

@ -115,6 +115,7 @@ int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName); int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName);
int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid); int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid);
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType); int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
int metaGetTableTtlByUid(void *meta, uint64_t uid, int64_t *ttlDays);
bool metaIsTableExist(void* pVnode, tb_uid_t uid); bool metaIsTableExist(void* pVnode, tb_uid_t uid);
int32_t metaGetCachedTableUidList(void *pVnode, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList, int32_t metaGetCachedTableUidList(void *pVnode, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList,
bool *acquired); bool *acquired);

View File

@ -17,6 +17,7 @@
#define _TD_VNODE_META_H_ #define _TD_VNODE_META_H_
#include "index.h" #include "index.h"
#include "metaTtl.h"
#include "vnodeInt.h" #include "vnodeInt.h"
#ifdef __cplusplus #ifdef __cplusplus
@ -89,10 +90,10 @@ struct SMeta {
// ivt idx and idx // ivt idx and idx
void* pTagIvtIdx; void* pTagIvtIdx;
TTB* pTagIdx; TTB* pTagIdx;
TTB* pTtlIdx; STtlManger* pTtlMgr;
TTB* pCtimeIdx; // table created time idx TTB* pBtimeIdx; // table created time idx
TTB* pNcolIdx; // ncol of table idx, normal table only TTB* pNcolIdx; // ncol of table idx, normal table only
TTB* pSmaIdx; TTB* pSmaIdx;
@ -138,20 +139,15 @@ typedef struct {
} STagIdxKey; } STagIdxKey;
#pragma pack(pop) #pragma pack(pop)
typedef struct {
int64_t dtime;
tb_uid_t uid;
} STtlIdxKey;
typedef struct { typedef struct {
tb_uid_t uid; tb_uid_t uid;
int64_t smaUid; int64_t smaUid;
} SSmaIdxKey; } SSmaIdxKey;
typedef struct { typedef struct {
int64_t ctime; int64_t btime;
tb_uid_t uid; tb_uid_t uid;
} SCtimeIdxKey; } SBtimeIdxKey;
typedef struct { typedef struct {
int64_t ncol; int64_t ncol;

View File

@ -0,0 +1,98 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_TTL_H_
#define _TD_VNODE_TTL_H_
#include "taosdef.h"
#include "thash.h"
#include "tdb.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef enum DirtyEntryType {
ENTRY_TYPE_DEL = 1,
ENTRY_TYPE_UPSERT = 2,
} DirtyEntryType;
typedef struct STtlManger {
TdThreadRwlock lock;
TTB* pOldTtlIdx; // btree<{deleteTime, tuid}, NULL>
SHashObj* pTtlCache; // key: tuid, value: {ttl, ctime}
SHashObj* pDirtyUids; // dirty tuid
TTB* pTtlIdx; // btree<{deleteTime, tuid}, ttl>
} STtlManger;
typedef struct {
int64_t ttlDays;
int64_t changeTimeMs;
} STtlCacheEntry;
typedef struct {
DirtyEntryType type;
} STtlDirtyEntry;
typedef struct {
int64_t deleteTimeSec;
tb_uid_t uid;
} STtlIdxKey;
typedef struct {
int64_t deleteTimeMs;
tb_uid_t uid;
} STtlIdxKeyV1;
typedef struct {
int64_t ttlDays;
} STtlIdxValue;
typedef struct {
tb_uid_t uid;
int64_t changeTimeMs;
} STtlUpdCtimeCtx;
typedef struct {
tb_uid_t uid;
int64_t changeTimeMs;
int64_t ttlDays;
} STtlUpdTtlCtx;
typedef struct {
tb_uid_t uid;
TXN* pTxn;
} STtlDelTtlCtx;
int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback);
int ttlMgrClose(STtlManger* pTtlMgr);
int ttlMgrBegin(STtlManger* pTtlMgr, void* pMeta);
int ttlMgrConvert(TTB* pOldTtlIdx, TTB* pNewTtlIdx, void* pMeta);
int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn);
int ttlMgrInsertTtl(STtlManger* pTtlMgr, const STtlUpdTtlCtx* pUpdCtx);
int ttlMgrDeleteTtl(STtlManger* pTtlMgr, const STtlDelTtlCtx* pDelCtx);
int ttlMgrUpdateChangeTime(STtlManger* pTtlMgr, const STtlUpdCtimeCtx* pUpdCtimeCtx);
int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_TTL_H_*/

View File

@ -149,8 +149,9 @@ int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid);
int32_t metaTrimTables(SMeta* pMeta); int32_t metaTrimTables(SMeta* pMeta);
int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids); int metaTtlDropTable(SMeta* pMeta, int64_t timePointMs, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
int metaUpdateChangeTime(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema** ppTSchema); int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema** ppTSchema);
@ -176,7 +177,6 @@ SArray* metaGetSmaIdsByTable(SMeta* pMeta, tb_uid_t uid);
SArray* metaGetSmaTbUids(SMeta* pMeta); SArray* metaGetSmaTbUids(SMeta* pMeta);
void* metaGetIdx(SMeta* pMeta); void* metaGetIdx(SMeta* pMeta);
void* metaGetIvtIdx(SMeta* pMeta); void* metaGetIvtIdx(SMeta* pMeta);
int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList);
void metaReaderInit(SMetaReader* pReader, SMeta* pMeta, int32_t flags); void metaReaderInit(SMetaReader* pReader, SMeta* pMeta, int32_t flags);

View File

@ -40,6 +40,12 @@ int metaBegin(SMeta *pMeta, int8_t heap) {
return -1; return -1;
} }
if (ttlMgrBegin(pMeta->pTtlMgr, pMeta) < 0) {
return -1;
}
tdbCommit(pMeta->pEnv, pMeta->txn);
return 0; return 0;
} }
@ -50,6 +56,7 @@ int metaFinishCommit(SMeta *pMeta, TXN *txn) { return tdbPostCommit(pMeta->pEnv
int metaPrepareAsyncCommit(SMeta *pMeta) { int metaPrepareAsyncCommit(SMeta *pMeta) {
// return tdbPrepareAsyncCommit(pMeta->pEnv, pMeta->txn); // return tdbPrepareAsyncCommit(pMeta->pEnv, pMeta->txn);
int code = 0; int code = 0;
code = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn);
code = tdbCommit(pMeta->pEnv, pMeta->txn); code = tdbCommit(pMeta->pEnv, pMeta->txn);
return code; return code;

View File

@ -31,7 +31,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
if (tEncodeSRSmaParam(pCoder, &pME->stbEntry.rsmaParam) < 0) return -1; if (tEncodeSRSmaParam(pCoder, &pME->stbEntry.rsmaParam) < 0) return -1;
} }
} else if (pME->type == TSDB_CHILD_TABLE) { } else if (pME->type == TSDB_CHILD_TABLE) {
if (tEncodeI64(pCoder, pME->ctbEntry.ctime) < 0) return -1; if (tEncodeI64(pCoder, pME->ctbEntry.btime) < 0) return -1;
if (tEncodeI32(pCoder, pME->ctbEntry.ttlDays) < 0) return -1; if (tEncodeI32(pCoder, pME->ctbEntry.ttlDays) < 0) return -1;
if (tEncodeI32v(pCoder, pME->ctbEntry.commentLen) < 0) return -1; if (tEncodeI32v(pCoder, pME->ctbEntry.commentLen) < 0) return -1;
if (pME->ctbEntry.commentLen > 0) { if (pME->ctbEntry.commentLen > 0) {
@ -40,7 +40,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
if (tEncodeI64(pCoder, pME->ctbEntry.suid) < 0) return -1; if (tEncodeI64(pCoder, pME->ctbEntry.suid) < 0) return -1;
if (tEncodeTag(pCoder, (const STag *)pME->ctbEntry.pTags) < 0) return -1; if (tEncodeTag(pCoder, (const STag *)pME->ctbEntry.pTags) < 0) return -1;
} else if (pME->type == TSDB_NORMAL_TABLE) { } else if (pME->type == TSDB_NORMAL_TABLE) {
if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1; if (tEncodeI64(pCoder, pME->ntbEntry.btime) < 0) return -1;
if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1; if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1;
if (tEncodeI32v(pCoder, pME->ntbEntry.commentLen) < 0) return -1; if (tEncodeI32v(pCoder, pME->ntbEntry.commentLen) < 0) return -1;
if (pME->ntbEntry.commentLen > 0) { if (pME->ntbEntry.commentLen > 0) {
@ -76,7 +76,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeSRSmaParam(pCoder, &pME->stbEntry.rsmaParam) < 0) return -1; if (tDecodeSRSmaParam(pCoder, &pME->stbEntry.rsmaParam) < 0) return -1;
} }
} else if (pME->type == TSDB_CHILD_TABLE) { } else if (pME->type == TSDB_CHILD_TABLE) {
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1; if (tDecodeI64(pCoder, &pME->ctbEntry.btime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1; if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1;
if (tDecodeI32v(pCoder, &pME->ctbEntry.commentLen) < 0) return -1; if (tDecodeI32v(pCoder, &pME->ctbEntry.commentLen) < 0) return -1;
if (pME->ctbEntry.commentLen > 0) { if (pME->ctbEntry.commentLen > 0) {
@ -85,7 +85,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeI64(pCoder, &pME->ctbEntry.suid) < 0) return -1; if (tDecodeI64(pCoder, &pME->ctbEntry.suid) < 0) return -1;
if (tDecodeTag(pCoder, (STag **)&pME->ctbEntry.pTags) < 0) return -1; // (TODO) if (tDecodeTag(pCoder, (STag **)&pME->ctbEntry.pTags) < 0) return -1; // (TODO)
} else if (pME->type == TSDB_NORMAL_TABLE) { } else if (pME->type == TSDB_NORMAL_TABLE) {
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1; if (tDecodeI64(pCoder, &pME->ntbEntry.btime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1; if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
if (tDecodeI32v(pCoder, &pME->ntbEntry.commentLen) < 0) return -1; if (tDecodeI32v(pCoder, &pME->ntbEntry.commentLen) < 0) return -1;
if (pME->ntbEntry.commentLen > 0) { if (pME->ntbEntry.commentLen > 0) {

View File

@ -19,12 +19,11 @@ static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen
static int skmDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int skmDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int ctbIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int ctbIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int uidIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int uidIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int ctimeIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int btimeIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int ncolIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int ncolIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); } static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); }
@ -128,8 +127,8 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
goto _err; goto _err;
} }
// open pTtlIdx // open pTtlMgr ("ttlv1.idx")
ret = tdbTbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx, 0); ret = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0);
if (ret < 0) { if (ret < 0) {
metaError("vgId:%d, failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d, failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
@ -143,7 +142,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
} }
// idx table create time // idx table create time
ret = tdbTbOpen("ctime.idx", sizeof(SCtimeIdxKey), 0, ctimeIdxCmpr, pMeta->pEnv, &pMeta->pCtimeIdx, 0); ret = tdbTbOpen("ctime.idx", sizeof(SBtimeIdxKey), 0, btimeIdxCmpr, pMeta->pEnv, &pMeta->pBtimeIdx, 0);
if (ret < 0) { if (ret < 0) {
metaError("vgId:%d, failed to open meta ctime index since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d, failed to open meta ctime index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
@ -184,9 +183,9 @@ _err:
if (pMeta->pIdx) metaCloseIdx(pMeta); if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb); if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx); if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx);
if (pMeta->pCtimeIdx) tdbTbClose(pMeta->pCtimeIdx); if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx); if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr);
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx); if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx); if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
@ -209,9 +208,9 @@ int metaClose(SMeta **ppMeta) {
if (pMeta->pIdx) metaCloseIdx(pMeta); if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb); if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx); if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx);
if (pMeta->pCtimeIdx) tdbTbClose(pMeta->pCtimeIdx); if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx); if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr);
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx); if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx); if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
@ -399,37 +398,18 @@ static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
return 0; return 0;
} }
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) { static int btimeIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
STtlIdxKey *pTtlIdxKey1 = (STtlIdxKey *)pKey1; SBtimeIdxKey *pBtimeIdxKey1 = (SBtimeIdxKey *)pKey1;
STtlIdxKey *pTtlIdxKey2 = (STtlIdxKey *)pKey2; SBtimeIdxKey *pBtimeIdxKey2 = (SBtimeIdxKey *)pKey2;
if (pBtimeIdxKey1->btime > pBtimeIdxKey2->btime) {
if (pTtlIdxKey1->dtime > pTtlIdxKey2->dtime) {
return 1; return 1;
} else if (pTtlIdxKey1->dtime < pTtlIdxKey2->dtime) { } else if (pBtimeIdxKey1->btime < pBtimeIdxKey2->btime) {
return -1; return -1;
} }
if (pTtlIdxKey1->uid > pTtlIdxKey2->uid) { if (pBtimeIdxKey1->uid > pBtimeIdxKey2->uid) {
return 1; return 1;
} else if (pTtlIdxKey1->uid < pTtlIdxKey2->uid) { } else if (pBtimeIdxKey1->uid < pBtimeIdxKey2->uid) {
return -1;
}
return 0;
}
static int ctimeIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
SCtimeIdxKey *pCtimeIdxKey1 = (SCtimeIdxKey *)pKey1;
SCtimeIdxKey *pCtimeIdxKey2 = (SCtimeIdxKey *)pKey2;
if (pCtimeIdxKey1->ctime > pCtimeIdxKey2->ctime) {
return 1;
} else if (pCtimeIdxKey1->ctime < pCtimeIdxKey2->ctime) {
return -1;
}
if (pCtimeIdxKey1->uid > pCtimeIdxKey2->uid) {
return 1;
} else if (pCtimeIdxKey1->uid < pCtimeIdxKey2->uid) {
return -1; return -1;
} }

View File

@ -212,6 +212,29 @@ int metaReadNext(SMetaReader *pReader) {
return 0; return 0;
} }
int metaGetTableTtlByUid(void *meta, uint64_t uid, int64_t *ttlDays) {
int code = -1;
SMetaReader mr = {0};
metaReaderInit(&mr, (SMeta *)meta, 0);
code = metaReaderGetTableEntryByUid(&mr, uid);
if (code < 0) {
goto _exit;
}
if (mr.me.type == TSDB_CHILD_TABLE) {
*ttlDays = mr.me.ctbEntry.ttlDays;
} else if (mr.me.type == TSDB_NORMAL_TABLE) {
*ttlDays = mr.me.ntbEntry.ttlDays;
} else {
goto _exit;
}
code = 0;
_exit:
metaReaderClear(&mr);
return code;
}
#if 1 // =================================================== #if 1 // ===================================================
SMTbCursor *metaOpenTbCursor(void *pVnode) { SMTbCursor *metaOpenTbCursor(void *pVnode) {
SMTbCursor *pTbCur = NULL; SMTbCursor *pTbCur = NULL;
@ -387,37 +410,6 @@ _err:
return NULL; return NULL;
} }
int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList) {
TBC *pCur;
int ret = tdbTbcOpen(pMeta->pTtlIdx, &pCur, NULL);
if (ret < 0) {
return ret;
}
STtlIdxKey ttlKey = {0};
ttlKey.dtime = ttl;
ttlKey.uid = INT64_MAX;
int c = 0;
tdbTbcMoveTo(pCur, &ttlKey, sizeof(ttlKey), &c);
if (c < 0) {
tdbTbcMoveToPrev(pCur);
}
void *pKey = NULL;
int kLen = 0;
while (1) {
ret = tdbTbcPrev(pCur, &pKey, &kLen, NULL, NULL);
if (ret < 0) {
break;
}
ttlKey = *(STtlIdxKey *)pKey;
taosArrayPush(uidList, &ttlKey.uid);
}
tdbFree(pKey);
tdbTbcClose(pCur);
return 0;
}
struct SMCtbCursor { struct SMCtbCursor {
SMeta *pMeta; SMeta *pMeta;
TBC *pCur; TBC *pCur;
@ -1018,17 +1010,17 @@ int32_t metaFilterCreateTime(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
pCursor->type = param->type; pCursor->type = param->type;
metaRLock(pMeta); metaRLock(pMeta);
ret = tdbTbcOpen(pMeta->pCtimeIdx, &pCursor->pCur, NULL); ret = tdbTbcOpen(pMeta->pBtimeIdx, &pCursor->pCur, NULL);
if (ret != 0) { if (ret != 0) {
goto END; goto END;
} }
int64_t uidLimit = param->reverse ? INT64_MAX : 0; int64_t uidLimit = param->reverse ? INT64_MAX : 0;
SCtimeIdxKey ctimeKey = {.ctime = *(int64_t *)(param->val), .uid = uidLimit}; SBtimeIdxKey btimeKey = {.btime = *(int64_t *)(param->val), .uid = uidLimit};
SCtimeIdxKey *pCtimeKey = &ctimeKey; SBtimeIdxKey *pBtimeKey = &btimeKey;
int cmp = 0; int cmp = 0;
if (tdbTbcMoveTo(pCursor->pCur, &ctimeKey, sizeof(ctimeKey), &cmp) < 0) { if (tdbTbcMoveTo(pCursor->pCur, &btimeKey, sizeof(btimeKey), &cmp) < 0) {
goto END; goto END;
} }
@ -1042,10 +1034,10 @@ int32_t metaFilterCreateTime(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, NULL, NULL); valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, NULL, NULL);
if (valid < 0) break; if (valid < 0) break;
SCtimeIdxKey *p = entryKey; SBtimeIdxKey *p = entryKey;
if (count > TRY_ERROR_LIMIT) break; if (count > TRY_ERROR_LIMIT) break;
int32_t cmp = (*param->filterFunc)((void *)&p->ctime, (void *)&pCtimeKey->ctime, param->type); int32_t cmp = (*param->filterFunc)((void *)&p->btime, (void *)&pBtimeKey->btime, param->type);
if (cmp == 0) if (cmp == 0)
taosArrayPush(pUids, &p->uid); taosArrayPush(pUids, &p->uid);
else { else {
@ -1149,7 +1141,7 @@ int32_t metaFilterTtl(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
pCursor->type = param->type; pCursor->type = param->type;
metaRLock(pMeta); metaRLock(pMeta);
ret = tdbTbcOpen(pMeta->pTtlIdx, &pCursor->pCur, NULL); //ret = tdbTbcOpen(pMeta->pTtlIdx, &pCursor->pCur, NULL);
END: END:
if (pCursor->pMeta) metaULock(pCursor->pMeta); if (pCursor->pMeta) metaULock(pCursor->pMeta);

View File

@ -20,7 +20,7 @@ static int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, con
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME); static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME);
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME); static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME);
@ -28,8 +28,8 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry);
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type); static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type);
static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey); static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey);
// opt ins_tables query // opt ins_tables query
static int metaUpdateCtimeIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateBtimeIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaDeleteCtimeIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME);
@ -734,7 +734,7 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
me.uid = pReq->uid; me.uid = pReq->uid;
me.name = pReq->name; me.name = pReq->name;
if (me.type == TSDB_CHILD_TABLE) { if (me.type == TSDB_CHILD_TABLE) {
me.ctbEntry.ctime = pReq->ctime; me.ctbEntry.btime = pReq->btime;
me.ctbEntry.ttlDays = pReq->ttl; me.ctbEntry.ttlDays = pReq->ttl;
me.ctbEntry.commentLen = pReq->commentLen; me.ctbEntry.commentLen = pReq->commentLen;
me.ctbEntry.comment = pReq->comment; me.ctbEntry.comment = pReq->comment;
@ -770,7 +770,7 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
metaTbGroupCacheClear(pMeta, me.ctbEntry.suid); metaTbGroupCacheClear(pMeta, me.ctbEntry.suid);
metaULock(pMeta); metaULock(pMeta);
} else { } else {
me.ntbEntry.ctime = pReq->ctime; me.ntbEntry.btime = pReq->btime;
me.ntbEntry.ttlDays = pReq->ttl; me.ntbEntry.ttlDays = pReq->ttl;
me.ntbEntry.commentLen = pReq->commentLen; me.ntbEntry.commentLen = pReq->commentLen;
me.ntbEntry.comment = pReq->comment; me.ntbEntry.comment = pReq->comment;
@ -923,50 +923,40 @@ end:
return code; return code;
} }
int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids) { int metaTtlDropTable(SMeta *pMeta, int64_t timePointMs, SArray *tbUids) {
int ret = metaTtlSmaller(pMeta, ttl, tbUids); int ret = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn);
if (ret != 0) { if (ret != 0) {
metaError("ttl failed to flush, ret:%d", ret);
return ret;
}
ret = ttlMgrFindExpired(pMeta->pTtlMgr, timePointMs, tbUids);
if (ret != 0) {
metaError("ttl failed to find expired table, ret:%d", ret);
return ret; return ret;
} }
if (TARRAY_SIZE(tbUids) == 0) { if (TARRAY_SIZE(tbUids) == 0) {
return 0; return 0;
} }
metaInfo("ttl find expired table count: %zu" , TARRAY_SIZE(tbUids));
metaDropTables(pMeta, tbUids); metaDropTables(pMeta, tbUids);
return 0; return 0;
} }
static void metaBuildTtlIdxKey(STtlIdxKey *ttlKey, const SMetaEntry *pME) { static int metaBuildBtimeIdxKey(SBtimeIdxKey *btimeKey, const SMetaEntry *pME) {
int64_t ttlDays = 0; int64_t btime;
int64_t ctime = 0;
if (pME->type == TSDB_CHILD_TABLE) { if (pME->type == TSDB_CHILD_TABLE) {
ctime = pME->ctbEntry.ctime; btime = pME->ctbEntry.btime;
ttlDays = pME->ctbEntry.ttlDays;
} else if (pME->type == TSDB_NORMAL_TABLE) { } else if (pME->type == TSDB_NORMAL_TABLE) {
ctime = pME->ntbEntry.ctime; btime = pME->ntbEntry.btime;
ttlDays = pME->ntbEntry.ttlDays;
} else {
metaError("meta/table: invalide table type: %" PRId8 " build ttl idx key failed.", pME->type);
return;
}
if (ttlDays <= 0) return;
ttlKey->dtime = ctime / 1000 + ttlDays * tsTtlUnit;
ttlKey->uid = pME->uid;
}
static int metaBuildCtimeIdxKey(SCtimeIdxKey *ctimeKey, const SMetaEntry *pME) {
int64_t ctime;
if (pME->type == TSDB_CHILD_TABLE) {
ctime = pME->ctbEntry.ctime;
} else if (pME->type == TSDB_NORMAL_TABLE) {
ctime = pME->ntbEntry.ctime;
} else { } else {
return -1; return -1;
} }
ctimeKey->ctime = ctime; btimeKey->btime = btime;
ctimeKey->uid = pME->uid; btimeKey->uid = pME->uid;
return 0; return 0;
} }
@ -980,11 +970,9 @@ static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) {
return 0; return 0;
} }
static int metaDeleteTtlIdx(SMeta *pMeta, const SMetaEntry *pME) { static int metaDeleteTtl(SMeta *pMeta, const SMetaEntry *pME) {
STtlIdxKey ttlKey = {0}; STtlDelTtlCtx ctx = {.uid = pME->uid, .pTxn = pMeta->txn};
metaBuildTtlIdxKey(&ttlKey, pME); return ttlMgrDeleteTtl(pMeta->pTtlMgr, &ctx);
if (ttlKey.dtime == 0) return 0;
return tdbTbDelete(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), pMeta->txn);
} }
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
@ -1066,10 +1054,10 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
tdbTbDelete(pMeta->pNameIdx, e.name, strlen(e.name) + 1, pMeta->txn); tdbTbDelete(pMeta->pNameIdx, e.name, strlen(e.name) + 1, pMeta->txn);
tdbTbDelete(pMeta->pUidIdx, &uid, sizeof(uid), pMeta->txn); tdbTbDelete(pMeta->pUidIdx, &uid, sizeof(uid), pMeta->txn);
if (e.type == TSDB_CHILD_TABLE || e.type == TSDB_NORMAL_TABLE) metaDeleteCtimeIdx(pMeta, &e); if (e.type == TSDB_CHILD_TABLE || e.type == TSDB_NORMAL_TABLE) metaDeleteBtimeIdx(pMeta, &e);
if (e.type == TSDB_NORMAL_TABLE) metaDeleteNcolIdx(pMeta, &e); if (e.type == TSDB_NORMAL_TABLE) metaDeleteNcolIdx(pMeta, &e);
if (e.type != TSDB_SUPER_TABLE) metaDeleteTtlIdx(pMeta, &e); if (e.type != TSDB_SUPER_TABLE) metaDeleteTtl(pMeta, &e);
if (e.type == TSDB_CHILD_TABLE) { if (e.type == TSDB_CHILD_TABLE) {
tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), pMeta->txn); tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), pMeta->txn);
@ -1102,23 +1090,23 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
return 0; return 0;
} }
// opt ins_tables // opt ins_tables
int metaUpdateCtimeIdx(SMeta *pMeta, const SMetaEntry *pME) { int metaUpdateBtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
SCtimeIdxKey ctimeKey = {0}; SBtimeIdxKey btimeKey = {0};
if (metaBuildCtimeIdxKey(&ctimeKey, pME) < 0) { if (metaBuildBtimeIdxKey(&btimeKey, pME) < 0) {
return 0; return 0;
} }
metaTrace("vgId:%d, start to save version:%" PRId64 " uid:%" PRId64 " ctime:%" PRId64, TD_VID(pMeta->pVnode), metaTrace("vgId:%d, start to save version:%" PRId64 " uid:%" PRId64 " btime:%" PRId64, TD_VID(pMeta->pVnode),
pME->version, pME->uid, ctimeKey.ctime); pME->version, pME->uid, btimeKey.btime);
return tdbTbUpsert(pMeta->pCtimeIdx, &ctimeKey, sizeof(ctimeKey), NULL, 0, pMeta->txn); return tdbTbUpsert(pMeta->pBtimeIdx, &btimeKey, sizeof(btimeKey), NULL, 0, pMeta->txn);
} }
int metaDeleteCtimeIdx(SMeta *pMeta, const SMetaEntry *pME) { int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
SCtimeIdxKey ctimeKey = {0}; SBtimeIdxKey btimeKey = {0};
if (metaBuildCtimeIdxKey(&ctimeKey, pME) < 0) { if (metaBuildBtimeIdxKey(&btimeKey, pME) < 0) {
return 0; return 0;
} }
return tdbTbDelete(pMeta->pCtimeIdx, &ctimeKey, sizeof(ctimeKey), pMeta->txn); return tdbTbDelete(pMeta->pBtimeIdx, &btimeKey, sizeof(btimeKey), pMeta->txn);
} }
int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME) { int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME) {
SNcolIdxKey ncolKey = {0}; SNcolIdxKey ncolKey = {0};
@ -1328,6 +1316,8 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
metaULock(pMeta); metaULock(pMeta);
metaUpdateChangeTime(pMeta, entry.uid, pAlterTbReq->ctimeMs);
metaUpdateMetaRsp(uid, pAlterTbReq->tbName, pSchema, pMetaRsp); metaUpdateMetaRsp(uid, pAlterTbReq->tbName, pSchema, pMetaRsp);
if (entry.pBuf) taosMemoryFree(entry.pBuf); if (entry.pBuf) taosMemoryFree(entry.pBuf);
@ -1515,6 +1505,8 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
metaULock(pMeta); metaULock(pMeta);
metaUpdateChangeTime(pMeta, ctbEntry.uid, pAlterTbReq->ctimeMs);
tDecoderClear(&dc1); tDecoderClear(&dc1);
tDecoderClear(&dc2); tDecoderClear(&dc2);
taosMemoryFree((void *)ctbEntry.ctbEntry.pTags); taosMemoryFree((void *)ctbEntry.ctbEntry.pTags);
@ -1603,9 +1595,9 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
// build SMetaEntry // build SMetaEntry
if (entry.type == TSDB_CHILD_TABLE) { if (entry.type == TSDB_CHILD_TABLE) {
if (pAlterTbReq->updateTTL) { if (pAlterTbReq->updateTTL) {
metaDeleteTtlIdx(pMeta, &entry); metaDeleteTtl(pMeta, &entry);
entry.ctbEntry.ttlDays = pAlterTbReq->newTTL; entry.ctbEntry.ttlDays = pAlterTbReq->newTTL;
metaUpdateTtlIdx(pMeta, &entry); metaUpdateTtl(pMeta, &entry);
} }
if (pAlterTbReq->newCommentLen >= 0) { if (pAlterTbReq->newCommentLen >= 0) {
entry.ctbEntry.commentLen = pAlterTbReq->newCommentLen; entry.ctbEntry.commentLen = pAlterTbReq->newCommentLen;
@ -1613,9 +1605,9 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
} }
} else { } else {
if (pAlterTbReq->updateTTL) { if (pAlterTbReq->updateTTL) {
metaDeleteTtlIdx(pMeta, &entry); metaDeleteTtl(pMeta, &entry);
entry.ntbEntry.ttlDays = pAlterTbReq->newTTL; entry.ntbEntry.ttlDays = pAlterTbReq->newTTL;
metaUpdateTtlIdx(pMeta, &entry); metaUpdateTtl(pMeta, &entry);
} }
if (pAlterTbReq->newCommentLen >= 0) { if (pAlterTbReq->newCommentLen >= 0) {
entry.ntbEntry.commentLen = pAlterTbReq->newCommentLen; entry.ntbEntry.commentLen = pAlterTbReq->newCommentLen;
@ -1628,6 +1620,8 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
metaUpdateUidIdx(pMeta, &entry); metaUpdateUidIdx(pMeta, &entry);
metaULock(pMeta); metaULock(pMeta);
metaUpdateChangeTime(pMeta, entry.uid, pAlterTbReq->ctimeMs);
tdbTbcClose(pTbDbc); tdbTbcClose(pTbDbc);
tdbTbcClose(pUidIdxc); tdbTbcClose(pUidIdxc);
tDecoderClear(&dc); tDecoderClear(&dc);
@ -1967,11 +1961,28 @@ static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbTbUpsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), pMeta->txn); return tdbTbUpsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), pMeta->txn);
} }
static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) { static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) {
STtlIdxKey ttlKey = {0}; if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return 0;
metaBuildTtlIdxKey(&ttlKey, pME);
if (ttlKey.dtime == 0) return 0; STtlUpdTtlCtx ctx = {.uid = pME->uid};
return tdbTbUpsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, pMeta->txn);
if (pME->type == TSDB_CHILD_TABLE) {
ctx.ttlDays = pME->ctbEntry.ttlDays;
ctx.changeTimeMs = pME->ctbEntry.btime;
} else {
ctx.ttlDays = pME->ntbEntry.ttlDays;
ctx.changeTimeMs = pME->ntbEntry.btime;
}
return ttlMgrInsertTtl(pMeta->pTtlMgr, &ctx);
}
int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) {
if (!tsTtlChangeOnWrite) return 0;
STtlUpdCtimeCtx ctx = {.uid = uid, .changeTimeMs = changeTimeMs};
return ttlMgrUpdateChangeTime(pMeta->pTtlMgr, &ctx);
} }
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) { static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
@ -2182,7 +2193,7 @@ int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
} }
} }
code = metaUpdateCtimeIdx(pMeta, pME); code = metaUpdateBtimeIdx(pMeta, pME);
VND_CHECK_CODE(code, line, _err); VND_CHECK_CODE(code, line, _err);
if (pME->type == TSDB_NORMAL_TABLE) { if (pME->type == TSDB_NORMAL_TABLE) {
@ -2191,7 +2202,7 @@ int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
} }
if (pME->type != TSDB_SUPER_TABLE) { if (pME->type != TSDB_SUPER_TABLE) {
code = metaUpdateTtlIdx(pMeta, pME); code = metaUpdateTtl(pMeta, pME);
VND_CHECK_CODE(code, line, _err); VND_CHECK_CODE(code, line, _err);
} }

View File

@ -0,0 +1,434 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "metaTtl.h"
#include "meta.h"
typedef struct {
TTB *pNewTtlIdx;
SMeta *pMeta;
} SConvertData;
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid);
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int ttlMgrFillCache(STtlManger *pTtlMgr);
static int32_t ttlMgrFillCacheOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pTtlCache);
static int32_t ttlMgrConvertOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pConvertData);
static int32_t ttlMgrWLock(STtlManger *pTtlMgr);
static int32_t ttlMgrRLock(STtlManger *pTtlMgr);
static int32_t ttlMgrULock(STtlManger *pTtlMgr);
const char *ttlTbname = "ttl.idx";
const char *ttlV1Tbname = "ttlv1.idx";
int ttlMgrOpen(STtlManger **ppTtlMgr, TDB *pEnv, int8_t rollback) {
int ret;
*ppTtlMgr = NULL;
STtlManger *pTtlMgr = (STtlManger *)tdbOsCalloc(1, sizeof(*pTtlMgr));
if (pTtlMgr == NULL) {
return -1;
}
if (tdbTbExist(ttlTbname, pEnv)) {
ret = tdbTbOpen(ttlTbname, sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pEnv, &pTtlMgr->pOldTtlIdx, rollback);
if (ret < 0) {
metaError("failed to open %s index since %s", ttlTbname, tstrerror(terrno));
return ret;
}
}
ret = tdbTbOpen(ttlV1Tbname, TDB_VARIANT_LEN, TDB_VARIANT_LEN, ttlIdxKeyV1Cmpr, pEnv, &pTtlMgr->pTtlIdx, rollback);
if (ret < 0) {
metaError("failed to open %s since %s", ttlV1Tbname, tstrerror(terrno));
tdbOsFree(pTtlMgr);
return ret;
}
pTtlMgr->pTtlCache = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
pTtlMgr->pDirtyUids = taosHashInit(8192, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
taosThreadRwlockInit(&pTtlMgr->lock, NULL);
*ppTtlMgr = pTtlMgr;
return 0;
}
int ttlMgrClose(STtlManger *pTtlMgr) {
taosHashCleanup(pTtlMgr->pTtlCache);
taosHashCleanup(pTtlMgr->pDirtyUids);
tdbTbClose(pTtlMgr->pTtlIdx);
taosThreadRwlockDestroy(&pTtlMgr->lock);
tdbOsFree(pTtlMgr);
return 0;
}
int ttlMgrBegin(STtlManger *pTtlMgr, void *pMeta) {
metaInfo("ttl mgr start open");
int ret;
int64_t startNs = taosGetTimestampNs();
SMeta *meta = (SMeta *)pMeta;
if (pTtlMgr->pOldTtlIdx) {
ret = ttlMgrConvert(pTtlMgr->pOldTtlIdx, pTtlMgr->pTtlIdx, pMeta);
if (ret < 0) {
metaError("failed to convert ttl index since %s", tstrerror(terrno));
goto _out;
}
ret = tdbTbDropByName(ttlTbname, meta->pEnv, meta->txn);
if (ret < 0) {
metaError("failed to drop old ttl index since %s", tstrerror(terrno));
goto _out;
}
tdbTbClose(pTtlMgr->pOldTtlIdx);
pTtlMgr->pOldTtlIdx = NULL;
}
ret = ttlMgrFillCache(pTtlMgr);
if (ret < 0) {
metaError("failed to fill hash since %s", tstrerror(terrno));
goto _out;
}
int64_t endNs = taosGetTimestampNs();
metaInfo("ttl mgr open end, hash size: %d, time consumed: %" PRId64 " ns", taosHashGetSize(pTtlMgr->pTtlCache),
endNs - startNs);
_out:
return ret;
}
static void ttlMgrBuildKey(STtlIdxKeyV1 *pTtlKey, int64_t ttlDays, int64_t changeTimeMs, tb_uid_t uid) {
if (ttlDays <= 0) return;
pTtlKey->deleteTimeMs = changeTimeMs + ttlDays * tsTtlUnit * 1000;
pTtlKey->uid = uid;
}
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
STtlIdxKey *pTtlIdxKey1 = (STtlIdxKey *)pKey1;
STtlIdxKey *pTtlIdxKey2 = (STtlIdxKey *)pKey2;
if (pTtlIdxKey1->deleteTimeSec > pTtlIdxKey2->deleteTimeSec) {
return 1;
} else if (pTtlIdxKey1->deleteTimeSec < pTtlIdxKey2->deleteTimeSec) {
return -1;
}
if (pTtlIdxKey1->uid > pTtlIdxKey2->uid) {
return 1;
} else if (pTtlIdxKey1->uid < pTtlIdxKey2->uid) {
return -1;
}
return 0;
}
static int ttlIdxKeyV1Cmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
STtlIdxKeyV1 *pTtlIdxKey1 = (STtlIdxKeyV1 *)pKey1;
STtlIdxKeyV1 *pTtlIdxKey2 = (STtlIdxKeyV1 *)pKey2;
if (pTtlIdxKey1->deleteTimeMs > pTtlIdxKey2->deleteTimeMs) {
return 1;
} else if (pTtlIdxKey1->deleteTimeMs < pTtlIdxKey2->deleteTimeMs) {
return -1;
}
if (pTtlIdxKey1->uid > pTtlIdxKey2->uid) {
return 1;
} else if (pTtlIdxKey1->uid < pTtlIdxKey2->uid) {
return -1;
}
return 0;
}
static int ttlMgrFillCache(STtlManger *pTtlMgr) {
return tdbTbTraversal(pTtlMgr->pTtlIdx, pTtlMgr->pTtlCache, ttlMgrFillCacheOneEntry);
}
static int32_t ttlMgrFillCacheOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pTtlCache) {
SHashObj *pCache = (SHashObj *)pTtlCache;
STtlIdxKeyV1 *ttlKey = (STtlIdxKeyV1 *)pKey;
tb_uid_t uid = ttlKey->uid;
int64_t ttlDays = *(int64_t *)pVal;
int64_t changeTimeMs = ttlKey->deleteTimeMs - ttlDays * tsTtlUnit * 1000;
STtlCacheEntry data = {.ttlDays = ttlDays, .changeTimeMs = changeTimeMs};
return taosHashPut(pCache, &uid, sizeof(uid), &data, sizeof(data));
}
static int ttlMgrConvertOneEntry(const void *pKey, int keyLen, const void *pVal, int valLen, void *pConvertData) {
SConvertData *pData = (SConvertData *)pConvertData;
STtlIdxKey *ttlKey = (STtlIdxKey *)pKey;
tb_uid_t uid = ttlKey->uid;
int64_t ttlDays = 0;
int ret = metaGetTableTtlByUid(pData->pMeta, uid, &ttlDays);
if (ret < 0) {
metaError("ttlMgr convert failed to get ttl since %s", tstrerror(terrno));
goto _out;
}
STtlIdxKeyV1 ttlKeyV1 = {.deleteTimeMs = ttlKey->deleteTimeSec * 1000, .uid = uid};
ret = tdbTbUpsert(pData->pNewTtlIdx, &ttlKeyV1, sizeof(ttlKeyV1), &ttlDays, sizeof(ttlDays), pData->pMeta->txn);
if (ret < 0) {
metaError("ttlMgr convert failed to upsert since %s", tstrerror(terrno));
goto _out;
}
ret = 0;
_out:
return ret;
}
int ttlMgrConvert(TTB *pOldTtlIdx, TTB *pNewTtlIdx, void *pMeta) {
SMeta *meta = pMeta;
metaInfo("ttlMgr convert ttl start.");
SConvertData cvData = {.pNewTtlIdx = pNewTtlIdx, .pMeta = meta};
int ret = tdbTbTraversal(pOldTtlIdx, &cvData, ttlMgrConvertOneEntry);
if (ret < 0) {
metaError("failed to convert ttl since %s", tstrerror(terrno));
}
metaInfo("ttlMgr convert ttl end.");
return ret;
}
int ttlMgrInsertTtl(STtlManger *pTtlMgr, const STtlUpdTtlCtx *updCtx) {
if (updCtx->ttlDays == 0) return 0;
STtlCacheEntry cacheEntry = {.ttlDays = updCtx->ttlDays, .changeTimeMs = updCtx->changeTimeMs};
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
ttlMgrWLock(pTtlMgr);
int ret = taosHashPut(pTtlMgr->pTtlCache, &updCtx->uid, sizeof(updCtx->uid), &cacheEntry, sizeof(cacheEntry));
if (ret < 0) {
metaError("ttlMgr insert failed to update ttl cache since %s", tstrerror(terrno));
goto _out;
}
ret = taosHashPut(pTtlMgr->pDirtyUids, &updCtx->uid, sizeof(updCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
if (ret < 0) {
metaError("ttlMgr insert failed to update ttl dirty uids since %s", tstrerror(terrno));
goto _out;
}
ret = 0;
_out:
ttlMgrULock(pTtlMgr);
metaDebug("ttl mgr insert ttl, uid: %" PRId64 ", ctime: %" PRId64 ", ttlDays: %" PRId64, updCtx->uid,
updCtx->changeTimeMs, updCtx->ttlDays);
return ret;
}
int ttlMgrDeleteTtl(STtlManger *pTtlMgr, const STtlDelTtlCtx *delCtx) {
ttlMgrWLock(pTtlMgr);
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_DEL};
int ret = taosHashPut(pTtlMgr->pDirtyUids, &delCtx->uid, sizeof(delCtx->uid), &dirtryEntry, sizeof(dirtryEntry));
if (ret < 0) {
metaError("ttlMgr del failed to update ttl dirty uids since %s", tstrerror(terrno));
goto _out;
}
ret = 0;
_out:
ttlMgrULock(pTtlMgr);
metaDebug("ttl mgr delete ttl, uid: %" PRId64, delCtx->uid);
return ret;
}
int ttlMgrUpdateChangeTime(STtlManger *pTtlMgr, const STtlUpdCtimeCtx *pUpdCtimeCtx) {
ttlMgrWLock(pTtlMgr);
STtlCacheEntry *oldData = taosHashGet(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid));
if (oldData == NULL) {
goto _out;
}
STtlCacheEntry cacheEntry = {.ttlDays = oldData->ttlDays, .changeTimeMs = pUpdCtimeCtx->changeTimeMs};
STtlDirtyEntry dirtryEntry = {.type = ENTRY_TYPE_UPSERT};
int ret =
taosHashPut(pTtlMgr->pTtlCache, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &cacheEntry, sizeof(cacheEntry));
if (ret < 0) {
metaError("ttlMgr update ctime failed to update ttl cache since %s", tstrerror(terrno));
goto _out;
}
ret = taosHashPut(pTtlMgr->pDirtyUids, &pUpdCtimeCtx->uid, sizeof(pUpdCtimeCtx->uid), &dirtryEntry,
sizeof(dirtryEntry));
if (ret < 0) {
metaError("ttlMgr update ctime failed to update ttl dirty uids since %s", tstrerror(terrno));
goto _out;
}
ret = 0;
_out:
ttlMgrULock(pTtlMgr);
metaDebug("ttl mgr update ctime, uid: %" PRId64 ", ctime: %" PRId64, pUpdCtimeCtx->uid, pUpdCtimeCtx->changeTimeMs);
return ret;
}
int ttlMgrFindExpired(STtlManger *pTtlMgr, int64_t timePointMs, SArray *pTbUids) {
ttlMgrRLock(pTtlMgr);
TBC *pCur;
int ret = tdbTbcOpen(pTtlMgr->pTtlIdx, &pCur, NULL);
if (ret < 0) {
goto _out;
}
STtlIdxKeyV1 ttlKey = {0};
ttlKey.deleteTimeMs = timePointMs;
ttlKey.uid = INT64_MAX;
int c = 0;
tdbTbcMoveTo(pCur, &ttlKey, sizeof(ttlKey), &c);
if (c < 0) {
tdbTbcMoveToPrev(pCur);
}
void *pKey = NULL;
int kLen = 0;
while (1) {
ret = tdbTbcPrev(pCur, &pKey, &kLen, NULL, NULL);
if (ret < 0) {
ret = 0;
break;
}
ttlKey = *(STtlIdxKeyV1 *)pKey;
taosArrayPush(pTbUids, &ttlKey.uid);
}
tdbFree(pKey);
tdbTbcClose(pCur);
ret = 0;
_out:
ttlMgrULock(pTtlMgr);
return ret;
}
int ttlMgrFlush(STtlManger *pTtlMgr, TXN *pTxn) {
ttlMgrWLock(pTtlMgr);
metaInfo("ttl mgr flush start.");
int ret = -1;
void *pIter = taosHashIterate(pTtlMgr->pDirtyUids, NULL);
while (pIter != NULL) {
STtlDirtyEntry *pEntry = (STtlDirtyEntry *)pIter;
tb_uid_t *pUid = taosHashGetKey(pIter, NULL);
STtlCacheEntry *cacheEntry = taosHashGet(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
if (cacheEntry == NULL) {
metaError("ttlMgr flush failed to get ttl cache since %s", tstrerror(terrno));
goto _out;
}
STtlIdxKeyV1 ttlKey;
ttlMgrBuildKey(&ttlKey, cacheEntry->ttlDays, cacheEntry->changeTimeMs, *pUid);
if (pEntry->type == ENTRY_TYPE_UPSERT) {
ret = tdbTbUpsert(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), &cacheEntry->ttlDays, sizeof(cacheEntry->ttlDays),
pTxn);
if (ret < 0) {
metaError("ttlMgr flush failed to flush ttl cache upsert since %s", tstrerror(terrno));
goto _out;
}
} else if (pEntry->type == ENTRY_TYPE_DEL) {
ret = tdbTbDelete(pTtlMgr->pTtlIdx, &ttlKey, sizeof(ttlKey), pTxn);
if (ret < 0) {
metaError("ttlMgr flush failed to flush ttl cache del since %s", tstrerror(terrno));
goto _out;
}
ret = taosHashRemove(pTtlMgr->pTtlCache, pUid, sizeof(*pUid));
if (ret < 0) {
metaError("ttlMgr flush failed to delete ttl cache since %s", tstrerror(terrno));
goto _out;
}
} else {
metaError("ttlMgr flush failed to flush ttl cache, unknown type: %d", pEntry->type);
goto _out;
}
pIter = taosHashIterate(pTtlMgr->pDirtyUids, pIter);
}
taosHashClear(pTtlMgr->pDirtyUids);
ret = 0;
_out:
ttlMgrULock(pTtlMgr);
metaInfo("ttl mgr flush end.");
return ret;
}
static int32_t ttlMgrRLock(STtlManger *pTtlMgr) {
int32_t ret = 0;
metaTrace("ttlMgr rlock %p", &pTtlMgr->lock);
ret = taosThreadRwlockRdlock(&pTtlMgr->lock);
return ret;
}
static int32_t ttlMgrWLock(STtlManger *pTtlMgr) {
int32_t ret = 0;
metaTrace("ttlMgr wlock %p", &pTtlMgr->lock);
ret = taosThreadRwlockWrlock(&pTtlMgr->lock);
return ret;
}
static int32_t ttlMgrULock(STtlManger *pTtlMgr) {
int32_t ret = 0;
metaTrace("ttlMgr ulock %p", &pTtlMgr->lock);
ret = taosThreadRwlockUnlock(&pTtlMgr->lock);
return ret;
}

View File

@ -37,7 +37,7 @@ static int32_t vnodeProcessCreateIndexReq(SVnode *pVnode, int64_t ver, void *pRe
static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessDropIndexReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp); static int32_t vnodeProcessCompactVnodeReq(SVnode *pVnode, int64_t ver, void *pReq, int32_t len, SRpcMsg *pRsp);
static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t ctime, int64_t *pUid) { static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, int64_t btime, int64_t *pUid) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
@ -66,8 +66,8 @@ static int32_t vnodePreprocessCreateTableReq(SVnode *pVnode, SDecoder *pCoder, i
} }
*(int64_t *)(pCoder->data + pCoder->pos) = uid; *(int64_t *)(pCoder->data + pCoder->pos) = uid;
// ctime // btime
*(int64_t *)(pCoder->data + pCoder->pos + 8) = ctime; *(int64_t *)(pCoder->data + pCoder->pos + 8) = btime;
tEndDecode(pCoder); tEndDecode(pCoder);
@ -84,7 +84,7 @@ static int32_t vnodePreProcessCreateTableMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
int64_t ctime = taosGetTimestampMs(); int64_t btime = taosGetTimestampMs();
SDecoder dc = {0}; SDecoder dc = {0};
int32_t nReqs; int32_t nReqs;
@ -99,7 +99,7 @@ static int32_t vnodePreProcessCreateTableMsg(SVnode *pVnode, SRpcMsg *pMsg) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
for (int32_t iReq = 0; iReq < nReqs; iReq++) { for (int32_t iReq = 0; iReq < nReqs; iReq++) {
code = vnodePreprocessCreateTableReq(pVnode, &dc, ctime, NULL); code = vnodePreprocessCreateTableReq(pVnode, &dc, btime, NULL);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -109,8 +109,35 @@ _exit:
tDecoderClear(&dc); tDecoderClear(&dc);
return code; return code;
} }
static int32_t vnodePreProcessAlterTableMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = TSDB_CODE_INVALID_MSG;
int32_t lino = 0;
SDecoder dc = {0};
tDecoderInit(&dc, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
SVAlterTbReq vAlterTbReq = {0};
int64_t ctimeMs = taosGetTimestampMs();
if (tDecodeSVAlterTbReqSetCtime(&dc, &vAlterTbReq, ctimeMs) < 0) {
goto _exit;
}
code = 0;
_exit:
tDecoderClear(&dc);
if (code) {
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
} else {
vTrace("vgId:%d %s done, table:%s ctimeMs generated:%" PRId64, TD_VID(pVnode), __func__, vAlterTbReq.tbName,
ctimeMs);
}
return code;
}
extern int64_t tsMaxKeyByPrecision[]; extern int64_t tsMaxKeyByPrecision[];
static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int64_t ctime) { static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int64_t btimeMs, int64_t ctimeMs) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
@ -127,7 +154,7 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
int64_t uid; int64_t uid;
if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) { if (submitTbData.flags & SUBMIT_REQ_AUTO_CREATE_TABLE) {
code = vnodePreprocessCreateTableReq(pVnode, pCoder, ctime, &uid); code = vnodePreprocessCreateTableReq(pVnode, pCoder, btimeMs, &uid);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -153,7 +180,7 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
} }
// scan and check // scan and check
TSKEY now = ctime; TSKEY now = btimeMs;
if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_MICRO) { if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_MICRO) {
now *= 1000; now *= 1000;
} else if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_NANO) { } else if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_NANO) {
@ -170,7 +197,6 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
SColData colData = {0}; SColData colData = {0};
pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData); pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData);
if (colData.flag != HAS_VALUE) { if (colData.flag != HAS_VALUE) {
code = TSDB_CODE_INVALID_MSG; code = TSDB_CODE_INVALID_MSG;
goto _exit; goto _exit;
@ -182,6 +208,10 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
goto _exit; goto _exit;
} }
} }
for (uint64_t i = 1; i < nColData; i++) {
pCoder->pos += tGetColData(pCoder->data + pCoder->pos, &colData);
}
} else { } else {
uint64_t nRow; uint64_t nRow;
if (tDecodeU64v(pCoder, &nRow) < 0) { if (tDecodeU64v(pCoder, &nRow) < 0) {
@ -200,6 +230,9 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int
} }
} }
*(int64_t *)(pCoder->data + pCoder->pos) = ctimeMs;
pCoder->pos += sizeof(int64_t);
tEndDecode(pCoder); tEndDecode(pCoder);
_exit: _exit:
@ -229,9 +262,10 @@ static int32_t vnodePreProcessSubmitMsg(SVnode *pVnode, SRpcMsg *pMsg) {
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
int64_t ctime = taosGetTimestampMs(); int64_t btimeMs = taosGetTimestampMs();
int64_t ctimeMs = btimeMs;
for (int32_t i = 0; i < nSubmitTbData; i++) { for (int32_t i = 0; i < nSubmitTbData; i++) {
code = vnodePreProcessSubmitTbData(pVnode, pCoder, ctime); code = vnodePreProcessSubmitTbData(pVnode, pCoder, btimeMs, ctimeMs);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
@ -261,6 +295,7 @@ static int32_t vnodePreProcessDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res); code = qWorkerProcessDeleteMsg(&handle, pVnode->pQuery, pMsg, &res);
if (code) goto _exit; if (code) goto _exit;
res.ctimeMs = taosGetTimestampMs();
// malloc and encode // malloc and encode
tEncodeSize(tEncodeDeleteRes, &res, size, ret); tEncodeSize(tEncodeDeleteRes, &res, size, ret);
pCont = rpcMallocCont(size + sizeof(SMsgHead)); pCont = rpcMallocCont(size + sizeof(SMsgHead));
@ -282,6 +317,31 @@ _exit:
return code; return code;
} }
static int32_t vnodePreProcessBatchDeleteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
int32_t lino = 0;
int64_t ctimeMs = taosGetTimestampMs();
SBatchDeleteReq pReq = {0};
SDecoder *pCoder = &(SDecoder){0};
tDecoderInit(pCoder, (uint8_t *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead));
if (tDecodeSBatchDeleteReqSetCtime(pCoder, &pReq, ctimeMs) < 0) {
code = TSDB_CODE_INVALID_MSG;
}
tDecoderClear(pCoder);
taosArrayDestroy(pReq.deleteReqs);
if (code) {
vError("vgId:%d %s failed at line %d since %s", TD_VID(pVnode), __func__, lino, tstrerror(code));
} else {
vTrace("vgId:%d %s done, ctimeMs generated:%" PRId64, TD_VID(pVnode), __func__, ctimeMs);
}
return code;
}
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) { int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0; int32_t code = 0;
@ -289,12 +349,18 @@ int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
case TDMT_VND_CREATE_TABLE: { case TDMT_VND_CREATE_TABLE: {
code = vnodePreProcessCreateTableMsg(pVnode, pMsg); code = vnodePreProcessCreateTableMsg(pVnode, pMsg);
} break; } break;
case TDMT_VND_ALTER_TABLE: {
code = vnodePreProcessAlterTableMsg(pVnode, pMsg);
} break;
case TDMT_VND_SUBMIT: { case TDMT_VND_SUBMIT: {
code = vnodePreProcessSubmitMsg(pVnode, pMsg); code = vnodePreProcessSubmitMsg(pVnode, pMsg);
} break; } break;
case TDMT_VND_DELETE: { case TDMT_VND_DELETE: {
code = vnodePreProcessDeleteMsg(pVnode, pMsg); code = vnodePreProcessDeleteMsg(pVnode, pMsg);
} break; } break;
case TDMT_VND_BATCH_DEL: {
code = vnodePreProcessBatchDeleteMsg(pVnode, pMsg);
} break;
default: default:
break; break;
} }
@ -641,8 +707,8 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq,
goto end; goto end;
} }
vDebug("vgId:%d, drop ttl table req will be processed, time:%d", pVnode->config.vgId, ttlReq.timestamp); vDebug("vgId:%d, drop ttl table req will be processed, time:%" PRId32, pVnode->config.vgId, ttlReq.timestampSec);
int32_t ret = metaTtlDropTable(pVnode->pMeta, ttlReq.timestamp, tbUids); int32_t ret = metaTtlDropTable(pVnode->pMeta, (int64_t)ttlReq.timestampSec * 1000, tbUids);
if (ret != 0) { if (ret != 0) {
goto end; goto end;
} }
@ -650,7 +716,7 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t ver, void *pReq,
tqUpdateTbUidList(pVnode->pTq, tbUids, false); tqUpdateTbUidList(pVnode->pTq, tbUids, false);
} }
vnodeAsyncRentention(pVnode, ttlReq.timestamp); vnodeAsyncRentention(pVnode, ttlReq.timestampSec);
end: end:
taosArrayDestroy(tbUids); taosArrayDestroy(tbUids);
@ -1386,6 +1452,9 @@ static int32_t vnodeProcessSubmitReq(SVnode *pVnode, int64_t ver, void *pReq, in
code = tsdbInsertTableData(pVnode->pTsdb, ver, pSubmitTbData, &affectedRows); code = tsdbInsertTableData(pVnode->pTsdb, ver, pSubmitTbData, &affectedRows);
if (code) goto _exit; if (code) goto _exit;
code = metaUpdateChangeTime(pVnode->pMeta, pSubmitTbData->uid, pSubmitTbData->ctimeMs);
if (code) goto _exit;
pSubmitRsp->affectedRows += affectedRows; pSubmitRsp->affectedRows += affectedRows;
} }
@ -1640,6 +1709,14 @@ static int32_t vnodeProcessBatchDeleteReq(SVnode *pVnode, int64_t ver, void *pRe
TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs); TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
} }
code = metaUpdateChangeTime(pVnode->pMeta, uid, deleteReq.ctimeMs);
if (code < 0) {
terrno = code;
vError("vgId:%d, update change time error since %s, suid:%" PRId64 ", uid:%" PRId64 ", start ts:%" PRId64
", end ts:%" PRId64,
TD_VID(pVnode), terrstr(), deleteReq.suid, uid, pOneReq->startTs, pOneReq->endTs);
}
tDecoderClear(&mr.coder); tDecoderClear(&mr.coder);
} }
metaReaderClear(&mr); metaReaderClear(&mr);
@ -1668,8 +1745,10 @@ static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t ver, void *pReq, in
ASSERT(taosArrayGetSize(pRes->uidList) == 0 || (pRes->skey != 0 && pRes->ekey != 0)); ASSERT(taosArrayGetSize(pRes->uidList) == 0 || (pRes->skey != 0 && pRes->ekey != 0));
for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) { for (int32_t iUid = 0; iUid < taosArrayGetSize(pRes->uidList); iUid++) {
code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, *(uint64_t *)taosArrayGet(pRes->uidList, iUid), uint64_t uid = *(uint64_t *)taosArrayGet(pRes->uidList, iUid);
pRes->skey, pRes->ekey); code = tsdbDeleteTableData(pVnode->pTsdb, ver, pRes->suid, uid, pRes->skey, pRes->ekey);
if (code) goto _err;
code = metaUpdateChangeTime(pVnode->pMeta, uid, pRes->ctimeMs);
if (code) goto _err; if (code) goto _err;
} }

View File

@ -1154,7 +1154,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
int32_t tableType = mr.me.type; int32_t tableType = mr.me.type;
if (tableType == TSDB_CHILD_TABLE) { if (tableType == TSDB_CHILD_TABLE) {
// create time // create time
int64_t ts = mr.me.ctbEntry.ctime; int64_t ts = mr.me.ctbEntry.btime;
pColInfoData = taosArrayGet(p->pDataBlock, 2); pColInfoData = taosArrayGet(p->pDataBlock, 2);
colDataSetVal(pColInfoData, numOfRows, (char*)&ts, false); colDataSetVal(pColInfoData, numOfRows, (char*)&ts, false);
@ -1206,7 +1206,7 @@ static SSDataBlock* sysTableBuildUserTablesByUids(SOperatorInfo* pOperator) {
} else if (tableType == TSDB_NORMAL_TABLE) { } else if (tableType == TSDB_NORMAL_TABLE) {
// create time // create time
pColInfoData = taosArrayGet(p->pDataBlock, 2); pColInfoData = taosArrayGet(p->pDataBlock, 2);
colDataSetVal(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ctime, false); colDataSetVal(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.btime, false);
// number of columns // number of columns
pColInfoData = taosArrayGet(p->pDataBlock, 3); pColInfoData = taosArrayGet(p->pDataBlock, 3);
@ -1338,7 +1338,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
int32_t tableType = pInfo->pCur->mr.me.type; int32_t tableType = pInfo->pCur->mr.me.type;
if (tableType == TSDB_CHILD_TABLE) { if (tableType == TSDB_CHILD_TABLE) {
// create time // create time
int64_t ts = pInfo->pCur->mr.me.ctbEntry.ctime; int64_t ts = pInfo->pCur->mr.me.ctbEntry.btime;
pColInfoData = taosArrayGet(p->pDataBlock, 2); pColInfoData = taosArrayGet(p->pDataBlock, 2);
colDataSetVal(pColInfoData, numOfRows, (char*)&ts, false); colDataSetVal(pColInfoData, numOfRows, (char*)&ts, false);
@ -1392,7 +1392,7 @@ static SSDataBlock* sysTableBuildUserTables(SOperatorInfo* pOperator) {
} else if (tableType == TSDB_NORMAL_TABLE) { } else if (tableType == TSDB_NORMAL_TABLE) {
// create time // create time
pColInfoData = taosArrayGet(p->pDataBlock, 2); pColInfoData = taosArrayGet(p->pDataBlock, 2);
colDataSetVal(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.ctime, false); colDataSetVal(pColInfoData, numOfRows, (char*)&pInfo->pCur->mr.me.ntbEntry.btime, false);
// number of columns // number of columns
pColInfoData = taosArrayGet(p->pDataBlock, 3); pColInfoData = taosArrayGet(p->pDataBlock, 3);

View File

@ -2361,7 +2361,7 @@ static int32_t checkHavingGroupBy(STranslateContext* pCxt, SSelectStmt* pSelect)
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (NULL == getGroupByList(pCxt) && NULL == pSelect->pPartitionByList && NULL == pSelect->pWindow) { if (NULL == getGroupByList(pCxt) && NULL == pSelect->pPartitionByList && NULL == pSelect->pWindow) {
return code; return code;
} }
if (NULL != pSelect->pHaving) { if (NULL != pSelect->pHaving) {
code = checkExprForGroupBy(pCxt, &pSelect->pHaving); code = checkExprForGroupBy(pCxt, &pSelect->pHaving);
} }
@ -2372,7 +2372,7 @@ static int32_t checkHavingGroupBy(STranslateContext* pCxt, SSelectStmt* pSelect)
if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pOrderByList) { if (TSDB_CODE_SUCCESS == code && NULL != pSelect->pOrderByList) {
code = checkExprListForGroupBy(pCxt, pSelect, pSelect->pOrderByList); code = checkExprListForGroupBy(pCxt, pSelect, pSelect->pOrderByList);
} }
*/ */
return code; return code;
} }
@ -2680,7 +2680,7 @@ static int32_t replaceTbName(STranslateContext* pCxt, SSelectStmt* pSelect) {
SNode** pNode = NULL; SNode** pNode = NULL;
SRewriteTbNameContext pRewriteCxt = {0}; SRewriteTbNameContext pRewriteCxt = {0};
pRewriteCxt.pTbName = pTable->table.tableName; pRewriteCxt.pTbName = pTable->table.tableName;
nodesRewriteExprPostOrder(&pSelect->pWhere, doTranslateTbName, &pRewriteCxt); nodesRewriteExprPostOrder(&pSelect->pWhere, doTranslateTbName, &pRewriteCxt);
return pRewriteCxt.errCode; return pRewriteCxt.errCode;
@ -3177,7 +3177,7 @@ static int32_t translateSelectList(STranslateContext* pCxt, SSelectStmt* pSelect
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkExprListForGroupBy(pCxt, pSelect, pSelect->pProjectionList); code = checkExprListForGroupBy(pCxt, pSelect, pSelect->pProjectionList);
} }
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = translateFillValues(pCxt, pSelect); code = translateFillValues(pCxt, pSelect);
} }
@ -4965,6 +4965,14 @@ static int32_t checkCreateTable(STranslateContext* pCxt, SCreateTableStmt* pStmt
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = checkTableSchema(pCxt, pStmt); code = checkTableSchema(pCxt, pStmt);
} }
if (TSDB_CODE_SUCCESS == code) {
if (createStable && pStmt->pOptions->ttl != 0) {
code = generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_TABLE_OPTION,
"Only supported for create non-super table in databases "
"configured with the 'TTL' option");
}
}
return code; return code;
} }

View File

@ -765,7 +765,7 @@ TEST_F(ParserInitialCTest, createStable) {
"TAGS (a1 TIMESTAMP, a2 INT, a3 INT UNSIGNED, a4 BIGINT, a5 BIGINT UNSIGNED, a6 FLOAT, a7 DOUBLE, " "TAGS (a1 TIMESTAMP, a2 INT, a3 INT UNSIGNED, a4 BIGINT, a5 BIGINT UNSIGNED, a6 FLOAT, a7 DOUBLE, "
"a8 BINARY(20), a9 SMALLINT, a10 SMALLINT UNSIGNED, a11 TINYINT, " "a8 BINARY(20), a9 SMALLINT, a10 SMALLINT UNSIGNED, a11 TINYINT, "
"a12 TINYINT UNSIGNED, a13 BOOL, a14 NCHAR(30), a15 VARCHAR(50)) " "a12 TINYINT UNSIGNED, a13 BOOL, a14 NCHAR(30), a15 VARCHAR(50)) "
"TTL 100 COMMENT 'test create table' SMA(c1, c2, c3) ROLLUP (MIN) MAX_DELAY 100s,10m WATERMARK 10a,1m " "COMMENT 'test create table' SMA(c1, c2, c3) ROLLUP (MIN) MAX_DELAY 100s,10m WATERMARK 10a,1m "
"DELETE_MARK 1000s,200m"); "DELETE_MARK 1000s,200m");
clearCreateStbReq(); clearCreateStbReq();
} }
@ -1005,7 +1005,7 @@ TEST_F(ParserInitialCTest, createTable) {
ASSERT_EQ(pReq->flags, pExpect->flags); ASSERT_EQ(pReq->flags, pExpect->flags);
ASSERT_EQ(std::string(pReq->name), std::string(pExpect->name)); ASSERT_EQ(std::string(pReq->name), std::string(pExpect->name));
ASSERT_EQ(pReq->uid, pExpect->uid); ASSERT_EQ(pReq->uid, pExpect->uid);
ASSERT_EQ(pReq->ctime, pExpect->ctime); ASSERT_EQ(pReq->btime, pExpect->btime);
ASSERT_EQ(pReq->ttl, pExpect->ttl); ASSERT_EQ(pReq->ttl, pExpect->ttl);
ASSERT_EQ(pReq->commentLen, pExpect->commentLen); ASSERT_EQ(pReq->commentLen, pExpect->commentLen);
ASSERT_EQ(std::string(pReq->comment), std::string(pExpect->comment)); ASSERT_EQ(std::string(pReq->comment), std::string(pExpect->comment));
@ -1038,7 +1038,7 @@ TEST_F(ParserInitialCTest, createTable) {
"TAGS (a1 TIMESTAMP, a2 INT, a3 INT UNSIGNED, a4 BIGINT, a5 BIGINT UNSIGNED, a6 FLOAT, a7 DOUBLE, a8 BINARY(20), " "TAGS (a1 TIMESTAMP, a2 INT, a3 INT UNSIGNED, a4 BIGINT, a5 BIGINT UNSIGNED, a6 FLOAT, a7 DOUBLE, a8 BINARY(20), "
"a9 SMALLINT, a10 SMALLINT UNSIGNED, a11 TINYINT, a12 TINYINT UNSIGNED, a13 BOOL, " "a9 SMALLINT, a10 SMALLINT UNSIGNED, a11 TINYINT, a12 TINYINT UNSIGNED, a13 BOOL, "
"a14 NCHAR(30), a15 VARCHAR(50)) " "a14 NCHAR(30), a15 VARCHAR(50)) "
"TTL 100 COMMENT 'test create table' SMA(c1, c2, c3) ROLLUP (MIN)"); "COMMENT 'test create table' SMA(c1, c2, c3) ROLLUP (MIN)");
run("CREATE TABLE IF NOT EXISTS t1 USING st1 TAGS(1, 'wxy', NOW)"); run("CREATE TABLE IF NOT EXISTS t1 USING st1 TAGS(1, 'wxy', NOW)");

View File

@ -141,7 +141,7 @@ void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
if (NULL == pMsgBody) { if (NULL == pMsgBody) {
return; return;
} }
taosMemoryFreeClear(pMsgBody->target.dbFName); taosMemoryFreeClear(pMsgBody->target.dbFName);
taosMemoryFreeClear(pMsgBody->msgInfo.pData); taosMemoryFreeClear(pMsgBody->msgInfo.pData);
if (pMsgBody->paramFreeFp) { if (pMsgBody->paramFreeFp) {
@ -173,7 +173,7 @@ int32_t asyncSendMsgToServerExt(void* pTransporter, SEpSet* epSet, int64_t* pTra
.contLen = pInfo->msgInfo.len, .contLen = pInfo->msgInfo.len,
.info.ahandle = (void*)pInfo, .info.ahandle = (void*)pInfo,
.info.handle = pInfo->msgInfo.handle, .info.handle = pInfo->msgInfo.handle,
.info.persistHandle = persistHandle, .info.persistHandle = persistHandle,
.code = 0 .code = 0
}; };
TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId); TRACE_SET_ROOTID(&rpcMsg.info.traceId, pInfo->requestId);
@ -252,7 +252,7 @@ void destroyQueryExecRes(SExecResult* pRes) {
taosMemoryFreeClear(pRes->res); taosMemoryFreeClear(pRes->res);
break; break;
} }
case TDMT_SCH_QUERY: case TDMT_SCH_QUERY:
case TDMT_SCH_MERGE_QUERY: { case TDMT_SCH_MERGE_QUERY: {
taosArrayDestroy((SArray*)pRes->res); taosArrayDestroy((SArray*)pRes->res);
break; break;
@ -532,7 +532,7 @@ int32_t cloneSVreateTbReq(SVCreateTbReq* pSrc, SVCreateTbReq** pDst) {
(*pDst)->name = taosStrdup(pSrc->name); (*pDst)->name = taosStrdup(pSrc->name);
} }
(*pDst)->uid = pSrc->uid; (*pDst)->uid = pSrc->uid;
(*pDst)->ctime = pSrc->ctime; (*pDst)->btime = pSrc->btime;
(*pDst)->ttl = pSrc->ttl; (*pDst)->ttl = pSrc->ttl;
(*pDst)->commentLen = pSrc->commentLen; (*pDst)->commentLen = pSrc->commentLen;
if (pSrc->comment) { if (pSrc->comment) {

View File

@ -427,6 +427,7 @@ int32_t qwGetDeleteResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SDeleteRes *pRes
pRes->suid = pDelRes->suid; pRes->suid = pDelRes->suid;
pRes->uidList = pDelRes->uidList; pRes->uidList = pDelRes->uidList;
pRes->ctimeMs = taosGetTimestampMs();
pRes->skey = pDelRes->skey; pRes->skey = pDelRes->skey;
pRes->ekey = pDelRes->ekey; pRes->ekey = pDelRes->ekey;
pRes->affectedRows = pDelRes->affectedRows; pRes->affectedRows = pDelRes->affectedRows;

View File

@ -46,12 +46,16 @@ int32_t tdbAlter(TDB *pDb, int pages);
int32_t tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb, int32_t tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprFn, TDB *pEnv, TTB **ppTb,
int8_t rollback); int8_t rollback);
int32_t tdbTbClose(TTB *pTb); int32_t tdbTbClose(TTB *pTb);
bool tdbTbExist(const char *tbname, TDB *pEnv);
int tdbTbDropByName(const char *tbname, TDB *pEnv, TXN* pTxn);
int32_t tdbTbDrop(TTB *pTb); int32_t tdbTbDrop(TTB *pTb);
int32_t tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn); int32_t tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn);
int32_t tdbTbDelete(TTB *pTb, const void *pKey, int kLen, TXN *pTxn); int32_t tdbTbDelete(TTB *pTb, const void *pKey, int kLen, TXN *pTxn);
int32_t tdbTbUpsert(TTB *pTb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn); int32_t tdbTbUpsert(TTB *pTb, const void *pKey, int kLen, const void *pVal, int vLen, TXN *pTxn);
int32_t tdbTbGet(TTB *pTb, const void *pKey, int kLen, void **ppVal, int *vLen); int32_t tdbTbGet(TTB *pTb, const void *pKey, int kLen, void **ppVal, int *vLen);
int32_t tdbTbPGet(TTB *pTb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen); int32_t tdbTbPGet(TTB *pTb, const void *pKey, int kLen, void **ppKey, int *pkLen, void **ppVal, int *vLen);
int32_t tdbTbTraversal(TTB *pTb, void *data,
int32_t (*func)(const void *pKey, int keyLen, const void *pVal, int valLen, void *data));
// TBC // TBC
int32_t tdbTbcOpen(TTB *pTb, TBC **ppTbc, TXN *pTxn); int32_t tdbTbcOpen(TTB *pTb, TBC **ppTbc, TXN *pTxn);

View File

@ -134,11 +134,67 @@ int tdbTbClose(TTB *pTb) {
return 0; return 0;
} }
bool tdbTbExist(const char *tbname, TDB *pEnv) {
bool exist = false;
SPager *pPager;
char fFullName[TDB_FILENAME_LEN];
#ifdef USE_MAINDB
snprintf(fFullName, TDB_FILENAME_LEN, "%s/%s", pEnv->dbName, TDB_MAINDB_NAME);
if (strcmp(TDB_MAINDB_NAME, tbname)) {
pPager = tdbEnvGetPager(pEnv, fFullName);
exist = tdbTbGet(pPager->pEnv->pMainDb, tbname, strlen(tbname) + 1, NULL, NULL) == 0;
} else {
exist = taosCheckExistFile(fFullName);
}
#else
snprintf(fFullName, TDB_FILENAME_LEN, "%s/%s", pEnv->dbName, tbname);
exist = taosCheckExistFile(fFullName);
#endif
return exist;
}
int tdbTbDrop(TTB *pTb) { int tdbTbDrop(TTB *pTb) {
// TODO // TODO
return 0; return 0;
} }
int tdbTbDropByName(const char *tbname, TDB *pEnv, TXN *pTxn) {
int ret;
SPager *pPager;
char fFullName[TDB_FILENAME_LEN];
#ifdef USE_MAINDB
snprintf(fFullName, TDB_FILENAME_LEN, "%s/%s", pEnv->dbName, TDB_MAINDB_NAME);
if (strcmp(TDB_MAINDB_NAME, tbname)) {
pPager = tdbEnvGetPager(pEnv, fFullName);
ret = tdbTbDelete(pPager->pEnv->pMainDb, tbname, strlen(tbname) + 1, pTxn);
} else {
ret = taosRemoveFile(fFullName);
}
#else
snprintf(fFullName, TDB_FILENAME_LEN, "%s/%s", pEnv->dbName, tbname);
ret = taosRemoveFile(fFullName);
#endif
return ret;
}
int tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) { int tdbTbInsert(TTB *pTb, const void *pKey, int keyLen, const void *pVal, int valLen, TXN *pTxn) {
return tdbBtreeInsert(pTb->pBt, pKey, keyLen, pVal, valLen, pTxn); return tdbBtreeInsert(pTb->pBt, pKey, keyLen, pVal, valLen, pTxn);
} }
@ -173,6 +229,38 @@ int tdbTbcOpen(TTB *pTb, TBC **ppTbc, TXN *pTxn) {
return 0; return 0;
} }
int32_t tdbTbTraversal(TTB *pTb, void *data,
int32_t (*func)(const void *pKey, int keyLen, const void *pVal, int valLen, void *data)) {
TBC *pCur;
int ret = tdbTbcOpen(pTb, &pCur, NULL);
if (ret < 0) {
return ret;
}
tdbTbcMoveToFirst(pCur);
void *pKey = NULL;
int kLen = 0;
void *pValue = NULL;
int vLen = 0;
while (1) {
ret = tdbTbcNext(pCur, &pKey, &kLen, &pValue, &vLen);
if (ret < 0) {
ret = 0;
break;
}
ret = func(pKey, kLen, pValue, vLen, data);
if (ret < 0) break;
}
tdbFree(pKey);
tdbFree(pValue);
tdbTbcClose(pCur);
return 0;
}
int tdbTbcMoveTo(TBC *pTbc, const void *pKey, int kLen, int *c) { return tdbBtcMoveTo(&pTbc->btc, pKey, kLen, c); } int tdbTbcMoveTo(TBC *pTbc, const void *pKey, int kLen, int *c) { return tdbBtcMoveTo(&pTbc->btc, pKey, kLen, c); }
int tdbTbcMoveToFirst(TBC *pTbc) { return tdbBtcMoveToFirst(&pTbc->btc); } int tdbTbcMoveToFirst(TBC *pTbc) { return tdbBtcMoveToFirst(&pTbc->btc); }

View File

@ -35,14 +35,14 @@ class TDTestCase:
tdSql.execute(f'create table db.{self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.ttl_param}') tdSql.execute(f'create table db.{self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.ttl_param}')
tdSql.query(f'show db.tables') tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum) tdSql.checkRows(self.tbnum)
sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval']) sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'] + 1)
tdSql.query(f'show db.tables') tdSql.query(f'show db.tables')
tdSql.checkRows(0) tdSql.checkRows(0)
for i in range(self.tbnum): for i in range(self.tbnum):
tdSql.execute(f'create table db.{self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.default_ttl}') tdSql.execute(f'create table db.{self.ntbname}_{i} (ts timestamp,c0 int) ttl {self.default_ttl}')
for i in range(int(self.tbnum/2)): for i in range(int(self.tbnum/2)):
tdSql.execute(f'alter table db.{self.ntbname}_{i} ttl {self.modify_ttl}') tdSql.execute(f'alter table db.{self.ntbname}_{i} ttl {self.modify_ttl}')
sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval']) sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval'] + 1)
tdSql.query(f'show db.tables') tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum - int(self.tbnum/2)) tdSql.checkRows(self.tbnum - int(self.tbnum/2))
tdSql.execute('drop database db') tdSql.execute('drop database db')
@ -54,7 +54,7 @@ class TDTestCase:
tdSql.execute(f'create table db.{self.stbname}_{i} using db.{self.stbname} tags({i}) ttl {self.ttl_param}') tdSql.execute(f'create table db.{self.stbname}_{i} using db.{self.stbname} tags({i}) ttl {self.ttl_param}')
tdSql.query(f'show db.tables') tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum) tdSql.checkRows(self.tbnum)
sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval']) sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'] + 1)
tdSql.query(f'show db.tables') tdSql.query(f'show db.tables')
tdSql.checkRows(0) tdSql.checkRows(0)
for i in range(self.tbnum): for i in range(self.tbnum):
@ -63,7 +63,7 @@ class TDTestCase:
tdSql.checkRows(self.tbnum) tdSql.checkRows(self.tbnum)
for i in range(int(self.tbnum/2)): for i in range(int(self.tbnum/2)):
tdSql.execute(f'alter table db.{self.stbname}_{i} ttl {self.modify_ttl}') tdSql.execute(f'alter table db.{self.stbname}_{i} ttl {self.modify_ttl}')
sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval']) sleep(self.updatecfgDict['ttlUnit']*self.modify_ttl+self.updatecfgDict['ttlPushInterval'] + 1)
tdSql.query(f'show db.tables') tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum - int(self.tbnum/2)) tdSql.checkRows(self.tbnum - int(self.tbnum/2))
tdSql.execute('drop database db') tdSql.execute('drop database db')
@ -75,7 +75,7 @@ class TDTestCase:
tdSql.execute(f'insert into db.{self.stbname}_{i} using db.{self.stbname} tags({i}) ttl {self.ttl_param} values(now,1)') tdSql.execute(f'insert into db.{self.stbname}_{i} using db.{self.stbname} tags({i}) ttl {self.ttl_param} values(now,1)')
tdSql.query(f'show db.tables') tdSql.query(f'show db.tables')
tdSql.checkRows(self.tbnum) tdSql.checkRows(self.tbnum)
sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval']) sleep(self.updatecfgDict['ttlUnit']*self.ttl_param+self.updatecfgDict['ttlPushInterval'] + 1)
tdSql.query(f'show db.tables') tdSql.query(f'show db.tables')
tdSql.checkRows(0) tdSql.checkRows(0)
tdSql.execute('drop database db') tdSql.execute('drop database db')

View File

@ -52,7 +52,7 @@ int buildStable(TAOS* pConn, TAOS_RES* pRes) {
pRes = taos_query(pConn, "create table d2 using meters tags(3, 'San Francisco')"); pRes = taos_query(pConn, "create table d2 using meters tags(3, 'San Francisco')");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create child table d1, reason:%s\n", taos_errstr(pRes)); printf("failed to create child table d2, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
} }
taos_free_result(pRes); taos_free_result(pRes);