[TD-271] fix bug while drop stable
This commit is contained in:
parent
459c267a2a
commit
c2a9c11968
|
@ -85,8 +85,7 @@ typedef struct SSuperTableObj {
|
||||||
int32_t numOfTables;
|
int32_t numOfTables;
|
||||||
int16_t nextColId;
|
int16_t nextColId;
|
||||||
SSchema * schema;
|
SSchema * schema;
|
||||||
int32_t vgLen;
|
void * vgHash;
|
||||||
int32_t * vgList;
|
|
||||||
} SSuperTableObj;
|
} SSuperTableObj;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
#include "tname.h"
|
#include "tname.h"
|
||||||
#include "tidpool.h"
|
#include "tidpool.h"
|
||||||
#include "tglobal.h"
|
#include "tglobal.h"
|
||||||
|
#include "hash.h"
|
||||||
#include "dnode.h"
|
#include "dnode.h"
|
||||||
#include "mgmtDef.h"
|
#include "mgmtDef.h"
|
||||||
#include "mgmtInt.h"
|
#include "mgmtInt.h"
|
||||||
|
@ -363,39 +364,35 @@ static void mgmtCleanUpChildTables() {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mgmtAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCtable) {
|
static void mgmtAddTableIntoStable(SSuperTableObj *pStable, SChildTableObj *pCtable) {
|
||||||
if (pStable->vgLen == 0) {
|
|
||||||
pStable->vgLen = 8;
|
|
||||||
pStable->vgList = calloc(pStable->vgLen, sizeof(int32_t));
|
|
||||||
}
|
|
||||||
|
|
||||||
bool find = false;
|
|
||||||
int32_t pos = 0;
|
|
||||||
for (pos = 0; pos < pStable->vgLen; ++pos) {
|
|
||||||
if (pStable->vgList[pos] == 0) break;
|
|
||||||
if (pStable->vgList[pos] == pCtable->vgId) {
|
|
||||||
find = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!find) {
|
|
||||||
if (pos >= pStable->vgLen) {
|
|
||||||
pStable->vgLen *= 2;
|
|
||||||
pStable->vgList = realloc(pStable->vgList, pStable->vgLen * sizeof(int32_t));
|
|
||||||
}
|
|
||||||
pStable->vgList[pos] = pCtable->vgId;
|
|
||||||
}
|
|
||||||
|
|
||||||
pStable->numOfTables++;
|
pStable->numOfTables++;
|
||||||
|
|
||||||
|
if (pStable->vgHash == NULL) {
|
||||||
|
pStable->vgHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pStable->vgHash != NULL) {
|
||||||
|
taosHashPut(pStable->vgHash, (char *)&pCtable->vgId, sizeof(pCtable->vgId), &pCtable->vgId, sizeof(pCtable->vgId));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mgmtRemoveTableFromStable(SSuperTableObj *pStable, SChildTableObj *pCtable) {
|
static void mgmtRemoveTableFromStable(SSuperTableObj *pStable, SChildTableObj *pCtable) {
|
||||||
pStable->numOfTables--;
|
pStable->numOfTables--;
|
||||||
|
|
||||||
|
if (pStable->vgHash == NULL) return;
|
||||||
|
|
||||||
|
SVgObj *pVgroup = mgmtGetVgroup(pCtable->vgId);
|
||||||
|
if (pVgroup != NULL) {
|
||||||
|
taosHashRemove(pStable->vgHash, (char *)&pCtable->vgId, sizeof(pCtable->vgId));
|
||||||
|
}
|
||||||
|
mgmtDecVgroupRef(pVgroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mgmtDestroySuperTable(SSuperTableObj *pStable) {
|
static void mgmtDestroySuperTable(SSuperTableObj *pStable) {
|
||||||
|
if (pStable->vgHash != NULL) {
|
||||||
|
taosHashCleanup(pStable->vgHash);
|
||||||
|
pStable->vgHash = NULL;
|
||||||
|
}
|
||||||
tfree(pStable->schema);
|
tfree(pStable->schema);
|
||||||
tfree(pStable->vgList)
|
|
||||||
tfree(pStable);
|
tfree(pStable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -434,7 +431,7 @@ static int32_t mgmtSuperTableActionUpdate(SSdbOper *pOper) {
|
||||||
void *oldSchema = pTable->schema;
|
void *oldSchema = pTable->schema;
|
||||||
memcpy(pTable, pNew, pOper->rowSize);
|
memcpy(pTable, pNew, pOper->rowSize);
|
||||||
pTable->schema = pNew->schema;
|
pTable->schema = pNew->schema;
|
||||||
free(pNew->vgList);
|
free(pNew->vgHash);
|
||||||
free(pNew);
|
free(pNew);
|
||||||
free(oldSchema);
|
free(oldSchema);
|
||||||
}
|
}
|
||||||
|
@ -797,26 +794,26 @@ static void mgmtProcessCreateSuperTableMsg(SQueuedMsg *pMsg) {
|
||||||
static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) {
|
static void mgmtProcessDropSuperTableMsg(SQueuedMsg *pMsg) {
|
||||||
SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable;
|
SSuperTableObj *pStable = (SSuperTableObj *)pMsg->pTable;
|
||||||
if (pStable->numOfTables != 0) {
|
if (pStable->numOfTables != 0) {
|
||||||
mgmtDropAllChildTablesInStable(pStable);
|
SHashMutableIterator *pIter = taosHashCreateIter(pStable->vgHash);
|
||||||
for (int32_t vg = 0; vg < pStable->vgLen; ++vg) {
|
while (taosHashIterNext(pIter)) {
|
||||||
int32_t vgId = pStable->vgList[vg];
|
int32_t *pVgId = taosHashIterGet(pIter);
|
||||||
if (vgId == 0) break;
|
SVgObj *pVgroup = mgmtGetVgroup(*pVgId);
|
||||||
|
|
||||||
SVgObj *pVgroup = mgmtGetVgroup(vgId);
|
|
||||||
if (pVgroup == NULL) break;
|
if (pVgroup == NULL) break;
|
||||||
|
|
||||||
SMDDropSTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropSTableMsg));
|
SMDDropSTableMsg *pDrop = rpcMallocCont(sizeof(SMDDropSTableMsg));
|
||||||
pDrop->contLen = htonl(sizeof(SMDDropSTableMsg));
|
pDrop->contLen = htonl(sizeof(SMDDropSTableMsg));
|
||||||
pDrop->vgId = htonl(vgId);
|
pDrop->vgId = htonl(pVgroup->vgId);
|
||||||
pDrop->uid = htobe64(pStable->uid);
|
pDrop->uid = htobe64(pStable->uid);
|
||||||
mgmtExtractTableName(pStable->info.tableId, pDrop->tableId);
|
mgmtExtractTableName(pStable->info.tableId, pDrop->tableId);
|
||||||
|
|
||||||
mPrint("stable:%s, send drop stable msg to vgId:%d", pStable->info.tableId, vgId);
|
mPrint("stable:%s, send drop stable msg to vgId:%d", pStable->info.tableId, pVgroup->vgId);
|
||||||
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
|
SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup);
|
||||||
SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SMDDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE};
|
SRpcMsg rpcMsg = {.pCont = pDrop, .contLen = sizeof(SMDDropSTableMsg), .msgType = TSDB_MSG_TYPE_MD_DROP_STABLE};
|
||||||
dnodeSendMsgToDnode(&ipSet, &rpcMsg);
|
dnodeSendMsgToDnode(&ipSet, &rpcMsg);
|
||||||
mgmtDecVgroupRef(pVgroup);
|
mgmtDecVgroupRef(pVgroup);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
mgmtDropAllChildTablesInStable(pStable);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSdbOper oper = {
|
SSdbOper oper = {
|
||||||
|
@ -1244,11 +1241,20 @@ static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) {
|
||||||
SCMSTableVgroupMsg *pInfo = pMsg->pCont;
|
SCMSTableVgroupMsg *pInfo = pMsg->pCont;
|
||||||
int32_t numOfTable = htonl(pInfo->numOfTables);
|
int32_t numOfTable = htonl(pInfo->numOfTables);
|
||||||
|
|
||||||
char* name = (char*) pInfo + sizeof(struct SCMSTableVgroupMsg);
|
|
||||||
SCMSTableVgroupRspMsg *pRsp = NULL;
|
SCMSTableVgroupRspMsg *pRsp = NULL;
|
||||||
|
int32_t contLen = sizeof(SCMSTableVgroupRspMsg);
|
||||||
|
for (int32_t i = 0; i < numOfTable; ++i) {
|
||||||
|
char *stableName = (char*)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_ID_LEN) * i;
|
||||||
|
SSuperTableObj *pTable = mgmtGetSuperTable(stableName);
|
||||||
|
if (pTable != NULL) {
|
||||||
|
stableName = (char*)pTable; //hack way
|
||||||
|
}
|
||||||
|
|
||||||
// todo set the initial size to be 10, fix me
|
if (pTable->vgHash != NULL) {
|
||||||
int32_t contLen = sizeof(SCMSTableVgroupRspMsg) + (sizeof(SCMVgroupInfo) * 10 + sizeof(SVgroupsInfo))*numOfTable;
|
contLen += (taosHashGetSize(pTable->vgHash) * sizeof(SCMVgroupInfo) + sizeof(SVgroupsInfo));
|
||||||
|
}
|
||||||
|
mgmtDecTableRef(pTable);
|
||||||
|
}
|
||||||
|
|
||||||
pRsp = rpcMallocCont(contLen);
|
pRsp = rpcMallocCont(contLen);
|
||||||
if (pRsp == NULL) {
|
if (pRsp == NULL) {
|
||||||
|
@ -1257,45 +1263,39 @@ static void mgmtProcessSuperTableVgroupMsg(SQueuedMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pRsp->numOfTables = htonl(numOfTable);
|
pRsp->numOfTables = htonl(numOfTable);
|
||||||
char* msg = (char*) pRsp + sizeof(SCMSTableVgroupRspMsg);
|
char *msg = (char *)pRsp + sizeof(SCMSTableVgroupRspMsg);
|
||||||
|
|
||||||
for(int32_t i = 0; i < numOfTable; ++i) {
|
for (int32_t i = 0; i < numOfTable; ++i) {
|
||||||
SSuperTableObj *pTable = mgmtGetSuperTable(name);
|
SSuperTableObj *pTable = (SSuperTableObj *)((char *)pInfo + sizeof(SCMSTableVgroupMsg) + (TSDB_TABLE_ID_LEN)*i);
|
||||||
|
SVgroupsInfo * pVgroup = (SVgroupsInfo *)msg;
|
||||||
|
|
||||||
pMsg->pTable = (STableObj *)pTable;
|
SHashMutableIterator *pIter = taosHashCreateIter(pTable->vgHash);
|
||||||
if (pMsg->pTable == NULL) {
|
int32_t vgSize = 0;
|
||||||
mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_INVALID_TABLE);
|
while (taosHashIterNext(pIter)) {
|
||||||
return;
|
int32_t *pVgId = taosHashIterGet(pIter);
|
||||||
}
|
SVgObj * pVgItem = mgmtGetVgroup(*pVgId
|
||||||
|
);
|
||||||
|
if (pVgItem == NULL) continue;
|
||||||
|
|
||||||
SVgroupsInfo* pVgroup = (SVgroupsInfo*) msg;
|
pVgroup->vgroups[vgSize].vgId = htonl(pVgItem->vgId);
|
||||||
|
for (int32_t vn = 0; vn < pVgItem->numOfVnodes; ++vn) {
|
||||||
int32_t vg = 0;
|
SDnodeObj *pDnode = pVgItem->vnodeGid[vn].pDnode;
|
||||||
for (; vg < pTable->vgLen; ++vg) {
|
|
||||||
int32_t vgId = pTable->vgList[vg];
|
|
||||||
if (vgId == 0) break;
|
|
||||||
|
|
||||||
SVgObj *vgItem = mgmtGetVgroup(vgId);
|
|
||||||
if (vgItem == NULL) break;
|
|
||||||
|
|
||||||
pVgroup->vgroups[vg].vgId = htonl(vgId);
|
|
||||||
for (int32_t vn = 0; vn < vgItem->numOfVnodes; ++vn) {
|
|
||||||
SDnodeObj *pDnode = vgItem->vnodeGid[vn].pDnode;
|
|
||||||
if (pDnode == NULL) break;
|
if (pDnode == NULL) break;
|
||||||
|
|
||||||
strncpy(pVgroup->vgroups[vg].ipAddr[vn].fqdn, pDnode->dnodeFqdn, tListLen(pDnode->dnodeFqdn));
|
strncpy(pVgroup->vgroups[vgSize].ipAddr[vn].fqdn, pDnode->dnodeFqdn, tListLen(pDnode->dnodeFqdn));
|
||||||
pVgroup->vgroups[vg].ipAddr[vn].port = htons(tsDnodeShellPort);
|
pVgroup->vgroups[vgSize].ipAddr[vn].port = htons(tsDnodeShellPort);
|
||||||
|
|
||||||
pVgroup->vgroups[vg].numOfIps++;
|
pVgroup->vgroups[vgSize].numOfIps++;
|
||||||
}
|
}
|
||||||
|
|
||||||
mgmtDecVgroupRef(vgItem);
|
vgSize++;
|
||||||
|
mgmtDecVgroupRef(pVgItem);
|
||||||
}
|
}
|
||||||
|
|
||||||
pVgroup->numOfVgroups = htonl(vg);
|
pVgroup->numOfVgroups = htonl(vgSize);
|
||||||
|
|
||||||
// one table is done, try the next table
|
// one table is done, try the next table
|
||||||
msg += sizeof(SVgroupsInfo) + vg * sizeof(SCMVgroupInfo);
|
msg += sizeof(SVgroupsInfo) + vgSize * sizeof(SCMVgroupInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
SRpcMsg rpcRsp = {0};
|
SRpcMsg rpcRsp = {0};
|
||||||
|
|
Loading…
Reference in New Issue