Merge pull request #10021 from taosdata/feature/fetch

for fetch rpc client
This commit is contained in:
Shengliang Guan 2022-01-26 09:31:21 +08:00 committed by GitHub
commit cd1e59a9c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 41 additions and 13 deletions

View File

@ -24,6 +24,7 @@ extern "C" {
typedef void* qTaskInfo_t; typedef void* qTaskInfo_t;
typedef void* DataSinkHandle; typedef void* DataSinkHandle;
struct SRpcMsg;
struct SSubplan; struct SSubplan;
/** /**
@ -208,6 +209,8 @@ void** qReleaseTask(void* pMgmt, void* pQInfo, bool freeHandle);
*/ */
void** qDeregisterQInfo(void* pMgmt, void* pQInfo); void** qDeregisterQInfo(void* pMgmt, void* pQInfo);
void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -49,9 +49,10 @@ typedef struct {
} SQWorkerStat; } SQWorkerStat;
typedef int32_t (*putReqToQueryQFp)(void *, struct SRpcMsg *); typedef int32_t (*putReqToQueryQFp)(void *, struct SRpcMsg *);
typedef int32_t (*sendReqToDnodeFp)(void *, struct SEpSet *, struct SRpcMsg *);
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj,
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, putReqToQueryQFp fp); putReqToQueryQFp fp1, sendReqToDnodeFp fp2);
int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
@ -65,6 +66,8 @@ int32_t qWorkerProcessStatusMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);
int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg); int32_t qWorkerProcessDropMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg);

View File

@ -289,6 +289,7 @@ int32_t dndInit(const SDnodeEnvCfg *pCfg) {
.charset = pCfg->charset, .charset = pCfg->charset,
.nthreads = pCfg->numOfCommitThreads, .nthreads = pCfg->numOfCommitThreads,
.putReqToVQueryQFp = dndPutReqToVQueryQ, .putReqToVQueryQFp = dndPutReqToVQueryQ,
.sendReqToDnodeFp = dndSendReqToDnode
}; };
if (vnodeInit(&vnodeOpt) != 0) { if (vnodeInit(&vnodeOpt) != 0) {

View File

@ -122,6 +122,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY)] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_CONTINUE)] = dndProcessVnodeQueryMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_QUERY_CONTINUE)] = dndProcessVnodeQueryMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_FETCH_RSP)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_ALTER_TABLE)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_UPDATE_TAG_VAL)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_TABLE_META)] = dndProcessVnodeFetchMsg;

View File

@ -33,6 +33,7 @@ extern "C" {
typedef struct SVnode SVnode; typedef struct SVnode SVnode;
typedef struct SDnode SDnode; typedef struct SDnode SDnode;
typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq); typedef int32_t (*PutReqToVQueryQFp)(SDnode *pDnode, struct SRpcMsg *pReq);
typedef int32_t (*SendReqToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg);
typedef struct STqCfg { typedef struct STqCfg {
// TODO // TODO
@ -64,6 +65,7 @@ typedef struct {
const char *charset; const char *charset;
uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO) uint16_t nthreads; // number of commit threads. 0 for no threads and a schedule queue should be given (TODO)
PutReqToVQueryQFp putReqToVQueryQFp; PutReqToVQueryQFp putReqToVQueryQFp;
SendReqToDnodeFp sendReqToDnodeFp;
} SVnodeOpt; } SVnodeOpt;
typedef struct STqReadHandle { typedef struct STqReadHandle {

View File

@ -55,6 +55,7 @@ typedef struct SVnodeMgr {
// For vnode Mgmt // For vnode Mgmt
SDnode* pDnode; SDnode* pDnode;
PutReqToVQueryQFp putReqToVQueryQFp; PutReqToVQueryQFp putReqToVQueryQFp;
SendReqToDnodeFp sendReqToDnodeFp;
} SVnodeMgr; } SVnodeMgr;
extern SVnodeMgr vnodeMgr; extern SVnodeMgr vnodeMgr;
@ -85,6 +86,7 @@ struct SVnode {
int vnodeScheduleTask(SVnodeTask* task); int vnodeScheduleTask(SVnodeTask* task);
int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq); int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq);
void vnodeSendReqToDnode(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq);
// For Log // For Log
extern int32_t vDebugFlag; extern int32_t vDebugFlag;

