[TBASE-1129]
This commit is contained in:
parent
d8292d6630
commit
86ad416657
|
@ -73,12 +73,18 @@ enum _TSDB_VG_LB_STATUS {
|
||||||
TSDB_VG_LB_STATUS_UPDATE
|
TSDB_VG_LB_STATUS_UPDATE
|
||||||
};
|
};
|
||||||
|
|
||||||
|
enum _TSDB_VN_STREAM_STATUS {
|
||||||
|
TSDB_VN_STREAM_STATUS_STOP,
|
||||||
|
TSDB_VN_STREAM_STATUS_START
|
||||||
|
};
|
||||||
|
|
||||||
const char* taosGetVnodeStatusStr(int vnodeStatus);
|
const char* taosGetVnodeStatusStr(int vnodeStatus);
|
||||||
const char* taosGetVnodeSyncStatusStr(int vnodeSyncStatus);
|
const char* taosGetVnodeSyncStatusStr(int vnodeSyncStatus);
|
||||||
const char* taosGetVnodeDropStatusStr(int dropping);
|
const char* taosGetVnodeDropStatusStr(int dropping);
|
||||||
const char* taosGetDnodeStatusStr(int dnodeStatus);
|
const char* taosGetDnodeStatusStr(int dnodeStatus);
|
||||||
const char* taosGetDnodeLbStatusStr(int dnodeBalanceStatus);
|
const char* taosGetDnodeLbStatusStr(int dnodeBalanceStatus);
|
||||||
const char* taosGetVgroupLbStatusStr(int vglbStatus);
|
const char* taosGetVgroupLbStatusStr(int vglbStatus);
|
||||||
|
const char* taosGetVnodeStreamStatusStr(int vnodeStreamStatus);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,7 +81,7 @@ void vnodeOpenStreams(void *param, void *tmrId) {
|
||||||
SVnodeObj *pVnode = (SVnodeObj *)param;
|
SVnodeObj *pVnode = (SVnodeObj *)param;
|
||||||
SMeterObj *pObj;
|
SMeterObj *pObj;
|
||||||
|
|
||||||
if (pVnode->streamRole == 0) return;
|
if (pVnode->streamRole == TSDB_VN_STREAM_STATUS_STOP) return;
|
||||||
if (pVnode->meterList == NULL) return;
|
if (pVnode->meterList == NULL) return;
|
||||||
|
|
||||||
taosTmrStopA(&pVnode->streamTimer);
|
taosTmrStopA(&pVnode->streamTimer);
|
||||||
|
@ -120,7 +120,7 @@ void vnodeCreateStream(SMeterObj *pObj) {
|
||||||
|
|
||||||
SVnodeObj *pVnode = vnodeList + pObj->vnode;
|
SVnodeObj *pVnode = vnodeList + pObj->vnode;
|
||||||
|
|
||||||
if (pVnode->streamRole == 0) return;
|
if (pVnode->streamRole == TSDB_VN_STREAM_STATUS_STOP) return;
|
||||||
if (pObj->pStream) return;
|
if (pObj->pStream) return;
|
||||||
|
|
||||||
dTrace("vid:%d sid:%d id:%s stream:%s is created", pObj->vnode, pObj->sid, pObj->meterId, pObj->pSql);
|
dTrace("vid:%d sid:%d id:%s stream:%s is created", pObj->vnode, pObj->sid, pObj->meterId, pObj->pSql);
|
||||||
|
@ -155,7 +155,7 @@ void vnodeRemoveStream(SMeterObj *pObj) {
|
||||||
// Close all streams in a vnode
|
// Close all streams in a vnode
|
||||||
void vnodeCloseStream(SVnodeObj *pVnode) {
|
void vnodeCloseStream(SVnodeObj *pVnode) {
|
||||||
SMeterObj *pObj;
|
SMeterObj *pObj;
|
||||||
dTrace("vid:%d, stream is closed, old role:%d", pVnode->vnode, pVnode->streamRole);
|
dTrace("vid:%d, stream is closed, old role %s", pVnode->vnode, taosGetVnodeStreamStatusStr(pVnode->streamRole));
|
||||||
|
|
||||||
// stop stream computing
|
// stop stream computing
|
||||||
for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) {
|
for (int sid = 0; sid < pVnode->cfg.maxSessions; ++sid) {
|
||||||
|
@ -172,9 +172,10 @@ void vnodeCloseStream(SVnodeObj *pVnode) {
|
||||||
void vnodeUpdateStreamRole(SVnodeObj *pVnode) {
|
void vnodeUpdateStreamRole(SVnodeObj *pVnode) {
|
||||||
/* SMeterObj *pObj; */
|
/* SMeterObj *pObj; */
|
||||||
|
|
||||||
int newRole = (pVnode->vnodeStatus == TSDB_VN_STATUS_MASTER) ? 1 : 0;
|
int newRole = (pVnode->vnodeStatus == TSDB_VN_STATUS_MASTER) ? TSDB_VN_STREAM_STATUS_START : TSDB_VN_STREAM_STATUS_STOP;
|
||||||
if (newRole != pVnode->streamRole) {
|
if (newRole != pVnode->streamRole) {
|
||||||
dTrace("vid:%d, stream role is changed to:%d", pVnode->vnode, newRole);
|
dTrace("vid:%d, stream role is changed from %s to %s",
|
||||||
|
pVnode->vnode, taosGetVnodeStreamStatusStr(pVnode->streamRole), taosGetVnodeStreamStatusStr(newRole));
|
||||||
pVnode->streamRole = newRole;
|
pVnode->streamRole = newRole;
|
||||||
if (newRole) {
|
if (newRole) {
|
||||||
vnodeOpenStreams(pVnode, NULL);
|
vnodeOpenStreams(pVnode, NULL);
|
||||||
|
@ -182,7 +183,7 @@ void vnodeUpdateStreamRole(SVnodeObj *pVnode) {
|
||||||
vnodeCloseStream(pVnode);
|
vnodeCloseStream(pVnode);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
dTrace("vid:%d, stream role is keep to:%d", pVnode->vnode, newRole);
|
dTrace("vid:%d, stream role is keep to %s", pVnode->vnode, taosGetVnodeStreamStatusStr(newRole));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -73,3 +73,11 @@ const char* taosGetVgroupLbStatusStr(int vglbStatus) {
|
||||||
default: return "undefined";
|
default: return "undefined";
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const char* taosGetVnodeStreamStatusStr(int vnodeStreamStatus) {
|
||||||
|
switch (vnodeStreamStatus) {
|
||||||
|
case TSDB_VN_STREAM_STATUS_START: return "start";
|
||||||
|
case TSDB_VN_STREAM_STATUS_STOP: return "stop";
|
||||||
|
default: return "undefined";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue