From 4e1a3550525fa7973938779f7c350ee9a7dd6167 Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 23 Nov 2023 04:43:28 +0000 Subject: [PATCH] mnode for compact --- include/common/systable.h | 2 + include/common/tglobal.h | 1 + include/common/tmsg.h | 32 + include/common/tmsgdef.h | 3 + source/common/src/systable.c | 16 + source/common/src/tglobal.c | 6 + source/common/src/tmsg.c | 106 +++ source/dnode/mgmt/mgmt_mnode/src/mmHandle.c | 2 + source/dnode/mnode/impl/inc/mndCompact.h | 56 ++ .../dnode/mnode/impl/inc/mndCompactDetail.h | 48 ++ source/dnode/mnode/impl/inc/mndDef.h | 16 + source/dnode/mnode/impl/src/mndCompact.c | 694 ++++++++++++++++++ .../dnode/mnode/impl/src/mndCompactDetail.c | 289 ++++++++ source/dnode/mnode/impl/src/mndMain.c | 21 +- source/dnode/mnode/impl/src/mndShow.c | 4 + source/dnode/mnode/sdb/inc/sdb.h | 4 +- source/dnode/mnode/sdb/src/sdbHash.c | 6 +- 17 files changed, 1303 insertions(+), 3 deletions(-) create mode 100644 source/dnode/mnode/impl/inc/mndCompact.h create mode 100644 source/dnode/mnode/impl/inc/mndCompactDetail.h create mode 100644 source/dnode/mnode/impl/src/mndCompact.c create mode 100644 source/dnode/mnode/impl/src/mndCompactDetail.c diff --git a/include/common/systable.h b/include/common/systable.h index b44d8ce1d6..92e7915424 100644 --- a/include/common/systable.h +++ b/include/common/systable.h @@ -50,6 +50,8 @@ extern "C" { #define TSDB_INS_TABLE_STREAM_TASKS "ins_stream_tasks" #define TSDB_INS_TABLE_USER_PRIVILEGES "ins_user_privileges" #define TSDB_INS_TABLE_VIEWS "ins_views" +#define TSDB_INS_TABLE_COMPACTS "ins_compacts" +#define TSDB_INS_TABLE_COMPACT_DETAILS "ins_compact_details" #define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema" #define TSDB_PERFS_TABLE_SMAS "perf_smas" diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 58517a5db0..3470c72527 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -192,6 +192,7 @@ extern int64_t tsWalFsyncDataSizeLimit; // internal extern int32_t tsTransPullupInterval; +extern int32_t tsCompactPullupInterval; extern int32_t tsMqRebalanceInterval; extern int32_t tsStreamCheckpointInterval; extern float tsSinkDataRate; diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 8a56c2fad4..5fcb3a1fcc 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -144,6 +144,8 @@ typedef enum _mgmt_table { TSDB_MGMT_TABLE_STREAM_TASKS, TSDB_MGMT_TABLE_PRIVILEGES, TSDB_MGMT_TABLE_VIEWS, + TSDB_MGMT_TABLE_COMPACT, + TSDB_MGMT_TABLE_COMPACT_DETAIL, TSDB_MGMT_TABLE_MAX, } EShowType; @@ -1675,6 +1677,26 @@ int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pR int32_t tDeserializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq); int32_t tFreeSCreateVnodeReq(SCreateVnodeReq* pReq); +typedef struct { + int32_t compactId; + int32_t vgId; + int32_t dnodeId; +} SQueryCompactProgressReq; + +int32_t tSerializeSQueryCompactProgressReq(void* buf, int32_t bufLen, SQueryCompactProgressReq* pReq); +int32_t tDeserializeSQueryCompactProgressReq(void* buf, int32_t bufLen, SQueryCompactProgressReq* pReq); + +typedef struct { + int32_t compactId; + int32_t vgId; + int32_t dnodeId; + int32_t numberFileset; + int32_t finished; +} SQueryCompactProgressRsp; + +int32_t tSerializeSQueryCompactProgressRsp(void* buf, int32_t bufLen, SQueryCompactProgressRsp* pReq); +int32_t tDeserializeSQueryCompactProgressRsp(void* buf, int32_t bufLen, SQueryCompactProgressRsp* pReq); + typedef struct { int32_t vgId; int32_t dnodeId; @@ -1702,11 +1724,21 @@ typedef struct { char db[TSDB_DB_FNAME_LEN]; int64_t compactStartTime; STimeWindow tw; + int32_t compactId; } SCompactVnodeReq; int32_t tSerializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq); int32_t tDeserializeSCompactVnodeReq(void* buf, int32_t bufLen, SCompactVnodeReq* pReq); +typedef struct { + int32_t compactId; + int32_t vgId; + int32_t dnodeId; +} SVKillCompactReq; + +int32_t tSerializeSVKillCompactReq(void* buf, int32_t bufLen, SVKillCompactReq* pReq); +int32_t tDeserializeSVKillCompactReq(void* buf, int32_t bufLen, SVKillCompactReq* pReq); + typedef struct { int32_t vgVersion; int32_t buffer; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index a1cc6a8739..212d0cdf39 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -193,6 +193,7 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_MND_DROP_VIEW, "drop-view", SCMDropViewReq, NULL) TD_DEF_MSG_TYPE(TDMT_MND_VIEW_META, "view-meta", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_KILL_COMPACT, "kill-compact", SKillCompactReq, NULL) + TD_DEF_MSG_TYPE(TDMT_MND_COMPACT_TIMER, "compact-tmr", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_VND_MSG) @@ -242,6 +243,8 @@ enum { // WARN: new msg should be appended to segment tail TD_DEF_MSG_TYPE(TDMT_VND_CREATE_INDEX, "vnode-create-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DROP_INDEX, "vnode-drop-index", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_DISABLE_WRITE, "vnode-disable-write", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_QUERY_COMPACT_PROGRESS, "vnode-query-compact-progress", NULL, NULL) // no longer used + TD_DEF_MSG_TYPE(TDMT_VND_KILL_COMPACT, "kill-compact", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MAX_MSG, "vnd-max", NULL, NULL) TD_NEW_MSG_SEG(TDMT_SCH_MSG) diff --git a/source/common/src/systable.c b/source/common/src/systable.c index 5f44d3e7fc..6750f65be5 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -332,6 +332,20 @@ static const SSysDbTableSchema userViewsSchema[] = { // {.name = "column_list", .bytes = 2048 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, }; +static const SSysDbTableSchema userCompactsSchema[] = { + {.name = "compact_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR, .sysInfo = false}, + {.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, +}; + +static const SSysDbTableSchema userCompactsDetailSchema[] = { + {.name = "compact_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "dnode_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "number_fileset", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "finished", .bytes = 4, .type = TSDB_DATA_TYPE_INT, .sysInfo = false}, + {.name = "start_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP, .sysInfo = false}, +}; static const SSysTableMeta infosMeta[] = { {TSDB_INS_TABLE_DNODES, dnodesSchema, tListLen(dnodesSchema), true}, @@ -360,6 +374,8 @@ static const SSysTableMeta infosMeta[] = { {TSDB_INS_TABLE_VNODES, vnodesSchema, tListLen(vnodesSchema), true}, {TSDB_INS_TABLE_USER_PRIVILEGES, userUserPrivilegesSchema, tListLen(userUserPrivilegesSchema), true}, {TSDB_INS_TABLE_VIEWS, userViewsSchema, tListLen(userViewsSchema), false}, + {TSDB_INS_TABLE_COMPACTS, userCompactsSchema, tListLen(userCompactsSchema), false}, + {TSDB_INS_TABLE_COMPACT_DETAILS, userCompactsDetailSchema, tListLen(userCompactsDetailSchema), false}, }; static const SSysDbTableSchema connectionsSchema[] = { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 7a2ae90cda..7591cf2edc 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -247,6 +247,7 @@ int32_t tsTtlBatchDropNum = 10000; // number of tables dropped per batch // internal int32_t tsTransPullupInterval = 2; +int32_t tsCompactPullupInterval = 10; int32_t tsMqRebalanceInterval = 2; int32_t tsStreamCheckpointInterval = 60; float tsSinkDataRate = 2.0; @@ -689,6 +690,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "transPullupInterval", tsTransPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; + if (cfgAddInt32(pCfg, "transPullupInterval", tsCompactPullupInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != + 0) + return -1; if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; @@ -1135,6 +1139,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tmqMaxTopicNum = cfgGetItem(pCfg, "tmqMaxTopicNum")->i32; tsTransPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; + tsCompactPullupInterval = cfgGetItem(pCfg, "transPullupInterval")->i32; tsMqRebalanceInterval = cfgGetItem(pCfg, "mqRebalanceInterval")->i32; tsTtlUnit = cfgGetItem(pCfg, "ttlUnit")->i32; tsTtlPushIntervalSec = cfgGetItem(pCfg, "ttlPushInterval")->i32; @@ -1449,6 +1454,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, char *name) { {"timeseriesThreshold", &tsTimeSeriesThreshold}, {"tmqMaxTopicNum", &tmqMaxTopicNum}, {"transPullupInterval", &tsTransPullupInterval}, + {"compactPullupInterval", &tsCompactPullupInterval}, {"trimVDbIntervalSec", &tsTrimVDbIntervalSec}, {"ttlBatchDropNum", &tsTtlBatchDropNum}, {"ttlFlushThreshold", &tsTtlFlushThreshold}, diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 61cb294026..de7cae40c1 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5122,6 +5122,74 @@ int32_t tFreeSCreateVnodeReq(SCreateVnodeReq *pReq) { return 0; } +int32_t tSerializeSQueryCompactProgressReq(void* buf, int32_t bufLen, SQueryCompactProgressReq* pReq){ + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + + if (tEncodeI32(&encoder, pReq->compactId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSQueryCompactProgressReq(void* buf, int32_t bufLen, SQueryCompactProgressReq* pReq){ + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + + if (tDecodeI32(&decoder, &pReq->compactId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1; + + tEndDecode(&decoder); + tDecoderClear(&decoder); + return 0; +} + +int32_t tSerializeSQueryCompactProgressRsp(void* buf, int32_t bufLen, SQueryCompactProgressRsp* pReq){ + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + + if (tEncodeI32(&encoder, pReq->compactId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->numberFileset) < 0) return -1; + if (tEncodeI32(&encoder, pReq->finished) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} +int32_t tDeserializeSQueryCompactProgressRsp(void* buf, int32_t bufLen, SQueryCompactProgressRsp* pReq){ + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + + if (tDecodeI32(&decoder, &pReq->compactId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->numberFileset) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->finished) < 0) return -1; + + tEndDecode(&decoder); + tDecoderClear(&decoder); + return 0; +} + + int32_t tSerializeSDropVnodeReq(void *buf, int32_t bufLen, SDropVnodeReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); @@ -5210,6 +5278,8 @@ int32_t tSerializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq * if (tEncodeI64(&encoder, pReq->tw.skey) < 0) return -1; if (tEncodeI64(&encoder, pReq->tw.ekey) < 0) return -1; + if (tEncodeI32(&encoder, pReq->compactId) < 0) return -1; + tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -5236,6 +5306,42 @@ int32_t tDeserializeSCompactVnodeReq(void *buf, int32_t bufLen, SCompactVnodeReq if (tDecodeI64(&decoder, &pReq->tw.ekey) < 0) return -1; } + if(!tDecodeIsEnd(&decoder)){ + if (tDecodeI32(&decoder, &pReq->compactId) < 0) return -1; + } + + tEndDecode(&decoder); + tDecoderClear(&decoder); + return 0; +} + +int32_t tSerializeSVKillCompactReq(void *buf, int32_t bufLen, SVKillCompactReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + + if (tEncodeI32(&encoder, pReq->compactId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1; + if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSVKillCompactReq(void *buf, int32_t bufLen, SVKillCompactReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + + if (tDecodeI32(&decoder, &pReq->compactId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1; + tEndDecode(&decoder); tDecoderClear(&decoder); return 0; diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c index 737a0338ef..3404b6ed0e 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmHandle.c @@ -192,6 +192,8 @@ SArray *mmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_VIEW, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_VIEW, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_VIEW_META, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_COMPACT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_COMPACT_PROGRESS_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER; diff --git a/source/dnode/mnode/impl/inc/mndCompact.h b/source/dnode/mnode/impl/inc/mndCompact.h new file mode 100644 index 0000000000..4683300237 --- /dev/null +++ b/source/dnode/mnode/impl/inc/mndCompact.h @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_MND_COMPACT_H_ +#define _TD_MND_COMPACT_H_ + +#include "mndInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t mndInitCompact(SMnode *pMnode); +void mndCleanupCompact(SMnode *pMnode); + +void tFreeCompactObj(SCompactObj *pCompact); +int32_t tSerializeSCompactObj(void *buf, int32_t bufLen, const SCompactObj *pObj); +int32_t tDeserializeSCompactObj(void *buf, int32_t bufLen, SCompactObj *pObj); + +SSdbRaw* mndCompactActionEncode(SCompactObj *pCompact); +SSdbRow* mndCompactActionDecode(SSdbRaw *pRaw); + +int32_t mndCompactActionInsert(SSdb *pSdb, SCompactObj *pCompact); +int32_t mndCompactActionDelete(SSdb *pSdb, SCompactObj *pCompact); +int32_t mndCompactActionUpdate(SSdb *pSdb, SCompactObj *pOldCompact, SCompactObj *pNewCompact); + +int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* pCompact, SDbObj *pDb, SCompactDbRsp *rsp); + +int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); + +int32_t mndProcessKillCompactReq(SRpcMsg *pReq); + +int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq); + +SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId); +void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact); + +void mndCompactUpdate(SMnode *pMnode, SCompactObj *pCompact); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_MND_COMPACT_H_*/ diff --git a/source/dnode/mnode/impl/inc/mndCompactDetail.h b/source/dnode/mnode/impl/inc/mndCompactDetail.h new file mode 100644 index 0000000000..601af3b64b --- /dev/null +++ b/source/dnode/mnode/impl/inc/mndCompactDetail.h @@ -0,0 +1,48 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_MND_COMPACT_DETAIL_H_ +#define _TD_MND_COMPACT_DETAIL_H_ + +#include "mndInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t mndInitCompactDetail(SMnode *pMnode); +void mndCleanupCompactDetail(SMnode *pMnode); + +void tFreeCompactDetailObj(SCompactDetailObj *pCompact); +int32_t tSerializeSCompactDetailObj(void *buf, int32_t bufLen, const SCompactDetailObj *pObj); +int32_t tDeserializeSCompactDetailObj(void *buf, int32_t bufLen, SCompactDetailObj *pObj); + +SSdbRaw* mndCompactDetailActionEncode(SCompactDetailObj *pCompact); +SSdbRow* mndCompactDetailActionDecode(SSdbRaw *pRaw); + +int32_t mndCompactDetailActionInsert(SSdb *pSdb, SCompactDetailObj *pCompact); +int32_t mndCompactDetailActionDelete(SSdb *pSdb, SCompactDetailObj *pCompact); +int32_t mndCompactDetailActionUpdate(SSdb *pSdb, SCompactDetailObj *pOldCompact, SCompactDetailObj *pNewCompact); + +int32_t mndAddCompactDetailToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* pCompact, SVgObj *pVgroup, + SVnodeGid *pVgid, int32_t index); + +int32_t mndRetrieveCompactDetail(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_MND_COMPACT_DETAIL_H_*/ diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index efa99db74b..2219202ed1 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -737,6 +737,22 @@ int32_t tEncodeSViewObj(SEncoder* pEncoder, const SViewObj* pObj); int32_t tDecodeSViewObj(SDecoder* pDecoder, SViewObj* pObj, int32_t sver); void tFreeSViewObj(SViewObj* pObj); +typedef struct { + int32_t compactDetailId; + int32_t compactId; + int32_t vgId; + int32_t dnodeId; + int32_t numberFileset; + int32_t finished; + int64_t startTime; +}SCompactDetailObj; + +typedef struct { + int32_t compactId; + char dbname[TSDB_TABLE_FNAME_LEN]; + int64_t startTime; + SArray* compactDetail; +} SCompactObj; #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndCompact.c b/source/dnode/mnode/impl/src/mndCompact.c new file mode 100644 index 0000000000..5553d66f62 --- /dev/null +++ b/source/dnode/mnode/impl/src/mndCompact.c @@ -0,0 +1,694 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "mndCompact.h" +#include "mndTrans.h" +#include "mndShow.h" +#include "mndDb.h" +#include "mndCompactDetail.h" +#include "mndVgroup.h" +#include "tmsgcb.h" + +#define MND_COMPACT_VER_NUMBER 1 + +static int32_t mndProcessCompactTimer(SRpcMsg *pReq); + +int32_t mndInitCompact(SMnode *pMnode) { + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_COMPACT, mndRetrieveCompact); + mndSetMsgHandle(pMnode, TDMT_MND_KILL_COMPACT, mndProcessKillCompactReq); + mndSetMsgHandle(pMnode, TDMT_VND_QUERY_COMPACT_PROGRESS_RSP, mndProcessQueryCompactRsp); + + mndSetMsgHandle(pMnode, TDMT_MND_COMPACT_TIMER, mndProcessCompactTimer); + + SSdbTable table = { + .sdbType = SDB_COMPACT, + .keyType = SDB_KEY_INT32, + .encodeFp = (SdbEncodeFp)mndCompactActionEncode, + .decodeFp = (SdbDecodeFp)mndCompactActionDecode, + .insertFp = (SdbInsertFp)mndCompactActionInsert, + .updateFp = (SdbUpdateFp)mndCompactActionUpdate, + .deleteFp = (SdbDeleteFp)mndCompactActionDelete, + }; + + return sdbSetTable(pMnode->pSdb, table); +} + +void mndCleanupCompact(SMnode *pMnode) { + mDebug("mnd compact cleanup"); +} + +void tFreeCompactObj(SCompactObj *pCompact) { + //int32_t size = taosArrayGetSize(pCompact->compactDetail); + + //for (int32_t i = 0; i < size; ++i) { + // SCompactDetailObj *detail = taosArrayGet(pCompact->compactDetail, i); + // taosMemoryFree(detail); + //} + + //taosArrayDestroy(pCompact->compactDetail); +} + +int32_t tSerializeSCompactObj(void *buf, int32_t bufLen, const SCompactObj *pObj) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + + if (tEncodeI32(&encoder, pObj->compactId) < 0) return -1; + if (tEncodeCStr(&encoder, pObj->dbname) < 0) return -1; + if (tEncodeI64(&encoder, pObj->startTime) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSCompactObj(void *buf, int32_t bufLen, SCompactObj *pObj) { + int8_t ex = 0; + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + + if (tDecodeI32(&decoder, &pObj->compactId) < 0) return -1; + if (tDecodeCStrTo(&decoder, pObj->dbname) < 0) return -1; + if (tDecodeI64(&decoder, &pObj->startTime) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + +SSdbRaw *mndCompactActionEncode(SCompactObj *pCompact) { + terrno = TSDB_CODE_SUCCESS; + + void *buf = NULL; + SSdbRaw *pRaw = NULL; + + int32_t tlen = tSerializeSCompactObj(NULL, 0, pCompact); + if (tlen < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto OVER; + } + + int32_t size = sizeof(int32_t) + tlen; + pRaw = sdbAllocRaw(SDB_COMPACT, MND_COMPACT_VER_NUMBER, size); + if (pRaw == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto OVER; + } + + buf = taosMemoryMalloc(tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto OVER; + } + + tlen = tSerializeSCompactObj(buf, tlen, pCompact); + if (tlen < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto OVER; + } + + int32_t dataPos = 0; + SDB_SET_INT32(pRaw, dataPos, tlen, OVER); + SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER); + SDB_SET_DATALEN(pRaw, dataPos, OVER); + + +OVER: + taosMemoryFreeClear(buf); + if (terrno != TSDB_CODE_SUCCESS) { + mError("compact:%" PRId32 ", failed to encode to raw:%p since %s", pCompact->compactId, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("compact:%" PRId32 ", encode to raw:%p, row:%p", pCompact->compactId, pRaw, pCompact); + return pRaw; +} + +SSdbRow *mndCompactActionDecode(SSdbRaw *pRaw) { + SSdbRow *pRow = NULL; + SCompactObj *pCompact = NULL; + void *buf = NULL; + terrno = TSDB_CODE_SUCCESS; + + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) { + goto OVER; + } + + if (sver != MND_COMPACT_VER_NUMBER) { + terrno = TSDB_CODE_SDB_INVALID_DATA_VER; + mError("view read invalid ver, data ver: %d, curr ver: %d", sver, MND_COMPACT_VER_NUMBER); + goto OVER; + } + + pRow = sdbAllocRow(sizeof(SCompactObj)); + if (pRow == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto OVER; + } + + pCompact = sdbGetRowObj(pRow); + if (pCompact == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto OVER; + } + + int32_t tlen; + int32_t dataPos = 0; + SDB_GET_INT32(pRaw, dataPos, &tlen, OVER); + buf = taosMemoryMalloc(tlen + 1); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto OVER; + } + SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER); + + if (tDeserializeSCompactObj(buf, tlen, pCompact) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto OVER; + } + + //taosInitRWLatch(&pView->lock); + +OVER: + taosMemoryFreeClear(buf); + if (terrno != TSDB_CODE_SUCCESS) { + mError("compact:%" PRId32 ", failed to decode from raw:%p since %s", pCompact->compactId, pRaw, terrstr()); + taosMemoryFreeClear(pRow); + return NULL; + } + + mTrace("compact:%" PRId32 ", decode from raw:%p, row:%p", pCompact->compactId, pRaw, pCompact); + return pRow; +} + +int32_t mndCompactActionInsert(SSdb *pSdb, SCompactObj *pCompact) { + mTrace("compact:%" PRId32 ", perform insert action", pCompact->compactId); + return 0; +} + +int32_t mndCompactActionDelete(SSdb *pSdb, SCompactObj *pCompact) { + mTrace("compact:%" PRId32 ", perform insert action", pCompact->compactId); + tFreeCompactObj(pCompact); + return 0; +} + +int32_t mndCompactActionUpdate(SSdb *pSdb, SCompactObj *pOldCompact, SCompactObj *pNewCompact) { + mTrace("compact:%" PRId32 ", perform update action, old row:%p new row:%p", + pOldCompact->compactId, pOldCompact, pNewCompact); + + //TSWAP(pOldCompact->compactDetail, pNewCompact->compactDetail); + + return 0; +} + +int32_t mndAddCompactToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* pCompact, SDbObj *pDb, SCompactDbRsp *rsp){ + //char uuid[40]; + //int32_t code = taosGetSystemUUID(uuid, 40); + //if (code != 0) { + // strcpy(uuid, "tdengine3.0"); + // mError("failed to get name from system, set to default val %s", uuid); + //} + + pCompact->compactId = tGenIdPI32(); //mndGenerateUid(uuid, TSDB_CLUSTER_ID_LEN); + //pCompact->compactId = (pCompact->compactId >= 0 ? pCompact->compactId : -pCompact->compactId); + + strcpy(pCompact->dbname, pDb->name); + + pCompact->startTime = taosGetTimestampMs(); + + SSdbRaw *pVgRaw = mndCompactActionEncode(pCompact); + if (pVgRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) { + sdbFreeRaw(pVgRaw); + return -1; + } + (void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY); + + rsp->compactId = pCompact->compactId; + + return 0; +} + +int32_t mndRetrieveCompact(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows){ + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + SCompactObj *pCompact = NULL; + char *sep = NULL; + SDbObj *pDb = NULL; + + if (strlen(pShow->db) > 0) { + sep = strchr(pShow->db, '.'); + if (sep && ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) { + sep++; + } else { + pDb = mndAcquireDb(pMnode, pShow->db); + if (pDb == NULL) return terrno; + } + } + + while (numOfRows < rows) { + pShow->pIter = sdbFetch(pSdb, SDB_COMPACT, pShow->pIter, (void **)&pCompact); + if (pShow->pIter == NULL) break; + + SColumnInfoData *pColInfo; + SName n; + int32_t cols = 0; + + char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->compactId, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + if (pDb != NULL || !IS_SYS_DBNAME(pCompact->dbname)) { + SName name = {0}; + tNameFromString(&name, pCompact->dbname, T_NAME_ACCT | T_NAME_DB); + tNameGetDbName(&name, varDataVal(tmpBuf)); + } else { + strncpy(varDataVal(tmpBuf), pCompact->dbname, strlen(pCompact->dbname) + 1); + } + varDataSetLen(tmpBuf, strlen(varDataVal(tmpBuf))); + colDataSetVal(pColInfo, numOfRows, (const char *)tmpBuf, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&pCompact->startTime, false); + + numOfRows++; + sdbRelease(pSdb, pCompact); + } + + pShow->numOfRows += numOfRows; + mndReleaseDb(pMnode, pDb); + return numOfRows; +} + +SCompactObj *mndAcquireCompact(SMnode *pMnode, int64_t compactId) { + SSdb *pSdb = pMnode->pSdb; + SCompactObj *pCompact = sdbAcquire(pSdb, SDB_COMPACT, &compactId); + if (pCompact == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { + terrno = TSDB_CODE_SUCCESS; + } + return pCompact; +} + +void mndReleaseCompact(SMnode *pMnode, SCompactObj *pCompact) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pCompact); +} + +static void *mndBuildKillCompactReq(SMnode *pMnode, SVgObj *pVgroup, int32_t *pContLen, int32_t compactId) { + SVKillCompactReq req = {0}; + req.compactId = compactId; + req.vgId = pVgroup->vgId; + //req.dnodeId = pVgroup->; + + mInfo("vgId:%d, build compact vnode config req", pVgroup->vgId); + int32_t contLen = tSerializeSVKillCompactReq(NULL, 0, &req); + if (contLen < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + contLen += sizeof(SMsgHead); + + void *pReq = taosMemoryMalloc(contLen); + if (pReq == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + + SMsgHead *pHead = pReq; + pHead->contLen = htonl(contLen); + pHead->vgId = htonl(pVgroup->vgId); + + tSerializeSVKillCompactReq((char *)pReq + sizeof(SMsgHead), contLen, &req); + *pContLen = contLen; + return pReq; +} + +static int32_t mndAddKillCompactAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVgroup, int32_t compactId) { + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgroup); + + int32_t contLen = 0; + void *pReq = mndBuildKillCompactReq(pMnode, pVgroup, &contLen, compactId); + if (pReq == NULL) return -1; + + action.pCont = pReq; + action.contLen = contLen; + action.msgType = TDMT_VND_KILL_COMPACT; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + + return 0; +} + +static int32_t mndKillCompact(SMnode *pMnode, SRpcMsg *pReq, SCompactObj *pCompact) { + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "kill-compact"); + if (pTrans == NULL) { + mError("compact:%" PRId32 ", failed to drop since %s" , pCompact->compactId, terrstr()); + return -1; + } + mInfo("trans:%d, used to kill compact:%" PRId32, pTrans->id, pCompact->compactId); + + SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); + + void *pIter = NULL; + while (1) { + SCompactDetailObj *pDetail = NULL; + pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail); + if (pIter == NULL) break; + + if (pDetail->compactId == pCompact->compactId) { + SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId); + if(pVgroup == NULL){ + mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + + if(mndAddKillCompactAction(pMnode, pTrans, pVgroup, pCompact->compactId) != 0){ + mError("trans:%d, failed to append redo action since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + + mndReleaseVgroup(pMnode, pVgroup); + + SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); + } + + sdbRelease(pMnode->pSdb, pDetail); + } + + //mndUserRemoveView(pMnode, pTrans, pView->fullname); + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + + mndTransDrop(pTrans); + return 0; +} + +static int32_t mndUpdateCompactProgress(SMnode *pMnode, SRpcMsg *pReq, int32_t compactId, SQueryCompactProgressRsp* rsp) { + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-compact-progress"); + if (pTrans == NULL) { + mError("compact:%" PRId32 ", failed to create since %s" , compactId, terrstr()); + return -1; + } + mInfo("trans:%d, used to update compact progress:%" PRId32, pTrans->id, compactId); + + SCompactObj *pCompact = mndAcquireCompact(pMnode, compactId); + + void *pIter = NULL; + while (1) { + SCompactDetailObj *pDetail = NULL; + pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail); + if (pIter == NULL) break; + + if (pDetail->compactId == pCompact->compactId && pDetail->vgId == rsp->vgId && pDetail->dnodeId == rsp->dnodeId) { + pDetail->numberFileset = rsp->numberFileset; + pDetail->finished = rsp->finished; + + SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); + } + + sdbRelease(pMnode->pSdb, pDetail); + } + + bool allFinished = true; + while (1) { + SCompactDetailObj *pDetail = NULL; + pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail); + if (pIter == NULL) break; + + if (pDetail->numberFileset != pDetail->finished) { + allFinished = false; + sdbRelease(pMnode->pSdb, pDetail); + break; + } + + sdbRelease(pMnode->pSdb, pDetail); + } + + if(allFinished){ + while (1) { + SCompactDetailObj *pDetail = NULL; + pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail); + if (pIter == NULL) break; + + if (pDetail->compactId == pCompact->compactId) { + SSdbRaw *pCommitRaw = mndCompactDetailActionEncode(pDetail); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); + } + + sdbRelease(pMnode->pSdb, pDetail); + } + + SSdbRaw *pCommitRaw = mndCompactActionEncode(pCompact); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + (void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); + } + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + + mndTransDrop(pTrans); + return 0; +} + +int32_t mndProcessKillCompactReq(SRpcMsg *pReq){ + SKillCompactReq killCompactReq = {0}; + if (tDeserializeSKillCompactReq(pReq->pCont, pReq->contLen, &killCompactReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + mInfo("start to kill compact:%" PRId32, killCompactReq.compactId); + + SMnode *pMnode = pReq->info.node; + int32_t code = -1; + SCompactObj *pCompact = mndAcquireCompact(pMnode, killCompactReq.compactId); + + /* + if (0 != mndCheckViewPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_VIEW, killCompactReq.compactId)) { + goto _OVER; + } + */ + + if (mndKillCompact(pMnode, pReq, pCompact) < 0) { + goto _OVER; + } + + code = TSDB_CODE_ACTION_IN_PROGRESS; + + //mndLogDropViewAudit(pReq, pMnode, killCompactReq); + +_OVER: + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + mError("failed to kill compact %" PRId32 " since %s", killCompactReq.compactId, terrstr()); + } + + sdbRelease(pMnode->pSdb, pCompact); + + return code; +} + +int32_t mndProcessQueryCompactRsp(SRpcMsg *pReq){ + SQueryCompactProgressRsp req = {0}; + if (tDeserializeSQueryCompactProgressRsp(pReq->pCont, pReq->contLen, &req) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + mInfo("numberFileset:%d, finished:%d", req.numberFileset, req.finished); + + SMnode *pMnode = pReq->info.node; + int32_t code = -1; + + + mndUpdateCompactProgress(pMnode, pReq, req.compactId, &req); + + return 0; +} + +void mndCompactUpdate(SMnode *pMnode, SCompactObj *pCompact){ + void *pIter = NULL; + + while (1) { + SCompactDetailObj *pDetail = NULL; + pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT_DETAIL, pIter, (void **)&pDetail); + if (pIter == NULL) break; + + if (pDetail->compactId == pCompact->compactId) { + SEpSet epSet = {0}; + + SVgObj *pVgroup = mndAcquireVgroup(pMnode, pDetail->vgId); + if(pVgroup){ + epSet = mndGetVgroupEpset(pMnode, pVgroup); + mndReleaseVgroup(pMnode, pVgroup); + } + + SQueryCompactProgressReq req; + req.compactId = pDetail->compactId; + req.vgId = pDetail->vgId; + req.dnodeId = pDetail->dnodeId; + + int32_t contLen = tSerializeSQueryCompactProgressReq(NULL, 0, &req); + if (contLen < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + continue; + } + + mInfo("tSerializeSQueryCompactProgressReq contLen:%d", contLen); + + contLen += sizeof(SMsgHead); + + SMsgHead *pHead = rpcMallocCont(contLen); + if (pHead == NULL) { + sdbCancelFetch(pMnode->pSdb, pDetail); + sdbRelease(pMnode->pSdb, pDetail); + continue; + } + + pHead->contLen = htonl(contLen); + pHead->vgId = htonl(pDetail->vgId); + + tSerializeSQueryCompactProgressReq((char *)pHead + sizeof(SMsgHead), contLen - sizeof(SMsgHead), &req); + + //only send + SRpcMsg rpcMsg = {.msgType = TDMT_VND_QUERY_COMPACT_PROGRESS, + .contLen = contLen}; + + rpcMsg.pCont = rpcMallocCont(contLen); + if (rpcMsg.pCont == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + //return -1; + } + //rpcMsg.info.traceId.rootId = pTrans->mTraceId; + + memcpy(rpcMsg.pCont, pHead, contLen); + + char detail[1024] = {0}; + int32_t len = snprintf(detail, sizeof(detail), "msgType:%s numOfEps:%d inUse:%d", TMSG_INFO(TDMT_VND_QUERY_COMPACT_PROGRESS), + epSet.numOfEps, epSet.inUse); + for (int32_t i = 0; i < epSet.numOfEps; ++i) { + len += snprintf(detail + len, sizeof(detail) - len, " ep:%d-%s:%u", i, epSet.eps[i].fqdn, + epSet.eps[i].port); + } + + mInfo("%s", detail); + + int32_t code = tmsgSendReq(&epSet, &rpcMsg); + + //send and receive + /* + SRpcMsg rpcMsg = {.pCont = pReq, + .contLen = contLen, + .msgType = TDMT_VND_QUERY_COMPACT_PROGRESS, + .info.ahandle = (void *)0x9527, + .info.refId = 0, + .info.noResp = 0}; + SRpcMsg rpcRsp = {0}; + + rpcSendRecvWithTimeout(pMnode->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp, 5000); + if (rpcRsp.code != 0) { + SQueryCompactProgressRsp rsp; + if (tDeserializeSQueryCompactProgressRsp(rpcRsp.pCont, rpcRsp.contLen, &rsp) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + } + } + */ + } + + sdbRelease(pMnode->pSdb, pDetail); + } +} + +void mndCompactPullup(SMnode *pMnode) { + SSdb *pSdb = pMnode->pSdb; + SArray *pArray = taosArrayInit(sdbGetSize(pSdb, SDB_COMPACT), sizeof(int32_t)); + if (pArray == NULL) return; + + void *pIter = NULL; + while (1) { + SCompactObj *pCompact = NULL; + pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT, pIter, (void **)&pCompact); + if (pIter == NULL) break; + taosArrayPush(pArray, &pCompact->compactId); + sdbRelease(pSdb, pCompact); + } + + //taosArraySort(pArray, (__compar_fn_t)mndCompareTransId); + + for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) { + int32_t *pCompactId = taosArrayGet(pArray, i); + SCompactObj *pCompact = mndAcquireCompact(pMnode, *pCompactId); + if (pCompact != NULL) { + mndCompactUpdate(pMnode, pCompact); + } + mndReleaseCompact(pMnode, pCompact); + } + taosArrayDestroy(pArray); +} + +static int32_t mndProcessCompactTimer(SRpcMsg *pReq) { + mTrace("start to process compact timer"); + mndCompactPullup(pReq->info.node); + return 0; +} diff --git a/source/dnode/mnode/impl/src/mndCompactDetail.c b/source/dnode/mnode/impl/src/mndCompactDetail.c new file mode 100644 index 0000000000..67e632fd57 --- /dev/null +++ b/source/dnode/mnode/impl/src/mndCompactDetail.c @@ -0,0 +1,289 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "mndCompactDetail.h" +#include "mndTrans.h" +#include "mndShow.h" +#include "mndDb.h" + +#define MND_COMPACT_VER_NUMBER 1 + +int32_t mndInitCompactDetail(SMnode *pMnode) { + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_COMPACT_DETAIL, mndRetrieveCompactDetail); + + SSdbTable table = { + .sdbType = SDB_COMPACT_DETAIL, + .keyType = SDB_KEY_INT32, + .encodeFp = (SdbEncodeFp)mndCompactDetailActionEncode, + .decodeFp = (SdbDecodeFp)mndCompactDetailActionDecode, + .insertFp = (SdbInsertFp)mndCompactDetailActionInsert, + .updateFp = (SdbUpdateFp)mndCompactDetailActionUpdate, + .deleteFp = (SdbDeleteFp)mndCompactDetailActionDelete, + }; + + return sdbSetTable(pMnode->pSdb, table); +} + +void mndCleanupCompactDetail(SMnode *pMnode) { + mDebug("mnd compact detail cleanup"); +} + +int32_t mndRetrieveCompactDetail(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows){ + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + SCompactDetailObj *pCompactDetail = NULL; + char *sep = NULL; + SDbObj *pDb = NULL; + + if (strlen(pShow->db) > 0) { + sep = strchr(pShow->db, '.'); + if (sep && ((0 == strcmp(sep + 1, TSDB_INFORMATION_SCHEMA_DB) || (0 == strcmp(sep + 1, TSDB_PERFORMANCE_SCHEMA_DB))))) { + sep++; + } else { + pDb = mndAcquireDb(pMnode, pShow->db); + if (pDb == NULL) return terrno; + } + } + + while(numOfRows < rows){ + pShow->pIter = sdbFetch(pSdb, SDB_COMPACT_DETAIL, pShow->pIter, (void **)&pCompactDetail); + if (pShow->pIter == NULL) break; + + SColumnInfoData *pColInfo; + SName n; + int32_t cols = 0; + + char tmpBuf[TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE] = {0}; + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&pCompactDetail->compactId, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&pCompactDetail->vgId, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&pCompactDetail->dnodeId, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&pCompactDetail->numberFileset, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&pCompactDetail->finished, false); + + pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); + colDataSetVal(pColInfo, numOfRows, (const char *)&pCompactDetail->startTime, false); + + numOfRows++; + sdbRelease(pSdb, pCompactDetail); + } + + pShow->numOfRows += numOfRows; + mndReleaseDb(pMnode, pDb); + return numOfRows; +} + +void tFreeCompactDetailObj(SCompactDetailObj *pCompact) { +} + +int32_t tSerializeSCompactDetailObj(void *buf, int32_t bufLen, const SCompactDetailObj *pObj) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + + if (tEncodeI32(&encoder, pObj->compactDetailId) < 0) return -1; + if (tEncodeI32(&encoder, pObj->compactId) < 0) return -1; + if (tEncodeI32(&encoder, pObj->vgId) < 0) return -1; + if (tEncodeI32(&encoder, pObj->dnodeId) < 0) return -1; + if (tEncodeI32(&encoder, pObj->numberFileset) < 0) return -1; + if (tEncodeI32(&encoder, pObj->finished) < 0) return -1; + if (tEncodeI64(&encoder, pObj->startTime) < 0) return -1; + + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSCompactDetailObj(void *buf, int32_t bufLen, SCompactDetailObj *pObj) { + int8_t ex = 0; + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + + if (tDecodeI32(&decoder, &pObj->compactDetailId) < 0) return -1; + if (tDecodeI32(&decoder, &pObj->compactId) < 0) return -1; + if (tDecodeI32(&decoder, &pObj->vgId) < 0) return -1; + if (tDecodeI32(&decoder, &pObj->dnodeId) < 0) return -1; + if (tDecodeI32(&decoder, &pObj->numberFileset) < 0) return -1; + if (tDecodeI32(&decoder, &pObj->finished) < 0) return -1; + if (tDecodeI64(&decoder, &pObj->startTime) < 0) return -1; + + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + +SSdbRaw *mndCompactDetailActionEncode(SCompactDetailObj *pCompact) { + terrno = TSDB_CODE_SUCCESS; + + void *buf = NULL; + SSdbRaw *pRaw = NULL; + + int32_t tlen = tSerializeSCompactDetailObj(NULL, 0, pCompact); + if (tlen < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto OVER; + } + + int32_t size = sizeof(int32_t) + tlen; + pRaw = sdbAllocRaw(SDB_COMPACT_DETAIL, MND_COMPACT_VER_NUMBER, size); + if (pRaw == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto OVER; + } + + buf = taosMemoryMalloc(tlen); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto OVER; + } + + tlen = tSerializeSCompactDetailObj(buf, tlen, pCompact); + if (tlen < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto OVER; + } + + int32_t dataPos = 0; + SDB_SET_INT32(pRaw, dataPos, tlen, OVER); + SDB_SET_BINARY(pRaw, dataPos, buf, tlen, OVER); + SDB_SET_DATALEN(pRaw, dataPos, OVER); + + +OVER: + taosMemoryFreeClear(buf); + if (terrno != TSDB_CODE_SUCCESS) { + mError("compact detail:%" PRId32 ", failed to encode to raw:%p since %s", pCompact->compactId, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("compact detail:%" PRId32 ", encode to raw:%p, row:%p", pCompact->compactId, pRaw, pCompact); + return pRaw; +} + +SSdbRow *mndCompactDetailActionDecode(SSdbRaw *pRaw) { + SSdbRow *pRow = NULL; + SCompactDetailObj *pCompact = NULL; + void *buf = NULL; + terrno = TSDB_CODE_SUCCESS; + + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) { + goto OVER; + } + + if (sver != MND_COMPACT_VER_NUMBER) { + terrno = TSDB_CODE_SDB_INVALID_DATA_VER; + mError("compact detail read invalid ver, data ver: %d, curr ver: %d", sver, MND_COMPACT_VER_NUMBER); + goto OVER; + } + + pRow = sdbAllocRow(sizeof(SCompactObj)); + if (pRow == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto OVER; + } + + pCompact = sdbGetRowObj(pRow); + if (pCompact == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto OVER; + } + + int32_t tlen; + int32_t dataPos = 0; + SDB_GET_INT32(pRaw, dataPos, &tlen, OVER); + buf = taosMemoryMalloc(tlen + 1); + if (buf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto OVER; + } + SDB_GET_BINARY(pRaw, dataPos, buf, tlen, OVER); + + if (tDeserializeSCompactDetailObj(buf, tlen, pCompact) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto OVER; + } + + //taosInitRWLatch(&pView->lock); + +OVER: + taosMemoryFreeClear(buf); + if (terrno != TSDB_CODE_SUCCESS) { + mError("compact detail:%" PRId32 ", failed to decode from raw:%p since %s", pCompact->compactId, pRaw, terrstr()); + taosMemoryFreeClear(pRow); + return NULL; + } + + mTrace("compact detail:%" PRId32 ", decode from raw:%p, row:%p", pCompact->compactId, pRaw, pCompact); + return pRow; +} + +int32_t mndCompactDetailActionInsert(SSdb *pSdb, SCompactDetailObj *pCompact) { + mTrace("compact detail:%" PRId32 ", perform insert action", pCompact->compactId); + return 0; +} + +int32_t mndCompactDetailActionDelete(SSdb *pSdb, SCompactDetailObj *pCompact) { + mTrace("compact detail:%" PRId32 ", perform insert action", pCompact->compactId); + tFreeCompactDetailObj(pCompact); + return 0; +} + +int32_t mndCompactDetailActionUpdate(SSdb *pSdb, SCompactDetailObj *pOldCompact, SCompactDetailObj *pNewCompact) { + mTrace("compact detail:%" PRId32 ", perform update action, old row:%p new row:%p", + pOldCompact->compactId, pOldCompact, pNewCompact); + + + pOldCompact->numberFileset = pNewCompact->numberFileset; + pOldCompact->finished = pNewCompact->finished; + + return 0; +} + +int32_t mndAddCompactDetailToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* pCompact, SVgObj *pVgroup, + SVnodeGid *pVgid, int32_t index){ + SCompactDetailObj compactDetail = {0}; + compactDetail.compactDetailId = index; + compactDetail.compactId = pCompact->compactId; + compactDetail.vgId = pVgroup->vgId; + compactDetail.dnodeId = pVgid->dnodeId; + compactDetail.startTime = taosGetTimestampMs(); + + SSdbRaw *pVgRaw = mndCompactDetailActionEncode(&compactDetail); + if (pVgRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) { + sdbFreeRaw(pVgRaw); + return -1; + } + (void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY); + + return 0; +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 8d00dfefb6..8c69706dd4 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -42,6 +42,8 @@ #include "mndUser.h" #include "mndVgroup.h" #include "mndView.h" +#include "mndCompact.h" +#include "mndCompactDetail.h" static inline int32_t mndAcquireRpc(SMnode *pMnode) { int32_t code = 0; @@ -112,6 +114,16 @@ static void mndPullupTrans(SMnode *pMnode) { } } +static void mndPullupCompacts(SMnode *pMnode) { + mTrace("pullup compact timer msg"); + int32_t contLen = 0; + void *pReq = mndBuildTimerMsg(&contLen); + if (pReq != NULL) { + SRpcMsg rpcMsg = {.msgType = TDMT_MND_COMPACT_TIMER, .pCont = pReq, .contLen = contLen}; + tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg); + } +} + static void mndPullupTtl(SMnode *pMnode) { mTrace("pullup ttl"); int32_t contLen = 0; @@ -278,6 +290,10 @@ static void *mndThreadFp(void *param) { mndPullupTrans(pMnode); } + if (sec % tsCompactPullupInterval == 0) { + mndPullupCompacts(pMnode); + } + if (sec % tsMqRebalanceInterval == 0) { mndCalMqRebalance(pMnode); } @@ -449,6 +465,8 @@ static int32_t mndInitSteps(SMnode *pMnode) { if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-func", mndInitFunc, mndCleanupFunc) != 0) return -1; if (mndAllocStep(pMnode, "mnode-view", mndInitView, mndCleanupView) != 0) return -1; + if (mndAllocStep(pMnode, "mnode-compact", mndInitCompact, mndCleanupCompact) != 0) return -1; + if (mndAllocStep(pMnode, "mnode-compact-detail", mndInitCompactDetail, mndCleanupCompactDetail) != 0) return -1; if (mndAllocStep(pMnode, "mnode-sdb", mndOpenSdb, NULL) != 0) return -1; if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1; if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1; @@ -680,7 +698,8 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { _OVER: if (pMsg->msgType == TDMT_MND_TMQ_TIMER || pMsg->msgType == TDMT_MND_TELEM_TIMER || pMsg->msgType == TDMT_MND_TRANS_TIMER || pMsg->msgType == TDMT_MND_TTL_TIMER || - pMsg->msgType == TDMT_MND_TRIM_DB_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER) { + pMsg->msgType == TDMT_MND_TRIM_DB_TIMER || pMsg->msgType == TDMT_MND_UPTIME_TIMER || + pMsg->msgType == TDMT_MND_COMPACT_TIMER) { mTrace("timer not process since mnode restored:%d stopped:%d, sync restored:%d role:%s ", pMnode->restored, pMnode->stopped, state.restored, syncStr(state.state)); return -1; diff --git a/source/dnode/mnode/impl/src/mndShow.c b/source/dnode/mnode/impl/src/mndShow.c index bdf7df25e1..dd7267451e 100644 --- a/source/dnode/mnode/impl/src/mndShow.c +++ b/source/dnode/mnode/impl/src/mndShow.c @@ -118,6 +118,10 @@ static int32_t convertToRetrieveType(char *name, int32_t len) { type = TSDB_MGMT_TABLE_PRIVILEGES; } else if (strncasecmp(name, TSDB_INS_TABLE_VIEWS, len) == 0) { type = TSDB_MGMT_TABLE_VIEWS; + } else if (strncasecmp(name, TSDB_INS_TABLE_COMPACTS, len) == 0) { + type = TSDB_MGMT_TABLE_COMPACT; + } else if (strncasecmp(name, TSDB_INS_TABLE_COMPACT_DETAILS, len) == 0) { + type = TSDB_MGMT_TABLE_COMPACT_DETAIL; } else { mError("invalid show name:%s len:%d", name, len); } diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index ddde645fae..e6372bddc1 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -149,7 +149,9 @@ typedef enum { SDB_FUNC = 20, SDB_IDX = 21, SDB_VIEW = 22, - SDB_MAX = 23 + SDB_COMPACT = 23, + SDB_COMPACT_DETAIL = 24, + SDB_MAX = 25 } ESdbType; typedef struct SSdbRaw { diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index df5c399da8..109a3ca211 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -64,7 +64,11 @@ const char *sdbTableName(ESdbType type) { return "idx"; case SDB_VIEW: return "view"; - default: + case SDB_COMPACT: + return "compact"; + case SDB_COMPACT_DETAIL: + return "compact_detail"; + default: return "undefine"; } }