From 1d2e13c985fc1e3b47b5368a55d636669dcab909 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 28 May 2022 21:10:09 +0800 Subject: [PATCH] enh: refactor rpc code --- source/libs/executor/src/executorimpl.c | 28 ++-- .../transport/src/{transSrv.c => transSvr.c} | 152 +++++++++--------- 2 files changed, 89 insertions(+), 91 deletions(-) rename source/libs/transport/src/{transSrv.c => transSvr.c} (91%) diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index bd4ae7afde..089b06ac8f 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4510,8 +4510,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo)); } - SArray* tableIdList = extractTableIdList(pTableListInfo); - SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, tableIdList, pTableScanNode, pTaskInfo); + SArray* tableIdList = extractTableIdList(pTableListInfo); + SOperatorInfo* pOperator = + createStreamScanOperatorInfo(pDataReader, pHandle, tableIdList, pTableScanNode, pTaskInfo); taosArrayDestroy(tableIdList); return pOperator; @@ -4699,9 +4700,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } int32_t compareTimeWindow(const void* p1, const void* p2, const void* param) { - const SQueryTableDataCond *pCond = param; - const STimeWindow *pWin1 = p1; - const STimeWindow *pWin2 = p2; + const SQueryTableDataCond* pCond = param; + const STimeWindow* pWin1 = p1; + const STimeWindow* pWin2 = p2; if (pCond->order == TSDB_ORDER_ASC) { return pWin1->skey - pWin2->skey; } else if (pCond->order == TSDB_ORDER_DESC) { @@ -4721,8 +4722,8 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi return terrno; } - //pCond->twindow = pTableScanNode->scanRange; - //TODO: get it from stable scan node + // pCond->twindow = pTableScanNode->scanRange; + // TODO: get it from stable scan node pCond->numOfTWindows = 1; pCond->twindows = taosMemoryCalloc(pCond->numOfTWindows, sizeof(STimeWindow)); pCond->twindows[0] = pTableScanNode->scanRange; @@ -4743,11 +4744,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi TSWAP(pCond->twindows[i].skey, pCond->twindows[i].ekey); } } - taosqsort(pCond->twindows, - pCond->numOfTWindows, - sizeof(STimeWindow), - pCond, - compareTimeWindow); + taosqsort(pCond->twindows, pCond->numOfTWindows, sizeof(STimeWindow), pCond, compareTimeWindow); pCond->type = BLOCK_LOAD_OFFSET_SEQ_ORDER; // pCond->type = pTableScanNode->scanFlag; @@ -4907,7 +4904,8 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod return pList; } -int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo, SNode* pTagCond) { +int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo, + SNode* pTagCond) { int32_t code = TSDB_CODE_SUCCESS; pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo)); @@ -4918,12 +4916,12 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa SArray* res = taosArrayInit(8, sizeof(uint64_t)); code = doFilterTag(pTagCond, &metaArg, res); if (code != TSDB_CODE_SUCCESS) { - qError("doFilterTag error:%d", code); + qError("failed to get tableIds, reason: %s, suid: %" PRIu64 "", tstrerror(code), tableUid); taosArrayDestroy(res); terrno = code; return code; } else { - qDebug("doFilterTag error:%d, suid: %" PRIu64 "", code, tableUid); + qDebug("sucess to get tableIds, size: %d, suid: %" PRIu64 "", (int)taosArrayGetSize(res), tableUid); } for (int i = 0; i < taosArrayGetSize(res); i++) { STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i)}; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSvr.c similarity index 91% rename from source/libs/transport/src/transSrv.c rename to source/libs/transport/src/transSvr.c index 9018eaacf6..52b36433bb 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSvr.c @@ -20,15 +20,15 @@ static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT; static char* notify = "a"; -static int transSrvInst = 0; +static int tranSSvrInst = 0; typedef struct { int notifyCount; // int init; // init or not STransMsg msg; -} SSrvRegArg; +} SSvrRegArg; -typedef struct SSrvConn { +typedef struct SSvrConn { T_REF_DECLARE() uv_tcp_t* pTcp; uv_write_t pWriter; @@ -42,7 +42,7 @@ typedef struct SSrvConn { void* hostThrd; STransQueue srvMsgs; - SSrvRegArg regArg; + SSvrRegArg regArg; bool broken; // conn broken; ConnStatus status; @@ -55,14 +55,14 @@ typedef struct SSrvConn { char user[TSDB_UNI_LEN]; // user ID for the link char secret[TSDB_PASSWORD_LEN]; char ckey[TSDB_PASSWORD_LEN]; // ciphering key -} SSrvConn; +} SSvrConn; -typedef struct SSrvMsg { - SSrvConn* pConn; +typedef struct SSvrMsg { + SSvrConn* pConn; STransMsg msg; queue q; STransMsgType type; -} SSrvMsg; +} SSvrMsg; typedef struct SWorkThrdObj { TdThread thread; @@ -127,25 +127,25 @@ static void uvWorkAfterTask(uv_work_t* req, int status); static void uvWalkCb(uv_handle_t* handle, void* arg); static void uvFreeCb(uv_handle_t* handle); -static void uvStartSendRespInternal(SSrvMsg* smsg); -static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb); -static void uvStartSendResp(SSrvMsg* msg); +static void uvStartSendRespInternal(SSvrMsg* smsg); +static void uvPrepareSendData(SSvrMsg* msg, uv_buf_t* wb); +static void uvStartSendResp(SSvrMsg* msg); -static void uvNotifyLinkBrokenToApp(SSrvConn* conn); +static void uvNotifyLinkBrokenToApp(SSvrConn* conn); -static void destroySmsg(SSrvMsg* smsg); +static void destroySmsg(SSvrMsg* smsg); // check whether already read complete packet -static SSrvConn* createConn(void* hThrd); -static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/); -static void destroyConnRegArg(SSrvConn* conn); +static SSvrConn* createConn(void* hThrd); +static void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/); +static void destroyConnRegArg(SSvrConn* conn); -static int reallocConnRefHandle(SSrvConn* conn); +static int reallocConnRefHandle(SSvrConn* conn); -static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd); -static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd); -static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd); -static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd); -static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, +static void uvHandleQuit(SSvrMsg* msg, SWorkThrdObj* thrd); +static void uvHandleRelease(SSvrMsg* msg, SWorkThrdObj* thrd); +static void uvHandleResp(SSvrMsg* msg, SWorkThrdObj* thrd); +static void uvHandleRegister(SSvrMsg* msg, SWorkThrdObj* thrd); +static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease, uvHandleRegister, NULL}; static int32_t exHandlesMgt; @@ -178,7 +178,7 @@ static bool addHandleToAcceptloop(void* arg); tTrace("server conn %p received release request", conn); \ \ STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.ahandle = NULL}; \ - SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \ + SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \ srvMsg->msg = tmsg; \ srvMsg->type = Release; \ srvMsg->pConn = conn; \ @@ -233,18 +233,18 @@ static bool addHandleToAcceptloop(void* arg); } while (0) void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { - SSrvConn* conn = handle->data; + SSvrConn* conn = handle->data; SConnBuffer* pBuf = &conn->readBuf; transAllocBuffer(pBuf, buf); } // refers specifically to query or insert timeout static void uvHandleActivityTimeout(uv_timer_t* handle) { - SSrvConn* conn = handle->data; + SSvrConn* conn = handle->data; tDebug("%p timeout since no activity", conn); } -static void uvHandleReq(SSrvConn* pConn) { +static void uvHandleReq(SSvrConn* pConn) { SConnBuffer* pBuf = &pConn->readBuf; char* msg = pBuf->buf; uint32_t msgLen = pBuf->len; @@ -316,7 +316,7 @@ static void uvHandleReq(SSrvConn* pConn) { void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { // opt - SSrvConn* conn = cli->data; + SSvrConn* conn = cli->data; SConnBuffer* pBuf = &conn->readBuf; if (nread > 0) { pBuf->len += nread; @@ -354,17 +354,17 @@ void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b void uvOnTimeoutCb(uv_timer_t* handle) { // opt - SSrvConn* pConn = handle->data; + SSvrConn* pConn = handle->data; tError("server conn %p time out", pConn); } void uvOnSendCb(uv_write_t* req, int status) { - SSrvConn* conn = req->data; + SSvrConn* conn = req->data; // transClearBuffer(&conn->readBuf); if (status == 0) { tTrace("server conn %p data already was written on stream", conn); if (!transQueueEmpty(&conn->srvMsgs)) { - SSrvMsg* msg = transQueuePop(&conn->srvMsgs); + SSvrMsg* msg = transQueuePop(&conn->srvMsgs); // if (msg->type == Release && conn->status != ConnNormal) { // conn->status = ConnNormal; // transUnrefSrvHandle(conn); @@ -376,7 +376,7 @@ void uvOnSendCb(uv_write_t* req, int status) { destroySmsg(msg); // send second data, just use for push if (!transQueueEmpty(&conn->srvMsgs)) { - msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0); + msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0); if (msg->type == Register && conn->status == ConnAcquire) { conn->regArg.notifyCount = 0; conn->regArg.init = 1; @@ -389,7 +389,7 @@ void uvOnSendCb(uv_write_t* req, int status) { transQueuePop(&conn->srvMsgs); taosMemoryFree(msg); - msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0); + msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0); if (msg != NULL) { uvStartSendRespInternal(msg); } @@ -415,10 +415,10 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) { taosMemoryFree(req); } -static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { +static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) { tTrace("server conn %p prepare to send resp", smsg->pConn); - SSrvConn* pConn = smsg->pConn; + SSvrConn* pConn = smsg->pConn; STransMsg* pMsg = &smsg->msg; if (pMsg->pCont == 0) { pMsg->pCont = (void*)rpcMallocCont(0); @@ -455,17 +455,17 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) { wb->len = len; } -static void uvStartSendRespInternal(SSrvMsg* smsg) { +static void uvStartSendRespInternal(SSvrMsg* smsg) { uv_buf_t wb; uvPrepareSendData(smsg, &wb); - SSrvConn* pConn = smsg->pConn; + SSvrConn* pConn = smsg->pConn; // uv_timer_stop(&pConn->pTimer); uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb); } -static void uvStartSendResp(SSrvMsg* smsg) { +static void uvStartSendResp(SSvrMsg* smsg) { // impl - SSrvConn* pConn = smsg->pConn; + SSvrConn* pConn = smsg->pConn; if (pConn->broken == true) { // persist by @@ -485,7 +485,7 @@ static void uvStartSendResp(SSrvMsg* smsg) { return; } -static void destroySmsg(SSrvMsg* smsg) { +static void destroySmsg(SSvrMsg* smsg) { if (smsg == NULL) { return; } @@ -499,7 +499,7 @@ static void destroyAllConn(SWorkThrdObj* pThrd) { QUEUE_REMOVE(h); QUEUE_INIT(h); - SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue); + SSvrConn* c = QUEUE_DATA(h, SSvrConn, queue); while (T_REF_VAL_GET(c) >= 2) { transUnrefSrvHandle(c); } @@ -509,7 +509,7 @@ static void destroyAllConn(SWorkThrdObj* pThrd) { void uvWorkerAsyncCb(uv_async_t* handle) { SAsyncItem* item = handle->data; SWorkThrdObj* pThrd = item->pThrd; - SSrvConn* conn = NULL; + SSvrConn* conn = NULL; queue wq; // batch process to avoid to lock/unlock frequently @@ -521,7 +521,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) { queue* head = QUEUE_HEAD(&wq); QUEUE_REMOVE(head); - SSrvMsg* msg = QUEUE_DATA(head, SSrvMsg, q); + SSvrMsg* msg = QUEUE_DATA(head, SSvrMsg, q); if (msg == NULL) { tError("unexcept occurred, continue"); continue; @@ -649,7 +649,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) { uv_handle_type pending = uv_pipe_pending_type(pipe); assert(pending == UV_TCP); - SSrvConn* pConn = createConn(pThrd); + SSvrConn* pConn = createConn(pThrd); pConn->pTransInst = pThrd->pTransInst; /* init conn timer*/ @@ -768,10 +768,10 @@ void* transWorkerThread(void* arg) { return NULL; } -static SSrvConn* createConn(void* hThrd) { +static SSvrConn* createConn(void* hThrd) { SWorkThrdObj* pThrd = hThrd; - SSrvConn* pConn = (SSrvConn*)taosMemoryCalloc(1, sizeof(SSrvConn)); + SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn)); QUEUE_INIT(&pConn->queue); QUEUE_PUSH(&pThrd->conn, &pConn->queue); @@ -794,7 +794,7 @@ static SSrvConn* createConn(void* hThrd) { return pConn; } -static void destroyConn(SSrvConn* conn, bool clear) { +static void destroyConn(SSvrConn* conn, bool clear) { if (conn == NULL) { return; } @@ -808,13 +808,13 @@ static void destroyConn(SSrvConn* conn, bool clear) { // uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb); } } -static void destroyConnRegArg(SSrvConn* conn) { +static void destroyConnRegArg(SSvrConn* conn) { if (conn->regArg.init == 1) { transFreeMsg(conn->regArg.msg.pCont); conn->regArg.init = 0; } } -static int reallocConnRefHandle(SSrvConn* conn) { +static int reallocConnRefHandle(SSvrConn* conn) { uvReleaseExHandle(conn->refId); uvRemoveExHandle(conn->refId); // avoid app continue to send msg on invalid handle @@ -828,7 +828,7 @@ static int reallocConnRefHandle(SSrvConn* conn) { return 0; } static void uvDestroyConn(uv_handle_t* handle) { - SSrvConn* conn = handle->data; + SSvrConn* conn = handle->data; if (conn == NULL) { return; } @@ -884,7 +884,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads, uv_loop_init(srv->loop); taosThreadOnce(&transModuleInit, uvInitEnv); - transSrvInst++; + tranSSvrInst++; assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0)); #ifdef WINDOWS @@ -981,7 +981,7 @@ void uvDestoryExHandle(void* handle) { taosMemoryFree(handle); } -void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) { +void uvHandleQuit(SSvrMsg* msg, SWorkThrdObj* thrd) { thrd->quit = true; if (QUEUE_IS_EMPTY(&thrd->conn)) { uv_walk(thrd->loop, uvWalkCb, NULL); @@ -990,8 +990,8 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) { } taosMemoryFree(msg); } -void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) { - SSrvConn* conn = msg->pConn; +void uvHandleRelease(SSvrMsg* msg, SWorkThrdObj* thrd) { + SSvrConn* conn = msg->pConn; if (conn->status == ConnAcquire) { reallocConnRefHandle(conn); if (!transQueuePush(&conn->srvMsgs, msg)) { @@ -1004,13 +1004,13 @@ void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) { } destroySmsg(msg); } -void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd) { +void uvHandleResp(SSvrMsg* msg, SWorkThrdObj* thrd) { // send msg to client tDebug("server conn %p start to send resp (2/2)", msg->pConn); uvStartSendResp(msg); } -void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) { - SSrvConn* conn = msg->pConn; +void uvHandleRegister(SSvrMsg* msg, SWorkThrdObj* thrd) { + SSvrConn* conn = msg->pConn; tDebug("server conn %p register brokenlink callback", conn); if (conn->status == ConnAcquire) { if (!transQueuePush(&conn->srvMsgs, msg)) { @@ -1036,13 +1036,13 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) { } taosThreadJoin(pThrd->thread, NULL); SRV_RELEASE_UV(pThrd->loop); - TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSrvMsg, destroySmsg); + TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg); transDestroyAsyncPool(pThrd->asyncPool); taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd); } void sendQuitToWorkThrd(SWorkThrdObj* pThrd) { - SSrvMsg* msg = taosMemoryCalloc(1, sizeof(SSrvMsg)); + SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg)); msg->type = Quit; tDebug("server send quit msg to work thread"); transSendAsync(pThrd->asyncPool, &msg->q); @@ -1075,8 +1075,8 @@ void transCloseServer(void* arg) { taosMemoryFree(srv); - transSrvInst--; - if (transSrvInst == 0) { + tranSSvrInst--; + if (tranSSvrInst == 0) { TdThreadOnce tmpInit = PTHREAD_ONCE_INIT; memcpy(&transModuleInit, &tmpInit, sizeof(TdThreadOnce)); uvCloseExHandleMgt(); @@ -1087,7 +1087,7 @@ void transRefSrvHandle(void* handle) { if (handle == NULL) { return; } - int ref = T_REF_INC((SSrvConn*)handle); + int ref = T_REF_INC((SSvrConn*)handle); tDebug("server conn %p ref count: %d", handle, ref); } @@ -1095,10 +1095,10 @@ void transUnrefSrvHandle(void* handle) { if (handle == NULL) { return; } - int ref = T_REF_DEC((SSrvConn*)handle); + int ref = T_REF_DEC((SSvrConn*)handle); tDebug("server conn %p ref count: %d", handle, ref); if (ref == 0) { - destroyConn((SSrvConn*)handle, true); + destroyConn((SSvrConn*)handle, true); } } @@ -1113,12 +1113,12 @@ void transReleaseSrvHandle(void* handle) { STransMsg tmsg = {.code = 0, .info.handle = exh, .info.ahandle = NULL, .info.refId = refId}; - SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); - srvMsg->msg = tmsg; - srvMsg->type = Release; + SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg)); + m->msg = tmsg; + m->type = Release; tTrace("server conn %p start to release", exh->handle); - transSendAsync(pThrd->asyncPool, &srvMsg->q); + transSendAsync(pThrd->asyncPool, &m->q); uvReleaseExHandle(refId); return; _return1: @@ -1141,11 +1141,11 @@ void transSendResponse(const STransMsg* msg) { SWorkThrdObj* pThrd = exh->pThrd; ASYNC_ERR_JRET(pThrd); - SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); - srvMsg->msg = tmsg; - srvMsg->type = Normal; + SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg)); + m->msg = tmsg; + m->type = Normal; tDebug("server conn %p start to send resp (1/2)", exh->handle); - transSendAsync(pThrd->asyncPool, &srvMsg->q); + transSendAsync(pThrd->asyncPool, &m->q); uvReleaseExHandle(refId); return; _return1: @@ -1169,11 +1169,11 @@ void transRegisterMsg(const STransMsg* msg) { SWorkThrdObj* pThrd = exh->pThrd; ASYNC_ERR_JRET(pThrd); - SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); - srvMsg->msg = tmsg; - srvMsg->type = Register; + SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg)); + m->msg = tmsg; + m->type = Register; tTrace("server conn %p start to register brokenlink callback", exh->handle); - transSendAsync(pThrd->asyncPool, &srvMsg->q); + transSendAsync(pThrd->asyncPool, &m->q); uvReleaseExHandle(refId); return; @@ -1193,7 +1193,7 @@ int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) { return -1; } SExHandle* ex = thandle; - SSrvConn* pConn = ex->handle; + SSvrConn* pConn = ex->handle; struct sockaddr_in addr = pConn->addr; pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);