diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index f32fdcbae7..dfd376f1e9 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -188,16 +188,19 @@ void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp);
static FORCE_INLINE void tFreeClientHbReq(void *pReq) {
SClientHbReq* req = (SClientHbReq*)pReq;
- taosHashCleanup(req->info);
- free(pReq);
+ if (req->info) taosHashCleanup(req->info);
}
int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq);
void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pReq);
-static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq) {
+static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) {
SClientHbBatchReq *req = (SClientHbBatchReq*)pReq;
- //taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
+ if (deep) {
+ taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
+ } else {
+ taosArrayDestroy(req->reqs);
+ }
free(pReq);
}
diff --git a/include/libs/tfs/tfs.h b/include/libs/tfs/tfs.h
index 793c861363..3828a93144 100644
--- a/include/libs/tfs/tfs.h
+++ b/include/libs/tfs/tfs.h
@@ -13,8 +13,8 @@
* along with this program. If not, see .
*/
-#ifndef TD_TFS_H
-#define TD_TFS_H
+#ifndef _TD_TFS_H_
+#define _TD_TFS_H_
#include "tglobal.h"
@@ -23,8 +23,8 @@ extern "C" {
#endif
typedef struct {
- int level;
- int id;
+ int32_t level;
+ int32_t id;
} SDiskID;
#define TFS_UNDECIDED_LEVEL -1
@@ -36,33 +36,25 @@ typedef struct {
// FS APIs ====================================
typedef struct {
- int64_t tsize;
+ int64_t total;
int64_t used;
int64_t avail;
} SFSMeta;
-typedef struct {
- int64_t size;
- int64_t used;
- int64_t free;
- int16_t nAvailDisks; // # of Available disks
-} STierMeta;
-
-int tfsInit(SDiskCfg *pDiskCfg, int ndisk);
-void tfsCleanup();
-void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numLevels);
-void tfsGetMeta(SFSMeta *pMeta);
-void tfsAllocDisk(int expLevel, int *level, int *id);
+int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk);
+void tfsCleanup();
+void tfsUpdateSize(SFSMeta *pFSMeta);
+void tfsAllocDisk(int32_t expLevel, int32_t *level, int32_t *id);
const char *TFS_PRIMARY_PATH();
-const char *TFS_DISK_PATH(int level, int id);
+const char *TFS_DISK_PATH(int32_t level, int32_t id);
// TFILE APIs ====================================
typedef struct {
- int level;
- int id;
- char rname[TSDB_FILENAME_LEN]; // REL name
- char aname[TSDB_FILENAME_LEN]; // ABS name
+ int32_t level;
+ int32_t id;
+ char rname[TSDB_FILENAME_LEN]; // REL name
+ char aname[TSDB_FILENAME_LEN]; // ABS name
} TFILE;
#define TFILE_LEVEL(pf) ((pf)->level)
@@ -76,23 +68,23 @@ typedef struct {
#define tfscopy(sf, df) taosCopyFile(TFILE_NAME(sf), TFILE_NAME(df))
#define tfsrename(sf, df) taosRename(TFILE_NAME(sf), TFILE_NAME(df))
-void tfsInitFile(TFILE *pf, int level, int id, const char *bname);
-bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2);
-int tfsEncodeFile(void **buf, TFILE *pf);
-void *tfsDecodeFile(void *buf, TFILE *pf);
-void tfsbasename(const TFILE *pf, char *dest);
-void tfsdirname(const TFILE *pf, char *dest);
+void tfsInitFile(TFILE *pf, int32_t level, int32_t id, const char *bname);
+bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2);
+int32_t tfsEncodeFile(void **buf, TFILE *pf);
+void *tfsDecodeFile(void *buf, TFILE *pf);
+void tfsbasename(const TFILE *pf, char *dest);
+void tfsdirname(const TFILE *pf, char *dest);
// DIR APIs ====================================
-int tfsMkdirAt(const char *rname, int level, int id);
-int tfsMkdirRecurAt(const char *rname, int level, int id);
-int tfsMkdir(const char *rname);
-int tfsRmdir(const char *rname);
-int tfsRename(char *orname, char *nrname);
+int32_t tfsMkdirAt(const char *rname, int32_t level, int32_t id);
+int32_t tfsMkdirRecurAt(const char *rname, int32_t level, int32_t id);
+int32_t tfsMkdir(const char *rname);
+int32_t tfsRmdir(const char *rname);
+int32_t tfsRename(char *orname, char *nrname);
typedef struct TDIR TDIR;
-TDIR * tfsOpendir(const char *rname);
+TDIR *tfsOpendir(const char *rname);
const TFILE *tfsReaddir(TDIR *tdir);
void tfsClosedir(TDIR *tdir);
@@ -100,4 +92,4 @@ void tfsClosedir(TDIR *tdir);
}
#endif
-#endif
+#endif /*_TD_TFS_H_*/
diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h
index 36ff4194d8..9dcb075489 100644
--- a/include/os/osSysinfo.h
+++ b/include/os/osSysinfo.h
@@ -35,12 +35,12 @@ extern char tsLocale[];
extern char tsCharset[]; // default encode string
typedef struct {
- int64_t tsize;
+ int64_t total;
int64_t used;
int64_t avail;
-} SysDiskSize;
+} SDiskSize;
-int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize);
+int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize);
int32_t taosGetCpuCores();
void taosGetSystemInfo();
bool taosReadProcIO(int64_t *rchars, int64_t *wchars);
diff --git a/include/util/taoserror.h b/include/util/taoserror.h
index fc895069af..e93577e620 100644
--- a/include/util/taoserror.h
+++ b/include/util/taoserror.h
@@ -412,7 +412,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1004) //"WAL out of memory")
// tfs
-#define TSDB_CODE_FS_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x2200) //"tfs out of memory")
+#define TSDB_CODE_FS_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x2200) //"tfs out of memory")
#define TSDB_CODE_FS_INVLD_CFG TAOS_DEF_ERROR_CODE(0, 0x2201) //"tfs invalid mount config")
#define TSDB_CODE_FS_TOO_MANY_MOUNT TAOS_DEF_ERROR_CODE(0, 0x2202) //"tfs too many mount")
#define TSDB_CODE_FS_DUP_PRIMARY TAOS_DEF_ERROR_CODE(0, 0x2203) //"tfs duplicate primary mount")
diff --git a/source/client/src/clientHb.c b/source/client/src/clientHb.c
index 6d7fc9f81a..0f4ff6f725 100644
--- a/source/client/src/clientHb.c
+++ b/source/client/src/clientHb.c
@@ -60,15 +60,17 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
}
+#if 0
pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, NULL);
while (pIter != NULL) {
FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter;
SClientHbKey connKey;
taosHashCopyKey(pIter, &connKey);
- getConnInfoFp(connKey, NULL);
+ SArray* pArray = getConnInfoFp(connKey, NULL);
pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, pIter);
}
+#endif
return pBatchReq;
}
@@ -99,12 +101,12 @@ static void* hbThreadFunc(void* param) {
//TODO: error handling
break;
}
- void *bufCopy = buf;
- tSerializeSClientHbBatchReq(&bufCopy, pReq);
+ void *abuf = buf;
+ tSerializeSClientHbBatchReq(&abuf, pReq);
SMsgSendInfo *pInfo = malloc(sizeof(SMsgSendInfo));
if (pInfo == NULL) {
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
- tFreeClientHbBatchReq(pReq);
+ tFreeClientHbBatchReq(pReq, false);
free(buf);
break;
}
@@ -120,7 +122,7 @@ static void* hbThreadFunc(void* param) {
int64_t transporterId = 0;
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
- tFreeClientHbBatchReq(pReq);
+ tFreeClientHbBatchReq(pReq, false);
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
}
@@ -155,6 +157,9 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) {
}
// init stat
pAppHbMgr->startTime = taosGetTimestampMs();
+ pAppHbMgr->connKeyCnt = 0;
+ pAppHbMgr->reportCnt = 0;
+ pAppHbMgr->reportBytes = 0;
// init app info
pAppHbMgr->pAppInstInfo = pAppInstInfo;
diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h
index de101b0f06..a2d6bbf4e6 100644
--- a/source/dnode/mnode/impl/inc/mndDef.h
+++ b/source/dnode/mnode/impl/inc/mndDef.h
@@ -325,6 +325,19 @@ typedef struct SMqTopicConsumer {
} SMqTopicConsumer;
#endif
+typedef struct SMqConsumerEp {
+ int32_t vgId;
+ SEpSet epset;
+ int64_t consumerId;
+} SMqConsumerEp;
+
+typedef struct SMqCgroupTopicPair {
+ char key[TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN];
+ SArray* assigned; // SArray
+ SArray* unassignedConsumer;
+ SArray* unassignedVg;
+} SMqCgroupTopicPair;
+
typedef struct SMqCGroup {
char name[TSDB_CONSUMER_GROUP_LEN];
int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal
@@ -351,10 +364,11 @@ typedef struct SMqTopicObj {
// TODO: add cache and change name to id
typedef struct SMqConsumerTopic {
+ char name[TSDB_TOPIC_FNAME_LEN];
int32_t epoch;
- char name[TSDB_TOPIC_NAME_LEN];
//TODO: replace with something with ep
SList *vgroups; // SList
+ SArray *pVgInfo; // SArray
} SMqConsumerTopic;
typedef struct SMqConsumerObj {
@@ -362,7 +376,7 @@ typedef struct SMqConsumerObj {
SRWLatch lock;
char cgroup[TSDB_CONSUMER_GROUP_LEN];
SArray *topics; // SArray
- SHashObj *topicHash;
+ SHashObj *topicHash; //SHashObj
} SMqConsumerObj;
typedef struct SMqSubConsumerObj {
diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c
index 54e640d8b7..d27bf53a90 100644
--- a/source/dnode/mnode/impl/src/mndConsumer.c
+++ b/source/dnode/mnode/impl/src/mndConsumer.c
@@ -204,34 +204,37 @@ void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode;
char *msgStr = pMsg->rpcMsg.pCont;
- SCMSubscribeReq *pSubscribe;
- tDeserializeSCMSubscribeReq(msgStr, pSubscribe);
- int64_t consumerId = pSubscribe->consumerId;
- char *consumerGroup = pSubscribe->consumerGroup;
+ SCMSubscribeReq subscribe;
+ tDeserializeSCMSubscribeReq(msgStr, &subscribe);
+ int64_t consumerId = subscribe.consumerId;
+ char *consumerGroup = subscribe.consumerGroup;
int32_t cgroupLen = strlen(consumerGroup);
SArray *newSub = NULL;
- int newTopicNum = pSubscribe->topicNum;
+ int newTopicNum = subscribe.topicNum;
if (newTopicNum) {
newSub = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic));
}
+ SMqConsumerTopic *pConsumerTopics = calloc(newTopicNum, sizeof(SMqConsumerTopic));
+ if (pConsumerTopics == NULL) {
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ return -1;
+ }
for (int i = 0; i < newTopicNum; i++) {
char *newTopicName = taosArrayGetP(newSub, i);
- SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic));
- if (pConsumerTopic == NULL) {
- terrno = TSDB_CODE_OUT_OF_MEMORY;
- // TODO: free
- return -1;
- }
+ SMqConsumerTopic *pConsumerTopic = &pConsumerTopics[i];
+
strcpy(pConsumerTopic->name, newTopicName);
pConsumerTopic->vgroups = tdListNew(sizeof(int64_t));
- taosArrayPush(newSub, pConsumerTopic);
- free(pConsumerTopic);
}
+
+ taosArrayAddBatch(newSub, pConsumerTopics, newTopicNum);
+ free(pConsumerTopics);
taosArraySortString(newSub, taosArrayCompareString);
SArray *oldSub = NULL;
int oldTopicNum = 0;
+ // create consumer if not exist
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
if (pConsumer == NULL) {
// create consumer
@@ -249,6 +252,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
if (pTrans == NULL) {
+ //TODO: free memory
return -1;
}
@@ -286,6 +290,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
}
if (pOldTopic != NULL) {
+ //cancel subscribe of that old topic
ASSERT(pNewTopic == NULL);
char *oldTopicName = pOldTopic->name;
SList *vgroups = pOldTopic->vgroups;
@@ -298,13 +303,14 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
while ((pn = tdListNext(&iter)) != NULL) {
int32_t vgId = *(int64_t *)pn->data;
+ // acquire and get epset
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
- // TODO release
+ // TODO what time to release?
if (pVgObj == NULL) {
// TODO handle error
continue;
}
- // acquire and get epset
+ //build reset msg
void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup);
// TODO:serialize
if (pMsg == NULL) {
@@ -323,10 +329,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
return -1;
}
}
+ //delete data in mnode
taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen);
mndReleaseTopic(pMnode, pTopic);
} else if (pNewTopic != NULL) {
+ // save subscribe info to mnode
ASSERT(pOldTopic == NULL);
char *newTopicName = pNewTopic->name;
@@ -351,6 +359,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
// add into cgroups
taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup));
}
+ /*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/
// put the consumer into list
// rebalance will be triggered by timer
diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c
index 902eaa5c1c..3773750ed3 100644
--- a/source/dnode/mnode/impl/src/mndProfile.c
+++ b/source/dnode/mnode/impl/src/mndProfile.c
@@ -357,10 +357,13 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
}
}
}
+ taosArrayDestroyEx(pArray, tFreeClientHbReq);
+
int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp);
void* buf = rpcMallocCont(tlen);
void* abuf = buf;
tSerializeSClientHbBatchRsp(&abuf, &batchRsp);
+ taosArrayDestroy(batchRsp.rsps);
pReq->contLen = tlen;
pReq->pCont = buf;
return 0;
diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c
index d70c93e758..cab30702ea 100644
--- a/source/dnode/mnode/impl/src/mnode.c
+++ b/source/dnode/mnode/impl/src/mnode.c
@@ -69,6 +69,17 @@ static void mndTransReExecute(void *param, void *tmrId) {
taosTmrReset(mndTransReExecute, 3000, pMnode, pMnode->timer, &pMnode->transTimer);
}
+static void mndCalMqRebalance(void* param, void* tmrId) {
+ SMnode* pMnode = param;
+ if (mndIsMaster(pMnode)) {
+ // iterate cgroup, cal rebalance
+ // sync with raft
+ // write sdb
+ }
+
+ taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->transTimer);
+}
+
static int32_t mndInitTimer(SMnode *pMnode) {
if (pMnode->timer == NULL) {
pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND");
diff --git a/source/libs/tfs/inc/tfsint.h b/source/libs/tfs/inc/tfsInt.h
similarity index 55%
rename from source/libs/tfs/inc/tfsint.h
rename to source/libs/tfs/inc/tfsInt.h
index 3c5dccc63b..ce1436eb29 100644
--- a/source/libs/tfs/inc/tfsint.h
+++ b/source/libs/tfs/inc/tfsInt.h
@@ -13,19 +13,20 @@
* along with this program. If not, see .
*/
-#ifndef TD_TFSINT_H
-#define TD_TFSINT_H
+#ifndef _TD_TFS_INT_H_
+#define _TD_TFS_INT_H_
-#include "tlog.h"
-#include "tglobal.h"
-#include "tfs.h"
+#include "os.h"
+
+#include "taosdef.h"
+#include "taoserror.h"
#include "tcoding.h"
+#include "tfs.h"
+#include "tglobal.h"
+#include "thash.h"
+#include "tlog.h"
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-extern int fsDebugFlag;
+extern int32_t fsDebugFlag;
// For debug purpose
#define fFatal(...) { if (fsDebugFlag & DEBUG_FATAL) { taosPrintLog("TFS FATAL ", 255, __VA_ARGS__); }}
@@ -38,60 +39,44 @@ extern int fsDebugFlag;
// Global Definitions
#define TFS_MIN_DISK_FREE_SIZE 50 * 1024 * 1024
-// tdisk.c ======================================================
-typedef struct {
- int64_t size;
- int64_t used;
- int64_t free;
-} SDiskMeta;
-
typedef struct SDisk {
- int level;
- int id;
- char dir[TSDB_FILENAME_LEN];
- SDiskMeta dmeta;
+ int32_t level;
+ int32_t id;
+ char *path;
+ SDiskSize size;
} SDisk;
-#define DISK_LEVEL(pd) ((pd)->level)
-#define DISK_ID(pd) ((pd)->id)
-#define DISK_DIR(pd) ((pd)->dir)
-#define DISK_META(pd) ((pd)->dmeta)
-#define DISK_SIZE(pd) ((pd)->dmeta.size)
-#define DISK_USED_SIZE(pd) ((pd)->dmeta.used)
-#define DISK_FREE_SIZE(pd) ((pd)->dmeta.free)
-
-SDisk *tfsNewDisk(int level, int id, const char *dir);
-SDisk *tfsFreeDisk(SDisk *pDisk);
-int tfsUpdateDiskInfo(SDisk *pDisk);
-
-// ttier.c ======================================================
-
typedef struct STier {
pthread_spinlock_t lock;
- int level;
- int16_t ndisk; // # of disks mounted to this tier
- int16_t nextid; // next disk id to allocate
- STierMeta tmeta;
- SDisk * disks[TSDB_MAX_DISKS_PER_TIER];
+ int32_t level;
+ int16_t nextid; // next disk id to allocate
+ int16_t ndisk; // # of disks mounted to this tier
+ int16_t nAvailDisks; // # of Available disks
+ SDisk *disks[TSDB_MAX_DISKS_PER_TIER];
+ SDiskSize size;
} STier;
#define TIER_LEVEL(pt) ((pt)->level)
#define TIER_NDISKS(pt) ((pt)->ndisk)
#define TIER_SIZE(pt) ((pt)->tmeta.size)
#define TIER_FREE_SIZE(pt) ((pt)->tmeta.free)
-#define TIER_AVAIL_DISKS(pt) ((pt)->tmeta.nAvailDisks)
-#define DISK_AT_TIER(pt, id) ((pt)->disks[id])
-int tfsInitTier(STier *pTier, int level);
-void tfsDestroyTier(STier *pTier);
-SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg);
-void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta);
-int tfsAllocDiskOnTier(STier *pTier);
-void tfsGetTierMeta(STier *pTier, STierMeta *pTierMeta);
-void tfsPosNextId(STier *pTier);
+#define DISK_AT_TIER(pt, id) ((pt)->disks[id])
+#define DISK_DIR(pd) ((pd)->path)
+
+SDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir);
+SDisk *tfsFreeDisk(SDisk *pDisk);
+int32_t tfsUpdateDiskSize(SDisk *pDisk);
+
+int32_t tfsInitTier(STier *pTier, int32_t level);
+void tfsDestroyTier(STier *pTier);
+SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg);
+void tfsUpdateTierSize(STier *pTier);
+int32_t tfsAllocDiskOnTier(STier *pTier);
+void tfsPosNextId(STier *pTier);
#ifdef __cplusplus
}
#endif
-#endif
+#endif /*_TD_TFS_INT_H_*/
diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c
index 88d6d587a7..1d11cd6df2 100644
--- a/source/libs/tfs/src/tfs.c
+++ b/source/libs/tfs/src/tfs.c
@@ -13,22 +13,17 @@
* along with this program. If not, see .
*/
-#include "os.h"
-
-#include "taosdef.h"
-#include "taoserror.h"
-#include "tfs.h"
-#include "tfsint.h"
-#include "thash.h"
+#define _DEFAULT_SOURCE
+#include "tfsInt.h"
#define TMPNAME_LEN (TSDB_FILENAME_LEN * 2 + 32)
typedef struct {
pthread_spinlock_t lock;
SFSMeta meta;
- int nlevel;
+ int32_t nlevel;
STier tiers[TSDB_MAX_TIERS];
- SHashObj * map; // name to did map
+ SHashObj *map; // name to did map
} SFS;
typedef struct {
@@ -52,21 +47,24 @@ static SFS tfs = {0};
static SFS *pfs = &tfs;
// STATIC DECLARATION
-static int tfsMount(SDiskCfg *pCfg);
-static int tfsCheck();
-static int tfsCheckAndFormatCfg(SDiskCfg *pCfg);
-static int tfsFormatDir(char *idir, char *odir);
-static SDisk *tfsGetDiskByID(SDiskID did);
-static SDisk *tfsGetDiskByName(const char *dir);
-static int tfsOpendirImpl(TDIR *tdir);
-static void tfsInitDiskIter(SDiskIter *pIter);
-static SDisk *tfsNextDisk(SDiskIter *pIter);
+static int32_t tfsMount(SDiskCfg *pCfg);
+static int32_t tfsCheck();
+static int32_t tfsCheckAndFormatCfg(SDiskCfg *pCfg);
+static int32_t tfsFormatDir(char *idir, char *odir);
+static SDisk *tfsGetDiskByID(SDiskID did);
+static SDisk *tfsGetDiskByName(const char *dir);
+static int32_t tfsOpendirImpl(TDIR *tdir);
+static void tfsInitDiskIter(SDiskIter *pIter);
+static SDisk *tfsNextDisk(SDiskIter *pIter);
// FS APIs ====================================
-int tfsInit(SDiskCfg *pDiskCfg, int ndisk) {
- ASSERT(ndisk > 0);
+int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk) {
+ if (ndisk < 0) {
+ terrno = TSDB_CODE_INVALID_PARA;
+ return -1;
+ }
- for (int level = 0; level < TSDB_MAX_TIERS; level++) {
+ for (int32_t level = 0; level < TSDB_MAX_TIERS; level++) {
if (tfsInitTier(TFS_TIER_AT(level), level) < 0) {
while (true) {
level--;
@@ -84,12 +82,12 @@ int tfsInit(SDiskCfg *pDiskCfg, int ndisk) {
pfs->map = taosHashInit(TSDB_MAX_TIERS * TSDB_MAX_DISKS_PER_TIER * 2,
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if (pfs->map == NULL) {
- terrno = TSDB_CODE_FS_OUT_OF_MEMORY;
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
tfsCleanup();
return -1;
}
- for (int idisk = 0; idisk < ndisk; idisk++) {
+ for (int32_t idisk = 0; idisk < ndisk; idisk++) {
if (tfsMount(pDiskCfg + idisk) < 0) {
tfsCleanup();
return -1;
@@ -101,8 +99,8 @@ int tfsInit(SDiskCfg *pDiskCfg, int ndisk) {
return -1;
}
- tfsUpdateInfo(NULL, NULL, 0);
- for (int level = 0; level < TFS_NLEVEL(); level++) {
+ tfsUpdateSize(NULL);
+ for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
tfsPosNextId(TFS_TIER_AT(level));
}
@@ -114,32 +112,27 @@ void tfsCleanup() {
pfs->map = NULL;
pthread_spin_destroy(&(pfs->lock));
- for (int level = 0; level < TFS_NLEVEL(); level++) {
+ for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
tfsDestroyTier(TFS_TIER_AT(level));
}
}
-void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numTiers) {
- SFSMeta fsMeta;
- STierMeta tierMeta;
+void tfsUpdateSize(SFSMeta *pFSMeta) {
+ SFSMeta fsMeta = {0};
+ SDiskSize size = {0};
if (pFSMeta == NULL) {
pFSMeta = &fsMeta;
}
- memset(pFSMeta, 0, sizeof(*pFSMeta));
-
- for (int level = 0; level < TFS_NLEVEL(); level++) {
- STierMeta *pTierMeta = &tierMeta;
- if (tierMetas && level < numTiers) {
- pTierMeta = tierMetas + level;
- }
+ memset(pFSMeta, 0, sizeof(SFSMeta));
+ for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
STier *pTier = TFS_TIER_AT(level);
- tfsUpdateTierInfo(pTier, pTierMeta);
- pFSMeta->tsize += pTierMeta->size;
- pFSMeta->avail += pTierMeta->free;
- pFSMeta->used += pTierMeta->used;
+ tfsUpdateTierSize(pTier);
+ pFSMeta->total += pTier->size.total;
+ pFSMeta->avail += pTier->size.avail;
+ pFSMeta->used += pTier->size.used;
}
tfsLock();
@@ -147,17 +140,9 @@ void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numTiers) {
tfsUnLock();
}
-void tfsGetMeta(SFSMeta *pMeta) {
- ASSERT(pMeta);
-
- tfsLock();
- *pMeta = pfs->meta;
- tfsUnLock();
-}
-
/* Allocate an existing available tier level
*/
-void tfsAllocDisk(int expLevel, int *level, int *id) {
+void tfsAllocDisk(int32_t expLevel, int32_t *level, int32_t *id) {
ASSERT(expLevel >= 0);
*level = expLevel;
@@ -182,10 +167,10 @@ void tfsAllocDisk(int expLevel, int *level, int *id) {
}
const char *TFS_PRIMARY_PATH() { return DISK_DIR(TFS_PRIMARY_DISK()); }
-const char *TFS_DISK_PATH(int level, int id) { return DISK_DIR(TFS_DISK_AT(level, id)); }
+const char *TFS_DISK_PATH(int32_t level, int32_t id) { return DISK_DIR(TFS_DISK_AT(level, id)); }
// TFILE APIs ====================================
-void tfsInitFile(TFILE *pf, int level, int id, const char *bname) {
+void tfsInitFile(TFILE *pf, int32_t level, int32_t id, const char *bname) {
ASSERT(TFS_IS_VALID_DISK(level, id));
SDisk *pDisk = TFS_DISK_AT(level, id);
@@ -208,8 +193,8 @@ bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2) {
return true;
}
-int tfsEncodeFile(void **buf, TFILE *pf) {
- int tlen = 0;
+int32_t tfsEncodeFile(void **buf, TFILE *pf) {
+ int32_t tlen = 0;
tlen += taosEncodeVariantI32(buf, pf->level);
tlen += taosEncodeVariantI32(buf, pf->id);
@@ -220,7 +205,7 @@ int tfsEncodeFile(void **buf, TFILE *pf) {
void *tfsDecodeFile(void *buf, TFILE *pf) {
int32_t level, id;
- char * rname;
+ char *rname;
buf = taosDecodeVariantI32(buf, &(level));
buf = taosDecodeVariantI32(buf, &(id));
@@ -247,7 +232,7 @@ void tfsdirname(const TFILE *pf, char *dest) {
}
// DIR APIs ====================================
-int tfsMkdirAt(const char *rname, int level, int id) {
+int32_t tfsMkdirAt(const char *rname, int32_t level, int32_t id) {
SDisk *pDisk = TFS_DISK_AT(level, id);
char aname[TMPNAME_LEN];
@@ -260,7 +245,7 @@ int tfsMkdirAt(const char *rname, int level, int id) {
return 0;
}
-int tfsMkdirRecurAt(const char *rname, int level, int id) {
+int32_t tfsMkdirRecurAt(const char *rname, int32_t level, int32_t id) {
if (tfsMkdirAt(rname, level, id) < 0) {
if (errno == ENOENT) {
// Try to create upper
@@ -293,10 +278,10 @@ int tfsMkdirRecurAt(const char *rname, int level, int id) {
return 0;
}
-int tfsMkdir(const char *rname) {
- for (int level = 0; level < TFS_NLEVEL(); level++) {
+int32_t tfsMkdir(const char *rname) {
+ for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
STier *pTier = TFS_TIER_AT(level);
- for (int id = 0; id < TIER_NDISKS(pTier); id++) {
+ for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) {
if (tfsMkdirAt(rname, level, id) < 0) {
return -1;
}
@@ -306,15 +291,15 @@ int tfsMkdir(const char *rname) {
return 0;
}
-int tfsRmdir(const char *rname) {
+int32_t tfsRmdir(const char *rname) {
char aname[TMPNAME_LEN] = "\0";
- for (int level = 0; level < TFS_NLEVEL(); level++) {
+ for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
STier *pTier = TFS_TIER_AT(level);
- for (int id = 0; id < TIER_NDISKS(pTier); id++) {
- SDisk *pDisk = DISK_AT_TIER(pTier, id);
+ for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) {
+ SDisk *pDisk = pTier->disks[id];
- snprintf(aname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), rname);
+ snprintf(aname, TMPNAME_LEN, "%s%s%s", DISK_DIR(pDisk), TS_PATH_DELIMITER, rname);
taosRemoveDir(aname);
}
@@ -323,13 +308,14 @@ int tfsRmdir(const char *rname) {
return 0;
}
-int tfsRename(char *orname, char *nrname) {
+#if 0
+int32_t tfsRename(char *orname, char *nrname) {
char oaname[TMPNAME_LEN] = "\0";
char naname[TMPNAME_LEN] = "\0";
- for (int level = 0; level < pfs->nlevel; level++) {
+ for (int32_t level = 0; level < pfs->nlevel; level++) {
STier *pTier = TFS_TIER_AT(level);
- for (int id = 0; id < TIER_NDISKS(pTier); id++) {
+ for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) {
SDisk *pDisk = DISK_AT_TIER(pTier, id);
snprintf(oaname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), orname);
@@ -341,20 +327,21 @@ int tfsRename(char *orname, char *nrname) {
return 0;
}
+#endif
struct TDIR {
SDiskIter iter;
- int level;
- int id;
+ int32_t level;
+ int32_t id;
char dirname[TSDB_FILENAME_LEN];
TFILE tfile;
- DIR * dir;
+ DIR *dir;
};
TDIR *tfsOpendir(const char *rname) {
TDIR *tdir = (TDIR *)calloc(1, sizeof(*tdir));
if (tdir == NULL) {
- terrno = TSDB_CODE_FS_OUT_OF_MEMORY;
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
@@ -407,9 +394,9 @@ void tfsClosedir(TDIR *tdir) {
}
// private
-static int tfsMount(SDiskCfg *pCfg) {
+static int32_t tfsMount(SDiskCfg *pCfg) {
SDiskID did;
- SDisk * pDisk = NULL;
+ SDisk *pDisk = NULL;
if (tfsCheckAndFormatCfg(pCfg) < 0) return -1;
@@ -419,7 +406,7 @@ static int tfsMount(SDiskCfg *pCfg) {
fError("failed to mount disk %s to level %d since %s", pCfg->dir, pCfg->level, tstrerror(terrno));
return -1;
}
- did.id = DISK_ID(pDisk);
+ did.id = pDisk->id;
taosHashPut(pfs->map, (void *)(pCfg->dir), strnlen(pCfg->dir, TSDB_FILENAME_LEN), (void *)(&did), sizeof(did));
if (pfs->nlevel < pCfg->level + 1) pfs->nlevel = pCfg->level + 1;
@@ -427,7 +414,7 @@ static int tfsMount(SDiskCfg *pCfg) {
return 0;
}
-static int tfsCheckAndFormatCfg(SDiskCfg *pCfg) {
+static int32_t tfsCheckAndFormatCfg(SDiskCfg *pCfg) {
char dirName[TSDB_FILENAME_LEN] = "\0";
struct stat pstat;
@@ -486,10 +473,10 @@ static int tfsCheckAndFormatCfg(SDiskCfg *pCfg) {
return 0;
}
-static int tfsFormatDir(char *idir, char *odir) {
+static int32_t tfsFormatDir(char *idir, char *odir) {
wordexp_t wep = {0};
- int code = wordexp(idir, &wep, 0);
+ int32_t code = wordexp(idir, &wep, 0);
if (code != 0) {
terrno = TAOS_SYSTEM_ERROR(code);
return -1;
@@ -507,14 +494,14 @@ static int tfsFormatDir(char *idir, char *odir) {
return 0;
}
-static int tfsCheck() {
+static int32_t tfsCheck() {
if (TFS_PRIMARY_DISK() == NULL) {
fError("no primary disk is set");
terrno = TSDB_CODE_FS_NO_PRIMARY_DISK;
return -1;
}
- for (int level = 0; level < TFS_NLEVEL(); level++) {
+ for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
if (TIER_NDISKS(TFS_TIER_AT(level)) == 0) {
fError("no disk at level %d", level);
terrno = TSDB_CODE_FS_NO_MOUNT_AT_TIER;
@@ -528,8 +515,8 @@ static int tfsCheck() {
static SDisk *tfsGetDiskByID(SDiskID did) { return TFS_DISK_AT(did.level, did.id); }
static SDisk *tfsGetDiskByName(const char *dir) {
SDiskID did;
- SDisk * pDisk = NULL;
- void * pr = NULL;
+ SDisk *pDisk = NULL;
+ void *pr = NULL;
pr = taosHashGet(pfs->map, (void *)dir, strnlen(dir, TSDB_FILENAME_LEN));
if (pr == NULL) return NULL;
@@ -541,7 +528,7 @@ static SDisk *tfsGetDiskByName(const char *dir) {
return pDisk;
}
-static int tfsOpendirImpl(TDIR *tdir) {
+static int32_t tfsOpendirImpl(TDIR *tdir) {
SDisk *pDisk = NULL;
char adir[TMPNAME_LEN * 2] = "\0";
@@ -554,10 +541,10 @@ static int tfsOpendirImpl(TDIR *tdir) {
pDisk = tfsNextDisk(&(tdir->iter));
if (pDisk == NULL) return 0;
- tdir->level = DISK_LEVEL(pDisk);
- tdir->id = DISK_ID(pDisk);
+ tdir->level = pDisk->level;
+ tdir->id = pDisk->id;
- snprintf(adir, TMPNAME_LEN * 2, "%s/%s", DISK_DIR(pDisk), tdir->dirname);
+ snprintf(adir, TMPNAME_LEN * 2, "%s%s%s", pDisk->path, TS_PATH_DELIMITER,tdir->dirname);
tdir->dir = opendir(adir);
if (tdir->dir != NULL) break;
}
@@ -572,8 +559,8 @@ static SDisk *tfsNextDisk(SDiskIter *pIter) {
if (pDisk == NULL) return NULL;
- int level = DISK_LEVEL(pDisk);
- int id = DISK_ID(pDisk);
+ int32_t level = pDisk->level;
+ int32_t id = pDisk->id;
id++;
if (id < TIER_NDISKS(TFS_TIER_AT(level))) {
@@ -596,21 +583,21 @@ static SDisk *tfsNextDisk(SDiskIter *pIter) {
// OTHER FUNCTIONS ===================================
void taosGetDisk() {
const double unit = 1024 * 1024 * 1024;
- SysDiskSize diskSize;
+ SDiskSize diskSize;
SFSMeta fsMeta;
- tfsUpdateInfo(&fsMeta, NULL, 0);
- tsTotalDataDirGB = (float)(fsMeta.tsize / unit);
+ tfsUpdateSize(&fsMeta);
+ tsTotalDataDirGB = (float)(fsMeta.total / unit);
tsUsedDataDirGB = (float)(fsMeta.used / unit);
tsAvailDataDirGB = (float)(fsMeta.avail / unit);
if (taosGetDiskSize(tsLogDir, &diskSize) == 0) {
- tsTotalLogDirGB = (float)(diskSize.tsize / unit);
+ tsTotalLogDirGB = (float)(diskSize.total / unit);
tsAvailLogDirGB = (float)(diskSize.avail / unit);
}
if (taosGetDiskSize(tsTempDir, &diskSize) == 0) {
- tsTotalTmpDirGB = (float)(diskSize.tsize / unit);
+ tsTotalTmpDirGB = (float)(diskSize.total / unit);
tsAvailTmpDirectorySpace = (float)(diskSize.avail / unit);
}
}
diff --git a/source/libs/tfs/src/tdisk.c b/source/libs/tfs/src/tfsDisk.c
similarity index 52%
rename from source/libs/tfs/src/tdisk.c
rename to source/libs/tfs/src/tfsDisk.c
index 22601e48c3..a5ef1121ff 100644
--- a/source/libs/tfs/src/tdisk.c
+++ b/source/libs/tfs/src/tfsDisk.c
@@ -12,48 +12,45 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*/
-#include "os.h"
-#include "taoserror.h"
-#include "tfsint.h"
+#define _DEFAULT_SOURCE
+#include "tfsInt.h"
-// PROTECTED ====================================
-SDisk *tfsNewDisk(int level, int id, const char *dir) {
- SDisk *pDisk = (SDisk *)calloc(1, sizeof(*pDisk));
+SDisk *tfsNewDisk(int32_t level, int32_t id, const char *path) {
+ SDisk *pDisk = calloc(1, sizeof(SDisk));
if (pDisk == NULL) {
- terrno = TSDB_CODE_FS_OUT_OF_MEMORY;
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
+ return NULL;
+ }
+
+ pDisk->path = strdup(path);
+ if (pDisk->path == NULL) {
+ free(pDisk);
+ terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
pDisk->level = level;
pDisk->id = id;
- tstrncpy(pDisk->dir, dir, TSDB_FILENAME_LEN);
-
+ taosGetDiskSize(pDisk->path, &pDisk->size);
return pDisk;
}
SDisk *tfsFreeDisk(SDisk *pDisk) {
- if (pDisk) {
+ if (pDisk != NULL) {
+ free(pDisk->path);
free(pDisk);
}
+
return NULL;
}
-int tfsUpdateDiskInfo(SDisk *pDisk) {
- ASSERT(pDisk != NULL);
-
- SysDiskSize diskSize = {0};
-
- int code = taosGetDiskSize(pDisk->dir, &diskSize);
- if (code != 0) {
- fError("failed to update disk information at level %d id %d dir %s since %s", pDisk->level, pDisk->id, pDisk->dir,
- strerror(errno));
+int32_t tfsUpdateDiskSize(SDisk *pDisk) {
+ if (taosGetDiskSize(pDisk->path, &pDisk->size) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
+ fError("failed to get disk:%s size, level:%d id:%d since %s", pDisk->path, pDisk->level, pDisk->id, terrstr());
+ return -1;
}
- pDisk->dmeta.size = diskSize.tsize;
- pDisk->dmeta.used = diskSize.used;
- pDisk->dmeta.free = diskSize.avail;
-
- return code;
+ return 0;
}
diff --git a/source/libs/tfs/src/tfsTier.c b/source/libs/tfs/src/tfsTier.c
new file mode 100644
index 0000000000..2a26c23ead
--- /dev/null
+++ b/source/libs/tfs/src/tfsTier.c
@@ -0,0 +1,143 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#define _DEFAULT_SOURCE
+#include "tfsInt.h"
+
+#define tfsLockTier(pTier) pthread_spin_lock(&(pTier)->lock)
+#define tfsUnLockTier(pTier) pthread_spin_unlock(&(pTier)->lock)
+
+int32_t tfsInitTier(STier *pTier, int32_t level) {
+ memset(pTier, 0, sizeof(STier));
+
+ if (pthread_spin_init(&pTier->lock, 0) != 0) {
+ terrno = TAOS_SYSTEM_ERROR(errno);
+ return -1;
+ }
+
+ pTier->level = level;
+ return 0;
+}
+
+void tfsDestroyTier(STier *pTier) {
+ for (int32_t id = 0; id < TSDB_MAX_DISKS_PER_TIER; id++) {
+ pTier->disks[id] = tfsFreeDisk(pTier->disks[id]);
+ }
+
+ pTier->ndisk = 0;
+ pthread_spin_destroy(&(pTier->lock));
+}
+
+SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) {
+ if (pTier->ndisk >= TSDB_MAX_DISKS_PER_TIER) {
+ terrno = TSDB_CODE_FS_TOO_MANY_MOUNT;
+ return NULL;
+ }
+
+ int32_t id = 0;
+ if (pTier->level == 0) {
+ if (pTier->disks[0] != NULL) {
+ id = pTier->ndisk;
+ } else {
+ if (pCfg->primary) {
+ id = 0;
+ } else {
+ id = pTier->ndisk + 1;
+ }
+ }
+ } else {
+ id = pTier->ndisk;
+ }
+
+ if (id >= TSDB_MAX_DISKS_PER_TIER) {
+ terrno = TSDB_CODE_FS_TOO_MANY_MOUNT;
+ return NULL;
+ }
+
+ SDisk *pDisk = tfsNewDisk(pCfg->level, id, pCfg->dir);
+ if (pDisk == NULL) return NULL;
+
+ pTier->disks[id] = pDisk;
+ pTier->ndisk++;
+
+ fInfo("disk %s is mounted to tier level %d id %d", pCfg->dir, pCfg->level, id);
+ return pTier->disks[id];
+}
+
+void tfsUpdateTierSize(STier *pTier) {
+ SDiskSize size = {0};
+ int16_t nAvailDisks = 0;
+
+ tfsLockTier(pTier);
+
+ for (int32_t id = 0; id < pTier->ndisk; id++) {
+ SDisk *pDisk = pTier->disks[id];
+ if (pDisk == NULL) continue;
+
+ size.total += pDisk->size.total;
+ size.used += pDisk->size.used;
+ size.avail += pDisk->size.avail;
+ nAvailDisks++;
+ }
+
+ pTier->size = size;
+ pTier->nAvailDisks = nAvailDisks;
+
+ tfsUnLockTier(pTier);
+}
+
+// Round-Robin to allocate disk on a tier
+int32_t tfsAllocDiskOnTier(STier *pTier) {
+ terrno = TSDB_CODE_FS_NO_VALID_DISK;
+
+ tfsLockTier(pTier);
+
+ if (pTier->ndisk <= 0 || pTier->nAvailDisks <= 0) {
+ tfsUnLockTier(pTier);
+ return -1;
+ }
+
+ int32_t retId = -1;
+ for (int32_t id = 0; id < TSDB_MAX_DISKS_PER_TIER; ++id) {
+ int32_t diskId = (pTier->nextid + id) % pTier->ndisk;
+ SDisk *pDisk = pTier->disks[diskId];
+
+ if (pDisk == NULL) continue;
+
+ if (pDisk->size.avail < TFS_MIN_DISK_FREE_SIZE) continue;
+
+ retId = diskId;
+ terrno = 0;
+ pTier->nextid = (diskId + 1) % pTier->ndisk;
+ break;
+ }
+
+ tfsUnLockTier(pTier);
+ return retId;
+}
+
+void tfsPosNextId(STier *pTier) {
+ int32_t nextid = 0;
+
+ for (int32_t id = 1; id < pTier->ndisk; id++) {
+ SDisk *pLDisk = pTier->disks[nextid];
+ SDisk *pDisk = pTier->disks[id];
+ if (pDisk->size.avail > TFS_MIN_DISK_FREE_SIZE && pDisk->size.avail > pLDisk->size.avail) {
+ nextid = id;
+ }
+ }
+
+ pTier->nextid = nextid;
+}
diff --git a/source/libs/tfs/src/ttier.c b/source/libs/tfs/src/ttier.c
deleted file mode 100644
index 3b19797acf..0000000000
--- a/source/libs/tfs/src/ttier.c
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-#include "os.h"
-
-#include "taosdef.h"
-#include "taoserror.h"
-#include "tfsint.h"
-
-#define tfsLockTier(pTier) pthread_spin_lock(&((pTier)->lock))
-#define tfsUnLockTier(pTier) pthread_spin_unlock(&((pTier)->lock))
-
-// PROTECTED ==========================================
-int tfsInitTier(STier *pTier, int level) {
- memset((void *)pTier, 0, sizeof(*pTier));
-
- int code = pthread_spin_init(&(pTier->lock), 0);
- if (code) {
- terrno = TAOS_SYSTEM_ERROR(code);
- return -1;
- }
-
- pTier->level = level;
-
- return 0;
-}
-
-void tfsDestroyTier(STier *pTier) {
- for (int id = 0; id < TSDB_MAX_DISKS_PER_TIER; id++) {
- DISK_AT_TIER(pTier, id) = tfsFreeDisk(DISK_AT_TIER(pTier, id));
- }
-
- pTier->ndisk = 0;
-
- pthread_spin_destroy(&(pTier->lock));
-}
-
-SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) {
- ASSERT(pTier->level == pCfg->level);
-
- int id = 0;
- SDisk *pDisk;
-
- if (TIER_NDISKS(pTier) >= TSDB_MAX_DISKS_PER_TIER) {
- terrno = TSDB_CODE_FS_TOO_MANY_MOUNT;
- return NULL;
- }
-
- if (pTier->level == 0) {
- if (DISK_AT_TIER(pTier, 0) != NULL) {
- id = pTier->ndisk;
- } else {
- if (pCfg->primary) {
- id = 0;
- } else {
- id = pTier->ndisk + 1;
- }
- if (id >= TSDB_MAX_DISKS_PER_TIER) {
- terrno = TSDB_CODE_FS_TOO_MANY_MOUNT;
- return NULL;
- }
- }
- } else {
- id = pTier->ndisk;
- }
-
- pDisk = tfsNewDisk(pCfg->level, id, pCfg->dir);
- if (pDisk == NULL) return NULL;
- DISK_AT_TIER(pTier, id) = pDisk;
- pTier->ndisk++;
-
- fInfo("disk %s is mounted to tier level %d id %d", pCfg->dir, pCfg->level, id);
-
- return DISK_AT_TIER(pTier, id);
-}
-
-void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta) {
- STierMeta tmeta;
-
- if (pTierMeta == NULL) {
- pTierMeta = &tmeta;
- }
- memset(pTierMeta, 0, sizeof(*pTierMeta));
-
- tfsLockTier(pTier);
-
- for (int id = 0; id < pTier->ndisk; id++) {
- if (tfsUpdateDiskInfo(DISK_AT_TIER(pTier, id)) < 0) {
- continue;
- }
- pTierMeta->size += DISK_SIZE(DISK_AT_TIER(pTier, id));
- pTierMeta->used += DISK_USED_SIZE(DISK_AT_TIER(pTier, id));
- pTierMeta->free += DISK_FREE_SIZE(DISK_AT_TIER(pTier, id));
- pTierMeta->nAvailDisks++;
- }
-
- pTier->tmeta = *pTierMeta;
-
- tfsUnLockTier(pTier);
-}
-
-// Round-Robin to allocate disk on a tier
-int tfsAllocDiskOnTier(STier *pTier) {
- ASSERT(pTier->ndisk > 0);
- int id = TFS_UNDECIDED_ID;
- SDisk *pDisk;
-
- tfsLockTier(pTier);
-
- if (TIER_AVAIL_DISKS(pTier) <= 0) {
- tfsUnLockTier(pTier);
- return id;
- }
-
- id = pTier->nextid;
- while (true) {
- pDisk = DISK_AT_TIER(pTier, id);
- ASSERT(pDisk != NULL);
-
- if (DISK_FREE_SIZE(pDisk) < TFS_MIN_DISK_FREE_SIZE) {
- id = (id + 1) % pTier->ndisk;
- if (id == pTier->nextid) {
- tfsUnLockTier(pTier);
- return TFS_UNDECIDED_ID;
- } else {
- continue;
- }
- } else {
- pTier->nextid = (id + 1) % pTier->ndisk;
- break;
- }
- }
-
- tfsUnLockTier(pTier);
- return id;
-}
-
-void tfsGetTierMeta(STier *pTier, STierMeta *pTierMeta) {
- ASSERT(pTierMeta != NULL);
-
- tfsLockTier(pTier);
- *pTierMeta = pTier->tmeta;
- tfsUnLockTier(pTier);
-}
-
-void tfsPosNextId(STier *pTier) {
- ASSERT(pTier->ndisk > 0);
- int nextid = 0;
-
- for (int id = 1; id < pTier->ndisk; id++) {
- SDisk *pLDisk = DISK_AT_TIER(pTier, nextid);
- SDisk *pDisk = DISK_AT_TIER(pTier, id);
- if (DISK_FREE_SIZE(pDisk) > TFS_MIN_DISK_FREE_SIZE && DISK_FREE_SIZE(pDisk) > DISK_FREE_SIZE(pLDisk)) {
- nextid = id;
- }
- }
-
- pTier->nextid = nextid;
-}
diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h
new file mode 100644
index 0000000000..792200639a
--- /dev/null
+++ b/source/libs/transport/inc/transComm.h
@@ -0,0 +1,120 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+#ifdef USE_UV
+
+#include
+#include "lz4.h"
+#include "os.h"
+#include "rpcCache.h"
+#include "rpcHead.h"
+#include "rpcLog.h"
+#include "rpcTcp.h"
+#include "rpcUdp.h"
+#include "taoserror.h"
+#include "tglobal.h"
+#include "thash.h"
+#include "tidpool.h"
+#include "tmd5.h"
+#include "tmempool.h"
+#include "tmsg.h"
+#include "transportInt.h"
+#include "tref.h"
+#include "trpc.h"
+#include "ttimer.h"
+#include "tutil.h"
+
+typedef void* queue[2];
+/* Private macros. */
+#define QUEUE_NEXT(q) (*(queue**)&((*(q))[0]))
+#define QUEUE_PREV(q) (*(queue**)&((*(q))[1]))
+
+#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
+#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
+/* Initialize an empty queue. */
+#define QUEUE_INIT(q) \
+ { \
+ QUEUE_NEXT(q) = (q); \
+ QUEUE_PREV(q) = (q); \
+ }
+
+/* Return true if the queue has no element. */
+#define QUEUE_IS_EMPTY(q) ((const queue*)(q) == (const queue*)QUEUE_NEXT(q))
+
+/* Insert an element at the back of a queue. */
+#define QUEUE_PUSH(q, e) \
+ { \
+ QUEUE_NEXT(e) = (q); \
+ QUEUE_PREV(e) = QUEUE_PREV(q); \
+ QUEUE_PREV_NEXT(e) = (e); \
+ QUEUE_PREV(q) = (e); \
+ }
+
+/* Remove the given element from the queue. Any element can be removed at any *
+ * time. */
+#define QUEUE_REMOVE(e) \
+ { \
+ QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \
+ QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \
+ }
+
+/* Return the element at the front of the queue. */
+#define QUEUE_HEAD(q) (QUEUE_NEXT(q))
+
+/* Return the element at the back of the queue. */
+#define QUEUE_TAIL(q) (QUEUE_PREV(q))
+
+/* Iterate over the element of a queue. * Mutating the queue while iterating
+ * results in undefined behavior. */
+#define QUEUE_FOREACH(q, e) for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q))
+
+/* Return the structure holding the given element. */
+#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field))))
+
+typedef struct {
+ SRpcInfo* pRpc; // associated SRpcInfo
+ SEpSet epSet; // ip list provided by app
+ void* ahandle; // handle provided by app
+ struct SRpcConn* pConn; // pConn allocated
+ tmsg_t msgType; // message type
+ uint8_t* pCont; // content provided by app
+ int32_t contLen; // content length
+ int32_t code; // error code
+ int16_t numOfTry; // number of try for different servers
+ int8_t oldInUse; // server EP inUse passed by app
+ int8_t redirect; // flag to indicate redirect
+ int8_t connType; // connection type
+ int64_t rid; // refId returned by taosAddRef
+ SRpcMsg* pRsp; // for synchronous API
+ tsem_t* pSem; // for synchronous API
+ SEpSet* pSet; // for synchronous API
+ char msg[0]; // RpcHead starts from here
+} SRpcReqContext;
+
+#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member)))
+#define RPC_RESERVE_SIZE (sizeof(SRpcReqContext))
+
+#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest))
+#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead)))
+#define rpcContFromHead(msg) (msg + sizeof(SRpcHead))
+#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead))
+#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead))
+#define rpcIsReq(type) (type & 1U)
+
+int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey);
+void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey);
+int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen);
+SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead);
+
+#endif
diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h
index f93753cfe9..e39e0d9273 100644
--- a/source/libs/transport/inc/transportInt.h
+++ b/source/libs/transport/inc/transportInt.h
@@ -16,62 +16,61 @@
#ifndef _TD_TRANSPORT_INT_H_
#define _TD_TRANSPORT_INT_H_
+#ifdef USE_UV
+#include
+#endif
+#include "lz4.h"
+#include "os.h"
+#include "rpcCache.h"
#include "rpcHead.h"
+#include "rpcLog.h"
+#include "rpcTcp.h"
+#include "rpcUdp.h"
+#include "taoserror.h"
+#include "tglobal.h"
+#include "thash.h"
+#include "tidpool.h"
+#include "tmsg.h"
+#include "tref.h"
+#include "trpc.h"
+#include "ttimer.h"
+#include "tutil.h"
+
#ifdef __cplusplus
extern "C" {
#endif
#ifdef USE_UV
-#include
-typedef void *queue[2];
+void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
+void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle);
-/* Private macros. */
-#define QUEUE_NEXT(q) (*(queue **)&((*(q))[0]))
-#define QUEUE_PREV(q) (*(queue **)&((*(q))[1]))
+typedef struct {
+ int sessions; // number of sessions allowed
+ int numOfThreads; // number of threads to process incoming messages
+ int idleTime; // milliseconds;
+ uint16_t localPort;
+ int8_t connType;
+ int64_t index;
+ char label[TSDB_LABEL_LEN];
-#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q)))
-#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q)))
+ char user[TSDB_UNI_LEN]; // meter ID
+ char spi; // security parameter index
+ char encrypt; // encrypt algorithm
+ char secret[TSDB_PASSWORD_LEN]; // secret for the link
+ char ckey[TSDB_PASSWORD_LEN]; // ciphering key
-/* Initialize an empty queue. */
-#define QUEUE_INIT(q) \
- { \
- QUEUE_NEXT(q) = (q); \
- QUEUE_PREV(q) = (q); \
- }
+ void (*cfp)(void* parent, SRpcMsg*, SEpSet*);
+ int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey);
-/* Return true if the queue has no element. */
-#define QUEUE_IS_EMPTY(q) ((const queue *)(q) == (const queue *)QUEUE_NEXT(q))
-
-/* Insert an element at the back of a queue. */
-#define QUEUE_PUSH(q, e) \
- { \
- QUEUE_NEXT(e) = (q); \
- QUEUE_PREV(e) = QUEUE_PREV(q); \
- QUEUE_PREV_NEXT(e) = (e); \
- QUEUE_PREV(q) = (e); \
- }
-
-/* Remove the given element from the queue. Any element can be removed at any *
- * time. */
-#define QUEUE_REMOVE(e) \
- { \
- QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \
- QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \
- }
-
-/* Return the element at the front of the queue. */
-#define QUEUE_HEAD(q) (QUEUE_NEXT(q))
-
-/* Return the element at the back of the queue. */
-#define QUEUE_TAIL(q) (QUEUE_PREV(q))
-
-/* Iterate over the element of a queue. * Mutating the queue while iterating
- * results in undefined behavior. */
-#define QUEUE_FOREACH(q, e) for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q))
-
-/* Return the structure holding the given element. */
-#define QUEUE_DATA(e, type, field) ((type *)((void *)((char *)(e)-offsetof(type, field))))
+ int32_t refCount;
+ void* parent;
+ void* idPool; // handle to ID pool
+ void* tmrCtrl; // handle to timer
+ SHashObj* hash; // handle returned by hash utility
+ void* tcphandle; // returned handle from TCP initialization
+ pthread_mutex_t mutex;
+} SRpcInfo;
#endif // USE_LIBUV
diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c
new file mode 100644
index 0000000000..a59f2f88c7
--- /dev/null
+++ b/source/libs/transport/src/trans.c
@@ -0,0 +1,64 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#ifdef USE_UV
+
+#include "transComm.h"
+
+typedef struct SConnBuffer {
+ char* buf;
+ int len;
+ int cap;
+ int left;
+} SConnBuffer;
+
+void* (*taosHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = {
+ taosInitServer, taosInitClient};
+
+void* rpcOpen(const SRpcInit* pInit) {
+ SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo));
+ if (pRpc == NULL) {
+ return NULL;
+ }
+ if (pInit->label) {
+ tstrncpy(pRpc->label, pInit->label, strlen(pInit->label));
+ }
+ pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads;
+ pRpc->connType = pInit->connType;
+ pRpc->tcphandle = (*taosHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc);
+
+ return pRpc;
+}
+void rpcClose(void* arg) { return; }
+void* rpcMallocCont(int contLen) { return NULL; }
+void rpcFreeCont(void* cont) { return; }
+void* rpcReallocCont(void* ptr, int contLen) { return NULL; }
+
+void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {}
+int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; }
+void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; }
+int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; }
+void rpcCancelRequest(int64_t rid) { return; }
+
+int32_t rpcInit(void) {
+ // impl later
+ return -1;
+}
+
+void rpcCleanup(void) {
+ // impl later
+ return;
+}
+#endif
diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c
new file mode 100644
index 0000000000..8ba1f1a0f6
--- /dev/null
+++ b/source/libs/transport/src/transCli.c
@@ -0,0 +1,194 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#ifdef USE_UV
+
+#include "transComm.h"
+
+typedef struct SCliConn {
+ uv_connect_t connReq;
+ uv_stream_t* stream;
+ void* data;
+ queue conn;
+} SCliConn;
+typedef struct SCliMsg {
+ SRpcReqContext* context;
+ queue q;
+} SCliMsg;
+
+typedef struct SCliThrdObj {
+ pthread_t thread;
+ uv_loop_t* loop;
+ uv_async_t* cliAsync; //
+ void* cache; // conn pool
+ queue msg;
+ pthread_mutex_t msgMtx;
+ void* shandle;
+} SCliThrdObj;
+
+typedef struct SClientObj {
+ char label[TSDB_LABEL_LEN];
+ int32_t index;
+ int numOfThreads;
+ SCliThrdObj** pThreadObj;
+} SClientObj;
+
+static void clientWriteCb(uv_write_t* req, int status);
+static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
+static void clientConnCb(struct uv_connect_s* req, int status);
+static void clientAsyncCb(uv_async_t* handle);
+
+static void* clientThread(void* arg);
+
+static void clientWriteCb(uv_write_t* req, int status) {
+ // impl later
+}
+static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
+ // impl later
+}
+static void clientConnCb(struct uv_connect_s* req, int status) {
+ SCliConn* pConn = req->data;
+ SCliMsg* pMsg = pConn->data;
+ SEpSet* pEpSet = &pMsg->context->epSet;
+
+ char* fqdn = pEpSet->fqdn[pEpSet->inUse];
+ uint32_t port = pEpSet->port[pEpSet->inUse];
+ if (status != 0) {
+ // call user fp later
+ tError("failed to connect server(%s, %d), errmsg: %s", fqdn, port, uv_strerror(status));
+ return;
+ }
+
+ // impl later
+}
+
+static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) {
+ // impl later
+ return NULL;
+}
+static void clientAsyncCb(uv_async_t* handle) {
+ SCliThrdObj* pThrd = handle->data;
+ SCliMsg* pMsg = NULL;
+ pthread_mutex_lock(&pThrd->msgMtx);
+ if (!QUEUE_IS_EMPTY(&pThrd->msg)) {
+ queue* head = QUEUE_HEAD(&pThrd->msg);
+ pMsg = QUEUE_DATA(head, SCliMsg, q);
+ QUEUE_REMOVE(head);
+ }
+ pthread_mutex_unlock(&pThrd->msgMtx);
+
+ SEpSet* pEpSet = &pMsg->context->epSet;
+ char* fqdn = pEpSet->fqdn[pEpSet->inUse];
+ uint32_t port = pEpSet->port[pEpSet->inUse];
+
+ SCliConn* conn = getConnFromCache(pThrd->cache, fqdn, port);
+ if (conn != NULL) {
+ // impl later
+ } else {
+ SCliConn* conn = malloc(sizeof(SCliConn));
+
+ conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t));
+ uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream));
+
+ conn->connReq.data = conn;
+ conn->data = pMsg;
+
+ struct sockaddr_in addr;
+ uv_ip4_addr(fqdn, port, &addr);
+ // handle error in callback if connect error
+ uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb);
+ }
+
+ // SRpcReqContext* pCxt = pMsg->context;
+
+ // SRpcHead* pHead = rpcHeadFromCont(pCtx->pCont);
+ // char* msg = (char*)pHead;
+ // int len = rpcMsgLenFromCont(pCtx->contLen);
+ // tmsg_t msgType = pCtx->msgType;
+
+ // impl later
+}
+
+static void* clientThread(void* arg) {
+ SCliThrdObj* pThrd = (SCliThrdObj*)arg;
+
+ QUEUE_INIT(&pThrd->msg);
+ pthread_mutex_init(&pThrd->msgMtx, NULL);
+
+ // QUEUE_INIT(&pThrd->clientCache);
+
+ pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
+ uv_loop_init(pThrd->loop);
+
+ pThrd->cliAsync = malloc(sizeof(uv_async_t));
+ uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb);
+ pThrd->cliAsync->data = pThrd;
+
+ uv_run(pThrd->loop, UV_RUN_DEFAULT);
+}
+
+void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
+ SClientObj* cli = calloc(1, sizeof(SClientObj));
+ memcpy(cli->label, label, strlen(label));
+ cli->numOfThreads = numOfThreads;
+ cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*));
+
+ for (int i = 0; i < cli->numOfThreads; i++) {
+ SCliThrdObj* thrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj));
+
+ thrd->shandle = shandle;
+ int err = pthread_create(&thrd->thread, NULL, clientThread, (void*)(thrd));
+ if (err == 0) {
+ tDebug("sucess to create tranport-client thread %d", i);
+ }
+ cli->pThreadObj[i] = thrd;
+ }
+ return cli;
+}
+
+void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) {
+ // impl later
+ SRpcInfo* pRpc = (SRpcInfo*)shandle;
+
+ int len = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen);
+
+ SRpcReqContext* pContext;
+ pContext = (SRpcReqContext*)((char*)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext));
+ pContext->ahandle = pMsg->ahandle;
+ pContext->pRpc = (SRpcInfo*)shandle;
+ pContext->epSet = *pEpSet;
+ pContext->contLen = len;
+ pContext->pCont = pMsg->pCont;
+ pContext->msgType = pMsg->msgType;
+ pContext->oldInUse = pEpSet->inUse;
+
+ assert(pRpc->connType == TAOS_CONN_CLIENT);
+ // atomic or not
+ int64_t index = pRpc->index;
+ if (pRpc->index++ >= pRpc->numOfThreads) {
+ pRpc->index = 0;
+ }
+ SCliMsg* msg = malloc(sizeof(SCliMsg));
+ msg->context = pContext;
+
+ SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads];
+
+ pthread_mutex_lock(&thrd->msgMtx);
+ QUEUE_PUSH(&thrd->msg, &msg->q);
+ pthread_mutex_unlock(&thrd->msgMtx);
+
+ uv_async_send(thrd->cliAsync);
+}
+#endif
diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c
new file mode 100644
index 0000000000..f23cfb6e2d
--- /dev/null
+++ b/source/libs/transport/src/transComm.c
@@ -0,0 +1,117 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+#ifdef USE_UV
+
+#include "transComm.h"
+
+int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) {
+ T_MD5_CTX context;
+ int ret = -1;
+
+ tMD5Init(&context);
+ tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
+ tMD5Update(&context, (uint8_t*)pMsg, msgLen);
+ tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
+ tMD5Final(&context);
+
+ if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0;
+
+ return ret;
+}
+void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) {
+ T_MD5_CTX context;
+
+ tMD5Init(&context);
+ tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
+ tMD5Update(&context, (uint8_t*)pMsg, msgLen);
+ tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN);
+ tMD5Final(&context);
+
+ memcpy(pAuth, context.digest, sizeof(context.digest));
+}
+
+int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) {
+ SRpcHead* pHead = rpcHeadFromCont(pCont);
+ int32_t finalLen = 0;
+ int overhead = sizeof(SRpcComp);
+
+ if (!NEEDTO_COMPRESSS_MSG(contLen)) {
+ return contLen;
+ }
+
+ char* buf = malloc(contLen + overhead + 8); // 8 extra bytes
+ if (buf == NULL) {
+ tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen);
+ return contLen;
+ }
+
+ int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead);
+ tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead);
+
+ /*
+ * only the compressed size is less than the value of contLen - overhead, the compression is applied
+ * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message
+ */
+ if (compLen > 0 && compLen < contLen - overhead) {
+ SRpcComp* pComp = (SRpcComp*)pCont;
+ pComp->reserved = 0;
+ pComp->contLen = htonl(contLen);
+ memcpy(pCont + overhead, buf, compLen);
+
+ pHead->comp = 1;
+ tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen);
+ finalLen = compLen + overhead;
+ } else {
+ finalLen = contLen;
+ }
+
+ free(buf);
+ return finalLen;
+}
+
+SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) {
+ int overhead = sizeof(SRpcComp);
+ SRpcHead* pNewHead = NULL;
+ uint8_t* pCont = pHead->content;
+ SRpcComp* pComp = (SRpcComp*)pHead->content;
+
+ if (pHead->comp) {
+ // decompress the content
+ assert(pComp->reserved == 0);
+ int contLen = htonl(pComp->contLen);
+
+ // prepare the temporary buffer to decompress message
+ char* temp = (char*)malloc(contLen + RPC_MSG_OVERHEAD);
+ pNewHead = (SRpcHead*)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext
+
+ if (pNewHead) {
+ int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead;
+ int origLen = LZ4_decompress_safe((char*)(pCont + overhead), (char*)pNewHead->content, compLen, contLen);
+ assert(origLen == contLen);
+
+ memcpy(pNewHead, pHead, sizeof(SRpcHead));
+ pNewHead->msgLen = rpcMsgLenFromCont(origLen);
+ /// rpcFreeMsg(pHead); // free the compressed message buffer
+ pHead = pNewHead;
+ tTrace("decomp malloc mem:%p", temp);
+ } else {
+ tError("failed to allocate memory to decompress msg, contLen:%d", contLen);
+ }
+ }
+
+ return pHead;
+}
+
+#endif
diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c
new file mode 100644
index 0000000000..0bf39b9985
--- /dev/null
+++ b/source/libs/transport/src/transSrv.c
@@ -0,0 +1,540 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#ifdef USE_UV
+#include "transComm.h"
+
+typedef struct SConnBuffer {
+ char* buf;
+ int len;
+ int cap;
+ int left;
+} SConnBuffer;
+
+typedef struct SConn {
+ uv_tcp_t* pTcp;
+ uv_write_t* pWriter;
+ uv_timer_t* pTimer;
+
+ uv_async_t* pWorkerAsync;
+ queue queue;
+ int ref;
+ int persist; // persist connection or not
+ SConnBuffer connBuf; // read buf,
+ SConnBuffer writeBuf; // write buf
+ int count;
+ void* shandle; // rpc init
+ void* ahandle; //
+ void* hostThrd;
+ // del later
+ char secured;
+ int spi;
+ char info[64];
+ char user[TSDB_UNI_LEN]; // user ID for the link
+ char secret[TSDB_PASSWORD_LEN];
+ char ckey[TSDB_PASSWORD_LEN]; // ciphering key
+} SConn;
+
+typedef struct SWorkThrdObj {
+ pthread_t thread;
+ uv_pipe_t* pipe;
+ int fd;
+ uv_loop_t* loop;
+ uv_async_t* workerAsync; //
+ queue conn;
+ pthread_mutex_t connMtx;
+ void* shandle;
+} SWorkThrdObj;
+
+typedef struct SServerObj {
+ pthread_t thread;
+ uv_tcp_t server;
+ uv_loop_t* loop;
+ int workerIdx;
+ int numOfThreads;
+ SWorkThrdObj** pThreadObj;
+ uv_pipe_t** pipe;
+ uint32_t ip;
+ uint32_t port;
+} SServerObj;
+
+static const char* notify = "a";
+
+// refactor later
+static int rpcAddAuthPart(SConn* pConn, char* msg, int msgLen);
+
+static int uvAuthMsg(SConn* pConn, char* msg, int msgLen);
+
+static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
+static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf);
+static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf);
+static void uvOnTimeoutCb(uv_timer_t* handle);
+static void uvOnWriteCb(uv_write_t* req, int status);
+static void uvOnAcceptCb(uv_stream_t* stream, int status);
+static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf);
+static void uvWorkerAsyncCb(uv_async_t* handle);
+
+// already read complete packet
+static bool readComplete(SConnBuffer* buf);
+
+static SConn* connCreate();
+static void connDestroy(SConn* conn);
+static void uvConnDestroy(uv_handle_t* handle);
+
+// server worke thread
+static void* workerThread(void* arg);
+static void* acceptThread(void* arg);
+
+void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
+ /*
+ * formate of data buffer:
+ * |<-------SRpcReqContext------->|<------------data read from socket----------->|
+ */
+ static const int CAPACITY = 1024;
+
+ SConn* conn = handle->data;
+ SConnBuffer* pBuf = &conn->connBuf;
+ if (pBuf->cap == 0) {
+ pBuf->buf = (char*)calloc(CAPACITY + RPC_RESERVE_SIZE, sizeof(char));
+ pBuf->len = 0;
+ pBuf->cap = CAPACITY;
+ pBuf->left = -1;
+
+ buf->base = pBuf->buf + RPC_RESERVE_SIZE;
+ buf->len = CAPACITY;
+ } else {
+ if (pBuf->len >= pBuf->cap) {
+ if (pBuf->left == -1) {
+ pBuf->cap *= 2;
+ pBuf->buf = realloc(pBuf->buf, pBuf->cap + RPC_RESERVE_SIZE);
+ } else if (pBuf->len + pBuf->left > pBuf->cap) {
+ pBuf->cap = pBuf->len + pBuf->left;
+ pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left + RPC_RESERVE_SIZE);
+ }
+ }
+ buf->base = pBuf->buf + pBuf->len + RPC_RESERVE_SIZE;
+ buf->len = pBuf->cap - pBuf->len;
+ }
+}
+
+// check data read from socket completely or not
+//
+static bool readComplete(SConnBuffer* data) {
+ // TODO(yihao): handle pipeline later
+ SRpcHead rpcHead;
+ int32_t headLen = sizeof(rpcHead);
+ if (data->len >= headLen) {
+ memcpy((char*)&rpcHead, data->buf + RPC_RESERVE_SIZE, headLen);
+ int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen);
+ if (msgLen > data->len) {
+ data->left = msgLen - data->len;
+ return false;
+ } else {
+ return true;
+ }
+ } else {
+ return false;
+ }
+}
+
+static void uvDoProcess(SRecvInfo* pRecv) {
+ SRpcHead* pHead = (SRpcHead*)pRecv->msg;
+ SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle;
+ SConn* pConn = pRecv->thandle;
+
+ tDump(pRecv->msg, pRecv->msgLen);
+
+ terrno = 0;
+ SRpcReqContext* pContest;
+
+ // do auth and check
+}
+
+static int uvAuthMsg(SConn* pConn, char* msg, int len) {
+ SRpcHead* pHead = (SRpcHead*)msg;
+ int code = 0;
+
+ if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) {
+ // secured link, or no authentication
+ pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
+ // tTrace("%s, secured link, no auth is required", pConn->info);
+ return 0;
+ }
+
+ if (!rpcIsReq(pHead->msgType)) {
+ // for response, if code is auth failure, it shall bypass the auth process
+ code = htonl(pHead->code);
+ if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE ||
+ code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED ||
+ code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) {
+ pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen);
+ // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code);
+ return 0;
+ }
+ }
+
+ code = 0;
+ if (pHead->spi == pConn->spi) {
+ // authentication
+ SRpcDigest* pDigest = (SRpcDigest*)((char*)pHead + len - sizeof(SRpcDigest));
+
+ int32_t delta;
+ delta = (int32_t)htonl(pDigest->timeStamp);
+ delta -= (int32_t)taosGetTimestampSec();
+ if (abs(delta) > 900) {
+ tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta);
+ code = TSDB_CODE_RPC_INVALID_TIME_STAMP;
+ } else {
+ if (rpcAuthenticateMsg(pHead, len - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) {
+ // tDebug("%s, authentication failed, msg discarded", pConn->info);
+ code = TSDB_CODE_RPC_AUTH_FAILURE;
+ } else {
+ pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest);
+ if (!rpcIsReq(pHead->msgType)) pConn->secured = 1; // link is secured for client
+ // tTrace("%s, message is authenticated", pConn->info);
+ }
+ }
+ } else {
+ tDebug("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi);
+ code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED;
+ }
+
+ return code;
+}
+
+// refers specifically to query or insert timeout
+static void uvHandleActivityTimeout(uv_timer_t* handle) {
+ // impl later
+ SConn* conn = handle->data;
+}
+
+static void uvProcessData(SConn* pConn) {
+ SRecvInfo info;
+ SRecvInfo* p = &info;
+ SConnBuffer* pBuf = &pConn->connBuf;
+ p->msg = pBuf->buf + RPC_RESERVE_SIZE;
+ p->msgLen = pBuf->len;
+ p->ip = 0;
+ p->port = 0;
+ p->shandle = pConn->shandle; //
+ p->thandle = pConn;
+ p->chandle = NULL;
+
+ //
+ SRpcHead* pHead = (SRpcHead*)p->msg;
+ assert(rpcIsReq(pHead->msgType));
+
+ SRpcInfo* pRpc = (SRpcInfo*)p->shandle;
+ pConn->ahandle = (void*)pHead->ahandle;
+ // auth here
+
+ int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen);
+ if (code != 0) {
+ terrno = code;
+ return;
+ }
+ pHead->code = htonl(pHead->code);
+
+ SRpcMsg rpcMsg;
+
+ pHead = rpcDecompressRpcMsg(pHead);
+ rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen);
+ rpcMsg.pCont = pHead->content;
+ rpcMsg.msgType = pHead->msgType;
+ rpcMsg.code = pHead->code;
+ rpcMsg.ahandle = pConn->ahandle;
+ rpcMsg.handle = pConn;
+
+ (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL);
+ uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime, 0);
+ // auth
+ // validate msg type
+}
+
+void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
+ // opt
+ SConn* ctx = cli->data;
+ SConnBuffer* pBuf = &ctx->connBuf;
+ if (nread > 0) {
+ pBuf->len += nread;
+ if (readComplete(pBuf)) {
+ tDebug("alread read complete packet");
+ uvProcessData(ctx);
+ } else {
+ tDebug("read half packet, continue to read");
+ }
+ return;
+ }
+ if (terrno != 0) {
+ // handle err code
+ }
+
+ if (nread != UV_EOF) {
+ tDebug("Read error %s\n", uv_err_name(nread));
+ }
+ uv_close((uv_handle_t*)cli, uvConnDestroy);
+}
+void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
+ buf->base = malloc(sizeof(char));
+ buf->len = 2;
+}
+
+void uvOnTimeoutCb(uv_timer_t* handle) {
+ // opt
+ tDebug("time out");
+}
+
+void uvOnWriteCb(uv_write_t* req, int status) {
+ SConn* conn = req->data;
+ if (status == 0) {
+ tDebug("data already was written on stream");
+ } else {
+ connDestroy(conn);
+ }
+ // opt
+}
+
+void uvWorkerAsyncCb(uv_async_t* handle) {
+ SWorkThrdObj* pThrd = container_of(handle, SWorkThrdObj, workerAsync);
+ SConn* conn = NULL;
+
+ // opt later
+ pthread_mutex_lock(&pThrd->connMtx);
+ if (!QUEUE_IS_EMPTY(&pThrd->conn)) {
+ queue* head = QUEUE_HEAD(&pThrd->conn);
+ conn = QUEUE_DATA(head, SConn, queue);
+ QUEUE_REMOVE(head);
+ }
+ pthread_mutex_unlock(&pThrd->connMtx);
+ if (conn == NULL) {
+ tError("except occurred, do nothing");
+ return;
+ }
+ uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len);
+ uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb);
+}
+
+void uvOnAcceptCb(uv_stream_t* stream, int status) {
+ if (status == -1) {
+ return;
+ }
+ SServerObj* pObj = container_of(stream, SServerObj, server);
+
+ uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
+ uv_tcp_init(pObj->loop, cli);
+
+ if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
+ uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t));
+
+ uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
+
+ pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
+ tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
+ uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnWriteCb);
+ } else {
+ uv_close((uv_handle_t*)cli, NULL);
+ }
+}
+void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
+ tDebug("connection coming");
+ if (nread < 0) {
+ if (nread != UV_EOF) {
+ tError("read error %s", uv_err_name(nread));
+ }
+ // TODO(log other failure reason)
+ uv_close((uv_handle_t*)q, NULL);
+ return;
+ }
+ // free memory allocated by
+ assert(nread == strlen(notify));
+ assert(buf->base[0] == notify[0]);
+ free(buf->base);
+
+ SWorkThrdObj* pThrd = q->data;
+
+ uv_pipe_t* pipe = (uv_pipe_t*)q;
+ if (!uv_pipe_pending_count(pipe)) {
+ tError("No pending count");
+ return;
+ }
+
+ uv_handle_type pending = uv_pipe_pending_type(pipe);
+ assert(pending == UV_TCP);
+
+ SConn* pConn = connCreate();
+ pConn->shandle = pThrd->shandle;
+ /* init conn timer*/
+ pConn->pTimer = malloc(sizeof(uv_timer_t));
+ uv_timer_init(pThrd->loop, pConn->pTimer);
+ pConn->pTimer->data = pConn;
+
+ pConn->hostThrd = pThrd;
+ pConn->pWorkerAsync = pThrd->workerAsync; // thread safty
+
+ // init client handle
+ pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t));
+ uv_tcp_init(pThrd->loop, pConn->pTcp);
+ pConn->pTcp->data = pConn;
+
+ // init write request, just
+ pConn->pWriter = calloc(1, sizeof(uv_write_t));
+ pConn->pWriter->data = pConn;
+
+ if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) {
+ uv_os_fd_t fd;
+ uv_fileno((const uv_handle_t*)pConn->pTcp, &fd);
+ tDebug("new connection created: %d", fd);
+ uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb);
+ } else {
+ connDestroy(pConn);
+ }
+}
+
+void* acceptThread(void* arg) {
+ // opt
+ SServerObj* srv = (SServerObj*)arg;
+ uv_tcp_init(srv->loop, &srv->server);
+
+ struct sockaddr_in bind_addr;
+
+ uv_ip4_addr("0.0.0.0", srv->port, &bind_addr);
+ uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0);
+ int err = 0;
+ if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) {
+ tError("Listen error %s\n", uv_err_name(err));
+ return NULL;
+ }
+ uv_run(srv->loop, UV_RUN_DEFAULT);
+}
+void* workerThread(void* arg) {
+ SWorkThrdObj* pThrd = (SWorkThrdObj*)arg;
+
+ pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
+ uv_loop_init(pThrd->loop);
+
+ // SRpcInfo* pRpc = pThrd->shandle;
+ uv_pipe_init(pThrd->loop, pThrd->pipe, 1);
+ uv_pipe_open(pThrd->pipe, pThrd->fd);
+
+ pThrd->pipe->data = pThrd;
+
+ QUEUE_INIT(&pThrd->conn);
+ pthread_mutex_init(&pThrd->connMtx, NULL);
+
+ pThrd->workerAsync = malloc(sizeof(uv_async_t));
+ uv_async_init(pThrd->loop, pThrd->workerAsync, uvWorkerAsyncCb);
+
+ uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb);
+ uv_run(pThrd->loop, UV_RUN_DEFAULT);
+}
+
+static SConn* connCreate() {
+ SConn* pConn = (SConn*)calloc(1, sizeof(SConn));
+ return pConn;
+}
+static void connDestroy(SConn* conn) {
+ if (conn == NULL) {
+ return;
+ }
+ uv_timer_stop(conn->pTimer);
+ free(conn->pTimer);
+ uv_close((uv_handle_t*)conn->pTcp, NULL);
+ free(conn->connBuf.buf);
+ free(conn->pTcp);
+ free(conn->pWriter);
+ free(conn);
+ // handle
+}
+static void uvConnDestroy(uv_handle_t* handle) {
+ SConn* conn = handle->data;
+ connDestroy(conn);
+}
+static int rpcAddAuthPart(SConn* pConn, char* msg, int msgLen) {
+ SRpcHead* pHead = (SRpcHead*)msg;
+
+ if (pConn->spi && pConn->secured == 0) {
+ // add auth part
+ pHead->spi = pConn->spi;
+ SRpcDigest* pDigest = (SRpcDigest*)(msg + msgLen);
+ pDigest->timeStamp = htonl(taosGetTimestampSec());
+ msgLen += sizeof(SRpcDigest);
+ pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
+ rpcBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret);
+ } else {
+ pHead->spi = 0;
+ pHead->msgLen = (int32_t)htonl((uint32_t)msgLen);
+ }
+
+ return msgLen;
+}
+
+void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) {
+ SServerObj* srv = calloc(1, sizeof(SServerObj));
+ srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t));
+ srv->numOfThreads = numOfThreads;
+ srv->workerIdx = 0;
+ srv->pThreadObj = (SWorkThrdObj**)calloc(srv->numOfThreads, sizeof(SWorkThrdObj*));
+ srv->pipe = (uv_pipe_t**)calloc(srv->numOfThreads, sizeof(uv_pipe_t*));
+ srv->ip = ip;
+ srv->port = port;
+ uv_loop_init(srv->loop);
+
+ for (int i = 0; i < srv->numOfThreads; i++) {
+ SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj));
+ srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t));
+ int fds[2];
+ if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) {
+ return NULL;
+ }
+ uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1);
+ uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write
+
+ thrd->shandle = shandle;
+ thrd->fd = fds[0];
+ thrd->pipe = &(srv->pipe[i][1]); // init read
+ int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd));
+ if (err == 0) {
+ tDebug("sucess to create worker-thread %d", i);
+ // printf("thread %d create\n", i);
+ } else {
+ // TODO: clear all other resource later
+ tError("failed to create worker-thread %d", i);
+ }
+ srv->pThreadObj[i] = thrd;
+ }
+
+ int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv);
+ if (err == 0) {
+ tDebug("success to create accept-thread");
+ } else {
+ // clear all resource later
+ }
+
+ return srv;
+}
+
+void rpcSendResponse(const SRpcMsg* pMsg) {
+ SConn* pConn = pMsg->handle;
+ SWorkThrdObj* pThrd = pConn->hostThrd;
+
+ // opt later
+ pthread_mutex_lock(&pThrd->connMtx);
+ QUEUE_PUSH(&pThrd->conn, &pConn->queue);
+ pthread_mutex_unlock(&pThrd->connMtx);
+
+ uv_async_send(pConn->pWorkerAsync);
+}
+
+#endif
diff --git a/source/libs/transport/test/transportTests.cc b/source/libs/transport/test/transportTests.cc
index 151deaf29b..53910aa30c 100644
--- a/source/libs/transport/test/transportTests.cc
+++ b/source/libs/transport/test/transportTests.cc
@@ -22,6 +22,7 @@
#include
#include
+#include "transComm.h"
#include "transportInt.h"
#include "trpc.h"
@@ -46,7 +47,7 @@ class QueueObj {
if (!IsEmpty()) {
queue *h = QUEUE_HEAD(&head);
el = QUEUE_DATA(h, QueueElem, q);
- QUEUE_REMOVE(&el->q);
+ QUEUE_REMOVE(h);
}
return el;
}
diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c
index d630080086..a3894ceedd 100644
--- a/source/libs/wal/src/walMeta.c
+++ b/source/libs/wal/src/walMeta.c
@@ -149,6 +149,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
}
}
+ closedir(dir);
regfree(&logRegPattern);
regfree(&idxRegPattern);
diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c
index 0be17ca2b9..cae1b18b3c 100644
--- a/source/os/src/osSysinfo.c
+++ b/source/os/src/osSysinfo.c
@@ -121,7 +121,7 @@ bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) {
return true;
}
-int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
+int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) {
unsigned _int64 i64FreeBytesToCaller;
unsigned _int64 i64TotalBytes;
unsigned _int64 i64FreeBytes;
@@ -438,7 +438,7 @@ int taosSystem(const char *cmd) {
void taosSetCoreDump() {}
-int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
+int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) {
struct statvfs info;
if (statvfs(dataDir, &info)) {
//printf("failed to get disk size, dataDir:%s errno:%s", tsDataDir, strerror(errno));
@@ -771,13 +771,12 @@ bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) {
return true;
}
-int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
+int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) {
struct statvfs info;
if (statvfs(dataDir, &info)) {
- //printf("failed to get disk size, dataDir:%s errno:%s", dataDir, strerror(errno));
return -1;
} else {
- diskSize->tsize = info.f_blocks * info.f_frsize;
+ diskSize->total = info.f_blocks * info.f_frsize;
diskSize->avail = info.f_bavail * info.f_frsize;
diskSize->used = (info.f_blocks - info.f_bfree) * info.f_frsize;
return 0;
diff --git a/source/util/src/terror.c b/source/util/src/terror.c
index f520585315..a5dd1483ec 100644
--- a/source/util/src/terror.c
+++ b/source/util/src/terror.c
@@ -397,7 +397,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limit")
// tfs
-TAOS_DEFINE_ERROR(TSDB_CODE_FS_OUT_OF_MEMORY, "tfs out of memory")
+TAOS_DEFINE_ERROR(TSDB_CODE_FS_APP_ERROR, "tfs out of memory")
TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_CFG, "tfs invalid mount config")
TAOS_DEFINE_ERROR(TSDB_CODE_FS_TOO_MANY_MOUNT, "tfs too many mount")
TAOS_DEFINE_ERROR(TSDB_CODE_FS_DUP_PRIMARY, "tfs duplicate primary mount")
diff --git a/src/inc/tfs.h b/src/inc/tfs.h
deleted file mode 100644
index 9ad7a8f66e..0000000000
--- a/src/inc/tfs.h
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#ifndef TD_TFS_H
-#define TD_TFS_H
-
-#include "tglobal.h"
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-typedef struct {
- int level;
- int id;
-} SDiskID;
-
-#define TFS_UNDECIDED_LEVEL -1
-#define TFS_UNDECIDED_ID -1
-#define TFS_PRIMARY_LEVEL 0
-#define TFS_PRIMARY_ID 0
-#define TFS_MIN_LEVEL 0
-#define TFS_MAX_LEVEL (TSDB_MAX_TIERS - 1)
-
-// FS APIs ====================================
-typedef struct {
- int64_t tsize;
- int64_t used;
- int64_t avail;
-} SFSMeta;
-
-typedef struct {
- int64_t size;
- int64_t used;
- int64_t free;
- int16_t nAvailDisks; // # of Available disks
-} STierMeta;
-
-int tfsInit(SDiskCfg *pDiskCfg, int ndisk);
-void tfsCleanup();
-void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numLevels);
-void tfsGetMeta(SFSMeta *pMeta);
-void tfsAllocDisk(int expLevel, int *level, int *id);
-
-const char *TFS_PRIMARY_PATH();
-const char *TFS_DISK_PATH(int level, int id);
-
-// TFILE APIs ====================================
-typedef struct {
- int level;
- int id;
- char rname[TSDB_FILENAME_LEN]; // REL name
- char aname[TSDB_FILENAME_LEN]; // ABS name
-} TFILE;
-
-#define TFILE_LEVEL(pf) ((pf)->level)
-#define TFILE_ID(pf) ((pf)->id)
-#define TFILE_NAME(pf) ((pf)->aname)
-#define TFILE_REL_NAME(pf) ((pf)->rname)
-
-#define tfsopen(pf, flags) open(TFILE_NAME(pf), flags)
-#define tfsclose(fd) close(fd)
-#define tfsremove(pf) remove(TFILE_NAME(pf))
-#define tfscopy(sf, df) taosCopy(TFILE_NAME(sf), TFILE_NAME(df))
-#define tfsrename(sf, df) taosRename(TFILE_NAME(sf), TFILE_NAME(df))
-
-void tfsInitFile(TFILE *pf, int level, int id, const char *bname);
-bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2);
-int tfsEncodeFile(void **buf, TFILE *pf);
-void *tfsDecodeFile(void *buf, TFILE *pf);
-void tfsbasename(const TFILE *pf, char *dest);
-void tfsdirname(const TFILE *pf, char *dest);
-
-// DIR APIs ====================================
-int tfsMkdirAt(const char *rname, int level, int id);
-int tfsMkdirRecurAt(const char *rname, int level, int id);
-int tfsMkdir(const char *rname);
-int tfsRmdir(const char *rname);
-int tfsRename(char *orname, char *nrname);
-
-typedef struct TDIR TDIR;
-
-TDIR * tfsOpendir(const char *rname);
-const TFILE *tfsReaddir(TDIR *tdir);
-void tfsClosedir(TDIR *tdir);
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif
diff --git a/tests/script/sh/massiveTable/cleanCluster.sh b/tests/script/sh/massiveTable/cleanCluster.sh
new file mode 100755
index 0000000000..af278933b2
--- /dev/null
+++ b/tests/script/sh/massiveTable/cleanCluster.sh
@@ -0,0 +1,57 @@
+#!/bin/bash
+#
+# clean test environment
+
+set -e
+#set -x
+
+# cleanCluster.sh
+# -r [ dnode root dir]
+
+
+dataRootDir="/data"
+
+
+while getopts "hr:" arg
+do
+ case $arg in
+ r)
+ dataRootDir=$(echo $OPTARG)
+ ;;
+ h)
+ echo "Usage: `basename $0` -r [ dnode root dir] "
+ exit 0
+ ;;
+ ?) #unknow option
+ echo "unkonw argument"
+ exit 1
+ ;;
+ esac
+done
+
+
+rmDnodesDataDir() {
+ if [ -d ${dataRootDir} ]; then
+ rm -rf ${dataRootDir}/dnode*
+ else
+ echo "${dataRootDir} not exist"
+ exit 1
+ fi
+}
+
+function kill_process() {
+ pid=$(ps -ef | grep "$1" | grep -v "grep" | awk '{print $2}')
+ if [ -n "$pid" ]; then
+ kill -9 $pid || :
+ fi
+}
+
+########################################################################################
+############################### main process ##########################################
+
+## kill all taosd process
+kill_process taosd
+
+rmDnodesDataDir
+
+
diff --git a/tests/script/sh/massiveTable/compileVersion.sh b/tests/script/sh/massiveTable/compileVersion.sh
new file mode 100755
index 0000000000..1976e8f14a
--- /dev/null
+++ b/tests/script/sh/massiveTable/compileVersion.sh
@@ -0,0 +1,81 @@
+#!/bin/bash
+#
+# compile test version
+
+set -e
+#set -x
+
+# compileVersion.sh
+# -r [ TDengine project dir]
+# -v [ TDengine branch version ]
+
+
+projectDir=/root/TDengine
+TDengineBrVer="3.0"
+
+while getopts "hr:v:" arg
+do
+ case $arg in
+ r)
+ projectDir=$(echo $OPTARG)
+ ;;
+ v)
+ TDengineBrVer=$(echo $OPTARG)
+ ;;
+ h)
+ echo "Usage: `basename $0` -r [ TDengine project dir] "
+ echo " -v [ TDengine branch version] "
+ exit 0
+ ;;
+ ?) #unknow option
+ echo "unkonw argument"
+ exit 1
+ ;;
+ esac
+done
+
+echo "projectDir=${projectDir} TDengineBrVer=${TDengineBrVer}"
+
+function gitPullBranchInfo () {
+ branch_name=$1
+
+ git checkout $branch_name
+ echo "==== git pull $branch_name start ===="
+## git submodule update --init --recursive
+ git pull origin $branch_name ||:
+ echo "==== git pull $branch_name end ===="
+}
+
+function compileTDengineVersion() {
+ debugDir=debug
+ if [ -d ${debugDir} ]; then
+ rm -rf ${debugDir}/* ||:
+ else
+ mkdir -p ${debugDir}
+ fi
+
+ cd ${debugDir}
+ cmake ..
+ make -j24
+}
+########################################################################################
+############################### main process ##########################################
+
+## checkout all branchs and git pull
+cd ${projectDir}
+gitPullBranchInfo $TDengineBrVer
+compileTDengineVersion
+
+taos_dir=${projectDir}/debug/tools/shell
+taosd_dir=${projectDir}/debug/source/dnode/mgmt/daemon
+create_table_dir=${projectDir}/debug/tests/test/c
+
+rm -f /usr/bin/taos
+rm -f /usr/bin/taosd
+rm -f /usr/bin/create_table
+
+ln -s $taos_dir/taos /usr/bin/taos
+ln -s $taosd_dir/taosd /usr/bin/taosd
+ln -s $create_table_dir/create_table /usr/bin/create_table
+
+
diff --git a/tests/script/sh/massiveTable/deployCluster.sh b/tests/script/sh/massiveTable/deployCluster.sh
new file mode 100755
index 0000000000..2341d512c0
--- /dev/null
+++ b/tests/script/sh/massiveTable/deployCluster.sh
@@ -0,0 +1,26 @@
+#!/bin/bash
+#
+# deploy test cluster
+
+set -e
+#set -x
+
+# deployCluster.sh
+
+curr_dir=$(pwd)
+echo "currect pwd: ${curr_dir}"
+
+./cleanCluster.sh -r "/data"
+./cleanCluster.sh -r "/data2"
+
+./compileVersion.sh -r ${curr_dir}/../../../../ -v "3.0"
+
+./setupDnodes.sh -r "/data" -n 1 -f "trd02:7000" -p 7000
+./setupDnodes.sh -r "/data2" -n 1 -f "trd02:7000" -p 8000
+
+#./setupDnodes.sh -r "/data" -n 2 -f trd02:7000 -p 7000
+#./setupDnodes.sh -r "/data2" -n 2 -f trd02:7000 -p 8000
+
+
+
+
diff --git a/tests/script/sh/massiveTable/setupDnodes.sh b/tests/script/sh/massiveTable/setupDnodes.sh
new file mode 100755
index 0000000000..e09b2fb396
--- /dev/null
+++ b/tests/script/sh/massiveTable/setupDnodes.sh
@@ -0,0 +1,137 @@
+#!/bin/bash
+#
+# setup test environment
+
+set -e
+#set -x
+
+# setupDnodes.sh
+# -e [ new | old]
+# -n [ dnode number]
+# -f [ first ep]
+# -p [ start port]
+# -r [ dnode root dir]
+
+# set parameters by default value
+enviMode=new
+dataRootDir="/data"
+firstEp="localhost:7000"
+startPort=7000
+dnodeNumber=1
+
+
+while getopts "he:f:n:r:p:" arg
+do
+ case $arg in
+ e)
+ enviMode=$( echo $OPTARG )
+ ;;
+ n)
+ dnodeNumber=$(echo $OPTARG)
+ ;;
+ f)
+ firstEp=$(echo $OPTARG)
+ ;;
+ p)
+ startPort=$(echo $OPTARG)
+ ;;
+ r)
+ dataRootDir=$(echo $OPTARG)
+ ;;
+ h)
+ echo "Usage: `basename $0` -e [new | old] "
+ echo " -n [ dnode number] "
+ echo " -f [ first ep] "
+ echo " -p [ start port] "
+ echo " -r [ dnode root dir] "
+ exit 0
+ ;;
+ ?) #unknow option
+ echo "unkonw argument"
+ exit 1
+ ;;
+ esac
+done
+
+echo "enviMode=${enviMode} dnodeNumber=${dnodeNumber} dataRootDir=${dataRootDir} firstEp=${firstEp} startPort=${startPort}"
+
+#curr_dir=$(pwd)
+
+
+createNewCfgFile() {
+ cfgFile=$1/taos.cfg
+ dataDir=$2
+ logDir=$3
+ firstEp=$4
+ serverPort=$5
+
+ echo "debugFlag 131" > ${cfgFile}
+ echo "firstEp ${firstEp}" >> ${cfgFile}
+ echo "dataDir ${dataDir}" >> ${cfgFile}
+ echo "logDir ${logDir}" >> ${cfgFile}
+ echo "serverPort ${serverPort}" >> ${cfgFile}
+
+ echo "supportVnodes 1024" >> ${cfgFile}
+ #echo "asyncLog 0" >> ${cfgFile}
+ echo "telemetryReporting 0" >> ${cfgFile}
+}
+
+createNewDnodesDataDir() {
+ if [ -d ${dataRootDir} ]; then
+ rm -rf ${dataRootDir}/dnode*
+ else
+ echo "${dataRootDir} not exist"
+ exit 1
+ fi
+
+ dnodeNumber=$1
+ firstEp=$2
+
+ serverPort=${startPort}
+ for ((i=0; i<${dnodeNumber}; i++)); do
+ mkdir -p ${dataRootDir}/dnode_${i}/cfg
+ mkdir -p ${dataRootDir}/dnode_${i}/log
+ mkdir -p ${dataRootDir}/dnode_${i}/data
+
+ createNewCfgFile ${dataRootDir}/dnode_${i}/cfg ${dataRootDir}/dnode_${i}/data ${dataRootDir}/dnode_${i}/log ${firstEp} ${serverPort}
+ #echo "create dnode: ${serverPort}, ${dataRootDir}/dnode_${i}"
+ serverPort=$((10#${serverPort}+100))
+ done
+}
+
+function kill_process() {
+ pid=$(ps -ef | grep "$1" | grep -v "grep" | awk '{print $2}')
+ if [ -n "$pid" ]; then
+ kill -9 $pid || :
+ fi
+}
+
+startDnodes() {
+ dnodeNumber=$1
+
+ for ((i=0; i<${dnodeNumber}; i++)); do
+ if [ -d ${dataRootDir}/dnode_${i} ]; then
+ nohup taosd -c ${dataRootDir}/dnode_${i}/cfg >/dev/null 2>&1 &
+ echo "start taosd ${dataRootDir}/dnode_${i}"
+ fi
+ done
+}
+
+########################################################################################
+############################### main process ##########################################
+
+## kill all taosd process
+kill_process taosd
+
+## create director for all dnode
+if [[ "$enviMode" == "new" ]]; then
+ createNewDnodesDataDir ${dnodeNumber} ${firstEp}
+fi
+
+## start all dnode by nohup
+startDnodes ${dnodeNumber}
+
+echo "====run setupDnodes.sh end===="
+echo " "
+
+