[td-4802]<fix>: fix crash caused by too many vgroups;
This commit is contained in:
parent
fbe1ff81b3
commit
c5c3ee4c37
|
@ -1740,16 +1740,22 @@ static int32_t mnodeGetSuperTableMeta(SMnodeMsg *pMsg) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t calculateVgroupMsgLength(SSTableVgroupMsg* pInfo, int32_t numOfTable) {
|
||||
static int32_t doGetVgroupInfoLength(char* name) {
|
||||
SSTableObj *pTable = mnodeGetSuperTable(name);
|
||||
int32_t len = 0;
|
||||
if (pTable != NULL && pTable->vgHash != NULL) {
|
||||
len = (taosHashGetSize(pTable->vgHash) * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg));
|
||||
}
|
||||
|
||||
mnodeDecTableRef(pTable);
|
||||
return len;
|
||||
}
|
||||
|
||||
static int32_t getVgroupInfoLength(SSTableVgroupMsg* pInfo, int32_t numOfTable) {
|
||||
int32_t contLen = sizeof(SSTableVgroupRspMsg) + 32 * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg);
|
||||
for (int32_t i = 0; i < numOfTable; ++i) {
|
||||
char *stableName = (char *)pInfo + sizeof(SSTableVgroupMsg) + (TSDB_TABLE_FNAME_LEN)*i;
|
||||
SSTableObj *pTable = mnodeGetSuperTable(stableName);
|
||||
if (pTable != NULL && pTable->vgHash != NULL) {
|
||||
contLen += (taosHashGetSize(pTable->vgHash) * sizeof(SVgroupMsg) + sizeof(SVgroupsMsg));
|
||||
}
|
||||
|
||||
mnodeDecTableRef(pTable);
|
||||
contLen += doGetVgroupInfoLength(stableName);
|
||||
}
|
||||
|
||||
return contLen;
|
||||
|
@ -1820,7 +1826,7 @@ static int32_t mnodeProcessSuperTableVgroupMsg(SMnodeMsg *pMsg) {
|
|||
int32_t numOfTable = htonl(pInfo->numOfTables);
|
||||
|
||||
// calculate the required space.
|
||||
int32_t contLen = calculateVgroupMsgLength(pInfo, numOfTable);
|
||||
int32_t contLen = getVgroupInfoLength(pInfo, numOfTable);
|
||||
SSTableVgroupRspMsg *pRsp = rpcMallocCont(contLen);
|
||||
if (pRsp == NULL) {
|
||||
return TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
@ -2860,6 +2866,27 @@ static void mnodeProcessAlterTableRsp(SRpcMsg *rpcMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
static SMultiTableMeta* ensureMsgBufferSpace(SMultiTableMeta *pMultiMeta, SArray* pList, int32_t* totalMallocLen, int32_t numOfVgroupList) {
|
||||
int32_t len = 0;
|
||||
for (int32_t i = 0; i < numOfVgroupList; ++i) {
|
||||
char *name = taosArrayGetP(pList, i);
|
||||
len += doGetVgroupInfoLength(name);
|
||||
}
|
||||
|
||||
if (len + pMultiMeta->contLen > (*totalMallocLen)) {
|
||||
while (len + pMultiMeta->contLen > (*totalMallocLen)) {
|
||||
(*totalMallocLen) *= 2;
|
||||
}
|
||||
|
||||
pMultiMeta = rpcReallocCont(pMultiMeta, *totalMallocLen);
|
||||
if (pMultiMeta == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
return pMultiMeta;
|
||||
}
|
||||
|
||||
static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
||||
SMultiTableInfoMsg *pInfo = pMsg->rpcMsg.pCont;
|
||||
|
||||
|
@ -2950,8 +2977,6 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
char* msg = (char*) pMultiMeta + pMultiMeta->contLen;
|
||||
|
||||
// add the additional super table names that needs the vgroup info
|
||||
for(;t < num; ++t) {
|
||||
taosArrayPush(pList, &nameList[t]);
|
||||
|
@ -2961,6 +2986,13 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
|||
int32_t numOfVgroupList = (int32_t) taosArrayGetSize(pList);
|
||||
pMultiMeta->numOfVgroup = htonl(numOfVgroupList);
|
||||
|
||||
pMultiMeta = ensureMsgBufferSpace(pMultiMeta, pList, &totalMallocLen, numOfVgroupList);
|
||||
if (pMultiMeta == NULL) {
|
||||
code = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
}
|
||||
|
||||
char* msg = (char*) pMultiMeta + pMultiMeta->contLen;
|
||||
for(int32_t i = 0; i < numOfVgroupList; ++i) {
|
||||
char* name = taosArrayGetP(pList, i);
|
||||
|
||||
|
|
|
@ -1305,9 +1305,16 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn
|
|||
continue;
|
||||
}
|
||||
|
||||
if (memcmp(pInfo->prevData, val, bytes) == 0) {
|
||||
num++;
|
||||
continue;
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
if(varDataLen(val) == varDataLen(pInfo->prevData) && memcmp(pInfo->prevData, val, varDataLen(val)) == 0) {
|
||||
num++;
|
||||
continue;
|
||||
}
|
||||
} else {
|
||||
if (memcmp(pInfo->prevData, val, bytes)) {
|
||||
num++;
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (pQueryAttr->stableQuery && pQueryAttr->stabledev && (pRuntimeEnv->prevResult != NULL)) {
|
||||
|
@ -1416,9 +1423,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInf
|
|||
}
|
||||
|
||||
static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
|
||||
int64_t v = -1;
|
||||
GET_TYPED_DATA(v, int64_t, type, pData);
|
||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
if (pResultRow->key == NULL) {
|
||||
pResultRow->key = malloc(varDataTLen(pData));
|
||||
varDataCopy(pResultRow->key, pData);
|
||||
|
@ -1426,6 +1431,9 @@ static void setResultRowKey(SResultRow* pResultRow, char* pData, int16_t type) {
|
|||
assert(memcmp(pResultRow->key, pData, varDataTLen(pData)) == 0);
|
||||
}
|
||||
} else {
|
||||
int64_t v = -1;
|
||||
GET_TYPED_DATA(v, int64_t, type, pData);
|
||||
|
||||
pResultRow->win.skey = v;
|
||||
pResultRow->win.ekey = v;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue