[TD-147] fix invalid write in drop db
This commit is contained in:
parent
dc1cc4bd46
commit
993cce6d79
|
@ -913,15 +913,17 @@ static void mgmtProcessDropDbMsg(SQueuedMsg *pMsg) {
|
|||
return;
|
||||
}
|
||||
|
||||
#if 0
|
||||
SVgObj *pVgroup = pMsg->pDb->pHead;
|
||||
if (pVgroup != NULL) {
|
||||
mPrint("vgroup:%d, will be dropped", pVgroup->vgId);
|
||||
mPrint("vgId:%d, will be dropped", pVgroup->vgId);
|
||||
SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg);
|
||||
newMsg->ahandle = pVgroup;
|
||||
newMsg->expected = pVgroup->numOfVnodes;
|
||||
mgmtDropVgroup(pVgroup, newMsg);
|
||||
return;
|
||||
}
|
||||
#endif
|
||||
|
||||
mTrace("db:%s, all vgroups is dropped", pMsg->pDb->name);
|
||||
mgmtDropDb(pMsg);
|
||||
|
|
|
@ -309,7 +309,7 @@ void mgmtProcessDnodeStatusMsg(SRpcMsg *rpcMsg) {
|
|||
SVgObj *pVgroup = mgmtGetVgroup(pVload->vgId);
|
||||
if (pVgroup == NULL) {
|
||||
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->dnodeEp);
|
||||
mPrint("dnode:%d, vgroup:%d not exist in mnode, drop it", pDnode->dnodeId, pVload->vgId);
|
||||
mPrint("dnode:%d, vgId:%d not exist in mnode, drop it", pDnode->dnodeId, pVload->vgId);
|
||||
mgmtSendDropVnodeMsg(pVload->vgId, &ipSet, NULL);
|
||||
} else {
|
||||
mgmtUpdateVgroupStatus(pVgroup, pDnode, pVload);
|
||||
|
|
|
@ -97,14 +97,14 @@ static int32_t mgmtChildTableActionInsert(SSdbOper *pOper) {
|
|||
|
||||
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
|
||||
if (pVgroup == NULL) {
|
||||
mError("ctable:%s, not in vgroup:%d", pTable->info.tableId, pTable->vgId);
|
||||
mError("ctable:%s, not in vgId:%d", pTable->info.tableId, pTable->vgId);
|
||||
return TSDB_CODE_INVALID_VGROUP_ID;
|
||||
}
|
||||
mgmtDecVgroupRef(pVgroup);
|
||||
|
||||
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
|
||||
if (pDb == NULL) {
|
||||
mError("ctable:%s, vgroup:%d not in db:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName);
|
||||
mError("ctable:%s, vgId:%d not in db:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName);
|
||||
return TSDB_CODE_INVALID_DB;
|
||||
}
|
||||
mgmtDecDbRef(pDb);
|
||||
|
@ -147,7 +147,7 @@ static int32_t mgmtChildTableActionDelete(SSdbOper *pOper) {
|
|||
|
||||
SDbObj *pDb = mgmtGetDb(pVgroup->dbName);
|
||||
if (pDb == NULL) {
|
||||
mError("ctable:%s, vgroup:%d not in DB:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName);
|
||||
mError("ctable:%s, vgId:%d not in DB:%s", pTable->info.tableId, pVgroup->vgId, pVgroup->dbName);
|
||||
return TSDB_CODE_INVALID_DB;
|
||||
}
|
||||
mgmtDecDbRef(pDb);
|
||||
|
@ -270,7 +270,7 @@ static int32_t mgmtChildTableActionRestored() {
|
|||
|
||||
SVgObj *pVgroup = mgmtGetVgroup(pTable->vgId);
|
||||
if (pVgroup == NULL) {
|
||||
mError("ctable:%s, failed to get vgroup:%d sid:%d, discard it", pTable->info.tableId, pTable->vgId, pTable->sid);
|
||||
mError("ctable:%s, failed to get vgId:%d sid:%d, discard it", pTable->info.tableId, pTable->vgId, pTable->sid);
|
||||
pTable->vgId = 0;
|
||||
SSdbOper desc = {0};
|
||||
desc.type = SDB_OPER_LOCAL;
|
||||
|
@ -283,7 +283,7 @@ static int32_t mgmtChildTableActionRestored() {
|
|||
mgmtDecVgroupRef(pVgroup);
|
||||
|
||||
if (strcmp(pVgroup->dbName, pDb->name) != 0) {
|
||||
mError("ctable:%s, db:%s not match with vgroup:%d db:%s sid:%d, discard it",
|
||||
mError("ctable:%s, db:%s not match with vgId:%d db:%s sid:%d, discard it",
|
||||
pTable->info.tableId, pDb->name, pTable->vgId, pVgroup->dbName, pTable->sid);
|
||||
pTable->vgId = 0;
|
||||
SSdbOper desc = {0};
|
||||
|
@ -296,7 +296,7 @@ static int32_t mgmtChildTableActionRestored() {
|
|||
}
|
||||
|
||||
if (pVgroup->tableList == NULL) {
|
||||
mError("ctable:%s, vgroup:%d tableList is null", pTable->info.tableId, pTable->vgId);
|
||||
mError("ctable:%s, vgId:%d tableList is null", pTable->info.tableId, pTable->vgId);
|
||||
pTable->vgId = 0;
|
||||
SSdbOper desc = {0};
|
||||
desc.type = SDB_OPER_LOCAL;
|
||||
|
@ -1157,6 +1157,7 @@ void mgmtDropAllSuperTables(SDbObj *pDropDb) {
|
|||
mPrint("db:%s, all super tables will be dropped from sdb", pDropDb->name);
|
||||
|
||||
while (1) {
|
||||
pLastNode = pNode;
|
||||
pNode = mgmtGetNextSuperTable(pNode, &pTable);
|
||||
if (pTable == NULL) break;
|
||||
|
||||
|
@ -1432,7 +1433,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
|
|||
|
||||
int32_t sid = taosAllocateId(pVgroup->idPool);
|
||||
if (sid <= 0) {
|
||||
mTrace("tables:%s, no enough sid in vgroup:%d", pCreate->tableId, pVgroup->vgId);
|
||||
mTrace("tables:%s, no enough sid in vgId:%d", pCreate->tableId, pVgroup->vgId);
|
||||
mgmtCreateVgroup(mgmtCloneQueuedMsg(pMsg), pMsg->pDb);
|
||||
return;
|
||||
}
|
||||
|
@ -1716,6 +1717,7 @@ void mgmtDropAllChildTables(SDbObj *pDropDb) {
|
|||
mPrint("db:%s, all child tables will be dropped from sdb", pDropDb->name);
|
||||
|
||||
while (1) {
|
||||
pLastNode = pNode;
|
||||
pNode = mgmtGetNextChildTable(pNode, &pTable);
|
||||
if (pTable == NULL) break;
|
||||
|
||||
|
@ -1744,6 +1746,7 @@ static void mgmtDropAllChildTablesInStable(SSuperTableObj *pStable) {
|
|||
mPrint("stable:%s, all child tables will dropped from sdb", pStable->info.tableId, numOfTables);
|
||||
|
||||
while (1) {
|
||||
pLastNode = pNode;
|
||||
pNode = mgmtGetNextChildTable(pNode, &pTable);
|
||||
if (pTable == NULL) break;
|
||||
|
||||
|
@ -1768,7 +1771,7 @@ static SChildTableObj* mgmtGetTableByPos(int32_t vnode, int32_t sid) {
|
|||
SVgObj *pVgroup = mgmtGetVgroup(vnode);
|
||||
if (pVgroup == NULL) return NULL;
|
||||
|
||||
SChildTableObj *pTable = pVgroup->tableList[sid];
|
||||
SChildTableObj *pTable = pVgroup->tableList[sid - 1];
|
||||
mgmtIncTableRef((STableObj *)pTable);
|
||||
|
||||
mgmtDecVgroupRef(pVgroup);
|
||||
|
@ -1852,7 +1855,7 @@ static void mgmtProcessDropChildTableRsp(SRpcMsg *rpcMsg) {
|
|||
}
|
||||
|
||||
if (queueMsg->pVgroup->numOfTables <= 0) {
|
||||
mPrint("vgroup:%d, all tables is dropped, drop vgroup", queueMsg->pVgroup->vgId);
|
||||
mPrint("vgId:%d, all tables is dropped, drop vgroup", queueMsg->pVgroup->vgId);
|
||||
mgmtDropVgroup(queueMsg->pVgroup, NULL);
|
||||
}
|
||||
|
||||
|
|
|
@ -76,13 +76,13 @@ static int32_t mgmtVgroupActionInsert(SSdbOper *pOper) {
|
|||
int32_t size = sizeof(SChildTableObj *) * pDb->cfg.maxTables;
|
||||
pVgroup->tableList = calloc(pDb->cfg.maxTables, sizeof(SChildTableObj *));
|
||||
if (pVgroup->tableList == NULL) {
|
||||
mError("vgroup:%d, failed to malloc(size:%d) for the tableList of vgroups", pVgroup->vgId, size);
|
||||
mError("vgId:%d, failed to malloc(size:%d) for the tableList of vgroups", pVgroup->vgId, size);
|
||||
return -1;
|
||||
}
|
||||
|
||||
pVgroup->idPool = taosInitIdPool(pDb->cfg.maxTables);
|
||||
if (pVgroup->idPool == NULL) {
|
||||
mError("vgroup:%d, failed to taosInitIdPool for vgroups", pVgroup->vgId);
|
||||
mError("vgId:%d, failed to taosInitIdPool for vgroups", pVgroup->vgId);
|
||||
tfree(pVgroup->tableList);
|
||||
return -1;
|
||||
}
|
||||
|
@ -103,7 +103,7 @@ static int32_t mgmtVgroupActionInsert(SSdbOper *pOper) {
|
|||
|
||||
static int32_t mgmtVgroupActionDelete(SSdbOper *pOper) {
|
||||
SVgObj *pVgroup = pOper->pObj;
|
||||
|
||||
|
||||
if (pVgroup->pDb != NULL) {
|
||||
mgmtRemoveVgroupFromDb(pVgroup);
|
||||
}
|
||||
|
@ -150,7 +150,7 @@ static int32_t mgmtVgroupActionUpdate(SSdbOper *pOper) {
|
|||
SDbObj *pDb = pVgroup->pDb;
|
||||
if (pDb != NULL) {
|
||||
if (pDb->cfg.maxTables != oldTables) {
|
||||
mPrint("vgroup:%d tables change from %d to %d", pVgroup->vgId, oldTables, pDb->cfg.maxTables);
|
||||
mPrint("vgId:%d tables change from %d to %d", pVgroup->vgId, oldTables, pDb->cfg.maxTables);
|
||||
taosUpdateIdPool(pVgroup->idPool, pDb->cfg.maxTables);
|
||||
int32_t size = sizeof(SChildTableObj *) * pDb->cfg.maxTables;
|
||||
pVgroup->tableList = (SChildTableObj **)realloc(pVgroup->tableList, size);
|
||||
|
@ -158,7 +158,7 @@ static int32_t mgmtVgroupActionUpdate(SSdbOper *pOper) {
|
|||
}
|
||||
|
||||
mgmtDecVgroupRef(pVgroup);
|
||||
mTrace("vgroup:%d, is updated, tables:%d numOfVnode:%d", pVgroup->vgId, pDb->cfg.maxTables, pVgroup->numOfVnodes);
|
||||
mTrace("vgId:%d, is updated, tables:%d numOfVnode:%d", pVgroup->vgId, pDb->cfg.maxTables, pVgroup->numOfVnodes);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -264,7 +264,7 @@ void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVlo
|
|||
|
||||
if (!dnodeExist) {
|
||||
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->dnodeEp);
|
||||
mError("vgroup:%d, dnode:%d not exist in mnode, drop it", pVload->vgId, pDnode->dnodeId);
|
||||
mError("vgId:%d, dnode:%d not exist in mnode, drop it", pVload->vgId, pDnode->dnodeId);
|
||||
mgmtSendDropVnodeMsg(pVload->vgId, &ipSet, NULL);
|
||||
return;
|
||||
}
|
||||
|
@ -276,7 +276,7 @@ void mgmtUpdateVgroupStatus(SVgObj *pVgroup, SDnodeObj *pDnode, SVnodeLoad *pVlo
|
|||
}
|
||||
|
||||
if (pVload->cfgVersion != pVgroup->pDb->cfgVersion || pVload->replica != pVgroup->numOfVnodes) {
|
||||
mError("dnode:%d, vgroup:%d, vnode cfgVersion:%d repica:%d not match with mgmt cfgVersion:%d replica:%d",
|
||||
mError("dnode:%d, vgId:%d, vnode cfgVersion:%d repica:%d not match with mgmt cfgVersion:%d replica:%d",
|
||||
pDnode->dnodeId, pVload->vgId, pVload->cfgVersion, pVload->replica, pVgroup->pDb->cfgVersion,
|
||||
pVgroup->numOfVnodes);
|
||||
mgmtSendCreateVgroupMsg(pVgroup, NULL);
|
||||
|
@ -320,9 +320,9 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) {
|
|||
return;
|
||||
}
|
||||
|
||||
mPrint("vgroup:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
|
||||
mPrint("vgId:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
|
||||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||
mPrint("vgroup:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId);
|
||||
mPrint("vgId:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId);
|
||||
}
|
||||
|
||||
pMsg->ahandle = pVgroup;
|
||||
|
@ -334,7 +334,7 @@ void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle) {
|
|||
if (ahandle != NULL) {
|
||||
mgmtSendDropVgroupMsg(pVgroup, ahandle);
|
||||
} else {
|
||||
mTrace("vgroup:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes);
|
||||
mTrace("vgId:%d, replica:%d is deleting from sdb", pVgroup->vgId, pVgroup->numOfVnodes);
|
||||
mgmtSendDropVgroupMsg(pVgroup, NULL);
|
||||
SSdbOper oper = {
|
||||
.type = SDB_OPER_GLOBAL,
|
||||
|
@ -509,25 +509,31 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
|
|||
}
|
||||
|
||||
void mgmtAddTableIntoVgroup(SVgObj *pVgroup, SChildTableObj *pTable) {
|
||||
if (pTable->sid >= 0 && pVgroup->tableList[pTable->sid] == NULL) {
|
||||
pVgroup->tableList[pTable->sid] = pTable;
|
||||
if (pTable->sid >= 1 && pVgroup->tableList[pTable->sid - 1] == NULL) {
|
||||
pVgroup->tableList[pTable->sid - 1] = pTable;
|
||||
taosIdPoolMarkStatus(pVgroup->idPool, pTable->sid);
|
||||
pVgroup->numOfTables++;
|
||||
}
|
||||
|
||||
if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxTables)
|
||||
if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxTables) {
|
||||
mgmtAddVgroupIntoDbTail(pVgroup);
|
||||
}
|
||||
|
||||
mgmtIncVgroupRef(pVgroup);
|
||||
}
|
||||
|
||||
void mgmtRemoveTableFromVgroup(SVgObj *pVgroup, SChildTableObj *pTable) {
|
||||
if (pTable->sid >= 0 && pVgroup->tableList[pTable->sid] != NULL) {
|
||||
pVgroup->tableList[pTable->sid] = NULL;
|
||||
if (pTable->sid >= 1 && pVgroup->tableList[pTable->sid - 1] != NULL) {
|
||||
pVgroup->tableList[pTable->sid - 1] = NULL;
|
||||
taosFreeId(pVgroup->idPool, pTable->sid);
|
||||
pVgroup->numOfTables--;
|
||||
}
|
||||
|
||||
if (pVgroup->numOfTables >= pVgroup->pDb->cfg.maxTables)
|
||||
mgmtAddVgroupIntoDbTail(pVgroup);
|
||||
if (pVgroup->numOfTables == 0) {
|
||||
mgmtRemoveVgroupFromDb(pVgroup);
|
||||
}
|
||||
|
||||
mgmtDecVgroupRef(pVgroup);
|
||||
}
|
||||
|
||||
SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) {
|
||||
|
@ -592,7 +598,7 @@ SRpcIpSet mgmtGetIpSetFromIp(char *ep) {
|
|||
}
|
||||
|
||||
void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) {
|
||||
mTrace("vgroup:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, pVgroup->vgId, ahandle);
|
||||
mTrace("vgId:%d, send create vnode:%d msg, ahandle:%p", pVgroup->vgId, pVgroup->vgId, ahandle);
|
||||
SMDCreateVnodeMsg *pCreate = mgmtBuildCreateVnodeMsg(pVgroup);
|
||||
SRpcMsg rpcMsg = {
|
||||
.handle = ahandle,
|
||||
|
@ -605,7 +611,7 @@ void mgmtSendCreateVnodeMsg(SVgObj *pVgroup, SRpcIpSet *ipSet, void *ahandle) {
|
|||
}
|
||||
|
||||
void mgmtSendCreateVgroupMsg(SVgObj *pVgroup, void *ahandle) {
|
||||
mTrace("vgroup:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);
|
||||
mTrace("vgId:%d, send create all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);
|
||||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp);
|
||||
mgmtSendCreateVnodeMsg(pVgroup, &ipSet, ahandle);
|
||||
|
@ -623,7 +629,7 @@ static void mgmtProcessCreateVnodeRsp(SRpcMsg *rpcMsg) {
|
|||
}
|
||||
|
||||
SVgObj *pVgroup = queueMsg->ahandle;
|
||||
mTrace("vgroup:%d, create vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p",
|
||||
mTrace("vgId:%d, create vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p",
|
||||
pVgroup->vgId, tstrerror(rpcMsg->code), queueMsg->received, queueMsg->successed, queueMsg->expected,
|
||||
queueMsg->thandle, rpcMsg->handle);
|
||||
|
||||
|
@ -658,7 +664,7 @@ static SMDDropVnodeMsg *mgmtBuildDropVnodeMsg(int32_t vgId) {
|
|||
}
|
||||
|
||||
void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) {
|
||||
mTrace("vgroup:%d, send drop vnode msg, ahandle:%p", vgId, ahandle);
|
||||
mTrace("vgId:%d, send drop vnode msg, ahandle:%p", vgId, ahandle);
|
||||
SMDDropVnodeMsg *pDrop = mgmtBuildDropVnodeMsg(vgId);
|
||||
SRpcMsg rpcMsg = {
|
||||
.handle = ahandle,
|
||||
|
@ -671,7 +677,7 @@ void mgmtSendDropVnodeMsg(int32_t vgId, SRpcIpSet *ipSet, void *ahandle) {
|
|||
}
|
||||
|
||||
static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
|
||||
mTrace("vgroup:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);
|
||||
mTrace("vgId:%d, send drop all vnodes msg, ahandle:%p", pVgroup->vgId, ahandle);
|
||||
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
|
||||
SRpcIpSet ipSet = mgmtGetIpSetFromIp(pVgroup->vnodeGid[i].pDnode->dnodeEp);
|
||||
mgmtSendDropVnodeMsg(pVgroup->vgId, &ipSet, ahandle);
|
||||
|
@ -679,7 +685,7 @@ static void mgmtSendDropVgroupMsg(SVgObj *pVgroup, void *ahandle) {
|
|||
}
|
||||
|
||||
static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
|
||||
mTrace("drop vnode rsp is received");
|
||||
mTrace("drop vnode rsp is received, handle:%p", rpcMsg->handle);
|
||||
if (rpcMsg->handle == NULL) return;
|
||||
|
||||
SQueuedMsg *queueMsg = rpcMsg->handle;
|
||||
|
@ -690,7 +696,7 @@ static void mgmtProcessDropVnodeRsp(SRpcMsg *rpcMsg) {
|
|||
}
|
||||
|
||||
SVgObj *pVgroup = queueMsg->ahandle;
|
||||
mTrace("vgroup:%d, drop vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p",
|
||||
mTrace("vgId:%d, drop vnode rsp received, result:%s received:%d successed:%d expected:%d, thandle:%p ahandle:%p",
|
||||
pVgroup->vgId, tstrerror(rpcMsg->code), queueMsg->received, queueMsg->successed, queueMsg->expected,
|
||||
queueMsg->thandle, rpcMsg->handle);
|
||||
|
||||
|
@ -774,6 +780,7 @@ void mgmtDropAllDbVgroups(SDbObj *pDropDb) {
|
|||
|
||||
mPrint("db:%s, all vgroups will be dropped from sdb", pDropDb->name);
|
||||
while (1) {
|
||||
pLastNode = pNode;
|
||||
pNode = mgmtGetNextVgroup(pNode, &pVgroup);
|
||||
if (pVgroup == NULL) break;
|
||||
|
||||
|
|
Loading…
Reference in New Issue