This commit is contained in:
dapan1121 2021-01-27 15:56:10 +08:00
parent 2327cbb9b8
commit c7c0fd1dd5
1 changed files with 59 additions and 26 deletions

View File

@ -50,6 +50,8 @@ typedef struct {
void *dbConn;
void *tmrCtrl;
pthread_mutex_t mutex;
int32_t delete;
int32_t cqObjNum;
} SCqContext;
typedef struct SCqObj {
@ -72,17 +74,10 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);
int32_t cqObjRef = -1;
void cqRmFromList(SCqObj *pObj) {
//LOCK in caller
void cqFree(void *handle) {
if (tsEnableStream == 0) {
return;
}
SCqObj *pObj = handle;
SCqContext *pContext = pObj->pContext;
int32_t last = 0;
pthread_mutex_lock(&pContext->mutex);
if (pObj->prev) {
pObj->prev->next = pObj->next;
@ -94,9 +89,17 @@ void cqFree(void *handle) {
pObj->next->prev = pObj->prev;
}
if (pContext->pHead == NULL) {
last = 1;
}
void cqFree(void *handle) {
if (tsEnableStream == 0) {
return;
}
SCqObj *pObj = handle;
SCqContext *pContext = pObj->pContext;
int32_t delete = 0;
pthread_mutex_lock(&pContext->mutex);
// free the resources associated
if (pObj->pStream) {
@ -113,9 +116,15 @@ void cqFree(void *handle) {
free(pObj->sqlStr);
free(pObj);
pContext->cqObjNum--;
if (pContext->cqObjNum <= 0 && pContext->delete) {
delete = 1;
}
pthread_mutex_unlock(&pContext->mutex);
if (last) {
if (delete) {
pthread_mutex_unlock(&pContext->mutex);
pthread_mutex_destroy(&pContext->mutex);
@ -184,18 +193,29 @@ void cqClose(void *handle) {
SCqContext *pContext = handle;
if (handle == NULL) return;
pContext->delete = 1;
// stop all CQs
cqStop(pContext);
// free all resources
pthread_mutex_lock(&pContext->mutex);
int64_t rid = 0;
SCqObj *pObj = pContext->pHead;
while (pObj) {
SCqObj *pTemp = pObj;
pObj = pObj->next;
while (1) {
pthread_mutex_lock(&pContext->mutex);
taosReleaseRef(cqObjRef, pTemp->rid);
SCqObj *pObj = pContext->pHead;
if (pObj) {
cqRmFromList(pObj);
rid = pObj->rid;
} else {
pthread_mutex_unlock(&pContext->mutex);
break;
}
pthread_mutex_unlock(&pContext->mutex);
taosReleaseRef(cqObjRef, rid);
}
}
@ -255,6 +275,7 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch
return NULL;
}
SCqContext *pContext = handle;
int64_t rid = 0;
SCqObj *pObj = calloc(sizeof(SCqObj), 1);
if (pObj == NULL) return NULL;
@ -277,25 +298,36 @@ void *cqCreate(void *handle, uint64_t uid, int32_t sid, const char* dstTable, ch
if (pContext->pHead) pContext->pHead->prev = pObj;
pContext->pHead = pObj;
pContext->cqObjNum++;
pObj->rid = taosAddRef(cqObjRef, pObj);
cqCreateStream(pContext, pObj);
rid = pObj->rid;
pthread_mutex_unlock(&pContext->mutex);
return pObj;
return (void *)rid;
}
void cqDrop(void *handle) {
if (tsEnableStream == 0) {
return;
}
SCqObj *pObj = handle;
SCqObj* pObj = (SCqObj*)taosAcquireRef(cqObjRef, (int64_t)handle);
if (pObj == NULL) {
return;
}
SCqContext *pContext = pObj->pContext;
pthread_mutex_lock(&pContext->mutex);
cqRmFromList(pObj);
// free the resources associated
if (pObj->pStream) {
taos_close_stream(pObj->pStream);
@ -307,7 +339,8 @@ void cqDrop(void *handle) {
pthread_mutex_unlock(&pContext->mutex);
taosReleaseRef(cqObjRef, pObj->rid);
taosRemoveRef(cqObjRef, (int64_t)handle);
taosReleaseRef(cqObjRef, (int64_t)handle);
}
static void doCreateStream(void *param, TAOS_RES *result, int32_t code) {