Merge remote-tracking branch 'origin/feature/vnode' into feature/qnode
This commit is contained in:
commit
d7f756b587
|
@ -356,9 +356,9 @@ typedef struct SEpSet {
|
||||||
} SEpSet;
|
} SEpSet;
|
||||||
|
|
||||||
static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) {
|
static FORCE_INLINE int taosEncodeSEpSet(void** buf, const SEpSet* pEp) {
|
||||||
if(buf == NULL) return sizeof(SEpSet);
|
if (buf == NULL) return sizeof(SEpSet);
|
||||||
memcpy(buf, pEp, sizeof(SEpSet));
|
memcpy(buf, pEp, sizeof(SEpSet));
|
||||||
//TODO: endian conversion
|
// TODO: endian conversion
|
||||||
return sizeof(SEpSet);
|
return sizeof(SEpSet);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1124,10 +1124,10 @@ typedef struct STaskDropRsp {
|
||||||
} STaskDropRsp;
|
} STaskDropRsp;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t igExists;
|
int8_t igExists;
|
||||||
char* name;
|
char* name;
|
||||||
char* physicalPlan;
|
char* physicalPlan;
|
||||||
char* logicalPlan;
|
char* logicalPlan;
|
||||||
} SCMCreateTopicReq;
|
} SCMCreateTopicReq;
|
||||||
|
|
||||||
static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) {
|
static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) {
|
||||||
|
@ -1163,8 +1163,8 @@ static FORCE_INLINE void* tDeserializeSCMCreateTopicRsp(void* buf, SCMCreateTopi
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char* topicName;
|
char* topicName;
|
||||||
char* consumerGroup;
|
char* consumerGroup;
|
||||||
int64_t consumerId;
|
int64_t consumerId;
|
||||||
} SCMSubscribeReq;
|
} SCMSubscribeReq;
|
||||||
|
|
||||||
|
@ -1185,7 +1185,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
SEpSet pEpSet;
|
SEpSet pEpSet;
|
||||||
} SCMSubscribeRsp;
|
} SCMSubscribeRsp;
|
||||||
|
|
||||||
static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) {
|
static FORCE_INLINE int tSerializeSCMSubscribeRsp(void** buf, const SCMSubscribeRsp* pRsp) {
|
||||||
|
|
|
@ -0,0 +1,59 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_UTIL_FREELIST_H_
|
||||||
|
#define _TD_UTIL_FREELIST_H_
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
|
#include "tlist.h"
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
struct SFreeListNode {
|
||||||
|
TD_SLIST_NODE(SFreeListNode);
|
||||||
|
char payload[];
|
||||||
|
};
|
||||||
|
|
||||||
|
typedef TD_SLIST(SFreeListNode) SFreeList;
|
||||||
|
|
||||||
|
#define TFL_MALLOC(SIZE, LIST) \
|
||||||
|
({ \
|
||||||
|
void *ptr = malloc((SIZE) + sizeof(struct SFreeListNode)); \
|
||||||
|
if (ptr) { \
|
||||||
|
TD_SLIST_PUSH((LIST), (struct SFreeListNode *)ptr); \
|
||||||
|
ptr = ((struct SFreeListNode *)ptr)->payload; \
|
||||||
|
} \
|
||||||
|
ptr; \
|
||||||
|
})
|
||||||
|
|
||||||
|
#define tFreeListInit(pFL) TD_SLIST_INIT(pFL)
|
||||||
|
|
||||||
|
static FORCE_INLINE void tFreeListClear(SFreeList *pFL) {
|
||||||
|
struct SFreeListNode *pNode;
|
||||||
|
for (;;) {
|
||||||
|
pNode = TD_SLIST_HEAD(pFL);
|
||||||
|
if (pNode == NULL) break;
|
||||||
|
TD_SLIST_POP(pFL);
|
||||||
|
free(pNode);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /*_TD_UTIL_FREELIST_H_*/
|
|
@ -17,7 +17,7 @@
|
||||||
#include "vnodeDef.h"
|
#include "vnodeDef.h"
|
||||||
|
|
||||||
static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg);
|
static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
|
|
||||||
int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery); }
|
int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NULL, &pVnode->pQuery); }
|
||||||
|
|
||||||
|
@ -43,7 +43,7 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
return qWorkerProcessShowMsg(pVnode, pVnode->pQuery, pMsg);
|
return qWorkerProcessShowMsg(pVnode, pVnode->pQuery, pMsg);
|
||||||
case TDMT_VND_SHOW_TABLES_FETCH:
|
case TDMT_VND_SHOW_TABLES_FETCH:
|
||||||
return vnodeGetTableList(pVnode, pMsg);
|
return vnodeGetTableList(pVnode, pMsg);
|
||||||
// return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg);
|
// return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg);
|
||||||
case TDMT_VND_TABLE_META:
|
case TDMT_VND_TABLE_META:
|
||||||
return vnodeGetTableMeta(pVnode, pMsg, pRsp);
|
return vnodeGetTableMeta(pVnode, pMsg, pRsp);
|
||||||
default:
|
default:
|
||||||
|
@ -62,16 +62,17 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
SSchemaWrapper *pSW;
|
SSchemaWrapper *pSW;
|
||||||
STableMetaMsg * pTbMetaMsg;
|
STableMetaMsg * pTbMetaMsg;
|
||||||
SSchema * pTagSchema;
|
SSchema * pTagSchema;
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
|
||||||
pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tableFname, &uid);
|
pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tableFname, &uid);
|
||||||
if (pTbCfg == NULL) {
|
if (pTbCfg == NULL) {
|
||||||
return -1;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTbCfg->type == META_CHILD_TABLE) {
|
if (pTbCfg->type == META_CHILD_TABLE) {
|
||||||
pStbCfg = metaGetTbInfoByUid(pVnode->pMeta, pTbCfg->ctbCfg.suid);
|
pStbCfg = metaGetTbInfoByUid(pVnode->pMeta, pTbCfg->ctbCfg.suid);
|
||||||
if (pStbCfg == NULL) {
|
if (pStbCfg == NULL) {
|
||||||
return -1;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSW = metaGetTableSchema(pVnode->pMeta, pTbCfg->ctbCfg.suid, 0, true);
|
pSW = metaGetTableSchema(pVnode->pMeta, pTbCfg->ctbCfg.suid, 0, true);
|
||||||
|
@ -94,7 +95,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
int msgLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols);
|
int msgLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (nCols + nTagCols);
|
||||||
pTbMetaMsg = (STableMetaMsg *)rpcMallocCont(msgLen);
|
pTbMetaMsg = (STableMetaMsg *)rpcMallocCont(msgLen);
|
||||||
if (pTbMetaMsg == NULL) {
|
if (pTbMetaMsg == NULL) {
|
||||||
return -1;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
strcpy(pTbMetaMsg->tbFname, pTbCfg->name);
|
strcpy(pTbMetaMsg->tbFname, pTbCfg->name);
|
||||||
|
@ -119,13 +120,13 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
pSch->bytes = htonl(pSch->bytes);
|
pSch->bytes = htonl(pSch->bytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {
|
_exit:
|
||||||
.handle = pMsg->handle,
|
|
||||||
.ahandle = pMsg->ahandle,
|
rpcMsg.handle = pMsg->handle;
|
||||||
.pCont = pTbMetaMsg,
|
rpcMsg.ahandle = pMsg->ahandle;
|
||||||
.contLen = msgLen,
|
rpcMsg.pCont = pTbMetaMsg;
|
||||||
.code = 0,
|
rpcMsg.contLen = msgLen;
|
||||||
};
|
rpcMsg.code = 0;
|
||||||
|
|
||||||
rpcSendResponse(&rpcMsg);
|
rpcSendResponse(&rpcMsg);
|
||||||
|
|
||||||
|
@ -138,10 +139,10 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
* @param pRsp
|
* @param pRsp
|
||||||
*/
|
*/
|
||||||
static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) {
|
static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
SMTbCursor* pCur = metaOpenTbCursor(pVnode->pMeta);
|
SMTbCursor *pCur = metaOpenTbCursor(pVnode->pMeta);
|
||||||
SArray* pArray = taosArrayInit(10, POINTER_BYTES);
|
SArray * pArray = taosArrayInit(10, POINTER_BYTES);
|
||||||
|
|
||||||
char* name = NULL;
|
char * name = NULL;
|
||||||
int32_t totalLen = 0;
|
int32_t totalLen = 0;
|
||||||
while ((name = metaTbCursorNext(pCur)) != NULL) {
|
while ((name = metaTbCursorNext(pCur)) != NULL) {
|
||||||
taosArrayPush(pArray, &name);
|
taosArrayPush(pArray, &name);
|
||||||
|
@ -150,18 +151,19 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
metaCloseTbCursor(pCur);
|
metaCloseTbCursor(pCur);
|
||||||
|
|
||||||
int32_t rowLen = (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) + 8 + 2 + (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) + 8 + 4;
|
int32_t rowLen =
|
||||||
int32_t numOfTables = (int32_t) taosArrayGetSize(pArray);
|
(TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) + 8 + 2 + (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE) + 8 + 4;
|
||||||
|
int32_t numOfTables = (int32_t)taosArrayGetSize(pArray);
|
||||||
|
|
||||||
int32_t payloadLen = rowLen * numOfTables;
|
int32_t payloadLen = rowLen * numOfTables;
|
||||||
// SVShowTablesFetchReq *pFetchReq = pMsg->pCont;
|
// SVShowTablesFetchReq *pFetchReq = pMsg->pCont;
|
||||||
|
|
||||||
SVShowTablesFetchRsp *pFetchRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp) + payloadLen);
|
SVShowTablesFetchRsp *pFetchRsp = (SVShowTablesFetchRsp *)rpcMallocCont(sizeof(SVShowTablesFetchRsp) + payloadLen);
|
||||||
memset(pFetchRsp, 0, sizeof(struct SVShowTablesFetchRsp) + payloadLen);
|
memset(pFetchRsp, 0, sizeof(struct SVShowTablesFetchRsp) + payloadLen);
|
||||||
|
|
||||||
char* p = pFetchRsp->data;
|
char *p = pFetchRsp->data;
|
||||||
for(int32_t i = 0; i < numOfTables; ++i) {
|
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||||
char* n = taosArrayGetP(pArray, i);
|
char *n = taosArrayGetP(pArray, i);
|
||||||
STR_TO_VARSTR(p, n);
|
STR_TO_VARSTR(p, n);
|
||||||
|
|
||||||
p += (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
|
p += (TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE);
|
||||||
|
@ -171,11 +173,11 @@ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
pFetchRsp->precision = 0;
|
pFetchRsp->precision = 0;
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {
|
SRpcMsg rpcMsg = {
|
||||||
.handle = pMsg->handle,
|
.handle = pMsg->handle,
|
||||||
.ahandle = pMsg->ahandle,
|
.ahandle = pMsg->ahandle,
|
||||||
.pCont = pFetchRsp,
|
.pCont = pFetchRsp,
|
||||||
.contLen = sizeof(SVShowTablesFetchRsp) + payloadLen,
|
.contLen = sizeof(SVShowTablesFetchRsp) + payloadLen,
|
||||||
.code = 0,
|
.code = 0,
|
||||||
};
|
};
|
||||||
|
|
||||||
rpcSendResponse(&rpcMsg);
|
rpcSendResponse(&rpcMsg);
|
||||||
|
|
|
@ -33,4 +33,12 @@ ENDIF()
|
||||||
|
|
||||||
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
|
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/util/inc)
|
||||||
|
|
||||||
|
# freelistTest
|
||||||
|
add_executable(freelistTest "")
|
||||||
|
target_sources(freelistTest
|
||||||
|
PRIVATE
|
||||||
|
"freelistTest.cpp"
|
||||||
|
)
|
||||||
|
target_link_libraries(freelistTest os util gtest gtest_main)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,16 @@
|
||||||
|
#include "gtest/gtest.h"
|
||||||
|
|
||||||
|
#include "freelist.h"
|
||||||
|
|
||||||
|
TEST(TD_UTIL_FREELIST_TEST, simple_test) {
|
||||||
|
SFreeList fl;
|
||||||
|
|
||||||
|
tFreeListInit(&fl);
|
||||||
|
|
||||||
|
for (size_t i = 0; i < 1000; i++) {
|
||||||
|
void *ptr = TFL_MALLOC(1024, &fl);
|
||||||
|
GTEST_ASSERT_NE(ptr, nullptr);
|
||||||
|
}
|
||||||
|
|
||||||
|
tFreeListClear(&fl);
|
||||||
|
}
|
Loading…
Reference in New Issue