Merge pull request #1990 from taosdata/enhance/cq
set up DB connection only when there is a continuous query
This commit is contained in:
commit
71c26d5c7c
|
@ -40,6 +40,7 @@ typedef struct {
|
|||
int num; // number of continuous streams
|
||||
struct SCqObj *pHead;
|
||||
void *dbConn;
|
||||
int master;
|
||||
pthread_mutex_t mutex;
|
||||
} SCqContext;
|
||||
|
||||
|
@ -58,6 +59,7 @@ typedef struct SCqObj {
|
|||
int cqDebugFlag = 135;
|
||||
|
||||
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
|
||||
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);
|
||||
|
||||
void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
|
||||
|
||||
|
@ -69,6 +71,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
|
|||
pContext->vgId = pCfg->vgId;
|
||||
pContext->cqWrite = pCfg->cqWrite;
|
||||
pContext->ahandle = ahandle;
|
||||
tscEmbedded = 1;
|
||||
|
||||
pthread_mutex_init(&pContext->mutex, NULL);
|
||||
|
||||
|
@ -84,6 +87,8 @@ void cqClose(void *handle) {
|
|||
cqStop(pContext);
|
||||
|
||||
// free all resources
|
||||
pthread_mutex_lock(&pContext->mutex);
|
||||
|
||||
SCqObj *pObj = pContext->pHead;
|
||||
while (pObj) {
|
||||
SCqObj *pTemp = pObj;
|
||||
|
@ -91,6 +96,8 @@ void cqClose(void *handle) {
|
|||
free(pTemp);
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&pContext->mutex);
|
||||
|
||||
pthread_mutex_destroy(&pContext->mutex);
|
||||
|
||||
cTrace("vgId:%d, CQ is closed", pContext->vgId);
|
||||
|
@ -100,28 +107,15 @@ void cqClose(void *handle) {
|
|||
void cqStart(void *handle) {
|
||||
SCqContext *pContext = handle;
|
||||
cTrace("vgId:%d, start all CQs", pContext->vgId);
|
||||
if (pContext->dbConn) return;
|
||||
if (pContext->dbConn || pContext->master) return;
|
||||
|
||||
pthread_mutex_lock(&pContext->mutex);
|
||||
|
||||
tscEmbedded = 1;
|
||||
pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0);
|
||||
if (pContext->dbConn == NULL) {
|
||||
cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
|
||||
pthread_mutex_unlock(&pContext->mutex);
|
||||
return;
|
||||
}
|
||||
pContext->master = 1;
|
||||
|
||||
SCqObj *pObj = pContext->pHead;
|
||||
while (pObj) {
|
||||
int64_t lastKey = 0;
|
||||
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
|
||||
if (pObj->pStream) {
|
||||
pContext->num++;
|
||||
cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||
} else {
|
||||
cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||
}
|
||||
cqCreateStream(pContext, pObj);
|
||||
pObj = pObj->next;
|
||||
}
|
||||
|
||||
|
@ -131,10 +125,11 @@ void cqStart(void *handle) {
|
|||
void cqStop(void *handle) {
|
||||
SCqContext *pContext = handle;
|
||||
cTrace("vgId:%d, stop all CQs", pContext->vgId);
|
||||
if (pContext->dbConn == NULL) return;
|
||||
if (pContext->dbConn == NULL || pContext->master == 0) return;
|
||||
|
||||
pthread_mutex_lock(&pContext->mutex);
|
||||
|
||||
pContext->master = 0;
|
||||
SCqObj *pObj = pContext->pHead;
|
||||
while (pObj) {
|
||||
if (pObj->pStream) {
|
||||
|
@ -176,16 +171,7 @@ void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int column
|
|||
if (pContext->pHead) pContext->pHead->prev = pObj;
|
||||
pContext->pHead = pObj;
|
||||
|
||||
if (pContext->dbConn) {
|
||||
int64_t lastKey = 0;
|
||||
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
|
||||
if (pObj->pStream) {
|
||||
pContext->num++;
|
||||
cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||
} else {
|
||||
cError("vgId:%d, id:%d CQ:%s, failed to launch", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||
}
|
||||
}
|
||||
cqCreateStream(pContext, pObj);
|
||||
|
||||
pthread_mutex_unlock(&pContext->mutex);
|
||||
|
||||
|
@ -218,6 +204,26 @@ void cqDrop(void *handle) {
|
|||
pthread_mutex_lock(&pContext->mutex);
|
||||
}
|
||||
|
||||
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
|
||||
|
||||
if (pContext->dbConn == NULL) {
|
||||
pContext->dbConn = taos_connect("localhost", pContext->user, pContext->pass, NULL, 0);
|
||||
if (pContext->dbConn == NULL) {
|
||||
cError("vgId:%d, failed to connect to TDengine(%s)", pContext->vgId, tstrerror(terrno));
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
int64_t lastKey = 0;
|
||||
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, lastKey, pObj, NULL);
|
||||
if (pObj->pStream) {
|
||||
pContext->num++;
|
||||
cTrace("vgId:%d, id:%d CQ:%s is openned", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||
} else {
|
||||
cError("vgId:%d, id:%d CQ:%s, failed to open", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||
}
|
||||
}
|
||||
|
||||
static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
|
||||
SCqObj *pObj = (SCqObj *)param;
|
||||
SCqContext *pContext = pObj->pContext;
|
||||
|
|
Loading…
Reference in New Issue