commit
bb5a0f8241
|
@ -59,6 +59,22 @@ void tscPrintMgmtIp() {
|
|||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* For each management node, try twice at least in case of poor network situation.
|
||||
* If the client start to connect to a non-management node from the client, and the first retry may fail due to
|
||||
* the poor network quality. And then, the second retry get the response with redirection command.
|
||||
* The retry will not be executed since only *two* retry is allowed in case of single management node in the cluster.
|
||||
* Therefore, we need to multiply the retry times by factor of 2 to fix this problem.
|
||||
*/
|
||||
static int32_t tscGetMgmtConnMaxRetryTimes() {
|
||||
int32_t factor = 2;
|
||||
#ifdef CLUSTER
|
||||
return tscMgmtIpList.numOfIps * factor;
|
||||
#else
|
||||
return 1*factor;
|
||||
#endif
|
||||
}
|
||||
|
||||
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
|
||||
STscObj *pObj = (STscObj *)param;
|
||||
if (pObj == NULL) return;
|
||||
|
@ -134,18 +150,17 @@ void tscProcessActivityTimer(void *handle, void *tmrId) {
|
|||
tscProcessSql(pObj->pHb);
|
||||
}
|
||||
|
||||
//TODO HANDLE error from mgmt
|
||||
void tscGetConnToMgmt(SSqlObj *pSql, uint8_t *pCode) {
|
||||
STscObj *pTscObj = pSql->pTscObj;
|
||||
#ifdef CLUSTER
|
||||
if (pSql->retry < tscMgmtIpList.numOfIps) {
|
||||
if (pSql->retry < tscGetMgmtConnMaxRetryTimes()) {
|
||||
*pCode = 0;
|
||||
pSql->retry++;
|
||||
pSql->index = pSql->index % tscMgmtIpList.numOfIps;
|
||||
if (pSql->cmd.command > TSDB_SQL_READ && pSql->index == 0) pSql->index = 1;
|
||||
void *thandle = taosGetConnFromCache(tscConnCache, tscMgmtIpList.ip[pSql->index], TSC_MGMT_VNODE, pTscObj->user);
|
||||
#else
|
||||
if (pSql->retry < 1) {
|
||||
if (pSql->retry < tscGetMgmtConnMaxRetryTimes()) {
|
||||
*pCode = 0;
|
||||
pSql->retry++;
|
||||
void *thandle = taosGetConnFromCache(tscConnCache, tsServerIp, TSC_MGMT_VNODE, pTscObj->user);
|
||||
|
@ -444,17 +459,14 @@ void *tscProcessMsgFromServer(char *msg, void *ahandle, void *thandle) {
|
|||
}
|
||||
} else {
|
||||
uint16_t rspCode = pMsg->content[0];
|
||||
#ifdef CLUSTER
|
||||
|
||||
#ifdef CLUSTER
|
||||
|
||||
if (rspCode == TSDB_CODE_REDIRECT) {
|
||||
tscTrace("%p it shall be redirected!", pSql);
|
||||
taosAddConnIntoCache(tscConnCache, thandle, pSql->ip, pSql->vnode, pObj->user);
|
||||
pSql->thandle = NULL;
|
||||
|
||||
// reset the retry times for a new mgmt node
|
||||
pSql->retry = 0;
|
||||
|
||||
if (pCmd->command > TSDB_SQL_MGMT) {
|
||||
tscProcessMgmtRedirect(pSql, pMsg->content + 1);
|
||||
} else {
|
||||
|
|
|
@ -256,9 +256,9 @@ static void doUnmapHeaderFileData(SQueryRuntimeEnv* pRuntimeEnv) {
|
|||
|
||||
otherVnodeFiles->pHeaderFileData = NULL;
|
||||
pRuntimeEnv->mmapedHFileIndex = -1;
|
||||
} else {
|
||||
assert(pRuntimeEnv->mmapedHFileIndex == -1);
|
||||
}
|
||||
|
||||
assert(pRuntimeEnv->mmapedHFileIndex == -1);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -286,12 +286,12 @@ char *vnodeGetHeaderFileData(SQueryRuntimeEnv *pRuntimeEnv, int32_t fileIndex) {
|
|||
pVnodeFiles->pHeaderFileData = mmap(NULL, size, PROT_READ, MAP_SHARED, pVnodeFiles->headerFd, 0);
|
||||
if (pVnodeFiles->pHeaderFileData == MAP_FAILED) {
|
||||
pVnodeFiles->pHeaderFileData = NULL;
|
||||
dError("QInfo:%p failed to map header file:%s, size:%lld, %s", pQInfo, pVnodeFiles->headerFilePath, size,
|
||||
dError("QInfo:%p failed to mmap header file:%s, size:%lld, %s", pQInfo, pVnodeFiles->headerFilePath, size,
|
||||
strerror(errno));
|
||||
} else {
|
||||
pRuntimeEnv->mmapedHFileIndex = fileIndex; // set the value in case of success mmap file
|
||||
if (madvise(pVnodeFiles->pHeaderFileData, size, MADV_SEQUENTIAL) == -1) {
|
||||
dError("QInfo:%p failed to advise kernel the usage of header files, reason:%s", pQInfo, strerror(errno));
|
||||
dError("QInfo:%p failed to advise kernel the usage of header file, reason:%s", pQInfo, strerror(errno));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
@ -326,9 +326,9 @@ static int vnodeGetCompBlockInfo(SMeterObj *pMeterObj, SQueryRuntimeEnv *pRuntim
|
|||
pSummary->numOfSeek++;
|
||||
|
||||
#if 1
|
||||
char *data = vnodeGetHeaderFileData(pRuntimeEnv, fileIndex); // failed to load the header file data into memory
|
||||
char *data = vnodeGetHeaderFileData(pRuntimeEnv, fileIndex);
|
||||
if (data == NULL) {
|
||||
return -1;
|
||||
return -1; // failed to load the header file data into memory
|
||||
}
|
||||
|
||||
#else
|
||||
|
@ -2842,9 +2842,8 @@ int64_t loadRequiredBlockIntoMem(SQueryRuntimeEnv *pRuntimeEnv, SPositionInfo *p
|
|||
* currently opened file is not the start file, reset to the start file
|
||||
*/
|
||||
int32_t fileIdx = vnodeGetVnodeHeaderFileIdx(&pQuery->fileId, pRuntimeEnv, pQuery->order.order);
|
||||
if (fileIdx < 0) {
|
||||
if (fileIdx < 0) { // ignore the files on disk
|
||||
dError("QInfo:%p failed to get data file:%d", GET_QINFO_ADDR(pQuery), pQuery->fileId);
|
||||
// ignore the files on disk
|
||||
position->fileId = -1;
|
||||
return -1;
|
||||
}
|
||||
|
@ -5494,11 +5493,15 @@ SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t
|
|||
|
||||
SVnodeObj *pVnode = &vnodeList[vid];
|
||||
|
||||
char * pHeaderData = vnodeGetHeaderFileData(pRuntimeEnv, fileIndex);
|
||||
char * pHeaderFileData = vnodeGetHeaderFileData(pRuntimeEnv, fileIndex);
|
||||
if (pHeaderFileData == NULL) { // failed to load header file into buffer
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tmsize = sizeof(SCompHeader) * (pVnode->cfg.maxSessions) + sizeof(TSCKSUM);
|
||||
|
||||
// file is corrupted, abort query in current file
|
||||
if (validateHeaderOffsetSegment(pQInfo, pQueryFileInfo->headerFilePath, vid, pHeaderData, tmsize) < 0) {
|
||||
if (validateHeaderOffsetSegment(pQInfo, pQueryFileInfo->headerFilePath, vid, pHeaderFileData, tmsize) < 0) {
|
||||
*numOfMeters = 0;
|
||||
return 0;
|
||||
}
|
||||
|
@ -5549,7 +5552,7 @@ SMeterDataInfo **vnodeFilterQualifiedMeters(SQInfo *pQInfo, int32_t vid, int32_t
|
|||
|
||||
int64_t headerOffset = TSDB_FILE_HEADER_LEN + sizeof(SCompHeader) * pMeterObj->sid;
|
||||
|
||||
SCompHeader *compHeader = (SCompHeader *)(pHeaderData + headerOffset);
|
||||
SCompHeader *compHeader = (SCompHeader *)(pHeaderFileData + headerOffset);
|
||||
|
||||
if (compHeader->compInfoOffset == 0) {
|
||||
continue;
|
||||
|
|
|
@ -290,11 +290,14 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
|
|||
pSummary->numOfFiles++;
|
||||
|
||||
SQueryFileInfo *pQueryFileInfo = &pRuntimeEnv->pVnodeFiles[fileIdx];
|
||||
char *pHeaderData = vnodeGetHeaderFileData(pRuntimeEnv, fileIdx);
|
||||
if (pHeaderData == NULL) { // failed to mmap header file into buffer, ignore current file, try next
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t numOfQualifiedMeters = 0;
|
||||
SMeterDataInfo **pReqMeterDataInfo = vnodeFilterQualifiedMeters(pQInfo, vnodeId, fileIdx, pSupporter->pSidSet,
|
||||
pMeterDataInfo, &numOfQualifiedMeters);
|
||||
dTrace("QInfo:%p file:%s, %d meters qualified", pQInfo, pQueryFileInfo->dataFilePath, numOfQualifiedMeters);
|
||||
|
||||
if (pReqMeterDataInfo == NULL) {
|
||||
dError("QInfo:%p failed to allocate memory to perform query processing, abort", pQInfo);
|
||||
|
@ -304,6 +307,8 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
|
|||
return NULL;
|
||||
}
|
||||
|
||||
dTrace("QInfo:%p file:%s, %d meters qualified", pQInfo, pQueryFileInfo->dataFilePath, numOfQualifiedMeters);
|
||||
|
||||
// none of meters in query set have pHeaderData in this file, try next file
|
||||
if (numOfQualifiedMeters == 0) {
|
||||
fid += step;
|
||||
|
@ -311,11 +316,6 @@ static SMeterDataInfo *queryOnMultiDataFiles(SQInfo *pQInfo, SMeterDataInfo *pMe
|
|||
continue;
|
||||
}
|
||||
|
||||
char *pHeaderData = vnodeGetHeaderFileData(pRuntimeEnv, fileIdx);
|
||||
if (pHeaderData == NULL) { // failed to mmap header file into buffer
|
||||
continue;
|
||||
}
|
||||
|
||||
uint32_t numOfBlocks = getDataBlocksForMeters(pSupporter, pQuery, pHeaderData, numOfQualifiedMeters, pQueryFileInfo,
|
||||
pReqMeterDataInfo);
|
||||
|
||||
|
|
|
@ -510,7 +510,7 @@ static void doInitGlobalConfig() {
|
|||
0, TSDB_MAX_VNODES, 0, TSDB_CFG_UTYPE_NONE);
|
||||
tsInitConfigOption(cfg++, "tables", &tsSessionsPerVnode, TSDB_CFG_VTYPE_INT,
|
||||
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
|
||||
4, 220000, 0, TSDB_CFG_UTYPE_NONE);
|
||||
TSDB_MIN_TABLES_PER_VNODE, TSDB_MAX_TABLES_PER_VNODE, 0, TSDB_CFG_UTYPE_NONE);
|
||||
tsInitConfigOption(cfg++, "cache", &tsCacheBlockSize, TSDB_CFG_VTYPE_INT,
|
||||
TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW,
|
||||
100, 1048576, 0, TSDB_CFG_UTYPE_BYTE);
|
||||
|
|
Loading…
Reference in New Issue