diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index accb7e6f24..e7b0b25653 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -163,7 +163,8 @@ int rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc in // These functions will not be called in the child process int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); -int rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int32_t timeoutMs); +int rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int8_t *epUpdated, + int32_t timeoutMs); int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); void *rpcAllocHandle(); void rpcSetIpWhite(void *thandl, void *arg); @@ -171,6 +172,7 @@ void rpcSetIpWhite(void *thandl, void *arg); int32_t rpcUtilSIpRangeToStr(SIpV4Range *pRange, char *buf); int32_t rpcUtilSWhiteListToStr(SIpWhiteList *pWhiteList, char **ppBuf); +int32_t rpcCvtErrCode(int32_t code); #ifdef __cplusplus } diff --git a/include/util/taoserror.h b/include/util/taoserror.h index c4d61157b6..b41078dfbf 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -68,6 +68,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED TAOS_DEF_ERROR_CODE(0, 0x0020) // "Vgroup could not be connected" #define TSDB_CODE_RPC_SOMENODE_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0021) // #define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x0022) // +#define TSDB_CODE_RPC_NETWORK_ERROR TAOS_DEF_ERROR_CODE(0, 0x0023) diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c index ac94390619..a95ec42bd0 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmHandle.c @@ -159,13 +159,18 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) { dTrace("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq); SEpSet epSet = {0}; + int8_t epUpdated = 0; dmGetMnodeEpSet(pMgmt->pData, &epSet); - rpcSendRecvWithTimeout(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp, 5000); + rpcSendRecvWithTimeout(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, 5000); if (rpcRsp.code != 0) { dmRotateMnodeEpSet(pMgmt->pData); - char tbuf[256]; + char tbuf[512]; dmEpSetToStr(tbuf, sizeof(tbuf), &epSet); dError("failed to send status req since %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code), tbuf, epSet.inUse); + } else { + if (epUpdated == 1) { + dmSetMnodeEpSet(pMgmt->pData, &epSet); + } } dmProcessStatusRsp(pMgmt, &rpcRsp); } diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index af43804db4..b79b3f9e11 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -22,10 +22,8 @@ static void *dmStatusThreadFp(void *param) { int64_t lastTime = taosGetTimestampMs(); setThreadName("dnode-status"); - const static int16_t TRIM_FREQ = 30; - int32_t trimCount = 0; - int32_t upTimeCount = 0; - int64_t upTime = 0; + int32_t upTimeCount = 0; + int64_t upTime = 0; while (1) { taosMsleep(200); @@ -38,11 +36,6 @@ static void *dmStatusThreadFp(void *param) { dmSendStatusReq(pMgmt); lastTime = curTime; - trimCount = (trimCount + 1) % TRIM_FREQ; - if (trimCount == 0) { - taosMemoryTrim(0); - } - if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) { upTime = taosGetOsUptime() - tsDndStartOsUptime; tsDndUpTime = TMAX(tsDndUpTime, upTime); @@ -55,27 +48,27 @@ static void *dmStatusThreadFp(void *param) { SDmNotifyHandle dmNotifyHdl = {.state = 0}; static void *dmNotifyThreadFp(void *param) { - SDnodeMgmt *pMgmt = param; - setThreadName("dnode-notify"); + SDnodeMgmt *pMgmt = param; + setThreadName("dnode-notify"); - if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) { - return NULL; + if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) { + return NULL; } - bool wait = true; - while (1) { - if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; + bool wait = true; + while (1) { + if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; if (wait) tsem_wait(&dmNotifyHdl.sem); atomic_store_8(&dmNotifyHdl.state, 1); - dmSendNotifyReq(pMgmt); - if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) { - wait = true; - continue; + dmSendNotifyReq(pMgmt); + if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) { + wait = true; + continue; } - wait = false; + wait = false; } - return NULL; + return NULL; } static void *dmMonitorThreadFp(void *param) { @@ -83,6 +76,9 @@ static void *dmMonitorThreadFp(void *param) { int64_t lastTime = taosGetTimestampMs(); setThreadName("dnode-monitor"); + static int32_t TRIM_FREQ = 20; + int32_t trimCount = 0; + while (1) { taosMsleep(200); if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; @@ -93,6 +89,11 @@ static void *dmMonitorThreadFp(void *param) { if (interval >= tsMonitorInterval) { (*pMgmt->sendMonitorReportFp)(); lastTime = curTime; + + trimCount = (trimCount + 1) % TRIM_FREQ; + if (trimCount == 0) { + taosMemoryTrim(0); + } } } @@ -126,14 +127,15 @@ static void *dmCrashReportThreadFp(void *param) { setThreadName("dnode-crashReport"); char filepath[PATH_MAX] = {0}; snprintf(filepath, sizeof(filepath), "%s%s.taosdCrashLog", tsLogDir, TD_DIRSEP); - char *pMsg = NULL; - int64_t msgLen = 0; + char *pMsg = NULL; + int64_t msgLen = 0; TdFilePtr pFile = NULL; - bool truncateFile = false; - int32_t sleepTime = 200; - int32_t reportPeriodNum = 3600 * 1000 / sleepTime;; + bool truncateFile = false; + int32_t sleepTime = 200; + int32_t reportPeriodNum = 3600 * 1000 / sleepTime; + ; int32_t loopTimes = reportPeriodNum; - + while (1) { if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; if (loopTimes++ < reportPeriodNum) { @@ -167,13 +169,13 @@ static void *dmCrashReportThreadFp(void *param) { pMsg = NULL; continue; } - + if (pFile) { taosReleaseCrashLogFile(pFile, truncateFile); pFile = NULL; truncateFile = false; } - + taosMsleep(sleepTime); loopTimes = 0; } diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 34ee18a5cc..e0559b4c48 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -15,6 +15,7 @@ #define _DEFAULT_SOURCE #include "mndProfile.h" +#include "audit.h" #include "mndDb.h" #include "mndDnode.h" #include "mndMnode.h" @@ -26,7 +27,6 @@ #include "mndView.h" #include "tglobal.h" #include "tversion.h" -#include "audit.h" typedef struct { uint32_t id; @@ -530,23 +530,24 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb } #ifdef TD_ENTERPRISE - bool needCheck = true; - int32_t key = HEARTBEAT_KEY_DYN_VIEW; - SDynViewVersion* pDynViewVer = NULL; - SKv* pKv = taosHashGet(pHbReq->info, &key, sizeof(key)); + bool needCheck = true; + int32_t key = HEARTBEAT_KEY_DYN_VIEW; + SDynViewVersion *pDynViewVer = NULL; + SKv *pKv = taosHashGet(pHbReq->info, &key, sizeof(key)); if (NULL != pKv) { pDynViewVer = pKv->value; mTrace("recv view dyn ver, bootTs:%" PRId64 ", ver:%" PRIu64, pDynViewVer->svrBootTs, pDynViewVer->dynViewVer); - SDynViewVersion* pRspVer = NULL; + SDynViewVersion *pRspVer = NULL; if (0 != mndValidateDynViewVersion(pMnode, pDynViewVer, &needCheck, &pRspVer)) { return -1; } - + if (needCheck) { SKv kv1 = {.key = HEARTBEAT_KEY_DYN_VIEW, .valueLen = sizeof(*pDynViewVer), .value = pRspVer}; taosArrayPush(hbRsp.info, &kv1); - mTrace("need to check view ver, lastest bootTs:%" PRId64 ", ver:%" PRIu64, pRspVer->svrBootTs, pRspVer->dynViewVer); + mTrace("need to check view ver, lastest bootTs:%" PRId64 ", ver:%" PRIu64, pRspVer->svrBootTs, + pRspVer->dynViewVer); } } #endif @@ -594,7 +595,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb if (!needCheck) { break; } - + void *rspMsg = NULL; int32_t rspLen = 0; mndValidateViewInfo(pMnode, kv->value, kv->valueLen / sizeof(SViewVersion), &rspMsg, &rspLen); @@ -814,8 +815,9 @@ static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl * @param offset skip [offset] queries in pConn * @param rowsToPack at most rows to pack * @return rows packed -*/ -static int32_t packQueriesIntoBlock(SShowObj* pShow, SConnObj* pConn, SSDataBlock* pBlock, uint32_t offset, uint32_t rowsToPack) { + */ +static int32_t packQueriesIntoBlock(SShowObj *pShow, SConnObj *pConn, SSDataBlock *pBlock, uint32_t offset, + uint32_t rowsToPack) { int32_t cols = 0; taosRLockLatch(&pConn->queryLock); int32_t numOfQueries = taosArrayGetSize(pConn->pQueries); @@ -826,7 +828,7 @@ static int32_t packQueriesIntoBlock(SShowObj* pShow, SConnObj* pConn, SSDataBloc int32_t i = offset; for (; i < numOfQueries && (i - offset) < rowsToPack; ++i) { - int32_t curRowIndex = pBlock->info.rows; + int32_t curRowIndex = pBlock->info.rows; SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i); cols = 0; @@ -877,14 +879,19 @@ static int32_t packQueriesIntoBlock(SShowObj* pShow, SConnObj* pConn, SSDataBloc colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->subPlanNum, false); char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0}; + int64_t reserve = 64; int32_t strSize = sizeof(subStatus); int32_t offset = VARSTR_HEADER_SIZE; - for (int32_t i = 0; i < pQuery->subPlanNum && offset < strSize; ++i) { + for (int32_t i = 0; i < pQuery->subPlanNum && offset + reserve < strSize; ++i) { if (i) { - offset += snprintf(subStatus + offset, strSize - offset - 1, ","); + offset += sprintf(subStatus + offset, ","); + } + if (offset + reserve < strSize) { + SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i); + offset += sprintf(subStatus + offset, "%" PRIu64 ":%s", pDesc->tid, pDesc->status); + } else { + break; } - SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i); - offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status); } varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); @@ -903,8 +910,8 @@ static int32_t packQueriesIntoBlock(SShowObj* pShow, SConnObj* pConn, SSDataBloc } static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { - SMnode * pMnode = pReq->info.node; - SSdb * pSdb = pMnode->pSdb; + SMnode *pMnode = pReq->info.node; + SSdb *pSdb = pMnode->pSdb; int32_t numOfRows = 0; SConnObj *pConn = NULL; diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index ef8bf9674a..2797cb2d82 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -16,16 +16,14 @@ #include "executorInt.h" #include "filter.h" #include "function.h" -#include "index.h" #include "operator.h" -#include "os.h" #include "query.h" #include "querytask.h" #include "tdatablock.h" #include "thash.h" #include "tmsg.h" -#include "tname.h" #include "tref.h" +#include "trpc.h" typedef struct SFetchRspHandleWrapper { uint32_t exchangeId; @@ -131,14 +129,14 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn if (pRsp->completed == 1) { pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 - " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 - ", total:%.2f Kb, try next %d/%" PRIzu, + " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64 + ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, i + 1, totalSources); } else { - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 - " execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb", + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d blocks:%d, numOfRows:%" PRId64 + ", totalRows:%" PRIu64 ", total:%.2f Kb", GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks, pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0); } @@ -232,7 +230,7 @@ static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) { if (blockDataGetNumOfRows(pBlock) == 0) { continue; } - + SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo; if (hasLimitOffsetInfo(pLimitInfo)) { int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false); @@ -261,7 +259,7 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const if (pInfo->dynamicOp) { return TSDB_CODE_SUCCESS; } - + for (int32_t i = 0; i < numOfSources; ++i) { SSourceDataInfo dataInfo = {0}; dataInfo.status = EX_SOURCE_DATA_NOT_READY; @@ -343,8 +341,8 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode goto _error; } - pOperator->fpSet = - createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); + pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); return pOperator; _error: @@ -383,7 +381,7 @@ void doDestroyExchangeOperatorInfo(void* param) { blockDataDestroy(pExInfo->pDummyBlock); tSimpleHashCleanup(pExInfo->pHashSources); - + tsem_destroy(&pExInfo->ready); taosMemoryFreeClear(param); } @@ -411,13 +409,18 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { pRsp->useconds = htobe64(pRsp->useconds); pRsp->numOfBlocks = htonl(pRsp->numOfBlocks); - qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, - pRsp->numOfRows, pExchangeInfo); + qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index, + pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo); } else { taosMemoryFree(pMsg->pData); - pSourceDataInfo->code = code; - qDebug("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code), - pExchangeInfo); + pSourceDataInfo->code = rpcCvtErrCode(code); + if (pSourceDataInfo->code != code) { + qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index, + tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo); + } else { + qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code), + pExchangeInfo); + } } pSourceDataInfo->status = EX_SOURCE_DATA_READY; @@ -450,7 +453,7 @@ int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, in return TSDB_CODE_OUT_OF_MEMORY; } pScan->tableSeq = tableSeq; - + (*ppRes)->opType = srcOpType; (*ppRes)->downstreamIdx = 0; (*ppRes)->value = pScan; @@ -459,9 +462,8 @@ int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, in return TSDB_CODE_SUCCESS; } - int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) { - SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex); + SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex); if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) { return TSDB_CODE_SUCCESS; } @@ -490,7 +492,8 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas req.queryId = pTaskInfo->id.queryId; req.execId = pSource->execId; if (pDataInfo->pSrcUidList) { - int32_t code = buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->tableSeq); + int32_t code = + buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->tableSeq); taosArrayDestroy(pDataInfo->pSrcUidList); pDataInfo->pSrcUidList = NULL; if (TSDB_CODE_SUCCESS != code) { @@ -499,7 +502,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas return pTaskInfo->code; } } - + int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req); if (msgSize < 0) { pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; @@ -741,8 +744,8 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) { SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; if (pRsp->completed == 1) { - qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64 - ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, + qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64 + ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, totalSources); @@ -769,7 +772,7 @@ _error: } int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) { - SExchangeInfo* pExchangeInfo = pOperator->info; + SExchangeInfo* pExchangeInfo = pOperator->info; SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId)); if (NULL == pIdx) { qError("No exchange source for vgId: %d", pBasicParam->vgId); @@ -784,7 +787,7 @@ int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasic dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL); dataInfo.srcOpType = pBasicParam->srcOpType; dataInfo.tableSeq = pBasicParam->tableSeq; - + taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo); pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1; } else { @@ -800,15 +803,14 @@ int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasic return TSDB_CODE_SUCCESS; } - int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) { - SExchangeInfo* pExchangeInfo = pOperator->info; - int32_t code = TSDB_CODE_SUCCESS; + SExchangeInfo* pExchangeInfo = pOperator->info; + int32_t code = TSDB_CODE_SUCCESS; SExchangeOperatorBasicParam* pBasicParam = NULL; - SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value; + SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value; if (pParam->multiParams) { SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value; - int32_t iter = 0; + int32_t iter = 0; while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) { code = addSingleExchangeSource(pOperator, pBasicParam); if (code) { @@ -826,11 +828,11 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } - int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { SExchangeInfo* pExchangeInfo = pOperator->info; - int32_t code = TSDB_CODE_SUCCESS; - if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) || (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) { + int32_t code = TSDB_CODE_SUCCESS; + if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) || + (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) { return TSDB_CODE_SUCCESS; } diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index 76de204cab..0c421bf354 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -15,9 +15,7 @@ #include "executorInt.h" #include "filter.h" -#include "function.h" #include "functionMgt.h" -#include "os.h" #include "querynodes.h" #include "systable.h" #include "tname.h" @@ -32,6 +30,7 @@ #include "storageapi.h" #include "tcompare.h" #include "thash.h" +#include "trpc.h" #include "ttypes.h" typedef int (*__optSysFilter)(void* a, void* b, int16_t dtype); @@ -1789,8 +1788,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); - pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); return pOperator; _error: @@ -1867,7 +1866,13 @@ int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) { pRsp->handle = htobe64(pRsp->handle); pRsp->compLen = htonl(pRsp->compLen); } else { - operator->pTaskInfo->code = code; + operator->pTaskInfo->code = rpcCvtErrCode(code); + if (operator->pTaskInfo->code != code) { + qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code), + tstrerror(operator->pTaskInfo->code)); + } else { + qError("load systable rsp received, error:%s", tstrerror(code)); + } } tsem_post(&pScanResInfo->ready); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 19b2dfc300..7483588593 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -122,6 +122,8 @@ typedef struct SExHandle { typedef struct { STransMsg* pRsp; + SEpSet epSet; + int8_t hasEpSet; tsem_t* pSem; int8_t inited; SRWLatch latch; @@ -317,7 +319,8 @@ int transReleaseSrvHandle(void* handle); int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); -int transSendRecvWithTimeout(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int32_t timeoutMs); +int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int8_t* epUpdated, + int32_t timeoutMs); int transSendResponse(const STransMsg* msg); int transRegisterMsg(const STransMsg* msg); int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index b23d229931..64302a78fc 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -169,8 +169,9 @@ int rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, in int rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { return transSendRecv(shandle, pEpSet, pMsg, pRsp); } -int rpcSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int32_t timeoutMs) { - return transSendRecvWithTimeout(shandle, pEpSet, pMsg, pRsp, timeoutMs); +int rpcSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int8_t* epUpdated, + int32_t timeoutMs) { + return transSendRecvWithTimeout(shandle, pEpSet, pMsg, pRsp, epUpdated, timeoutMs); } int rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); } @@ -197,6 +198,13 @@ int32_t rpcUtilSWhiteListToStr(SIpWhiteList* pWhiteList, char** ppBuf) { return transUtilSWhiteListToStr(pWhiteList, ppBuf); } +int32_t rpcCvtErrCode(int32_t code) { + if (code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) { + return TSDB_CODE_RPC_NETWORK_ERROR; + } + return code; +} + int32_t rpcInit() { transInit(); return 0; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 936d11c151..4b3000b47e 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -2425,6 +2425,10 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) { STransSyncMsg* pSyncMsg = taosAcquireRef(transGetSyncMsgMgt(), pCtx->syncMsgRef); if (pSyncMsg != NULL) { memcpy(pSyncMsg->pRsp, (char*)pResp, sizeof(*pResp)); + if (cliIsEpsetUpdated(pResp->code, pCtx)) { + pSyncMsg->hasEpSet = 1; + epsetAssign(&pSyncMsg->epSet, &pCtx->epSet); + } tsem_post(pSyncMsg->pSem); taosReleaseRef(transGetSyncMsgMgt(), pCtx->syncMsgRef); } else { @@ -2640,10 +2644,12 @@ int64_t transCreateSyncMsg(STransMsg* pTransMsg) { pSyncMsg->inited = 0; pSyncMsg->pRsp = pTransMsg; pSyncMsg->pSem = sem; + pSyncMsg->hasEpSet = 0; return taosAddRef(transGetSyncMsgMgt(), pSyncMsg); } -int transSendRecvWithTimeout(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int32_t timeoutMs) { +int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated, + int32_t timeoutMs) { STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg)); if (pTransInst == NULL) { @@ -2695,6 +2701,10 @@ int transSendRecvWithTimeout(void* shandle, const SEpSet* pEpSet, STransMsg* pRe ret = TSDB_CODE_TIMEOUT_ERROR; } else { memcpy(pRsp, pSyncMsg->pRsp, sizeof(STransMsg)); + if (pSyncMsg->hasEpSet == 1) { + epsetAssign(pEpSet, &pSyncMsg->epSet); + *epUpdated = 1; + } ret = 0; } _RETURN: diff --git a/source/util/src/terror.c b/source/util/src/terror.c index c294270e4a..360689cef3 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -53,6 +53,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_BROKEN_LINK, "Conn is broken") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session") +TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error") //common & util TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")