Merge pull request #6390 from taosdata/cq
Kill Connection cause taosd crash
This commit is contained in:
commit
35de0f4d3d
|
@ -405,6 +405,7 @@ typedef struct SSqlObj {
|
|||
|
||||
typedef struct SSqlStream {
|
||||
SSqlObj *pSql;
|
||||
void * cqhandle; // stream belong to SCQContext handle
|
||||
const char* dstTable;
|
||||
uint32_t streamId;
|
||||
char listed;
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "ttimer.h"
|
||||
#include "tutil.h"
|
||||
#include "taosmsg.h"
|
||||
#include "tcq.h"
|
||||
|
||||
#include "taos.h"
|
||||
|
||||
|
@ -294,24 +295,34 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
|
|||
return msgLen;
|
||||
}
|
||||
|
||||
// cqContext->dbconn is killed then call this callback
|
||||
void cqConnKilledNotify(void* handle, void* conn) {
|
||||
if (handle == NULL || conn == NULL){
|
||||
return ;
|
||||
}
|
||||
|
||||
SCqContext* pContext = (SCqContext*) handle;
|
||||
if (pContext->dbConn == conn){
|
||||
atomic_store_ptr(&(pContext->dbConn), NULL);
|
||||
}
|
||||
}
|
||||
|
||||
void tscKillConnection(STscObj *pObj) {
|
||||
// get stream header by locked
|
||||
pthread_mutex_lock(&pObj->mutex);
|
||||
|
||||
SSqlObj *pSql = pObj->sqlList;
|
||||
while (pSql) {
|
||||
pSql = pSql->next;
|
||||
}
|
||||
|
||||
|
||||
SSqlStream *pStream = pObj->streamList;
|
||||
pthread_mutex_unlock(&pObj->mutex);
|
||||
|
||||
while (pStream) {
|
||||
SSqlStream *tmp = pStream->next;
|
||||
// set associate variant to NULL
|
||||
cqConnKilledNotify(pStream->cqhandle, pObj);
|
||||
// taos_close_stream function call pObj->mutet lock , careful death-lock
|
||||
taos_close_stream(pStream);
|
||||
pStream = tmp;
|
||||
}
|
||||
|
||||
pthread_mutex_unlock(&pObj->mutex);
|
||||
|
||||
tscDebug("connection:%p is killed", pObj);
|
||||
taos_close(pObj);
|
||||
}
|
||||
|
||||
|
|
|
@ -141,6 +141,10 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
|
|||
|
||||
pStream->numOfRes = 0; // reset the numOfRes.
|
||||
SSqlObj *pSql = pStream->pSql;
|
||||
// pSql == NULL maybe killStream already called
|
||||
if(pSql == NULL) {
|
||||
return ;
|
||||
}
|
||||
SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0);
|
||||
tscDebug("0x%"PRIx64" add into timer", pSql->self);
|
||||
|
||||
|
@ -662,7 +666,7 @@ void cbParseSql(void* param, TAOS_RES* res, int code) {
|
|||
}
|
||||
|
||||
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
|
||||
int64_t stime, void *param, void (*callback)(void *)) {
|
||||
int64_t stime, void *param, void (*callback)(void *), void* cqhandle) {
|
||||
STscObj *pObj = (STscObj *)taos;
|
||||
if (pObj == NULL || pObj->signature != pObj) return NULL;
|
||||
|
||||
|
@ -695,6 +699,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
|
|||
pStream->callback = callback;
|
||||
pStream->param = param;
|
||||
pStream->pSql = pSql;
|
||||
pStream->cqhandle = cqhandle;
|
||||
pSql->pStream = pStream;
|
||||
pSql->param = pStream;
|
||||
pSql->maxRetry = TSDB_MAX_REPLICA;
|
||||
|
@ -735,7 +740,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c
|
|||
|
||||
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
|
||||
int64_t stime, void *param, void (*callback)(void *)) {
|
||||
return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback);
|
||||
return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback, NULL);
|
||||
}
|
||||
|
||||
void taos_close_stream(TAOS_STREAM *handle) {
|
||||
|
|
|
@ -38,21 +38,6 @@
|
|||
#define cDebug(...) { if (cqDebugFlag & DEBUG_DEBUG) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }}
|
||||
#define cTrace(...) { if (cqDebugFlag & DEBUG_TRACE) { taosPrintLog("CQ ", cqDebugFlag, __VA_ARGS__); }}
|
||||
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
int32_t master;
|
||||
int32_t num; // number of continuous streams
|
||||
char user[TSDB_USER_LEN];
|
||||
char pass[TSDB_KEY_LEN];
|
||||
char db[TSDB_DB_NAME_LEN];
|
||||
FCqWrite cqWrite;
|
||||
struct SCqObj *pHead;
|
||||
void *dbConn;
|
||||
void *tmrCtrl;
|
||||
pthread_mutex_t mutex;
|
||||
int32_t delete;
|
||||
int32_t cqObjNum;
|
||||
} SCqContext;
|
||||
|
||||
typedef struct SCqObj {
|
||||
tmr_h tmrId;
|
||||
|
@ -439,7 +424,7 @@ static void cqProcessCreateTimer(void *param, void *tmrId) {
|
|||
|
||||
// inner implement in tscStream.c
|
||||
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* desName, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
|
||||
int64_t stime, void *param, void (*callback)(void *));
|
||||
int64_t stime, void *param, void (*callback)(void *), void* cqhandle);
|
||||
|
||||
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
|
||||
pObj->pContext = pContext;
|
||||
|
@ -453,7 +438,8 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
|
|||
pObj->tmrId = 0;
|
||||
|
||||
if (pObj->pStream == NULL) {
|
||||
pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, INT64_MIN, (void *)pObj->rid, NULL);
|
||||
pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, \
|
||||
INT64_MIN, (void *)pObj->rid, NULL, pContext);
|
||||
|
||||
// TODO the pObj->pStream may be released if error happens
|
||||
if (pObj->pStream) {
|
||||
|
|
|
@ -31,6 +31,23 @@ typedef struct {
|
|||
FCqWrite cqWrite;
|
||||
} SCqCfg;
|
||||
|
||||
// SCqContext
|
||||
typedef struct {
|
||||
int32_t vgId;
|
||||
int32_t master;
|
||||
int32_t num; // number of continuous streams
|
||||
char user[TSDB_USER_LEN];
|
||||
char pass[TSDB_KEY_LEN];
|
||||
char db[TSDB_DB_NAME_LEN];
|
||||
FCqWrite cqWrite;
|
||||
struct SCqObj *pHead;
|
||||
void *dbConn;
|
||||
void *tmrCtrl;
|
||||
pthread_mutex_t mutex;
|
||||
int32_t delete;
|
||||
int32_t cqObjNum;
|
||||
} SCqContext;
|
||||
|
||||
// the following API shall be called by vnode
|
||||
void *cqOpen(void *ahandle, const SCqCfg *pCfg);
|
||||
void cqClose(void *handle);
|
||||
|
|
Loading…
Reference in New Issue