From ffe87632873aaebd106845064411b3f402697a3b Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 12 Jan 2022 22:17:40 +0800 Subject: [PATCH 01/17] rename files --- source/libs/tfs/inc/{tfsint.h => tfsint1.h} | 3 + source/libs/tfs/src/{tdisk.c => tfsDisk.c} | 19 ++-- source/libs/tfs/src/{ttier.c => tfsTier.c} | 0 src/inc/tfs.h | 103 -------------------- 4 files changed, 13 insertions(+), 112 deletions(-) rename source/libs/tfs/inc/{tfsint.h => tfsint1.h} (98%) rename source/libs/tfs/src/{tdisk.c => tfsDisk.c} (79%) rename source/libs/tfs/src/{ttier.c => tfsTier.c} (100%) delete mode 100644 src/inc/tfs.h diff --git a/source/libs/tfs/inc/tfsint.h b/source/libs/tfs/inc/tfsint1.h similarity index 98% rename from source/libs/tfs/inc/tfsint.h rename to source/libs/tfs/inc/tfsint1.h index 3c5dccc63b..caa52b5cd6 100644 --- a/source/libs/tfs/inc/tfsint.h +++ b/source/libs/tfs/inc/tfsint1.h @@ -16,6 +16,9 @@ #ifndef TD_TFSINT_H #define TD_TFSINT_H +#include "os.h" + +#include "taoserror.h" #include "tlog.h" #include "tglobal.h" #include "tfs.h" diff --git a/source/libs/tfs/src/tdisk.c b/source/libs/tfs/src/tfsDisk.c similarity index 79% rename from source/libs/tfs/src/tdisk.c rename to source/libs/tfs/src/tfsDisk.c index 22601e48c3..f32069837d 100644 --- a/source/libs/tfs/src/tdisk.c +++ b/source/libs/tfs/src/tfsDisk.c @@ -12,14 +12,12 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include "os.h" -#include "taoserror.h" +#define _DEFAULT_SOURCE #include "tfsint.h" -// PROTECTED ==================================== -SDisk *tfsNewDisk(int level, int id, const char *dir) { - SDisk *pDisk = (SDisk *)calloc(1, sizeof(*pDisk)); +SDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir) { + SDisk *pDisk = calloc(1, sizeof(SDisk)); if (pDisk == NULL) { terrno = TSDB_CODE_FS_OUT_OF_MEMORY; return NULL; @@ -33,18 +31,21 @@ SDisk *tfsNewDisk(int level, int id, const char *dir) { } SDisk *tfsFreeDisk(SDisk *pDisk) { - if (pDisk) { + if (pDisk != NULL) { free(pDisk); } return NULL; } -int tfsUpdateDiskInfo(SDisk *pDisk) { - ASSERT(pDisk != NULL); +int32_t tfsUpdateDiskInfo(SDisk *pDisk) { + if (pDisk == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return -1; + } SysDiskSize diskSize = {0}; - int code = taosGetDiskSize(pDisk->dir, &diskSize); + int32_t code = taosGetDiskSize(pDisk->dir, &diskSize); if (code != 0) { fError("failed to update disk information at level %d id %d dir %s since %s", pDisk->level, pDisk->id, pDisk->dir, strerror(errno)); diff --git a/source/libs/tfs/src/ttier.c b/source/libs/tfs/src/tfsTier.c similarity index 100% rename from source/libs/tfs/src/ttier.c rename to source/libs/tfs/src/tfsTier.c diff --git a/src/inc/tfs.h b/src/inc/tfs.h deleted file mode 100644 index 9ad7a8f66e..0000000000 --- a/src/inc/tfs.h +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TD_TFS_H -#define TD_TFS_H - -#include "tglobal.h" - -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct { - int level; - int id; -} SDiskID; - -#define TFS_UNDECIDED_LEVEL -1 -#define TFS_UNDECIDED_ID -1 -#define TFS_PRIMARY_LEVEL 0 -#define TFS_PRIMARY_ID 0 -#define TFS_MIN_LEVEL 0 -#define TFS_MAX_LEVEL (TSDB_MAX_TIERS - 1) - -// FS APIs ==================================== -typedef struct { - int64_t tsize; - int64_t used; - int64_t avail; -} SFSMeta; - -typedef struct { - int64_t size; - int64_t used; - int64_t free; - int16_t nAvailDisks; // # of Available disks -} STierMeta; - -int tfsInit(SDiskCfg *pDiskCfg, int ndisk); -void tfsCleanup(); -void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numLevels); -void tfsGetMeta(SFSMeta *pMeta); -void tfsAllocDisk(int expLevel, int *level, int *id); - -const char *TFS_PRIMARY_PATH(); -const char *TFS_DISK_PATH(int level, int id); - -// TFILE APIs ==================================== -typedef struct { - int level; - int id; - char rname[TSDB_FILENAME_LEN]; // REL name - char aname[TSDB_FILENAME_LEN]; // ABS name -} TFILE; - -#define TFILE_LEVEL(pf) ((pf)->level) -#define TFILE_ID(pf) ((pf)->id) -#define TFILE_NAME(pf) ((pf)->aname) -#define TFILE_REL_NAME(pf) ((pf)->rname) - -#define tfsopen(pf, flags) open(TFILE_NAME(pf), flags) -#define tfsclose(fd) close(fd) -#define tfsremove(pf) remove(TFILE_NAME(pf)) -#define tfscopy(sf, df) taosCopy(TFILE_NAME(sf), TFILE_NAME(df)) -#define tfsrename(sf, df) taosRename(TFILE_NAME(sf), TFILE_NAME(df)) - -void tfsInitFile(TFILE *pf, int level, int id, const char *bname); -bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2); -int tfsEncodeFile(void **buf, TFILE *pf); -void *tfsDecodeFile(void *buf, TFILE *pf); -void tfsbasename(const TFILE *pf, char *dest); -void tfsdirname(const TFILE *pf, char *dest); - -// DIR APIs ==================================== -int tfsMkdirAt(const char *rname, int level, int id); -int tfsMkdirRecurAt(const char *rname, int level, int id); -int tfsMkdir(const char *rname); -int tfsRmdir(const char *rname); -int tfsRename(char *orname, char *nrname); - -typedef struct TDIR TDIR; - -TDIR * tfsOpendir(const char *rname); -const TFILE *tfsReaddir(TDIR *tdir); -void tfsClosedir(TDIR *tdir); - -#ifdef __cplusplus -} -#endif - -#endif From ba42f8cd93fdacfe900c1c44bce2ac9011ac797e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 12 Jan 2022 22:20:29 +0800 Subject: [PATCH 02/17] rename files --- source/libs/tfs/inc/{tfsint1.h => tfsInt.h} | 12 +++++------- source/libs/tfs/src/tfs.c | 9 ++------- source/libs/tfs/src/tfsDisk.c | 2 +- source/libs/tfs/src/tfsTier.c | 6 ++---- 4 files changed, 10 insertions(+), 19 deletions(-) rename source/libs/tfs/inc/{tfsint1.h => tfsInt.h} (98%) diff --git a/source/libs/tfs/inc/tfsint1.h b/source/libs/tfs/inc/tfsInt.h similarity index 98% rename from source/libs/tfs/inc/tfsint1.h rename to source/libs/tfs/inc/tfsInt.h index caa52b5cd6..40eb61ba07 100644 --- a/source/libs/tfs/inc/tfsint1.h +++ b/source/libs/tfs/inc/tfsInt.h @@ -18,15 +18,13 @@ #include "os.h" +#include "taosdef.h" #include "taoserror.h" -#include "tlog.h" -#include "tglobal.h" -#include "tfs.h" #include "tcoding.h" - -#ifdef __cplusplus -extern "C" { -#endif +#include "tfs.h" +#include "tglobal.h" +#include "thash.h" +#include "tlog.h" extern int fsDebugFlag; diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index 88d6d587a7..a054981961 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -13,13 +13,8 @@ * along with this program. If not, see . */ -#include "os.h" - -#include "taosdef.h" -#include "taoserror.h" -#include "tfs.h" -#include "tfsint.h" -#include "thash.h" +#define _DEFAULT_SOURCE +#include "tfsInt.h" #define TMPNAME_LEN (TSDB_FILENAME_LEN * 2 + 32) diff --git a/source/libs/tfs/src/tfsDisk.c b/source/libs/tfs/src/tfsDisk.c index f32069837d..98656c84e6 100644 --- a/source/libs/tfs/src/tfsDisk.c +++ b/source/libs/tfs/src/tfsDisk.c @@ -14,7 +14,7 @@ */ #define _DEFAULT_SOURCE -#include "tfsint.h" +#include "tfsInt.h" SDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir) { SDisk *pDisk = calloc(1, sizeof(SDisk)); diff --git a/source/libs/tfs/src/tfsTier.c b/source/libs/tfs/src/tfsTier.c index 3b19797acf..90057e61d5 100644 --- a/source/libs/tfs/src/tfsTier.c +++ b/source/libs/tfs/src/tfsTier.c @@ -12,11 +12,9 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include "os.h" -#include "taosdef.h" -#include "taoserror.h" -#include "tfsint.h" +#define _DEFAULT_SOURCE +#include "tfsInt.h" #define tfsLockTier(pTier) pthread_spin_lock(&((pTier)->lock)) #define tfsUnLockTier(pTier) pthread_spin_unlock(&((pTier)->lock)) From 9e4141fa6d517fe32b6398015799d05f2e932f62 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 12 Jan 2022 22:57:38 +0800 Subject: [PATCH 03/17] add error codes --- include/util/taoserror.h | 2 +- source/libs/tfs/inc/tfsInt.h | 27 ++++++------- source/libs/tfs/src/tfsDisk.c | 13 +++--- source/libs/tfs/src/tfsTier.c | 75 ++++++++++++++++++++--------------- source/util/src/terror.c | 2 +- 5 files changed, 64 insertions(+), 55 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 854d16f67d..4f424111a8 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -406,7 +406,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1004) //"WAL out of memory") // tfs -#define TSDB_CODE_FS_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x2200) //"tfs out of memory") +#define TSDB_CODE_FS_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x2200) //"tfs out of memory") #define TSDB_CODE_FS_INVLD_CFG TAOS_DEF_ERROR_CODE(0, 0x2201) //"tfs invalid mount config") #define TSDB_CODE_FS_TOO_MANY_MOUNT TAOS_DEF_ERROR_CODE(0, 0x2202) //"tfs too many mount") #define TSDB_CODE_FS_DUP_PRIMARY TAOS_DEF_ERROR_CODE(0, 0x2203) //"tfs duplicate primary mount") diff --git a/source/libs/tfs/inc/tfsInt.h b/source/libs/tfs/inc/tfsInt.h index 40eb61ba07..4e53fcdb8e 100644 --- a/source/libs/tfs/inc/tfsInt.h +++ b/source/libs/tfs/inc/tfsInt.h @@ -26,7 +26,7 @@ #include "thash.h" #include "tlog.h" -extern int fsDebugFlag; +extern int32_t fsDebugFlag; // For debug purpose #define fFatal(...) { if (fsDebugFlag & DEBUG_FATAL) { taosPrintLog("TFS FATAL ", 255, __VA_ARGS__); }} @@ -47,8 +47,8 @@ typedef struct { } SDiskMeta; typedef struct SDisk { - int level; - int id; + int32_t level; + int32_t id; char dir[TSDB_FILENAME_LEN]; SDiskMeta dmeta; } SDisk; @@ -61,19 +61,19 @@ typedef struct SDisk { #define DISK_USED_SIZE(pd) ((pd)->dmeta.used) #define DISK_FREE_SIZE(pd) ((pd)->dmeta.free) -SDisk *tfsNewDisk(int level, int id, const char *dir); -SDisk *tfsFreeDisk(SDisk *pDisk); -int tfsUpdateDiskInfo(SDisk *pDisk); +SDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir); +SDisk *tfsFreeDisk(SDisk *pDisk); +int32_t tfsUpdateDiskInfo(SDisk *pDisk); // ttier.c ====================================================== typedef struct STier { pthread_spinlock_t lock; - int level; + int32_t level; int16_t ndisk; // # of disks mounted to this tier int16_t nextid; // next disk id to allocate STierMeta tmeta; - SDisk * disks[TSDB_MAX_DISKS_PER_TIER]; + SDisk *disks[TSDB_MAX_DISKS_PER_TIER]; } STier; #define TIER_LEVEL(pt) ((pt)->level) @@ -83,12 +83,11 @@ typedef struct STier { #define TIER_AVAIL_DISKS(pt) ((pt)->tmeta.nAvailDisks) #define DISK_AT_TIER(pt, id) ((pt)->disks[id]) -int tfsInitTier(STier *pTier, int level); -void tfsDestroyTier(STier *pTier); -SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg); -void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta); -int tfsAllocDiskOnTier(STier *pTier); -void tfsGetTierMeta(STier *pTier, STierMeta *pTierMeta); +int32_t tfsInitTier(STier *pTier, int32_t level); +void tfsDestroyTier(STier *pTier); +SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg); +void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta); +int32_t tfsAllocDiskOnTier(STier *pTier); void tfsPosNextId(STier *pTier); #ifdef __cplusplus diff --git a/source/libs/tfs/src/tfsDisk.c b/source/libs/tfs/src/tfsDisk.c index 98656c84e6..dd4f349c42 100644 --- a/source/libs/tfs/src/tfsDisk.c +++ b/source/libs/tfs/src/tfsDisk.c @@ -19,7 +19,7 @@ SDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir) { SDisk *pDisk = calloc(1, sizeof(SDisk)); if (pDisk == NULL) { - terrno = TSDB_CODE_FS_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -34,6 +34,7 @@ SDisk *tfsFreeDisk(SDisk *pDisk) { if (pDisk != NULL) { free(pDisk); } + return NULL; } @@ -44,17 +45,15 @@ int32_t tfsUpdateDiskInfo(SDisk *pDisk) { } SysDiskSize diskSize = {0}; - - int32_t code = taosGetDiskSize(pDisk->dir, &diskSize); - if (code != 0) { + if (taosGetDiskSize(pDisk->dir, &diskSize) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); fError("failed to update disk information at level %d id %d dir %s since %s", pDisk->level, pDisk->id, pDisk->dir, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); + return -1 } pDisk->dmeta.size = diskSize.tsize; pDisk->dmeta.used = diskSize.used; pDisk->dmeta.free = diskSize.avail; - - return code; + return 0; } diff --git a/source/libs/tfs/src/tfsTier.c b/source/libs/tfs/src/tfsTier.c index 90057e61d5..8b057d8755 100644 --- a/source/libs/tfs/src/tfsTier.c +++ b/source/libs/tfs/src/tfsTier.c @@ -16,45 +16,50 @@ #define _DEFAULT_SOURCE #include "tfsInt.h" -#define tfsLockTier(pTier) pthread_spin_lock(&((pTier)->lock)) -#define tfsUnLockTier(pTier) pthread_spin_unlock(&((pTier)->lock)) +#define tfsLockTier(pTier) pthread_spin_lock(&(pTier)->lock) +#define tfsUnLockTier(pTier) pthread_spin_unlock(&(pTier)->lock) -// PROTECTED ========================================== -int tfsInitTier(STier *pTier, int level) { - memset((void *)pTier, 0, sizeof(*pTier)); +int32_t tfsInitTier(STier *pTier, int32_t level) { + if (pTier == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return -1; + } - int code = pthread_spin_init(&(pTier->lock), 0); - if (code) { + memset(pTier, 0, sizeof(STier)); + + int32_t code = pthread_spin_init(&pTier->lock, 0); + if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); return -1; } pTier->level = level; - return 0; } void tfsDestroyTier(STier *pTier) { - for (int id = 0; id < TSDB_MAX_DISKS_PER_TIER; id++) { + if (pTier == NULL) return; + + for (int32_t id = 0; id < TSDB_MAX_DISKS_PER_TIER; id++) { DISK_AT_TIER(pTier, id) = tfsFreeDisk(DISK_AT_TIER(pTier, id)); } pTier->ndisk = 0; - pthread_spin_destroy(&(pTier->lock)); } SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) { - ASSERT(pTier->level == pCfg->level); - - int id = 0; - SDisk *pDisk; + if (pTier == NULL || pCfg == NULL || pTier->level != pCfg->level) { + terrno = TSDB_CODE_INVALID_PARA; + return -1; + } if (TIER_NDISKS(pTier) >= TSDB_MAX_DISKS_PER_TIER) { terrno = TSDB_CODE_FS_TOO_MANY_MOUNT; return NULL; } + int32_t id = 0; if (pTier->level == 0) { if (DISK_AT_TIER(pTier, 0) != NULL) { id = pTier->ndisk; @@ -73,30 +78,31 @@ SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) { id = pTier->ndisk; } - pDisk = tfsNewDisk(pCfg->level, id, pCfg->dir); + SDisk *pDisk = tfsNewDisk(pCfg->level, id, pCfg->dir); if (pDisk == NULL) return NULL; + DISK_AT_TIER(pTier, id) = pDisk; pTier->ndisk++; fInfo("disk %s is mounted to tier level %d id %d", pCfg->dir, pCfg->level, id); - return DISK_AT_TIER(pTier, id); } void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta) { - STierMeta tmeta; + STierMeta tmeta = {0}; if (pTierMeta == NULL) { pTierMeta = &tmeta; } - memset(pTierMeta, 0, sizeof(*pTierMeta)); + memset(pTierMeta, 0, sizeof(STierMeta)); tfsLockTier(pTier); - for (int id = 0; id < pTier->ndisk; id++) { - if (tfsUpdateDiskInfo(DISK_AT_TIER(pTier, id)) < 0) { + for (int32_t id = 0; id < pTier->ndisk; id++) { + if (tfsUpdateDiskInfo(DISK_AT_TIER(pTier, id)) != 0) { continue; } + pTierMeta->size += DISK_SIZE(DISK_AT_TIER(pTier, id)); pTierMeta->used += DISK_USED_SIZE(DISK_AT_TIER(pTier, id)); pTierMeta->free += DISK_FREE_SIZE(DISK_AT_TIER(pTier, id)); @@ -109,22 +115,26 @@ void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta) { } // Round-Robin to allocate disk on a tier -int tfsAllocDiskOnTier(STier *pTier) { - ASSERT(pTier->ndisk > 0); - int id = TFS_UNDECIDED_ID; - SDisk *pDisk; +int32_t tfsAllocDiskOnTier(STier *pTier) { + if (pTier == NULL || pTier->ndisk <= 0) { + terrno = TSDB_CODE_INVALID_PARA; + return -1; + } tfsLockTier(pTier); if (TIER_AVAIL_DISKS(pTier) <= 0) { tfsUnLockTier(pTier); - return id; + return TFS_UNDECIDED_ID; } - id = pTier->nextid; + int32_t id = pTier->nextid; while (true) { - pDisk = DISK_AT_TIER(pTier, id); - ASSERT(pDisk != NULL); + SDisk *pDisk = DISK_AT_TIER(pTier, id); + if (pDisk == NULL) { + tfsUnLockTier(pTier); + return TFS_UNDECIDED_ID; + } if (DISK_FREE_SIZE(pDisk) < TFS_MIN_DISK_FREE_SIZE) { id = (id + 1) % pTier->ndisk; @@ -145,7 +155,7 @@ int tfsAllocDiskOnTier(STier *pTier) { } void tfsGetTierMeta(STier *pTier, STierMeta *pTierMeta) { - ASSERT(pTierMeta != NULL); + if (pTierMeta == NULL || pTierMeta == NULL) return; tfsLockTier(pTier); *pTierMeta = pTier->tmeta; @@ -153,10 +163,11 @@ void tfsGetTierMeta(STier *pTier, STierMeta *pTierMeta) { } void tfsPosNextId(STier *pTier) { - ASSERT(pTier->ndisk > 0); - int nextid = 0; + if (pTier == NULL || pTier->ndisk <= 0) return; - for (int id = 1; id < pTier->ndisk; id++) { + int32_t nextid = 0; + + for (int32_t id = 1; id < pTier->ndisk; id++) { SDisk *pLDisk = DISK_AT_TIER(pTier, nextid); SDisk *pDisk = DISK_AT_TIER(pTier, id); if (DISK_FREE_SIZE(pDisk) > TFS_MIN_DISK_FREE_SIZE && DISK_FREE_SIZE(pDisk) > DISK_FREE_SIZE(pLDisk)) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 8294bca959..6d0547c7ae 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -387,7 +387,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limit") // tfs -TAOS_DEFINE_ERROR(TSDB_CODE_FS_OUT_OF_MEMORY, "tfs out of memory") +TAOS_DEFINE_ERROR(TSDB_CODE_FS_APP_ERROR, "tfs out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_CFG, "tfs invalid mount config") TAOS_DEFINE_ERROR(TSDB_CODE_FS_TOO_MANY_MOUNT, "tfs too many mount") TAOS_DEFINE_ERROR(TSDB_CODE_FS_DUP_PRIMARY, "tfs duplicate primary mount") From 7087e4427b90e5d51c42a42f18bff378b0517187 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 12 Jan 2022 23:15:12 +0800 Subject: [PATCH 04/17] minor changes --- include/libs/tfs/tfs.h | 54 ++++++++++---------- source/libs/tfs/inc/tfsInt.h | 8 +-- source/libs/tfs/src/tfs.c | 98 ++++++++++++++++++------------------ 3 files changed, 80 insertions(+), 80 deletions(-) diff --git a/include/libs/tfs/tfs.h b/include/libs/tfs/tfs.h index 793c861363..973231c264 100644 --- a/include/libs/tfs/tfs.h +++ b/include/libs/tfs/tfs.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef TD_TFS_H -#define TD_TFS_H +#ifndef _TD_TFS_H_ +#define _TD_TFS_H_ #include "tglobal.h" @@ -23,8 +23,8 @@ extern "C" { #endif typedef struct { - int level; - int id; + int32_t level; + int32_t id; } SDiskID; #define TFS_UNDECIDED_LEVEL -1 @@ -48,21 +48,21 @@ typedef struct { int16_t nAvailDisks; // # of Available disks } STierMeta; -int tfsInit(SDiskCfg *pDiskCfg, int ndisk); -void tfsCleanup(); -void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numLevels); -void tfsGetMeta(SFSMeta *pMeta); -void tfsAllocDisk(int expLevel, int *level, int *id); +int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk); +void tfsCleanup(); +void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numLevels); +void tfsGetMeta(SFSMeta *pMeta); +void tfsAllocDisk(int32_t expLevel, int32_t *level, int32_t *id); const char *TFS_PRIMARY_PATH(); -const char *TFS_DISK_PATH(int level, int id); +const char *TFS_DISK_PATH(int32_t level, int32_t id); // TFILE APIs ==================================== typedef struct { - int level; - int id; - char rname[TSDB_FILENAME_LEN]; // REL name - char aname[TSDB_FILENAME_LEN]; // ABS name + int32_t level; + int32_t id; + char rname[TSDB_FILENAME_LEN]; // REL name + char aname[TSDB_FILENAME_LEN]; // ABS name } TFILE; #define TFILE_LEVEL(pf) ((pf)->level) @@ -76,23 +76,23 @@ typedef struct { #define tfscopy(sf, df) taosCopyFile(TFILE_NAME(sf), TFILE_NAME(df)) #define tfsrename(sf, df) taosRename(TFILE_NAME(sf), TFILE_NAME(df)) -void tfsInitFile(TFILE *pf, int level, int id, const char *bname); -bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2); -int tfsEncodeFile(void **buf, TFILE *pf); -void *tfsDecodeFile(void *buf, TFILE *pf); -void tfsbasename(const TFILE *pf, char *dest); -void tfsdirname(const TFILE *pf, char *dest); +void tfsInitFile(TFILE *pf, int32_t level, int32_t id, const char *bname); +bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2); +int32_t tfsEncodeFile(void **buf, TFILE *pf); +void *tfsDecodeFile(void *buf, TFILE *pf); +void tfsbasename(const TFILE *pf, char *dest); +void tfsdirname(const TFILE *pf, char *dest); // DIR APIs ==================================== -int tfsMkdirAt(const char *rname, int level, int id); -int tfsMkdirRecurAt(const char *rname, int level, int id); -int tfsMkdir(const char *rname); -int tfsRmdir(const char *rname); -int tfsRename(char *orname, char *nrname); +int32_t tfsMkdirAt(const char *rname, int32_t level, int32_t id); +int32_t tfsMkdirRecurAt(const char *rname, int32_t level, int32_t id); +int32_t tfsMkdir(const char *rname); +int32_t tfsRmdir(const char *rname); +int32_t tfsRename(char *orname, char *nrname); typedef struct TDIR TDIR; -TDIR * tfsOpendir(const char *rname); +TDIR *tfsOpendir(const char *rname); const TFILE *tfsReaddir(TDIR *tdir); void tfsClosedir(TDIR *tdir); @@ -100,4 +100,4 @@ void tfsClosedir(TDIR *tdir); } #endif -#endif +#endif /*_TD_TFS_H_*/ diff --git a/source/libs/tfs/inc/tfsInt.h b/source/libs/tfs/inc/tfsInt.h index 4e53fcdb8e..bfa4c4380d 100644 --- a/source/libs/tfs/inc/tfsInt.h +++ b/source/libs/tfs/inc/tfsInt.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef TD_TFSINT_H -#define TD_TFSINT_H +#ifndef _TD_TFSINT_H_ +#define _TD_TFSINT_H_ #include "os.h" @@ -88,10 +88,10 @@ void tfsDestroyTier(STier *pTier); SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg); void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta); int32_t tfsAllocDiskOnTier(STier *pTier); -void tfsPosNextId(STier *pTier); +void tfsPosNextId(STier *pTier); #ifdef __cplusplus } #endif -#endif +#endif /*_TD_TFSINT_H_*/ diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index a054981961..bbeb66fe05 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -21,9 +21,9 @@ typedef struct { pthread_spinlock_t lock; SFSMeta meta; - int nlevel; + int32_t nlevel; STier tiers[TSDB_MAX_TIERS]; - SHashObj * map; // name to did map + SHashObj *map; // name to did map } SFS; typedef struct { @@ -47,21 +47,21 @@ static SFS tfs = {0}; static SFS *pfs = &tfs; // STATIC DECLARATION -static int tfsMount(SDiskCfg *pCfg); -static int tfsCheck(); -static int tfsCheckAndFormatCfg(SDiskCfg *pCfg); -static int tfsFormatDir(char *idir, char *odir); -static SDisk *tfsGetDiskByID(SDiskID did); -static SDisk *tfsGetDiskByName(const char *dir); -static int tfsOpendirImpl(TDIR *tdir); -static void tfsInitDiskIter(SDiskIter *pIter); -static SDisk *tfsNextDisk(SDiskIter *pIter); +static int32_t tfsMount(SDiskCfg *pCfg); +static int32_t tfsCheck(); +static int32_t tfsCheckAndFormatCfg(SDiskCfg *pCfg); +static int32_t tfsFormatDir(char *idir, char *odir); +static SDisk *tfsGetDiskByID(SDiskID did); +static SDisk *tfsGetDiskByName(const char *dir); +static int32_t tfsOpendirImpl(TDIR *tdir); +static void tfsInitDiskIter(SDiskIter *pIter); +static SDisk *tfsNextDisk(SDiskIter *pIter); // FS APIs ==================================== -int tfsInit(SDiskCfg *pDiskCfg, int ndisk) { +int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk) { ASSERT(ndisk > 0); - for (int level = 0; level < TSDB_MAX_TIERS; level++) { + for (int32_t level = 0; level < TSDB_MAX_TIERS; level++) { if (tfsInitTier(TFS_TIER_AT(level), level) < 0) { while (true) { level--; @@ -84,7 +84,7 @@ int tfsInit(SDiskCfg *pDiskCfg, int ndisk) { return -1; } - for (int idisk = 0; idisk < ndisk; idisk++) { + for (int32_t idisk = 0; idisk < ndisk; idisk++) { if (tfsMount(pDiskCfg + idisk) < 0) { tfsCleanup(); return -1; @@ -97,7 +97,7 @@ int tfsInit(SDiskCfg *pDiskCfg, int ndisk) { } tfsUpdateInfo(NULL, NULL, 0); - for (int level = 0; level < TFS_NLEVEL(); level++) { + for (int32_t level = 0; level < TFS_NLEVEL(); level++) { tfsPosNextId(TFS_TIER_AT(level)); } @@ -109,7 +109,7 @@ void tfsCleanup() { pfs->map = NULL; pthread_spin_destroy(&(pfs->lock)); - for (int level = 0; level < TFS_NLEVEL(); level++) { + for (int32_t level = 0; level < TFS_NLEVEL(); level++) { tfsDestroyTier(TFS_TIER_AT(level)); } } @@ -124,7 +124,7 @@ void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numTiers) { memset(pFSMeta, 0, sizeof(*pFSMeta)); - for (int level = 0; level < TFS_NLEVEL(); level++) { + for (int32_t level = 0; level < TFS_NLEVEL(); level++) { STierMeta *pTierMeta = &tierMeta; if (tierMetas && level < numTiers) { pTierMeta = tierMetas + level; @@ -152,7 +152,7 @@ void tfsGetMeta(SFSMeta *pMeta) { /* Allocate an existing available tier level */ -void tfsAllocDisk(int expLevel, int *level, int *id) { +void tfsAllocDisk(int32_t expLevel, int32_t *level, int32_t *id) { ASSERT(expLevel >= 0); *level = expLevel; @@ -177,10 +177,10 @@ void tfsAllocDisk(int expLevel, int *level, int *id) { } const char *TFS_PRIMARY_PATH() { return DISK_DIR(TFS_PRIMARY_DISK()); } -const char *TFS_DISK_PATH(int level, int id) { return DISK_DIR(TFS_DISK_AT(level, id)); } +const char *TFS_DISK_PATH(int32_t level, int32_t id) { return DISK_DIR(TFS_DISK_AT(level, id)); } // TFILE APIs ==================================== -void tfsInitFile(TFILE *pf, int level, int id, const char *bname) { +void tfsInitFile(TFILE *pf, int32_t level, int32_t id, const char *bname) { ASSERT(TFS_IS_VALID_DISK(level, id)); SDisk *pDisk = TFS_DISK_AT(level, id); @@ -203,8 +203,8 @@ bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2) { return true; } -int tfsEncodeFile(void **buf, TFILE *pf) { - int tlen = 0; +int32_t tfsEncodeFile(void **buf, TFILE *pf) { + int32_t tlen = 0; tlen += taosEncodeVariantI32(buf, pf->level); tlen += taosEncodeVariantI32(buf, pf->id); @@ -215,7 +215,7 @@ int tfsEncodeFile(void **buf, TFILE *pf) { void *tfsDecodeFile(void *buf, TFILE *pf) { int32_t level, id; - char * rname; + char *rname; buf = taosDecodeVariantI32(buf, &(level)); buf = taosDecodeVariantI32(buf, &(id)); @@ -242,7 +242,7 @@ void tfsdirname(const TFILE *pf, char *dest) { } // DIR APIs ==================================== -int tfsMkdirAt(const char *rname, int level, int id) { +int32_t tfsMkdirAt(const char *rname, int32_t level, int32_t id) { SDisk *pDisk = TFS_DISK_AT(level, id); char aname[TMPNAME_LEN]; @@ -255,7 +255,7 @@ int tfsMkdirAt(const char *rname, int level, int id) { return 0; } -int tfsMkdirRecurAt(const char *rname, int level, int id) { +int32_t tfsMkdirRecurAt(const char *rname, int32_t level, int32_t id) { if (tfsMkdirAt(rname, level, id) < 0) { if (errno == ENOENT) { // Try to create upper @@ -288,10 +288,10 @@ int tfsMkdirRecurAt(const char *rname, int level, int id) { return 0; } -int tfsMkdir(const char *rname) { - for (int level = 0; level < TFS_NLEVEL(); level++) { +int32_t tfsMkdir(const char *rname) { + for (int32_t level = 0; level < TFS_NLEVEL(); level++) { STier *pTier = TFS_TIER_AT(level); - for (int id = 0; id < TIER_NDISKS(pTier); id++) { + for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) { if (tfsMkdirAt(rname, level, id) < 0) { return -1; } @@ -301,12 +301,12 @@ int tfsMkdir(const char *rname) { return 0; } -int tfsRmdir(const char *rname) { +int32_t tfsRmdir(const char *rname) { char aname[TMPNAME_LEN] = "\0"; - for (int level = 0; level < TFS_NLEVEL(); level++) { + for (int32_t level = 0; level < TFS_NLEVEL(); level++) { STier *pTier = TFS_TIER_AT(level); - for (int id = 0; id < TIER_NDISKS(pTier); id++) { + for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) { SDisk *pDisk = DISK_AT_TIER(pTier, id); snprintf(aname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), rname); @@ -318,13 +318,13 @@ int tfsRmdir(const char *rname) { return 0; } -int tfsRename(char *orname, char *nrname) { +int32_t tfsRename(char *orname, char *nrname) { char oaname[TMPNAME_LEN] = "\0"; char naname[TMPNAME_LEN] = "\0"; - for (int level = 0; level < pfs->nlevel; level++) { + for (int32_t level = 0; level < pfs->nlevel; level++) { STier *pTier = TFS_TIER_AT(level); - for (int id = 0; id < TIER_NDISKS(pTier); id++) { + for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) { SDisk *pDisk = DISK_AT_TIER(pTier, id); snprintf(oaname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), orname); @@ -339,11 +339,11 @@ int tfsRename(char *orname, char *nrname) { struct TDIR { SDiskIter iter; - int level; - int id; + int32_t level; + int32_t id; char dirname[TSDB_FILENAME_LEN]; TFILE tfile; - DIR * dir; + DIR *dir; }; TDIR *tfsOpendir(const char *rname) { @@ -402,9 +402,9 @@ void tfsClosedir(TDIR *tdir) { } // private -static int tfsMount(SDiskCfg *pCfg) { +static int32_t tfsMount(SDiskCfg *pCfg) { SDiskID did; - SDisk * pDisk = NULL; + SDisk *pDisk = NULL; if (tfsCheckAndFormatCfg(pCfg) < 0) return -1; @@ -422,7 +422,7 @@ static int tfsMount(SDiskCfg *pCfg) { return 0; } -static int tfsCheckAndFormatCfg(SDiskCfg *pCfg) { +static int32_t tfsCheckAndFormatCfg(SDiskCfg *pCfg) { char dirName[TSDB_FILENAME_LEN] = "\0"; struct stat pstat; @@ -481,10 +481,10 @@ static int tfsCheckAndFormatCfg(SDiskCfg *pCfg) { return 0; } -static int tfsFormatDir(char *idir, char *odir) { +static int32_t tfsFormatDir(char *idir, char *odir) { wordexp_t wep = {0}; - int code = wordexp(idir, &wep, 0); + int32_t code = wordexp(idir, &wep, 0); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); return -1; @@ -502,14 +502,14 @@ static int tfsFormatDir(char *idir, char *odir) { return 0; } -static int tfsCheck() { +static int32_t tfsCheck() { if (TFS_PRIMARY_DISK() == NULL) { fError("no primary disk is set"); terrno = TSDB_CODE_FS_NO_PRIMARY_DISK; return -1; } - for (int level = 0; level < TFS_NLEVEL(); level++) { + for (int32_t level = 0; level < TFS_NLEVEL(); level++) { if (TIER_NDISKS(TFS_TIER_AT(level)) == 0) { fError("no disk at level %d", level); terrno = TSDB_CODE_FS_NO_MOUNT_AT_TIER; @@ -523,8 +523,8 @@ static int tfsCheck() { static SDisk *tfsGetDiskByID(SDiskID did) { return TFS_DISK_AT(did.level, did.id); } static SDisk *tfsGetDiskByName(const char *dir) { SDiskID did; - SDisk * pDisk = NULL; - void * pr = NULL; + SDisk *pDisk = NULL; + void *pr = NULL; pr = taosHashGet(pfs->map, (void *)dir, strnlen(dir, TSDB_FILENAME_LEN)); if (pr == NULL) return NULL; @@ -536,7 +536,7 @@ static SDisk *tfsGetDiskByName(const char *dir) { return pDisk; } -static int tfsOpendirImpl(TDIR *tdir) { +static int32_t tfsOpendirImpl(TDIR *tdir) { SDisk *pDisk = NULL; char adir[TMPNAME_LEN * 2] = "\0"; @@ -567,8 +567,8 @@ static SDisk *tfsNextDisk(SDiskIter *pIter) { if (pDisk == NULL) return NULL; - int level = DISK_LEVEL(pDisk); - int id = DISK_ID(pDisk); + int32_t level = DISK_LEVEL(pDisk); + int32_t id = DISK_ID(pDisk); id++; if (id < TIER_NDISKS(TFS_TIER_AT(level))) { From 0c5a85d463d011295bc1486e62d2537f618c2893 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 17 Jan 2022 13:58:43 +0800 Subject: [PATCH 05/17] minor changes --- source/libs/tfs/src/tfs.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index bbeb66fe05..87adba6bea 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -79,7 +79,7 @@ int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk) { pfs->map = taosHashInit(TSDB_MAX_TIERS * TSDB_MAX_DISKS_PER_TIER * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (pfs->map == NULL) { - terrno = TSDB_CODE_FS_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; tfsCleanup(); return -1; } @@ -318,6 +318,7 @@ int32_t tfsRmdir(const char *rname) { return 0; } +#if 0 int32_t tfsRename(char *orname, char *nrname) { char oaname[TMPNAME_LEN] = "\0"; char naname[TMPNAME_LEN] = "\0"; @@ -336,6 +337,7 @@ int32_t tfsRename(char *orname, char *nrname) { return 0; } +#endif struct TDIR { SDiskIter iter; From d6359f3ab11b7001af21b147dbcd622b6f1739b2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 17 Jan 2022 23:20:11 -0800 Subject: [PATCH 06/17] refact tier and disk in tfs module --- include/libs/tfs/tfs.h | 12 +--- include/os/osSysinfo.h | 6 +- source/libs/tfs/inc/tfsInt.h | 47 +++++-------- source/libs/tfs/src/tfs.c | 64 ++++++++---------- source/libs/tfs/src/tfsDisk.c | 31 ++++----- source/libs/tfs/src/tfsTier.c | 122 ++++++++++++---------------------- source/os/src/osSysinfo.c | 9 ++- 7 files changed, 109 insertions(+), 182 deletions(-) diff --git a/include/libs/tfs/tfs.h b/include/libs/tfs/tfs.h index 973231c264..3828a93144 100644 --- a/include/libs/tfs/tfs.h +++ b/include/libs/tfs/tfs.h @@ -36,22 +36,14 @@ typedef struct { // FS APIs ==================================== typedef struct { - int64_t tsize; + int64_t total; int64_t used; int64_t avail; } SFSMeta; -typedef struct { - int64_t size; - int64_t used; - int64_t free; - int16_t nAvailDisks; // # of Available disks -} STierMeta; - int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk); void tfsCleanup(); -void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numLevels); -void tfsGetMeta(SFSMeta *pMeta); +void tfsUpdateSize(SFSMeta *pFSMeta); void tfsAllocDisk(int32_t expLevel, int32_t *level, int32_t *id); const char *TFS_PRIMARY_PATH(); diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index 36ff4194d8..9dcb075489 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -35,12 +35,12 @@ extern char tsLocale[]; extern char tsCharset[]; // default encode string typedef struct { - int64_t tsize; + int64_t total; int64_t used; int64_t avail; -} SysDiskSize; +} SDiskSize; -int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize); +int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize); int32_t taosGetCpuCores(); void taosGetSystemInfo(); bool taosReadProcIO(int64_t *rchars, int64_t *wchars); diff --git a/source/libs/tfs/inc/tfsInt.h b/source/libs/tfs/inc/tfsInt.h index bfa4c4380d..ce1436eb29 100644 --- a/source/libs/tfs/inc/tfsInt.h +++ b/source/libs/tfs/inc/tfsInt.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_TFSINT_H_ -#define _TD_TFSINT_H_ +#ifndef _TD_TFS_INT_H_ +#define _TD_TFS_INT_H_ #include "os.h" @@ -39,54 +39,39 @@ extern int32_t fsDebugFlag; // Global Definitions #define TFS_MIN_DISK_FREE_SIZE 50 * 1024 * 1024 -// tdisk.c ====================================================== -typedef struct { - int64_t size; - int64_t used; - int64_t free; -} SDiskMeta; - typedef struct SDisk { int32_t level; int32_t id; - char dir[TSDB_FILENAME_LEN]; - SDiskMeta dmeta; + char *path; + SDiskSize size; } SDisk; -#define DISK_LEVEL(pd) ((pd)->level) -#define DISK_ID(pd) ((pd)->id) -#define DISK_DIR(pd) ((pd)->dir) -#define DISK_META(pd) ((pd)->dmeta) -#define DISK_SIZE(pd) ((pd)->dmeta.size) -#define DISK_USED_SIZE(pd) ((pd)->dmeta.used) -#define DISK_FREE_SIZE(pd) ((pd)->dmeta.free) - -SDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir); -SDisk *tfsFreeDisk(SDisk *pDisk); -int32_t tfsUpdateDiskInfo(SDisk *pDisk); - -// ttier.c ====================================================== - typedef struct STier { pthread_spinlock_t lock; int32_t level; - int16_t ndisk; // # of disks mounted to this tier - int16_t nextid; // next disk id to allocate - STierMeta tmeta; + int16_t nextid; // next disk id to allocate + int16_t ndisk; // # of disks mounted to this tier + int16_t nAvailDisks; // # of Available disks SDisk *disks[TSDB_MAX_DISKS_PER_TIER]; + SDiskSize size; } STier; #define TIER_LEVEL(pt) ((pt)->level) #define TIER_NDISKS(pt) ((pt)->ndisk) #define TIER_SIZE(pt) ((pt)->tmeta.size) #define TIER_FREE_SIZE(pt) ((pt)->tmeta.free) -#define TIER_AVAIL_DISKS(pt) ((pt)->tmeta.nAvailDisks) + #define DISK_AT_TIER(pt, id) ((pt)->disks[id]) +#define DISK_DIR(pd) ((pd)->path) + +SDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir); +SDisk *tfsFreeDisk(SDisk *pDisk); +int32_t tfsUpdateDiskSize(SDisk *pDisk); int32_t tfsInitTier(STier *pTier, int32_t level); void tfsDestroyTier(STier *pTier); SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg); -void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta); +void tfsUpdateTierSize(STier *pTier); int32_t tfsAllocDiskOnTier(STier *pTier); void tfsPosNextId(STier *pTier); @@ -94,4 +79,4 @@ void tfsPosNextId(STier *pTier); } #endif -#endif /*_TD_TFSINT_H_*/ +#endif /*_TD_TFS_INT_H_*/ diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index 87adba6bea..1d11cd6df2 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -59,7 +59,10 @@ static SDisk *tfsNextDisk(SDiskIter *pIter); // FS APIs ==================================== int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk) { - ASSERT(ndisk > 0); + if (ndisk < 0) { + terrno = TSDB_CODE_INVALID_PARA; + return -1; + } for (int32_t level = 0; level < TSDB_MAX_TIERS; level++) { if (tfsInitTier(TFS_TIER_AT(level), level) < 0) { @@ -96,7 +99,7 @@ int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk) { return -1; } - tfsUpdateInfo(NULL, NULL, 0); + tfsUpdateSize(NULL); for (int32_t level = 0; level < TFS_NLEVEL(); level++) { tfsPosNextId(TFS_TIER_AT(level)); } @@ -114,27 +117,22 @@ void tfsCleanup() { } } -void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numTiers) { - SFSMeta fsMeta; - STierMeta tierMeta; +void tfsUpdateSize(SFSMeta *pFSMeta) { + SFSMeta fsMeta = {0}; + SDiskSize size = {0}; if (pFSMeta == NULL) { pFSMeta = &fsMeta; } - memset(pFSMeta, 0, sizeof(*pFSMeta)); + memset(pFSMeta, 0, sizeof(SFSMeta)); for (int32_t level = 0; level < TFS_NLEVEL(); level++) { - STierMeta *pTierMeta = &tierMeta; - if (tierMetas && level < numTiers) { - pTierMeta = tierMetas + level; - } - STier *pTier = TFS_TIER_AT(level); - tfsUpdateTierInfo(pTier, pTierMeta); - pFSMeta->tsize += pTierMeta->size; - pFSMeta->avail += pTierMeta->free; - pFSMeta->used += pTierMeta->used; + tfsUpdateTierSize(pTier); + pFSMeta->total += pTier->size.total; + pFSMeta->avail += pTier->size.avail; + pFSMeta->used += pTier->size.used; } tfsLock(); @@ -142,14 +140,6 @@ void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numTiers) { tfsUnLock(); } -void tfsGetMeta(SFSMeta *pMeta) { - ASSERT(pMeta); - - tfsLock(); - *pMeta = pfs->meta; - tfsUnLock(); -} - /* Allocate an existing available tier level */ void tfsAllocDisk(int32_t expLevel, int32_t *level, int32_t *id) { @@ -307,9 +297,9 @@ int32_t tfsRmdir(const char *rname) { for (int32_t level = 0; level < TFS_NLEVEL(); level++) { STier *pTier = TFS_TIER_AT(level); for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) { - SDisk *pDisk = DISK_AT_TIER(pTier, id); + SDisk *pDisk = pTier->disks[id]; - snprintf(aname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), rname); + snprintf(aname, TMPNAME_LEN, "%s%s%s", DISK_DIR(pDisk), TS_PATH_DELIMITER, rname); taosRemoveDir(aname); } @@ -351,7 +341,7 @@ struct TDIR { TDIR *tfsOpendir(const char *rname) { TDIR *tdir = (TDIR *)calloc(1, sizeof(*tdir)); if (tdir == NULL) { - terrno = TSDB_CODE_FS_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -416,7 +406,7 @@ static int32_t tfsMount(SDiskCfg *pCfg) { fError("failed to mount disk %s to level %d since %s", pCfg->dir, pCfg->level, tstrerror(terrno)); return -1; } - did.id = DISK_ID(pDisk); + did.id = pDisk->id; taosHashPut(pfs->map, (void *)(pCfg->dir), strnlen(pCfg->dir, TSDB_FILENAME_LEN), (void *)(&did), sizeof(did)); if (pfs->nlevel < pCfg->level + 1) pfs->nlevel = pCfg->level + 1; @@ -551,10 +541,10 @@ static int32_t tfsOpendirImpl(TDIR *tdir) { pDisk = tfsNextDisk(&(tdir->iter)); if (pDisk == NULL) return 0; - tdir->level = DISK_LEVEL(pDisk); - tdir->id = DISK_ID(pDisk); + tdir->level = pDisk->level; + tdir->id = pDisk->id; - snprintf(adir, TMPNAME_LEN * 2, "%s/%s", DISK_DIR(pDisk), tdir->dirname); + snprintf(adir, TMPNAME_LEN * 2, "%s%s%s", pDisk->path, TS_PATH_DELIMITER,tdir->dirname); tdir->dir = opendir(adir); if (tdir->dir != NULL) break; } @@ -569,8 +559,8 @@ static SDisk *tfsNextDisk(SDiskIter *pIter) { if (pDisk == NULL) return NULL; - int32_t level = DISK_LEVEL(pDisk); - int32_t id = DISK_ID(pDisk); + int32_t level = pDisk->level; + int32_t id = pDisk->id; id++; if (id < TIER_NDISKS(TFS_TIER_AT(level))) { @@ -593,21 +583,21 @@ static SDisk *tfsNextDisk(SDiskIter *pIter) { // OTHER FUNCTIONS =================================== void taosGetDisk() { const double unit = 1024 * 1024 * 1024; - SysDiskSize diskSize; + SDiskSize diskSize; SFSMeta fsMeta; - tfsUpdateInfo(&fsMeta, NULL, 0); - tsTotalDataDirGB = (float)(fsMeta.tsize / unit); + tfsUpdateSize(&fsMeta); + tsTotalDataDirGB = (float)(fsMeta.total / unit); tsUsedDataDirGB = (float)(fsMeta.used / unit); tsAvailDataDirGB = (float)(fsMeta.avail / unit); if (taosGetDiskSize(tsLogDir, &diskSize) == 0) { - tsTotalLogDirGB = (float)(diskSize.tsize / unit); + tsTotalLogDirGB = (float)(diskSize.total / unit); tsAvailLogDirGB = (float)(diskSize.avail / unit); } if (taosGetDiskSize(tsTempDir, &diskSize) == 0) { - tsTotalTmpDirGB = (float)(diskSize.tsize / unit); + tsTotalTmpDirGB = (float)(diskSize.total / unit); tsAvailTmpDirectorySpace = (float)(diskSize.avail / unit); } } diff --git a/source/libs/tfs/src/tfsDisk.c b/source/libs/tfs/src/tfsDisk.c index dd4f349c42..a5ef1121ff 100644 --- a/source/libs/tfs/src/tfsDisk.c +++ b/source/libs/tfs/src/tfsDisk.c @@ -16,44 +16,41 @@ #define _DEFAULT_SOURCE #include "tfsInt.h" -SDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir) { +SDisk *tfsNewDisk(int32_t level, int32_t id, const char *path) { SDisk *pDisk = calloc(1, sizeof(SDisk)); if (pDisk == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + pDisk->path = strdup(path); + if (pDisk->path == NULL) { + free(pDisk); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + pDisk->level = level; pDisk->id = id; - tstrncpy(pDisk->dir, dir, TSDB_FILENAME_LEN); - + taosGetDiskSize(pDisk->path, &pDisk->size); return pDisk; } SDisk *tfsFreeDisk(SDisk *pDisk) { if (pDisk != NULL) { + free(pDisk->path); free(pDisk); } return NULL; } -int32_t tfsUpdateDiskInfo(SDisk *pDisk) { - if (pDisk == NULL) { - terrno = TSDB_CODE_INVALID_PARA; +int32_t tfsUpdateDiskSize(SDisk *pDisk) { + if (taosGetDiskSize(pDisk->path, &pDisk->size) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + fError("failed to get disk:%s size, level:%d id:%d since %s", pDisk->path, pDisk->level, pDisk->id, terrstr()); return -1; } - SysDiskSize diskSize = {0}; - if (taosGetDiskSize(pDisk->dir, &diskSize) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - fError("failed to update disk information at level %d id %d dir %s since %s", pDisk->level, pDisk->id, pDisk->dir, - strerror(errno)); - return -1 - } - - pDisk->dmeta.size = diskSize.tsize; - pDisk->dmeta.used = diskSize.used; - pDisk->dmeta.free = diskSize.avail; return 0; } diff --git a/source/libs/tfs/src/tfsTier.c b/source/libs/tfs/src/tfsTier.c index 8b057d8755..2a26c23ead 100644 --- a/source/libs/tfs/src/tfsTier.c +++ b/source/libs/tfs/src/tfsTier.c @@ -20,16 +20,10 @@ #define tfsUnLockTier(pTier) pthread_spin_unlock(&(pTier)->lock) int32_t tfsInitTier(STier *pTier, int32_t level) { - if (pTier == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - return -1; - } - memset(pTier, 0, sizeof(STier)); - int32_t code = pthread_spin_init(&pTier->lock, 0); - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(code); + if (pthread_spin_init(&pTier->lock, 0) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -38,10 +32,8 @@ int32_t tfsInitTier(STier *pTier, int32_t level) { } void tfsDestroyTier(STier *pTier) { - if (pTier == NULL) return; - for (int32_t id = 0; id < TSDB_MAX_DISKS_PER_TIER; id++) { - DISK_AT_TIER(pTier, id) = tfsFreeDisk(DISK_AT_TIER(pTier, id)); + pTier->disks[id] = tfsFreeDisk(pTier->disks[id]); } pTier->ndisk = 0; @@ -49,19 +41,14 @@ void tfsDestroyTier(STier *pTier) { } SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) { - if (pTier == NULL || pCfg == NULL || pTier->level != pCfg->level) { - terrno = TSDB_CODE_INVALID_PARA; - return -1; - } - - if (TIER_NDISKS(pTier) >= TSDB_MAX_DISKS_PER_TIER) { + if (pTier->ndisk >= TSDB_MAX_DISKS_PER_TIER) { terrno = TSDB_CODE_FS_TOO_MANY_MOUNT; return NULL; } int32_t id = 0; if (pTier->level == 0) { - if (DISK_AT_TIER(pTier, 0) != NULL) { + if (pTier->disks[0] != NULL) { id = pTier->ndisk; } else { if (pCfg->primary) { @@ -69,108 +56,85 @@ SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) { } else { id = pTier->ndisk + 1; } - if (id >= TSDB_MAX_DISKS_PER_TIER) { - terrno = TSDB_CODE_FS_TOO_MANY_MOUNT; - return NULL; - } } } else { id = pTier->ndisk; } + if (id >= TSDB_MAX_DISKS_PER_TIER) { + terrno = TSDB_CODE_FS_TOO_MANY_MOUNT; + return NULL; + } + SDisk *pDisk = tfsNewDisk(pCfg->level, id, pCfg->dir); if (pDisk == NULL) return NULL; - DISK_AT_TIER(pTier, id) = pDisk; + pTier->disks[id] = pDisk; pTier->ndisk++; fInfo("disk %s is mounted to tier level %d id %d", pCfg->dir, pCfg->level, id); - return DISK_AT_TIER(pTier, id); + return pTier->disks[id]; } -void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta) { - STierMeta tmeta = {0}; - - if (pTierMeta == NULL) { - pTierMeta = &tmeta; - } - memset(pTierMeta, 0, sizeof(STierMeta)); +void tfsUpdateTierSize(STier *pTier) { + SDiskSize size = {0}; + int16_t nAvailDisks = 0; tfsLockTier(pTier); for (int32_t id = 0; id < pTier->ndisk; id++) { - if (tfsUpdateDiskInfo(DISK_AT_TIER(pTier, id)) != 0) { - continue; - } + SDisk *pDisk = pTier->disks[id]; + if (pDisk == NULL) continue; - pTierMeta->size += DISK_SIZE(DISK_AT_TIER(pTier, id)); - pTierMeta->used += DISK_USED_SIZE(DISK_AT_TIER(pTier, id)); - pTierMeta->free += DISK_FREE_SIZE(DISK_AT_TIER(pTier, id)); - pTierMeta->nAvailDisks++; + size.total += pDisk->size.total; + size.used += pDisk->size.used; + size.avail += pDisk->size.avail; + nAvailDisks++; } - pTier->tmeta = *pTierMeta; + pTier->size = size; + pTier->nAvailDisks = nAvailDisks; tfsUnLockTier(pTier); } // Round-Robin to allocate disk on a tier int32_t tfsAllocDiskOnTier(STier *pTier) { - if (pTier == NULL || pTier->ndisk <= 0) { - terrno = TSDB_CODE_INVALID_PARA; + terrno = TSDB_CODE_FS_NO_VALID_DISK; + + tfsLockTier(pTier); + + if (pTier->ndisk <= 0 || pTier->nAvailDisks <= 0) { + tfsUnLockTier(pTier); return -1; } - tfsLockTier(pTier); + int32_t retId = -1; + for (int32_t id = 0; id < TSDB_MAX_DISKS_PER_TIER; ++id) { + int32_t diskId = (pTier->nextid + id) % pTier->ndisk; + SDisk *pDisk = pTier->disks[diskId]; - if (TIER_AVAIL_DISKS(pTier) <= 0) { - tfsUnLockTier(pTier); - return TFS_UNDECIDED_ID; - } + if (pDisk == NULL) continue; - int32_t id = pTier->nextid; - while (true) { - SDisk *pDisk = DISK_AT_TIER(pTier, id); - if (pDisk == NULL) { - tfsUnLockTier(pTier); - return TFS_UNDECIDED_ID; - } + if (pDisk->size.avail < TFS_MIN_DISK_FREE_SIZE) continue; - if (DISK_FREE_SIZE(pDisk) < TFS_MIN_DISK_FREE_SIZE) { - id = (id + 1) % pTier->ndisk; - if (id == pTier->nextid) { - tfsUnLockTier(pTier); - return TFS_UNDECIDED_ID; - } else { - continue; - } - } else { - pTier->nextid = (id + 1) % pTier->ndisk; - break; - } + retId = diskId; + terrno = 0; + pTier->nextid = (diskId + 1) % pTier->ndisk; + break; } tfsUnLockTier(pTier); - return id; -} - -void tfsGetTierMeta(STier *pTier, STierMeta *pTierMeta) { - if (pTierMeta == NULL || pTierMeta == NULL) return; - - tfsLockTier(pTier); - *pTierMeta = pTier->tmeta; - tfsUnLockTier(pTier); + return retId; } void tfsPosNextId(STier *pTier) { - if (pTier == NULL || pTier->ndisk <= 0) return; - int32_t nextid = 0; for (int32_t id = 1; id < pTier->ndisk; id++) { - SDisk *pLDisk = DISK_AT_TIER(pTier, nextid); - SDisk *pDisk = DISK_AT_TIER(pTier, id); - if (DISK_FREE_SIZE(pDisk) > TFS_MIN_DISK_FREE_SIZE && DISK_FREE_SIZE(pDisk) > DISK_FREE_SIZE(pLDisk)) { + SDisk *pLDisk = pTier->disks[nextid]; + SDisk *pDisk = pTier->disks[id]; + if (pDisk->size.avail > TFS_MIN_DISK_FREE_SIZE && pDisk->size.avail > pLDisk->size.avail) { nextid = id; } } diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 0be17ca2b9..cae1b18b3c 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -121,7 +121,7 @@ bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) { return true; } -int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) { +int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) { unsigned _int64 i64FreeBytesToCaller; unsigned _int64 i64TotalBytes; unsigned _int64 i64FreeBytes; @@ -438,7 +438,7 @@ int taosSystem(const char *cmd) { void taosSetCoreDump() {} -int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) { +int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) { struct statvfs info; if (statvfs(dataDir, &info)) { //printf("failed to get disk size, dataDir:%s errno:%s", tsDataDir, strerror(errno)); @@ -771,13 +771,12 @@ bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) { return true; } -int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) { +int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) { struct statvfs info; if (statvfs(dataDir, &info)) { - //printf("failed to get disk size, dataDir:%s errno:%s", dataDir, strerror(errno)); return -1; } else { - diskSize->tsize = info.f_blocks * info.f_frsize; + diskSize->total = info.f_blocks * info.f_frsize; diskSize->avail = info.f_bavail * info.f_frsize; diskSize->used = (info.f_blocks - info.f_bfree) * info.f_frsize; return 0; From ddf7dcc94f637ce0a2e3df75e61da629326bbdc4 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 18 Jan 2022 16:02:12 +0800 Subject: [PATCH 07/17] fix mem leak --- include/common/tmsg.h | 11 ++++--- source/client/src/clientHb.c | 12 ++++--- source/client/test/clientTests.cpp | 4 ++- source/dnode/mnode/impl/inc/mndDef.h | 18 +++++++++-- source/dnode/mnode/impl/src/mndConsumer.c | 39 ++++++++++++++--------- source/dnode/mnode/impl/src/mndProfile.c | 5 +++ source/dnode/mnode/impl/src/mnode.c | 11 +++++++ 7 files changed, 73 insertions(+), 27 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f32fdcbae7..dfd376f1e9 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -188,16 +188,19 @@ void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp); static FORCE_INLINE void tFreeClientHbReq(void *pReq) { SClientHbReq* req = (SClientHbReq*)pReq; - taosHashCleanup(req->info); - free(pReq); + if (req->info) taosHashCleanup(req->info); } int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq); void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pReq); -static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq) { +static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) { SClientHbBatchReq *req = (SClientHbBatchReq*)pReq; - //taosArrayDestroyEx(req->reqs, tFreeClientHbReq); + if (deep) { + taosArrayDestroyEx(req->reqs, tFreeClientHbReq); + } else { + taosArrayDestroy(req->reqs); + } free(pReq); } diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 6d7fc9f81a..97ef77abcc 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -60,15 +60,17 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); } +#if 0 pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, NULL); while (pIter != NULL) { FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter; SClientHbKey connKey; taosHashCopyKey(pIter, &connKey); - getConnInfoFp(connKey, NULL); + SArray* pArray = getConnInfoFp(connKey, NULL); pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, pIter); } +#endif return pBatchReq; } @@ -99,12 +101,12 @@ static void* hbThreadFunc(void* param) { //TODO: error handling break; } - void *bufCopy = buf; - tSerializeSClientHbBatchReq(&bufCopy, pReq); + void *abuf = buf; + tSerializeSClientHbBatchReq(&abuf, pReq); SMsgSendInfo *pInfo = malloc(sizeof(SMsgSendInfo)); if (pInfo == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - tFreeClientHbBatchReq(pReq); + tFreeClientHbBatchReq(pReq, false); free(buf); break; } @@ -120,7 +122,7 @@ static void* hbThreadFunc(void* param) { int64_t transporterId = 0; SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo); - tFreeClientHbBatchReq(pReq); + tFreeClientHbBatchReq(pReq, false); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 415d6a57ce..13915fd85d 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -53,7 +53,9 @@ TEST(testCase, connect_Test) { if (pConn == NULL) { printf("failed to connect to server, reason:%s\n", taos_errstr(NULL)); } - sleep(3); + while (1) { + sleep(3); + } taos_close(pConn); } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index de101b0f06..7da0f3826e 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -325,6 +325,19 @@ typedef struct SMqTopicConsumer { } SMqTopicConsumer; #endif +typedef struct SMqConsumerEp { + int32_t vgId; + SEpSet epset; + int64_t consumerId; +} SMqConsumerEp; + +typedef struct SMqCgroupTopicPair { + char key[TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN]; + SArray* assigned; + SArray* unassignedConsumer; + SArray* unassignedVg; +} SMqCgroupTopicPair; + typedef struct SMqCGroup { char name[TSDB_CONSUMER_GROUP_LEN]; int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal @@ -351,10 +364,11 @@ typedef struct SMqTopicObj { // TODO: add cache and change name to id typedef struct SMqConsumerTopic { + char name[TSDB_TOPIC_FNAME_LEN]; int32_t epoch; - char name[TSDB_TOPIC_NAME_LEN]; //TODO: replace with something with ep SList *vgroups; // SList + SArray *pVgInfo; // SArray } SMqConsumerTopic; typedef struct SMqConsumerObj { @@ -362,7 +376,7 @@ typedef struct SMqConsumerObj { SRWLatch lock; char cgroup[TSDB_CONSUMER_GROUP_LEN]; SArray *topics; // SArray - SHashObj *topicHash; + SHashObj *topicHash; //SHashObj } SMqConsumerObj; typedef struct SMqSubConsumerObj { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 54e640d8b7..d27bf53a90 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -204,34 +204,37 @@ void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) { static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; char *msgStr = pMsg->rpcMsg.pCont; - SCMSubscribeReq *pSubscribe; - tDeserializeSCMSubscribeReq(msgStr, pSubscribe); - int64_t consumerId = pSubscribe->consumerId; - char *consumerGroup = pSubscribe->consumerGroup; + SCMSubscribeReq subscribe; + tDeserializeSCMSubscribeReq(msgStr, &subscribe); + int64_t consumerId = subscribe.consumerId; + char *consumerGroup = subscribe.consumerGroup; int32_t cgroupLen = strlen(consumerGroup); SArray *newSub = NULL; - int newTopicNum = pSubscribe->topicNum; + int newTopicNum = subscribe.topicNum; if (newTopicNum) { newSub = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic)); } + SMqConsumerTopic *pConsumerTopics = calloc(newTopicNum, sizeof(SMqConsumerTopic)); + if (pConsumerTopics == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } for (int i = 0; i < newTopicNum; i++) { char *newTopicName = taosArrayGetP(newSub, i); - SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic)); - if (pConsumerTopic == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - // TODO: free - return -1; - } + SMqConsumerTopic *pConsumerTopic = &pConsumerTopics[i]; + strcpy(pConsumerTopic->name, newTopicName); pConsumerTopic->vgroups = tdListNew(sizeof(int64_t)); - taosArrayPush(newSub, pConsumerTopic); - free(pConsumerTopic); } + + taosArrayAddBatch(newSub, pConsumerTopics, newTopicNum); + free(pConsumerTopics); taosArraySortString(newSub, taosArrayCompareString); SArray *oldSub = NULL; int oldTopicNum = 0; + // create consumer if not exist SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { // create consumer @@ -249,6 +252,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { } STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); if (pTrans == NULL) { + //TODO: free memory return -1; } @@ -286,6 +290,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { } if (pOldTopic != NULL) { + //cancel subscribe of that old topic ASSERT(pNewTopic == NULL); char *oldTopicName = pOldTopic->name; SList *vgroups = pOldTopic->vgroups; @@ -298,13 +303,14 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); while ((pn = tdListNext(&iter)) != NULL) { int32_t vgId = *(int64_t *)pn->data; + // acquire and get epset SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); - // TODO release + // TODO what time to release? if (pVgObj == NULL) { // TODO handle error continue; } - // acquire and get epset + //build reset msg void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup); // TODO:serialize if (pMsg == NULL) { @@ -323,10 +329,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { return -1; } } + //delete data in mnode taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen); mndReleaseTopic(pMnode, pTopic); } else if (pNewTopic != NULL) { + // save subscribe info to mnode ASSERT(pOldTopic == NULL); char *newTopicName = pNewTopic->name; @@ -351,6 +359,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { // add into cgroups taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup)); } + /*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/ // put the consumer into list // rebalance will be triggered by timer diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 902eaa5c1c..3444a2409a 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -357,6 +357,11 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { } } } + for (int i = 0; i < sz; i++) { + SClientHbReq* pHbReq = taosArrayGet(pArray, i); + tFreeClientHbReq(pHbReq); + } + taosArrayDestroy(pArray); int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp); void* buf = rpcMallocCont(tlen); void* abuf = buf; diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index d70c93e758..cab30702ea 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -69,6 +69,17 @@ static void mndTransReExecute(void *param, void *tmrId) { taosTmrReset(mndTransReExecute, 3000, pMnode, pMnode->timer, &pMnode->transTimer); } +static void mndCalMqRebalance(void* param, void* tmrId) { + SMnode* pMnode = param; + if (mndIsMaster(pMnode)) { + // iterate cgroup, cal rebalance + // sync with raft + // write sdb + } + + taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->transTimer); +} + static int32_t mndInitTimer(SMnode *pMnode) { if (pMnode->timer == NULL) { pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND"); From d14a599a32bda1bdad49e14b77c598a40d6531d5 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 18 Jan 2022 16:15:24 +0800 Subject: [PATCH 08/17] fix mem leak --- source/dnode/mnode/impl/src/mndProfile.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 3444a2409a..3773750ed3 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -357,15 +357,13 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { } } } - for (int i = 0; i < sz; i++) { - SClientHbReq* pHbReq = taosArrayGet(pArray, i); - tFreeClientHbReq(pHbReq); - } - taosArrayDestroy(pArray); + taosArrayDestroyEx(pArray, tFreeClientHbReq); + int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp); void* buf = rpcMallocCont(tlen); void* abuf = buf; tSerializeSClientHbBatchRsp(&abuf, &batchRsp); + taosArrayDestroy(batchRsp.rsps); pReq->contLen = tlen; pReq->pCont = buf; return 0; From cc8a02aeb496bcb2901d3c9569f80c4f56265fb2 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 18 Jan 2022 16:17:58 +0800 Subject: [PATCH 09/17] fix mem leak --- source/libs/wal/src/walMeta.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index d630080086..a3894ceedd 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -149,6 +149,7 @@ int walCheckAndRepairMeta(SWal* pWal) { } } + closedir(dir); regfree(&logRegPattern); regfree(&idxRegPattern); From 623acecafdbfae6aa1471214040a001ea90b63fd Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 18 Jan 2022 16:24:54 +0800 Subject: [PATCH 10/17] init hb count --- source/client/src/clientHb.c | 3 +++ source/client/test/clientTests.cpp | 4 +--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 97ef77abcc..0f4ff6f725 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -157,6 +157,9 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) { } // init stat pAppHbMgr->startTime = taosGetTimestampMs(); + pAppHbMgr->connKeyCnt = 0; + pAppHbMgr->reportCnt = 0; + pAppHbMgr->reportBytes = 0; // init app info pAppHbMgr->pAppInstInfo = pAppInstInfo; diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 13915fd85d..415d6a57ce 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -53,9 +53,7 @@ TEST(testCase, connect_Test) { if (pConn == NULL) { printf("failed to connect to server, reason:%s\n", taos_errstr(NULL)); } - while (1) { - sleep(3); - } + sleep(3); taos_close(pConn); } From 66f610f8df52cf8897452fb06da972066a369ad4 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 18 Jan 2022 16:34:09 +0800 Subject: [PATCH 11/17] make jenkins happy --- source/dnode/mnode/impl/inc/mndDef.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 7da0f3826e..a2d6bbf4e6 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -333,7 +333,7 @@ typedef struct SMqConsumerEp { typedef struct SMqCgroupTopicPair { char key[TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN]; - SArray* assigned; + SArray* assigned; // SArray SArray* unassignedConsumer; SArray* unassignedVg; } SMqCgroupTopicPair; From c4e0e94fc6f6e8a33739ffed6e066548941c8dd4 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Tue, 18 Jan 2022 17:53:21 +0800 Subject: [PATCH 12/17] [add create table scriptes] --- tests/script/sh/massiveTable/cleanCluster.sh | 57 ++++++++ .../script/sh/massiveTable/compileVersion.sh | 81 +++++++++++ tests/script/sh/massiveTable/deployCluster.sh | 25 ++++ tests/script/sh/massiveTable/setupDnodes.sh | 136 ++++++++++++++++++ 4 files changed, 299 insertions(+) create mode 100644 tests/script/sh/massiveTable/cleanCluster.sh create mode 100644 tests/script/sh/massiveTable/compileVersion.sh create mode 100644 tests/script/sh/massiveTable/deployCluster.sh create mode 100644 tests/script/sh/massiveTable/setupDnodes.sh diff --git a/tests/script/sh/massiveTable/cleanCluster.sh b/tests/script/sh/massiveTable/cleanCluster.sh new file mode 100644 index 0000000000..af278933b2 --- /dev/null +++ b/tests/script/sh/massiveTable/cleanCluster.sh @@ -0,0 +1,57 @@ +#!/bin/bash +# +# clean test environment + +set -e +#set -x + +# cleanCluster.sh +# -r [ dnode root dir] + + +dataRootDir="/data" + + +while getopts "hr:" arg +do + case $arg in + r) + dataRootDir=$(echo $OPTARG) + ;; + h) + echo "Usage: `basename $0` -r [ dnode root dir] " + exit 0 + ;; + ?) #unknow option + echo "unkonw argument" + exit 1 + ;; + esac +done + + +rmDnodesDataDir() { + if [ -d ${dataRootDir} ]; then + rm -rf ${dataRootDir}/dnode* + else + echo "${dataRootDir} not exist" + exit 1 + fi +} + +function kill_process() { + pid=$(ps -ef | grep "$1" | grep -v "grep" | awk '{print $2}') + if [ -n "$pid" ]; then + kill -9 $pid || : + fi +} + +######################################################################################## +############################### main process ########################################## + +## kill all taosd process +kill_process taosd + +rmDnodesDataDir + + diff --git a/tests/script/sh/massiveTable/compileVersion.sh b/tests/script/sh/massiveTable/compileVersion.sh new file mode 100644 index 0000000000..1976e8f14a --- /dev/null +++ b/tests/script/sh/massiveTable/compileVersion.sh @@ -0,0 +1,81 @@ +#!/bin/bash +# +# compile test version + +set -e +#set -x + +# compileVersion.sh +# -r [ TDengine project dir] +# -v [ TDengine branch version ] + + +projectDir=/root/TDengine +TDengineBrVer="3.0" + +while getopts "hr:v:" arg +do + case $arg in + r) + projectDir=$(echo $OPTARG) + ;; + v) + TDengineBrVer=$(echo $OPTARG) + ;; + h) + echo "Usage: `basename $0` -r [ TDengine project dir] " + echo " -v [ TDengine branch version] " + exit 0 + ;; + ?) #unknow option + echo "unkonw argument" + exit 1 + ;; + esac +done + +echo "projectDir=${projectDir} TDengineBrVer=${TDengineBrVer}" + +function gitPullBranchInfo () { + branch_name=$1 + + git checkout $branch_name + echo "==== git pull $branch_name start ====" +## git submodule update --init --recursive + git pull origin $branch_name ||: + echo "==== git pull $branch_name end ====" +} + +function compileTDengineVersion() { + debugDir=debug + if [ -d ${debugDir} ]; then + rm -rf ${debugDir}/* ||: + else + mkdir -p ${debugDir} + fi + + cd ${debugDir} + cmake .. + make -j24 +} +######################################################################################## +############################### main process ########################################## + +## checkout all branchs and git pull +cd ${projectDir} +gitPullBranchInfo $TDengineBrVer +compileTDengineVersion + +taos_dir=${projectDir}/debug/tools/shell +taosd_dir=${projectDir}/debug/source/dnode/mgmt/daemon +create_table_dir=${projectDir}/debug/tests/test/c + +rm -f /usr/bin/taos +rm -f /usr/bin/taosd +rm -f /usr/bin/create_table + +ln -s $taos_dir/taos /usr/bin/taos +ln -s $taosd_dir/taosd /usr/bin/taosd +ln -s $create_table_dir/create_table /usr/bin/create_table + + diff --git a/tests/script/sh/massiveTable/deployCluster.sh b/tests/script/sh/massiveTable/deployCluster.sh new file mode 100644 index 0000000000..47802ea6a7 --- /dev/null +++ b/tests/script/sh/massiveTable/deployCluster.sh @@ -0,0 +1,25 @@ +#!/bin/bash +# +# deploy test cluster + +set -e +#set -x + +# deployCluster.sh + +curr_dir=$(pwd) + +source ./cleanCluster.sh -r /data +source ./cleanCluster.sh -r /data2 + +source ./compileVersion.sh -r ${curr_dir}/../../../../ -v "3.0" + +source ./setupDnodes.sh -r /data -n 1 -f trd02:7000 -p 7000 +source ./setupDnodes.sh -r /data2 -n 1 -f trd02:7000 -p 8000 + +#source ./setupDnodes.sh -r /data -n 2 -f trd02:7000 -p 7000 +#source ./setupDnodes.sh -r /data2 -n 2 -f trd02:7000 -p 8000 + + + + diff --git a/tests/script/sh/massiveTable/setupDnodes.sh b/tests/script/sh/massiveTable/setupDnodes.sh new file mode 100644 index 0000000000..034c15c4eb --- /dev/null +++ b/tests/script/sh/massiveTable/setupDnodes.sh @@ -0,0 +1,136 @@ +#!/bin/bash +# +# setup test environment + +set -e +#set -x + +# setupDnodes.sh +# -e [ new | old] +# -n [ dnode number] +# -f [ first ep] +# -p [ start port] +# -r [ dnode root dir] + +# set parameters by default value +enviMode=new +dataRootDir="/data" +firstEp="localhost:7000" +startPort=7000 +dnodeNumber=1 + + +while getopts "he:f:n:r:p:" arg +do + case $arg in + e) + enviMode=$( echo $OPTARG ) + ;; + n) + dnodeNumber=$(echo $OPTARG) + ;; + f) + firstEp=$(echo $OPTARG) + ;; + p) + startPort=$(echo $OPTARG) + ;; + r) + dataRootDir=$(echo $OPTARG) + ;; + h) + echo "Usage: `basename $0` -e [new | old] " + echo " -n [ dnode number] " + echo " -f [ first ep] " + echo " -p [ start port] " + echo " -r [ dnode root dir] " + exit 0 + ;; + ?) #unknow option + echo "unkonw argument" + exit 1 + ;; + esac +done + +echo "enviMode=${enviMode} dnodeNumber=${dnodeNumber} dataRootDir=${dataRootDir} firstEp=${firstEp} startPort=${startPort}" + +#curr_dir=$(pwd) + + +createNewCfgFile() { + cfgFile=$1/taos.cfg + dataDir=$2 + logDir=$3 + firstEp=$4 + serverPort=$5 + + echo "debugFlag 131" > ${cfgFile} + echo "firstEp ${firstEp}" >> ${cfgFile} + echo "dataDir ${dataDir}" >> ${cfgFile} + echo "logDir ${logDir}" >> ${cfgFile} + echo "serverPort ${serverPort}" >> ${cfgFile} + + echo "supportVnodes 1024" >> ${cfgFile} + #echo "asyncLog 0" >> ${cfgFile} + echo "telemetryReporting 0" >> ${cfgFile} +} + +createNewDnodesDataDir() { + if [ -d ${dataRootDir} ]; then + rm -rf ${dataRootDir}/dnode* + else + echo "${dataRootDir} not exist" + exit 1 + fi + + dnodeNumber=$1 + firstEp=$2 + + serverPort=${startPort} + for ((i=0; i<${dnodeNumber}; i++)); do + mkdir -p ${dataRootDir}/dnode_${i}/cfg + mkdir -p ${dataRootDir}/dnode_${i}/log + mkdir -p ${dataRootDir}/dnode_${i}/data + + createNewCfgFile ${dataRootDir}/dnode_${i}/cfg ${dataRootDir}/dnode_${i}/data ${dataRootDir}/dnode_${i}/log ${firstEp} ${serverPort} + echo "create dnode: ${serverPort}, ${dataRootDir}/dnode_${i}" + serverPort=$((10#${serverPort}+100)) + done +} + +function kill_process() { + pid=$(ps -ef | grep "$1" | grep -v "grep" | awk '{print $2}') + if [ -n "$pid" ]; then + kill -9 $pid || : + fi +} + +startDnodes() { + dnodeNumber=$1 + + for ((i=0; i<${dnodeNumber}; i++)); do + if [ -d ${dataRootDir}/dnode_${i} ]; then + nohup taosd -c ${dataRootDir}/dnode_${i}/cfg >/dev/null 2>&1 & + echo "start taosd ${dataRootDir}/dnode_${i}" + fi + done +} + +######################################################################################## +############################### main process ########################################## + +## kill all taosd process +kill_process taosd + +## create director for all dnode +if [[ "$enviMode" == "new" ]]; then + createNewDnodesDataDir ${dnodeNumber} ${firstEp} +fi + +## start all dnode by nohup +startDnodes ${dnodeNumber} + +echo " run setupDnodes.sh end !!!" + + From 45f7b12f83d08d0825aae66890e5d46fdf77910f Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Tue, 18 Jan 2022 17:59:20 +0800 Subject: [PATCH 13/17] [modify] --- tests/script/sh/massiveTable/cleanCluster.sh | 0 tests/script/sh/massiveTable/compileVersion.sh | 0 tests/script/sh/massiveTable/deployCluster.sh | 0 tests/script/sh/massiveTable/setupDnodes.sh | 0 4 files changed, 0 insertions(+), 0 deletions(-) mode change 100644 => 100755 tests/script/sh/massiveTable/cleanCluster.sh mode change 100644 => 100755 tests/script/sh/massiveTable/compileVersion.sh mode change 100644 => 100755 tests/script/sh/massiveTable/deployCluster.sh mode change 100644 => 100755 tests/script/sh/massiveTable/setupDnodes.sh diff --git a/tests/script/sh/massiveTable/cleanCluster.sh b/tests/script/sh/massiveTable/cleanCluster.sh old mode 100644 new mode 100755 diff --git a/tests/script/sh/massiveTable/compileVersion.sh b/tests/script/sh/massiveTable/compileVersion.sh old mode 100644 new mode 100755 diff --git a/tests/script/sh/massiveTable/deployCluster.sh b/tests/script/sh/massiveTable/deployCluster.sh old mode 100644 new mode 100755 diff --git a/tests/script/sh/massiveTable/setupDnodes.sh b/tests/script/sh/massiveTable/setupDnodes.sh old mode 100644 new mode 100755 From 0b2aea0f3519b519161fe8f420cafe786c936914 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 18 Jan 2022 18:01:21 +0800 Subject: [PATCH 14/17] add client --- source/libs/transport/inc/transComm.h | 120 +++++ source/libs/transport/inc/transportInt.h | 88 ++- source/libs/transport/src/trans.c | 64 +++ source/libs/transport/src/transCli.c | 180 +++++++ source/libs/transport/src/transComm.c | 117 ++++ source/libs/transport/src/transSrv.c | 540 +++++++++++++++++++ source/libs/transport/test/transportTests.cc | 3 +- 7 files changed, 1066 insertions(+), 46 deletions(-) create mode 100644 source/libs/transport/inc/transComm.h create mode 100644 source/libs/transport/src/trans.c create mode 100644 source/libs/transport/src/transCli.c create mode 100644 source/libs/transport/src/transComm.c create mode 100644 source/libs/transport/src/transSrv.c diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h new file mode 100644 index 0000000000..792200639a --- /dev/null +++ b/source/libs/transport/inc/transComm.h @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifdef USE_UV + +#include +#include "lz4.h" +#include "os.h" +#include "rpcCache.h" +#include "rpcHead.h" +#include "rpcLog.h" +#include "rpcTcp.h" +#include "rpcUdp.h" +#include "taoserror.h" +#include "tglobal.h" +#include "thash.h" +#include "tidpool.h" +#include "tmd5.h" +#include "tmempool.h" +#include "tmsg.h" +#include "transportInt.h" +#include "tref.h" +#include "trpc.h" +#include "ttimer.h" +#include "tutil.h" + +typedef void* queue[2]; +/* Private macros. */ +#define QUEUE_NEXT(q) (*(queue**)&((*(q))[0])) +#define QUEUE_PREV(q) (*(queue**)&((*(q))[1])) + +#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q))) +#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q))) +/* Initialize an empty queue. */ +#define QUEUE_INIT(q) \ + { \ + QUEUE_NEXT(q) = (q); \ + QUEUE_PREV(q) = (q); \ + } + +/* Return true if the queue has no element. */ +#define QUEUE_IS_EMPTY(q) ((const queue*)(q) == (const queue*)QUEUE_NEXT(q)) + +/* Insert an element at the back of a queue. */ +#define QUEUE_PUSH(q, e) \ + { \ + QUEUE_NEXT(e) = (q); \ + QUEUE_PREV(e) = QUEUE_PREV(q); \ + QUEUE_PREV_NEXT(e) = (e); \ + QUEUE_PREV(q) = (e); \ + } + +/* Remove the given element from the queue. Any element can be removed at any * + * time. */ +#define QUEUE_REMOVE(e) \ + { \ + QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \ + QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \ + } + +/* Return the element at the front of the queue. */ +#define QUEUE_HEAD(q) (QUEUE_NEXT(q)) + +/* Return the element at the back of the queue. */ +#define QUEUE_TAIL(q) (QUEUE_PREV(q)) + +/* Iterate over the element of a queue. * Mutating the queue while iterating + * results in undefined behavior. */ +#define QUEUE_FOREACH(q, e) for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q)) + +/* Return the structure holding the given element. */ +#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) + +typedef struct { + SRpcInfo* pRpc; // associated SRpcInfo + SEpSet epSet; // ip list provided by app + void* ahandle; // handle provided by app + struct SRpcConn* pConn; // pConn allocated + tmsg_t msgType; // message type + uint8_t* pCont; // content provided by app + int32_t contLen; // content length + int32_t code; // error code + int16_t numOfTry; // number of try for different servers + int8_t oldInUse; // server EP inUse passed by app + int8_t redirect; // flag to indicate redirect + int8_t connType; // connection type + int64_t rid; // refId returned by taosAddRef + SRpcMsg* pRsp; // for synchronous API + tsem_t* pSem; // for synchronous API + SEpSet* pSet; // for synchronous API + char msg[0]; // RpcHead starts from here +} SRpcReqContext; + +#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) +#define RPC_RESERVE_SIZE (sizeof(SRpcReqContext)) + +#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) +#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead))) +#define rpcContFromHead(msg) (msg + sizeof(SRpcHead)) +#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead)) +#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) +#define rpcIsReq(type) (type & 1U) + +int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey); +void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); +int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); +SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead); + +#endif diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index f93753cfe9..0addea5d1a 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -16,62 +16,60 @@ #ifndef _TD_TRANSPORT_INT_H_ #define _TD_TRANSPORT_INT_H_ +#include +#include "lz4.h" +#include "os.h" +#include "rpcCache.h" #include "rpcHead.h" +#include "rpcLog.h" +#include "rpcTcp.h" +#include "rpcUdp.h" +#include "taoserror.h" +#include "tglobal.h" +#include "thash.h" +#include "tidpool.h" +#include "tmsg.h" +#include "transportInt.h" +#include "tref.h" +#include "trpc.h" +#include "ttimer.h" +#include "tutil.h" + #ifdef __cplusplus extern "C" { #endif #ifdef USE_UV -#include -typedef void *queue[2]; +void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); +void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); -/* Private macros. */ -#define QUEUE_NEXT(q) (*(queue **)&((*(q))[0])) -#define QUEUE_PREV(q) (*(queue **)&((*(q))[1])) +typedef struct { + int sessions; // number of sessions allowed + int numOfThreads; // number of threads to process incoming messages + int idleTime; // milliseconds; + uint16_t localPort; + int8_t connType; + int64_t index; + char label[TSDB_LABEL_LEN]; -#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q))) -#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q))) + char user[TSDB_UNI_LEN]; // meter ID + char spi; // security parameter index + char encrypt; // encrypt algorithm + char secret[TSDB_PASSWORD_LEN]; // secret for the link + char ckey[TSDB_PASSWORD_LEN]; // ciphering key -/* Initialize an empty queue. */ -#define QUEUE_INIT(q) \ - { \ - QUEUE_NEXT(q) = (q); \ - QUEUE_PREV(q) = (q); \ - } + void (*cfp)(void* parent, SRpcMsg*, SEpSet*); + int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); -/* Return true if the queue has no element. */ -#define QUEUE_IS_EMPTY(q) ((const queue *)(q) == (const queue *)QUEUE_NEXT(q)) - -/* Insert an element at the back of a queue. */ -#define QUEUE_PUSH(q, e) \ - { \ - QUEUE_NEXT(e) = (q); \ - QUEUE_PREV(e) = QUEUE_PREV(q); \ - QUEUE_PREV_NEXT(e) = (e); \ - QUEUE_PREV(q) = (e); \ - } - -/* Remove the given element from the queue. Any element can be removed at any * - * time. */ -#define QUEUE_REMOVE(e) \ - { \ - QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \ - QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \ - } - -/* Return the element at the front of the queue. */ -#define QUEUE_HEAD(q) (QUEUE_NEXT(q)) - -/* Return the element at the back of the queue. */ -#define QUEUE_TAIL(q) (QUEUE_PREV(q)) - -/* Iterate over the element of a queue. * Mutating the queue while iterating - * results in undefined behavior. */ -#define QUEUE_FOREACH(q, e) for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q)) - -/* Return the structure holding the given element. */ -#define QUEUE_DATA(e, type, field) ((type *)((void *)((char *)(e)-offsetof(type, field)))) + int32_t refCount; + void* parent; + void* idPool; // handle to ID pool + void* tmrCtrl; // handle to timer + SHashObj* hash; // handle returned by hash utility + void* tcphandle; // returned handle from TCP initialization + pthread_mutex_t mutex; +} SRpcInfo; #endif // USE_LIBUV diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c new file mode 100644 index 0000000000..a59f2f88c7 --- /dev/null +++ b/source/libs/transport/src/trans.c @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifdef USE_UV + +#include "transComm.h" + +typedef struct SConnBuffer { + char* buf; + int len; + int cap; + int left; +} SConnBuffer; + +void* (*taosHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = { + taosInitServer, taosInitClient}; + +void* rpcOpen(const SRpcInit* pInit) { + SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); + if (pRpc == NULL) { + return NULL; + } + if (pInit->label) { + tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); + } + pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; + pRpc->connType = pInit->connType; + pRpc->tcphandle = (*taosHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); + + return pRpc; +} +void rpcClose(void* arg) { return; } +void* rpcMallocCont(int contLen) { return NULL; } +void rpcFreeCont(void* cont) { return; } +void* rpcReallocCont(void* ptr, int contLen) { return NULL; } + +void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {} +int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; } +void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; } +int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } +void rpcCancelRequest(int64_t rid) { return; } + +int32_t rpcInit(void) { + // impl later + return -1; +} + +void rpcCleanup(void) { + // impl later + return; +} +#endif diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c new file mode 100644 index 0000000000..f848932c0f --- /dev/null +++ b/source/libs/transport/src/transCli.c @@ -0,0 +1,180 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifdef USE_UV + +#include "transComm.h" + +typedef struct SCliConn { + uv_connect_t connReq; + uv_stream_t* stream; + void* data; + queue conn; +} SCliConn; +typedef struct SCliMsg { + SRpcReqContext* context; + queue q; +} SCliMsg; + +typedef struct SCliThrdObj { + pthread_t thread; + uv_loop_t* loop; + uv_async_t* cliAsync; // + void* cache; // conn pool + queue msg; + pthread_mutex_t msgMtx; + void* shandle; +} SCliThrdObj; + +typedef struct SClientObj { + char label[TSDB_LABEL_LEN]; + int32_t index; + int numOfThreads; + SCliThrdObj** pThreadObj; +} SClientObj; + +static void clientWriteCb(uv_write_t* req, int status); +static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); +static void clientConnCb(struct uv_connect_s* req, int status); +static void clientAsyncCb(uv_async_t* handle); + +static void* clientThread(void* arg); + +static void clientWriteCb(uv_write_t* req, int status) { + // impl later +} +static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { + // impl later +} +static void clientConnCb(struct uv_connect_s* req, int status) { + // impl later +} + +static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { + // impl later + return NULL; +} +static void clientAsyncCb(uv_async_t* handle) { + SCliThrdObj* pThrd = handle->data; + SCliMsg* pMsg = NULL; + pthread_mutex_lock(&pThrd->msgMtx); + if (!QUEUE_IS_EMPTY(&pThrd->msg)) { + queue* head = QUEUE_HEAD(&pThrd->msg); + pMsg = QUEUE_DATA(head, SCliMsg, q); + QUEUE_REMOVE(head); + } + pthread_mutex_unlock(&pThrd->msgMtx); + + SEpSet* pEpSet = &pMsg->context->epSet; + char* fqdn = pEpSet->fqdn[pEpSet->inUse]; + uint32_t port = pEpSet->port[pEpSet->inUse]; + + SCliConn* conn = getConnFromCache(pThrd->cache, fqdn, port); + if (conn != NULL) { + } else { + SCliConn* conn = malloc(sizeof(SCliConn)); + + conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); + uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); + + conn->connReq.data = conn; + conn->data = pMsg; + + struct sockaddr_in addr; + uv_ip4_addr(fqdn, port, &addr); + // handle error in callback if connect error + uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb); + } + + // SRpcReqContext* pCxt = pMsg->context; + + // SRpcHead* pHead = rpcHeadFromCont(pCtx->pCont); + // char* msg = (char*)pHead; + // int len = rpcMsgLenFromCont(pCtx->contLen); + // tmsg_t msgType = pCtx->msgType; + + // impl later +} + +static void* clientThread(void* arg) { + SCliThrdObj* pThrd = (SCliThrdObj*)arg; + + QUEUE_INIT(&pThrd->msg); + pthread_mutex_init(&pThrd->msgMtx, NULL); + + // QUEUE_INIT(&pThrd->clientCache); + + pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); + uv_loop_init(pThrd->loop); + + pThrd->cliAsync = malloc(sizeof(uv_async_t)); + uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb); + pThrd->cliAsync->data = pThrd; + + uv_run(pThrd->loop, UV_RUN_DEFAULT); +} + +void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { + SClientObj* cli = calloc(1, sizeof(SClientObj)); + memcpy(cli->label, label, strlen(label)); + cli->numOfThreads = numOfThreads; + cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*)); + + for (int i = 0; i < cli->numOfThreads; i++) { + SCliThrdObj* thrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); + + thrd->shandle = shandle; + int err = pthread_create(&thrd->thread, NULL, clientThread, (void*)(thrd)); + if (err == 0) { + tDebug("sucess to create tranport-client thread %d", i); + } + cli->pThreadObj[i] = thrd; + } + return cli; +} + +void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { + // impl later + SRpcInfo* pRpc = (SRpcInfo*)shandle; + + SRpcReqContext* pContext; + int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); + pContext = (SRpcReqContext*)((char*)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext)); + pContext->ahandle = pMsg->ahandle; + pContext->pRpc = (SRpcInfo*)shandle; + pContext->epSet = *pEpSet; + pContext->contLen = contLen; + pContext->pCont = pMsg->pCont; + pContext->msgType = pMsg->msgType; + pContext->oldInUse = pEpSet->inUse; + + assert(pRpc->connType == TAOS_CONN_CLIENT); + // atomic or not + int64_t index = pRpc->index; + if (pRpc->index++ >= pRpc->numOfThreads) { + pRpc->index = 0; + } + SCliMsg* msg = malloc(sizeof(SCliMsg)); + msg->context = pContext; + + SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads]; + + pthread_mutex_lock(&thrd->msgMtx); + QUEUE_PUSH(&thrd->msg, &msg->q); + pthread_mutex_unlock(&thrd->msgMtx); + + uv_async_send(thrd->cliAsync); +} +#endif diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c new file mode 100644 index 0000000000..f23cfb6e2d --- /dev/null +++ b/source/libs/transport/src/transComm.c @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifdef USE_UV + +#include "transComm.h" + +int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { + T_MD5_CTX context; + int ret = -1; + + tMD5Init(&context); + tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); + tMD5Update(&context, (uint8_t*)pMsg, msgLen); + tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); + tMD5Final(&context); + + if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0; + + return ret; +} +void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) { + T_MD5_CTX context; + + tMD5Init(&context); + tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); + tMD5Update(&context, (uint8_t*)pMsg, msgLen); + tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); + tMD5Final(&context); + + memcpy(pAuth, context.digest, sizeof(context.digest)); +} + +int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { + SRpcHead* pHead = rpcHeadFromCont(pCont); + int32_t finalLen = 0; + int overhead = sizeof(SRpcComp); + + if (!NEEDTO_COMPRESSS_MSG(contLen)) { + return contLen; + } + + char* buf = malloc(contLen + overhead + 8); // 8 extra bytes + if (buf == NULL) { + tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen); + return contLen; + } + + int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead); + tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead); + + /* + * only the compressed size is less than the value of contLen - overhead, the compression is applied + * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message + */ + if (compLen > 0 && compLen < contLen - overhead) { + SRpcComp* pComp = (SRpcComp*)pCont; + pComp->reserved = 0; + pComp->contLen = htonl(contLen); + memcpy(pCont + overhead, buf, compLen); + + pHead->comp = 1; + tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen); + finalLen = compLen + overhead; + } else { + finalLen = contLen; + } + + free(buf); + return finalLen; +} + +SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) { + int overhead = sizeof(SRpcComp); + SRpcHead* pNewHead = NULL; + uint8_t* pCont = pHead->content; + SRpcComp* pComp = (SRpcComp*)pHead->content; + + if (pHead->comp) { + // decompress the content + assert(pComp->reserved == 0); + int contLen = htonl(pComp->contLen); + + // prepare the temporary buffer to decompress message + char* temp = (char*)malloc(contLen + RPC_MSG_OVERHEAD); + pNewHead = (SRpcHead*)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext + + if (pNewHead) { + int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead; + int origLen = LZ4_decompress_safe((char*)(pCont + overhead), (char*)pNewHead->content, compLen, contLen); + assert(origLen == contLen); + + memcpy(pNewHead, pHead, sizeof(SRpcHead)); + pNewHead->msgLen = rpcMsgLenFromCont(origLen); + /// rpcFreeMsg(pHead); // free the compressed message buffer + pHead = pNewHead; + tTrace("decomp malloc mem:%p", temp); + } else { + tError("failed to allocate memory to decompress msg, contLen:%d", contLen); + } + } + + return pHead; +} + +#endif diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c new file mode 100644 index 0000000000..0bf39b9985 --- /dev/null +++ b/source/libs/transport/src/transSrv.c @@ -0,0 +1,540 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifdef USE_UV +#include "transComm.h" + +typedef struct SConnBuffer { + char* buf; + int len; + int cap; + int left; +} SConnBuffer; + +typedef struct SConn { + uv_tcp_t* pTcp; + uv_write_t* pWriter; + uv_timer_t* pTimer; + + uv_async_t* pWorkerAsync; + queue queue; + int ref; + int persist; // persist connection or not + SConnBuffer connBuf; // read buf, + SConnBuffer writeBuf; // write buf + int count; + void* shandle; // rpc init + void* ahandle; // + void* hostThrd; + // del later + char secured; + int spi; + char info[64]; + char user[TSDB_UNI_LEN]; // user ID for the link + char secret[TSDB_PASSWORD_LEN]; + char ckey[TSDB_PASSWORD_LEN]; // ciphering key +} SConn; + +typedef struct SWorkThrdObj { + pthread_t thread; + uv_pipe_t* pipe; + int fd; + uv_loop_t* loop; + uv_async_t* workerAsync; // + queue conn; + pthread_mutex_t connMtx; + void* shandle; +} SWorkThrdObj; + +typedef struct SServerObj { + pthread_t thread; + uv_tcp_t server; + uv_loop_t* loop; + int workerIdx; + int numOfThreads; + SWorkThrdObj** pThreadObj; + uv_pipe_t** pipe; + uint32_t ip; + uint32_t port; +} SServerObj; + +static const char* notify = "a"; + +// refactor later +static int rpcAddAuthPart(SConn* pConn, char* msg, int msgLen); + +static int uvAuthMsg(SConn* pConn, char* msg, int msgLen); + +static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); +static void uvOnTimeoutCb(uv_timer_t* handle); +static void uvOnWriteCb(uv_write_t* req, int status); +static void uvOnAcceptCb(uv_stream_t* stream, int status); +static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); +static void uvWorkerAsyncCb(uv_async_t* handle); + +// already read complete packet +static bool readComplete(SConnBuffer* buf); + +static SConn* connCreate(); +static void connDestroy(SConn* conn); +static void uvConnDestroy(uv_handle_t* handle); + +// server worke thread +static void* workerThread(void* arg); +static void* acceptThread(void* arg); + +void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + /* + * formate of data buffer: + * |<-------SRpcReqContext------->|<------------data read from socket----------->| + */ + static const int CAPACITY = 1024; + + SConn* conn = handle->data; + SConnBuffer* pBuf = &conn->connBuf; + if (pBuf->cap == 0) { + pBuf->buf = (char*)calloc(CAPACITY + RPC_RESERVE_SIZE, sizeof(char)); + pBuf->len = 0; + pBuf->cap = CAPACITY; + pBuf->left = -1; + + buf->base = pBuf->buf + RPC_RESERVE_SIZE; + buf->len = CAPACITY; + } else { + if (pBuf->len >= pBuf->cap) { + if (pBuf->left == -1) { + pBuf->cap *= 2; + pBuf->buf = realloc(pBuf->buf, pBuf->cap + RPC_RESERVE_SIZE); + } else if (pBuf->len + pBuf->left > pBuf->cap) { + pBuf->cap = pBuf->len + pBuf->left; + pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left + RPC_RESERVE_SIZE); + } + } + buf->base = pBuf->buf + pBuf->len + RPC_RESERVE_SIZE; + buf->len = pBuf->cap - pBuf->len; + } +} + +// check data read from socket completely or not +// +static bool readComplete(SConnBuffer* data) { + // TODO(yihao): handle pipeline later + SRpcHead rpcHead; + int32_t headLen = sizeof(rpcHead); + if (data->len >= headLen) { + memcpy((char*)&rpcHead, data->buf + RPC_RESERVE_SIZE, headLen); + int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); + if (msgLen > data->len) { + data->left = msgLen - data->len; + return false; + } else { + return true; + } + } else { + return false; + } +} + +static void uvDoProcess(SRecvInfo* pRecv) { + SRpcHead* pHead = (SRpcHead*)pRecv->msg; + SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle; + SConn* pConn = pRecv->thandle; + + tDump(pRecv->msg, pRecv->msgLen); + + terrno = 0; + SRpcReqContext* pContest; + + // do auth and check +} + +static int uvAuthMsg(SConn* pConn, char* msg, int len) { + SRpcHead* pHead = (SRpcHead*)msg; + int code = 0; + + if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) { + // secured link, or no authentication + pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); + // tTrace("%s, secured link, no auth is required", pConn->info); + return 0; + } + + if (!rpcIsReq(pHead->msgType)) { + // for response, if code is auth failure, it shall bypass the auth process + code = htonl(pHead->code); + if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || + code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED || + code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) { + pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); + // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code); + return 0; + } + } + + code = 0; + if (pHead->spi == pConn->spi) { + // authentication + SRpcDigest* pDigest = (SRpcDigest*)((char*)pHead + len - sizeof(SRpcDigest)); + + int32_t delta; + delta = (int32_t)htonl(pDigest->timeStamp); + delta -= (int32_t)taosGetTimestampSec(); + if (abs(delta) > 900) { + tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta); + code = TSDB_CODE_RPC_INVALID_TIME_STAMP; + } else { + if (rpcAuthenticateMsg(pHead, len - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { + // tDebug("%s, authentication failed, msg discarded", pConn->info); + code = TSDB_CODE_RPC_AUTH_FAILURE; + } else { + pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest); + if (!rpcIsReq(pHead->msgType)) pConn->secured = 1; // link is secured for client + // tTrace("%s, message is authenticated", pConn->info); + } + } + } else { + tDebug("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi); + code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED; + } + + return code; +} + +// refers specifically to query or insert timeout +static void uvHandleActivityTimeout(uv_timer_t* handle) { + // impl later + SConn* conn = handle->data; +} + +static void uvProcessData(SConn* pConn) { + SRecvInfo info; + SRecvInfo* p = &info; + SConnBuffer* pBuf = &pConn->connBuf; + p->msg = pBuf->buf + RPC_RESERVE_SIZE; + p->msgLen = pBuf->len; + p->ip = 0; + p->port = 0; + p->shandle = pConn->shandle; // + p->thandle = pConn; + p->chandle = NULL; + + // + SRpcHead* pHead = (SRpcHead*)p->msg; + assert(rpcIsReq(pHead->msgType)); + + SRpcInfo* pRpc = (SRpcInfo*)p->shandle; + pConn->ahandle = (void*)pHead->ahandle; + // auth here + + int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen); + if (code != 0) { + terrno = code; + return; + } + pHead->code = htonl(pHead->code); + + SRpcMsg rpcMsg; + + pHead = rpcDecompressRpcMsg(pHead); + rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen); + rpcMsg.pCont = pHead->content; + rpcMsg.msgType = pHead->msgType; + rpcMsg.code = pHead->code; + rpcMsg.ahandle = pConn->ahandle; + rpcMsg.handle = pConn; + + (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); + uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime, 0); + // auth + // validate msg type +} + +void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { + // opt + SConn* ctx = cli->data; + SConnBuffer* pBuf = &ctx->connBuf; + if (nread > 0) { + pBuf->len += nread; + if (readComplete(pBuf)) { + tDebug("alread read complete packet"); + uvProcessData(ctx); + } else { + tDebug("read half packet, continue to read"); + } + return; + } + if (terrno != 0) { + // handle err code + } + + if (nread != UV_EOF) { + tDebug("Read error %s\n", uv_err_name(nread)); + } + uv_close((uv_handle_t*)cli, uvConnDestroy); +} +void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + buf->base = malloc(sizeof(char)); + buf->len = 2; +} + +void uvOnTimeoutCb(uv_timer_t* handle) { + // opt + tDebug("time out"); +} + +void uvOnWriteCb(uv_write_t* req, int status) { + SConn* conn = req->data; + if (status == 0) { + tDebug("data already was written on stream"); + } else { + connDestroy(conn); + } + // opt +} + +void uvWorkerAsyncCb(uv_async_t* handle) { + SWorkThrdObj* pThrd = container_of(handle, SWorkThrdObj, workerAsync); + SConn* conn = NULL; + + // opt later + pthread_mutex_lock(&pThrd->connMtx); + if (!QUEUE_IS_EMPTY(&pThrd->conn)) { + queue* head = QUEUE_HEAD(&pThrd->conn); + conn = QUEUE_DATA(head, SConn, queue); + QUEUE_REMOVE(head); + } + pthread_mutex_unlock(&pThrd->connMtx); + if (conn == NULL) { + tError("except occurred, do nothing"); + return; + } + uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len); + uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); +} + +void uvOnAcceptCb(uv_stream_t* stream, int status) { + if (status == -1) { + return; + } + SServerObj* pObj = container_of(stream, SServerObj, server); + + uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); + uv_tcp_init(pObj->loop, cli); + + if (uv_accept(stream, (uv_stream_t*)cli) == 0) { + uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t)); + + uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); + + pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads; + tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); + uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnWriteCb); + } else { + uv_close((uv_handle_t*)cli, NULL); + } +} +void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { + tDebug("connection coming"); + if (nread < 0) { + if (nread != UV_EOF) { + tError("read error %s", uv_err_name(nread)); + } + // TODO(log other failure reason) + uv_close((uv_handle_t*)q, NULL); + return; + } + // free memory allocated by + assert(nread == strlen(notify)); + assert(buf->base[0] == notify[0]); + free(buf->base); + + SWorkThrdObj* pThrd = q->data; + + uv_pipe_t* pipe = (uv_pipe_t*)q; + if (!uv_pipe_pending_count(pipe)) { + tError("No pending count"); + return; + } + + uv_handle_type pending = uv_pipe_pending_type(pipe); + assert(pending == UV_TCP); + + SConn* pConn = connCreate(); + pConn->shandle = pThrd->shandle; + /* init conn timer*/ + pConn->pTimer = malloc(sizeof(uv_timer_t)); + uv_timer_init(pThrd->loop, pConn->pTimer); + pConn->pTimer->data = pConn; + + pConn->hostThrd = pThrd; + pConn->pWorkerAsync = pThrd->workerAsync; // thread safty + + // init client handle + pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); + uv_tcp_init(pThrd->loop, pConn->pTcp); + pConn->pTcp->data = pConn; + + // init write request, just + pConn->pWriter = calloc(1, sizeof(uv_write_t)); + pConn->pWriter->data = pConn; + + if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { + uv_os_fd_t fd; + uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); + tDebug("new connection created: %d", fd); + uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); + } else { + connDestroy(pConn); + } +} + +void* acceptThread(void* arg) { + // opt + SServerObj* srv = (SServerObj*)arg; + uv_tcp_init(srv->loop, &srv->server); + + struct sockaddr_in bind_addr; + + uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); + uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0); + int err = 0; + if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) { + tError("Listen error %s\n", uv_err_name(err)); + return NULL; + } + uv_run(srv->loop, UV_RUN_DEFAULT); +} +void* workerThread(void* arg) { + SWorkThrdObj* pThrd = (SWorkThrdObj*)arg; + + pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); + uv_loop_init(pThrd->loop); + + // SRpcInfo* pRpc = pThrd->shandle; + uv_pipe_init(pThrd->loop, pThrd->pipe, 1); + uv_pipe_open(pThrd->pipe, pThrd->fd); + + pThrd->pipe->data = pThrd; + + QUEUE_INIT(&pThrd->conn); + pthread_mutex_init(&pThrd->connMtx, NULL); + + pThrd->workerAsync = malloc(sizeof(uv_async_t)); + uv_async_init(pThrd->loop, pThrd->workerAsync, uvWorkerAsyncCb); + + uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); + uv_run(pThrd->loop, UV_RUN_DEFAULT); +} + +static SConn* connCreate() { + SConn* pConn = (SConn*)calloc(1, sizeof(SConn)); + return pConn; +} +static void connDestroy(SConn* conn) { + if (conn == NULL) { + return; + } + uv_timer_stop(conn->pTimer); + free(conn->pTimer); + uv_close((uv_handle_t*)conn->pTcp, NULL); + free(conn->connBuf.buf); + free(conn->pTcp); + free(conn->pWriter); + free(conn); + // handle +} +static void uvConnDestroy(uv_handle_t* handle) { + SConn* conn = handle->data; + connDestroy(conn); +} +static int rpcAddAuthPart(SConn* pConn, char* msg, int msgLen) { + SRpcHead* pHead = (SRpcHead*)msg; + + if (pConn->spi && pConn->secured == 0) { + // add auth part + pHead->spi = pConn->spi; + SRpcDigest* pDigest = (SRpcDigest*)(msg + msgLen); + pDigest->timeStamp = htonl(taosGetTimestampSec()); + msgLen += sizeof(SRpcDigest); + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + rpcBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); + } else { + pHead->spi = 0; + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + } + + return msgLen; +} + +void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { + SServerObj* srv = calloc(1, sizeof(SServerObj)); + srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); + srv->numOfThreads = numOfThreads; + srv->workerIdx = 0; + srv->pThreadObj = (SWorkThrdObj**)calloc(srv->numOfThreads, sizeof(SWorkThrdObj*)); + srv->pipe = (uv_pipe_t**)calloc(srv->numOfThreads, sizeof(uv_pipe_t*)); + srv->ip = ip; + srv->port = port; + uv_loop_init(srv->loop); + + for (int i = 0; i < srv->numOfThreads; i++) { + SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj)); + srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); + int fds[2]; + if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { + return NULL; + } + uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); + uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write + + thrd->shandle = shandle; + thrd->fd = fds[0]; + thrd->pipe = &(srv->pipe[i][1]); // init read + int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd)); + if (err == 0) { + tDebug("sucess to create worker-thread %d", i); + // printf("thread %d create\n", i); + } else { + // TODO: clear all other resource later + tError("failed to create worker-thread %d", i); + } + srv->pThreadObj[i] = thrd; + } + + int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv); + if (err == 0) { + tDebug("success to create accept-thread"); + } else { + // clear all resource later + } + + return srv; +} + +void rpcSendResponse(const SRpcMsg* pMsg) { + SConn* pConn = pMsg->handle; + SWorkThrdObj* pThrd = pConn->hostThrd; + + // opt later + pthread_mutex_lock(&pThrd->connMtx); + QUEUE_PUSH(&pThrd->conn, &pConn->queue); + pthread_mutex_unlock(&pThrd->connMtx); + + uv_async_send(pConn->pWorkerAsync); +} + +#endif diff --git a/source/libs/transport/test/transportTests.cc b/source/libs/transport/test/transportTests.cc index 151deaf29b..53910aa30c 100644 --- a/source/libs/transport/test/transportTests.cc +++ b/source/libs/transport/test/transportTests.cc @@ -22,6 +22,7 @@ #include #include +#include "transComm.h" #include "transportInt.h" #include "trpc.h" @@ -46,7 +47,7 @@ class QueueObj { if (!IsEmpty()) { queue *h = QUEUE_HEAD(&head); el = QUEUE_DATA(h, QueueElem, q); - QUEUE_REMOVE(&el->q); + QUEUE_REMOVE(h); } return el; } From caeb8ae159a6a8180bb3fed9e6cf5dbeba919b69 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 18 Jan 2022 18:14:43 +0800 Subject: [PATCH 15/17] add client --- source/libs/transport/inc/transportInt.h | 3 ++- source/libs/transport/src/transCli.c | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 0addea5d1a..e39e0d9273 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -16,7 +16,9 @@ #ifndef _TD_TRANSPORT_INT_H_ #define _TD_TRANSPORT_INT_H_ +#ifdef USE_UV #include +#endif #include "lz4.h" #include "os.h" #include "rpcCache.h" @@ -29,7 +31,6 @@ #include "thash.h" #include "tidpool.h" #include "tmsg.h" -#include "transportInt.h" #include "tref.h" #include "trpc.h" #include "ttimer.h" diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index f848932c0f..bd54e8e57b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -149,13 +149,14 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* // impl later SRpcInfo* pRpc = (SRpcInfo*)shandle; + int len = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); + SRpcReqContext* pContext; - int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); pContext = (SRpcReqContext*)((char*)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext)); pContext->ahandle = pMsg->ahandle; pContext->pRpc = (SRpcInfo*)shandle; pContext->epSet = *pEpSet; - pContext->contLen = contLen; + pContext->contLen = len; pContext->pCont = pMsg->pCont; pContext->msgType = pMsg->msgType; pContext->oldInUse = pEpSet->inUse; From ea57fba331c99db40d9cc3dc360d659105fb6cdc Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Tue, 18 Jan 2022 18:22:13 +0800 Subject: [PATCH 16/17] [modify] --- tests/script/sh/massiveTable/deployCluster.sh | 15 ++++++++------- tests/script/sh/massiveTable/setupDnodes.sh | 5 +++-- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/tests/script/sh/massiveTable/deployCluster.sh b/tests/script/sh/massiveTable/deployCluster.sh index 47802ea6a7..2341d512c0 100755 --- a/tests/script/sh/massiveTable/deployCluster.sh +++ b/tests/script/sh/massiveTable/deployCluster.sh @@ -8,17 +8,18 @@ set -e # deployCluster.sh curr_dir=$(pwd) +echo "currect pwd: ${curr_dir}" -source ./cleanCluster.sh -r /data -source ./cleanCluster.sh -r /data2 +./cleanCluster.sh -r "/data" +./cleanCluster.sh -r "/data2" -source ./compileVersion.sh -r ${curr_dir}/../../../../ -v "3.0" +./compileVersion.sh -r ${curr_dir}/../../../../ -v "3.0" -source ./setupDnodes.sh -r /data -n 1 -f trd02:7000 -p 7000 -source ./setupDnodes.sh -r /data2 -n 1 -f trd02:7000 -p 8000 +./setupDnodes.sh -r "/data" -n 1 -f "trd02:7000" -p 7000 +./setupDnodes.sh -r "/data2" -n 1 -f "trd02:7000" -p 8000 -#source ./setupDnodes.sh -r /data -n 2 -f trd02:7000 -p 7000 -#source ./setupDnodes.sh -r /data2 -n 2 -f trd02:7000 -p 8000 +#./setupDnodes.sh -r "/data" -n 2 -f trd02:7000 -p 7000 +#./setupDnodes.sh -r "/data2" -n 2 -f trd02:7000 -p 8000 diff --git a/tests/script/sh/massiveTable/setupDnodes.sh b/tests/script/sh/massiveTable/setupDnodes.sh index 034c15c4eb..e09b2fb396 100755 --- a/tests/script/sh/massiveTable/setupDnodes.sh +++ b/tests/script/sh/massiveTable/setupDnodes.sh @@ -94,7 +94,7 @@ createNewDnodesDataDir() { mkdir -p ${dataRootDir}/dnode_${i}/data createNewCfgFile ${dataRootDir}/dnode_${i}/cfg ${dataRootDir}/dnode_${i}/data ${dataRootDir}/dnode_${i}/log ${firstEp} ${serverPort} - echo "create dnode: ${serverPort}, ${dataRootDir}/dnode_${i}" + #echo "create dnode: ${serverPort}, ${dataRootDir}/dnode_${i}" serverPort=$((10#${serverPort}+100)) done } @@ -131,6 +131,7 @@ fi ## start all dnode by nohup startDnodes ${dnodeNumber} -echo " run setupDnodes.sh end !!!" +echo "====run setupDnodes.sh end====" +echo " " From 04e3df71d35dd855c38cdee54ce8d0bdd1bbb4c4 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 18 Jan 2022 18:39:10 +0800 Subject: [PATCH 17/17] add client --- source/libs/transport/src/transCli.c | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index bd54e8e57b..8ba1f1a0f6 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -59,6 +59,18 @@ static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { // impl later } static void clientConnCb(struct uv_connect_s* req, int status) { + SCliConn* pConn = req->data; + SCliMsg* pMsg = pConn->data; + SEpSet* pEpSet = &pMsg->context->epSet; + + char* fqdn = pEpSet->fqdn[pEpSet->inUse]; + uint32_t port = pEpSet->port[pEpSet->inUse]; + if (status != 0) { + // call user fp later + tError("failed to connect server(%s, %d), errmsg: %s", fqdn, port, uv_strerror(status)); + return; + } + // impl later } @@ -83,6 +95,7 @@ static void clientAsyncCb(uv_async_t* handle) { SCliConn* conn = getConnFromCache(pThrd->cache, fqdn, port); if (conn != NULL) { + // impl later } else { SCliConn* conn = malloc(sizeof(SCliConn));