From 0b2aea0f3519b519161fe8f420cafe786c936914 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 18 Jan 2022 18:01:21 +0800 Subject: [PATCH 1/8] add client --- source/libs/transport/inc/transComm.h | 120 +++++ source/libs/transport/inc/transportInt.h | 88 ++- source/libs/transport/src/trans.c | 64 +++ source/libs/transport/src/transCli.c | 180 +++++++ source/libs/transport/src/transComm.c | 117 ++++ source/libs/transport/src/transSrv.c | 540 +++++++++++++++++++ source/libs/transport/test/transportTests.cc | 3 +- 7 files changed, 1066 insertions(+), 46 deletions(-) create mode 100644 source/libs/transport/inc/transComm.h create mode 100644 source/libs/transport/src/trans.c create mode 100644 source/libs/transport/src/transCli.c create mode 100644 source/libs/transport/src/transComm.c create mode 100644 source/libs/transport/src/transSrv.c diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h new file mode 100644 index 0000000000..792200639a --- /dev/null +++ b/source/libs/transport/inc/transComm.h @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifdef USE_UV + +#include +#include "lz4.h" +#include "os.h" +#include "rpcCache.h" +#include "rpcHead.h" +#include "rpcLog.h" +#include "rpcTcp.h" +#include "rpcUdp.h" +#include "taoserror.h" +#include "tglobal.h" +#include "thash.h" +#include "tidpool.h" +#include "tmd5.h" +#include "tmempool.h" +#include "tmsg.h" +#include "transportInt.h" +#include "tref.h" +#include "trpc.h" +#include "ttimer.h" +#include "tutil.h" + +typedef void* queue[2]; +/* Private macros. */ +#define QUEUE_NEXT(q) (*(queue**)&((*(q))[0])) +#define QUEUE_PREV(q) (*(queue**)&((*(q))[1])) + +#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q))) +#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q))) +/* Initialize an empty queue. */ +#define QUEUE_INIT(q) \ + { \ + QUEUE_NEXT(q) = (q); \ + QUEUE_PREV(q) = (q); \ + } + +/* Return true if the queue has no element. */ +#define QUEUE_IS_EMPTY(q) ((const queue*)(q) == (const queue*)QUEUE_NEXT(q)) + +/* Insert an element at the back of a queue. */ +#define QUEUE_PUSH(q, e) \ + { \ + QUEUE_NEXT(e) = (q); \ + QUEUE_PREV(e) = QUEUE_PREV(q); \ + QUEUE_PREV_NEXT(e) = (e); \ + QUEUE_PREV(q) = (e); \ + } + +/* Remove the given element from the queue. Any element can be removed at any * + * time. */ +#define QUEUE_REMOVE(e) \ + { \ + QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \ + QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \ + } + +/* Return the element at the front of the queue. */ +#define QUEUE_HEAD(q) (QUEUE_NEXT(q)) + +/* Return the element at the back of the queue. */ +#define QUEUE_TAIL(q) (QUEUE_PREV(q)) + +/* Iterate over the element of a queue. * Mutating the queue while iterating + * results in undefined behavior. */ +#define QUEUE_FOREACH(q, e) for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q)) + +/* Return the structure holding the given element. */ +#define QUEUE_DATA(e, type, field) ((type*)((void*)((char*)(e)-offsetof(type, field)))) + +typedef struct { + SRpcInfo* pRpc; // associated SRpcInfo + SEpSet epSet; // ip list provided by app + void* ahandle; // handle provided by app + struct SRpcConn* pConn; // pConn allocated + tmsg_t msgType; // message type + uint8_t* pCont; // content provided by app + int32_t contLen; // content length + int32_t code; // error code + int16_t numOfTry; // number of try for different servers + int8_t oldInUse; // server EP inUse passed by app + int8_t redirect; // flag to indicate redirect + int8_t connType; // connection type + int64_t rid; // refId returned by taosAddRef + SRpcMsg* pRsp; // for synchronous API + tsem_t* pSem; // for synchronous API + SEpSet* pSet; // for synchronous API + char msg[0]; // RpcHead starts from here +} SRpcReqContext; + +#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) +#define RPC_RESERVE_SIZE (sizeof(SRpcReqContext)) + +#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) +#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead))) +#define rpcContFromHead(msg) (msg + sizeof(SRpcHead)) +#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead)) +#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) +#define rpcIsReq(type) (type & 1U) + +int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey); +void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); +int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); +SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead); + +#endif diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index f93753cfe9..0addea5d1a 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -16,62 +16,60 @@ #ifndef _TD_TRANSPORT_INT_H_ #define _TD_TRANSPORT_INT_H_ +#include +#include "lz4.h" +#include "os.h" +#include "rpcCache.h" #include "rpcHead.h" +#include "rpcLog.h" +#include "rpcTcp.h" +#include "rpcUdp.h" +#include "taoserror.h" +#include "tglobal.h" +#include "thash.h" +#include "tidpool.h" +#include "tmsg.h" +#include "transportInt.h" +#include "tref.h" +#include "trpc.h" +#include "ttimer.h" +#include "tutil.h" + #ifdef __cplusplus extern "C" { #endif #ifdef USE_UV -#include -typedef void *queue[2]; +void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); +void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); -/* Private macros. */ -#define QUEUE_NEXT(q) (*(queue **)&((*(q))[0])) -#define QUEUE_PREV(q) (*(queue **)&((*(q))[1])) +typedef struct { + int sessions; // number of sessions allowed + int numOfThreads; // number of threads to process incoming messages + int idleTime; // milliseconds; + uint16_t localPort; + int8_t connType; + int64_t index; + char label[TSDB_LABEL_LEN]; -#define QUEUE_PREV_NEXT(q) (QUEUE_NEXT(QUEUE_PREV(q))) -#define QUEUE_NEXT_PREV(q) (QUEUE_PREV(QUEUE_NEXT(q))) + char user[TSDB_UNI_LEN]; // meter ID + char spi; // security parameter index + char encrypt; // encrypt algorithm + char secret[TSDB_PASSWORD_LEN]; // secret for the link + char ckey[TSDB_PASSWORD_LEN]; // ciphering key -/* Initialize an empty queue. */ -#define QUEUE_INIT(q) \ - { \ - QUEUE_NEXT(q) = (q); \ - QUEUE_PREV(q) = (q); \ - } + void (*cfp)(void* parent, SRpcMsg*, SEpSet*); + int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); -/* Return true if the queue has no element. */ -#define QUEUE_IS_EMPTY(q) ((const queue *)(q) == (const queue *)QUEUE_NEXT(q)) - -/* Insert an element at the back of a queue. */ -#define QUEUE_PUSH(q, e) \ - { \ - QUEUE_NEXT(e) = (q); \ - QUEUE_PREV(e) = QUEUE_PREV(q); \ - QUEUE_PREV_NEXT(e) = (e); \ - QUEUE_PREV(q) = (e); \ - } - -/* Remove the given element from the queue. Any element can be removed at any * - * time. */ -#define QUEUE_REMOVE(e) \ - { \ - QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \ - QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \ - } - -/* Return the element at the front of the queue. */ -#define QUEUE_HEAD(q) (QUEUE_NEXT(q)) - -/* Return the element at the back of the queue. */ -#define QUEUE_TAIL(q) (QUEUE_PREV(q)) - -/* Iterate over the element of a queue. * Mutating the queue while iterating - * results in undefined behavior. */ -#define QUEUE_FOREACH(q, e) for ((q) = QUEUE_NEXT(e); (q) != (e); (q) = QUEUE_NEXT(q)) - -/* Return the structure holding the given element. */ -#define QUEUE_DATA(e, type, field) ((type *)((void *)((char *)(e)-offsetof(type, field)))) + int32_t refCount; + void* parent; + void* idPool; // handle to ID pool + void* tmrCtrl; // handle to timer + SHashObj* hash; // handle returned by hash utility + void* tcphandle; // returned handle from TCP initialization + pthread_mutex_t mutex; +} SRpcInfo; #endif // USE_LIBUV diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c new file mode 100644 index 0000000000..a59f2f88c7 --- /dev/null +++ b/source/libs/transport/src/trans.c @@ -0,0 +1,64 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifdef USE_UV + +#include "transComm.h" + +typedef struct SConnBuffer { + char* buf; + int len; + int cap; + int left; +} SConnBuffer; + +void* (*taosHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = { + taosInitServer, taosInitClient}; + +void* rpcOpen(const SRpcInit* pInit) { + SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); + if (pRpc == NULL) { + return NULL; + } + if (pInit->label) { + tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); + } + pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; + pRpc->connType = pInit->connType; + pRpc->tcphandle = (*taosHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); + + return pRpc; +} +void rpcClose(void* arg) { return; } +void* rpcMallocCont(int contLen) { return NULL; } +void rpcFreeCont(void* cont) { return; } +void* rpcReallocCont(void* ptr, int contLen) { return NULL; } + +void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {} +int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; } +void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; } +int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } +void rpcCancelRequest(int64_t rid) { return; } + +int32_t rpcInit(void) { + // impl later + return -1; +} + +void rpcCleanup(void) { + // impl later + return; +} +#endif diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c new file mode 100644 index 0000000000..f848932c0f --- /dev/null +++ b/source/libs/transport/src/transCli.c @@ -0,0 +1,180 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifdef USE_UV + +#include "transComm.h" + +typedef struct SCliConn { + uv_connect_t connReq; + uv_stream_t* stream; + void* data; + queue conn; +} SCliConn; +typedef struct SCliMsg { + SRpcReqContext* context; + queue q; +} SCliMsg; + +typedef struct SCliThrdObj { + pthread_t thread; + uv_loop_t* loop; + uv_async_t* cliAsync; // + void* cache; // conn pool + queue msg; + pthread_mutex_t msgMtx; + void* shandle; +} SCliThrdObj; + +typedef struct SClientObj { + char label[TSDB_LABEL_LEN]; + int32_t index; + int numOfThreads; + SCliThrdObj** pThreadObj; +} SClientObj; + +static void clientWriteCb(uv_write_t* req, int status); +static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); +static void clientConnCb(struct uv_connect_s* req, int status); +static void clientAsyncCb(uv_async_t* handle); + +static void* clientThread(void* arg); + +static void clientWriteCb(uv_write_t* req, int status) { + // impl later +} +static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { + // impl later +} +static void clientConnCb(struct uv_connect_s* req, int status) { + // impl later +} + +static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { + // impl later + return NULL; +} +static void clientAsyncCb(uv_async_t* handle) { + SCliThrdObj* pThrd = handle->data; + SCliMsg* pMsg = NULL; + pthread_mutex_lock(&pThrd->msgMtx); + if (!QUEUE_IS_EMPTY(&pThrd->msg)) { + queue* head = QUEUE_HEAD(&pThrd->msg); + pMsg = QUEUE_DATA(head, SCliMsg, q); + QUEUE_REMOVE(head); + } + pthread_mutex_unlock(&pThrd->msgMtx); + + SEpSet* pEpSet = &pMsg->context->epSet; + char* fqdn = pEpSet->fqdn[pEpSet->inUse]; + uint32_t port = pEpSet->port[pEpSet->inUse]; + + SCliConn* conn = getConnFromCache(pThrd->cache, fqdn, port); + if (conn != NULL) { + } else { + SCliConn* conn = malloc(sizeof(SCliConn)); + + conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); + uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); + + conn->connReq.data = conn; + conn->data = pMsg; + + struct sockaddr_in addr; + uv_ip4_addr(fqdn, port, &addr); + // handle error in callback if connect error + uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb); + } + + // SRpcReqContext* pCxt = pMsg->context; + + // SRpcHead* pHead = rpcHeadFromCont(pCtx->pCont); + // char* msg = (char*)pHead; + // int len = rpcMsgLenFromCont(pCtx->contLen); + // tmsg_t msgType = pCtx->msgType; + + // impl later +} + +static void* clientThread(void* arg) { + SCliThrdObj* pThrd = (SCliThrdObj*)arg; + + QUEUE_INIT(&pThrd->msg); + pthread_mutex_init(&pThrd->msgMtx, NULL); + + // QUEUE_INIT(&pThrd->clientCache); + + pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); + uv_loop_init(pThrd->loop); + + pThrd->cliAsync = malloc(sizeof(uv_async_t)); + uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb); + pThrd->cliAsync->data = pThrd; + + uv_run(pThrd->loop, UV_RUN_DEFAULT); +} + +void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { + SClientObj* cli = calloc(1, sizeof(SClientObj)); + memcpy(cli->label, label, strlen(label)); + cli->numOfThreads = numOfThreads; + cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*)); + + for (int i = 0; i < cli->numOfThreads; i++) { + SCliThrdObj* thrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); + + thrd->shandle = shandle; + int err = pthread_create(&thrd->thread, NULL, clientThread, (void*)(thrd)); + if (err == 0) { + tDebug("sucess to create tranport-client thread %d", i); + } + cli->pThreadObj[i] = thrd; + } + return cli; +} + +void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* pRid) { + // impl later + SRpcInfo* pRpc = (SRpcInfo*)shandle; + + SRpcReqContext* pContext; + int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); + pContext = (SRpcReqContext*)((char*)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext)); + pContext->ahandle = pMsg->ahandle; + pContext->pRpc = (SRpcInfo*)shandle; + pContext->epSet = *pEpSet; + pContext->contLen = contLen; + pContext->pCont = pMsg->pCont; + pContext->msgType = pMsg->msgType; + pContext->oldInUse = pEpSet->inUse; + + assert(pRpc->connType == TAOS_CONN_CLIENT); + // atomic or not + int64_t index = pRpc->index; + if (pRpc->index++ >= pRpc->numOfThreads) { + pRpc->index = 0; + } + SCliMsg* msg = malloc(sizeof(SCliMsg)); + msg->context = pContext; + + SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads]; + + pthread_mutex_lock(&thrd->msgMtx); + QUEUE_PUSH(&thrd->msg, &msg->q); + pthread_mutex_unlock(&thrd->msgMtx); + + uv_async_send(thrd->cliAsync); +} +#endif diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c new file mode 100644 index 0000000000..f23cfb6e2d --- /dev/null +++ b/source/libs/transport/src/transComm.c @@ -0,0 +1,117 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#ifdef USE_UV + +#include "transComm.h" + +int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { + T_MD5_CTX context; + int ret = -1; + + tMD5Init(&context); + tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); + tMD5Update(&context, (uint8_t*)pMsg, msgLen); + tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); + tMD5Final(&context); + + if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0; + + return ret; +} +void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) { + T_MD5_CTX context; + + tMD5Init(&context); + tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); + tMD5Update(&context, (uint8_t*)pMsg, msgLen); + tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); + tMD5Final(&context); + + memcpy(pAuth, context.digest, sizeof(context.digest)); +} + +int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { + SRpcHead* pHead = rpcHeadFromCont(pCont); + int32_t finalLen = 0; + int overhead = sizeof(SRpcComp); + + if (!NEEDTO_COMPRESSS_MSG(contLen)) { + return contLen; + } + + char* buf = malloc(contLen + overhead + 8); // 8 extra bytes + if (buf == NULL) { + tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen); + return contLen; + } + + int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead); + tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead); + + /* + * only the compressed size is less than the value of contLen - overhead, the compression is applied + * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message + */ + if (compLen > 0 && compLen < contLen - overhead) { + SRpcComp* pComp = (SRpcComp*)pCont; + pComp->reserved = 0; + pComp->contLen = htonl(contLen); + memcpy(pCont + overhead, buf, compLen); + + pHead->comp = 1; + tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen); + finalLen = compLen + overhead; + } else { + finalLen = contLen; + } + + free(buf); + return finalLen; +} + +SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) { + int overhead = sizeof(SRpcComp); + SRpcHead* pNewHead = NULL; + uint8_t* pCont = pHead->content; + SRpcComp* pComp = (SRpcComp*)pHead->content; + + if (pHead->comp) { + // decompress the content + assert(pComp->reserved == 0); + int contLen = htonl(pComp->contLen); + + // prepare the temporary buffer to decompress message + char* temp = (char*)malloc(contLen + RPC_MSG_OVERHEAD); + pNewHead = (SRpcHead*)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext + + if (pNewHead) { + int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead; + int origLen = LZ4_decompress_safe((char*)(pCont + overhead), (char*)pNewHead->content, compLen, contLen); + assert(origLen == contLen); + + memcpy(pNewHead, pHead, sizeof(SRpcHead)); + pNewHead->msgLen = rpcMsgLenFromCont(origLen); + /// rpcFreeMsg(pHead); // free the compressed message buffer + pHead = pNewHead; + tTrace("decomp malloc mem:%p", temp); + } else { + tError("failed to allocate memory to decompress msg, contLen:%d", contLen); + } + } + + return pHead; +} + +#endif diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c new file mode 100644 index 0000000000..0bf39b9985 --- /dev/null +++ b/source/libs/transport/src/transSrv.c @@ -0,0 +1,540 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifdef USE_UV +#include "transComm.h" + +typedef struct SConnBuffer { + char* buf; + int len; + int cap; + int left; +} SConnBuffer; + +typedef struct SConn { + uv_tcp_t* pTcp; + uv_write_t* pWriter; + uv_timer_t* pTimer; + + uv_async_t* pWorkerAsync; + queue queue; + int ref; + int persist; // persist connection or not + SConnBuffer connBuf; // read buf, + SConnBuffer writeBuf; // write buf + int count; + void* shandle; // rpc init + void* ahandle; // + void* hostThrd; + // del later + char secured; + int spi; + char info[64]; + char user[TSDB_UNI_LEN]; // user ID for the link + char secret[TSDB_PASSWORD_LEN]; + char ckey[TSDB_PASSWORD_LEN]; // ciphering key +} SConn; + +typedef struct SWorkThrdObj { + pthread_t thread; + uv_pipe_t* pipe; + int fd; + uv_loop_t* loop; + uv_async_t* workerAsync; // + queue conn; + pthread_mutex_t connMtx; + void* shandle; +} SWorkThrdObj; + +typedef struct SServerObj { + pthread_t thread; + uv_tcp_t server; + uv_loop_t* loop; + int workerIdx; + int numOfThreads; + SWorkThrdObj** pThreadObj; + uv_pipe_t** pipe; + uint32_t ip; + uint32_t port; +} SServerObj; + +static const char* notify = "a"; + +// refactor later +static int rpcAddAuthPart(SConn* pConn, char* msg, int msgLen); + +static int uvAuthMsg(SConn* pConn, char* msg, int msgLen); + +static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); +static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); +static void uvOnTimeoutCb(uv_timer_t* handle); +static void uvOnWriteCb(uv_write_t* req, int status); +static void uvOnAcceptCb(uv_stream_t* stream, int status); +static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); +static void uvWorkerAsyncCb(uv_async_t* handle); + +// already read complete packet +static bool readComplete(SConnBuffer* buf); + +static SConn* connCreate(); +static void connDestroy(SConn* conn); +static void uvConnDestroy(uv_handle_t* handle); + +// server worke thread +static void* workerThread(void* arg); +static void* acceptThread(void* arg); + +void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + /* + * formate of data buffer: + * |<-------SRpcReqContext------->|<------------data read from socket----------->| + */ + static const int CAPACITY = 1024; + + SConn* conn = handle->data; + SConnBuffer* pBuf = &conn->connBuf; + if (pBuf->cap == 0) { + pBuf->buf = (char*)calloc(CAPACITY + RPC_RESERVE_SIZE, sizeof(char)); + pBuf->len = 0; + pBuf->cap = CAPACITY; + pBuf->left = -1; + + buf->base = pBuf->buf + RPC_RESERVE_SIZE; + buf->len = CAPACITY; + } else { + if (pBuf->len >= pBuf->cap) { + if (pBuf->left == -1) { + pBuf->cap *= 2; + pBuf->buf = realloc(pBuf->buf, pBuf->cap + RPC_RESERVE_SIZE); + } else if (pBuf->len + pBuf->left > pBuf->cap) { + pBuf->cap = pBuf->len + pBuf->left; + pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left + RPC_RESERVE_SIZE); + } + } + buf->base = pBuf->buf + pBuf->len + RPC_RESERVE_SIZE; + buf->len = pBuf->cap - pBuf->len; + } +} + +// check data read from socket completely or not +// +static bool readComplete(SConnBuffer* data) { + // TODO(yihao): handle pipeline later + SRpcHead rpcHead; + int32_t headLen = sizeof(rpcHead); + if (data->len >= headLen) { + memcpy((char*)&rpcHead, data->buf + RPC_RESERVE_SIZE, headLen); + int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); + if (msgLen > data->len) { + data->left = msgLen - data->len; + return false; + } else { + return true; + } + } else { + return false; + } +} + +static void uvDoProcess(SRecvInfo* pRecv) { + SRpcHead* pHead = (SRpcHead*)pRecv->msg; + SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle; + SConn* pConn = pRecv->thandle; + + tDump(pRecv->msg, pRecv->msgLen); + + terrno = 0; + SRpcReqContext* pContest; + + // do auth and check +} + +static int uvAuthMsg(SConn* pConn, char* msg, int len) { + SRpcHead* pHead = (SRpcHead*)msg; + int code = 0; + + if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) { + // secured link, or no authentication + pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); + // tTrace("%s, secured link, no auth is required", pConn->info); + return 0; + } + + if (!rpcIsReq(pHead->msgType)) { + // for response, if code is auth failure, it shall bypass the auth process + code = htonl(pHead->code); + if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || + code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED || + code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) { + pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); + // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code); + return 0; + } + } + + code = 0; + if (pHead->spi == pConn->spi) { + // authentication + SRpcDigest* pDigest = (SRpcDigest*)((char*)pHead + len - sizeof(SRpcDigest)); + + int32_t delta; + delta = (int32_t)htonl(pDigest->timeStamp); + delta -= (int32_t)taosGetTimestampSec(); + if (abs(delta) > 900) { + tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta); + code = TSDB_CODE_RPC_INVALID_TIME_STAMP; + } else { + if (rpcAuthenticateMsg(pHead, len - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { + // tDebug("%s, authentication failed, msg discarded", pConn->info); + code = TSDB_CODE_RPC_AUTH_FAILURE; + } else { + pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest); + if (!rpcIsReq(pHead->msgType)) pConn->secured = 1; // link is secured for client + // tTrace("%s, message is authenticated", pConn->info); + } + } + } else { + tDebug("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi); + code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED; + } + + return code; +} + +// refers specifically to query or insert timeout +static void uvHandleActivityTimeout(uv_timer_t* handle) { + // impl later + SConn* conn = handle->data; +} + +static void uvProcessData(SConn* pConn) { + SRecvInfo info; + SRecvInfo* p = &info; + SConnBuffer* pBuf = &pConn->connBuf; + p->msg = pBuf->buf + RPC_RESERVE_SIZE; + p->msgLen = pBuf->len; + p->ip = 0; + p->port = 0; + p->shandle = pConn->shandle; // + p->thandle = pConn; + p->chandle = NULL; + + // + SRpcHead* pHead = (SRpcHead*)p->msg; + assert(rpcIsReq(pHead->msgType)); + + SRpcInfo* pRpc = (SRpcInfo*)p->shandle; + pConn->ahandle = (void*)pHead->ahandle; + // auth here + + int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen); + if (code != 0) { + terrno = code; + return; + } + pHead->code = htonl(pHead->code); + + SRpcMsg rpcMsg; + + pHead = rpcDecompressRpcMsg(pHead); + rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen); + rpcMsg.pCont = pHead->content; + rpcMsg.msgType = pHead->msgType; + rpcMsg.code = pHead->code; + rpcMsg.ahandle = pConn->ahandle; + rpcMsg.handle = pConn; + + (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); + uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime, 0); + // auth + // validate msg type +} + +void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { + // opt + SConn* ctx = cli->data; + SConnBuffer* pBuf = &ctx->connBuf; + if (nread > 0) { + pBuf->len += nread; + if (readComplete(pBuf)) { + tDebug("alread read complete packet"); + uvProcessData(ctx); + } else { + tDebug("read half packet, continue to read"); + } + return; + } + if (terrno != 0) { + // handle err code + } + + if (nread != UV_EOF) { + tDebug("Read error %s\n", uv_err_name(nread)); + } + uv_close((uv_handle_t*)cli, uvConnDestroy); +} +void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + buf->base = malloc(sizeof(char)); + buf->len = 2; +} + +void uvOnTimeoutCb(uv_timer_t* handle) { + // opt + tDebug("time out"); +} + +void uvOnWriteCb(uv_write_t* req, int status) { + SConn* conn = req->data; + if (status == 0) { + tDebug("data already was written on stream"); + } else { + connDestroy(conn); + } + // opt +} + +void uvWorkerAsyncCb(uv_async_t* handle) { + SWorkThrdObj* pThrd = container_of(handle, SWorkThrdObj, workerAsync); + SConn* conn = NULL; + + // opt later + pthread_mutex_lock(&pThrd->connMtx); + if (!QUEUE_IS_EMPTY(&pThrd->conn)) { + queue* head = QUEUE_HEAD(&pThrd->conn); + conn = QUEUE_DATA(head, SConn, queue); + QUEUE_REMOVE(head); + } + pthread_mutex_unlock(&pThrd->connMtx); + if (conn == NULL) { + tError("except occurred, do nothing"); + return; + } + uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len); + uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); +} + +void uvOnAcceptCb(uv_stream_t* stream, int status) { + if (status == -1) { + return; + } + SServerObj* pObj = container_of(stream, SServerObj, server); + + uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); + uv_tcp_init(pObj->loop, cli); + + if (uv_accept(stream, (uv_stream_t*)cli) == 0) { + uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t)); + + uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); + + pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads; + tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); + uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnWriteCb); + } else { + uv_close((uv_handle_t*)cli, NULL); + } +} +void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { + tDebug("connection coming"); + if (nread < 0) { + if (nread != UV_EOF) { + tError("read error %s", uv_err_name(nread)); + } + // TODO(log other failure reason) + uv_close((uv_handle_t*)q, NULL); + return; + } + // free memory allocated by + assert(nread == strlen(notify)); + assert(buf->base[0] == notify[0]); + free(buf->base); + + SWorkThrdObj* pThrd = q->data; + + uv_pipe_t* pipe = (uv_pipe_t*)q; + if (!uv_pipe_pending_count(pipe)) { + tError("No pending count"); + return; + } + + uv_handle_type pending = uv_pipe_pending_type(pipe); + assert(pending == UV_TCP); + + SConn* pConn = connCreate(); + pConn->shandle = pThrd->shandle; + /* init conn timer*/ + pConn->pTimer = malloc(sizeof(uv_timer_t)); + uv_timer_init(pThrd->loop, pConn->pTimer); + pConn->pTimer->data = pConn; + + pConn->hostThrd = pThrd; + pConn->pWorkerAsync = pThrd->workerAsync; // thread safty + + // init client handle + pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); + uv_tcp_init(pThrd->loop, pConn->pTcp); + pConn->pTcp->data = pConn; + + // init write request, just + pConn->pWriter = calloc(1, sizeof(uv_write_t)); + pConn->pWriter->data = pConn; + + if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { + uv_os_fd_t fd; + uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); + tDebug("new connection created: %d", fd); + uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); + } else { + connDestroy(pConn); + } +} + +void* acceptThread(void* arg) { + // opt + SServerObj* srv = (SServerObj*)arg; + uv_tcp_init(srv->loop, &srv->server); + + struct sockaddr_in bind_addr; + + uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); + uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0); + int err = 0; + if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) { + tError("Listen error %s\n", uv_err_name(err)); + return NULL; + } + uv_run(srv->loop, UV_RUN_DEFAULT); +} +void* workerThread(void* arg) { + SWorkThrdObj* pThrd = (SWorkThrdObj*)arg; + + pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); + uv_loop_init(pThrd->loop); + + // SRpcInfo* pRpc = pThrd->shandle; + uv_pipe_init(pThrd->loop, pThrd->pipe, 1); + uv_pipe_open(pThrd->pipe, pThrd->fd); + + pThrd->pipe->data = pThrd; + + QUEUE_INIT(&pThrd->conn); + pthread_mutex_init(&pThrd->connMtx, NULL); + + pThrd->workerAsync = malloc(sizeof(uv_async_t)); + uv_async_init(pThrd->loop, pThrd->workerAsync, uvWorkerAsyncCb); + + uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); + uv_run(pThrd->loop, UV_RUN_DEFAULT); +} + +static SConn* connCreate() { + SConn* pConn = (SConn*)calloc(1, sizeof(SConn)); + return pConn; +} +static void connDestroy(SConn* conn) { + if (conn == NULL) { + return; + } + uv_timer_stop(conn->pTimer); + free(conn->pTimer); + uv_close((uv_handle_t*)conn->pTcp, NULL); + free(conn->connBuf.buf); + free(conn->pTcp); + free(conn->pWriter); + free(conn); + // handle +} +static void uvConnDestroy(uv_handle_t* handle) { + SConn* conn = handle->data; + connDestroy(conn); +} +static int rpcAddAuthPart(SConn* pConn, char* msg, int msgLen) { + SRpcHead* pHead = (SRpcHead*)msg; + + if (pConn->spi && pConn->secured == 0) { + // add auth part + pHead->spi = pConn->spi; + SRpcDigest* pDigest = (SRpcDigest*)(msg + msgLen); + pDigest->timeStamp = htonl(taosGetTimestampSec()); + msgLen += sizeof(SRpcDigest); + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + rpcBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); + } else { + pHead->spi = 0; + pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); + } + + return msgLen; +} + +void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { + SServerObj* srv = calloc(1, sizeof(SServerObj)); + srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); + srv->numOfThreads = numOfThreads; + srv->workerIdx = 0; + srv->pThreadObj = (SWorkThrdObj**)calloc(srv->numOfThreads, sizeof(SWorkThrdObj*)); + srv->pipe = (uv_pipe_t**)calloc(srv->numOfThreads, sizeof(uv_pipe_t*)); + srv->ip = ip; + srv->port = port; + uv_loop_init(srv->loop); + + for (int i = 0; i < srv->numOfThreads; i++) { + SWorkThrdObj* thrd = (SWorkThrdObj*)calloc(1, sizeof(SWorkThrdObj)); + srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); + int fds[2]; + if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { + return NULL; + } + uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); + uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write + + thrd->shandle = shandle; + thrd->fd = fds[0]; + thrd->pipe = &(srv->pipe[i][1]); // init read + int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd)); + if (err == 0) { + tDebug("sucess to create worker-thread %d", i); + // printf("thread %d create\n", i); + } else { + // TODO: clear all other resource later + tError("failed to create worker-thread %d", i); + } + srv->pThreadObj[i] = thrd; + } + + int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv); + if (err == 0) { + tDebug("success to create accept-thread"); + } else { + // clear all resource later + } + + return srv; +} + +void rpcSendResponse(const SRpcMsg* pMsg) { + SConn* pConn = pMsg->handle; + SWorkThrdObj* pThrd = pConn->hostThrd; + + // opt later + pthread_mutex_lock(&pThrd->connMtx); + QUEUE_PUSH(&pThrd->conn, &pConn->queue); + pthread_mutex_unlock(&pThrd->connMtx); + + uv_async_send(pConn->pWorkerAsync); +} + +#endif diff --git a/source/libs/transport/test/transportTests.cc b/source/libs/transport/test/transportTests.cc index 151deaf29b..53910aa30c 100644 --- a/source/libs/transport/test/transportTests.cc +++ b/source/libs/transport/test/transportTests.cc @@ -22,6 +22,7 @@ #include #include +#include "transComm.h" #include "transportInt.h" #include "trpc.h" @@ -46,7 +47,7 @@ class QueueObj { if (!IsEmpty()) { queue *h = QUEUE_HEAD(&head); el = QUEUE_DATA(h, QueueElem, q); - QUEUE_REMOVE(&el->q); + QUEUE_REMOVE(h); } return el; } From caeb8ae159a6a8180bb3fed9e6cf5dbeba919b69 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 18 Jan 2022 18:14:43 +0800 Subject: [PATCH 2/8] add client --- source/libs/transport/inc/transportInt.h | 3 ++- source/libs/transport/src/transCli.c | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index 0addea5d1a..e39e0d9273 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -16,7 +16,9 @@ #ifndef _TD_TRANSPORT_INT_H_ #define _TD_TRANSPORT_INT_H_ +#ifdef USE_UV #include +#endif #include "lz4.h" #include "os.h" #include "rpcCache.h" @@ -29,7 +31,6 @@ #include "thash.h" #include "tidpool.h" #include "tmsg.h" -#include "transportInt.h" #include "tref.h" #include "trpc.h" #include "ttimer.h" diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index f848932c0f..bd54e8e57b 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -149,13 +149,14 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* // impl later SRpcInfo* pRpc = (SRpcInfo*)shandle; + int len = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); + SRpcReqContext* pContext; - int contLen = rpcCompressRpcMsg(pMsg->pCont, pMsg->contLen); pContext = (SRpcReqContext*)((char*)pMsg->pCont - sizeof(SRpcHead) - sizeof(SRpcReqContext)); pContext->ahandle = pMsg->ahandle; pContext->pRpc = (SRpcInfo*)shandle; pContext->epSet = *pEpSet; - pContext->contLen = contLen; + pContext->contLen = len; pContext->pCont = pMsg->pCont; pContext->msgType = pMsg->msgType; pContext->oldInUse = pEpSet->inUse; From ea57fba331c99db40d9cc3dc360d659105fb6cdc Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Tue, 18 Jan 2022 18:22:13 +0800 Subject: [PATCH 3/8] [modify] --- tests/script/sh/massiveTable/deployCluster.sh | 15 ++++++++------- tests/script/sh/massiveTable/setupDnodes.sh | 5 +++-- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/tests/script/sh/massiveTable/deployCluster.sh b/tests/script/sh/massiveTable/deployCluster.sh index 47802ea6a7..2341d512c0 100755 --- a/tests/script/sh/massiveTable/deployCluster.sh +++ b/tests/script/sh/massiveTable/deployCluster.sh @@ -8,17 +8,18 @@ set -e # deployCluster.sh curr_dir=$(pwd) +echo "currect pwd: ${curr_dir}" -source ./cleanCluster.sh -r /data -source ./cleanCluster.sh -r /data2 +./cleanCluster.sh -r "/data" +./cleanCluster.sh -r "/data2" -source ./compileVersion.sh -r ${curr_dir}/../../../../ -v "3.0" +./compileVersion.sh -r ${curr_dir}/../../../../ -v "3.0" -source ./setupDnodes.sh -r /data -n 1 -f trd02:7000 -p 7000 -source ./setupDnodes.sh -r /data2 -n 1 -f trd02:7000 -p 8000 +./setupDnodes.sh -r "/data" -n 1 -f "trd02:7000" -p 7000 +./setupDnodes.sh -r "/data2" -n 1 -f "trd02:7000" -p 8000 -#source ./setupDnodes.sh -r /data -n 2 -f trd02:7000 -p 7000 -#source ./setupDnodes.sh -r /data2 -n 2 -f trd02:7000 -p 8000 +#./setupDnodes.sh -r "/data" -n 2 -f trd02:7000 -p 7000 +#./setupDnodes.sh -r "/data2" -n 2 -f trd02:7000 -p 8000 diff --git a/tests/script/sh/massiveTable/setupDnodes.sh b/tests/script/sh/massiveTable/setupDnodes.sh index 034c15c4eb..e09b2fb396 100755 --- a/tests/script/sh/massiveTable/setupDnodes.sh +++ b/tests/script/sh/massiveTable/setupDnodes.sh @@ -94,7 +94,7 @@ createNewDnodesDataDir() { mkdir -p ${dataRootDir}/dnode_${i}/data createNewCfgFile ${dataRootDir}/dnode_${i}/cfg ${dataRootDir}/dnode_${i}/data ${dataRootDir}/dnode_${i}/log ${firstEp} ${serverPort} - echo "create dnode: ${serverPort}, ${dataRootDir}/dnode_${i}" + #echo "create dnode: ${serverPort}, ${dataRootDir}/dnode_${i}" serverPort=$((10#${serverPort}+100)) done } @@ -131,6 +131,7 @@ fi ## start all dnode by nohup startDnodes ${dnodeNumber} -echo " run setupDnodes.sh end !!!" +echo "====run setupDnodes.sh end====" +echo " " From 04e3df71d35dd855c38cdee54ce8d0bdd1bbb4c4 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 18 Jan 2022 18:39:10 +0800 Subject: [PATCH 4/8] add client --- source/libs/transport/src/transCli.c | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index bd54e8e57b..8ba1f1a0f6 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -59,6 +59,18 @@ static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { // impl later } static void clientConnCb(struct uv_connect_s* req, int status) { + SCliConn* pConn = req->data; + SCliMsg* pMsg = pConn->data; + SEpSet* pEpSet = &pMsg->context->epSet; + + char* fqdn = pEpSet->fqdn[pEpSet->inUse]; + uint32_t port = pEpSet->port[pEpSet->inUse]; + if (status != 0) { + // call user fp later + tError("failed to connect server(%s, %d), errmsg: %s", fqdn, port, uv_strerror(status)); + return; + } + // impl later } @@ -83,6 +95,7 @@ static void clientAsyncCb(uv_async_t* handle) { SCliConn* conn = getConnFromCache(pThrd->cache, fqdn, port); if (conn != NULL) { + // impl later } else { SCliConn* conn = malloc(sizeof(SCliConn)); From e2d8dfafe8b17272f9baee1b9fb102ff36770ce5 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 18 Jan 2022 19:32:29 +0800 Subject: [PATCH 5/8] feature/qnode --- include/libs/scheduler/scheduler.h | 8 + include/util/taoserror.h | 1 + source/libs/scheduler/src/scheduler.c | 123 +++++++-- source/libs/scheduler/test/schedulerTests.cpp | 249 +++++++++++++++++- source/util/src/terror.c | 1 + 5 files changed, 359 insertions(+), 23 deletions(-) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 1f369067d6..3262b9437c 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -114,6 +114,14 @@ void schedulerDestroy(void); */ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks); +/** + * make one task info's multiple copies + * @param src + * @param dst SArray** + * @return + */ +int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum); + void schedulerFreeTaskList(SArray *taskList); diff --git a/include/util/taoserror.h b/include/util/taoserror.h index a855e6d881..fc895069af 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -361,6 +361,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_QRY_TASK_DROPPING TAOS_DEF_ERROR_CODE(0, 0x0717) //"Task dropping") #define TSDB_CODE_QRY_DUPLICATTED_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0718) //"Duplicatted operation") #define TSDB_CODE_QRY_TASK_MSG_ERROR TAOS_DEF_ERROR_CODE(0, 0x0719) //"Task message error") +#define TSDB_CODE_QRY_JOB_FREED TAOS_DEF_ERROR_CODE(0, 0x071A) //"Job freed") // grant #define TSDB_CODE_GRANT_EXPIRED TAOS_DEF_ERROR_CODE(0, 0x0800) //"License expired") diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index c53926f8c1..ccf0c58ac0 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -20,12 +20,38 @@ static SSchedulerMgmt schMgmt = {0}; +uint64_t schGenTaskId(void) { + return atomic_add_fetch_64(&schMgmt.taskId, 1); +} + +uint64_t schGenUUID(void) { + static uint64_t hashId = 0; + static int32_t requestSerialId = 0; + + if (hashId == 0) { + char uid[64]; + int32_t code = taosGetSystemUUID(uid, tListLen(uid)); + if (code != TSDB_CODE_SUCCESS) { + qError("Failed to get the system uid, reason:%s", tstrerror(TAOS_SYSTEM_ERROR(errno))); + } else { + hashId = MurmurHash3_32(uid, strlen(uid)); + } + } + + int64_t ts = taosGetTimestampMs(); + uint64_t pid = taosGetPId(); + int32_t val = atomic_add_fetch_32(&requestSerialId, 1); + + uint64_t id = ((hashId & 0x0FFF) << 52) | ((pid & 0x0FFF) << 40) | ((ts & 0xFFFFFF) << 16) | (val & 0xFFFF); + return id; +} + int32_t schInitTask(SSchJob* pJob, SSchTask *pTask, SSubplan* pPlan, SSchLevel *pLevel) { pTask->plan = pPlan; pTask->level = pLevel; SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_NOT_START); - pTask->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); + pTask->taskId = schGenTaskId(); pTask->execAddrs = taosArrayInit(SCH_MAX_CONDIDATE_EP_NUM, sizeof(SQueryNodeAddr)); if (NULL == pTask->execAddrs) { SCH_TASK_ELOG("taosArrayInit %d exec addrs failed", SCH_MAX_CONDIDATE_EP_NUM); @@ -41,7 +67,7 @@ void schFreeTask(SSchTask* pTask) { } // TODO NEED TO VERFY WITH ASYNC_SEND MEMORY FREE - //tfree(pTask->msg); + tfree(pTask->msg); if (pTask->children) { taosArrayDestroy(pTask->children); @@ -141,7 +167,7 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { break; case JOB_TASK_STATUS_CANCELLED: case JOB_TASK_STATUS_DROPPING: - SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED); break; default: @@ -541,12 +567,9 @@ int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, b return TSDB_CODE_SUCCESS; } - - -// Note: no more error processing, handled in function internal -int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { +int32_t schProcessOnJobFailureImpl(SSchJob *pJob, int32_t status, int32_t errCode) { // if already FAILED, no more processing - SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_FAILED)); + SCH_ERR_RET(schCheckAndUpdateJobStatus(pJob, status)); if (errCode) { atomic_store_32(&pJob->errCode, errCode); @@ -563,6 +586,17 @@ int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { +// Note: no more error processing, handled in function internal +int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode) { + SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_FAILED, errCode)); +} + +// Note: no more error processing, handled in function internal +int32_t schProcessOnJobDropped(SSchJob *pJob, int32_t errCode) { + SCH_RET(schProcessOnJobFailureImpl(pJob, JOB_TASK_STATUS_DROPPING, errCode)); +} + + // Note: no more error processing, handled in function internal int32_t schFetchFromRemote(SSchJob *pJob) { int32_t code = 0; @@ -871,7 +905,7 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in SSchJob **job = taosHashGet(schMgmt.jobs, &pParam->queryId, sizeof(pParam->queryId)); if (NULL == job || NULL == (*job)) { qError("QID:%"PRIx64" taosHashGet queryId not exist, may be dropped", pParam->queryId); - SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + SCH_ERR_JRET(TSDB_CODE_QRY_JOB_FREED); } pJob = *job; @@ -880,13 +914,13 @@ int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, in int32_t s = taosHashGetSize(pJob->execTasks); if (s <= 0) { - qError("QID:%"PRIx64",TID:%"PRIx64" no task in execTask list", pParam->queryId, pParam->taskId); + qError("QID:%"PRIx64",TID:%"PRId64" no task in execTask list", pParam->queryId, pParam->taskId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } SSchTask **task = taosHashGet(pJob->execTasks, &pParam->taskId, sizeof(pParam->taskId)); if (NULL == task || NULL == (*task)) { - qError("QID:%"PRIx64",TID:%"PRIx64" taosHashGet taskId not exist", pParam->queryId, pParam->taskId); + qError("QID:%"PRIx64",TID:%"PRId64" taosHashGet taskId not exist", pParam->queryId, pParam->taskId); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); } @@ -1434,7 +1468,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) { pMsg->sId = htobe64(schMgmt.sId); pMsg->queryId = htobe64(plan->id.queryId); - pMsg->taskId = htobe64(atomic_add_fetch_64(&schMgmt.taskId, 1)); + pMsg->taskId = htobe64(schGenUUID()); pMsg->contentLen = htonl(msgLen); memcpy(pMsg->msg, msg, msgLen); @@ -1457,6 +1491,52 @@ _return: SCH_RET(code); } +int32_t schedulerCopyTask(STaskInfo *src, SArray **dst, int32_t copyNum) { + if (NULL == src || NULL == dst || copyNum <= 0) { + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); + } + + int32_t code = 0; + + *dst = taosArrayInit(copyNum, sizeof(STaskInfo)); + if (NULL == *dst) { + qError("taosArrayInit %d taskInfo failed", copyNum); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + int32_t msgSize = src->msg->contentLen + sizeof(*src->msg); + STaskInfo info = {0}; + + info.addr = src->addr; + + for (int32_t i = 0; i < copyNum; ++i) { + info.msg = malloc(msgSize); + if (NULL == info.msg) { + qError("malloc %d failed", msgSize); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + memcpy(info.msg, src->msg, msgSize); + + info.msg->taskId = schGenUUID(); + + if (NULL == taosArrayPush(*dst, &info)) { + qError("taosArrayPush failed, idx:%d", i); + free(info.msg); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + } + + return TSDB_CODE_SUCCESS; + +_return: + + schedulerFreeTaskList(*dst); + *dst = NULL; + + SCH_RET(code); +} + int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { if (NULL == pJob || NULL == pData) { @@ -1464,14 +1544,15 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { } int32_t code = 0; + atomic_add_fetch_32(&pJob->ref, 1); + int8_t status = SCH_GET_JOB_STATUS(pJob); if (status == JOB_TASK_STATUS_DROPPING) { SCH_JOB_ELOG("job is dropping, status:%d", status); - return TSDB_CODE_SCH_STATUS_ERROR; + atomic_sub_fetch_32(&pJob->ref, 1); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - atomic_add_fetch_32(&pJob->ref, 1); - if (!SCH_JOB_NEED_FETCH(&pJob->attr)) { SCH_JOB_ELOG("no need to fetch data, status:%d", SCH_GET_JOB_STATUS(pJob)); atomic_sub_fetch_32(&pJob->ref, 1); @@ -1484,7 +1565,7 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } - if (status == JOB_TASK_STATUS_FAILED) { + if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { *pData = atomic_load_ptr(&pJob->res); atomic_store_ptr(&pJob->res, NULL); SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); @@ -1500,7 +1581,7 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { status = SCH_GET_JOB_STATUS(pJob); - if (status == JOB_TASK_STATUS_FAILED) { + if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { code = atomic_load_32(&pJob->errCode); SCH_ERR_JRET(code); } @@ -1547,6 +1628,7 @@ void scheduleFreeJob(void *job) { SSchJob *pJob = job; uint64_t queryId = pJob->queryId; + bool setJobFree = false; if (SCH_GET_JOB_STATUS(pJob) > 0) { if (0 != taosHashRemove(schMgmt.jobs, &pJob->queryId, sizeof(pJob->queryId))) { @@ -1554,8 +1636,6 @@ void scheduleFreeJob(void *job) { return; } - schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_DROPPING); - SCH_JOB_DLOG("job removed from list, no further ref, ref:%d", atomic_load_32(&pJob->ref)); while (true) { @@ -1563,6 +1643,11 @@ void scheduleFreeJob(void *job) { if (0 == ref) { break; } else if (ref > 0) { + if (1 == ref && atomic_load_8(&pJob->userFetch) > 0 && !setJobFree) { + schProcessOnJobDropped(pJob, TSDB_CODE_QRY_JOB_FREED); + setJobFree = true; + } + usleep(1); } else { assert(0); diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 5332c6fcd1..58159f2306 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -38,6 +38,20 @@ namespace { extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); +extern "C" int32_t schHandleCallback(void* param, const SDataBuf* pMsg, int32_t msgType, int32_t rspCode); + +struct SSchJob *pInsertJob = NULL; +struct SSchJob *pQueryJob = NULL; + +uint64_t schtMergeTemplateId = 0x4; +uint64_t schtFetchTaskId = 0; +uint64_t schtQueryId = 1; + +bool schtTestStop = false; +bool schtTestDeadLoop = false; +int32_t schtTestMTRunSec = 60; +int32_t schtTestPrintNum = 1000; +int32_t schtStartFetch = 0; void schtInitLogFile() { @@ -57,7 +71,7 @@ void schtInitLogFile() { void schtBuildQueryDag(SQueryDag *dag) { - uint64_t qId = 0x0000000000000001; + uint64_t qId = schtQueryId; dag->queryId = qId; dag->numOfSubplans = 2; @@ -84,7 +98,7 @@ void schtBuildQueryDag(SQueryDag *dag) { scanPlan->msgType = TDMT_VND_QUERY; mergePlan->id.queryId = qId; - mergePlan->id.templateId = 0x4444444444; + mergePlan->id.templateId = schtMergeTemplateId; mergePlan->id.subplanId = 0x5555555555; mergePlan->type = QUERY_TYPE_MERGE; mergePlan->level = 0; @@ -269,15 +283,225 @@ void *schtCreateFetchRspThread(void *param) { SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp)); rsp->completed = 1; rsp->numOfRows = 10; + code = schHandleResponseMsg(job, job->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(rsp), 0); assert(code == 0); } +void *schtFetchRspThread(void *aa) { + SDataBuf dataBuf = {0}; + SSchCallbackParam* param = NULL; + + while (!schtTestStop) { + if (0 == atomic_val_compare_exchange_32(&schtStartFetch, 1, 0)) { + continue; + } + + usleep(1); + + param = (SSchCallbackParam *)calloc(1, sizeof(*param)); + + param->queryId = schtQueryId; + param->taskId = schtFetchTaskId; + + int32_t code = 0; + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp)); + rsp->completed = 1; + rsp->numOfRows = 10; + + dataBuf.pData = rsp; + dataBuf.len = sizeof(*rsp); + + code = schHandleCallback(param, &dataBuf, TDMT_VND_FETCH_RSP, 0); + + assert(code == 0 || code); + } +} + +void schtFreeQueryJob(int32_t freeThread) { + static uint32_t freeNum = 0; + SSchJob *job = atomic_load_ptr(&pQueryJob); + + if (job && atomic_val_compare_exchange_ptr(&pQueryJob, job, NULL)) { + scheduleFreeJob(job); + if (freeThread) { + if (++freeNum % schtTestPrintNum == 0) { + printf("FreeNum:%d\n", freeNum); + } + } + } +} + +void* schtRunJobThread(void *aa) { + void *mockPointer = (void *)0x1; + char *clusterId = "cluster1"; + char *dbname = "1.db1"; + char *tablename = "table1"; + SVgroupInfo vgInfo = {0}; + SQueryDag dag = {0}; + + schtInitLogFile(); + + SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); + + SEpAddr qnodeAddr = {0}; + strcpy(qnodeAddr.fqdn, "qnode0.ep"); + qnodeAddr.port = 6031; + taosArrayPush(qnodeList, &qnodeAddr); + + int32_t code = schedulerInit(NULL); + assert(code == 0); + + + schtSetPlanToString(); + schtSetExecNode(); + schtSetAsyncSendMsgToServer(); + + SSchJob *job = NULL; + SSchCallbackParam *param = NULL; + SHashObj *execTasks = NULL; + SDataBuf dataBuf = {0}; + uint32_t jobFinished = 0; + + while (!schtTestStop) { + schtBuildQueryDag(&dag); + + code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &job); + assert(code == 0); + + execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + void *pIter = taosHashIterate(job->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + schtFetchTaskId = task->taskId - 1; + + taosHashPut(execTasks, &task->taskId, sizeof(task->taskId), task, sizeof(*task)); + pIter = taosHashIterate(job->execTasks, pIter); + } + + param = (SSchCallbackParam *)calloc(1, sizeof(*param)); + param->queryId = schtQueryId; + + pQueryJob = job; + + + pIter = taosHashIterate(execTasks, NULL); + while (pIter) { + SSchTask *task = (SSchTask *)pIter; + + param->taskId = task->taskId; + SQueryTableRsp rsp = {0}; + dataBuf.pData = &rsp; + dataBuf.len = sizeof(rsp); + + code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0); + assert(code == 0 || code); + + pIter = taosHashIterate(execTasks, pIter); + } + + + param = (SSchCallbackParam *)calloc(1, sizeof(*param)); + param->queryId = schtQueryId; + + pIter = taosHashIterate(execTasks, NULL); + while (pIter) { + SSchTask *task = (SSchTask *)pIter; + + param->taskId = task->taskId; + SResReadyRsp rsp = {0}; + dataBuf.pData = &rsp; + dataBuf.len = sizeof(rsp); + + code = schHandleCallback(param, &dataBuf, TDMT_VND_RES_READY_RSP, 0); + assert(code == 0 || code); + + pIter = taosHashIterate(execTasks, pIter); + } + + + param = (SSchCallbackParam *)calloc(1, sizeof(*param)); + param->queryId = schtQueryId; + + pIter = taosHashIterate(execTasks, NULL); + while (pIter) { + SSchTask *task = (SSchTask *)pIter; + + param->taskId = task->taskId - 1; + SQueryTableRsp rsp = {0}; + dataBuf.pData = &rsp; + dataBuf.len = sizeof(rsp); + + code = schHandleCallback(param, &dataBuf, TDMT_VND_QUERY_RSP, 0); + assert(code == 0 || code); + + pIter = taosHashIterate(execTasks, pIter); + } + + + param = (SSchCallbackParam *)calloc(1, sizeof(*param)); + param->queryId = schtQueryId; + + pIter = taosHashIterate(execTasks, NULL); + while (pIter) { + SSchTask *task = (SSchTask *)pIter; + + param->taskId = task->taskId - 1; + SResReadyRsp rsp = {0}; + dataBuf.pData = &rsp; + dataBuf.len = sizeof(rsp); + + code = schHandleCallback(param, &dataBuf, TDMT_VND_RES_READY_RSP, 0); + assert(code == 0 || code); + + pIter = taosHashIterate(execTasks, pIter); + } + + atomic_store_32(&schtStartFetch, 1); + + void *data = NULL; + code = scheduleFetchRows(pQueryJob, &data); + assert(code == 0 || code); + + if (0 == code) { + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; + assert(pRsp->completed == 1); + assert(pRsp->numOfRows == 10); + } + + data = NULL; + code = scheduleFetchRows(pQueryJob, &data); + assert(code == 0 || code); + + assert(data == (void*)NULL); + + schtFreeQueryJob(0); + + taosHashCleanup(execTasks); + + schtFreeQueryDag(&dag); + + if (++jobFinished % schtTestPrintNum == 0) { + printf("jobFinished:%d\n", jobFinished); + } + + ++schtQueryId; + } + + schedulerDestroy(); + +} + +void* schtFreeJobThread(void *aa) { + while (!schtTestStop) { + usleep(rand() % 2000); + schtFreeQueryJob(1); + } +} -struct SSchJob *pInsertJob = NULL; } TEST(queryTest, normalCase) { @@ -427,13 +651,30 @@ TEST(insertTest, normalCase) { } TEST(multiThread, forceFree) { + pthread_attr_t thattr; + pthread_attr_init(&thattr); - schtInitLogFile(); + pthread_t thread1, thread2, thread3; + pthread_create(&(thread1), &thattr, schtRunJobThread, NULL); + pthread_create(&(thread2), &thattr, schtFreeJobThread, NULL); + pthread_create(&(thread3), &thattr, schtFetchRspThread, NULL); + while (true) { + if (schtTestDeadLoop) { + sleep(1); + } else { + sleep(schtTestMTRunSec); + break; + } + } + + schtTestStop = true; + sleep(3); } int main(int argc, char** argv) { + srand(time(NULL)); testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); } diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 144de08cd0..f520585315 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -360,6 +360,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_CANCELLING, "Task cancelling") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_DROPPING, "Task dropping") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_DUPLICATTED_OPERATION, "Duplicatted operation") TAOS_DEFINE_ERROR(TSDB_CODE_QRY_TASK_MSG_ERROR, "Task message error") +TAOS_DEFINE_ERROR(TSDB_CODE_QRY_JOB_FREED, "Job already freed") From df62f45256b3c884ffe7e4c15710fd49fa37f1ab Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 18 Jan 2022 21:23:40 +0800 Subject: [PATCH 6/8] add client --- source/libs/transport/src/trans.c | 13 +- source/libs/transport/src/transCli.c | 38 +- source/libs/transport/src/transport.c | 773 -------------------------- source/libs/transport/test/rclient.c | 6 +- 4 files changed, 37 insertions(+), 793 deletions(-) delete mode 100644 source/libs/transport/src/transport.c diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index a59f2f88c7..89361b13ad 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -42,7 +42,18 @@ void* rpcOpen(const SRpcInit* pInit) { return pRpc; } void rpcClose(void* arg) { return; } -void* rpcMallocCont(int contLen) { return NULL; } +void* rpcMallocCont(int contLen) { + int size = contLen + RPC_MSG_OVERHEAD; + + char* start = (char*)calloc(1, (size_t)size); + if (start == NULL) { + tError("failed to malloc msg, size:%d", size); + return NULL; + } else { + tTrace("malloc mem:%p size:%d", start, size); + } + return start + sizeof(SRpcReqContext) + sizeof(SRpcHead); +} void rpcFreeCont(void* cont) { return; } void* rpcReallocCont(void* ptr, int contLen) { return NULL; } diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 8ba1f1a0f6..29f3361b10 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -55,6 +55,10 @@ static void* clientThread(void* arg); static void clientWriteCb(uv_write_t* req, int status) { // impl later } +static void clientFailedCb(uv_handle_t* handle) { + // impl later + tDebug("close handle"); +} static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { // impl later } @@ -68,8 +72,10 @@ static void clientConnCb(struct uv_connect_s* req, int status) { if (status != 0) { // call user fp later tError("failed to connect server(%s, %d), errmsg: %s", fqdn, port, uv_strerror(status)); + uv_close((uv_handle_t*)req->handle, clientFailedCb); return; } + assert(pConn->stream == req->handle); // impl later } @@ -123,19 +129,6 @@ static void clientAsyncCb(uv_async_t* handle) { static void* clientThread(void* arg) { SCliThrdObj* pThrd = (SCliThrdObj*)arg; - - QUEUE_INIT(&pThrd->msg); - pthread_mutex_init(&pThrd->msgMtx, NULL); - - // QUEUE_INIT(&pThrd->clientCache); - - pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); - uv_loop_init(pThrd->loop); - - pThrd->cliAsync = malloc(sizeof(uv_async_t)); - uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb); - pThrd->cliAsync->data = pThrd; - uv_run(pThrd->loop, UV_RUN_DEFAULT); } @@ -146,14 +139,25 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, cli->pThreadObj = (SCliThrdObj**)calloc(cli->numOfThreads, sizeof(SCliThrdObj*)); for (int i = 0; i < cli->numOfThreads; i++) { - SCliThrdObj* thrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); + SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); + QUEUE_INIT(&pThrd->msg); + pthread_mutex_init(&pThrd->msgMtx, NULL); - thrd->shandle = shandle; - int err = pthread_create(&thrd->thread, NULL, clientThread, (void*)(thrd)); + // QUEUE_INIT(&pThrd->clientCache); + + pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); + uv_loop_init(pThrd->loop); + + pThrd->cliAsync = malloc(sizeof(uv_async_t)); + uv_async_init(pThrd->loop, pThrd->cliAsync, clientAsyncCb); + pThrd->cliAsync->data = pThrd; + + pThrd->shandle = shandle; + int err = pthread_create(&pThrd->thread, NULL, clientThread, (void*)(pThrd)); if (err == 0) { tDebug("sucess to create tranport-client thread %d", i); } - cli->pThreadObj[i] = thrd; + cli->pThreadObj[i] = pThrd; } return cli; } diff --git a/source/libs/transport/src/transport.c b/source/libs/transport/src/transport.c deleted file mode 100644 index 6cc2ca8c49..0000000000 --- a/source/libs/transport/src/transport.c +++ /dev/null @@ -1,773 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifdef USE_UV - -#include -#include "lz4.h" -#include "os.h" -#include "rpcCache.h" -#include "rpcHead.h" -#include "rpcLog.h" -#include "rpcTcp.h" -#include "rpcUdp.h" -#include "taoserror.h" -#include "tglobal.h" -#include "thash.h" -#include "tidpool.h" -#include "tmd5.h" -#include "tmempool.h" -#include "tmsg.h" -#include "transportInt.h" -#include "tref.h" -#include "trpc.h" -#include "ttimer.h" -#include "tutil.h" - -#define container_of(ptr, type, member) ((type*)((char*)(ptr)-offsetof(type, member))) -#define RPC_RESERVE_SIZE (sizeof(SRpcReqContext)) -static const char* notify = "a"; - -typedef struct { - int sessions; // number of sessions allowed - int numOfThreads; // number of threads to process incoming messages - int idleTime; // milliseconds; - uint16_t localPort; - int8_t connType; - int index; // for UDP server only, round robin for multiple threads - char label[TSDB_LABEL_LEN]; - - char user[TSDB_UNI_LEN]; // meter ID - char spi; // security parameter index - char encrypt; // encrypt algorithm - char secret[TSDB_PASSWORD_LEN]; // secret for the link - char ckey[TSDB_PASSWORD_LEN]; // ciphering key - - void (*cfp)(void* parent, SRpcMsg*, SEpSet*); - int (*afp)(void* parent, char* user, char* spi, char* encrypt, char* secret, char* ckey); - - int32_t refCount; - void* parent; - void* idPool; // handle to ID pool - void* tmrCtrl; // handle to timer - SHashObj* hash; // handle returned by hash utility - void* tcphandle; // returned handle from TCP initialization - void* udphandle; // returned handle from UDP initialization - void* pCache; // connection cache - pthread_mutex_t mutex; - struct SRpcConn* connList; // connection list -} SRpcInfo; - -typedef struct { - SRpcInfo* pRpc; // associated SRpcInfo - SEpSet epSet; // ip list provided by app - void* ahandle; // handle provided by app - struct SRpcConn* pConn; // pConn allocated - tmsg_t msgType; // message type - uint8_t* pCont; // content provided by app - int32_t contLen; // content length - int32_t code; // error code - int16_t numOfTry; // number of try for different servers - int8_t oldInUse; // server EP inUse passed by app - int8_t redirect; // flag to indicate redirect - int8_t connType; // connection type - int64_t rid; // refId returned by taosAddRef - SRpcMsg* pRsp; // for synchronous API - tsem_t* pSem; // for synchronous API - SEpSet* pSet; // for synchronous API - char msg[0]; // RpcHead starts from here -} SRpcReqContext; - -typedef struct SThreadObj { - pthread_t thread; - uv_pipe_t* pipe; - int fd; - uv_loop_t* loop; - uv_async_t* workerAsync; // - queue conn; - pthread_mutex_t connMtx; - void* shandle; -} SThreadObj; - -typedef struct SClientObj { - char label[TSDB_LABEL_LEN]; - int32_t index; - int numOfThreads; - SThreadObj** pThreadObj; -} SClientObj; - -#define RPC_MSG_OVERHEAD (sizeof(SRpcReqContext) + sizeof(SRpcHead) + sizeof(SRpcDigest)) -#define rpcHeadFromCont(cont) ((SRpcHead*)((char*)cont - sizeof(SRpcHead))) -#define rpcContFromHead(msg) (msg + sizeof(SRpcHead)) -#define rpcMsgLenFromCont(contLen) (contLen + sizeof(SRpcHead)) -#define rpcContLenFromMsg(msgLen) (msgLen - sizeof(SRpcHead)) -#define rpcIsReq(type) (type & 1U) - -typedef struct SServerObj { - pthread_t thread; - uv_tcp_t server; - uv_loop_t* loop; - int workerIdx; - int numOfThreads; - SThreadObj** pThreadObj; - uv_pipe_t** pipe; - uint32_t ip; - uint32_t port; -} SServerObj; - -typedef struct SConnBuffer { - char* buf; - int len; - int cap; - int left; -} SConnBuffer; - -typedef struct SRpcConn { - uv_tcp_t* pTcp; - uv_write_t* pWriter; - uv_timer_t* pTimer; - - uv_async_t* pWorkerAsync; - queue queue; - int ref; - int persist; // persist connection or not - SConnBuffer connBuf; // read buf, - SConnBuffer writeBuf; // write buf - int count; - void* shandle; // rpc init - void* ahandle; // - void* hostThread; - // del later - char secured; - int spi; - char info[64]; - char user[TSDB_UNI_LEN]; // user ID for the link - char secret[TSDB_PASSWORD_LEN]; - char ckey[TSDB_PASSWORD_LEN]; // ciphering key -} SRpcConn; - -// auth function -static int uvAuthMsg(SRpcConn* pConn, char* msg, int msgLen); -static int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey); -static void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey); -static int rpcAddAuthPart(SRpcConn* pConn, char* msg, int msgLen); -// compress data -static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen); -static SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead); - -static void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); -static void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); -static void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); -static void uvOnTimeoutCb(uv_timer_t* handle); -static void uvOnWriteCb(uv_write_t* req, int status); -static void uvOnAcceptCb(uv_stream_t* stream, int status); -static void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf); -static void uvWorkerAsyncCb(uv_async_t* handle); - -static SRpcConn* connCreate(); -static void connDestroy(SRpcConn* conn); -static void uvConnDestroy(uv_handle_t* handle); - -static void* workerThread(void* arg); -static void* acceptThread(void* arg); - -void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); -void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle); - -void* (*taosHandle[])(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) = {taosInitServer, taosInitClient}; - -void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { - SClientObj* cli = calloc(1, sizeof(SClientObj)); - memcpy(cli->label, label, strlen(label)); - cli->numOfThreads = numOfThreads; - cli->pThreadObj = (SThreadObj**)calloc(cli->numOfThreads, sizeof(SThreadObj*)); - - for (int i = 0; i < cli->numOfThreads; i++) { - SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj)); - - int err = pthread_create(&thrd->thread, NULL, workerThread, (void*)(thrd)); - if (err == 0) { - tDebug("sucess to create tranport-client thread %d", i); - } - } - return cli; -} - -void* taosInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, void* fp, void* shandle) { - SServerObj* srv = calloc(1, sizeof(SServerObj)); - srv->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); - srv->numOfThreads = numOfThreads; - srv->workerIdx = 0; - srv->pThreadObj = (SThreadObj**)calloc(srv->numOfThreads, sizeof(SThreadObj*)); - srv->pipe = (uv_pipe_t**)calloc(srv->numOfThreads, sizeof(uv_pipe_t*)); - srv->ip = ip; - srv->port = port; - uv_loop_init(srv->loop); - - for (int i = 0; i < srv->numOfThreads; i++) { - SThreadObj* thrd = (SThreadObj*)calloc(1, sizeof(SThreadObj)); - srv->pipe[i] = (uv_pipe_t*)calloc(2, sizeof(uv_pipe_t)); - int fds[2]; - if (uv_socketpair(AF_UNIX, SOCK_STREAM, fds, UV_NONBLOCK_PIPE, UV_NONBLOCK_PIPE) != 0) { - return NULL; - } - uv_pipe_init(srv->loop, &(srv->pipe[i][0]), 1); - uv_pipe_open(&(srv->pipe[i][0]), fds[1]); // init write - - thrd->shandle = shandle; - thrd->fd = fds[0]; - thrd->pipe = &(srv->pipe[i][1]); // init read - int err = pthread_create(&(thrd->thread), NULL, workerThread, (void*)(thrd)); - if (err == 0) { - tDebug("sucess to create worker-thread %d", i); - // printf("thread %d create\n", i); - } else { - // TODO: clear all other resource later - tError("failed to create worker-thread %d", i); - } - srv->pThreadObj[i] = thrd; - } - - int err = pthread_create(&srv->thread, NULL, acceptThread, (void*)srv); - if (err == 0) { - tDebug("success to create accept-thread"); - } else { - // clear all resource later - } - - return srv; -} -void uvAllocReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - /* - * formate of data buffer: - * |<-------SRpcReqContext------->|<------------data read from socket----------->| - */ - static const int CAPACITY = 1024; - - SRpcConn* ctx = handle->data; - SConnBuffer* pBuf = &ctx->connBuf; - if (pBuf->cap == 0) { - pBuf->buf = (char*)calloc(CAPACITY + RPC_RESERVE_SIZE, sizeof(char)); - pBuf->len = 0; - pBuf->cap = CAPACITY; - pBuf->left = -1; - - buf->base = pBuf->buf + RPC_RESERVE_SIZE; - buf->len = CAPACITY; - } else { - if (pBuf->len >= pBuf->cap) { - if (pBuf->left == -1) { - pBuf->cap *= 2; - pBuf->buf = realloc(pBuf->buf, pBuf->cap + RPC_RESERVE_SIZE); - } else if (pBuf->len + pBuf->left > pBuf->cap) { - pBuf->cap = pBuf->len + pBuf->left; - pBuf->buf = realloc(pBuf->buf, pBuf->len + pBuf->left + RPC_RESERVE_SIZE); - } - } - buf->base = pBuf->buf + pBuf->len + RPC_RESERVE_SIZE; - buf->len = pBuf->cap - pBuf->len; - } -} -// check data read from socket completely or not -// -static bool isReadAll(SConnBuffer* data) { - // TODO(yihao): handle pipeline later - SRpcHead rpcHead; - int32_t headLen = sizeof(rpcHead); - if (data->len >= headLen) { - memcpy((char*)&rpcHead, data->buf + RPC_RESERVE_SIZE, headLen); - int32_t msgLen = (int32_t)htonl((uint32_t)rpcHead.msgLen); - if (msgLen > data->len) { - data->left = msgLen - data->len; - return false; - } else { - return true; - } - } else { - return false; - } -} -static void uvDoProcess(SRecvInfo* pRecv) { - SRpcHead* pHead = (SRpcHead*)pRecv->msg; - SRpcInfo* pRpc = (SRpcInfo*)pRecv->shandle; - SRpcConn* pConn = pRecv->thandle; - - tDump(pRecv->msg, pRecv->msgLen); - - terrno = 0; - SRpcReqContext* pContest; - - // do auth and check -} -static int uvAuthMsg(SRpcConn* pConn, char* msg, int len) { - SRpcHead* pHead = (SRpcHead*)msg; - int code = 0; - - if ((pConn->secured && pHead->spi == 0) || (pHead->spi == 0 && pConn->spi == 0)) { - // secured link, or no authentication - pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); - // tTrace("%s, secured link, no auth is required", pConn->info); - return 0; - } - - if (!rpcIsReq(pHead->msgType)) { - // for response, if code is auth failure, it shall bypass the auth process - code = htonl(pHead->code); - if (code == TSDB_CODE_RPC_INVALID_TIME_STAMP || code == TSDB_CODE_RPC_AUTH_FAILURE || code == TSDB_CODE_RPC_INVALID_VERSION || code == TSDB_CODE_RPC_AUTH_REQUIRED || - code == TSDB_CODE_MND_USER_NOT_EXIST || code == TSDB_CODE_RPC_NOT_READY) { - pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen); - // tTrace("%s, dont check authentication since code is:0x%x", pConn->info, code); - return 0; - } - } - - code = 0; - if (pHead->spi == pConn->spi) { - // authentication - SRpcDigest* pDigest = (SRpcDigest*)((char*)pHead + len - sizeof(SRpcDigest)); - - int32_t delta; - delta = (int32_t)htonl(pDigest->timeStamp); - delta -= (int32_t)taosGetTimestampSec(); - if (abs(delta) > 900) { - tWarn("%s, time diff:%d is too big, msg discarded", pConn->info, delta); - code = TSDB_CODE_RPC_INVALID_TIME_STAMP; - } else { - if (rpcAuthenticateMsg(pHead, len - TSDB_AUTH_LEN, pDigest->auth, pConn->secret) < 0) { - // tDebug("%s, authentication failed, msg discarded", pConn->info); - code = TSDB_CODE_RPC_AUTH_FAILURE; - } else { - pHead->msgLen = (int32_t)htonl((uint32_t)pHead->msgLen) - sizeof(SRpcDigest); - if (!rpcIsReq(pHead->msgType)) pConn->secured = 1; // link is secured for client - // tTrace("%s, message is authenticated", pConn->info); - } - } - } else { - tDebug("%s, auth spi:%d not matched with received:%d", pConn->info, pConn->spi, pHead->spi); - code = pHead->spi ? TSDB_CODE_RPC_AUTH_FAILURE : TSDB_CODE_RPC_AUTH_REQUIRED; - } - - return code; -} -// refers specifically to query or insert timeout -static void uvHandleActivityTimeout(uv_timer_t* handle) { - // impl later - SRpcConn* conn = handle->data; -} -static void uvProcessData(SRpcConn* pConn) { - SRecvInfo info; - SRecvInfo* p = &info; - SConnBuffer* pBuf = &pConn->connBuf; - p->msg = pBuf->buf + RPC_RESERVE_SIZE; - p->msgLen = pBuf->len; - p->ip = 0; - p->port = 0; - p->shandle = pConn->shandle; // - p->thandle = pConn; - p->chandle = NULL; - - // - SRpcHead* pHead = (SRpcHead*)p->msg; - assert(rpcIsReq(pHead->msgType)); - - SRpcInfo* pRpc = (SRpcInfo*)p->shandle; - pConn->ahandle = (void*)pHead->ahandle; - // auth here - - int8_t code = uvAuthMsg(pConn, (char*)pHead, p->msgLen); - if (code != 0) { - terrno = code; - return; - } - pHead->code = htonl(pHead->code); - - SRpcMsg rpcMsg; - - pHead = rpcDecompressRpcMsg(pHead); - rpcMsg.contLen = rpcContLenFromMsg(pHead->msgLen); - rpcMsg.pCont = pHead->content; - rpcMsg.msgType = pHead->msgType; - rpcMsg.code = pHead->code; - rpcMsg.ahandle = pConn->ahandle; - rpcMsg.handle = pConn; - - (*(pRpc->cfp))(pRpc->parent, &rpcMsg, NULL); - uv_timer_start(pConn->pTimer, uvHandleActivityTimeout, pRpc->idleTime, 0); - // auth - // validate msg type -} -void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { - // opt - SRpcConn* ctx = cli->data; - SConnBuffer* pBuf = &ctx->connBuf; - if (nread > 0) { - pBuf->len += nread; - if (isReadAll(pBuf)) { - tDebug("alread read complete packet"); - uvProcessData(ctx); - } else { - tDebug("read half packet, continue to read"); - } - return; - } - if (terrno != 0) { - // handle err code - } - - if (nread != UV_EOF) { - tDebug("Read error %s\n", uv_err_name(nread)); - } - uv_close((uv_handle_t*)cli, uvConnDestroy); -} -void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - buf->base = malloc(sizeof(char)); - buf->len = 2; -} - -void uvOnTimeoutCb(uv_timer_t* handle) { - // opt - tDebug("time out"); -} - -void uvOnWriteCb(uv_write_t* req, int status) { - SRpcConn* conn = req->data; - if (status == 0) { - tDebug("data already was written on stream"); - } else { - connDestroy(conn); - } - // opt -} - -void uvWorkerAsyncCb(uv_async_t* handle) { - SThreadObj* pThrd = container_of(handle, SThreadObj, workerAsync); - SRpcConn* conn = NULL; - - // opt later - pthread_mutex_lock(&pThrd->connMtx); - if (!QUEUE_IS_EMPTY(&pThrd->conn)) { - queue* head = QUEUE_HEAD(&pThrd->conn); - conn = QUEUE_DATA(head, SRpcConn, queue); - QUEUE_REMOVE(&conn->queue); - } - pthread_mutex_unlock(&pThrd->connMtx); - if (conn == NULL) { - tError("except occurred, do nothing"); - return; - } - uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len); - uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); -} - -void uvOnAcceptCb(uv_stream_t* stream, int status) { - if (status == -1) { - return; - } - SServerObj* pObj = container_of(stream, SServerObj, server); - - uv_tcp_t* cli = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); - uv_tcp_init(pObj->loop, cli); - - if (uv_accept(stream, (uv_stream_t*)cli) == 0) { - uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t)); - - uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); - - pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads; - tDebug("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); - uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnWriteCb); - } else { - uv_close((uv_handle_t*)cli, NULL); - } -} -void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { - tDebug("connection coming"); - if (nread < 0) { - if (nread != UV_EOF) { - tError("read error %s", uv_err_name(nread)); - } - // TODO(log other failure reason) - uv_close((uv_handle_t*)q, NULL); - return; - } - // free memory allocated by - assert(nread == strlen(notify)); - assert(buf->base[0] == notify[0]); - free(buf->base); - - SThreadObj* pThrd = q->data; - - uv_pipe_t* pipe = (uv_pipe_t*)q; - if (!uv_pipe_pending_count(pipe)) { - tError("No pending count"); - return; - } - - uv_handle_type pending = uv_pipe_pending_type(pipe); - assert(pending == UV_TCP); - - SRpcConn* pConn = connCreate(); - pConn->shandle = pThrd->shandle; - /* init conn timer*/ - pConn->pTimer = malloc(sizeof(uv_timer_t)); - uv_timer_init(pThrd->loop, pConn->pTimer); - pConn->pTimer->data = pConn; - - pConn->hostThread = pThrd; - pConn->pWorkerAsync = pThrd->workerAsync; // thread safty - - // init client handle - pConn->pTcp = (uv_tcp_t*)malloc(sizeof(uv_tcp_t)); - uv_tcp_init(pThrd->loop, pConn->pTcp); - pConn->pTcp->data = pConn; - - // init write request, just - pConn->pWriter = calloc(1, sizeof(uv_write_t)); - pConn->pWriter->data = pConn; - - if (uv_accept(q, (uv_stream_t*)(pConn->pTcp)) == 0) { - uv_os_fd_t fd; - uv_fileno((const uv_handle_t*)pConn->pTcp, &fd); - tDebug("new connection created: %d", fd); - uv_read_start((uv_stream_t*)(pConn->pTcp), uvAllocReadBufferCb, uvOnReadCb); - } else { - connDestroy(pConn); - } -} - -void* acceptThread(void* arg) { - // opt - SServerObj* srv = (SServerObj*)arg; - uv_tcp_init(srv->loop, &srv->server); - - struct sockaddr_in bind_addr; - - uv_ip4_addr("0.0.0.0", srv->port, &bind_addr); - uv_tcp_bind(&srv->server, (const struct sockaddr*)&bind_addr, 0); - int err = 0; - if ((err = uv_listen((uv_stream_t*)&srv->server, 128, uvOnAcceptCb)) != 0) { - tError("Listen error %s\n", uv_err_name(err)); - return NULL; - } - uv_run(srv->loop, UV_RUN_DEFAULT); -} -void* workerThread(void* arg) { - SThreadObj* pThrd = (SThreadObj*)arg; - - pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); - uv_loop_init(pThrd->loop); - - uv_pipe_init(pThrd->loop, pThrd->pipe, 1); - uv_pipe_open(pThrd->pipe, pThrd->fd); - - pThrd->pipe->data = pThrd; - - QUEUE_INIT(&pThrd->conn); - - pThrd->workerAsync = malloc(sizeof(uv_async_t)); - uv_async_init(pThrd->loop, pThrd->workerAsync, uvWorkerAsyncCb); - - uv_read_start((uv_stream_t*)pThrd->pipe, uvAllocConnBufferCb, uvOnConnectionCb); - uv_run(pThrd->loop, UV_RUN_DEFAULT); -} -static SRpcConn* connCreate() { - SRpcConn* pConn = (SRpcConn*)calloc(1, sizeof(SRpcConn)); - return pConn; -} -static void connDestroy(SRpcConn* conn) { - if (conn == NULL) { - return; - } - uv_timer_stop(conn->pTimer); - free(conn->pTimer); - uv_close((uv_handle_t*)conn->pTcp, NULL); - free(conn->connBuf.buf); - free(conn->pTcp); - free(conn->pWriter); - free(conn); - // handle -} -static void uvConnDestroy(uv_handle_t* handle) { - SRpcConn* conn = handle->data; - connDestroy(conn); -} -void* rpcOpen(const SRpcInit* pInit) { - SRpcInfo* pRpc = calloc(1, sizeof(SRpcInfo)); - if (pRpc == NULL) { - return NULL; - } - if (pInit->label) { - tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); - } - pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; - pRpc->connType = pInit->connType; - pRpc->tcphandle = (*taosHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); - // pRpc->taosInitServer(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); - return pRpc; -} -void rpcClose(void* arg) { return; } -void* rpcMallocCont(int contLen) { return NULL; } -void rpcFreeCont(void* cont) { return; } -void* rpcReallocCont(void* ptr, int contLen) { return NULL; } - -void rpcSendRequest(void* thandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* rid) { - // impl later - return; -} - -void rpcSendResponse(const SRpcMsg* pMsg) { - SRpcConn* pConn = pMsg->handle; - SThreadObj* pThrd = pConn->hostThread; - - // opt later - pthread_mutex_lock(&pThrd->connMtx); - QUEUE_PUSH(&pThrd->conn, &pConn->queue); - pthread_mutex_unlock(&pThrd->connMtx); - - uv_async_send(pConn->pWorkerAsync); -} - -void rpcSendRedirectRsp(void* pConn, const SEpSet* pEpSet) {} -int rpcGetConnInfo(void* thandle, SRpcConnInfo* pInfo) { return -1; } -void rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pReq, SRpcMsg* pRsp) { return; } -int rpcReportProgress(void* pConn, char* pCont, int contLen) { return -1; } -void rpcCancelRequest(int64_t rid) { return; } - -static int rpcAuthenticateMsg(void* pMsg, int msgLen, void* pAuth, void* pKey) { - T_MD5_CTX context; - int ret = -1; - - tMD5Init(&context); - tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); - tMD5Update(&context, (uint8_t*)pMsg, msgLen); - tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); - tMD5Final(&context); - - if (memcmp(context.digest, pAuth, sizeof(context.digest)) == 0) ret = 0; - - return ret; -} -static void rpcBuildAuthHead(void* pMsg, int msgLen, void* pAuth, void* pKey) { - T_MD5_CTX context; - - tMD5Init(&context); - tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); - tMD5Update(&context, (uint8_t*)pMsg, msgLen); - tMD5Update(&context, (uint8_t*)pKey, TSDB_PASSWORD_LEN); - tMD5Final(&context); - - memcpy(pAuth, context.digest, sizeof(context.digest)); -} - -static int rpcAddAuthPart(SRpcConn* pConn, char* msg, int msgLen) { - SRpcHead* pHead = (SRpcHead*)msg; - - if (pConn->spi && pConn->secured == 0) { - // add auth part - pHead->spi = pConn->spi; - SRpcDigest* pDigest = (SRpcDigest*)(msg + msgLen); - pDigest->timeStamp = htonl(taosGetTimestampSec()); - msgLen += sizeof(SRpcDigest); - pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - rpcBuildAuthHead(pHead, msgLen - TSDB_AUTH_LEN, pDigest->auth, pConn->secret); - } else { - pHead->spi = 0; - pHead->msgLen = (int32_t)htonl((uint32_t)msgLen); - } - - return msgLen; -} - -static int32_t rpcCompressRpcMsg(char* pCont, int32_t contLen) { - SRpcHead* pHead = rpcHeadFromCont(pCont); - int32_t finalLen = 0; - int overhead = sizeof(SRpcComp); - - if (!NEEDTO_COMPRESSS_MSG(contLen)) { - return contLen; - } - - char* buf = malloc(contLen + overhead + 8); // 8 extra bytes - if (buf == NULL) { - tError("failed to allocate memory for rpc msg compression, contLen:%d", contLen); - return contLen; - } - - int32_t compLen = LZ4_compress_default(pCont, buf, contLen, contLen + overhead); - tDebug("compress rpc msg, before:%d, after:%d, overhead:%d", contLen, compLen, overhead); - - /* - * only the compressed size is less than the value of contLen - overhead, the compression is applied - * The first four bytes is set to 0, the second four bytes are utilized to keep the original length of message - */ - if (compLen > 0 && compLen < contLen - overhead) { - SRpcComp* pComp = (SRpcComp*)pCont; - pComp->reserved = 0; - pComp->contLen = htonl(contLen); - memcpy(pCont + overhead, buf, compLen); - - pHead->comp = 1; - tDebug("compress rpc msg, before:%d, after:%d", contLen, compLen); - finalLen = compLen + overhead; - } else { - finalLen = contLen; - } - - free(buf); - return finalLen; -} - -static SRpcHead* rpcDecompressRpcMsg(SRpcHead* pHead) { - int overhead = sizeof(SRpcComp); - SRpcHead* pNewHead = NULL; - uint8_t* pCont = pHead->content; - SRpcComp* pComp = (SRpcComp*)pHead->content; - - if (pHead->comp) { - // decompress the content - assert(pComp->reserved == 0); - int contLen = htonl(pComp->contLen); - - // prepare the temporary buffer to decompress message - char* temp = (char*)malloc(contLen + RPC_MSG_OVERHEAD); - pNewHead = (SRpcHead*)(temp + sizeof(SRpcReqContext)); // reserve SRpcReqContext - - if (pNewHead) { - int compLen = rpcContLenFromMsg(pHead->msgLen) - overhead; - int origLen = LZ4_decompress_safe((char*)(pCont + overhead), (char*)pNewHead->content, compLen, contLen); - assert(origLen == contLen); - - memcpy(pNewHead, pHead, sizeof(SRpcHead)); - pNewHead->msgLen = rpcMsgLenFromCont(origLen); - /// rpcFreeMsg(pHead); // free the compressed message buffer - pHead = pNewHead; - tTrace("decomp malloc mem:%p", temp); - } else { - tError("failed to allocate memory to decompress msg, contLen:%d", contLen); - } - } - - return pHead; -} -int32_t rpcInit(void) { - // impl later - return -1; -} - -void rpcCleanup(void) { - // impl later - return; -} -#endif diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c index 045fb8520e..58fbf6ae85 100644 --- a/source/libs/transport/test/rclient.c +++ b/source/libs/transport/test/rclient.c @@ -34,7 +34,8 @@ typedef struct { static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { SInfo *pInfo = (SInfo *)pMsg->ahandle; - tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, pMsg->code); + tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, + pMsg->code); if (pEpSet) pInfo->epSet = *pEpSet; @@ -185,7 +186,8 @@ int main(int argc, char *argv[]) { // float usedTime = (endTime - startTime) / 1000.0f; // mseconds // tInfo("it takes %.3f mseconds to send %d requests to server", usedTime, numOfReqs * appThreads); - // tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, msgSize); + // tInfo("Performance: %.3f requests per second, msgSize:%d bytes", 1000.0 * numOfReqs * appThreads / usedTime, + // msgSize); int ch = getchar(); UNUSED(ch); From ee7393e589d8f81ee2893abc732d8daa6aee36fe Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Wed, 19 Jan 2022 10:00:24 +0800 Subject: [PATCH 7/8] [modify] --- tests/script/sh/massiveTable/deployCluster.sh | 15 +++++++-------- tests/script/sh/massiveTable/setupDnodes.sh | 2 +- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/tests/script/sh/massiveTable/deployCluster.sh b/tests/script/sh/massiveTable/deployCluster.sh index 2341d512c0..a86e9220cd 100755 --- a/tests/script/sh/massiveTable/deployCluster.sh +++ b/tests/script/sh/massiveTable/deployCluster.sh @@ -6,17 +6,16 @@ set -e #set -x # deployCluster.sh +curr_dir=$(readlink -f "$(dirname "$0")") +echo $curr_dir -curr_dir=$(pwd) -echo "currect pwd: ${curr_dir}" +${curr_dir}/cleanCluster.sh -r "/data" +${curr_dir}/cleanCluster.sh -r "/data2" -./cleanCluster.sh -r "/data" -./cleanCluster.sh -r "/data2" +${curr_dir}/compileVersion.sh -r ${curr_dir}/../../../../ -v "3.0" -./compileVersion.sh -r ${curr_dir}/../../../../ -v "3.0" - -./setupDnodes.sh -r "/data" -n 1 -f "trd02:7000" -p 7000 -./setupDnodes.sh -r "/data2" -n 1 -f "trd02:7000" -p 8000 +${curr_dir}/setupDnodes.sh -r "/data" -n 1 -f "trd02:7000" -p 7000 +${curr_dir}/setupDnodes.sh -r "/data2" -n 1 -f "trd02:7000" -p 8000 #./setupDnodes.sh -r "/data" -n 2 -f trd02:7000 -p 7000 #./setupDnodes.sh -r "/data2" -n 2 -f trd02:7000 -p 8000 diff --git a/tests/script/sh/massiveTable/setupDnodes.sh b/tests/script/sh/massiveTable/setupDnodes.sh index e09b2fb396..e45c7724ba 100755 --- a/tests/script/sh/massiveTable/setupDnodes.sh +++ b/tests/script/sh/massiveTable/setupDnodes.sh @@ -121,7 +121,7 @@ startDnodes() { ############################### main process ########################################## ## kill all taosd process -kill_process taosd +#kill_process taosd ## create director for all dnode if [[ "$enviMode" == "new" ]]; then From 76b8abe2df00a758b4649945db805ec4b9750365 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 19 Jan 2022 10:18:31 +0800 Subject: [PATCH 8/8] feature/qnode --- source/libs/scheduler/src/scheduler.c | 54 +++++++++++++------ source/libs/scheduler/test/schedulerTests.cpp | 35 ++++++------ 2 files changed, 56 insertions(+), 33 deletions(-) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index ccf0c58ac0..de295f77c6 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -66,7 +66,6 @@ void schFreeTask(SSchTask* pTask) { taosArrayDestroy(pTask->candidateAddrs); } - // TODO NEED TO VERFY WITH ASYNC_SEND MEMORY FREE tfree(pTask->msg); if (pTask->children) { @@ -97,7 +96,7 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%d, rspType:%d", lastMsgType, msgType); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - + if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING && SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_PARTIAL_SUCCEED) { SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%d", SCH_GET_TASK_STATUS(pTask), msgType); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); @@ -868,8 +867,18 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch SCH_ERR_RET(schProcessOnTaskFailure(pJob, pTask, rspCode)); } + if (pJob->res) { + SCH_TASK_ELOG("got fetch rsp while res already exists, res:%p", pJob->res); + tfree(rsp); + SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + } + atomic_store_ptr(&pJob->res, rsp); atomic_store_32(&pJob->resNumOfRows, rsp->numOfRows); + + if (rsp->completed) { + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCCEED); + } SCH_ERR_JRET(schProcessOnDataFetched(pJob)); @@ -1067,7 +1076,13 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, case TDMT_VND_CREATE_TABLE: case TDMT_VND_SUBMIT: { msgSize = pTask->msgLen; - msg = pTask->msg; + msg = calloc(1, msgSize); + if (NULL == msg) { + SCH_TASK_ELOG("calloc %d failed", msgSize); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + memcpy(msg, pTask->msg, msgSize); break; } @@ -1549,29 +1564,24 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { int8_t status = SCH_GET_JOB_STATUS(pJob); if (status == JOB_TASK_STATUS_DROPPING) { SCH_JOB_ELOG("job is dropping, status:%d", status); - atomic_sub_fetch_32(&pJob->ref, 1); - SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); + SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); } if (!SCH_JOB_NEED_FETCH(&pJob->attr)) { SCH_JOB_ELOG("no need to fetch data, status:%d", SCH_GET_JOB_STATUS(pJob)); - atomic_sub_fetch_32(&pJob->ref, 1); - SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } if (atomic_val_compare_exchange_8(&pJob->userFetch, 0, 1) != 0) { SCH_JOB_ELOG("prior fetching not finished, userFetch:%d", atomic_load_8(&pJob->userFetch)); - atomic_sub_fetch_32(&pJob->ref, 1); - SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { - *pData = atomic_load_ptr(&pJob->res); - atomic_store_ptr(&pJob->res, NULL); + SCH_JOB_ELOG("job failed or dropping, status:%d", status); SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); } else if (status == JOB_TASK_STATUS_SUCCEED) { - *pData = atomic_load_ptr(&pJob->res); - atomic_store_ptr(&pJob->res, NULL); + SCH_JOB_ELOG("job already succeed, status:%d", status); goto _return; } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { SCH_ERR_JRET(schFetchFromRemote(pJob)); @@ -1582,14 +1592,16 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { status = SCH_GET_JOB_STATUS(pJob); if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { - code = atomic_load_32(&pJob->errCode); - SCH_ERR_JRET(code); + SCH_JOB_ELOG("job failed or dropping, status:%d", status); + SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); } if (pJob->res && ((SRetrieveTableRsp *)pJob->res)->completed) { SCH_ERR_JRET(schCheckAndUpdateJobStatus(pJob, JOB_TASK_STATUS_SUCCEED)); } +_return: + while (true) { *pData = atomic_load_ptr(&pJob->res); @@ -1600,10 +1612,19 @@ int32_t scheduleFetchRows(SSchJob *pJob, void** pData) { break; } -_return: + if (NULL == *pData) { + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)calloc(1, sizeof(SRetrieveTableRsp)); + if (rsp) { + rsp->completed = 1; + } + + *pData = rsp; + } atomic_val_compare_exchange_8(&pJob->userFetch, 1, 0); + SCH_JOB_DLOG("fetch done, code:%x", code); + atomic_sub_fetch_32(&pJob->ref, 1); SCH_RET(code); @@ -1683,6 +1704,7 @@ void scheduleFreeJob(void *job) { taosHashCleanup(pJob->succTasks); taosArrayDestroy(pJob->levels); + taosArrayDestroy(pJob->nodeList); tfree(pJob->res); diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 58159f2306..1425ac0e6c 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -49,7 +49,7 @@ uint64_t schtQueryId = 1; bool schtTestStop = false; bool schtTestDeadLoop = false; -int32_t schtTestMTRunSec = 60; +int32_t schtTestMTRunSec = 10; int32_t schtTestPrintNum = 1000; int32_t schtStartFetch = 0; @@ -187,8 +187,6 @@ void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int6 } - - void schtSetPlanToString() { static Stub stub; stub.set(qSubPlanToString, schtPlanToString); @@ -228,7 +226,12 @@ void schtSetRpcSendRequest() { } } -int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo) { +int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, SMsgSendInfo* pInfo) { + if (pInfo) { + tfree(pInfo->param); + tfree(pInfo->msgInfo.pData); + free(pInfo); + } return 0; } @@ -284,7 +287,7 @@ void *schtCreateFetchRspThread(void *param) { rsp->completed = 1; rsp->numOfRows = 10; - code = schHandleResponseMsg(job, job->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(rsp), 0); + code = schHandleResponseMsg(job, job->fetchTask, TDMT_VND_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0); assert(code == 0); } @@ -344,12 +347,6 @@ void* schtRunJobThread(void *aa) { schtInitLogFile(); - SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); - - SEpAddr qnodeAddr = {0}; - strcpy(qnodeAddr.fqdn, "qnode0.ep"); - qnodeAddr.port = 6031; - taosArrayPush(qnodeList, &qnodeAddr); int32_t code = schedulerInit(NULL); assert(code == 0); @@ -368,6 +365,13 @@ void* schtRunJobThread(void *aa) { while (!schtTestStop) { schtBuildQueryDag(&dag); + SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); + + SEpAddr qnodeAddr = {0}; + strcpy(qnodeAddr.fqdn, "qnode0.ep"); + qnodeAddr.port = 6031; + taosArrayPush(qnodeList, &qnodeAddr); + code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &job); assert(code == 0); @@ -475,8 +479,6 @@ void* schtRunJobThread(void *aa) { code = scheduleFetchRows(pQueryJob, &data); assert(code == 0 || code); - assert(data == (void*)NULL); - schtFreeQueryJob(0); taosHashCleanup(execTasks); @@ -496,7 +498,7 @@ void* schtRunJobThread(void *aa) { void* schtFreeJobThread(void *aa) { while (!schtTestStop) { - usleep(rand() % 2000); + usleep(rand() % 100); schtFreeQueryJob(1); } } @@ -592,11 +594,12 @@ TEST(queryTest, normalCase) { SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; ASSERT_EQ(pRsp->completed, 1); ASSERT_EQ(pRsp->numOfRows, 10); + tfree(data); data = NULL; code = scheduleFetchRows(job, &data); ASSERT_EQ(code, 0); - ASSERT_EQ(data, (void*)NULL); + ASSERT_TRUE(data); scheduleFreeJob(pJob); @@ -607,7 +610,6 @@ TEST(queryTest, normalCase) { - TEST(insertTest, normalCase) { void *mockPointer = (void *)0x1; char *clusterId = "cluster1"; @@ -672,7 +674,6 @@ TEST(multiThread, forceFree) { sleep(3); } - int main(int argc, char** argv) { srand(time(NULL)); testing::InitGoogleTest(&argc, argv);