Merge from master
This commit is contained in:
commit
0d5d2736be
|
@ -283,6 +283,7 @@ typedef struct SSqlStream {
|
||||||
int64_t ctime; // stream created time
|
int64_t ctime; // stream created time
|
||||||
int64_t stime; // stream next executed time
|
int64_t stime; // stream next executed time
|
||||||
int64_t etime; // stream end query time, when time is larger then etime, the stream will be closed
|
int64_t etime; // stream end query time, when time is larger then etime, the stream will be closed
|
||||||
|
int64_t ltime; // stream last row time in stream table
|
||||||
SInterval interval;
|
SInterval interval;
|
||||||
void * pTimer;
|
void * pTimer;
|
||||||
|
|
||||||
|
|
|
@ -24,6 +24,7 @@
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
|
||||||
#include "tscProfile.h"
|
#include "tscProfile.h"
|
||||||
|
#include "tscSubquery.h"
|
||||||
|
|
||||||
static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows);
|
static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows);
|
||||||
static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows);
|
static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows);
|
||||||
|
@ -47,8 +48,8 @@ static bool isProjectStream(SQueryInfo* pQueryInfo) {
|
||||||
|
|
||||||
static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, int16_t prec) {
|
static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, int16_t prec) {
|
||||||
float retryRangeFactor = 0.3f;
|
float retryRangeFactor = 0.3f;
|
||||||
int64_t retryDelta = (int64_t)(tsStreamCompRetryDelay * retryRangeFactor);
|
int64_t retryDelta = (int64_t)(tsRetryStreamCompDelay * retryRangeFactor);
|
||||||
retryDelta = ((rand() % retryDelta) + tsStreamCompRetryDelay) * 1000L;
|
retryDelta = ((rand() % retryDelta) + tsRetryStreamCompDelay) * 1000L;
|
||||||
|
|
||||||
if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') {
|
if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') {
|
||||||
// change to ms
|
// change to ms
|
||||||
|
@ -575,6 +576,14 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
|
||||||
|
|
||||||
pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime);
|
pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime);
|
||||||
|
|
||||||
|
// set stime with ltime if ltime > stime
|
||||||
|
const char* dstTable = pStream->dstTable? pStream->dstTable: "";
|
||||||
|
tscDebug(" CQ table=%s ltime is %"PRId64, dstTable, pStream->ltime);
|
||||||
|
if(pStream->ltime != INT64_MIN && pStream->ltime > pStream->stime) {
|
||||||
|
tscWarn(" CQ set stream %s stime=%"PRId64" replace with ltime=%"PRId64" if ltime>0 ", dstTable, pStream->stime, pStream->ltime);
|
||||||
|
pStream->stime = pStream->ltime;
|
||||||
|
}
|
||||||
|
|
||||||
int64_t starttime = tscGetLaunchTimestamp(pStream);
|
int64_t starttime = tscGetLaunchTimestamp(pStream);
|
||||||
pCmd->command = TSDB_SQL_SELECT;
|
pCmd->command = TSDB_SQL_SELECT;
|
||||||
|
|
||||||
|
@ -590,7 +599,66 @@ void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable) {
|
||||||
pStream->dstTable = dstTable;
|
pStream->dstTable = dstTable;
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
|
// fetchFp call back
|
||||||
|
void fetchFpStreamLastRow(void* param ,TAOS_RES* res, int num) {
|
||||||
|
SSqlStream* pStream = (SSqlStream*)param;
|
||||||
|
SSqlObj* pSql = res;
|
||||||
|
|
||||||
|
// get row data set to ltime
|
||||||
|
tscSetSqlOwner(pSql);
|
||||||
|
TAOS_ROW row = doSetResultRowData(pSql);
|
||||||
|
if( row && row[0] ) {
|
||||||
|
pStream->ltime = *((int64_t*)row[0]);
|
||||||
|
const char* dstTable = pStream->dstTable? pStream->dstTable: "";
|
||||||
|
tscDebug(" CQ stream table=%s last row time=%"PRId64" .", dstTable, pStream->ltime);
|
||||||
|
}
|
||||||
|
tscClearSqlOwner(pSql);
|
||||||
|
|
||||||
|
// no condition call
|
||||||
|
tscCreateStream(param, pStream->pSql, TSDB_CODE_SUCCESS);
|
||||||
|
taos_free_result(res);
|
||||||
|
}
|
||||||
|
|
||||||
|
// fp callback
|
||||||
|
void fpStreamLastRow(void* param ,TAOS_RES* res, int code) {
|
||||||
|
// check result successful
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tscCreateStream(param, res, TSDB_CODE_SUCCESS);
|
||||||
|
taos_free_result(res);
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
|
||||||
|
// asynchronous fetch last row data
|
||||||
|
taos_fetch_rows_a(res, fetchFpStreamLastRow, param);
|
||||||
|
}
|
||||||
|
|
||||||
|
void cbParseSql(void* param, TAOS_RES* res, int code) {
|
||||||
|
// check result successful
|
||||||
|
SSqlStream* pStream = (SSqlStream*)param;
|
||||||
|
SSqlObj* pSql = pStream->pSql;
|
||||||
|
SSqlCmd* pCmd = &pSql->cmd;
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
pSql->res.code = code;
|
||||||
|
tscDebug("0x%"PRIx64" open stream parse sql failed, sql:%s, reason:%s, code:%s", pSql->self, pSql->sqlstr, pCmd->payload, tstrerror(code));
|
||||||
|
pStream->fp(pStream->param, NULL, NULL);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// check dstTable valid
|
||||||
|
if(pStream->dstTable == NULL || strlen(pStream->dstTable) == 0) {
|
||||||
|
tscDebug(" cbParseSql dstTable is empty.");
|
||||||
|
tscCreateStream(param, res, code);
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
|
||||||
|
// query stream last row time async
|
||||||
|
char sql[128] = "";
|
||||||
|
sprintf(sql, "select last_row(*) from %s;", pStream->dstTable);
|
||||||
|
taos_query_a(pSql->pTscObj, sql, fpStreamLastRow, param);
|
||||||
|
return ;
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
|
||||||
int64_t stime, void *param, void (*callback)(void *)) {
|
int64_t stime, void *param, void (*callback)(void *)) {
|
||||||
STscObj *pObj = (STscObj *)taos;
|
STscObj *pObj = (STscObj *)taos;
|
||||||
if (pObj == NULL || pObj->signature != pObj) return NULL;
|
if (pObj == NULL || pObj->signature != pObj) return NULL;
|
||||||
|
@ -613,11 +681,16 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pStream->stime = stime;
|
pStream->ltime = INT64_MIN;
|
||||||
pStream->fp = fp;
|
pStream->stime = stime;
|
||||||
|
pStream->fp = fp;
|
||||||
pStream->callback = callback;
|
pStream->callback = callback;
|
||||||
pStream->param = param;
|
pStream->param = param;
|
||||||
pStream->pSql = pSql;
|
pStream->pSql = pSql;
|
||||||
|
pSql->pStream = pStream;
|
||||||
|
pSql->param = pStream;
|
||||||
|
pSql->maxRetry = TSDB_MAX_REPLICA;
|
||||||
|
tscSetStreamDestTable(pStream, dstTable);
|
||||||
|
|
||||||
pSql->pStream = pStream;
|
pSql->pStream = pStream;
|
||||||
pSql->param = pStream;
|
pSql->param = pStream;
|
||||||
|
@ -640,10 +713,17 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
|
||||||
|
|
||||||
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
|
tscDebugL("0x%"PRIx64" SQL: %s", pSql->self, pSql->sqlstr);
|
||||||
|
|
||||||
|
pSql->fp = cbParseSql;
|
||||||
|
pSql->fetchFp = cbParseSql;
|
||||||
|
|
||||||
|
registerSqlObj(pSql);
|
||||||
|
|
||||||
int32_t code = tsParseSql(pSql, true);
|
int32_t code = tsParseSql(pSql, true);
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
tscCreateStream(pStream, pSql, code);
|
cbParseSql(pStream, pSql, code);
|
||||||
} else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
} else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) {
|
||||||
|
tscDebug(" CQ taso_open_stream IN Process. sql=%s", sqlstr);
|
||||||
|
} else {
|
||||||
tscError("0x%"PRIx64" open stream failed, sql:%s, code:%s", pSql->self, sqlstr, tstrerror(code));
|
tscError("0x%"PRIx64" open stream failed, sql:%s, code:%s", pSql->self, sqlstr, tstrerror(code));
|
||||||
taosReleaseRef(tscObjRef, pSql->self);
|
taosReleaseRef(tscObjRef, pSql->self);
|
||||||
free(pStream);
|
free(pStream);
|
||||||
|
@ -653,6 +733,11 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
|
||||||
return pStream;
|
return pStream;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
|
||||||
|
int64_t stime, void *param, void (*callback)(void *)) {
|
||||||
|
return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback);
|
||||||
|
}
|
||||||
|
|
||||||
void taos_close_stream(TAOS_STREAM *handle) {
|
void taos_close_stream(TAOS_STREAM *handle) {
|
||||||
SSqlStream *pStream = (SSqlStream *)handle;
|
SSqlStream *pStream = (SSqlStream *)handle;
|
||||||
|
|
||||||
|
|
|
@ -75,7 +75,7 @@ extern int32_t tsMinSlidingTime;
|
||||||
extern int32_t tsMinIntervalTime;
|
extern int32_t tsMinIntervalTime;
|
||||||
extern int32_t tsMaxStreamComputDelay;
|
extern int32_t tsMaxStreamComputDelay;
|
||||||
extern int32_t tsStreamCompStartDelay;
|
extern int32_t tsStreamCompStartDelay;
|
||||||
extern int32_t tsStreamCompRetryDelay;
|
extern int32_t tsRetryStreamCompDelay;
|
||||||
extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window
|
extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window
|
||||||
extern int32_t tsProjectExecInterval;
|
extern int32_t tsProjectExecInterval;
|
||||||
extern int64_t tsMaxRetentWindow;
|
extern int64_t tsMaxRetentWindow;
|
||||||
|
|
|
@ -93,7 +93,7 @@ int32_t tsMaxStreamComputDelay = 20000;
|
||||||
int32_t tsStreamCompStartDelay = 10000;
|
int32_t tsStreamCompStartDelay = 10000;
|
||||||
|
|
||||||
// the stream computing delay time after executing failed, change accordingly
|
// the stream computing delay time after executing failed, change accordingly
|
||||||
int32_t tsStreamCompRetryDelay = 10;
|
int32_t tsRetryStreamCompDelay = 10*1000;
|
||||||
|
|
||||||
// The delayed computing ration. 10% of the whole computing time window by default.
|
// The delayed computing ration. 10% of the whole computing time window by default.
|
||||||
float tsStreamComputDelayRatio = 0.1f;
|
float tsStreamComputDelayRatio = 0.1f;
|
||||||
|
@ -710,7 +710,7 @@ static void doInitGlobalConfig(void) {
|
||||||
taosInitConfigOption(cfg);
|
taosInitConfigOption(cfg);
|
||||||
|
|
||||||
cfg.option = "retryStreamCompDelay";
|
cfg.option = "retryStreamCompDelay";
|
||||||
cfg.ptr = &tsStreamCompRetryDelay;
|
cfg.ptr = &tsRetryStreamCompDelay;
|
||||||
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
cfg.valType = TAOS_CFG_VTYPE_INT32;
|
||||||
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
|
cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW;
|
||||||
cfg.minValue = 10;
|
cfg.minValue = 10;
|
||||||
|
|
|
@ -437,6 +437,10 @@ static void cqProcessCreateTimer(void *param, void *tmrId) {
|
||||||
taosReleaseRef(cqObjRef, (int64_t)param);
|
taosReleaseRef(cqObjRef, (int64_t)param);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// inner implement in tscStream.c
|
||||||
|
TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* desName, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
|
||||||
|
int64_t stime, void *param, void (*callback)(void *));
|
||||||
|
|
||||||
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
|
static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
|
||||||
pObj->pContext = pContext;
|
pObj->pContext = pContext;
|
||||||
|
|
||||||
|
@ -449,11 +453,10 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
|
||||||
pObj->tmrId = 0;
|
pObj->tmrId = 0;
|
||||||
|
|
||||||
if (pObj->pStream == NULL) {
|
if (pObj->pStream == NULL) {
|
||||||
pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, INT64_MIN, (void *)pObj->rid, NULL);
|
pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, INT64_MIN, (void *)pObj->rid, NULL);
|
||||||
|
|
||||||
// TODO the pObj->pStream may be released if error happens
|
// TODO the pObj->pStream may be released if error happens
|
||||||
if (pObj->pStream) {
|
if (pObj->pStream) {
|
||||||
tscSetStreamDestTable(pObj->pStream, pObj->dstTable);
|
|
||||||
pContext->num++;
|
pContext->num++;
|
||||||
cDebug("vgId:%d, id:%d CQ:%s is opened", pContext->vgId, pObj->tid, pObj->sqlStr);
|
cDebug("vgId:%d, id:%d CQ:%s is opened", pContext->vgId, pObj->tid, pObj->sqlStr);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -432,7 +432,7 @@ class TDDnodes:
|
||||||
self.simDeployed = False
|
self.simDeployed = False
|
||||||
|
|
||||||
def init(self, path):
|
def init(self, path):
|
||||||
psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'"
|
psCmd = "ps -ef|grep -w taosd| grep -v grep| grep -v defunct | awk '{print $2}'"
|
||||||
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
|
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
|
||||||
while(processID):
|
while(processID):
|
||||||
killCmd = "kill -TERM %s > /dev/null 2>&1" % processID
|
killCmd = "kill -TERM %s > /dev/null 2>&1" % processID
|
||||||
|
@ -545,14 +545,14 @@ class TDDnodes:
|
||||||
for i in range(len(self.dnodes)):
|
for i in range(len(self.dnodes)):
|
||||||
self.dnodes[i].stop()
|
self.dnodes[i].stop()
|
||||||
|
|
||||||
psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep | awk '{print $2}'"
|
psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep| grep -v defunct | awk '{print $2}'"
|
||||||
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
|
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
|
||||||
if processID:
|
if processID:
|
||||||
cmd = "sudo systemctl stop taosd"
|
cmd = "sudo systemctl stop taosd"
|
||||||
os.system(cmd)
|
os.system(cmd)
|
||||||
# if os.system(cmd) != 0 :
|
# if os.system(cmd) != 0 :
|
||||||
# tdLog.exit(cmd)
|
# tdLog.exit(cmd)
|
||||||
psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'"
|
psCmd = "ps -ef|grep -w taosd| grep -v grep| grep -v defunct | awk '{print $2}'"
|
||||||
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
|
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
|
||||||
while(processID):
|
while(processID):
|
||||||
killCmd = "kill -TERM %s > /dev/null 2>&1" % processID
|
killCmd = "kill -TERM %s > /dev/null 2>&1" % processID
|
||||||
|
|
Loading…
Reference in New Issue