diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 903b172068..dec9292209 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -149,7 +149,8 @@ typedef struct _vg_obj { int32_t lbDnodeId; int32_t lbTime; int8_t status; - int8_t reserved[14]; + int8_t inUse; + int8_t reserved[13]; int8_t updateEnd[1]; int32_t refCount; struct _vg_obj *prev, *next; @@ -243,6 +244,8 @@ typedef struct { int8_t received; int8_t successed; int8_t expected; + int8_t retry; + int8_t maxRetry; int32_t contLen; int32_t code; void *ahandle; diff --git a/src/inc/taoserror.h b/src/inc/taoserror.h index 0624af45c3..5cd38f6d6b 100644 --- a/src/inc/taoserror.h +++ b/src/inc/taoserror.h @@ -76,89 +76,89 @@ TAOS_DEFINE_ERROR(TSDB_CODE_NODE_OFFLINE, 0, 28, "node offline") TAOS_DEFINE_ERROR(TSDB_CODE_NETWORK_UNAVAIL, 0, 29, "network unavailable") // db & user -TAOS_DEFINE_ERROR(TSDB_CODE_DB_NOT_SELECTED, 0, 30, "db not selected") -TAOS_DEFINE_ERROR(TSDB_CODE_DB_ALREADY_EXIST, 0, 31, "database aleady exist") -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_DB, 0, 32, "invalid database") -TAOS_DEFINE_ERROR(TSDB_CODE_MONITOR_DB_FORBIDDEN, 0, 33, "monitor db forbidden") -TAOS_DEFINE_ERROR(TSDB_CODE_USER_ALREADY_EXIST, 0, 34, "user already exist") -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_USER, 0, 35, "invalid user") -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_PASS, 0, 36, "invalid password") +TAOS_DEFINE_ERROR(TSDB_CODE_DB_NOT_SELECTED, 0, 100, "db not selected") +TAOS_DEFINE_ERROR(TSDB_CODE_DB_ALREADY_EXIST, 0, 101, "database aleady exist") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_DB, 0, 102, "invalid database") +TAOS_DEFINE_ERROR(TSDB_CODE_MONITOR_DB_FORBIDDEN, 0, 103, "monitor db forbidden") +TAOS_DEFINE_ERROR(TSDB_CODE_USER_ALREADY_EXIST, 0, 104, "user already exist") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_USER, 0, 105, "invalid user") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_PASS, 0, 106, "invalid password") // table -TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ALREADY_EXIST, 0, 41, "table already exist") -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_ID, 0, 42, "invalid table id") -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_TYPE, 0, 43, "invalid table typee") -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE, 0, 44, "invalid table name") -TAOS_DEFINE_ERROR(TSDB_CODE_NOT_SUPER_TABLE, 0, 45, "no super table") // operation only available for super table -TAOS_DEFINE_ERROR(TSDB_CODE_NOT_ACTIVE_TABLE, 0, 46, "not active table") -TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ID_MISMATCH, 0, 47, "table id mismatch") +TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ALREADY_EXIST, 0, 200, "table already exist") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_ID, 0, 201, "invalid table id") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE_TYPE, 0, 202, "invalid table typee") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TABLE, 0, 203, "invalid table name") +TAOS_DEFINE_ERROR(TSDB_CODE_NOT_SUPER_TABLE, 0, 204, "no super table") // operation only available for super table +TAOS_DEFINE_ERROR(TSDB_CODE_NOT_ACTIVE_TABLE, 0, 205, "not active table") +TAOS_DEFINE_ERROR(TSDB_CODE_TABLE_ID_MISMATCH, 0, 206, "table id mismatch") // dnode & mnode -TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DNODES, 0, 50, "no enough dnodes") -TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_ALREADY_EXIST, 0, 51, "dnode already exist") -TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_NOT_EXIST, 0, 52, "dnode not exist") -TAOS_DEFINE_ERROR(TSDB_CODE_NO_MASTER, 0, 53, "no master") -TAOS_DEFINE_ERROR(TSDB_CODE_NO_REMOVE_MASTER, 0, 54, "no remove master") -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_QUERY_ID, 0, 55, "invalid query id") -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_STREAM_ID, 0, 56, "invalid stream id") -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CONNECTION, 0, 57, "invalid connection") -TAOS_DEFINE_ERROR(TSDB_CODE_SDB_ERROR, 0, 58, "sdb error") +TAOS_DEFINE_ERROR(TSDB_CODE_NO_ENOUGH_DNODES, 0, 300, "no enough dnodes") +TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_ALREADY_EXIST, 0, 301, "dnode already exist") +TAOS_DEFINE_ERROR(TSDB_CODE_DNODE_NOT_EXIST, 0, 302, "dnode not exist") +TAOS_DEFINE_ERROR(TSDB_CODE_NO_MASTER, 0, 303, "no master") +TAOS_DEFINE_ERROR(TSDB_CODE_NO_REMOVE_MASTER, 0, 304, "no remove master") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_QUERY_ID, 0, 305, "invalid query id") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_STREAM_ID, 0, 306, "invalid stream id") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CONNECTION, 0, 307, "invalid connection") +TAOS_DEFINE_ERROR(TSDB_CODE_SDB_ERROR, 0, 308, "sdb error") // acct -TAOS_DEFINE_ERROR(TSDB_CODE_ACCT_ALREADY_EXIST, 0, 60, "accounts already exist") -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_ACCT, 0, 61, "invalid account") -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_ACCT_PARAMETER, 0, 62, "invalid account parameter") -TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_ACCTS, 0, 63, "too many accounts") -TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_USERS, 0, 64, "too many users") -TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_TABLES, 0, 65, "too many tables") -TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_DATABASES, 0, 66, "too many databases") -TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_TIME_SERIES, 0, 67, "not enough time series") +TAOS_DEFINE_ERROR(TSDB_CODE_ACCT_ALREADY_EXIST, 0, 400, "accounts already exist") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_ACCT, 0, 401, "invalid account") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_ACCT_PARAMETER, 0, 402, "invalid account parameter") +TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_ACCTS, 0, 403, "too many accounts") +TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_USERS, 0, 404, "too many users") +TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_TABLES, 0, 405, "too many tables") +TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_DATABASES, 0, 406, "too many databases") +TAOS_DEFINE_ERROR(TSDB_CODE_TOO_MANY_TIME_SERIES, 0, 407, "not enough time series") // grant -TAOS_DEFINE_ERROR(TSDB_CODE_AUTH_FAILURE, 0, 70, "auth failure") -TAOS_DEFINE_ERROR(TSDB_CODE_NO_RIGHTS, 0, 71, "no rights") -TAOS_DEFINE_ERROR(TSDB_CODE_NO_WRITE_ACCESS, 0, 72, "no write access") -TAOS_DEFINE_ERROR(TSDB_CODE_NO_READ_ACCESS, 0, 73, "no read access") -TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, 0, 74, "grant expired") -TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DNODE_LIMITED, 0, 75, "grant dnode limited") -TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_ACCT_LIMITED, 0, 76, "grant account limited") -TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_TIMESERIES_LIMITED, 0, 77, "grant timeseries limited") -TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DB_LIMITED, 0, 78, "grant db limited") -TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_USER_LIMITED, 0, 79, "grant user limited") -TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CONN_LIMITED, 0, 80, "grant conn limited") -TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_STREAM_LIMITED, 0, 81, "grant stream limited") -TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_SPEED_LIMITED, 0, 82, "grant speed limited") -TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_STORAGE_LIMITED, 0, 83, "grant storage limited") -TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_QUERYTIME_LIMITED, 0, 84, "grant query time limited") -TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CPU_LIMITED, 0, 85, "grant cpu limited") +TAOS_DEFINE_ERROR(TSDB_CODE_AUTH_FAILURE, 0, 400, "auth failure") +TAOS_DEFINE_ERROR(TSDB_CODE_NO_RIGHTS, 0, 401, "no rights") +TAOS_DEFINE_ERROR(TSDB_CODE_NO_WRITE_ACCESS, 0, 402, "no write access") +TAOS_DEFINE_ERROR(TSDB_CODE_NO_READ_ACCESS, 0, 403, "no read access") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_EXPIRED, 0, 404, "grant expired") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DNODE_LIMITED, 0, 405, "grant dnode limited") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_ACCT_LIMITED, 0, 406, "grant account limited") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_TIMESERIES_LIMITED, 0, 407, "grant timeseries limited") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_DB_LIMITED, 0, 408, "grant db limited") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_USER_LIMITED, 0, 409, "grant user limited") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CONN_LIMITED, 0, 410, "grant conn limited") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_STREAM_LIMITED, 0, 411, "grant stream limited") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_SPEED_LIMITED, 0, 412, "grant speed limited") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_STORAGE_LIMITED, 0, 413, "grant storage limited") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_QUERYTIME_LIMITED, 0, 414, "grant query time limited") +TAOS_DEFINE_ERROR(TSDB_CODE_GRANT_CPU_LIMITED, 0, 415, "grant cpu limited") // server -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VGROUP_ID, 0, 90, "invalid vgroup id") -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VNODE_ID, 0, 91, "invalid vnode id") -TAOS_DEFINE_ERROR(TSDB_CODE_NOT_ACTIVE_VNODE, 0, 92, "not active vnode") -TAOS_DEFINE_ERROR(TSDB_CODE_VG_INIT_FAILED, 0, 93, "vg init failed") -TAOS_DEFINE_ERROR(TSDB_CODE_SERV_NO_DISKSPACE, 0, 94, "server no diskspace") -TAOS_DEFINE_ERROR(TSDB_CODE_SERV_OUT_OF_MEMORY, 0, 95, "server out of memory") -TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISK_PERMISSIONS, 0, 96, "no disk permissions") -TAOS_DEFINE_ERROR(TSDB_CODE_FILE_CORRUPTED, 0, 97, "file corrupted") -TAOS_DEFINE_ERROR(TSDB_CODE_MEMORY_CORRUPTED, 0, 98, "memory corrupted") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VGROUP_ID, 0, 500, "invalid vgroup id") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VNODE_ID, 0, 501, "invalid vnode id") +TAOS_DEFINE_ERROR(TSDB_CODE_NOT_ACTIVE_VNODE, 0, 502, "not active vnode") +TAOS_DEFINE_ERROR(TSDB_CODE_VG_INIT_FAILED, 0, 503, "vg init failed") +TAOS_DEFINE_ERROR(TSDB_CODE_SERV_NO_DISKSPACE, 0, 504, "server no diskspace") +TAOS_DEFINE_ERROR(TSDB_CODE_SERV_OUT_OF_MEMORY, 0, 505, "server out of memory") +TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISK_PERMISSIONS, 0, 506, "no disk permissions") +TAOS_DEFINE_ERROR(TSDB_CODE_FILE_CORRUPTED, 0, 507, "file corrupted") +TAOS_DEFINE_ERROR(TSDB_CODE_MEMORY_CORRUPTED, 0, 508, "memory corrupted") // client -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CLIENT_VERSION, 0, 101, "invalid client version") -TAOS_DEFINE_ERROR(TSDB_CODE_CLI_OUT_OF_MEMORY, 0, 102, "client out of memory") -TAOS_DEFINE_ERROR(TSDB_CODE_CLI_NO_DISKSPACE, 0, 103, "client no disk space") -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIME_STAMP, 0, 104, "invalid timestamp") -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_SQL, 0, 105, "invalid sql") -TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CACHE_ERASED, 0, 106, "query cache erased") -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_QUERY_MSG, 0, 107, "invalid query message") // failed to validate the sql expression msg by vnode -TAOS_DEFINE_ERROR(TSDB_CODE_SORTED_RES_TOO_MANY, 0, 108, "sorted res too many") // too many result for ordered super table projection query -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_QHANDLE, 0, 109, "invalid handle") -TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CANCELLED, 0, 110, "query cancelled") -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_IE, 0, 111, "invalid ie") -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VALUE, 0, 112, "invalid value") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_CLIENT_VERSION, 0, 601, "invalid client version") +TAOS_DEFINE_ERROR(TSDB_CODE_CLI_OUT_OF_MEMORY, 0, 602, "client out of memory") +TAOS_DEFINE_ERROR(TSDB_CODE_CLI_NO_DISKSPACE, 0, 603, "client no disk space") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_TIME_STAMP, 0, 604, "invalid timestamp") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_SQL, 0, 605, "invalid sql") +TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CACHE_ERASED, 0, 606, "query cache erased") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_QUERY_MSG, 0, 607, "invalid query message") // failed to validate the sql expression msg by vnode +TAOS_DEFINE_ERROR(TSDB_CODE_SORTED_RES_TOO_MANY, 0, 608, "sorted res too many") // too many result for ordered super table projection query +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_QHANDLE, 0, 609, "invalid handle") +TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CANCELLED, 0, 610, "query cancelled") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_IE, 0, 611, "invalid ie") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VALUE, 0, 612, "invalid value") // others -TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FILE_FORMAT, 0, 120, "invalid file format") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FILE_FORMAT, 0, 700, "invalid file format") #ifdef TAOS_ERROR_C diff --git a/src/inc/taosmsg.h b/src/inc/taosmsg.h index 5f07d6ec99..f5168a2c9e 100644 --- a/src/inc/taosmsg.h +++ b/src/inc/taosmsg.h @@ -510,7 +510,6 @@ typedef struct SRetrieveTableRsp { typedef struct { int32_t vgId; - int32_t vnode; int64_t totalStorage; int64_t compStorage; int64_t pointsWritten; diff --git a/src/mnode/inc/mgmtShell.h b/src/mnode/inc/mgmtShell.h index b92e9de1f4..171c93a390 100644 --- a/src/mnode/inc/mgmtShell.h +++ b/src/mnode/inc/mgmtShell.h @@ -31,6 +31,7 @@ void mgmtAddShellShowMetaHandle(uint8_t showType, SShowMetaFp fp); void mgmtAddShellShowRetrieveHandle(uint8_t showType, SShowRetrieveFp fp); void mgmtAddToShellQueue(SQueuedMsg *queuedMsg); +void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg); void mgmtSendSimpleResp(void *thandle, int32_t code); #ifdef __cplusplus diff --git a/src/mnode/inc/mgmtVgroup.h b/src/mnode/inc/mgmtVgroup.h index 83e003e063..072c616f3d 100644 --- a/src/mnode/inc/mgmtVgroup.h +++ b/src/mnode/inc/mgmtVgroup.h @@ -37,6 +37,7 @@ void mgmtDropAllVgroups(SDbObj *pDropDb); void * mgmtGetNextVgroup(void *pNode, SVgObj **pVgroup); void mgmtUpdateVgroup(SVgObj *pVgroup); +void mgmtUpdateVgroupStatus(SVgObj *pVgroup, int32_t dnodeId, SVnodeLoad *pVload); void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb); void mgmtDropVgroup(SVgObj *pVgroup, void *ahandle); diff --git a/src/mnode/src/mgmtDnode.c b/src/mnode/src/mgmtDnode.c index e4ab114090..2f4abc5cfe 100644 --- a/src/mnode/src/mgmtDnode.c +++ b/src/mnode/src/mgmtDnode.c @@ -189,18 +189,21 @@ void clusterProcessDnodeStatusMsg(SRpcMsg *rpcMsg) { int32_t openVnodes = htons(pStatus->openVnodes); for (int32_t j = 0; j < openVnodes; ++j) { - pDnode->vload[j].vgId = htonl(pStatus->load[j].vgId); - pDnode->vload[j].totalStorage = htobe64(pStatus->load[j].totalStorage); - pDnode->vload[j].compStorage = htobe64(pStatus->load[j].compStorage); - pDnode->vload[j].pointsWritten = htobe64(pStatus->load[j].pointsWritten); + SVnodeLoad *pVload = &pStatus->load[j]; + pDnode->vload[j].vgId = htonl(pVload->vgId); + pDnode->vload[j].totalStorage = htobe64(pVload->totalStorage); + pDnode->vload[j].compStorage = htobe64(pVload->compStorage); + pDnode->vload[j].pointsWritten = htobe64(pVload->pointsWritten); SVgObj *pVgroup = mgmtGetVgroup(pDnode->vload[j].vgId); if (pVgroup == NULL) { SRpcIpSet ipSet = mgmtGetIpSetFromIp(pDnode->privateIp); mPrint("dnode:%d, vgroup:%d not exist in mnode, drop it", pDnode->dnodeId, pDnode->vload[j].vgId); mgmtSendDropVnodeMsg(pDnode->vload[j].vgId, &ipSet, NULL); + } else { + mgmtUpdateVgroupStatus(pVgroup, pDnode->dnodeId, pVload); + mgmtReleaseVgroup(pVgroup); } - mgmtReleaseVgroup(pVgroup); } if (pDnode->status == TAOS_DN_STATUS_OFFLINE) { diff --git a/src/mnode/src/mgmtProfile.c b/src/mnode/src/mgmtProfile.c index 0360432971..2b22fae47a 100644 --- a/src/mnode/src/mgmtProfile.c +++ b/src/mnode/src/mgmtProfile.c @@ -806,6 +806,8 @@ void* mgmtCloneQueuedMsg(SQueuedMsg *pSrcMsg) { pDestMsg->msgType = pSrcMsg->msgType; pDestMsg->pCont = pSrcMsg->pCont; pDestMsg->contLen = pSrcMsg->contLen; + pDestMsg->retry = pSrcMsg->retry; + pDestMsg->maxRetry= pSrcMsg->maxRetry; pDestMsg->pUser = pSrcMsg->pUser; pDestMsg->usePublicIp = pSrcMsg->usePublicIp; diff --git a/src/mnode/src/mgmtShell.c b/src/mnode/src/mgmtShell.c index 7b6a2654ae..880c6d0c10 100644 --- a/src/mnode/src/mgmtShell.c +++ b/src/mnode/src/mgmtShell.c @@ -128,6 +128,15 @@ void mgmtAddToShellQueue(SQueuedMsg *queuedMsg) { taosScheduleTask(tsMgmtTranQhandle, &schedMsg); } +static void mgmtDoDealyedAddToShellQueue(void *param, void *tmrId) { + mgmtAddToShellQueue(param); +} + +void mgmtDealyedAddToShellQueue(SQueuedMsg *queuedMsg) { + void *unUsed = NULL; + taosTmrReset(mgmtDoDealyedAddToShellQueue, 1000, queuedMsg, tsMgmtTmr, &unUsed); +} + static void mgmtProcessMsgFromShell(SRpcMsg *rpcMsg) { if (rpcMsg == NULL || rpcMsg->pCont == NULL) { return; diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index a14bdd058f..b536bb5ac9 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -540,7 +540,7 @@ static void mgmtProcessCreateTableMsg(SQueuedMsg *pMsg) { SCMCreateTableMsg *pCreate = pMsg->pCont; pMsg->pTable = mgmtGetTable(pCreate->tableId); - if (pMsg->pTable != NULL) { + if (pMsg->pTable != NULL && pMsg->retry == 0) { if (pCreate->igExists) { mTrace("table:%s, is already exist", pCreate->tableId); mgmtSendSimpleResp(pMsg->thandle, TSDB_CODE_SUCCESS); @@ -1300,7 +1300,11 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { return; } - pMsg->pTable = (STableObj *)mgmtDoCreateChildTable(pCreate, pVgroup, sid); + if (pMsg->retry == 0) { + pMsg->pTable = (STableObj *)mgmtDoCreateChildTable(pCreate, pVgroup, sid); + } else { + pMsg->pTable = mgmtGetTable(pCreate->tableId); + } if (pMsg->pTable == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); return; @@ -1315,6 +1319,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { SRpcIpSet ipSet = mgmtGetIpSetFromVgroup(pVgroup); SQueuedMsg *newMsg = mgmtCloneQueuedMsg(pMsg); newMsg->ahandle = pMsg->pTable; + newMsg->maxRetry = 5; mgmtIncTableRef(pMsg->pTable); SRpcMsg rpcMsg = { .handle = newMsg, @@ -1737,30 +1742,40 @@ static void mgmtProcessCreateChildTableRsp(SRpcMsg *rpcMsg) { queueMsg->received++; SChildTableObj *pTable = queueMsg->ahandle; - mTrace("table:%s, create table rsp received, thandle:%p ahandle:%p result:%s", pTable->info.tableId, queueMsg->thandle, - rpcMsg->handle, tstrerror(rpcMsg->code)); + mTrace("table:%s, create table rsp received, thandle:%p result:%s", pTable->info.tableId, queueMsg->thandle, + tstrerror(rpcMsg->code)); if (rpcMsg->code != TSDB_CODE_SUCCESS) { - SSdbOperDesc oper = { - .type = SDB_OPER_TYPE_GLOBAL, - .table = tsChildTableSdb, - .pObj = pTable - }; - sdbDeleteRow(&oper); + if (queueMsg->retry++ < queueMsg->maxRetry) { + mTrace("table:%s, create table rsp received, retry:%d thandle:%p result:%s", pTable->info.tableId, + queueMsg->retry, queueMsg->thandle, tstrerror(rpcMsg->code)); + mgmtDealyedAddToShellQueue(queueMsg); + } else { + mError("table:%s, failed to create in dnode, thandle:%p result:%s", pTable->info.tableId, + queueMsg->thandle, tstrerror(rpcMsg->code)); + + SSdbOperDesc oper = { + .type = SDB_OPER_TYPE_GLOBAL, + .table = tsChildTableSdb, + .pObj = pTable + }; + sdbDeleteRow(&oper); - mError("table:%s, failed to create in dnode, reason:%s", pTable->info.tableId, tstrerror(rpcMsg->code)); - mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); + mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); + mgmtFreeQueuedMsg(queueMsg); + } } else { - mTrace("table:%s, created in dnode", pTable->info.tableId); + mTrace("table:%s, created in dnode, thandle:%p result:%s", pTable->info.tableId, queueMsg->thandle, + tstrerror(rpcMsg->code)); + if (queueMsg->msgType != TSDB_MSG_TYPE_CM_CREATE_TABLE) { mTrace("table:%s, start to get meta", pTable->info.tableId); - mgmtAddToShellQueue(mgmtCloneQueuedMsg(queueMsg)); + mgmtAddToShellQueue(queueMsg); } else { mgmtSendSimpleResp(queueMsg->thandle, rpcMsg->code); + mgmtFreeQueuedMsg(queueMsg); } } - - mgmtFreeQueuedMsg(queueMsg); } // not implemented yet diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index 4c969124a0..6b27fbbc83 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -18,6 +18,7 @@ #include "taoserror.h" #include "tlog.h" #include "tbalance.h" +#include "tsync.h" #include "tcluster.h" #include "mnode.h" #include "mgmtDb.h" @@ -209,6 +210,18 @@ void mgmtUpdateVgroup(SVgObj *pVgroup) { mgmtSendCreateVgroupMsg(pVgroup, NULL); } +void mgmtUpdateVgroupStatus(SVgObj *pVgroup, int32_t dnodeId, SVnodeLoad *pVload) { + if (pVload->role == TAOS_SYNC_ROLE_MASTER) { + for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) { + SVnodeGid *pVgid = &pVgroup->vnodeGid[i]; + if (pVgid->dnodeId == dnodeId) { + pVgroup->inUse = i; + break; + } + } + } +} + SVgObj *mgmtGetAvailableVgroup(SDbObj *pDb) { return pDb->pHead; } diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index 7ee468fa54..e3433d6296 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -287,7 +287,6 @@ static void vnodeBuildVloadMsg(char *pNode, void * param) { SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++]; pLoad->vgId = htonl(pVnode->vgId); - pLoad->vnode = htonl(pVnode->vgId); pLoad->status = pVnode->status; pLoad->role = pVnode->role; } diff --git a/src/vnode/main/src/vnodeWrite.c b/src/vnode/main/src/vnodeWrite.c index 5e03305487..1692bc14eb 100644 --- a/src/vnode/main/src/vnodeWrite.c +++ b/src/vnode/main/src/vnodeWrite.c @@ -57,7 +57,8 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { if (pVnode->status != TAOS_VN_STATUS_READY) return TSDB_CODE_NOT_ACTIVE_VNODE; - // if (pVnode->replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) + if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) + return TSDB_CODE_NO_MASTER; // assign version pVnode->version++; @@ -109,36 +110,27 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe int32_t code = 0; dTrace("pVnode:%p vgId:%d, table:%s, start to create", pVnode, pVnode->vgId, pTable->tableId); - pTable->numOfColumns = htons(pTable->numOfColumns); - pTable->numOfTags = htons(pTable->numOfTags); - pTable->sid = htonl(pTable->sid); - pTable->sversion = htonl(pTable->sversion); - pTable->tagDataLen = htonl(pTable->tagDataLen); - pTable->sqlDataLen = htonl(pTable->sqlDataLen); - pTable->uid = htobe64(pTable->uid); - pTable->superTableUid = htobe64(pTable->superTableUid); - pTable->createdTime = htobe64(pTable->createdTime); + int16_t numOfColumns = htons(pTable->numOfColumns); + int16_t numOfTags = htons(pTable->numOfTags); + int32_t sid = htonl(pTable->sid); + uint64_t uid = htobe64(pTable->uid); SSchema *pSchema = (SSchema *) pTable->data; - int totalCols = pTable->numOfColumns + pTable->numOfTags; - for (int i = 0; i < totalCols; i++) { - pSchema[i].colId = htons(pSchema[i].colId); - pSchema[i].bytes = htons(pSchema[i].bytes); - } - + int32_t totalCols = numOfColumns + numOfTags; + STableCfg tCfg; - tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid); + tsdbInitTableCfg(&tCfg, pTable->tableType, uid, sid); - STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns); - for (int i = 0; i < pTable->numOfColumns; i++) { - tdSchemaAppendCol(pDestSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes); + STSchema *pDestSchema = tdNewSchema(numOfColumns); + for (int i = 0; i < numOfColumns; i++) { + tdSchemaAppendCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); } tsdbTableSetSchema(&tCfg, pDestSchema, false); - if (pTable->numOfTags != 0) { - STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags); - for (int i = pTable->numOfColumns; i < totalCols; i++) { - tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes); + if (numOfTags != 0) { + STSchema *pDestTagSchema = tdNewSchema(numOfTags); + for (int i = numOfColumns; i < totalCols; i++) { + tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); } tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false); @@ -146,9 +138,9 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe int accumBytes = 0; SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema); - for (int i = 0; i < pTable->numOfTags; i++) { + for (int i = 0; i < numOfTags; i++) { tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i); - accumBytes += pSchema[i + pTable->numOfColumns].bytes; + accumBytes += htons(pSchema[i + numOfColumns].bytes); } tsdbTableSetTagValue(&tCfg, dataRow, false); } @@ -182,51 +174,46 @@ static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet int32_t code = 0; dTrace("pVnode:%p vgId:%d, table:%s, start to alter", pVnode, pVnode->vgId, pTable->tableId); - pTable->numOfColumns = htons(pTable->numOfColumns); - pTable->numOfTags = htons(pTable->numOfTags); - pTable->sid = htonl(pTable->sid); - pTable->sversion = htonl(pTable->sversion); - pTable->tagDataLen = htonl(pTable->tagDataLen); - pTable->sqlDataLen = htonl(pTable->sqlDataLen); - pTable->uid = htobe64(pTable->uid); - pTable->superTableUid = htobe64(pTable->superTableUid); - pTable->createdTime = htobe64(pTable->createdTime); + int16_t numOfColumns = htons(pTable->numOfColumns); + int16_t numOfTags = htons(pTable->numOfTags); + int32_t sid = htonl(pTable->sid); + uint64_t uid = htobe64(pTable->uid); SSchema *pSchema = (SSchema *) pTable->data; - int totalCols = pTable->numOfColumns + pTable->numOfTags; - for (int i = 0; i < totalCols; i++) { - pSchema[i].colId = htons(pSchema[i].colId); - pSchema[i].bytes = htons(pSchema[i].bytes); - } - + int32_t totalCols = numOfColumns + numOfTags; + STableCfg tCfg; - tsdbInitTableCfg(&tCfg, pTable->tableType, pTable->uid, pTable->sid); + tsdbInitTableCfg(&tCfg, pTable->tableType, uid, sid); - STSchema *pDestSchema = tdNewSchema(pTable->numOfColumns); - for (int i = 0; i < pTable->numOfColumns; i++) { - tdSchemaAppendCol(pDestSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes); + STSchema *pDestSchema = tdNewSchema(numOfColumns); + for (int i = 0; i < numOfColumns; i++) { + tdSchemaAppendCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); } tsdbTableSetSchema(&tCfg, pDestSchema, false); - if (pTable->numOfTags != 0) { - STSchema *pDestTagSchema = tdNewSchema(pTable->numOfTags); - for (int i = pTable->numOfColumns; i < totalCols; i++) { - tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, pSchema[i].colId, pSchema[i].bytes); + if (numOfTags != 0) { + STSchema *pDestTagSchema = tdNewSchema(numOfTags); + for (int i = numOfColumns; i < totalCols; i++) { + tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); } - tsdbTableSetSchema(&tCfg, pDestTagSchema, false); + tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false); char *pTagData = pTable->data + totalCols * sizeof(SSchema); int accumBytes = 0; SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema); - for (int i = 0; i < pTable->numOfTags; i++) { + for (int i = 0; i < numOfTags; i++) { tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i); - accumBytes += pSchema[i + pTable->numOfColumns].bytes; + accumBytes += htons(pSchema[i + numOfColumns].bytes); } tsdbTableSetTagValue(&tCfg, dataRow, false); } - code = tsdbAlterTable(pVnode->tsdb, &tCfg); + void *pTsdb = vnodeGetTsdb(pVnode); + code = tsdbAlterTable(pTsdb, &tCfg); + + tfree(pDestSchema); + dTrace("pVnode:%p vgId:%d, table:%s, alter table result:%d", pVnode, pVnode->vgId, pTable->tableId, code); return code; @@ -237,7 +224,7 @@ static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet int32_t code = 0; dTrace("pVnode:%p vgId:%d, stable:%s, start to drop", pVnode, pVnode->vgId, pTable->tableId); - pTable->uid = htobe64(pTable->uid); + // int64_t uid = htobe64(pTable->uid); // TODO: drop stable in vvnode //void *pTsdb = dnodeGetVnodeTsdb(pMsg->pVnode); diff --git a/tests/script/tmp/prepare.sim b/tests/script/tmp/prepare.sim index 1f0b893e6d..731b707434 100644 --- a/tests/script/tmp/prepare.sim +++ b/tests/script/tmp/prepare.sim @@ -1,3 +1,4 @@ system sh/stop_dnodes.sh system sh/deploy.sh -n dnode1 -m 192.168.0.1 -i 192.168.0.1 -system sh/deploy.sh -n dnode2 -m 192.168.0.1 -i 192.168.0.2 \ No newline at end of file +system sh/deploy.sh -n dnode2 -m 192.168.0.1 -i 192.168.0.2 +system sh/deploy.sh -n dnode3 -m 192.168.0.1 -i 192.168.0.3 \ No newline at end of file