Merge pull request #22482 from taosdata/fix/TD-25818

fix(vnode/destroy): delete objects
This commit is contained in:
wade zhang 2023-08-18 17:49:22 +08:00 committed by GitHub
commit a7e11c2520
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 114 additions and 91 deletions

View File

@ -201,24 +201,24 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
if(req.learnerReplica == 0) if (req.learnerReplica == 0) {
{
req.learnerSelfIndex = -1; req.learnerSelfIndex = -1;
} }
dInfo("vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d szBuf:%" PRIu64 dInfo(
", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64 "vgId:%d, vnode management handle msgType:%s, start to create vnode, page:%d pageSize:%d buffer:%d szPage:%d "
"szBuf:%" PRIu64 ", cacheLast:%d cacheLastSize:%d sstTrigger:%d tsdbPageSize:%d %d dbname:%s dbId:%" PRId64
", days:%d keep0:%d keep1:%d keep2:%d tsma:%d precision:%d compression:%d minRows:%d maxRows:%d" ", days:%d keep0:%d keep1:%d keep2:%d tsma:%d precision:%d compression:%d minRows:%d maxRows:%d"
", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64 ", wal fsync:%d level:%d retentionPeriod:%d retentionSize:%" PRId64 " rollPeriod:%d segSize:%" PRId64
", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d " ", hash method:%d begin:%u end:%u prefix:%d surfix:%d replica:%d selfIndex:%d "
"learnerReplica:%d learnerSelfIndex:%d strict:%d", "learnerReplica:%d learnerSelfIndex:%d strict:%d",
req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024, req.vgId, TMSG_INFO(pMsg->msgType), req.pages, req.pageSize, req.buffer, req.pageSize * 1024,
(uint64_t)req.buffer * 1024 * 1024, (uint64_t)req.buffer * 1024 * 1024, req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize,
req.cacheLast, req.cacheLastSize, req.sstTrigger, req.tsdbPageSize, req.tsdbPageSize * 1024, req.db, req.dbUid, req.tsdbPageSize * 1024, req.db, req.dbUid, req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2,
req.daysPerFile, req.daysToKeep0, req.daysToKeep1, req.daysToKeep2, req.isTsma, req.precision, req.compression, req.isTsma, req.precision, req.compression, req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel,
req.minRows, req.maxRows, req.walFsyncPeriod, req.walLevel, req.walRetentionPeriod, req.walRetentionSize, req.walRetentionPeriod, req.walRetentionSize, req.walRollPeriod, req.walSegmentSize, req.hashMethod,
req.walRollPeriod, req.walSegmentSize, req.hashMethod, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashBegin, req.hashEnd, req.hashPrefix, req.hashSuffix, req.replica, req.selfIndex, req.learnerReplica,
req.hashSuffix, req.replica, req.selfIndex, req.learnerReplica, req.learnerSelfIndex, req.strict); req.learnerSelfIndex, req.strict);
for (int32_t i = 0; i < req.replica; ++i) { for (int32_t i = 0; i < req.replica; ++i) {
dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port, dInfo("vgId:%d, replica:%d ep:%s:%u dnode:%d", req.vgId, i, req.replicas[i].fqdn, req.replicas[i].port,
req.replicas[i].id); req.replicas[i].id);
@ -231,8 +231,7 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SReplica *pReplica = NULL; SReplica *pReplica = NULL;
if (req.selfIndex != -1) { if (req.selfIndex != -1) {
pReplica = &req.replicas[req.selfIndex]; pReplica = &req.replicas[req.selfIndex];
} } else {
else{
pReplica = &req.learnerReplicas[req.learnerSelfIndex]; pReplica = &req.learnerReplicas[req.learnerSelfIndex];
} }
if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort || if (pReplica->id != pMgmt->pData->dnodeId || pReplica->port != tsServerPort ||
@ -313,10 +312,10 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
_OVER: _OVER:
if (code != 0) { if (code != 0) {
vnodeClose(pImpl); vnodeClose(pImpl);
vnodeDestroy(path, pMgmt->pTfs); vnodeDestroy(0, path, pMgmt->pTfs);
} else { } else {
dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", dInfo("vgId:%d, vnode management handle msgType:%s, end to create vnode, vnode is created", req.vgId,
req.vgId, TMSG_INFO(pMsg->msgType)); TMSG_INFO(pMsg->msgType));
} }
tFreeSCreateVnodeReq(&req); tFreeSCreateVnodeReq(&req);
@ -335,8 +334,8 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
req.learnerSelfIndex = -1; req.learnerSelfIndex = -1;
} }
dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", dInfo("vgId:%d, vnode management handle msgType:%s, start to process alter-node-type-request", req.vgId,
req.vgId, TMSG_INFO(pMsg->msgType)); TMSG_INFO(pMsg->msgType));
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId); SVnodeObj *pVnode = vmAcquireVnode(pMgmt, req.vgId);
if (pVnode == NULL) { if (pVnode == NULL) {
@ -375,9 +374,8 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id); dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->id);
} }
if (req.replica <= 0 || if (req.replica <= 0 || (req.selfIndex < 0 && req.learnerSelfIndex < 0) || req.selfIndex >= req.replica ||
(req.selfIndex < 0 && req.learnerSelfIndex <0)|| req.learnerSelfIndex >= req.learnerReplica) {
req.selfIndex >= req.replica || req.learnerSelfIndex >= req.learnerReplica) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
dError("vgId:%d, failed to alter replica since invalid msg", vgId); dError("vgId:%d, failed to alter replica since invalid msg", vgId);
vmReleaseVnode(pMgmt, pVnode); vmReleaseVnode(pMgmt, pVnode);
@ -387,8 +385,7 @@ int32_t vmProcessAlterVnodeTypeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SReplica *pReplica = NULL; SReplica *pReplica = NULL;
if (req.selfIndex != -1) { if (req.selfIndex != -1) {
pReplica = &req.replicas[req.selfIndex]; pReplica = &req.replicas[req.selfIndex];
} } else {
else{
pReplica = &req.learnerReplicas[req.learnerSelfIndex]; pReplica = &req.learnerReplicas[req.learnerSelfIndex];
} }
@ -560,7 +557,8 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
} }
int32_t vgId = alterReq.vgId; int32_t vgId = alterReq.vgId;
dInfo("vgId:%d,vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d " dInfo(
"vgId:%d,vnode management handle msgType:%s, start to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
"learnerSelfIndex:%d strict:%d", "learnerSelfIndex:%d strict:%d",
vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
alterReq.learnerSelfIndex, alterReq.strict); alterReq.learnerSelfIndex, alterReq.strict);
@ -573,8 +571,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port); dInfo("vgId:%d, learnerReplicas:%d ep:%s:%u dnode:%d", vgId, i, pReplica->fqdn, pReplica->port, pReplica->port);
} }
if (alterReq.replica <= 0 || if (alterReq.replica <= 0 || (alterReq.selfIndex < 0 && alterReq.learnerSelfIndex < 0) ||
(alterReq.selfIndex < 0 && alterReq.learnerSelfIndex <0)||
alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) { alterReq.selfIndex >= alterReq.replica || alterReq.learnerSelfIndex >= alterReq.learnerReplica) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
dError("vgId:%d, failed to alter replica since invalid msg", vgId); dError("vgId:%d, failed to alter replica since invalid msg", vgId);
@ -584,8 +581,7 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SReplica *pReplica = NULL; SReplica *pReplica = NULL;
if (alterReq.selfIndex != -1) { if (alterReq.selfIndex != -1) {
pReplica = &alterReq.replicas[alterReq.selfIndex]; pReplica = &alterReq.replicas[alterReq.selfIndex];
} } else {
else{
pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex]; pReplica = &alterReq.learnerReplicas[alterReq.learnerSelfIndex];
} }
@ -641,7 +637,8 @@ int32_t vmProcessAlterVnodeReplicaReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
dInfo("vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d " dInfo(
"vgId:%d, vnode management handle msgType:%s, end to alter vnode replica:%d selfIndex:%d leanerReplica:%d "
"learnerSelfIndex:%d strict:%d", "learnerSelfIndex:%d strict:%d",
vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica, vgId, TMSG_INFO(pMsg->msgType), alterReq.replica, alterReq.selfIndex, alterReq.learnerReplica,
alterReq.learnerSelfIndex, alterReq.strict); alterReq.learnerSelfIndex, alterReq.strict);

View File

@ -208,7 +208,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
if (pVnode->dropped) { if (pVnode->dropped) {
dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped); dInfo("vgId:%d, vnode is destroyed, dropped:%d", pVnode->vgId, pVnode->dropped);
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId); snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pVnode->vgId);
vnodeDestroy(path, pMgmt->pTfs); vnodeDestroy(pVnode->vgId, path, pMgmt->pTfs);
} }
taosMemoryFree(pVnode->path); taosMemoryFree(pVnode->path);

View File

@ -33,8 +33,8 @@
#include "tmsg.h" #include "tmsg.h"
#include "trow.h" #include "trow.h"
#include "tdb.h"
#include "storageapi.h" #include "storageapi.h"
#include "tdb.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -57,7 +57,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
int32_t diskPrimary, STfs *pTfs); int32_t diskPrimary, STfs *pTfs);
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId,
int32_t diskPrimary, STfs *pTfs); int32_t diskPrimary, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs); void vnodeDestroy(int32_t vgId, const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb); SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgCb);
void vnodePreClose(SVnode *pVnode); void vnodePreClose(SVnode *pVnode);
void vnodePostClose(SVnode *pVnode); void vnodePostClose(SVnode *pVnode);
@ -80,7 +80,8 @@ ESyncRole vnodeGetRole(SVnode *pVnode);
int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list); int32_t vnodeGetCtbIdList(void *pVnode, int64_t suid, SArray *list);
int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg); int32_t vnodeGetCtbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg), void *arg);
int32_t vnodeGetStbIdList(SVnode *pVnode, int64_t suid, SArray *list); int32_t vnodeGetStbIdList(SVnode *pVnode, int64_t suid, SArray *list);
int32_t vnodeGetStbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg, void* arg1), void *arg); int32_t vnodeGetStbIdListByFilter(SVnode *pVnode, int64_t suid, SArray *list, bool (*filter)(void *arg, void *arg1),
void *arg);
void *vnodeGetIdx(void *pVnode); void *vnodeGetIdx(void *pVnode);
void *vnodeGetIvtIdx(void *pVnode); void *vnodeGetIvtIdx(void *pVnode);

View File

@ -27,6 +27,7 @@ extern int8_t tsS3Enabled;
int32_t s3Init(); int32_t s3Init();
void s3CleanUp(); void s3CleanUp();
int32_t s3PutObjectFromFile(const char *file, const char *object); int32_t s3PutObjectFromFile(const char *file, const char *object);
void s3DeleteObjectsByPrefix(const char *prefix);
void s3DeleteObjects(const char *object_name[], int nobject); void s3DeleteObjects(const char *object_name[], int nobject);
bool s3Exists(const char *object_name); bool s3Exists(const char *object_name);
bool s3Get(const char *object_name, const char *path); bool s3Get(const char *object_name, const char *path);

View File

@ -85,6 +85,25 @@ int32_t s3PutObjectFromFile(const char *file_str, const char *object_str) {
return code; return code;
} }
void s3DeleteObjectsByPrefix(const char *prefix_str) {
cos_pool_t *p = NULL;
cos_request_options_t *options = NULL;
int is_cname = 0;
cos_string_t bucket;
cos_status_t *s = NULL;
cos_string_t prefix;
cos_pool_create(&p, NULL);
options = cos_request_options_create(p);
s3InitRequestOptions(options, is_cname);
cos_str_set(&bucket, tsS3BucketName);
cos_str_set(&prefix, prefix_str);
s = cos_delete_objects_by_prefix(options, &bucket, &prefix);
log_status(s);
cos_pool_destroy(p);
}
void s3DeleteObjects(const char *object_name[], int nobject) { void s3DeleteObjects(const char *object_name[], int nobject) {
cos_pool_t *p = NULL; cos_pool_t *p = NULL;
int is_cname = 0; int is_cname = 0;
@ -314,6 +333,7 @@ long s3Size(const char *object_name) {
int32_t s3Init() { return 0; } int32_t s3Init() { return 0; }
void s3CleanUp() {} void s3CleanUp() {}
int32_t s3PutObjectFromFile(const char *file, const char *object) { return 0; } int32_t s3PutObjectFromFile(const char *file, const char *object) { return 0; }
void s3DeleteObjectsByPrefix(const char *prefix) {}
void s3DeleteObjects(const char *object_name[], int nobject) {} void s3DeleteObjects(const char *object_name[], int nobject) {}
bool s3Exists(const char *object_name) { return false; } bool s3Exists(const char *object_name) { return false; }
bool s3Get(const char *object_name, const char *path) { return false; } bool s3Get(const char *object_name, const char *path) { return false; }

View File

@ -14,6 +14,7 @@
*/ */
#include "vnd.h" #include "vnd.h"
#include "vndCos.h"
int32_t vnodeGetPrimaryDir(const char *relPath, int32_t diskPrimary, STfs *pTfs, char *buf, size_t bufLen) { int32_t vnodeGetPrimaryDir(const char *relPath, int32_t diskPrimary, STfs *pTfs, char *buf, size_t bufLen) {
if (pTfs) { if (pTfs) {
@ -118,8 +119,8 @@ int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, int32_t
pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex; pCfg->myIndex = pReq->replica + pReq->learnerSelfIndex;
} }
vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d", vInfo("vgId:%d, save config while alter, replicas:%d totalReplicas:%d selfIndex:%d", pReq->vgId, pCfg->replicaNum,
pReq->vgId, pCfg->replicaNum, pCfg->totalReplicaNum, pCfg->myIndex); pCfg->totalReplicaNum, pCfg->myIndex);
info.config.syncCfg = *pCfg; info.config.syncCfg = *pCfg;
ret = vnodeSaveInfo(dir, &info); ret = vnodeSaveInfo(dir, &info);
@ -293,9 +294,16 @@ int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t s
return dstVgId; return dstVgId;
} }
void vnodeDestroy(const char *path, STfs *pTfs) { void vnodeDestroy(int32_t vgId, const char *path, STfs *pTfs) {
vInfo("path:%s is removed while destroy vnode", path); vInfo("path:%s is removed while destroy vnode", path);
tfsRmdir(pTfs, path); tfsRmdir(pTfs, path);
int32_t nlevel = tfsGetLevel(pTfs);
if (vgId > 0 && nlevel > 1 && tsS3Enabled) {
char vnode_prefix[TSDB_FILENAME_LEN];
snprintf(vnode_prefix, TSDB_FILENAME_LEN, "v%df", vgId);
s3DeleteObjectsByPrefix(vnode_prefix);
}
} }
static int32_t vnodeCheckDisk(int32_t diskPrimary, STfs *pTfs) { static int32_t vnodeCheckDisk(int32_t diskPrimary, STfs *pTfs) {
@ -497,13 +505,9 @@ void vnodeClose(SVnode *pVnode) {
// start the sync timer after the queue is ready // start the sync timer after the queue is ready
int32_t vnodeStart(SVnode *pVnode) { return vnodeSyncStart(pVnode); } int32_t vnodeStart(SVnode *pVnode) { return vnodeSyncStart(pVnode); }
int32_t vnodeIsCatchUp(SVnode *pVnode){ int32_t vnodeIsCatchUp(SVnode *pVnode) { return syncIsCatchUp(pVnode->sync); }
return syncIsCatchUp(pVnode->sync);
}
ESyncRole vnodeGetRole(SVnode *pVnode){ ESyncRole vnodeGetRole(SVnode *pVnode) { return syncGetRole(pVnode->sync); }
return syncGetRole(pVnode->sync);
}
void vnodeStop(SVnode *pVnode) {} void vnodeStop(SVnode *pVnode) {}