enh: refactor rpc code

This commit is contained in:
yihaoDeng 2022-05-28 21:10:09 +08:00
parent ddab675207
commit 1d2e13c985
2 changed files with 89 additions and 91 deletions

View File

@ -4510,8 +4510,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
qDebug("%s pDataReader is not NULL", GET_TASKID(pTaskInfo));
}
SArray* tableIdList = extractTableIdList(pTableListInfo);
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pDataReader, pHandle, tableIdList, pTableScanNode, pTaskInfo);
SArray* tableIdList = extractTableIdList(pTableListInfo);
SOperatorInfo* pOperator =
createStreamScanOperatorInfo(pDataReader, pHandle, tableIdList, pTableScanNode, pTaskInfo);
taosArrayDestroy(tableIdList);
return pOperator;
@ -4699,9 +4700,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
}
int32_t compareTimeWindow(const void* p1, const void* p2, const void* param) {
const SQueryTableDataCond *pCond = param;
const STimeWindow *pWin1 = p1;
const STimeWindow *pWin2 = p2;
const SQueryTableDataCond* pCond = param;
const STimeWindow* pWin1 = p1;
const STimeWindow* pWin2 = p2;
if (pCond->order == TSDB_ORDER_ASC) {
return pWin1->skey - pWin2->skey;
} else if (pCond->order == TSDB_ORDER_DESC) {
@ -4721,8 +4722,8 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
return terrno;
}
//pCond->twindow = pTableScanNode->scanRange;
//TODO: get it from stable scan node
// pCond->twindow = pTableScanNode->scanRange;
// TODO: get it from stable scan node
pCond->numOfTWindows = 1;
pCond->twindows = taosMemoryCalloc(pCond->numOfTWindows, sizeof(STimeWindow));
pCond->twindows[0] = pTableScanNode->scanRange;
@ -4743,11 +4744,7 @@ int32_t initQueryTableDataCond(SQueryTableDataCond* pCond, const STableScanPhysi
TSWAP(pCond->twindows[i].skey, pCond->twindows[i].ekey);
}
}
taosqsort(pCond->twindows,
pCond->numOfTWindows,
sizeof(STimeWindow),
pCond,
compareTimeWindow);
taosqsort(pCond->twindows, pCond->numOfTWindows, sizeof(STimeWindow), pCond, compareTimeWindow);
pCond->type = BLOCK_LOAD_OFFSET_SEQ_ORDER;
// pCond->type = pTableScanNode->scanFlag;
@ -4907,7 +4904,8 @@ SArray* extractColMatchInfo(SNodeList* pNodeList, SDataBlockDescNode* pOutputNod
return pList;
}
int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo, SNode* pTagCond) {
int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STableListInfo* pListInfo,
SNode* pTagCond) {
int32_t code = TSDB_CODE_SUCCESS;
pListInfo->pTableList = taosArrayInit(8, sizeof(STableKeyInfo));
@ -4918,12 +4916,12 @@ int32_t getTableList(void* metaHandle, int32_t tableType, uint64_t tableUid, STa
SArray* res = taosArrayInit(8, sizeof(uint64_t));
code = doFilterTag(pTagCond, &metaArg, res);
if (code != TSDB_CODE_SUCCESS) {
qError("doFilterTag error:%d", code);
qError("failed to get tableIds, reason: %s, suid: %" PRIu64 "", tstrerror(code), tableUid);
taosArrayDestroy(res);
terrno = code;
return code;
} else {
qDebug("doFilterTag error:%d, suid: %" PRIu64 "", code, tableUid);
qDebug("sucess to get tableIds, size: %d, suid: %" PRIu64 "", (int)taosArrayGetSize(res), tableUid);
}
for (int i = 0; i < taosArrayGetSize(res); i++) {
STableKeyInfo info = {.lastKey = TSKEY_INITIAL_VAL, .uid = *(uint64_t*)taosArrayGet(res, i)};

View File

@ -20,15 +20,15 @@
static TdThreadOnce transModuleInit = PTHREAD_ONCE_INIT;
static char* notify = "a";
static int transSrvInst = 0;
static int tranSSvrInst = 0;
typedef struct {
int notifyCount; //
int init; // init or not
STransMsg msg;
} SSrvRegArg;
} SSvrRegArg;
typedef struct SSrvConn {
typedef struct SSvrConn {
T_REF_DECLARE()
uv_tcp_t* pTcp;
uv_write_t pWriter;
@ -42,7 +42,7 @@ typedef struct SSrvConn {
void* hostThrd;
STransQueue srvMsgs;
SSrvRegArg regArg;
SSvrRegArg regArg;
bool broken; // conn broken;
ConnStatus status;
@ -55,14 +55,14 @@ typedef struct SSrvConn {
char user[TSDB_UNI_LEN]; // user ID for the link
char secret[TSDB_PASSWORD_LEN];
char ckey[TSDB_PASSWORD_LEN]; // ciphering key
} SSrvConn;
} SSvrConn;
typedef struct SSrvMsg {
SSrvConn* pConn;
typedef struct SSvrMsg {
SSvrConn* pConn;
STransMsg msg;
queue q;
STransMsgType type;
} SSrvMsg;
} SSvrMsg;
typedef struct SWorkThrdObj {
TdThread thread;
@ -127,25 +127,25 @@ static void uvWorkAfterTask(uv_work_t* req, int status);
static void uvWalkCb(uv_handle_t* handle, void* arg);
static void uvFreeCb(uv_handle_t* handle);
static void uvStartSendRespInternal(SSrvMsg* smsg);
static void uvPrepareSendData(SSrvMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSrvMsg* msg);
static void uvStartSendRespInternal(SSvrMsg* smsg);
static void uvPrepareSendData(SSvrMsg* msg, uv_buf_t* wb);
static void uvStartSendResp(SSvrMsg* msg);
static void uvNotifyLinkBrokenToApp(SSrvConn* conn);
static void uvNotifyLinkBrokenToApp(SSvrConn* conn);
static void destroySmsg(SSrvMsg* smsg);
static void destroySmsg(SSvrMsg* smsg);
// check whether already read complete packet
static SSrvConn* createConn(void* hThrd);
static void destroyConn(SSrvConn* conn, bool clear /*clear handle or not*/);
static void destroyConnRegArg(SSrvConn* conn);
static SSvrConn* createConn(void* hThrd);
static void destroyConn(SSvrConn* conn, bool clear /*clear handle or not*/);
static void destroyConnRegArg(SSvrConn* conn);
static int reallocConnRefHandle(SSrvConn* conn);
static int reallocConnRefHandle(SSvrConn* conn);
static void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd);
static void (*transAsyncHandle[])(SSrvMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
static void uvHandleQuit(SSvrMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRelease(SSvrMsg* msg, SWorkThrdObj* thrd);
static void uvHandleResp(SSvrMsg* msg, SWorkThrdObj* thrd);
static void uvHandleRegister(SSvrMsg* msg, SWorkThrdObj* thrd);
static void (*transAsyncHandle[])(SSvrMsg* msg, SWorkThrdObj* thrd) = {uvHandleResp, uvHandleQuit, uvHandleRelease,
uvHandleRegister, NULL};
static int32_t exHandlesMgt;
@ -178,7 +178,7 @@ static bool addHandleToAcceptloop(void* arg);
tTrace("server conn %p received release request", conn); \
\
STransMsg tmsg = {.code = 0, .info.handle = (void*)conn, .info.ahandle = NULL}; \
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg)); \
SSvrMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSvrMsg)); \
srvMsg->msg = tmsg; \
srvMsg->type = Release; \
srvMsg->pConn = conn; \
@ -233,18 +233,18 @@ static bool addHandleToAcceptloop(void* arg);
} while (0)
void uvAllocRecvBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) {
SSrvConn* conn = handle->data;
SSvrConn* conn = handle->data;
SConnBuffer* pBuf = &conn->readBuf;
transAllocBuffer(pBuf, buf);
}
// refers specifically to query or insert timeout
static void uvHandleActivityTimeout(uv_timer_t* handle) {
SSrvConn* conn = handle->data;
SSvrConn* conn = handle->data;
tDebug("%p timeout since no activity", conn);
}
static void uvHandleReq(SSrvConn* pConn) {
static void uvHandleReq(SSvrConn* pConn) {
SConnBuffer* pBuf = &pConn->readBuf;
char* msg = pBuf->buf;
uint32_t msgLen = pBuf->len;
@ -316,7 +316,7 @@ static void uvHandleReq(SSrvConn* pConn) {
void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) {
// opt
SSrvConn* conn = cli->data;
SSvrConn* conn = cli->data;
SConnBuffer* pBuf = &conn->readBuf;
if (nread > 0) {
pBuf->len += nread;
@ -354,17 +354,17 @@ void uvAllocConnBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* b
void uvOnTimeoutCb(uv_timer_t* handle) {
// opt
SSrvConn* pConn = handle->data;
SSvrConn* pConn = handle->data;
tError("server conn %p time out", pConn);
}
void uvOnSendCb(uv_write_t* req, int status) {
SSrvConn* conn = req->data;
SSvrConn* conn = req->data;
// transClearBuffer(&conn->readBuf);
if (status == 0) {
tTrace("server conn %p data already was written on stream", conn);
if (!transQueueEmpty(&conn->srvMsgs)) {
SSrvMsg* msg = transQueuePop(&conn->srvMsgs);
SSvrMsg* msg = transQueuePop(&conn->srvMsgs);
// if (msg->type == Release && conn->status != ConnNormal) {
// conn->status = ConnNormal;
// transUnrefSrvHandle(conn);
@ -376,7 +376,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
destroySmsg(msg);
// send second data, just use for push
if (!transQueueEmpty(&conn->srvMsgs)) {
msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0);
msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0);
if (msg->type == Register && conn->status == ConnAcquire) {
conn->regArg.notifyCount = 0;
conn->regArg.init = 1;
@ -389,7 +389,7 @@ void uvOnSendCb(uv_write_t* req, int status) {
transQueuePop(&conn->srvMsgs);
taosMemoryFree(msg);
msg = (SSrvMsg*)transQueueGet(&conn->srvMsgs, 0);
msg = (SSvrMsg*)transQueueGet(&conn->srvMsgs, 0);
if (msg != NULL) {
uvStartSendRespInternal(msg);
}
@ -415,10 +415,10 @@ static void uvOnPipeWriteCb(uv_write_t* req, int status) {
taosMemoryFree(req);
}
static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
static void uvPrepareSendData(SSvrMsg* smsg, uv_buf_t* wb) {
tTrace("server conn %p prepare to send resp", smsg->pConn);
SSrvConn* pConn = smsg->pConn;
SSvrConn* pConn = smsg->pConn;
STransMsg* pMsg = &smsg->msg;
if (pMsg->pCont == 0) {
pMsg->pCont = (void*)rpcMallocCont(0);
@ -455,17 +455,17 @@ static void uvPrepareSendData(SSrvMsg* smsg, uv_buf_t* wb) {
wb->len = len;
}
static void uvStartSendRespInternal(SSrvMsg* smsg) {
static void uvStartSendRespInternal(SSvrMsg* smsg) {
uv_buf_t wb;
uvPrepareSendData(smsg, &wb);
SSrvConn* pConn = smsg->pConn;
SSvrConn* pConn = smsg->pConn;
// uv_timer_stop(&pConn->pTimer);
uv_write(&pConn->pWriter, (uv_stream_t*)pConn->pTcp, &wb, 1, uvOnSendCb);
}
static void uvStartSendResp(SSrvMsg* smsg) {
static void uvStartSendResp(SSvrMsg* smsg) {
// impl
SSrvConn* pConn = smsg->pConn;
SSvrConn* pConn = smsg->pConn;
if (pConn->broken == true) {
// persist by
@ -485,7 +485,7 @@ static void uvStartSendResp(SSrvMsg* smsg) {
return;
}
static void destroySmsg(SSrvMsg* smsg) {
static void destroySmsg(SSvrMsg* smsg) {
if (smsg == NULL) {
return;
}
@ -499,7 +499,7 @@ static void destroyAllConn(SWorkThrdObj* pThrd) {
QUEUE_REMOVE(h);
QUEUE_INIT(h);
SSrvConn* c = QUEUE_DATA(h, SSrvConn, queue);
SSvrConn* c = QUEUE_DATA(h, SSvrConn, queue);
while (T_REF_VAL_GET(c) >= 2) {
transUnrefSrvHandle(c);
}
@ -509,7 +509,7 @@ static void destroyAllConn(SWorkThrdObj* pThrd) {
void uvWorkerAsyncCb(uv_async_t* handle) {
SAsyncItem* item = handle->data;
SWorkThrdObj* pThrd = item->pThrd;
SSrvConn* conn = NULL;
SSvrConn* conn = NULL;
queue wq;
// batch process to avoid to lock/unlock frequently
@ -521,7 +521,7 @@ void uvWorkerAsyncCb(uv_async_t* handle) {
queue* head = QUEUE_HEAD(&wq);
QUEUE_REMOVE(head);
SSrvMsg* msg = QUEUE_DATA(head, SSrvMsg, q);
SSvrMsg* msg = QUEUE_DATA(head, SSvrMsg, q);
if (msg == NULL) {
tError("unexcept occurred, continue");
continue;
@ -649,7 +649,7 @@ void uvOnConnectionCb(uv_stream_t* q, ssize_t nread, const uv_buf_t* buf) {
uv_handle_type pending = uv_pipe_pending_type(pipe);
assert(pending == UV_TCP);
SSrvConn* pConn = createConn(pThrd);
SSvrConn* pConn = createConn(pThrd);
pConn->pTransInst = pThrd->pTransInst;
/* init conn timer*/
@ -768,10 +768,10 @@ void* transWorkerThread(void* arg) {
return NULL;
}
static SSrvConn* createConn(void* hThrd) {
static SSvrConn* createConn(void* hThrd) {
SWorkThrdObj* pThrd = hThrd;
SSrvConn* pConn = (SSrvConn*)taosMemoryCalloc(1, sizeof(SSrvConn));
SSvrConn* pConn = (SSvrConn*)taosMemoryCalloc(1, sizeof(SSvrConn));
QUEUE_INIT(&pConn->queue);
QUEUE_PUSH(&pThrd->conn, &pConn->queue);
@ -794,7 +794,7 @@ static SSrvConn* createConn(void* hThrd) {
return pConn;
}
static void destroyConn(SSrvConn* conn, bool clear) {
static void destroyConn(SSvrConn* conn, bool clear) {
if (conn == NULL) {
return;
}
@ -808,13 +808,13 @@ static void destroyConn(SSrvConn* conn, bool clear) {
// uv_shutdown(req, (uv_stream_t*)conn->pTcp, uvShutDownCb);
}
}
static void destroyConnRegArg(SSrvConn* conn) {
static void destroyConnRegArg(SSvrConn* conn) {
if (conn->regArg.init == 1) {
transFreeMsg(conn->regArg.msg.pCont);
conn->regArg.init = 0;
}
}
static int reallocConnRefHandle(SSrvConn* conn) {
static int reallocConnRefHandle(SSvrConn* conn) {
uvReleaseExHandle(conn->refId);
uvRemoveExHandle(conn->refId);
// avoid app continue to send msg on invalid handle
@ -828,7 +828,7 @@ static int reallocConnRefHandle(SSrvConn* conn) {
return 0;
}
static void uvDestroyConn(uv_handle_t* handle) {
SSrvConn* conn = handle->data;
SSvrConn* conn = handle->data;
if (conn == NULL) {
return;
}
@ -884,7 +884,7 @@ void* transInitServer(uint32_t ip, uint32_t port, char* label, int numOfThreads,
uv_loop_init(srv->loop);
taosThreadOnce(&transModuleInit, uvInitEnv);
transSrvInst++;
tranSSvrInst++;
assert(0 == uv_pipe_init(srv->loop, &srv->pipeListen, 0));
#ifdef WINDOWS
@ -981,7 +981,7 @@ void uvDestoryExHandle(void* handle) {
taosMemoryFree(handle);
}
void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
void uvHandleQuit(SSvrMsg* msg, SWorkThrdObj* thrd) {
thrd->quit = true;
if (QUEUE_IS_EMPTY(&thrd->conn)) {
uv_walk(thrd->loop, uvWalkCb, NULL);
@ -990,8 +990,8 @@ void uvHandleQuit(SSrvMsg* msg, SWorkThrdObj* thrd) {
}
taosMemoryFree(msg);
}
void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
SSrvConn* conn = msg->pConn;
void uvHandleRelease(SSvrMsg* msg, SWorkThrdObj* thrd) {
SSvrConn* conn = msg->pConn;
if (conn->status == ConnAcquire) {
reallocConnRefHandle(conn);
if (!transQueuePush(&conn->srvMsgs, msg)) {
@ -1004,13 +1004,13 @@ void uvHandleRelease(SSrvMsg* msg, SWorkThrdObj* thrd) {
}
destroySmsg(msg);
}
void uvHandleResp(SSrvMsg* msg, SWorkThrdObj* thrd) {
void uvHandleResp(SSvrMsg* msg, SWorkThrdObj* thrd) {
// send msg to client
tDebug("server conn %p start to send resp (2/2)", msg->pConn);
uvStartSendResp(msg);
}
void uvHandleRegister(SSrvMsg* msg, SWorkThrdObj* thrd) {
SSrvConn* conn = msg->pConn;
void uvHandleRegister(SSvrMsg* msg, SWorkThrdObj* thrd) {
SSvrConn* conn = msg->pConn;
tDebug("server conn %p register brokenlink callback", conn);
if (conn->status == ConnAcquire) {
if (!transQueuePush(&conn->srvMsgs, msg)) {
@ -1036,13 +1036,13 @@ void destroyWorkThrd(SWorkThrdObj* pThrd) {
}
taosThreadJoin(pThrd->thread, NULL);
SRV_RELEASE_UV(pThrd->loop);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSrvMsg, destroySmsg);
TRANS_DESTROY_ASYNC_POOL_MSG(pThrd->asyncPool, SSvrMsg, destroySmsg);
transDestroyAsyncPool(pThrd->asyncPool);
taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd);
}
void sendQuitToWorkThrd(SWorkThrdObj* pThrd) {
SSrvMsg* msg = taosMemoryCalloc(1, sizeof(SSrvMsg));
SSvrMsg* msg = taosMemoryCalloc(1, sizeof(SSvrMsg));
msg->type = Quit;
tDebug("server send quit msg to work thread");
transSendAsync(pThrd->asyncPool, &msg->q);
@ -1075,8 +1075,8 @@ void transCloseServer(void* arg) {
taosMemoryFree(srv);
transSrvInst--;
if (transSrvInst == 0) {
tranSSvrInst--;
if (tranSSvrInst == 0) {
TdThreadOnce tmpInit = PTHREAD_ONCE_INIT;
memcpy(&transModuleInit, &tmpInit, sizeof(TdThreadOnce));
uvCloseExHandleMgt();
@ -1087,7 +1087,7 @@ void transRefSrvHandle(void* handle) {
if (handle == NULL) {
return;
}
int ref = T_REF_INC((SSrvConn*)handle);
int ref = T_REF_INC((SSvrConn*)handle);
tDebug("server conn %p ref count: %d", handle, ref);
}
@ -1095,10 +1095,10 @@ void transUnrefSrvHandle(void* handle) {
if (handle == NULL) {
return;
}
int ref = T_REF_DEC((SSrvConn*)handle);
int ref = T_REF_DEC((SSvrConn*)handle);
tDebug("server conn %p ref count: %d", handle, ref);
if (ref == 0) {
destroyConn((SSrvConn*)handle, true);
destroyConn((SSvrConn*)handle, true);
}
}
@ -1113,12 +1113,12 @@ void transReleaseSrvHandle(void* handle) {
STransMsg tmsg = {.code = 0, .info.handle = exh, .info.ahandle = NULL, .info.refId = refId};
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
srvMsg->msg = tmsg;
srvMsg->type = Release;
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
m->msg = tmsg;
m->type = Release;
tTrace("server conn %p start to release", exh->handle);
transSendAsync(pThrd->asyncPool, &srvMsg->q);
transSendAsync(pThrd->asyncPool, &m->q);
uvReleaseExHandle(refId);
return;
_return1:
@ -1141,11 +1141,11 @@ void transSendResponse(const STransMsg* msg) {
SWorkThrdObj* pThrd = exh->pThrd;
ASYNC_ERR_JRET(pThrd);
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
srvMsg->msg = tmsg;
srvMsg->type = Normal;
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
m->msg = tmsg;
m->type = Normal;
tDebug("server conn %p start to send resp (1/2)", exh->handle);
transSendAsync(pThrd->asyncPool, &srvMsg->q);
transSendAsync(pThrd->asyncPool, &m->q);
uvReleaseExHandle(refId);
return;
_return1:
@ -1169,11 +1169,11 @@ void transRegisterMsg(const STransMsg* msg) {
SWorkThrdObj* pThrd = exh->pThrd;
ASYNC_ERR_JRET(pThrd);
SSrvMsg* srvMsg = taosMemoryCalloc(1, sizeof(SSrvMsg));
srvMsg->msg = tmsg;
srvMsg->type = Register;
SSvrMsg* m = taosMemoryCalloc(1, sizeof(SSvrMsg));
m->msg = tmsg;
m->type = Register;
tTrace("server conn %p start to register brokenlink callback", exh->handle);
transSendAsync(pThrd->asyncPool, &srvMsg->q);
transSendAsync(pThrd->asyncPool, &m->q);
uvReleaseExHandle(refId);
return;
@ -1193,7 +1193,7 @@ int transGetConnInfo(void* thandle, STransHandleInfo* pInfo) {
return -1;
}
SExHandle* ex = thandle;
SSrvConn* pConn = ex->handle;
SSvrConn* pConn = ex->handle;
struct sockaddr_in addr = pConn->addr;
pInfo->clientIp = (uint32_t)(addr.sin_addr.s_addr);