[TD-335] fix bug while auto create table
This commit is contained in:
parent
30e867daa6
commit
780e3e1bb9
|
@ -75,7 +75,7 @@ void dnodeCleanupMnodePeer() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
dPrint("dnode mmgmt is closed");
|
dPrint("dnode mpeer is closed");
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t dnodeAllocateMnodePqueue() {
|
int32_t dnodeAllocateMnodePqueue() {
|
||||||
|
@ -93,14 +93,14 @@ int32_t dnodeAllocateMnodePqueue() {
|
||||||
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessMnodePeerQueue, pWorker) != 0) {
|
if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessMnodePeerQueue, pWorker) != 0) {
|
||||||
dError("failed to create thread to process mmgmt queue, reason:%s", strerror(errno));
|
dError("failed to create thread to process mpeer queue, reason:%s", strerror(errno));
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_attr_destroy(&thAttr);
|
pthread_attr_destroy(&thAttr);
|
||||||
dTrace("dnode mmgmt worker:%d is launched, total:%d", pWorker->workerId, tsMPeerPool.num);
|
dTrace("dnode mpeer worker:%d is launched, total:%d", pWorker->workerId, tsMPeerPool.num);
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("dnode mmgmt queue:%p is allocated", tsMPeerQueue);
|
dTrace("dnode mpeer queue:%p is allocated", tsMPeerQueue);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,7 +150,7 @@ static void *dnodeProcessMnodePeerQueue(void *param) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
dTrace("%p, msg:%s will be processed in mpeer queue", pPeerMsg->rpcMsg.ahandle, taosMsg[pPeerMsg->rpcMsg.msgType]);
|
dTrace("msg:%s will be processed in mpeer queue", taosMsg[pPeerMsg->rpcMsg.msgType]);
|
||||||
int32_t code = mnodeProcessPeerReq(pPeerMsg);
|
int32_t code = mnodeProcessPeerReq(pPeerMsg);
|
||||||
dnodeSendRpcMnodePeerRsp(pPeerMsg, code);
|
dnodeSendRpcMnodePeerRsp(pPeerMsg, code);
|
||||||
}
|
}
|
||||||
|
|
|
@ -132,6 +132,11 @@ static void dnodeFreeMnodeReadMsg(SMnodeMsg *pRead) {
|
||||||
|
|
||||||
static void dnodeSendRpcMnodeReadRsp(SMnodeMsg *pRead, int32_t code) {
|
static void dnodeSendRpcMnodeReadRsp(SMnodeMsg *pRead, int32_t code) {
|
||||||
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
|
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
|
||||||
|
if (code == TSDB_CODE_ACTION_NEED_REPROCESSED) {
|
||||||
|
// may be a auto create req, should put into write queue
|
||||||
|
dnodeReprocessMnodeWriteMsg(pRead);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
SRpcMsg rpcRsp = {
|
SRpcMsg rpcRsp = {
|
||||||
.handle = pRead->rpcMsg.handle,
|
.handle = pRead->rpcMsg.handle,
|
||||||
|
|
|
@ -130,6 +130,10 @@ static void dnodeFreeMnodeWriteMsg(SMnodeMsg *pWrite) {
|
||||||
void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) {
|
void dnodeSendRpcMnodeWriteRsp(void *pRaw, int32_t code) {
|
||||||
SMnodeMsg *pWrite = pRaw;
|
SMnodeMsg *pWrite = pRaw;
|
||||||
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
|
if (code == TSDB_CODE_ACTION_IN_PROGRESS) return;
|
||||||
|
if (code == TSDB_CODE_ACTION_NEED_REPROCESSED) {
|
||||||
|
dnodeReprocessMnodeWriteMsg(pWrite);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
SRpcMsg rpcRsp = {
|
SRpcMsg rpcRsp = {
|
||||||
.handle = pWrite->rpcMsg.handle,
|
.handle = pWrite->rpcMsg.handle,
|
||||||
|
|
|
@ -741,6 +741,7 @@ static int32_t mnodeProcessTableMetaMsg(SMnodeMsg *pMsg) {
|
||||||
mError("table:%s, failed to get table meta, table not exist", pInfo->tableId);
|
mError("table:%s, failed to get table meta, table not exist", pInfo->tableId);
|
||||||
return TSDB_CODE_INVALID_TABLE;
|
return TSDB_CODE_INVALID_TABLE;
|
||||||
} else {
|
} else {
|
||||||
|
mTrace("table:%s, failed to get table meta, start auto create table ", pInfo->tableId);
|
||||||
return mgmtAutoCreateChildTable(pMsg);
|
return mgmtAutoCreateChildTable(pMsg);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -307,7 +307,6 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg, SDbObj *pDb) {
|
||||||
if (balanceAllocVnodes(pVgroup) != 0) {
|
if (balanceAllocVnodes(pVgroup) != 0) {
|
||||||
mError("db:%s, no enough dnode to alloc %d vnodes to vgroup", pDb->name, pVgroup->numOfVnodes);
|
mError("db:%s, no enough dnode to alloc %d vnodes to vgroup", pDb->name, pVgroup->numOfVnodes);
|
||||||
free(pVgroup);
|
free(pVgroup);
|
||||||
mnodeCleanupMsg(pMsg);
|
|
||||||
return TSDB_CODE_NO_ENOUGH_DNODES;
|
return TSDB_CODE_NO_ENOUGH_DNODES;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -321,9 +320,7 @@ int32_t mnodeCreateVgroup(SMnodeMsg *pMsg, SDbObj *pDb) {
|
||||||
int32_t code = sdbInsertRow(&oper);
|
int32_t code = sdbInsertRow(&oper);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
tfree(pVgroup);
|
tfree(pVgroup);
|
||||||
code = TSDB_CODE_SDB_ERROR;
|
return TSDB_CODE_SDB_ERROR;
|
||||||
mnodeCleanupMsg(pMsg);
|
|
||||||
return TSDB_CODE_SDB_ERROR;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
mPrint("vgId:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
|
mPrint("vgId:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
|
||||||
|
|
Loading…
Reference in New Issue