Merge pull request #1491 from taosdata/refactor/cluster
Refactor/cluster
This commit is contained in:
commit
1d74aacdd4
|
@ -284,10 +284,10 @@ static void dnodeDoCleanupVnode(SVnodeObj *pVnode, ECloseTsdbFlag closeFlag) {
|
||||||
if (pVnode->tsdb) {
|
if (pVnode->tsdb) {
|
||||||
if (closeFlag == DROP_TSDB) {
|
if (closeFlag == DROP_TSDB) {
|
||||||
tsdbDropRepo(pVnode->tsdb);
|
tsdbDropRepo(pVnode->tsdb);
|
||||||
|
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
|
||||||
} else if (closeFlag == CLOSE_TSDB) {
|
} else if (closeFlag == CLOSE_TSDB) {
|
||||||
tsdbCloseRepo(pVnode->tsdb);
|
tsdbCloseRepo(pVnode->tsdb);
|
||||||
}
|
}
|
||||||
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
|
|
||||||
pVnode->tsdb = NULL;
|
pVnode->tsdb = NULL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -270,7 +270,7 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dnodeProcessSubmitMsg(void *pVnode, SWriteMsg *pMsg) {
|
static void dnodeProcessSubmitMsg(void *pVnode, SWriteMsg *pMsg) {
|
||||||
dTrace("submit msg is disposed");
|
dTrace("pVnode:%p, submit msg is disposed", pVnode);
|
||||||
|
|
||||||
SShellSubmitRspMsg *pRsp = rpcMallocCont(sizeof(SShellSubmitRspMsg));
|
SShellSubmitRspMsg *pRsp = rpcMallocCont(sizeof(SShellSubmitRspMsg));
|
||||||
pRsp->code = 0;
|
pRsp->code = 0;
|
||||||
|
@ -298,7 +298,7 @@ static void dnodeProcessCreateTableMsg(void *pVnode, SWriteMsg *pMsg) {
|
||||||
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
|
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
|
||||||
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||||
|
|
||||||
dTrace("table:%s, start to create in dnode, vgroup:%d", pTable->tableId, pTable->vgId);
|
dTrace("pVnode:%p, table:%s, start to create in dnode, vgroup:%d", pVnode, pTable->tableId, pTable->vgId);
|
||||||
pTable->numOfColumns = htons(pTable->numOfColumns);
|
pTable->numOfColumns = htons(pTable->numOfColumns);
|
||||||
pTable->numOfTags = htons(pTable->numOfTags);
|
pTable->numOfTags = htons(pTable->numOfTags);
|
||||||
pTable->sid = htonl(pTable->sid);
|
pTable->sid = htonl(pTable->sid);
|
||||||
|
@ -344,11 +344,9 @@ static void dnodeProcessCreateTableMsg(void *pVnode, SWriteMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void *pTsdb = dnodeGetVnodeTsdb(pVnode);
|
void *pTsdb = dnodeGetVnodeTsdb(pVnode);
|
||||||
|
|
||||||
rpcRsp.code = tsdbCreateTable(pTsdb, &tCfg);
|
rpcRsp.code = tsdbCreateTable(pTsdb, &tCfg);
|
||||||
dnodeReleaseVnode(pVnode);
|
|
||||||
|
|
||||||
dTrace("table:%s, create table result:%s", pTable->tableId, tstrerror(rpcRsp.code));
|
dTrace("pVnode:%p, table:%s, create table result:%s", pVnode, pTable->tableId, tstrerror(rpcRsp.code));
|
||||||
rpcSendResponse(&rpcRsp);
|
rpcSendResponse(&rpcRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -356,18 +354,16 @@ static void dnodeProcessDropTableMsg(void *pVnode, SWriteMsg *pMsg) {
|
||||||
SMDDropTableMsg *pTable = pMsg->rpcMsg.pCont;
|
SMDDropTableMsg *pTable = pMsg->rpcMsg.pCont;
|
||||||
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||||
|
|
||||||
dTrace("table:%s, start to drop in dnode, vgroup:%d", pTable->tableId, pTable->vgId);
|
dTrace("pVnode:%p, table:%s, start to drop in dnode, vgroup:%d", pVnode, pTable->tableId, pTable->vgId);
|
||||||
STableId tableId = {
|
STableId tableId = {
|
||||||
.uid = htobe64(pTable->uid),
|
.uid = htobe64(pTable->uid),
|
||||||
.tid = htonl(pTable->sid)
|
.tid = htonl(pTable->sid)
|
||||||
};
|
};
|
||||||
|
|
||||||
void *pTsdb = dnodeGetVnodeTsdb(pVnode);
|
void *pTsdb = dnodeGetVnodeTsdb(pVnode);
|
||||||
|
|
||||||
rpcRsp.code = tsdbDropTable(pTsdb, tableId);
|
rpcRsp.code = tsdbDropTable(pTsdb, tableId);
|
||||||
dnodeReleaseVnode(pVnode);
|
|
||||||
|
|
||||||
dTrace("table:%s, drop table result:%s", pTable->tableId, tstrerror(rpcRsp.code));
|
dTrace("pVnode:%p, table:%s, drop table result:%s", pVnode, pTable->tableId, tstrerror(rpcRsp.code));
|
||||||
rpcSendResponse(&rpcRsp);
|
rpcSendResponse(&rpcRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -375,7 +371,7 @@ static void dnodeProcessAlterTableMsg(void *pVnode, SWriteMsg *pMsg) {
|
||||||
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
|
SMDCreateTableMsg *pTable = pMsg->rpcMsg.pCont;
|
||||||
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||||
|
|
||||||
dTrace("table:%s, start to alter in dnode, vgroup:%d", pTable->tableId, pTable->vgId);
|
dTrace("pVnode:%p, table:%s, start to alter in dnode, vgroup:%d", pVnode, pTable->tableId, pTable->vgId);
|
||||||
pTable->numOfColumns = htons(pTable->numOfColumns);
|
pTable->numOfColumns = htons(pTable->numOfColumns);
|
||||||
pTable->numOfTags = htons(pTable->numOfTags);
|
pTable->numOfTags = htons(pTable->numOfTags);
|
||||||
pTable->sid = htonl(pTable->sid);
|
pTable->sid = htonl(pTable->sid);
|
||||||
|
@ -421,11 +417,9 @@ static void dnodeProcessAlterTableMsg(void *pVnode, SWriteMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void *pTsdb = dnodeGetVnodeTsdb(pVnode);
|
void *pTsdb = dnodeGetVnodeTsdb(pVnode);
|
||||||
|
|
||||||
rpcRsp.code = tsdbAlterTable(pTsdb, &tCfg);
|
rpcRsp.code = tsdbAlterTable(pTsdb, &tCfg);
|
||||||
dnodeReleaseVnode(pVnode);
|
|
||||||
|
|
||||||
dTrace("table:%s, alter table result:%s", pTable->tableId, tstrerror(rpcRsp.code));
|
dTrace("pVnode:%p, table:%s, alter table result:%s", pVnode, pTable->tableId, tstrerror(rpcRsp.code));
|
||||||
rpcSendResponse(&rpcRsp);
|
rpcSendResponse(&rpcRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -433,7 +427,7 @@ static void dnodeProcessDropStableMsg(void *pVnode, SWriteMsg *pMsg) {
|
||||||
SMDDropSTableMsg *pTable = pMsg->rpcMsg.pCont;
|
SMDDropSTableMsg *pTable = pMsg->rpcMsg.pCont;
|
||||||
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .pCont = NULL, .contLen = 0, .code = 0, .msgType = 0};
|
||||||
|
|
||||||
dTrace("stable:%s, start to it drop in dnode, vgroup:%d", pTable->tableId, pTable->vgId);
|
dTrace("pVnode:%p, stable:%s, start to it drop in dnode, vgroup:%d", pVnode, pTable->tableId, pTable->vgId);
|
||||||
pTable->uid = htobe64(pTable->uid);
|
pTable->uid = htobe64(pTable->uid);
|
||||||
|
|
||||||
// TODO: drop stable in vvnode
|
// TODO: drop stable in vvnode
|
||||||
|
@ -441,9 +435,8 @@ static void dnodeProcessDropStableMsg(void *pVnode, SWriteMsg *pMsg) {
|
||||||
//rpcRsp.code = tsdbDropSTable(pTsdb, pTable->uid);
|
//rpcRsp.code = tsdbDropSTable(pTsdb, pTable->uid);
|
||||||
|
|
||||||
rpcRsp.code = TSDB_CODE_SUCCESS;
|
rpcRsp.code = TSDB_CODE_SUCCESS;
|
||||||
dnodeReleaseVnode(pVnode);
|
|
||||||
|
|
||||||
dTrace("stable:%s, drop stable result:%s", pTable->tableId, tstrerror(rpcRsp.code));
|
dTrace("pVnode:%p, stable:%s, drop stable result:%s", pVnode, pTable->tableId, tstrerror(rpcRsp.code));
|
||||||
rpcSendResponse(&rpcRsp);
|
rpcSendResponse(&rpcRsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue