From ffe87632873aaebd106845064411b3f402697a3b Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 12 Jan 2022 22:17:40 +0800 Subject: [PATCH 01/14] rename files --- source/libs/tfs/inc/{tfsint.h => tfsint1.h} | 3 + source/libs/tfs/src/{tdisk.c => tfsDisk.c} | 19 ++-- source/libs/tfs/src/{ttier.c => tfsTier.c} | 0 src/inc/tfs.h | 103 -------------------- 4 files changed, 13 insertions(+), 112 deletions(-) rename source/libs/tfs/inc/{tfsint.h => tfsint1.h} (98%) rename source/libs/tfs/src/{tdisk.c => tfsDisk.c} (79%) rename source/libs/tfs/src/{ttier.c => tfsTier.c} (100%) delete mode 100644 src/inc/tfs.h diff --git a/source/libs/tfs/inc/tfsint.h b/source/libs/tfs/inc/tfsint1.h similarity index 98% rename from source/libs/tfs/inc/tfsint.h rename to source/libs/tfs/inc/tfsint1.h index 3c5dccc63b..caa52b5cd6 100644 --- a/source/libs/tfs/inc/tfsint.h +++ b/source/libs/tfs/inc/tfsint1.h @@ -16,6 +16,9 @@ #ifndef TD_TFSINT_H #define TD_TFSINT_H +#include "os.h" + +#include "taoserror.h" #include "tlog.h" #include "tglobal.h" #include "tfs.h" diff --git a/source/libs/tfs/src/tdisk.c b/source/libs/tfs/src/tfsDisk.c similarity index 79% rename from source/libs/tfs/src/tdisk.c rename to source/libs/tfs/src/tfsDisk.c index 22601e48c3..f32069837d 100644 --- a/source/libs/tfs/src/tdisk.c +++ b/source/libs/tfs/src/tfsDisk.c @@ -12,14 +12,12 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include "os.h" -#include "taoserror.h" +#define _DEFAULT_SOURCE #include "tfsint.h" -// PROTECTED ==================================== -SDisk *tfsNewDisk(int level, int id, const char *dir) { - SDisk *pDisk = (SDisk *)calloc(1, sizeof(*pDisk)); +SDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir) { + SDisk *pDisk = calloc(1, sizeof(SDisk)); if (pDisk == NULL) { terrno = TSDB_CODE_FS_OUT_OF_MEMORY; return NULL; @@ -33,18 +31,21 @@ SDisk *tfsNewDisk(int level, int id, const char *dir) { } SDisk *tfsFreeDisk(SDisk *pDisk) { - if (pDisk) { + if (pDisk != NULL) { free(pDisk); } return NULL; } -int tfsUpdateDiskInfo(SDisk *pDisk) { - ASSERT(pDisk != NULL); +int32_t tfsUpdateDiskInfo(SDisk *pDisk) { + if (pDisk == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return -1; + } SysDiskSize diskSize = {0}; - int code = taosGetDiskSize(pDisk->dir, &diskSize); + int32_t code = taosGetDiskSize(pDisk->dir, &diskSize); if (code != 0) { fError("failed to update disk information at level %d id %d dir %s since %s", pDisk->level, pDisk->id, pDisk->dir, strerror(errno)); diff --git a/source/libs/tfs/src/ttier.c b/source/libs/tfs/src/tfsTier.c similarity index 100% rename from source/libs/tfs/src/ttier.c rename to source/libs/tfs/src/tfsTier.c diff --git a/src/inc/tfs.h b/src/inc/tfs.h deleted file mode 100644 index 9ad7a8f66e..0000000000 --- a/src/inc/tfs.h +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TD_TFS_H -#define TD_TFS_H - -#include "tglobal.h" - -#ifdef __cplusplus -extern "C" { -#endif - -typedef struct { - int level; - int id; -} SDiskID; - -#define TFS_UNDECIDED_LEVEL -1 -#define TFS_UNDECIDED_ID -1 -#define TFS_PRIMARY_LEVEL 0 -#define TFS_PRIMARY_ID 0 -#define TFS_MIN_LEVEL 0 -#define TFS_MAX_LEVEL (TSDB_MAX_TIERS - 1) - -// FS APIs ==================================== -typedef struct { - int64_t tsize; - int64_t used; - int64_t avail; -} SFSMeta; - -typedef struct { - int64_t size; - int64_t used; - int64_t free; - int16_t nAvailDisks; // # of Available disks -} STierMeta; - -int tfsInit(SDiskCfg *pDiskCfg, int ndisk); -void tfsCleanup(); -void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numLevels); -void tfsGetMeta(SFSMeta *pMeta); -void tfsAllocDisk(int expLevel, int *level, int *id); - -const char *TFS_PRIMARY_PATH(); -const char *TFS_DISK_PATH(int level, int id); - -// TFILE APIs ==================================== -typedef struct { - int level; - int id; - char rname[TSDB_FILENAME_LEN]; // REL name - char aname[TSDB_FILENAME_LEN]; // ABS name -} TFILE; - -#define TFILE_LEVEL(pf) ((pf)->level) -#define TFILE_ID(pf) ((pf)->id) -#define TFILE_NAME(pf) ((pf)->aname) -#define TFILE_REL_NAME(pf) ((pf)->rname) - -#define tfsopen(pf, flags) open(TFILE_NAME(pf), flags) -#define tfsclose(fd) close(fd) -#define tfsremove(pf) remove(TFILE_NAME(pf)) -#define tfscopy(sf, df) taosCopy(TFILE_NAME(sf), TFILE_NAME(df)) -#define tfsrename(sf, df) taosRename(TFILE_NAME(sf), TFILE_NAME(df)) - -void tfsInitFile(TFILE *pf, int level, int id, const char *bname); -bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2); -int tfsEncodeFile(void **buf, TFILE *pf); -void *tfsDecodeFile(void *buf, TFILE *pf); -void tfsbasename(const TFILE *pf, char *dest); -void tfsdirname(const TFILE *pf, char *dest); - -// DIR APIs ==================================== -int tfsMkdirAt(const char *rname, int level, int id); -int tfsMkdirRecurAt(const char *rname, int level, int id); -int tfsMkdir(const char *rname); -int tfsRmdir(const char *rname); -int tfsRename(char *orname, char *nrname); - -typedef struct TDIR TDIR; - -TDIR * tfsOpendir(const char *rname); -const TFILE *tfsReaddir(TDIR *tdir); -void tfsClosedir(TDIR *tdir); - -#ifdef __cplusplus -} -#endif - -#endif From ba42f8cd93fdacfe900c1c44bce2ac9011ac797e Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 12 Jan 2022 22:20:29 +0800 Subject: [PATCH 02/14] rename files --- source/libs/tfs/inc/{tfsint1.h => tfsInt.h} | 12 +++++------- source/libs/tfs/src/tfs.c | 9 ++------- source/libs/tfs/src/tfsDisk.c | 2 +- source/libs/tfs/src/tfsTier.c | 6 ++---- 4 files changed, 10 insertions(+), 19 deletions(-) rename source/libs/tfs/inc/{tfsint1.h => tfsInt.h} (98%) diff --git a/source/libs/tfs/inc/tfsint1.h b/source/libs/tfs/inc/tfsInt.h similarity index 98% rename from source/libs/tfs/inc/tfsint1.h rename to source/libs/tfs/inc/tfsInt.h index caa52b5cd6..40eb61ba07 100644 --- a/source/libs/tfs/inc/tfsint1.h +++ b/source/libs/tfs/inc/tfsInt.h @@ -18,15 +18,13 @@ #include "os.h" +#include "taosdef.h" #include "taoserror.h" -#include "tlog.h" -#include "tglobal.h" -#include "tfs.h" #include "tcoding.h" - -#ifdef __cplusplus -extern "C" { -#endif +#include "tfs.h" +#include "tglobal.h" +#include "thash.h" +#include "tlog.h" extern int fsDebugFlag; diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index 88d6d587a7..a054981961 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -13,13 +13,8 @@ * along with this program. If not, see . */ -#include "os.h" - -#include "taosdef.h" -#include "taoserror.h" -#include "tfs.h" -#include "tfsint.h" -#include "thash.h" +#define _DEFAULT_SOURCE +#include "tfsInt.h" #define TMPNAME_LEN (TSDB_FILENAME_LEN * 2 + 32) diff --git a/source/libs/tfs/src/tfsDisk.c b/source/libs/tfs/src/tfsDisk.c index f32069837d..98656c84e6 100644 --- a/source/libs/tfs/src/tfsDisk.c +++ b/source/libs/tfs/src/tfsDisk.c @@ -14,7 +14,7 @@ */ #define _DEFAULT_SOURCE -#include "tfsint.h" +#include "tfsInt.h" SDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir) { SDisk *pDisk = calloc(1, sizeof(SDisk)); diff --git a/source/libs/tfs/src/tfsTier.c b/source/libs/tfs/src/tfsTier.c index 3b19797acf..90057e61d5 100644 --- a/source/libs/tfs/src/tfsTier.c +++ b/source/libs/tfs/src/tfsTier.c @@ -12,11 +12,9 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include "os.h" -#include "taosdef.h" -#include "taoserror.h" -#include "tfsint.h" +#define _DEFAULT_SOURCE +#include "tfsInt.h" #define tfsLockTier(pTier) pthread_spin_lock(&((pTier)->lock)) #define tfsUnLockTier(pTier) pthread_spin_unlock(&((pTier)->lock)) From 9e4141fa6d517fe32b6398015799d05f2e932f62 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 12 Jan 2022 22:57:38 +0800 Subject: [PATCH 03/14] add error codes --- include/util/taoserror.h | 2 +- source/libs/tfs/inc/tfsInt.h | 27 ++++++------- source/libs/tfs/src/tfsDisk.c | 13 +++--- source/libs/tfs/src/tfsTier.c | 75 ++++++++++++++++++++--------------- source/util/src/terror.c | 2 +- 5 files changed, 64 insertions(+), 55 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 854d16f67d..4f424111a8 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -406,7 +406,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1004) //"WAL out of memory") // tfs -#define TSDB_CODE_FS_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x2200) //"tfs out of memory") +#define TSDB_CODE_FS_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x2200) //"tfs out of memory") #define TSDB_CODE_FS_INVLD_CFG TAOS_DEF_ERROR_CODE(0, 0x2201) //"tfs invalid mount config") #define TSDB_CODE_FS_TOO_MANY_MOUNT TAOS_DEF_ERROR_CODE(0, 0x2202) //"tfs too many mount") #define TSDB_CODE_FS_DUP_PRIMARY TAOS_DEF_ERROR_CODE(0, 0x2203) //"tfs duplicate primary mount") diff --git a/source/libs/tfs/inc/tfsInt.h b/source/libs/tfs/inc/tfsInt.h index 40eb61ba07..4e53fcdb8e 100644 --- a/source/libs/tfs/inc/tfsInt.h +++ b/source/libs/tfs/inc/tfsInt.h @@ -26,7 +26,7 @@ #include "thash.h" #include "tlog.h" -extern int fsDebugFlag; +extern int32_t fsDebugFlag; // For debug purpose #define fFatal(...) { if (fsDebugFlag & DEBUG_FATAL) { taosPrintLog("TFS FATAL ", 255, __VA_ARGS__); }} @@ -47,8 +47,8 @@ typedef struct { } SDiskMeta; typedef struct SDisk { - int level; - int id; + int32_t level; + int32_t id; char dir[TSDB_FILENAME_LEN]; SDiskMeta dmeta; } SDisk; @@ -61,19 +61,19 @@ typedef struct SDisk { #define DISK_USED_SIZE(pd) ((pd)->dmeta.used) #define DISK_FREE_SIZE(pd) ((pd)->dmeta.free) -SDisk *tfsNewDisk(int level, int id, const char *dir); -SDisk *tfsFreeDisk(SDisk *pDisk); -int tfsUpdateDiskInfo(SDisk *pDisk); +SDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir); +SDisk *tfsFreeDisk(SDisk *pDisk); +int32_t tfsUpdateDiskInfo(SDisk *pDisk); // ttier.c ====================================================== typedef struct STier { pthread_spinlock_t lock; - int level; + int32_t level; int16_t ndisk; // # of disks mounted to this tier int16_t nextid; // next disk id to allocate STierMeta tmeta; - SDisk * disks[TSDB_MAX_DISKS_PER_TIER]; + SDisk *disks[TSDB_MAX_DISKS_PER_TIER]; } STier; #define TIER_LEVEL(pt) ((pt)->level) @@ -83,12 +83,11 @@ typedef struct STier { #define TIER_AVAIL_DISKS(pt) ((pt)->tmeta.nAvailDisks) #define DISK_AT_TIER(pt, id) ((pt)->disks[id]) -int tfsInitTier(STier *pTier, int level); -void tfsDestroyTier(STier *pTier); -SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg); -void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta); -int tfsAllocDiskOnTier(STier *pTier); -void tfsGetTierMeta(STier *pTier, STierMeta *pTierMeta); +int32_t tfsInitTier(STier *pTier, int32_t level); +void tfsDestroyTier(STier *pTier); +SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg); +void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta); +int32_t tfsAllocDiskOnTier(STier *pTier); void tfsPosNextId(STier *pTier); #ifdef __cplusplus diff --git a/source/libs/tfs/src/tfsDisk.c b/source/libs/tfs/src/tfsDisk.c index 98656c84e6..dd4f349c42 100644 --- a/source/libs/tfs/src/tfsDisk.c +++ b/source/libs/tfs/src/tfsDisk.c @@ -19,7 +19,7 @@ SDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir) { SDisk *pDisk = calloc(1, sizeof(SDisk)); if (pDisk == NULL) { - terrno = TSDB_CODE_FS_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -34,6 +34,7 @@ SDisk *tfsFreeDisk(SDisk *pDisk) { if (pDisk != NULL) { free(pDisk); } + return NULL; } @@ -44,17 +45,15 @@ int32_t tfsUpdateDiskInfo(SDisk *pDisk) { } SysDiskSize diskSize = {0}; - - int32_t code = taosGetDiskSize(pDisk->dir, &diskSize); - if (code != 0) { + if (taosGetDiskSize(pDisk->dir, &diskSize) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); fError("failed to update disk information at level %d id %d dir %s since %s", pDisk->level, pDisk->id, pDisk->dir, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); + return -1 } pDisk->dmeta.size = diskSize.tsize; pDisk->dmeta.used = diskSize.used; pDisk->dmeta.free = diskSize.avail; - - return code; + return 0; } diff --git a/source/libs/tfs/src/tfsTier.c b/source/libs/tfs/src/tfsTier.c index 90057e61d5..8b057d8755 100644 --- a/source/libs/tfs/src/tfsTier.c +++ b/source/libs/tfs/src/tfsTier.c @@ -16,45 +16,50 @@ #define _DEFAULT_SOURCE #include "tfsInt.h" -#define tfsLockTier(pTier) pthread_spin_lock(&((pTier)->lock)) -#define tfsUnLockTier(pTier) pthread_spin_unlock(&((pTier)->lock)) +#define tfsLockTier(pTier) pthread_spin_lock(&(pTier)->lock) +#define tfsUnLockTier(pTier) pthread_spin_unlock(&(pTier)->lock) -// PROTECTED ========================================== -int tfsInitTier(STier *pTier, int level) { - memset((void *)pTier, 0, sizeof(*pTier)); +int32_t tfsInitTier(STier *pTier, int32_t level) { + if (pTier == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return -1; + } - int code = pthread_spin_init(&(pTier->lock), 0); - if (code) { + memset(pTier, 0, sizeof(STier)); + + int32_t code = pthread_spin_init(&pTier->lock, 0); + if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); return -1; } pTier->level = level; - return 0; } void tfsDestroyTier(STier *pTier) { - for (int id = 0; id < TSDB_MAX_DISKS_PER_TIER; id++) { + if (pTier == NULL) return; + + for (int32_t id = 0; id < TSDB_MAX_DISKS_PER_TIER; id++) { DISK_AT_TIER(pTier, id) = tfsFreeDisk(DISK_AT_TIER(pTier, id)); } pTier->ndisk = 0; - pthread_spin_destroy(&(pTier->lock)); } SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) { - ASSERT(pTier->level == pCfg->level); - - int id = 0; - SDisk *pDisk; + if (pTier == NULL || pCfg == NULL || pTier->level != pCfg->level) { + terrno = TSDB_CODE_INVALID_PARA; + return -1; + } if (TIER_NDISKS(pTier) >= TSDB_MAX_DISKS_PER_TIER) { terrno = TSDB_CODE_FS_TOO_MANY_MOUNT; return NULL; } + int32_t id = 0; if (pTier->level == 0) { if (DISK_AT_TIER(pTier, 0) != NULL) { id = pTier->ndisk; @@ -73,30 +78,31 @@ SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) { id = pTier->ndisk; } - pDisk = tfsNewDisk(pCfg->level, id, pCfg->dir); + SDisk *pDisk = tfsNewDisk(pCfg->level, id, pCfg->dir); if (pDisk == NULL) return NULL; + DISK_AT_TIER(pTier, id) = pDisk; pTier->ndisk++; fInfo("disk %s is mounted to tier level %d id %d", pCfg->dir, pCfg->level, id); - return DISK_AT_TIER(pTier, id); } void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta) { - STierMeta tmeta; + STierMeta tmeta = {0}; if (pTierMeta == NULL) { pTierMeta = &tmeta; } - memset(pTierMeta, 0, sizeof(*pTierMeta)); + memset(pTierMeta, 0, sizeof(STierMeta)); tfsLockTier(pTier); - for (int id = 0; id < pTier->ndisk; id++) { - if (tfsUpdateDiskInfo(DISK_AT_TIER(pTier, id)) < 0) { + for (int32_t id = 0; id < pTier->ndisk; id++) { + if (tfsUpdateDiskInfo(DISK_AT_TIER(pTier, id)) != 0) { continue; } + pTierMeta->size += DISK_SIZE(DISK_AT_TIER(pTier, id)); pTierMeta->used += DISK_USED_SIZE(DISK_AT_TIER(pTier, id)); pTierMeta->free += DISK_FREE_SIZE(DISK_AT_TIER(pTier, id)); @@ -109,22 +115,26 @@ void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta) { } // Round-Robin to allocate disk on a tier -int tfsAllocDiskOnTier(STier *pTier) { - ASSERT(pTier->ndisk > 0); - int id = TFS_UNDECIDED_ID; - SDisk *pDisk; +int32_t tfsAllocDiskOnTier(STier *pTier) { + if (pTier == NULL || pTier->ndisk <= 0) { + terrno = TSDB_CODE_INVALID_PARA; + return -1; + } tfsLockTier(pTier); if (TIER_AVAIL_DISKS(pTier) <= 0) { tfsUnLockTier(pTier); - return id; + return TFS_UNDECIDED_ID; } - id = pTier->nextid; + int32_t id = pTier->nextid; while (true) { - pDisk = DISK_AT_TIER(pTier, id); - ASSERT(pDisk != NULL); + SDisk *pDisk = DISK_AT_TIER(pTier, id); + if (pDisk == NULL) { + tfsUnLockTier(pTier); + return TFS_UNDECIDED_ID; + } if (DISK_FREE_SIZE(pDisk) < TFS_MIN_DISK_FREE_SIZE) { id = (id + 1) % pTier->ndisk; @@ -145,7 +155,7 @@ int tfsAllocDiskOnTier(STier *pTier) { } void tfsGetTierMeta(STier *pTier, STierMeta *pTierMeta) { - ASSERT(pTierMeta != NULL); + if (pTierMeta == NULL || pTierMeta == NULL) return; tfsLockTier(pTier); *pTierMeta = pTier->tmeta; @@ -153,10 +163,11 @@ void tfsGetTierMeta(STier *pTier, STierMeta *pTierMeta) { } void tfsPosNextId(STier *pTier) { - ASSERT(pTier->ndisk > 0); - int nextid = 0; + if (pTier == NULL || pTier->ndisk <= 0) return; - for (int id = 1; id < pTier->ndisk; id++) { + int32_t nextid = 0; + + for (int32_t id = 1; id < pTier->ndisk; id++) { SDisk *pLDisk = DISK_AT_TIER(pTier, nextid); SDisk *pDisk = DISK_AT_TIER(pTier, id); if (DISK_FREE_SIZE(pDisk) > TFS_MIN_DISK_FREE_SIZE && DISK_FREE_SIZE(pDisk) > DISK_FREE_SIZE(pLDisk)) { diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 8294bca959..6d0547c7ae 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -387,7 +387,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limit") // tfs -TAOS_DEFINE_ERROR(TSDB_CODE_FS_OUT_OF_MEMORY, "tfs out of memory") +TAOS_DEFINE_ERROR(TSDB_CODE_FS_APP_ERROR, "tfs out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_CFG, "tfs invalid mount config") TAOS_DEFINE_ERROR(TSDB_CODE_FS_TOO_MANY_MOUNT, "tfs too many mount") TAOS_DEFINE_ERROR(TSDB_CODE_FS_DUP_PRIMARY, "tfs duplicate primary mount") From 7087e4427b90e5d51c42a42f18bff378b0517187 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 12 Jan 2022 23:15:12 +0800 Subject: [PATCH 04/14] minor changes --- include/libs/tfs/tfs.h | 54 ++++++++++---------- source/libs/tfs/inc/tfsInt.h | 8 +-- source/libs/tfs/src/tfs.c | 98 ++++++++++++++++++------------------ 3 files changed, 80 insertions(+), 80 deletions(-) diff --git a/include/libs/tfs/tfs.h b/include/libs/tfs/tfs.h index 793c861363..973231c264 100644 --- a/include/libs/tfs/tfs.h +++ b/include/libs/tfs/tfs.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef TD_TFS_H -#define TD_TFS_H +#ifndef _TD_TFS_H_ +#define _TD_TFS_H_ #include "tglobal.h" @@ -23,8 +23,8 @@ extern "C" { #endif typedef struct { - int level; - int id; + int32_t level; + int32_t id; } SDiskID; #define TFS_UNDECIDED_LEVEL -1 @@ -48,21 +48,21 @@ typedef struct { int16_t nAvailDisks; // # of Available disks } STierMeta; -int tfsInit(SDiskCfg *pDiskCfg, int ndisk); -void tfsCleanup(); -void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numLevels); -void tfsGetMeta(SFSMeta *pMeta); -void tfsAllocDisk(int expLevel, int *level, int *id); +int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk); +void tfsCleanup(); +void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numLevels); +void tfsGetMeta(SFSMeta *pMeta); +void tfsAllocDisk(int32_t expLevel, int32_t *level, int32_t *id); const char *TFS_PRIMARY_PATH(); -const char *TFS_DISK_PATH(int level, int id); +const char *TFS_DISK_PATH(int32_t level, int32_t id); // TFILE APIs ==================================== typedef struct { - int level; - int id; - char rname[TSDB_FILENAME_LEN]; // REL name - char aname[TSDB_FILENAME_LEN]; // ABS name + int32_t level; + int32_t id; + char rname[TSDB_FILENAME_LEN]; // REL name + char aname[TSDB_FILENAME_LEN]; // ABS name } TFILE; #define TFILE_LEVEL(pf) ((pf)->level) @@ -76,23 +76,23 @@ typedef struct { #define tfscopy(sf, df) taosCopyFile(TFILE_NAME(sf), TFILE_NAME(df)) #define tfsrename(sf, df) taosRename(TFILE_NAME(sf), TFILE_NAME(df)) -void tfsInitFile(TFILE *pf, int level, int id, const char *bname); -bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2); -int tfsEncodeFile(void **buf, TFILE *pf); -void *tfsDecodeFile(void *buf, TFILE *pf); -void tfsbasename(const TFILE *pf, char *dest); -void tfsdirname(const TFILE *pf, char *dest); +void tfsInitFile(TFILE *pf, int32_t level, int32_t id, const char *bname); +bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2); +int32_t tfsEncodeFile(void **buf, TFILE *pf); +void *tfsDecodeFile(void *buf, TFILE *pf); +void tfsbasename(const TFILE *pf, char *dest); +void tfsdirname(const TFILE *pf, char *dest); // DIR APIs ==================================== -int tfsMkdirAt(const char *rname, int level, int id); -int tfsMkdirRecurAt(const char *rname, int level, int id); -int tfsMkdir(const char *rname); -int tfsRmdir(const char *rname); -int tfsRename(char *orname, char *nrname); +int32_t tfsMkdirAt(const char *rname, int32_t level, int32_t id); +int32_t tfsMkdirRecurAt(const char *rname, int32_t level, int32_t id); +int32_t tfsMkdir(const char *rname); +int32_t tfsRmdir(const char *rname); +int32_t tfsRename(char *orname, char *nrname); typedef struct TDIR TDIR; -TDIR * tfsOpendir(const char *rname); +TDIR *tfsOpendir(const char *rname); const TFILE *tfsReaddir(TDIR *tdir); void tfsClosedir(TDIR *tdir); @@ -100,4 +100,4 @@ void tfsClosedir(TDIR *tdir); } #endif -#endif +#endif /*_TD_TFS_H_*/ diff --git a/source/libs/tfs/inc/tfsInt.h b/source/libs/tfs/inc/tfsInt.h index 4e53fcdb8e..bfa4c4380d 100644 --- a/source/libs/tfs/inc/tfsInt.h +++ b/source/libs/tfs/inc/tfsInt.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef TD_TFSINT_H -#define TD_TFSINT_H +#ifndef _TD_TFSINT_H_ +#define _TD_TFSINT_H_ #include "os.h" @@ -88,10 +88,10 @@ void tfsDestroyTier(STier *pTier); SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg); void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta); int32_t tfsAllocDiskOnTier(STier *pTier); -void tfsPosNextId(STier *pTier); +void tfsPosNextId(STier *pTier); #ifdef __cplusplus } #endif -#endif +#endif /*_TD_TFSINT_H_*/ diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index a054981961..bbeb66fe05 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -21,9 +21,9 @@ typedef struct { pthread_spinlock_t lock; SFSMeta meta; - int nlevel; + int32_t nlevel; STier tiers[TSDB_MAX_TIERS]; - SHashObj * map; // name to did map + SHashObj *map; // name to did map } SFS; typedef struct { @@ -47,21 +47,21 @@ static SFS tfs = {0}; static SFS *pfs = &tfs; // STATIC DECLARATION -static int tfsMount(SDiskCfg *pCfg); -static int tfsCheck(); -static int tfsCheckAndFormatCfg(SDiskCfg *pCfg); -static int tfsFormatDir(char *idir, char *odir); -static SDisk *tfsGetDiskByID(SDiskID did); -static SDisk *tfsGetDiskByName(const char *dir); -static int tfsOpendirImpl(TDIR *tdir); -static void tfsInitDiskIter(SDiskIter *pIter); -static SDisk *tfsNextDisk(SDiskIter *pIter); +static int32_t tfsMount(SDiskCfg *pCfg); +static int32_t tfsCheck(); +static int32_t tfsCheckAndFormatCfg(SDiskCfg *pCfg); +static int32_t tfsFormatDir(char *idir, char *odir); +static SDisk *tfsGetDiskByID(SDiskID did); +static SDisk *tfsGetDiskByName(const char *dir); +static int32_t tfsOpendirImpl(TDIR *tdir); +static void tfsInitDiskIter(SDiskIter *pIter); +static SDisk *tfsNextDisk(SDiskIter *pIter); // FS APIs ==================================== -int tfsInit(SDiskCfg *pDiskCfg, int ndisk) { +int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk) { ASSERT(ndisk > 0); - for (int level = 0; level < TSDB_MAX_TIERS; level++) { + for (int32_t level = 0; level < TSDB_MAX_TIERS; level++) { if (tfsInitTier(TFS_TIER_AT(level), level) < 0) { while (true) { level--; @@ -84,7 +84,7 @@ int tfsInit(SDiskCfg *pDiskCfg, int ndisk) { return -1; } - for (int idisk = 0; idisk < ndisk; idisk++) { + for (int32_t idisk = 0; idisk < ndisk; idisk++) { if (tfsMount(pDiskCfg + idisk) < 0) { tfsCleanup(); return -1; @@ -97,7 +97,7 @@ int tfsInit(SDiskCfg *pDiskCfg, int ndisk) { } tfsUpdateInfo(NULL, NULL, 0); - for (int level = 0; level < TFS_NLEVEL(); level++) { + for (int32_t level = 0; level < TFS_NLEVEL(); level++) { tfsPosNextId(TFS_TIER_AT(level)); } @@ -109,7 +109,7 @@ void tfsCleanup() { pfs->map = NULL; pthread_spin_destroy(&(pfs->lock)); - for (int level = 0; level < TFS_NLEVEL(); level++) { + for (int32_t level = 0; level < TFS_NLEVEL(); level++) { tfsDestroyTier(TFS_TIER_AT(level)); } } @@ -124,7 +124,7 @@ void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numTiers) { memset(pFSMeta, 0, sizeof(*pFSMeta)); - for (int level = 0; level < TFS_NLEVEL(); level++) { + for (int32_t level = 0; level < TFS_NLEVEL(); level++) { STierMeta *pTierMeta = &tierMeta; if (tierMetas && level < numTiers) { pTierMeta = tierMetas + level; @@ -152,7 +152,7 @@ void tfsGetMeta(SFSMeta *pMeta) { /* Allocate an existing available tier level */ -void tfsAllocDisk(int expLevel, int *level, int *id) { +void tfsAllocDisk(int32_t expLevel, int32_t *level, int32_t *id) { ASSERT(expLevel >= 0); *level = expLevel; @@ -177,10 +177,10 @@ void tfsAllocDisk(int expLevel, int *level, int *id) { } const char *TFS_PRIMARY_PATH() { return DISK_DIR(TFS_PRIMARY_DISK()); } -const char *TFS_DISK_PATH(int level, int id) { return DISK_DIR(TFS_DISK_AT(level, id)); } +const char *TFS_DISK_PATH(int32_t level, int32_t id) { return DISK_DIR(TFS_DISK_AT(level, id)); } // TFILE APIs ==================================== -void tfsInitFile(TFILE *pf, int level, int id, const char *bname) { +void tfsInitFile(TFILE *pf, int32_t level, int32_t id, const char *bname) { ASSERT(TFS_IS_VALID_DISK(level, id)); SDisk *pDisk = TFS_DISK_AT(level, id); @@ -203,8 +203,8 @@ bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2) { return true; } -int tfsEncodeFile(void **buf, TFILE *pf) { - int tlen = 0; +int32_t tfsEncodeFile(void **buf, TFILE *pf) { + int32_t tlen = 0; tlen += taosEncodeVariantI32(buf, pf->level); tlen += taosEncodeVariantI32(buf, pf->id); @@ -215,7 +215,7 @@ int tfsEncodeFile(void **buf, TFILE *pf) { void *tfsDecodeFile(void *buf, TFILE *pf) { int32_t level, id; - char * rname; + char *rname; buf = taosDecodeVariantI32(buf, &(level)); buf = taosDecodeVariantI32(buf, &(id)); @@ -242,7 +242,7 @@ void tfsdirname(const TFILE *pf, char *dest) { } // DIR APIs ==================================== -int tfsMkdirAt(const char *rname, int level, int id) { +int32_t tfsMkdirAt(const char *rname, int32_t level, int32_t id) { SDisk *pDisk = TFS_DISK_AT(level, id); char aname[TMPNAME_LEN]; @@ -255,7 +255,7 @@ int tfsMkdirAt(const char *rname, int level, int id) { return 0; } -int tfsMkdirRecurAt(const char *rname, int level, int id) { +int32_t tfsMkdirRecurAt(const char *rname, int32_t level, int32_t id) { if (tfsMkdirAt(rname, level, id) < 0) { if (errno == ENOENT) { // Try to create upper @@ -288,10 +288,10 @@ int tfsMkdirRecurAt(const char *rname, int level, int id) { return 0; } -int tfsMkdir(const char *rname) { - for (int level = 0; level < TFS_NLEVEL(); level++) { +int32_t tfsMkdir(const char *rname) { + for (int32_t level = 0; level < TFS_NLEVEL(); level++) { STier *pTier = TFS_TIER_AT(level); - for (int id = 0; id < TIER_NDISKS(pTier); id++) { + for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) { if (tfsMkdirAt(rname, level, id) < 0) { return -1; } @@ -301,12 +301,12 @@ int tfsMkdir(const char *rname) { return 0; } -int tfsRmdir(const char *rname) { +int32_t tfsRmdir(const char *rname) { char aname[TMPNAME_LEN] = "\0"; - for (int level = 0; level < TFS_NLEVEL(); level++) { + for (int32_t level = 0; level < TFS_NLEVEL(); level++) { STier *pTier = TFS_TIER_AT(level); - for (int id = 0; id < TIER_NDISKS(pTier); id++) { + for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) { SDisk *pDisk = DISK_AT_TIER(pTier, id); snprintf(aname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), rname); @@ -318,13 +318,13 @@ int tfsRmdir(const char *rname) { return 0; } -int tfsRename(char *orname, char *nrname) { +int32_t tfsRename(char *orname, char *nrname) { char oaname[TMPNAME_LEN] = "\0"; char naname[TMPNAME_LEN] = "\0"; - for (int level = 0; level < pfs->nlevel; level++) { + for (int32_t level = 0; level < pfs->nlevel; level++) { STier *pTier = TFS_TIER_AT(level); - for (int id = 0; id < TIER_NDISKS(pTier); id++) { + for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) { SDisk *pDisk = DISK_AT_TIER(pTier, id); snprintf(oaname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), orname); @@ -339,11 +339,11 @@ int tfsRename(char *orname, char *nrname) { struct TDIR { SDiskIter iter; - int level; - int id; + int32_t level; + int32_t id; char dirname[TSDB_FILENAME_LEN]; TFILE tfile; - DIR * dir; + DIR *dir; }; TDIR *tfsOpendir(const char *rname) { @@ -402,9 +402,9 @@ void tfsClosedir(TDIR *tdir) { } // private -static int tfsMount(SDiskCfg *pCfg) { +static int32_t tfsMount(SDiskCfg *pCfg) { SDiskID did; - SDisk * pDisk = NULL; + SDisk *pDisk = NULL; if (tfsCheckAndFormatCfg(pCfg) < 0) return -1; @@ -422,7 +422,7 @@ static int tfsMount(SDiskCfg *pCfg) { return 0; } -static int tfsCheckAndFormatCfg(SDiskCfg *pCfg) { +static int32_t tfsCheckAndFormatCfg(SDiskCfg *pCfg) { char dirName[TSDB_FILENAME_LEN] = "\0"; struct stat pstat; @@ -481,10 +481,10 @@ static int tfsCheckAndFormatCfg(SDiskCfg *pCfg) { return 0; } -static int tfsFormatDir(char *idir, char *odir) { +static int32_t tfsFormatDir(char *idir, char *odir) { wordexp_t wep = {0}; - int code = wordexp(idir, &wep, 0); + int32_t code = wordexp(idir, &wep, 0); if (code != 0) { terrno = TAOS_SYSTEM_ERROR(code); return -1; @@ -502,14 +502,14 @@ static int tfsFormatDir(char *idir, char *odir) { return 0; } -static int tfsCheck() { +static int32_t tfsCheck() { if (TFS_PRIMARY_DISK() == NULL) { fError("no primary disk is set"); terrno = TSDB_CODE_FS_NO_PRIMARY_DISK; return -1; } - for (int level = 0; level < TFS_NLEVEL(); level++) { + for (int32_t level = 0; level < TFS_NLEVEL(); level++) { if (TIER_NDISKS(TFS_TIER_AT(level)) == 0) { fError("no disk at level %d", level); terrno = TSDB_CODE_FS_NO_MOUNT_AT_TIER; @@ -523,8 +523,8 @@ static int tfsCheck() { static SDisk *tfsGetDiskByID(SDiskID did) { return TFS_DISK_AT(did.level, did.id); } static SDisk *tfsGetDiskByName(const char *dir) { SDiskID did; - SDisk * pDisk = NULL; - void * pr = NULL; + SDisk *pDisk = NULL; + void *pr = NULL; pr = taosHashGet(pfs->map, (void *)dir, strnlen(dir, TSDB_FILENAME_LEN)); if (pr == NULL) return NULL; @@ -536,7 +536,7 @@ static SDisk *tfsGetDiskByName(const char *dir) { return pDisk; } -static int tfsOpendirImpl(TDIR *tdir) { +static int32_t tfsOpendirImpl(TDIR *tdir) { SDisk *pDisk = NULL; char adir[TMPNAME_LEN * 2] = "\0"; @@ -567,8 +567,8 @@ static SDisk *tfsNextDisk(SDiskIter *pIter) { if (pDisk == NULL) return NULL; - int level = DISK_LEVEL(pDisk); - int id = DISK_ID(pDisk); + int32_t level = DISK_LEVEL(pDisk); + int32_t id = DISK_ID(pDisk); id++; if (id < TIER_NDISKS(TFS_TIER_AT(level))) { From 0c5a85d463d011295bc1486e62d2537f618c2893 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 17 Jan 2022 13:58:43 +0800 Subject: [PATCH 05/14] minor changes --- source/libs/tfs/src/tfs.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index bbeb66fe05..87adba6bea 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -79,7 +79,7 @@ int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk) { pfs->map = taosHashInit(TSDB_MAX_TIERS * TSDB_MAX_DISKS_PER_TIER * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); if (pfs->map == NULL) { - terrno = TSDB_CODE_FS_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; tfsCleanup(); return -1; } @@ -318,6 +318,7 @@ int32_t tfsRmdir(const char *rname) { return 0; } +#if 0 int32_t tfsRename(char *orname, char *nrname) { char oaname[TMPNAME_LEN] = "\0"; char naname[TMPNAME_LEN] = "\0"; @@ -336,6 +337,7 @@ int32_t tfsRename(char *orname, char *nrname) { return 0; } +#endif struct TDIR { SDiskIter iter; From 913455684f950cf30dd2ebde77c14318d65a6370 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 18 Jan 2022 14:12:25 +0800 Subject: [PATCH 06/14] feature/qnode --- include/libs/executor/dataSinkMgt.h | 2 +- include/libs/executor/executor.h | 6 +- source/libs/executor/inc/dataSinkInt.h | 2 +- source/libs/executor/src/dataDispatcher.c | 4 +- source/libs/executor/src/dataSinkMgt.c | 2 +- source/libs/executor/src/executorMain.c | 26 +++--- source/libs/executor/test/executorTests.cpp | 3 +- source/libs/qworker/src/qworker.c | 48 ++++++++--- source/libs/scheduler/src/scheduler.c | 11 ++- source/libs/scheduler/test/schedulerTests.cpp | 80 ++++++++++++++----- 10 files changed, 132 insertions(+), 52 deletions(-) diff --git a/include/libs/executor/dataSinkMgt.h b/include/libs/executor/dataSinkMgt.h index 5cef3b2253..371cb12405 100644 --- a/include/libs/executor/dataSinkMgt.h +++ b/include/libs/executor/dataSinkMgt.h @@ -70,7 +70,7 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH */ int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue); -void dsEndPut(DataSinkHandle handle, int64_t useconds); +void dsEndPut(DataSinkHandle handle, uint64_t useconds); /** * Get the length of the data returned by the next call to dsGetDataBlock. diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index e4f8d291eb..0fc7fd679e 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -20,6 +20,8 @@ extern "C" { #endif +#include "common.h" + typedef void* qTaskInfo_t; typedef void* DataSinkHandle; struct SSubplan; @@ -34,7 +36,7 @@ struct SSubplan; * @param qId * @return */ -int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo); +int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle); /** * The main task execution function, including query on both table and multiple tables, @@ -44,7 +46,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskI * @param handle * @return */ -struct SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle); +int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds); /** * Retrieve the produced results information, if current query is not paused or completed, diff --git a/source/libs/executor/inc/dataSinkInt.h b/source/libs/executor/inc/dataSinkInt.h index 69727626af..7003564365 100644 --- a/source/libs/executor/inc/dataSinkInt.h +++ b/source/libs/executor/inc/dataSinkInt.h @@ -32,7 +32,7 @@ typedef struct SDataSinkManager { } SDataSinkManager; typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue); -typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, int64_t useconds); +typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, uint64_t useconds); typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd); typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput); typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle); diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 8280f9d0af..edba4fc97d 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -44,7 +44,7 @@ typedef struct SDataDispatchHandle { SDataDispatchBuf nextOutput; int32_t status; bool queryEnd; - int64_t useconds; + uint64_t useconds; pthread_mutex_t mutex; } SDataDispatchHandle; @@ -158,7 +158,7 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput, return TSDB_CODE_SUCCESS; } -static void endPut(struct SDataSinkHandle* pHandle, int64_t useconds) { +static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) { SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle; pthread_mutex_lock(&pDispatcher->mutex); pDispatcher->queryEnd = true; diff --git a/source/libs/executor/src/dataSinkMgt.c b/source/libs/executor/src/dataSinkMgt.c index 80d99f96c6..eb1f75f359 100644 --- a/source/libs/executor/src/dataSinkMgt.c +++ b/source/libs/executor/src/dataSinkMgt.c @@ -37,7 +37,7 @@ int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pC return pHandleImpl->fPut(pHandleImpl, pInput, pContinue); } -void dsEndPut(DataSinkHandle handle, int64_t useconds) { +void dsEndPut(DataSinkHandle handle, uint64_t useconds) { SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle; return pHandleImpl->fEndPut(pHandleImpl, useconds); } diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index e451f888bb..1f5d0cd059 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -68,7 +68,7 @@ void freeParam(STaskParam *param) { tfree(param->prevResult); } -int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo) { +int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) { assert(tsdb != NULL && pSubplan != NULL); SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo; @@ -85,6 +85,8 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_ code = dsCreateDataSinker(pSubplan->pDataSink, &(*pTask)->dsHandle); + *handle = (*pTask)->dsHandle; + _error: // if failed to add ref for all tables in this query, abort current query return code; @@ -135,16 +137,18 @@ int waitMoment(SQInfo* pQInfo){ } #endif -SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { +int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; int64_t threadId = taosGetSelfPthreadId(); + *pRes = NULL; + int64_t curOwner = 0; if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) { qError("QInfo:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner); pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC; - return NULL; + return pTaskInfo->code; } if (pTaskInfo->cost.start == 0) { @@ -153,7 +157,7 @@ SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { if (isTaskKilled(pTaskInfo)) { qDebug("QInfo:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo)); - return NULL; + return TSDB_CODE_SUCCESS; } // STaskRuntimeEnv* pRuntimeEnv = &pTaskInfo->runtimeEnv; @@ -170,7 +174,7 @@ SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { pTaskInfo->code = ret; qDebug("QInfo:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code)); - return NULL; + return pTaskInfo->code; } qDebug("QInfo:0x%" PRIx64 " query task is launched", GET_TASKID(pTaskInfo)); @@ -179,21 +183,21 @@ SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) { publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC); int64_t st = 0; - if (handle) { - *handle = pTaskInfo->dsHandle; - } - st = taosGetTimestampUs(); - SSDataBlock* pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup); + *pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup); pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st); publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC); + if (NULL == *pRes) { + *useconds = pTaskInfo->cost.elapsedTime; + } + qDebug("QInfo:0x%" PRIx64 " query paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d", GET_TASKID(pTaskInfo), 0, 0L, 0); atomic_store_64(&pTaskInfo->owner, 0); - return pRes; + return pTaskInfo->code; } int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) { diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index 5f5fddbe28..c528d879a3 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -217,5 +217,6 @@ TEST(testCase, build_executor_tree_Test) { "}"; SExecTaskInfo* pTaskInfo = nullptr; - int32_t code = qCreateExecTask((void*) 1, 2, NULL, (void**) &pTaskInfo); + DataSinkHandle sinkHandle = nullptr; + int32_t code = qCreateExecTask((void*) 1, 2, NULL, (void**) &pTaskInfo, &sinkHandle); } \ No newline at end of file diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 61bd82a49a..a0beaba61d 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -458,6 +458,37 @@ _return: QW_RET(code); } +int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t taskHandle, DataSinkHandle sinkHandle) { + int32_t code = 0; + bool qcontinue = true; + SSDataBlock* pRes = NULL; + uint64_t useconds = 0; + + while (qcontinue) { + code = qExecTask(taskHandle, &pRes, &useconds); + if (code) { + QW_TASK_ELOG("qExecTask failed, code:%x", code); + QW_ERR_JRET(code); + } + + if (NULL == pRes) { + QW_TASK_DLOG("query done, useconds:%"PRIu64, useconds); + dsEndPut(sinkHandle, useconds); + break; + } + + SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL}; + code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue); + if (code) { + QW_TASK_ELOG("dsPutDataBlock failed, code:%x", code); + QW_ERR_JRET(code); + } + } + +_return: + + QW_RET(code); +} int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) { @@ -733,7 +764,9 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t } qTaskInfo_t pTaskInfo = NULL; - code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo); + DataSinkHandle sinkHandle = NULL; + + code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle); if (code) { QW_TASK_ELOG("qCreateExecTask failed, code:%x", code); QW_ERR_JRET(code); @@ -743,12 +776,7 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t queryRsped = true; - DataSinkHandle sinkHandle = NULL; - SSDataBlock* pRes = qExecTask(pTaskInfo, &sinkHandle); - if (code) { - QW_TASK_ELOG("qExecTask failed, code:%x", code); - QW_ERR_JRET(code); - } + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), pTaskInfo, sinkHandle)); _return: @@ -840,11 +868,7 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t qTaskInfo_t taskHandle = ctx->taskHandle; DataSinkHandle sinkHandle = ctx->sinkHandle; - code = qExecTask(taskHandle, &sinkHandle); - if (code) { - QW_TASK_ELOG("qExecTask failed, code:%x", code); - QW_ERR_JRET(code); - } + QW_ERR_JRET(qwExecTask(QW_FPARAMS(), taskHandle, sinkHandle)); QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CQUERY); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 2d5322fc2c..c53926f8c1 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -412,6 +412,8 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + + ++addNum; } } @@ -792,6 +794,11 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch if (rspCode != TSDB_CODE_SUCCESS) { SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); } + + SShellSubmitRsp *rsp = (SShellSubmitRsp *)msg; + if (rsp) { + pJob->resNumOfRows += rsp->affectedRows; + } #endif SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask)); @@ -1355,9 +1362,9 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, stru SSchJob *job = NULL; - SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, &job, true)); + SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, true)); - *pJob = job; + job = *pJob; pRes->code = atomic_load_32(&job->errCode); pRes->numOfRows = job->resNumOfRows; diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index d72c4920d5..5332c6fcd1 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -34,10 +34,12 @@ #include "stub.h" #include "addr_any.h" + namespace { extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); + void schtInitLogFile() { const char *defaultLogFileNamePrefix = "taoslog"; const int32_t maxLogFileNum = 10; @@ -113,9 +115,9 @@ void schtBuildInsertDag(SQueryDag *dag) { dag->queryId = qId; dag->numOfSubplans = 2; dag->pSubplans = taosArrayInit(1, POINTER_BYTES); - SArray *inserta = taosArrayInit(dag->numOfSubplans, sizeof(SSubplan)); + SArray *inserta = taosArrayInit(dag->numOfSubplans, POINTER_BYTES); - SSubplan insertPlan[2] = {0}; + SSubplan *insertPlan = (SSubplan *)calloc(2, sizeof(SSubplan)); insertPlan[0].id.queryId = qId; insertPlan[0].id.templateId = 0x0000000000000003; @@ -131,6 +133,7 @@ void schtBuildInsertDag(SQueryDag *dag) { insertPlan[0].pParents = NULL; insertPlan[0].pNode = NULL; insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink)); + insertPlan[0].msgType = TDMT_VND_SUBMIT; insertPlan[1].id.queryId = qId; insertPlan[1].id.templateId = 0x0000000000000003; @@ -146,10 +149,11 @@ void schtBuildInsertDag(SQueryDag *dag) { insertPlan[1].pParents = NULL; insertPlan[1].pNode = NULL; insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink)); + insertPlan[1].msgType = TDMT_VND_SUBMIT; - - taosArrayPush(inserta, &insertPlan[0]); - taosArrayPush(inserta, &insertPlan[1]); + taosArrayPush(inserta, &insertPlan); + insertPlan += 1; + taosArrayPush(inserta, &insertPlan); taosArrayPush(dag->pSubplans, &inserta); } @@ -210,6 +214,24 @@ void schtSetRpcSendRequest() { } } +int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { + return 0; +} + + +void schtSetAsyncSendMsgToServer() { + static Stub stub; + stub.set(asyncSendMsgToServer, schtAsyncSendMsgToServer); + { + AddrAny any("libtransport.so"); + std::map result; + any.get_global_func_addr_dynsym("^asyncSendMsgToServer$", result); + for (const auto& f : result) { + stub.set(f.second, schtAsyncSendMsgToServer); + } + } +} + void *schtSendRsp(void *param) { SSchJob *job = NULL; @@ -230,7 +252,7 @@ void *schtSendRsp(void *param) { SShellSubmitRsp rsp = {0}; rsp.affectedRows = 10; - schHandleResponseMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0); + schHandleResponseMsg(job, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0); pIter = taosHashIterate(job->execTasks, pIter); } @@ -238,6 +260,23 @@ void *schtSendRsp(void *param) { return NULL; } +void *schtCreateFetchRspThread(void *param) { + struct SSchJob* job = (struct SSchJob*)param; + + sleep(1); + + int32_t code = 0; + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp)); + rsp->completed = 1; + rsp->numOfRows = 10; + code = schHandleResponseMsg(job, job->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(rsp), 0); + + assert(code == 0); +} + + + + struct SSchJob *pInsertJob = NULL; } @@ -266,6 +305,7 @@ TEST(queryTest, normalCase) { schtSetPlanToString(); schtSetExecNode(); + schtSetAsyncSendMsgToServer(); code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob); ASSERT_EQ(code, 0); @@ -276,7 +316,7 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SQueryTableRsp rsp = {0}; - code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); + code = schHandleResponseMsg(job, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(job->execTasks, pIter); @@ -287,8 +327,8 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SResReadyRsp rsp = {0}; - code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); - + code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); + printf("code:%d", code); ASSERT_EQ(code, 0); pIter = taosHashIterate(job->execTasks, pIter); } @@ -298,7 +338,7 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SQueryTableRsp rsp = {0}; - code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0); + code = schHandleResponseMsg(job, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(job->execTasks, pIter); @@ -309,22 +349,19 @@ TEST(queryTest, normalCase) { SSchTask *task = *(SSchTask **)pIter; SResReadyRsp rsp = {0}; - code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0); + code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(job->execTasks, pIter); } - SRetrieveTableRsp rsp = {0}; - rsp.completed = 1; - rsp.numOfRows = 10; - code = schHandleResponseMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0); - - ASSERT_EQ(code, 0); + pthread_attr_t thattr; + pthread_attr_init(&thattr); + pthread_t thread1; + pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, job); - void *data = NULL; - + void *data = NULL; code = scheduleFetchRows(job, &data); ASSERT_EQ(code, 0); @@ -340,6 +377,8 @@ TEST(queryTest, normalCase) { scheduleFreeJob(pJob); schtFreeQueryDag(&dag); + + schedulerDestroy(); } @@ -369,6 +408,7 @@ TEST(insertTest, normalCase) { schtBuildInsertDag(&dag); schtSetPlanToString(); + schtSetAsyncSendMsgToServer(); pthread_attr_t thattr; pthread_attr_init(&thattr); @@ -382,6 +422,8 @@ TEST(insertTest, normalCase) { ASSERT_EQ(res.numOfRows, 20); scheduleFreeJob(pInsertJob); + + schedulerDestroy(); } TEST(multiThread, forceFree) { From d6359f3ab11b7001af21b147dbcd622b6f1739b2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 17 Jan 2022 23:20:11 -0800 Subject: [PATCH 07/14] refact tier and disk in tfs module --- include/libs/tfs/tfs.h | 12 +--- include/os/osSysinfo.h | 6 +- source/libs/tfs/inc/tfsInt.h | 47 +++++-------- source/libs/tfs/src/tfs.c | 64 ++++++++---------- source/libs/tfs/src/tfsDisk.c | 31 ++++----- source/libs/tfs/src/tfsTier.c | 122 ++++++++++++---------------------- source/os/src/osSysinfo.c | 9 ++- 7 files changed, 109 insertions(+), 182 deletions(-) diff --git a/include/libs/tfs/tfs.h b/include/libs/tfs/tfs.h index 973231c264..3828a93144 100644 --- a/include/libs/tfs/tfs.h +++ b/include/libs/tfs/tfs.h @@ -36,22 +36,14 @@ typedef struct { // FS APIs ==================================== typedef struct { - int64_t tsize; + int64_t total; int64_t used; int64_t avail; } SFSMeta; -typedef struct { - int64_t size; - int64_t used; - int64_t free; - int16_t nAvailDisks; // # of Available disks -} STierMeta; - int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk); void tfsCleanup(); -void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numLevels); -void tfsGetMeta(SFSMeta *pMeta); +void tfsUpdateSize(SFSMeta *pFSMeta); void tfsAllocDisk(int32_t expLevel, int32_t *level, int32_t *id); const char *TFS_PRIMARY_PATH(); diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index 36ff4194d8..9dcb075489 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -35,12 +35,12 @@ extern char tsLocale[]; extern char tsCharset[]; // default encode string typedef struct { - int64_t tsize; + int64_t total; int64_t used; int64_t avail; -} SysDiskSize; +} SDiskSize; -int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize); +int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize); int32_t taosGetCpuCores(); void taosGetSystemInfo(); bool taosReadProcIO(int64_t *rchars, int64_t *wchars); diff --git a/source/libs/tfs/inc/tfsInt.h b/source/libs/tfs/inc/tfsInt.h index bfa4c4380d..ce1436eb29 100644 --- a/source/libs/tfs/inc/tfsInt.h +++ b/source/libs/tfs/inc/tfsInt.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_TFSINT_H_ -#define _TD_TFSINT_H_ +#ifndef _TD_TFS_INT_H_ +#define _TD_TFS_INT_H_ #include "os.h" @@ -39,54 +39,39 @@ extern int32_t fsDebugFlag; // Global Definitions #define TFS_MIN_DISK_FREE_SIZE 50 * 1024 * 1024 -// tdisk.c ====================================================== -typedef struct { - int64_t size; - int64_t used; - int64_t free; -} SDiskMeta; - typedef struct SDisk { int32_t level; int32_t id; - char dir[TSDB_FILENAME_LEN]; - SDiskMeta dmeta; + char *path; + SDiskSize size; } SDisk; -#define DISK_LEVEL(pd) ((pd)->level) -#define DISK_ID(pd) ((pd)->id) -#define DISK_DIR(pd) ((pd)->dir) -#define DISK_META(pd) ((pd)->dmeta) -#define DISK_SIZE(pd) ((pd)->dmeta.size) -#define DISK_USED_SIZE(pd) ((pd)->dmeta.used) -#define DISK_FREE_SIZE(pd) ((pd)->dmeta.free) - -SDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir); -SDisk *tfsFreeDisk(SDisk *pDisk); -int32_t tfsUpdateDiskInfo(SDisk *pDisk); - -// ttier.c ====================================================== - typedef struct STier { pthread_spinlock_t lock; int32_t level; - int16_t ndisk; // # of disks mounted to this tier - int16_t nextid; // next disk id to allocate - STierMeta tmeta; + int16_t nextid; // next disk id to allocate + int16_t ndisk; // # of disks mounted to this tier + int16_t nAvailDisks; // # of Available disks SDisk *disks[TSDB_MAX_DISKS_PER_TIER]; + SDiskSize size; } STier; #define TIER_LEVEL(pt) ((pt)->level) #define TIER_NDISKS(pt) ((pt)->ndisk) #define TIER_SIZE(pt) ((pt)->tmeta.size) #define TIER_FREE_SIZE(pt) ((pt)->tmeta.free) -#define TIER_AVAIL_DISKS(pt) ((pt)->tmeta.nAvailDisks) + #define DISK_AT_TIER(pt, id) ((pt)->disks[id]) +#define DISK_DIR(pd) ((pd)->path) + +SDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir); +SDisk *tfsFreeDisk(SDisk *pDisk); +int32_t tfsUpdateDiskSize(SDisk *pDisk); int32_t tfsInitTier(STier *pTier, int32_t level); void tfsDestroyTier(STier *pTier); SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg); -void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta); +void tfsUpdateTierSize(STier *pTier); int32_t tfsAllocDiskOnTier(STier *pTier); void tfsPosNextId(STier *pTier); @@ -94,4 +79,4 @@ void tfsPosNextId(STier *pTier); } #endif -#endif /*_TD_TFSINT_H_*/ +#endif /*_TD_TFS_INT_H_*/ diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index 87adba6bea..1d11cd6df2 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -59,7 +59,10 @@ static SDisk *tfsNextDisk(SDiskIter *pIter); // FS APIs ==================================== int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk) { - ASSERT(ndisk > 0); + if (ndisk < 0) { + terrno = TSDB_CODE_INVALID_PARA; + return -1; + } for (int32_t level = 0; level < TSDB_MAX_TIERS; level++) { if (tfsInitTier(TFS_TIER_AT(level), level) < 0) { @@ -96,7 +99,7 @@ int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk) { return -1; } - tfsUpdateInfo(NULL, NULL, 0); + tfsUpdateSize(NULL); for (int32_t level = 0; level < TFS_NLEVEL(); level++) { tfsPosNextId(TFS_TIER_AT(level)); } @@ -114,27 +117,22 @@ void tfsCleanup() { } } -void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numTiers) { - SFSMeta fsMeta; - STierMeta tierMeta; +void tfsUpdateSize(SFSMeta *pFSMeta) { + SFSMeta fsMeta = {0}; + SDiskSize size = {0}; if (pFSMeta == NULL) { pFSMeta = &fsMeta; } - memset(pFSMeta, 0, sizeof(*pFSMeta)); + memset(pFSMeta, 0, sizeof(SFSMeta)); for (int32_t level = 0; level < TFS_NLEVEL(); level++) { - STierMeta *pTierMeta = &tierMeta; - if (tierMetas && level < numTiers) { - pTierMeta = tierMetas + level; - } - STier *pTier = TFS_TIER_AT(level); - tfsUpdateTierInfo(pTier, pTierMeta); - pFSMeta->tsize += pTierMeta->size; - pFSMeta->avail += pTierMeta->free; - pFSMeta->used += pTierMeta->used; + tfsUpdateTierSize(pTier); + pFSMeta->total += pTier->size.total; + pFSMeta->avail += pTier->size.avail; + pFSMeta->used += pTier->size.used; } tfsLock(); @@ -142,14 +140,6 @@ void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numTiers) { tfsUnLock(); } -void tfsGetMeta(SFSMeta *pMeta) { - ASSERT(pMeta); - - tfsLock(); - *pMeta = pfs->meta; - tfsUnLock(); -} - /* Allocate an existing available tier level */ void tfsAllocDisk(int32_t expLevel, int32_t *level, int32_t *id) { @@ -307,9 +297,9 @@ int32_t tfsRmdir(const char *rname) { for (int32_t level = 0; level < TFS_NLEVEL(); level++) { STier *pTier = TFS_TIER_AT(level); for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) { - SDisk *pDisk = DISK_AT_TIER(pTier, id); + SDisk *pDisk = pTier->disks[id]; - snprintf(aname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), rname); + snprintf(aname, TMPNAME_LEN, "%s%s%s", DISK_DIR(pDisk), TS_PATH_DELIMITER, rname); taosRemoveDir(aname); } @@ -351,7 +341,7 @@ struct TDIR { TDIR *tfsOpendir(const char *rname) { TDIR *tdir = (TDIR *)calloc(1, sizeof(*tdir)); if (tdir == NULL) { - terrno = TSDB_CODE_FS_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -416,7 +406,7 @@ static int32_t tfsMount(SDiskCfg *pCfg) { fError("failed to mount disk %s to level %d since %s", pCfg->dir, pCfg->level, tstrerror(terrno)); return -1; } - did.id = DISK_ID(pDisk); + did.id = pDisk->id; taosHashPut(pfs->map, (void *)(pCfg->dir), strnlen(pCfg->dir, TSDB_FILENAME_LEN), (void *)(&did), sizeof(did)); if (pfs->nlevel < pCfg->level + 1) pfs->nlevel = pCfg->level + 1; @@ -551,10 +541,10 @@ static int32_t tfsOpendirImpl(TDIR *tdir) { pDisk = tfsNextDisk(&(tdir->iter)); if (pDisk == NULL) return 0; - tdir->level = DISK_LEVEL(pDisk); - tdir->id = DISK_ID(pDisk); + tdir->level = pDisk->level; + tdir->id = pDisk->id; - snprintf(adir, TMPNAME_LEN * 2, "%s/%s", DISK_DIR(pDisk), tdir->dirname); + snprintf(adir, TMPNAME_LEN * 2, "%s%s%s", pDisk->path, TS_PATH_DELIMITER,tdir->dirname); tdir->dir = opendir(adir); if (tdir->dir != NULL) break; } @@ -569,8 +559,8 @@ static SDisk *tfsNextDisk(SDiskIter *pIter) { if (pDisk == NULL) return NULL; - int32_t level = DISK_LEVEL(pDisk); - int32_t id = DISK_ID(pDisk); + int32_t level = pDisk->level; + int32_t id = pDisk->id; id++; if (id < TIER_NDISKS(TFS_TIER_AT(level))) { @@ -593,21 +583,21 @@ static SDisk *tfsNextDisk(SDiskIter *pIter) { // OTHER FUNCTIONS =================================== void taosGetDisk() { const double unit = 1024 * 1024 * 1024; - SysDiskSize diskSize; + SDiskSize diskSize; SFSMeta fsMeta; - tfsUpdateInfo(&fsMeta, NULL, 0); - tsTotalDataDirGB = (float)(fsMeta.tsize / unit); + tfsUpdateSize(&fsMeta); + tsTotalDataDirGB = (float)(fsMeta.total / unit); tsUsedDataDirGB = (float)(fsMeta.used / unit); tsAvailDataDirGB = (float)(fsMeta.avail / unit); if (taosGetDiskSize(tsLogDir, &diskSize) == 0) { - tsTotalLogDirGB = (float)(diskSize.tsize / unit); + tsTotalLogDirGB = (float)(diskSize.total / unit); tsAvailLogDirGB = (float)(diskSize.avail / unit); } if (taosGetDiskSize(tsTempDir, &diskSize) == 0) { - tsTotalTmpDirGB = (float)(diskSize.tsize / unit); + tsTotalTmpDirGB = (float)(diskSize.total / unit); tsAvailTmpDirectorySpace = (float)(diskSize.avail / unit); } } diff --git a/source/libs/tfs/src/tfsDisk.c b/source/libs/tfs/src/tfsDisk.c index dd4f349c42..a5ef1121ff 100644 --- a/source/libs/tfs/src/tfsDisk.c +++ b/source/libs/tfs/src/tfsDisk.c @@ -16,44 +16,41 @@ #define _DEFAULT_SOURCE #include "tfsInt.h" -SDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir) { +SDisk *tfsNewDisk(int32_t level, int32_t id, const char *path) { SDisk *pDisk = calloc(1, sizeof(SDisk)); if (pDisk == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } + pDisk->path = strdup(path); + if (pDisk->path == NULL) { + free(pDisk); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + pDisk->level = level; pDisk->id = id; - tstrncpy(pDisk->dir, dir, TSDB_FILENAME_LEN); - + taosGetDiskSize(pDisk->path, &pDisk->size); return pDisk; } SDisk *tfsFreeDisk(SDisk *pDisk) { if (pDisk != NULL) { + free(pDisk->path); free(pDisk); } return NULL; } -int32_t tfsUpdateDiskInfo(SDisk *pDisk) { - if (pDisk == NULL) { - terrno = TSDB_CODE_INVALID_PARA; +int32_t tfsUpdateDiskSize(SDisk *pDisk) { + if (taosGetDiskSize(pDisk->path, &pDisk->size) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + fError("failed to get disk:%s size, level:%d id:%d since %s", pDisk->path, pDisk->level, pDisk->id, terrstr()); return -1; } - SysDiskSize diskSize = {0}; - if (taosGetDiskSize(pDisk->dir, &diskSize) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - fError("failed to update disk information at level %d id %d dir %s since %s", pDisk->level, pDisk->id, pDisk->dir, - strerror(errno)); - return -1 - } - - pDisk->dmeta.size = diskSize.tsize; - pDisk->dmeta.used = diskSize.used; - pDisk->dmeta.free = diskSize.avail; return 0; } diff --git a/source/libs/tfs/src/tfsTier.c b/source/libs/tfs/src/tfsTier.c index 8b057d8755..2a26c23ead 100644 --- a/source/libs/tfs/src/tfsTier.c +++ b/source/libs/tfs/src/tfsTier.c @@ -20,16 +20,10 @@ #define tfsUnLockTier(pTier) pthread_spin_unlock(&(pTier)->lock) int32_t tfsInitTier(STier *pTier, int32_t level) { - if (pTier == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - return -1; - } - memset(pTier, 0, sizeof(STier)); - int32_t code = pthread_spin_init(&pTier->lock, 0); - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(code); + if (pthread_spin_init(&pTier->lock, 0) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); return -1; } @@ -38,10 +32,8 @@ int32_t tfsInitTier(STier *pTier, int32_t level) { } void tfsDestroyTier(STier *pTier) { - if (pTier == NULL) return; - for (int32_t id = 0; id < TSDB_MAX_DISKS_PER_TIER; id++) { - DISK_AT_TIER(pTier, id) = tfsFreeDisk(DISK_AT_TIER(pTier, id)); + pTier->disks[id] = tfsFreeDisk(pTier->disks[id]); } pTier->ndisk = 0; @@ -49,19 +41,14 @@ void tfsDestroyTier(STier *pTier) { } SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) { - if (pTier == NULL || pCfg == NULL || pTier->level != pCfg->level) { - terrno = TSDB_CODE_INVALID_PARA; - return -1; - } - - if (TIER_NDISKS(pTier) >= TSDB_MAX_DISKS_PER_TIER) { + if (pTier->ndisk >= TSDB_MAX_DISKS_PER_TIER) { terrno = TSDB_CODE_FS_TOO_MANY_MOUNT; return NULL; } int32_t id = 0; if (pTier->level == 0) { - if (DISK_AT_TIER(pTier, 0) != NULL) { + if (pTier->disks[0] != NULL) { id = pTier->ndisk; } else { if (pCfg->primary) { @@ -69,108 +56,85 @@ SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) { } else { id = pTier->ndisk + 1; } - if (id >= TSDB_MAX_DISKS_PER_TIER) { - terrno = TSDB_CODE_FS_TOO_MANY_MOUNT; - return NULL; - } } } else { id = pTier->ndisk; } + if (id >= TSDB_MAX_DISKS_PER_TIER) { + terrno = TSDB_CODE_FS_TOO_MANY_MOUNT; + return NULL; + } + SDisk *pDisk = tfsNewDisk(pCfg->level, id, pCfg->dir); if (pDisk == NULL) return NULL; - DISK_AT_TIER(pTier, id) = pDisk; + pTier->disks[id] = pDisk; pTier->ndisk++; fInfo("disk %s is mounted to tier level %d id %d", pCfg->dir, pCfg->level, id); - return DISK_AT_TIER(pTier, id); + return pTier->disks[id]; } -void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta) { - STierMeta tmeta = {0}; - - if (pTierMeta == NULL) { - pTierMeta = &tmeta; - } - memset(pTierMeta, 0, sizeof(STierMeta)); +void tfsUpdateTierSize(STier *pTier) { + SDiskSize size = {0}; + int16_t nAvailDisks = 0; tfsLockTier(pTier); for (int32_t id = 0; id < pTier->ndisk; id++) { - if (tfsUpdateDiskInfo(DISK_AT_TIER(pTier, id)) != 0) { - continue; - } + SDisk *pDisk = pTier->disks[id]; + if (pDisk == NULL) continue; - pTierMeta->size += DISK_SIZE(DISK_AT_TIER(pTier, id)); - pTierMeta->used += DISK_USED_SIZE(DISK_AT_TIER(pTier, id)); - pTierMeta->free += DISK_FREE_SIZE(DISK_AT_TIER(pTier, id)); - pTierMeta->nAvailDisks++; + size.total += pDisk->size.total; + size.used += pDisk->size.used; + size.avail += pDisk->size.avail; + nAvailDisks++; } - pTier->tmeta = *pTierMeta; + pTier->size = size; + pTier->nAvailDisks = nAvailDisks; tfsUnLockTier(pTier); } // Round-Robin to allocate disk on a tier int32_t tfsAllocDiskOnTier(STier *pTier) { - if (pTier == NULL || pTier->ndisk <= 0) { - terrno = TSDB_CODE_INVALID_PARA; + terrno = TSDB_CODE_FS_NO_VALID_DISK; + + tfsLockTier(pTier); + + if (pTier->ndisk <= 0 || pTier->nAvailDisks <= 0) { + tfsUnLockTier(pTier); return -1; } - tfsLockTier(pTier); + int32_t retId = -1; + for (int32_t id = 0; id < TSDB_MAX_DISKS_PER_TIER; ++id) { + int32_t diskId = (pTier->nextid + id) % pTier->ndisk; + SDisk *pDisk = pTier->disks[diskId]; - if (TIER_AVAIL_DISKS(pTier) <= 0) { - tfsUnLockTier(pTier); - return TFS_UNDECIDED_ID; - } + if (pDisk == NULL) continue; - int32_t id = pTier->nextid; - while (true) { - SDisk *pDisk = DISK_AT_TIER(pTier, id); - if (pDisk == NULL) { - tfsUnLockTier(pTier); - return TFS_UNDECIDED_ID; - } + if (pDisk->size.avail < TFS_MIN_DISK_FREE_SIZE) continue; - if (DISK_FREE_SIZE(pDisk) < TFS_MIN_DISK_FREE_SIZE) { - id = (id + 1) % pTier->ndisk; - if (id == pTier->nextid) { - tfsUnLockTier(pTier); - return TFS_UNDECIDED_ID; - } else { - continue; - } - } else { - pTier->nextid = (id + 1) % pTier->ndisk; - break; - } + retId = diskId; + terrno = 0; + pTier->nextid = (diskId + 1) % pTier->ndisk; + break; } tfsUnLockTier(pTier); - return id; -} - -void tfsGetTierMeta(STier *pTier, STierMeta *pTierMeta) { - if (pTierMeta == NULL || pTierMeta == NULL) return; - - tfsLockTier(pTier); - *pTierMeta = pTier->tmeta; - tfsUnLockTier(pTier); + return retId; } void tfsPosNextId(STier *pTier) { - if (pTier == NULL || pTier->ndisk <= 0) return; - int32_t nextid = 0; for (int32_t id = 1; id < pTier->ndisk; id++) { - SDisk *pLDisk = DISK_AT_TIER(pTier, nextid); - SDisk *pDisk = DISK_AT_TIER(pTier, id); - if (DISK_FREE_SIZE(pDisk) > TFS_MIN_DISK_FREE_SIZE && DISK_FREE_SIZE(pDisk) > DISK_FREE_SIZE(pLDisk)) { + SDisk *pLDisk = pTier->disks[nextid]; + SDisk *pDisk = pTier->disks[id]; + if (pDisk->size.avail > TFS_MIN_DISK_FREE_SIZE && pDisk->size.avail > pLDisk->size.avail) { nextid = id; } } diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 0be17ca2b9..cae1b18b3c 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -121,7 +121,7 @@ bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) { return true; } -int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) { +int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) { unsigned _int64 i64FreeBytesToCaller; unsigned _int64 i64TotalBytes; unsigned _int64 i64FreeBytes; @@ -438,7 +438,7 @@ int taosSystem(const char *cmd) { void taosSetCoreDump() {} -int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) { +int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) { struct statvfs info; if (statvfs(dataDir, &info)) { //printf("failed to get disk size, dataDir:%s errno:%s", tsDataDir, strerror(errno)); @@ -771,13 +771,12 @@ bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) { return true; } -int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) { +int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) { struct statvfs info; if (statvfs(dataDir, &info)) { - //printf("failed to get disk size, dataDir:%s errno:%s", dataDir, strerror(errno)); return -1; } else { - diskSize->tsize = info.f_blocks * info.f_frsize; + diskSize->total = info.f_blocks * info.f_frsize; diskSize->avail = info.f_bavail * info.f_frsize; diskSize->used = (info.f_blocks - info.f_bfree) * info.f_frsize; return 0; From ddf7dcc94f637ce0a2e3df75e61da629326bbdc4 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 18 Jan 2022 16:02:12 +0800 Subject: [PATCH 08/14] fix mem leak --- include/common/tmsg.h | 11 ++++--- source/client/src/clientHb.c | 12 ++++--- source/client/test/clientTests.cpp | 4 ++- source/dnode/mnode/impl/inc/mndDef.h | 18 +++++++++-- source/dnode/mnode/impl/src/mndConsumer.c | 39 ++++++++++++++--------- source/dnode/mnode/impl/src/mndProfile.c | 5 +++ source/dnode/mnode/impl/src/mnode.c | 11 +++++++ 7 files changed, 73 insertions(+), 27 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f32fdcbae7..dfd376f1e9 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -188,16 +188,19 @@ void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp); static FORCE_INLINE void tFreeClientHbReq(void *pReq) { SClientHbReq* req = (SClientHbReq*)pReq; - taosHashCleanup(req->info); - free(pReq); + if (req->info) taosHashCleanup(req->info); } int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq); void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pReq); -static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq) { +static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) { SClientHbBatchReq *req = (SClientHbBatchReq*)pReq; - //taosArrayDestroyEx(req->reqs, tFreeClientHbReq); + if (deep) { + taosArrayDestroyEx(req->reqs, tFreeClientHbReq); + } else { + taosArrayDestroy(req->reqs); + } free(pReq); } diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 6d7fc9f81a..97ef77abcc 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -60,15 +60,17 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) { pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter); } +#if 0 pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, NULL); while (pIter != NULL) { FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter; SClientHbKey connKey; taosHashCopyKey(pIter, &connKey); - getConnInfoFp(connKey, NULL); + SArray* pArray = getConnInfoFp(connKey, NULL); pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, pIter); } +#endif return pBatchReq; } @@ -99,12 +101,12 @@ static void* hbThreadFunc(void* param) { //TODO: error handling break; } - void *bufCopy = buf; - tSerializeSClientHbBatchReq(&bufCopy, pReq); + void *abuf = buf; + tSerializeSClientHbBatchReq(&abuf, pReq); SMsgSendInfo *pInfo = malloc(sizeof(SMsgSendInfo)); if (pInfo == NULL) { terrno = TSDB_CODE_TSC_OUT_OF_MEMORY; - tFreeClientHbBatchReq(pReq); + tFreeClientHbBatchReq(pReq, false); free(buf); break; } @@ -120,7 +122,7 @@ static void* hbThreadFunc(void* param) { int64_t transporterId = 0; SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp); asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo); - tFreeClientHbBatchReq(pReq); + tFreeClientHbBatchReq(pReq, false); atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1); } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 415d6a57ce..13915fd85d 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -53,7 +53,9 @@ TEST(testCase, connect_Test) { if (pConn == NULL) { printf("failed to connect to server, reason:%s\n", taos_errstr(NULL)); } - sleep(3); + while (1) { + sleep(3); + } taos_close(pConn); } diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index de101b0f06..7da0f3826e 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -325,6 +325,19 @@ typedef struct SMqTopicConsumer { } SMqTopicConsumer; #endif +typedef struct SMqConsumerEp { + int32_t vgId; + SEpSet epset; + int64_t consumerId; +} SMqConsumerEp; + +typedef struct SMqCgroupTopicPair { + char key[TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN]; + SArray* assigned; + SArray* unassignedConsumer; + SArray* unassignedVg; +} SMqCgroupTopicPair; + typedef struct SMqCGroup { char name[TSDB_CONSUMER_GROUP_LEN]; int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal @@ -351,10 +364,11 @@ typedef struct SMqTopicObj { // TODO: add cache and change name to id typedef struct SMqConsumerTopic { + char name[TSDB_TOPIC_FNAME_LEN]; int32_t epoch; - char name[TSDB_TOPIC_NAME_LEN]; //TODO: replace with something with ep SList *vgroups; // SList + SArray *pVgInfo; // SArray } SMqConsumerTopic; typedef struct SMqConsumerObj { @@ -362,7 +376,7 @@ typedef struct SMqConsumerObj { SRWLatch lock; char cgroup[TSDB_CONSUMER_GROUP_LEN]; SArray *topics; // SArray - SHashObj *topicHash; + SHashObj *topicHash; //SHashObj } SMqConsumerObj; typedef struct SMqSubConsumerObj { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 54e640d8b7..d27bf53a90 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -204,34 +204,37 @@ void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) { static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; char *msgStr = pMsg->rpcMsg.pCont; - SCMSubscribeReq *pSubscribe; - tDeserializeSCMSubscribeReq(msgStr, pSubscribe); - int64_t consumerId = pSubscribe->consumerId; - char *consumerGroup = pSubscribe->consumerGroup; + SCMSubscribeReq subscribe; + tDeserializeSCMSubscribeReq(msgStr, &subscribe); + int64_t consumerId = subscribe.consumerId; + char *consumerGroup = subscribe.consumerGroup; int32_t cgroupLen = strlen(consumerGroup); SArray *newSub = NULL; - int newTopicNum = pSubscribe->topicNum; + int newTopicNum = subscribe.topicNum; if (newTopicNum) { newSub = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic)); } + SMqConsumerTopic *pConsumerTopics = calloc(newTopicNum, sizeof(SMqConsumerTopic)); + if (pConsumerTopics == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } for (int i = 0; i < newTopicNum; i++) { char *newTopicName = taosArrayGetP(newSub, i); - SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic)); - if (pConsumerTopic == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - // TODO: free - return -1; - } + SMqConsumerTopic *pConsumerTopic = &pConsumerTopics[i]; + strcpy(pConsumerTopic->name, newTopicName); pConsumerTopic->vgroups = tdListNew(sizeof(int64_t)); - taosArrayPush(newSub, pConsumerTopic); - free(pConsumerTopic); } + + taosArrayAddBatch(newSub, pConsumerTopics, newTopicNum); + free(pConsumerTopics); taosArraySortString(newSub, taosArrayCompareString); SArray *oldSub = NULL; int oldTopicNum = 0; + // create consumer if not exist SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); if (pConsumer == NULL) { // create consumer @@ -249,6 +252,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { } STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); if (pTrans == NULL) { + //TODO: free memory return -1; } @@ -286,6 +290,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { } if (pOldTopic != NULL) { + //cancel subscribe of that old topic ASSERT(pNewTopic == NULL); char *oldTopicName = pOldTopic->name; SList *vgroups = pOldTopic->vgroups; @@ -298,13 +303,14 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); while ((pn = tdListNext(&iter)) != NULL) { int32_t vgId = *(int64_t *)pn->data; + // acquire and get epset SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); - // TODO release + // TODO what time to release? if (pVgObj == NULL) { // TODO handle error continue; } - // acquire and get epset + //build reset msg void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup); // TODO:serialize if (pMsg == NULL) { @@ -323,10 +329,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { return -1; } } + //delete data in mnode taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen); mndReleaseTopic(pMnode, pTopic); } else if (pNewTopic != NULL) { + // save subscribe info to mnode ASSERT(pOldTopic == NULL); char *newTopicName = pNewTopic->name; @@ -351,6 +359,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { // add into cgroups taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup)); } + /*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/ // put the consumer into list // rebalance will be triggered by timer diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 902eaa5c1c..3444a2409a 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -357,6 +357,11 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { } } } + for (int i = 0; i < sz; i++) { + SClientHbReq* pHbReq = taosArrayGet(pArray, i); + tFreeClientHbReq(pHbReq); + } + taosArrayDestroy(pArray); int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp); void* buf = rpcMallocCont(tlen); void* abuf = buf; diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index d70c93e758..cab30702ea 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -69,6 +69,17 @@ static void mndTransReExecute(void *param, void *tmrId) { taosTmrReset(mndTransReExecute, 3000, pMnode, pMnode->timer, &pMnode->transTimer); } +static void mndCalMqRebalance(void* param, void* tmrId) { + SMnode* pMnode = param; + if (mndIsMaster(pMnode)) { + // iterate cgroup, cal rebalance + // sync with raft + // write sdb + } + + taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->transTimer); +} + static int32_t mndInitTimer(SMnode *pMnode) { if (pMnode->timer == NULL) { pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND"); From d14a599a32bda1bdad49e14b77c598a40d6531d5 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 18 Jan 2022 16:15:24 +0800 Subject: [PATCH 09/14] fix mem leak --- source/dnode/mnode/impl/src/mndProfile.c | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 3444a2409a..3773750ed3 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -357,15 +357,13 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { } } } - for (int i = 0; i < sz; i++) { - SClientHbReq* pHbReq = taosArrayGet(pArray, i); - tFreeClientHbReq(pHbReq); - } - taosArrayDestroy(pArray); + taosArrayDestroyEx(pArray, tFreeClientHbReq); + int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp); void* buf = rpcMallocCont(tlen); void* abuf = buf; tSerializeSClientHbBatchRsp(&abuf, &batchRsp); + taosArrayDestroy(batchRsp.rsps); pReq->contLen = tlen; pReq->pCont = buf; return 0; From cc8a02aeb496bcb2901d3c9569f80c4f56265fb2 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 18 Jan 2022 16:17:58 +0800 Subject: [PATCH 10/14] fix mem leak --- source/libs/wal/src/walMeta.c | 1 + 1 file changed, 1 insertion(+) diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index d630080086..a3894ceedd 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -149,6 +149,7 @@ int walCheckAndRepairMeta(SWal* pWal) { } } + closedir(dir); regfree(&logRegPattern); regfree(&idxRegPattern); From 623acecafdbfae6aa1471214040a001ea90b63fd Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 18 Jan 2022 16:24:54 +0800 Subject: [PATCH 11/14] init hb count --- source/client/src/clientHb.c | 3 +++ source/client/test/clientTests.cpp | 4 +--- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c index 97ef77abcc..0f4ff6f725 100644 --- a/source/client/src/clientHb.c +++ b/source/client/src/clientHb.c @@ -157,6 +157,9 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) { } // init stat pAppHbMgr->startTime = taosGetTimestampMs(); + pAppHbMgr->connKeyCnt = 0; + pAppHbMgr->reportCnt = 0; + pAppHbMgr->reportBytes = 0; // init app info pAppHbMgr->pAppInstInfo = pAppInstInfo; diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 13915fd85d..415d6a57ce 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -53,9 +53,7 @@ TEST(testCase, connect_Test) { if (pConn == NULL) { printf("failed to connect to server, reason:%s\n", taos_errstr(NULL)); } - while (1) { - sleep(3); - } + sleep(3); taos_close(pConn); } From 66f610f8df52cf8897452fb06da972066a369ad4 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 18 Jan 2022 16:34:09 +0800 Subject: [PATCH 12/14] make jenkins happy --- source/dnode/mnode/impl/inc/mndDef.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 7da0f3826e..a2d6bbf4e6 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -333,7 +333,7 @@ typedef struct SMqConsumerEp { typedef struct SMqCgroupTopicPair { char key[TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN]; - SArray* assigned; + SArray* assigned; // SArray SArray* unassignedConsumer; SArray* unassignedVg; } SMqCgroupTopicPair; From c4e0e94fc6f6e8a33739ffed6e066548941c8dd4 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Tue, 18 Jan 2022 17:53:21 +0800 Subject: [PATCH 13/14] [add create table scriptes] --- tests/script/sh/massiveTable/cleanCluster.sh | 57 ++++++++ .../script/sh/massiveTable/compileVersion.sh | 81 +++++++++++ tests/script/sh/massiveTable/deployCluster.sh | 25 ++++ tests/script/sh/massiveTable/setupDnodes.sh | 136 ++++++++++++++++++ 4 files changed, 299 insertions(+) create mode 100644 tests/script/sh/massiveTable/cleanCluster.sh create mode 100644 tests/script/sh/massiveTable/compileVersion.sh create mode 100644 tests/script/sh/massiveTable/deployCluster.sh create mode 100644 tests/script/sh/massiveTable/setupDnodes.sh diff --git a/tests/script/sh/massiveTable/cleanCluster.sh b/tests/script/sh/massiveTable/cleanCluster.sh new file mode 100644 index 0000000000..af278933b2 --- /dev/null +++ b/tests/script/sh/massiveTable/cleanCluster.sh @@ -0,0 +1,57 @@ +#!/bin/bash +# +# clean test environment + +set -e +#set -x + +# cleanCluster.sh +# -r [ dnode root dir] + + +dataRootDir="/data" + + +while getopts "hr:" arg +do + case $arg in + r) + dataRootDir=$(echo $OPTARG) + ;; + h) + echo "Usage: `basename $0` -r [ dnode root dir] " + exit 0 + ;; + ?) #unknow option + echo "unkonw argument" + exit 1 + ;; + esac +done + + +rmDnodesDataDir() { + if [ -d ${dataRootDir} ]; then + rm -rf ${dataRootDir}/dnode* + else + echo "${dataRootDir} not exist" + exit 1 + fi +} + +function kill_process() { + pid=$(ps -ef | grep "$1" | grep -v "grep" | awk '{print $2}') + if [ -n "$pid" ]; then + kill -9 $pid || : + fi +} + +######################################################################################## +############################### main process ########################################## + +## kill all taosd process +kill_process taosd + +rmDnodesDataDir + + diff --git a/tests/script/sh/massiveTable/compileVersion.sh b/tests/script/sh/massiveTable/compileVersion.sh new file mode 100644 index 0000000000..1976e8f14a --- /dev/null +++ b/tests/script/sh/massiveTable/compileVersion.sh @@ -0,0 +1,81 @@ +#!/bin/bash +# +# compile test version + +set -e +#set -x + +# compileVersion.sh +# -r [ TDengine project dir] +# -v [ TDengine branch version ] + + +projectDir=/root/TDengine +TDengineBrVer="3.0" + +while getopts "hr:v:" arg +do + case $arg in + r) + projectDir=$(echo $OPTARG) + ;; + v) + TDengineBrVer=$(echo $OPTARG) + ;; + h) + echo "Usage: `basename $0` -r [ TDengine project dir] " + echo " -v [ TDengine branch version] " + exit 0 + ;; + ?) #unknow option + echo "unkonw argument" + exit 1 + ;; + esac +done + +echo "projectDir=${projectDir} TDengineBrVer=${TDengineBrVer}" + +function gitPullBranchInfo () { + branch_name=$1 + + git checkout $branch_name + echo "==== git pull $branch_name start ====" +## git submodule update --init --recursive + git pull origin $branch_name ||: + echo "==== git pull $branch_name end ====" +} + +function compileTDengineVersion() { + debugDir=debug + if [ -d ${debugDir} ]; then + rm -rf ${debugDir}/* ||: + else + mkdir -p ${debugDir} + fi + + cd ${debugDir} + cmake .. + make -j24 +} +######################################################################################## +############################### main process ########################################## + +## checkout all branchs and git pull +cd ${projectDir} +gitPullBranchInfo $TDengineBrVer +compileTDengineVersion + +taos_dir=${projectDir}/debug/tools/shell +taosd_dir=${projectDir}/debug/source/dnode/mgmt/daemon +create_table_dir=${projectDir}/debug/tests/test/c + +rm -f /usr/bin/taos +rm -f /usr/bin/taosd +rm -f /usr/bin/create_table + +ln -s $taos_dir/taos /usr/bin/taos +ln -s $taosd_dir/taosd /usr/bin/taosd +ln -s $create_table_dir/create_table /usr/bin/create_table + + diff --git a/tests/script/sh/massiveTable/deployCluster.sh b/tests/script/sh/massiveTable/deployCluster.sh new file mode 100644 index 0000000000..47802ea6a7 --- /dev/null +++ b/tests/script/sh/massiveTable/deployCluster.sh @@ -0,0 +1,25 @@ +#!/bin/bash +# +# deploy test cluster + +set -e +#set -x + +# deployCluster.sh + +curr_dir=$(pwd) + +source ./cleanCluster.sh -r /data +source ./cleanCluster.sh -r /data2 + +source ./compileVersion.sh -r ${curr_dir}/../../../../ -v "3.0" + +source ./setupDnodes.sh -r /data -n 1 -f trd02:7000 -p 7000 +source ./setupDnodes.sh -r /data2 -n 1 -f trd02:7000 -p 8000 + +#source ./setupDnodes.sh -r /data -n 2 -f trd02:7000 -p 7000 +#source ./setupDnodes.sh -r /data2 -n 2 -f trd02:7000 -p 8000 + + + + diff --git a/tests/script/sh/massiveTable/setupDnodes.sh b/tests/script/sh/massiveTable/setupDnodes.sh new file mode 100644 index 0000000000..034c15c4eb --- /dev/null +++ b/tests/script/sh/massiveTable/setupDnodes.sh @@ -0,0 +1,136 @@ +#!/bin/bash +# +# setup test environment + +set -e +#set -x + +# setupDnodes.sh +# -e [ new | old] +# -n [ dnode number] +# -f [ first ep] +# -p [ start port] +# -r [ dnode root dir] + +# set parameters by default value +enviMode=new +dataRootDir="/data" +firstEp="localhost:7000" +startPort=7000 +dnodeNumber=1 + + +while getopts "he:f:n:r:p:" arg +do + case $arg in + e) + enviMode=$( echo $OPTARG ) + ;; + n) + dnodeNumber=$(echo $OPTARG) + ;; + f) + firstEp=$(echo $OPTARG) + ;; + p) + startPort=$(echo $OPTARG) + ;; + r) + dataRootDir=$(echo $OPTARG) + ;; + h) + echo "Usage: `basename $0` -e [new | old] " + echo " -n [ dnode number] " + echo " -f [ first ep] " + echo " -p [ start port] " + echo " -r [ dnode root dir] " + exit 0 + ;; + ?) #unknow option + echo "unkonw argument" + exit 1 + ;; + esac +done + +echo "enviMode=${enviMode} dnodeNumber=${dnodeNumber} dataRootDir=${dataRootDir} firstEp=${firstEp} startPort=${startPort}" + +#curr_dir=$(pwd) + + +createNewCfgFile() { + cfgFile=$1/taos.cfg + dataDir=$2 + logDir=$3 + firstEp=$4 + serverPort=$5 + + echo "debugFlag 131" > ${cfgFile} + echo "firstEp ${firstEp}" >> ${cfgFile} + echo "dataDir ${dataDir}" >> ${cfgFile} + echo "logDir ${logDir}" >> ${cfgFile} + echo "serverPort ${serverPort}" >> ${cfgFile} + + echo "supportVnodes 1024" >> ${cfgFile} + #echo "asyncLog 0" >> ${cfgFile} + echo "telemetryReporting 0" >> ${cfgFile} +} + +createNewDnodesDataDir() { + if [ -d ${dataRootDir} ]; then + rm -rf ${dataRootDir}/dnode* + else + echo "${dataRootDir} not exist" + exit 1 + fi + + dnodeNumber=$1 + firstEp=$2 + + serverPort=${startPort} + for ((i=0; i<${dnodeNumber}; i++)); do + mkdir -p ${dataRootDir}/dnode_${i}/cfg + mkdir -p ${dataRootDir}/dnode_${i}/log + mkdir -p ${dataRootDir}/dnode_${i}/data + + createNewCfgFile ${dataRootDir}/dnode_${i}/cfg ${dataRootDir}/dnode_${i}/data ${dataRootDir}/dnode_${i}/log ${firstEp} ${serverPort} + echo "create dnode: ${serverPort}, ${dataRootDir}/dnode_${i}" + serverPort=$((10#${serverPort}+100)) + done +} + +function kill_process() { + pid=$(ps -ef | grep "$1" | grep -v "grep" | awk '{print $2}') + if [ -n "$pid" ]; then + kill -9 $pid || : + fi +} + +startDnodes() { + dnodeNumber=$1 + + for ((i=0; i<${dnodeNumber}; i++)); do + if [ -d ${dataRootDir}/dnode_${i} ]; then + nohup taosd -c ${dataRootDir}/dnode_${i}/cfg >/dev/null 2>&1 & + echo "start taosd ${dataRootDir}/dnode_${i}" + fi + done +} + +######################################################################################## +############################### main process ########################################## + +## kill all taosd process +kill_process taosd + +## create director for all dnode +if [[ "$enviMode" == "new" ]]; then + createNewDnodesDataDir ${dnodeNumber} ${firstEp} +fi + +## start all dnode by nohup +startDnodes ${dnodeNumber} + +echo " run setupDnodes.sh end !!!" + + From 45f7b12f83d08d0825aae66890e5d46fdf77910f Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Tue, 18 Jan 2022 17:59:20 +0800 Subject: [PATCH 14/14] [modify] --- tests/script/sh/massiveTable/cleanCluster.sh | 0 tests/script/sh/massiveTable/compileVersion.sh | 0 tests/script/sh/massiveTable/deployCluster.sh | 0 tests/script/sh/massiveTable/setupDnodes.sh | 0 4 files changed, 0 insertions(+), 0 deletions(-) mode change 100644 => 100755 tests/script/sh/massiveTable/cleanCluster.sh mode change 100644 => 100755 tests/script/sh/massiveTable/compileVersion.sh mode change 100644 => 100755 tests/script/sh/massiveTable/deployCluster.sh mode change 100644 => 100755 tests/script/sh/massiveTable/setupDnodes.sh diff --git a/tests/script/sh/massiveTable/cleanCluster.sh b/tests/script/sh/massiveTable/cleanCluster.sh old mode 100644 new mode 100755 diff --git a/tests/script/sh/massiveTable/compileVersion.sh b/tests/script/sh/massiveTable/compileVersion.sh old mode 100644 new mode 100755 diff --git a/tests/script/sh/massiveTable/deployCluster.sh b/tests/script/sh/massiveTable/deployCluster.sh old mode 100644 new mode 100755 diff --git a/tests/script/sh/massiveTable/setupDnodes.sh b/tests/script/sh/massiveTable/setupDnodes.sh old mode 100644 new mode 100755