Merge pull request #6463 from taosdata/feature/TD-3178

[TD-3178]: compact db
This commit is contained in:
haojun Liao 2021-06-16 10:17:56 +08:00 committed by GitHub
commit cd7b241e95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 2238 additions and 1991 deletions

View File

@ -111,6 +111,7 @@ static int32_t validateDNodeConfig(SMiscInfo* pOptions);
static int32_t validateLocalConfig(SMiscInfo* pOptions);
static int32_t validateColumnName(char* name);
static int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType);
static int32_t setCompactVnodeInfo(SSqlObj* pSql, struct SSqlInfo* pInfo);
static bool validateOneTags(SSqlCmd* pCmd, TAOS_FIELD* pTagField);
static bool hasTimestampForPointInterpQuery(SQueryInfo* pQueryInfo);
@ -506,7 +507,7 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
break;
}
case TSDB_SQL_CREATE_DNODE: {
const char* msg = "invalid host name (ip address)";
@ -802,7 +803,13 @@ int32_t tscValidateSqlInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
break;
}
case TSDB_SQL_COMPACT_VNODE:{
const char* msg = "invalid compact";
if (setCompactVnodeInfo(pSql, pInfo) != TSDB_CODE_SUCCESS) {
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), msg);
}
break;
}
default:
return invalidOperationMsg(tscGetErrorMsgPayload(pCmd), "not support sql expression");
}
@ -2963,7 +2970,12 @@ int32_t setKillInfo(SSqlObj* pSql, struct SSqlInfo* pInfo, int32_t killType) {
return TSDB_CODE_SUCCESS;
}
static int32_t setCompactVnodeInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
SSqlCmd* pCmd = &pSql->cmd;
pCmd->command = pInfo->type;
return TSDB_CODE_SUCCESS;
}
bool validateIpAddress(const char* ip, size_t size) {
char tmp[128] = {0}; // buffer to build null-terminated string
assert(size < 128);

View File

@ -47,6 +47,31 @@ static int32_t getWaitingTimeInterval(int32_t count) {
return initial * ((2u)<<(count - 2));
}
static int32_t vgIdCompare(const void *lhs, const void *rhs) {
int32_t left = *(int32_t *)lhs;
int32_t right = *(int32_t *)rhs;
if (left == right) {
return 0;
} else {
return left > right ? 1 : -1;
}
}
static int32_t removeDupVgid(int32_t *src, int32_t sz) {
if (src == NULL || sz <= 0) {
return 0;
}
qsort(src, sz, sizeof(src[0]), vgIdCompare);
int32_t ret = 1;
for (int i = 1; i < sz; i++) {
if (src[i] != src[i - 1]) {
src[ret++] = src[i];
}
}
return ret;
}
static void tscSetDnodeEpSet(SRpcEpSet* pEpSet, SVgroupInfo* pVgroupInfo) {
assert(pEpSet != NULL && pVgroupInfo != NULL && pVgroupInfo->numOfEps > 0);
@ -1515,6 +1540,60 @@ int tscAlterDbMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
return TSDB_CODE_SUCCESS;
}
int tscBuildCompactMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
if (pInfo->list == NULL || taosArrayGetSize(pInfo->list) <= 0) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
STscObj *pObj = pSql->pTscObj;
SSqlCmd *pCmd = &pSql->cmd;
SArray *pList = pInfo->list;
int32_t size = (int32_t)taosArrayGetSize(pList);
int32_t *result = malloc(sizeof(int32_t) * size);
if (result == NULL) {
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < size; i++) {
tSqlExprItem* pSub = taosArrayGet(pList, i);
tVariant* pVar = &pSub->pNode->value;
if (pVar->nType >= TSDB_DATA_TYPE_TINYINT && pVar->nType <= TSDB_DATA_TYPE_BIGINT) {
result[i] = (int32_t)(pVar->i64);
} else {
free(result);
return TSDB_CODE_TSC_INVALID_OPERATION;
}
}
int count = removeDupVgid(result, size);
pCmd->payloadLen = sizeof(SCompactMsg) + count * sizeof(int32_t);
pCmd->msgType = TSDB_MSG_TYPE_CM_COMPACT_VNODE;
if (TSDB_CODE_SUCCESS != tscAllocPayload(pCmd, pCmd->payloadLen)) {
tscError("0x%"PRIx64" failed to malloc for query msg", pSql->self);
free(result);
return TSDB_CODE_TSC_OUT_OF_MEMORY;
}
SCompactMsg *pCompactMsg = (SCompactMsg *)pCmd->payload;
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, 0);
if (tNameIsEmpty(&pTableMetaInfo->name)) {
pthread_mutex_lock(&pObj->mutex);
tstrncpy(pCompactMsg->db, pObj->db, sizeof(pCompactMsg->db));
pthread_mutex_unlock(&pObj->mutex);
} else {
tNameGetFullDbName(&pTableMetaInfo->name, pCompactMsg->db);
}
pCompactMsg->numOfVgroup = htons(count);
for (int32_t i = 0; i < count; i++) {
pCompactMsg->vgid[i] = htons(result[i]);
}
free(result);
return TSDB_CODE_SUCCESS;
}
int tscBuildRetrieveFromMgmtMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
SSqlCmd *pCmd = &pSql->cmd;
@ -2268,6 +2347,10 @@ int tscProcessAlterDbMsgRsp(SSqlObj *pSql) {
UNUSED(pSql);
return 0;
}
int tscProcessCompactRsp(SSqlObj *pSql) {
UNUSED(pSql);
return TSDB_CODE_SUCCESS;
}
int tscProcessShowCreateRsp(SSqlObj *pSql) {
return tscLocalResultCommonBuilder(pSql, 1);
@ -2654,6 +2737,7 @@ void tscInitMsgsFp() {
tscBuildMsg[TSDB_SQL_ALTER_TABLE] = tscBuildAlterTableMsg;
tscBuildMsg[TSDB_SQL_UPDATE_TAGS_VAL] = tscBuildUpdateTagMsg;
tscBuildMsg[TSDB_SQL_ALTER_DB] = tscAlterDbMsg;
tscBuildMsg[TSDB_SQL_COMPACT_VNODE] = tscBuildCompactMsg;
tscBuildMsg[TSDB_SQL_CONNECT] = tscBuildConnectMsg;
tscBuildMsg[TSDB_SQL_USE_DB] = tscBuildUseDbMsg;
@ -2694,6 +2778,7 @@ void tscInitMsgsFp() {
tscProcessMsgRsp[TSDB_SQL_ALTER_TABLE] = tscProcessAlterTableMsgRsp;
tscProcessMsgRsp[TSDB_SQL_ALTER_DB] = tscProcessAlterDbMsgRsp;
tscProcessMsgRsp[TSDB_SQL_COMPACT_VNODE] = tscProcessCompactRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_TABLE] = tscProcessShowCreateRsp;
tscProcessMsgRsp[TSDB_SQL_SHOW_CREATE_STABLE] = tscProcessShowCreateRsp;

View File

@ -51,6 +51,7 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_ACCT, "alter-acct" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_TABLE, "alter-table" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_ALTER_DB, "alter-db" )
TSDB_DEFINE_SQL_TYPE(TSDB_SQL_SYNC_DB_REPLICA, "sync db-replica")
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_CREATE_MNODE, "create-mnode" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_DROP_MNODE, "drop-mnode" )
@ -63,6 +64,7 @@ enum {
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_QUERY, "kill-query" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_STREAM, "kill-stream" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_KILL_CONNECTION, "kill-connection" )
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_COMPACT_VNODE, "compact-vnode" )
// SQL below is for read operation
TSDB_DEFINE_SQL_TYPE( TSDB_SQL_READ, "read" )

View File

