set up DB connection only when there is a continuous query
This commit is contained in:
parent
4aff58e83f
commit
42e44b4b00
|
@ -40,6 +40,7 @@ typedef struct {
|
||||||
int num; // number of continuous streams
|
int num; // number of continuous streams
|
||||||
struct SCqObj *pHead;
|
struct SCqObj *pHead;
|
||||||
void *dbConn;
|
void *dbConn;
|
||||||
|
int master;
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
} SCqContext;
|
} SCqContext;
|
||||||
|
|
||||||
|
@ -58,6 +59,7 @@ typedef struct SCqObj {
|
||||||
int cqDebugFlag = 135;
|
int cqDebugFlag = 135;
|
||||||
|
|
||||||
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
|
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
|
||||||
|
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);
|
||||||
|
|
||||||
void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
|
void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
|
||||||
|
|
||||||
|
@ -69,6 +71,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
|
||||||
pContext->vgId = pCfg->vgId;
|
pContext->vgId = pCfg->vgId;
|
||||||
pContext->cqWrite = pCfg->cqWrite;
|
pContext->cqWrite = pCfg->cqWrite;
|
||||||
pContext->ahandle = ahandle;
|
pContext->ahandle = ahandle;
|
||||||
|
tscEmbedded = 1;
|
||||||
|
|
||||||
pthread_mutex_init(&pContext->mutex, NULL);
|
pthread_mutex_init(&pContext->mutex, NULL);
|
||||||
|
|
||||||
|
@ -84,6 +87,8 @@ void cqClose(void *handle) {
|
||||||
cqStop(pContext);
|
cqStop(pContext);
|
||||||
|
|
||||||
// free all resources
|
// free all resources
|
||||||
|
pthread_mutex_lock(&pContext->mutex);
|
||||||
|
|
||||||
SCqObj *pObj = pContext->pHead;
|
SCqObj *pObj = pContext->pHead;
|
||||||
while (pObj) {
|
while (pObj) {
|
||||||
SCqObj *pTemp = pObj;
|
SCqObj *pTemp = pObj;
|
||||||
|
@ -91,6 +96,8 @@ void cqClose(void *handle) {
|
||||||
free(pTemp);
|
free(pTemp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&pContext->mutex);
|
||||||
|
|
||||||
pthread_mutex_destroy(&pContext->mutex);
|
pthread_mutex_destroy(&pContext->mutex);
|
||||||
|
|
||||||
cTrace("vgId:%d, CQ is closed", pContext->vgId);
|
cTrace("vgId:%d, CQ is closed", pContext->vgId);
|
||||||
|
@ -100,28 +107,15 @@ void cqClose(void *handle) {
|
||||||
void cqStart(void *handle) {
|
void cqStart(void *handle) {
|
||||||
SCqContext *pContext = handle;
|
SCqContext *pContext = handle;
|
||||||
cTrace("vgId:%d, start all CQs", pContext->vgId);
|
cTrace("vgId:%d, start all CQs", pContext->vgId);
|
||||||
if (pContext->dbConn) return;
|
if (pContext->dbConn || pContext->master) return;
|
||||||
|
|
||||||
pthread_mutex_lock(&pContext->mutex);
|
pthread_mutex_lock(&pContext->mutex);
|
||||||
|
|
||||||
tscEmbedded = 1;
|
pContext->master = 1;
|
||||||
pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0);
|
|
||||||
if (pContext->dbConn == NULL) {
|
|
||||||
cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
|
|
||||||
pthread_mutex_unlock(&pContext->mutex);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
SCqObj *pObj = pContext->pHead;
|
SCqObj *pObj = pContext->pHead;
|
||||||
while (pObj) {
|
while (pObj) {
|
||||||
int64_t lastKey = 0;
|
cqCreateStream(pContext, pObj);
|
||||||
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
|
|
||||||
if (pObj->pStream) {
|
|
||||||
pContext->num++;
|
|
||||||
cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
|
|
||||||
} else {
|
|
||||||
cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr);
|
|
||||||
}
|
|
||||||
pObj = pObj->next;
|
pObj = pObj->next;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,10 +125,11 @@ void cqStart(void *handle) {
|
||||||
void cqStop(void *handle) {
|
void cqStop(void *handle) {
|
||||||
SCqContext *pContext = handle;
|
SCqContext *pContext = handle;
|
||||||
cTrace("vgId:%d, stop all CQs", pContext->vgId);
|
cTrace("vgId:%d, stop all CQs", pContext->vgId);
|
||||||
if (pContext->dbConn == NULL) return;
|
if (pContext->dbConn == NULL || pContext->master == 0) return;
|
||||||
|
|
||||||
pthread_mutex_lock(&pContext->mutex);
|
pthread_mutex_lock(&pContext->mutex);
|
||||||
|
|
||||||
|
pContext->master = 0;
|
||||||
SCqObj *pObj = pContext->pHead;
|
SCqObj *pObj = pContext->pHead;
|
||||||
while (pObj) {
|
while (pObj) {
|
||||||
if (pObj->pStream) {
|
if (pObj->pStream) {
|
||||||
|
@ -176,16 +171,7 @@ void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int column
|
||||||
if (pContext->pHead) pContext->pHead->prev = pObj;
|
if (pContext->pHead) pContext->pHead->prev = pObj;
|
||||||
pContext->pHead = pObj;
|
pContext->pHead = pObj;
|
||||||
|
|
||||||
if (pContext->dbConn) {
|
cqCreateStream(pContext, pObj);
|
||||||
int64_t lastKey = 0;
|
|
||||||
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
|
|
||||||
if (pObj->pStream) {
|
|
||||||
pContext->num++;
|
|
||||||
cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
|
|
||||||
} else {
|
|
||||||
cError("vgId:%d, id:%d CQ:%s, failed to launch", pContext->vgId, pObj->tid, pObj->sqlStr);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&pContext->mutex);
|
pthread_mutex_unlock(&pContext->mutex);
|
||||||
|
|
||||||
|
@ -218,6 +204,26 @@ void cqDrop(void *handle) {
|
||||||
pthread_mutex_lock(&pContext->mutex);
|
pthread_mutex_lock(&pContext->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
|
||||||
|
|
||||||
|
if (pContext->dbConn == NULL) {
|
||||||
|
pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0);
|
||||||
|
if (pContext->dbConn == NULL) {
|
||||||
|
cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t lastKey = 0;
|
||||||
|
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
|
||||||
|
if (pObj->pStream) {
|
||||||
|
pContext->num++;
|
||||||
|
cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||||
|
} else {
|
||||||
|
cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
|
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
|
||||||
SCqObj *pObj = (SCqObj *)param;
|
SCqObj *pObj = (SCqObj *)param;
|
||||||
SCqContext *pContext = pObj->pContext;
|
SCqContext *pContext = pObj->pContext;
|
||||||
|
|
Loading…
Reference in New Issue