Merge remote-tracking branch 'origin/3.0' into feature/3.0_liaohj
This commit is contained in:
commit
3b23380c47
|
@ -188,16 +188,19 @@ void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp);
|
||||||
|
|
||||||
static FORCE_INLINE void tFreeClientHbReq(void *pReq) {
|
static FORCE_INLINE void tFreeClientHbReq(void *pReq) {
|
||||||
SClientHbReq* req = (SClientHbReq*)pReq;
|
SClientHbReq* req = (SClientHbReq*)pReq;
|
||||||
taosHashCleanup(req->info);
|
if (req->info) taosHashCleanup(req->info);
|
||||||
free(pReq);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq);
|
int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq);
|
||||||
void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pReq);
|
void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pReq);
|
||||||
|
|
||||||
static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq) {
|
static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) {
|
||||||
SClientHbBatchReq *req = (SClientHbBatchReq*)pReq;
|
SClientHbBatchReq *req = (SClientHbBatchReq*)pReq;
|
||||||
//taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
|
if (deep) {
|
||||||
|
taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
|
||||||
|
} else {
|
||||||
|
taosArrayDestroy(req->reqs);
|
||||||
|
}
|
||||||
free(pReq);
|
free(pReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -70,7 +70,7 @@ int32_t dsCreateDataSinker(const struct SDataSink *pDataSink, DataSinkHandle* pH
|
||||||
*/
|
*/
|
||||||
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue);
|
int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pContinue);
|
||||||
|
|
||||||
void dsEndPut(DataSinkHandle handle, int64_t useconds);
|
void dsEndPut(DataSinkHandle handle, uint64_t useconds);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the length of the data returned by the next call to dsGetDataBlock.
|
* Get the length of the data returned by the next call to dsGetDataBlock.
|
||||||
|
|
|
@ -20,6 +20,8 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#include "common.h"
|
||||||
|
|
||||||
typedef void* qTaskInfo_t;
|
typedef void* qTaskInfo_t;
|
||||||
typedef void* DataSinkHandle;
|
typedef void* DataSinkHandle;
|
||||||
struct SSubplan;
|
struct SSubplan;
|
||||||
|
@ -34,7 +36,7 @@ struct SSubplan;
|
||||||
* @param qId
|
* @param qId
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo);
|
int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The main task execution function, including query on both table and multiple tables,
|
* The main task execution function, including query on both table and multiple tables,
|
||||||
|
@ -44,7 +46,7 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, struct SSubplan* pPlan, qTaskI
|
||||||
* @param handle
|
* @param handle
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
struct SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle);
|
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Retrieve the produced results information, if current query is not paused or completed,
|
* Retrieve the produced results information, if current query is not paused or completed,
|
||||||
|
|
|
@ -13,8 +13,8 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifndef TD_TFS_H
|
#ifndef _TD_TFS_H_
|
||||||
#define TD_TFS_H
|
#define _TD_TFS_H_
|
||||||
|
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
|
|
||||||
|
@ -23,8 +23,8 @@ extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int level;
|
int32_t level;
|
||||||
int id;
|
int32_t id;
|
||||||
} SDiskID;
|
} SDiskID;
|
||||||
|
|
||||||
#define TFS_UNDECIDED_LEVEL -1
|
#define TFS_UNDECIDED_LEVEL -1
|
||||||
|
@ -36,33 +36,25 @@ typedef struct {
|
||||||
|
|
||||||
// FS APIs ====================================
|
// FS APIs ====================================
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t tsize;
|
int64_t total;
|
||||||
int64_t used;
|
int64_t used;
|
||||||
int64_t avail;
|
int64_t avail;
|
||||||
} SFSMeta;
|
} SFSMeta;
|
||||||
|
|
||||||
typedef struct {
|
int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk);
|
||||||
int64_t size;
|
void tfsCleanup();
|
||||||
int64_t used;
|
void tfsUpdateSize(SFSMeta *pFSMeta);
|
||||||
int64_t free;
|
void tfsAllocDisk(int32_t expLevel, int32_t *level, int32_t *id);
|
||||||
int16_t nAvailDisks; // # of Available disks
|
|
||||||
} STierMeta;
|
|
||||||
|
|
||||||
int tfsInit(SDiskCfg *pDiskCfg, int ndisk);
|
|
||||||
void tfsCleanup();
|
|
||||||
void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numLevels);
|
|
||||||
void tfsGetMeta(SFSMeta *pMeta);
|
|
||||||
void tfsAllocDisk(int expLevel, int *level, int *id);
|
|
||||||
|
|
||||||
const char *TFS_PRIMARY_PATH();
|
const char *TFS_PRIMARY_PATH();
|
||||||
const char *TFS_DISK_PATH(int level, int id);
|
const char *TFS_DISK_PATH(int32_t level, int32_t id);
|
||||||
|
|
||||||
// TFILE APIs ====================================
|
// TFILE APIs ====================================
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int level;
|
int32_t level;
|
||||||
int id;
|
int32_t id;
|
||||||
char rname[TSDB_FILENAME_LEN]; // REL name
|
char rname[TSDB_FILENAME_LEN]; // REL name
|
||||||
char aname[TSDB_FILENAME_LEN]; // ABS name
|
char aname[TSDB_FILENAME_LEN]; // ABS name
|
||||||
} TFILE;
|
} TFILE;
|
||||||
|
|
||||||
#define TFILE_LEVEL(pf) ((pf)->level)
|
#define TFILE_LEVEL(pf) ((pf)->level)
|
||||||
|
@ -76,23 +68,23 @@ typedef struct {
|
||||||
#define tfscopy(sf, df) taosCopyFile(TFILE_NAME(sf), TFILE_NAME(df))
|
#define tfscopy(sf, df) taosCopyFile(TFILE_NAME(sf), TFILE_NAME(df))
|
||||||
#define tfsrename(sf, df) taosRename(TFILE_NAME(sf), TFILE_NAME(df))
|
#define tfsrename(sf, df) taosRename(TFILE_NAME(sf), TFILE_NAME(df))
|
||||||
|
|
||||||
void tfsInitFile(TFILE *pf, int level, int id, const char *bname);
|
void tfsInitFile(TFILE *pf, int32_t level, int32_t id, const char *bname);
|
||||||
bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2);
|
bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2);
|
||||||
int tfsEncodeFile(void **buf, TFILE *pf);
|
int32_t tfsEncodeFile(void **buf, TFILE *pf);
|
||||||
void *tfsDecodeFile(void *buf, TFILE *pf);
|
void *tfsDecodeFile(void *buf, TFILE *pf);
|
||||||
void tfsbasename(const TFILE *pf, char *dest);
|
void tfsbasename(const TFILE *pf, char *dest);
|
||||||
void tfsdirname(const TFILE *pf, char *dest);
|
void tfsdirname(const TFILE *pf, char *dest);
|
||||||
|
|
||||||
// DIR APIs ====================================
|
// DIR APIs ====================================
|
||||||
int tfsMkdirAt(const char *rname, int level, int id);
|
int32_t tfsMkdirAt(const char *rname, int32_t level, int32_t id);
|
||||||
int tfsMkdirRecurAt(const char *rname, int level, int id);
|
int32_t tfsMkdirRecurAt(const char *rname, int32_t level, int32_t id);
|
||||||
int tfsMkdir(const char *rname);
|
int32_t tfsMkdir(const char *rname);
|
||||||
int tfsRmdir(const char *rname);
|
int32_t tfsRmdir(const char *rname);
|
||||||
int tfsRename(char *orname, char *nrname);
|
int32_t tfsRename(char *orname, char *nrname);
|
||||||
|
|
||||||
typedef struct TDIR TDIR;
|
typedef struct TDIR TDIR;
|
||||||
|
|
||||||
TDIR * tfsOpendir(const char *rname);
|
TDIR *tfsOpendir(const char *rname);
|
||||||
const TFILE *tfsReaddir(TDIR *tdir);
|
const TFILE *tfsReaddir(TDIR *tdir);
|
||||||
void tfsClosedir(TDIR *tdir);
|
void tfsClosedir(TDIR *tdir);
|
||||||
|
|
||||||
|
@ -100,4 +92,4 @@ void tfsClosedir(TDIR *tdir);
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif
|
#endif /*_TD_TFS_H_*/
|
||||||
|
|
|
@ -35,12 +35,12 @@ extern char tsLocale[];
|
||||||
extern char tsCharset[]; // default encode string
|
extern char tsCharset[]; // default encode string
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t tsize;
|
int64_t total;
|
||||||
int64_t used;
|
int64_t used;
|
||||||
int64_t avail;
|
int64_t avail;
|
||||||
} SysDiskSize;
|
} SDiskSize;
|
||||||
|
|
||||||
int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize);
|
int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize);
|
||||||
int32_t taosGetCpuCores();
|
int32_t taosGetCpuCores();
|
||||||
void taosGetSystemInfo();
|
void taosGetSystemInfo();
|
||||||
bool taosReadProcIO(int64_t *rchars, int64_t *wchars);
|
bool taosReadProcIO(int64_t *rchars, int64_t *wchars);
|
||||||
|
|
|
@ -411,7 +411,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1004) //"WAL out of memory")
|
#define TSDB_CODE_WAL_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x1004) //"WAL out of memory")
|
||||||
|
|
||||||
// tfs
|
// tfs
|
||||||
#define TSDB_CODE_FS_OUT_OF_MEMORY TAOS_DEF_ERROR_CODE(0, 0x2200) //"tfs out of memory")
|
#define TSDB_CODE_FS_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x2200) //"tfs out of memory")
|
||||||
#define TSDB_CODE_FS_INVLD_CFG TAOS_DEF_ERROR_CODE(0, 0x2201) //"tfs invalid mount config")
|
#define TSDB_CODE_FS_INVLD_CFG TAOS_DEF_ERROR_CODE(0, 0x2201) //"tfs invalid mount config")
|
||||||
#define TSDB_CODE_FS_TOO_MANY_MOUNT TAOS_DEF_ERROR_CODE(0, 0x2202) //"tfs too many mount")
|
#define TSDB_CODE_FS_TOO_MANY_MOUNT TAOS_DEF_ERROR_CODE(0, 0x2202) //"tfs too many mount")
|
||||||
#define TSDB_CODE_FS_DUP_PRIMARY TAOS_DEF_ERROR_CODE(0, 0x2203) //"tfs duplicate primary mount")
|
#define TSDB_CODE_FS_DUP_PRIMARY TAOS_DEF_ERROR_CODE(0, 0x2203) //"tfs duplicate primary mount")
|
||||||
|
|
|
@ -60,15 +60,17 @@ SClientHbBatchReq* hbGatherAllInfo(SAppHbMgr *pAppHbMgr) {
|
||||||
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
|
pIter = taosHashIterate(pAppHbMgr->activeInfo, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, NULL);
|
pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, NULL);
|
||||||
while (pIter != NULL) {
|
while (pIter != NULL) {
|
||||||
FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter;
|
FGetConnInfo getConnInfoFp = (FGetConnInfo)pIter;
|
||||||
SClientHbKey connKey;
|
SClientHbKey connKey;
|
||||||
taosHashCopyKey(pIter, &connKey);
|
taosHashCopyKey(pIter, &connKey);
|
||||||
getConnInfoFp(connKey, NULL);
|
SArray* pArray = getConnInfoFp(connKey, NULL);
|
||||||
|
|
||||||
pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, pIter);
|
pIter = taosHashIterate(pAppHbMgr->getInfoFuncs, pIter);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
return pBatchReq;
|
return pBatchReq;
|
||||||
}
|
}
|
||||||
|
@ -99,12 +101,12 @@ static void* hbThreadFunc(void* param) {
|
||||||
//TODO: error handling
|
//TODO: error handling
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
void *bufCopy = buf;
|
void *abuf = buf;
|
||||||
tSerializeSClientHbBatchReq(&bufCopy, pReq);
|
tSerializeSClientHbBatchReq(&abuf, pReq);
|
||||||
SMsgSendInfo *pInfo = malloc(sizeof(SMsgSendInfo));
|
SMsgSendInfo *pInfo = malloc(sizeof(SMsgSendInfo));
|
||||||
if (pInfo == NULL) {
|
if (pInfo == NULL) {
|
||||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
tFreeClientHbBatchReq(pReq);
|
tFreeClientHbBatchReq(pReq, false);
|
||||||
free(buf);
|
free(buf);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -120,7 +122,7 @@ static void* hbThreadFunc(void* param) {
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
|
SEpSet epSet = getEpSet_s(&pAppInstInfo->mgmtEp);
|
||||||
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
|
asyncSendMsgToServer(pAppInstInfo->pTransporter, &epSet, &transporterId, pInfo);
|
||||||
tFreeClientHbBatchReq(pReq);
|
tFreeClientHbBatchReq(pReq, false);
|
||||||
|
|
||||||
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
|
atomic_add_fetch_32(&pAppHbMgr->reportCnt, 1);
|
||||||
}
|
}
|
||||||
|
@ -155,6 +157,9 @@ SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo) {
|
||||||
}
|
}
|
||||||
// init stat
|
// init stat
|
||||||
pAppHbMgr->startTime = taosGetTimestampMs();
|
pAppHbMgr->startTime = taosGetTimestampMs();
|
||||||
|
pAppHbMgr->connKeyCnt = 0;
|
||||||
|
pAppHbMgr->reportCnt = 0;
|
||||||
|
pAppHbMgr->reportBytes = 0;
|
||||||
|
|
||||||
// init app info
|
// init app info
|
||||||
pAppHbMgr->pAppInstInfo = pAppInstInfo;
|
pAppHbMgr->pAppInstInfo = pAppInstInfo;
|
||||||
|
|
|
@ -325,6 +325,19 @@ typedef struct SMqTopicConsumer {
|
||||||
} SMqTopicConsumer;
|
} SMqTopicConsumer;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
typedef struct SMqConsumerEp {
|
||||||
|
int32_t vgId;
|
||||||
|
SEpSet epset;
|
||||||
|
int64_t consumerId;
|
||||||
|
} SMqConsumerEp;
|
||||||
|
|
||||||
|
typedef struct SMqCgroupTopicPair {
|
||||||
|
char key[TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN];
|
||||||
|
SArray* assigned; // SArray<SMqConsumerEp>
|
||||||
|
SArray* unassignedConsumer;
|
||||||
|
SArray* unassignedVg;
|
||||||
|
} SMqCgroupTopicPair;
|
||||||
|
|
||||||
typedef struct SMqCGroup {
|
typedef struct SMqCGroup {
|
||||||
char name[TSDB_CONSUMER_GROUP_LEN];
|
char name[TSDB_CONSUMER_GROUP_LEN];
|
||||||
int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal
|
int32_t status; // 0 - uninitialized, 1 - wait rebalance, 2- normal
|
||||||
|
@ -351,10 +364,11 @@ typedef struct SMqTopicObj {
|
||||||
|
|
||||||
// TODO: add cache and change name to id
|
// TODO: add cache and change name to id
|
||||||
typedef struct SMqConsumerTopic {
|
typedef struct SMqConsumerTopic {
|
||||||
|
char name[TSDB_TOPIC_FNAME_LEN];
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
char name[TSDB_TOPIC_NAME_LEN];
|
|
||||||
//TODO: replace with something with ep
|
//TODO: replace with something with ep
|
||||||
SList *vgroups; // SList<int32_t>
|
SList *vgroups; // SList<int32_t>
|
||||||
|
SArray *pVgInfo; // SArray<int32_t>
|
||||||
} SMqConsumerTopic;
|
} SMqConsumerTopic;
|
||||||
|
|
||||||
typedef struct SMqConsumerObj {
|
typedef struct SMqConsumerObj {
|
||||||
|
@ -362,7 +376,7 @@ typedef struct SMqConsumerObj {
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||||
SArray *topics; // SArray<SMqConsumerTopic>
|
SArray *topics; // SArray<SMqConsumerTopic>
|
||||||
SHashObj *topicHash;
|
SHashObj *topicHash; //SHashObj<SMqConsumerTopic>
|
||||||
} SMqConsumerObj;
|
} SMqConsumerObj;
|
||||||
|
|
||||||
typedef struct SMqSubConsumerObj {
|
typedef struct SMqSubConsumerObj {
|
||||||
|
|
|
@ -204,34 +204,37 @@ void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
|
||||||
static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pMsg->pMnode;
|
||||||
char *msgStr = pMsg->rpcMsg.pCont;
|
char *msgStr = pMsg->rpcMsg.pCont;
|
||||||
SCMSubscribeReq *pSubscribe;
|
SCMSubscribeReq subscribe;
|
||||||
tDeserializeSCMSubscribeReq(msgStr, pSubscribe);
|
tDeserializeSCMSubscribeReq(msgStr, &subscribe);
|
||||||
int64_t consumerId = pSubscribe->consumerId;
|
int64_t consumerId = subscribe.consumerId;
|
||||||
char *consumerGroup = pSubscribe->consumerGroup;
|
char *consumerGroup = subscribe.consumerGroup;
|
||||||
int32_t cgroupLen = strlen(consumerGroup);
|
int32_t cgroupLen = strlen(consumerGroup);
|
||||||
|
|
||||||
SArray *newSub = NULL;
|
SArray *newSub = NULL;
|
||||||
int newTopicNum = pSubscribe->topicNum;
|
int newTopicNum = subscribe.topicNum;
|
||||||
if (newTopicNum) {
|
if (newTopicNum) {
|
||||||
newSub = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic));
|
newSub = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic));
|
||||||
}
|
}
|
||||||
|
SMqConsumerTopic *pConsumerTopics = calloc(newTopicNum, sizeof(SMqConsumerTopic));
|
||||||
|
if (pConsumerTopics == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
for (int i = 0; i < newTopicNum; i++) {
|
for (int i = 0; i < newTopicNum; i++) {
|
||||||
char *newTopicName = taosArrayGetP(newSub, i);
|
char *newTopicName = taosArrayGetP(newSub, i);
|
||||||
SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic));
|
SMqConsumerTopic *pConsumerTopic = &pConsumerTopics[i];
|
||||||
if (pConsumerTopic == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
// TODO: free
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
strcpy(pConsumerTopic->name, newTopicName);
|
strcpy(pConsumerTopic->name, newTopicName);
|
||||||
pConsumerTopic->vgroups = tdListNew(sizeof(int64_t));
|
pConsumerTopic->vgroups = tdListNew(sizeof(int64_t));
|
||||||
taosArrayPush(newSub, pConsumerTopic);
|
|
||||||
free(pConsumerTopic);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosArrayAddBatch(newSub, pConsumerTopics, newTopicNum);
|
||||||
|
free(pConsumerTopics);
|
||||||
taosArraySortString(newSub, taosArrayCompareString);
|
taosArraySortString(newSub, taosArrayCompareString);
|
||||||
|
|
||||||
SArray *oldSub = NULL;
|
SArray *oldSub = NULL;
|
||||||
int oldTopicNum = 0;
|
int oldTopicNum = 0;
|
||||||
|
// create consumer if not exist
|
||||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
|
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId);
|
||||||
if (pConsumer == NULL) {
|
if (pConsumer == NULL) {
|
||||||
// create consumer
|
// create consumer
|
||||||
|
@ -249,6 +252,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
|
//TODO: free memory
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -286,6 +290,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pOldTopic != NULL) {
|
if (pOldTopic != NULL) {
|
||||||
|
//cancel subscribe of that old topic
|
||||||
ASSERT(pNewTopic == NULL);
|
ASSERT(pNewTopic == NULL);
|
||||||
char *oldTopicName = pOldTopic->name;
|
char *oldTopicName = pOldTopic->name;
|
||||||
SList *vgroups = pOldTopic->vgroups;
|
SList *vgroups = pOldTopic->vgroups;
|
||||||
|
@ -298,13 +303,14 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
|
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
|
||||||
while ((pn = tdListNext(&iter)) != NULL) {
|
while ((pn = tdListNext(&iter)) != NULL) {
|
||||||
int32_t vgId = *(int64_t *)pn->data;
|
int32_t vgId = *(int64_t *)pn->data;
|
||||||
|
// acquire and get epset
|
||||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
||||||
// TODO release
|
// TODO what time to release?
|
||||||
if (pVgObj == NULL) {
|
if (pVgObj == NULL) {
|
||||||
// TODO handle error
|
// TODO handle error
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
// acquire and get epset
|
//build reset msg
|
||||||
void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup);
|
void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup);
|
||||||
// TODO:serialize
|
// TODO:serialize
|
||||||
if (pMsg == NULL) {
|
if (pMsg == NULL) {
|
||||||
|
@ -323,10 +329,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
//delete data in mnode
|
||||||
taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen);
|
taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen);
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
|
||||||
} else if (pNewTopic != NULL) {
|
} else if (pNewTopic != NULL) {
|
||||||
|
// save subscribe info to mnode
|
||||||
ASSERT(pOldTopic == NULL);
|
ASSERT(pOldTopic == NULL);
|
||||||
|
|
||||||
char *newTopicName = pNewTopic->name;
|
char *newTopicName = pNewTopic->name;
|
||||||
|
@ -351,6 +359,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
// add into cgroups
|
// add into cgroups
|
||||||
taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup));
|
taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup));
|
||||||
}
|
}
|
||||||
|
/*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/
|
||||||
|
|
||||||
// put the consumer into list
|
// put the consumer into list
|
||||||
// rebalance will be triggered by timer
|
// rebalance will be triggered by timer
|
||||||
|
|
|
@ -357,10 +357,13 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
taosArrayDestroyEx(pArray, tFreeClientHbReq);
|
||||||
|
|
||||||
int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp);
|
int32_t tlen = tSerializeSClientHbBatchRsp(NULL, &batchRsp);
|
||||||
void* buf = rpcMallocCont(tlen);
|
void* buf = rpcMallocCont(tlen);
|
||||||
void* abuf = buf;
|
void* abuf = buf;
|
||||||
tSerializeSClientHbBatchRsp(&abuf, &batchRsp);
|
tSerializeSClientHbBatchRsp(&abuf, &batchRsp);
|
||||||
|
taosArrayDestroy(batchRsp.rsps);
|
||||||
pReq->contLen = tlen;
|
pReq->contLen = tlen;
|
||||||
pReq->pCont = buf;
|
pReq->pCont = buf;
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -69,6 +69,17 @@ static void mndTransReExecute(void *param, void *tmrId) {
|
||||||
taosTmrReset(mndTransReExecute, 3000, pMnode, pMnode->timer, &pMnode->transTimer);
|
taosTmrReset(mndTransReExecute, 3000, pMnode, pMnode->timer, &pMnode->transTimer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void mndCalMqRebalance(void* param, void* tmrId) {
|
||||||
|
SMnode* pMnode = param;
|
||||||
|
if (mndIsMaster(pMnode)) {
|
||||||
|
// iterate cgroup, cal rebalance
|
||||||
|
// sync with raft
|
||||||
|
// write sdb
|
||||||
|
}
|
||||||
|
|
||||||
|
taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->transTimer);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndInitTimer(SMnode *pMnode) {
|
static int32_t mndInitTimer(SMnode *pMnode) {
|
||||||
if (pMnode->timer == NULL) {
|
if (pMnode->timer == NULL) {
|
||||||
pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND");
|
pMnode->timer = taosTmrInit(5000, 200, 3600000, "MND");
|
||||||
|
|
|
@ -32,7 +32,7 @@ typedef struct SDataSinkManager {
|
||||||
} SDataSinkManager;
|
} SDataSinkManager;
|
||||||
|
|
||||||
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue);
|
typedef int32_t (*FPutDataBlock)(struct SDataSinkHandle* pHandle, const SInputData* pInput, bool* pContinue);
|
||||||
typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, int64_t useconds);
|
typedef void (*FEndPut)(struct SDataSinkHandle* pHandle, uint64_t useconds);
|
||||||
typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd);
|
typedef void (*FGetDataLength)(struct SDataSinkHandle* pHandle, int32_t* pLen, bool* pQueryEnd);
|
||||||
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput);
|
typedef int32_t (*FGetDataBlock)(struct SDataSinkHandle* pHandle, SOutputData* pOutput);
|
||||||
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
|
typedef int32_t (*FDestroyDataSinker)(struct SDataSinkHandle* pHandle);
|
||||||
|
|
|
@ -44,7 +44,7 @@ typedef struct SDataDispatchHandle {
|
||||||
SDataDispatchBuf nextOutput;
|
SDataDispatchBuf nextOutput;
|
||||||
int32_t status;
|
int32_t status;
|
||||||
bool queryEnd;
|
bool queryEnd;
|
||||||
int64_t useconds;
|
uint64_t useconds;
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
} SDataDispatchHandle;
|
} SDataDispatchHandle;
|
||||||
|
|
||||||
|
@ -158,7 +158,7 @@ static int32_t putDataBlock(SDataSinkHandle* pHandle, const SInputData* pInput,
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void endPut(struct SDataSinkHandle* pHandle, int64_t useconds) {
|
static void endPut(struct SDataSinkHandle* pHandle, uint64_t useconds) {
|
||||||
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
SDataDispatchHandle* pDispatcher = (SDataDispatchHandle*)pHandle;
|
||||||
pthread_mutex_lock(&pDispatcher->mutex);
|
pthread_mutex_lock(&pDispatcher->mutex);
|
||||||
pDispatcher->queryEnd = true;
|
pDispatcher->queryEnd = true;
|
||||||
|
|
|
@ -37,7 +37,7 @@ int32_t dsPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* pC
|
||||||
return pHandleImpl->fPut(pHandleImpl, pInput, pContinue);
|
return pHandleImpl->fPut(pHandleImpl, pInput, pContinue);
|
||||||
}
|
}
|
||||||
|
|
||||||
void dsEndPut(DataSinkHandle handle, int64_t useconds) {
|
void dsEndPut(DataSinkHandle handle, uint64_t useconds) {
|
||||||
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
|
SDataSinkHandle* pHandleImpl = (SDataSinkHandle*)handle;
|
||||||
return pHandleImpl->fEndPut(pHandleImpl, useconds);
|
return pHandleImpl->fEndPut(pHandleImpl, useconds);
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,7 +68,7 @@ void freeParam(STaskParam *param) {
|
||||||
tfree(param->prevResult);
|
tfree(param->prevResult);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo) {
|
int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
|
||||||
assert(tsdb != NULL && pSubplan != NULL);
|
assert(tsdb != NULL && pSubplan != NULL);
|
||||||
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
|
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
|
||||||
|
|
||||||
|
@ -85,6 +85,8 @@ int32_t qCreateExecTask(void* tsdb, int32_t vgId, SSubplan* pSubplan, qTaskInfo_
|
||||||
|
|
||||||
code = dsCreateDataSinker(pSubplan->pDataSink, &(*pTask)->dsHandle);
|
code = dsCreateDataSinker(pSubplan->pDataSink, &(*pTask)->dsHandle);
|
||||||
|
|
||||||
|
*handle = (*pTask)->dsHandle;
|
||||||
|
|
||||||
_error:
|
_error:
|
||||||
// if failed to add ref for all tables in this query, abort current query
|
// if failed to add ref for all tables in this query, abort current query
|
||||||
return code;
|
return code;
|
||||||
|
@ -135,16 +137,18 @@ int waitMoment(SQInfo* pQInfo){
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
|
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
|
||||||
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
|
||||||
int64_t threadId = taosGetSelfPthreadId();
|
int64_t threadId = taosGetSelfPthreadId();
|
||||||
|
|
||||||
|
*pRes = NULL;
|
||||||
|
|
||||||
int64_t curOwner = 0;
|
int64_t curOwner = 0;
|
||||||
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
|
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
|
||||||
qError("QInfo:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
|
qError("QInfo:0x%" PRIx64 "-%p qhandle is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
|
||||||
(void*)curOwner);
|
(void*)curOwner);
|
||||||
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
|
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
|
||||||
return NULL;
|
return pTaskInfo->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTaskInfo->cost.start == 0) {
|
if (pTaskInfo->cost.start == 0) {
|
||||||
|
@ -153,7 +157,7 @@ SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
|
||||||
|
|
||||||
if (isTaskKilled(pTaskInfo)) {
|
if (isTaskKilled(pTaskInfo)) {
|
||||||
qDebug("QInfo:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo));
|
qDebug("QInfo:0x%" PRIx64 " it is already killed, abort", GET_TASKID(pTaskInfo));
|
||||||
return NULL;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// STaskRuntimeEnv* pRuntimeEnv = &pTaskInfo->runtimeEnv;
|
// STaskRuntimeEnv* pRuntimeEnv = &pTaskInfo->runtimeEnv;
|
||||||
|
@ -170,7 +174,7 @@ SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
|
||||||
pTaskInfo->code = ret;
|
pTaskInfo->code = ret;
|
||||||
qDebug("QInfo:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
|
qDebug("QInfo:0x%" PRIx64 " query abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
|
||||||
tstrerror(pTaskInfo->code));
|
tstrerror(pTaskInfo->code));
|
||||||
return NULL;
|
return pTaskInfo->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("QInfo:0x%" PRIx64 " query task is launched", GET_TASKID(pTaskInfo));
|
qDebug("QInfo:0x%" PRIx64 " query task is launched", GET_TASKID(pTaskInfo));
|
||||||
|
@ -179,21 +183,21 @@ SSDataBlock* qExecTask(qTaskInfo_t tinfo, DataSinkHandle* handle) {
|
||||||
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
|
||||||
int64_t st = 0;
|
int64_t st = 0;
|
||||||
|
|
||||||
if (handle) {
|
|
||||||
*handle = pTaskInfo->dsHandle;
|
|
||||||
}
|
|
||||||
|
|
||||||
st = taosGetTimestampUs();
|
st = taosGetTimestampUs();
|
||||||
SSDataBlock* pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);
|
*pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);
|
||||||
|
|
||||||
pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st);
|
pTaskInfo->cost.elapsedTime += (taosGetTimestampUs() - st);
|
||||||
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);
|
||||||
|
|
||||||
|
if (NULL == *pRes) {
|
||||||
|
*useconds = pTaskInfo->cost.elapsedTime;
|
||||||
|
}
|
||||||
|
|
||||||
qDebug("QInfo:0x%" PRIx64 " query paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d",
|
qDebug("QInfo:0x%" PRIx64 " query paused, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d",
|
||||||
GET_TASKID(pTaskInfo), 0, 0L, 0);
|
GET_TASKID(pTaskInfo), 0, 0L, 0);
|
||||||
|
|
||||||
atomic_store_64(&pTaskInfo->owner, 0);
|
atomic_store_64(&pTaskInfo->owner, 0);
|
||||||
return pRes;
|
return pTaskInfo->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) {
|
int32_t qRetrieveQueryResultInfo(qTaskInfo_t qinfo, bool* buildRes, void* pRspContext) {
|
||||||
|
|
|
@ -217,5 +217,6 @@ TEST(testCase, build_executor_tree_Test) {
|
||||||
"}";
|
"}";
|
||||||
|
|
||||||
SExecTaskInfo* pTaskInfo = nullptr;
|
SExecTaskInfo* pTaskInfo = nullptr;
|
||||||
int32_t code = qCreateExecTask((void*) 1, 2, NULL, (void**) &pTaskInfo);
|
DataSinkHandle sinkHandle = nullptr;
|
||||||
|
int32_t code = qCreateExecTask((void*) 1, 2, NULL, (void**) &pTaskInfo, &sinkHandle);
|
||||||
}
|
}
|
|
@ -458,6 +458,37 @@ _return:
|
||||||
QW_RET(code);
|
QW_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t qwExecTask(QW_FPARAMS_DEF, qTaskInfo_t taskHandle, DataSinkHandle sinkHandle) {
|
||||||
|
int32_t code = 0;
|
||||||
|
bool qcontinue = true;
|
||||||
|
SSDataBlock* pRes = NULL;
|
||||||
|
uint64_t useconds = 0;
|
||||||
|
|
||||||
|
while (qcontinue) {
|
||||||
|
code = qExecTask(taskHandle, &pRes, &useconds);
|
||||||
|
if (code) {
|
||||||
|
QW_TASK_ELOG("qExecTask failed, code:%x", code);
|
||||||
|
QW_ERR_JRET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL == pRes) {
|
||||||
|
QW_TASK_DLOG("query done, useconds:%"PRIu64, useconds);
|
||||||
|
dsEndPut(sinkHandle, useconds);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
SInputData inputData = {.pData = pRes, .pTableRetrieveTsMap = NULL};
|
||||||
|
code = dsPutDataBlock(sinkHandle, &inputData, &qcontinue);
|
||||||
|
if (code) {
|
||||||
|
QW_TASK_ELOG("dsPutDataBlock failed, code:%x", code);
|
||||||
|
QW_ERR_JRET(code);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
QW_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
|
int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void **rspMsg, SOutputData *pOutput) {
|
||||||
|
@ -733,7 +764,9 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
|
||||||
}
|
}
|
||||||
|
|
||||||
qTaskInfo_t pTaskInfo = NULL;
|
qTaskInfo_t pTaskInfo = NULL;
|
||||||
code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo);
|
DataSinkHandle sinkHandle = NULL;
|
||||||
|
|
||||||
|
code = qCreateExecTask(qwMsg->node, 0, (struct SSubplan *)plan, &pTaskInfo, &sinkHandle);
|
||||||
if (code) {
|
if (code) {
|
||||||
QW_TASK_ELOG("qCreateExecTask failed, code:%x", code);
|
QW_TASK_ELOG("qCreateExecTask failed, code:%x", code);
|
||||||
QW_ERR_JRET(code);
|
QW_ERR_JRET(code);
|
||||||
|
@ -743,12 +776,7 @@ int32_t qwProcessQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
|
||||||
|
|
||||||
queryRsped = true;
|
queryRsped = true;
|
||||||
|
|
||||||
DataSinkHandle sinkHandle = NULL;
|
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), pTaskInfo, sinkHandle));
|
||||||
SSDataBlock* pRes = qExecTask(pTaskInfo, &sinkHandle);
|
|
||||||
if (code) {
|
|
||||||
QW_TASK_ELOG("qExecTask failed, code:%x", code);
|
|
||||||
QW_ERR_JRET(code);
|
|
||||||
}
|
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
|
@ -840,11 +868,7 @@ int32_t qwProcessCQuery(SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t
|
||||||
qTaskInfo_t taskHandle = ctx->taskHandle;
|
qTaskInfo_t taskHandle = ctx->taskHandle;
|
||||||
DataSinkHandle sinkHandle = ctx->sinkHandle;
|
DataSinkHandle sinkHandle = ctx->sinkHandle;
|
||||||
|
|
||||||
code = qExecTask(taskHandle, &sinkHandle);
|
QW_ERR_JRET(qwExecTask(QW_FPARAMS(), taskHandle, sinkHandle));
|
||||||
if (code) {
|
|
||||||
QW_TASK_ELOG("qExecTask failed, code:%x", code);
|
|
||||||
QW_ERR_JRET(code);
|
|
||||||
}
|
|
||||||
|
|
||||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CQUERY);
|
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CQUERY);
|
||||||
|
|
||||||
|
|
|
@ -412,6 +412,8 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
|
||||||
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
|
SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, addNum:%d, errno:%d", addNum, errno);
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
++addNum;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -792,6 +794,11 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch
|
||||||
if (rspCode != TSDB_CODE_SUCCESS) {
|
if (rspCode != TSDB_CODE_SUCCESS) {
|
||||||
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
|
SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SShellSubmitRsp *rsp = (SShellSubmitRsp *)msg;
|
||||||
|
if (rsp) {
|
||||||
|
pJob->resNumOfRows += rsp->affectedRows;
|
||||||
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
|
SCH_ERR_RET(schProcessOnTaskSuccess(pJob, pTask));
|
||||||
|
@ -1355,9 +1362,9 @@ int32_t scheduleExecJob(void *transport, SArray *nodeList, SQueryDag* pDag, stru
|
||||||
|
|
||||||
SSchJob *job = NULL;
|
SSchJob *job = NULL;
|
||||||
|
|
||||||
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, &job, true));
|
SCH_ERR_RET(schExecJobImpl(transport, nodeList, pDag, pJob, true));
|
||||||
|
|
||||||
*pJob = job;
|
job = *pJob;
|
||||||
|
|
||||||
pRes->code = atomic_load_32(&job->errCode);
|
pRes->code = atomic_load_32(&job->errCode);
|
||||||
pRes->numOfRows = job->resNumOfRows;
|
pRes->numOfRows = job->resNumOfRows;
|
||||||
|
|
|
@ -34,10 +34,12 @@
|
||||||
#include "stub.h"
|
#include "stub.h"
|
||||||
#include "addr_any.h"
|
#include "addr_any.h"
|
||||||
|
|
||||||
|
|
||||||
namespace {
|
namespace {
|
||||||
|
|
||||||
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
|
extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode);
|
||||||
|
|
||||||
|
|
||||||
void schtInitLogFile() {
|
void schtInitLogFile() {
|
||||||
const char *defaultLogFileNamePrefix = "taoslog";
|
const char *defaultLogFileNamePrefix = "taoslog";
|
||||||
const int32_t maxLogFileNum = 10;
|
const int32_t maxLogFileNum = 10;
|
||||||
|
@ -113,9 +115,9 @@ void schtBuildInsertDag(SQueryDag *dag) {
|
||||||
dag->queryId = qId;
|
dag->queryId = qId;
|
||||||
dag->numOfSubplans = 2;
|
dag->numOfSubplans = 2;
|
||||||
dag->pSubplans = taosArrayInit(1, POINTER_BYTES);
|
dag->pSubplans = taosArrayInit(1, POINTER_BYTES);
|
||||||
SArray *inserta = taosArrayInit(dag->numOfSubplans, sizeof(SSubplan));
|
SArray *inserta = taosArrayInit(dag->numOfSubplans, POINTER_BYTES);
|
||||||
|
|
||||||
SSubplan insertPlan[2] = {0};
|
SSubplan *insertPlan = (SSubplan *)calloc(2, sizeof(SSubplan));
|
||||||
|
|
||||||
insertPlan[0].id.queryId = qId;
|
insertPlan[0].id.queryId = qId;
|
||||||
insertPlan[0].id.templateId = 0x0000000000000003;
|
insertPlan[0].id.templateId = 0x0000000000000003;
|
||||||
|
@ -131,6 +133,7 @@ void schtBuildInsertDag(SQueryDag *dag) {
|
||||||
insertPlan[0].pParents = NULL;
|
insertPlan[0].pParents = NULL;
|
||||||
insertPlan[0].pNode = NULL;
|
insertPlan[0].pNode = NULL;
|
||||||
insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
|
insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
|
||||||
|
insertPlan[0].msgType = TDMT_VND_SUBMIT;
|
||||||
|
|
||||||
insertPlan[1].id.queryId = qId;
|
insertPlan[1].id.queryId = qId;
|
||||||
insertPlan[1].id.templateId = 0x0000000000000003;
|
insertPlan[1].id.templateId = 0x0000000000000003;
|
||||||
|
@ -146,10 +149,11 @@ void schtBuildInsertDag(SQueryDag *dag) {
|
||||||
insertPlan[1].pParents = NULL;
|
insertPlan[1].pParents = NULL;
|
||||||
insertPlan[1].pNode = NULL;
|
insertPlan[1].pNode = NULL;
|
||||||
insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
|
insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink));
|
||||||
|
insertPlan[1].msgType = TDMT_VND_SUBMIT;
|
||||||
|
|
||||||
|
taosArrayPush(inserta, &insertPlan);
|
||||||
taosArrayPush(inserta, &insertPlan[0]);
|
insertPlan += 1;
|
||||||
taosArrayPush(inserta, &insertPlan[1]);
|
taosArrayPush(inserta, &insertPlan);
|
||||||
|
|
||||||
taosArrayPush(dag->pSubplans, &inserta);
|
taosArrayPush(dag->pSubplans, &inserta);
|
||||||
}
|
}
|
||||||
|
@ -210,6 +214,24 @@ void schtSetRpcSendRequest() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void schtSetAsyncSendMsgToServer() {
|
||||||
|
static Stub stub;
|
||||||
|
stub.set(asyncSendMsgToServer, schtAsyncSendMsgToServer);
|
||||||
|
{
|
||||||
|
AddrAny any("libtransport.so");
|
||||||
|
std::map<std::string,void*> result;
|
||||||
|
any.get_global_func_addr_dynsym("^asyncSendMsgToServer$", result);
|
||||||
|
for (const auto& f : result) {
|
||||||
|
stub.set(f.second, schtAsyncSendMsgToServer);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void *schtSendRsp(void *param) {
|
void *schtSendRsp(void *param) {
|
||||||
SSchJob *job = NULL;
|
SSchJob *job = NULL;
|
||||||
|
@ -230,7 +252,7 @@ void *schtSendRsp(void *param) {
|
||||||
|
|
||||||
SShellSubmitRsp rsp = {0};
|
SShellSubmitRsp rsp = {0};
|
||||||
rsp.affectedRows = 10;
|
rsp.affectedRows = 10;
|
||||||
schHandleResponseMsg(job, task, TDMT_VND_SUBMIT, (char *)&rsp, sizeof(rsp), 0);
|
schHandleResponseMsg(job, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||||
|
|
||||||
pIter = taosHashIterate(job->execTasks, pIter);
|
pIter = taosHashIterate(job->execTasks, pIter);
|
||||||
}
|
}
|
||||||
|
@ -238,6 +260,23 @@ void *schtSendRsp(void *param) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void *schtCreateFetchRspThread(void *param) {
|
||||||
|
struct SSchJob* job = (struct SSchJob*)param;
|
||||||
|
|
||||||
|
sleep(1);
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
|
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp));
|
||||||
|
rsp->completed = 1;
|
||||||
|
rsp->numOfRows = 10;
|
||||||
|
code = schHandleResponseMsg(job, job->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(rsp), 0);
|
||||||
|
|
||||||
|
assert(code == 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
struct SSchJob *pInsertJob = NULL;
|
struct SSchJob *pInsertJob = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -266,6 +305,7 @@ TEST(queryTest, normalCase) {
|
||||||
|
|
||||||
schtSetPlanToString();
|
schtSetPlanToString();
|
||||||
schtSetExecNode();
|
schtSetExecNode();
|
||||||
|
schtSetAsyncSendMsgToServer();
|
||||||
|
|
||||||
code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob);
|
code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
@ -276,7 +316,7 @@ TEST(queryTest, normalCase) {
|
||||||
SSchTask *task = *(SSchTask **)pIter;
|
SSchTask *task = *(SSchTask **)pIter;
|
||||||
|
|
||||||
SQueryTableRsp rsp = {0};
|
SQueryTableRsp rsp = {0};
|
||||||
code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0);
|
code = schHandleResponseMsg(job, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||||
|
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
pIter = taosHashIterate(job->execTasks, pIter);
|
pIter = taosHashIterate(job->execTasks, pIter);
|
||||||
|
@ -287,8 +327,8 @@ TEST(queryTest, normalCase) {
|
||||||
SSchTask *task = *(SSchTask **)pIter;
|
SSchTask *task = *(SSchTask **)pIter;
|
||||||
|
|
||||||
SResReadyRsp rsp = {0};
|
SResReadyRsp rsp = {0};
|
||||||
code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0);
|
code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||||
|
printf("code:%d", code);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
pIter = taosHashIterate(job->execTasks, pIter);
|
pIter = taosHashIterate(job->execTasks, pIter);
|
||||||
}
|
}
|
||||||
|
@ -298,7 +338,7 @@ TEST(queryTest, normalCase) {
|
||||||
SSchTask *task = *(SSchTask **)pIter;
|
SSchTask *task = *(SSchTask **)pIter;
|
||||||
|
|
||||||
SQueryTableRsp rsp = {0};
|
SQueryTableRsp rsp = {0};
|
||||||
code = schHandleResponseMsg(job, task, TDMT_VND_QUERY, (char *)&rsp, sizeof(rsp), 0);
|
code = schHandleResponseMsg(job, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||||
|
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
pIter = taosHashIterate(job->execTasks, pIter);
|
pIter = taosHashIterate(job->execTasks, pIter);
|
||||||
|
@ -309,22 +349,19 @@ TEST(queryTest, normalCase) {
|
||||||
SSchTask *task = *(SSchTask **)pIter;
|
SSchTask *task = *(SSchTask **)pIter;
|
||||||
|
|
||||||
SResReadyRsp rsp = {0};
|
SResReadyRsp rsp = {0};
|
||||||
code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY, (char *)&rsp, sizeof(rsp), 0);
|
code = schHandleResponseMsg(job, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
pIter = taosHashIterate(job->execTasks, pIter);
|
pIter = taosHashIterate(job->execTasks, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
SRetrieveTableRsp rsp = {0};
|
pthread_attr_t thattr;
|
||||||
rsp.completed = 1;
|
pthread_attr_init(&thattr);
|
||||||
rsp.numOfRows = 10;
|
|
||||||
code = schHandleResponseMsg(job, NULL, TDMT_VND_FETCH, (char *)&rsp, sizeof(rsp), 0);
|
|
||||||
|
|
||||||
ASSERT_EQ(code, 0);
|
|
||||||
|
|
||||||
|
pthread_t thread1;
|
||||||
|
pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, job);
|
||||||
|
|
||||||
void *data = NULL;
|
void *data = NULL;
|
||||||
|
|
||||||
code = scheduleFetchRows(job, &data);
|
code = scheduleFetchRows(job, &data);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
|
@ -340,6 +377,8 @@ TEST(queryTest, normalCase) {
|
||||||
scheduleFreeJob(pJob);
|
scheduleFreeJob(pJob);
|
||||||
|
|
||||||
schtFreeQueryDag(&dag);
|
schtFreeQueryDag(&dag);
|
||||||
|
|
||||||
|
schedulerDestroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -369,6 +408,7 @@ TEST(insertTest, normalCase) {
|
||||||
schtBuildInsertDag(&dag);
|
schtBuildInsertDag(&dag);
|
||||||
|
|
||||||
schtSetPlanToString();
|
schtSetPlanToString();
|
||||||
|
schtSetAsyncSendMsgToServer();
|
||||||
|
|
||||||
pthread_attr_t thattr;
|
pthread_attr_t thattr;
|
||||||
pthread_attr_init(&thattr);
|
pthread_attr_init(&thattr);
|
||||||
|
@ -382,6 +422,8 @@ TEST(insertTest, normalCase) {
|
||||||
ASSERT_EQ(res.numOfRows, 20);
|
ASSERT_EQ(res.numOfRows, 20);
|
||||||
|
|
||||||
scheduleFreeJob(pInsertJob);
|
scheduleFreeJob(pInsertJob);
|
||||||
|
|
||||||
|
schedulerDestroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(multiThread, forceFree) {
|
TEST(multiThread, forceFree) {
|
||||||
|
|
|
@ -13,19 +13,20 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#ifndef TD_TFSINT_H
|
#ifndef _TD_TFS_INT_H_
|
||||||
#define TD_TFSINT_H
|
#define _TD_TFS_INT_H_
|
||||||
|
|
||||||
#include "tlog.h"
|
#include "os.h"
|
||||||
#include "tglobal.h"
|
|
||||||
#include "tfs.h"
|
#include "taosdef.h"
|
||||||
|
#include "taoserror.h"
|
||||||
#include "tcoding.h"
|
#include "tcoding.h"
|
||||||
|
#include "tfs.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "thash.h"
|
||||||
|
#include "tlog.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
extern int32_t fsDebugFlag;
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
extern int fsDebugFlag;
|
|
||||||
|
|
||||||
// For debug purpose
|
// For debug purpose
|
||||||
#define fFatal(...) { if (fsDebugFlag & DEBUG_FATAL) { taosPrintLog("TFS FATAL ", 255, __VA_ARGS__); }}
|
#define fFatal(...) { if (fsDebugFlag & DEBUG_FATAL) { taosPrintLog("TFS FATAL ", 255, __VA_ARGS__); }}
|
||||||
|
@ -38,60 +39,44 @@ extern int fsDebugFlag;
|
||||||
// Global Definitions
|
// Global Definitions
|
||||||
#define TFS_MIN_DISK_FREE_SIZE 50 * 1024 * 1024
|
#define TFS_MIN_DISK_FREE_SIZE 50 * 1024 * 1024
|
||||||
|
|
||||||
// tdisk.c ======================================================
|
|
||||||
typedef struct {
|
|
||||||
int64_t size;
|
|
||||||
int64_t used;
|
|
||||||
int64_t free;
|
|
||||||
} SDiskMeta;
|
|
||||||
|
|
||||||
typedef struct SDisk {
|
typedef struct SDisk {
|
||||||
int level;
|
int32_t level;
|
||||||
int id;
|
int32_t id;
|
||||||
char dir[TSDB_FILENAME_LEN];
|
char *path;
|
||||||
SDiskMeta dmeta;
|
SDiskSize size;
|
||||||
} SDisk;
|
} SDisk;
|
||||||
|
|
||||||
#define DISK_LEVEL(pd) ((pd)->level)
|
|
||||||
#define DISK_ID(pd) ((pd)->id)
|
|
||||||
#define DISK_DIR(pd) ((pd)->dir)
|
|
||||||
#define DISK_META(pd) ((pd)->dmeta)
|
|
||||||
#define DISK_SIZE(pd) ((pd)->dmeta.size)
|
|
||||||
#define DISK_USED_SIZE(pd) ((pd)->dmeta.used)
|
|
||||||
#define DISK_FREE_SIZE(pd) ((pd)->dmeta.free)
|
|
||||||
|
|
||||||
SDisk *tfsNewDisk(int level, int id, const char *dir);
|
|
||||||
SDisk *tfsFreeDisk(SDisk *pDisk);
|
|
||||||
int tfsUpdateDiskInfo(SDisk *pDisk);
|
|
||||||
|
|
||||||
// ttier.c ======================================================
|
|
||||||
|
|
||||||
typedef struct STier {
|
typedef struct STier {
|
||||||
pthread_spinlock_t lock;
|
pthread_spinlock_t lock;
|
||||||
int level;
|
int32_t level;
|
||||||
int16_t ndisk; // # of disks mounted to this tier
|
int16_t nextid; // next disk id to allocate
|
||||||
int16_t nextid; // next disk id to allocate
|
int16_t ndisk; // # of disks mounted to this tier
|
||||||
STierMeta tmeta;
|
int16_t nAvailDisks; // # of Available disks
|
||||||
SDisk * disks[TSDB_MAX_DISKS_PER_TIER];
|
SDisk *disks[TSDB_MAX_DISKS_PER_TIER];
|
||||||
|
SDiskSize size;
|
||||||
} STier;
|
} STier;
|
||||||
|
|
||||||
#define TIER_LEVEL(pt) ((pt)->level)
|
#define TIER_LEVEL(pt) ((pt)->level)
|
||||||
#define TIER_NDISKS(pt) ((pt)->ndisk)
|
#define TIER_NDISKS(pt) ((pt)->ndisk)
|
||||||
#define TIER_SIZE(pt) ((pt)->tmeta.size)
|
#define TIER_SIZE(pt) ((pt)->tmeta.size)
|
||||||
#define TIER_FREE_SIZE(pt) ((pt)->tmeta.free)
|
#define TIER_FREE_SIZE(pt) ((pt)->tmeta.free)
|
||||||
#define TIER_AVAIL_DISKS(pt) ((pt)->tmeta.nAvailDisks)
|
|
||||||
#define DISK_AT_TIER(pt, id) ((pt)->disks[id])
|
|
||||||
|
|
||||||
int tfsInitTier(STier *pTier, int level);
|
#define DISK_AT_TIER(pt, id) ((pt)->disks[id])
|
||||||
void tfsDestroyTier(STier *pTier);
|
#define DISK_DIR(pd) ((pd)->path)
|
||||||
SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg);
|
|
||||||
void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta);
|
SDisk *tfsNewDisk(int32_t level, int32_t id, const char *dir);
|
||||||
int tfsAllocDiskOnTier(STier *pTier);
|
SDisk *tfsFreeDisk(SDisk *pDisk);
|
||||||
void tfsGetTierMeta(STier *pTier, STierMeta *pTierMeta);
|
int32_t tfsUpdateDiskSize(SDisk *pDisk);
|
||||||
void tfsPosNextId(STier *pTier);
|
|
||||||
|
int32_t tfsInitTier(STier *pTier, int32_t level);
|
||||||
|
void tfsDestroyTier(STier *pTier);
|
||||||
|
SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg);
|
||||||
|
void tfsUpdateTierSize(STier *pTier);
|
||||||
|
int32_t tfsAllocDiskOnTier(STier *pTier);
|
||||||
|
void tfsPosNextId(STier *pTier);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#endif
|
#endif /*_TD_TFS_INT_H_*/
|
|
@ -13,22 +13,17 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "os.h"
|
#define _DEFAULT_SOURCE
|
||||||
|
#include "tfsInt.h"
|
||||||
#include "taosdef.h"
|
|
||||||
#include "taoserror.h"
|
|
||||||
#include "tfs.h"
|
|
||||||
#include "tfsint.h"
|
|
||||||
#include "thash.h"
|
|
||||||
|
|
||||||
#define TMPNAME_LEN (TSDB_FILENAME_LEN * 2 + 32)
|
#define TMPNAME_LEN (TSDB_FILENAME_LEN * 2 + 32)
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
pthread_spinlock_t lock;
|
pthread_spinlock_t lock;
|
||||||
SFSMeta meta;
|
SFSMeta meta;
|
||||||
int nlevel;
|
int32_t nlevel;
|
||||||
STier tiers[TSDB_MAX_TIERS];
|
STier tiers[TSDB_MAX_TIERS];
|
||||||
SHashObj * map; // name to did map
|
SHashObj *map; // name to did map
|
||||||
} SFS;
|
} SFS;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -52,21 +47,24 @@ static SFS tfs = {0};
|
||||||
static SFS *pfs = &tfs;
|
static SFS *pfs = &tfs;
|
||||||
|
|
||||||
// STATIC DECLARATION
|
// STATIC DECLARATION
|
||||||
static int tfsMount(SDiskCfg *pCfg);
|
static int32_t tfsMount(SDiskCfg *pCfg);
|
||||||
static int tfsCheck();
|
static int32_t tfsCheck();
|
||||||
static int tfsCheckAndFormatCfg(SDiskCfg *pCfg);
|
static int32_t tfsCheckAndFormatCfg(SDiskCfg *pCfg);
|
||||||
static int tfsFormatDir(char *idir, char *odir);
|
static int32_t tfsFormatDir(char *idir, char *odir);
|
||||||
static SDisk *tfsGetDiskByID(SDiskID did);
|
static SDisk *tfsGetDiskByID(SDiskID did);
|
||||||
static SDisk *tfsGetDiskByName(const char *dir);
|
static SDisk *tfsGetDiskByName(const char *dir);
|
||||||
static int tfsOpendirImpl(TDIR *tdir);
|
static int32_t tfsOpendirImpl(TDIR *tdir);
|
||||||
static void tfsInitDiskIter(SDiskIter *pIter);
|
static void tfsInitDiskIter(SDiskIter *pIter);
|
||||||
static SDisk *tfsNextDisk(SDiskIter *pIter);
|
static SDisk *tfsNextDisk(SDiskIter *pIter);
|
||||||
|
|
||||||
// FS APIs ====================================
|
// FS APIs ====================================
|
||||||
int tfsInit(SDiskCfg *pDiskCfg, int ndisk) {
|
int32_t tfsInit(SDiskCfg *pDiskCfg, int32_t ndisk) {
|
||||||
ASSERT(ndisk > 0);
|
if (ndisk < 0) {
|
||||||
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
for (int level = 0; level < TSDB_MAX_TIERS; level++) {
|
for (int32_t level = 0; level < TSDB_MAX_TIERS; level++) {
|
||||||
if (tfsInitTier(TFS_TIER_AT(level), level) < 0) {
|
if (tfsInitTier(TFS_TIER_AT(level), level) < 0) {
|
||||||
while (true) {
|
while (true) {
|
||||||
level--;
|
level--;
|
||||||
|
@ -84,12 +82,12 @@ int tfsInit(SDiskCfg *pDiskCfg, int ndisk) {
|
||||||
pfs->map = taosHashInit(TSDB_MAX_TIERS * TSDB_MAX_DISKS_PER_TIER * 2,
|
pfs->map = taosHashInit(TSDB_MAX_TIERS * TSDB_MAX_DISKS_PER_TIER * 2,
|
||||||
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
if (pfs->map == NULL) {
|
if (pfs->map == NULL) {
|
||||||
terrno = TSDB_CODE_FS_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
tfsCleanup();
|
tfsCleanup();
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int idisk = 0; idisk < ndisk; idisk++) {
|
for (int32_t idisk = 0; idisk < ndisk; idisk++) {
|
||||||
if (tfsMount(pDiskCfg + idisk) < 0) {
|
if (tfsMount(pDiskCfg + idisk) < 0) {
|
||||||
tfsCleanup();
|
tfsCleanup();
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -101,8 +99,8 @@ int tfsInit(SDiskCfg *pDiskCfg, int ndisk) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tfsUpdateInfo(NULL, NULL, 0);
|
tfsUpdateSize(NULL);
|
||||||
for (int level = 0; level < TFS_NLEVEL(); level++) {
|
for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
|
||||||
tfsPosNextId(TFS_TIER_AT(level));
|
tfsPosNextId(TFS_TIER_AT(level));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -114,32 +112,27 @@ void tfsCleanup() {
|
||||||
pfs->map = NULL;
|
pfs->map = NULL;
|
||||||
|
|
||||||
pthread_spin_destroy(&(pfs->lock));
|
pthread_spin_destroy(&(pfs->lock));
|
||||||
for (int level = 0; level < TFS_NLEVEL(); level++) {
|
for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
|
||||||
tfsDestroyTier(TFS_TIER_AT(level));
|
tfsDestroyTier(TFS_TIER_AT(level));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numTiers) {
|
void tfsUpdateSize(SFSMeta *pFSMeta) {
|
||||||
SFSMeta fsMeta;
|
SFSMeta fsMeta = {0};
|
||||||
STierMeta tierMeta;
|
SDiskSize size = {0};
|
||||||
|
|
||||||
if (pFSMeta == NULL) {
|
if (pFSMeta == NULL) {
|
||||||
pFSMeta = &fsMeta;
|
pFSMeta = &fsMeta;
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(pFSMeta, 0, sizeof(*pFSMeta));
|
memset(pFSMeta, 0, sizeof(SFSMeta));
|
||||||
|
|
||||||
for (int level = 0; level < TFS_NLEVEL(); level++) {
|
|
||||||
STierMeta *pTierMeta = &tierMeta;
|
|
||||||
if (tierMetas && level < numTiers) {
|
|
||||||
pTierMeta = tierMetas + level;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
|
||||||
STier *pTier = TFS_TIER_AT(level);
|
STier *pTier = TFS_TIER_AT(level);
|
||||||
tfsUpdateTierInfo(pTier, pTierMeta);
|
tfsUpdateTierSize(pTier);
|
||||||
pFSMeta->tsize += pTierMeta->size;
|
pFSMeta->total += pTier->size.total;
|
||||||
pFSMeta->avail += pTierMeta->free;
|
pFSMeta->avail += pTier->size.avail;
|
||||||
pFSMeta->used += pTierMeta->used;
|
pFSMeta->used += pTier->size.used;
|
||||||
}
|
}
|
||||||
|
|
||||||
tfsLock();
|
tfsLock();
|
||||||
|
@ -147,17 +140,9 @@ void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numTiers) {
|
||||||
tfsUnLock();
|
tfsUnLock();
|
||||||
}
|
}
|
||||||
|
|
||||||
void tfsGetMeta(SFSMeta *pMeta) {
|
|
||||||
ASSERT(pMeta);
|
|
||||||
|
|
||||||
tfsLock();
|
|
||||||
*pMeta = pfs->meta;
|
|
||||||
tfsUnLock();
|
|
||||||
}
|
|
||||||
|
|
||||||
/* Allocate an existing available tier level
|
/* Allocate an existing available tier level
|
||||||
*/
|
*/
|
||||||
void tfsAllocDisk(int expLevel, int *level, int *id) {
|
void tfsAllocDisk(int32_t expLevel, int32_t *level, int32_t *id) {
|
||||||
ASSERT(expLevel >= 0);
|
ASSERT(expLevel >= 0);
|
||||||
|
|
||||||
*level = expLevel;
|
*level = expLevel;
|
||||||
|
@ -182,10 +167,10 @@ void tfsAllocDisk(int expLevel, int *level, int *id) {
|
||||||
}
|
}
|
||||||
|
|
||||||
const char *TFS_PRIMARY_PATH() { return DISK_DIR(TFS_PRIMARY_DISK()); }
|
const char *TFS_PRIMARY_PATH() { return DISK_DIR(TFS_PRIMARY_DISK()); }
|
||||||
const char *TFS_DISK_PATH(int level, int id) { return DISK_DIR(TFS_DISK_AT(level, id)); }
|
const char *TFS_DISK_PATH(int32_t level, int32_t id) { return DISK_DIR(TFS_DISK_AT(level, id)); }
|
||||||
|
|
||||||
// TFILE APIs ====================================
|
// TFILE APIs ====================================
|
||||||
void tfsInitFile(TFILE *pf, int level, int id, const char *bname) {
|
void tfsInitFile(TFILE *pf, int32_t level, int32_t id, const char *bname) {
|
||||||
ASSERT(TFS_IS_VALID_DISK(level, id));
|
ASSERT(TFS_IS_VALID_DISK(level, id));
|
||||||
|
|
||||||
SDisk *pDisk = TFS_DISK_AT(level, id);
|
SDisk *pDisk = TFS_DISK_AT(level, id);
|
||||||
|
@ -208,8 +193,8 @@ bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tfsEncodeFile(void **buf, TFILE *pf) {
|
int32_t tfsEncodeFile(void **buf, TFILE *pf) {
|
||||||
int tlen = 0;
|
int32_t tlen = 0;
|
||||||
|
|
||||||
tlen += taosEncodeVariantI32(buf, pf->level);
|
tlen += taosEncodeVariantI32(buf, pf->level);
|
||||||
tlen += taosEncodeVariantI32(buf, pf->id);
|
tlen += taosEncodeVariantI32(buf, pf->id);
|
||||||
|
@ -220,7 +205,7 @@ int tfsEncodeFile(void **buf, TFILE *pf) {
|
||||||
|
|
||||||
void *tfsDecodeFile(void *buf, TFILE *pf) {
|
void *tfsDecodeFile(void *buf, TFILE *pf) {
|
||||||
int32_t level, id;
|
int32_t level, id;
|
||||||
char * rname;
|
char *rname;
|
||||||
|
|
||||||
buf = taosDecodeVariantI32(buf, &(level));
|
buf = taosDecodeVariantI32(buf, &(level));
|
||||||
buf = taosDecodeVariantI32(buf, &(id));
|
buf = taosDecodeVariantI32(buf, &(id));
|
||||||
|
@ -247,7 +232,7 @@ void tfsdirname(const TFILE *pf, char *dest) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// DIR APIs ====================================
|
// DIR APIs ====================================
|
||||||
int tfsMkdirAt(const char *rname, int level, int id) {
|
int32_t tfsMkdirAt(const char *rname, int32_t level, int32_t id) {
|
||||||
SDisk *pDisk = TFS_DISK_AT(level, id);
|
SDisk *pDisk = TFS_DISK_AT(level, id);
|
||||||
char aname[TMPNAME_LEN];
|
char aname[TMPNAME_LEN];
|
||||||
|
|
||||||
|
@ -260,7 +245,7 @@ int tfsMkdirAt(const char *rname, int level, int id) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tfsMkdirRecurAt(const char *rname, int level, int id) {
|
int32_t tfsMkdirRecurAt(const char *rname, int32_t level, int32_t id) {
|
||||||
if (tfsMkdirAt(rname, level, id) < 0) {
|
if (tfsMkdirAt(rname, level, id) < 0) {
|
||||||
if (errno == ENOENT) {
|
if (errno == ENOENT) {
|
||||||
// Try to create upper
|
// Try to create upper
|
||||||
|
@ -293,10 +278,10 @@ int tfsMkdirRecurAt(const char *rname, int level, int id) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tfsMkdir(const char *rname) {
|
int32_t tfsMkdir(const char *rname) {
|
||||||
for (int level = 0; level < TFS_NLEVEL(); level++) {
|
for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
|
||||||
STier *pTier = TFS_TIER_AT(level);
|
STier *pTier = TFS_TIER_AT(level);
|
||||||
for (int id = 0; id < TIER_NDISKS(pTier); id++) {
|
for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) {
|
||||||
if (tfsMkdirAt(rname, level, id) < 0) {
|
if (tfsMkdirAt(rname, level, id) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -306,15 +291,15 @@ int tfsMkdir(const char *rname) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tfsRmdir(const char *rname) {
|
int32_t tfsRmdir(const char *rname) {
|
||||||
char aname[TMPNAME_LEN] = "\0";
|
char aname[TMPNAME_LEN] = "\0";
|
||||||
|
|
||||||
for (int level = 0; level < TFS_NLEVEL(); level++) {
|
for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
|
||||||
STier *pTier = TFS_TIER_AT(level);
|
STier *pTier = TFS_TIER_AT(level);
|
||||||
for (int id = 0; id < TIER_NDISKS(pTier); id++) {
|
for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) {
|
||||||
SDisk *pDisk = DISK_AT_TIER(pTier, id);
|
SDisk *pDisk = pTier->disks[id];
|
||||||
|
|
||||||
snprintf(aname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), rname);
|
snprintf(aname, TMPNAME_LEN, "%s%s%s", DISK_DIR(pDisk), TS_PATH_DELIMITER, rname);
|
||||||
|
|
||||||
taosRemoveDir(aname);
|
taosRemoveDir(aname);
|
||||||
}
|
}
|
||||||
|
@ -323,13 +308,14 @@ int tfsRmdir(const char *rname) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tfsRename(char *orname, char *nrname) {
|
#if 0
|
||||||
|
int32_t tfsRename(char *orname, char *nrname) {
|
||||||
char oaname[TMPNAME_LEN] = "\0";
|
char oaname[TMPNAME_LEN] = "\0";
|
||||||
char naname[TMPNAME_LEN] = "\0";
|
char naname[TMPNAME_LEN] = "\0";
|
||||||
|
|
||||||
for (int level = 0; level < pfs->nlevel; level++) {
|
for (int32_t level = 0; level < pfs->nlevel; level++) {
|
||||||
STier *pTier = TFS_TIER_AT(level);
|
STier *pTier = TFS_TIER_AT(level);
|
||||||
for (int id = 0; id < TIER_NDISKS(pTier); id++) {
|
for (int32_t id = 0; id < TIER_NDISKS(pTier); id++) {
|
||||||
SDisk *pDisk = DISK_AT_TIER(pTier, id);
|
SDisk *pDisk = DISK_AT_TIER(pTier, id);
|
||||||
|
|
||||||
snprintf(oaname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), orname);
|
snprintf(oaname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), orname);
|
||||||
|
@ -341,20 +327,21 @@ int tfsRename(char *orname, char *nrname) {
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
struct TDIR {
|
struct TDIR {
|
||||||
SDiskIter iter;
|
SDiskIter iter;
|
||||||
int level;
|
int32_t level;
|
||||||
int id;
|
int32_t id;
|
||||||
char dirname[TSDB_FILENAME_LEN];
|
char dirname[TSDB_FILENAME_LEN];
|
||||||
TFILE tfile;
|
TFILE tfile;
|
||||||
DIR * dir;
|
DIR *dir;
|
||||||
};
|
};
|
||||||
|
|
||||||
TDIR *tfsOpendir(const char *rname) {
|
TDIR *tfsOpendir(const char *rname) {
|
||||||
TDIR *tdir = (TDIR *)calloc(1, sizeof(*tdir));
|
TDIR *tdir = (TDIR *)calloc(1, sizeof(*tdir));
|
||||||
if (tdir == NULL) {
|
if (tdir == NULL) {
|
||||||
terrno = TSDB_CODE_FS_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -407,9 +394,9 @@ void tfsClosedir(TDIR *tdir) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// private
|
// private
|
||||||
static int tfsMount(SDiskCfg *pCfg) {
|
static int32_t tfsMount(SDiskCfg *pCfg) {
|
||||||
SDiskID did;
|
SDiskID did;
|
||||||
SDisk * pDisk = NULL;
|
SDisk *pDisk = NULL;
|
||||||
|
|
||||||
if (tfsCheckAndFormatCfg(pCfg) < 0) return -1;
|
if (tfsCheckAndFormatCfg(pCfg) < 0) return -1;
|
||||||
|
|
||||||
|
@ -419,7 +406,7 @@ static int tfsMount(SDiskCfg *pCfg) {
|
||||||
fError("failed to mount disk %s to level %d since %s", pCfg->dir, pCfg->level, tstrerror(terrno));
|
fError("failed to mount disk %s to level %d since %s", pCfg->dir, pCfg->level, tstrerror(terrno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
did.id = DISK_ID(pDisk);
|
did.id = pDisk->id;
|
||||||
|
|
||||||
taosHashPut(pfs->map, (void *)(pCfg->dir), strnlen(pCfg->dir, TSDB_FILENAME_LEN), (void *)(&did), sizeof(did));
|
taosHashPut(pfs->map, (void *)(pCfg->dir), strnlen(pCfg->dir, TSDB_FILENAME_LEN), (void *)(&did), sizeof(did));
|
||||||
if (pfs->nlevel < pCfg->level + 1) pfs->nlevel = pCfg->level + 1;
|
if (pfs->nlevel < pCfg->level + 1) pfs->nlevel = pCfg->level + 1;
|
||||||
|
@ -427,7 +414,7 @@ static int tfsMount(SDiskCfg *pCfg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tfsCheckAndFormatCfg(SDiskCfg *pCfg) {
|
static int32_t tfsCheckAndFormatCfg(SDiskCfg *pCfg) {
|
||||||
char dirName[TSDB_FILENAME_LEN] = "\0";
|
char dirName[TSDB_FILENAME_LEN] = "\0";
|
||||||
struct stat pstat;
|
struct stat pstat;
|
||||||
|
|
||||||
|
@ -486,10 +473,10 @@ static int tfsCheckAndFormatCfg(SDiskCfg *pCfg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tfsFormatDir(char *idir, char *odir) {
|
static int32_t tfsFormatDir(char *idir, char *odir) {
|
||||||
wordexp_t wep = {0};
|
wordexp_t wep = {0};
|
||||||
|
|
||||||
int code = wordexp(idir, &wep, 0);
|
int32_t code = wordexp(idir, &wep, 0);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -507,14 +494,14 @@ static int tfsFormatDir(char *idir, char *odir) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tfsCheck() {
|
static int32_t tfsCheck() {
|
||||||
if (TFS_PRIMARY_DISK() == NULL) {
|
if (TFS_PRIMARY_DISK() == NULL) {
|
||||||
fError("no primary disk is set");
|
fError("no primary disk is set");
|
||||||
terrno = TSDB_CODE_FS_NO_PRIMARY_DISK;
|
terrno = TSDB_CODE_FS_NO_PRIMARY_DISK;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int level = 0; level < TFS_NLEVEL(); level++) {
|
for (int32_t level = 0; level < TFS_NLEVEL(); level++) {
|
||||||
if (TIER_NDISKS(TFS_TIER_AT(level)) == 0) {
|
if (TIER_NDISKS(TFS_TIER_AT(level)) == 0) {
|
||||||
fError("no disk at level %d", level);
|
fError("no disk at level %d", level);
|
||||||
terrno = TSDB_CODE_FS_NO_MOUNT_AT_TIER;
|
terrno = TSDB_CODE_FS_NO_MOUNT_AT_TIER;
|
||||||
|
@ -528,8 +515,8 @@ static int tfsCheck() {
|
||||||
static SDisk *tfsGetDiskByID(SDiskID did) { return TFS_DISK_AT(did.level, did.id); }
|
static SDisk *tfsGetDiskByID(SDiskID did) { return TFS_DISK_AT(did.level, did.id); }
|
||||||
static SDisk *tfsGetDiskByName(const char *dir) {
|
static SDisk *tfsGetDiskByName(const char *dir) {
|
||||||
SDiskID did;
|
SDiskID did;
|
||||||
SDisk * pDisk = NULL;
|
SDisk *pDisk = NULL;
|
||||||
void * pr = NULL;
|
void *pr = NULL;
|
||||||
|
|
||||||
pr = taosHashGet(pfs->map, (void *)dir, strnlen(dir, TSDB_FILENAME_LEN));
|
pr = taosHashGet(pfs->map, (void *)dir, strnlen(dir, TSDB_FILENAME_LEN));
|
||||||
if (pr == NULL) return NULL;
|
if (pr == NULL) return NULL;
|
||||||
|
@ -541,7 +528,7 @@ static SDisk *tfsGetDiskByName(const char *dir) {
|
||||||
return pDisk;
|
return pDisk;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tfsOpendirImpl(TDIR *tdir) {
|
static int32_t tfsOpendirImpl(TDIR *tdir) {
|
||||||
SDisk *pDisk = NULL;
|
SDisk *pDisk = NULL;
|
||||||
char adir[TMPNAME_LEN * 2] = "\0";
|
char adir[TMPNAME_LEN * 2] = "\0";
|
||||||
|
|
||||||
|
@ -554,10 +541,10 @@ static int tfsOpendirImpl(TDIR *tdir) {
|
||||||
pDisk = tfsNextDisk(&(tdir->iter));
|
pDisk = tfsNextDisk(&(tdir->iter));
|
||||||
if (pDisk == NULL) return 0;
|
if (pDisk == NULL) return 0;
|
||||||
|
|
||||||
tdir->level = DISK_LEVEL(pDisk);
|
tdir->level = pDisk->level;
|
||||||
tdir->id = DISK_ID(pDisk);
|
tdir->id = pDisk->id;
|
||||||
|
|
||||||
snprintf(adir, TMPNAME_LEN * 2, "%s/%s", DISK_DIR(pDisk), tdir->dirname);
|
snprintf(adir, TMPNAME_LEN * 2, "%s%s%s", pDisk->path, TS_PATH_DELIMITER,tdir->dirname);
|
||||||
tdir->dir = opendir(adir);
|
tdir->dir = opendir(adir);
|
||||||
if (tdir->dir != NULL) break;
|
if (tdir->dir != NULL) break;
|
||||||
}
|
}
|
||||||
|
@ -572,8 +559,8 @@ static SDisk *tfsNextDisk(SDiskIter *pIter) {
|
||||||
|
|
||||||
if (pDisk == NULL) return NULL;
|
if (pDisk == NULL) return NULL;
|
||||||
|
|
||||||
int level = DISK_LEVEL(pDisk);
|
int32_t level = pDisk->level;
|
||||||
int id = DISK_ID(pDisk);
|
int32_t id = pDisk->id;
|
||||||
|
|
||||||
id++;
|
id++;
|
||||||
if (id < TIER_NDISKS(TFS_TIER_AT(level))) {
|
if (id < TIER_NDISKS(TFS_TIER_AT(level))) {
|
||||||
|
@ -596,21 +583,21 @@ static SDisk *tfsNextDisk(SDiskIter *pIter) {
|
||||||
// OTHER FUNCTIONS ===================================
|
// OTHER FUNCTIONS ===================================
|
||||||
void taosGetDisk() {
|
void taosGetDisk() {
|
||||||
const double unit = 1024 * 1024 * 1024;
|
const double unit = 1024 * 1024 * 1024;
|
||||||
SysDiskSize diskSize;
|
SDiskSize diskSize;
|
||||||
SFSMeta fsMeta;
|
SFSMeta fsMeta;
|
||||||
|
|
||||||
tfsUpdateInfo(&fsMeta, NULL, 0);
|
tfsUpdateSize(&fsMeta);
|
||||||
tsTotalDataDirGB = (float)(fsMeta.tsize / unit);
|
tsTotalDataDirGB = (float)(fsMeta.total / unit);
|
||||||
tsUsedDataDirGB = (float)(fsMeta.used / unit);
|
tsUsedDataDirGB = (float)(fsMeta.used / unit);
|
||||||
tsAvailDataDirGB = (float)(fsMeta.avail / unit);
|
tsAvailDataDirGB = (float)(fsMeta.avail / unit);
|
||||||
|
|
||||||
if (taosGetDiskSize(tsLogDir, &diskSize) == 0) {
|
if (taosGetDiskSize(tsLogDir, &diskSize) == 0) {
|
||||||
tsTotalLogDirGB = (float)(diskSize.tsize / unit);
|
tsTotalLogDirGB = (float)(diskSize.total / unit);
|
||||||
tsAvailLogDirGB = (float)(diskSize.avail / unit);
|
tsAvailLogDirGB = (float)(diskSize.avail / unit);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosGetDiskSize(tsTempDir, &diskSize) == 0) {
|
if (taosGetDiskSize(tsTempDir, &diskSize) == 0) {
|
||||||
tsTotalTmpDirGB = (float)(diskSize.tsize / unit);
|
tsTotalTmpDirGB = (float)(diskSize.total / unit);
|
||||||
tsAvailTmpDirectorySpace = (float)(diskSize.avail / unit);
|
tsAvailTmpDirectorySpace = (float)(diskSize.avail / unit);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -12,48 +12,45 @@
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
#include "os.h"
|
|
||||||
|
|
||||||
#include "taoserror.h"
|
#define _DEFAULT_SOURCE
|
||||||
#include "tfsint.h"
|
#include "tfsInt.h"
|
||||||
|
|
||||||
// PROTECTED ====================================
|
SDisk *tfsNewDisk(int32_t level, int32_t id, const char *path) {
|
||||||
SDisk *tfsNewDisk(int level, int id, const char *dir) {
|
SDisk *pDisk = calloc(1, sizeof(SDisk));
|
||||||
SDisk *pDisk = (SDisk *)calloc(1, sizeof(*pDisk));
|
|
||||||
if (pDisk == NULL) {
|
if (pDisk == NULL) {
|
||||||
terrno = TSDB_CODE_FS_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
pDisk->path = strdup(path);
|
||||||
|
if (pDisk->path == NULL) {
|
||||||
|
free(pDisk);
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pDisk->level = level;
|
pDisk->level = level;
|
||||||
pDisk->id = id;
|
pDisk->id = id;
|
||||||
tstrncpy(pDisk->dir, dir, TSDB_FILENAME_LEN);
|
taosGetDiskSize(pDisk->path, &pDisk->size);
|
||||||
|
|
||||||
return pDisk;
|
return pDisk;
|
||||||
}
|
}
|
||||||
|
|
||||||
SDisk *tfsFreeDisk(SDisk *pDisk) {
|
SDisk *tfsFreeDisk(SDisk *pDisk) {
|
||||||
if (pDisk) {
|
if (pDisk != NULL) {
|
||||||
|
free(pDisk->path);
|
||||||
free(pDisk);
|
free(pDisk);
|
||||||
}
|
}
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int tfsUpdateDiskInfo(SDisk *pDisk) {
|
int32_t tfsUpdateDiskSize(SDisk *pDisk) {
|
||||||
ASSERT(pDisk != NULL);
|
if (taosGetDiskSize(pDisk->path, &pDisk->size) != 0) {
|
||||||
|
|
||||||
SysDiskSize diskSize = {0};
|
|
||||||
|
|
||||||
int code = taosGetDiskSize(pDisk->dir, &diskSize);
|
|
||||||
if (code != 0) {
|
|
||||||
fError("failed to update disk information at level %d id %d dir %s since %s", pDisk->level, pDisk->id, pDisk->dir,
|
|
||||||
strerror(errno));
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
fError("failed to get disk:%s size, level:%d id:%d since %s", pDisk->path, pDisk->level, pDisk->id, terrstr());
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
pDisk->dmeta.size = diskSize.tsize;
|
return 0;
|
||||||
pDisk->dmeta.used = diskSize.used;
|
|
||||||
pDisk->dmeta.free = diskSize.avail;
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
}
|
|
@ -0,0 +1,143 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define _DEFAULT_SOURCE
|
||||||
|
#include "tfsInt.h"
|
||||||
|
|
||||||
|
#define tfsLockTier(pTier) pthread_spin_lock(&(pTier)->lock)
|
||||||
|
#define tfsUnLockTier(pTier) pthread_spin_unlock(&(pTier)->lock)
|
||||||
|
|
||||||
|
int32_t tfsInitTier(STier *pTier, int32_t level) {
|
||||||
|
memset(pTier, 0, sizeof(STier));
|
||||||
|
|
||||||
|
if (pthread_spin_init(&pTier->lock, 0) != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTier->level = level;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tfsDestroyTier(STier *pTier) {
|
||||||
|
for (int32_t id = 0; id < TSDB_MAX_DISKS_PER_TIER; id++) {
|
||||||
|
pTier->disks[id] = tfsFreeDisk(pTier->disks[id]);
|
||||||
|
}
|
||||||
|
|
||||||
|
pTier->ndisk = 0;
|
||||||
|
pthread_spin_destroy(&(pTier->lock));
|
||||||
|
}
|
||||||
|
|
||||||
|
SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) {
|
||||||
|
if (pTier->ndisk >= TSDB_MAX_DISKS_PER_TIER) {
|
||||||
|
terrno = TSDB_CODE_FS_TOO_MANY_MOUNT;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t id = 0;
|
||||||
|
if (pTier->level == 0) {
|
||||||
|
if (pTier->disks[0] != NULL) {
|
||||||
|
id = pTier->ndisk;
|
||||||
|
} else {
|
||||||
|
if (pCfg->primary) {
|
||||||
|
id = 0;
|
||||||
|
} else {
|
||||||
|
id = pTier->ndisk + 1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
id = pTier->ndisk;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (id >= TSDB_MAX_DISKS_PER_TIER) {
|
||||||
|
terrno = TSDB_CODE_FS_TOO_MANY_MOUNT;
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
SDisk *pDisk = tfsNewDisk(pCfg->level, id, pCfg->dir);
|
||||||
|
if (pDisk == NULL) return NULL;
|
||||||
|
|
||||||
|
pTier->disks[id] = pDisk;
|
||||||
|
pTier->ndisk++;
|
||||||
|
|
||||||
|
fInfo("disk %s is mounted to tier level %d id %d", pCfg->dir, pCfg->level, id);
|
||||||
|
return pTier->disks[id];
|
||||||
|
}
|
||||||
|
|
||||||
|
void tfsUpdateTierSize(STier *pTier) {
|
||||||
|
SDiskSize size = {0};
|
||||||
|
int16_t nAvailDisks = 0;
|
||||||
|
|
||||||
|
tfsLockTier(pTier);
|
||||||
|
|
||||||
|
for (int32_t id = 0; id < pTier->ndisk; id++) {
|
||||||
|
SDisk *pDisk = pTier->disks[id];
|
||||||
|
if (pDisk == NULL) continue;
|
||||||
|
|
||||||
|
size.total += pDisk->size.total;
|
||||||
|
size.used += pDisk->size.used;
|
||||||
|
size.avail += pDisk->size.avail;
|
||||||
|
nAvailDisks++;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTier->size = size;
|
||||||
|
pTier->nAvailDisks = nAvailDisks;
|
||||||
|
|
||||||
|
tfsUnLockTier(pTier);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Round-Robin to allocate disk on a tier
|
||||||
|
int32_t tfsAllocDiskOnTier(STier *pTier) {
|
||||||
|
terrno = TSDB_CODE_FS_NO_VALID_DISK;
|
||||||
|
|
||||||
|
tfsLockTier(pTier);
|
||||||
|
|
||||||
|
if (pTier->ndisk <= 0 || pTier->nAvailDisks <= 0) {
|
||||||
|
tfsUnLockTier(pTier);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t retId = -1;
|
||||||
|
for (int32_t id = 0; id < TSDB_MAX_DISKS_PER_TIER; ++id) {
|
||||||
|
int32_t diskId = (pTier->nextid + id) % pTier->ndisk;
|
||||||
|
SDisk *pDisk = pTier->disks[diskId];
|
||||||
|
|
||||||
|
if (pDisk == NULL) continue;
|
||||||
|
|
||||||
|
if (pDisk->size.avail < TFS_MIN_DISK_FREE_SIZE) continue;
|
||||||
|
|
||||||
|
retId = diskId;
|
||||||
|
terrno = 0;
|
||||||
|
pTier->nextid = (diskId + 1) % pTier->ndisk;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
tfsUnLockTier(pTier);
|
||||||
|
return retId;
|
||||||
|
}
|
||||||
|
|
||||||
|
void tfsPosNextId(STier *pTier) {
|
||||||
|
int32_t nextid = 0;
|
||||||
|
|
||||||
|
for (int32_t id = 1; id < pTier->ndisk; id++) {
|
||||||
|
SDisk *pLDisk = pTier->disks[nextid];
|
||||||
|
SDisk *pDisk = pTier->disks[id];
|
||||||
|
if (pDisk->size.avail > TFS_MIN_DISK_FREE_SIZE && pDisk->size.avail > pLDisk->size.avail) {
|
||||||
|
nextid = id;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pTier->nextid = nextid;
|
||||||
|
}
|
|
@ -1,170 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
#include "os.h"
|
|
||||||
|
|
||||||
#include "taosdef.h"
|
|
||||||
#include "taoserror.h"
|
|
||||||
#include "tfsint.h"
|
|
||||||
|
|
||||||
#define tfsLockTier(pTier) pthread_spin_lock(&((pTier)->lock))
|
|
||||||
#define tfsUnLockTier(pTier) pthread_spin_unlock(&((pTier)->lock))
|
|
||||||
|
|
||||||
// PROTECTED ==========================================
|
|
||||||
int tfsInitTier(STier *pTier, int level) {
|
|
||||||
memset((void *)pTier, 0, sizeof(*pTier));
|
|
||||||
|
|
||||||
int code = pthread_spin_init(&(pTier->lock), 0);
|
|
||||||
if (code) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pTier->level = level;
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tfsDestroyTier(STier *pTier) {
|
|
||||||
for (int id = 0; id < TSDB_MAX_DISKS_PER_TIER; id++) {
|
|
||||||
DISK_AT_TIER(pTier, id) = tfsFreeDisk(DISK_AT_TIER(pTier, id));
|
|
||||||
}
|
|
||||||
|
|
||||||
pTier->ndisk = 0;
|
|
||||||
|
|
||||||
pthread_spin_destroy(&(pTier->lock));
|
|
||||||
}
|
|
||||||
|
|
||||||
SDisk *tfsMountDiskToTier(STier *pTier, SDiskCfg *pCfg) {
|
|
||||||
ASSERT(pTier->level == pCfg->level);
|
|
||||||
|
|
||||||
int id = 0;
|
|
||||||
SDisk *pDisk;
|
|
||||||
|
|
||||||
if (TIER_NDISKS(pTier) >= TSDB_MAX_DISKS_PER_TIER) {
|
|
||||||
terrno = TSDB_CODE_FS_TOO_MANY_MOUNT;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pTier->level == 0) {
|
|
||||||
if (DISK_AT_TIER(pTier, 0) != NULL) {
|
|
||||||
id = pTier->ndisk;
|
|
||||||
} else {
|
|
||||||
if (pCfg->primary) {
|
|
||||||
id = 0;
|
|
||||||
} else {
|
|
||||||
id = pTier->ndisk + 1;
|
|
||||||
}
|
|
||||||
if (id >= TSDB_MAX_DISKS_PER_TIER) {
|
|
||||||
terrno = TSDB_CODE_FS_TOO_MANY_MOUNT;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
id = pTier->ndisk;
|
|
||||||
}
|
|
||||||
|
|
||||||
pDisk = tfsNewDisk(pCfg->level, id, pCfg->dir);
|
|
||||||
if (pDisk == NULL) return NULL;
|
|
||||||
DISK_AT_TIER(pTier, id) = pDisk;
|
|
||||||
pTier->ndisk++;
|
|
||||||
|
|
||||||
fInfo("disk %s is mounted to tier level %d id %d", pCfg->dir, pCfg->level, id);
|
|
||||||
|
|
||||||
return DISK_AT_TIER(pTier, id);
|
|
||||||
}
|
|
||||||
|
|
||||||
void tfsUpdateTierInfo(STier *pTier, STierMeta *pTierMeta) {
|
|
||||||
STierMeta tmeta;
|
|
||||||
|
|
||||||
if (pTierMeta == NULL) {
|
|
||||||
pTierMeta = &tmeta;
|
|
||||||
}
|
|
||||||
memset(pTierMeta, 0, sizeof(*pTierMeta));
|
|
||||||
|
|
||||||
tfsLockTier(pTier);
|
|
||||||
|
|
||||||
for (int id = 0; id < pTier->ndisk; id++) {
|
|
||||||
if (tfsUpdateDiskInfo(DISK_AT_TIER(pTier, id)) < 0) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
pTierMeta->size += DISK_SIZE(DISK_AT_TIER(pTier, id));
|
|
||||||
pTierMeta->used += DISK_USED_SIZE(DISK_AT_TIER(pTier, id));
|
|
||||||
pTierMeta->free += DISK_FREE_SIZE(DISK_AT_TIER(pTier, id));
|
|
||||||
pTierMeta->nAvailDisks++;
|
|
||||||
}
|
|
||||||
|
|
||||||
pTier->tmeta = *pTierMeta;
|
|
||||||
|
|
||||||
tfsUnLockTier(pTier);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Round-Robin to allocate disk on a tier
|
|
||||||
int tfsAllocDiskOnTier(STier *pTier) {
|
|
||||||
ASSERT(pTier->ndisk > 0);
|
|
||||||
int id = TFS_UNDECIDED_ID;
|
|
||||||
SDisk *pDisk;
|
|
||||||
|
|
||||||
tfsLockTier(pTier);
|
|
||||||
|
|
||||||
if (TIER_AVAIL_DISKS(pTier) <= 0) {
|
|
||||||
tfsUnLockTier(pTier);
|
|
||||||
return id;
|
|
||||||
}
|
|
||||||
|
|
||||||
id = pTier->nextid;
|
|
||||||
while (true) {
|
|
||||||
pDisk = DISK_AT_TIER(pTier, id);
|
|
||||||
ASSERT(pDisk != NULL);
|
|
||||||
|
|
||||||
if (DISK_FREE_SIZE(pDisk) < TFS_MIN_DISK_FREE_SIZE) {
|
|
||||||
id = (id + 1) % pTier->ndisk;
|
|
||||||
if (id == pTier->nextid) {
|
|
||||||
tfsUnLockTier(pTier);
|
|
||||||
return TFS_UNDECIDED_ID;
|
|
||||||
} else {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
pTier->nextid = (id + 1) % pTier->ndisk;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tfsUnLockTier(pTier);
|
|
||||||
return id;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tfsGetTierMeta(STier *pTier, STierMeta *pTierMeta) {
|
|
||||||
ASSERT(pTierMeta != NULL);
|
|
||||||
|
|
||||||
tfsLockTier(pTier);
|
|
||||||
*pTierMeta = pTier->tmeta;
|
|
||||||
tfsUnLockTier(pTier);
|
|
||||||
}
|
|
||||||
|
|
||||||
void tfsPosNextId(STier *pTier) {
|
|
||||||
ASSERT(pTier->ndisk > 0);
|
|
||||||
int nextid = 0;
|
|
||||||
|
|
||||||
for (int id = 1; id < pTier->ndisk; id++) {
|
|
||||||
SDisk *pLDisk = DISK_AT_TIER(pTier, nextid);
|
|
||||||
SDisk *pDisk = DISK_AT_TIER(pTier, id);
|
|
||||||
if (DISK_FREE_SIZE(pDisk) > TFS_MIN_DISK_FREE_SIZE && DISK_FREE_SIZE(pDisk) > DISK_FREE_SIZE(pLDisk)) {
|
|
||||||
nextid = id;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pTier->nextid = nextid;
|
|
||||||
}
|
|
|
@ -149,6 +149,7 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
closedir(dir);
|
||||||
regfree(&logRegPattern);
|
regfree(&logRegPattern);
|
||||||
regfree(&idxRegPattern);
|
regfree(&idxRegPattern);
|
||||||
|
|
||||||
|
|
|
@ -121,7 +121,7 @@ bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
|
int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) {
|
||||||
unsigned _int64 i64FreeBytesToCaller;
|
unsigned _int64 i64FreeBytesToCaller;
|
||||||
unsigned _int64 i64TotalBytes;
|
unsigned _int64 i64TotalBytes;
|
||||||
unsigned _int64 i64FreeBytes;
|
unsigned _int64 i64FreeBytes;
|
||||||
|
@ -438,7 +438,7 @@ int taosSystem(const char *cmd) {
|
||||||
|
|
||||||
void taosSetCoreDump() {}
|
void taosSetCoreDump() {}
|
||||||
|
|
||||||
int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
|
int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) {
|
||||||
struct statvfs info;
|
struct statvfs info;
|
||||||
if (statvfs(dataDir, &info)) {
|
if (statvfs(dataDir, &info)) {
|
||||||
//printf("failed to get disk size, dataDir:%s errno:%s", tsDataDir, strerror(errno));
|
//printf("failed to get disk size, dataDir:%s errno:%s", tsDataDir, strerror(errno));
|
||||||
|
@ -771,13 +771,12 @@ bool taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t taosGetDiskSize(char *dataDir, SysDiskSize *diskSize) {
|
int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) {
|
||||||
struct statvfs info;
|
struct statvfs info;
|
||||||
if (statvfs(dataDir, &info)) {
|
if (statvfs(dataDir, &info)) {
|
||||||
//printf("failed to get disk size, dataDir:%s errno:%s", dataDir, strerror(errno));
|
|
||||||
return -1;
|
return -1;
|
||||||
} else {
|
} else {
|
||||||
diskSize->tsize = info.f_blocks * info.f_frsize;
|
diskSize->total = info.f_blocks * info.f_frsize;
|
||||||
diskSize->avail = info.f_bavail * info.f_frsize;
|
diskSize->avail = info.f_bavail * info.f_frsize;
|
||||||
diskSize->used = (info.f_blocks - info.f_bfree) * info.f_frsize;
|
diskSize->used = (info.f_blocks - info.f_bfree) * info.f_frsize;
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -396,7 +396,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limit")
|
TAOS_DEFINE_ERROR(TSDB_CODE_WAL_SIZE_LIMIT, "WAL size exceeds limit")
|
||||||
|
|
||||||
// tfs
|
// tfs
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_FS_OUT_OF_MEMORY, "tfs out of memory")
|
TAOS_DEFINE_ERROR(TSDB_CODE_FS_APP_ERROR, "tfs out of memory")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_CFG, "tfs invalid mount config")
|
TAOS_DEFINE_ERROR(TSDB_CODE_FS_INVLD_CFG, "tfs invalid mount config")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_FS_TOO_MANY_MOUNT, "tfs too many mount")
|
TAOS_DEFINE_ERROR(TSDB_CODE_FS_TOO_MANY_MOUNT, "tfs too many mount")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_FS_DUP_PRIMARY, "tfs duplicate primary mount")
|
TAOS_DEFINE_ERROR(TSDB_CODE_FS_DUP_PRIMARY, "tfs duplicate primary mount")
|
||||||
|
|
103
src/inc/tfs.h
103
src/inc/tfs.h
|
@ -1,103 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef TD_TFS_H
|
|
||||||
#define TD_TFS_H
|
|
||||||
|
|
||||||
#include "tglobal.h"
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int level;
|
|
||||||
int id;
|
|
||||||
} SDiskID;
|
|
||||||
|
|
||||||
#define TFS_UNDECIDED_LEVEL -1
|
|
||||||
#define TFS_UNDECIDED_ID -1
|
|
||||||
#define TFS_PRIMARY_LEVEL 0
|
|
||||||
#define TFS_PRIMARY_ID 0
|
|
||||||
#define TFS_MIN_LEVEL 0
|
|
||||||
#define TFS_MAX_LEVEL (TSDB_MAX_TIERS - 1)
|
|
||||||
|
|
||||||
// FS APIs ====================================
|
|
||||||
typedef struct {
|
|
||||||
int64_t tsize;
|
|
||||||
int64_t used;
|
|
||||||
int64_t avail;
|
|
||||||
} SFSMeta;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t size;
|
|
||||||
int64_t used;
|
|
||||||
int64_t free;
|
|
||||||
int16_t nAvailDisks; // # of Available disks
|
|
||||||
} STierMeta;
|
|
||||||
|
|
||||||
int tfsInit(SDiskCfg *pDiskCfg, int ndisk);
|
|
||||||
void tfsCleanup();
|
|
||||||
void tfsUpdateInfo(SFSMeta *pFSMeta, STierMeta *tierMetas, int8_t numLevels);
|
|
||||||
void tfsGetMeta(SFSMeta *pMeta);
|
|
||||||
void tfsAllocDisk(int expLevel, int *level, int *id);
|
|
||||||
|
|
||||||
const char *TFS_PRIMARY_PATH();
|
|
||||||
const char *TFS_DISK_PATH(int level, int id);
|
|
||||||
|
|
||||||
// TFILE APIs ====================================
|
|
||||||
typedef struct {
|
|
||||||
int level;
|
|
||||||
int id;
|
|
||||||
char rname[TSDB_FILENAME_LEN]; // REL name
|
|
||||||
char aname[TSDB_FILENAME_LEN]; // ABS name
|
|
||||||
} TFILE;
|
|
||||||
|
|
||||||
#define TFILE_LEVEL(pf) ((pf)->level)
|
|
||||||
#define TFILE_ID(pf) ((pf)->id)
|
|
||||||
#define TFILE_NAME(pf) ((pf)->aname)
|
|
||||||
#define TFILE_REL_NAME(pf) ((pf)->rname)
|
|
||||||
|
|
||||||
#define tfsopen(pf, flags) open(TFILE_NAME(pf), flags)
|
|
||||||
#define tfsclose(fd) close(fd)
|
|
||||||
#define tfsremove(pf) remove(TFILE_NAME(pf))
|
|
||||||
#define tfscopy(sf, df) taosCopy(TFILE_NAME(sf), TFILE_NAME(df))
|
|
||||||
#define tfsrename(sf, df) taosRename(TFILE_NAME(sf), TFILE_NAME(df))
|
|
||||||
|
|
||||||
void tfsInitFile(TFILE *pf, int level, int id, const char *bname);
|
|
||||||
bool tfsIsSameFile(const TFILE *pf1, const TFILE *pf2);
|
|
||||||
int tfsEncodeFile(void **buf, TFILE *pf);
|
|
||||||
void *tfsDecodeFile(void *buf, TFILE *pf);
|
|
||||||
void tfsbasename(const TFILE *pf, char *dest);
|
|
||||||
void tfsdirname(const TFILE *pf, char *dest);
|
|
||||||
|
|
||||||
// DIR APIs ====================================
|
|
||||||
int tfsMkdirAt(const char *rname, int level, int id);
|
|
||||||
int tfsMkdirRecurAt(const char *rname, int level, int id);
|
|
||||||
int tfsMkdir(const char *rname);
|
|
||||||
int tfsRmdir(const char *rname);
|
|
||||||
int tfsRename(char *orname, char *nrname);
|
|
||||||
|
|
||||||
typedef struct TDIR TDIR;
|
|
||||||
|
|
||||||
TDIR * tfsOpendir(const char *rname);
|
|
||||||
const TFILE *tfsReaddir(TDIR *tdir);
|
|
||||||
void tfsClosedir(TDIR *tdir);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif
|
|
|
@ -0,0 +1,57 @@
|
||||||
|
#!/bin/bash
|
||||||
|
#
|
||||||
|
# clean test environment
|
||||||
|
|
||||||
|
set -e
|
||||||
|
#set -x
|
||||||
|
|
||||||
|
# cleanCluster.sh
|
||||||
|
# -r [ dnode root dir]
|
||||||
|
|
||||||
|
|
||||||
|
dataRootDir="/data"
|
||||||
|
|
||||||
|
|
||||||
|
while getopts "hr:" arg
|
||||||
|
do
|
||||||
|
case $arg in
|
||||||
|
r)
|
||||||
|
dataRootDir=$(echo $OPTARG)
|
||||||
|
;;
|
||||||
|
h)
|
||||||
|
echo "Usage: `basename $0` -r [ dnode root dir] "
|
||||||
|
exit 0
|
||||||
|
;;
|
||||||
|
?) #unknow option
|
||||||
|
echo "unkonw argument"
|
||||||
|
exit 1
|
||||||
|
;;
|
||||||
|
esac
|
||||||
|
done
|
||||||
|
|
||||||
|
|
||||||
|
rmDnodesDataDir() {
|
||||||
|
if [ -d ${dataRootDir} ]; then
|
||||||
|
rm -rf ${dataRootDir}/dnode*
|
||||||
|
else
|
||||||
|
echo "${dataRootDir} not exist"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
}
|
||||||
|
|
||||||
|
function kill_process() {
|
||||||
|
pid=$(ps -ef | grep "$1" | grep -v "grep" | awk '{print $2}')
|
||||||
|
if [ -n "$pid" ]; then
|
||||||
|
kill -9 $pid || :
|
||||||
|
fi
|
||||||
|
}
|
||||||
|
|
||||||
|
########################################################################################
|
||||||
|
############################### main process ##########################################
|
||||||
|
|
||||||
|
## kill all taosd process
|
||||||
|
kill_process taosd
|
||||||
|
|
||||||
|
rmDnodesDataDir
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,81 @@
|
||||||
|
#!/bin/bash
|
||||||
|
#
|
||||||
|
# compile test version
|
||||||
|
|
||||||
|
set -e
|
||||||
|
#set -x
|
||||||
|
|
||||||
|
# compileVersion.sh
|
||||||
|
# -r [ TDengine project dir]
|
||||||
|
# -v [ TDengine branch version ]
|
||||||
|
|
||||||
|
|
||||||
|
projectDir=/root/TDengine
|
||||||
|
TDengineBrVer="3.0"
|
||||||
|
|
||||||
|
while getopts "hr:v:" arg
|
||||||
|
do
|
||||||
|
case $arg in
|
||||||
|
r)
|
||||||
|
projectDir=$(echo $OPTARG)
|
||||||
|
;;
|
||||||
|
v)
|
||||||
|
TDengineBrVer=$(echo $OPTARG)
|
||||||
|
;;
|
||||||
|
h)
|
||||||
|
echo "Usage: `basename $0` -r [ TDengine project dir] "
|
||||||
|
echo " -v [ TDengine branch version] "
|
||||||
|
exit 0
|
||||||
|
;;
|
||||||
|
?) #unknow option
|
||||||
|
echo "unkonw argument"
|
||||||
|
exit 1
|
||||||
|
;;
|
||||||
|
esac
|
||||||
|
done
|
||||||
|
|
||||||
|
echo "projectDir=${projectDir} TDengineBrVer=${TDengineBrVer}"
|
||||||
|
|
||||||
|
function gitPullBranchInfo () {
|
||||||
|
branch_name=$1
|
||||||
|
|
||||||
|
git checkout $branch_name
|
||||||
|
echo "==== git pull $branch_name start ===="
|
||||||
|
## git submodule update --init --recursive
|
||||||
|
git pull origin $branch_name ||:
|
||||||
|
echo "==== git pull $branch_name end ===="
|
||||||
|
}
|
||||||
|
|
||||||
|
function compileTDengineVersion() {
|
||||||
|
debugDir=debug
|
||||||
|
if [ -d ${debugDir} ]; then
|
||||||
|
rm -rf ${debugDir}/* ||:
|
||||||
|
else
|
||||||
|
mkdir -p ${debugDir}
|
||||||
|
fi
|
||||||
|
|
||||||
|
cd ${debugDir}
|
||||||
|
cmake ..
|
||||||
|
make -j24
|
||||||
|
}
|
||||||
|
########################################################################################
|
||||||
|
############################### main process ##########################################
|
||||||
|
|
||||||
|
## checkout all branchs and git pull
|
||||||
|
cd ${projectDir}
|
||||||
|
gitPullBranchInfo $TDengineBrVer
|
||||||
|
compileTDengineVersion
|
||||||
|
|
||||||
|
taos_dir=${projectDir}/debug/tools/shell
|
||||||
|
taosd_dir=${projectDir}/debug/source/dnode/mgmt/daemon
|
||||||
|
create_table_dir=${projectDir}/debug/tests/test/c
|
||||||
|
|
||||||
|
rm -f /usr/bin/taos
|
||||||
|
rm -f /usr/bin/taosd
|
||||||
|
rm -f /usr/bin/create_table
|
||||||
|
|
||||||
|
ln -s $taos_dir/taos /usr/bin/taos
|
||||||
|
ln -s $taosd_dir/taosd /usr/bin/taosd
|
||||||
|
ln -s $create_table_dir/create_table /usr/bin/create_table
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,25 @@
|
||||||
|
#!/bin/bash
|
||||||
|
#
|
||||||
|
# deploy test cluster
|
||||||
|
|
||||||
|
set -e
|
||||||
|
#set -x
|
||||||
|
|
||||||
|
# deployCluster.sh
|
||||||
|
|
||||||
|
curr_dir=$(pwd)
|
||||||
|
|
||||||
|
source ./cleanCluster.sh -r /data
|
||||||
|
source ./cleanCluster.sh -r /data2
|
||||||
|
|
||||||
|
source ./compileVersion.sh -r ${curr_dir}/../../../../ -v "3.0"
|
||||||
|
|
||||||
|
source ./setupDnodes.sh -r /data -n 1 -f trd02:7000 -p 7000
|
||||||
|
source ./setupDnodes.sh -r /data2 -n 1 -f trd02:7000 -p 8000
|
||||||
|
|
||||||
|
#source ./setupDnodes.sh -r /data -n 2 -f trd02:7000 -p 7000
|
||||||
|
#source ./setupDnodes.sh -r /data2 -n 2 -f trd02:7000 -p 8000
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,136 @@
|
||||||
|
#!/bin/bash
|
||||||
|
#
|
||||||
|
# setup test environment
|
||||||
|
|
||||||
|
set -e
|
||||||
|
#set -x
|
||||||
|
|
||||||
|
# setupDnodes.sh
|
||||||
|
# -e [ new | old]
|
||||||
|
# -n [ dnode number]
|
||||||
|
# -f [ first ep]
|
||||||
|
# -p [ start port]
|
||||||
|
# -r [ dnode root dir]
|
||||||
|
|
||||||
|
# set parameters by default value
|
||||||
|
enviMode=new
|
||||||
|
dataRootDir="/data"
|
||||||
|
firstEp="localhost:7000"
|
||||||
|
startPort=7000
|
||||||
|
dnodeNumber=1
|
||||||
|
|
||||||
|
|
||||||
|
while getopts "he:f:n:r:p:" arg
|
||||||
|
do
|
||||||
|
case $arg in
|
||||||
|
e)
|
||||||
|
enviMode=$( echo $OPTARG )
|
||||||
|
;;
|
||||||
|
n)
|
||||||
|
dnodeNumber=$(echo $OPTARG)
|
||||||
|
;;
|
||||||
|
f)
|
||||||
|
firstEp=$(echo $OPTARG)
|
||||||
|
;;
|
||||||
|
p)
|
||||||
|
startPort=$(echo $OPTARG)
|
||||||
|
;;
|
||||||
|
r)
|
||||||
|
dataRootDir=$(echo $OPTARG)
|
||||||
|
;;
|
||||||
|
h)
|
||||||
|
echo "Usage: `basename $0` -e [new | old] "
|
||||||
|
echo " -n [ dnode number] "
|
||||||
|
echo " -f [ first ep] "
|
||||||
|
echo " -p [ start port] "
|
||||||
|
echo " -r [ dnode root dir] "
|
||||||
|
exit 0
|
||||||
|
;;
|
||||||
|
?) #unknow option
|
||||||
|
echo "unkonw argument"
|
||||||
|
exit 1
|
||||||
|
;;
|
||||||
|
esac
|
||||||
|
done
|
||||||
|
|
||||||
|
echo "enviMode=${enviMode} dnodeNumber=${dnodeNumber} dataRootDir=${dataRootDir} firstEp=${firstEp} startPort=${startPort}"
|
||||||
|
|
||||||
|
#curr_dir=$(pwd)
|
||||||
|
|
||||||
|
|
||||||
|
createNewCfgFile() {
|
||||||
|
cfgFile=$1/taos.cfg
|
||||||
|
dataDir=$2
|
||||||
|
logDir=$3
|
||||||
|
firstEp=$4
|
||||||
|
serverPort=$5
|
||||||
|
|
||||||
|
echo "debugFlag 131" > ${cfgFile}
|
||||||
|
echo "firstEp ${firstEp}" >> ${cfgFile}
|
||||||
|
echo "dataDir ${dataDir}" >> ${cfgFile}
|
||||||
|
echo "logDir ${logDir}" >> ${cfgFile}
|
||||||
|
echo "serverPort ${serverPort}" >> ${cfgFile}
|
||||||
|
|
||||||
|
echo "supportVnodes 1024" >> ${cfgFile}
|
||||||
|
#echo "asyncLog 0" >> ${cfgFile}
|
||||||
|
echo "telemetryReporting 0" >> ${cfgFile}
|
||||||
|
}
|
||||||
|
|
||||||
|
createNewDnodesDataDir() {
|
||||||
|
if [ -d ${dataRootDir} ]; then
|
||||||
|
rm -rf ${dataRootDir}/dnode*
|
||||||
|
else
|
||||||
|
echo "${dataRootDir} not exist"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
dnodeNumber=$1
|
||||||
|
firstEp=$2
|
||||||
|
|
||||||
|
serverPort=${startPort}
|
||||||
|
for ((i=0; i<${dnodeNumber}; i++)); do
|
||||||
|
mkdir -p ${dataRootDir}/dnode_${i}/cfg
|
||||||
|
mkdir -p ${dataRootDir}/dnode_${i}/log
|
||||||
|
mkdir -p ${dataRootDir}/dnode_${i}/data
|
||||||
|
|
||||||
|
createNewCfgFile ${dataRootDir}/dnode_${i}/cfg ${dataRootDir}/dnode_${i}/data ${dataRootDir}/dnode_${i}/log ${firstEp} ${serverPort}
|
||||||
|
echo "create dnode: ${serverPort}, ${dataRootDir}/dnode_${i}"
|
||||||
|
serverPort=$((10#${serverPort}+100))
|
||||||
|
done
|
||||||
|
}
|
||||||
|
|
||||||
|
function kill_process() {
|
||||||
|
pid=$(ps -ef | grep "$1" | grep -v "grep" | awk '{print $2}')
|
||||||
|
if [ -n "$pid" ]; then
|
||||||
|
kill -9 $pid || :
|
||||||
|
fi
|
||||||
|
}
|
||||||
|
|
||||||
|
startDnodes() {
|
||||||
|
dnodeNumber=$1
|
||||||
|
|
||||||
|
for ((i=0; i<${dnodeNumber}; i++)); do
|
||||||
|
if [ -d ${dataRootDir}/dnode_${i} ]; then
|
||||||
|
nohup taosd -c ${dataRootDir}/dnode_${i}/cfg >/dev/null 2>&1 &
|
||||||
|
echo "start taosd ${dataRootDir}/dnode_${i}"
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
}
|
||||||
|
|
||||||
|
########################################################################################
|
||||||
|
############################### main process ##########################################
|
||||||
|
|
||||||
|
## kill all taosd process
|
||||||
|
kill_process taosd
|
||||||
|
|
||||||
|
## create director for all dnode
|
||||||
|
if [[ "$enviMode" == "new" ]]; then
|
||||||
|
createNewDnodesDataDir ${dnodeNumber} ${firstEp}
|
||||||
|
fi
|
||||||
|
|
||||||
|
## start all dnode by nohup
|
||||||
|
startDnodes ${dnodeNumber}
|
||||||
|
|
||||||
|
echo " run setupDnodes.sh end !!!"
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue