[TD-2426]<feature>: first round batch create table
[TD-2425]<feature>
This commit is contained in:
parent
f4f5d77e58
commit
19174a3d8a
|
@ -184,7 +184,19 @@ void dnodeReprocessMWriteMsg(void *pMsg) {
|
||||||
dDebug("msg:%p, app:%p type:%s is redirected for mnode not running, retry times:%d", pWrite, pWrite->rpcMsg.ahandle,
|
dDebug("msg:%p, app:%p type:%s is redirected for mnode not running, retry times:%d", pWrite, pWrite->rpcMsg.ahandle,
|
||||||
taosMsg[pWrite->rpcMsg.msgType], pWrite->retry);
|
taosMsg[pWrite->rpcMsg.msgType], pWrite->retry);
|
||||||
|
|
||||||
dnodeSendRedirectMsg(pMsg, true);
|
if (pWrite->pBatchMasterMsg) {
|
||||||
|
++pWrite->pBatchMasterMsg->received;
|
||||||
|
if (pWrite->pBatchMasterMsg->successed + pWrite->pBatchMasterMsg->received
|
||||||
|
>= pWrite->pBatchMasterMsg->expected) {
|
||||||
|
dnodeSendRedirectMsg(&pWrite->rpcMsg, true);
|
||||||
|
dnodeFreeMWriteMsg(pWrite);
|
||||||
|
}
|
||||||
|
|
||||||
|
mnodeDestroySubMsg(pWrite);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
dnodeSendRedirectMsg(&pWrite->rpcMsg, true);
|
||||||
dnodeFreeMWriteMsg(pWrite);
|
dnodeFreeMWriteMsg(pWrite);
|
||||||
} else {
|
} else {
|
||||||
dDebug("msg:%p, app:%p type:%s is reput into mwrite queue:%p, retry times:%d", pWrite, pWrite->rpcMsg.ahandle,
|
dDebug("msg:%p, app:%p type:%s is reput into mwrite queue:%p, retry times:%d", pWrite, pWrite->rpcMsg.ahandle,
|
||||||
|
|
|
@ -42,11 +42,12 @@ typedef struct SMnodeMsg {
|
||||||
struct SVgObj * pVgroup;
|
struct SVgObj * pVgroup;
|
||||||
struct STableObj *pTable;
|
struct STableObj *pTable;
|
||||||
struct SSTableObj*pSTable;
|
struct SSTableObj*pSTable;
|
||||||
|
struct SMnodeMsg *pBatchMasterMsg;
|
||||||
SMnodeRsp rpcRsp;
|
SMnodeRsp rpcRsp;
|
||||||
int8_t received;
|
int16_t received;
|
||||||
int8_t successed;
|
int16_t successed;
|
||||||
int8_t expected;
|
int16_t expected;
|
||||||
int8_t retry;
|
int16_t retry;
|
||||||
int32_t incomingTs;
|
int32_t incomingTs;
|
||||||
int32_t code;
|
int32_t code;
|
||||||
void * pObj;
|
void * pObj;
|
||||||
|
@ -57,6 +58,7 @@ typedef struct SMnodeMsg {
|
||||||
void * mnodeCreateMsg(SRpcMsg *pRpcMsg);
|
void * mnodeCreateMsg(SRpcMsg *pRpcMsg);
|
||||||
int32_t mnodeInitMsg(SMnodeMsg *pMsg);
|
int32_t mnodeInitMsg(SMnodeMsg *pMsg);
|
||||||
void mnodeCleanupMsg(SMnodeMsg *pMsg);
|
void mnodeCleanupMsg(SMnodeMsg *pMsg);
|
||||||
|
void mnodeDestroySubMsg(SMnodeMsg *pSubMsg);
|
||||||
|
|
||||||
int32_t mnodeInitSystem();
|
int32_t mnodeInitSystem();
|
||||||
int32_t mnodeStartSystem();
|
int32_t mnodeStartSystem();
|
||||||
|
|
|
@ -26,6 +26,7 @@
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
#include "tdataformat.h"
|
#include "tdataformat.h"
|
||||||
#include "tgrant.h"
|
#include "tgrant.h"
|
||||||
|
#include "tqueue.h"
|
||||||
#include "hash.h"
|
#include "hash.h"
|
||||||
#include "mnode.h"
|
#include "mnode.h"
|
||||||
#include "dnode.h"
|
#include "dnode.h"
|
||||||
|
@ -720,6 +721,133 @@ static void mnodeExtractTableName(char* tableId, char* name) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SMnodeMsg *mnodeCreateSubMsg(SMnodeMsg *pBatchMasterMsg, int32_t contSize) {
|
||||||
|
SMnodeMsg *pSubMsg = taosAllocateQitem(sizeof(*pBatchMasterMsg) + contSize);
|
||||||
|
*pSubMsg = *pBatchMasterMsg;
|
||||||
|
|
||||||
|
//pSubMsg->pCont = (char *) pSubMsg + sizeof(SMnodeMsg);
|
||||||
|
pSubMsg->rpcMsg.pCont = pSubMsg->pCont;
|
||||||
|
pSubMsg->successed = 0;
|
||||||
|
pSubMsg->expected = 0;
|
||||||
|
SCMCreateTableMsg *pCM = pSubMsg->rpcMsg.pCont;
|
||||||
|
pCM->numOfTables = htonl(1);
|
||||||
|
pCM->contLen = htonl(contSize);
|
||||||
|
|
||||||
|
return pSubMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void mnodeDestroySubMsg(SMnodeMsg *pSubMsg) {
|
||||||
|
if (pSubMsg) {
|
||||||
|
// pUser is retained in batch master msg
|
||||||
|
if (pSubMsg->pDb) mnodeDecDbRef(pSubMsg->pDb);
|
||||||
|
if (pSubMsg->pVgroup) mnodeDecVgroupRef(pSubMsg->pVgroup);
|
||||||
|
if (pSubMsg->pTable) mnodeDecTableRef(pSubMsg->pTable);
|
||||||
|
if (pSubMsg->pSTable) mnodeDecTableRef(pSubMsg->pSTable);
|
||||||
|
if (pSubMsg->pAcct) mnodeDecAcctRef(pSubMsg->pAcct);
|
||||||
|
if (pSubMsg->pDnode) mnodeDecDnodeRef(pSubMsg->pDnode);
|
||||||
|
|
||||||
|
taosFreeQitem(pSubMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mnodeValidateCreateTableMsg(SCreateTableMsg *pCreateTable, SMnodeMsg *pMsg) {
|
||||||
|
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(pCreateTable->db);
|
||||||
|
if (pMsg->pDb == NULL) {
|
||||||
|
mError("msg:%p, app:%p table:%s, failed to create, db not selected", pMsg, pMsg->rpcMsg.ahandle, pCreateTable->tableId);
|
||||||
|
return TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pMsg->pDb->status != TSDB_DB_STATUS_READY) {
|
||||||
|
mError("db:%s, status:%d, in dropping", pMsg->pDb->name, pMsg->pDb->status);
|
||||||
|
return TSDB_CODE_MND_DB_IN_DROPPING;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pMsg->pTable == NULL) pMsg->pTable = mnodeGetTable(pCreateTable->tableId);
|
||||||
|
if (pMsg->pTable != NULL && pMsg->retry == 0) {
|
||||||
|
if (pCreateTable->getMeta) {
|
||||||
|
mDebug("msg:%p, app:%p table:%s, continue to get meta", pMsg, pMsg->rpcMsg.ahandle, pCreateTable->tableId);
|
||||||
|
return mnodeGetChildTableMeta(pMsg);
|
||||||
|
} else if (pCreateTable->igExists) {
|
||||||
|
mDebug("msg:%p, app:%p table:%s, is already exist", pMsg, pMsg->rpcMsg.ahandle, pCreateTable->tableId);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
} else {
|
||||||
|
mError("msg:%p, app:%p table:%s, failed to create, table already exist", pMsg, pMsg->rpcMsg.ahandle,
|
||||||
|
pCreateTable->tableId);
|
||||||
|
return TSDB_CODE_MND_TABLE_ALREADY_EXIST;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pCreateTable->numOfTags != 0) {
|
||||||
|
mDebug("msg:%p, app:%p table:%s, create stable msg is received from thandle:%p", pMsg, pMsg->rpcMsg.ahandle,
|
||||||
|
pCreateTable->tableId, pMsg->rpcMsg.handle);
|
||||||
|
return mnodeProcessCreateSuperTableMsg(pMsg);
|
||||||
|
} else {
|
||||||
|
mDebug("msg:%p, app:%p table:%s, create ctable msg is received from thandle:%p", pMsg, pMsg->rpcMsg.ahandle,
|
||||||
|
pCreateTable->tableId, pMsg->rpcMsg.handle);
|
||||||
|
return mnodeProcessCreateChildTableMsg(pMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t mnodeProcessBatchCreateTableMsg(SMnodeMsg *pMsg) {
|
||||||
|
if (pMsg->pBatchMasterMsg == NULL) { // batch master first round
|
||||||
|
pMsg->pBatchMasterMsg = pMsg;
|
||||||
|
|
||||||
|
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
|
||||||
|
int32_t numOfTables = htonl(pCreate->numOfTables);
|
||||||
|
int32_t contentLen = htonl(pCreate->contLen);
|
||||||
|
pMsg->expected = numOfTables;
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SCreateTableMsg *pCreateTable = (SCreateTableMsg*) ((char*) pCreate + sizeof(SCMCreateTableMsg));
|
||||||
|
for (SCreateTableMsg *p = pCreateTable; p < (SCreateTableMsg *) ((char *) pCreate + contentLen); p = (SCreateTableMsg *) ((char *) p + htonl(p->len))) {
|
||||||
|
SMnodeMsg *pSubMsg = mnodeCreateSubMsg(pMsg, sizeof(SCMCreateTableMsg) + htonl(p->len));
|
||||||
|
memcpy(pSubMsg->pCont + sizeof(SCMCreateTableMsg), p, htonl(p->len));
|
||||||
|
code = mnodeValidateCreateTableMsg(p, pSubMsg);
|
||||||
|
|
||||||
|
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_MND_TABLE_ALREADY_EXIST) {
|
||||||
|
++pSubMsg->pBatchMasterMsg->successed;
|
||||||
|
mnodeDestroySubMsg(pSubMsg);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||||
|
mnodeDestroySubMsg(pSubMsg);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pMsg->successed >= pMsg->expected) {
|
||||||
|
return code;
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (pMsg->pBatchMasterMsg != pMsg) { // batch sub replay
|
||||||
|
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
|
||||||
|
SCreateTableMsg *pCreateTable = (SCreateTableMsg*) ((char*) pCreate + sizeof(SCMCreateTableMsg));
|
||||||
|
int32_t code = mnodeValidateCreateTableMsg(pCreateTable, pMsg);
|
||||||
|
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_MND_TABLE_ALREADY_EXIST) {
|
||||||
|
++pMsg->pBatchMasterMsg->successed;
|
||||||
|
mnodeDestroySubMsg(pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||||
|
mnodeDestroySubMsg(pMsg);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received
|
||||||
|
>= pMsg->pBatchMasterMsg->expected) {
|
||||||
|
return code;
|
||||||
|
} else {
|
||||||
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||||
|
}
|
||||||
|
} else { // batch master replay, reprocess the whole batch
|
||||||
|
assert(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) {
|
static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) {
|
||||||
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
|
SCMCreateTableMsg *pCreate = pMsg->rpcMsg.pCont;
|
||||||
|
|
||||||
|
@ -729,6 +857,11 @@ static int32_t mnodeProcessCreateTableMsg(SMnodeMsg *pMsg) {
|
||||||
// todo return error
|
// todo return error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// batch master msg first round or reprocessing and batch sub msg reprocessing
|
||||||
|
if (numOfTables > 1 || pMsg->pBatchMasterMsg != NULL) {
|
||||||
|
return mnodeProcessBatchCreateTableMsg(pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
SCreateTableMsg *p = (SCreateTableMsg*)((char*) pCreate + sizeof(SCMCreateTableMsg));
|
SCreateTableMsg *p = (SCreateTableMsg*)((char*) pCreate + sizeof(SCMCreateTableMsg));
|
||||||
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(p->db);
|
if (pMsg->pDb == NULL) pMsg->pDb = mnodeGetDb(p->db);
|
||||||
if (pMsg->pDb == NULL) {
|
if (pMsg->pDb == NULL) {
|
||||||
|
@ -1737,6 +1870,18 @@ static int32_t mnodeDoCreateChildTableCb(SMnodeMsg *pMsg, int32_t code) {
|
||||||
mDebug("msg:%p, app:%p table:%s, created in dnode, thandle:%p", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId,
|
mDebug("msg:%p, app:%p table:%s, created in dnode, thandle:%p", pMsg, pMsg->rpcMsg.ahandle, pTable->info.tableId,
|
||||||
pMsg->rpcMsg.handle);
|
pMsg->rpcMsg.handle);
|
||||||
|
|
||||||
|
if (pMsg->pBatchMasterMsg) {
|
||||||
|
++pMsg->pBatchMasterMsg->successed;
|
||||||
|
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received
|
||||||
|
>= pMsg->pBatchMasterMsg->expected) {
|
||||||
|
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
mnodeDestroySubMsg(pMsg);
|
||||||
|
|
||||||
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||||
|
}
|
||||||
|
|
||||||
dnodeSendRpcMWriteRsp(pMsg, TSDB_CODE_SUCCESS);
|
dnodeSendRpcMWriteRsp(pMsg, TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
return TSDB_CODE_MND_ACTION_IN_PROGRESS;
|
||||||
|
@ -2477,6 +2622,19 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
|
||||||
|
|
||||||
mnodeSendDropChildTableMsg(pMsg, false);
|
mnodeSendDropChildTableMsg(pMsg, false);
|
||||||
rpcMsg->code = TSDB_CODE_SUCCESS;
|
rpcMsg->code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
if (pMsg->pBatchMasterMsg) {
|
||||||
|
++pMsg->pBatchMasterMsg->successed;
|
||||||
|
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received
|
||||||
|
>= pMsg->pBatchMasterMsg->expected) {
|
||||||
|
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, rpcMsg->code);
|
||||||
|
}
|
||||||
|
|
||||||
|
mnodeDestroySubMsg(pMsg);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
dnodeSendRpcMWriteRsp(pMsg, rpcMsg->code);
|
dnodeSendRpcMWriteRsp(pMsg, rpcMsg->code);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -2494,6 +2652,19 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
|
||||||
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
if (code != TSDB_CODE_SUCCESS && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
|
||||||
pMsg->pTable = NULL;
|
pMsg->pTable = NULL;
|
||||||
mnodeDestroyChildTable(pTable);
|
mnodeDestroyChildTable(pTable);
|
||||||
|
|
||||||
|
if (pMsg->pBatchMasterMsg) {
|
||||||
|
++pMsg->pBatchMasterMsg->received;
|
||||||
|
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received
|
||||||
|
>= pMsg->pBatchMasterMsg->expected) {
|
||||||
|
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
mnodeDestroySubMsg(pMsg);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
dnodeSendRpcMWriteRsp(pMsg, code);
|
dnodeSendRpcMWriteRsp(pMsg, code);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -2519,6 +2690,19 @@ static void mnodeProcessCreateChildTableRsp(SRpcMsg *rpcMsg) {
|
||||||
//Avoid retry again in client
|
//Avoid retry again in client
|
||||||
rpcMsg->code = TSDB_CODE_MND_VGROUP_NOT_READY;
|
rpcMsg->code = TSDB_CODE_MND_VGROUP_NOT_READY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pMsg->pBatchMasterMsg) {
|
||||||
|
++pMsg->pBatchMasterMsg->received;
|
||||||
|
if (pMsg->pBatchMasterMsg->successed + pMsg->pBatchMasterMsg->received
|
||||||
|
>= pMsg->pBatchMasterMsg->expected) {
|
||||||
|
dnodeSendRpcMWriteRsp(pMsg->pBatchMasterMsg, rpcMsg->code);
|
||||||
|
}
|
||||||
|
|
||||||
|
mnodeDestroySubMsg(pMsg);
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
dnodeSendRpcMWriteRsp(pMsg, rpcMsg->code);
|
dnodeSendRpcMWriteRsp(pMsg, rpcMsg->code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue