diff --git a/source/dnode/mgmt/implement/src/dmHandle.c b/source/dnode/mgmt/implement/src/dmHandle.c index 93b24535a9..1179db8ae2 100644 --- a/source/dnode/mgmt/implement/src/dmHandle.c +++ b/source/dnode/mgmt/implement/src/dmHandle.c @@ -36,12 +36,12 @@ static int32_t dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) { } } else { SStatusRsp statusRsp = {0}; - if (pRsp->pCont != NULL && pRsp->contLen > 0 && - tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) { + if (pRsp->pCont != NULL && pRsp->contLen > 0 && tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) { pDnode->data.dnodeVer = statusRsp.dnodeVer; dmUpdateDnodeCfg(pDnode, &statusRsp.dnodeCfg); dmUpdateEps(pDnode, statusRsp.pDnodeEps); } + rpcFreeCont(pRsp->pCont); tFreeSStatusRsp(&statusRsp); } @@ -76,7 +76,7 @@ void dmSendStatusReq(SDnode *pDnode) { req.pVloads = info.pVloads; int32_t contLen = tSerializeSStatusReq(NULL, 0, &req); - void *pHead = rpcMallocCont(contLen); + void * pHead = rpcMallocCont(contLen); tSerializeSStatusReq(pHead, contLen, &req); tFreeSStatusReq(&req); @@ -101,7 +101,7 @@ int32_t dmProcessGrantRsp(SDnode *pDnode, SNodeMsg *pMsg) { } int32_t dmProcessConfigReq(SDnode *pDnode, SNodeMsg *pMsg) { - SRpcMsg *pReq = &pMsg->rpcMsg; + SRpcMsg * pReq = &pMsg->rpcMsg; SDCfgDnodeReq *pCfg = pReq->pCont; dError("config req is received, but not supported yet"); return TSDB_CODE_OPS_NOT_SUPPORT; diff --git a/source/dnode/mnode/impl/src/mndInfoSchema.c b/source/dnode/mnode/impl/src/mndInfoSchema.c index f626332fc2..d84c5f84cf 100644 --- a/source/dnode/mnode/impl/src/mndInfoSchema.c +++ b/source/dnode/mnode/impl/src/mndInfoSchema.c @@ -41,8 +41,8 @@ static const SInfosTableSchema mnodesSchema[] = { static const SInfosTableSchema modulesSchema[] = { {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "endpoint", .bytes = 134, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "module", .bytes = 10, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "endpoint", .bytes = 134 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "module", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, }; static const SInfosTableSchema qnodesSchema[] = { @@ -145,9 +145,9 @@ static const SInfosTableSchema userTblsSchema[] = { }; static const SInfosTableSchema userTblDistSchema[] = { - {.name = "db_name", .bytes = 32, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "db_name", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "table_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "distributed_histogram", .bytes = 500, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "distributed_histogram", .bytes = 500 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "min_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "max_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "avg_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, @@ -168,35 +168,33 @@ static const SInfosTableSchema userUsersSchema[] = { }; static const SInfosTableSchema grantsSchema[] = { - {.name = "version", .bytes = 8 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "expire time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "storage(GB)", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "databases", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "users", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "accounts", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "connections", .bytes = 11 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "cpu cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "speed(PPS)", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "querytime", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + {.name = "version", .bytes = 8 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "expire time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "storage(GB)", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "databases", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "users", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "accounts", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "connections", .bytes = 11 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "cpu cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "speed(PPS)", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "querytime", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, }; static const SInfosTableSchema vgroupsSchema[] = { {.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "tables", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "onlines", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "v1_dnode", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "v1_status", .bytes = 10, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "v1_status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "v2_dnode", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "v2_status", .bytes = 10, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "v2_status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "v3_dnode", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "v3_status", .bytes = 10, .type = TSDB_DATA_TYPE_VARCHAR}, - {.name = "compacting", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, + {.name = "v3_status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "nfiles", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "file_size", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, }; @@ -206,7 +204,7 @@ static const SInfosTableSchema topicSchema[] = { {.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, - {.name = "sql", .bytes = 1024, .type = TSDB_DATA_TYPE_VARCHAR}, + {.name = "sql", .bytes = 1024 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "row_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, }; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index dcb48c95a7..75fe409d2a 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -537,17 +537,6 @@ static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock* pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->numOfTables, false); - // status - char buf[10] = {0}; - STR_TO_VARSTR(buf, "ready"); // TODO - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, buf, false); - - // onlines - int32_t onlines = pVgroup->replica; - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&onlines, false); - // default 3 replica for (int32_t i = 0; i < 3; ++i) { diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c655c0bfc9..9dc12a3dec 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -212,10 +212,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd); } \ } while (0) -#define CONN_NO_PERSIST_BY_APP(conn) \ - (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) -#define CONN_RELEASE_BY_SERVER(conn) \ - (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) +#define CONN_NO_PERSIST_BY_APP(conn) (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) +#define CONN_RELEASE_BY_SERVER(conn) (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1) #define REQUEST_NO_RESP(msg) ((msg)->noResp == 1) #define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1) @@ -290,9 +288,8 @@ void cliHandleResp(SCliConn* conn) { tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn); } - tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, - TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), - taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen); + tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, TMSG_INFO(pHead->msgType), + taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen); conn->secured = pHead->secured; @@ -358,12 +355,10 @@ void cliHandleExcept(SCliConn* pConn) { if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) { transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType); - tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle, - TMSG_INFO(transMsg.msgType)); + tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle, TMSG_INFO(transMsg.msgType)); if (transMsg.ahandle == NULL) { transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType)); - tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, - transMsg.ahandle); + tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle); } } else { transMsg.ahandle = pCtx ? pCtx->ahandle : NULL; @@ -546,6 +541,7 @@ static void cliDestroy(uv_handle_t* handle) { transCtxCleanup(&conn->ctx); transQueueDestroy(&conn->cliMsgs); tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn); + transDestroyBuffer(&conn->readBuf); taosMemoryFree(conn); } static bool cliHandleNoResp(SCliConn* conn) { @@ -635,9 +631,8 @@ void cliSend(SCliConn* pConn) { pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0; uv_buf_t wb = uv_buf_init((char*)pHead, msgLen); - tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, - TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), - taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); + tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType), + taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); if (pHead->persist == 1) { CONN_SET_PERSIST_BY_APP(pConn); @@ -675,10 +670,9 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) { tDebug("cli work thread %p start to quit", pThrd); destroyCmsg(pMsg); destroyConnPool(pThrd->pool); - uv_timer_stop(&pThrd->timer); - pThrd->quit = true; + uv_stop(pThrd->loop); } static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) { diff --git a/source/libs/transport/src/transComm.c b/source/libs/transport/src/transComm.c index ef9cccccb7..03b97b6fa1 100644 --- a/source/libs/transport/src/transComm.c +++ b/source/libs/transport/src/transComm.c @@ -195,7 +195,7 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb) void transDestroyAsyncPool(SAsyncPool* pool) { for (int i = 0; i < pool->nAsync; i++) { uv_async_t* async = &(pool->asyncs[i]); - + uv_close((uv_handle_t*)async, NULL); SAsyncItem* item = async->data; taosThreadMutexDestroy(&item->mtx); taosMemoryFree(item); diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index c5a74d4840..50ce0bddcd 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -126,6 +126,11 @@ static void uvWorkerAsyncCb(uv_async_t* handle); static void uvAcceptAsyncCb(uv_async_t* handle); static void uvShutDownCb(uv_shutdown_t* req, int status); +static void uvFreeCb(uv_handle_t* handle) { + // + taosMemoryFree(handle); +} + static void uvStartSendRespInternal(SSrvMsg* smsg); static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb); static void uvStartSendResp(SSrvMsg* msg); @@ -141,8 +146,7 @@ static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd); static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd); -static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, - uvHandleRegister}; +static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, uvHandleRegister}; static void uvDestroyConn(uv_handle_t* handle); @@ -205,13 +209,12 @@ static void uvHandleReq(SSrvConn* pConn) { } if (pConn->status == ConnNormal && pHead->noResp == 0) { transRefSrvHandle(pConn); - tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), - taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), - ntohs(pConn->locaddr.sin_port), transMsg.contLen); + tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), + ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen); } else { - tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn, - TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), - taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp); + tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn, TMSG_INFO(transMsg.msgType), + taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), + transMsg.contLen, pHead->noResp); // no ref here } @@ -318,6 +321,8 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { } else { tError("fail to dispatch conn to work thread"); } + uv_close((uv_handle_t*)req->data, uvFreeCb); + // taosMemoryFree(req->data); taosMemoryFree(req); } @@ -349,9 +354,8 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { char* msg = (char*)pHead; int32_t len = transMsgLenFromCont(pMsg->contLen); - tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType), - taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), - ntohs(pConn->locaddr.sin_port)); + tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), + ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port)); pHead->msgLen = htonl(len); wb->base = msg; @@ -429,11 +433,39 @@ void uvWorkerAsyncCb(uv_async_t* handle) { (*transAsyncHandle[msg->type])(msg, pThrd); } } +static void uvWalkCb(uv_handle_t* handle, void* arg) { + if (!uv_is_closing(handle)) { + uv_close(handle, NULL); + // uv_unref(handle); + tDebug("handle: %p -----test----", handle); + } +} +#define MAKE_VALGRIND_HAPPY(loop) \ + do { \ + uv_walk(loop, uvWalkCb, NULL); \ + uv_run(loop, UV_RUN_DEFAULT); \ + uv_loop_close(loop); \ + } while (0); + static void uvAcceptAsyncCb(uv_async_t* async) { SServerObj* srv = async->data; tDebug("close server port %d", srv->port); - uv_close((uv_handle_t*)&srv->server, NULL); - uv_stop(srv->loop); + uv_walk(srv->loop, uvWalkCb, NULL); + // uv_close((uv_handle_t*)async, NULL); + // uv_close((uv_handle_t*)&srv->server, NULL); + // uv_stop(srv->loop); + // uv_print_all_handles(srv->loop, stderr); + // int ref = uv_loop_alive(srv->loop); + // assert(ref == 0); + // tError("active size %d", ref); + // uv_stop(srv->loop); + // uv_run(srv->loop, UV_RUN_DEFAULT); + // fprintf(stderr, "------------------------------------"); + // uv_print_all_handles(srv->loop, stderr); + + // int ret = uv_loop_close(srv->loop); + // tError("(loop)->active_reqs.count: %d, ret: %d", (srv->loop)->active_reqs.count, ret); + // assert(ret == 0); } static void uvShutDownCb(uv_shutdown_t* req, int status) { @@ -455,16 +487,16 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) { if (uv_accept(stream, (uv_stream_t*)cli) == 0) { uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t)); - + wr->data = cli; uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify)); pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads; tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx); + uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb); } else { uv_close((uv_handle_t*)cli, NULL); - taosMemoryFree(cli); } } void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { @@ -474,7 +506,10 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { tError("read error %s", uv_err_name(nread)); } // TODO(log other failure reason) - // uv_close((uv_handle_t*)q, NULL); + tError("failed to create connect: %p", q); + taosMemoryFree(buf->base); + uv_close((uv_handle_t*)q, NULL); + // taosMemoryFree(q); return; } // free memory allocated by @@ -650,6 +685,7 @@ static void uvDestroyConn(uv_handle_t* handle) { if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) { tTrace("work thread quit"); + // uv_walk(thrd->loop, uvWalkCb, NULL); uv_loop_close(thrd->loop); uv_stop(thrd->loop); } @@ -713,6 +749,7 @@ End: void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) { thrd->quit = true; if (QUEUE_IS_EMPTY(&thrd->conn)) { + // uv_walk(thrd->loop, uvWalkCb, NULL); uv_loop_close(thrd->loop); uv_stop(thrd->loop); } else { @@ -765,8 +802,9 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { return; } taosThreadJoin(pThrd->thread, NULL); - taosMemoryFree(pThrd->loop); + // MAKE_VALGRIND_HAPPY(pThrd->loop); transDestroyAsyncPool(pThrd->asyncPool); + taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); } void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { @@ -784,6 +822,8 @@ void transCloseServer(void* arg) { uv_async_send(srv->pAcceptAsync); taosThreadJoin(srv->thread, NULL); + MAKE_VALGRIND_HAPPY(srv->loop); + for (int i = 0; i < srv->numOfThreads; i++) { sendQuitToWorkThrd(srv->pThreadObj[i]); destroyWorkThrd(srv->pThreadObj[i]); diff --git a/tools/shell/src/shellCommand.c b/tools/shell/src/shellCommand.c index 7cbadfaf5b..546b19f83c 100644 --- a/tools/shell/src/shellCommand.c +++ b/tools/shell/src/shellCommand.c @@ -15,9 +15,9 @@ #define __USE_XOPEN +#include "shellCommand.h" #include "os.h" #include "shell.h" -#include "shellCommand.h" #include @@ -48,7 +48,7 @@ void getPrevCharSize(const char *str, int pos, int *size, int *width) { while (--pos >= 0) { *size += 1; - if (str[pos] > 0 || countPrefixOnes((unsigned char )str[pos]) > 1) break; + if (str[pos] > 0 || countPrefixOnes((unsigned char)str[pos]) > 1) break; } int rc = taosMbToWchar(&wc, str + pos, MB_CUR_MAX); @@ -106,8 +106,7 @@ void clearLineBefore(Command *cmd) { assert(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset); clearScreen(cmd->endOffset + prompt_size, cmd->screenOffset + prompt_size); - memmove(cmd->command, cmd->command + cmd->cursorOffset, - cmd->commandSize - cmd->cursorOffset); + memmove(cmd->command, cmd->command + cmd->cursorOffset, cmd->commandSize - cmd->cursorOffset); cmd->commandSize -= cmd->cursorOffset; cmd->cursorOffset = 0; cmd->screenOffset = 0; @@ -235,8 +234,8 @@ int isReadyGo(Command *cmd) { sprintf(total, "%s%s", cmd->buffer, cmd->command); char *reg_str = - "(^.*;\\s*$)|(^\\s*$)|(^\\s*exit\\s*$)|(^\\s*q\\s*$)|(^\\s*quit\\s*$)|(^" - "\\s*clear\\s*$)"; + "(^.*;\\s*$)|(^\\s*$)|(^\\s*exit\\s*$)|(^\\s*q\\s*$)|(^\\s*quit\\s*$)|(^" + "\\s*clear\\s*$)"; if (regex_match(total, reg_str, REG_EXTENDED | REG_ICASE)) { taosMemoryFree(total); return 1; diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 548d2169b1..36d2866fb5 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -23,40 +23,43 @@ #include "shellCommand.h" #include "taosdef.h" #include "taoserror.h" +#include "tconfig.h" #include "tglobal.h" #include "ttypes.h" #include "tutil.h" -#include "tconfig.h" #include #include /**************** Global variables ****************/ #ifdef _TD_POWER_ -char CLIENT_VERSION[] = "Welcome to the PowerDB shell from %s, Client Version:%s\n" - "Copyright (c) 2020 by PowerDB, Inc. All rights reserved.\n\n"; -char PROMPT_HEADER[] = "power> "; +char CLIENT_VERSION[] = + "Welcome to the PowerDB shell from %s, Client Version:%s\n" + "Copyright (c) 2020 by PowerDB, Inc. All rights reserved.\n\n"; +char PROMPT_HEADER[] = "power> "; -char CONTINUE_PROMPT[] = " -> "; -int prompt_size = 7; +char CONTINUE_PROMPT[] = " -> "; +int prompt_size = 7; #elif (_TD_TQ_ == true) -char CLIENT_VERSION[] = "Welcome to the TQ shell from %s, Client Version:%s\n" - "Copyright (c) 2020 by TQ, Inc. All rights reserved.\n\n"; -char PROMPT_HEADER[] = "tq> "; +char CLIENT_VERSION[] = + "Welcome to the TQ shell from %s, Client Version:%s\n" + "Copyright (c) 2020 by TQ, Inc. All rights reserved.\n\n"; +char PROMPT_HEADER[] = "tq> "; -char CONTINUE_PROMPT[] = " -> "; -int prompt_size = 4; +char CONTINUE_PROMPT[] = " -> "; +int prompt_size = 4; #else -char CLIENT_VERSION[] = "Welcome to the TDengine shell from %s, Client Version:%s\n" - "Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n"; -char PROMPT_HEADER[] = "taos> "; +char CLIENT_VERSION[] = + "Welcome to the TDengine shell from %s, Client Version:%s\n" + "Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n"; +char PROMPT_HEADER[] = "taos> "; -char CONTINUE_PROMPT[] = " -> "; -int prompt_size = 6; +char CONTINUE_PROMPT[] = " -> "; +int prompt_size = 6; #endif -int64_t result = 0; -SShellHistory history; +int64_t result = 0; +SShellHistory history; #define DEFAULT_MAX_BINARY_DISPLAY_WIDTH 30 extern int32_t tsMaxBinaryDisplayWidth; @@ -339,12 +342,12 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { atomic_store_64(&result, 0); freeResultWithRid(oresult); - taos_free_result(pSql); - + taos_free_result(pSql); + return; } - TAOS_FIELD* pFields = taos_fetch_fields(pSql); + TAOS_FIELD *pFields = taos_fetch_fields(pSql); if (pFields != NULL) { // select and show kinds of commands int error_no = 0; @@ -361,7 +364,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) { } else { printf("Query interrupted (%s), %d row(s) in set (%.6fs)\n", taos_errstr(pSql), numOfRows, (et - st) / 1E6); } - taos_free_result(pSql); + taos_free_result(pSql); } else { int num_rows_affacted = taos_affected_rows(pSql); taos_free_result(pSql); @@ -521,7 +524,8 @@ static int dumpResultToFile(const char *fname, TAOS_RES *tres) { } // FILE *fp = fopen(full_path.we_wordv[0], "w"); - TdFilePtr pFile = taosOpenFile(full_path.we_wordv[0], TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); + TdFilePtr pFile = + taosOpenFile(full_path.we_wordv[0], TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM); if (pFile == NULL) { fprintf(stderr, "ERROR: failed to open file: %s\n", full_path.we_wordv[0]); wordfree(&full_path); @@ -901,8 +905,8 @@ void read_history() { memset(history.hist, 0, sizeof(char *) * MAX_HISTORY_SIZE); history.hstart = 0; history.hend = 0; - char *line = NULL; - int read_size = 0; + char *line = NULL; + int read_size = 0; char f_history[TSDB_FILENAME_LEN]; get_history_path(f_history); @@ -929,7 +933,7 @@ void read_history() { } } - if(line != NULL) taosMemoryFree(line); + if (line != NULL) taosMemoryFree(line); taosCloseFile(&pFile); } @@ -1026,7 +1030,7 @@ void source_file(TAOS *con, char *fptr) { } taosMemoryFree(cmd); - if(line != NULL) taosMemoryFree(line); + if (line != NULL) taosMemoryFree(line); wordfree(&full_path); taosCloseFile(&pFile); } diff --git a/tools/shell/src/shellMain.c b/tools/shell/src/shellMain.c index 8ca8142eca..0d397eb80b 100644 --- a/tools/shell/src/shellMain.c +++ b/tools/shell/src/shellMain.c @@ -14,45 +14,38 @@ */ #define __USE_XOPEN -#include "os.h" -#include "shell.h" -#include "tglobal.h" -#include "tconfig.h" #include "shellCommand.h" -#include "tbase64.h" +#include "tglobal.h" #include "tlog.h" -#include "version.h" -#include #include #include +#include #define OPT_ABORT 1 /* abort */ - int indicator = 1; -void insertChar(Command *cmd, char *c, int size); -void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen, - int32_t pkgNum, char *pkgType); +void insertChar(Command *cmd, char *c, int size); +void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen, int32_t pkgNum, char *pkgType); const char *argp_program_version = version; const char *argp_program_bug_address = ""; static char doc[] = ""; static char args_doc[] = ""; -TdThread pid; +TdThread pid; static tsem_t cancelSem; static struct argp_option options[] = { {"host", 'h', "HOST", 0, "TDengine server FQDN to connect. The default host is localhost."}, - {"password", 'p', 0, 0, "The password to use when connecting to the server."}, + {"password", 'p', NULL, 0, "The password to use when connecting to the server."}, {"port", 'P', "PORT", 0, "The TCP/IP port number to use for the connection."}, {"user", 'u', "USER", 0, "The user name to use when connecting to the server."}, {"auth", 'A', "Auth", 0, "The auth string to use when connecting to the server."}, {"config-dir", 'c', "CONFIG_DIR", 0, "Configuration directory."}, - {"dump-config", 'C', 0, 0, "Dump configuration."}, + {"dump-config",'C', NULL, 0, "Dump configuration."}, {"commands", 's', "COMMANDS", 0, "Commands to run without enter the shell."}, - {"raw-time", 'r', 0, 0, "Output time as uint64_t."}, + {"raw-time", 'r', NULL, 0, "Output time as uint64_t."}, {"file", 'f', "FILE", 0, "Script to run without enter the shell."}, {"directory", 'D', "DIRECTORY", 0, "Use multi-thread to import all SQL files in the directory separately."}, {"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."}, @@ -70,7 +63,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { /* Get the input argument from argp_parse, which we know is a pointer to our arguments structure. */ SShellArguments *arguments = state->input; - wordexp_t full_path; + wordexp_t full_path; switch (key) { case 'h': @@ -80,7 +73,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { break; case 'P': if (arg) { - arguments->port = atoi(arg); + arguments->port = atoi(arg); } else { fprintf(stderr, "Invalid port\n"); return -1; @@ -182,35 +175,33 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) { /* Our argp parser. */ static struct argp argp = {options, parse_opt, args_doc, doc}; -char LINUXCLIENT_VERSION[] = "Welcome to the TDengine shell from %s, Client Version:%s\n" - "Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n"; +char LINUXCLIENT_VERSION[] = + "Welcome to the TDengine shell from %s, Client Version:%s\n" + "Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n"; char g_password[SHELL_MAX_PASSWORD_LEN]; -static void parse_args( - int argc, char *argv[], SShellArguments *arguments) { - for (int i = 1; i < argc; i++) { - if ((strncmp(argv[i], "-p", 2) == 0) - || (strncmp(argv[i], "--password", 10) == 0)) { - printf(LINUXCLIENT_VERSION, tsOsName, taos_get_client_info()); - if ((strlen(argv[i]) == 2) - || (strncmp(argv[i], "--password", 10) == 0)) { - printf("Enter password: "); - taosSetConsoleEcho(false); - if (scanf("%20s", g_password) > 1) { - fprintf(stderr, "password reading error\n"); - } - taosSetConsoleEcho(true); - if (EOF == getchar()) { - fprintf(stderr, "getchar() return EOF\n"); - } - } else { - tstrncpy(g_password, (char *)(argv[i] + 2), SHELL_MAX_PASSWORD_LEN); - strcpy(argv[i], "-p"); - } - arguments->password = g_password; - arguments->is_use_passwd = true; +static void parse_args(int argc, char *argv[], SShellArguments *arguments) { + for (int i = 1; i < argc; i++) { + if ((strncmp(argv[i], "-p", 2) == 0) || (strncmp(argv[i], "--password", 10) == 0)) { + printf(LINUXCLIENT_VERSION, tsOsName, taos_get_client_info()); + if ((strlen(argv[i]) == 2) || (strncmp(argv[i], "--password", 10) == 0)) { + printf("Enter password: "); + taosSetConsoleEcho(false); + if (scanf("%20s", g_password) > 1) { + fprintf(stderr, "password reading error\n"); } + taosSetConsoleEcho(true); + if (EOF == getchar()) { + fprintf(stderr, "getchar() return EOF\n"); + } + } else { + tstrncpy(g_password, (char *)(argv[i] + 2), SHELL_MAX_PASSWORD_LEN); + strcpy(argv[i], "-p"); + } + arguments->password = g_password; + arguments->is_use_passwd = true; } + } } void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) { @@ -225,20 +216,20 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) { argp_parse(&argp, argc, argv, 0, 0, arguments); if (arguments->abort) { - #ifndef _ALPINE - #if 0 +#ifndef _ALPINE +#if 0 error(10, 0, "ABORTED"); - #endif - #else - abort(); - #endif +#endif +#else + abort(); +#endif } } int32_t shellReadCommand(TAOS *con, char *command) { unsigned hist_counter = history.hend; - char utf8_array[10] = "\0"; - Command cmd; + char utf8_array[10] = "\0"; + Command cmd; memset(&cmd, 0, sizeof(cmd)); cmd.buffer = (char *)taosMemoryCalloc(1, MAX_COMMAND_SIZE); cmd.command = (char *)taosMemoryCalloc(1, MAX_COMMAND_SIZE); @@ -247,7 +238,7 @@ int32_t shellReadCommand(TAOS *con, char *command) { // Read input. char c; while (1) { - c = (char)getchar(); // getchar() return an 'int' value + c = (char)getchar(); // getchar() return an 'int' value if (c == EOF) { return c; @@ -406,13 +397,13 @@ void *shellLoopQuery(void *arg) { taosThreadCleanupPush(cleanup_handler, NULL); char *command = taosMemoryMalloc(MAX_COMMAND_SIZE); - if (command == NULL){ + if (command == NULL) { uError("failed to malloc command"); return NULL; } int32_t err = 0; - + do { // Read command from shell. memset(command, 0, MAX_COMMAND_SIZE); @@ -423,12 +414,12 @@ void *shellLoopQuery(void *arg) { } resetTerminalMode(); } while (shellRunCommand(con, command) == 0); - + taosMemoryFreeClear(command); exitShell(); taosThreadCleanupPop(1); - + return NULL; } @@ -437,7 +428,7 @@ void get_history_path(char *_history) { snprintf(_history, TSDB_FILENAME_LEN, "% void clearScreen(int ecmd_pos, int cursor_pos) { struct winsize w; if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) { - //fprintf(stderr, "No stream device, and use default value(col 120, row 30)\n"); + // fprintf(stderr, "No stream device, and use default value(col 120, row 30)\n"); w.ws_col = 120; w.ws_row = 30; } @@ -458,13 +449,13 @@ void clearScreen(int ecmd_pos, int cursor_pos) { void showOnScreen(Command *cmd) { struct winsize w; if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) { - //fprintf(stderr, "No stream device\n"); + // fprintf(stderr, "No stream device\n"); w.ws_col = 120; w.ws_row = 30; } TdWchar wc; - int size = 0; + int size = 0; // Print out the command. char *total_string = taosMemoryMalloc(MAX_COMMAND_SIZE); @@ -531,13 +522,11 @@ void showOnScreen(Command *cmd) { void cleanup_handler(void *arg) { resetTerminalMode(); } void exitShell() { - /*int32_t ret =*/ resetTerminalMode(); + /*int32_t ret =*/resetTerminalMode(); taos_cleanup(); exit(EXIT_SUCCESS); } -void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) { - tsem_post(&cancelSem); -} +void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) { tsem_post(&cancelSem); } void *cancelHandler(void *arg) { setThreadName("cancelHandler"); @@ -554,7 +543,7 @@ void *cancelHandler(void *arg) { SSqlObj* pSql = taosAcquireRef(tscObjRef, rid); taos_stop_query(pSql); taosReleaseRef(tscObjRef, rid); -#endif +#endif #else resetTerminalMode(); printf("\nReceive ctrl+c or other signal, quit shell.\n"); @@ -640,11 +629,11 @@ int main(int argc, char *argv[]) { con = taos_connect_auth(args.host, args.user, args.auth, args.database, args.port); } -/* if (taos_init()) { - printf("Failed to init taos"); - exit(EXIT_FAILURE); - } - */ + // if (taos_init()) { + // printf("Failed to init taos"); + // exit(EXIT_FAILURE); + // } + taosNetTest(args.netTestRole, args.host, args.port, args.pktLen, args.pktNum, args.pktType); taos_close(con); exit(0);