[td-225] update the local cached epset in sqlObj.
This commit is contained in:
parent
5e9cd734c6
commit
67fd99e756
|
@ -144,8 +144,9 @@ SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) {
|
||||||
SNewVgroupInfo info = {0};
|
SNewVgroupInfo info = {0};
|
||||||
info.numOfEps = pVgroupMsg->numOfEps;
|
info.numOfEps = pVgroupMsg->numOfEps;
|
||||||
info.vgId = pVgroupMsg->vgId;
|
info.vgId = pVgroupMsg->vgId;
|
||||||
info.inUse = 0;
|
info.inUse = 0; // 0 is the default value of inUse in case of multiple replica
|
||||||
|
|
||||||
|
assert(info.numOfEps >= 1 && info.vgId >= 1);
|
||||||
for(int32_t i = 0; i < pVgroupMsg->numOfEps; ++i) {
|
for(int32_t i = 0; i < pVgroupMsg->numOfEps; ++i) {
|
||||||
tstrncpy(info.ep[i].fqdn, pVgroupMsg->epAddr[i].fqdn, TSDB_FQDN_LEN);
|
tstrncpy(info.ep[i].fqdn, pVgroupMsg->epAddr[i].fqdn, TSDB_FQDN_LEN);
|
||||||
info.ep[i].port = pVgroupMsg->epAddr[i].port;
|
info.ep[i].port = pVgroupMsg->epAddr[i].port;
|
||||||
|
|
|
@ -113,8 +113,8 @@ static void tscDumpEpSetFromVgroupInfo(SRpcEpSet *pEpSet, SNewVgroupInfo *pVgrou
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
|
static void tscUpdateVgroupInfo(SSqlObj *pSql, SRpcEpSet *pEpSet) {
|
||||||
SSqlCmd *pCmd = &pObj->cmd;
|
SSqlCmd *pCmd = &pSql->cmd;
|
||||||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||||
if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) {
|
if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) {
|
||||||
return;
|
return;
|
||||||
|
@ -143,6 +143,12 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
|
||||||
|
|
||||||
tscDebug("after: EndPoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps);
|
tscDebug("after: EndPoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps);
|
||||||
taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo));
|
taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo));
|
||||||
|
|
||||||
|
// Update the local cached epSet info cached by SqlObj
|
||||||
|
int32_t inUse = pSql->epSet.inUse;
|
||||||
|
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
|
||||||
|
tscDebug("%p update the epSet in SqlObj, in use before:%d, after:%d", pSql, inUse, pSql->epSet.inUse);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t extractSTableQueryVgroupId(STableMetaInfo* pTableMetaInfo) {
|
int32_t extractSTableQueryVgroupId(STableMetaInfo* pTableMetaInfo) {
|
||||||
|
@ -2007,7 +2013,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
|
||||||
(vgroupInfo.inUse < 0)) { // vgroup info exists, compare with it
|
(vgroupInfo.inUse < 0)) { // vgroup info exists, compare with it
|
||||||
vgroupInfo = createNewVgroupInfo(&pMetaMsg->vgroup);
|
vgroupInfo = createNewVgroupInfo(&pMetaMsg->vgroup);
|
||||||
taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo));
|
taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo));
|
||||||
tscDebug("add new VgroupInfo, vgId:%d, total:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap));
|
tscDebug("add new VgroupInfo, vgId:%d, total cached:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2159,18 +2165,33 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
|
||||||
tscError("%p empty vgroup info", pSql);
|
tscError("%p empty vgroup info", pSql);
|
||||||
} else {
|
} else {
|
||||||
for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
|
for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
|
||||||
//just init, no need to lock
|
// just init, no need to lock
|
||||||
SVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
|
SVgroupInfo *pVgroup = &pInfo->vgroupList->vgroups[j];
|
||||||
|
|
||||||
SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
|
SVgroupMsg *vmsg = &pVgroupMsg->vgroups[j];
|
||||||
pVgroups->vgId = htonl(vmsg->vgId);
|
vmsg->vgId = htonl(vmsg->vgId);
|
||||||
pVgroups->numOfEps = vmsg->numOfEps;
|
vmsg->numOfEps = vmsg->numOfEps;
|
||||||
|
for (int32_t k = 0; k < vmsg->numOfEps; ++k) {
|
||||||
|
vmsg->epAddr[k].port = htons(vmsg->epAddr[k].port);
|
||||||
|
}
|
||||||
|
|
||||||
assert(pVgroups->numOfEps >= 1 && pVgroups->vgId >= 1);
|
SNewVgroupInfo newVi = createNewVgroupInfo(vmsg);
|
||||||
|
pVgroup->numOfEps = newVi.numOfEps;
|
||||||
|
pVgroup->vgId = newVi.vgId;
|
||||||
|
for (int32_t k = 0; k < vmsg->numOfEps; ++k) {
|
||||||
|
pVgroup->epAddr[k].port = newVi.ep[k].port;
|
||||||
|
pVgroup->epAddr[k].fqdn = strndup(newVi.ep[k].fqdn, TSDB_FQDN_LEN);
|
||||||
|
}
|
||||||
|
|
||||||
for (int32_t k = 0; k < pVgroups->numOfEps; ++k) {
|
// check if current buffer contains the vgroup info.
|
||||||
pVgroups->epAddr[k].port = htons(vmsg->epAddr[k].port);
|
// If not, add it
|
||||||
pVgroups->epAddr[k].fqdn = strndup(vmsg->epAddr[k].fqdn, tListLen(vmsg->epAddr[k].fqdn));
|
SNewVgroupInfo existVgroupInfo = {.inUse = -1};
|
||||||
|
taosHashGetClone(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), NULL, &existVgroupInfo, sizeof(SNewVgroupInfo));
|
||||||
|
|
||||||
|
if (((existVgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&existVgroupInfo, vmsg)) ||
|
||||||
|
(existVgroupInfo.inUse < 0)) { // vgroup info exists, compare with it
|
||||||
|
taosHashPut(tscVgroupMap, &newVi.vgId, sizeof(newVi.vgId), &newVi, sizeof(newVi));
|
||||||
|
tscDebug("add new VgroupInfo, vgId:%d, total cached:%d", newVi.vgId, (int32_t) taosHashGetSize(tscVgroupMap));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue