commit
6d1b19da35
|
@ -52,12 +52,20 @@ typedef struct STableComInfo {
|
||||||
int32_t rowSize;
|
int32_t rowSize;
|
||||||
} STableComInfo;
|
} STableComInfo;
|
||||||
|
|
||||||
|
typedef struct SCMCorVgroupInfo {
|
||||||
|
int32_t version;
|
||||||
|
int8_t inUse;
|
||||||
|
int8_t numOfEps;
|
||||||
|
SEpAddr epAddr[TSDB_MAX_REPLICA];
|
||||||
|
} SCMCorVgroupInfo;
|
||||||
|
|
||||||
typedef struct STableMeta {
|
typedef struct STableMeta {
|
||||||
STableComInfo tableInfo;
|
STableComInfo tableInfo;
|
||||||
uint8_t tableType;
|
uint8_t tableType;
|
||||||
int16_t sversion;
|
int16_t sversion;
|
||||||
int16_t tversion;
|
int16_t tversion;
|
||||||
SCMVgroupInfo vgroupInfo;
|
SCMVgroupInfo vgroupInfo;
|
||||||
|
SCMCorVgroupInfo corVgroupInfo;
|
||||||
int32_t sid; // the index of one table in a virtual node
|
int32_t sid; // the index of one table in a virtual node
|
||||||
uint64_t uid; // unique id of a table
|
uint64_t uid; // unique id of a table
|
||||||
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
|
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
|
||||||
|
@ -456,7 +464,8 @@ extern void * tscQhandle;
|
||||||
extern int tscKeepConn[];
|
extern int tscKeepConn[];
|
||||||
extern int tsInsertHeadSize;
|
extern int tsInsertHeadSize;
|
||||||
extern int tscNumOfThreads;
|
extern int tscNumOfThreads;
|
||||||
extern SRpcEpSet tscMgmtEpSet;
|
|
||||||
|
extern SRpcCorEpSet tscMgmtEpSet;
|
||||||
|
|
||||||
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
|
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
|
||||||
|
|
||||||
|
|
|
@ -140,7 +140,15 @@ struct SSchema tscGetTbnameColumnSchema() {
|
||||||
strcpy(s.name, TSQL_TBNAME_L);
|
strcpy(s.name, TSQL_TBNAME_L);
|
||||||
return s;
|
return s;
|
||||||
}
|
}
|
||||||
|
static void tscInitCorVgroupInfo(SCMCorVgroupInfo *corVgroupInfo, SCMVgroupInfo *vgroupInfo) {
|
||||||
|
corVgroupInfo->version = 0;
|
||||||
|
corVgroupInfo->inUse = 0;
|
||||||
|
corVgroupInfo->numOfEps = vgroupInfo->numOfEps;
|
||||||
|
for (int32_t i = 0; i < corVgroupInfo->numOfEps; i++) {
|
||||||
|
strncpy(corVgroupInfo->epAddr[i].fqdn, vgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN);
|
||||||
|
corVgroupInfo->epAddr[i].port = vgroupInfo->epAddr[i].port;
|
||||||
|
}
|
||||||
|
}
|
||||||
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size) {
|
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size) {
|
||||||
assert(pTableMetaMsg != NULL);
|
assert(pTableMetaMsg != NULL);
|
||||||
|
|
||||||
|
@ -157,6 +165,9 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
|
||||||
pTableMeta->sid = pTableMetaMsg->sid;
|
pTableMeta->sid = pTableMetaMsg->sid;
|
||||||
pTableMeta->uid = pTableMetaMsg->uid;
|
pTableMeta->uid = pTableMetaMsg->uid;
|
||||||
pTableMeta->vgroupInfo = pTableMetaMsg->vgroup;
|
pTableMeta->vgroupInfo = pTableMetaMsg->vgroup;
|
||||||
|
|
||||||
|
tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, &pTableMeta->vgroupInfo);
|
||||||
|
|
||||||
pTableMeta->sversion = pTableMetaMsg->sversion;
|
pTableMeta->sversion = pTableMetaMsg->sversion;
|
||||||
pTableMeta->tversion = pTableMetaMsg->tversion;
|
pTableMeta->tversion = pTableMetaMsg->tversion;
|
||||||
|
|
||||||
|
|
|
@ -26,10 +26,11 @@
|
||||||
#include "ttime.h"
|
#include "ttime.h"
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
#include "tlockfree.h"
|
||||||
|
|
||||||
#define TSC_MGMT_VNODE 999
|
#define TSC_MGMT_VNODE 999
|
||||||
|
|
||||||
SRpcEpSet tscMgmtEpSet;
|
SRpcCorEpSet tscMgmtEpSet;
|
||||||
SRpcEpSet tscDnodeEpSet;
|
SRpcEpSet tscDnodeEpSet;
|
||||||
|
|
||||||
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};
|
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};
|
||||||
|
@ -51,40 +52,81 @@ static void tscSetDnodeEpSet(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
|
||||||
pEpSet->numOfEps = 0;
|
pEpSet->numOfEps = 0;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
pEpSet->numOfEps = pVgroupInfo->numOfEps;
|
pEpSet->numOfEps = pVgroupInfo->numOfEps;
|
||||||
for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
|
for(int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
|
||||||
strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn);
|
strcpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn);
|
||||||
pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
|
pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
static void tscDumpMgmtEpSet(SRpcEpSet *epSet) {
|
||||||
|
taosCorBeginRead(&tscMgmtEpSet.version);
|
||||||
|
*epSet = tscMgmtEpSet.epSet;
|
||||||
|
taosCorEndRead(&tscMgmtEpSet.version);
|
||||||
|
}
|
||||||
|
static void tscEpSetHtons(SRpcEpSet *s) {
|
||||||
|
for (int32_t i = 0; i < s->numOfEps; i++) {
|
||||||
|
s->port[i] = htons(s->port[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
|
||||||
|
if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
for (int32_t i = 0; i < s1->numOfEps; i++) {
|
||||||
|
if (s1->port[i] != s2->port[i]
|
||||||
|
|| strncmp(s1->fqdn[i], s2->fqdn[i], TSDB_FQDN_LEN) != 0)
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
void tscUpdateMgmtEpSet(SRpcEpSet *pEpSet) {
|
||||||
|
// no need to update if equal
|
||||||
|
taosCorBeginWrite(&tscMgmtEpSet.version);
|
||||||
|
tscMgmtEpSet.epSet = *pEpSet;
|
||||||
|
taosCorEndWrite(&tscMgmtEpSet.version);
|
||||||
|
}
|
||||||
|
static void tscDumpEpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcEpSet *pEpSet) {
|
||||||
|
if (pVgroupInfo == NULL) { return;}
|
||||||
|
taosCorBeginRead(&pVgroupInfo->version);
|
||||||
|
int8_t inUse = pVgroupInfo->inUse;
|
||||||
|
pEpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0;
|
||||||
|
pEpSet->numOfEps = pVgroupInfo->numOfEps;
|
||||||
|
for (int32_t i = 0; i < pVgroupInfo->numOfEps; ++i) {
|
||||||
|
strncpy(pEpSet->fqdn[i], pVgroupInfo->epAddr[i].fqdn, TSDB_FQDN_LEN);
|
||||||
|
pEpSet->port[i] = pVgroupInfo->epAddr[i].port;
|
||||||
|
}
|
||||||
|
taosCorEndRead(&pVgroupInfo->version);
|
||||||
|
}
|
||||||
|
|
||||||
|
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
|
||||||
|
SSqlCmd *pCmd = &pObj->cmd;
|
||||||
|
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||||
|
if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;}
|
||||||
|
SCMCorVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->corVgroupInfo;
|
||||||
|
|
||||||
|
taosCorBeginWrite(&pVgroupInfo->version);
|
||||||
|
//TODO(dengyihao), dont care vgid
|
||||||
|
pVgroupInfo->inUse = pEpSet->inUse;
|
||||||
|
pVgroupInfo->numOfEps = pEpSet->numOfEps;
|
||||||
|
for (int32_t i = 0; pVgroupInfo->numOfEps; i++) {
|
||||||
|
strncpy(pVgroupInfo->epAddr[i].fqdn, pEpSet->fqdn[i], TSDB_FQDN_LEN);
|
||||||
|
pVgroupInfo->epAddr[i].port = pEpSet->port[i];
|
||||||
|
}
|
||||||
|
taosCorEndWrite(&pVgroupInfo->version);
|
||||||
|
}
|
||||||
void tscPrintMgmtEp() {
|
void tscPrintMgmtEp() {
|
||||||
if (tscMgmtEpSet.numOfEps <= 0) {
|
SRpcEpSet dump;
|
||||||
tscError("invalid mnode EP list:%d", tscMgmtEpSet.numOfEps);
|
tscDumpMgmtEpSet(&dump);
|
||||||
|
if (dump.numOfEps <= 0) {
|
||||||
|
tscError("invalid mnode EP list:%d", dump.numOfEps);
|
||||||
} else {
|
} else {
|
||||||
for (int i = 0; i < tscMgmtEpSet.numOfEps; ++i) {
|
for (int i = 0; i < dump.numOfEps; ++i) {
|
||||||
tscDebug("mnode index:%d %s:%d", i, tscMgmtEpSet.fqdn[i], tscMgmtEpSet.port[i]);
|
tscDebug("mnode index:%d %s:%d", i, dump.fqdn[i], dump.port[i]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscSetMgmtEpSet(SRpcEpSet *pEpSet) {
|
|
||||||
tscMgmtEpSet.numOfEps = pEpSet->numOfEps;
|
|
||||||
tscMgmtEpSet.inUse = pEpSet->inUse;
|
|
||||||
for (int32_t i = 0; i < tscMgmtEpSet.numOfEps; ++i) {
|
|
||||||
tscMgmtEpSet.port[i] = htons(pEpSet->port[i]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet) {
|
|
||||||
tscMgmtEpSet = *pEpSet;
|
|
||||||
tscDebug("mnode EP list is changed for ufp is called, numOfEps:%d inUse:%d", tscMgmtEpSet.numOfEps, tscMgmtEpSet.inUse);
|
|
||||||
for (int32_t i = 0; i < tscMgmtEpSet.numOfEps; ++i) {
|
|
||||||
tscDebug("index:%d fqdn:%s port:%d", i, tscMgmtEpSet.fqdn[i], tscMgmtEpSet.port[i]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* For each management node, try twice at least in case of poor network situation.
|
* For each management node, try twice at least in case of poor network situation.
|
||||||
* If the client start to connect to a non-management node from the client, and the first retry may fail due to
|
* If the client start to connect to a non-management node from the client, and the first retry may fail due to
|
||||||
|
@ -95,7 +137,9 @@ void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet) {
|
||||||
UNUSED_FUNC
|
UNUSED_FUNC
|
||||||
static int32_t tscGetMgmtConnMaxRetryTimes() {
|
static int32_t tscGetMgmtConnMaxRetryTimes() {
|
||||||
int32_t factor = 2;
|
int32_t factor = 2;
|
||||||
return tscMgmtEpSet.numOfEps * factor;
|
SRpcEpSet dump;
|
||||||
|
tscDumpMgmtEpSet(&dump);
|
||||||
|
return dump.numOfEps * factor;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
|
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
|
||||||
|
@ -111,9 +155,11 @@ void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
|
SCMHeartBeatRsp *pRsp = (SCMHeartBeatRsp *)pRes->pRsp;
|
||||||
SRpcEpSet * pEpSet = &pRsp->epSet;
|
SRpcEpSet * epSet = &pRsp->epSet;
|
||||||
if (pEpSet->numOfEps > 0)
|
if (epSet->numOfEps > 0) {
|
||||||
tscSetMgmtEpSet(pEpSet);
|
tscEpSetHtons(epSet);
|
||||||
|
tscUpdateMgmtEpSet(epSet);
|
||||||
|
}
|
||||||
|
|
||||||
pSql->pTscObj->connId = htonl(pRsp->connId);
|
pSql->pTscObj->connId = htonl(pRsp->connId);
|
||||||
|
|
||||||
|
@ -185,7 +231,7 @@ int tscSendMsgToServer(SSqlObj *pSql) {
|
||||||
|
|
||||||
// set the mgmt ip list
|
// set the mgmt ip list
|
||||||
if (pSql->cmd.command >= TSDB_SQL_MGMT) {
|
if (pSql->cmd.command >= TSDB_SQL_MGMT) {
|
||||||
pSql->epSet = tscMgmtEpSet;
|
tscDumpMgmtEpSet(&pSql->epSet);
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
|
memcpy(pMsg, pSql->cmd.payload, pSql->cmd.payloadLen);
|
||||||
|
@ -236,10 +282,16 @@ void tscProcessMsgFromServer(SRpcMsg *rpcMsg, SRpcEpSet *pEpSet) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pCmd->command < TSDB_SQL_MGMT) {
|
if (pEpSet) {
|
||||||
if (pEpSet) pSql->epSet = *pEpSet;
|
//SRpcEpSet dump;
|
||||||
} else {
|
tscEpSetHtons(pEpSet);
|
||||||
if (pEpSet) tscMgmtEpSet = *pEpSet;
|
if (tscEpSetIsEqual(&pSql->epSet, pEpSet)) {
|
||||||
|
if(pCmd->command < TSDB_SQL_MGMT) {
|
||||||
|
tscUpdateVgroupInfo(pSql, pEpSet);
|
||||||
|
} else {
|
||||||
|
tscUpdateMgmtEpSet(pEpSet);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rpcMsg->pCont == NULL) {
|
if (rpcMsg->pCont == NULL) {
|
||||||
|
@ -421,7 +473,8 @@ int tscProcessSql(SSqlObj *pSql) {
|
||||||
return pSql->res.code;
|
return pSql->res.code;
|
||||||
}
|
}
|
||||||
} else if (pCmd->command < TSDB_SQL_LOCAL) {
|
} else if (pCmd->command < TSDB_SQL_LOCAL) {
|
||||||
pSql->epSet = tscMgmtEpSet;
|
|
||||||
|
//pSql->epSet = tscMgmtEpSet;
|
||||||
} else { // local handler
|
} else { // local handler
|
||||||
return (*tscProcessMsgRsp[pCmd->command])(pSql);
|
return (*tscProcessMsgRsp[pCmd->command])(pSql);
|
||||||
}
|
}
|
||||||
|
@ -525,8 +578,8 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
|
|
||||||
// pSql->cmd.payloadLen is set during copying data into payload
|
// pSql->cmd.payloadLen is set during copying data into payload
|
||||||
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
|
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
|
||||||
tscSetDnodeEpSet(pSql, &pTableMeta->vgroupInfo);
|
tscDumpEpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->epSet);
|
||||||
|
|
||||||
tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
|
tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
|
||||||
pSql->epSet.numOfEps);
|
pSql->epSet.numOfEps);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -567,11 +620,11 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
|
||||||
} else {
|
} else {
|
||||||
pVgroupInfo = &pTableMeta->vgroupInfo;
|
pVgroupInfo = &pTableMeta->vgroupInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
tscSetDnodeEpSet(pSql, pVgroupInfo);
|
tscSetDnodeEpSet(pSql, pVgroupInfo);
|
||||||
|
|
||||||
if (pVgroupInfo != NULL) {
|
if (pVgroupInfo != NULL) {
|
||||||
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
|
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
|
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
|
||||||
pTableIdInfo->tid = htonl(pTableMeta->sid);
|
pTableIdInfo->tid = htonl(pTableMeta->sid);
|
||||||
|
@ -588,8 +641,8 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
|
||||||
tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups);
|
tscDebug("%p query on stable, vgIndex:%d, numOfVgroups:%d", pSql, index, numOfVgroups);
|
||||||
|
|
||||||
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
|
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
|
||||||
|
|
||||||
// set the vgroup info
|
// set the vgroup info
|
||||||
tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo);
|
tscSetDnodeEpSet(pSql, &pTableIdList->vgInfo);
|
||||||
pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);
|
pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);
|
||||||
|
|
||||||
|
@ -1136,11 +1189,11 @@ int32_t tscBuildShowMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
pShowMsg->payloadLen = htons(pPattern->n);
|
pShowMsg->payloadLen = htons(pPattern->n);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
SSQLToken *pIpAddr = &pShowInfo->prefix;
|
SSQLToken *pEpAddr = &pShowInfo->prefix;
|
||||||
assert(pIpAddr->n > 0 && pIpAddr->type > 0);
|
assert(pEpAddr->n > 0 && pEpAddr->type > 0);
|
||||||
|
|
||||||
strncpy(pShowMsg->payload, pIpAddr->z, pIpAddr->n);
|
strncpy(pShowMsg->payload, pEpAddr->z, pEpAddr->n);
|
||||||
pShowMsg->payloadLen = htons(pIpAddr->n);
|
pShowMsg->payloadLen = htons(pEpAddr->n);
|
||||||
}
|
}
|
||||||
|
|
||||||
pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen;
|
pCmd->payloadLen = sizeof(SCMShowMsg) + pShowMsg->payloadLen;
|
||||||
|
@ -1323,7 +1376,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
|
||||||
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||||
|
|
||||||
tscSetDnodeEpSet(pSql, &pTableMetaInfo->pTableMeta->vgroupInfo);
|
tscDumpEpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->epSet);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1847,13 +1900,14 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
|
||||||
|
|
||||||
memcpy(pInfo->vgroupList, pVgroupInfo, size);
|
memcpy(pInfo->vgroupList, pVgroupInfo, size);
|
||||||
for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
|
for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
|
||||||
|
//just init, no need to lock
|
||||||
SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
|
SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
|
||||||
|
|
||||||
pVgroups->vgId = htonl(pVgroups->vgId);
|
pVgroups->vgId = htonl(pVgroups->vgId);
|
||||||
assert(pVgroups->numOfEps >= 1);
|
assert(pVgroups->numOfEps >= 1);
|
||||||
|
|
||||||
for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
|
for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
|
||||||
pVgroups->epAddr[k].port = htons(pVgroups->epAddr[k].port);
|
pVgroups->epAddr[k].port = htons(pVgroups->epAddr[k].port);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pMsg += size;
|
pMsg += size;
|
||||||
|
@ -1946,8 +2000,10 @@ int tscProcessConnectRsp(SSqlObj *pSql) {
|
||||||
assert(len <= sizeof(pObj->db));
|
assert(len <= sizeof(pObj->db));
|
||||||
tstrncpy(pObj->db, temp, sizeof(pObj->db));
|
tstrncpy(pObj->db, temp, sizeof(pObj->db));
|
||||||
|
|
||||||
if (pConnect->epSet.numOfEps > 0)
|
if (pConnect->epSet.numOfEps > 0) {
|
||||||
tscSetMgmtEpSet(&pConnect->epSet);
|
tscEpSetHtons(&pConnect->epSet);
|
||||||
|
tscUpdateMgmtEpSet(&pConnect->epSet);
|
||||||
|
}
|
||||||
|
|
||||||
strcpy(pObj->sversion, pConnect->serverVersion);
|
strcpy(pObj->sversion, pConnect->serverVersion);
|
||||||
pObj->writeAuth = pConnect->writeAuth;
|
pObj->writeAuth = pConnect->writeAuth;
|
||||||
|
|
|
@ -63,7 +63,7 @@ SSqlObj *taosConnectImpl(const char *ip, const char *user, const char *pass, con
|
||||||
|
|
||||||
if (ip) {
|
if (ip) {
|
||||||
if (tscSetMgmtEpSetFromCfg(ip, NULL) < 0) return NULL;
|
if (tscSetMgmtEpSetFromCfg(ip, NULL) < 0) return NULL;
|
||||||
if (port) tscMgmtEpSet.port[0] = port;
|
if (port) tscMgmtEpSet.epSet.port[0] = port;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *pDnodeConn = NULL;
|
void *pDnodeConn = NULL;
|
||||||
|
|
|
@ -41,7 +41,7 @@ int tscNumOfThreads;
|
||||||
|
|
||||||
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
|
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
|
||||||
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
|
void taosInitNote(int numOfNoteLines, int maxNotes, char* lable);
|
||||||
void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet);
|
//void tscUpdateEpSet(void *ahandle, SRpcEpSet *pEpSet);
|
||||||
|
|
||||||
void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) {
|
void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) {
|
||||||
taosGetDisk();
|
taosGetDisk();
|
||||||
|
|
|
@ -2146,16 +2146,19 @@ char* strdup_throw(const char* str) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int tscSetMgmtEpSetFromCfg(const char *first, const char *second) {
|
int tscSetMgmtEpSetFromCfg(const char *first, const char *second) {
|
||||||
tscMgmtEpSet.numOfEps = 0;
|
// init mgmt ip set
|
||||||
tscMgmtEpSet.inUse = 0;
|
tscMgmtEpSet.version = 0;
|
||||||
|
SRpcEpSet *mgmtEpSet = &(tscMgmtEpSet.epSet);
|
||||||
|
mgmtEpSet->numOfEps = 0;
|
||||||
|
mgmtEpSet->inUse = 0;
|
||||||
|
|
||||||
if (first && first[0] != 0) {
|
if (first && first[0] != 0) {
|
||||||
if (strlen(first) >= TSDB_EP_LEN) {
|
if (strlen(first) >= TSDB_EP_LEN) {
|
||||||
terrno = TSDB_CODE_TSC_INVALID_FQDN;
|
terrno = TSDB_CODE_TSC_INVALID_FQDN;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taosGetFqdnPortFromEp(first, tscMgmtEpSet.fqdn[tscMgmtEpSet.numOfEps], &tscMgmtEpSet.port[tscMgmtEpSet.numOfEps]);
|
taosGetFqdnPortFromEp(first, mgmtEpSet->fqdn[mgmtEpSet->numOfEps], &(mgmtEpSet->port[mgmtEpSet->numOfEps]));
|
||||||
tscMgmtEpSet.numOfEps++;
|
mgmtEpSet->numOfEps++;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (second && second[0] != 0) {
|
if (second && second[0] != 0) {
|
||||||
|
@ -2163,11 +2166,11 @@ int tscSetMgmtEpSetFromCfg(const char *first, const char *second) {
|
||||||
terrno = TSDB_CODE_TSC_INVALID_FQDN;
|
terrno = TSDB_CODE_TSC_INVALID_FQDN;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taosGetFqdnPortFromEp(second, tscMgmtEpSet.fqdn[tscMgmtEpSet.numOfEps], &tscMgmtEpSet.port[tscMgmtEpSet.numOfEps]);
|
taosGetFqdnPortFromEp(second, mgmtEpSet->fqdn[mgmtEpSet->numOfEps], &(mgmtEpSet->port[mgmtEpSet->numOfEps]));
|
||||||
tscMgmtEpSet.numOfEps++;
|
mgmtEpSet->numOfEps++;
|
||||||
}
|
}
|
||||||
|
|
||||||
if ( tscMgmtEpSet.numOfEps == 0) {
|
if (mgmtEpSet->numOfEps == 0) {
|
||||||
terrno = TSDB_CODE_TSC_INVALID_FQDN;
|
terrno = TSDB_CODE_TSC_INVALID_FQDN;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,11 @@ typedef struct SRpcEpSet {
|
||||||
char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN];
|
char fqdn[TSDB_MAX_REPLICA][TSDB_FQDN_LEN];
|
||||||
} SRpcEpSet;
|
} SRpcEpSet;
|
||||||
|
|
||||||
|
typedef struct SRpcCorEpSet {
|
||||||
|
int32_t version;
|
||||||
|
SRpcEpSet epSet;
|
||||||
|
} SRpcCorEpSet;
|
||||||
|
|
||||||
typedef struct SRpcConnInfo {
|
typedef struct SRpcConnInfo {
|
||||||
uint32_t clientIp;
|
uint32_t clientIp;
|
||||||
uint16_t clientPort;
|
uint16_t clientPort;
|
||||||
|
|
|
@ -75,7 +75,7 @@ void taosRUnLockLatch(SRWLatch *pLatch);
|
||||||
|
|
||||||
// copy on read
|
// copy on read
|
||||||
#define taosCorBeginRead(x) for (uint32_t i_ = 1; 1; ++i_) { \
|
#define taosCorBeginRead(x) for (uint32_t i_ = 1; 1; ++i_) { \
|
||||||
int32_t old_ = atomic_load_32(x); \
|
int32_t old_ = atomic_add_fetch_32((x), 0); \
|
||||||
if (old_ & 0x00000001) { \
|
if (old_ & 0x00000001) { \
|
||||||
if (i_ % 1000 == 0) { \
|
if (i_ % 1000 == 0) { \
|
||||||
sched_yield(); \
|
sched_yield(); \
|
||||||
|
@ -84,7 +84,7 @@ void taosRUnLockLatch(SRWLatch *pLatch);
|
||||||
}
|
}
|
||||||
|
|
||||||
#define taosCorEndRead(x) \
|
#define taosCorEndRead(x) \
|
||||||
if (atomic_load_32(x) == old_) { \
|
if (atomic_add_fetch_32((x), 0) == old_) { \
|
||||||
break; \
|
break; \
|
||||||
} \
|
} \
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue