From a50c3f3d5e65996b2a6148b77389a828d2793968 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 19 Jan 2022 18:19:48 +0800 Subject: [PATCH 1/6] add client --- source/libs/transport/inc/transComm.h | 19 ++++ source/libs/transport/src/trans.c | 1 + source/libs/transport/src/transCli.c | 145 +++++++++++++++++++------- source/libs/transport/src/transSrv.c | 30 +++--- source/libs/transport/test/rclient.c | 6 +- 5 files changed, 143 insertions(+), 58 deletions(-) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 792200639a..4b14f9f2c7 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -68,6 +68,25 @@ typedef void* queue[2]; QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \ QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \ } +#define QUEUE_SPLIT(h, q, n) \ + do { \ + QUEUE_PREV(n) = QUEUE_PREV(h); \ + QUEUE_PREV_NEXT(n) = (n); \ + QUEUE_NEXT(n) = (q); \ + QUEUE_PREV(h) = QUEUE_PREV(q); \ + QUEUE_PREV_NEXT(h) = (h); \ + QUEUE_PREV(q) = (n); \ + } while (0) + +#define QUEUE_MOVE(h, n) \ + do { \ + if (QUEUE_IS_EMPTY(h)) { \ + QUEUE_INIT(n); \ + } else { \ + queue* q = QUEUE_HEAD(h); \ + QUEUE_SPLIT(h, q, n); \ + } \ + } while (0) /* Return the element at the front of the queue. */ #define QUEUE_HEAD(q) (QUEUE_NEXT(q)) diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 89361b13ad..cb8ef87b48 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -35,6 +35,7 @@ void* rpcOpen(const SRpcInit* pInit) { if (pInit->label) { tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); } + pRpc->cfp = pInit->cfp; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; pRpc->connType = pInit->connType; pRpc->tcphandle = (*taosHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 29f3361b10..7e5874d8cd 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -20,12 +20,14 @@ typedef struct SCliConn { uv_connect_t connReq; uv_stream_t* stream; + uv_write_t* writeReq; void* data; queue conn; } SCliConn; typedef struct SCliMsg { SRpcReqContext* context; queue q; + uint64_t st; } SCliMsg; typedef struct SCliThrdObj { @@ -45,86 +47,153 @@ typedef struct SClientObj { SCliThrdObj** pThreadObj; } SClientObj; -static void clientWriteCb(uv_write_t* req, int status); +static void clientAllocrReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); -static void clientConnCb(struct uv_connect_s* req, int status); +static void clientWriteCb(uv_write_t* req, int status); +static void clientConnCb(uv_connect_t* req, int status); static void clientAsyncCb(uv_async_t* handle); +static void clientDestroy(uv_handle_t* handle); +static void clientConnDestroy(SCliConn* pConn); static void* clientThread(void* arg); +static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); + +static void clientAllocrReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + // impl later +} +static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { + // impl later + SCliConn* conn = handle->data; + if (nread > 0) { + return; + } + // + uv_close((uv_handle_t*)handle, clientDestroy); +} + +static void clientConnDestroy(SCliConn* conn) { + // impl later + // +} +static void clientDestroy(uv_handle_t* handle) { + SCliConn* conn = handle->data; + clientConnDestroy(conn); +} + static void clientWriteCb(uv_write_t* req, int status) { - // impl later -} -static void clientFailedCb(uv_handle_t* handle) { - // impl later - tDebug("close handle"); -} -static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { - // impl later -} -static void clientConnCb(struct uv_connect_s* req, int status) { SCliConn* pConn = req->data; - SCliMsg* pMsg = pConn->data; - SEpSet* pEpSet = &pMsg->context->epSet; + if (status == 0) { + tDebug("data already was written on stream"); + } else { + uv_close((uv_handle_t*)pConn->stream, clientDestroy); + return; + } + + uv_read_start((uv_stream_t*)pConn->stream, clientAllocrReadBufferCb, clientReadCb); + // impl later +} +static void clientConnCb(uv_connect_t* req, int status) { + // impl later + SCliConn* pConn = req->data; + if (status != 0) { + tError("failed to connect %s", uv_err_name(status)); + clientConnDestroy(pConn); + return; + } + + SCliMsg* pMsg = pConn->data; + SEpSet* pEpSet = &pMsg->context->epSet; + SRpcMsg rpcMsg; + rpcMsg.ahandle = pMsg->context->ahandle; + rpcMsg.pCont = NULL; char* fqdn = pEpSet->fqdn[pEpSet->inUse]; uint32_t port = pEpSet->port[pEpSet->inUse]; if (status != 0) { // call user fp later tError("failed to connect server(%s, %d), errmsg: %s", fqdn, port, uv_strerror(status)); - uv_close((uv_handle_t*)req->handle, clientFailedCb); + SRpcInfo* pRpc = pMsg->context->pRpc; + (pRpc->cfp)(NULL, &rpcMsg, pEpSet); + uv_close((uv_handle_t*)req->handle, clientDestroy); return; } assert(pConn->stream == req->handle); - // impl later + uv_buf_t wb; + uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); } static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { // impl later return NULL; } -static void clientAsyncCb(uv_async_t* handle) { - SCliThrdObj* pThrd = handle->data; - SCliMsg* pMsg = NULL; - pthread_mutex_lock(&pThrd->msgMtx); - if (!QUEUE_IS_EMPTY(&pThrd->msg)) { - queue* head = QUEUE_HEAD(&pThrd->msg); - pMsg = QUEUE_DATA(head, SCliMsg, q); - QUEUE_REMOVE(head); - } - pthread_mutex_unlock(&pThrd->msgMtx); - SEpSet* pEpSet = &pMsg->context->epSet; +static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { + SEpSet* pEpSet = &pMsg->context->epSet; + char* fqdn = pEpSet->fqdn[pEpSet->inUse]; uint32_t port = pEpSet->port[pEpSet->inUse]; + uint64_t el = taosGetTimestampUs() - pMsg->st; + tDebug("msg tran time cost: %" PRIu64 "", el); + SCliConn* conn = getConnFromCache(pThrd->cache, fqdn, port); if (conn != NULL) { + conn->data = pMsg; + conn->writeReq->data = conn; + uv_buf_t wb; + uv_write(conn->writeReq, (uv_stream_t*)conn->stream, &wb, 1, clientWriteCb); // impl later } else { SCliConn* conn = malloc(sizeof(SCliConn)); conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); + conn->writeReq = malloc(sizeof(uv_write_t)); conn->connReq.data = conn; conn->data = pMsg; - struct sockaddr_in addr; uv_ip4_addr(fqdn, port, &addr); - // handle error in callback if connect error + // handle error in callback if fail to connect uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb); + + // SRpcMsg rpcMsg; + // SEpSet* pEpSet = &pMsg->context->epSet; + // SRpcInfo* pRpc = pMsg->context->pRpc; + //// rpcMsg.ahandle = pMsg->context->ahandle; + // rpcMsg.pCont = NULL; + // rpcMsg.ahandle = pMsg->context->ahandle; + // uint64_t el1 = taosGetTimestampUs() - et; + // tError("msg tran back first: time cost: %" PRIu64 "", el1); + // et = taosGetTimestampUs(); + //(pRpc->cfp)(NULL, &rpcMsg, pEpSet); + // uint64_t el2 = taosGetTimestampUs() - et; + // tError("msg tran back second: time cost: %" PRIu64 "", el2); } +} +static void clientAsyncCb(uv_async_t* handle) { + SCliThrdObj* pThrd = handle->data; + SCliMsg* pMsg = NULL; + queue wq; - // SRpcReqContext* pCxt = pMsg->context; + // batch process to avoid to lock/unlock frequently + pthread_mutex_lock(&pThrd->msgMtx); + QUEUE_MOVE(&pThrd->msg, &wq); + pthread_mutex_unlock(&pThrd->msgMtx); - // SRpcHead* pHead = rpcHeadFromCont(pCtx->pCont); - // char* msg = (char*)pHead; - // int len = rpcMsgLenFromCont(pCtx->contLen); - // tmsg_t msgType = pCtx->msgType; - - // impl later + int count = 0; + while (!QUEUE_IS_EMPTY(&wq)) { + queue* h = QUEUE_HEAD(&wq); + QUEUE_REMOVE(h); + pMsg = QUEUE_DATA(h, SCliMsg, q); + clientHandleReq(pMsg, pThrd); + count++; + if (count >= 2) { + tError("send batch size: %d", count); + } + } } static void* clientThread(void* arg) { @@ -142,9 +211,6 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); QUEUE_INIT(&pThrd->msg); pthread_mutex_init(&pThrd->msgMtx, NULL); - - // QUEUE_INIT(&pThrd->clientCache); - pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); uv_loop_init(pThrd->loop); @@ -186,6 +252,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* } SCliMsg* msg = malloc(sizeof(SCliMsg)); msg->context = pContext; + msg->st = taosGetTimestampUs(); SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads]; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 0bf39b9985..bc4cc695b0 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -277,10 +277,6 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { } return; } - if (terrno != 0) { - // handle err code - } - if (nread != UV_EOF) { tDebug("Read error %s\n", uv_err_name(nread)); } @@ -309,21 +305,23 @@ void uvOnWriteCb(uv_write_t* req, int status) { void uvWorkerAsyncCb(uv_async_t* handle) { SWorkThrdObj* pThrd = container_of(handle, SWorkThrdObj, workerAsync); SConn* conn = NULL; - - // opt later + queue wq; + // batch process to avoid to lock/unlock frequently pthread_mutex_lock(&pThrd->connMtx); - if (!QUEUE_IS_EMPTY(&pThrd->conn)) { - queue* head = QUEUE_HEAD(&pThrd->conn); - conn = QUEUE_DATA(head, SConn, queue); - QUEUE_REMOVE(head); - } + QUEUE_MOVE(&pThrd->conn, &wq); pthread_mutex_unlock(&pThrd->connMtx); - if (conn == NULL) { - tError("except occurred, do nothing"); - return; + + while (!QUEUE_IS_EMPTY(&wq)) { + queue* head = QUEUE_HEAD(&wq); + QUEUE_REMOVE(head); + SConn* conn = QUEUE_DATA(head, SConn, queue); + if (conn == NULL) { + tError("except occurred, do nothing"); + return; + } + uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len); + uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); } - uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len); - uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); } void uvOnAcceptCb(uv_stream_t* stream, int status) { diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c index 58fbf6ae85..6339e58560 100644 --- a/source/libs/transport/test/rclient.c +++ b/source/libs/transport/test/rclient.c @@ -34,8 +34,8 @@ typedef struct { static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { SInfo *pInfo = (SInfo *)pMsg->ahandle; - tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, - pMsg->code); + // tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, + // pMsg->code); if (pEpSet) pInfo->epSet = *pEpSet; @@ -57,7 +57,7 @@ static void *sendRequest(void *param) { rpcMsg.contLen = pInfo->msgSize; rpcMsg.ahandle = pInfo; rpcMsg.msgType = 1; - tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); + // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL); if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); tsem_wait(&pInfo->rspSem); From a5253f2517a8b50802a0b76fbba24f0c1bb427d1 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Wed, 19 Jan 2022 19:14:25 +0800 Subject: [PATCH 2/6] add client --- source/libs/transport/src/transCli.c | 34 +++++++++++++++++++++------- 1 file changed, 26 insertions(+), 8 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 7e5874d8cd..f197e72ec5 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -23,6 +23,8 @@ typedef struct SCliConn { uv_write_t* writeReq; void* data; queue conn; + char spi; + char secured; } SCliConn; typedef struct SCliMsg { SRpcReqContext* context; @@ -47,6 +49,10 @@ typedef struct SClientObj { SCliThrdObj** pThreadObj; } SClientObj; +// conn pool +static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port); +static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn); + static void clientAllocrReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); static void clientWriteCb(uv_write_t* req, int status); @@ -93,6 +99,16 @@ static void clientWriteCb(uv_write_t* req, int status) { uv_read_start((uv_stream_t*)pConn->stream, clientAllocrReadBufferCb, clientReadCb); // impl later } + +static void clientWrite(SCliConn* pConn) { + SCliMsg* pMsg = pConn->data; + SRpcHead* pHead = rpcHeadFromCont(pMsg->context->pCont); + int msgLen = rpcMsgLenFromCont(pMsg->context->contLen); + char* msg = (char*)(pHead); + + uv_buf_t wb = uv_buf_init(msg, msgLen); + uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); +} static void clientConnCb(uv_connect_t* req, int status) { // impl later SCliConn* pConn = req->data; @@ -105,8 +121,8 @@ static void clientConnCb(uv_connect_t* req, int status) { SCliMsg* pMsg = pConn->data; SEpSet* pEpSet = &pMsg->context->epSet; SRpcMsg rpcMsg; - rpcMsg.ahandle = pMsg->context->ahandle; - rpcMsg.pCont = NULL; + // rpcMsg.ahandle = pMsg->context->ahandle; + // rpcMsg.pCont = NULL; char* fqdn = pEpSet->fqdn[pEpSet->inUse]; uint32_t port = pEpSet->port[pEpSet->inUse]; @@ -119,15 +135,16 @@ static void clientConnCb(uv_connect_t* req, int status) { return; } assert(pConn->stream == req->handle); - - uv_buf_t wb; - uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); } static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { // impl later + return NULL; } +static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) { + // impl later +} static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { SEpSet* pEpSet = &pMsg->context->epSet; @@ -140,11 +157,12 @@ static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { SCliConn* conn = getConnFromCache(pThrd->cache, fqdn, port); if (conn != NULL) { + // impl later conn->data = pMsg; conn->writeReq->data = conn; - uv_buf_t wb; - uv_write(conn->writeReq, (uv_stream_t*)conn->stream, &wb, 1, clientWriteCb); - // impl later + clientWrite(conn); + // uv_buf_t wb; + // uv_write(conn->writeReq, (uv_stream_t*)conn->stream, &wb, 1, clientWriteCb); } else { SCliConn* conn = malloc(sizeof(SCliConn)); From 6cf90b0a92061a85a7deea655c556c8fbbd1bb36 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 20 Jan 2022 09:49:27 +0800 Subject: [PATCH 3/6] add timer for mq rebalance --- include/common/tmsg.h | 4 + include/common/tmsgdef.h | 1 + source/client/test/clientTests.cpp | 38 +++---- source/dnode/mgmt/impl/src/dndTransport.c | 2 +- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/inc/mndInt.h | 1 + source/dnode/mnode/impl/src/mndSubscribe.c | 114 +++++++++++++++++++-- source/dnode/mnode/impl/src/mnode.c | 25 +++-- source/dnode/vnode/src/vnd/vnodeWrite.c | 9 ++ 9 files changed, 158 insertions(+), 37 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 15daa97dbf..f6bee57c94 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1089,6 +1089,10 @@ static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq return buf; } +typedef struct SMqTmrMsg { + int32_t reserved; +} SMqTmrMsg; + typedef struct { int64_t status; } SMVSubscribeRsp; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index bfeba885d0..93adc58eca 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -140,6 +140,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-timer", SMqTmrMsg, SMqTmrMsg) // Requests handled by VNODE TD_NEW_MSG_SEG(TDMT_VND_MSG) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 415d6a57ce..1250402caf 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -148,30 +148,30 @@ TEST(testCase, connect_Test) { // taos_close(pConn); //} // -//TEST(testCase, create_db_Test) { - //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - //assert(pConn != NULL); +TEST(testCase, create_db_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); - //TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); - //if (taos_errno(pRes) != 0) { - //printf("error in create db, reason:%s\n", taos_errstr(pRes)); - //} + TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + } - //TAOS_FIELD* pFields = taos_fetch_fields(pRes); - //ASSERT_TRUE(pFields == NULL); + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == NULL); - //int32_t numOfFields = taos_num_fields(pRes); - //ASSERT_EQ(numOfFields, 0); + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); - //taos_free_result(pRes); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database abc1 vgroups 4"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + } + taos_close(pConn); +} - //pRes = taos_query(pConn, "create database abc1 vgroups 4"); - //if (taos_errno(pRes) != 0) { - //printf("error in create db, reason:%s\n", taos_errstr(pRes)); - //} - //taos_close(pConn); -//} -// //TEST(testCase, create_dnode_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index ab4ae4ac53..f4fda75bd8 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -100,7 +100,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_VGROUP_LIST)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_QUERY)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_CONN)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg; diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 45759117b5..ed88b42d39 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -354,6 +354,7 @@ typedef struct SMqSubscribeObj { char key[TSDB_SUBSCRIBE_KEY_LEN]; int32_t epoch; //TODO: replace with priority queue + int32_t nextConsumerIdx; SArray* availConsumer; // SArray (consumerId) SArray* assigned; // SArray SArray* unassignedConsumer; // SArray diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index d2107b9d07..29ccd43622 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -80,6 +80,7 @@ typedef struct SMnode { SReplica replicas[TSDB_MAX_REPLICA]; tmr_h timer; tmr_h transTimer; + tmr_h mqTimer; char *path; SMnodeCfg cfg; int64_t checkTime; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 52f33fa89c..7b95e93257 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -13,13 +13,13 @@ * along with this program. If not, see . */ +#include "mndSubscribe.h" #include "mndConsumer.h" #include "mndDb.h" #include "mndDnode.h" #include "mndMnode.h" #include "mndShow.h" #include "mndStb.h" -#include "mndSubscribe.h" #include "mndTopic.h" #include "mndTrans.h" #include "mndUser.h" @@ -40,6 +40,10 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg); static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg); static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg); static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg); +static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg); + +static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, + SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic); int32_t mndInitSubscribe(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_SUBSCRIBE, @@ -54,9 +58,90 @@ int32_t mndInitSubscribe(SMnode *pMnode) { /*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/ /*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/ mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp); + mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); return sdbSetTable(pMnode->pSdb, table); } +static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) { + int i = 0; + while (key[i] != ':') { + i++; + } + key[i] = 0; + *topic = strdup(key); + key[i] = ':'; + *cgroup = strdup(&key[i + 1]); + return 0; +} + +static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + SMqSubscribeObj *pSub = NULL; + void *pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub); + int sz; + while (pIter != NULL) { + if ((sz = taosArrayGetSize(pSub->unassignedVg)) > 0) { + char *topic = NULL; + char *cgroup = NULL; + mndSplitSubscribeKey(pSub->key, &topic, &cgroup); + + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); + + // create trans + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); + for (int i = 0; i < sz; i++) { + int64_t consumerId = *(int64_t *)taosArrayGet(pSub->availConsumer, pSub->nextConsumerIdx); + SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg); + pCEp->consumerId = consumerId; + taosArrayPush(pSub->assigned, pCEp); + pSub->nextConsumerIdx++; + + // build msg + SMqSetCVgReq req = { + .vgId = pCEp->vgId, + .consumerId = consumerId, + }; + strcpy(req.cGroup, cgroup); + strcpy(req.topicName, topic); + strcpy(req.sql, pTopic->sql); + strcpy(req.logicalPlan, pTopic->logicalPlan); + strcpy(req.physicalPlan, pTopic->physicalPlan); + int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); + void *reqStr = malloc(tlen); + if (reqStr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + void *abuf = reqStr; + tEncodeSMqSetCVgReq(abuf, &req); + + // persist msg + STransAction action = {0}; + action.epSet = pCEp->epset; + action.pCont = reqStr; + action.contLen = tlen; + action.msgType = TDMT_VND_MQ_SET_CONN; + mndTransAppendRedoAction(pTrans, &action); + + // persist raw + SSdbRaw *pRaw = mndSubActionEncode(pSub); + mndTransAppendRedolog(pTrans, pRaw); + + tfree(topic); + tfree(cgroup); + } + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + } + mndReleaseTopic(pMnode, pTopic); + mndTransDrop(pTrans); + } + pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub); + } + return 0; +} + static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) { SMqConsumerEp CEp; CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; @@ -76,7 +161,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas } static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, - SMqConsumerTopic *pConsumerTopic) { + SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic) { int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo); for (int32_t i = 0; i < sz; i++) { int32_t vgId = *(int32_t *)taosArrayGet(pConsumerTopic->pVgInfo, i); @@ -86,7 +171,10 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume .consumerId = pConsumer->consumerId, }; strcpy(req.cGroup, pConsumer->cgroup); - strcpy(req.topicName, pConsumerTopic->name); + strcpy(req.topicName, pTopic->name); + strcpy(req.sql, pTopic->sql); + strcpy(req.logicalPlan, pTopic->logicalPlan); + strcpy(req.physicalPlan, pTopic->physicalPlan); int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); void *reqStr = malloc(tlen); if (reqStr == NULL) { @@ -94,7 +182,7 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume return -1; } void *abuf = reqStr; - tEncodeSMqSetCVgReq(abuf, &req); + tEncodeSMqSetCVgReq(&abuf, &req); STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgObj); @@ -278,20 +366,20 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { int i = 0, j = 0; while (i < newTopicNum || j < oldTopicNum) { - char* newTopicName = NULL; - char* oldTopicName = NULL; + char *newTopicName = NULL; + char *oldTopicName = NULL; if (i >= newTopicNum) { // encode unset topic msg to all vnodes related to that topic - oldTopicName = ((SMqConsumerTopic*)taosArrayGet(oldSub, j))->name; + oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name; j++; } else if (j >= oldTopicNum) { newTopicName = taosArrayGet(newSub, i); i++; } else { newTopicName = taosArrayGet(newSub, i); - oldTopicName = ((SMqConsumerTopic*)taosArrayGet(oldSub, j))->name; + oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name; - int comp = compareLenPrefixedStr(newTopicName, oldTopicName); + int comp = compareLenPrefixedStr(newTopicName, oldTopicName); if (comp == 0) { // do nothing oldTopicName = newTopicName = NULL; @@ -374,19 +462,25 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { // set unassigned vg mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg); } + taosArrayPush(pSub->availConsumer, &consumerId); + + //TODO: no need SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub); taosArrayPush(pConsumer->topics, pConsumerTopic); if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) { int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo); // send setmsg to vnode - if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic) < 0) { + if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic) < 0) { // TODO return -1; } } taosArrayDestroy(pConsumerTopic->pVgInfo); free(pConsumerTopic); + SSdbRaw *pRaw = mndSubActionEncode(pSub); + /*sdbSetRawStatus(pRaw, SDB_STATUS_READY);*/ + mndTransAppendRedolog(pTrans, pRaw); #if 0 SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); if (pGroup == NULL) { diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index cab30702ea..e57ee3eabc 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -18,6 +18,7 @@ #include "mndAuth.h" #include "mndBnode.h" #include "mndCluster.h" +#include "mndConsumer.h" #include "mndDb.h" #include "mndDnode.h" #include "mndFunc.h" @@ -27,6 +28,7 @@ #include "mndShow.h" #include "mndSnode.h" #include "mndStb.h" +#include "mndSubscribe.h" #include "mndSync.h" #include "mndTelem.h" #include "mndTopic.h" @@ -69,15 +71,15 @@ static void mndTransReExecute(void *param, void *tmrId) { taosTmrReset(mndTransReExecute, 3000, pMnode, pMnode->timer, &pMnode->transTimer); } -static void mndCalMqRebalance(void* param, void* tmrId) { - SMnode* pMnode = param; +static void mndCalMqRebalance(void *param, void *tmrId) { + SMnode *pMnode = param; if (mndIsMaster(pMnode)) { - // iterate cgroup, cal rebalance - // sync with raft - // write sdb + SMqTmrMsg *pMsg = rpcMallocCont(sizeof(SMqTmrMsg)); + SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pMsg, .contLen = sizeof(SMqTmrMsg)}; + pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); } - taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->transTimer); + taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->mqTimer); } static int32_t mndInitTimer(SMnode *pMnode) { @@ -95,6 +97,11 @@ static int32_t mndInitTimer(SMnode *pMnode) { return -1; } + if (taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->mqTimer)) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + return 0; } @@ -102,6 +109,8 @@ static void mndCleanupTimer(SMnode *pMnode) { if (pMnode->timer != NULL) { taosTmrStop(pMnode->transTimer); pMnode->transTimer = NULL; + taosTmrStop(pMnode->mqTimer); + pMnode->mqTimer = NULL; taosTmrCleanUp(pMnode->timer); pMnode->timer = NULL; } @@ -171,6 +180,8 @@ static int32_t mndInitSteps(SMnode *pMnode) { if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1; if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1; if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1; + if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1; + if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1; if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1; if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1; @@ -377,7 +388,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { return NULL; } - if (pRpcMsg->msgType != TDMT_MND_TRANS) { + if (pRpcMsg->msgType != TDMT_MND_TRANS && pRpcMsg->msgType != TDMT_MND_MQ_TIMER) { SRpcConnInfo connInfo = {0}; if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) { taosFreeQitem(pMsg); diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index bb863d6ed0..ea69f4c870 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -108,6 +108,15 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // TODO: handle error } break; + case TDMT_VND_MQ_SET_CONN: { + char* reqStr = ptr; + SMqSetCVgReq req; + /*tDecodeSMqSetCVgReq(reqStr, &req);*/ + // create topic if not exist + // convert to task + // write mq meta + } + break; default: ASSERT(0); break; From bc1d8f6a7e1d07d21cdf2e3809993a37f5306571 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Thu, 20 Jan 2022 10:12:06 +0800 Subject: [PATCH 4/6] [modify insert] --- tests/test/c/create_table.c | 87 +++++++++++++++++++++++++++---------- 1 file changed, 65 insertions(+), 22 deletions(-) diff --git a/tests/test/c/create_table.c b/tests/test/c/create_table.c index 6df31c7b8b..d387bf483b 100644 --- a/tests/test/c/create_table.c +++ b/tests/test/c/create_table.c @@ -28,9 +28,14 @@ int32_t numOfThreads = 1; int64_t numOfTables = 200000; int32_t createTable = 1; int32_t insertData = 0; -int32_t batchNum = 100; +int32_t batchNumOfTbl = 100; +int32_t batchNumOfRow = 1; int32_t numOfVgroups = 2; int32_t showTablesFlag = 0; +int32_t queryFlag = 0; + +int64_t startTimestamp = 1640966400000; // 2020-01-01 00:00:00.000 + typedef struct { int64_t tableBeginIndex; @@ -167,7 +172,7 @@ void showTables() { void *threadFunc(void *param) { SThreadInfo *pInfo = (SThreadInfo *)param; - char *qstr = malloc(2000 * 1000); + char *qstr = malloc(batchNumOfTbl * batchNumOfRow * 128); int32_t code = 0; TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0); @@ -192,7 +197,7 @@ void *threadFunc(void *param) { // batch = MIN(batch, batchNum); int32_t len = sprintf(qstr, "create table"); - for (int32_t i = 0; i < batchNum;) { + for (int32_t i = 0; i < batchNumOfTbl;) { len += sprintf(qstr + len, " %s_t%" PRId64 " using %s tags(%" PRId64 ")", stbName, t, stbName, t); t++; i++; @@ -205,7 +210,7 @@ void *threadFunc(void *param) { TAOS_RES *pRes = taos_query(con, qstr); code = taos_errno(pRes); if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) { - pError("failed to create table t%" PRId64 ", code: %d, reason:%s", t, code, tstrerror(code)); + pError("failed to create table t%" PRId64 ", reason:%s", t, tstrerror(code)); } taos_free_result(pRes); int64_t endTs = taosGetTimestampUs(); @@ -227,31 +232,49 @@ void *threadFunc(void *param) { if (insertData) { int64_t curMs = 0; int64_t beginMs = taosGetTimestampMs(); + pInfo->startMs = beginMs; + int64_t t = pInfo->tableBeginIndex; + for (; t <= pInfo->tableEndIndex;) { + // int64_t batch = (pInfo->tableEndIndex - t); + // batch = MIN(batch, batchNum); - pInfo->startMs = taosGetTimestampMs(); - for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { - int64_t batch = (pInfo->tableEndIndex - t); - batch = MIN(batch, batchNum); - - int32_t len = sprintf(qstr, "insert into"); - for (int32_t i = 0; i < batch; ++i) { - len += sprintf(qstr + len, " t%" PRId64 " values(now, %" PRId64 ")", t + i, t + i); - } + int32_t len = sprintf(qstr, "insert into "); + + for (int32_t i = 0; i < batchNumOfTbl;) { + int64_t ts = startTimestamp; + len += sprintf(qstr + len, "%s_t%" PRId64 " values ", stbName, t); + for (int32_t j = 0; j < batchNumOfRow; j++) { + len += sprintf(qstr + len, "(%" PRId64 ", 6666) ", ts++); + } + + t++; + i++; + if (t > pInfo->tableEndIndex) { + break; + } + } + int64_t startTs = taosGetTimestampUs(); TAOS_RES *pRes = taos_query(con, qstr); code = taos_errno(pRes); - if (code != 0) { - pError("failed to insert table t%" PRId64 ", reason:%s", t, tstrerror(code)); + if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) { + pError("failed to insert %s_t%" PRId64 ", reason:%s", stbName, t, tstrerror(code)); } taos_free_result(pRes); + int64_t endTs = taosGetTimestampUs(); + int64_t delay = endTs - startTs; + // printf("==== %"PRId64" - %"PRId64", %"PRId64"\n", startTs, endTs, delay); + if (delay > pInfo->maxDelay) pInfo->maxDelay = delay; + if (delay < pInfo->minDelay) pInfo->minDelay = delay; curMs = taosGetTimestampMs(); if (curMs - beginMs > 10000) { + beginMs = curMs; + // printf("==== tableBeginIndex: %"PRId64", t: %"PRId64"\n", pInfo->tableBeginIndex, t); printInsertProgress(pInfo, t); } - t += (batch - 1); } - printInsertProgress(pInfo, pInfo->tableEndIndex); + printInsertProgress(pInfo, t); } taos_close(con); @@ -280,9 +303,13 @@ void printHelp() { printf("%s%s\n", indent, "-i"); printf("%s%s%s%d\n", indent, indent, "insertData, default is ", insertData); printf("%s%s\n", indent, "-b"); - printf("%s%s%s%d\n", indent, indent, "batchNum, default is ", batchNum); + printf("%s%s%s%d\n", indent, indent, "batchNumOfTbl, default is ", batchNumOfTbl); printf("%s%s\n", indent, "-w"); printf("%s%s%s%d\n", indent, indent, "showTablesFlag, default is ", showTablesFlag); + printf("%s%s\n", indent, "-q"); + printf("%s%s%s%d\n", indent, indent, "queryFlag, default is ", queryFlag); + printf("%s%s\n", indent, "-l"); + printf("%s%s%s%d\n", indent, indent, "batchNumOfRow, default is ", batchNumOfRow); exit(EXIT_SUCCESS); } @@ -309,10 +336,15 @@ void parseArgument(int32_t argc, char *argv[]) { } else if (strcmp(argv[i], "-i") == 0) { insertData = atoi(argv[++i]); } else if (strcmp(argv[i], "-b") == 0) { - batchNum = atoi(argv[++i]); + batchNumOfTbl = atoi(argv[++i]); + } else if (strcmp(argv[i], "-l") == 0) { + batchNumOfRow = atoi(argv[++i]); } else if (strcmp(argv[i], "-w") == 0) { showTablesFlag = atoi(argv[++i]); + } else if (strcmp(argv[i], "-q") == 0) { + queryFlag = atoi(argv[++i]); } else { + pPrint("%s unknow para: %s %s", GREEN, argv[++i], NC); } } @@ -324,8 +356,10 @@ void parseArgument(int32_t argc, char *argv[]) { pPrint("%s numOfVgroups:%d %s", GREEN, numOfVgroups, NC); pPrint("%s createTable:%d %s", GREEN, createTable, NC); pPrint("%s insertData:%d %s", GREEN, insertData, NC); - pPrint("%s batchNum:%d %s", GREEN, batchNum, NC); + pPrint("%s batchNumOfTbl:%d %s", GREEN, batchNumOfTbl, NC); + pPrint("%s batchNumOfRow:%d %s", GREEN, batchNumOfRow, NC); pPrint("%s showTablesFlag:%d %s", GREEN, showTablesFlag, NC); + pPrint("%s queryFlag:%d %s", GREEN, queryFlag, NC); pPrint("%s start create table performace test %s", GREEN, NC); } @@ -338,8 +372,15 @@ int32_t main(int32_t argc, char *argv[]) { return 0; } - createDbAndStb(); + if (queryFlag) { + //selectRowsFromTable(); + return 0; + } + if (createTable) { + createDbAndStb(); + } + pPrint("%d threads are spawned to create %" PRId64 " tables", numOfThreads, numOfTables); pthread_attr_t thattr; @@ -396,9 +437,11 @@ int32_t main(int32_t argc, char *argv[]) { insertDataSpeed += pInfo[i].insertDataSpeed; } - pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d, maxDelay: %" PRId64 "us, minDelay: %" PRId64 + if (createTable) { + pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d, maxDelay: %" PRId64 "us, minDelay: %" PRId64 "us %s", GREEN, numOfTables, createTableSpeed, numOfThreads, maxDelay, minDelay, NC); + } if (insertData) { pPrint("%s total %" PRId64 " tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed, From 5d77fab535473d59f1f6fbca47d341adb9a8fd67 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 20 Jan 2022 10:45:15 +0800 Subject: [PATCH 5/6] add msg for vnode consume --- include/common/tmsg.h | 19 +++++++ include/common/tmsgdef.h | 1 + source/dnode/mnode/impl/src/mndSubscribe.c | 7 ++- source/dnode/vnode/CMakeLists.txt | 1 + source/dnode/vnode/inc/tq.h | 60 +++++++++++++++------- source/dnode/vnode/src/inc/vnd.h | 2 +- source/dnode/vnode/src/vnd/vnodeQuery.c | 2 + source/dnode/vnode/src/vnd/vnodeWrite.c | 30 ++++++++++- 8 files changed, 99 insertions(+), 23 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f6bee57c94..1f7ec3be1e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1552,6 +1552,25 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { return buf; } +typedef struct SMqCVConsumeReq { + int64_t reqId; + int64_t offset; + int64_t clientId; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char cgroup[TSDB_CONSUMER_GROUP_LEN]; +} SMqCVConsumeReq; + +typedef struct SMqCVConsumeRsp { + int64_t reqId; + int64_t clientId; + int64_t committedOffset; + int64_t receiveOffset; + int64_t rspOffset; + int32_t skipLogNum; + int32_t bodyLen; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char body[]; +} SMqCvConsumeRsp; #ifdef __cplusplus } diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 93adc58eca..ed7fdea2a8 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -175,6 +175,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_SCHEDULE_DATA_SINK, "vnode-schedule-data-sink", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) + TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp) // Requests handled by QNODE diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 7b95e93257..e786a8972c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -464,7 +464,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { } taosArrayPush(pSub->availConsumer, &consumerId); - //TODO: no need + // TODO: no need SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub); taosArrayPush(pConsumer->topics, pConsumerTopic); @@ -542,7 +542,10 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { return 0; } -static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; } +static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pRsp) { + mndTransProcessRsp(pRsp); + return 0; +} static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 9dc4bb1873..75734d2d29 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -25,6 +25,7 @@ target_link_libraries( PUBLIC bdb PUBLIC tfs PUBLIC wal + PUBLIC scheduler PUBLIC qworker ) diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index 8089826a80..1a41a02673 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -18,14 +18,15 @@ #include "common.h" #include "mallocator.h" +#include "meta.h" #include "os.h" +#include "scheduler.h" #include "taoserror.h" -#include "tmsg.h" #include "tlist.h" +#include "tmsg.h" #include "trpc.h" #include "ttimer.h" #include "tutil.h" -#include "meta.h" #ifdef __cplusplus extern "C" { @@ -150,15 +151,40 @@ typedef struct STqListHandle { } STqList; typedef struct STqGroup { - int64_t clientId; - int64_t cgId; - void* ahandle; - int32_t topicNum; + int64_t clientId; + int64_t cgId; + void* ahandle; + int32_t topicNum; STqList* head; SList* topicList; // SList STqRspHandle rspHandle; } STqGroup; +typedef struct STqTaskItem { + int32_t status; + void* dst; + SSubQueryMsg* pMsg; +} STqTaskItem; + +// new version +typedef struct STqBuffer { + int64_t firstOffset; + int64_t lastOffset; + STqTaskItem output[TQ_BUFFER_SIZE]; +} STqBuffer; + +typedef struct STqClientHandle { + int64_t clientId; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char cGroup[TSDB_TOPIC_FNAME_LEN]; + char* sql; + char* logicalPlan; + char* physicalPlan; + int64_t committedOffset; + int64_t currentOffset; + STqBuffer buffer; +} STqClientHandle; + typedef struct STqQueryMsg { STqMsgItem* item; struct STqQueryMsg* next; @@ -253,7 +279,7 @@ typedef struct STqMetaStore { // a table head STqMetaList* unpersistHead; // topics that are not connectted - STqMetaList* unconnectTopic; + STqMetaList* unconnectTopic; // TODO:temporaral use, to be replaced by unified tfile int fileFd; @@ -316,24 +342,22 @@ const void* tqDeserializeGroup(const STqSerializedHead*, STqGroup**); static int tqQueryExecuting(int32_t status) { return status; } typedef struct STqReadHandle { - int64_t ver; - SSubmitMsg* pMsg; - SSubmitBlk* pBlock; + int64_t ver; + SSubmitMsg* pMsg; + SSubmitBlk* pBlock; SSubmitMsgIter msgIter; SSubmitBlkIter blkIter; - SMeta* pMeta; + SMeta* pMeta; } STqReadHandle; typedef struct SSubmitBlkScanInfo { - } SSubmitBlkScanInfo; -STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg); -bool tqNextDataBlock(STqReadHandle* pHandle); -int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo *pBlockInfo); -//return SArray -SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList); -//int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t status); +STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg); +bool tqNextDataBlock(STqReadHandle* pHandle); +int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo); +// return SArray +SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList); #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 6b99b71b62..be32ed6829 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -177,4 +177,4 @@ bool vmaIsFull(SVMemAllocator* pVMA); } #endif -#endif /*_TD_VNODE_DEF_H_*/ \ No newline at end of file +#endif /*_TD_VNODE_DEF_H_*/ diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 2e9c77c59b..729d64f8b3 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -57,6 +57,8 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg); case TDMT_VND_TABLE_META: return vnodeGetTableMeta(pVnode, pMsg, pRsp); + case TDMT_VND_CONSUME: + return 0; default: vError("unknown msg type:%d in fetch queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index ea69f4c870..2f3a4d5409 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -14,6 +14,7 @@ */ #include "vnd.h" +#include "tq.h" int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) { switch (pMsg->msgType) { @@ -111,9 +112,34 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { case TDMT_VND_MQ_SET_CONN: { char* reqStr = ptr; SMqSetCVgReq req; - /*tDecodeSMqSetCVgReq(reqStr, &req);*/ - // create topic if not exist + tDecodeSMqSetCVgReq(reqStr, &req); + STqClientHandle* pHandle = calloc(sizeof(STqClientHandle), 1); + if (pHandle == NULL) { + // TODO: handle error + } + strcpy(pHandle->topicName, req.topicName); + strcpy(pHandle->cGroup, req.cGroup); + strcpy(pHandle->sql, req.sql); + strcpy(pHandle->logicalPlan, req.logicalPlan); + strcpy(pHandle->physicalPlan, req.physicalPlan); + SArray *pArray; + //TODO: deserialize to SQueryDag + SQueryDag *pDag; // convert to task + if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) { + // TODO: handle error + } + ASSERT(taosArrayGetSize(pArray) == 0); + STaskInfo *pInfo = taosArrayGet(pArray, 0); + SArray* pTasks; + schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE); + pHandle->buffer.firstOffset = -1; + pHandle->buffer.lastOffset = -1; + for (int i = 0; i < TQ_BUFFER_SIZE; i++) { + SSubQueryMsg* pMsg = taosArrayGet(pTasks, i); + pHandle->buffer.output[i].pMsg = pMsg; + pHandle->buffer.output[i].status = 0; + } // write mq meta } break; From a0a61fa91dbde00c88d84abeb6130181cb6172d8 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Wed, 19 Jan 2022 21:51:23 -0500 Subject: [PATCH 6/6] TD-13120 SELECT statement data structure definition --- include/libs/function/functionMgt.h | 66 +++++ include/nodes/nodes.h | 271 +++++++++++++++++++++ source/CMakeLists.txt | 3 +- source/libs/function/CMakeLists.txt | 2 +- source/libs/function/inc/functionMgtInt.h | 76 ++++++ source/libs/function/inc/taggfunction.h | 4 + source/libs/function/src/buildins.c | 21 ++ source/libs/function/src/functionMgt.c | 80 ++++++ source/libs/function/src/taggfunction.c | 6 + source/libs/planner/src/logicPlan.c | 2 +- source/libs/planner/src/physicalPlanJson.c | 32 ++- source/nodes/CMakeLists.txt | 15 ++ source/nodes/src/nodesClone.c | 20 ++ source/nodes/src/nodesCode.c | 24 ++ source/nodes/src/nodesEqual.c | 141 +++++++++++ source/nodes/src/nodesTraverse.c | 83 +++++++ source/nodes/src/nodesUtil.c | 24 ++ source/nodes/test/CMakeLists.txt | 19 ++ source/nodes/test/nodesTest.cpp | 25 ++ 19 files changed, 907 insertions(+), 7 deletions(-) create mode 100644 include/libs/function/functionMgt.h create mode 100644 include/nodes/nodes.h create mode 100644 source/libs/function/inc/functionMgtInt.h create mode 100644 source/libs/function/src/buildins.c create mode 100644 source/libs/function/src/functionMgt.c create mode 100644 source/nodes/CMakeLists.txt create mode 100644 source/nodes/src/nodesClone.c create mode 100644 source/nodes/src/nodesCode.c create mode 100644 source/nodes/src/nodesEqual.c create mode 100644 source/nodes/src/nodesTraverse.c create mode 100644 source/nodes/src/nodesUtil.c create mode 100644 source/nodes/test/CMakeLists.txt create mode 100644 source/nodes/test/nodesTest.cpp diff --git a/include/libs/function/functionMgt.h b/include/libs/function/functionMgt.h new file mode 100644 index 0000000000..3c365335be --- /dev/null +++ b/include/libs/function/functionMgt.h @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_FUNCTION_MGT_H_ +#define _TD_FUNCTION_MGT_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include "nodes.h" + +struct SQLFunctionCtx; +struct SResultRowEntryInfo; +struct STimeWindow; + +typedef struct SFuncExecEnv { + int32_t calcMemSize; +} SFuncExecEnv; + +typedef void* FuncMgtHandle; +typedef bool (*FExecGetEnv)(SFunctionNode* pFunc, SFuncExecEnv* pEnv); +typedef bool (*FExecInit)(struct SQLFunctionCtx *pCtx, struct SResultRowEntryInfo* pResultCellInfo); +typedef void (*FExecProcess)(struct SQLFunctionCtx *pCtx); +typedef void (*FExecFinalize)(struct SQLFunctionCtx *pCtx); + +typedef struct SFuncExecFuncs { + FExecGetEnv getEnv; + FExecInit init; + FExecProcess process; + FExecFinalize finalize; +} SFuncExecFuncs; + +int32_t fmFuncMgtInit(); + +int32_t fmGetHandle(FuncMgtHandle* pHandle); + +int32_t fmGetFuncId(FuncMgtHandle handle, const char* name); +int32_t fmGetFuncResultType(FuncMgtHandle handle, SFunctionNode* pFunc); +bool fmIsAggFunc(int32_t funcId); +bool fmIsStringFunc(int32_t funcId); +bool fmIsTimestampFunc(int32_t funcId); +bool fmIsTimelineFunc(int32_t funcId); +bool fmIsTimeorderFunc(int32_t funcId); +bool fmIsNonstandardSQLFunc(int32_t funcId); +int32_t fmFuncScanType(int32_t funcId); + +int32_t fmGetFuncExecFuncs(FuncMgtHandle handle, int32_t funcId, SFuncExecFuncs* pFpSet); + +#ifdef __cplusplus +} +#endif + +#endif // _TD_FUNCTION_MGT_H_ diff --git a/include/nodes/nodes.h b/include/nodes/nodes.h new file mode 100644 index 0000000000..e6b41adf14 --- /dev/null +++ b/include/nodes/nodes.h @@ -0,0 +1,271 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_NODES_H_ +#define _TD_NODES_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include "tarray.h" +#include "tdef.h" + +typedef enum ENodeType { + QUERY_NODE_COLUMN = 1, + QUERY_NODE_VALUE, + QUERY_NODE_OPERATOR, + QUERY_NODE_LOGIC_CONDITION, + QUERY_NODE_IS_NULL_CONDITION, + QUERY_NODE_FUNCTION, + QUERY_NODE_REAL_TABLE, + QUERY_NODE_TEMP_TABLE, + QUERY_NODE_JOIN_TABLE, + QUERY_NODE_GROUPING_SET, + QUERY_NODE_ORDER_BY_EXPR, + QUERY_NODE_STATE_WINDOW, + QUERY_NODE_SESSION_WINDOW, + QUERY_NODE_INTERVAL_WINDOW, + + QUERY_NODE_SET_OPERATOR, + QUERY_NODE_SELECT_STMT +} ENodeType; + +/** + * The first field of a node of any type is guaranteed to be the ENodeType. + * Hence the type of any node can be gotten by casting it to SNode. + */ +typedef struct SNode { + ENodeType type; +} SNode; + +#define nodeType(nodeptr) (((const SNode*)(nodeptr))->type) + +typedef struct SDataType { + uint8_t type; + uint8_t precision; + uint8_t scale; + int32_t bytes; +} SDataType; + +typedef struct SExprNode { + ENodeType nodeType; + SDataType resType; + char aliasName[TSDB_COL_NAME_LEN]; +} SExprNode; + +typedef enum EColumnType { + COLUMN_TYPE_COLUMN = 1, + COLUMN_TYPE_TAG +} EColumnType; + +typedef struct SColumnNode { + SExprNode node; // QUERY_NODE_COLUMN + int16_t colId; + EColumnType colType; // column or tag + char dbName[TSDB_DB_NAME_LEN]; + char tableName[TSDB_TABLE_NAME_LEN]; + char colName[TSDB_COL_NAME_LEN]; +} SColumnNode; + +typedef struct SValueNode { + SExprNode type; // QUERY_NODE_VALUE + char* literal; +} SValueNode; + +typedef enum EOperatorType { + // arithmetic operator + OP_TYPE_ADD = 1, + OP_TYPE_SUB, + OP_TYPE_MULTI, + OP_TYPE_DIV, + OP_TYPE_MOD, + + // comparison operator + OP_TYPE_GREATER_THAN, + OP_TYPE_GREATER_EQUAL, + OP_TYPE_LOWER_THAN, + OP_TYPE_LOWER_EQUAL, + OP_TYPE_EQUAL, + OP_TYPE_NOT_EQUAL, + OP_TYPE_IN, + OP_TYPE_NOT_IN, + OP_TYPE_LIKE, + OP_TYPE_NOT_LIKE, + OP_TYPE_MATCH, + OP_TYPE_NMATCH, + + // json operator + OP_TYPE_JSON_GET_VALUE, + OP_TYPE_JSON_CONTAINS +} EOperatorType; + +typedef struct SOperatorNode { + SExprNode type; // QUERY_NODE_OPERATOR + EOperatorType opType; + SNode* pLeft; + SNode* pRight; +} SOperatorNode; + +typedef enum ELogicConditionType { + LOGIC_COND_TYPE_AND, + LOGIC_COND_TYPE_OR, + LOGIC_COND_TYPE_NOT, +} ELogicConditionType; + +typedef struct SLogicConditionNode { + ENodeType type; // QUERY_NODE_LOGIC_CONDITION + ELogicConditionType condType; + SArray* pParameterList; +} SLogicConditionNode; + +typedef struct SIsNullCondNode { + ENodeType type; // QUERY_NODE_IS_NULL_CONDITION + SNode* pExpr; + bool isNot; +} SIsNullCondNode; + +typedef struct SFunctionNode { + SExprNode type; // QUERY_NODE_FUNCTION + char functionName[TSDB_FUNC_NAME_LEN]; + int32_t funcId; + SArray* pParameterList; // SNode +} SFunctionNode; + +typedef struct STableNode { + ENodeType type; + char tableName[TSDB_TABLE_NAME_LEN]; + char tableAliasName[TSDB_COL_NAME_LEN]; +} STableNode; + +typedef struct SRealTableNode { + STableNode type; // QUERY_NODE_REAL_TABLE + char dbName[TSDB_DB_NAME_LEN]; +} SRealTableNode; + +typedef struct STempTableNode { + STableNode type; // QUERY_NODE_TEMP_TABLE + SNode* pSubquery; +} STempTableNode; + +typedef enum EJoinType { + JOIN_TYPE_INNER = 1 +} EJoinType; + +typedef struct SJoinTableNode { + STableNode type; // QUERY_NODE_JOIN_TABLE + EJoinType joinType; + SNode* pLeft; + SNode* pRight; + SNode* pOnCond; +} SJoinTableNode; + +typedef enum EGroupingSetType { + GP_TYPE_NORMAL = 1 +} EGroupingSetType; + +typedef struct SGroupingSetNode { + ENodeType type; // QUERY_NODE_GROUPING_SET + EGroupingSetType groupingSetType; + SArray* pParameterList; +} SGroupingSetNode; + +typedef enum EOrder { + ORDER_ASC = 1, + ORDER_DESC +} EOrder; + +typedef enum ENullOrder { + NULL_ORDER_FIRST = 1, + NULL_ORDER_LAST +} ENullOrder; + +typedef struct SOrderByExprNode { + ENodeType type; // QUERY_NODE_ORDER_BY_EXPR + SNode* pExpr; + EOrder order; + ENullOrder nullOrder; +} SOrderByExprNode; + +typedef struct SLimitInfo { + uint64_t limit; + uint64_t offset; +} SLimitInfo; + +typedef struct SStateWindowNode { + ENodeType type; // QUERY_NODE_STATE_WINDOW + SNode* pCol; +} SStateWindowNode; + +typedef struct SSessionWindowNode { + ENodeType type; // QUERY_NODE_SESSION_WINDOW + int64_t gap; // gap between two session window(in microseconds) + SNode* pCol; +} SSessionWindowNode; + +typedef struct SIntervalWindowNode { + ENodeType type; // QUERY_NODE_INTERVAL_WINDOW + int64_t interval; + int64_t sliding; + int64_t offset; +} SIntervalWindowNode; + +typedef struct SSelectStmt { + ENodeType type; // QUERY_NODE_SELECT_STMT + bool isDistinct; + SArray* pProjectionList; // SNode + SNode* pFromTable; + SNode* pWhereCond; + SArray* pPartitionByList; // SNode + SNode* pWindowClause; + SArray* pGroupByList; // SGroupingSetNode + SArray* pOrderByList; // SOrderByExprNode + SLimitInfo limit; + SLimitInfo slimit; +} SSelectStmt; + +typedef enum ESetOperatorType { + SET_OP_TYPE_UNION_ALL = 1 +} ESetOperatorType; + +typedef struct SSetOperator { + ENodeType type; // QUERY_NODE_SET_OPERATOR + ESetOperatorType opType; + SNode* pLeft; + SNode* pRight; +} SSetOperator; + +typedef bool (*FQueryNodeWalker)(SNode* pNode, void* pContext); + +bool nodeArrayWalker(SArray* pArray, FQueryNodeWalker walker, void* pContext); +bool nodeTreeWalker(SNode* pNode, FQueryNodeWalker walker, void* pContext); + +bool stmtWalker(SNode* pNode, FQueryNodeWalker walker, void* pContext); + +bool nodeEqual(const SNode* a, const SNode* b); + +void cloneNode(const SNode* pNode); + +int32_t nodeToString(const SNode* pNode, char** pStr, int32_t* pLen); +int32_t stringToNode(const char* pStr, SNode** pNode); + +bool isTimeorderQuery(const SNode* pQuery); +bool isTimelineQuery(const SNode* pQuery); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_NODES_H_*/ diff --git a/source/CMakeLists.txt b/source/CMakeLists.txt index 2833b329a7..fbf045b99c 100644 --- a/source/CMakeLists.txt +++ b/source/CMakeLists.txt @@ -3,4 +3,5 @@ add_subdirectory(util) add_subdirectory(common) add_subdirectory(libs) add_subdirectory(client) -add_subdirectory(dnode) \ No newline at end of file +add_subdirectory(dnode) +add_subdirectory(nodes) \ No newline at end of file diff --git a/source/libs/function/CMakeLists.txt b/source/libs/function/CMakeLists.txt index a4aa7025e4..9f700dbb3c 100644 --- a/source/libs/function/CMakeLists.txt +++ b/source/libs/function/CMakeLists.txt @@ -8,5 +8,5 @@ target_include_directories( target_link_libraries( function - PRIVATE os util common + PRIVATE os util common nodes ) \ No newline at end of file diff --git a/source/libs/function/inc/functionMgtInt.h b/source/libs/function/inc/functionMgtInt.h new file mode 100644 index 0000000000..9b9d82f0e1 --- /dev/null +++ b/source/libs/function/inc/functionMgtInt.h @@ -0,0 +1,76 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_FUNCTION_MGT_INT_H_ +#define _TD_FUNCTION_MGT_INT_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +#include "functionMgt.h" + +#define FUNC_MGT_DATA_TYPE_MASK(n) (1 << n) + +#define FUNC_MGT_DATA_TYPE_NULL 0 +#define FUNC_MGT_DATA_TYPE_BOOL FUNC_MGT_DATA_TYPE_MASK(0) +#define FUNC_MGT_DATA_TYPE_TINYINT FUNC_MGT_DATA_TYPE_MASK(1) +#define FUNC_MGT_DATA_TYPE_SMALLINT FUNC_MGT_DATA_TYPE_MASK(2) +#define FUNC_MGT_DATA_TYPE_INT FUNC_MGT_DATA_TYPE_MASK(3) +#define FUNC_MGT_DATA_TYPE_BIGINT FUNC_MGT_DATA_TYPE_MASK(4) +#define FUNC_MGT_DATA_TYPE_FLOAT FUNC_MGT_DATA_TYPE_MASK(5) +#define FUNC_MGT_DATA_TYPE_DOUBLE FUNC_MGT_DATA_TYPE_MASK(6) +#define FUNC_MGT_DATA_TYPE_BINARY FUNC_MGT_DATA_TYPE_MASK(7) +#define FUNC_MGT_DATA_TYPE_TIMESTAMP FUNC_MGT_DATA_TYPE_MASK(8) +#define FUNC_MGT_DATA_TYPE_NCHAR FUNC_MGT_DATA_TYPE_MASK(9) +#define FUNC_MGT_DATA_TYPE_UTINYINT FUNC_MGT_DATA_TYPE_MASK(10) +#define FUNC_MGT_DATA_TYPE_USMALLINT FUNC_MGT_DATA_TYPE_MASK(11) +#define FUNC_MGT_DATA_TYPE_UINT FUNC_MGT_DATA_TYPE_MASK(12) +#define FUNC_MGT_DATA_TYPE_UBIGINT FUNC_MGT_DATA_TYPE_MASK(13) +#define FUNC_MGT_DATA_TYPE_VARCHAR FUNC_MGT_DATA_TYPE_MASK(14) +#define FUNC_MGT_DATA_TYPE_VARBINARY FUNC_MGT_DATA_TYPE_MASK(15) +#define FUNC_MGT_DATA_TYPE_JSON FUNC_MGT_DATA_TYPE_MASK(16) +#define FUNC_MGT_DATA_TYPE_DECIMAL FUNC_MGT_DATA_TYPE_MASK(17) +#define FUNC_MGT_DATA_TYPE_BLOB FUNC_MGT_DATA_TYPE_MASK(18) + +#define FUNC_MGT_EXACT_NUMERIC_DATA_TYPE \ + (FUNC_MGT_DATA_TYPE_TINYINT | FUNC_MGT_DATA_TYPE_SMALLINT | FUNC_MGT_DATA_TYPE_INT | FUNC_MGT_DATA_TYPE_BIGINT \ + | FUNC_MGT_DATA_TYPE_UTINYINT | FUNC_MGT_DATA_TYPE_USMALLINT | FUNC_MGT_DATA_TYPE_UINT | FUNC_MGT_DATA_TYPE_UBIGINT) + +#define FUNC_MGT_APPRO_NUMERIC_DATA_TYPE (FUNC_MGT_DATA_TYPE_FLOAT | FUNC_MGT_DATA_TYPE_DOUBLE) + +#define FUNC_MGT_NUMERIC_DATA_TYPE (FUNC_MGT_EXACT_NUMERIC_DATA_TYPE | FUNC_MGT_APPRO_NUMERIC_DATA_TYPE) + +typedef void* FuncDef; + +typedef struct SFuncElement { + FuncDef (*defineFunc)(); +} SFuncElement; + +extern const SFuncElement gBuiltinFuncs[]; + +FuncDef createFuncDef(const char* name, int32_t maxNumOfParams); +FuncDef setOneParamSignature(FuncDef def, int64_t resDataType, int64_t paramDataType); +FuncDef setTwoParamsSignature(FuncDef def, int64_t resDataType, int64_t p1DataType, int64_t p2DataType); +FuncDef setFollowParamSignature(FuncDef def, int64_t paramDataType); +FuncDef setFollowParamsSignature(FuncDef def, int64_t p1DataType, int64_t p2DataType, int32_t followNo); + +FuncDef setExecFuncs(FuncDef def, FExecGetEnv getEnv, FExecInit init, FExecProcess process, FExecFinalize finalize); + +#ifdef __cplusplus +} +#endif + +#endif // _TD_FUNCTION_MGT_INT_H_ diff --git a/source/libs/function/inc/taggfunction.h b/source/libs/function/inc/taggfunction.h index 41c7309a18..9192c19951 100644 --- a/source/libs/function/inc/taggfunction.h +++ b/source/libs/function/inc/taggfunction.h @@ -95,6 +95,10 @@ static FORCE_INLINE void initResultRowEntry(SResultRowEntryInfo *pResInfo, int32 memset(GET_ROWCELL_INTERBUF(pResInfo), 0, bufLen); } +#include "functionMgtInt.h" + +FuncDef defineCount(); + #ifdef __cplusplus } #endif diff --git a/source/libs/function/src/buildins.c b/source/libs/function/src/buildins.c new file mode 100644 index 0000000000..ea2e9f3f2f --- /dev/null +++ b/source/libs/function/src/buildins.c @@ -0,0 +1,21 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "functionMgtInt.h" +#include "taggfunction.h" + +const SFuncElement gBuiltinFuncs[] = { + {.defineFunc = defineCount} +}; diff --git a/source/libs/function/src/functionMgt.c b/source/libs/function/src/functionMgt.c new file mode 100644 index 0000000000..c6ed2c8c03 --- /dev/null +++ b/source/libs/function/src/functionMgt.c @@ -0,0 +1,80 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "functionMgt.h" + +#include "functionMgtInt.h" +#include "taos.h" +#include "taoserror.h" +#include "thash.h" + +typedef struct SFuncMgtService { + SHashObj* pFuncNameHashTable; +} SFuncMgtService; + +static SFuncMgtService gFunMgtService; + +int32_t fmFuncMgtInit() { + gFunMgtService.pFuncNameHashTable = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + if (NULL == gFunMgtService.pFuncNameHashTable) { + return TSDB_CODE_FAILED; + } + return TSDB_CODE_SUCCESS; +} + +typedef struct SFuncDef { + char name[TSDB_FUNC_NAME_LEN]; + int32_t maxNumOfParams; + SFuncExecFuncs execFuncs; +} SFuncDef ; + +FuncDef createFuncDef(const char* name, int32_t maxNumOfParams) { + SFuncDef* pDef = calloc(1, sizeof(SFuncDef)); + if (NULL == pDef) { + return NULL; + } + strcpy(pDef->name, name); + pDef->maxNumOfParams = maxNumOfParams; + return pDef; +} + +FuncDef setOneParamSignature(FuncDef def, int64_t resDataType, int64_t paramDataType) { + // todo +} + +FuncDef setTwoParamsSignature(FuncDef def, int64_t resDataType, int64_t p1DataType, int64_t p2DataType) { + // todo +} + +FuncDef setFollowParamSignature(FuncDef def, int64_t paramDataType) { + // todo +} + +FuncDef setFollowParamsSignature(FuncDef def, int64_t p1DataType, int64_t p2DataType, int32_t followNo) { + // todo +} + +FuncDef setExecFuncs(FuncDef def, FExecGetEnv getEnv, FExecInit init, FExecProcess process, FExecFinalize finalize) { + SFuncDef* pDef = (SFuncDef*)def; + pDef->execFuncs.getEnv = getEnv; + pDef->execFuncs.init = init; + pDef->execFuncs.process = process; + pDef->execFuncs.finalize = finalize; + return def; +} + +int32_t registerFunc(FuncDef func) { + +} diff --git a/source/libs/function/src/taggfunction.c b/source/libs/function/src/taggfunction.c index 3af4a8fe57..3b3edf1501 100644 --- a/source/libs/function/src/taggfunction.c +++ b/source/libs/function/src/taggfunction.c @@ -4835,3 +4835,9 @@ SAggFunctionInfo aggFunc[35] = {{ statisRequired, } }; + +FuncDef defineCount() { + FuncDef def = createFuncDef("count", 1); + // todo define signature + return setExecFuncs(def, NULL, function_setup, count_function, doFinalizer); +} diff --git a/source/libs/planner/src/logicPlan.c b/source/libs/planner/src/logicPlan.c index 93f72bba95..a0feaca72b 100644 --- a/source/libs/planner/src/logicPlan.c +++ b/source/libs/planner/src/logicPlan.c @@ -212,7 +212,7 @@ static SQueryPlanNode* doAddTableColumnNode(const SQueryStmtInfo* pQueryInfo, SQ pExpr[i] = p; } - pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExpr, numOfCols, NULL); + // pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExpr, numOfCols, NULL); tfree(pExpr); } diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index c7a4e438ba..04282a2f80 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -109,6 +109,26 @@ static bool fromPnode(const cJSON* json, const char* name, FFromJson func, void* return func(jObj, *obj); } +static bool fromPnodeArray(const cJSON* json, const char* name, FFromJson func, SArray** array) { + const cJSON* jArray = cJSON_GetObjectItem(json, name); + int32_t size = (NULL == jArray ? 0 : cJSON_GetArraySize(jArray)); + if (size > 0) { + *array = taosArrayInit(size, POINTER_BYTES); + if (NULL == *array) { + return false; + } + } + for (int32_t i = 0; i < size; ++i) { + cJSON* jItem = cJSON_GetArrayItem(jArray, i); + void* item = calloc(1, getPnodeTypeSize(jItem)); + if (NULL == item || !func(jItem, item)) { + return false; + } + taosArrayPush(*array, &item); + } + return true; +} + static bool addTarray(cJSON* json, const char* name, FToJson func, const SArray* array, bool isPoint) { size_t size = (NULL == array) ? 0 : taosArrayGetSize(array); if (size > 0) { @@ -379,8 +399,8 @@ static const char* jkFunctionChild = "Child"; static bool functionToJson(const void* obj, cJSON* jFunc) { const tExprNode* exprInfo = (const tExprNode*)obj; bool res = cJSON_AddStringToObject(jFunc, jkFunctionName, exprInfo->_function.functionName); - if (res) { - res = addRawArray(jFunc, jkFunctionChild, exprNodeToJson, exprInfo->_function.pChild, sizeof(tExprNode*), exprInfo->_function.num); + if (res && NULL != exprInfo->_function.pChild) { + res = addRawArray(jFunc, jkFunctionChild, exprNodeToJson, *(exprInfo->_function.pChild), sizeof(tExprNode*), exprInfo->_function.num); } return res; } @@ -388,6 +408,10 @@ static bool functionToJson(const void* obj, cJSON* jFunc) { static bool functionFromJson(const cJSON* json, void* obj) { tExprNode* exprInfo = (tExprNode*)obj; copyString(json, jkFunctionName, exprInfo->_function.functionName); + exprInfo->_function.pChild = calloc(1, sizeof(tExprNode*)); + if (NULL == exprInfo->_function.pChild) { + return false; + } return fromRawArrayWithAlloc(json, jkFunctionChild, exprNodeFromJson, (void**)exprInfo->_function.pChild, sizeof(tExprNode*), &exprInfo->_function.num); } @@ -808,7 +832,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) { case OP_SystemTableScan: return scanNodeFromJson(json, obj); case OP_Aggregate: - break; // todo + return aggNodeFromJson(json, obj); case OP_Project: return true; // case OP_Groupby: @@ -879,7 +903,7 @@ static bool phyNodeFromJson(const cJSON* json, void* obj) { res = fromObject(json, jkPnodeSchema, dataBlockSchemaFromJson, &node->targetSchema, true); } if (res) { - res = fromArray(json, jkPnodeChildren, phyNodeFromJson, &node->pChildren, sizeof(SSlotSchema)); + res = fromPnodeArray(json, jkPnodeChildren, phyNodeFromJson, &node->pChildren); } if (res) { res = fromObject(json, node->info.name, specificPhyNodeFromJson, node, true); diff --git a/source/nodes/CMakeLists.txt b/source/nodes/CMakeLists.txt new file mode 100644 index 0000000000..b30534f3f2 --- /dev/null +++ b/source/nodes/CMakeLists.txt @@ -0,0 +1,15 @@ +aux_source_directory(src NODES_SRC) +add_library(nodes STATIC ${NODES_SRC}) +target_include_directories( + nodes + PUBLIC "${CMAKE_SOURCE_DIR}/include/nodes" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" +) +target_link_libraries( + nodes + PRIVATE os util +) + +if(${BUILD_TEST}) + ADD_SUBDIRECTORY(test) +endif(${BUILD_TEST}) \ No newline at end of file diff --git a/source/nodes/src/nodesClone.c b/source/nodes/src/nodesClone.c new file mode 100644 index 0000000000..04f6df5623 --- /dev/null +++ b/source/nodes/src/nodesClone.c @@ -0,0 +1,20 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "nodes.h" + +void cloneNode(const SNode* pNode) { + +} diff --git a/source/nodes/src/nodesCode.c b/source/nodes/src/nodesCode.c new file mode 100644 index 0000000000..7fe919ffe8 --- /dev/null +++ b/source/nodes/src/nodesCode.c @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "nodes.h" + +int32_t nodeToString(const SNode* pNode, char** pStr, int32_t* pLen) { + +} + +int32_t stringToNode(const char* pStr, SNode** pNode) { + +} diff --git a/source/nodes/src/nodesEqual.c b/source/nodes/src/nodesEqual.c new file mode 100644 index 0000000000..bef025fbea --- /dev/null +++ b/source/nodes/src/nodesEqual.c @@ -0,0 +1,141 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "nodes.h" + +#define COMPARE_SCALAR_FIELD(fldname) \ + do { \ + if (a->fldname != b->fldname) \ + return false; \ + } while (0) + +#define COMPARE_STRING(a, b) \ + (((a) != NULL && (b) != NULL) ? (strcmp(a, b) == 0) : (a) == (b)) + +#define COMPARE_STRING_FIELD(fldname) \ + do { \ + if (!COMPARE_STRING(a->fldname, b->fldname)) \ + return false; \ + } while (0) + +#define COMPARE_NODE_FIELD(fldname) \ + do { \ + if (!nodeEqual(a->fldname, b->fldname)) \ + return false; \ + } while (0) + +#define COMPARE_ARRAY_FIELD(fldname) \ + do { \ + if (!nodeArrayEqual(a->fldname, b->fldname)) \ + return false; \ + } while (0) + +static bool nodeArrayEqual(const SArray* a, const SArray* b) { + if (a == b) { + return true; + } + + if (NULL == a || NULL == b) { + return false; + } + + if (taosArrayGetSize(a) != taosArrayGetSize(b)) { + return false; + } + + size_t size = taosArrayGetSize(a); + for (size_t i = 0; i < size; ++i) { + if (!nodeEqual((SNode*)taosArrayGetP(a, i), (SNode*)taosArrayGetP(b, i))) { + return false; + } + } + return true; +} + +static bool columnNodeEqual(const SColumnNode* a, const SColumnNode* b) { + COMPARE_STRING_FIELD(dbName); + COMPARE_STRING_FIELD(tableName); + COMPARE_STRING_FIELD(colName); + return true; +} + +static bool valueNodeEqual(const SValueNode* a, const SValueNode* b) { + COMPARE_STRING_FIELD(literal); + return true; +} + +static bool operatorNodeEqual(const SOperatorNode* a, const SOperatorNode* b) { + COMPARE_SCALAR_FIELD(opType); + COMPARE_NODE_FIELD(pLeft); + COMPARE_NODE_FIELD(pRight); + return true; +} + +static bool logicConditionNodeEqual(const SLogicConditionNode* a, const SLogicConditionNode* b) { + COMPARE_SCALAR_FIELD(condType); + COMPARE_ARRAY_FIELD(pParameterList); + return true; +} + +static bool isNullConditionNodeEqual(const SIsNullCondNode* a, const SIsNullCondNode* b) { + COMPARE_NODE_FIELD(pExpr); + COMPARE_SCALAR_FIELD(isNot); + return true; +} + +static bool functionNodeEqual(const SFunctionNode* a, const SFunctionNode* b) { + COMPARE_SCALAR_FIELD(funcId); + COMPARE_ARRAY_FIELD(pParameterList); + return true; +} + +bool nodeEqual(const SNode* a, const SNode* b) { + if (a == b) { + return true; + } + + if (NULL == a || NULL == b) { + return false; + } + + if (nodeType(a) != nodeType(b)) { + return false; + } + + switch (nodeType(a)) { + case QUERY_NODE_COLUMN: + return columnNodeEqual((const SColumnNode*)a, (const SColumnNode*)b); + case QUERY_NODE_VALUE: + return valueNodeEqual((const SValueNode*)a, (const SValueNode*)b); + case QUERY_NODE_OPERATOR: + return operatorNodeEqual((const SOperatorNode*)a, (const SOperatorNode*)b); + case QUERY_NODE_LOGIC_CONDITION: + return logicConditionNodeEqual((const SLogicConditionNode*)a, (const SLogicConditionNode*)b); + case QUERY_NODE_IS_NULL_CONDITION: + return isNullConditionNodeEqual((const SIsNullCondNode*)a, (const SIsNullCondNode*)b); + case QUERY_NODE_FUNCTION: + return functionNodeEqual((const SFunctionNode*)a, (const SFunctionNode*)b); + case QUERY_NODE_REAL_TABLE: + case QUERY_NODE_TEMP_TABLE: + case QUERY_NODE_JOIN_TABLE: + case QUERY_NODE_GROUPING_SET: + case QUERY_NODE_ORDER_BY_EXPR: + return false; // todo + default: + break; + } + + return false; +} diff --git a/source/nodes/src/nodesTraverse.c b/source/nodes/src/nodesTraverse.c new file mode 100644 index 0000000000..eac8288099 --- /dev/null +++ b/source/nodes/src/nodesTraverse.c @@ -0,0 +1,83 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "nodes.h" + +typedef bool (*FQueryNodeWalker)(SNode* pNode, void* pContext); + +bool nodeArrayWalker(SArray* pArray, FQueryNodeWalker walker, void* pContext) { + size_t size = taosArrayGetSize(pArray); + for (size_t i = 0; i < size; ++i) { + if (!nodeTreeWalker((SNode*)taosArrayGetP(pArray, i), walker, pContext)) { + return false; + } + } + return true; +} + +bool nodeTreeWalker(SNode* pNode, FQueryNodeWalker walker, void* pContext) { + if (NULL == pNode) { + return true; + } + + if (!walker(pNode, pContext)) { + return false; + } + + switch (nodeType(pNode)) { + case QUERY_NODE_COLUMN: + case QUERY_NODE_VALUE: + // these node types with no subnodes + return true; + case QUERY_NODE_OPERATOR: { + SOperatorNode* pOpNode = (SOperatorNode*)pNode; + if (!nodeTreeWalker(pOpNode->pLeft, walker, pContext)) { + return false; + } + return nodeTreeWalker(pOpNode->pRight, walker, pContext); + } + case QUERY_NODE_LOGIC_CONDITION: + return nodeArrayWalker(((SLogicConditionNode*)pNode)->pParameterList, walker, pContext); + case QUERY_NODE_IS_NULL_CONDITION: + return nodeTreeWalker(((SIsNullCondNode*)pNode)->pExpr, walker, pContext); + case QUERY_NODE_FUNCTION: + return nodeArrayWalker(((SFunctionNode*)pNode)->pParameterList, walker, pContext); + case QUERY_NODE_REAL_TABLE: + case QUERY_NODE_TEMP_TABLE: + return true; // todo + case QUERY_NODE_JOIN_TABLE: { + SJoinTableNode* pJoinTableNode = (SJoinTableNode*)pNode; + if (!nodeTreeWalker(pJoinTableNode->pLeft, walker, pContext)) { + return false; + } + if (!nodeTreeWalker(pJoinTableNode->pRight, walker, pContext)) { + return false; + } + return nodeTreeWalker(pJoinTableNode->pOnCond, walker, pContext); + } + case QUERY_NODE_GROUPING_SET: + return nodeArrayWalker(((SGroupingSetNode*)pNode)->pParameterList, walker, pContext); + case QUERY_NODE_ORDER_BY_EXPR: + return nodeTreeWalker(((SOrderByExprNode*)pNode)->pExpr, walker, pContext); + default: + break; + } + + return false; +} + +bool stmtWalker(SNode* pNode, FQueryNodeWalker walker, void* pContext) { + +} diff --git a/source/nodes/src/nodesUtil.c b/source/nodes/src/nodesUtil.c new file mode 100644 index 0000000000..fe5883d809 --- /dev/null +++ b/source/nodes/src/nodesUtil.c @@ -0,0 +1,24 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "nodes.h" + +bool isTimeorderQuery(const SNode* pQuery) { + +} + +bool isTimelineQuery(const SNode* pQuery) { + +} diff --git a/source/nodes/test/CMakeLists.txt b/source/nodes/test/CMakeLists.txt new file mode 100644 index 0000000000..80725a79fb --- /dev/null +++ b/source/nodes/test/CMakeLists.txt @@ -0,0 +1,19 @@ + +MESSAGE(STATUS "build nodes unit test") + +# GoogleTest requires at least C++11 +SET(CMAKE_CXX_STANDARD 11) +AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) + +ADD_EXECUTABLE(nodesTest ${SOURCE_LIST}) + +TARGET_INCLUDE_DIRECTORIES( + nodesTest + PUBLIC "${CMAKE_SOURCE_DIR}/include/nodes/" + PRIVATE "${CMAKE_SOURCE_DIR}/source/nodes/inc" +) + +TARGET_LINK_LIBRARIES( + nodesTest + PUBLIC os util common nodes gtest +) diff --git a/source/nodes/test/nodesTest.cpp b/source/nodes/test/nodesTest.cpp new file mode 100644 index 0000000000..7df3cd8b4c --- /dev/null +++ b/source/nodes/test/nodesTest.cpp @@ -0,0 +1,25 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include + +TEST(NodesTest, traverseTest) { + // todo +} + +int main(int argc, char* argv[]) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +}