diff --git a/example/src/tmq.c b/example/src/tmq.c index ca80c8fe5a..85dea5b382 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -19,8 +19,8 @@ #include #include "taos.h" -static int running = 1; -static void msg_process(tmq_message_t* message) { tmqShowMsg(message); } +static int running = 1; +/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/ int32_t init_env() { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -166,11 +166,11 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { int32_t cnt = 0; /*clock_t startTime = clock();*/ while (running) { - tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 500); if (tmqmessage) { cnt++; - printf("get data\n"); - msg_process(tmqmessage); + /*printf("get data\n");*/ + /*msg_process(tmqmessage);*/ tmq_message_destroy(tmqmessage); /*} else {*/ /*break;*/ @@ -198,9 +198,9 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { } while (running) { - tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1000); + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000); if (tmqmessage) { - msg_process(tmqmessage); + /*msg_process(tmqmessage);*/ tmq_message_destroy(tmqmessage); if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0); @@ -226,10 +226,10 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) { int32_t skipLogNum = 0; clock_t startTime = clock(); while (running) { - tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 500); if (tmqmessage) { batchCnt++; - skipLogNum += tmqGetSkipLogNum(tmqmessage); + /*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/ /*msg_process(tmqmessage);*/ tmq_message_destroy(tmqmessage); } else { diff --git a/include/client/taos.h b/include/client/taos.h index d3856d432e..87948e7824 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -30,7 +30,7 @@ typedef void **TAOS_ROW; #if 0 typedef void TAOS_STREAM; #endif -typedef void TAOS_SUB; +typedef void TAOS_SUB; // Data type definition #define TSDB_DATA_TYPE_NULL 0 // 1 bytes @@ -138,13 +138,13 @@ typedef enum { #define RET_MSG_LENGTH 1024 typedef struct setConfRet { SET_CONF_RET_CODE retCode; - char retMsg[RET_MSG_LENGTH]; + char retMsg[RET_MSG_LENGTH]; } setConfRet; -DLL_EXPORT void taos_cleanup(void); -DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); -DLL_EXPORT setConfRet taos_set_config(const char *config); -DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port); +DLL_EXPORT void taos_cleanup(void); +DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...); +DLL_EXPORT setConfRet taos_set_config(const char *config); +DLL_EXPORT TAOS *taos_connect(const char *ip, const char *user, const char *pass, const char *db, uint16_t port); DLL_EXPORT TAOS *taos_connect_l(const char *ip, int ipLen, const char *user, int userLen, const char *pass, int passLen, const char *db, int dbLen, uint16_t port); DLL_EXPORT TAOS *taos_connect_auth(const char *ip, const char *user, const char *auth, const char *db, uint16_t port); @@ -152,34 +152,34 @@ DLL_EXPORT void taos_close(TAOS *taos); const char *taos_data_type(int type); -DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); -DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); -DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags); -DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name); -DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name); +DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); +DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); +DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND *tags); +DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name); +DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name); -DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert); -DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums); -DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes); -DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind); -DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind); -DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx); -DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt); -DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt); -DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert); +DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums); +DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes); +DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND *bind); +DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind); +DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int colIdx); +DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt); +DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt); +DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt); -DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); -DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); +DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); +DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); -DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); -DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result -DLL_EXPORT void taos_free_result(TAOS_RES *res); -DLL_EXPORT int taos_field_count(TAOS_RES *res); -DLL_EXPORT int taos_num_fields(TAOS_RES *res); -DLL_EXPORT int taos_affected_rows(TAOS_RES *res); +DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); +DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result +DLL_EXPORT void taos_free_result(TAOS_RES *res); +DLL_EXPORT int taos_field_count(TAOS_RES *res); +DLL_EXPORT int taos_num_fields(TAOS_RES *res); +DLL_EXPORT int taos_affected_rows(TAOS_RES *res); DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res); DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); @@ -188,14 +188,14 @@ DLL_EXPORT void taos_stop_query(TAOS_RES *res); DLL_EXPORT bool taos_is_null(TAOS_RES *res, int32_t row, int32_t col); DLL_EXPORT bool taos_is_update_query(TAOS_RES *res); DLL_EXPORT int taos_fetch_block(TAOS_RES *res, TAOS_ROW *rows); -DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int* numOfRows, TAOS_ROW *rows); -DLL_EXPORT int taos_fetch_raw_block(TAOS_RES *res, int* numOfRows, void** pData); +DLL_EXPORT int taos_fetch_block_s(TAOS_RES *res, int *numOfRows, TAOS_ROW *rows); +DLL_EXPORT int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData); DLL_EXPORT int *taos_get_column_data_offset(TAOS_RES *res, int columnIndex); DLL_EXPORT int taos_validate_sql(TAOS *taos, const char *sql); DLL_EXPORT void taos_reset_current_db(TAOS *taos); -DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res); -DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res); +DLL_EXPORT int *taos_fetch_lengths(TAOS_RES *res); +DLL_EXPORT TAOS_ROW *taos_result_block(TAOS_RES *res); DLL_EXPORT const char *taos_get_server_info(TAOS *taos); DLL_EXPORT const char *taos_get_client_info(); @@ -237,9 +237,9 @@ typedef struct tmq_t tmq_t; typedef struct tmq_topic_vgroup_t tmq_topic_vgroup_t; typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t; -typedef struct tmq_conf_t tmq_conf_t; -typedef struct tmq_list_t tmq_list_t; -typedef struct tmq_message_t tmq_message_t; +typedef struct tmq_conf_t tmq_conf_t; +typedef struct tmq_list_t tmq_list_t; +// typedef struct tmq_message_t tmq_message_t; typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, void *param)); @@ -259,7 +259,7 @@ DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list); DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq); DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); -DLL_EXPORT tmq_message_t *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time); +DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time); DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq); #if 0 DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups); @@ -268,8 +268,8 @@ DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** v DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async); #if 0 DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async); -#endif DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset); +#endif /* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */ enum tmq_conf_res_t { @@ -285,21 +285,24 @@ DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); DLL_EXPORT void tmq_conf_set_offset_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb); +#if 0 // temporary used function for demo only void tmqShowMsg(tmq_message_t *tmq_message); int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message); +#endif /* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */ +DLL_EXPORT char *tmq_get_topic_name(TAOS_RES *res); +DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); +#if 0 DLL_EXPORT TAOS_ROW tmq_get_row(tmq_message_t *message); -DLL_EXPORT char *tmq_get_topic_name(tmq_message_t *message); -DLL_EXPORT int32_t tmq_get_vgroup_id(tmq_message_t *message); DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message); DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message); DLL_EXPORT TAOS_FIELD *tmq_get_fields(tmq_t *tmq, const char *topic); DLL_EXPORT int32_t tmq_field_count(tmq_t *tmq, const char *topic); -DLL_EXPORT void tmq_message_destroy(tmq_message_t *tmq_message); - +#endif +DLL_EXPORT void tmq_message_destroy(TAOS_RES *res); /* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */ #if 0 DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *sql, int sqlLen); @@ -308,7 +311,7 @@ DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char * DLL_EXPORT TAOS_RES *tmq_create_stream(TAOS *taos, const char *streamName, const char *tbName, const char *sql); /* ------------------------------ TMQ END -------------------------------- */ -#if 1 // Shuduo: temporary enable for app build +#if 1 // Shuduo: temporary enable for app build typedef void (*TAOS_SUBSCRIBE_CALLBACK)(TAOS_SUB *tsub, TAOS_RES *res, void *param, int code); #endif diff --git a/include/common/tdatablock.h b/include/common/tdatablock.h index 5e5a8826e5..6f933c663e 100644 --- a/include/common/tdatablock.h +++ b/include/common/tdatablock.h @@ -199,11 +199,11 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF int32_t colInfoDataEnsureCapacity(SColumnInfoData* pColumn, uint32_t numOfRows); int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows); -void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows); -void blockDataCleanup(SSDataBlock* pDataBlock); +void colInfoDataCleanup(SColumnInfoData* pColumn, uint32_t numOfRows); +void blockDataCleanup(SSDataBlock* pDataBlock); -size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); -void* blockDataDestroy(SSDataBlock* pBlock); +size_t blockDataGetCapacityInRow(const SSDataBlock* pBlock, size_t pageSize); +void* blockDataDestroy(SSDataBlock* pBlock); int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n); @@ -211,8 +211,8 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock); void blockDebugShowData(const SArray* dataBlocks); -static FORCE_INLINE int32_t blockEstimateEncodeSize(const SSDataBlock* pBlock) { - return blockDataGetSerialMetaSize(pBlock) + (int32_t)ceil(blockDataGetSerialRowSize(pBlock) * pBlock->info.rows); +static FORCE_INLINE int32_t blockGetEncodeSize(const SSDataBlock* pBlock) { + return blockDataGetSerialMetaSize(pBlock) + blockDataGetSize(pBlock); } static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32_t numOfRows, char* data, diff --git a/include/common/tmsg.h b/include/common/tmsg.h index bd4485dc94..5d5889e308 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -485,7 +485,7 @@ typedef struct { char intervalUnit; char slidingUnit; char - offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration. + offsetUnit; // TODO Remove it, the offset is the number of precision tickle, and it must be a immutable duration. int8_t precision; int64_t interval; int64_t sliding; @@ -2384,6 +2384,53 @@ typedef struct { SArray* pBlockData; // SArray } SMqPollRsp; +typedef struct { + SMqRspHead head; + int64_t reqOffset; + int64_t rspOffset; + int32_t skipLogNum; + int32_t dataLen; + SArray* blockPos; // beginning pos for each SRetrieveTableRsp + void* blockData; // serialized batched SRetrieveTableRsp +} SMqPollRspV2; + +static FORCE_INLINE int32_t tEncodeSMqPollRspV2(void** buf, const SMqPollRspV2* pRsp) { + int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, pRsp->reqOffset); + tlen += taosEncodeFixedI64(buf, pRsp->rspOffset); + tlen += taosEncodeFixedI32(buf, pRsp->skipLogNum); + tlen += taosEncodeFixedI32(buf, pRsp->dataLen); + if (pRsp->dataLen != 0) { + int32_t sz = taosArrayGetSize(pRsp->blockPos); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + int32_t blockPos = *(int32_t*)taosArrayGet(pRsp->blockPos, i); + tlen += taosEncodeFixedI32(buf, blockPos); + } + tlen += taosEncodeBinary(buf, pRsp->blockData, pRsp->dataLen); + } + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqPollRspV2(const void* buf, SMqPollRspV2* pRsp) { + buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); + buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); + buf = taosDecodeFixedI32(buf, &pRsp->skipLogNum); + buf = taosDecodeFixedI32(buf, &pRsp->dataLen); + if (pRsp->dataLen != 0) { + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pRsp->blockPos = taosArrayInit(sz, sizeof(int32_t)); + for (int32_t i = 0; i < sz; i++) { + int32_t blockPos; + buf = taosDecodeFixedI32(buf, &blockPos); + taosArrayPush(pRsp->blockPos, &blockPos); + } + buf = taosDecodeBinary(buf, &pRsp->blockData, pRsp->dataLen); + } + return (void*)buf; +} + typedef struct { SMqRspHead head; char cgroup[TSDB_CGROUP_LEN]; diff --git a/source/client/inc/clientInt.h b/source/client/inc/clientInt.h index 185b5824d9..b7d88c277b 100644 --- a/source/client/inc/clientInt.h +++ b/source/client/inc/clientInt.h @@ -194,12 +194,12 @@ enum { #define TD_RES_QUERY(res) (*(int8_t*)res == RES_TYPE__QUERY) #define TD_RES_TMQ(res) (*(int8_t*)res == RES_TYPE__TMQ) -typedef struct SMqRspObj { +typedef struct { int8_t resType; char* topic; - void* vg; SArray* res; // SArray int32_t resIter; + int32_t vgId; } SMqRspObj; typedef struct SRequestObj { diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 040ddde630..c93260be37 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -465,7 +465,7 @@ int taos_fetch_raw_block(TAOS_RES *res, int *numOfRows, void **pData) { return 0; } - doFetchRow(pRequest, false, false); + doFetchRow(pRequest, false, true); SReqResultInfo *pResultInfo = &pRequest->body.resInfo; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index dbe78782f5..ea31170b1f 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -27,11 +27,22 @@ struct tmq_message_t { SMqPollRsp msg; char* topic; - void* vg; SArray* res; // SArray + int32_t vgId; int32_t resIter; }; +typedef struct { + int8_t tmqRspType; + int32_t epoch; +} SMqRspWrapper; + +typedef struct { + int8_t tmqRspType; + int32_t epoch; + SMqCMGetSubEpRsp msg; +} SMqAskEpRspWrapper; + struct tmq_list_t { SArray container; }; @@ -118,6 +129,14 @@ typedef struct { TAOS_FIELD* fields; } SMqClientTopic; +typedef struct { + int8_t tmqRspType; + int32_t epoch; + SMqClientVg* vgHandle; + SMqClientTopic* topicHandle; + SMqPollRspV2 msg; +} SMqPollRspWrapper; + typedef struct { tmq_t* tmq; tsem_t rspSem; @@ -133,10 +152,10 @@ typedef struct { typedef struct { tmq_t* tmq; SMqClientVg* pVg; + SMqClientTopic* pTopic; int32_t epoch; int32_t vgId; tsem_t rspSem; - tmq_message_t** msg; int32_t sync; } SMqPollCbParam; @@ -244,7 +263,7 @@ static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) { } void tmqClearUnhandleMsg(tmq_t* tmq) { - tmq_message_t* msg = NULL; + SMqRspWrapper* msg = NULL; while (1) { taosGetQitem(tmq->qall, (void**)&msg); if (msg) @@ -777,7 +796,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { return buf; } - +#if 0 int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) { if (tmq_message == NULL) return 0; SMqPollRsp* pRsp = &tmq_message->msg; @@ -827,11 +846,13 @@ void tmqShowMsg(tmq_message_t* tmq_message) { } } } +#endif int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { /*printf("recv poll\n");*/ SMqPollCbParam* pParam = (SMqPollCbParam*)param; SMqClientVg* pVg = pParam->pVg; + SMqClientTopic* pTopic = pParam->pTopic; tmq_t* tmq = pParam->tmq; if (code != 0) { tscWarn("msg discard from vg %d, epoch %d, code:%x", pParam->vgId, pParam->epoch, code); @@ -874,18 +895,22 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { #endif /*SMqConsumeRsp* pRsp = taosMemoryCalloc(1, sizeof(SMqConsumeRsp));*/ - tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t)); - if (pRsp == NULL) { + /*tmq_message_t* pRsp = taosAllocateQitem(sizeof(tmq_message_t));*/ + SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper)); + if (pRspWrapper == NULL) { tscWarn("msg discard from vg %d, epoch %d since out of memory", pParam->vgId, pParam->epoch); goto CREATE_MSG_FAIL; } - memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); - tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->msg); - /*pRsp->iter.curBlock = 0;*/ - /*pRsp->iter.curRow = 0;*/ + pRspWrapper->tmqRspType = TMQ_MSG_TYPE__POLL_RSP; + pRspWrapper->vgHandle = pVg; + pRspWrapper->topicHandle = pTopic; + /*memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead));*/ + memcpy(&pRspWrapper->msg, pMsg->pData, sizeof(SMqRspHead)); + tDecodeSMqPollRspV2(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRspWrapper->msg); // TODO: alloc mem /*pRsp->*/ /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ + #if 0 if (pRsp->msg.numOfTopics == 0) { /*printf("no data\n");*/ @@ -894,11 +919,10 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { } #endif - tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pParam->pVg->vgId, - pRsp->msg.reqOffset, pRsp->msg.rspOffset); + tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pVg->vgId, + pRspWrapper->msg.reqOffset, pRspWrapper->msg.rspOffset); - pRsp->vg = pParam->pVg; - taosWriteQitem(tmq->mqueue, pRsp); + taosWriteQitem(tmq->mqueue, pRspWrapper); atomic_add_fetch_32(&tmq->readyRequest, 1); /*tsem_post(&tmq->rspSem);*/ return 0; @@ -1015,16 +1039,19 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { } tDeleteSMqCMGetSubEpRsp(&rsp); } else { - SMqCMGetSubEpRsp* pRsp = taosAllocateQitem(sizeof(SMqCMGetSubEpRsp)); - if (pRsp == NULL) { + /*SMqCMGetSubEpRsp* pRsp = taosAllocateQitem(sizeof(SMqCMGetSubEpRsp));*/ + SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper)); + if (pWrapper == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; code = -1; goto END; } - memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); - tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pRsp); + pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP; + pWrapper->epoch = head->epoch; + memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead)); + tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg); - taosWriteQitem(tmq->mqueue, pRsp); + taosWriteQitem(tmq->mqueue, pWrapper); /*tsem_post(&tmq->rspSem);*/ } @@ -1152,6 +1179,25 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTo return pReq; } +SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) { + SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); + pRspObj->resType = RES_TYPE__TMQ; + pRspObj->topic = strdup(pWrapper->topicHandle->topicName); + pRspObj->resIter = -1; + pRspObj->vgId = pWrapper->vgHandle->vgId; + SMqPollRspV2* pRsp = &pWrapper->msg; + int32_t blockNum = taosArrayGetSize(pRsp->blockPos); + pRspObj->res = taosArrayInit(blockNum, sizeof(SReqResultInfo)); + for (int32_t i = 0; i < blockNum; i++) { + int32_t pos = *(int32_t*)taosArrayGet(pRsp->blockPos, i); + SRetrieveTableRsp* pRetrieve = POINTER_SHIFT(pRsp->blockData, pos); + SReqResultInfo resInfo; + setQueryResultFromRsp(&resInfo, pRetrieve, true); + taosArrayPush(pRspObj->res, &resInfo); + } + return pRspObj; +} + #if 0 tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) { tmq_message_t* msg = NULL; @@ -1258,6 +1304,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { } pParam->tmq = tmq; pParam->pVg = pVg; + pParam->pTopic = pTopic; pParam->vgId = pVg->vgId; pParam->epoch = tmq->epoch; pParam->sync = 0; @@ -1296,13 +1343,13 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { return 0; } -// return -int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspHead* rspHead, bool* pReset) { - if (rspHead->mqMsgType == TMQ_MSG_TYPE__EP_RSP) { +int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) { + if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) { /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/ - if (rspHead->epoch > atomic_load_32(&tmq->epoch)) { - SMqCMGetSubEpRsp* rspMsg = (SMqCMGetSubEpRsp*)rspHead; - tmqUpdateEp(tmq, rspHead->epoch, rspMsg); + if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) { + SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper; + SMqCMGetSubEpRsp* rspMsg = &pEpRspWrapper->msg; + tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg); /*tmqClearUnhandleMsg(tmq);*/ *pReset = true; } else { @@ -1314,41 +1361,43 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspHead* rspHead, bool* pReset) { return 0; } -tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) { +SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfReset) { while (1) { - SMqRspHead* rspHead = NULL; - taosGetQitem(tmq->qall, (void**)&rspHead); - if (rspHead == NULL) { + SMqRspWrapper* rspWrapper = NULL; + taosGetQitem(tmq->qall, (void**)&rspWrapper); + if (rspWrapper == NULL) { taosReadAllQitems(tmq->mqueue, tmq->qall); - taosGetQitem(tmq->qall, (void**)&rspHead); - if (rspHead == NULL) return NULL; + taosGetQitem(tmq->qall, (void**)&rspWrapper); + if (rspWrapper == NULL) return NULL; } - if (rspHead->mqMsgType == TMQ_MSG_TYPE__POLL_RSP) { - tmq_message_t* rspMsg = (tmq_message_t*)rspHead; + if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { + SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; atomic_sub_fetch_32(&tmq->readyRequest, 1); /*printf("handle poll rsp %d\n", rspMsg->head.mqMsgType);*/ - if (rspMsg->msg.head.epoch == atomic_load_32(&tmq->epoch)) { + if (pollRspWrapper->msg.head.epoch == atomic_load_32(&tmq->epoch)) { /*printf("epoch match\n");*/ - SMqClientVg* pVg = rspMsg->vg; + SMqClientVg* pVg = pollRspWrapper->vgHandle; /*printf("vg %d offset %ld up to %ld\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ - pVg->currentOffset = rspMsg->msg.rspOffset; + pVg->currentOffset = pollRspWrapper->msg.rspOffset; atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - if (rspMsg->msg.numOfTopics == 0) { - taosFreeQitem(rspMsg); - rspHead = NULL; + if (pollRspWrapper->msg.dataLen == 0) { + taosFreeQitem(pollRspWrapper); + rspWrapper = NULL; continue; } - return rspMsg; + // build msg + SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper); + return pRsp; } else { /*printf("epoch mismatch\n");*/ - taosFreeQitem(rspMsg); + taosFreeQitem(pollRspWrapper); } } else { /*printf("handle ep rsp %d\n", rspMsg->head.mqMsgType);*/ bool reset = false; - tmqHandleNoPollRsp(tmq, rspHead, &reset); - taosFreeQitem(rspHead); + tmqHandleNoPollRsp(tmq, rspWrapper, &reset); + taosFreeQitem(rspWrapper); if (pollIfReset && reset) { tscDebug("consumer %ld reset and repoll", tmq->consumerId); tmqPollImpl(tmq, blockingTime); @@ -1382,17 +1431,17 @@ tmq_message_t* tmq_consumer_poll_v1(tmq_t* tmq, int64_t blocking_time) { } #endif -tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { - tmq_message_t* rspMsg; - int64_t startTime = taosGetTimestampMs(); +TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { + SMqRspObj* rspObj; + int64_t startTime = taosGetTimestampMs(); // TODO: put into another thread or delayed queue int64_t status = atomic_load_64(&tmq->status); tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT); - rspMsg = tmqHandleAllRsp(tmq, blocking_time, false); - if (rspMsg) { - return rspMsg; + rspObj = tmqHandleAllRsp(tmq, blocking_time, false); + if (rspObj) { + return (TAOS_RES*)rspObj; } while (1) { @@ -1402,9 +1451,9 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { /*tsem_wait(&tmq->rspSem);*/ - rspMsg = tmqHandleAllRsp(tmq, blocking_time, false); - if (rspMsg) { - return rspMsg; + rspObj = tmqHandleAllRsp(tmq, blocking_time, false); + if (rspObj) { + return (TAOS_RES*)rspObj; } if (blocking_time != 0) { int64_t endTime = taosGetTimestampMs(); @@ -1546,6 +1595,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v } #endif +#if 0 void tmq_message_destroy(tmq_message_t* tmq_message) { if (tmq_message == NULL) return; SMqPollRsp* pRsp = &tmq_message->msg; @@ -1553,6 +1603,7 @@ void tmq_message_destroy(tmq_message_t* tmq_message) { /*taosMemoryFree(tmq_message);*/ taosFreeQitem(tmq_message); } +#endif tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { return TMQ_RESP_ERR__SUCCESS; } @@ -1563,4 +1614,27 @@ const char* tmq_err2str(tmq_resp_err_t err) { return "fail"; } -char* tmq_get_topic_name(tmq_message_t* message) { return "not implemented yet"; } +char* tmq_get_topic_name(TAOS_RES* res) { + if (TD_RES_TMQ(res)) { + SMqRspObj* pRspObj = (SMqRspObj*)res; + return pRspObj->topic; + } else { + return NULL; + } +} + +int32_t tmq_get_vgroup_id(TAOS_RES* res) { + if (TD_RES_TMQ(res)) { + SMqRspObj* pRspObj = (SMqRspObj*)res; + return pRspObj->vgId; + } else { + return -1; + } +} + +void tmq_message_destroy(TAOS_RES* res) { + if (res == NULL) return; + if (TD_RES_TMQ(res)) { + SMqRspObj* pRspObj = (SMqRspObj*)res; + } +} diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 794427475e..bee61f4d40 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -255,7 +255,7 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu return 0; } - +#if 0 int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { SMqPollReq* pReq = pMsg->pCont; int64_t consumerId = pReq->consumerId; @@ -433,6 +433,205 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { return 0; } +#endif + +int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { + SMqPollReq* pReq = pMsg->pCont; + int64_t consumerId = pReq->consumerId; + int64_t fetchOffset; + int64_t blockingTime = pReq->blockingTime; + int32_t reqEpoch = pReq->epoch; + + if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { + fetchOffset = 0; + } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) { + fetchOffset = walGetLastVer(pTq->pWal); + } else { + fetchOffset = pReq->currentOffset + 1; + } + + vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, + pTq->pVnode->vgId, pReq->currentOffset, fetchOffset); + + SMqPollRspV2 rspV2 = {0}; + rspV2.dataLen = 0; + + STqConsumer* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); + if (pConsumer == NULL) { + vWarn("tmq poll: consumer %ld (epoch %d) not found in vg %d", consumerId, pReq->epoch, pTq->pVnode->vgId); + pMsg->pCont = NULL; + pMsg->contLen = 0; + pMsg->code = -1; + tmsgSendRsp(pMsg); + return 0; + } + + int32_t consumerEpoch = atomic_load_32(&pConsumer->epoch); + while (consumerEpoch < reqEpoch) { + consumerEpoch = atomic_val_compare_exchange_32(&pConsumer->epoch, consumerEpoch, reqEpoch); + } + + STqTopic* pTopic = NULL; + int32_t topicSz = taosArrayGetSize(pConsumer->topics); + for (int32_t i = 0; i < topicSz; i++) { + STqTopic* topic = taosArrayGet(pConsumer->topics, i); + // TODO race condition + ASSERT(pConsumer->consumerId == consumerId); + if (strcmp(topic->topicName, pReq->topic) == 0) { + pTopic = topic; + break; + } + } + if (pTopic == NULL) { + vWarn("tmq poll: consumer %ld (epoch %d) topic %s not found in vg %d", consumerId, pReq->epoch, pReq->topic, + pTq->pVnode->vgId); + pMsg->pCont = NULL; + pMsg->contLen = 0; + pMsg->code = -1; + tmsgSendRsp(pMsg); + return 0; + } + + vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch, + pTq->pVnode->vgId); + + rspV2.reqOffset = pReq->currentOffset; + rspV2.skipLogNum = 0; + + while (1) { + /*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/ + // TODO + consumerEpoch = atomic_load_32(&pConsumer->epoch); + if (consumerEpoch > reqEpoch) { + vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", + consumerId, pReq->epoch, pTq->pVnode->vgId, fetchOffset, consumerEpoch, reqEpoch); + break; + } + SWalReadHead* pHead; + if (walReadWithHandle_s(pTopic->pReadhandle, fetchOffset, &pHead) < 0) { + // TODO: no more log, set timer to wait blocking time + // if data inserted during waiting, launch query and + // response to user + vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, + pTq->pVnode->vgId, fetchOffset); + break; + } + vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, + pTq->pVnode->vgId, fetchOffset, pHead->msgType); + /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ + /*pHead = pTopic->pReadhandle->pHead;*/ + if (pHead->msgType == TDMT_VND_SUBMIT) { + SSubmitReq* pCont = (SSubmitReq*)&pHead->body; + qTaskInfo_t task = pTopic->buffer.output[workerId].task; + ASSERT(task); + qSetStreamInput(task, pCont, STREAM_DATA_TYPE_SUBMIT_BLOCK); + SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); + while (1) { + SSDataBlock* pDataBlock = NULL; + uint64_t ts; + if (qExecTask(task, &pDataBlock, &ts) < 0) { + ASSERT(false); + } + if (pDataBlock == NULL) { + /*pos = fetchOffset % TQ_BUFFER_SIZE;*/ + break; + } + + taosArrayPush(pRes, pDataBlock); + } + + if (taosArrayGetSize(pRes) == 0) { + vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, + pReq->epoch, pTq->pVnode->vgId, fetchOffset); + fetchOffset++; + rspV2.skipLogNum++; + taosArrayDestroy(pRes); + continue; + } + rspV2.rspOffset = fetchOffset; + + int32_t blockSz = taosArrayGetSize(pRes); + int32_t tlen = 0; + for (int32_t i = 0; i < blockSz; i++) { + SSDataBlock* pBlock = taosArrayGet(pRes, i); + tlen += sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pBlock); + } + + void* data = taosMemoryMalloc(tlen); + if (data == NULL) { + pMsg->code = -1; + taosMemoryFree(pHead); + } + + rspV2.blockData = data; + + void* dataBlockBuf = data; + int32_t pos; + for (int32_t i = 0; i < blockSz; i++) { + pos = 0; + SSDataBlock* pBlock = taosArrayGet(pRes, i); + blockCompressEncode(pBlock, dataBlockBuf, &pos, pBlock->info.numOfCols, false); + taosArrayPush(rspV2.blockPos, &rspV2.dataLen); + rspV2.dataLen += pos; + dataBlockBuf = POINTER_SHIFT(dataBlockBuf, pos); + } + + int32_t msgLen = sizeof(SMqRspHead) + tEncodeSMqPollRspV2(NULL, &rspV2); + void* buf = rpcMallocCont(msgLen); + + ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; + ((SMqRspHead*)buf)->epoch = pReq->epoch; + ((SMqRspHead*)buf)->consumerId = consumerId; + + void* msgBodyBuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); + tEncodeSMqPollRspV2(&msgBodyBuf, &rspV2); + + /*rsp.pBlockData = pRes;*/ + + /*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/ + pMsg->pCont = buf; + pMsg->contLen = tlen; + pMsg->code = 0; + vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", pTq->pVnode->vgId, fetchOffset, + pHead->msgType, consumerId, pReq->epoch); + tmsgSendRsp(pMsg); + taosMemoryFree(pHead); + return 0; + } else { + taosMemoryFree(pHead); + fetchOffset++; + rspV2.skipLogNum++; + } + } + + /*if (blockingTime != 0) {*/ + /*tqAddClientPusher(pTq->tqPushMgr, pMsg, consumerId, blockingTime);*/ + /*} else {*/ + + rspV2.rspOffset = fetchOffset - 1; + + int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRspV2(NULL, &rspV2); + void* buf = rpcMallocCont(tlen); + if (buf == NULL) { + pMsg->code = -1; + return -1; + } + ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; + ((SMqRspHead*)buf)->epoch = pReq->epoch; + ((SMqRspHead*)buf)->consumerId = consumerId; + + void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); + tEncodeSMqPollRspV2(&abuf, &rspV2); + pMsg->pCont = buf; + pMsg->contLen = tlen; + pMsg->code = 0; + tmsgSendRsp(pMsg); + vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", pTq->pVnode->vgId, fetchOffset, consumerId, + pReq->epoch); + /*}*/ + + return 0; +} int32_t tqProcessRebReq(STQ* pTq, char* msg) { SMqMVRebReq req = {0}; diff --git a/source/libs/executor/src/dataDispatcher.c b/source/libs/executor/src/dataDispatcher.c index 626cb1b5f0..e897bc8892 100644 --- a/source/libs/executor/src/dataDispatcher.c +++ b/source/libs/executor/src/dataDispatcher.c @@ -92,7 +92,7 @@ static bool allocBuf(SDataDispatchHandle* pDispatcher, const SInputData* pInput, return false; } - pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockEstimateEncodeSize(pInput->pData); + pBuf->allocSize = sizeof(SRetrieveTableRsp) + blockGetEncodeSize(pInput->pData); pBuf->pData = taosMemoryMalloc(pBuf->allocSize); if (pBuf->pData == NULL) { diff --git a/tests/test/c/tmqDemo.c b/tests/test/c/tmqDemo.c index 45a9d31f4c..756893f217 100644 --- a/tests/test/c/tmqDemo.c +++ b/tests/test/c/tmqDemo.c @@ -195,7 +195,7 @@ void parseArgument(int32_t argc, char *argv[]) { } static int running = 1; -static void msg_process(tmq_message_t* message) { tmqShowMsg(message); } +/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/ // calc dir size (not include itself 4096Byte) int64_t getDirectorySize(char *dir) @@ -363,9 +363,9 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { } while (running) { - tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1); + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1); if (tmqmessage) { - msg_process(tmqmessage); + /*msg_process(tmqmessage);*/ tmq_message_destroy(tmqmessage); if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0); @@ -392,12 +392,12 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog int32_t skipLogNum = 0; int64_t startTime = taosGetTimestampUs(); while (running) { - tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 3000); + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 3000); if (tmqmessage) { batchCnt++; - skipLogNum += tmqGetSkipLogNum(tmqmessage); + /*skipLogNum += tmqGetSkipLogNum(tmqmessage);*/ if (0 != g_stConfInfo.showMsgFlag) { - msg_process(tmqmessage); + /*msg_process(tmqmessage);*/ } tmq_message_destroy(tmqmessage); } else { diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index dc375dd35a..702d88f258 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -13,51 +13,49 @@ * along with this program. If not, see . */ -// clang-format off - #include +#include #include +#include #include -#include #include #include +#include #include -#include -#include #include "taos.h" #include "taoserror.h" #include "tlog.h" -#define GREEN "\033[1;32m" -#define NC "\033[0m" +#define GREEN "\033[1;32m" +#define NC "\033[0m" #define min(a, b) (((a) < (b)) ? (a) : (b)) -#define MAX_SQL_STR_LEN (1024 * 1024) -#define MAX_ROW_STR_LEN (16 * 1024) +#define MAX_SQL_STR_LEN (1024 * 1024) +#define MAX_ROW_STR_LEN (16 * 1024) typedef struct { - // input from argvs - char dbName[32]; - char topicString[256]; - char keyString[1024]; - int32_t showMsgFlag; - int32_t consumeDelay; // unit s - int32_t consumeMsgCnt; + // input from argvs + char dbName[32]; + char topicString[256]; + char keyString[1024]; + int32_t showMsgFlag; + int32_t consumeDelay; // unit s + int32_t consumeMsgCnt; - // save result after parse agrvs - int32_t numOfTopic; - char topics[32][64]; - - int32_t numOfKey; - char key[32][64]; - char value[32][64]; + // save result after parse agrvs + int32_t numOfTopic; + char topics[32][64]; + + int32_t numOfKey; + char key[32][64]; + char value[32][64]; } SConfInfo; static SConfInfo g_stConfInfo; -//char* g_pRowValue = NULL; -//TdFilePtr g_fp = NULL; +// char* g_pRowValue = NULL; +// TdFilePtr g_fp = NULL; static void printHelp() { char indent[10] = " "; @@ -80,13 +78,12 @@ static void printHelp() { exit(EXIT_SUCCESS); } -void parseArgument(int32_t argc, char *argv[]) { - +void parseArgument(int32_t argc, char* argv[]) { memset(&g_stConfInfo, 0, sizeof(SConfInfo)); - g_stConfInfo.showMsgFlag = 0; - g_stConfInfo.consumeDelay = 8000; + g_stConfInfo.showMsgFlag = 0; + g_stConfInfo.consumeDelay = 8000; g_stConfInfo.consumeMsgCnt = 0; - + for (int32_t i = 1; i < argc; i++) { if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) { printHelp(); @@ -107,7 +104,7 @@ void parseArgument(int32_t argc, char *argv[]) { g_stConfInfo.consumeMsgCnt = atol(argv[++i]); } else { printf("%s unknow para: %s %s", GREEN, argv[++i], NC); - exit(-1); + exit(-1); } } @@ -117,91 +114,87 @@ void parseArgument(int32_t argc, char *argv[]) { pPrint("%s topicString:%s %s", GREEN, g_stConfInfo.topicString, NC); pPrint("%s keyString:%s %s", GREEN, g_stConfInfo.keyString, NC); pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC); -#endif +#endif } -void splitStr(char **arr, char *str, const char *del) { - char *s = strtok(str, del); - while(s != NULL) { +void splitStr(char** arr, char* str, const char* del) { + char* s = strtok(str, del); + while (s != NULL) { *arr++ = s; s = strtok(NULL, del); } } -void ltrim(char *str) -{ - if (str == NULL || *str == '\0') - { - return; - } - int len = 0; - char *p = str; - while (*p != '\0' && isspace(*p)) - { - ++p; ++len; - } - memmove(str, p, strlen(str) - len + 1); - //return str; +void ltrim(char* str) { + if (str == NULL || *str == '\0') { + return; + } + int len = 0; + char* p = str; + while (*p != '\0' && isspace(*p)) { + ++p; + ++len; + } + memmove(str, p, strlen(str) - len + 1); + // return str; } - void parseInputString() { - //printf("topicString: %s\n", g_stConfInfo.topicString); - //printf("keyString: %s\n\n", g_stConfInfo.keyString); + // printf("topicString: %s\n", g_stConfInfo.topicString); + // printf("keyString: %s\n\n", g_stConfInfo.keyString); - char *token; + char* token; const char delim[2] = ","; const char ch = ':'; token = strtok(g_stConfInfo.topicString, delim); - while(token != NULL) { - //printf("%s\n", token ); - strcpy(g_stConfInfo.topics[g_stConfInfo.numOfTopic], token); + while (token != NULL) { + // printf("%s\n", token ); + strcpy(g_stConfInfo.topics[g_stConfInfo.numOfTopic], token); ltrim(g_stConfInfo.topics[g_stConfInfo.numOfTopic]); - //printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); - g_stConfInfo.numOfTopic++; - + // printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]); + g_stConfInfo.numOfTopic++; + token = strtok(NULL, delim); } token = strtok(g_stConfInfo.keyString, delim); - while(token != NULL) { - //printf("%s\n", token ); - { - char* pstr = token; - ltrim(pstr); - char *ret = strchr(pstr, ch); - memcpy(g_stConfInfo.key[g_stConfInfo.numOfKey], pstr, ret-pstr); - strcpy(g_stConfInfo.value[g_stConfInfo.numOfKey], ret+1); - //printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey], g_stConfInfo.value[g_stConfInfo.numOfKey]); - g_stConfInfo.numOfKey++; + while (token != NULL) { + // printf("%s\n", token ); + { + char* pstr = token; + ltrim(pstr); + char* ret = strchr(pstr, ch); + memcpy(g_stConfInfo.key[g_stConfInfo.numOfKey], pstr, ret - pstr); + strcpy(g_stConfInfo.value[g_stConfInfo.numOfKey], ret + 1); + // printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey], + // g_stConfInfo.value[g_stConfInfo.numOfKey]); + g_stConfInfo.numOfKey++; } - + token = strtok(NULL, delim); } } +static int running = 1; +/*static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }*/ -static int running = 1; -static void msg_process(tmq_message_t* message) { tmqShowMsg(message); } - - -int queryDB(TAOS *taos, char *command) { - TAOS_RES *pRes = taos_query(taos, command); - int code = taos_errno(pRes); - //if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) { - if (code != 0) { - pError("failed to reason:%s, sql: %s", tstrerror(code), command); - taos_free_result(pRes); - return -1; - } - taos_free_result(pRes); - return 0 ; +int queryDB(TAOS* taos, char* command) { + TAOS_RES* pRes = taos_query(taos, command); + int code = taos_errno(pRes); + // if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) { + if (code != 0) { + pError("failed to reason:%s, sql: %s", tstrerror(code), command); + taos_free_result(pRes); + return -1; + } + taos_free_result(pRes); + return 0; } tmq_t* build_consumer() { char sqlStr[1024] = {0}; - + TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -209,13 +202,13 @@ tmq_t* build_consumer() { TAOS_RES* pRes = taos_query(pConn, sqlStr); if (taos_errno(pRes) != 0) { printf("error in use db, reason:%s\n", taos_errstr(pRes)); - taos_free_result(pRes); - exit(-1); + taos_free_result(pRes); + exit(-1); } taos_free_result(pRes); tmq_conf_t* conf = tmq_conf_new(); - //tmq_conf_set(conf, "group.id", "tg2"); + // tmq_conf_set(conf, "group.id", "tg2"); for (int32_t i = 0; i < g_stConfInfo.numOfKey; i++) { tmq_conf_set(conf, g_stConfInfo.key[i], g_stConfInfo.value[i]); } @@ -225,7 +218,7 @@ tmq_t* build_consumer() { tmq_list_t* build_topic_list() { tmq_list_t* topic_list = tmq_list_new(); - //tmq_list_append(topic_list, "test_stb_topic_1"); + // tmq_list_append(topic_list, "test_stb_topic_1"); for (int32_t i = 0; i < g_stConfInfo.numOfTopic; i++) { tmq_list_append(topic_list, g_stConfInfo.topics[i]); } @@ -239,21 +232,21 @@ void loop_consume(tmq_t* tmq) { int32_t totalRows = 0; int32_t skipLogNum = 0; while (running) { - tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, 8000); + TAOS_RES* tmqMsg = tmq_consumer_poll(tmq, 8000); if (tmqMsg) { - totalMsgs++; + totalMsgs++; - #if 0 +#if 0 TAOS_ROW row; while (NULL != (row = tmq_get_row(tmqMsg))) { totalRows++; } - #endif - - skipLogNum += tmqGetSkipLogNum(tmqMsg); - if (0 != g_stConfInfo.showMsgFlag) { - msg_process(tmqMsg); - } +#endif + + /*skipLogNum += tmqGetSkipLogNum(tmqMsg);*/ + if (0 != g_stConfInfo.showMsgFlag) { + /*msg_process(tmqMsg);*/ + } tmq_message_destroy(tmqMsg); } else { break; @@ -263,13 +256,12 @@ void loop_consume(tmq_t* tmq) { err = tmq_consumer_close(tmq); if (err) { printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); - exit(-1); + exit(-1); } - + printf("{consume success: %d, %d}", totalMsgs, totalRows); } - void parallel_consume(tmq_t* tmq) { tmq_resp_err_t err; @@ -277,26 +269,26 @@ void parallel_consume(tmq_t* tmq) { int32_t totalRows = 0; int32_t skipLogNum = 0; while (running) { - tmq_message_t* tmqMsg = tmq_consumer_poll(tmq, g_stConfInfo.consumeDelay * 1000); + TAOS_RES* tmqMsg = tmq_consumer_poll(tmq, g_stConfInfo.consumeDelay * 1000); if (tmqMsg) { - totalMsgs++; + totalMsgs++; - #if 0 +#if 0 TAOS_ROW row; while (NULL != (row = tmq_get_row(tmqMsg))) { totalRows++; } - #endif - - skipLogNum += tmqGetSkipLogNum(tmqMsg); - if (0 != g_stConfInfo.showMsgFlag) { - msg_process(tmqMsg); - } +#endif + + /*skipLogNum += tmqGetSkipLogNum(tmqMsg);*/ + if (0 != g_stConfInfo.showMsgFlag) { + /*msg_process(tmqMsg);*/ + } tmq_message_destroy(tmqMsg); - if (totalMsgs >= g_stConfInfo.consumeMsgCnt) { + if (totalMsgs >= g_stConfInfo.consumeMsgCnt) { break; - } + } } else { break; } @@ -305,22 +297,22 @@ void parallel_consume(tmq_t* tmq) { err = tmq_consumer_close(tmq); if (err) { printf("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); - exit(-1); + exit(-1); } - - printf("%d", totalMsgs); // output to sim for check result + + printf("%d", totalMsgs); // output to sim for check result } -int main(int32_t argc, char *argv[]) { +int main(int32_t argc, char* argv[]) { parseArgument(argc, argv); parseInputString(); - - tmq_t* tmq = build_consumer(); + + tmq_t* tmq = build_consumer(); tmq_list_t* topic_list = build_topic_list(); - if ((NULL == tmq) || (NULL == topic_list)){ + if ((NULL == tmq) || (NULL == topic_list)) { return -1; } - + tmq_resp_err_t err = tmq_subscribe(tmq, topic_list); if (err) { printf("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));