diff --git a/src/client/src/tscStream.c b/src/client/src/tscStream.c index e2cdff2d03..c39ab23811 100644 --- a/src/client/src/tscStream.c +++ b/src/client/src/tscStream.c @@ -121,7 +121,20 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) { pQueryInfo->window.ekey = pStream->etime; } } else { - pQueryInfo->window.skey = pStream->stime - pStream->interval; + pQueryInfo->window.skey = pStream->stime;// - pStream->interval; + int64_t etime = taosGetTimestamp(pStream->precision); + // delay to wait all data in last time window + if (pStream->precision == TSDB_TIME_PRECISION_MICRO) { + etime -= tsMaxStreamComputDelay * 1000l; + } else { + etime -= tsMaxStreamComputDelay; + } + if (etime > pStream->etime) { + etime = pStream->etime; + } else { + etime = pStream->stime + (etime - pStream->stime) / pStream->interval * pStream->interval; + } + pQueryInfo->window.ekey = etime; } // launch stream computing in a new thread @@ -151,17 +164,45 @@ static void tscProcessStreamQueryCallback(void *param, TAOS_RES *tres, int numOf taos_fetch_rows_a(tres, tscProcessStreamRetrieveResult, param); } -static void tscSetTimestampForRes(SSqlStream *pStream, SSqlObj *pSql) { - SSqlRes *pRes = &pSql->res; - - int64_t timestamp = *(int64_t *)pRes->data; - int64_t actualTimestamp = pStream->stime - pStream->interval; - - if (timestamp != actualTimestamp) { - // reset the timestamp of each agg point by using start time of each interval - *((int64_t *)pRes->data) = actualTimestamp; - tscWarn("%p stream:%p, timestamp of points is:%" PRId64 ", reset to %" PRId64, pSql, pStream, timestamp, actualTimestamp); +// no need to be called as this is alreay done in the query +static void tscStreamFillTimeGap(SSqlStream* pStream, TSKEY ts) { +#if 0 + SSqlObj * pSql = pStream->pSql; + SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); + + if (pQueryInfo->fillType != TSDB_FILL_SET_VALUE && pQueryInfo->fillType != TSDB_FILL_NULL) { + return; } + + SSqlRes *pRes = &pSql->res; + /* failed to retrieve any result in this retrieve */ + pSql->res.numOfRows = 1; + void *row[TSDB_MAX_COLUMNS] = {0}; + char tmpRes[TSDB_MAX_BYTES_PER_ROW] = {0}; + void *oldPtr = pSql->res.data; + pSql->res.data = tmpRes; + int32_t rowNum = 0; + + while (pStream->stime + pStream->slidingTime < ts) { + pStream->stime += pStream->slidingTime; + *(TSKEY*)row[0] = pStream->stime; + for (int32_t i = 1; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { + int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i); + TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); + assignVal(pSql->res.data + offset, (char *)(&pQueryInfo->fillVal[i]), pField->bytes, pField->type); + row[i] = pSql->res.data + offset; + } + (*pStream->fp)(pStream->param, pSql, row); + ++rowNum; + } + + if (rowNum > 0) { + tscDebug("%p stream:%p %d rows padded", pSql, pStream, rowNum); + } + + pRes->numOfRows = 0; + pRes->data = oldPtr; +#endif } static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOfRows) { @@ -180,10 +221,10 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf if (numOfRows > 0) { // when reaching here the first execution of stream computing is successful. pStream->numOfRes += numOfRows; - for(int32_t i = 0; i < numOfRows; ++i) { TAOS_ROW row = taos_fetch_row(res); tscDebug("%p stream:%p fetch result", pSql, pStream); + tscStreamFillTimeGap(pStream, *(TSKEY*)row[0]); pStream->stime = *(TSKEY *)row[0]; // user callback function @@ -194,40 +235,8 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf taos_fetch_rows_a(res, tscProcessStreamRetrieveResult, pStream); } else { // numOfRows == 0, all data has been retrieved pStream->useconds += pSql->res.useconds; - - SQueryInfo* pQueryInfo = tscGetQueryInfoDetail(&pSql->cmd, 0); - if (pStream->numOfRes == 0) { - if (pQueryInfo->fillType == TSDB_FILL_SET_VALUE || pQueryInfo->fillType == TSDB_FILL_NULL) { - SSqlRes *pRes = &pSql->res; - - /* failed to retrieve any result in this retrieve */ - pSql->res.numOfRows = 1; - void *row[TSDB_MAX_COLUMNS] = {0}; - char tmpRes[TSDB_MAX_BYTES_PER_ROW] = {0}; - - void *oldPtr = pSql->res.data; - pSql->res.data = tmpRes; - - for (int32_t i = 1; i < pQueryInfo->fieldsInfo.numOfOutput; ++i) { - int16_t offset = tscFieldInfoGetOffset(pQueryInfo, i); - TAOS_FIELD *pField = tscFieldInfoGetField(&pQueryInfo->fieldsInfo, i); - - assignVal(pSql->res.data + offset, (char *)(&pQueryInfo->fillVal[i]), pField->bytes, pField->type); - row[i] = pSql->res.data + offset; - } - - tscSetTimestampForRes(pStream, pSql); - row[0] = pRes->data; - - tscDebug("%p stream:%p fetch result", pSql, pStream); - - // user callback function - (*pStream->fp)(pStream->param, res, row); - - pRes->numOfRows = 0; - pRes->data = oldPtr; - } else if (pStream->isProject) { + if (pStream->isProject) { /* no resuls in the query range, retry */ // todo set retry dynamic time int32_t retry = tsProjectExecInterval; @@ -236,10 +245,8 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf tscSetRetryTimer(pStream, pStream->pSql, retry); return; } - } else { - if (pStream->isProject) { - pStream->stime += 1; - } + } else if (pStream->isProject) { + pStream->stime += 1; } tscDebug("%p stream:%p, query on:%s, fetch result completed, fetched rows:%" PRId64, pSql, pStream, pTableMetaInfo->name, @@ -421,8 +428,6 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in } else { // timewindow based aggregation stream if (stime == 0) { // no data in meter till now stime = ((int64_t)taosGetTimestamp(pStream->precision) / pStream->interval) * pStream->interval; - const char* fmtts(int64_t); - printf("stream start time is: %s\n", fmtts(stime)); tscWarn("%p stream:%p, last timestamp:0, reset to:%" PRId64, pSql, pStream, stime); } else { int64_t newStime = (stime / pStream->interval) * pStream->interval;