From 261e50e23e5bc34723032c4f96e8908d24bb69ce Mon Sep 17 00:00:00 2001 From: tickduan <417921451@qq.com> Date: Sat, 22 May 2021 15:11:45 +0800 Subject: [PATCH 1/7] cq can continue with output table last row time --- src/client/src/tscStream.c | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index f0f87f26db..267fa0c0fe 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -539,6 +539,31 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) { return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer; } +// +// get tableName last row time, if have error return zero. +// +static int64_t tscGetStreamTableLastTime(SSqlObj* pSql, SSqlStream* pStream, const char* tableName) { + + int64_t last_time = 0; + char sql[128] = ""; + sprintf(sql, "select last_row(*) from %s;", tableName); + + // query sql + TAOS_RES* res = taos_query(pSql->pTscObj, sql); + if(res == NULL) + return 0; + + // only fetch one row + TAOS_ROW row = taos_fetch_row(res); + if( row[0] ) { + last_time = *((int64_t*)row[0]); + } + + // free and return + taos_free_result(res); + return last_time; +} + static void tscCreateStream(void *param, TAOS_RES *res, int code) { SSqlStream* pStream = (SSqlStream*)param; SSqlObj* pSql = pStream->pSql; @@ -572,6 +597,13 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime); + // set output table last record time to stime if have, why do this, because continue with last break + int64_t last_time = tscGetStreamTableLastTime(pSql, pStream, pStream->dstTable); + if(last_time > 0 && last_time > pStream->stime) { + // can replace stime with last row time + pStream->stime = last_time; + } + int64_t starttime = tscGetLaunchTimestamp(pStream); pCmd->command = TSDB_SQL_SELECT; From 1ee65f6c86bdbfc5402fd7ee0048249e07ac9a2a Mon Sep 17 00:00:00 2001 From: tickduan <417921451@qq.com> Date: Sat, 22 May 2021 15:39:46 +0800 Subject: [PATCH 2/7] modify stream retry defalut delay from 10ms to 10*1000ms --- src/client/src/tscStream.c | 4 ++-- src/common/inc/tglobal.h | 2 +- src/common/src/tglobal.c | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 267fa0c0fe..0f6a403582 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -48,8 +48,8 @@ static bool isProjectStream(SQueryInfo* pQueryInfo) { static int64_t tscGetRetryDelayTime(SSqlStream* pStream, int64_t slidingTime, int16_t prec) { float retryRangeFactor = 0.3f; - int64_t retryDelta = (int64_t)(tsStreamCompRetryDelay * retryRangeFactor); - retryDelta = ((rand() % retryDelta) + tsStreamCompRetryDelay) * 1000L; + int64_t retryDelta = (int64_t)(tsRetryStreamCompDelay * retryRangeFactor); + retryDelta = ((rand() % retryDelta) + tsRetryStreamCompDelay) * 1000L; if (pStream->interval.intervalUnit != 'n' && pStream->interval.intervalUnit != 'y') { // change to ms diff --git a/src/common/inc/tglobal.h b/src/common/inc/tglobal.h index 26475834d5..1e66ce3f0c 100644 --- a/src/common/inc/tglobal.h +++ b/src/common/inc/tglobal.h @@ -74,7 +74,7 @@ extern int32_t tsMinSlidingTime; extern int32_t tsMinIntervalTime; extern int32_t tsMaxStreamComputDelay; extern int32_t tsStreamCompStartDelay; -extern int32_t tsStreamCompRetryDelay; +extern int32_t tsRetryStreamCompDelay; extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window extern int32_t tsProjectExecInterval; extern int64_t tsMaxRetentWindow; diff --git a/src/common/src/tglobal.c b/src/common/src/tglobal.c index c3c159ee45..2f18c8f73a 100644 --- a/src/common/src/tglobal.c +++ b/src/common/src/tglobal.c @@ -92,7 +92,7 @@ int32_t tsMaxStreamComputDelay = 20000; int32_t tsStreamCompStartDelay = 10000; // the stream computing delay time after executing failed, change accordingly -int32_t tsStreamCompRetryDelay = 10; +int32_t tsRetryStreamCompDelay = 10*1000; // The delayed computing ration. 10% of the whole computing time window by default. float tsStreamComputDelayRatio = 0.1f; @@ -696,7 +696,7 @@ static void doInitGlobalConfig(void) { taosInitConfigOption(cfg); cfg.option = "retryStreamCompDelay"; - cfg.ptr = &tsStreamCompRetryDelay; + cfg.ptr = &tsRetryStreamCompDelay; cfg.valType = TAOS_CFG_VTYPE_INT32; cfg.cfgType = TSDB_CFG_CTYPE_B_CONFIG | TSDB_CFG_CTYPE_B_SHOW; cfg.minValue = 10; From 4b4199ce415076bdb4f0a7244e9aef21fb876fe5 Mon Sep 17 00:00:00 2001 From: tickduan <417921451@qq.com> Date: Wed, 26 May 2021 20:50:00 +0800 Subject: [PATCH 3/7] cq support continue query from last stop time --- src/client/inc/tsclient.h | 1 + src/client/src/tscStream.c | 41 +++++++++++++++++++++++++++++--------- 2 files changed, 33 insertions(+), 9 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 4bfd3bc88f..0e63fa3551 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -422,6 +422,7 @@ typedef struct SSqlStream { int64_t ctime; // stream created time int64_t stime; // stream next executed time int64_t etime; // stream end query time, when time is larger then etime, the stream will be closed + int64_t ltime; // stream last row time in stream table SInterval interval; void * pTimer; diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 0f6a403582..9094f95dfc 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -538,12 +538,11 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) { return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer; } - +///* // // get tableName last row time, if have error return zero. // static int64_t tscGetStreamTableLastTime(SSqlObj* pSql, SSqlStream* pStream, const char* tableName) { - int64_t last_time = 0; char sql[128] = ""; sprintf(sql, "select last_row(*) from %s;", tableName); @@ -555,7 +554,7 @@ static int64_t tscGetStreamTableLastTime(SSqlObj* pSql, SSqlStream* pStream, con // only fetch one row TAOS_ROW row = taos_fetch_row(res); - if( row[0] ) { + if( row && row[0] ) { last_time = *((int64_t*)row[0]); } @@ -563,7 +562,7 @@ static int64_t tscGetStreamTableLastTime(SSqlObj* pSql, SSqlStream* pStream, con taos_free_result(res); return last_time; } - +//*/ static void tscCreateStream(void *param, TAOS_RES *res, int code) { SSqlStream* pStream = (SSqlStream*)param; SSqlObj* pSql = pStream->pSql; @@ -597,10 +596,14 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime); - // set output table last record time to stime if have, why do this, because continue with last break - int64_t last_time = tscGetStreamTableLastTime(pSql, pStream, pStream->dstTable); + // set output table last record time to stime if have, why do this, because continue with last brea + const char* dstTable = pStream->dstTable? pStream->dstTable: ""; + int64_t last_time = tscGetStreamTableLastTime(pSql, pStream, dstTable); + pStream->ltime = last_time; + tscDebug(" CQ get table=%s lasttime=%"PRId64" end.", dstTable, last_time); if(last_time > 0 && last_time > pStream->stime) { // can replace stime with last row time + tscDebug(" CQ set table %s stime=%"PRId64" with lasttime=%"PRId64" ", dstTable, pStream->stime, last_time); pStream->stime = last_time; } @@ -619,6 +622,24 @@ void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable) { pStream->dstTable = dstTable; } +// already run on another thread +void tscCreateStreamThread(SSchedMsg* pMsg) { + tscDebug(" new thread Sched call tscCreateStream begin..."); + tscCreateStream(pMsg->ahandle, NULL, 0); + tscDebug(" new thread Sched call tscCreateStream end."); + return ; +} + +// parsesql async response return and change run thread +void tsParseSqlRet(void* param, TAOS_RES* res, int code) { + SSchedMsg schedMsg = { 0 }; + schedMsg.fp = tscCreateStreamThread; + schedMsg.ahandle = param; + schedMsg.thandle = res; + schedMsg.msg = NULL; + taosScheduleTask(tscQhandle, &schedMsg); +} + TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), int64_t stime, void *param, void (*callback)(void *)) { STscObj *pObj = (STscObj *)taos; @@ -664,15 +685,17 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p tscDebugL("%p SQL: %s", pSql, pSql->sqlstr); tsem_init(&pSql->rspSem, 0, 0); - pSql->fp = tscCreateStream; - pSql->fetchFp = tscCreateStream; + pSql->fp = tsParseSqlRet; + pSql->fetchFp = tsParseSqlRet; registerSqlObj(pSql); int32_t code = tsParseSql(pSql, true); if (code == TSDB_CODE_SUCCESS) { tscCreateStream(pStream, pSql, code); - } else if (code != TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + } else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { + tscDebug(" cq parseSql IN Process pass. "); + } else { tscError("0x%"PRIx64" open stream failed, sql:%s, code:%s", pSql->self, sqlstr, tstrerror(code)); taosReleaseRef(tscObjRef, pSql->self); free(pStream); From 398397bcaaa082b004b0b359fcdbfe849e57b7de Mon Sep 17 00:00:00 2001 From: tickduan <417921451@qq.com> Date: Fri, 28 May 2021 11:10:20 +0800 Subject: [PATCH 4/7] change to asynchrous call mode with support last time query --- src/client/src/tscStream.c | 127 +++++++++++++++++++++---------------- src/cq/src/cqMain.c | 7 +- 2 files changed, 79 insertions(+), 55 deletions(-) diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 9094f95dfc..7e6132b7c8 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -25,6 +25,7 @@ #include "tutil.h" #include "tscProfile.h" +#include "tscSubquery.h" static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOfRows); static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows); @@ -538,31 +539,7 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) { return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer; } -///* -// -// get tableName last row time, if have error return zero. -// -static int64_t tscGetStreamTableLastTime(SSqlObj* pSql, SSqlStream* pStream, const char* tableName) { - int64_t last_time = 0; - char sql[128] = ""; - sprintf(sql, "select last_row(*) from %s;", tableName); - // query sql - TAOS_RES* res = taos_query(pSql->pTscObj, sql); - if(res == NULL) - return 0; - - // only fetch one row - TAOS_ROW row = taos_fetch_row(res); - if( row && row[0] ) { - last_time = *((int64_t*)row[0]); - } - - // free and return - taos_free_result(res); - return last_time; -} -//*/ static void tscCreateStream(void *param, TAOS_RES *res, int code) { SSqlStream* pStream = (SSqlStream*)param; SSqlObj* pSql = pStream->pSql; @@ -596,15 +573,12 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime); - // set output table last record time to stime if have, why do this, because continue with last brea + // set stime with ltime if ltime > stime const char* dstTable = pStream->dstTable? pStream->dstTable: ""; - int64_t last_time = tscGetStreamTableLastTime(pSql, pStream, dstTable); - pStream->ltime = last_time; - tscDebug(" CQ get table=%s lasttime=%"PRId64" end.", dstTable, last_time); - if(last_time > 0 && last_time > pStream->stime) { - // can replace stime with last row time - tscDebug(" CQ set table %s stime=%"PRId64" with lasttime=%"PRId64" ", dstTable, pStream->stime, last_time); - pStream->stime = last_time; + tscDebug(" CQ table=%s ltime is %"PRId64, dstTable, pStream->ltime); + if(pStream->ltime > 0 && pStream->ltime > pStream->stime) { + tscWarn(" CQ set stream %s stime=%"PRId64" replace with ltime=%"PRId64" ", dstTable, pStream->stime, pStream->ltime); + pStream->stime = pStream->ltime; } int64_t starttime = tscGetLaunchTimestamp(pStream); @@ -622,25 +596,66 @@ void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable) { pStream->dstTable = dstTable; } -// already run on another thread -void tscCreateStreamThread(SSchedMsg* pMsg) { - tscDebug(" new thread Sched call tscCreateStream begin..."); - tscCreateStream(pMsg->ahandle, NULL, 0); - tscDebug(" new thread Sched call tscCreateStream end."); +// fetchFp call back +void fetchFpStreamLastRow(void* param ,TAOS_RES* res, int num) { + SSqlStream* pStream = (SSqlStream*)param; + SSqlObj* pSql = res; + + // get row data set to ltime + tscSetSqlOwner(pSql); + TAOS_ROW row = doSetResultRowData(pSql); + if( row && row[0] ) { + pStream->ltime = *((int64_t*)row[0]); + const char* dstTable = pStream->dstTable? pStream->dstTable: ""; + tscDebug(" CQ stream table=%s last row time=%"PRId64" .", dstTable, pStream->ltime); + } + tscClearSqlOwner(pSql); + + // no condition call + tscCreateStream(param, pStream->pSql, TSDB_CODE_SUCCESS); + taos_free_result(res); +} + +// fp callback +void fpStreamLastRow(void* param ,TAOS_RES* res, int code) { + // check result successful + if (code != TSDB_CODE_SUCCESS) { + tscCreateStream(param, res, TSDB_CODE_SUCCESS); + taos_free_result(res); + return ; + } + + // asynchronous fetch last row data + taos_fetch_rows_a(res, fetchFpStreamLastRow, param); +} + +void cbParseSql(void* param, TAOS_RES* res, int code) { + // check result successful + SSqlStream* pStream = (SSqlStream*)param; + SSqlObj* pSql = pStream->pSql; + SSqlCmd* pCmd = &pSql->cmd; + if (code != TSDB_CODE_SUCCESS) { + pSql->res.code = code; + tscDebug("0x%"PRIx64" open stream parse sql failed, sql:%s, reason:%s, code:%s", pSql->self, pSql->sqlstr, pCmd->payload, tstrerror(code)); + pStream->fp(pStream->param, NULL, NULL); + return; + } + + // check dstTable valid + if(pStream->dstTable == NULL || strlen(pStream->dstTable) == 0) { + tscDebug(" cbParseSql dstTable is empty."); + tscCreateStream(param, res, code); + return ; + } + + // query stream last row time async + char sql[128] = ""; + sprintf(sql, "select last_row(*) from %s;", pStream->dstTable); + taos_query_a(pSql->pTscObj, sql, fpStreamLastRow, param); return ; } -// parsesql async response return and change run thread -void tsParseSqlRet(void* param, TAOS_RES* res, int code) { - SSchedMsg schedMsg = { 0 }; - schedMsg.fp = tscCreateStreamThread; - schedMsg.ahandle = param; - schedMsg.thandle = res; - schedMsg.msg = NULL; - taosScheduleTask(tscQhandle, &schedMsg); -} - -TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), +TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), int64_t stime, void *param, void (*callback)(void *)) { STscObj *pObj = (STscObj *)taos; if (pObj == NULL || pObj->signature != pObj) return NULL; @@ -671,6 +686,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p pSql->pStream = pStream; pSql->param = pStream; pSql->maxRetry = TSDB_MAX_REPLICA; + tscSetStreamDestTable(pStream, dstTable); pSql->sqlstr = calloc(1, strlen(sqlstr) + 1); if (pSql->sqlstr == NULL) { @@ -685,16 +701,16 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p tscDebugL("%p SQL: %s", pSql, pSql->sqlstr); tsem_init(&pSql->rspSem, 0, 0); - pSql->fp = tsParseSqlRet; - pSql->fetchFp = tsParseSqlRet; + pSql->fp = cbParseSql; + pSql->fetchFp = cbParseSql; registerSqlObj(pSql); - + int32_t code = tsParseSql(pSql, true); if (code == TSDB_CODE_SUCCESS) { - tscCreateStream(pStream, pSql, code); + cbParseSql(pStream, pSql, code); } else if (code == TSDB_CODE_TSC_ACTION_IN_PROGRESS) { - tscDebug(" cq parseSql IN Process pass. "); + tscDebug(" CQ taso_open_stream IN Process. sql=%s", sqlstr); } else { tscError("0x%"PRIx64" open stream failed, sql:%s, code:%s", pSql->self, sqlstr, tstrerror(code)); taosReleaseRef(tscObjRef, pSql->self); @@ -705,6 +721,11 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p return pStream; } +TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), + int64_t stime, void *param, void (*callback)(void *)) { + return taos_open_stream_withname(taos, "", sqlstr, fp, stime, param, callback); +} + void taos_close_stream(TAOS_STREAM *handle) { SSqlStream *pStream = (SSqlStream *)handle; diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 5d5d5f339e..ee4be02b90 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -437,6 +437,10 @@ static void cqProcessCreateTimer(void *param, void *tmrId) { taosReleaseRef(cqObjRef, (int64_t)param); } +// inner implement in tscStream.c +TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* desName, const char *sqlstr, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), + int64_t stime, void *param, void (*callback)(void *)); + static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { pObj->pContext = pContext; @@ -449,11 +453,10 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) { pObj->tmrId = 0; if (pObj->pStream == NULL) { - pObj->pStream = taos_open_stream(pContext->dbConn, pObj->sqlStr, cqProcessStreamRes, INT64_MIN, (void *)pObj->rid, NULL); + pObj->pStream = taos_open_stream_withname(pContext->dbConn, pObj->dstTable, pObj->sqlStr, cqProcessStreamRes, INT64_MIN, (void *)pObj->rid, NULL); // TODO the pObj->pStream may be released if error happens if (pObj->pStream) { - tscSetStreamDestTable(pObj->pStream, pObj->dstTable); pContext->num++; cDebug("vgId:%d, id:%d CQ:%s is opened", pContext->vgId, pObj->tid, pObj->sqlStr); } else { From c7764d44780c89e7b1f46c80e4e96dfeea3068f2 Mon Sep 17 00:00:00 2001 From: tickduan <417921451@qq.com> Date: Fri, 28 May 2021 13:00:12 +0800 Subject: [PATCH 5/7] ltime > stime to do replace --- src/client/src/tscStream.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 7e6132b7c8..2226c3d95d 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -576,7 +576,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { // set stime with ltime if ltime > stime const char* dstTable = pStream->dstTable? pStream->dstTable: ""; tscDebug(" CQ table=%s ltime is %"PRId64, dstTable, pStream->ltime); - if(pStream->ltime > 0 && pStream->ltime > pStream->stime) { + if(pStream->ltime > pStream->stime) { tscWarn(" CQ set stream %s stime=%"PRId64" replace with ltime=%"PRId64" ", dstTable, pStream->stime, pStream->ltime); pStream->stime = pStream->ltime; } From d5965c461a1bc0519fee465c438b7b3e078299ce Mon Sep 17 00:00:00 2001 From: tickduan <417921451@qq.com> Date: Mon, 31 May 2021 15:51:43 +0800 Subject: [PATCH 6/7] must ltime >0 to replace --- src/client/src/tscStream.c | 4 ++-- tests/pytest/util/dnodes.py | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 2226c3d95d..3998f99afe 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -576,8 +576,8 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { // set stime with ltime if ltime > stime const char* dstTable = pStream->dstTable? pStream->dstTable: ""; tscDebug(" CQ table=%s ltime is %"PRId64, dstTable, pStream->ltime); - if(pStream->ltime > pStream->stime) { - tscWarn(" CQ set stream %s stime=%"PRId64" replace with ltime=%"PRId64" ", dstTable, pStream->stime, pStream->ltime); + if(pStream->ltime > 0 && pStream->ltime > pStream->stime) { + tscWarn(" CQ set stream %s stime=%"PRId64" replace with ltime=%"PRId64" if ltime>0 ", dstTable, pStream->stime, pStream->ltime); pStream->stime = pStream->ltime; } diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 6eaf4e18af..0f71ffd0a3 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -432,7 +432,7 @@ class TDDnodes: self.simDeployed = False def init(self, path): - psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" + psCmd = "ps -ef|grep -w taosd| grep -v grep| grep -v defunct | awk '{print $2}'" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): killCmd = "kill -TERM %s > /dev/null 2>&1" % processID @@ -545,14 +545,14 @@ class TDDnodes: for i in range(len(self.dnodes)): self.dnodes[i].stop() - psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep | awk '{print $2}'" + psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep| grep -v defunct | awk '{print $2}'" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") if processID: cmd = "sudo systemctl stop taosd" os.system(cmd) # if os.system(cmd) != 0 : # tdLog.exit(cmd) - psCmd = "ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'" + psCmd = "ps -ef|grep -w taosd| grep -v grep| grep -v defunct | awk '{print $2}'" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8") while(processID): killCmd = "kill -TERM %s > /dev/null 2>&1" % processID From edd05ea486d618af8bbb7b8baaae115253ec7b09 Mon Sep 17 00:00:00 2001 From: tickduan <417921451@qq.com> Date: Mon, 31 May 2021 19:49:47 +0800 Subject: [PATCH 7/7] INT64_MIN default to ltime --- src/client/src/tscStream.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index 3998f99afe..0401d1f3b2 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -576,7 +576,7 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) { // set stime with ltime if ltime > stime const char* dstTable = pStream->dstTable? pStream->dstTable: ""; tscDebug(" CQ table=%s ltime is %"PRId64, dstTable, pStream->ltime); - if(pStream->ltime > 0 && pStream->ltime > pStream->stime) { + if(pStream->ltime != INT64_MIN && pStream->ltime > pStream->stime) { tscWarn(" CQ set stream %s stime=%"PRId64" replace with ltime=%"PRId64" if ltime>0 ", dstTable, pStream->stime, pStream->ltime); pStream->stime = pStream->ltime; } @@ -678,6 +678,7 @@ TAOS_STREAM *taos_open_stream_withname(TAOS *taos, const char* dstTable, const c return NULL; } + pStream->ltime = INT64_MIN; pStream->stime = stime; pStream->fp = fp; pStream->callback = callback;