Merge remote-tracking branch 'origin/develop' into feature/vnode
This commit is contained in:
commit
a11c44b1dc
|
@ -399,7 +399,7 @@ static void* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
|
||||||
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
|
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
|
||||||
SMDCreateVnodeMsg *pCreate = dnodeParseVnodeMsg(rpcMsg);
|
SMDCreateVnodeMsg *pCreate = dnodeParseVnodeMsg(rpcMsg);
|
||||||
|
|
||||||
void *pVnode = vnodeAcquireVnode(pCreate->cfg.vgId);
|
void *pVnode = vnodeAcquire(pCreate->cfg.vgId);
|
||||||
if (pVnode != NULL) {
|
if (pVnode != NULL) {
|
||||||
dDebug("vgId:%d, already exist, return success", pCreate->cfg.vgId);
|
dDebug("vgId:%d, already exist, return success", pCreate->cfg.vgId);
|
||||||
vnodeRelease(pVnode);
|
vnodeRelease(pVnode);
|
||||||
|
@ -413,7 +413,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
|
||||||
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
|
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
|
||||||
SMDAlterVnodeMsg *pAlter = dnodeParseVnodeMsg(rpcMsg);
|
SMDAlterVnodeMsg *pAlter = dnodeParseVnodeMsg(rpcMsg);
|
||||||
|
|
||||||
void *pVnode = vnodeAcquireVnode(pAlter->cfg.vgId);
|
void *pVnode = vnodeAcquire(pAlter->cfg.vgId);
|
||||||
if (pVnode != NULL) {
|
if (pVnode != NULL) {
|
||||||
dDebug("vgId:%d, alter vnode msg is received", pAlter->cfg.vgId);
|
dDebug("vgId:%d, alter vnode msg is received", pAlter->cfg.vgId);
|
||||||
int32_t code = vnodeAlter(pVnode, pAlter);
|
int32_t code = vnodeAlter(pVnode, pAlter);
|
||||||
|
|
|
@ -91,23 +91,21 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
|
||||||
int32_t queuedMsgNum = 0;
|
int32_t queuedMsgNum = 0;
|
||||||
int32_t leftLen = pMsg->contLen;
|
int32_t leftLen = pMsg->contLen;
|
||||||
char *pCont = (char *) pMsg->pCont;
|
char *pCont = (char *) pMsg->pCont;
|
||||||
void *pVnode;
|
|
||||||
|
|
||||||
while (leftLen > 0) {
|
while (leftLen > 0) {
|
||||||
SMsgHead *pHead = (SMsgHead *) pCont;
|
SMsgHead *pHead = (SMsgHead *) pCont;
|
||||||
pHead->vgId = htonl(pHead->vgId);
|
pHead->vgId = htonl(pHead->vgId);
|
||||||
pHead->contLen = htonl(pHead->contLen);
|
pHead->contLen = htonl(pHead->contLen);
|
||||||
|
|
||||||
pVnode = vnodeAcquireVnode(pHead->vgId);
|
taos_queue queue = vnodeAcquireRqueue(pHead->vgId);
|
||||||
|
|
||||||
if (pVnode == NULL) {
|
if (queue == NULL) {
|
||||||
leftLen -= pHead->contLen;
|
leftLen -= pHead->contLen;
|
||||||
pCont -= pHead->contLen;
|
pCont -= pHead->contLen;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// put message into queue
|
// put message into queue
|
||||||
taos_queue queue = vnodeGetRqueue(pVnode);
|
|
||||||
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
|
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
|
||||||
pRead->rpcMsg = *pMsg;
|
pRead->rpcMsg = *pMsg;
|
||||||
pRead->pCont = pCont;
|
pRead->pCont = pCont;
|
||||||
|
@ -175,18 +173,6 @@ void dnodeFreeVnodeRqueue(void *rqueue) {
|
||||||
// dynamically adjust the number of threads
|
// dynamically adjust the number of threads
|
||||||
}
|
}
|
||||||
|
|
||||||
void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle) {
|
|
||||||
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
|
|
||||||
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
|
|
||||||
pRead->pCont = qhandle;
|
|
||||||
pRead->contLen = 0;
|
|
||||||
|
|
||||||
assert(pVnode != NULL);
|
|
||||||
taos_queue queue = vnodeAcquireRqueue(pVnode);
|
|
||||||
|
|
||||||
taosWriteQitem(queue, TAOS_QTYPE_QUERY, pRead);
|
|
||||||
}
|
|
||||||
|
|
||||||
void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
|
void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
|
||||||
SRpcMsg rpcRsp = {
|
SRpcMsg rpcRsp = {
|
||||||
.handle = pRead->rpcMsg.handle,
|
.handle = pRead->rpcMsg.handle,
|
||||||
|
|
|
@ -104,7 +104,7 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
|
||||||
pHead->vgId = htonl(pHead->vgId);
|
pHead->vgId = htonl(pHead->vgId);
|
||||||
pHead->contLen = htonl(pHead->contLen);
|
pHead->contLen = htonl(pHead->contLen);
|
||||||
|
|
||||||
taos_queue queue = vnodeGetWqueue(pHead->vgId);
|
taos_queue queue = vnodeAcquireWqueue(pHead->vgId);
|
||||||
if (queue) {
|
if (queue) {
|
||||||
// put message into queue
|
// put message into queue
|
||||||
SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
|
SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
|
||||||
|
|
|
@ -53,7 +53,6 @@ void *dnodeAllocateVnodeWqueue(void *pVnode);
|
||||||
void dnodeFreeVnodeWqueue(void *queue);
|
void dnodeFreeVnodeWqueue(void *queue);
|
||||||
void *dnodeAllocateVnodeRqueue(void *pVnode);
|
void *dnodeAllocateVnodeRqueue(void *pVnode);
|
||||||
void dnodeFreeVnodeRqueue(void *rqueue);
|
void dnodeFreeVnodeRqueue(void *rqueue);
|
||||||
void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle);
|
|
||||||
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code);
|
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code);
|
||||||
|
|
||||||
int32_t dnodeAllocateMnodePqueue();
|
int32_t dnodeAllocateMnodePqueue();
|
||||||
|
|
|
@ -79,7 +79,7 @@ typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code);
|
||||||
typedef void (*FNotifyRole)(void *ahandle, int8_t role);
|
typedef void (*FNotifyRole)(void *ahandle, int8_t role);
|
||||||
|
|
||||||
// when data file is synced successfully, notity app
|
// when data file is synced successfully, notity app
|
||||||
typedef void (*FNotifyFileSynced)(void *ahandle, uint64_t fversion);
|
typedef int (*FNotifyFileSynced)(void *ahandle, uint64_t fversion);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId; // vgroup ID
|
int32_t vgId; // vgroup ID
|
||||||
|
|
|
@ -22,10 +22,10 @@ extern "C" {
|
||||||
|
|
||||||
typedef enum _VN_STATUS {
|
typedef enum _VN_STATUS {
|
||||||
TAOS_VN_STATUS_INIT,
|
TAOS_VN_STATUS_INIT,
|
||||||
TAOS_VN_STATUS_UPDATING,
|
|
||||||
TAOS_VN_STATUS_READY,
|
TAOS_VN_STATUS_READY,
|
||||||
TAOS_VN_STATUS_CLOSING,
|
TAOS_VN_STATUS_CLOSING,
|
||||||
TAOS_VN_STATUS_DELETING,
|
TAOS_VN_STATUS_UPDATING,
|
||||||
|
TAOS_VN_STATUS_RESET,
|
||||||
} EVnStatus;
|
} EVnStatus;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -47,13 +47,10 @@ int32_t vnodeOpen(int32_t vgId, char *rootDir);
|
||||||
int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg);
|
int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg);
|
||||||
int32_t vnodeClose(int32_t vgId);
|
int32_t vnodeClose(int32_t vgId);
|
||||||
|
|
||||||
void vnodeRelease(void *pVnode);
|
void* vnodeAcquire(int32_t vgId); // add refcount
|
||||||
void* vnodeAcquireVnode(int32_t vgId); // add refcount
|
void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue
|
||||||
void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged
|
void* vnodeAcquireWqueue(int32_t vgId); // add recCount, get write queue
|
||||||
|
void vnodeRelease(void *pVnode); // dec refCount
|
||||||
void* vnodeAcquireRqueue(void *);
|
|
||||||
void* vnodeGetRqueue(void *);
|
|
||||||
void* vnodeGetWqueue(int32_t vgId);
|
|
||||||
void* vnodeGetWal(void *pVnode);
|
void* vnodeGetWal(void *pVnode);
|
||||||
|
|
||||||
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
|
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
|
||||||
|
|
|
@ -270,6 +270,14 @@ int taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientI
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* set REUSEADDR option, so the portnumber can be re-used */
|
||||||
|
int reuse = 1;
|
||||||
|
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
|
||||||
|
uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
|
||||||
|
close(sockFd);
|
||||||
|
return -1;
|
||||||
|
};
|
||||||
|
|
||||||
if ( clientIp != 0) {
|
if ( clientIp != 0) {
|
||||||
memset((char *)&clientAddr, 0, sizeof(clientAddr));
|
memset((char *)&clientAddr, 0, sizeof(clientAddr));
|
||||||
clientAddr.sin_family = AF_INET;
|
clientAddr.sin_family = AF_INET;
|
||||||
|
|
|
@ -37,7 +37,7 @@ extern int32_t vDebugFlag;
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId; // global vnode group ID
|
int32_t vgId; // global vnode group ID
|
||||||
int32_t refCount; // reference count
|
int32_t refCount; // reference count
|
||||||
int status;
|
int8_t status;
|
||||||
int8_t role;
|
int8_t role;
|
||||||
int8_t accessState;
|
int8_t accessState;
|
||||||
int64_t version; // current version
|
int64_t version; // current version
|
||||||
|
@ -55,6 +55,8 @@ typedef struct {
|
||||||
SWalCfg walCfg;
|
SWalCfg walCfg;
|
||||||
void *qMgmt;
|
void *qMgmt;
|
||||||
char *rootDir;
|
char *rootDir;
|
||||||
|
tsem_t sem;
|
||||||
|
int8_t dropped;
|
||||||
char db[TSDB_DB_NAME_LEN];
|
char db[TSDB_DB_NAME_LEN];
|
||||||
} SVnodeObj;
|
} SVnodeObj;
|
||||||
|
|
||||||
|
|
|
@ -44,7 +44,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status);
|
||||||
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion);
|
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion);
|
||||||
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
|
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
|
||||||
static void vnodeNotifyRole(void *ahandle, int8_t role);
|
static void vnodeNotifyRole(void *ahandle, int8_t role);
|
||||||
static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
|
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
|
||||||
|
|
||||||
#ifndef _SYNC
|
#ifndef _SYNC
|
||||||
tsync_h syncStart(const SSyncInfo *info) { return NULL; }
|
tsync_h syncStart(const SSyncInfo *info) { return NULL; }
|
||||||
|
@ -153,7 +153,7 @@ int32_t vnodeDrop(int32_t vgId) {
|
||||||
|
|
||||||
SVnodeObj *pVnode = *ppVnode;
|
SVnodeObj *pVnode = *ppVnode;
|
||||||
vTrace("vgId:%d, vnode will be dropped, refCount:%d", pVnode->vgId, pVnode->refCount);
|
vTrace("vgId:%d, vnode will be dropped, refCount:%d", pVnode->vgId, pVnode->refCount);
|
||||||
pVnode->status = TAOS_VN_STATUS_DELETING;
|
pVnode->dropped = 1;
|
||||||
vnodeCleanUp(pVnode);
|
vnodeCleanUp(pVnode);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -164,18 +164,11 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
|
||||||
|
|
||||||
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
|
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
|
||||||
// cfgVersion can be corrected by status msg
|
// cfgVersion can be corrected by status msg
|
||||||
if (pVnode->status != TAOS_VN_STATUS_READY) {
|
if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_UPDATING) != TAOS_VN_STATUS_READY) {
|
||||||
vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId);
|
vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
// the vnode may always fail to synchronize because of it in low cfgVersion
|
|
||||||
// so cannot use the following codes
|
|
||||||
// if (pVnode->syncCfg.replica > 1 && pVnode->role == TAOS_SYNC_ROLE_UNSYNCED)
|
|
||||||
// return TSDB_CODE_VND_NOT_SYNCED;
|
|
||||||
|
|
||||||
pVnode->status = TAOS_VN_STATUS_UPDATING;
|
|
||||||
|
|
||||||
int32_t code = vnodeSaveCfg(pVnodeCfg);
|
int32_t code = vnodeSaveCfg(pVnodeCfg);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pVnode->status = TAOS_VN_STATUS_READY;
|
pVnode->status = TAOS_VN_STATUS_READY;
|
||||||
|
@ -194,11 +187,13 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pVnode->tsdb) {
|
||||||
code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg);
|
code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
pVnode->status = TAOS_VN_STATUS_READY;
|
pVnode->status = TAOS_VN_STATUS_READY;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pVnode->status = TAOS_VN_STATUS_READY;
|
pVnode->status = TAOS_VN_STATUS_READY;
|
||||||
vDebug("vgId:%d, vnode is altered", pVnode->vgId);
|
vDebug("vgId:%d, vnode is altered", pVnode->vgId);
|
||||||
|
@ -223,6 +218,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
|
||||||
pVnode->tsdbCfg.tsdbId = pVnode->vgId;
|
pVnode->tsdbCfg.tsdbId = pVnode->vgId;
|
||||||
pVnode->rootDir = strdup(rootDir);
|
pVnode->rootDir = strdup(rootDir);
|
||||||
pVnode->accessState = TSDB_VN_ALL_ACCCESS;
|
pVnode->accessState = TSDB_VN_ALL_ACCCESS;
|
||||||
|
tsem_init(&pVnode->sem, 0, 0);
|
||||||
|
|
||||||
int32_t code = vnodeReadCfg(pVnode);
|
int32_t code = vnodeReadCfg(pVnode);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -319,7 +315,6 @@ int32_t vnodeClose(int32_t vgId) {
|
||||||
|
|
||||||
SVnodeObj *pVnode = *ppVnode;
|
SVnodeObj *pVnode = *ppVnode;
|
||||||
vDebug("vgId:%d, vnode will be closed", pVnode->vgId);
|
vDebug("vgId:%d, vnode will be closed", pVnode->vgId);
|
||||||
pVnode->status = TAOS_VN_STATUS_CLOSING;
|
|
||||||
vnodeCleanUp(pVnode);
|
vnodeCleanUp(pVnode);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -334,6 +329,8 @@ void vnodeRelease(void *pVnodeRaw) {
|
||||||
|
|
||||||
if (refCount > 0) {
|
if (refCount > 0) {
|
||||||
vDebug("vgId:%d, release vnode, refCount:%d", vgId, refCount);
|
vDebug("vgId:%d, release vnode, refCount:%d", vgId, refCount);
|
||||||
|
if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2)
|
||||||
|
tsem_post(&pVnode->sem);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -344,11 +341,6 @@ void vnodeRelease(void *pVnodeRaw) {
|
||||||
tsdbCloseRepo(pVnode->tsdb, 1);
|
tsdbCloseRepo(pVnode->tsdb, 1);
|
||||||
pVnode->tsdb = NULL;
|
pVnode->tsdb = NULL;
|
||||||
|
|
||||||
// stop continuous query
|
|
||||||
if (pVnode->cq)
|
|
||||||
cqClose(pVnode->cq);
|
|
||||||
pVnode->cq = NULL;
|
|
||||||
|
|
||||||
if (pVnode->wal)
|
if (pVnode->wal)
|
||||||
walClose(pVnode->wal);
|
walClose(pVnode->wal);
|
||||||
pVnode->wal = NULL;
|
pVnode->wal = NULL;
|
||||||
|
@ -363,20 +355,21 @@ void vnodeRelease(void *pVnodeRaw) {
|
||||||
|
|
||||||
tfree(pVnode->rootDir);
|
tfree(pVnode->rootDir);
|
||||||
|
|
||||||
if (pVnode->status == TAOS_VN_STATUS_DELETING) {
|
if (pVnode->dropped) {
|
||||||
char rootDir[TSDB_FILENAME_LEN] = {0};
|
char rootDir[TSDB_FILENAME_LEN] = {0};
|
||||||
sprintf(rootDir, "%s/vnode%d", tsVnodeDir, vgId);
|
sprintf(rootDir, "%s/vnode%d", tsVnodeDir, vgId);
|
||||||
taosMvDir(tsVnodeBakDir, rootDir);
|
taosMvDir(tsVnodeBakDir, rootDir);
|
||||||
taosRemoveDir(rootDir);
|
taosRemoveDir(rootDir);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tsem_destroy(&pVnode->sem);
|
||||||
free(pVnode);
|
free(pVnode);
|
||||||
|
|
||||||
int32_t count = taosHashGetSize(tsDnodeVnodesHash);
|
int32_t count = taosHashGetSize(tsDnodeVnodesHash);
|
||||||
vDebug("vgId:%d, vnode is released, vnodes:%d", vgId, count);
|
vDebug("vgId:%d, vnode is released, vnodes:%d", vgId, count);
|
||||||
}
|
}
|
||||||
|
|
||||||
void *vnodeGetVnode(int32_t vgId) {
|
void *vnodeAcquire(int32_t vgId) {
|
||||||
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
|
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
|
||||||
if (ppVnode == NULL || *ppVnode == NULL) {
|
if (ppVnode == NULL || *ppVnode == NULL) {
|
||||||
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||||
|
@ -384,35 +377,38 @@ void *vnodeGetVnode(int32_t vgId) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return *ppVnode;
|
SVnodeObj *pVnode = *ppVnode;
|
||||||
}
|
|
||||||
|
|
||||||
void *vnodeAcquireVnode(int32_t vgId) {
|
|
||||||
SVnodeObj *pVnode = vnodeGetVnode(vgId);
|
|
||||||
if (pVnode == NULL) return pVnode;
|
|
||||||
|
|
||||||
atomic_add_fetch_32(&pVnode->refCount, 1);
|
atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||||
vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount);
|
vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount);
|
||||||
|
|
||||||
return pVnode;
|
return pVnode;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *vnodeAcquireRqueue(void *param) {
|
void *vnodeAcquireRqueue(int32_t vgId) {
|
||||||
SVnodeObj *pVnode = param;
|
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||||
if (pVnode == NULL) return NULL;
|
if (pVnode == NULL) return NULL;
|
||||||
|
|
||||||
atomic_add_fetch_32(&pVnode->refCount, 1);
|
if (pVnode->status == TAOS_VN_STATUS_RESET) {
|
||||||
vDebug("vgId:%d, get vnode rqueue, refCount:%d", pVnode->vgId, pVnode->refCount);
|
terrno = TSDB_CODE_VND_INVALID_STATUS;
|
||||||
return ((SVnodeObj *)pVnode)->rqueue;
|
vInfo("vgId:%d, status is in reset", vgId);
|
||||||
|
vnodeRelease(pVnode);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return pVnode->rqueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *vnodeGetRqueue(void *pVnode) {
|
void *vnodeAcquireWqueue(int32_t vgId) {
|
||||||
return ((SVnodeObj *)pVnode)->rqueue;
|
SVnodeObj *pVnode = vnodeAcquire(vgId);
|
||||||
}
|
|
||||||
|
|
||||||
void *vnodeGetWqueue(int32_t vgId) {
|
|
||||||
SVnodeObj *pVnode = vnodeAcquireVnode(vgId);
|
|
||||||
if (pVnode == NULL) return NULL;
|
if (pVnode == NULL) return NULL;
|
||||||
|
|
||||||
|
if (pVnode->status == TAOS_VN_STATUS_RESET) {
|
||||||
|
terrno = TSDB_CODE_VND_INVALID_STATUS;
|
||||||
|
vInfo("vgId:%d, status is in reset", vgId);
|
||||||
|
vnodeRelease(pVnode);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
return pVnode->wqueue;
|
return pVnode->wqueue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -484,7 +480,7 @@ void vnodeBuildStatusMsg(void *param) {
|
||||||
void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) {
|
void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) {
|
||||||
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
||||||
pAccess[i].vgId = htonl(pAccess[i].vgId);
|
pAccess[i].vgId = htonl(pAccess[i].vgId);
|
||||||
SVnodeObj *pVnode = vnodeAcquireVnode(pAccess[i].vgId);
|
SVnodeObj *pVnode = vnodeAcquire(pAccess[i].vgId);
|
||||||
if (pVnode != NULL) {
|
if (pVnode != NULL) {
|
||||||
pVnode->accessState = pAccess[i].accessState;
|
pVnode->accessState = pAccess[i].accessState;
|
||||||
if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) {
|
if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) {
|
||||||
|
@ -498,11 +494,29 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) {
|
||||||
static void vnodeCleanUp(SVnodeObj *pVnode) {
|
static void vnodeCleanUp(SVnodeObj *pVnode) {
|
||||||
// remove from hash, so new messages wont be consumed
|
// remove from hash, so new messages wont be consumed
|
||||||
taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t));
|
taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t));
|
||||||
|
int i = 0;
|
||||||
|
|
||||||
|
if (pVnode->status != TAOS_VN_STATUS_INIT) {
|
||||||
|
// it may be in updateing or reset state, then it shall wait
|
||||||
|
while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) != TAOS_VN_STATUS_READY) {
|
||||||
|
if (++i % 1000 == 0) {
|
||||||
|
sched_yield();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// stop replication module
|
// stop replication module
|
||||||
if (pVnode->sync) {
|
if (pVnode->sync) {
|
||||||
syncStop(pVnode->sync);
|
void *sync = pVnode->sync;
|
||||||
pVnode->sync = NULL;
|
pVnode->sync = NULL;
|
||||||
|
syncStop(sync);
|
||||||
|
}
|
||||||
|
|
||||||
|
// stop continuous query
|
||||||
|
if (pVnode->cq) {
|
||||||
|
void *cq = pVnode->cq;
|
||||||
|
pVnode->cq = NULL;
|
||||||
|
cqClose(cq);
|
||||||
}
|
}
|
||||||
|
|
||||||
vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount);
|
vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount);
|
||||||
|
@ -549,18 +563,25 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
|
||||||
cqStop(pVnode->cq);
|
cqStop(pVnode->cq);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
|
static int vnodeResetTsdb(SVnodeObj *pVnode)
|
||||||
SVnodeObj *pVnode = ahandle;
|
{
|
||||||
vDebug("vgId:%d, data file is synced, fversion:%" PRId64, pVnode->vgId, fversion);
|
|
||||||
|
|
||||||
pVnode->fversion = fversion;
|
|
||||||
pVnode->version = fversion;
|
|
||||||
vnodeSaveVersion(pVnode);
|
|
||||||
|
|
||||||
char rootDir[128] = "\0";
|
char rootDir[128] = "\0";
|
||||||
sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
|
sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
|
||||||
// clsoe tsdb, then open tsdb
|
|
||||||
tsdbCloseRepo(pVnode->tsdb, 0);
|
if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY)
|
||||||
|
return -1;
|
||||||
|
|
||||||
|
void *tsdb = pVnode->tsdb;
|
||||||
|
pVnode->tsdb = NULL;
|
||||||
|
|
||||||
|
// acquire vnode
|
||||||
|
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||||
|
|
||||||
|
if (refCount > 2)
|
||||||
|
tsem_wait(&pVnode->sem);
|
||||||
|
|
||||||
|
// close tsdb, then open tsdb
|
||||||
|
tsdbCloseRepo(tsdb, 0);
|
||||||
STsdbAppH appH = {0};
|
STsdbAppH appH = {0};
|
||||||
appH.appH = (void *)pVnode;
|
appH.appH = (void *)pVnode;
|
||||||
appH.notifyStatus = vnodeProcessTsdbStatus;
|
appH.notifyStatus = vnodeProcessTsdbStatus;
|
||||||
|
@ -569,6 +590,22 @@ static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
|
||||||
appH.cqDropFunc = cqDrop;
|
appH.cqDropFunc = cqDrop;
|
||||||
appH.configFunc = dnodeSendCfgTableToRecv;
|
appH.configFunc = dnodeSendCfgTableToRecv;
|
||||||
pVnode->tsdb = tsdbOpenRepo(rootDir, &appH);
|
pVnode->tsdb = tsdbOpenRepo(rootDir, &appH);
|
||||||
|
|
||||||
|
pVnode->status = TAOS_VN_STATUS_READY;
|
||||||
|
vnodeRelease(pVnode);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
|
||||||
|
SVnodeObj *pVnode = ahandle;
|
||||||
|
vDebug("vgId:%d, data file is synced, fversion:%" PRId64, pVnode->vgId, fversion);
|
||||||
|
|
||||||
|
pVnode->fversion = fversion;
|
||||||
|
pVnode->version = fversion;
|
||||||
|
vnodeSaveVersion(pVnode);
|
||||||
|
|
||||||
|
return vnodeResetTsdb(pVnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
|
||||||
|
|
|
@ -26,6 +26,7 @@
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
#include "vnodeInt.h"
|
#include "vnodeInt.h"
|
||||||
|
#include "tqueue.h"
|
||||||
|
|
||||||
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg);
|
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg);
|
||||||
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
|
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
|
||||||
|
@ -51,6 +52,11 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
|
||||||
return TSDB_CODE_VND_INVALID_STATUS;
|
return TSDB_CODE_VND_INVALID_STATUS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// tsdb may be in reset state
|
||||||
|
if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY;
|
||||||
|
if (pVnode->status == TAOS_VN_STATUS_CLOSING)
|
||||||
|
return TSDB_CODE_RPC_NOT_READY;
|
||||||
|
|
||||||
// TODO: Later, let slave to support query
|
// TODO: Later, let slave to support query
|
||||||
if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
||||||
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role);
|
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role);
|
||||||
|
@ -60,6 +66,16 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
|
||||||
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
|
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) {
|
||||||
|
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
|
||||||
|
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
|
||||||
|
pRead->pCont = qhandle;
|
||||||
|
pRead->contLen = 0;
|
||||||
|
|
||||||
|
atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||||
|
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
void *pCont = pReadMsg->pCont;
|
void *pCont = pReadMsg->pCont;
|
||||||
int32_t contLen = pReadMsg->contLen;
|
int32_t contLen = pReadMsg->contLen;
|
||||||
|
@ -131,7 +147,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
if (handle != NULL) {
|
if (handle != NULL) {
|
||||||
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle);
|
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle);
|
||||||
|
|
||||||
dnodePutItemIntoReadQueue(pVnode, *handle);
|
vnodePutItemIntoReadQueue(pVnode, *handle);
|
||||||
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
|
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -208,7 +224,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
|
||||||
} else { // if failed to dump result, free qhandle immediately
|
} else { // if failed to dump result, free qhandle immediately
|
||||||
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len)) == TSDB_CODE_SUCCESS) {
|
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len)) == TSDB_CODE_SUCCESS) {
|
||||||
if (qHasMoreResultsToRetrieve(*handle)) {
|
if (qHasMoreResultsToRetrieve(*handle)) {
|
||||||
dnodePutItemIntoReadQueue(pVnode, *handle);
|
vnodePutItemIntoReadQueue(pVnode, *handle);
|
||||||
pRet->qhandle = *handle;
|
pRet->qhandle = *handle;
|
||||||
freeHandle = false;
|
freeHandle = false;
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -59,13 +59,18 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
|
||||||
return TSDB_CODE_VND_NO_WRITE_AUTH;
|
return TSDB_CODE_VND_NO_WRITE_AUTH;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// tsdb may be in reset state
|
||||||
|
if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY;
|
||||||
|
if (pVnode->status == TAOS_VN_STATUS_CLOSING)
|
||||||
|
return TSDB_CODE_RPC_NOT_READY;
|
||||||
|
|
||||||
if (pHead->version == 0) { // from client or CQ
|
if (pHead->version == 0) { // from client or CQ
|
||||||
if (pVnode->status != TAOS_VN_STATUS_READY) {
|
if (pVnode->status != TAOS_VN_STATUS_READY) {
|
||||||
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->status);
|
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->status);
|
||||||
return TSDB_CODE_VND_INVALID_STATUS; // it may be in deleting or closing state
|
return TSDB_CODE_VND_INVALID_STATUS; // it may be in deleting or closing state
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
|
||||||
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->syncCfg.replica, pVnode->role);
|
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->syncCfg.replica, pVnode->role);
|
||||||
return TSDB_CODE_RPC_NOT_READY;
|
return TSDB_CODE_RPC_NOT_READY;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue