diff --git a/include/common/tmsg.h b/include/common/tmsg.h index fb32cb382a..ab191313ec 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1806,6 +1806,8 @@ typedef struct SVCreateTbReq { tb_uid_t uid; int64_t ctime; int32_t ttl; + int32_t commentLen; + char* comment; int8_t type; union { struct { @@ -1823,6 +1825,7 @@ int tDecodeSVCreateTbReq(SDecoder* pCoder, SVCreateTbReq* pReq); static FORCE_INLINE void tdDestroySVCreateTbReq(SVCreateTbReq* req) { taosMemoryFreeClear(req->name); + taosMemoryFreeClear(req->comment); if (req->type == TSDB_CHILD_TABLE) { taosMemoryFreeClear(req->ctb.pTag); } else if (req->type == TSDB_NORMAL_TABLE) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 945238ba1c..0fd9eb92c5 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -498,7 +498,7 @@ int32_t tSerializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pReq } if (pReq->commentLen > 0) { - if (tEncodeCStrWithLen(&encoder, pReq->comment, pReq->commentLen) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->comment) < 0) return -1; } if (pReq->ast1Len > 0) { if (tEncodeBinary(&encoder, pReq->pAst1, pReq->ast1Len) < 0) return -1; @@ -561,7 +561,7 @@ int32_t tDeserializeSMCreateStbReq(void *buf, int32_t bufLen, SMCreateStbReq *pR } if (pReq->commentLen > 0) { - pReq->comment = taosMemoryCalloc(1, pReq->commentLen + 1); + pReq->comment = taosMemoryMalloc(pReq->commentLen); if (pReq->comment == NULL) return -1; if (tDecodeCStrTo(&decoder, pReq->comment) < 0) return -1; } @@ -4321,6 +4321,10 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) { if (tEncodeI64(pCoder, pReq->ctime) < 0) return -1; if (tEncodeI32(pCoder, pReq->ttl) < 0) return -1; if (tEncodeI8(pCoder, pReq->type) < 0) return -1; + if (tEncodeI32(pCoder, pReq->commentLen) < 0) return -1; + if (pReq->commentLen > 0) { + if (tEncodeCStr(pCoder, pReq->comment) < 0) return -1; + } if (pReq->type == TSDB_CHILD_TABLE) { if (tEncodeI64(pCoder, pReq->ctb.suid) < 0) return -1; @@ -4344,6 +4348,12 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) { if (tDecodeI64(pCoder, &pReq->ctime) < 0) return -1; if (tDecodeI32(pCoder, &pReq->ttl) < 0) return -1; if (tDecodeI8(pCoder, &pReq->type) < 0) return -1; + if (tDecodeI32(pCoder, &pReq->commentLen) < 0) return -1; + if (pReq->commentLen > 0) { + pReq->comment = taosMemoryMalloc(pReq->commentLen); + if (pReq->comment == NULL) return -1; + if (tDecodeCStrTo(pCoder, pReq->comment) < 0) return -1; + } if (pReq->type == TSDB_CHILD_TABLE) { if (tDecodeI64(pCoder, &pReq->ctb.suid) < 0) return -1; diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 3b2eafced8..235f026066 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -77,12 +77,52 @@ static void mndPullupTelem(SMnode *pMnode) { tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg); } +static void mndPushTtlTime(SMnode *pMnode) { + SSdb *pSdb = pMnode->pSdb; + SVgObj *pVgroup = NULL; + void *pIter = NULL; + + while (1) { + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + int32_t contLen = sizeof(SMsgHead) + sizeof(int32_t); + + SMsgHead *pHead = taosMemoryMalloc(contLen); + if (pHead == NULL) { + mError("ttl time malloc err. contLen:%d", contLen); + sdbRelease(pSdb, pVgroup); + continue; + } + pHead->contLen = htonl(pHead->contLen); + pHead->vgId = htonl(pHead->vgId); + + int32_t t = taosGetTimestampSec(); + *(int32_t*)(pHead + sizeof(SMsgHead)) = htonl(t); + + SRpcMsg rpcMsg = {.msgType = TDMT_VND_DROP_TTL_TABLE, .pCont = pHead, .contLen = contLen}; + + SEpSet epSet = mndGetVgroupEpset(pMnode, pVgroup); + int32_t code = tmsgSendReq(&epSet, &rpcMsg); + if(code != 0){ + mError("ttl time seed err. code:%d", code); + } + mError("ttl time seed succ. time:%d", t); + sdbRelease(pSdb, pVgroup); + taosMemoryFree(pHead); + } +} + static void *mndThreadFp(void *param) { SMnode *pMnode = param; int64_t lastTime = 0; setThreadName("mnode-timer"); while (1) { + if (lastTime % (100) == 0) { // sleep 1 day for ttl + mndPushTtlTime(pMnode); + } + lastTime++; taosMsleep(100); if (mndGetStop(pMnode)) break; @@ -98,6 +138,8 @@ static void *mndThreadFp(void *param) { if (lastTime % (tsTelemInterval * 10) == 0) { mndPullupTelem(pMnode); } + + } return NULL; diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index d498b640cb..9559b05b6b 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -671,12 +671,12 @@ int32_t mndBuildStbFromReq(SMnode *pMnode, SStbObj *pDst, SMCreateStbReq *pCreat pDst->numOfTags = pCreate->numOfTags; pDst->commentLen = pCreate->commentLen; if (pDst->commentLen > 0) { - pDst->comment = taosMemoryCalloc(pDst->commentLen + 1, 1); + pDst->comment = taosMemoryCalloc(pDst->commentLen, 1); if (pDst->comment == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - memcpy(pDst->comment, pCreate->comment, pDst->commentLen + 1); + memcpy(pDst->comment, pCreate->comment, pDst->commentLen); } pDst->ast1Len = pCreate->ast1Len; @@ -1208,7 +1208,6 @@ static int32_t mndSetAlterStbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj sdbRelease(pSdb, pVgroup); return -1; } - STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgroup); action.pCont = pReq; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 3cfca66a39..c1d86e32c8 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -210,12 +210,14 @@ struct SMetaEntry { struct { int64_t ctime; int32_t ttlDays; + char *comment; tb_uid_t suid; uint8_t *pTags; } ctbEntry; struct { int64_t ctime; int32_t ttlDays; + char *comment; int32_t ncid; // next column id SSchemaWrapper schemaRow; } ntbEntry; diff --git a/source/dnode/vnode/src/meta/metaEntry.c b/source/dnode/vnode/src/meta/metaEntry.c index db99257ea7..58b28fed82 100644 --- a/source/dnode/vnode/src/meta/metaEntry.c +++ b/source/dnode/vnode/src/meta/metaEntry.c @@ -29,12 +29,14 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) { } else if (pME->type == TSDB_CHILD_TABLE) { if (tEncodeI64(pCoder, pME->ctbEntry.ctime) < 0) return -1; if (tEncodeI32(pCoder, pME->ctbEntry.ttlDays) < 0) return -1; + if (tEncodeCStr(pCoder, pME->ctbEntry.comment) < 0) return -1; if (tEncodeI64(pCoder, pME->ctbEntry.suid) < 0) return -1; debugCheckTags((STag*)pME->ctbEntry.pTags); // TODO: remove after debug if (tEncodeTag(pCoder, (const STag *)pME->ctbEntry.pTags) < 0) return -1; } else if (pME->type == TSDB_NORMAL_TABLE) { if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1; if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1; + if (tEncodeCStr(pCoder, pME->ntbEntry.comment) < 0) return -1; if (tEncodeI32v(pCoder, pME->ntbEntry.ncid) < 0) return -1; if (tEncodeSSchemaWrapper(pCoder, &pME->ntbEntry.schemaRow) < 0) return -1; } else if (pME->type == TSDB_TSMA_TABLE) { @@ -61,12 +63,14 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { } else if (pME->type == TSDB_CHILD_TABLE) { if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1; if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1; + if (tDecodeCStr(pCoder, &pME->ctbEntry.comment) < 0) return -1; if (tDecodeI64(pCoder, &pME->ctbEntry.suid) < 0) return -1; if (tDecodeTag(pCoder, (STag **)&pME->ctbEntry.pTags) < 0) return -1; // (TODO) debugCheckTags((STag*)pME->ctbEntry.pTags); // TODO: remove after debug } else if (pME->type == TSDB_NORMAL_TABLE) { if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1; if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1; + if (tDecodeCStr(pCoder, &pME->ntbEntry.comment) < 0) return -1; if (tDecodeI32v(pCoder, &pME->ntbEntry.ncid) < 0) return -1; if (tDecodeSSchemaWrapperEx(pCoder, &pME->ntbEntry.schemaRow) < 0) return -1; } else if (pME->type == TSDB_TSMA_TABLE) { diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 717344fff2..c0eab28863 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -320,11 +320,13 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) { if (me.type == TSDB_CHILD_TABLE) { me.ctbEntry.ctime = pReq->ctime; me.ctbEntry.ttlDays = pReq->ttl; + me.ctbEntry.comment = pReq->comment; me.ctbEntry.suid = pReq->ctb.suid; me.ctbEntry.pTags = pReq->ctb.pTag; } else { me.ntbEntry.ctime = pReq->ctime; me.ntbEntry.ttlDays = pReq->ttl; + me.ntbEntry.comment = pReq->comment; me.ntbEntry.schemaRow = pReq->ntb.schemaRow; me.ntbEntry.ncid = me.ntbEntry.schemaRow.pSchema[me.ntbEntry.schemaRow.nCols - 1].colId + 1; } @@ -412,12 +414,11 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { void * pData = NULL; int nData = 0; int rc = 0; - int64_t version; SMetaEntry e = {0}; SDecoder dc = {0}; rc = tdbTbGet(pMeta->pUidIdx, &uid, sizeof(uid), &pData, &nData); - version = *(int64_t *)pData; + int64_t version = *(int64_t *)pData; tdbTbGet(pMeta->pTbDb, &(STbDbKey){.version = version, .uid = uid}, sizeof(STbDbKey), &pData, &nData); @@ -439,6 +440,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { // drop schema.db (todo) } + metaError("ttl drop table:%s", e.name); tDecoderClear(&dc); tdbFree(pData); @@ -494,6 +496,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl tDecoderInit(&dc, entry.pBuf, nData); ret = metaDecodeEntry(&dc, &entry); ASSERT(ret == 0); + tDecoderClear(&dc); if (entry.type != TSDB_NORMAL_TABLE) { terrno = TSDB_CODE_VND_INVALID_TABLE_ACTION; @@ -579,6 +582,7 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl // save to table db metaSaveToTbDb(pMeta, &entry); + tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn); tdbTbcUpsert(pUidIdxc, &entry.uid, sizeof(tb_uid_t), &version, sizeof(version), 0); metaSaveToSkmDb(pMeta, &entry); @@ -587,14 +591,14 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl metaUpdateMetaRsp(uid, pAlterTbReq->tbName, pSchema, pMetaRsp); + if (entry.pBuf) taosMemoryFree(entry.pBuf); if (pNewSchema) taosMemoryFree(pNewSchema); - tDecoderClear(&dc); tdbTbcClose(pTbDbc); tdbTbcClose(pUidIdxc); return 0; _err: - tDecoderClear(&dc); + if (entry.pBuf) taosMemoryFree(entry.pBuf); tdbTbcClose(pTbDbc); tdbTbcClose(pUidIdxc); return -1; @@ -748,18 +752,84 @@ _err: } static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) { - if (commentLen > 0) { - pNew->commentLen = commentLen; - pNew->comment = taosMemoryCalloc(1, commentLen); - if (pNew->comment == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + void * pVal = NULL; + int nVal = 0; + const void * pData = NULL; + int nData = 0; + int ret = 0; + tb_uid_t uid; + int64_t oversion; + SMetaEntry entry = {0}; + int c = 0; + + // search name index + ret = tdbTbGet(pMeta->pNameIdx, pAlterTbReq->tbName, strlen(pAlterTbReq->tbName) + 1, &pVal, &nVal); + if (ret < 0) { + terrno = TSDB_CODE_VND_TABLE_NOT_EXIST; + return -1; + } + + uid = *(tb_uid_t *)pVal; + tdbFree(pVal); + pVal = NULL; + + // search uid index + TBC *pUidIdxc = NULL; + + tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn); + tdbTbcMoveTo(pUidIdxc, &uid, sizeof(uid), &c); + ASSERT(c == 0); + + tdbTbcGet(pUidIdxc, NULL, NULL, &pData, &nData); + oversion = *(int64_t *)pData; + + // search table.db + TBC *pTbDbc = NULL; + + tdbTbcOpen(pMeta->pTbDb, &pTbDbc, &pMeta->txn); + tdbTbcMoveTo(pTbDbc, &((STbDbKey){.uid = uid, .version = oversion}), sizeof(STbDbKey), &c); + ASSERT(c == 0); + tdbTbcGet(pTbDbc, NULL, NULL, &pData, &nData); + + // get table entry + SDecoder dc = {0}; + entry.pBuf = taosMemoryMalloc(nData); + memcpy(entry.pBuf, pData, nData); + tDecoderInit(&dc, entry.pBuf, nData); + ret = metaDecodeEntry(&dc, &entry); + ASSERT(ret == 0); + tDecoderClear(&dc); + + entry.version = version; + metaWLock(pMeta); + // build SMetaEntry + if (entry.type == TSDB_CHILD_TABLE) { + if(pAlterTbReq->updateTTL) { + metaDeleteTtlIdx(pMeta, &entry); + entry.ctbEntry.ttlDays = pAlterTbReq->newTTL; + metaUpdateTtlIdx(pMeta, &entry); } - memcpy(pNew->comment, pComment, commentLen); - } - if (ttl >= 0) { - pNew->ttl = ttl; + if(pAlterTbReq->updateComment) entry.ctbEntry.comment = pAlterTbReq->newComment; + } else { + if(pAlterTbReq->updateTTL) { + metaDeleteTtlIdx(pMeta, &entry); + entry.ntbEntry.ttlDays = pAlterTbReq->newTTL; + metaUpdateTtlIdx(pMeta, &entry); + } + if(pAlterTbReq->updateComment) entry.ntbEntry.comment = pAlterTbReq->newComment; } + + // save to table db + metaSaveToTbDb(pMeta, &entry); + + tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn); + tdbTbcUpsert(pUidIdxc, &entry.uid, sizeof(tb_uid_t), &version, sizeof(version), 0); + + metaULock(pMeta); + + tdbTbcClose(pTbDbc); + tdbTbcClose(pUidIdxc); + if (entry.pBuf) taosMemoryFree(entry.pBuf); return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 2540f953e0..bc808ad24a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -398,7 +398,9 @@ static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *p SArray *tbUids = taosArrayInit(8, sizeof(int64_t)); if (tbUids == NULL) return TSDB_CODE_OUT_OF_MEMORY; - int32_t ret = metaTtlDropTable(pVnode->pMeta, *(int64_t*)pReq, tbUids); + int32_t t = ntohl(*(int32_t*)pReq); + vError("rec ttl time:%d", t); + int32_t ret = metaTtlDropTable(pVnode->pMeta, t, tbUids); if(ret != 0){ goto end; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 05f1c64824..faf295518c 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -4253,6 +4253,13 @@ static int32_t buildNormalTableBatchReq(int32_t acctId, const SCreateTableStmt* req.type = TD_NORMAL_TABLE; req.name = strdup(pStmt->tableName); req.ttl = pStmt->pOptions->ttl; + if ('\0' != pStmt->pOptions->comment[0]) { + req.comment = strdup(pStmt->pOptions->comment); + if (NULL == req.comment) { + return TSDB_CODE_OUT_OF_MEMORY; + } + req.commentLen = strlen(pStmt->pOptions->comment) + 1; + } req.ntb.schemaRow.nCols = LIST_LENGTH(pStmt->pCols); req.ntb.schemaRow.version = 1; req.ntb.schemaRow.pSchema = taosMemoryCalloc(req.ntb.schemaRow.nCols, sizeof(SSchema)); @@ -4404,6 +4411,10 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S req.type = TD_CHILD_TABLE; req.name = strdup(pStmt->tableName); req.ttl = pStmt->pOptions->ttl; + if ('\0' != pStmt->pOptions->comment[0]) { + req.comment = strdup(pStmt->pOptions->comment); + req.commentLen = strlen(pStmt->pOptions->comment) + 1; + } req.ctb.suid = suid; req.ctb.pTag = (uint8_t*)pTag; if (pStmt->ignoreExists) { @@ -4982,7 +4993,7 @@ static int32_t buildUpdateOptionsReq(STranslateContext* pCxt, SAlterTableStmt* p } } - if (TSDB_CODE_SUCCESS == code) { + if (TSDB_CODE_SUCCESS == code && '\0' != pStmt->pOptions->comment[0]) { pReq->updateComment = true; pReq->newComment = strdup(pStmt->pOptions->comment); if (NULL == pReq->newComment) {