fix crash found on 3.1

This commit is contained in:
Yihao Deng 2024-01-08 07:31:35 +00:00
parent 003e4a886d
commit 1eee7b44e0
11 changed files with 140 additions and 94 deletions

View File

@ -163,7 +163,8 @@ int rpcReleaseHandle(void *handle, int8_t type); // just release conn to rpc in
// These functions will not be called in the child process // These functions will not be called in the child process
int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx); int rpcSendRequestWithCtx(void *thandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *rid, SRpcCtx *ctx);
int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); int rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp);
int rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int32_t timeoutMs); int rpcSendRecvWithTimeout(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp, int8_t *epUpdated,
int32_t timeoutMs);
int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn); int rpcSetDefaultAddr(void *thandle, const char *ip, const char *fqdn);
void *rpcAllocHandle(); void *rpcAllocHandle();
void rpcSetIpWhite(void *thandl, void *arg); void rpcSetIpWhite(void *thandl, void *arg);
@ -171,6 +172,7 @@ void rpcSetIpWhite(void *thandl, void *arg);
int32_t rpcUtilSIpRangeToStr(SIpV4Range *pRange, char *buf); int32_t rpcUtilSIpRangeToStr(SIpV4Range *pRange, char *buf);
int32_t rpcUtilSWhiteListToStr(SIpWhiteList *pWhiteList, char **ppBuf); int32_t rpcUtilSWhiteListToStr(SIpWhiteList *pWhiteList, char **ppBuf);
int32_t rpcCvtErrCode(int32_t code);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -68,6 +68,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED TAOS_DEF_ERROR_CODE(0, 0x0020) // "Vgroup could not be connected" #define TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED TAOS_DEF_ERROR_CODE(0, 0x0020) // "Vgroup could not be connected"
#define TSDB_CODE_RPC_SOMENODE_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0021) // #define TSDB_CODE_RPC_SOMENODE_BROKEN_LINK TAOS_DEF_ERROR_CODE(0, 0x0021) //
#define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x0022) // #define TSDB_CODE_RPC_MAX_SESSIONS TAOS_DEF_ERROR_CODE(0, 0x0022) //
#define TSDB_CODE_RPC_NETWORK_ERROR TAOS_DEF_ERROR_CODE(0, 0x0023)

View File

@ -159,13 +159,18 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
dTrace("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq); dTrace("send status req to mnode, dnodeVer:%" PRId64 " statusSeq:%d", req.dnodeVer, req.statusSeq);
SEpSet epSet = {0}; SEpSet epSet = {0};
int8_t epUpdated = 0;
dmGetMnodeEpSet(pMgmt->pData, &epSet); dmGetMnodeEpSet(pMgmt->pData, &epSet);
rpcSendRecvWithTimeout(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp, 5000); rpcSendRecvWithTimeout(pMgmt->msgCb.clientRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, 5000);
if (rpcRsp.code != 0) { if (rpcRsp.code != 0) {
dmRotateMnodeEpSet(pMgmt->pData); dmRotateMnodeEpSet(pMgmt->pData);
char tbuf[256]; char tbuf[512];
dmEpSetToStr(tbuf, sizeof(tbuf), &epSet); dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
dError("failed to send status req since %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code), tbuf, epSet.inUse); dError("failed to send status req since %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code), tbuf, epSet.inUse);
} else {
if (epUpdated == 1) {
dmSetMnodeEpSet(pMgmt->pData, &epSet);
}
} }
dmProcessStatusRsp(pMgmt, &rpcRsp); dmProcessStatusRsp(pMgmt, &rpcRsp);
} }

View File

@ -22,10 +22,8 @@ static void *dmStatusThreadFp(void *param) {
int64_t lastTime = taosGetTimestampMs(); int64_t lastTime = taosGetTimestampMs();
setThreadName("dnode-status"); setThreadName("dnode-status");
const static int16_t TRIM_FREQ = 30; int32_t upTimeCount = 0;
int32_t trimCount = 0; int64_t upTime = 0;
int32_t upTimeCount = 0;
int64_t upTime = 0;
while (1) { while (1) {
taosMsleep(200); taosMsleep(200);
@ -38,11 +36,6 @@ static void *dmStatusThreadFp(void *param) {
dmSendStatusReq(pMgmt); dmSendStatusReq(pMgmt);
lastTime = curTime; lastTime = curTime;
trimCount = (trimCount + 1) % TRIM_FREQ;
if (trimCount == 0) {
taosMemoryTrim(0);
}
if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) { if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
upTime = taosGetOsUptime() - tsDndStartOsUptime; upTime = taosGetOsUptime() - tsDndStartOsUptime;
tsDndUpTime = TMAX(tsDndUpTime, upTime); tsDndUpTime = TMAX(tsDndUpTime, upTime);
@ -55,27 +48,27 @@ static void *dmStatusThreadFp(void *param) {
SDmNotifyHandle dmNotifyHdl = {.state = 0}; SDmNotifyHandle dmNotifyHdl = {.state = 0};
static void *dmNotifyThreadFp(void *param) { static void *dmNotifyThreadFp(void *param) {
SDnodeMgmt *pMgmt = param; SDnodeMgmt *pMgmt = param;
setThreadName("dnode-notify"); setThreadName("dnode-notify");
if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) { if (tsem_init(&dmNotifyHdl.sem, 0, 0) != 0) {
return NULL; return NULL;
} }
bool wait = true; bool wait = true;
while (1) { while (1) {
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
if (wait) tsem_wait(&dmNotifyHdl.sem); if (wait) tsem_wait(&dmNotifyHdl.sem);
atomic_store_8(&dmNotifyHdl.state, 1); atomic_store_8(&dmNotifyHdl.state, 1);
dmSendNotifyReq(pMgmt); dmSendNotifyReq(pMgmt);
if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) { if (1 == atomic_val_compare_exchange_8(&dmNotifyHdl.state, 1, 0)) {
wait = true; wait = true;
continue; continue;
} }
wait = false; wait = false;
} }
return NULL; return NULL;
} }
static void *dmMonitorThreadFp(void *param) { static void *dmMonitorThreadFp(void *param) {
@ -83,6 +76,9 @@ static void *dmMonitorThreadFp(void *param) {
int64_t lastTime = taosGetTimestampMs(); int64_t lastTime = taosGetTimestampMs();
setThreadName("dnode-monitor"); setThreadName("dnode-monitor");
static int32_t TRIM_FREQ = 20;
int32_t trimCount = 0;
while (1) { while (1) {
taosMsleep(200); taosMsleep(200);
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
@ -93,6 +89,11 @@ static void *dmMonitorThreadFp(void *param) {
if (interval >= tsMonitorInterval) { if (interval >= tsMonitorInterval) {
(*pMgmt->sendMonitorReportFp)(); (*pMgmt->sendMonitorReportFp)();
lastTime = curTime; lastTime = curTime;
trimCount = (trimCount + 1) % TRIM_FREQ;
if (trimCount == 0) {
taosMemoryTrim(0);
}
} }
} }
@ -126,14 +127,15 @@ static void *dmCrashReportThreadFp(void *param) {
setThreadName("dnode-crashReport"); setThreadName("dnode-crashReport");
char filepath[PATH_MAX] = {0}; char filepath[PATH_MAX] = {0};
snprintf(filepath, sizeof(filepath), "%s%s.taosdCrashLog", tsLogDir, TD_DIRSEP); snprintf(filepath, sizeof(filepath), "%s%s.taosdCrashLog", tsLogDir, TD_DIRSEP);
char *pMsg = NULL; char *pMsg = NULL;
int64_t msgLen = 0; int64_t msgLen = 0;
TdFilePtr pFile = NULL; TdFilePtr pFile = NULL;
bool truncateFile = false; bool truncateFile = false;
int32_t sleepTime = 200; int32_t sleepTime = 200;
int32_t reportPeriodNum = 3600 * 1000 / sleepTime;; int32_t reportPeriodNum = 3600 * 1000 / sleepTime;
;
int32_t loopTimes = reportPeriodNum; int32_t loopTimes = reportPeriodNum;
while (1) { while (1) {
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
if (loopTimes++ < reportPeriodNum) { if (loopTimes++ < reportPeriodNum) {
@ -167,13 +169,13 @@ static void *dmCrashReportThreadFp(void *param) {
pMsg = NULL; pMsg = NULL;
continue; continue;
} }
if (pFile) { if (pFile) {
taosReleaseCrashLogFile(pFile, truncateFile); taosReleaseCrashLogFile(pFile, truncateFile);
pFile = NULL; pFile = NULL;
truncateFile = false; truncateFile = false;
} }
taosMsleep(sleepTime); taosMsleep(sleepTime);
loopTimes = 0; loopTimes = 0;
} }

View File

@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mndProfile.h" #include "mndProfile.h"
#include "audit.h"
#include "mndDb.h" #include "mndDb.h"
#include "mndDnode.h" #include "mndDnode.h"
#include "mndMnode.h" #include "mndMnode.h"
@ -26,7 +27,6 @@
#include "mndView.h" #include "mndView.h"
#include "tglobal.h" #include "tglobal.h"
#include "tversion.h" #include "tversion.h"
#include "audit.h"
typedef struct { typedef struct {
uint32_t id; uint32_t id;
@ -530,23 +530,24 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
} }
#ifdef TD_ENTERPRISE #ifdef TD_ENTERPRISE
bool needCheck = true; bool needCheck = true;
int32_t key = HEARTBEAT_KEY_DYN_VIEW; int32_t key = HEARTBEAT_KEY_DYN_VIEW;
SDynViewVersion* pDynViewVer = NULL; SDynViewVersion *pDynViewVer = NULL;
SKv* pKv = taosHashGet(pHbReq->info, &key, sizeof(key)); SKv *pKv = taosHashGet(pHbReq->info, &key, sizeof(key));
if (NULL != pKv) { if (NULL != pKv) {
pDynViewVer = pKv->value; pDynViewVer = pKv->value;
mTrace("recv view dyn ver, bootTs:%" PRId64 ", ver:%" PRIu64, pDynViewVer->svrBootTs, pDynViewVer->dynViewVer); mTrace("recv view dyn ver, bootTs:%" PRId64 ", ver:%" PRIu64, pDynViewVer->svrBootTs, pDynViewVer->dynViewVer);
SDynViewVersion* pRspVer = NULL; SDynViewVersion *pRspVer = NULL;
if (0 != mndValidateDynViewVersion(pMnode, pDynViewVer, &needCheck, &pRspVer)) { if (0 != mndValidateDynViewVersion(pMnode, pDynViewVer, &needCheck, &pRspVer)) {
return -1; return -1;
} }
if (needCheck) { if (needCheck) {
SKv kv1 = {.key = HEARTBEAT_KEY_DYN_VIEW, .valueLen = sizeof(*pDynViewVer), .value = pRspVer}; SKv kv1 = {.key = HEARTBEAT_KEY_DYN_VIEW, .valueLen = sizeof(*pDynViewVer), .value = pRspVer};
taosArrayPush(hbRsp.info, &kv1); taosArrayPush(hbRsp.info, &kv1);
mTrace("need to check view ver, lastest bootTs:%" PRId64 ", ver:%" PRIu64, pRspVer->svrBootTs, pRspVer->dynViewVer); mTrace("need to check view ver, lastest bootTs:%" PRId64 ", ver:%" PRIu64, pRspVer->svrBootTs,
pRspVer->dynViewVer);
} }
} }
#endif #endif
@ -594,7 +595,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
if (!needCheck) { if (!needCheck) {
break; break;
} }
void *rspMsg = NULL; void *rspMsg = NULL;
int32_t rspLen = 0; int32_t rspLen = 0;
mndValidateViewInfo(pMnode, kv->value, kv->valueLen / sizeof(SViewVersion), &rspMsg, &rspLen); mndValidateViewInfo(pMnode, kv->value, kv->valueLen / sizeof(SViewVersion), &rspMsg, &rspLen);
@ -814,8 +815,9 @@ static int32_t mndRetrieveConns(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBl
* @param offset skip [offset] queries in pConn * @param offset skip [offset] queries in pConn
* @param rowsToPack at most rows to pack * @param rowsToPack at most rows to pack
* @return rows packed * @return rows packed
*/ */
static int32_t packQueriesIntoBlock(SShowObj* pShow, SConnObj* pConn, SSDataBlock* pBlock, uint32_t offset, uint32_t rowsToPack) { static int32_t packQueriesIntoBlock(SShowObj *pShow, SConnObj *pConn, SSDataBlock *pBlock, uint32_t offset,
uint32_t rowsToPack) {
int32_t cols = 0; int32_t cols = 0;
taosRLockLatch(&pConn->queryLock); taosRLockLatch(&pConn->queryLock);
int32_t numOfQueries = taosArrayGetSize(pConn->pQueries); int32_t numOfQueries = taosArrayGetSize(pConn->pQueries);
@ -826,7 +828,7 @@ static int32_t packQueriesIntoBlock(SShowObj* pShow, SConnObj* pConn, SSDataBloc
int32_t i = offset; int32_t i = offset;
for (; i < numOfQueries && (i - offset) < rowsToPack; ++i) { for (; i < numOfQueries && (i - offset) < rowsToPack; ++i) {
int32_t curRowIndex = pBlock->info.rows; int32_t curRowIndex = pBlock->info.rows;
SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i); SQueryDesc *pQuery = taosArrayGet(pConn->pQueries, i);
cols = 0; cols = 0;
@ -877,14 +879,19 @@ static int32_t packQueriesIntoBlock(SShowObj* pShow, SConnObj* pConn, SSDataBloc
colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->subPlanNum, false); colDataSetVal(pColInfo, curRowIndex, (const char *)&pQuery->subPlanNum, false);
char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0}; char subStatus[TSDB_SHOW_SUBQUERY_LEN + VARSTR_HEADER_SIZE] = {0};
int64_t reserve = 64;
int32_t strSize = sizeof(subStatus); int32_t strSize = sizeof(subStatus);
int32_t offset = VARSTR_HEADER_SIZE; int32_t offset = VARSTR_HEADER_SIZE;
for (int32_t i = 0; i < pQuery->subPlanNum && offset < strSize; ++i) { for (int32_t i = 0; i < pQuery->subPlanNum && offset + reserve < strSize; ++i) {
if (i) { if (i) {
offset += snprintf(subStatus + offset, strSize - offset - 1, ","); offset += sprintf(subStatus + offset, ",");
}
if (offset + reserve < strSize) {
SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i);
offset += sprintf(subStatus + offset, "%" PRIu64 ":%s", pDesc->tid, pDesc->status);
} else {
break;
} }
SQuerySubDesc *pDesc = taosArrayGet(pQuery->subDesc, i);
offset += snprintf(subStatus + offset, strSize - offset - 1, "%" PRIu64 ":%s", pDesc->tid, pDesc->status);
} }
varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]); varDataLen(subStatus) = strlen(&subStatus[VARSTR_HEADER_SIZE]);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
@ -903,8 +910,8 @@ static int32_t packQueriesIntoBlock(SShowObj* pShow, SConnObj* pConn, SSDataBloc
} }
static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) { static int32_t mndRetrieveQueries(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
SMnode * pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SSdb * pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0; int32_t numOfRows = 0;
SConnObj *pConn = NULL; SConnObj *pConn = NULL;

View File

@ -16,16 +16,14 @@
#include "executorInt.h" #include "executorInt.h"
#include "filter.h" #include "filter.h"
#include "function.h" #include "function.h"
#include "index.h"
#include "operator.h" #include "operator.h"
#include "os.h"
#include "query.h" #include "query.h"
#include "querytask.h" #include "querytask.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "thash.h" #include "thash.h"
#include "tmsg.h" #include "tmsg.h"
#include "tname.h"
#include "tref.h" #include "tref.h"
#include "trpc.h"
typedef struct SFetchRspHandleWrapper { typedef struct SFetchRspHandleWrapper {
uint32_t exchangeId; uint32_t exchangeId;
@ -131,14 +129,14 @@ static void concurrentlyLoadRemoteDataImpl(SOperatorInfo* pOperator, SExchangeIn
if (pRsp->completed == 1) { if (pRsp->completed == 1) {
pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED; pDataInfo->status = EX_SOURCE_DATA_EXHAUSTED;
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64
" execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 " execId:%d index:%d completed, blocks:%d, numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64
", total:%.2f Kb, try next %d/%" PRIzu, ", totalRows:%" PRIu64 ", total:%.2f Kb, try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, i, pRsp->numOfBlocks,
pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, i + 1, pRsp->numOfRows, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0, i + 1,
totalSources); totalSources);
} else { } else {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d blocks:%d, numOfRows:%" PRId64
" execId:%d blocks:%d, numOfRows:%" PRId64 ", totalRows:%" PRIu64 ", total:%.2f Kb", ", totalRows:%" PRIu64 ", total:%.2f Kb",
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRsp->numOfBlocks,
pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0); pRsp->numOfRows, pLoadInfo->totalRows, pLoadInfo->totalSize / 1024.0);
} }
@ -232,7 +230,7 @@ static SSDataBlock* loadRemoteData(SOperatorInfo* pOperator) {
if (blockDataGetNumOfRows(pBlock) == 0) { if (blockDataGetNumOfRows(pBlock) == 0) {
continue; continue;
} }
SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo; SLimitInfo* pLimitInfo = &pExchangeInfo->limitInfo;
if (hasLimitOffsetInfo(pLimitInfo)) { if (hasLimitOffsetInfo(pLimitInfo)) {
int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false); int32_t status = handleLimitOffset(pOperator, pLimitInfo, pBlock, false);
@ -261,7 +259,7 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const
if (pInfo->dynamicOp) { if (pInfo->dynamicOp) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
for (int32_t i = 0; i < numOfSources; ++i) { for (int32_t i = 0; i < numOfSources; ++i) {
SSourceDataInfo dataInfo = {0}; SSourceDataInfo dataInfo = {0};
dataInfo.status = EX_SOURCE_DATA_NOT_READY; dataInfo.status = EX_SOURCE_DATA_NOT_READY;
@ -343,8 +341,8 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode
goto _error; goto _error;
} }
pOperator->fpSet = pOperator->fpSet = createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo,
createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
return pOperator; return pOperator;
_error: _error:
@ -383,7 +381,7 @@ void doDestroyExchangeOperatorInfo(void* param) {
blockDataDestroy(pExInfo->pDummyBlock); blockDataDestroy(pExInfo->pDummyBlock);
tSimpleHashCleanup(pExInfo->pHashSources); tSimpleHashCleanup(pExInfo->pHashSources);
tsem_destroy(&pExInfo->ready); tsem_destroy(&pExInfo->ready);
taosMemoryFreeClear(param); taosMemoryFreeClear(param);
} }
@ -411,13 +409,18 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) {
pRsp->useconds = htobe64(pRsp->useconds); pRsp->useconds = htobe64(pRsp->useconds);
pRsp->numOfBlocks = htonl(pRsp->numOfBlocks); pRsp->numOfBlocks = htonl(pRsp->numOfBlocks);
qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index, pRsp->numOfBlocks, qDebug("%s fetch rsp received, index:%d, blocks:%d, rows:%" PRId64 ", %p", pSourceDataInfo->taskId, index,
pRsp->numOfRows, pExchangeInfo); pRsp->numOfBlocks, pRsp->numOfRows, pExchangeInfo);
} else { } else {
taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pData);
pSourceDataInfo->code = code; pSourceDataInfo->code = rpcCvtErrCode(code);
qDebug("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code), if (pSourceDataInfo->code != code) {
pExchangeInfo); qError("%s fetch rsp received, index:%d, error:%s, cvted error: %s, %p", pSourceDataInfo->taskId, index,
tstrerror(code), tstrerror(pSourceDataInfo->code), pExchangeInfo);
} else {
qError("%s fetch rsp received, index:%d, error:%s, %p", pSourceDataInfo->taskId, index, tstrerror(code),
pExchangeInfo);
}
} }
pSourceDataInfo->status = EX_SOURCE_DATA_READY; pSourceDataInfo->status = EX_SOURCE_DATA_READY;
@ -450,7 +453,7 @@ int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, in
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
pScan->tableSeq = tableSeq; pScan->tableSeq = tableSeq;
(*ppRes)->opType = srcOpType; (*ppRes)->opType = srcOpType;
(*ppRes)->downstreamIdx = 0; (*ppRes)->downstreamIdx = 0;
(*ppRes)->value = pScan; (*ppRes)->value = pScan;
@ -459,9 +462,8 @@ int32_t buildTableScanOperatorParam(SOperatorParam** ppRes, SArray* pUidList, in
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) { int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) {
SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex); SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex);
if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) { if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -490,7 +492,8 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
req.queryId = pTaskInfo->id.queryId; req.queryId = pTaskInfo->id.queryId;
req.execId = pSource->execId; req.execId = pSource->execId;
if (pDataInfo->pSrcUidList) { if (pDataInfo->pSrcUidList) {
int32_t code = buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->tableSeq); int32_t code =
buildTableScanOperatorParam(&req.pOpParam, pDataInfo->pSrcUidList, pDataInfo->srcOpType, pDataInfo->tableSeq);
taosArrayDestroy(pDataInfo->pSrcUidList); taosArrayDestroy(pDataInfo->pSrcUidList);
pDataInfo->pSrcUidList = NULL; pDataInfo->pSrcUidList = NULL;
if (TSDB_CODE_SUCCESS != code) { if (TSDB_CODE_SUCCESS != code) {
@ -499,7 +502,7 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas
return pTaskInfo->code; return pTaskInfo->code;
} }
} }
int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req); int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req);
if (msgSize < 0) { if (msgSize < 0) {
pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY;
@ -741,8 +744,8 @@ int32_t seqLoadRemoteData(SOperatorInfo* pOperator) {
SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp; SRetrieveTableRsp* pRetrieveRsp = pDataInfo->pRsp;
if (pRsp->completed == 1) { if (pRsp->completed == 1) {
qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64 ", rowsOfSource:%" PRIu64 qDebug("%s fetch msg rsp from vgId:%d, taskId:0x%" PRIx64 " execId:%d numOfRows:%" PRId64
", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu, ", rowsOfSource:%" PRIu64 ", totalRows:%" PRIu64 ", totalBytes:%" PRIu64 " try next %d/%" PRIzu,
GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows, GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->taskId, pSource->execId, pRetrieveRsp->numOfRows,
pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1, pDataInfo->totalRows, pLoadInfo->totalRows, pLoadInfo->totalSize, pExchangeInfo->current + 1,
totalSources); totalSources);
@ -769,7 +772,7 @@ _error:
} }
int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) { int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasicParam* pBasicParam) {
SExchangeInfo* pExchangeInfo = pOperator->info; SExchangeInfo* pExchangeInfo = pOperator->info;
SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId)); SExchangeSrcIndex* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pBasicParam->vgId, sizeof(pBasicParam->vgId));
if (NULL == pIdx) { if (NULL == pIdx) {
qError("No exchange source for vgId: %d", pBasicParam->vgId); qError("No exchange source for vgId: %d", pBasicParam->vgId);
@ -784,7 +787,7 @@ int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasic
dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL); dataInfo.pSrcUidList = taosArrayDup(pBasicParam->uidList, NULL);
dataInfo.srcOpType = pBasicParam->srcOpType; dataInfo.srcOpType = pBasicParam->srcOpType;
dataInfo.tableSeq = pBasicParam->tableSeq; dataInfo.tableSeq = pBasicParam->tableSeq;
taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo); taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo);
pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1; pIdx->inUseIdx = taosArrayGetSize(pExchangeInfo->pSourceDataInfo) - 1;
} else { } else {
@ -800,15 +803,14 @@ int32_t addSingleExchangeSource(SOperatorInfo* pOperator, SExchangeOperatorBasic
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) { int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info; SExchangeInfo* pExchangeInfo = pOperator->info;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SExchangeOperatorBasicParam* pBasicParam = NULL; SExchangeOperatorBasicParam* pBasicParam = NULL;
SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value; SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperator->pOperatorGetParam->value;
if (pParam->multiParams) { if (pParam->multiParams) {
SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value; SExchangeOperatorBatchParam* pBatch = (SExchangeOperatorBatchParam*)pOperator->pOperatorGetParam->value;
int32_t iter = 0; int32_t iter = 0;
while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) { while (NULL != (pBasicParam = tSimpleHashIterate(pBatch->pBatchs, pBasicParam, &iter))) {
code = addSingleExchangeSource(pOperator, pBasicParam); code = addSingleExchangeSource(pOperator, pBasicParam);
if (code) { if (code) {
@ -826,11 +828,11 @@ int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) {
SExchangeInfo* pExchangeInfo = pOperator->info; SExchangeInfo* pExchangeInfo = pOperator->info;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) || (pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) { if ((OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) ||
(pExchangeInfo->dynamicOp && NULL == pOperator->pOperatorGetParam)) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -15,9 +15,7 @@
#include "executorInt.h" #include "executorInt.h"
#include "filter.h" #include "filter.h"
#include "function.h"
#include "functionMgt.h" #include "functionMgt.h"
#include "os.h"
#include "querynodes.h" #include "querynodes.h"
#include "systable.h" #include "systable.h"
#include "tname.h" #include "tname.h"
@ -32,6 +30,7 @@
#include "storageapi.h" #include "storageapi.h"
#include "tcompare.h" #include "tcompare.h"
#include "thash.h" #include "thash.h"
#include "trpc.h"
#include "ttypes.h" #include "ttypes.h"
typedef int (*__optSysFilter)(void* a, void* b, int16_t dtype); typedef int (*__optSysFilter)(void* a, void* b, int16_t dtype);
@ -1789,8 +1788,8 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan
setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED, setOperatorInfo(pOperator, "SysTableScanOperator", QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN, false, OP_NOT_OPENED,
pInfo, pTaskInfo); pInfo, pTaskInfo);
pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock);
pOperator->fpSet = pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator,
createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
return pOperator; return pOperator;
_error: _error:
@ -1867,7 +1866,13 @@ int32_t loadSysTableCallback(void* param, SDataBuf* pMsg, int32_t code) {
pRsp->handle = htobe64(pRsp->handle); pRsp->handle = htobe64(pRsp->handle);
pRsp->compLen = htonl(pRsp->compLen); pRsp->compLen = htonl(pRsp->compLen);
} else { } else {
operator->pTaskInfo->code = code; operator->pTaskInfo->code = rpcCvtErrCode(code);
if (operator->pTaskInfo->code != code) {
qError("load systable rsp received, error:%s, cvted error:%s", tstrerror(code),
tstrerror(operator->pTaskInfo->code));
} else {
qError("load systable rsp received, error:%s", tstrerror(code));
}
} }
tsem_post(&pScanResInfo->ready); tsem_post(&pScanResInfo->ready);

View File

@ -122,6 +122,8 @@ typedef struct SExHandle {
typedef struct { typedef struct {
STransMsg* pRsp; STransMsg* pRsp;
SEpSet epSet;
int8_t hasEpSet;
tsem_t* pSem; tsem_t* pSem;
int8_t inited; int8_t inited;
SRWLatch latch; SRWLatch latch;
@ -317,7 +319,8 @@ int transReleaseSrvHandle(void* handle);
int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx); int transSendRequest(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransCtx* pCtx);
int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp); int transSendRecv(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp);
int transSendRecvWithTimeout(void* shandle, const SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int32_t timeoutMs); int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pMsg, STransMsg* pRsp, int8_t* epUpdated,
int32_t timeoutMs);
int transSendResponse(const STransMsg* msg); int transSendResponse(const STransMsg* msg);
int transRegisterMsg(const STransMsg* msg); int transRegisterMsg(const STransMsg* msg);
int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn); int transSetDefaultAddr(void* shandle, const char* ip, const char* fqdn);

View File

@ -169,8 +169,9 @@ int rpcSendRequestWithCtx(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, in
int rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) { int rpcSendRecv(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp) {
return transSendRecv(shandle, pEpSet, pMsg, pRsp); return transSendRecv(shandle, pEpSet, pMsg, pRsp);
} }
int rpcSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int32_t timeoutMs) { int rpcSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, SRpcMsg* pMsg, SRpcMsg* pRsp, int8_t* epUpdated,
return transSendRecvWithTimeout(shandle, pEpSet, pMsg, pRsp, timeoutMs); int32_t timeoutMs) {
return transSendRecvWithTimeout(shandle, pEpSet, pMsg, pRsp, epUpdated, timeoutMs);
} }
int rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); } int rpcSendResponse(const SRpcMsg* pMsg) { return transSendResponse(pMsg); }
@ -197,6 +198,13 @@ int32_t rpcUtilSWhiteListToStr(SIpWhiteList* pWhiteList, char** ppBuf) {
return transUtilSWhiteListToStr(pWhiteList, ppBuf); return transUtilSWhiteListToStr(pWhiteList, ppBuf);
} }
int32_t rpcCvtErrCode(int32_t code) {
if (code == TSDB_CODE_RPC_BROKEN_LINK || code == TSDB_CODE_RPC_NETWORK_UNAVAIL) {
return TSDB_CODE_RPC_NETWORK_ERROR;
}
return code;
}
int32_t rpcInit() { int32_t rpcInit() {
transInit(); transInit();
return 0; return 0;

View File

@ -2425,6 +2425,10 @@ int cliAppCb(SCliConn* pConn, STransMsg* pResp, SCliMsg* pMsg) {
STransSyncMsg* pSyncMsg = taosAcquireRef(transGetSyncMsgMgt(), pCtx->syncMsgRef); STransSyncMsg* pSyncMsg = taosAcquireRef(transGetSyncMsgMgt(), pCtx->syncMsgRef);
if (pSyncMsg != NULL) { if (pSyncMsg != NULL) {
memcpy(pSyncMsg->pRsp, (char*)pResp, sizeof(*pResp)); memcpy(pSyncMsg->pRsp, (char*)pResp, sizeof(*pResp));
if (cliIsEpsetUpdated(pResp->code, pCtx)) {
pSyncMsg->hasEpSet = 1;
epsetAssign(&pSyncMsg->epSet, &pCtx->epSet);
}
tsem_post(pSyncMsg->pSem); tsem_post(pSyncMsg->pSem);
taosReleaseRef(transGetSyncMsgMgt(), pCtx->syncMsgRef); taosReleaseRef(transGetSyncMsgMgt(), pCtx->syncMsgRef);
} else { } else {
@ -2640,10 +2644,12 @@ int64_t transCreateSyncMsg(STransMsg* pTransMsg) {
pSyncMsg->inited = 0; pSyncMsg->inited = 0;
pSyncMsg->pRsp = pTransMsg; pSyncMsg->pRsp = pTransMsg;
pSyncMsg->pSem = sem; pSyncMsg->pSem = sem;
pSyncMsg->hasEpSet = 0;
return taosAddRef(transGetSyncMsgMgt(), pSyncMsg); return taosAddRef(transGetSyncMsgMgt(), pSyncMsg);
} }
int transSendRecvWithTimeout(void* shandle, const SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int32_t timeoutMs) { int transSendRecvWithTimeout(void* shandle, SEpSet* pEpSet, STransMsg* pReq, STransMsg* pRsp, int8_t* epUpdated,
int32_t timeoutMs) {
STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle); STrans* pTransInst = (STrans*)transAcquireExHandle(transGetInstMgt(), (int64_t)shandle);
STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg)); STransMsg* pTransMsg = taosMemoryCalloc(1, sizeof(STransMsg));
if (pTransInst == NULL) { if (pTransInst == NULL) {
@ -2695,6 +2701,10 @@ int transSendRecvWithTimeout(void* shandle, const SEpSet* pEpSet, STransMsg* pRe
ret = TSDB_CODE_TIMEOUT_ERROR; ret = TSDB_CODE_TIMEOUT_ERROR;
} else { } else {
memcpy(pRsp, pSyncMsg->pRsp, sizeof(STransMsg)); memcpy(pRsp, pSyncMsg->pRsp, sizeof(STransMsg));
if (pSyncMsg->hasEpSet == 1) {
epsetAssign(pEpSet, &pSyncMsg->epSet);
*epUpdated = 1;
}
ret = 0; ret = 0;
} }
_RETURN: _RETURN:

View File

@ -53,6 +53,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_RPC_BROKEN_LINK, "Conn is broken")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_TIMEOUT, "Conn read timeout")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED, "some vnode/qnode/mnode(s) out of service")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session") TAOS_DEFINE_ERROR(TSDB_CODE_RPC_MAX_SESSIONS, "rpc open too many session")
TAOS_DEFINE_ERROR(TSDB_CODE_RPC_NETWORK_ERROR, "rpc network error")
//common & util //common & util
TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized") TAOS_DEFINE_ERROR(TSDB_CODE_TIME_UNSYNCED, "Client and server's time is not synchronized")