View File

@ -26,6 +26,7 @@ int vnodeInit(const SVnodeOpt *pOption) {
vnodeMgr.stop = false; vnodeMgr.stop = false;
vnodeMgr.putReqToVQueryQFp = pOption->putReqToVQueryQFp; vnodeMgr.putReqToVQueryQFp = pOption->putReqToVQueryQFp;
vnodeMgr.sendReqToDnodeFp = pOption->sendReqToDnodeFp;
// Start commit handers // Start commit handers
if (pOption->nthreads > 0) { if (pOption->nthreads > 0) {
@ -96,6 +97,10 @@ int32_t vnodePutReqToVQueryQ(SVnode* pVnode, struct SRpcMsg* pReq) {
return (*vnodeMgr.putReqToVQueryQFp)(pVnode->pDnode, pReq); return (*vnodeMgr.putReqToVQueryQFp)(pVnode->pDnode, pReq);
} }
void vnodeSendReqToDnode(SVnode* pVnode, struct SEpSet* epSet, struct SRpcMsg* pReq) {
(*vnodeMgr.sendReqToDnodeFp)(pVnode->pDnode, epSet, pReq);
}
/* ------------------------ STATIC METHODS ------------------------ */ /* ------------------------ STATIC METHODS ------------------------ */
static void* loop(void* arg) { static void* loop(void* arg) {
setThreadName("vnode-commit"); setThreadName("vnode-commit");

View File

@ -21,7 +21,7 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg);
int vnodeQueryOpen(SVnode *pVnode) { int vnodeQueryOpen(SVnode *pVnode) {
return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, pVnode, return qWorkerInit(NODE_TYPE_VNODE, pVnode->vgId, NULL, (void **)&pVnode->pQuery, pVnode,
(putReqToQueryQFp)vnodePutReqToVQueryQ); (putReqToQueryQFp)vnodePutReqToVQueryQ, (sendReqToDnodeFp)vnodeSendReqToDnode);
} }
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
@ -43,6 +43,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
switch (pMsg->msgType) { switch (pMsg->msgType) {
case TDMT_VND_FETCH: case TDMT_VND_FETCH:
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg);
case TDMT_VND_FETCH_RSP:
return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg);
case TDMT_VND_RES_READY: case TDMT_VND_RES_READY:
return qWorkerProcessReadyMsg(pVnode, pVnode->pQuery, pMsg); return qWorkerProcessReadyMsg(pVnode, pVnode->pQuery, pMsg);
case TDMT_VND_TASKS_STATUS: case TDMT_VND_TASKS_STATUS:

View File

@ -5115,7 +5115,7 @@ static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
tfree(pMsgBody); tfree(pMsgBody);
} }
void processRspMsg(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) { void qProcessFetchRsp(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle; SMsgSendInfo *pSendInfo = (SMsgSendInfo *) pMsg->ahandle;
assert(pMsg->ahandle != NULL); assert(pMsg->ahandle != NULL);
@ -5296,13 +5296,14 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray*
pOperator->exec = doLoadRemoteData; pOperator->exec = doLoadRemoteData;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
#if 1
{ // todo refactor { // todo refactor
SRpcInit rpcInit; SRpcInit rpcInit;
memset(&rpcInit, 0, sizeof(rpcInit)); memset(&rpcInit, 0, sizeof(rpcInit));
rpcInit.localPort = 0; rpcInit.localPort = 0;
rpcInit.label = "EX"; rpcInit.label = "EX";
rpcInit.numOfThreads = 1; rpcInit.numOfThreads = 1;
rpcInit.cfp = processRspMsg; rpcInit.cfp = qProcessFetchRsp;
rpcInit.sessions = tsMaxConnections; rpcInit.sessions = tsMaxConnections;
rpcInit.connType = TAOS_CONN_CLIENT; rpcInit.connType = TAOS_CONN_CLIENT;
rpcInit.user = (char *)"root"; rpcInit.user = (char *)"root";
@ -5316,7 +5317,7 @@ SOperatorInfo* createExchangeOperatorInfo(const SArray* pSources, const SArray*
return NULL; // todo return NULL; // todo
} }
} }
#endif
return pOperator; return pOperator;
} }

