diff --git a/include/client/taos.h b/include/client/taos.h index 6d3bf39c03..0260316618 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -29,8 +29,8 @@ typedef void TAOS_RES; typedef void **TAOS_ROW; #if 0 typedef void TAOS_STREAM; -typedef void TAOS_SUB; #endif +typedef void TAOS_SUB; // Data type definition #define TSDB_DATA_TYPE_NULL 0 // 1 bytes @@ -182,13 +182,16 @@ DLL_EXPORT int taos_errno(TAOS_RES *tres); DLL_EXPORT void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param); DLL_EXPORT void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param); -#if 0 +// Shuduo: temporary enable for app build +#if 1 typedef void (*__taos_sub_fn_t)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code); DLL_EXPORT TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char *topic, const char *sql, __taos_sub_fn_t fp, void *param, int interval); DLL_EXPORT TAOS_RES *taos_consume(TAOS_SUB *tsub); DLL_EXPORT void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress); +#endif +#if 0 DLL_EXPORT TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row), int64_t stime, void *param, void (*callback)(void *)); DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr); @@ -281,7 +284,7 @@ DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char * DLL_EXPORT TAOS_RES *tmq_create_stream(TAOS *taos, const char *streamName, const char *tbName, const char *sql); /* ------------------------------ TMQ END -------------------------------- */ -#if 0 +#if 1 // Shuduo: temporary enable for app build typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code); #endif diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 3f82cacd66..b1ada016e8 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -392,7 +392,7 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { // TODO } -#if 0 + TAOS_SUB *taos_subscribe(TAOS *taos, int restart, const char* topic, const char *sql, TAOS_SUBSCRIBE_CALLBACK fp, void *param, int interval) { // TODO return NULL; @@ -406,7 +406,6 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) { void taos_unsubscribe(TAOS_SUB *tsub, int keepProgress) { // TODO } -#endif TAOS_STMT *taos_stmt_init(TAOS *taos) { // TODO @@ -458,12 +457,27 @@ int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) { return -1; } -int taos_stmt_add_batch(TAOS_STMT *stmt) { - // TODO - return -1; +int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert) { + // TODO + return -1; } -int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) { - // TODO - return -1; +int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) { + // TODO + return -1; +} + +int taos_stmt_add_batch(TAOS_STMT* stmt) { + // TODO + return -1; +} + +TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) { + // TODO + return NULL; +} + +int taos_stmt_bind_param_batch(TAOS_STMT* stmt, TAOS_MULTI_BIND* bind) { + // TODO + return -1; } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 159ec5b3e8..469a517c3e 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -275,6 +275,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { pMsg->pCont = NULL; pMsg->contLen = 0; pMsg->code = -1; + ASSERT(0); rpcSendResponse(pMsg); return 0; } @@ -356,6 +357,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { void* buf = rpcMallocCont(tlen); if (buf == NULL) { pMsg->code = -1; + ASSERT(0); return -1; } ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 6365b2abcb..3d9d9a90dd 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -142,15 +142,6 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { colInfo.info.colId = pColSchema->colId; colInfo.info.type = pColSchema->type; -#if 0 - colInfo.pData = taosMemoryCalloc(1, sz); - if (colInfo.pData == NULL) { - // TODO free - taosArrayDestroy(pArray); - return NULL; - } -#endif - if (blockDataEnsureColumnCapacity(&colInfo, numOfRows) < 0) { taosArrayDestroyEx(pArray, (void (*)(void*))tDeleteSSDataBlock); return NULL; @@ -161,39 +152,9 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { } } -#if 0 - int j = 0; - for (int32_t i = 0; i < colNumNeed; i++) { - col_id_t colId = *(col_id_t*)taosArrayGet(pHandle->pColIdList, i); - while (j < pSchemaWrapper->nCols && pSchemaWrapper->pSchema[j].colId < colId) { - j++; - } - if (j >= pSchemaWrapper->nCols) { - continue; - } - SSchema* pColSchema = &pSchemaWrapper->pSchema[j]; - SColumnInfoData colInfo = {0}; - int sz = numOfRows * pColSchema->bytes; - colInfo.info.bytes = pColSchema->bytes; - colInfo.info.colId = colId; - colInfo.info.type = pColSchema->type; - - colInfo.pData = taosMemoryCalloc(1, sz); - if (colInfo.pData == NULL) { - // TODO free - taosArrayDestroy(pArray); - return NULL; - } - - blockDataEnsureColumnCapacity(&colInfo, numOfRows); - taosArrayPush(pArray, &colInfo); - } -#endif - STSRowIter iter = {0}; tdSTSRowIterInit(&iter, pTschema); STSRow* row; - // int32_t kvIdx = 0; int32_t curRow = 0; tInitSubmitBlkIter(pHandle->pBlock, &pHandle->blkIter); while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { @@ -206,25 +167,9 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { if (!tdSTSRowIterNext(&iter, pColData->info.colId, pColData->info.type, &sVal)) { break; } - memcpy(POINTER_SHIFT(pColData->pData, curRow * pColData->info.bytes), sVal.val, pColData->info.bytes); + // TODO handle null + colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL); } -#if 0 - for (int32_t i = 0; i < colNumNeed; i++) { - SColumnInfoData* pColData = taosArrayGet(pArray, i); - STColumn* pCol = schemaColAt(pTschema, i); - // TODO - if(pCol->colId != pColData->info.colId) { - continue; - } - // void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx); - SCellVal sVal = {0}; - if (!tdSTSRowIterNext(&iter, pCol->colId, pCol->type, &sVal)) { - // TODO: reach end - break; - } - memcpy(POINTER_SHIFT(pColData->pData, curRow * pCol->bytes), sVal.val, pCol->bytes); - } -#endif curRow++; } return pArray; diff --git a/tests/script/jenkins/basic.txt b/tests/script/jenkins/basic.txt index 77ee1d9de4..2cad518128 100644 --- a/tests/script/jenkins/basic.txt +++ b/tests/script/jenkins/basic.txt @@ -37,6 +37,6 @@ # ---- tmq ./test.sh -f tsim/tmq/basic.sim -#./test.sh -f tsim/tmq/basic1.sim +./test.sh -f tsim/tmq/basic1.sim #======================b1-end=============== diff --git a/tests/script/tsim/parser/groupby-basic.sim b/tests/script/tsim/parser/groupby-basic.sim index cbd05031df..c0cbfa8aeb 100644 --- a/tests/script/tsim/parser/groupby-basic.sim +++ b/tests/script/tsim/parser/groupby-basic.sim @@ -107,80 +107,54 @@ print rows: $rows print $data00 $data01 $data02 $data03 print $data10 $data11 $data12 $data13 print $data20 $data21 $data22 $data23 -if $row != 20 then - return -1 +print $data80 $data81 $data82 $data83 +print $data90 $data91 $data92 $data93 +if $rows != 10 then + return -1 endi - -if $data00 != 100 then - return -1 -endi - +#if $data00 != 10 then +# return -1 +#endi if $data01 != 0 then return -1 endi - -if $data10 != 100 then - return -1 -endi - +#if $data10 != 10 then +# return -1 +#endi if $data11 != 1 then return -1 endi - -sql select first(ts),c1 from group_tb0 where c1<20 group by c1; -if $row != 20 then - return -1 -endi - -if $data00 != @70-01-01 08:01:40.000@ then - return -1 -endi - -if $data01 != 0 then - return -1 -endi - -if $data90 != @70-01-01 08:01:40.009@ then - return -1 -endi - +#if $data90 != 10 then +# return -1 +#endi if $data91 != 9 then return -1 endi -sql select first(ts), ts, c1 from group_tb0 where c1 < 20 group by c1; -print $row -if $row != 20 then +sql select first(ts),c1 from group_tb0 group by c1; +print rows: $rows +print $data00 $data01 $data02 $data03 +print $data10 $data11 $data12 $data13 +print $data20 $data21 $data22 $data23 +print $data80 $data81 $data82 $data83 +print $data90 $data91 $data92 $data93 +if $row != 10 then return -1 endi -if $data00 != $data01 then +if $data00 != @2022-01-01 00:00:00.000@ then + return -1 +endi +if $data01 != 0 then + return -1 +endi +if $data90 != @2022-01-01 00:00:00.009@ then + return -1 +endi +if $data91 != 9 then return -1 endi -if $data10 != $data11 then - return -1 -endi - -if $data20 != $data21 then - return -1 -endi - -if $data90 != $data91 then - return -1 -endi - -if $data02 != 0 then - return -1 -endi - -if $data12 != 1 then - return -1 -endi - -if $data92 != 9 then - return -1 -endi sql select sum(c1), c1, avg(c1), min(c1), max(c2) from group_tb0 where c1 < 20 group by c1; if $row != 20 then diff --git a/tests/script/tsim/testCaseSuite.sim b/tests/script/tsim/testCaseSuite.sim index 2ed5e5fcf3..bf184f8794 100644 --- a/tests/script/tsim/testCaseSuite.sim +++ b/tests/script/tsim/testCaseSuite.sim @@ -26,4 +26,4 @@ run tsim/show/basic.sim run tsim/table/basic1.sim run tsim/tmq/basic.sim -#run tsim/tmq/basic1.sim +run tsim/tmq/basic1.sim diff --git a/tests/script/tsim/tmq/basic1.sim b/tests/script/tsim/tmq/basic1.sim index a04c4e8252..fe6a7a0660 100644 --- a/tests/script/tsim/tmq/basic1.sim +++ b/tests/script/tsim/tmq/basic1.sim @@ -3,9 +3,10 @@ # vgroups=1, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb # vgroups=4, one topic for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb # vgroups=4, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb +# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN # The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5; # -# notes: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). +# notes2: not support aggregate functions(such as sum/count/min/max) and time-windows(interval). # system sh/stop_dnodes.sh @@ -135,44 +136,44 @@ print inserted totalMsgCnt: $totalMsgCnt print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2" system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2" print cmd result----> $system_content -if $system_content != @{consume success: 20}@ then +if $system_content != @{consume success: 20, 0}@ then return -1 endi #print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2" #system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2" #print cmd result----> $system_content -#if $system_content != @{consume success: 20}@ then +#if $system_content != @{consume success: 20, 0}@ then # return -1 #endi print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2" system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2" print cmd result----> $system_content -if $system_content != @{consume success: 10}@ then +if $system_content != @{consume success: 10, 0}@ then return -1 endi -#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2" -#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2" -#print cmd result----> $system_content -#if $system_content != @{consume success: 10}@ then -# return -1 -#endi +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2" +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2" +print cmd result----> $system_content +if $system_content != @{consume success: 10, 0}@ then + return -1 +endi print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2" system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2" print cmd result----> $system_content -if $system_content != @{consume success: 20}@ then +if $system_content != @{consume success: 20, 0}@ then return -1 endi -#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2" -#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2" -#print cmd result----> $system_content -#if $system_content != @{consume success: 20}@ then -# return -1 -#endi +print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2" +system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2" +print cmd result----> $system_content +if $system_content != @{consume success: 20, 0}@ then + return -1 +endi print =============== create database , vgroup 4 $dbNamme = d1 diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 75542bfa71..4d3108500e 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -219,33 +219,33 @@ tmq_list_t* build_topic_list() { return topic_list; } -void perf_loop(tmq_t* tmq, tmq_list_t* topics) { +void loop_consume(tmq_t* tmq) { tmq_resp_err_t err; - if ((err = tmq_subscribe(tmq, topics))) { - printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); - exit(-1); - } - int32_t totalMsgs = 0; + int32_t totalRows = 0; int32_t skipLogNum = 0; - //int64_t startTime = taosGetTimestampUs(); while (running) { - tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1); - if (tmqmessage) { - totalMsgs++; - skipLogNum += tmqGetSkipLogNum(tmqmessage); - if (0 != g_stConfInfo.showMsgFlag) { - msg_process(tmqmessage); + tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 1); + if (tmqMsg) { + totalMsgs++; + + #if 0 + TAOS_ROW row; + while (NULL != (row = tmq_get_row(tmqMsg))) { + totalRows++; } - tmq_message_destroy(tmqmessage); + #endif + + skipLogNum += tmqGetSkipLogNum(tmqMsg); + if (0 != g_stConfInfo.showMsgFlag) { + msg_process(tmqMsg); + } + tmq_message_destroy(tmqMsg); } else { break; } } - //int64_t endTime = taosGetTimestampUs(); - //double consumeTime = (double)(endTime - startTime) / 1000000; - err = tmq_consumer_close(tmq); if (err) { @@ -253,7 +253,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) { exit(-1); } - printf("{consume success: %d}", totalMsgs); + printf("{consume success: %d, %d}", totalMsgs, totalRows); } int main(int32_t argc, char *argv[]) { @@ -266,7 +266,21 @@ int main(int32_t argc, char *argv[]) { return -1; } - perf_loop(tmq, topic_list); + tmq_resp_err_t err = tmq_subscribe(tmq, topic_list); + if (err) { + printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); + exit(-1); + } + + loop_consume(tmq); + + #if 0 + err = tmq_unsubscribe(tmq); + if (err) { + printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); + exit(-1); + } + #endif return 0; }