commit
053785bbf9
|
@ -144,8 +144,9 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) {
|
|||
SNewVgroupInfo info = {0};
|
||||
info.numOfEps = pVgroupMsg->numOfEps;
|
||||
info.vgId = pVgroupMsg->vgId;
|
||||
info.inUse = 0;
|
||||
info.inUse = 0; // 0 is the default value of inUse in case of multiple replica
|
||||
|
||||
assert(info.numOfEps >= 1 && info.vgId >= 1);
|
||||
for(int32_t i = 0; i < pVgroupMsg->numOfEps; ++i) {
|
||||
tstrncpy(info.ep[i].fqdn, pVgroupMsg->epAddr[i].fqdn, TSDB_FQDN_LEN);
|
||||
info.ep[i].port = pVgroupMsg->epAddr[i].port;
|
||||
|
|
|
@ -34,6 +34,7 @@ int tscKeepConn[TSDB_SQL_MAX] = {0};
|
|||
TSKEY tscGetSubscriptionProgress(void* sub, int64_t uid, TSKEY dflt);
|
||||
void tscUpdateSubscriptionProgress(void* sub, int64_t uid, TSKEY ts);
|
||||
void tscSaveSubscriptionProgress(void* sub);
|
||||
static int32_t extractSTableQueryVgroupId(STableMetaInfo* pTableMetaInfo);
|
||||
|
||||
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
|
||||
static int32_t getWaitingTimeInterval(int32_t count) {
|
||||
|
@ -78,7 +79,8 @@ static void tscEpSetHtons(SRpcEpSet *s) {
|
|||
for (int32_t i = 0; i < s->numOfEps; i++) {
|
||||
s->port[i] = htons(s->port[i]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
bool tscEpSetIsEqual(SRpcEpSet *s1, SRpcEpSet *s2) {
|
||||
if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) {
|
||||
return false;
|
||||
|
@ -111,19 +113,22 @@ static void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SNewVgroupInfo *pVgrou
|
|||
}
|
||||
}
|
||||
|
||||
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
|
||||
SSqlCmd *pCmd = &pObj->cmd;
|
||||
static void tscUpdateVgroupInfo(SSqlObj *pSql, SRpcEpSet *pEpSet) {
|
||||
SSqlCmd *pCmd = &pSql->cmd;
|
||||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||
if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t vgId = pTableMetaInfo->pTableMeta->vgId;
|
||||
int32_t vgId = -1;
|
||||
if (pTableMetaInfo->pTableMeta->tableType == TSDB_SUPER_TABLE) {
|
||||
assert(vgId == 0);
|
||||
return;
|
||||
vgId = extractSTableQueryVgroupId(pTableMetaInfo);
|
||||
} else {
|
||||
vgId = pTableMetaInfo->pTableMeta->vgId;
|
||||
}
|
||||
|
||||
assert(vgId > 0);
|
||||
|
||||
SNewVgroupInfo vgroupInfo = {.vgId = -1};
|
||||
taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
||||
assert(vgroupInfo.numOfEps > 0 && vgroupInfo.vgId > 0);
|
||||
|
@ -138,6 +143,33 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
|
|||
|
||||
tscDebug("after: EndPoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps);
|
||||
taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo));
|
||||
|
||||
// Update the local cached epSet info cached by SqlObj
|
||||
int32_t inUse = pSql->epSet.inUse;
|
||||
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
|
||||
tscDebug("%p update the epSet in SqlObj, in use before:%d, after:%d", pSql, inUse, pSql->epSet.inUse);
|
||||
|
||||
}
|
||||
|
||||
int32_t extractSTableQueryVgroupId(STableMetaInfo* pTableMetaInfo) {
|
||||
assert(pTableMetaInfo != NULL);
|
||||
|
||||
int32_t vgIndex = pTableMetaInfo->vgroupIndex;
|
||||
int32_t vgId = -1;
|
||||
|
||||
if (pTableMetaInfo->pVgroupTables == NULL) {
|
||||
SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
|
||||
assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
|
||||
vgId = pVgroupInfo->vgroups[vgIndex].vgId;
|
||||
} else {
|
||||
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
|
||||
assert(vgIndex >= 0 && vgIndex < numOfVgroups);
|
||||
|
||||
SVgroupTableInfo *pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
|
||||
vgId = pTableIdList->vgInfo.vgId;
|
||||
}
|
||||
|
||||
return vgId;
|
||||
}
|
||||
|
||||
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
|
||||
|
@ -515,21 +547,22 @@ int tscBuildFetchMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
|
||||
if (UTIL_TABLE_IS_SUPER_TABLE(pTableMetaInfo)) {
|
||||
int32_t vgIndex = pTableMetaInfo->vgroupIndex;
|
||||
int32_t vgId = -1;
|
||||
|
||||
if (pTableMetaInfo->pVgroupTables == NULL) {
|
||||
SVgroupsInfo *pVgroupInfo = pTableMetaInfo->vgroupList;
|
||||
assert(pVgroupInfo->vgroups[vgIndex].vgId > 0 && vgIndex < pTableMetaInfo->vgroupList->numOfVgroups);
|
||||
|
||||
pRetrieveMsg->header.vgId = htonl(pVgroupInfo->vgroups[vgIndex].vgId);
|
||||
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qId:%" PRIu64, pSql, pVgroupInfo->vgroups[vgIndex].vgId, vgIndex, pSql->res.qId);
|
||||
vgId = pVgroupInfo->vgroups[vgIndex].vgId;
|
||||
} else {
|
||||
int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTableMetaInfo->pVgroupTables);
|
||||
assert(vgIndex >= 0 && vgIndex < numOfVgroups);
|
||||
|
||||
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, vgIndex);
|
||||
|
||||
pRetrieveMsg->header.vgId = htonl(pTableIdList->vgInfo.vgId);
|
||||
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qId:%" PRIu64, pSql, pTableIdList->vgInfo.vgId, vgIndex, pSql->res.qId);
|
||||
vgId = pTableIdList->vgInfo.vgId;
|
||||
}
|
||||
|
||||
pRetrieveMsg->header.vgId = htonl(vgId);
|
||||
tscDebug("%p build fetch msg from vgId:%d, vgIndex:%d, qId:%" PRIu64, pSql, vgId, vgIndex, pSql->res.qId);
|
||||
} else {
|
||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||
pRetrieveMsg->header.vgId = htonl(pTableMeta->vgId);
|
||||
|
@ -1980,7 +2013,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
|
|||
(vgroupInfo.inUse < 0)) { // vgroup info exists, compare with it
|
||||
vgroupInfo = createNewVgroupInfo(&pMetaMsg->vgroup);
|
||||
taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo));
|
||||
tscDebug("add new VgroupInfo, vgId:%d, total:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap));
|
||||
tscDebug("add new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2132,18 +2165,33 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
|
|||
tscError("%p empty vgroup info", pSql);
|
||||
} else {
|
||||
for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
|
||||
//just init, no need to lock
|
||||
SVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
|
||||
// just init, no need to lock
|
||||
SVgroupInfo *pVgroup = &pInfo->vgroupList->vgroups[j];
|
||||
|
||||
SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
|
||||
pVgroups->vgId = htonl(vmsg->vgId);
|
||||
pVgroups->numOfEps = vmsg->numOfEps;
|
||||
vmsg->vgId = htonl(vmsg->vgId);
|
||||
vmsg->numOfEps = vmsg->numOfEps;
|
||||
for (int32_t k = 0; k < vmsg->numOfEps; ++k) {
|
||||
vmsg->epAddr[k].port = htons(vmsg->epAddr[k].port);
|
||||
}
|
||||
|
||||
assert(pVgroups->numOfEps >= 1 && pVgroups->vgId >= 1);
|
||||
SNewVgroupInfo newVi = createNewVgroupInfo(vmsg);
|
||||
pVgroup->numOfEps = newVi.numOfEps;
|
||||
pVgroup->vgId = newVi.vgId;
|
||||
for (int32_t k = 0; k < vmsg->numOfEps; ++k) {
|
||||
pVgroup->epAddr[k].port = newVi.ep[k].port;
|
||||
pVgroup->epAddr[k].fqdn = strndup(newVi.ep[k].fqdn, TSDB_FQDN_LEN);
|
||||
}
|
||||
|
||||
for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
|
||||
pVgroups->epAddr[k].port = htons(vmsg->epAddr[k].port);
|
||||
pVgroups->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, tListLen(vmsg->epAddr[k].fqdn));
|
||||
// check if current buffer contains the vgroup info.
|
||||
// If not, add it
|
||||
SNewVgroupInfo existVgroupInfo = {.inUse = -1};
|
||||
taosHashGetClone(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), NULL, &existVgroupInfo, sizeof(SNewVgroupInfo));
|
||||
|
||||
if (((existVgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&existVgroupInfo, vmsg)) ||
|
||||
(existVgroupInfo.inUse < 0)) { // vgroup info exists, compare with it
|
||||
taosHashPut(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), &newVi, sizeof(newVi));
|
||||
tscDebug("add new VgroupInfo, vgId:%d, total cached:%d", newVi.vgId, (int32_t) taosHashGetSize(tscVgroupMap));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -295,7 +295,7 @@ void *rpcOpen(const SRpcInit *pInit) {
|
|||
return NULL;
|
||||
}
|
||||
} else {
|
||||
pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime);
|
||||
pRpc->pCache = rpcOpenConnCache(pRpc->sessions, rpcCloseConn, pRpc->tmrCtrl, pRpc->idleTime*30);
|
||||
if ( pRpc->pCache == NULL ) {
|
||||
tError("%s failed to init connection cache", pRpc->label);
|
||||
rpcClose(pRpc);
|
||||
|
@ -470,7 +470,7 @@ void rpcSendResponse(const SRpcMsg *pRsp) {
|
|||
taosTmrStopA(&pConn->pTimer);
|
||||
|
||||
// set the idle timer to monitor the activity
|
||||
taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
|
||||
taosTmrReset(rpcProcessIdleTimer, pRpc->idleTime*30, pConn, pRpc->tmrCtrl, &pConn->pIdleTimer);
|
||||
rpcSendMsgToPeer(pConn, msg, msgLen);
|
||||
|
||||
// if not set to secured, set it expcet NOT_READY case, since client wont treat it as secured
|
||||
|
@ -1367,7 +1367,7 @@ static void rpcProcessConnError(void *param, void *id) {
|
|||
|
||||
tDebug("%s %p, connection error happens", pRpc->label, pContext->ahandle);
|
||||
|
||||
if (pContext->numOfTry >= pContext->epSet.numOfEps) {
|
||||
if (pContext->numOfTry >= pContext->epSet.numOfEps || pContext->msgType == TSDB_MSG_TYPE_FETCH) {
|
||||
rpcMsg.msgType = pContext->msgType+1;
|
||||
rpcMsg.ahandle = pContext->ahandle;
|
||||
rpcMsg.code = pContext->code;
|
||||
|
|
Loading…
Reference in New Issue