View File

@ -137,6 +137,7 @@ typedef struct SQWorkerMgmt {
SHashObj *ctxHash; //key: queryId+taskId, value: SQWTaskCtx SHashObj *ctxHash; //key: queryId+taskId, value: SQWTaskCtx
void *nodeObj; void *nodeObj;
putReqToQueryQFp putToQueueFp; putReqToQueryQFp putToQueueFp;
sendReqToDnodeFp sendReqFp;
} SQWorkerMgmt; } SQWorkerMgmt;
#define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId #define QW_FPARAMS_DEF SQWorkerMgmt *mgmt, uint64_t sId, uint64_t qId, uint64_t tId

View File

@ -1313,8 +1313,9 @@ _return:
QW_RET(code); QW_RET(code);
} }
int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj, putReqToQueryQFp fp) { int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qWorkerMgmt, void *nodeObj,
if (NULL == qWorkerMgmt || NULL == nodeObj || NULL == fp) { putReqToQueryQFp fp1, sendReqToDnodeFp fp2) {
if (NULL == qWorkerMgmt || NULL == nodeObj || NULL == fp1 || NULL == fp2) {
qError("invalid param to init qworker"); qError("invalid param to init qworker");
QW_RET(TSDB_CODE_QRY_INVALID_INPUT); QW_RET(TSDB_CODE_QRY_INVALID_INPUT);
} }
@ -1361,7 +1362,8 @@ int32_t qWorkerInit(int8_t nodeType, int32_t nodeId, SQWorkerCfg *cfg, void **qW
mgmt->nodeType = nodeType; mgmt->nodeType = nodeType;
mgmt->nodeId = nodeId; mgmt->nodeId = nodeId;
mgmt->nodeObj = nodeObj; mgmt->nodeObj = nodeObj;
mgmt->putToQueueFp = fp; mgmt->putToQueueFp = fp1;
mgmt->sendReqFp = fp2;
*qWorkerMgmt = mgmt; *qWorkerMgmt = mgmt;

View File

@ -421,6 +421,11 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t qWorkerProcessFetchRsp(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
qProcessFetchRsp(NULL, pMsg, NULL);
return TSDB_CODE_SUCCESS;
}
int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { int32_t qWorkerProcessCancelMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) {
if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) { if (NULL == node || NULL == qWorkerMgmt || NULL == pMsg) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;

View File

@ -1081,7 +1081,7 @@ TEST(rcTest, shortExecshortDelay) {
qwtTestStop = false; qwtTestStop = false;
qwtTestQuitThreadNum = 0; qwtTestQuitThreadNum = 0;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
qwtTestMaxExecTaskUsec = 0; qwtTestMaxExecTaskUsec = 0;
@ -1162,7 +1162,7 @@ TEST(rcTest, longExecshortDelay) {
qwtTestStop = false; qwtTestStop = false;
qwtTestQuitThreadNum = 0; qwtTestQuitThreadNum = 0;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
qwtTestMaxExecTaskUsec = 1000000; qwtTestMaxExecTaskUsec = 1000000;
@ -1245,7 +1245,7 @@ TEST(rcTest, shortExeclongDelay) {
qwtTestStop = false; qwtTestStop = false;
qwtTestQuitThreadNum = 0; qwtTestQuitThreadNum = 0;
code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue, NULL);
ASSERT_EQ(code, 0); ASSERT_EQ(code, 0);
qwtTestMaxExecTaskUsec = 0; qwtTestMaxExecTaskUsec = 0;