Merge pull request #11618 from taosdata/fix/valgrind
fix: memory leak when sending and receiving messages
This commit is contained in:
commit
7273581072
|
@ -36,12 +36,12 @@ static int32_t dmProcessStatusRsp(SDnode *pDnode, SRpcMsg *pRsp) {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
SStatusRsp statusRsp = {0};
|
SStatusRsp statusRsp = {0};
|
||||||
if (pRsp->pCont != NULL && pRsp->contLen > 0 &&
|
if (pRsp->pCont != NULL && pRsp->contLen > 0 && tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
|
||||||
tDeserializeSStatusRsp(pRsp->pCont, pRsp->contLen, &statusRsp) == 0) {
|
|
||||||
pDnode->data.dnodeVer = statusRsp.dnodeVer;
|
pDnode->data.dnodeVer = statusRsp.dnodeVer;
|
||||||
dmUpdateDnodeCfg(pDnode, &statusRsp.dnodeCfg);
|
dmUpdateDnodeCfg(pDnode, &statusRsp.dnodeCfg);
|
||||||
dmUpdateEps(pDnode, statusRsp.pDnodeEps);
|
dmUpdateEps(pDnode, statusRsp.pDnodeEps);
|
||||||
}
|
}
|
||||||
|
rpcFreeCont(pRsp->pCont);
|
||||||
tFreeSStatusRsp(&statusRsp);
|
tFreeSStatusRsp(&statusRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -76,7 +76,7 @@ void dmSendStatusReq(SDnode *pDnode) {
|
||||||
req.pVloads = info.pVloads;
|
req.pVloads = info.pVloads;
|
||||||
|
|
||||||
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
|
int32_t contLen = tSerializeSStatusReq(NULL, 0, &req);
|
||||||
void *pHead = rpcMallocCont(contLen);
|
void * pHead = rpcMallocCont(contLen);
|
||||||
tSerializeSStatusReq(pHead, contLen, &req);
|
tSerializeSStatusReq(pHead, contLen, &req);
|
||||||
tFreeSStatusReq(&req);
|
tFreeSStatusReq(&req);
|
||||||
|
|
||||||
|
@ -101,7 +101,7 @@ int32_t dmProcessGrantRsp(SDnode *pDnode, SNodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dmProcessConfigReq(SDnode *pDnode, SNodeMsg *pMsg) {
|
int32_t dmProcessConfigReq(SDnode *pDnode, SNodeMsg *pMsg) {
|
||||||
SRpcMsg *pReq = &pMsg->rpcMsg;
|
SRpcMsg * pReq = &pMsg->rpcMsg;
|
||||||
SDCfgDnodeReq *pCfg = pReq->pCont;
|
SDCfgDnodeReq *pCfg = pReq->pCont;
|
||||||
dError("config req is received, but not supported yet");
|
dError("config req is received, but not supported yet");
|
||||||
return TSDB_CODE_OPS_NOT_SUPPORT;
|
return TSDB_CODE_OPS_NOT_SUPPORT;
|
||||||
|
|
|
@ -41,8 +41,8 @@ static const SInfosTableSchema mnodesSchema[] = {
|
||||||
|
|
||||||
static const SInfosTableSchema modulesSchema[] = {
|
static const SInfosTableSchema modulesSchema[] = {
|
||||||
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "endpoint", .bytes = 134, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "endpoint", .bytes = 134 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "module", .bytes = 10, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "module", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SInfosTableSchema qnodesSchema[] = {
|
static const SInfosTableSchema qnodesSchema[] = {
|
||||||
|
@ -145,9 +145,9 @@ static const SInfosTableSchema userTblsSchema[] = {
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SInfosTableSchema userTblDistSchema[] = {
|
static const SInfosTableSchema userTblDistSchema[] = {
|
||||||
{.name = "db_name", .bytes = 32, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "db_name", .bytes = 32 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "table_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "table_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "distributed_histogram", .bytes = 500, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "distributed_histogram", .bytes = 500 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "min_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "min_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "max_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "max_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "avg_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "avg_of_rows", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
|
@ -168,35 +168,33 @@ static const SInfosTableSchema userUsersSchema[] = {
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SInfosTableSchema grantsSchema[] = {
|
static const SInfosTableSchema grantsSchema[] = {
|
||||||
{.name = "version", .bytes = 8 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "version", .bytes = 8 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "expire time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "expire time", .bytes = 19 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "expired", .bytes = 5 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "storage(GB)", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "storage(GB)", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "timeseries", .bytes = 21 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "databases", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "databases", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "users", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "users", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "accounts", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "accounts", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "dnodes", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "connections", .bytes = 11 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "connections", .bytes = 11 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "streams", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "cpu cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "cpu cores", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "speed(PPS)", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "speed(PPS)", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "querytime", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY},
|
{.name = "querytime", .bytes = 9 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
};
|
};
|
||||||
|
|
||||||
static const SInfosTableSchema vgroupsSchema[] = {
|
static const SInfosTableSchema vgroupsSchema[] = {
|
||||||
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "vgroup_id", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "tables", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "tables", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
|
||||||
{.name = "onlines", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
|
||||||
{.name = "v1_dnode", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "v1_dnode", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "v1_status", .bytes = 10, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "v1_status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "v2_dnode", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "v2_dnode", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "v2_status", .bytes = 10, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "v2_status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "v3_dnode", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "v3_dnode", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "v3_status", .bytes = 10, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "v3_status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "compacting", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "status", .bytes = 12 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "nfiles", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "nfiles", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
{.name = "file_size", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "file_size", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
};
|
};
|
||||||
|
@ -206,7 +204,7 @@ static const SInfosTableSchema topicSchema[] = {
|
||||||
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "topic_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "db_name", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
|
||||||
{.name = "sql", .bytes = 1024, .type = TSDB_DATA_TYPE_VARCHAR},
|
{.name = "sql", .bytes = 1024 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
|
||||||
{.name = "row_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
{.name = "row_len", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -537,17 +537,6 @@ static int32_t mndRetrieveVgroups(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock*
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->numOfTables, false);
|
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->numOfTables, false);
|
||||||
|
|
||||||
// status
|
|
||||||
char buf[10] = {0};
|
|
||||||
STR_TO_VARSTR(buf, "ready"); // TODO
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataAppend(pColInfo, numOfRows, buf, false);
|
|
||||||
|
|
||||||
// onlines
|
|
||||||
int32_t onlines = pVgroup->replica;
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
|
||||||
colDataAppend(pColInfo, numOfRows, (const char *)&onlines, false);
|
|
||||||
|
|
||||||
// default 3 replica
|
// default 3 replica
|
||||||
for (int32_t i = 0; i < 3; ++i) {
|
for (int32_t i = 0; i < 3; ++i) {
|
||||||
|
|
||||||
|
|
|
@ -212,10 +212,8 @@ static void destroyThrdObj(SCliThrdObj* pThrd);
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define CONN_NO_PERSIST_BY_APP(conn) \
|
#define CONN_NO_PERSIST_BY_APP(conn) (((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||||
(((conn)->status == ConnNormal || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
#define CONN_RELEASE_BY_SERVER(conn) (((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
||||||
#define CONN_RELEASE_BY_SERVER(conn) \
|
|
||||||
(((conn)->status == ConnRelease || (conn)->status == ConnInPool) && T_REF_VAL_GET(conn) == 1)
|
|
||||||
|
|
||||||
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
|
#define REQUEST_NO_RESP(msg) ((msg)->noResp == 1)
|
||||||
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
|
#define REQUEST_PERSIS_HANDLE(msg) ((msg)->persistHandle == 1)
|
||||||
|
@ -290,9 +288,8 @@ void cliHandleResp(SCliConn* conn) {
|
||||||
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
|
tDebug("%s cli conn %p ref by app", CONN_GET_INST_LABEL(conn), conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn,
|
tDebug("%s cli conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pTransInst->label, conn, TMSG_INFO(pHead->msgType),
|
||||||
TMSG_INFO(pHead->msgType), taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port),
|
taosInetNtoa(conn->addr.sin_addr), ntohs(conn->addr.sin_port), taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);
|
||||||
taosInetNtoa(conn->locaddr.sin_addr), ntohs(conn->locaddr.sin_port), transMsg.contLen);
|
|
||||||
|
|
||||||
conn->secured = pHead->secured;
|
conn->secured = pHead->secured;
|
||||||
|
|
||||||
|
@ -358,12 +355,10 @@ void cliHandleExcept(SCliConn* pConn) {
|
||||||
|
|
||||||
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
|
if (pMsg == NULL && !CONN_NO_PERSIST_BY_APP(pConn)) {
|
||||||
transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
|
transMsg.ahandle = transCtxDumpVal(&pConn->ctx, transMsg.msgType);
|
||||||
tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle,
|
tDebug("%s cli conn %p construct ahandle %p by %s", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle, TMSG_INFO(transMsg.msgType));
|
||||||
TMSG_INFO(transMsg.msgType));
|
|
||||||
if (transMsg.ahandle == NULL) {
|
if (transMsg.ahandle == NULL) {
|
||||||
transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType));
|
transMsg.ahandle = transCtxDumpBrokenlinkVal(&pConn->ctx, (int32_t*)&(transMsg.msgType));
|
||||||
tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn,
|
tDebug("%s cli conn %p construct ahandle %p due to brokenlink", CONN_GET_INST_LABEL(pConn), pConn, transMsg.ahandle);
|
||||||
transMsg.ahandle);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
|
transMsg.ahandle = pCtx ? pCtx->ahandle : NULL;
|
||||||
|
@ -546,6 +541,7 @@ static void cliDestroy(uv_handle_t* handle) {
|
||||||
transCtxCleanup(&conn->ctx);
|
transCtxCleanup(&conn->ctx);
|
||||||
transQueueDestroy(&conn->cliMsgs);
|
transQueueDestroy(&conn->cliMsgs);
|
||||||
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
tTrace("%s cli conn %p destroy successfully", CONN_GET_INST_LABEL(conn), conn);
|
||||||
|
transDestroyBuffer(&conn->readBuf);
|
||||||
taosMemoryFree(conn);
|
taosMemoryFree(conn);
|
||||||
}
|
}
|
||||||
static bool cliHandleNoResp(SCliConn* conn) {
|
static bool cliHandleNoResp(SCliConn* conn) {
|
||||||
|
@ -635,9 +631,8 @@ void cliSend(SCliConn* pConn) {
|
||||||
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
|
pHead->release = REQUEST_RELEASE_HANDLE(pCliMsg) ? 1 : 0;
|
||||||
|
|
||||||
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
uv_buf_t wb = uv_buf_init((char*)pHead, msgLen);
|
||||||
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn,
|
tDebug("%s cli conn %p %s is send to %s:%d, local info %s:%d", CONN_GET_INST_LABEL(pConn), pConn, TMSG_INFO(pHead->msgType),
|
||||||
TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||||
taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
|
||||||
|
|
||||||
if (pHead->persist == 1) {
|
if (pHead->persist == 1) {
|
||||||
CONN_SET_PERSIST_BY_APP(pConn);
|
CONN_SET_PERSIST_BY_APP(pConn);
|
||||||
|
@ -675,10 +670,9 @@ static void cliHandleQuit(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
tDebug("cli work thread %p start to quit", pThrd);
|
tDebug("cli work thread %p start to quit", pThrd);
|
||||||
destroyCmsg(pMsg);
|
destroyCmsg(pMsg);
|
||||||
destroyConnPool(pThrd->pool);
|
destroyConnPool(pThrd->pool);
|
||||||
|
|
||||||
uv_timer_stop(&pThrd->timer);
|
uv_timer_stop(&pThrd->timer);
|
||||||
|
|
||||||
pThrd->quit = true;
|
pThrd->quit = true;
|
||||||
|
|
||||||
uv_stop(pThrd->loop);
|
uv_stop(pThrd->loop);
|
||||||
}
|
}
|
||||||
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
static void cliHandleRelease(SCliMsg* pMsg, SCliThrdObj* pThrd) {
|
||||||
|
|
|
@ -195,7 +195,7 @@ SAsyncPool* transCreateAsyncPool(uv_loop_t* loop, int sz, void* arg, AsyncCB cb)
|
||||||
void transDestroyAsyncPool(SAsyncPool* pool) {
|
void transDestroyAsyncPool(SAsyncPool* pool) {
|
||||||
for (int i = 0; i < pool->nAsync; i++) {
|
for (int i = 0; i < pool->nAsync; i++) {
|
||||||
uv_async_t* async = &(pool->asyncs[i]);
|
uv_async_t* async = &(pool->asyncs[i]);
|
||||||
|
uv_close((uv_handle_t*)async, NULL);
|
||||||
SAsyncItem* item = async->data;
|
SAsyncItem* item = async->data;
|
||||||
taosThreadMutexDestroy(&item->mtx);
|
taosThreadMutexDestroy(&item->mtx);
|
||||||
taosMemoryFree(item);
|
taosMemoryFree(item);
|
||||||
|
|
|
@ -126,6 +126,11 @@ static void uvWorkerAsyncCb(uv_async_t* handle);
|
||||||
static void uvAcceptAsyncCb(uv_async_t* handle);
|
static void uvAcceptAsyncCb(uv_async_t* handle);
|
||||||
static void uvShutDownCb(uv_shutdown_t* req, int status);
|
static void uvShutDownCb(uv_shutdown_t* req, int status);
|
||||||
|
|
||||||
|
static void uvFreeCb(uv_handle_t* handle) {
|
||||||
|
//
|
||||||
|
taosMemoryFree(handle);
|
||||||
|
}
|
||||||
|
|
||||||
static void uvStartSendRespInternal(SSrvMsg* smsg);
|
static void uvStartSendRespInternal(SSrvMsg* smsg);
|
||||||
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
|
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
|
||||||
static void uvStartSendResp(SSrvMsg* msg);
|
static void uvStartSendResp(SSrvMsg* msg);
|
||||||
|
@ -141,8 +146,7 @@ static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||||
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
|
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||||
static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd);
|
static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||||
static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
|
static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
|
||||||
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
|
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, uvHandleRegister};
|
||||||
uvHandleRegister};
|
|
||||||
|
|
||||||
static void uvDestroyConn(uv_handle_t* handle);
|
static void uvDestroyConn(uv_handle_t* handle);
|
||||||
|
|
||||||
|
@ -205,13 +209,12 @@ static void uvHandleReq(SSrvConn* pConn) {
|
||||||
}
|
}
|
||||||
if (pConn->status == ConnNormal && pHead->noResp == 0) {
|
if (pConn->status == ConnNormal && pHead->noResp == 0) {
|
||||||
transRefSrvHandle(pConn);
|
transRefSrvHandle(pConn);
|
||||||
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType),
|
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d", pConn, TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr),
|
||||||
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
|
ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen);
|
||||||
ntohs(pConn->locaddr.sin_port), transMsg.contLen);
|
|
||||||
} else {
|
} else {
|
||||||
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn,
|
tDebug("server conn %p %s received from %s:%d, local info: %s:%d, msg size: %d, resp:%d ", pConn, TMSG_INFO(transMsg.msgType),
|
||||||
TMSG_INFO(transMsg.msgType), taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port),
|
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port),
|
||||||
taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port), transMsg.contLen, pHead->noResp);
|
transMsg.contLen, pHead->noResp);
|
||||||
// no ref here
|
// no ref here
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -318,6 +321,8 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
|
||||||
} else {
|
} else {
|
||||||
tError("fail to dispatch conn to work thread");
|
tError("fail to dispatch conn to work thread");
|
||||||
}
|
}
|
||||||
|
uv_close((uv_handle_t*)req->data, uvFreeCb);
|
||||||
|
// taosMemoryFree(req->data);
|
||||||
taosMemoryFree(req);
|
taosMemoryFree(req);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -349,9 +354,8 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
|
||||||
|
|
||||||
char* msg = (char*)pHead;
|
char* msg = (char*)pHead;
|
||||||
int32_t len = transMsgLenFromCont(pMsg->contLen);
|
int32_t len = transMsgLenFromCont(pMsg->contLen);
|
||||||
tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType),
|
tDebug("server conn %p %s is sent to %s:%d, local info: %s:%d", pConn, TMSG_INFO(pHead->msgType), taosInetNtoa(pConn->addr.sin_addr),
|
||||||
taosInetNtoa(pConn->addr.sin_addr), ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr),
|
ntohs(pConn->addr.sin_port), taosInetNtoa(pConn->locaddr.sin_addr), ntohs(pConn->locaddr.sin_port));
|
||||||
ntohs(pConn->locaddr.sin_port));
|
|
||||||
pHead->msgLen = htonl(len);
|
pHead->msgLen = htonl(len);
|
||||||
|
|
||||||
wb->base = msg;
|
wb->base = msg;
|
||||||
|
@ -429,11 +433,39 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
|
||||||
(*transAsyncHandle[msg->type])(msg, pThrd);
|
(*transAsyncHandle[msg->type])(msg, pThrd);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
static void uvWalkCb(uv_handle_t* handle, void* arg) {
|
||||||
|
if (!uv_is_closing(handle)) {
|
||||||
|
uv_close(handle, NULL);
|
||||||
|
// uv_unref(handle);
|
||||||
|
tDebug("handle: %p -----test----", handle);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#define MAKE_VALGRIND_HAPPY(loop) \
|
||||||
|
do { \
|
||||||
|
uv_walk(loop, uvWalkCb, NULL); \
|
||||||
|
uv_run(loop, UV_RUN_DEFAULT); \
|
||||||
|
uv_loop_close(loop); \
|
||||||
|
} while (0);
|
||||||
|
|
||||||
static void uvAcceptAsyncCb(uv_async_t* async) {
|
static void uvAcceptAsyncCb(uv_async_t* async) {
|
||||||
SServerObj* srv = async->data;
|
SServerObj* srv = async->data;
|
||||||
tDebug("close server port %d", srv->port);
|
tDebug("close server port %d", srv->port);
|
||||||
uv_close((uv_handle_t*)&srv->server, NULL);
|
uv_walk(srv->loop, uvWalkCb, NULL);
|
||||||
uv_stop(srv->loop);
|
// uv_close((uv_handle_t*)async, NULL);
|
||||||
|
// uv_close((uv_handle_t*)&srv->server, NULL);
|
||||||
|
// uv_stop(srv->loop);
|
||||||
|
// uv_print_all_handles(srv->loop, stderr);
|
||||||
|
// int ref = uv_loop_alive(srv->loop);
|
||||||
|
// assert(ref == 0);
|
||||||
|
// tError("active size %d", ref);
|
||||||
|
// uv_stop(srv->loop);
|
||||||
|
// uv_run(srv->loop, UV_RUN_DEFAULT);
|
||||||
|
// fprintf(stderr, "------------------------------------");
|
||||||
|
// uv_print_all_handles(srv->loop, stderr);
|
||||||
|
|
||||||
|
// int ret = uv_loop_close(srv->loop);
|
||||||
|
// tError("(loop)->active_reqs.count: %d, ret: %d", (srv->loop)->active_reqs.count, ret);
|
||||||
|
// assert(ret == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void uvShutDownCb(uv_shutdown_t* req, int status) {
|
static void uvShutDownCb(uv_shutdown_t* req, int status) {
|
||||||
|
@ -455,16 +487,16 @@ void uvOnAcceptCb(uv_stream_t* stream, int status) {
|
||||||
|
|
||||||
if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
|
if (uv_accept(stream, (uv_stream_t*)cli) == 0) {
|
||||||
uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
|
uv_write_t* wr = (uv_write_t*)taosMemoryMalloc(sizeof(uv_write_t));
|
||||||
|
wr->data = cli;
|
||||||
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
|
uv_buf_t buf = uv_buf_init((char*)notify, strlen(notify));
|
||||||
|
|
||||||
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
|
pObj->workerIdx = (pObj->workerIdx + 1) % pObj->numOfThreads;
|
||||||
|
|
||||||
tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
|
tTrace("new conntion accepted by main server, dispatch to %dth worker-thread", pObj->workerIdx);
|
||||||
|
|
||||||
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
|
uv_write2(wr, (uv_stream_t*)&(pObj->pipe[pObj->workerIdx][0]), &buf, 1, (uv_stream_t*)cli, uvOnPipeWriteCb);
|
||||||
} else {
|
} else {
|
||||||
uv_close((uv_handle_t*)cli, NULL);
|
uv_close((uv_handle_t*)cli, NULL);
|
||||||
taosMemoryFree(cli);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
||||||
|
@ -474,7 +506,10 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
|
||||||
tError("read error %s", uv_err_name(nread));
|
tError("read error %s", uv_err_name(nread));
|
||||||
}
|
}
|
||||||
// TODO(log other failure reason)
|
// TODO(log other failure reason)
|
||||||
// uv_close((uv_handle_t*)q, NULL);
|
tError("failed to create connect: %p", q);
|
||||||
|
taosMemoryFree(buf->base);
|
||||||
|
uv_close((uv_handle_t*)q, NULL);
|
||||||
|
// taosMemoryFree(q);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// free memory allocated by
|
// free memory allocated by
|
||||||
|
@ -650,6 +685,7 @@ static void uvDestroyConn(uv_handle_t* handle) {
|
||||||
|
|
||||||
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
|
if (thrd->quit && QUEUE_IS_EMPTY(&thrd->conn)) {
|
||||||
tTrace("work thread quit");
|
tTrace("work thread quit");
|
||||||
|
// uv_walk(thrd->loop, uvWalkCb, NULL);
|
||||||
uv_loop_close(thrd->loop);
|
uv_loop_close(thrd->loop);
|
||||||
uv_stop(thrd->loop);
|
uv_stop(thrd->loop);
|
||||||
}
|
}
|
||||||
|
@ -713,6 +749,7 @@ End:
|
||||||
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
|
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
|
||||||
thrd->quit = true;
|
thrd->quit = true;
|
||||||
if (QUEUE_IS_EMPTY(&thrd->conn)) {
|
if (QUEUE_IS_EMPTY(&thrd->conn)) {
|
||||||
|
// uv_walk(thrd->loop, uvWalkCb, NULL);
|
||||||
uv_loop_close(thrd->loop);
|
uv_loop_close(thrd->loop);
|
||||||
uv_stop(thrd->loop);
|
uv_stop(thrd->loop);
|
||||||
} else {
|
} else {
|
||||||
|
@ -765,8 +802,9 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
taosThreadJoin(pThrd->thread, NULL);
|
taosThreadJoin(pThrd->thread, NULL);
|
||||||
taosMemoryFree(pThrd->loop);
|
// MAKE_VALGRIND_HAPPY(pThrd->loop);
|
||||||
transDestroyAsyncPool(pThrd->asyncPool);
|
transDestroyAsyncPool(pThrd->asyncPool);
|
||||||
|
taosMemoryFree(pThrd->loop);
|
||||||
taosMemoryFree(pThrd);
|
taosMemoryFree(pThrd);
|
||||||
}
|
}
|
||||||
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
|
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
|
||||||
|
@ -784,6 +822,8 @@ void transCloseServer(void* arg) {
|
||||||
uv_async_send(srv->pAcceptAsync);
|
uv_async_send(srv->pAcceptAsync);
|
||||||
taosThreadJoin(srv->thread, NULL);
|
taosThreadJoin(srv->thread, NULL);
|
||||||
|
|
||||||
|
MAKE_VALGRIND_HAPPY(srv->loop);
|
||||||
|
|
||||||
for (int i = 0; i < srv->numOfThreads; i++) {
|
for (int i = 0; i < srv->numOfThreads; i++) {
|
||||||
sendQuitToWorkThrd(srv->pThreadObj[i]);
|
sendQuitToWorkThrd(srv->pThreadObj[i]);
|
||||||
destroyWorkThrd(srv->pThreadObj[i]);
|
destroyWorkThrd(srv->pThreadObj[i]);
|
||||||
|
|
|
@ -15,9 +15,9 @@
|
||||||
|
|
||||||
#define __USE_XOPEN
|
#define __USE_XOPEN
|
||||||
|
|
||||||
|
#include "shellCommand.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "shell.h"
|
#include "shell.h"
|
||||||
#include "shellCommand.h"
|
|
||||||
|
|
||||||
#include <regex.h>
|
#include <regex.h>
|
||||||
|
|
||||||
|
@ -48,7 +48,7 @@ void getPrevCharSize(const char *str, int pos, int *size, int *width) {
|
||||||
while (--pos >= 0) {
|
while (--pos >= 0) {
|
||||||
*size += 1;
|
*size += 1;
|
||||||
|
|
||||||
if (str[pos] > 0 || countPrefixOnes((unsigned char )str[pos]) > 1) break;
|
if (str[pos] > 0 || countPrefixOnes((unsigned char)str[pos]) > 1) break;
|
||||||
}
|
}
|
||||||
|
|
||||||
int rc = taosMbToWchar(&wc, str + pos, MB_CUR_MAX);
|
int rc = taosMbToWchar(&wc, str + pos, MB_CUR_MAX);
|
||||||
|
@ -106,8 +106,7 @@ void clearLineBefore(Command *cmd) {
|
||||||
assert(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
|
assert(cmd->cursorOffset <= cmd->commandSize && cmd->endOffset >= cmd->screenOffset);
|
||||||
|
|
||||||
clearScreen(cmd->endOffset + prompt_size, cmd->screenOffset + prompt_size);
|
clearScreen(cmd->endOffset + prompt_size, cmd->screenOffset + prompt_size);
|
||||||
memmove(cmd->command, cmd->command + cmd->cursorOffset,
|
memmove(cmd->command, cmd->command + cmd->cursorOffset, cmd->commandSize - cmd->cursorOffset);
|
||||||
cmd->commandSize - cmd->cursorOffset);
|
|
||||||
cmd->commandSize -= cmd->cursorOffset;
|
cmd->commandSize -= cmd->cursorOffset;
|
||||||
cmd->cursorOffset = 0;
|
cmd->cursorOffset = 0;
|
||||||
cmd->screenOffset = 0;
|
cmd->screenOffset = 0;
|
||||||
|
@ -235,8 +234,8 @@ int isReadyGo(Command *cmd) {
|
||||||
sprintf(total, "%s%s", cmd->buffer, cmd->command);
|
sprintf(total, "%s%s", cmd->buffer, cmd->command);
|
||||||
|
|
||||||
char *reg_str =
|
char *reg_str =
|
||||||
"(^.*;\\s*$)|(^\\s*$)|(^\\s*exit\\s*$)|(^\\s*q\\s*$)|(^\\s*quit\\s*$)|(^"
|
"(^.*;\\s*$)|(^\\s*$)|(^\\s*exit\\s*$)|(^\\s*q\\s*$)|(^\\s*quit\\s*$)|(^"
|
||||||
"\\s*clear\\s*$)";
|
"\\s*clear\\s*$)";
|
||||||
if (regex_match(total, reg_str, REG_EXTENDED | REG_ICASE)) {
|
if (regex_match(total, reg_str, REG_EXTENDED | REG_ICASE)) {
|
||||||
taosMemoryFree(total);
|
taosMemoryFree(total);
|
||||||
return 1;
|
return 1;
|
||||||
|
|
|
@ -23,40 +23,43 @@
|
||||||
#include "shellCommand.h"
|
#include "shellCommand.h"
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
|
#include "tconfig.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
#include "ttypes.h"
|
#include "ttypes.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
#include "tconfig.h"
|
|
||||||
|
|
||||||
#include <regex.h>
|
#include <regex.h>
|
||||||
#include <wordexp.h>
|
#include <wordexp.h>
|
||||||
|
|
||||||
/**************** Global variables ****************/
|
/**************** Global variables ****************/
|
||||||
#ifdef _TD_POWER_
|
#ifdef _TD_POWER_
|
||||||
char CLIENT_VERSION[] = "Welcome to the PowerDB shell from %s, Client Version:%s\n"
|
char CLIENT_VERSION[] =
|
||||||
"Copyright (c) 2020 by PowerDB, Inc. All rights reserved.\n\n";
|
"Welcome to the PowerDB shell from %s, Client Version:%s\n"
|
||||||
char PROMPT_HEADER[] = "power> ";
|
"Copyright (c) 2020 by PowerDB, Inc. All rights reserved.\n\n";
|
||||||
|
char PROMPT_HEADER[] = "power> ";
|
||||||
|
|
||||||
char CONTINUE_PROMPT[] = " -> ";
|
char CONTINUE_PROMPT[] = " -> ";
|
||||||
int prompt_size = 7;
|
int prompt_size = 7;
|
||||||
#elif (_TD_TQ_ == true)
|
#elif (_TD_TQ_ == true)
|
||||||
char CLIENT_VERSION[] = "Welcome to the TQ shell from %s, Client Version:%s\n"
|
char CLIENT_VERSION[] =
|
||||||
"Copyright (c) 2020 by TQ, Inc. All rights reserved.\n\n";
|
"Welcome to the TQ shell from %s, Client Version:%s\n"
|
||||||
char PROMPT_HEADER[] = "tq> ";
|
"Copyright (c) 2020 by TQ, Inc. All rights reserved.\n\n";
|
||||||
|
char PROMPT_HEADER[] = "tq> ";
|
||||||
|
|
||||||
char CONTINUE_PROMPT[] = " -> ";
|
char CONTINUE_PROMPT[] = " -> ";
|
||||||
int prompt_size = 4;
|
int prompt_size = 4;
|
||||||
#else
|
#else
|
||||||
char CLIENT_VERSION[] = "Welcome to the TDengine shell from %s, Client Version:%s\n"
|
char CLIENT_VERSION[] =
|
||||||
"Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n";
|
"Welcome to the TDengine shell from %s, Client Version:%s\n"
|
||||||
char PROMPT_HEADER[] = "taos> ";
|
"Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n";
|
||||||
|
char PROMPT_HEADER[] = "taos> ";
|
||||||
|
|
||||||
char CONTINUE_PROMPT[] = " -> ";
|
char CONTINUE_PROMPT[] = " -> ";
|
||||||
int prompt_size = 6;
|
int prompt_size = 6;
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int64_t result = 0;
|
int64_t result = 0;
|
||||||
SShellHistory history;
|
SShellHistory history;
|
||||||
|
|
||||||
#define DEFAULT_MAX_BINARY_DISPLAY_WIDTH 30
|
#define DEFAULT_MAX_BINARY_DISPLAY_WIDTH 30
|
||||||
extern int32_t tsMaxBinaryDisplayWidth;
|
extern int32_t tsMaxBinaryDisplayWidth;
|
||||||
|
@ -344,7 +347,7 @@ void shellRunCommandOnServer(TAOS *con, char command[]) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_FIELD* pFields = taos_fetch_fields(pSql);
|
TAOS_FIELD *pFields = taos_fetch_fields(pSql);
|
||||||
if (pFields != NULL) { // select and show kinds of commands
|
if (pFields != NULL) { // select and show kinds of commands
|
||||||
int error_no = 0;
|
int error_no = 0;
|
||||||
|
|
||||||
|
@ -521,7 +524,8 @@ static int dumpResultToFile(const char *fname, TAOS_RES *tres) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// FILE *fp = fopen(full_path.we_wordv[0], "w");
|
// FILE *fp = fopen(full_path.we_wordv[0], "w");
|
||||||
TdFilePtr pFile = taosOpenFile(full_path.we_wordv[0], TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
TdFilePtr pFile =
|
||||||
|
taosOpenFile(full_path.we_wordv[0], TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
||||||
if (pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
fprintf(stderr, "ERROR: failed to open file: %s\n", full_path.we_wordv[0]);
|
fprintf(stderr, "ERROR: failed to open file: %s\n", full_path.we_wordv[0]);
|
||||||
wordfree(&full_path);
|
wordfree(&full_path);
|
||||||
|
@ -901,8 +905,8 @@ void read_history() {
|
||||||
memset(history.hist, 0, sizeof(char *) * MAX_HISTORY_SIZE);
|
memset(history.hist, 0, sizeof(char *) * MAX_HISTORY_SIZE);
|
||||||
history.hstart = 0;
|
history.hstart = 0;
|
||||||
history.hend = 0;
|
history.hend = 0;
|
||||||
char *line = NULL;
|
char *line = NULL;
|
||||||
int read_size = 0;
|
int read_size = 0;
|
||||||
|
|
||||||
char f_history[TSDB_FILENAME_LEN];
|
char f_history[TSDB_FILENAME_LEN];
|
||||||
get_history_path(f_history);
|
get_history_path(f_history);
|
||||||
|
@ -929,7 +933,7 @@ void read_history() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if(line != NULL) taosMemoryFree(line);
|
if (line != NULL) taosMemoryFree(line);
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1026,7 +1030,7 @@ void source_file(TAOS *con, char *fptr) {
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(cmd);
|
taosMemoryFree(cmd);
|
||||||
if(line != NULL) taosMemoryFree(line);
|
if (line != NULL) taosMemoryFree(line);
|
||||||
wordfree(&full_path);
|
wordfree(&full_path);
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
}
|
}
|
||||||
|
|
|
@ -14,45 +14,38 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define __USE_XOPEN
|
#define __USE_XOPEN
|
||||||
#include "os.h"
|
|
||||||
#include "shell.h"
|
|
||||||
#include "tglobal.h"
|
|
||||||
#include "tconfig.h"
|
|
||||||
#include "shellCommand.h"
|
#include "shellCommand.h"
|
||||||
#include "tbase64.h"
|
#include "tglobal.h"
|
||||||
#include "tlog.h"
|
#include "tlog.h"
|
||||||
#include "version.h"
|
|
||||||
|
|
||||||
#include <wordexp.h>
|
|
||||||
#include <argp.h>
|
#include <argp.h>
|
||||||
#include <termio.h>
|
#include <termio.h>
|
||||||
|
#include <wordexp.h>
|
||||||
|
|
||||||
#define OPT_ABORT 1 /* abort */
|
#define OPT_ABORT 1 /* abort */
|
||||||
|
|
||||||
|
|
||||||
int indicator = 1;
|
int indicator = 1;
|
||||||
|
|
||||||
void insertChar(Command *cmd, char *c, int size);
|
void insertChar(Command *cmd, char *c, int size);
|
||||||
void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen,
|
void taosNetTest(char *role, char *host, int32_t port, int32_t pkgLen, int32_t pkgNum, char *pkgType);
|
||||||
int32_t pkgNum, char *pkgType);
|
|
||||||
const char *argp_program_version = version;
|
const char *argp_program_version = version;
|
||||||
const char *argp_program_bug_address = "<support@taosdata.com>";
|
const char *argp_program_bug_address = "<support@taosdata.com>";
|
||||||
static char doc[] = "";
|
static char doc[] = "";
|
||||||
static char args_doc[] = "";
|
static char args_doc[] = "";
|
||||||
|
|
||||||
TdThread pid;
|
TdThread pid;
|
||||||
static tsem_t cancelSem;
|
static tsem_t cancelSem;
|
||||||
|
|
||||||
static struct argp_option options[] = {
|
static struct argp_option options[] = {
|
||||||
{"host", 'h', "HOST", 0, "TDengine server FQDN to connect. The default host is localhost."},
|
{"host", 'h', "HOST", 0, "TDengine server FQDN to connect. The default host is localhost."},
|
||||||
{"password", 'p', 0, 0, "The password to use when connecting to the server."},
|
{"password", 'p', NULL, 0, "The password to use when connecting to the server."},
|
||||||
{"port", 'P', "PORT", 0, "The TCP/IP port number to use for the connection."},
|
{"port", 'P', "PORT", 0, "The TCP/IP port number to use for the connection."},
|
||||||
{"user", 'u', "USER", 0, "The user name to use when connecting to the server."},
|
{"user", 'u', "USER", 0, "The user name to use when connecting to the server."},
|
||||||
{"auth", 'A', "Auth", 0, "The auth string to use when connecting to the server."},
|
{"auth", 'A', "Auth", 0, "The auth string to use when connecting to the server."},
|
||||||
{"config-dir", 'c', "CONFIG_DIR", 0, "Configuration directory."},
|
{"config-dir", 'c', "CONFIG_DIR", 0, "Configuration directory."},
|
||||||
{"dump-config", 'C', 0, 0, "Dump configuration."},
|
{"dump-config",'C', NULL, 0, "Dump configuration."},
|
||||||
{"commands", 's', "COMMANDS", 0, "Commands to run without enter the shell."},
|
{"commands", 's', "COMMANDS", 0, "Commands to run without enter the shell."},
|
||||||
{"raw-time", 'r', 0, 0, "Output time as uint64_t."},
|
{"raw-time", 'r', NULL, 0, "Output time as uint64_t."},
|
||||||
{"file", 'f', "FILE", 0, "Script to run without enter the shell."},
|
{"file", 'f', "FILE", 0, "Script to run without enter the shell."},
|
||||||
{"directory", 'D', "DIRECTORY", 0, "Use multi-thread to import all SQL files in the directory separately."},
|
{"directory", 'D', "DIRECTORY", 0, "Use multi-thread to import all SQL files in the directory separately."},
|
||||||
{"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."},
|
{"thread", 'T', "THREADNUM", 0, "Number of threads when using multi-thread to import data."},
|
||||||
|
@ -70,7 +63,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
|
||||||
/* Get the input argument from argp_parse, which we
|
/* Get the input argument from argp_parse, which we
|
||||||
know is a pointer to our arguments structure. */
|
know is a pointer to our arguments structure. */
|
||||||
SShellArguments *arguments = state->input;
|
SShellArguments *arguments = state->input;
|
||||||
wordexp_t full_path;
|
wordexp_t full_path;
|
||||||
|
|
||||||
switch (key) {
|
switch (key) {
|
||||||
case 'h':
|
case 'h':
|
||||||
|
@ -80,7 +73,7 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
|
||||||
break;
|
break;
|
||||||
case 'P':
|
case 'P':
|
||||||
if (arg) {
|
if (arg) {
|
||||||
arguments->port = atoi(arg);
|
arguments->port = atoi(arg);
|
||||||
} else {
|
} else {
|
||||||
fprintf(stderr, "Invalid port\n");
|
fprintf(stderr, "Invalid port\n");
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -182,35 +175,33 @@ static error_t parse_opt(int key, char *arg, struct argp_state *state) {
|
||||||
/* Our argp parser. */
|
/* Our argp parser. */
|
||||||
static struct argp argp = {options, parse_opt, args_doc, doc};
|
static struct argp argp = {options, parse_opt, args_doc, doc};
|
||||||
|
|
||||||
char LINUXCLIENT_VERSION[] = "Welcome to the TDengine shell from %s, Client Version:%s\n"
|
char LINUXCLIENT_VERSION[] =
|
||||||
"Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n";
|
"Welcome to the TDengine shell from %s, Client Version:%s\n"
|
||||||
|
"Copyright (c) 2020 by TAOS Data, Inc. All rights reserved.\n\n";
|
||||||
char g_password[SHELL_MAX_PASSWORD_LEN];
|
char g_password[SHELL_MAX_PASSWORD_LEN];
|
||||||
|
|
||||||
static void parse_args(
|
static void parse_args(int argc, char *argv[], SShellArguments *arguments) {
|
||||||
int argc, char *argv[], SShellArguments *arguments) {
|
for (int i = 1; i < argc; i++) {
|
||||||
for (int i = 1; i < argc; i++) {
|
if ((strncmp(argv[i], "-p", 2) == 0) || (strncmp(argv[i], "--password", 10) == 0)) {
|
||||||
if ((strncmp(argv[i], "-p", 2) == 0)
|
printf(LINUXCLIENT_VERSION, tsOsName, taos_get_client_info());
|
||||||
|| (strncmp(argv[i], "--password", 10) == 0)) {
|
if ((strlen(argv[i]) == 2) || (strncmp(argv[i], "--password", 10) == 0)) {
|
||||||
printf(LINUXCLIENT_VERSION, tsOsName, taos_get_client_info());
|
printf("Enter password: ");
|
||||||
if ((strlen(argv[i]) == 2)
|
taosSetConsoleEcho(false);
|
||||||
|| (strncmp(argv[i], "--password", 10) == 0)) {
|
if (scanf("%20s", g_password) > 1) {
|
||||||
printf("Enter password: ");
|
fprintf(stderr, "password reading error\n");
|
||||||
taosSetConsoleEcho(false);
|
|
||||||
if (scanf("%20s", g_password) > 1) {
|
|
||||||
fprintf(stderr, "password reading error\n");
|
|
||||||
}
|
|
||||||
taosSetConsoleEcho(true);
|
|
||||||
if (EOF == getchar()) {
|
|
||||||
fprintf(stderr, "getchar() return EOF\n");
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
tstrncpy(g_password, (char *)(argv[i] + 2), SHELL_MAX_PASSWORD_LEN);
|
|
||||||
strcpy(argv[i], "-p");
|
|
||||||
}
|
|
||||||
arguments->password = g_password;
|
|
||||||
arguments->is_use_passwd = true;
|
|
||||||
}
|
}
|
||||||
|
taosSetConsoleEcho(true);
|
||||||
|
if (EOF == getchar()) {
|
||||||
|
fprintf(stderr, "getchar() return EOF\n");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
tstrncpy(g_password, (char *)(argv[i] + 2), SHELL_MAX_PASSWORD_LEN);
|
||||||
|
strcpy(argv[i], "-p");
|
||||||
|
}
|
||||||
|
arguments->password = g_password;
|
||||||
|
arguments->is_use_passwd = true;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
|
void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
|
||||||
|
@ -225,20 +216,20 @@ void shellParseArgument(int argc, char *argv[], SShellArguments *arguments) {
|
||||||
|
|
||||||
argp_parse(&argp, argc, argv, 0, 0, arguments);
|
argp_parse(&argp, argc, argv, 0, 0, arguments);
|
||||||
if (arguments->abort) {
|
if (arguments->abort) {
|
||||||
#ifndef _ALPINE
|
#ifndef _ALPINE
|
||||||
#if 0
|
#if 0
|
||||||
error(10, 0, "ABORTED");
|
error(10, 0, "ABORTED");
|
||||||
#endif
|
#endif
|
||||||
#else
|
#else
|
||||||
abort();
|
abort();
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t shellReadCommand(TAOS *con, char *command) {
|
int32_t shellReadCommand(TAOS *con, char *command) {
|
||||||
unsigned hist_counter = history.hend;
|
unsigned hist_counter = history.hend;
|
||||||
char utf8_array[10] = "\0";
|
char utf8_array[10] = "\0";
|
||||||
Command cmd;
|
Command cmd;
|
||||||
memset(&cmd, 0, sizeof(cmd));
|
memset(&cmd, 0, sizeof(cmd));
|
||||||
cmd.buffer = (char *)taosMemoryCalloc(1, MAX_COMMAND_SIZE);
|
cmd.buffer = (char *)taosMemoryCalloc(1, MAX_COMMAND_SIZE);
|
||||||
cmd.command = (char *)taosMemoryCalloc(1, MAX_COMMAND_SIZE);
|
cmd.command = (char *)taosMemoryCalloc(1, MAX_COMMAND_SIZE);
|
||||||
|
@ -247,7 +238,7 @@ int32_t shellReadCommand(TAOS *con, char *command) {
|
||||||
// Read input.
|
// Read input.
|
||||||
char c;
|
char c;
|
||||||
while (1) {
|
while (1) {
|
||||||
c = (char)getchar(); // getchar() return an 'int' value
|
c = (char)getchar(); // getchar() return an 'int' value
|
||||||
|
|
||||||
if (c == EOF) {
|
if (c == EOF) {
|
||||||
return c;
|
return c;
|
||||||
|
@ -406,7 +397,7 @@ void *shellLoopQuery(void *arg) {
|
||||||
taosThreadCleanupPush(cleanup_handler, NULL);
|
taosThreadCleanupPush(cleanup_handler, NULL);
|
||||||
|
|
||||||
char *command = taosMemoryMalloc(MAX_COMMAND_SIZE);
|
char *command = taosMemoryMalloc(MAX_COMMAND_SIZE);
|
||||||
if (command == NULL){
|
if (command == NULL) {
|
||||||
uError("failed to malloc command");
|
uError("failed to malloc command");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -437,7 +428,7 @@ void get_history_path(char *_history) { snprintf(_history, TSDB_FILENAME_LEN, "%
|
||||||
void clearScreen(int ecmd_pos, int cursor_pos) {
|
void clearScreen(int ecmd_pos, int cursor_pos) {
|
||||||
struct winsize w;
|
struct winsize w;
|
||||||
if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) {
|
if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) {
|
||||||
//fprintf(stderr, "No stream device, and use default value(col 120, row 30)\n");
|
// fprintf(stderr, "No stream device, and use default value(col 120, row 30)\n");
|
||||||
w.ws_col = 120;
|
w.ws_col = 120;
|
||||||
w.ws_row = 30;
|
w.ws_row = 30;
|
||||||
}
|
}
|
||||||
|
@ -458,13 +449,13 @@ void clearScreen(int ecmd_pos, int cursor_pos) {
|
||||||
void showOnScreen(Command *cmd) {
|
void showOnScreen(Command *cmd) {
|
||||||
struct winsize w;
|
struct winsize w;
|
||||||
if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) {
|
if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) {
|
||||||
//fprintf(stderr, "No stream device\n");
|
// fprintf(stderr, "No stream device\n");
|
||||||
w.ws_col = 120;
|
w.ws_col = 120;
|
||||||
w.ws_row = 30;
|
w.ws_row = 30;
|
||||||
}
|
}
|
||||||
|
|
||||||
TdWchar wc;
|
TdWchar wc;
|
||||||
int size = 0;
|
int size = 0;
|
||||||
|
|
||||||
// Print out the command.
|
// Print out the command.
|
||||||
char *total_string = taosMemoryMalloc(MAX_COMMAND_SIZE);
|
char *total_string = taosMemoryMalloc(MAX_COMMAND_SIZE);
|
||||||
|
@ -531,13 +522,11 @@ void showOnScreen(Command *cmd) {
|
||||||
void cleanup_handler(void *arg) { resetTerminalMode(); }
|
void cleanup_handler(void *arg) { resetTerminalMode(); }
|
||||||
|
|
||||||
void exitShell() {
|
void exitShell() {
|
||||||
/*int32_t ret =*/ resetTerminalMode();
|
/*int32_t ret =*/resetTerminalMode();
|
||||||
taos_cleanup();
|
taos_cleanup();
|
||||||
exit(EXIT_SUCCESS);
|
exit(EXIT_SUCCESS);
|
||||||
}
|
}
|
||||||
void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) {
|
void shellQueryInterruptHandler(int32_t signum, void *sigInfo, void *context) { tsem_post(&cancelSem); }
|
||||||
tsem_post(&cancelSem);
|
|
||||||
}
|
|
||||||
|
|
||||||
void *cancelHandler(void *arg) {
|
void *cancelHandler(void *arg) {
|
||||||
setThreadName("cancelHandler");
|
setThreadName("cancelHandler");
|
||||||
|
@ -640,11 +629,11 @@ int main(int argc, char *argv[]) {
|
||||||
con = taos_connect_auth(args.host, args.user, args.auth, args.database, args.port);
|
con = taos_connect_auth(args.host, args.user, args.auth, args.database, args.port);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* if (taos_init()) {
|
// if (taos_init()) {
|
||||||
printf("Failed to init taos");
|
// printf("Failed to init taos");
|
||||||
exit(EXIT_FAILURE);
|
// exit(EXIT_FAILURE);
|
||||||
}
|
// }
|
||||||
*/
|
|
||||||
taosNetTest(args.netTestRole, args.host, args.port, args.pktLen, args.pktNum, args.pktType);
|
taosNetTest(args.netTestRole, args.host, args.port, args.pktLen, args.pktNum, args.pktType);
|
||||||
taos_close(con);
|
taos_close(con);
|
||||||
exit(0);
|
exit(0);
|
||||||
|
|
Loading…
Reference in New Issue