@ -47,7 +47,8 @@ int32_t dnodeInitServer() {
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_CREATE_MNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE] = dnodeDispatchToVMgmtQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_TABLE] = dnodeDispatchToMPeerQueue;
dnodeProcessReqMsgFp[TSDB_MSG_TYPE_DM_CONFIG_VNODE] = dnodeDispatchToMPeerQueue;

View File

@ -61,6 +61,7 @@ int32_t dnodeInitShell() {
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_STREAM] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_KILL_CONN] = dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_CONFIG_DNODE]= dnodeDispatchToMWriteQueue;
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_COMPACT_VNODE]= dnodeDispatchToMWriteQueue;
// the following message shall be treated as mnode query
dnodeProcessShellMsgFp[TSDB_MSG_TYPE_CM_HEARTBEAT] = dnodeDispatchToMReadQueue;

View File

@ -31,6 +31,7 @@ static void * dnodeProcessMgmtQueue(void *param);
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessSyncVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessCompactVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessAlterStreamMsg(SRpcMsg *pMsg);
static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg);
@ -40,7 +41,8 @@ static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
int32_t dnodeInitVMgmt() {
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CREATE_VNODE] = dnodeProcessCreateVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_VNODE] = dnodeProcessAlterVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = dnodeProcessSyncVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_SYNC_VNODE] = dnodeProcessSyncVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_COMPACT_VNODE]= dnodeProcessCompactVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_DROP_VNODE] = dnodeProcessDropVnodeMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_ALTER_STREAM] = dnodeProcessAlterStreamMsg;
dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MD_CONFIG_DNODE] = dnodeProcessConfigDnodeMsg;
@ -187,6 +189,12 @@ static int32_t dnodeProcessSyncVnodeMsg(SRpcMsg *rpcMsg) {
return vnodeSync(pSyncVnode->vgId);
}
static int32_t dnodeProcessCompactVnodeMsg(SRpcMsg *rpcMsg) {
SCompactVnodeMsg *pCompactVnode = rpcMsg->pCont;
pCompactVnode->vgId = htonl(pCompactVnode->vgId);
return vnodeCompact(pCompactVnode->vgId);
}
static int32_t dnodeProcessDropVnodeMsg(SRpcMsg *rpcMsg) {
SDropVnodeMsg *pDrop = rpcMsg->pCont;
pDrop->vgId = htonl(pDrop->vgId);

View File

@ -61,9 +61,11 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CONFIG_DNODE, "config-dnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_ALTER_VNODE, "alter-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_SYNC_VNODE, "sync-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_MNODE, "create-mnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_COMPACT_VNODE, "compact-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY6, "dummy6" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_DUMMY7, "dummy7" )
// message from client to mnode
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CONNECT, "connect" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_CREATE_ACCT, "create-acct" )
@ -84,6 +86,7 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_DROP_TABLE, "drop-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_TABLE, "alter-table" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLE_META, "table-meta" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_STABLE_VGROUP, "stable-vgroup" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_COMPACT_VNODE, "compact-vnode" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_TABLES_META, "multiTable-meta" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_ALTER_STREAM, "alter-stream" )
TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_CM_SHOW, "show" )
@ -393,7 +396,7 @@ typedef struct {
typedef struct {
int32_t vgId;
} SDropVnodeMsg, SSyncVnodeMsg;
} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg;
typedef struct SColIndex {
int16_t colId; // column id
@ -539,7 +542,7 @@ typedef struct {
uint8_t status;
uint8_t role;
uint8_t replica;
uint8_t reserved;
uint8_t compact;
} SVnodeLoad;
typedef struct {
@ -781,6 +784,12 @@ typedef struct {
char payload[];
} SShowMsg;
typedef struct {
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int32_t numOfVgroup;
int32_t vgid[];
} SCompactMsg;
typedef struct SShowRsp {
uint64_t qhandle;
STableMetaMsg tableMeta;

View File

@ -16,7 +16,6 @@
#ifndef TDENGINE_TTOKENDEF_H
#define TDENGINE_TTOKENDEF_H
#define TK_ID 1
#define TK_BOOL 2
#define TK_TINYINT 3
@ -95,121 +94,117 @@
#define TK_PASS 76
#define TK_PRIVILEGE 77
#define TK_LOCAL 78
#define TK_IF 79
#define TK_EXISTS 80
#define TK_PPS 81
#define TK_TSERIES 82
#define TK_DBS 83
#define TK_STORAGE 84
#define TK_QTIME 85
#define TK_CONNS 86
#define TK_STATE 87
#define TK_COMMA 88
#define TK_KEEP 89
#define TK_CACHE 90
#define TK_REPLICA 91
#define TK_QUORUM 92
#define TK_DAYS 93
#define TK_MINROWS 94
#define TK_MAXROWS 95
#define TK_BLOCKS 96
#define TK_CTIME 97
#define TK_WAL 98
#define TK_FSYNC 99
#define TK_COMP 100
#define TK_PRECISION 101
#define TK_UPDATE 102
#define TK_CACHELAST 103
#define TK_PARTITIONS 104
#define TK_LP 105
#define TK_RP 106
#define TK_UNSIGNED 107
#define TK_TAGS 108
#define TK_USING 109
#define TK_AS 110
#define TK_NULL 111
#define TK_SELECT 112
#define TK_UNION 113
#define TK_ALL 114
#define TK_DISTINCT 115
#define TK_FROM 116
#define TK_VARIABLE 117
#define TK_INTERVAL 118
#define TK_SESSION 119
#define TK_STATE_WINDOW 120
#define TK_FILL 121
#define TK_SLIDING 122
#define TK_ORDER 123
#define TK_BY 124
#define TK_ASC 125
#define TK_DESC 126
#define TK_GROUP 127
#define TK_HAVING 128
#define TK_LIMIT 129
#define TK_OFFSET 130
#define TK_SLIMIT 131
#define TK_SOFFSET 132
#define TK_WHERE 133
#define TK_NOW 134
#define TK_RESET 135
#define TK_QUERY 136
#define TK_SYNCDB 137
#define TK_ADD 138
#define TK_COLUMN 139
#define TK_MODIFY 140
#define TK_TAG 141
#define TK_CHANGE 142
#define TK_SET 143
#define TK_KILL 144
#define TK_CONNECTION 145
#define TK_STREAM 146
#define TK_COLON 147
#define TK_ABORT 148
#define TK_AFTER 149
#define TK_ATTACH 150
#define TK_BEFORE 151
#define TK_BEGIN 152
#define TK_CASCADE 153
#define TK_CLUSTER 154
#define TK_CONFLICT 155
#define TK_COPY 156
#define TK_DEFERRED 157
#define TK_DELIMITERS 158
#define TK_DETACH 159
#define TK_EACH 160
#define TK_END 161
#define TK_EXPLAIN 162
#define TK_FAIL 163
#define TK_FOR 164
#define TK_IGNORE 165
#define TK_IMMEDIATE 166
#define TK_INITIALLY 167
#define TK_INSTEAD 168
#define TK_MATCH 169
#define TK_KEY 170
#define TK_OF 171
#define TK_RAISE 172
#define TK_REPLACE 173
#define TK_RESTRICT 174
#define TK_ROW 175
#define TK_STATEMENT 176
#define TK_TRIGGER 177
#define TK_VIEW 178
#define TK_SEMI 179
#define TK_NONE 180
#define TK_PREV 181
#define TK_LINEAR 182
#define TK_IMPORT 183
#define TK_TBNAME 184
#define TK_JOIN 185
#define TK_INSERT 186
#define TK_INTO 187
#define TK_VALUES 188
#define TK_COMPACT 79
#define TK_LP 80
#define TK_RP 81
#define TK_IF 82
#define TK_EXISTS 83
#define TK_PPS 84
#define TK_TSERIES 85
#define TK_DBS 86
#define TK_STORAGE 87
#define TK_QTIME 88
#define TK_CONNS 89
#define TK_STATE 90
#define TK_COMMA 91
#define TK_KEEP 92
#define TK_CACHE 93
#define TK_REPLICA 94
#define TK_QUORUM 95
#define TK_DAYS 96
#define TK_MINROWS 97
#define TK_MAXROWS 98
#define TK_BLOCKS 99
#define TK_CTIME 100
#define TK_WAL 101
#define TK_FSYNC 102
#define TK_COMP 103
#define TK_PRECISION 104
#define TK_UPDATE 105
#define TK_CACHELAST 106
#define TK_PARTITIONS 107
#define TK_UNSIGNED 108
#define TK_TAGS 109
#define TK_USING 110
#define TK_AS 111
#define TK_NULL 112
#define TK_SELECT 113
#define TK_UNION 114
#define TK_ALL 115
#define TK_DISTINCT 116
#define TK_FROM 117
#define TK_VARIABLE 118
#define TK_INTERVAL 119
#define TK_SESSION 120
#define TK_STATE_WINDOW 121
#define TK_FILL 122
#define TK_SLIDING 123
#define TK_ORDER 124
#define TK_BY 125
#define TK_ASC 126
#define TK_DESC 127
#define TK_GROUP 128
#define TK_HAVING 129
#define TK_LIMIT 130
#define TK_OFFSET 131
#define TK_SLIMIT 132
#define TK_SOFFSET 133
#define TK_WHERE 134
#define TK_NOW 135
#define TK_RESET 136
#define TK_QUERY 137
#define TK_SYNCDB 138
#define TK_ADD 139
#define TK_COLUMN 140
#define TK_MODIFY 141
#define TK_TAG 142
#define TK_CHANGE 143
#define TK_SET 144
#define TK_KILL 145
#define TK_CONNECTION 146
#define TK_STREAM 147
#define TK_COLON 148
#define TK_ABORT 149
#define TK_AFTER 150
#define TK_ATTACH 151
#define TK_BEFORE 152
#define TK_BEGIN 153
#define TK_CASCADE 154
#define TK_CLUSTER 155
#define TK_CONFLICT 156
#define TK_COPY 157
#define TK_DEFERRED 158
#define TK_DELIMITERS 159
#define TK_DETACH 160
#define TK_EACH 161
#define TK_END 162
#define TK_EXPLAIN 163
#define TK_FAIL 164
#define TK_FOR 165
#define TK_IGNORE 166
#define TK_IMMEDIATE 167
#define TK_INITIALLY 168
#define TK_INSTEAD 169
#define TK_MATCH 170
#define TK_KEY 171
#define TK_OF 172
#define TK_RAISE 173
#define TK_REPLACE 174
#define TK_RESTRICT 175
#define TK_ROW 176
#define TK_STATEMENT 177
#define TK_TRIGGER 178
#define TK_VIEW 179
#define TK_SEMI 180
#define TK_NONE 181
#define TK_PREV 182
#define TK_LINEAR 183
#define TK_IMPORT 184
#define TK_TBNAME 185
#define TK_JOIN 186
#define TK_INSERT 187
#define TK_INTO 188
#define TK_VALUES 189
#define TK_SPACE 300

View File

@ -62,6 +62,7 @@ int32_t vnodeOpen(int32_t vgId);
int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeSync(int32_t vgId);
int32_t vnodeClose(int32_t vgId);
int32_t vnodeCompact(int32_t vgId);
// vnodeMgmt
int32_t vnodeInitMgmt();

View File

@ -144,6 +144,7 @@ typedef struct SVgObj {
int8_t reserved0[4];
SVnodeGid vnodeGid[TSDB_MAX_REPLICA];
int32_t vgCfgVersion;
int8_t compact;
int8_t reserved1[8];
int8_t updateEnd[4];
int32_t refCount;

View File

@ -51,6 +51,7 @@ void mnodeSendDropVnodeMsg(int32_t vgId, SRpcEpSet *epSet, void *ahandle);
void mnodeSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle);
void mnodeSendAlterVgroupMsg(SVgObj *pVgroup,SMnodeMsg *pMsg);
void mnodeSendSyncVgroupMsg(SVgObj *pVgroup);
void mnodeSendCompactVgroupMsg(SVgObj *pVgroup);
SRpcEpSet mnodeGetEpSetFromVgroup(SVgObj *pVgroup);
SRpcEpSet mnodeGetEpSetFromIp(char *ep);

View File

@ -53,6 +53,7 @@ static int32_t mnodeRetrieveDbs(SShowObj *pShow, char *data, int32_t rows, void
static int32_t mnodeProcessCreateDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg);
static int32_t mnodeProcessCompactMsg(SMnodeMsg *pMsg);
int32_t mnodeProcessAlterDbMsg(SMnodeMsg *pMsg);
#ifndef _TOPIC
@ -200,10 +201,12 @@ int32_t mnodeInitDbs() {
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_ALTER_DB, mnodeProcessAlterDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_DROP_DB, mnodeProcessDropDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_SYNC_DB, mnodeProcessSyncDbMsg);
mnodeAddWriteMsgHandle(TSDB_MSG_TYPE_CM_COMPACT_VNODE, mnodeProcessCompactMsg);
mnodeAddShowMetaHandle(TSDB_MGMT_TABLE_DB, mnodeGetDbMeta);
mnodeAddShowRetrieveHandle(TSDB_MGMT_TABLE_DB, mnodeRetrieveDbs);
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_DB, mnodeCancelGetNextDb);
mDebug("table:dbs table is created");
return tpInit();
}
@ -1267,7 +1270,7 @@ static int32_t mnodeProcessDropDbMsg(SMnodeMsg *pMsg) {
static int32_t mnodeSyncDb(SDbObj *pDb, SMnodeMsg *pMsg) {
void *pIter = NULL;
SVgObj *pVgroup = NULL;
while (1) {
while (1) {
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break;
if (pVgroup->pDb == pDb) {
@ -1281,6 +1284,43 @@ static int32_t mnodeSyncDb(SDbObj *pDb, SMnodeMsg *pMsg) {
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeCompact(SDbObj *pDb, SCompactMsg *pCompactMsg) {
int32_t count = ntohs(pCompactMsg->numOfVgroup);
int32_t *buf = malloc(sizeof(int32_t) * count);
if (buf == NULL) {
return TSDB_CODE_MND_OUT_OF_MEMORY;
}
for (int32_t i = 0; i < count; i++) {
buf[i] = ntohs(pCompactMsg->vgid[i]);
}
// copy from mnodeSyncDb, so ugly
for (int32_t i = 0; i < count; i++) {
SVgObj *pVgroup = NULL;
void *pIter = NULL;
bool valid = false;
while (1) {
pIter = mnodeGetNextVgroup(pIter, &pVgroup);
if (pVgroup == NULL) break;
if (pVgroup->pDb == pDb && pVgroup->vgId == buf[i]) {
mnodeSendCompactVgroupMsg(pVgroup);
mnodeDecVgroupRef(pVgroup);
valid = true;
break;
}
mnodeDecVgroupRef(pVgroup);
}
if (valid == false) {
mLError("db:%s, cannot find valid vgId: %d", pDb->name, buf[i]);
}
}
free(buf);
mLInfo("db:%s, trigger compact", pDb->name);
return TSDB_CODE_SUCCESS;
}
static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg) {
SSyncDbMsg *pSyncDb = pMsg->rpcMsg.pCont;
mDebug("db:%s, syncdb is received from thandle:%p, ignore:%d", pSyncDb->db, pMsg->rpcMsg.handle, pSyncDb->ignoreNotExists);
@ -1303,6 +1343,20 @@ static int32_t mnodeProcessSyncDbMsg(SMnodeMsg *pMsg) {
return mnodeSyncDb(pMsg->pDb, pMsg);
}
static int32_t mnodeProcessCompactMsg(SMnodeMsg *pMsg) {
SCompactMsg *pCompact = pMsg->rpcMsg.pCont;
mDebug("db:%s, compact is received from thandle:%p", pCompact->db, pMsg->rpcMsg.handle);
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pCompact->db);
if (pMsg->pDb == NULL) return TSDB_CODE_MND_DB_NOT_SELECTED;
if (pMsg->pDb->status != TSDB_DB_STATUS_READY) {
mError("db:%s, status:%d, in dropping, ignore compact request", pCompact->db, pMsg->pDb->status);
return TSDB_CODE_MND_DB_IN_DROPPING;
}
return mnodeCompact(pMsg->pDb, pCompact);
}
void mnodeDropAllDbs(SAcctObj *pAcct) {
int32_t numOfDbs = 0;

View File

@ -1202,4 +1202,4 @@ int32_t mnodeCompactWal() {
sdbInfo("vgId:1, compact mnode wal success");
return 0;
}
}

View File

@ -60,6 +60,7 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pConn);
static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg);
static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg);
static void mnodeProcessCompactVnodeRsp(SRpcMsg *rpcMsg);
static void mnodeProcessDropVnodeRsp(SRpcMsg *rpcMsg);
static int32_t mnodeProcessVnodeCfgMsg(SMnodeMsg *pMsg) ;
static void mnodeSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle);
@ -236,6 +237,7 @@ int32_t mnodeInitVgroups() {
mnodeAddShowFreeIterHandle(TSDB_MGMT_TABLE_VGROUP, mnodeCancelGetNextVgroup);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_CREATE_VNODE_RSP, mnodeProcessCreateVnodeRsp);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_ALTER_VNODE_RSP, mnodeProcessAlterVnodeRsp);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_COMPACT_VNODE_RSP, mnodeProcessCompactVnodeRsp);
mnodeAddPeerRspHandle(TSDB_MSG_TYPE_MD_DROP_VNODE_RSP, mnodeProcessDropVnodeRsp);
mnodeAddPeerMsgHandle(TSDB_MSG_TYPE_DM_CONFIG_VNODE, mnodeProcessVnodeCfgMsg);
@ -350,6 +352,7 @@ void mnodeUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVl
pVgroup->pDb->dbCfgVersion, pVgroup->vgCfgVersion, pVgroup->numOfVnodes);
mnodeSendAlterVgroupMsg(pVgroup,NULL);
}
pVgroup->compact = pVload->compact;
}
static int32_t mnodeAllocVgroupIdPool(SVgObj *pInputVgroup) {
@ -717,6 +720,13 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
cols++;
}
pShow->bytes[cols] = 4;
pSchema[cols].type = TSDB_DATA_TYPE_INT;
strcpy(pSchema[cols].name, "compacting");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pMeta->numOfColumns = htons(cols);
pShow->numOfColumns = cols;
@ -820,7 +830,11 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, role, pShow->bytes[cols]);
cols++;
}
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int8_t *)pWrite = pVgroup->compact;
cols++;
mnodeDecVgroupRef(pVgroup);
numOfRows++;
}
@ -979,6 +993,7 @@ static SSyncVnodeMsg *mnodeBuildSyncVnodeMsg(int32_t vgId) {
return pSyncVnode;
}
static void mnodeSendSyncVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) {
SSyncVnodeMsg *pSyncVnode = mnodeBuildSyncVnodeMsg(pVgroup->vgId);
SRpcMsg rpcMsg = {
@ -991,6 +1006,18 @@ static void mnodeSendSyncVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) {
dnodeSendMsgToDnode(epSet, &rpcMsg);
}
static void mnodeSendCompactVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet) {
SCompactVnodeMsg *pCompactVnode = mnodeBuildSyncVnodeMsg(pVgroup->vgId);
SRpcMsg rpcMsg = {
.ahandle = NULL,
.pCont = pCompactVnode,
.contLen = pCompactVnode ? sizeof(SCompactVnodeMsg) : 0,
.code = 0,
.msgType = TSDB_MSG_TYPE_MD_COMPACT_VNODE
};
dnodeSendMsgToDnode(epSet, &rpcMsg);
}
void mnodeSendSyncVgroupMsg(SVgObj *pVgroup) {
mDebug("vgId:%d, send sync all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes,
@ -1004,6 +1031,17 @@ void mnodeSendSyncVgroupMsg(SVgObj *pVgroup) {
}
}
void mnodeSendCompactVgroupMsg(SVgObj *pVgroup) {
mDebug("vgId:%d, send compact all vnodes msg, numOfVnodes:%d db:%s", pVgroup->vgId, pVgroup->numOfVnodes, pVgroup->dbName);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
//if (pVgroup->vnodeGid[i].role != TAOS_SYNC_ROLE_SLAVE) continue; //TODO(yihaoDeng): compact slave or not ?
SRpcEpSet epSet = mnodeGetEpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp);
mDebug("vgId:%d, index:%d, send compact vnode msg to dnode %s", pVgroup->vgId, i,
pVgroup->vnodeGid[i].pDnode->dnodeEp);
mnodeSendCompactVnodeMsg(pVgroup, &epSet);
}
}
static void mnodeSendCreateVnodeMsg(SVgObj *pVgroup, SRpcEpSet *epSet, void *ahandle) {
SCreateVnodeMsg *pCreate = mnodeBuildVnodeMsg(pVgroup);
SRpcMsg rpcMsg = {
@ -1050,6 +1088,9 @@ static void mnodeProcessAlterVnodeRsp(SRpcMsg *rpcMsg) {
dnodeSendRpcMWriteRsp(mnodeMsg, code);
}
}
static void mnodeProcessCompactVnodeRsp(SRpcMsg *rpcMsg) {
mDebug("compact vnode rsp received");
}
static void mnodeProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
if (rpcMsg->ahandle == NULL) return;
@ -1346,4 +1387,4 @@ int32_t mnodeCompactVgroups() {
mInfo("end to compact vgroups table...");
return 0;
}
}

