Merge pull request #14600 from taosdata/fix/dnode
refactor: redirect vnode msg
This commit is contained in:
commit
c1667cdd84
|
@ -209,6 +209,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_CONTINUE, "query-continue", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_CONTINUE, "query-continue", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_HEARTBEAT, "query-heartbeat", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SCH_QUERY_HEARTBEAT, "query-heartbeat", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SCH_FETCH, "fetch", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SCH_FETCH, "fetch", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_SCH_MERGE_FETCH, "merge-fetch", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SCH_CANCEL_TASK, "cancel-task", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SCH_CANCEL_TASK, "cancel-task", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SCH_DROP_TASK, "drop-task", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SCH_DROP_TASK, "drop-task", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "explain", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_SCH_EXPLAIN, "explain", NULL, NULL)
|
||||||
|
|
|
@ -353,6 +353,7 @@ typedef struct SDownstreamSourceNode {
|
||||||
uint64_t taskId;
|
uint64_t taskId;
|
||||||
uint64_t schedId;
|
uint64_t schedId;
|
||||||
int32_t execId;
|
int32_t execId;
|
||||||
|
int32_t fetchMsgType;
|
||||||
} SDownstreamSourceNode;
|
} SDownstreamSourceNode;
|
||||||
|
|
||||||
typedef struct SExchangePhysiNode {
|
typedef struct SExchangePhysiNode {
|
||||||
|
|
|
@ -102,7 +102,7 @@ void closeTransporter(SAppInstInfo *pAppInfo) {
|
||||||
|
|
||||||
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
|
static bool clientRpcRfp(int32_t code, tmsg_t msgType) {
|
||||||
if (NEED_REDIRECT_ERROR(code)) {
|
if (NEED_REDIRECT_ERROR(code)) {
|
||||||
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) {
|
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -214,6 +214,7 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, mmPutMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, mmPutMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_CREATE_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_STB_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -111,6 +111,7 @@ SArray *qmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, qmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH_RSP, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH_RSP, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_HEARTBEAT, qmPutNodeMsgToFetchQueue, 1) == NULL) goto _OVER;
|
||||||
|
|
||||||
|
|
|
@ -328,6 +328,7 @@ SArray *vmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_QUERY, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_QUERY_CONTINUE, vmPutMsgToQueryQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_SCH_MERGE_FETCH, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -238,9 +238,9 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) {
|
||||||
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeMsg);
|
pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode->pImpl, (FItems)vnodeProposeWriteMsg);
|
||||||
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
|
pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue);
|
||||||
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyMsg);
|
pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->applyPool, pVnode->pImpl, (FItems)vnodeApplyWriteMsg);
|
||||||
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue);
|
||||||
pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
|
pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue);
|
||||||
|
|
||||||
|
|
|
@ -42,7 +42,7 @@ static inline void dmBuildMnodeRedirectRsp(SDnode *pDnode, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
|
static inline void dmSendRedirectRsp(SRpcMsg *pMsg, const SEpSet *pNewEpSet) {
|
||||||
pMsg->info.hasEpSet = 1;
|
pMsg->info.hasEpSet = 1;
|
||||||
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
|
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info, .msgType = pMsg->msgType};
|
||||||
int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet);
|
int32_t contLen = tSerializeSEpSet(NULL, 0, pNewEpSet);
|
||||||
|
|
||||||
rsp.pCont = rpcMallocCont(contLen);
|
rsp.pCont = rpcMallocCont(contLen);
|
||||||
|
@ -88,6 +88,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
|
||||||
case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
|
case TDMT_MND_SYSTABLE_RETRIEVE_RSP:
|
||||||
case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
|
case TDMT_DND_SYSTABLE_RETRIEVE_RSP:
|
||||||
case TDMT_SCH_FETCH_RSP:
|
case TDMT_SCH_FETCH_RSP:
|
||||||
|
case TDMT_SCH_MERGE_FETCH_RSP:
|
||||||
qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0);
|
qWorkerProcessFetchRsp(NULL, NULL, pRpc, 0);
|
||||||
return;
|
return;
|
||||||
case TDMT_MND_STATUS_RSP:
|
case TDMT_MND_STATUS_RSP:
|
||||||
|
@ -253,7 +254,7 @@ static inline void dmReleaseHandle(SRpcHandleInfo *pHandle, int8_t type) {
|
||||||
static bool rpcRfp(int32_t code, tmsg_t msgType) {
|
static bool rpcRfp(int32_t code, tmsg_t msgType) {
|
||||||
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
||||||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) {
|
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) {
|
||||||
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) {
|
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -531,7 +531,8 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) {
|
||||||
if (!IsReq(pMsg)) return 0;
|
if (!IsReq(pMsg)) return 0;
|
||||||
if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
|
if (pMsg->msgType == TDMT_SCH_QUERY || pMsg->msgType == TDMT_SCH_MERGE_QUERY ||
|
||||||
pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
|
pMsg->msgType == TDMT_SCH_QUERY_CONTINUE || pMsg->msgType == TDMT_SCH_QUERY_HEARTBEAT ||
|
||||||
pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_DROP_TASK) {
|
pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_SCH_MERGE_FETCH ||
|
||||||
|
pMsg->msgType == TDMT_SCH_DROP_TASK) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
|
if (mndAcquireRpcRef(pMsg->info.node) == 0) return 0;
|
||||||
|
|
|
@ -45,6 +45,7 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) {
|
||||||
code = qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pMsg, 0);
|
code = qWorkerProcessCQueryMsg(&handle, pMnode->pQuery, pMsg, 0);
|
||||||
break;
|
break;
|
||||||
case TDMT_SCH_FETCH:
|
case TDMT_SCH_FETCH:
|
||||||
|
case TDMT_SCH_MERGE_FETCH:
|
||||||
code = qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg, 0);
|
code = qWorkerProcessFetchMsg(pMnode, pMnode->pQuery, pMsg, 0);
|
||||||
break;
|
break;
|
||||||
case TDMT_SCH_DROP_TASK:
|
case TDMT_SCH_DROP_TASK:
|
||||||
|
@ -72,6 +73,7 @@ int32_t mndInitQuery(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_QUERY, mndProcessQueryMsg);
|
mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_QUERY, mndProcessQueryMsg);
|
||||||
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_CONTINUE, mndProcessQueryMsg);
|
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_CONTINUE, mndProcessQueryMsg);
|
||||||
mndSetMsgHandle(pMnode, TDMT_SCH_FETCH, mndProcessQueryMsg);
|
mndSetMsgHandle(pMnode, TDMT_SCH_FETCH, mndProcessQueryMsg);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_FETCH, mndProcessQueryMsg);
|
||||||
mndSetMsgHandle(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg);
|
mndSetMsgHandle(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg);
|
||||||
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_HEARTBEAT, mndProcessQueryMsg);
|
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_HEARTBEAT, mndProcessQueryMsg);
|
||||||
|
|
||||||
|
|
|
@ -86,6 +86,7 @@ int32_t qndProcessQueryMsg(SQnode *pQnode, int64_t ts, SRpcMsg *pMsg) {
|
||||||
code = qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg, ts);
|
code = qWorkerProcessCQueryMsg(&handle, pQnode->pQuery, pMsg, ts);
|
||||||
break;
|
break;
|
||||||
case TDMT_SCH_FETCH:
|
case TDMT_SCH_FETCH:
|
||||||
|
case TDMT_SCH_MERGE_FETCH:
|
||||||
code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg, ts);
|
code = qWorkerProcessFetchMsg(pQnode, pQnode->pQuery, pMsg, ts);
|
||||||
break;
|
break;
|
||||||
case TDMT_SCH_FETCH_RSP:
|
case TDMT_SCH_FETCH_RSP:
|
||||||
|
|
|
@ -51,15 +51,7 @@ int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
|
||||||
void vnodeDestroy(const char *path, STfs *pTfs);
|
void vnodeDestroy(const char *path, STfs *pTfs);
|
||||||
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
|
||||||
void vnodeClose(SVnode *pVnode);
|
void vnodeClose(SVnode *pVnode);
|
||||||
int32_t vnodePreProcessReq(SVnode *pVnode, SRpcMsg *pMsg);
|
|
||||||
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
|
|
||||||
int32_t vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
|
||||||
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
|
||||||
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
|
||||||
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
|
||||||
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
|
||||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
|
||||||
int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
|
|
||||||
int32_t vnodeStart(SVnode *pVnode);
|
int32_t vnodeStart(SVnode *pVnode);
|
||||||
void vnodeStop(SVnode *pVnode);
|
void vnodeStop(SVnode *pVnode);
|
||||||
int64_t vnodeGetSyncHandle(SVnode *pVnode);
|
int64_t vnodeGetSyncHandle(SVnode *pVnode);
|
||||||
|
@ -68,14 +60,25 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId);
|
||||||
int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int64_t sver, int64_t ever);
|
int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int64_t sver, int64_t ever);
|
||||||
int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader);
|
int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader);
|
||||||
int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData);
|
int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData);
|
||||||
|
|
||||||
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
|
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
|
||||||
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list);
|
int32_t vnodeGetAllTableList(SVnode *pVnode, uint64_t uid, SArray *list);
|
||||||
int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list);
|
int32_t vnodeGetCtbIdList(SVnode *pVnode, int64_t suid, SArray *list);
|
||||||
void *vnodeGetIdx(SVnode *pVnode);
|
void *vnodeGetIdx(SVnode *pVnode);
|
||||||
void *vnodeGetIvtIdx(SVnode *pVnode);
|
void *vnodeGetIvtIdx(SVnode *pVnode);
|
||||||
|
|
||||||
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
|
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
|
||||||
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
|
int32_t vnodeValidateTableHash(SVnode *pVnode, char *tableFName);
|
||||||
|
|
||||||
|
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
|
int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
|
|
||||||
|
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp);
|
||||||
|
int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
|
||||||
|
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
|
||||||
|
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
|
||||||
|
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
|
||||||
|
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs);
|
||||||
|
|
||||||
// meta
|
// meta
|
||||||
typedef struct SMeta SMeta; // todo: remove
|
typedef struct SMeta SMeta; // todo: remove
|
||||||
|
|
|
@ -94,6 +94,8 @@ int32_t vnodeAsyncCommit(SVnode* pVnode);
|
||||||
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
|
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
|
||||||
void vnodeSyncStart(SVnode* pVnode);
|
void vnodeSyncStart(SVnode* pVnode);
|
||||||
void vnodeSyncClose(SVnode* pVnode);
|
void vnodeSyncClose(SVnode* pVnode);
|
||||||
|
void vnodeRedirectRpcMsg(SVnode* pVnode, SRpcMsg* pMsg);
|
||||||
|
bool vnodeIsLeader(SVnode* pVnode);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,7 @@ static int32_t vnodeProcessAlterHasnRangeReq(SVnode *pVnode, int64_t version, vo
|
||||||
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessDropTtlTbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int32_t vnodeProcessDeleteReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
|
|
||||||
int32_t vnodePreProcessReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
int32_t vnodePreProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SDecoder dc = {0};
|
SDecoder dc = {0};
|
||||||
|
|
||||||
|
@ -133,7 +133,7 @@ _err:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeProcessWriteReq(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
|
int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRpcMsg *pRsp) {
|
||||||
void *ptr = NULL;
|
void *ptr = NULL;
|
||||||
void *pReq;
|
void *pReq;
|
||||||
int32_t len;
|
int32_t len;
|
||||||
|
@ -261,6 +261,11 @@ int32_t vnodePreprocessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
vTrace("message in vnode query queue is processing");
|
vTrace("message in vnode query queue is processing");
|
||||||
|
if ((pMsg->msgType == TDMT_SCH_QUERY) && !vnodeIsLeader(pVnode)) {
|
||||||
|
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
SReadHandle handle = {.meta = pVnode->pMeta, .config = &pVnode->config, .vnode = pVnode, .pMsgCb = &pVnode->msgCb};
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
case TDMT_SCH_QUERY:
|
case TDMT_SCH_QUERY:
|
||||||
|
@ -276,11 +281,18 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
||||||
vTrace("message in fetch queue is processing");
|
vTrace("message in fetch queue is processing");
|
||||||
|
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META || pMsg->msgType == TDMT_VND_TABLE_CFG)
|
||||||
|
&& !vnodeIsLeader(pVnode)) {
|
||||||
|
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
char *msgstr = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||||
|
|
||||||
switch (pMsg->msgType) {
|
switch (pMsg->msgType) {
|
||||||
case TDMT_SCH_FETCH:
|
case TDMT_SCH_FETCH:
|
||||||
|
case TDMT_SCH_MERGE_FETCH:
|
||||||
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
return qWorkerProcessFetchMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
||||||
case TDMT_SCH_FETCH_RSP:
|
case TDMT_SCH_FETCH_RSP:
|
||||||
return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg, 0);
|
return qWorkerProcessFetchRsp(pVnode, pVnode->pQuery, pMsg, 0);
|
||||||
|
|
|
@ -120,7 +120,24 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
void vnodeRedirectRpcMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
|
SEpSet newEpSet = {0};
|
||||||
|
syncGetRetryEpSet(pVnode->sync, &newEpSet);
|
||||||
|
|
||||||
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
|
vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", pVnode->config.vgId, pMsg,
|
||||||
|
newEpSet.numOfEps, newEpSet.inUse);
|
||||||
|
for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
|
||||||
|
vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", pVnode->config.vgId, pMsg, i, newEpSet.eps[i].fqdn,
|
||||||
|
newEpSet.eps[i].port);
|
||||||
|
}
|
||||||
|
pMsg->info.hasEpSet = 1;
|
||||||
|
|
||||||
|
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info, .msgType = pMsg->msgType + 1};
|
||||||
|
tmsgSendRedirectRsp(&rsp, &newEpSet);
|
||||||
|
}
|
||||||
|
|
||||||
|
void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
SVnode *pVnode = pInfo->ahandle;
|
SVnode *pVnode = pInfo->ahandle;
|
||||||
int32_t vgId = pVnode->config.vgId;
|
int32_t vgId = pVnode->config.vgId;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -131,7 +148,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
const STraceId *trace = &pMsg->info.traceId;
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
vGTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle);
|
vGTrace("vgId:%d, msg:%p get from vnode-write queue handle:%p", vgId, pMsg, pMsg->info.handle);
|
||||||
|
|
||||||
code = vnodePreProcessReq(pVnode, pMsg);
|
code = vnodePreProcessWriteMsg(pVnode, pMsg);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
vError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
|
vError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr());
|
||||||
} else {
|
} else {
|
||||||
|
@ -141,7 +158,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType));
|
code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType));
|
||||||
if (code > 0) {
|
if (code > 0) {
|
||||||
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
|
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
|
||||||
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
|
if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
|
||||||
rsp.code = terrno;
|
rsp.code = terrno;
|
||||||
vError("vgId:%d, msg:%p failed to apply right now since %s", vgId, pMsg, terrstr());
|
vError("vgId:%d, msg:%p failed to apply right now since %s", vgId, pMsg, terrstr());
|
||||||
}
|
}
|
||||||
|
@ -156,16 +173,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
vnodeAccumBlockMsg(pVnode, pMsg->msgType);
|
vnodeAccumBlockMsg(pVnode, pMsg->msgType);
|
||||||
} else if (code < 0) {
|
} else if (code < 0) {
|
||||||
if (terrno == TSDB_CODE_SYN_NOT_LEADER) {
|
if (terrno == TSDB_CODE_SYN_NOT_LEADER) {
|
||||||
SEpSet newEpSet = {0};
|
vnodeRedirectRpcMsg(pVnode, pMsg);
|
||||||
syncGetRetryEpSet(pVnode->sync, &newEpSet);
|
|
||||||
vGTrace("vgId:%d, msg:%p is redirect since not leader, numOfEps:%d inUse:%d", vgId, pMsg, newEpSet.numOfEps,
|
|
||||||
newEpSet.inUse);
|
|
||||||
for (int32_t i = 0; i < newEpSet.numOfEps; ++i) {
|
|
||||||
vGTrace("vgId:%d, msg:%p redirect:%d ep:%s:%u", vgId, pMsg, i, newEpSet.eps[i].fqdn, newEpSet.eps[i].port);
|
|
||||||
}
|
|
||||||
pMsg->info.hasEpSet = 1;
|
|
||||||
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
|
|
||||||
tmsgSendRedirectRsp(&rsp, &newEpSet);
|
|
||||||
} else {
|
} else {
|
||||||
if (terrno != 0) code = terrno;
|
if (terrno != 0) code = terrno;
|
||||||
vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code);
|
vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code);
|
||||||
|
@ -185,7 +193,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
vnodeWaitBlockMsg(pVnode);
|
vnodeWaitBlockMsg(pVnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
SVnode *pVnode = pInfo->ahandle;
|
SVnode *pVnode = pInfo->ahandle;
|
||||||
int32_t vgId = pVnode->config.vgId;
|
int32_t vgId = pVnode->config.vgId;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -199,7 +207,7 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
|
|
||||||
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
|
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
|
||||||
if (rsp.code == 0) {
|
if (rsp.code == 0) {
|
||||||
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
|
if (vnodeProcessWriteMsg(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
|
||||||
rsp.code = terrno;
|
rsp.code = terrno;
|
||||||
vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr());
|
vError("vgId:%d, msg:%p failed to apply since %s", vgId, pMsg, terrstr());
|
||||||
}
|
}
|
||||||
|
@ -513,3 +521,17 @@ void vnodeSyncStart(SVnode *pVnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
|
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
|
||||||
|
|
||||||
|
bool vnodeIsLeader(SVnode *pVnode) {
|
||||||
|
if (!syncIsReady(pVnode->sync)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// todo
|
||||||
|
// if (!pVnode->restored) {
|
||||||
|
// terrno = TSDB_CODE_APP_NOT_READY;
|
||||||
|
// return false;
|
||||||
|
// }
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
|
@ -2053,7 +2053,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInf
|
||||||
pMsgSendInfo->param = pWrapper;
|
pMsgSendInfo->param = pWrapper;
|
||||||
pMsgSendInfo->msgInfo.pData = pMsg;
|
pMsgSendInfo->msgInfo.pData = pMsg;
|
||||||
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
|
pMsgSendInfo->msgInfo.len = sizeof(SResFetchReq);
|
||||||
pMsgSendInfo->msgType = TDMT_SCH_FETCH;
|
pMsgSendInfo->msgType = pSource->fetchMsgType;
|
||||||
pMsgSendInfo->fp = loadRemoteDataCallback;
|
pMsgSendInfo->fp = loadRemoteDataCallback;
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
|
|
|
@ -549,7 +549,7 @@ int32_t udfdLoadUdf(char *udfName, SUdf *udf) {
|
||||||
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
|
static bool udfdRpcRfp(int32_t code, tmsg_t msgType) {
|
||||||
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
if (code == TSDB_CODE_RPC_REDIRECT || code == TSDB_CODE_RPC_NETWORK_UNAVAIL || code == TSDB_CODE_NODE_NOT_DEPLOYED ||
|
||||||
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) {
|
code == TSDB_CODE_SYN_NOT_LEADER || code == TSDB_CODE_APP_NOT_READY || code == TSDB_CODE_RPC_BROKEN_LINK) {
|
||||||
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH) {
|
if (msgType == TDMT_SCH_QUERY || msgType == TDMT_SCH_MERGE_QUERY || msgType == TDMT_SCH_FETCH || msgType == TDMT_SCH_MERGE_FETCH) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -595,6 +595,7 @@ static int32_t downstreamSourceCopy(const SDownstreamSourceNode* pSrc, SDownstre
|
||||||
COPY_SCALAR_FIELD(taskId);
|
COPY_SCALAR_FIELD(taskId);
|
||||||
COPY_SCALAR_FIELD(schedId);
|
COPY_SCALAR_FIELD(schedId);
|
||||||
COPY_SCALAR_FIELD(execId);
|
COPY_SCALAR_FIELD(execId);
|
||||||
|
COPY_SCALAR_FIELD(fetchMsgType);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -3538,6 +3538,7 @@ static const char* jkDownstreamSourceAddr = "Addr";
|
||||||
static const char* jkDownstreamSourceTaskId = "TaskId";
|
static const char* jkDownstreamSourceTaskId = "TaskId";
|
||||||
static const char* jkDownstreamSourceSchedId = "SchedId";
|
static const char* jkDownstreamSourceSchedId = "SchedId";
|
||||||
static const char* jkDownstreamSourceExecId = "ExecId";
|
static const char* jkDownstreamSourceExecId = "ExecId";
|
||||||
|
static const char* jkDownstreamSourceFetchMsgType = "FetchMsgType";
|
||||||
|
|
||||||
static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) {
|
static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
const SDownstreamSourceNode* pNode = (const SDownstreamSourceNode*)pObj;
|
const SDownstreamSourceNode* pNode = (const SDownstreamSourceNode*)pObj;
|
||||||
|
@ -3552,6 +3553,9 @@ static int32_t downstreamSourceNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceExecId, pNode->execId);
|
code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceExecId, pNode->execId);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddIntegerToObject(pJson, jkDownstreamSourceFetchMsgType, pNode->fetchMsgType);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -3569,6 +3573,9 @@ static int32_t jsonToDownstreamSourceNode(const SJson* pJson, void* pObj) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonGetIntValue(pJson, jkDownstreamSourceExecId, &pNode->execId);
|
code = tjsonGetIntValue(pJson, jkDownstreamSourceExecId, &pNode->execId);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonGetIntValue(pJson, jkDownstreamSourceFetchMsgType, &pNode->fetchMsgType);
|
||||||
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -123,6 +123,7 @@ typedef struct SQWTaskCtx {
|
||||||
int8_t taskType;
|
int8_t taskType;
|
||||||
int8_t explain;
|
int8_t explain;
|
||||||
int32_t queryType;
|
int32_t queryType;
|
||||||
|
int32_t fetchType;
|
||||||
int32_t execId;
|
int32_t execId;
|
||||||
|
|
||||||
bool queryFetched;
|
bool queryFetched;
|
||||||
|
|
|
@ -35,8 +35,7 @@ int32_t qwProcessDelete(QW_FPARAMS_DEF, SQWMsg *qwMsg, SDeleteRes *pRes);
|
||||||
|
|
||||||
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code);
|
int32_t qwBuildAndSendDropRsp(SRpcHandleInfo *pConn, int32_t code);
|
||||||
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code);
|
int32_t qwBuildAndSendCancelRsp(SRpcHandleInfo *pConn, int32_t code);
|
||||||
int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength,
|
int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code);
|
||||||
int32_t code);
|
|
||||||
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
|
void qwBuildFetchRsp(void *msg, SOutputData *input, int32_t len, bool qComplete);
|
||||||
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
|
int32_t qwBuildAndSendCQueryMsg(QW_FPARAMS_DEF, SRpcHandleInfo *pConn);
|
||||||
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo);
|
int32_t qwBuildAndSendQueryRsp(int32_t rspType, SRpcHandleInfo *pConn, int32_t code, STbVerInfo* tbInfo);
|
||||||
|
|
|
@ -104,7 +104,7 @@ int32_t qwBuildAndSendHbRsp(SRpcHandleInfo *pConn, SSchedulerHbRsp *pStatus, int
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
|
int32_t qwBuildAndSendFetchRsp(int32_t rspType, SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, int32_t dataLength, int32_t code) {
|
||||||
if (NULL == pRsp) {
|
if (NULL == pRsp) {
|
||||||
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
pRsp = (SRetrieveTableRsp *)rpcMallocCont(sizeof(SRetrieveTableRsp));
|
||||||
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
|
memset(pRsp, 0, sizeof(SRetrieveTableRsp));
|
||||||
|
@ -112,7 +112,7 @@ int32_t qwBuildAndSendFetchRsp(SRpcHandleInfo *pConn, SRetrieveTableRsp *pRsp, i
|
||||||
}
|
}
|
||||||
|
|
||||||
SRpcMsg rpcRsp = {
|
SRpcMsg rpcRsp = {
|
||||||
.msgType = TDMT_SCH_FETCH_RSP,
|
.msgType = rspType,
|
||||||
.pCont = pRsp,
|
.pCont = pRsp,
|
||||||
.contLen = sizeof(*pRsp) + dataLength,
|
.contLen = sizeof(*pRsp) + dataLength,
|
||||||
.code = code,
|
.code = code,
|
||||||
|
@ -436,7 +436,7 @@ int32_t qWorkerProcessFetchMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg, int
|
||||||
int64_t rId = 0;
|
int64_t rId = 0;
|
||||||
int32_t eId = msg->execId;
|
int32_t eId = msg->execId;
|
||||||
|
|
||||||
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info};
|
SQWMsg qwMsg = {.node = node, .msg = NULL, .msgLen = 0, .connInfo = pMsg->info, .msgType = pMsg->msgType};
|
||||||
|
|
||||||
QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);
|
QW_SCH_TASK_DLOG("processFetch start, node:%p, handle:%p", node, pMsg->info.handle);
|
||||||
|
|
||||||
|
|
|
@ -606,7 +606,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
qwMsg->connInfo = ctx->dataConnInfo;
|
qwMsg->connInfo = ctx->dataConnInfo;
|
||||||
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH);
|
||||||
|
|
||||||
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
|
qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, rsp, dataLen, code);
|
||||||
rsp = NULL;
|
rsp = NULL;
|
||||||
|
|
||||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
|
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code,
|
||||||
|
@ -628,7 +628,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
rsp = NULL;
|
rsp = NULL;
|
||||||
|
|
||||||
qwMsg->connInfo = ctx->dataConnInfo;
|
qwMsg->connInfo = ctx->dataConnInfo;
|
||||||
qwBuildAndSendFetchRsp(&qwMsg->connInfo, NULL, 0, code);
|
qwBuildAndSendFetchRsp(ctx->fetchType, &qwMsg->connInfo, NULL, 0, code);
|
||||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
|
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
|
||||||
0);
|
0);
|
||||||
}
|
}
|
||||||
|
@ -661,6 +661,8 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) {
|
||||||
|
|
||||||
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
|
QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx));
|
||||||
|
|
||||||
|
ctx->queryType = qwMsg->msgType;
|
||||||
|
|
||||||
SOutputData sOutput = {0};
|
SOutputData sOutput = {0};
|
||||||
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
|
QW_ERR_JRET(qwGetQueryResFromSink(QW_FPARAMS(), ctx, &dataLen, &rsp, &sOutput));
|
||||||
|
|
||||||
|
@ -711,7 +713,7 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
if (code || rsp) {
|
if (code || rsp) {
|
||||||
qwBuildAndSendFetchRsp(&qwMsg->connInfo, rsp, dataLen, code);
|
qwBuildAndSendFetchRsp(qwMsg->msgType + 1, &qwMsg->connInfo, rsp, dataLen, code);
|
||||||
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
|
QW_TASK_DLOG("fetch rsp send, handle:%p, code:%x - %s, dataLen:%d", qwMsg->connInfo.handle, code, tstrerror(code),
|
||||||
dataLen);
|
dataLen);
|
||||||
}
|
}
|
||||||
|
|
|
@ -214,7 +214,8 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
|
||||||
rpcFreeCont(rsp);
|
rpcFreeCont(rsp);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TDMT_SCH_FETCH_RSP: {
|
case TDMT_SCH_FETCH_RSP:
|
||||||
|
case TDMT_SCH_MERGE_FETCH_RSP: {
|
||||||
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)pRsp->pCont;
|
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)pRsp->pCont;
|
||||||
|
|
||||||
if (0 == pRsp->code && 0 == rsp->completed) {
|
if (0 == pRsp->code && 0 == rsp->completed) {
|
||||||
|
@ -815,6 +816,7 @@ void *fetchQueueThread(void *param) {
|
||||||
|
|
||||||
switch (fetchRpc->msgType) {
|
switch (fetchRpc->msgType) {
|
||||||
case TDMT_SCH_FETCH:
|
case TDMT_SCH_FETCH:
|
||||||
|
case TDMT_SCH_MERGE_FETCH:
|
||||||
qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0);
|
qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0);
|
||||||
break;
|
break;
|
||||||
case TDMT_SCH_CANCEL_TASK:
|
case TDMT_SCH_CANCEL_TASK:
|
||||||
|
|
|
@ -35,7 +35,7 @@ extern "C" {
|
||||||
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000
|
#define SCH_DEFAULT_TASK_TIMEOUT_USEC 10000000
|
||||||
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000
|
#define SCH_MAX_TASK_TIMEOUT_USEC 60000000
|
||||||
|
|
||||||
#define SCH_TASK_MAX_EXEC_TIMES 5
|
#define SCH_TASK_MAX_EXEC_TIMES 8
|
||||||
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
|
#define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
|
@ -318,6 +318,7 @@ extern SSchedulerMgmt schMgmt;
|
||||||
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
|
#define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true
|
||||||
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
|
#define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl)
|
||||||
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
|
#define SCH_TASK_NEED_FLOW_CTRL(_job, _task) (SCH_IS_DATA_SRC_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
|
||||||
|
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_SRC_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
|
||||||
|
|
||||||
#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0)
|
#define SCH_SET_JOB_TYPE(_job, type) do { if ((type) != SUBPLAN_TYPE_MODIFY) { (_job)->attr.queryJob = true; } } while (0)
|
||||||
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
|
#define SCH_IS_QUERY_JOB(_job) ((_job)->attr.queryJob)
|
||||||
|
@ -327,7 +328,7 @@ extern SSchedulerMgmt schMgmt;
|
||||||
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
|
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
|
||||||
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
|
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL)
|
||||||
#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (SCH_NETWORK_ERR(_code) && ((_len) > 0))
|
#define SCH_SUB_TASK_NETWORK_ERR(_code, _len) (SCH_NETWORK_ERR(_code) && ((_len) > 0))
|
||||||
#define SCH_NEED_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH)
|
#define SCH_NEED_REDIRECT_MSGTYPE(_msgType) ((_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || (_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
|
||||||
#define SCH_NEED_REDIRECT(_msgType, _code, _rspLen) (SCH_NEED_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_SUB_TASK_NETWORK_ERR(_code, _rspLen)))
|
#define SCH_NEED_REDIRECT(_msgType, _code, _rspLen) (SCH_NEED_REDIRECT_MSGTYPE(_msgType) && (NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_SUB_TASK_NETWORK_ERR(_code, _rspLen)))
|
||||||
#define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_NEED_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
|
#define SCH_NEED_RETRY(_msgType, _code) ((SCH_NETWORK_ERR(_code) && SCH_NEED_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
|
||||||
|
|
||||||
|
|
|
@ -44,6 +44,7 @@ int32_t schValidateReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgTy
|
||||||
// SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
|
// SCH_SET_TASK_LASTMSG_TYPE(pTask, -1);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
case TDMT_SCH_FETCH_RSP:
|
case TDMT_SCH_FETCH_RSP:
|
||||||
|
case TDMT_SCH_MERGE_FETCH_RSP:
|
||||||
if (lastMsgType != reqMsgType && -1 != lastMsgType) {
|
if (lastMsgType != reqMsgType && -1 != lastMsgType) {
|
||||||
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
|
SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType),
|
||||||
TMSG_INFO(msgType));
|
TMSG_INFO(msgType));
|
||||||
|
@ -304,7 +305,8 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDa
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TDMT_SCH_FETCH_RSP: {
|
case TDMT_SCH_FETCH_RSP:
|
||||||
|
case TDMT_SCH_MERGE_FETCH_RSP: {
|
||||||
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
|
SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg;
|
||||||
|
|
||||||
SCH_ERR_JRET(rspCode);
|
SCH_ERR_JRET(rspCode);
|
||||||
|
@ -558,6 +560,7 @@ int32_t schGetCallbackFp(int32_t msgType, __async_send_cb_fn_t *fp) {
|
||||||
case TDMT_VND_DELETE:
|
case TDMT_VND_DELETE:
|
||||||
case TDMT_SCH_EXPLAIN:
|
case TDMT_SCH_EXPLAIN:
|
||||||
case TDMT_SCH_FETCH:
|
case TDMT_SCH_FETCH:
|
||||||
|
case TDMT_SCH_MERGE_FETCH:
|
||||||
*fp = schHandleCallback;
|
*fp = schHandleCallback;
|
||||||
break;
|
break;
|
||||||
case TDMT_SCH_DROP_TASK:
|
case TDMT_SCH_DROP_TASK:
|
||||||
|
@ -1016,7 +1019,8 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
||||||
persistHandle = true;
|
persistHandle = true;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TDMT_SCH_FETCH: {
|
case TDMT_SCH_FETCH:
|
||||||
|
case TDMT_SCH_MERGE_FETCH: {
|
||||||
msgSize = sizeof(SResFetchReq);
|
msgSize = sizeof(SResFetchReq);
|
||||||
msg = taosMemoryCalloc(1, msgSize);
|
msg = taosMemoryCalloc(1, msgSize);
|
||||||
if (NULL == msg) {
|
if (NULL == msg) {
|
||||||
|
|
|
@ -258,7 +258,9 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
||||||
.taskId = pTask->taskId,
|
.taskId = pTask->taskId,
|
||||||
.schedId = schMgmt.sId,
|
.schedId = schMgmt.sId,
|
||||||
.execId = pTask->execId,
|
.execId = pTask->execId,
|
||||||
.addr = pTask->succeedAddr};
|
.addr = pTask->succeedAddr,
|
||||||
|
.fetchMsgType = SCH_FETCH_TYPE(pTask),
|
||||||
|
};
|
||||||
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
|
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
|
||||||
SCH_UNLOCK(SCH_WRITE, &parent->lock);
|
SCH_UNLOCK(SCH_WRITE, &parent->lock);
|
||||||
|
|
||||||
|
@ -818,7 +820,7 @@ int32_t schLaunchFetchTask(SSchJob *pJob) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, TDMT_SCH_FETCH));
|
SCH_ERR_JRET(schBuildAndSendMsg(pJob, pJob->fetchTask, &pJob->resNode, SCH_FETCH_TYPE(pJob->fetchTask)));
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
|
|
@ -648,8 +648,6 @@ void setHeartbeatTimerMS(int64_t rid, int32_t hbTimerMS) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
|
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
|
||||||
int32_t ret = 0;
|
|
||||||
|
|
||||||
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
|
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
taosReleaseRef(tsNodeRefId, rid);
|
taosReleaseRef(tsNodeRefId, rid);
|
||||||
|
@ -657,8 +655,8 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
ASSERT(rid == pSyncNode->rid);
|
ASSERT(rid == pSyncNode->rid);
|
||||||
ret = syncNodePropose(pSyncNode, pMsg, isWeak);
|
|
||||||
|
|
||||||
|
int32_t ret = syncNodePropose(pSyncNode, pMsg, isWeak);
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -669,15 +667,14 @@ int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ret = 0;
|
|
||||||
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
|
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
|
||||||
if (pSyncNode == NULL) {
|
if (pSyncNode == NULL) {
|
||||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
ASSERT(rid == pSyncNode->rid);
|
ASSERT(rid == pSyncNode->rid);
|
||||||
ret = syncNodeProposeBatch(pSyncNode, pMsgArr, pIsWeakArr, arrSize);
|
|
||||||
|
|
||||||
|
int32_t ret = syncNodeProposeBatch(pSyncNode, pMsgArr, pIsWeakArr, arrSize);
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue