Merge pull request #746 from taosdata/feature/slguan
Fix crashes and other exceptions that may occur when deleting a database
This commit is contained in:
commit
f7e40d0e02
|
@ -134,6 +134,8 @@ extern "C" {
|
|||
#define TSDB_CODE_INVALID_SUBMIT_MSG 113
|
||||
#define TSDB_CODE_NOT_ACTIVE_TABLE 114
|
||||
#define TSDB_CODE_INVALID_TABLE_ID 115
|
||||
#define TSDB_CODE_INVALID_VNODE_STATUS 116
|
||||
#define TSDB_CODE_FAILED_TO_LOCK_RESOURCES 117
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -44,22 +44,40 @@ extern "C" {
|
|||
#define TSDB_TIME_PRECISION_MILLI_STR "ms"
|
||||
#define TSDB_TIME_PRECISION_MICRO_STR "us"
|
||||
|
||||
enum _status {
|
||||
TSDB_STATUS_OFFLINE,
|
||||
TSDB_STATUS_CREATING,
|
||||
TSDB_STATUS_UNSYNCED,
|
||||
TSDB_STATUS_SLAVE,
|
||||
TSDB_STATUS_MASTER,
|
||||
TSDB_STATUS_READY,
|
||||
enum _vnode_status {
|
||||
TSDB_VNODE_STATUS_OFFLINE,
|
||||
TSDB_VNODE_STATUS_CREATING,
|
||||
TSDB_VNODE_STATUS_UNSYNCED,
|
||||
TSDB_VNODE_STATUS_SLAVE,
|
||||
TSDB_VNODE_STATUS_MASTER,
|
||||
TSDB_VNODE_STATUS_CLOSING,
|
||||
TSDB_VNODE_STATUS_DELETING,
|
||||
};
|
||||
|
||||
enum _syncstatus {
|
||||
enum _vnode_sync_status {
|
||||
STDB_SSTATUS_INIT,
|
||||
TSDB_SSTATUS_SYNCING,
|
||||
TSDB_SSTATUS_SYNC_CACHE,
|
||||
TSDB_SSTATUS_SYNC_FILE,
|
||||
};
|
||||
|
||||
enum _dnode_status {
|
||||
TSDB_DNODE_STATUS_OFFLINE,
|
||||
TSDB_DNODE_STATUS_READY
|
||||
};
|
||||
|
||||
enum _dnode_balance_status {
|
||||
LB_DNODE_STATE_BALANCED,
|
||||
LB_DNODE_STATE_BALANCING,
|
||||
LB_DNODE_STATE_OFFLINE_REMOVING,
|
||||
LB_DNODE_STATE_SHELL_REMOVING
|
||||
};
|
||||
|
||||
enum _vgroup_status {
|
||||
LB_VGROUP_STATE_READY,
|
||||
LB_VGROUP_STATE_UPDATE
|
||||
};
|
||||
|
||||
#define TSDB_DATA_TYPE_BOOL 1 // 1 bytes
|
||||
#define TSDB_DATA_TYPE_TINYINT 2 // 1 byte
|
||||
#define TSDB_DATA_TYPE_SMALLINT 3 // 2 bytes
|
||||
|
|
|
@ -20,10 +20,11 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
extern char *sdbDnodeStatusStr[];
|
||||
extern char *sdbDnodeBalanceStateStr[];
|
||||
extern char *sdbVnodeDropStateStr[];
|
||||
extern char *sdbVnodeSyncStatusStr[];
|
||||
const char* taosGetVnodeStatusStr(int vnodeStatus);
|
||||
const char* taosGetDnodeStatusStr(int dnodeStatus);
|
||||
const char* taosGetDnodeBalanceStateStr(int dnodeBalanceStatus);
|
||||
const char* taosGetVnodeSyncStatusStr(int vnodeSyncStatus);
|
||||
const char* taosGetVnodeDropStatusStr(int dropping);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -354,6 +354,8 @@ int taosOpenRpcChannWithQ(void *handle, int cid, int sessions, void *qhandle) {
|
|||
STaosRpc * pServer = (STaosRpc *)handle;
|
||||
SRpcChann *pChann;
|
||||
|
||||
tTrace("cid:%d, handle:%p open rpc chann", cid, handle);
|
||||
|
||||
if (pServer == NULL) return -1;
|
||||
if (cid >= pServer->numOfChanns || cid < 0) {
|
||||
tError("%s: cid:%d, chann is out of range, max:%d", pServer->label, cid, pServer->numOfChanns);
|
||||
|
@ -402,6 +404,8 @@ void taosCloseRpcChann(void *handle, int cid) {
|
|||
STaosRpc * pServer = (STaosRpc *)handle;
|
||||
SRpcChann *pChann;
|
||||
|
||||
tTrace("cid:%d, handle:%p close rpc chann", cid, handle);
|
||||
|
||||
if (pServer == NULL) return;
|
||||
if (cid >= pServer->numOfChanns || cid < 0) {
|
||||
tError("%s cid:%d, chann is out of range, max:%d", pServer->label, cid, pServer->numOfChanns);
|
||||
|
|
|
@ -240,4 +240,6 @@ char *tsError[] = {"success",
|
|||
"invalid submit message",
|
||||
"not active table(not created yet or deleted already)", //114
|
||||
"invalid table id",
|
||||
"invalid vnode status", //116
|
||||
"failed to lock resources",
|
||||
};
|
||||
|
|
|
@ -28,15 +28,6 @@ extern "C" {
|
|||
#include "tstatus.h"
|
||||
#include "ttime.h"
|
||||
|
||||
enum {
|
||||
LB_DNODE_STATE_BALANCED,
|
||||
LB_DNODE_STATE_BALANCING,
|
||||
LB_DNODE_STATE_OFFLINE_REMOVING,
|
||||
LB_DNODE_STATE_SHELL_REMOVING
|
||||
};
|
||||
|
||||
enum { LB_VGROUP_STATE_READY, LB_VGROUP_STATE_UPDATE };
|
||||
|
||||
void mgmtCreateDnodeOrderList();
|
||||
|
||||
void mgmtReleaseDnodeOrderList();
|
||||
|
|
|
@ -92,7 +92,7 @@ typedef struct {
|
|||
SVPeerDesc vpeers[TSDB_VNODES_SUPPORT];
|
||||
SVnodePeer * peerInfo[TSDB_VNODES_SUPPORT];
|
||||
char selfIndex;
|
||||
char status;
|
||||
char vnodeStatus;
|
||||
char accessState; // Vnode access state, Readable/Writable
|
||||
char syncStatus;
|
||||
char commitInProcess;
|
||||
|
|
|
@ -26,6 +26,7 @@
|
|||
#include "vnodeMgmt.h"
|
||||
#include "vnodeSystem.h"
|
||||
#include "vnodeUtil.h"
|
||||
#include "tstatus.h"
|
||||
|
||||
SMgmtObj mgmtObj;
|
||||
extern uint64_t tsCreatedTime;
|
||||
|
@ -330,7 +331,7 @@ int vnodeProcessVPeerCfg(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (vnodeList[vnode].status == TSDB_STATUS_CREATING) {
|
||||
if (vnodeList[vnode].vnodeStatus == TSDB_VNODE_STATUS_CREATING) {
|
||||
dTrace("vid:%d, vnode is still under creating", vnode);
|
||||
return 0;
|
||||
}
|
||||
|
@ -359,13 +360,27 @@ int vnodeProcessVPeerCfg(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
|
|||
}
|
||||
|
||||
if (vnodeList[vnode].cfg.maxSessions == 0) {
|
||||
dTrace("vid:%d, vnode is empty", vnode);
|
||||
if (pCfg->maxSessions > 0) {
|
||||
return vnodeCreateVnode(vnode, pCfg, pMsg->vpeerDesc);
|
||||
if (vnodeList[vnode].vnodeStatus == TSDB_VNODE_STATUS_OFFLINE) {
|
||||
dTrace("vid:%d, status:%s, start to create vnode", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
|
||||
return vnodeCreateVnode(vnode, pCfg, pMsg->vpeerDesc);
|
||||
} else {
|
||||
dTrace("vid:%d, status:%s, cannot preform create vnode operation", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
|
||||
return TSDB_CODE_INVALID_VNODE_STATUS;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
dTrace("vid:%d, vnode is not empty", vnode);
|
||||
if (pCfg->maxSessions > 0) {
|
||||
if (vnodeList[vnode].vnodeStatus == TSDB_VNODE_STATUS_DELETING) {
|
||||
dTrace("vid:%d, status:%s, wait vnode delete finished", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
|
||||
} else {
|
||||
dTrace("vid:%d, status:%s, start to update vnode", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
|
||||
}
|
||||
/*
|
||||
if (pCfg->maxSessions != vnodeList[vnode].cfg.maxSessions) {
|
||||
vnodeCleanUpOneVnode(vnode);
|
||||
vnodeCleanUpOneVnode(vnode);
|
||||
}
|
||||
|
||||
vnodeConfigVPeers(vnode, pCfg->replications, pMsg->vpeerDesc);
|
||||
|
@ -376,7 +391,10 @@ int vnodeProcessVPeerCfg(char *msg, int msgLen, SMgmtObj *pMgmtObj) {
|
|||
vnodeList[vnode].cfg.maxSessions = pCfg->maxSessions;
|
||||
vnodeOpenVnode(vnode);
|
||||
}
|
||||
*/
|
||||
return 0;
|
||||
} else {
|
||||
dTrace("vid:%d, status:%s, start to delete vnode", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
|
||||
vnodeRemoveVnode(vnode);
|
||||
}
|
||||
}
|
||||
|
@ -434,11 +452,11 @@ int vnodeProcessFreeVnodeRequest(char *pMsg, int msgLen, SMgmtObj *pMgmtObj) {
|
|||
pFree->vnode = htons(pFree->vnode);
|
||||
|
||||
if (pFree->vnode < 0 || pFree->vnode >= TSDB_MAX_VNODES) {
|
||||
dWarn("vid:%d out of range", pFree->vnode);
|
||||
dWarn("vid:%d, out of range", pFree->vnode);
|
||||
return -1;
|
||||
}
|
||||
|
||||
dTrace("vid:%d receive free vnode message", pFree->vnode);
|
||||
dTrace("vid:%d, receive free vnode message", pFree->vnode);
|
||||
int32_t code = vnodeRemoveVnode(pFree->vnode);
|
||||
assert(code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS);
|
||||
|
||||
|
|
|
@ -141,11 +141,10 @@ int mgmtCheckDbParams(SCreateDbMsg *pCreate) {
|
|||
if (pCreate->cacheNumOfBlocks.fraction < 0) pCreate->cacheNumOfBlocks.fraction = tsAverageCacheBlocks; //
|
||||
//-1 for balance
|
||||
|
||||
#ifdef CLUSTER
|
||||
if (pCreate->replications > TSDB_VNODES_SUPPORT - 1) pCreate->replications = TSDB_VNODES_SUPPORT - 1;
|
||||
#else
|
||||
pCreate->replications = 1;
|
||||
#endif
|
||||
if (pCreate->replications <= 0 || pCreate->replications > TSDB_REPLICA_MAX_NUM) {
|
||||
mTrace("invalid db option replications: %d", pCreate->replications);
|
||||
return TSDB_CODE_INVALID_OPTION;
|
||||
}
|
||||
|
||||
if (pCreate->commitLog < 0 || pCreate->commitLog > 1) {
|
||||
mTrace("invalid db option commitLog: %d", pCreate->commitLog);
|
||||
|
@ -316,7 +315,7 @@ bool mgmtCheckDropDbFinished(SDbObj *pDb) {
|
|||
SDnodeObj *pDnode = mgmtGetDnode(pVnodeGid->ip);
|
||||
|
||||
if (pDnode == NULL) continue;
|
||||
if (pDnode->status == TSDB_STATUS_OFFLINE) continue;
|
||||
if (pDnode->status == TSDB_DNODE_STATUS_OFFLINE) continue;
|
||||
|
||||
SVnodeLoad *pVload = &pDnode->vload[pVnodeGid->vnode];
|
||||
if (pVload->dropStatus == TSDB_VN_STATUS_DROPPING) {
|
||||
|
|
|
@ -21,6 +21,7 @@
|
|||
#include "mgmt.h"
|
||||
#include "tschemautil.h"
|
||||
#include "tstatus.h"
|
||||
#include "tstatus.h"
|
||||
|
||||
bool mgmtCheckModuleInDnode(SDnodeObj *pDnode, int moduleType);
|
||||
int mgmtGetDnodesNum();
|
||||
|
@ -43,9 +44,9 @@ void mgmtSetDnodeMaxVnodes(SDnodeObj *pDnode) {
|
|||
pDnode->openVnodes = 0;
|
||||
|
||||
#ifdef CLUSTER
|
||||
pDnode->status = TSDB_STATUS_OFFLINE;
|
||||
pDnode->status = TSDB_DNODE_STATUS_OFFLINE;
|
||||
#else
|
||||
pDnode->status = TSDB_STATUS_READY;
|
||||
pDnode->status = TSDB_DNODE_STATUS_READY;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
@ -57,9 +58,9 @@ void mgmtCalcNumOfFreeVnodes(SDnodeObj *pDnode) {
|
|||
if (pVload->vgId != 0) {
|
||||
mTrace("dnode:%s, calc free vnodes, exist vnode:%d, vgroup:%d, state:%d %s, dropstate:%d %s, syncstatus:%d %s",
|
||||
taosIpStr(pDnode->privateIp), i, pVload->vgId,
|
||||
pVload->status, sdbDnodeStatusStr[pVload->status],
|
||||
pVload->dropStatus, sdbVnodeDropStateStr[pVload->dropStatus],
|
||||
pVload->syncStatus, sdbVnodeSyncStatusStr[pVload->syncStatus]);
|
||||
pVload->status, taosGetDnodeStatusStr(pVload->status),
|
||||
pVload->dropStatus, taosGetVnodeDropStatusStr(pVload->dropStatus),
|
||||
pVload->syncStatus, taosGetVnodeSyncStatusStr(pVload->syncStatus));
|
||||
totalVnodes++;
|
||||
}
|
||||
}
|
||||
|
@ -196,11 +197,11 @@ int mgmtRetrieveDnodes(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
|
|||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
strcpy(pWrite, sdbDnodeStatusStr[pDnode->status]);
|
||||
strcpy(pWrite, taosGetDnodeStatusStr(pDnode->status) );
|
||||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
strcpy(pWrite, sdbDnodeBalanceStateStr[pDnode->lbState]);
|
||||
strcpy(pWrite, taosGetDnodeBalanceStateStr(pDnode->lbState));
|
||||
cols++;
|
||||
|
||||
tinet_ntoa(ipstr, pDnode->publicIp);
|
||||
|
@ -292,7 +293,7 @@ int mgmtRetrieveModules(SShowObj *pShow, char *data, int rows, SConnObj *pConn)
|
|||
cols++;
|
||||
|
||||
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
|
||||
strcpy(pWrite, sdbDnodeStatusStr[pDnode->status]);
|
||||
strcpy(pWrite, taosGetDnodeStatusStr(pDnode->status) );
|
||||
cols++;
|
||||
|
||||
numOfRows++;
|
||||
|
|
|
@ -128,7 +128,7 @@ int mgmtProcessVPeersRsp(char *msg, int msgLen, SDnodeObj *pObj) {
|
|||
}
|
||||
|
||||
if (pDb->vgStatus != TSDB_VG_STATUS_IN_PROGRESS) {
|
||||
mTrace("dnode:%s, db:%s vpeer rsp already disposed, code:%d", taosIpStr(pObj->privateIp), pRsp->more, pRsp->code);
|
||||
mTrace("dnode:%s, db:%s vpeer rsp already disposed, vgroup status:%d code:%d", taosIpStr(pObj->privateIp), pRsp->more, pDb->vgStatus, pRsp->code);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -140,10 +140,11 @@ int mgmtProcessVPeersRsp(char *msg, int msgLen, SDnodeObj *pObj) {
|
|||
|
||||
if (pRsp->code == TSDB_CODE_VG_COMMITLOG_INIT_FAILED) {
|
||||
pDb->vgStatus = TSDB_VG_STATUS_COMMITLOG_INIT_FAILED;
|
||||
mError("dnode:%s, db:%s vgroup commit log init failed, code:%d", taosIpStr(pObj->privateIp), pRsp->more, pRsp->code);
|
||||
} else {
|
||||
pDb->vgStatus = TSDB_VG_STATUS_INIT_FAILED;
|
||||
mError("dnode:%s, db:%s vgroup init failed, code:%d", taosIpStr(pObj->privateIp), pRsp->more, pRsp->code);
|
||||
}
|
||||
mError("dnode:%s, db:%s vgroup create failed, code:%d", taosIpStr(pObj->privateIp), pRsp->more, pRsp->code);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -331,7 +332,6 @@ char *mgmtBuildVpeersIe(char *pMsg, SVgObj *pVgroup, int vnode) {
|
|||
pCfg->replications = (char)pVgroup->numOfVnodes;
|
||||
pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock);
|
||||
|
||||
#ifdef CLUSTER
|
||||
SVPeerDesc *vpeerDesc = pVPeers->vpeerDesc;
|
||||
|
||||
pMsg = (char *)(pVPeers->vpeerDesc);
|
||||
|
@ -341,7 +341,6 @@ char *mgmtBuildVpeersIe(char *pMsg, SVgObj *pVgroup, int vnode) {
|
|||
vpeerDesc[j].vnode = htonl(pVgroup->vnodeGid[j].vnode);
|
||||
pMsg += sizeof(SVPeerDesc);
|
||||
}
|
||||
#endif
|
||||
|
||||
return pMsg;
|
||||
}
|
||||
|
|
|
@ -502,7 +502,7 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
|
|||
|
||||
int numOfTables = sdbGetNumOfRows(meterSdb);
|
||||
if (numOfTables >= tsMaxTables) {
|
||||
mWarn("numOfTables:%d, exceed tsMaxTables:%d", numOfTables, tsMaxTables);
|
||||
mError("table:%s, numOfTables:%d exceed maxTables:%d", pCreate->meterId, numOfTables, tsMaxTables);
|
||||
return TSDB_CODE_TOO_MANY_TABLES;
|
||||
}
|
||||
|
||||
|
@ -510,6 +510,7 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
|
|||
assert(pAcct != NULL);
|
||||
int code = mgmtCheckMeterLimit(pAcct, pCreate);
|
||||
if (code != 0) {
|
||||
mError("table:%s, exceed the limit", pCreate->meterId);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -517,8 +518,10 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
|
|||
pMeter = mgmtGetMeter(pCreate->meterId);
|
||||
if (pMeter) {
|
||||
if (pCreate->igExists) {
|
||||
mError("table:%s, igExists is true", pCreate->meterId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
mError("table:%s, table is already exist", pCreate->meterId);
|
||||
return TSDB_CODE_TABLE_ALREADY_EXIST;
|
||||
}
|
||||
}
|
||||
|
@ -533,6 +536,7 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
|
|||
char *pTagData = (char *)pCreate->schema; // it is a tag key
|
||||
pMetric = mgmtGetMeter(pTagData);
|
||||
if (pMetric == NULL) {
|
||||
mError("table:%s, corresponding super table does not exist", pCreate->meterId);
|
||||
return TSDB_CODE_INVALID_TABLE;
|
||||
}
|
||||
|
||||
|
@ -545,6 +549,7 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
|
|||
pMeter->schema = (char *)malloc(size);
|
||||
if (pMeter->schema == NULL) {
|
||||
mgmtDestroyMeter(pMeter);
|
||||
mError("table:%s, corresponding super table schema is null", pCreate->meterId);
|
||||
return TSDB_CODE_INVALID_TABLE;
|
||||
}
|
||||
memset(pMeter->schema, 0, size);
|
||||
|
@ -556,13 +561,13 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
|
|||
pMeter->pTagData = pMeter->schema;
|
||||
pMeter->nextColId = pMetric->nextColId;
|
||||
memcpy(pMeter->pTagData, pTagData, size);
|
||||
|
||||
} else {
|
||||
int numOfCols = pCreate->numOfColumns + pCreate->numOfTags;
|
||||
size = numOfCols * sizeof(SSchema) + pCreate->sqlLen;
|
||||
pMeter->schema = (char *)malloc(size);
|
||||
if (pMeter->schema == NULL) {
|
||||
mgmtDestroyMeter(pMeter);
|
||||
mError("table:%s, no schema input", pCreate->meterId);
|
||||
return TSDB_CODE_SERV_OUT_OF_MEMORY;
|
||||
}
|
||||
memset(pMeter->schema, 0, size);
|
||||
|
@ -583,7 +588,7 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
|
|||
pMeter->pSql = pMeter->schema + numOfCols * sizeof(SSchema);
|
||||
memcpy(pMeter->pSql, (char *)(pCreate->schema) + numOfCols * sizeof(SSchema), pCreate->sqlLen);
|
||||
pMeter->pSql[pCreate->sqlLen - 1] = 0;
|
||||
mTrace("stream sql len:%d, sql:%s", pCreate->sqlLen, pMeter->pSql);
|
||||
mTrace("table:%s, stream sql len:%d sql:%s", pCreate->meterId, pCreate->sqlLen, pMeter->pSql);
|
||||
} else {
|
||||
if (pCreate->numOfTags > 0) {
|
||||
pMeter->meterType = TSDB_METER_METRIC;
|
||||
|
@ -596,13 +601,14 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
|
|||
pMeter->createdTime = taosGetTimestampMs();
|
||||
strcpy(pMeter->meterId, pCreate->meterId);
|
||||
if (pthread_rwlock_init(&pMeter->rwLock, NULL)) {
|
||||
mError("Failed to init meter lock");
|
||||
mError("table:%s, failed to init meter lock", pCreate->meterId);
|
||||
mgmtDestroyMeter(pMeter);
|
||||
return TSDB_CODE_OTHERS;
|
||||
return TSDB_CODE_FAILED_TO_LOCK_RESOURCES;
|
||||
}
|
||||
|
||||
code = mgmtCheckMeterGrant(pCreate, pMeter);
|
||||
if (code != 0) {
|
||||
mError("table:%s, grant expired", pCreate->meterId);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -611,21 +617,25 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
|
|||
|
||||
if (pDb->vgStatus == TSDB_VG_STATUS_IN_PROGRESS) {
|
||||
mgmtDestroyMeter(pMeter);
|
||||
mTrace("table:%s, vgroup in creating progress", pCreate->meterId);
|
||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
if (pDb->vgStatus == TSDB_VG_STATUS_FULL) {
|
||||
mgmtDestroyMeter(pMeter);
|
||||
mError("table:%s, vgroup is full", pCreate->meterId);
|
||||
return TSDB_CODE_NO_ENOUGH_DNODES;
|
||||
}
|
||||
|
||||
if (pDb->vgStatus == TSDB_VG_STATUS_COMMITLOG_INIT_FAILED) {
|
||||
mgmtDestroyMeter(pMeter);
|
||||
mError("table:%s, commit log init failed", pCreate->meterId);
|
||||
return TSDB_CODE_VG_COMMITLOG_INIT_FAILED;
|
||||
}
|
||||
|
||||
if (pDb->vgStatus == TSDB_VG_STATUS_INIT_FAILED) {
|
||||
mgmtDestroyMeter(pMeter);
|
||||
mError("table:%s, vgroup init failed", pCreate->meterId);
|
||||
return TSDB_CODE_VG_INIT_FAILED;
|
||||
}
|
||||
|
||||
|
@ -633,12 +643,13 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
|
|||
pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS;
|
||||
mgmtCreateVgroup(pDb);
|
||||
mgmtDestroyMeter(pMeter);
|
||||
mTrace("table:%s, vgroup malloced, wait for create progress finished", pCreate->meterId);
|
||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
int sid = taosAllocateId(pVgroup->idPool);
|
||||
if (sid < 0) {
|
||||
mWarn("db:%s, vgroup:%d, run out of ID, num:%d", pDb->name, pVgroup->vgId, taosIdPoolNumOfUsed(pVgroup->idPool));
|
||||
mWarn("table:%s, vgroup:%d run out of ID, num:%d", pCreate->meterId, pVgroup->vgId, taosIdPoolNumOfUsed(pVgroup->idPool));
|
||||
pDb->vgStatus = TSDB_VG_STATUS_IN_PROGRESS;
|
||||
mgmtCreateVgroup(pDb);
|
||||
mgmtDestroyMeter(pMeter);
|
||||
|
@ -650,18 +661,21 @@ int mgmtCreateMeter(SDbObj *pDb, SCreateTableMsg *pCreate) {
|
|||
pMeter->uid = (((uint64_t)pMeter->gid.vgId) << 40) + ((((uint64_t)pMeter->gid.sid) & ((1ul << 24) - 1ul)) << 16) +
|
||||
((uint64_t)sdbVersion & ((1ul << 16) - 1ul));
|
||||
|
||||
mTrace("meter:%s, create meter in vgroup, vgId:%d, sid:%d, vnode:%d, uid:%d",
|
||||
pMeter->meterId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pMeter->uid);
|
||||
mTrace("table:%s, create table in vgroup, vgId:%d sid:%d vnode:%d uid:%d db:%s",
|
||||
pMeter->meterId, pVgroup->vgId, sid, pVgroup->vnodeGid[0].vnode, pMeter->uid, pDb->name);
|
||||
} else {
|
||||
pMeter->uid = (((uint64_t)pMeter->createdTime) << 16) + ((uint64_t)sdbVersion & ((1ul << 16) - 1ul));
|
||||
}
|
||||
|
||||
if (sdbInsertRow(meterSdb, pMeter, 0) < 0) return TSDB_CODE_SDB_ERROR;
|
||||
if (sdbInsertRow(meterSdb, pMeter, 0) < 0) {
|
||||
mError("table:%s, update sdb error", pCreate->meterId);
|
||||
return TSDB_CODE_SDB_ERROR;
|
||||
}
|
||||
|
||||
// send create message to the selected vnode servers
|
||||
if (pCreate->numOfTags == 0) {
|
||||
mTrace("meter:%s, send msg to dnode, vgId:%d, sid:%d, vnode:%d, dbname:%s",
|
||||
pMeter->meterId, pMeter->gid.vgId, pMeter->gid.sid, pVgroup->vnodeGid[0].vnode, pDb->name);
|
||||
mTrace("table:%s, send create msg to dnode, vgId:%d, sid:%d, vnode:%d",
|
||||
pMeter->meterId, pMeter->gid.vgId, pMeter->gid.sid, pVgroup->vnodeGid[0].vnode);
|
||||
|
||||
grantAddTimeSeries(pMeter->numOfColumns - 1);
|
||||
mgmtSendCreateMsgToVgroup(pMeter, pVgroup);
|
||||
|
@ -881,7 +895,10 @@ void mgmtCleanUpMeters() { sdbCloseTable(meterSdb); }
|
|||
int mgmtGetMeterMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
|
||||
int cols = 0;
|
||||
|
||||
if (pConn->pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
|
||||
SDbObj *pDb = NULL;
|
||||
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
|
||||
|
||||
if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
|
||||
|
||||
SSchema *pSchema = tsGetSchema(pMeta);
|
||||
|
||||
|
@ -916,7 +933,7 @@ int mgmtGetMeterMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
|
|||
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
||||
|
||||
// pShow->numOfRows = sdbGetNumOfRows (meterSdb);
|
||||
pShow->numOfRows = pConn->pDb->numOfTables;
|
||||
pShow->numOfRows = pDb->numOfTables;
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
|
||||
return 0;
|
||||
|
@ -1208,8 +1225,12 @@ int mgmtRetrieveMeters(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
|
|||
int numOfRead = 0;
|
||||
char prefix[20] = {0};
|
||||
|
||||
if (pConn->pDb == NULL) return 0;
|
||||
strcpy(prefix, pConn->pDb->name);
|
||||
SDbObj *pDb = NULL;
|
||||
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
|
||||
|
||||
if (pDb == NULL) return 0;
|
||||
|
||||
strcpy(prefix, pDb->name);
|
||||
strcat(prefix, TS_PATH_DELIMITER);
|
||||
prefixLen = strlen(prefix);
|
||||
|
||||
|
@ -1269,7 +1290,10 @@ int mgmtRetrieveMeters(SShowObj *pShow, char *data, int rows, SConnObj *pConn) {
|
|||
int mgmtGetMetricMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
|
||||
int cols = 0;
|
||||
|
||||
if (pConn->pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
|
||||
SDbObj *pDb = NULL;
|
||||
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
|
||||
|
||||
if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
|
||||
|
||||
SSchema *pSchema = tsGetSchema(pMeta);
|
||||
|
||||
|
@ -1309,8 +1333,8 @@ int mgmtGetMetricMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
|
|||
pShow->offset[0] = 0;
|
||||
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
||||
|
||||
pShow->numOfRows = pConn->pDb->numOfMetrics;
|
||||
pShow->pNode = pConn->pDb->pMetric;
|
||||
pShow->numOfRows = pDb->numOfMetrics;
|
||||
pShow->pNode = pDb->pMetric;
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
|
||||
return 0;
|
||||
|
|
|
@ -189,8 +189,11 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
int size = sizeof(STaosHeader) + sizeof(STaosRsp) + sizeof(SMeterMeta) + sizeof(SSchema) * TSDB_MAX_COLUMNS +
|
||||
sizeof(SSchema) * TSDB_MAX_TAGS + TSDB_MAX_TAGS_LEN + TSDB_EXTRA_PAYLOAD_SIZE;
|
||||
|
||||
SDbObj *pDb = NULL;
|
||||
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
|
||||
|
||||
// todo db check should be extracted
|
||||
if (pConn->pDb == NULL || (pConn->pDb != NULL && pConn->pDb->dropStatus != TSDB_DB_STATUS_READY)) {
|
||||
if (pDb == NULL || (pDb != NULL && pDb->dropStatus != TSDB_DB_STATUS_READY)) {
|
||||
|
||||
if ((pStart = mgmtAllocMsg(pConn, size, &pMsg, &pRsp)) == NULL) {
|
||||
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_METERINFO_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
|
||||
|
@ -223,10 +226,10 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
|
||||
SDbObj* pMeterDb = mgmtGetDbByMeterId(pCreateMsg->meterId);
|
||||
mTrace("meter:%s, pConnDb:%p, pConnDbName:%s, pMeterDb:%p, pMeterDbName:%s",
|
||||
pCreateMsg->meterId, pConn->pDb, pConn->pDb->name, pMeterDb, pMeterDb->name);
|
||||
assert(pConn->pDb == pMeterDb);
|
||||
pCreateMsg->meterId, pDb, pDb->name, pMeterDb, pMeterDb->name);
|
||||
assert(pDb == pMeterDb);
|
||||
|
||||
int32_t code = mgmtCreateMeter(pConn->pDb, pCreateMsg);
|
||||
int32_t code = mgmtCreateMeter(pDb, pCreateMsg);
|
||||
|
||||
char stableName[TSDB_METER_ID_LEN] = {0};
|
||||
strncpy(stableName, pInfo->tags, TSDB_METER_ID_LEN);
|
||||
|
@ -256,7 +259,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
}
|
||||
|
||||
if (pMeterObj == NULL) {
|
||||
if (pConn->pDb)
|
||||
if (pDb)
|
||||
pRsp->code = TSDB_CODE_INVALID_TABLE;
|
||||
else
|
||||
pRsp->code = TSDB_CODE_DB_NOT_SELECTED;
|
||||
|
@ -274,7 +277,7 @@ int mgmtProcessMeterMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
pMeta->vgid = htonl(pMeterObj->gid.vgId);
|
||||
pMeta->sversion = htons(pMeterObj->sversion);
|
||||
|
||||
pMeta->precision = pConn->pDb->cfg.precision;
|
||||
pMeta->precision = pDb->cfg.precision;
|
||||
|
||||
pMeta->numOfTags = pMeterObj->numOfTags;
|
||||
pMeta->numOfColumns = htons(pMeterObj->numOfColumns);
|
||||
|
@ -505,7 +508,10 @@ int mgmtProcessMetricMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
SMetricMetaElemMsg *pElem = (SMetricMetaElemMsg *)(((char *)pMetricMetaMsg) + pMetricMetaMsg->metaElem[0]);
|
||||
pMetric = mgmtGetMeter(pElem->meterId);
|
||||
|
||||
if (pMetric == NULL || (pConn->pDb != NULL && pConn->pDb->dropStatus != TSDB_DB_STATUS_READY)) {
|
||||
SDbObj *pDb = NULL;
|
||||
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
|
||||
|
||||
if (pMetric == NULL || (pDb != NULL && pDb->dropStatus != TSDB_DB_STATUS_READY)) {
|
||||
pStart = taosBuildRspMsg(pConn->thandle, TSDB_MSG_TYPE_METRIC_META_RSP);
|
||||
if (pStart == NULL) {
|
||||
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_METRIC_META_RSP, TSDB_CODE_SERV_OUT_OF_MEMORY);
|
||||
|
@ -514,7 +520,7 @@ int mgmtProcessMetricMetaMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
|
||||
pMsg = pStart;
|
||||
pRsp = (STaosRsp *)pMsg;
|
||||
if (pConn->pDb)
|
||||
if (pDb)
|
||||
pRsp->code = TSDB_CODE_INVALID_TABLE;
|
||||
else
|
||||
pRsp->code = TSDB_CODE_DB_NOT_SELECTED;
|
||||
|
@ -957,17 +963,23 @@ int mgmtProcessCreateTableMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
pSchema++;
|
||||
}
|
||||
|
||||
if (pConn->pDb) {
|
||||
code = mgmtCreateMeter(pConn->pDb, pCreate);
|
||||
if (code == 0) {
|
||||
mTrace("meter:%s is created by %s", pCreate->meterId, pConn->pUser->user);
|
||||
// mLPrint("meter:%s is created by %s", pCreate->meterId, pConn->pUser->user);
|
||||
}
|
||||
SDbObj *pDb = NULL;
|
||||
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
|
||||
|
||||
if (pDb) {
|
||||
code = mgmtCreateMeter(pDb, pCreate);
|
||||
} else {
|
||||
code = TSDB_CODE_DB_NOT_SELECTED;
|
||||
}
|
||||
}
|
||||
|
||||
if (code != 0) {
|
||||
mError("table:%s, failed to create table, code:%d", pCreate->meterId, code);
|
||||
} else {
|
||||
mTrace("table:%s, table is created by %s", pCreate->meterId, pConn->pUser->user);
|
||||
//mLPrint("meter:%s is created by %s", pCreate->meterId, pConn->pUser->user);
|
||||
}
|
||||
|
||||
taosSendSimpleRsp(pConn->thandle, TSDB_MSG_TYPE_CREATE_TABLE_RSP, code);
|
||||
|
||||
return 0;
|
||||
|
@ -984,7 +996,10 @@ int mgmtProcessDropTableMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
if (!pConn->writeAuth) {
|
||||
code = TSDB_CODE_NO_RIGHTS;
|
||||
} else {
|
||||
code = mgmtDropMeter(pConn->pDb, pDrop->meterId, pDrop->igNotExists);
|
||||
SDbObj *pDb = NULL;
|
||||
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
|
||||
|
||||
code = mgmtDropMeter(pDb, pDrop->meterId, pDrop->igNotExists);
|
||||
if (code == 0) {
|
||||
mTrace("meter:%s is dropped by user:%s", pDrop->meterId, pConn->pUser->user);
|
||||
// mLPrint("meter:%s is dropped by user:%s", pDrop->meterId, pConn->pUser->user);
|
||||
|
@ -1014,12 +1029,15 @@ int mgmtProcessAlterTableMsg(char *pMsg, int msgLen, SConnObj *pConn) {
|
|||
mError("meter:%s error numOfCols:%d in alter table", pAlter->meterId, pAlter->numOfCols);
|
||||
code = TSDB_CODE_APP_ERROR;
|
||||
} else {
|
||||
if (pConn->pDb) {
|
||||
SDbObj *pDb = NULL;
|
||||
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
|
||||
|
||||
if (pDb) {
|
||||
for (int32_t i = 0; i < pAlter->numOfCols; ++i) {
|
||||
pAlter->schema[i].bytes = htons(pAlter->schema[i].bytes);
|
||||
}
|
||||
|
||||
code = mgmtAlterMeter(pConn->pDb, pAlter);
|
||||
code = mgmtAlterMeter(pDb, pAlter);
|
||||
if (code == 0) {
|
||||
mLPrint("meter:%s is altered by %s", pAlter->meterId, pConn->pUser->user);
|
||||
}
|
||||
|
@ -1263,8 +1281,7 @@ void *mgmtProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
|
|||
}
|
||||
|
||||
if (pConn->pAcct) {
|
||||
if (pConn->pDb == NULL ||
|
||||
strncmp(pConn->pDb->name, pHead->db, tListLen(pConn->pDb->name)) != 0) {
|
||||
if (pConn->pDb == NULL || strncmp(pConn->pDb->name, pHead->db, tListLen(pConn->pDb->name)) != 0) {
|
||||
pConn->pDb = mgmtGetDb(pHead->db);
|
||||
}
|
||||
|
||||
|
|
|
@ -123,7 +123,7 @@ void mgmtProcessVgTimer(void *handle, void *tmrId) {
|
|||
if (pDb == NULL) return;
|
||||
|
||||
if (pDb->vgStatus > TSDB_VG_STATUS_IN_PROGRESS) {
|
||||
mTrace("db:%s, set vgstatus from %d to %d", pDb->name, pDb->vgStatus, TSDB_VG_STATUS_READY);
|
||||
mTrace("db:%s, set vgroup status from %d to ready", pDb->name, pDb->vgStatus);
|
||||
pDb->vgStatus = TSDB_VG_STATUS_READY;
|
||||
}
|
||||
|
||||
|
@ -143,7 +143,7 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) {
|
|||
|
||||
// based on load balance, create a new one
|
||||
if (mgmtAllocVnodes(pVgroup) != 0) {
|
||||
mError("no enough free dnode");
|
||||
mError("db:%s, no enough free dnode to alloc %d vnodes", pDb->name, pVgroup->numOfVnodes);
|
||||
free(pVgroup);
|
||||
pDb->vgStatus = TSDB_VG_STATUS_FULL;
|
||||
taosTmrReset(mgmtProcessVgTimer, 5000, pDb, mgmtTmr, &pDb->vgTimer);
|
||||
|
@ -152,9 +152,9 @@ SVgObj *mgmtCreateVgroup(SDbObj *pDb) {
|
|||
|
||||
sdbInsertRow(vgSdb, pVgroup, 0);
|
||||
|
||||
mTrace("vgroup:%d, db:%s replica:%d is created", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
|
||||
mTrace("vgroup:%d, vgroup is created, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
|
||||
for (int i = 0; i < pVgroup->numOfVnodes; ++i)
|
||||
mTrace("dnode:%s, vgroup:%d, vnode:%d is created", taosIpStr(pVgroup->vnodeGid[i].ip), pVgroup->vgId, pVgroup->vnodeGid[i].vnode);
|
||||
mTrace("vgroup:%d, dnode:%s vnode:%d is created", pVgroup->vgId, taosIpStr(pVgroup->vnodeGid[i].ip), pVgroup->vnodeGid[i].vnode);
|
||||
|
||||
mgmtSendVPeersMsg(pVgroup);
|
||||
|
||||
|
@ -206,7 +206,10 @@ void mgmtCleanUpVgroups() { sdbCloseTable(vgSdb); }
|
|||
int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
|
||||
int cols = 0;
|
||||
|
||||
if (pConn->pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
|
||||
SDbObj *pDb = NULL;
|
||||
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
|
||||
|
||||
if (pDb == NULL) return TSDB_CODE_DB_NOT_SELECTED;
|
||||
|
||||
SSchema *pSchema = tsGetSchema(pMeta);
|
||||
|
||||
|
@ -229,7 +232,7 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
|
|||
cols++;
|
||||
|
||||
int maxReplica = 0;
|
||||
SVgObj *pVgroup = pConn->pDb->pHead;
|
||||
SVgObj *pVgroup = pDb->pHead;
|
||||
while (pVgroup != NULL) {
|
||||
maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
|
||||
pVgroup = pVgroup->next;
|
||||
|
@ -267,8 +270,8 @@ int mgmtGetVgroupMeta(SMeterMeta *pMeta, SShowObj *pShow, SConnObj *pConn) {
|
|||
pShow->offset[0] = 0;
|
||||
for (int i = 1; i < cols; ++i) pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1];
|
||||
|
||||
pShow->numOfRows = pConn->pDb->numOfVgroups;
|
||||
pShow->pNode = pConn->pDb->pHead;
|
||||
pShow->numOfRows = pDb->numOfVgroups;
|
||||
pShow->pNode = pDb->pHead;
|
||||
pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1];
|
||||
|
||||
return 0;
|
||||
|
@ -282,7 +285,11 @@ int mgmtRetrieveVgroups(SShowObj *pShow, char *data, int rows, SConnObj *pConn)
|
|||
char ipstr[20];
|
||||
|
||||
int maxReplica = 0;
|
||||
pVgroup = pConn->pDb->pHead;
|
||||
|
||||
SDbObj *pDb = NULL;
|
||||
if (pConn->pDb != NULL) pDb = mgmtGetDb(pConn->pDb->name);
|
||||
|
||||
pVgroup = pDb->pHead;
|
||||
while (pVgroup != NULL) {
|
||||
maxReplica = pVgroup->numOfVnodes > maxReplica ? pVgroup->numOfVnodes : maxReplica;
|
||||
pVgroup = pVgroup->next;
|
||||
|
|
|
@ -297,7 +297,7 @@ pthread_t vnodeCreateCommitThread(SVnodeObj *pVnode) {
|
|||
|
||||
taosTmrStopA(&pVnode->commitTimer);
|
||||
|
||||
if (pVnode->status == TSDB_STATUS_UNSYNCED) {
|
||||
if (pVnode->vnodeStatus == TSDB_VNODE_STATUS_UNSYNCED) {
|
||||
taosTmrReset(vnodeProcessCommitTimer, pVnode->cfg.commitTime * 1000, pVnode, vnodeTmrCtrl, &pVnode->commitTimer);
|
||||
dTrace("vid:%d, it is in unsyc state, commit later", pVnode->vnode);
|
||||
return pVnode->commitThread;
|
||||
|
|
|
@ -1290,7 +1290,7 @@ int vnodeWriteBlockToFile(SMeterObj *pObj, SCompBlock *pCompBlock, SData *data[]
|
|||
pCompBlock->len += wlen;
|
||||
}
|
||||
|
||||
dTrace("vid: %d vnode compStorage size is: %ld", pObj->vnode, pVnode->vnodeStatistic.compStorage);
|
||||
dTrace("vid:%d, vnode compStorage size is: %ld", pObj->vnode, pVnode->vnodeStatistic.compStorage);
|
||||
|
||||
pCompBlock->algorithm = pCfg->compression;
|
||||
pCompBlock->numOfPoints = points;
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
#include "vnode.h"
|
||||
#include "vnodeRead.h"
|
||||
#include "vnodeUtil.h"
|
||||
#include "vnodeStore.h"
|
||||
|
||||
#pragma GCC diagnostic ignored "-Wint-conversion"
|
||||
extern int tsMaxQueues;
|
||||
|
@ -89,9 +90,9 @@ void *vnodeProcessMsgFromShell(char *msg, void *ahandle, void *thandle) {
|
|||
// if ( vnodeList[vnode].status != TSDB_STATUS_MASTER && pMsg->msgType != TSDB_MSG_TYPE_RETRIEVE ) {
|
||||
|
||||
#ifdef CLUSTER
|
||||
if (vnodeList[vnode].status != TSDB_STATUS_MASTER) {
|
||||
if (vnodeList[vnode].vnodeStatus != TSDB_VNODE_STATUS_MASTER) {
|
||||
taosSendSimpleRsp(thandle, pMsg->msgType + 1, TSDB_CODE_NOT_READY);
|
||||
dTrace("vid:%d sid:%d, shell msg is ignored since in state:%d", vnode, sid, vnodeList[vnode].status);
|
||||
dTrace("vid:%d sid:%d, shell msg is ignored since in state:%d", vnode, sid, vnodeList[vnode].vnodeStatus);
|
||||
} else {
|
||||
#endif
|
||||
dTrace("vid:%d sid:%d, msg:%s is received pConn:%p", vnode, sid, taosMsg[pMsg->msgType], thandle);
|
||||
|
@ -154,6 +155,11 @@ int vnodeInitShell() {
|
|||
}
|
||||
|
||||
int vnodeOpenShellVnode(int vnode) {
|
||||
if (shellList[vnode] != NULL) {
|
||||
dError("vid:%d, shell is already opened", vnode);
|
||||
return -1;
|
||||
}
|
||||
|
||||
const int32_t MIN_NUM_OF_SESSIONS = 300;
|
||||
|
||||
SVnodeCfg *pCfg = &vnodeList[vnode].cfg;
|
||||
|
@ -162,23 +168,29 @@ int vnodeOpenShellVnode(int vnode) {
|
|||
size_t size = sessions * sizeof(SShellObj);
|
||||
shellList[vnode] = (SShellObj *)calloc(1, size);
|
||||
if (shellList[vnode] == NULL) {
|
||||
dError("vid:%d failed to allocate shellObj, size:%d", vnode, size);
|
||||
dError("vid:%d, sessions:%d, failed to allocate shellObj, size:%d", vnode, pCfg->maxSessions, size);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if(taosOpenRpcChannWithQ(pShellServer, vnode, sessions, rpcQhandle[(vnode+1)%tsMaxQueues]) != TSDB_CODE_SUCCESS) {
|
||||
dError("vid:%d, sessions:%d, failed to open shell", vnode, pCfg->maxSessions);
|
||||
return -1;
|
||||
}
|
||||
|
||||
dTrace("vid:%d, sessions:%d, shell is opened", vnode, pCfg->maxSessions);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static void vnodeDelayedFreeResource(void *param, void *tmrId) {
|
||||
int32_t vnode = *(int32_t*) param;
|
||||
taosCloseRpcChann(pShellServer, vnode); // close connection
|
||||
tfree (shellList[vnode]); //free SShellObj
|
||||
dTrace("vid:%d, start to free resources", vnode);
|
||||
|
||||
taosCloseRpcChann(pShellServer, vnode); // close connection
|
||||
tfree(shellList[vnode]); //free SShellObj
|
||||
tfree(param);
|
||||
|
||||
memset(vnodeList + vnode, 0, sizeof(SVnodeObj));
|
||||
vnodeCalcOpenVnodes();
|
||||
}
|
||||
|
||||
void vnodeCloseShellVnode(int vnode) {
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
#include "vnode.h"
|
||||
#include "vnodeStore.h"
|
||||
#include "vnodeUtil.h"
|
||||
#include "tstatus.h"
|
||||
|
||||
#pragma GCC diagnostic push
|
||||
#pragma GCC diagnostic warning "-Woverflow"
|
||||
|
@ -30,12 +31,14 @@ int tsMaxVnode = -1;
|
|||
int tsOpenVnodes = 0;
|
||||
SVnodeObj *vnodeList = NULL;
|
||||
|
||||
int vnodeInitStoreVnode(int vnode) {
|
||||
static int vnodeInitStoreVnode(int vnode) {
|
||||
SVnodeObj *pVnode = vnodeList + vnode;
|
||||
|
||||
pVnode->vnode = vnode;
|
||||
vnodeOpenMetersVnode(vnode);
|
||||
if (pVnode->cfg.maxSessions == 0) return 0;
|
||||
if (pVnode->cfg.maxSessions <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
pVnode->firstKey = taosGetTimestamp(pVnode->cfg.precision);
|
||||
|
||||
|
@ -45,9 +48,10 @@ int vnodeInitStoreVnode(int vnode) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (vnodeInitFile(vnode) < 0) return -1;
|
||||
|
||||
// vnodeOpenMeterMgmtStoreVnode(vnode);
|
||||
if (vnodeInitFile(vnode) < 0) {
|
||||
dError("vid:%d, files init failed.", pVnode->vnode);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (vnodeInitCommit(vnode) < 0) {
|
||||
dError("vid:%d, commit init failed.", pVnode->vnode);
|
||||
|
@ -70,10 +74,17 @@ int vnodeOpenVnode(int vnode) {
|
|||
pVnode->accessState = TSDB_VN_ALL_ACCCESS;
|
||||
|
||||
// vnode is empty
|
||||
if (pVnode->cfg.maxSessions == 0) return 0;
|
||||
if (pVnode->cfg.maxSessions <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (!(pVnode->vnodeStatus == TSDB_VNODE_STATUS_OFFLINE || pVnode->vnodeStatus == TSDB_VNODE_STATUS_CREATING)) {
|
||||
dError("vid:%d, status:%s, cannot enter open operation", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus));
|
||||
return TSDB_CODE_INVALID_VNODE_STATUS;
|
||||
}
|
||||
|
||||
dTrace("vid:%d, status:%s, start to open", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus));
|
||||
pthread_mutex_lock(&dmutex);
|
||||
// vnodeOpenMeterMgmtVnode(vnode);
|
||||
|
||||
// not enough memory, abort
|
||||
if ((code = vnodeOpenShellVnode(vnode)) != TSDB_CODE_SUCCESS) {
|
||||
|
@ -93,14 +104,13 @@ int vnodeOpenVnode(int vnode) {
|
|||
vnodeOpenStreams(pVnode, NULL);
|
||||
#endif
|
||||
|
||||
dTrace("vid:%d, vnode is opened, openVnodes:%d", vnode, tsOpenVnodes);
|
||||
dTrace("vid:%d, vnode is opened, openVnodes:%d, status:%s", vnode, tsOpenVnodes, taosGetVnodeStatusStr(pVnode->vnodeStatus));
|
||||
|
||||
return 0;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t vnodeMarkAllMetersDropped(SVnodeObj* pVnode) {
|
||||
if (pVnode->meterList == NULL) {
|
||||
assert(pVnode->cfg.maxSessions == 0);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -119,7 +129,7 @@ static int32_t vnodeMarkAllMetersDropped(SVnodeObj* pVnode) {
|
|||
return ready? TSDB_CODE_SUCCESS:TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
int vnodeCloseVnode(int vnode) {
|
||||
static int vnodeCloseVnode(int vnode) {
|
||||
if (vnodeList == NULL) return TSDB_CODE_SUCCESS;
|
||||
|
||||
SVnodeObj* pVnode = &vnodeList[vnode];
|
||||
|
@ -130,12 +140,23 @@ int vnodeCloseVnode(int vnode) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (pVnode->vnodeStatus == TSDB_VNODE_STATUS_DELETING) {
|
||||
dTrace("vid:%d, status:%s, another thread performed delete operation", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
dTrace("vid:%d, status:%s, enter close operation", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus));
|
||||
pVnode->vnodeStatus = TSDB_VNODE_STATUS_CLOSING;
|
||||
}
|
||||
|
||||
// set the meter is dropped flag
|
||||
if (vnodeMarkAllMetersDropped(pVnode) != TSDB_CODE_SUCCESS) {
|
||||
pthread_mutex_unlock(&dmutex);
|
||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
}
|
||||
|
||||
dTrace("vid:%d, status:%s, enter delete operation", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus));
|
||||
pVnode->vnodeStatus = TSDB_VNODE_STATUS_DELETING;
|
||||
|
||||
vnodeCloseStream(vnodeList + vnode);
|
||||
vnodeCancelCommit(vnodeList + vnode);
|
||||
vnodeClosePeerVnode(vnode);
|
||||
|
@ -149,9 +170,6 @@ int vnodeCloseVnode(int vnode) {
|
|||
if (tsMaxVnode == vnode) tsMaxVnode = vnode - 1;
|
||||
|
||||
tfree(vnodeList[vnode].meterIndex);
|
||||
memset(vnodeList + vnode, 0, sizeof(SVnodeObj));
|
||||
|
||||
vnodeCalcOpenVnodes();
|
||||
|
||||
pthread_mutex_unlock(&dmutex);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -160,7 +178,12 @@ int vnodeCloseVnode(int vnode) {
|
|||
int vnodeCreateVnode(int vnode, SVnodeCfg *pCfg, SVPeerDesc *pDesc) {
|
||||
char fileName[128];
|
||||
|
||||
vnodeList[vnode].status = TSDB_STATUS_CREATING;
|
||||
if (vnodeList[vnode].vnodeStatus != TSDB_VNODE_STATUS_OFFLINE) {
|
||||
dError("vid:%d, status:%s, cannot enter create operation", vnode, taosGetVnodeStatusStr(vnodeList[vnode].vnodeStatus));
|
||||
return TSDB_CODE_INVALID_VNODE_STATUS;
|
||||
}
|
||||
|
||||
vnodeList[vnode].vnodeStatus = TSDB_VNODE_STATUS_CREATING;
|
||||
|
||||
sprintf(fileName, "%s/vnode%d", tsDirectory, vnode);
|
||||
mkdir(fileName, 0755);
|
||||
|
@ -177,14 +200,14 @@ int vnodeCreateVnode(int vnode, SVnodeCfg *pCfg, SVPeerDesc *pDesc) {
|
|||
return TSDB_CODE_VG_INIT_FAILED;
|
||||
}
|
||||
|
||||
if (vnodeInitStoreVnode(vnode) != 0) {
|
||||
if (vnodeInitStoreVnode(vnode) < 0) {
|
||||
return TSDB_CODE_VG_COMMITLOG_INIT_FAILED;
|
||||
}
|
||||
|
||||
return vnodeOpenVnode(vnode);
|
||||
}
|
||||
|
||||
void vnodeRemoveDataFiles(int vnode) {
|
||||
static void vnodeRemoveDataFiles(int vnode) {
|
||||
char vnodeDir[TSDB_FILENAME_LEN];
|
||||
char dfilePath[TSDB_FILENAME_LEN];
|
||||
char linkFile[TSDB_FILENAME_LEN];
|
||||
|
@ -227,19 +250,28 @@ void vnodeRemoveDataFiles(int vnode) {
|
|||
|
||||
sprintf(vnodeDir, "%s/vnode%d", tsDirectory, vnode);
|
||||
rmdir(vnodeDir);
|
||||
dTrace("vnode %d is removed!", vnode);
|
||||
dTrace("vid:%d, vnode is removed!", vnode);
|
||||
}
|
||||
|
||||
int vnodeRemoveVnode(int vnode) {
|
||||
if (vnodeList == NULL) return TSDB_CODE_SUCCESS;
|
||||
|
||||
if (vnodeList[vnode].cfg.maxSessions > 0) {
|
||||
int32_t ret = vnodeCloseVnode(vnode);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return ret;
|
||||
SVnodeObj* pVnode = &vnodeList[vnode];
|
||||
if (pVnode->vnodeStatus == TSDB_VNODE_STATUS_CREATING
|
||||
|| pVnode->vnodeStatus == TSDB_VNODE_STATUS_OFFLINE
|
||||
|| pVnode->vnodeStatus == TSDB_VNODE_STATUS_DELETING) {
|
||||
dError("vid:%d, status:%s, cannot enter close/delete operation", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus));
|
||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||
} else {
|
||||
int32_t ret = vnodeCloseVnode(vnode);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
vnodeRemoveDataFiles(vnode);
|
||||
}
|
||||
|
||||
vnodeRemoveDataFiles(vnode);
|
||||
} else {
|
||||
dTrace("vid:%d, max sessions:%d, this vnode already dropped!!!", vnode, vnodeList[vnode].cfg.maxSessions);
|
||||
vnodeList[vnode].cfg.maxSessions = 0; //reset value
|
||||
|
@ -293,7 +325,7 @@ void vnodeCleanUpOneVnode(int vnode) {
|
|||
again = 1;
|
||||
|
||||
if (vnodeList[vnode].pCachePool) {
|
||||
vnodeList[vnode].status = TSDB_STATUS_OFFLINE;
|
||||
vnodeList[vnode].vnodeStatus = TSDB_VNODE_STATUS_OFFLINE;
|
||||
vnodeClosePeerVnode(vnode);
|
||||
}
|
||||
|
||||
|
@ -322,7 +354,7 @@ void vnodeCleanUpVnodes() {
|
|||
|
||||
for (int vnode = 0; vnode < TSDB_MAX_VNODES; ++vnode) {
|
||||
if (vnodeList[vnode].pCachePool) {
|
||||
vnodeList[vnode].status = TSDB_STATUS_OFFLINE;
|
||||
vnodeList[vnode].vnodeStatus = TSDB_VNODE_STATUS_OFFLINE;
|
||||
vnodeClosePeerVnode(vnode);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -171,7 +171,7 @@ void vnodeCloseStream(SVnodeObj *pVnode) {
|
|||
void vnodeUpdateStreamRole(SVnodeObj *pVnode) {
|
||||
/* SMeterObj *pObj; */
|
||||
|
||||
int newRole = (pVnode->status == TSDB_STATUS_MASTER) ? 1 : 0;
|
||||
int newRole = (pVnode->vnodeStatus == TSDB_VNODE_STATUS_MASTER) ? 1 : 0;
|
||||
if (newRole != pVnode->streamRole) {
|
||||
dTrace("vid:%d, stream role is changed to:%d", pVnode->vnode, newRole);
|
||||
pVnode->streamRole = newRole;
|
||||
|
|
|
@ -50,7 +50,7 @@ char *taosBuildReqMsgToMnode(SMgmtObj *pObj, char type) {
|
|||
}
|
||||
|
||||
int taosSendMsgToMnode(SMgmtObj *pObj, char *msg, int msgLen) {
|
||||
mTrace("msg:%s is sent to mnode", taosMsg[*(msg-1)]);
|
||||
dTrace("msg:%s is sent to mnode", taosMsg[*(msg-1)]);
|
||||
|
||||
/*
|
||||
* Lite version has no message header, so minus one
|
||||
|
|
|
@ -30,7 +30,7 @@ int mgmtInitDnodes() {
|
|||
dnodeObj.createdTime = (int64_t)tsRebootTime * 1000;
|
||||
dnodeObj.lastReboot = tsRebootTime;
|
||||
dnodeObj.numOfCores = (uint16_t)tsNumOfCores;
|
||||
dnodeObj.status = TSDB_STATUS_READY;
|
||||
dnodeObj.status = TSDB_DNODE_STATUS_READY;
|
||||
dnodeObj.alternativeRole = TSDB_DNODE_ROLE_ANY;
|
||||
dnodeObj.numOfTotalVnodes = tsNumOfTotalVnodes;
|
||||
dnodeObj.thandle = (void*)(1); //hack way
|
||||
|
|
|
@ -82,7 +82,7 @@ void mgmtCleanUpDnodeInt() {}
|
|||
void mgmtProcessDnodeStatus(void *handle, void *tmrId) {
|
||||
SDnodeObj *pObj = &dnodeObj;
|
||||
pObj->openVnodes = tsOpenVnodes;
|
||||
pObj->status = TSDB_STATUS_READY;
|
||||
pObj->status = TSDB_DNODE_STATUS_READY;
|
||||
|
||||
float memoryUsedMB = 0;
|
||||
taosGetSysMemory(&memoryUsedMB);
|
||||
|
@ -97,7 +97,7 @@ void mgmtProcessDnodeStatus(void *handle, void *tmrId) {
|
|||
if (vnodeList[vnode].cfg.maxSessions <= 0) {
|
||||
pVload->dropStatus = TSDB_VN_STATUS_READY;
|
||||
pVload->status = TSDB_VN_STATUS_READY;
|
||||
mPrint("vid:%d, drop finished", pObj->privateIp, vnode);
|
||||
mPrint("dnode:%s, vid:%d, drop finished", taosIpStr(pObj->privateIp), vnode);
|
||||
taosTmrStart(mgmtMonitorDbDrop, 10000, NULL, mgmtTmr);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "vnode.h"
|
||||
#include "tstatus.h"
|
||||
|
||||
int vnodeInitPeer(int numOfThreads) { return 0; }
|
||||
|
||||
|
@ -30,8 +31,8 @@ void vnodeBroadcastStatusToUnsyncedPeer(SVnodeObj *pVnode) {}
|
|||
|
||||
int vnodeOpenPeerVnode(int vnode) {
|
||||
SVnodeObj *pVnode = vnodeList + vnode;
|
||||
pVnode->status = (pVnode->cfg.replications > 1) ? TSDB_STATUS_UNSYNCED : TSDB_STATUS_MASTER;
|
||||
dTrace("vid:%d, vnode status:%d numOfPeers:%d", vnode, pVnode->status, pVnode->cfg.replications-1);
|
||||
pVnode->vnodeStatus = (pVnode->cfg.replications > 1) ? TSDB_VNODE_STATUS_UNSYNCED : TSDB_VNODE_STATUS_MASTER;
|
||||
dTrace("vid:%d, status:%s numOfPeers:%d", vnode, taosGetVnodeStatusStr(pVnode->vnodeStatus), pVnode->cfg.replications - 1);
|
||||
vnodeUpdateStreamRole(pVnode);
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -13,10 +13,54 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
char* sdbDnodeStatusStr[] = {"offline", "creating", "unsynced", "slave", "master", "ready"};
|
||||
#include "taosmsg.h"
|
||||
#include "tsdb.h"
|
||||
|
||||
char* sdbDnodeBalanceStateStr[] = {"balanced", "balancing", "offline removing", "shell removing"};
|
||||
const char* taosGetVnodeStatusStr(int vnodeStatus) {
|
||||
switch (vnodeStatus) {
|
||||
case TSDB_VNODE_STATUS_OFFLINE:return "offline";
|
||||
case TSDB_VNODE_STATUS_CREATING: return "creating";
|
||||
case TSDB_VNODE_STATUS_UNSYNCED: return "unsynced";
|
||||
case TSDB_VNODE_STATUS_SLAVE: return "slave";
|
||||
case TSDB_VNODE_STATUS_MASTER: return "master";
|
||||
case TSDB_VNODE_STATUS_CLOSING: return "closing";
|
||||
case TSDB_VNODE_STATUS_DELETING: return "deleting";
|
||||
default: return "undefined";
|
||||
}
|
||||
}
|
||||
|
||||
char* sdbVnodeSyncStatusStr[] = {"init", "syncing", "sync_cache", "sync_file"};
|
||||
const char* taosGetDnodeStatusStr(int dnodeStatus) {
|
||||
switch (dnodeStatus) {
|
||||
case TSDB_DNODE_STATUS_OFFLINE: return "offline";
|
||||
case TSDB_DNODE_STATUS_READY: return "ready";
|
||||
default: return "undefined";
|
||||
}
|
||||
}
|
||||
|
||||
char* sdbVnodeDropStateStr[] = {"ready", "dropping"};
|
||||
const char* taosGetDnodeBalanceStateStr(int dnodeBalanceStatus) {
|
||||
switch (dnodeBalanceStatus) {
|
||||
case LB_DNODE_STATE_BALANCED: return "balanced";
|
||||
case LB_DNODE_STATE_BALANCING: return "balancing";
|
||||
case LB_DNODE_STATE_OFFLINE_REMOVING: return "offline removing";
|
||||
case LB_DNODE_STATE_SHELL_REMOVING: return "removing";
|
||||
default: return "undefined";
|
||||
}
|
||||
}
|
||||
|
||||
const char* taosGetVnodeSyncStatusStr(int vnodeSyncStatus) {
|
||||
switch (vnodeSyncStatus) {
|
||||
case STDB_SSTATUS_INIT: return "init";
|
||||
case TSDB_SSTATUS_SYNCING: return "syncing";
|
||||
case TSDB_SSTATUS_SYNC_CACHE: return "sync_cache";
|
||||
case TSDB_SSTATUS_SYNC_FILE: return "sync_file";
|
||||
default: return "undefined";
|
||||
}
|
||||
}
|
||||
|
||||
const char* taosGetVnodeDropStatusStr(int dropping) {
|
||||
switch (dropping) {
|
||||
case 0: return "ready";
|
||||
case 1: return "dropping";
|
||||
default: return "undefined";
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue