td-909: fix stream start time issue
fix crash in taos_load_table_info
This commit is contained in:
parent
efe558731b
commit
dbd22da833
|
@ -880,6 +880,8 @@ int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
|||
}
|
||||
|
||||
SSqlObj* pSql = calloc(1, sizeof(SSqlObj));
|
||||
pSql->pTscObj = taos;
|
||||
pSql->signature = pSql;
|
||||
SSqlRes *pRes = &pSql->res;
|
||||
|
||||
pRes->numOfTotal = 0; // the number of getting table meta from server
|
||||
|
|
|
@ -122,7 +122,7 @@ static void tscProcessStreamTimer(void *handle, void *tmrId) {
|
|||
pQueryInfo->window.ekey = pStream->etime;
|
||||
}
|
||||
} else {
|
||||
pQueryInfo->window.skey = pStream->stime - pStream->interval;
|
||||
pQueryInfo->window.skey = pStream->stime;
|
||||
int64_t etime = taosGetTimestamp(pStream->precision);
|
||||
// delay to wait all data in last time window
|
||||
if (pStream->precision == TSDB_TIME_PRECISION_MICRO) {
|
||||
|
@ -232,6 +232,9 @@ static void tscProcessStreamRetrieveResult(void *param, TAOS_RES *res, int numOf
|
|||
(*pStream->fp)(pStream->param, res, row);
|
||||
}
|
||||
|
||||
if (!pStream->isProject) {
|
||||
pStream->stime += pStream->slidingTime;
|
||||
}
|
||||
// actually only one row is returned. this following is not necessary
|
||||
taos_fetch_rows_a(res, tscProcessStreamRetrieveResult, pStream);
|
||||
} else { // numOfRows == 0, all data has been retrieved
|
||||
|
@ -432,6 +435,7 @@ static int64_t tscGetStreamStartTimestamp(SSqlObj *pSql, SSqlStream *pStream, in
|
|||
} else { // timewindow based aggregation stream
|
||||
if (stime == 0) { // no data in meter till now
|
||||
stime = ((int64_t)taosGetTimestamp(pStream->precision) / pStream->interval) * pStream->interval;
|
||||
stime -= pStream->interval;
|
||||
tscWarn("%p stream:%p, last timestamp:0, reset to:%" PRId64, pSql, pStream, stime);
|
||||
} else {
|
||||
int64_t newStime = (stime / pStream->interval) * pStream->interval;
|
||||
|
|
|
@ -98,7 +98,12 @@ static void check_row_count(int line, TAOS_RES* res, int expected) {
|
|||
static void verify_query(TAOS* taos) {
|
||||
prepare_data(taos);
|
||||
|
||||
int code = taos_validate_sql(taos, "select * from nonexisttable");
|
||||
int code = taos_load_table_info(taos, "t0,t1,t2,t3,t4,t5,t6,t7,t8,t9");
|
||||
if (code != 0) {
|
||||
printf("\033[31mfailed to load table info: 0x%08x\033[0m\n", code);
|
||||
}
|
||||
|
||||
code = taos_validate_sql(taos, "select * from nonexisttable");
|
||||
if (code == 0) {
|
||||
printf("\033[31mimpossible, the table does not exists\033[0m\n");
|
||||
}
|
||||
|
@ -127,9 +132,17 @@ static void verify_query(TAOS* taos) {
|
|||
code = taos_errno(res);
|
||||
printf("code=%d, error msg=%s\n", code, taos_errstr(res));
|
||||
taos_free_result(res);
|
||||
|
||||
res = taos_query(taos, "select * from meters");
|
||||
taos_stop_query(res);
|
||||
}
|
||||
|
||||
|
||||
void subscribe_callback(TAOS_SUB* tsub, TAOS_RES *res, void* param, int code) {
|
||||
int rows = print_result(res, *(int*)param);
|
||||
printf("%d rows consumed in subscribe_callback\n", rows);
|
||||
}
|
||||
|
||||
static void verify_subscribe(TAOS* taos) {
|
||||
prepare_data(taos);
|
||||
|
||||
|
@ -188,6 +201,13 @@ static void verify_subscribe(TAOS* taos) {
|
|||
check_row_count(__LINE__, res, 1);
|
||||
|
||||
taos_unsubscribe(tsub, 0);
|
||||
|
||||
int blockFetch = 0;
|
||||
tsub = taos_subscribe(taos, 1, "test", "select * from meters;", subscribe_callback, &blockFetch, 1000);
|
||||
usleep(2000000);
|
||||
taos_query(taos, "insert into t0 values('2020-01-01 00:05:00.001', 0);");
|
||||
usleep(2000000);
|
||||
taos_unsubscribe(tsub, 0);
|
||||
}
|
||||
|
||||
|
||||
|
@ -358,7 +378,60 @@ void verify_prepare(TAOS* taos) {
|
|||
taos_stmt_close(stmt);
|
||||
}
|
||||
|
||||
void retrieve_callback(void *param, TAOS_RES *tres, int numOfRows)
|
||||
{
|
||||
if (numOfRows > 0) {
|
||||
printf("%d rows async retrieved\n", numOfRows);
|
||||
taos_fetch_rows_a(tres, retrieve_callback, param);
|
||||
} else {
|
||||
if (numOfRows < 0) {
|
||||
printf("\033[31masync retrieve failed, code: %d\033[0m\n", numOfRows);
|
||||
} else {
|
||||
printf("async retrieve completed\n");
|
||||
}
|
||||
taos_free_result(tres);
|
||||
}
|
||||
}
|
||||
|
||||
void select_callback(void *param, TAOS_RES *tres, int code)
|
||||
{
|
||||
if (code == 0 && tres) {
|
||||
taos_fetch_rows_a(tres, retrieve_callback, param);
|
||||
} else {
|
||||
printf("\033[31masync select failed, code: %d\033[0m\n", code);
|
||||
}
|
||||
}
|
||||
|
||||
void verify_async(TAOS* taos) {
|
||||
prepare_data(taos);
|
||||
taos_query_a(taos, "select * from meters", select_callback, NULL);
|
||||
usleep(1000000);
|
||||
}
|
||||
|
||||
void stream_callback(void *param, TAOS_RES *res, TAOS_ROW row) {
|
||||
int num_fields = taos_num_fields(res);
|
||||
TAOS_FIELD* fields = taos_fetch_fields(res);
|
||||
|
||||
printf("got one row from stream_callback\n");
|
||||
char temp[256];
|
||||
taos_print_row(temp, row, fields, num_fields);
|
||||
puts(temp);
|
||||
}
|
||||
|
||||
void verify_stream(TAOS* taos) {
|
||||
prepare_data(taos);
|
||||
TAOS_STREAM* strm = taos_open_stream(
|
||||
taos,
|
||||
"select count(*) from meters interval(1m)",
|
||||
stream_callback,
|
||||
0,
|
||||
NULL,
|
||||
NULL);
|
||||
printf("waiting for stream data\n");
|
||||
usleep(100000);
|
||||
taos_query(taos, "insert into t0 values(now, 0)(now+5s,1)(now+10s, 2);");
|
||||
usleep(200000000);
|
||||
taos_close_stream(strm);
|
||||
}
|
||||
|
||||
int main(int argc, char *argv[]) {
|
||||
|
@ -382,12 +455,19 @@ int main(int argc, char *argv[]) {
|
|||
|
||||
printf("************ verify query *************\n");
|
||||
verify_query(taos);
|
||||
|
||||
printf("********* verify async query **********\n");
|
||||
verify_async(taos);
|
||||
|
||||
printf("*********** verify subscribe ************\n");
|
||||
verify_subscribe(taos);
|
||||
|
||||
printf("************ verify prepare *************\n");
|
||||
verify_prepare(taos);
|
||||
|
||||
printf("************ verify stream *************\n");
|
||||
verify_stream(taos);
|
||||
printf("done\n");
|
||||
|
||||
taos_close(taos);
|
||||
taos_cleanup();
|
||||
|
|
Loading…
Reference in New Issue