View File

@ -312,6 +312,8 @@ void setCreateUserSql(SSqlInfo *pInfo, SStrToken *pName, SStrToken *pPasswd);
void setKillSql(SSqlInfo *pInfo, int32_t type, SStrToken *ip);
void setAlterUserSql(SSqlInfo *pInfo, int16_t type, SStrToken *pName, SStrToken* pPwd, SStrToken *pPrivilege);
void setCompactVnodeSql(SSqlInfo *pInfo, int32_t type, SArray *pParam);
void setDefaultCreateDbOption(SCreateDbInfo *pDBInfo);
void setDefaultCreateTopicOption(SCreateDbInfo *pDBInfo);

View File

@ -174,6 +174,10 @@ cmd ::= ALTER TOPIC ids(X) alter_topic_optr(Y). { SStrToken t = {0}; setCreateD
cmd ::= ALTER ACCOUNT ids(X) acct_optr(Z). { setCreateAcctSql(pInfo, TSDB_SQL_ALTER_ACCT, &X, NULL, &Z);}
cmd ::= ALTER ACCOUNT ids(X) PASS ids(Y) acct_optr(Z). { setCreateAcctSql(pInfo, TSDB_SQL_ALTER_ACCT, &X, &Y, &Z);}
////////////////////////////// COMPACT STATEMENT //////////////////////////////////////////////
cmd ::= COMPACT VNODES IN LP exprlist(Y) RP. { setCompactVnodeSql(pInfo, TSDB_SQL_COMPACT_VNODE, Y);}
// An IDENTIFIER can be a generic identifier, or one of several keywords.
// Any non-standard keyword can also be an identifier.
// And "ids" is an identifer-or-string.

View File

@ -1067,6 +1067,10 @@ void setCreateAcctSql(SSqlInfo *pInfo, int32_t type, SStrToken *pName, SStrToken
pInfo->pMiscInfo->user.passwd = *pPwd;
}
}
void setCompactVnodeSql(SSqlInfo *pInfo, int32_t type, SArray *pParam) {
pInfo->type = type;
pInfo->list = pParam;
}
void setCreateUserSql(SSqlInfo *pInfo, SStrToken *pName, SStrToken *pPasswd) {
pInfo->type = TSDB_SQL_CREATE_USER;

File diff suppressed because it is too large Load Diff

View File

@ -118,406 +118,407 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
return 0;
}
static int tsdbCompactTSData(STsdbRepo *pRepo) {
SCompactH compactH;
SDFileSet *pSet = NULL;
static int tsdbCompactTSData(STsdbRepo *pRepo) {
SCompactH compactH;
SDFileSet *pSet = NULL;
tsdbDebug("vgId:%d start to compact TS data", REPO_ID(pRepo));
tsdbDebug("vgId:%d start to compact TS data", REPO_ID(pRepo));
// If no file, just return 0;
if (taosArrayGetSize(REPO_FS(pRepo)->cstatus->df) <= 0) {
tsdbDebug("vgId:%d no TS data file to compact, compact over", REPO_ID(pRepo));
// If no file, just return 0;
if (taosArrayGetSize(REPO_FS(pRepo)->cstatus->df) <= 0) {
tsdbDebug("vgId:%d no TS data file to compact, compact over", REPO_ID(pRepo));
return 0;
}
if (tsdbInitCompactH(&compactH, pRepo) < 0) {
return -1;
}
while ((pSet = tsdbFSIterNext(&(compactH.fsIter)))) {
// Remove those expired files
if (pSet->fid < compactH.rtn.minFid) {
tsdbInfo("vgId:%d FSET %d on level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
continue;
}
if (TSDB_FSET_LEVEL(pSet) == TFS_MAX_LEVEL) {
tsdbDebug("vgId:%d FSET %d on level %d, should not compact", REPO_ID(pRepo), pSet->fid, TFS_MAX_LEVEL);
tsdbUpdateDFileSet(REPO_FS(pRepo), pSet);
continue;
}
if (tsdbCompactFSet(&compactH, pSet) < 0) {
tsdbDestroyCompactH(&compactH);
tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
return -1;
}
}
tsdbDestroyCompactH(&compactH);
tsdbDebug("vgId:%d compact TS data over", REPO_ID(pRepo));
return 0;
}
if (tsdbInitCompactH(&compactH, pRepo) < 0) {
return -1;
}
static int tsdbCompactFSet(SCompactH *pComph, SDFileSet *pSet) {
STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph);
SDiskID did;
while ((pSet = tsdbFSIterNext(&(compactH.fsIter)))) {
// Remove those expired files
if (pSet->fid < compactH.rtn.minFid) {
tsdbInfo("vgId:%d FSET %d on level %d disk id %d expires, remove it", REPO_ID(pRepo), pSet->fid,
TSDB_FSET_LEVEL(pSet), TSDB_FSET_ID(pSet));
continue;
}
tsdbDebug("vgId:%d start to compact FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid, TSDB_FSET_LEVEL(pSet),
TSDB_FSET_ID(pSet));
if (TSDB_FSET_LEVEL(pSet) == TFS_MAX_LEVEL) {
tsdbDebug("vgId:%d FSET %d on level %d, should not compact", REPO_ID(pRepo), pSet->fid, TFS_MAX_LEVEL);
tsdbUpdateDFileSet(REPO_FS(pRepo), pSet);
continue;
}
if (tsdbCompactFSet(&compactH, pSet) < 0) {
tsdbDestroyCompactH(&compactH);
tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
return -1;
}
}
tsdbDestroyCompactH(&compactH);
tsdbDebug("vgId:%d compact TS data over", REPO_ID(pRepo));
return 0;
}
static int tsdbCompactFSet(SCompactH *pComph, SDFileSet *pSet) {
STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph);
SDiskID did;
tsdbDebug("vgId:%d start to compact FSET %d on level %d id %d", REPO_ID(pRepo), pSet->fid, TSDB_FSET_LEVEL(pSet),
TSDB_FSET_ID(pSet));
if (tsdbCompactFSetInit(pComph, pSet) < 0) {
return -1;
}
if (!tsdbShouldCompact(pComph)) {
tsdbDebug("vgId:%d no need to compact FSET %d", REPO_ID(pRepo), pSet->fid);
if (tsdbApplyRtnOnFSet(TSDB_COMPACT_REPO(pComph), pSet, &(pComph->rtn)) < 0) {
tsdbCompactFSetEnd(pComph);
return -1;
}
} else {
// Create new fset as compacted fset
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(pComph->rtn)), &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbCompactFSetEnd(pComph);
if (tsdbCompactFSetInit(pComph, pSet) < 0) {
return -1;
}
tsdbInitDFileSet(TSDB_COMPACT_WSET(pComph), did, REPO_ID(pRepo), TSDB_FSET_FID(pSet),
FS_TXN_VERSION(REPO_FS(pRepo)));
if (tsdbCreateDFileSet(TSDB_COMPACT_WSET(pComph), true) < 0) {
tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbCompactFSetEnd(pComph);
return -1;
}
if (!tsdbShouldCompact(pComph)) {
tsdbDebug("vgId:%d no need to compact FSET %d", REPO_ID(pRepo), pSet->fid);
if (tsdbApplyRtnOnFSet(TSDB_COMPACT_REPO(pComph), pSet, &(pComph->rtn)) < 0) {
tsdbCompactFSetEnd(pComph);
return -1;
}
} else {
// Create new fset as compacted fset
tfsAllocDisk(tsdbGetFidLevel(pSet->fid, &(pComph->rtn)), &(did.level), &(did.id));
if (did.level == TFS_UNDECIDED_LEVEL) {
terrno = TSDB_CODE_TDB_NO_AVAIL_DISK;
tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbCompactFSetEnd(pComph);
return -1;
}
tsdbInitDFileSet(TSDB_COMPACT_WSET(pComph), did, REPO_ID(pRepo), TSDB_FSET_FID(pSet),
FS_TXN_VERSION(REPO_FS(pRepo)));
if (tsdbCreateDFileSet(TSDB_COMPACT_WSET(pComph), true) < 0) {
tsdbError("vgId:%d failed to compact FSET %d since %s", REPO_ID(pRepo), pSet->fid, tstrerror(terrno));
tsdbCompactFSetEnd(pComph);
return -1;
}
if (tsdbCompactFSetImpl(pComph) < 0) {
tsdbCloseDFileSet(TSDB_COMPACT_WSET(pComph));
tsdbRemoveDFileSet(TSDB_COMPACT_WSET(pComph));
tsdbCompactFSetEnd(pComph);
return -1;
}
if (tsdbCompactFSetImpl(pComph) < 0) {
tsdbCloseDFileSet(TSDB_COMPACT_WSET(pComph));
tsdbRemoveDFileSet(TSDB_COMPACT_WSET(pComph));
tsdbCompactFSetEnd(pComph);
tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_COMPACT_WSET(pComph));
tsdbDebug("vgId:%d FSET %d compact over", REPO_ID(pRepo), pSet->fid);
}
tsdbCompactFSetEnd(pComph);
return 0;
}
static bool tsdbShouldCompact(SCompactH *pComph) {
STsdbRepo * pRepo = TSDB_COMPACT_REPO(pComph);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SReadH * pReadh = &(pComph->readh);
STableCompactH *pTh;
SBlock * pBlock;
int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
SDFile * pDataF = TSDB_READ_DATA_FILE(pReadh);
SDFile * pLastF = TSDB_READ_LAST_FILE(pReadh);
int tblocks = 0; // total blocks
int nSubBlocks = 0; // # of blocks with sub-blocks
int nSmallBlocks = 0; // # of blocks with rows < defaultRows
int64_t tsize = 0;
for (size_t i = 0; i < taosArrayGetSize(pComph->tbArray); i++) {
pTh = (STableCompactH *)taosArrayGet(pComph->tbArray, i);
if (pTh->pTable == NULL || pTh->pBlkIdx == NULL) continue;
for (size_t bidx = 0; bidx < pTh->pBlkIdx->numOfBlocks; bidx++) {
tblocks++;
pBlock = pTh->pInfo->blocks + bidx;
if (pBlock->numOfRows < defaultRows) {
nSmallBlocks++;
}
if (pBlock->numOfSubBlocks > 1) {
nSubBlocks++;
for (int k = 0; k < pBlock->numOfSubBlocks; k++) {
SBlock *iBlock = ((SBlock *)POINTER_SHIFT(pTh->pInfo, pBlock->offset)) + k;
tsize = tsize + iBlock->len;
}
} else if (pBlock->numOfSubBlocks == 1) {
tsize += pBlock->len;
} else {
ASSERT(0);
}
}
}
return (((nSubBlocks * 1.0 / tblocks) > 0.33) || ((nSmallBlocks * 1.0 / tblocks) > 0.33) ||
(tsize * 1.0 / (pDataF->info.size + pLastF->info.size - 2 * TSDB_FILE_HEAD_SIZE) < 0.85));
}
static int tsdbInitCompactH(SCompactH *pComph, STsdbRepo *pRepo) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
memset(pComph, 0, sizeof(*pComph));
TSDB_FSET_SET_CLOSED(TSDB_COMPACT_WSET(pComph));
tsdbGetRtnSnap(pRepo, &(pComph->rtn));
tsdbFSIterInit(&(pComph->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD);
if (tsdbInitReadH(&(pComph->readh), pRepo) < 0) {
return -1;
}
if (tsdbInitCompTbArray(pComph) < 0) {
tsdbDestroyCompactH(pComph);
return -1;
}
pComph->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
if (pComph->aBlkIdx == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyCompactH(pComph);
return -1;
}
pComph->aSupBlk = taosArrayInit(1024, sizeof(SBlock));
if (pComph->aSupBlk == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyCompactH(pComph);
return -1;
}
pComph->pDataCols = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock);
if (pComph->pDataCols == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyCompactH(pComph);
return -1;
}
return 0;
}
static void tsdbDestroyCompactH(SCompactH *pComph) {
pComph->pDataCols = tdFreeDataCols(pComph->pDataCols);
pComph->aSupBlk = taosArrayDestroy(pComph->aSupBlk);
pComph->aBlkIdx = taosArrayDestroy(pComph->aBlkIdx);
tsdbDestroyCompTbArray(pComph);
tsdbDestroyReadH(&(pComph->readh));
tsdbCloseDFileSet(TSDB_COMPACT_WSET(pComph));
tsdbUpdateDFileSet(REPO_FS(pRepo), TSDB_COMPACT_WSET(pComph));
tsdbDebug("vgId:%d FSET %d compact over", REPO_ID(pRepo), pSet->fid);
}
tsdbCompactFSetEnd(pComph);
return 0;
}
static int tsdbInitCompTbArray(SCompactH *pComph) { // Init pComp->tbArray
STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph);
STsdbMeta *pMeta = pRepo->tsdbMeta;
static bool tsdbShouldCompact(SCompactH *pComph) {
STsdbRepo * pRepo = TSDB_COMPACT_REPO(pComph);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SReadH * pReadh = &(pComph->readh);
STableCompactH *pTh;
SBlock * pBlock;
int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
SDFile * pDataF = TSDB_READ_DATA_FILE(pReadh);
SDFile * pLastF = TSDB_READ_LAST_FILE(pReadh);
if (tsdbRLockRepoMeta(pRepo) < 0) return -1;
int tblocks = 0; // total blocks
int nSubBlocks = 0; // # of blocks with sub-blocks
int nSmallBlocks = 0; // # of blocks with rows < defaultRows
int64_t tsize = 0;
for (size_t i = 0; i < taosArrayGetSize(pComph->tbArray); i++) {
pTh = (STableCompactH *)taosArrayGet(pComph->tbArray, i);
if (pTh->pTable == NULL || pTh->pBlkIdx == NULL) continue;
for (size_t bidx = 0; bidx < pTh->pBlkIdx->numOfBlocks; bidx++) {
tblocks++;
pBlock = pTh->pInfo->blocks + bidx;
if (pBlock->numOfRows < defaultRows) {
nSmallBlocks++;
}
if (pBlock->numOfSubBlocks > 1) {
nSubBlocks++;
for (int k = 0; k < pBlock->numOfSubBlocks; k++) {
SBlock *iBlock = ((SBlock *)POINTER_SHIFT(pTh->pInfo, pBlock->offset)) + k;
tsize = tsize + iBlock->len;
}
} else if (pBlock->numOfSubBlocks == 1) {
tsize += pBlock->len;
} else {
ASSERT(0);
}
}
}
return (((nSubBlocks * 1.0 / tblocks) > 0.33) || ((nSmallBlocks * 1.0 / tblocks) > 0.33) ||
(tsize * 1.0 / (pDataF->info.size + pLastF->info.size - 2 * TSDB_FILE_HEAD_SIZE) < 0.85));
}
static int tsdbInitCompactH(SCompactH *pComph, STsdbRepo *pRepo) {
STsdbCfg *pCfg = REPO_CFG(pRepo);
memset(pComph, 0, sizeof(*pComph));
TSDB_FSET_SET_CLOSED(TSDB_COMPACT_WSET(pComph));
tsdbGetRtnSnap(pRepo, &(pComph->rtn));
tsdbFSIterInit(&(pComph->fsIter), REPO_FS(pRepo), TSDB_FS_ITER_FORWARD);
if (tsdbInitReadH(&(pComph->readh), pRepo) < 0) {
return -1;
}
if (tsdbInitCompTbArray(pComph) < 0) {
tsdbDestroyCompactH(pComph);
return -1;
}
pComph->aBlkIdx = taosArrayInit(1024, sizeof(SBlockIdx));
if (pComph->aBlkIdx == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyCompactH(pComph);
return -1;
}
pComph->aSupBlk = taosArrayInit(1024, sizeof(SBlock));
if (pComph->aSupBlk == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyCompactH(pComph);
return -1;
}
pComph->pDataCols = tdNewDataCols(0, 0, pCfg->maxRowsPerFileBlock);
if (pComph->pDataCols == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbDestroyCompactH(pComph);
return -1;
}
return 0;
}
static void tsdbDestroyCompactH(SCompactH *pComph) {
pComph->pDataCols = tdFreeDataCols(pComph->pDataCols);
pComph->aSupBlk = taosArrayDestroy(pComph->aSupBlk);
pComph->aBlkIdx = taosArrayDestroy(pComph->aBlkIdx);
tsdbDestroyCompTbArray(pComph);
tsdbDestroyReadH(&(pComph->readh));
tsdbCloseDFileSet(TSDB_COMPACT_WSET(pComph));
}
static int tsdbInitCompTbArray(SCompactH *pComph) { // Init pComp->tbArray
STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph);
STsdbMeta *pMeta = pRepo->tsdbMeta;
if (tsdbRLockRepoMeta(pRepo) < 0) return -1;
pComph->tbArray = taosArrayInit(pMeta->maxTables, sizeof(STableCompactH));
if (pComph->tbArray == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbUnlockRepoMeta(pRepo);
return -1;
}
// Note here must start from 0
for (int i = 0; i < pMeta->maxTables; i++) {
STableCompactH ch = {0};
if (pMeta->tables[i] != NULL) {
tsdbRefTable(pMeta->tables[i]);
ch.pTable = pMeta->tables[i];
}
if (taosArrayPush(pComph->tbArray, &ch) == NULL) {
pComph->tbArray = taosArrayInit(pMeta->maxTables, sizeof(STableCompactH));
if (pComph->tbArray == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbUnlockRepoMeta(pRepo);
return -1;
}
}
if (tsdbUnlockRepoMeta(pRepo) < 0) return -1;
return 0;
}
// Note here must start from 0
for (int i = 0; i < pMeta->maxTables; i++) {
STableCompactH ch = {0};
if (pMeta->tables[i] != NULL) {
tsdbRefTable(pMeta->tables[i]);
ch.pTable = pMeta->tables[i];
}
static void tsdbDestroyCompTbArray(SCompactH *pComph) {
STableCompactH *pTh;
if (pComph->tbArray == NULL) return;
for (size_t i = 0; i < taosArrayGetSize(pComph->tbArray); i++) {
pTh = (STableCompactH *)taosArrayGet(pComph->tbArray, i);
if (pTh->pTable) {
tsdbUnRefTable(pTh->pTable);
if (taosArrayPush(pComph->tbArray, &ch) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
tsdbUnlockRepoMeta(pRepo);
return -1;
}
}
pTh->pInfo = taosTZfree(pTh->pInfo);
if (tsdbUnlockRepoMeta(pRepo) < 0) return -1;
return 0;
}
pComph->tbArray = taosArrayDestroy(pComph->tbArray);
}
static void tsdbDestroyCompTbArray(SCompactH *pComph) {
STableCompactH *pTh;
static int tsdbCacheFSetIndex(SCompactH *pComph) {
SReadH *pReadH = &(pComph->readh);
if (pComph->tbArray == NULL) return;
if (tsdbLoadBlockIdx(pReadH) < 0) {
return -1;
for (size_t i = 0; i < taosArrayGetSize(pComph->tbArray); i++) {
pTh = (STableCompactH *)taosArrayGet(pComph->tbArray, i);
if (pTh->pTable) {
tsdbUnRefTable(pTh->pTable);
}
pTh->pInfo = taosTZfree(pTh->pInfo);
}
pComph->tbArray = taosArrayDestroy(pComph->tbArray);
}
for (int tid = 1; tid < taosArrayGetSize(pComph->tbArray); tid++) {
STableCompactH *pTh = (STableCompactH *)taosArrayGet(pComph->tbArray, tid);
pTh->pBlkIdx = NULL;
static int tsdbCacheFSetIndex(SCompactH *pComph) {
SReadH *pReadH = &(pComph->readh);
if (pTh->pTable == NULL) continue;
if (tsdbSetReadTable(pReadH, pTh->pTable) < 0) {
if (tsdbLoadBlockIdx(pReadH) < 0) {
return -1;
}
if (pReadH->pBlkIdx == NULL) continue;
pTh->bindex = *(pReadH->pBlkIdx);
pTh->pBlkIdx = &(pTh->bindex);
for (int tid = 1; tid < taosArrayGetSize(pComph->tbArray); tid++) {
STableCompactH *pTh = (STableCompactH *)taosArrayGet(pComph->tbArray, tid);
pTh->pBlkIdx = NULL;
if (tsdbMakeRoom((void **)(&(pTh->pInfo)), pTh->pBlkIdx->len) < 0) {
return -1;
}
if (tsdbLoadBlockInfo(pReadH, (void *)(pTh->pInfo)) < 0) {
return -1;
}
}
return 0;
}
static int tsdbCompactFSetInit(SCompactH *pComph, SDFileSet *pSet) {
taosArrayClear(pComph->aBlkIdx);
taosArrayClear(pComph->aSupBlk);
if (tsdbSetAndOpenReadFSet(&(pComph->readh), pSet) < 0) {
return -1;
}
if (tsdbCacheFSetIndex(pComph) < 0) {
tsdbCloseAndUnsetFSet(&(pComph->readh));
return -1;
}
return 0;
}
static void tsdbCompactFSetEnd(SCompactH *pComph) { tsdbCloseAndUnsetFSet(&(pComph->readh)); }
static int tsdbCompactFSetImpl(SCompactH *pComph) {
STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SReadH * pReadh = &(pComph->readh);
SBlockIdx blkIdx;
void ** ppBuf = &(TSDB_COMPACT_BUF(pComph));
void ** ppCBuf = &(TSDB_COMPACT_COMP_BUF(pComph));
int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
taosArrayClear(pComph->aBlkIdx);
for (int tid = 1; tid < taosArrayGetSize(pComph->tbArray); tid++) {
STableCompactH *pTh = (STableCompactH *)taosArrayGet(pComph->tbArray, tid);
STSchema * pSchema;
if (pTh->pTable == NULL || pTh->pBlkIdx == NULL) continue;
pSchema = tsdbGetTableSchemaImpl(pTh->pTable, true, true, -1);
taosArrayClear(pComph->aSupBlk);
if ((tdInitDataCols(pComph->pDataCols, pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) ||
(tdInitDataCols(pReadh->pDCols[1], pSchema) < 0)) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
tdFreeSchema(pSchema);
// Loop to compact each block data
for (int i = 0; i < pTh->pBlkIdx->numOfBlocks; i++) {
SBlock *pBlock = pTh->pInfo->blocks + i;
// Load the block data
if (tsdbLoadBlockData(pReadh, pBlock, pTh->pInfo) < 0) {
if (pTh->pTable == NULL) continue;
if (tsdbSetReadTable(pReadH, pTh->pTable) < 0) {
return -1;
}
// Merge pComph->pDataCols and pReadh->pDCols[0] and write data to file
if (pComph->pDataCols->numOfRows == 0 && pBlock->numOfRows >= defaultRows) {
if (tsdbWriteBlockToRightFile(pComph, pTh->pTable, pReadh->pDCols[0], ppBuf, ppCBuf) < 0) {
return -1;
}
} else {
int ridx = 0;
if (pReadH->pBlkIdx == NULL) continue;
pTh->bindex = *(pReadH->pBlkIdx);
pTh->pBlkIdx = &(pTh->bindex);
while (true) {
if (pReadh->pDCols[0]->numOfRows - ridx == 0) break;
int rowsToMerge = MIN(pReadh->pDCols[0]->numOfRows - ridx, defaultRows - pComph->pDataCols->numOfRows);
if (tsdbMakeRoom((void **)(&(pTh->pInfo)), pTh->pBlkIdx->len) < 0) {
return -1;
}
tdMergeDataCols(pComph->pDataCols, pReadh->pDCols[0], rowsToMerge, &ridx);
if (pComph->pDataCols->numOfRows < defaultRows) {
break;
}
if (tsdbWriteBlockToRightFile(pComph, pTh->pTable, pComph->pDataCols, ppBuf, ppCBuf) < 0) {
return -1;
}
tdResetDataCols(pComph->pDataCols);
}
if (tsdbLoadBlockInfo(pReadH, (void *)(pTh->pInfo)) < 0) {
return -1;
}
}
if (pComph->pDataCols->numOfRows > 0 &&
tsdbWriteBlockToRightFile(pComph, pTh->pTable, pComph->pDataCols, ppBuf, ppCBuf) < 0) {
return 0;
}
static int tsdbCompactFSetInit(SCompactH *pComph, SDFileSet *pSet) {
taosArrayClear(pComph->aBlkIdx);
taosArrayClear(pComph->aSupBlk);
if (tsdbSetAndOpenReadFSet(&(pComph->readh), pSet) < 0) {
return -1;
}
if (tsdbWriteBlockInfoImpl(TSDB_COMPACT_HEAD_FILE(pComph), pTh->pTable, pComph->aSupBlk, NULL, ppBuf, &blkIdx) <
0) {
if (tsdbCacheFSetIndex(pComph) < 0) {
tsdbCloseAndUnsetFSet(&(pComph->readh));
return -1;
}
if ((blkIdx.numOfBlocks > 0) && (taosArrayPush(pComph->aBlkIdx, (void *)(&blkIdx)) == NULL)) {
return 0;
}
static void tsdbCompactFSetEnd(SCompactH *pComph) { tsdbCloseAndUnsetFSet(&(pComph->readh)); }
static int tsdbCompactFSetImpl(SCompactH *pComph) {
STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SReadH * pReadh = &(pComph->readh);
SBlockIdx blkIdx;
void ** ppBuf = &(TSDB_COMPACT_BUF(pComph));
void ** ppCBuf = &(TSDB_COMPACT_COMP_BUF(pComph));
int defaultRows = TSDB_DEFAULT_BLOCK_ROWS(pCfg->maxRowsPerFileBlock);
taosArrayClear(pComph->aBlkIdx);
for (int tid = 1; tid < taosArrayGetSize(pComph->tbArray); tid++) {
STableCompactH *pTh = (STableCompactH *)taosArrayGet(pComph->tbArray, tid);
STSchema * pSchema;
if (pTh->pTable == NULL || pTh->pBlkIdx == NULL) continue;
pSchema = tsdbGetTableSchemaImpl(pTh->pTable, true, true, -1);
taosArrayClear(pComph->aSupBlk);
if ((tdInitDataCols(pComph->pDataCols, pSchema) < 0) || (tdInitDataCols(pReadh->pDCols[0], pSchema) < 0) ||
(tdInitDataCols(pReadh->pDCols[1], pSchema) < 0)) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
tdFreeSchema(pSchema);
// Loop to compact each block data
for (int i = 0; i < pTh->pBlkIdx->numOfBlocks; i++) {
SBlock *pBlock = pTh->pInfo->blocks + i;
// Load the block data
if (tsdbLoadBlockData(pReadh, pBlock, pTh->pInfo) < 0) {
return -1;
}
// Merge pComph->pDataCols and pReadh->pDCols[0] and write data to file
if (pComph->pDataCols->numOfRows == 0 && pBlock->numOfRows >= defaultRows) {
if (tsdbWriteBlockToRightFile(pComph, pTh->pTable, pReadh->pDCols[0], ppBuf, ppCBuf) < 0) {
return -1;
}
} else {
int ridx = 0;
while (true) {
if (pReadh->pDCols[0]->numOfRows - ridx == 0) break;
int rowsToMerge = MIN(pReadh->pDCols[0]->numOfRows - ridx, defaultRows - pComph->pDataCols->numOfRows);
tdMergeDataCols(pComph->pDataCols, pReadh->pDCols[0], rowsToMerge, &ridx);
if (pComph->pDataCols->numOfRows < defaultRows) {
break;
}
if (tsdbWriteBlockToRightFile(pComph, pTh->pTable, pComph->pDataCols, ppBuf, ppCBuf) < 0) {
return -1;
}
tdResetDataCols(pComph->pDataCols);
}
}
}
if (pComph->pDataCols->numOfRows > 0 &&
tsdbWriteBlockToRightFile(pComph, pTh->pTable, pComph->pDataCols, ppBuf, ppCBuf) < 0) {
return -1;
}
if (tsdbWriteBlockInfoImpl(TSDB_COMPACT_HEAD_FILE(pComph), pTh->pTable, pComph->aSupBlk, NULL, ppBuf, &blkIdx) <
0) {
return -1;
}
if ((blkIdx.numOfBlocks > 0) && (taosArrayPush(pComph->aBlkIdx, (void *)(&blkIdx)) == NULL)) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
if (tsdbWriteBlockIdx(TSDB_COMPACT_HEAD_FILE(pComph), pComph->aBlkIdx, ppBuf) < 0) {
return -1;
}
return 0;
}
static int tsdbWriteBlockToRightFile(SCompactH *pComph, STable *pTable, SDataCols *pDataCols, void **ppBuf,
void **ppCBuf) {
STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SDFile * pDFile;
bool isLast;
SBlock block;
ASSERT(pDataCols->numOfRows > 0);
if (pDataCols->numOfRows < pCfg->minRowsPerFileBlock) {
pDFile = TSDB_COMPACT_LAST_FILE(pComph);
isLast = true;
} else {
pDFile = TSDB_COMPACT_DATA_FILE(pComph);
isLast = false;
}
if (tsdbWriteBlockImpl(pRepo, pTable, pDFile, pDataCols, &block, isLast, true, ppBuf, ppCBuf) < 0) {
return -1;
}
if (taosArrayPush(pComph->aSupBlk, (void *)(&block)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
}
if (tsdbWriteBlockIdx(TSDB_COMPACT_HEAD_FILE(pComph), pComph->aBlkIdx, ppBuf) < 0) {
return -1;
}
return 0;
return 0;
}
static int tsdbWriteBlockToRightFile(SCompactH *pComph, STable *pTable, SDataCols *pDataCols, void **ppBuf,
void **ppCBuf) {
STsdbRepo *pRepo = TSDB_COMPACT_REPO(pComph);
STsdbCfg * pCfg = REPO_CFG(pRepo);
SDFile * pDFile;
bool isLast;
SBlock block;
ASSERT(pDataCols->numOfRows > 0);
if (pDataCols->numOfRows < pCfg->minRowsPerFileBlock) {
pDFile = TSDB_COMPACT_LAST_FILE(pComph);
isLast = true;
} else {
pDFile = TSDB_COMPACT_DATA_FILE(pComph);
isLast = false;
}
if (tsdbWriteBlockImpl(pRepo, pTable, pDFile, pDataCols, &block, isLast, true, ppBuf, ppCBuf) < 0) {
return -1;
}
if (taosArrayPush(pComph->aSupBlk, (void *)(&block)) == NULL) {
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
return -1;
}
return 0;
}

View File

@ -219,6 +219,7 @@ static SKeyword keywordTable[] = {
{"PARTITIONS", TK_PARTITIONS},
{"TOPIC", TK_TOPIC},
{"TOPICS", TK_TOPICS},
{"COMPACT", TK_COMPACT},
{"MODIFY", TK_MODIFY}
};

View File

@ -114,6 +114,7 @@ int32_t vnodeSync(int32_t vgId) {
return TSDB_CODE_SUCCESS;
}
int32_t vnodeDrop(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
@ -133,6 +134,19 @@ int32_t vnodeDrop(int32_t vgId) {
return TSDB_CODE_SUCCESS;
}
int32_t vnodeCompact(int32_t vgId) {
void *pVnode = vnodeAcquire(vgId);
if (pVnode != NULL) {
vDebug("vgId:%d, compact vnode msg is received", vgId);
//not care success or not
tsdbCompact(((SVnodeObj*)pVnode)->tsdb);
vnodeRelease(pVnode);
} else {
vInfo("vgId:%d, vnode not exist, can't compact it", vgId);
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
return TSDB_CODE_SUCCESS;
}
static int32_t vnodeAlterImp(SVnodeObj *pVnode, SCreateVnodeMsg *pVnodeCfg) {
STsdbCfg tsdbCfg = pVnode->tsdbCfg;

View File

@ -148,6 +148,7 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) {
pLoad->status = pVnode->status;
pLoad->role = pVnode->role;
pLoad->replica = pVnode->syncCfg.replica;
pLoad->compact = (pVnode->tsdb != NULL) && tsdbInCompact(pVnode->tsdb) ? 1 : 0;
}
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {