cq can continue with output table last row time

This commit is contained in:
tickduan 2021-05-22 15:11:45 +08:00
parent 3500811178
commit 261e50e23e
1 changed files with 32 additions and 0 deletions

View File

@ -539,6 +539,31 @@ static int64_t tscGetLaunchTimestamp(const SSqlStream *pStream) {
return (pStream->precision == TSDB_TIME_PRECISION_MICRO) ? timer / 1000L : timer;
}
//
// get tableName last row time, if have error return zero.
//
static int64_t tscGetStreamTableLastTime(SSqlObj* pSql, SSqlStream* pStream, const char* tableName) {
int64_t last_time = 0;
char sql[128] = "";
sprintf(sql, "select last_row(*) from %s;", tableName);
// query sql
TAOS_RES* res = taos_query(pSql->pTscObj, sql);
if(res == NULL)
return 0;
// only fetch one row
TAOS_ROW row = taos_fetch_row(res);
if( row[0] ) {
last_time = *((int64_t*)row[0]);
}
// free and return
taos_free_result(res);
return last_time;
}
static void tscCreateStream(void *param, TAOS_RES *res, int code) {
SSqlStream* pStream = (SSqlStream*)param;
SSqlObj* pSql = pStream->pSql;
@ -572,6 +597,13 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
pStream->stime = tscGetStreamStartTimestamp(pSql, pStream, pStream->stime);
// set output table last record time to stime if have, why do this, because continue with last break
int64_t last_time = tscGetStreamTableLastTime(pSql, pStream, pStream->dstTable);
if(last_time > 0 && last_time > pStream->stime) {
// can replace stime with last row time
pStream->stime = last_time;
}
int64_t starttime = tscGetLaunchTimestamp(pStream);
pCmd->command = TSDB_SQL_SELECT;