bugfix
This commit is contained in:
parent
a28e9a9402
commit
3c4e620f31
|
@ -52,12 +52,20 @@ typedef struct STableComInfo {
|
|||
int32_t rowSize;
|
||||
} STableComInfo;
|
||||
|
||||
typedef struct SCMCorVgroupInfo {
|
||||
int32_t version;
|
||||
int8_t inUse;
|
||||
int8_t numOfIps;
|
||||
SIpAddr ipAddr[TSDB_MAX_REPLICA];
|
||||
} SCMCorVgroupInfo;
|
||||
|
||||
typedef struct STableMeta {
|
||||
STableComInfo tableInfo;
|
||||
uint8_t tableType;
|
||||
int16_t sversion;
|
||||
int16_t tversion;
|
||||
SCMVgroupInfo vgroupInfo;
|
||||
SCMVgroupInfo vgroupInfo;
|
||||
SCMCorVgroupInfo corVgroupInfo;
|
||||
int32_t sid; // the index of one table in a virtual node
|
||||
uint64_t uid; // unique id of a table
|
||||
SSchema schema[]; // if the table is TSDB_CHILD_TABLE, schema is acquired by super table meta info
|
||||
|
|
|
@ -140,7 +140,15 @@ struct SSchema tscGetTbnameColumnSchema() {
|
|||
strcpy(s.name, TSQL_TBNAME_L);
|
||||
return s;
|
||||
}
|
||||
|
||||
static void tscInitCorVgroupInfo(SCMCorVgroupInfo *corVgroupInfo, SCMVgroupInfo *vgroupInfo) {
|
||||
corVgroupInfo->version = 0;
|
||||
corVgroupInfo->inUse = 0;
|
||||
corVgroupInfo->numOfIps = vgroupInfo->numOfIps;
|
||||
for (int32_t i = 0; i < corVgroupInfo->numOfIps; i++) {
|
||||
strncpy(corVgroupInfo->ipAddr[i].fqdn, vgroupInfo->ipAddr[i].fqdn, TSDB_FQDN_LEN);
|
||||
corVgroupInfo->ipAddr[i].port = vgroupInfo->ipAddr[i].port;
|
||||
}
|
||||
}
|
||||
STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size) {
|
||||
assert(pTableMetaMsg != NULL);
|
||||
|
||||
|
@ -157,9 +165,9 @@ STableMeta* tscCreateTableMetaFromMsg(STableMetaMsg* pTableMetaMsg, size_t* size
|
|||
pTableMeta->sid = pTableMetaMsg->sid;
|
||||
pTableMeta->uid = pTableMetaMsg->uid;
|
||||
pTableMeta->vgroupInfo = pTableMetaMsg->vgroup;
|
||||
//init version here
|
||||
pTableMeta->vgroupInfo.version = 0;
|
||||
|
||||
|
||||
tscInitCorVgroupInfo(&pTableMeta->corVgroupInfo, &pTableMeta->vgroupInfo);
|
||||
|
||||
pTableMeta->sversion = pTableMetaMsg->sversion;
|
||||
pTableMeta->tversion = pTableMetaMsg->tversion;
|
||||
|
||||
|
|
|
@ -45,6 +45,20 @@ void tscSaveSubscriptionProgress(void* sub);
|
|||
|
||||
static int32_t minMsgSize() { return tsRpcHeadSize + 100; }
|
||||
|
||||
static void tscSetDnodeIpList(SSqlObj* pSql, SCMVgroupInfo* pVgroupInfo) {
|
||||
SRpcIpSet* pIpList = &pSql->ipList;
|
||||
pIpList->inUse = 0;
|
||||
if (pVgroupInfo == NULL) {
|
||||
pIpList->numOfIps = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
pIpList->numOfIps = pVgroupInfo->numOfIps;
|
||||
for(int32_t i = 0; i < pVgroupInfo->numOfIps; ++i) {
|
||||
strcpy(pIpList->fqdn[i], pVgroupInfo->ipAddr[i].fqdn);
|
||||
pIpList->port[i] = pVgroupInfo->ipAddr[i].port;
|
||||
}
|
||||
}
|
||||
void tscIpSetCopy(SRpcIpSet *dst, SRpcIpSet *src) {
|
||||
dst->numOfIps = src->numOfIps;
|
||||
dst->inUse = src->inUse;
|
||||
|
@ -83,12 +97,9 @@ void tscUpdateMgmtIpList(SRpcIpSet *pIpSet) {
|
|||
tscIpSetCopy(mgmtIpSet, pIpSet);
|
||||
taosCorEndWrite(&tscMgmtIpSet.version);
|
||||
}
|
||||
static void tscDumpIpSetFromVgroupInfo(SCMVgroupInfo *pVgroupInfo, SRpcIpSet *pIpSet, int32_t *vgId) {
|
||||
static void tscDumpIpSetFromVgroupInfo(SCMCorVgroupInfo *pVgroupInfo, SRpcIpSet *pIpSet) {
|
||||
if (pVgroupInfo == NULL) { return;}
|
||||
//taosCorBeginRead(&pVgroupInfo->version);
|
||||
if (vgId) {
|
||||
*vgId = pVgroupInfo->vgId;
|
||||
}
|
||||
taosCorBeginRead(&pVgroupInfo->version);
|
||||
int8_t inUse = pVgroupInfo->inUse;
|
||||
pIpSet->inUse = (inUse >= 0 && inUse < TSDB_MAX_REPLICA) ? inUse: 0;
|
||||
pIpSet->numOfIps = pVgroupInfo->numOfIps;
|
||||
|
@ -96,16 +107,16 @@ static void tscDumpIpSetFromVgroupInfo(SCMVgroupInfo *pVgroupInfo, SRpcIpSet *pI
|
|||
strncpy(pIpSet->fqdn[i], pVgroupInfo->ipAddr[i].fqdn, TSDB_FQDN_LEN);
|
||||
pIpSet->port[i] = pVgroupInfo->ipAddr[i].port;
|
||||
}
|
||||
//taosCorEndRead(&pVgroupInfo->version);
|
||||
taosCorEndRead(&pVgroupInfo->version);
|
||||
}
|
||||
|
||||
static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcIpSet *pIpSet) {
|
||||
SSqlCmd *pCmd = &pObj->cmd;
|
||||
STableMetaInfo *pTableMetaInfo = tscGetTableMetaInfoFromCmd(pCmd, pCmd->clauseIndex, 0);
|
||||
if (pTableMetaInfo == NULL || pTableMetaInfo->pTableMeta == NULL) { return;}
|
||||
SCMVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->vgroupInfo;
|
||||
SCMCorVgroupInfo *pVgroupInfo = &pTableMetaInfo->pTableMeta->corVgroupInfo;
|
||||
|
||||
//taosCorBeginWrite(&pVgroupInfo->version);
|
||||
taosCorBeginWrite(&pVgroupInfo->version);
|
||||
//TODO(dengyihao), dont care vgid
|
||||
pVgroupInfo->inUse = pIpSet->inUse;
|
||||
pVgroupInfo->numOfIps = pIpSet->numOfIps;
|
||||
|
@ -113,7 +124,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcIpSet *pIpSet) {
|
|||
strncpy(pVgroupInfo->ipAddr[i].fqdn, pIpSet->fqdn[i], TSDB_FQDN_LEN);
|
||||
pVgroupInfo->ipAddr[i].port = pIpSet->port[i];
|
||||
}
|
||||
//taosCorEndWrite(&pVgroupInfo->version);
|
||||
taosCorEndWrite(&pVgroupInfo->version);
|
||||
}
|
||||
void tscPrintMgmtIp() {
|
||||
SRpcIpSet dump;
|
||||
|
@ -577,7 +588,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
|||
|
||||
// pSql->cmd.payloadLen is set during copying data into payload
|
||||
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
|
||||
tscDumpIpSetFromVgroupInfo(&pTableMeta->vgroupInfo, &pSql->ipList, NULL);
|
||||
tscDumpIpSetFromVgroupInfo(&pTableMeta->corVgroupInfo, &pSql->ipList);
|
||||
|
||||
tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfIP:%d", pSql, vgId, pSql->cmd.numOfTablesInSubmit,
|
||||
pSql->ipList.numOfIps);
|
||||
|
@ -619,9 +630,10 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
|
|||
} else {
|
||||
pVgroupInfo = &pTableMeta->vgroupInfo;
|
||||
}
|
||||
int32_t vgId = 0;
|
||||
tscDumpIpSetFromVgroupInfo(pVgroupInfo, &pSql->ipList, &vgId);
|
||||
pQueryMsg->head.vgId = htonl(vgId);
|
||||
tscSetDnodeIpList(pSql, pVgroupInfo);
|
||||
if (pVgroupInfo != NULL) {
|
||||
pQueryMsg->head.vgId = htonl(pVgroupInfo->vgId);
|
||||
}
|
||||
|
||||
STableIdInfo *pTableIdInfo = (STableIdInfo *)pMsg;
|
||||
pTableIdInfo->tid = htonl(pTableMeta->sid);
|
||||
|
@ -639,10 +651,9 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
|
|||
|
||||
SVgroupTableInfo* pTableIdList = taosArrayGet(pTableMetaInfo->pVgroupTables, index);
|
||||
|
||||
// set the vgroup info
|
||||
int32_t vgId = 0;
|
||||
tscDumpIpSetFromVgroupInfo(&pTableIdList->vgInfo, &pSql->ipList, &vgId);
|
||||
pQueryMsg->head.vgId = htonl(vgId);
|
||||
// set the vgroup info
|
||||
tscSetDnodeIpList(pSql, &pTableIdList->vgInfo);
|
||||
pQueryMsg->head.vgId = htonl(pTableIdList->vgInfo.vgId);
|
||||
|
||||
int32_t numOfTables = taosArrayGetSize(pTableIdList->itemList);
|
||||
pQueryMsg->numOfTables = htonl(numOfTables); // set the number of tables
|
||||
|
@ -1374,7 +1385,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
|
|||
SQueryInfo * pQueryInfo = tscGetQueryInfoDetail(pCmd, 0);
|
||||
STableMetaInfo *pTableMetaInfo = tscGetMetaInfo(pQueryInfo, 0);
|
||||
|
||||
tscDumpIpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->vgroupInfo, &pSql->ipList, NULL);
|
||||
tscDumpIpSetFromVgroupInfo(&pTableMetaInfo->pTableMeta->corVgroupInfo, &pSql->ipList);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1900,8 +1911,6 @@ int tscProcessSTableVgroupRsp(SSqlObj *pSql) {
|
|||
for (int32_t j = 0; j < pInfo->vgroupList->numOfVgroups; ++j) {
|
||||
//just init, no need to lock
|
||||
SCMVgroupInfo *pVgroups = &pInfo->vgroupList->vgroups[j];
|
||||
pVgroups->version = 0;
|
||||
pVgroups->inUse = 0;
|
||||
pVgroups->vgId = htonl(pVgroups->vgId);
|
||||
assert(pVgroups->numOfIps >= 1);
|
||||
for (int32_t k = 0; k < pVgroups->numOfIps; ++k) {
|
||||
|
|
|
@ -647,9 +647,7 @@ typedef struct SCMSTableVgroupMsg {
|
|||
} SCMSTableVgroupMsg, SCMSTableVgroupRspMsg;
|
||||
|
||||
typedef struct {
|
||||
int32_t version;
|
||||
int32_t vgId;
|
||||
int8_t inUse;
|
||||
int8_t numOfIps;
|
||||
SIpAddr ipAddr[TSDB_MAX_REPLICA];
|
||||
} SCMVgroupInfo;
|
||||
|
|
Loading…
Reference in New Issue