diff --git a/examples/c/tmq.c b/examples/c/tmq.c index 2e8aa21da7..7870d7d9a1 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -47,7 +47,7 @@ int32_t init_env() { return -1; } - TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2"); + TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); if (taos_errno(pRes) != 0) { printf("error in create db, reason:%s\n", taos_errstr(pRes)); return -1; @@ -146,8 +146,8 @@ int32_t create_topic() { return 0; } -void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) { - printf("commit %d tmq %p offsets %p param %p\n", resp, tmq, offsets, param); +void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { + printf("commit %d tmq %p param %p\n", code, tmq, param); } tmq_t* build_consumer() { @@ -167,7 +167,7 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "td.connect.user", "root"); tmq_conf_set(conf, "td.connect.pass", "taosdata"); tmq_conf_set(conf, "msg.with.table.name", "true"); - tmq_conf_set(conf, "enable.auto.commit", "false"); + tmq_conf_set(conf, "enable.auto.commit", "true"); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); assert(tmq); @@ -183,10 +183,10 @@ tmq_list_t* build_topic_list() { } void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { - tmq_resp_err_t err; + int32_t code; - if ((err = tmq_subscribe(tmq, topics))) { - fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); + if ((code = tmq_subscribe(tmq, topics))) { + fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code)); printf("subscribe err\n"); return; } @@ -201,12 +201,13 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { taos_free_result(tmqmessage); /*} else {*/ /*break;*/ + /*tmq_commit_sync(tmq, NULL);*/ } } - err = tmq_consumer_close(tmq); - if (err) - fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err)); + code = tmq_consumer_close(tmq); + if (code) + fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); else fprintf(stderr, "%% Consumer closed\n"); } @@ -214,11 +215,11 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { static const int MIN_COMMIT_COUNT = 1; - int msg_count = 0; - tmq_resp_err_t err; + int msg_count = 0; + int32_t code; - if ((err = tmq_subscribe(tmq, topics))) { - fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); + if ((code = tmq_subscribe(tmq, topics))) { + fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code)); return; } @@ -239,14 +240,14 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { msg_process(tmqmessage); taos_free_result(tmqmessage); - /*tmq_commit_async(tmq, NULL, tmq_commit_cb_print, NULL);*/ + /*tmq_commit_sync(tmq, NULL);*/ /*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/ } } - err = tmq_consumer_close(tmq); - if (err) - fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err)); + code = tmq_consumer_close(tmq); + if (code) + fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); else fprintf(stderr, "%% Consumer closed\n"); } diff --git a/include/client/taos.h b/include/client/taos.h index 479090b04d..e9f7f88387 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -209,21 +209,20 @@ DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLi /* --------------------------TMQ INTERFACE------------------------------- */ +#if 0 enum { TMQ_RESP_ERR__FAIL = -1, TMQ_RESP_ERR__SUCCESS = 0, }; typedef int32_t tmq_resp_err_t; +#endif -typedef struct tmq_t tmq_t; -typedef struct tmq_topic_vgroup_t tmq_topic_vgroup_t; -typedef struct tmq_topic_vgroup_list_t tmq_topic_vgroup_list_t; - +typedef struct tmq_t tmq_t; typedef struct tmq_conf_t tmq_conf_t; typedef struct tmq_list_t tmq_list_t; -typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, void *param)); +typedef void(tmq_commit_cb(tmq_t *, int32_t code, void *param)); DLL_EXPORT tmq_list_t *tmq_list_new(); DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); @@ -233,25 +232,19 @@ DLL_EXPORT char **tmq_list_to_c_array(const tmq_list_t *); DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen); -DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); +DLL_EXPORT const char *tmq_err2str(int32_t code); /* ------------------------TMQ CONSUMER INTERFACE------------------------ */ -DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list); -DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq); -DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); - +DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list); +DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq); +DLL_EXPORT int32_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); // timeout: -1 means infinitely waiting DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout); +DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq); -DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t *tmq); -DLL_EXPORT tmq_resp_err_t tmq_commit_sync(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets); -DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, tmq_commit_cb *cb, void *param); - -#if 0 -DLL_EXPORT tmq_resp_err_t tmq_commit(tmq_t *tmq, const tmq_topic_vgroup_list_t *offsets, int32_t async); -DLL_EXPORT tmq_resp_err_t tmq_seek(tmq_t *tmq, const tmq_topic_vgroup_t *offset); -#endif +DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); +DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); /* ----------------------TMQ CONFIGURATION INTERFACE---------------------- */ @@ -275,11 +268,6 @@ DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res); -#if 0 -DLL_EXPORT int64_t tmq_get_request_offset(tmq_message_t *message); -DLL_EXPORT int64_t tmq_get_response_offset(tmq_message_t *message); -#endif - /* ------------------------------ TMQ END -------------------------------- */ #if 1 // Shuduo: temporary enable for app build diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 22acf2089c..d72e5351e3 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -35,6 +35,7 @@ enum { TMQ_MSG_TYPE__DUMMY = 0, TMQ_MSG_TYPE__POLL_RSP, TMQ_MSG_TYPE__EP_RSP, + TMQ_MSG_TYPE__END_RSP, }; typedef enum EStreamType { diff --git a/include/libs/nodes/nodes.h b/include/libs/nodes/nodes.h index 6f239cc892..08190d978e 100644 --- a/include/libs/nodes/nodes.h +++ b/include/libs/nodes/nodes.h @@ -228,6 +228,7 @@ typedef enum ENodeType { QUERY_NODE_PHYSICAL_PLAN_FILL, QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, + QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION, QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION, QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 749d58b224..a9002b5d19 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -130,13 +130,17 @@ typedef struct SMergeLogicNode { typedef enum EWindowType { WINDOW_TYPE_INTERVAL = 1, WINDOW_TYPE_SESSION, WINDOW_TYPE_STATE } EWindowType; -typedef enum EIntervalAlgorithm { +typedef enum EWindowAlgorithm { INTERVAL_ALGO_HASH = 1, INTERVAL_ALGO_MERGE, INTERVAL_ALGO_STREAM_FINAL, INTERVAL_ALGO_STREAM_SEMI, INTERVAL_ALGO_STREAM_SINGLE, -} EIntervalAlgorithm; + SESSION_ALGO_STREAM_SEMI, + SESSION_ALGO_STREAM_FINAL, + SESSION_ALGO_STREAM_SINGLE, + SESSION_ALGO_MERGE, +} EWindowAlgorithm; typedef struct SWindowLogicNode { SLogicNode node; @@ -153,7 +157,7 @@ typedef struct SWindowLogicNode { int8_t triggerType; int64_t watermark; double filesFactor; - EIntervalAlgorithm intervalAlgo; + EWindowAlgorithm windowAlgo; } SWindowLogicNode; typedef struct SFillLogicNode { @@ -371,6 +375,8 @@ typedef struct SSessionWinodwPhysiNode { } SSessionWinodwPhysiNode; typedef SSessionWinodwPhysiNode SStreamSessionWinodwPhysiNode; +typedef SSessionWinodwPhysiNode SStreamSemiSessionWinodwPhysiNode; +typedef SSessionWinodwPhysiNode SStreamFinalSessionWinodwPhysiNode; typedef struct SStateWinodwPhysiNode { SWinodwPhysiNode window; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 3e7b86d74d..d98d0f7c1f 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -430,7 +430,8 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TQ_META_KEY_NOT_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A09) #define TSDB_CODE_TQ_META_KEY_DUP_IN_TXN TAOS_DEF_ERROR_CODE(0, 0x0A0A) #define TSDB_CODE_TQ_GROUP_NOT_SET TAOS_DEF_ERROR_CODE(0, 0x0A0B) -#define TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0A0B) +#define TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x0A0C) +#define TSDB_CODE_TQ_NO_COMMITTED_OFFSET TAOS_DEF_ERROR_CODE(0, 0x0A0D) // wal #define TSDB_CODE_WAL_APP_ERROR TAOS_DEF_ERROR_CODE(0, 0x1000) @@ -710,6 +711,8 @@ int32_t* taosGetErrno(); //index #define TSDB_CODE_INDEX_REBUILDING TAOS_DEF_ERROR_CODE(0, 0x3200) +//tmq +#define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000) #ifdef __cplusplus } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index dd900e6045..638b4f1ea5 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -48,14 +48,6 @@ struct tmq_list_t { SArray container; }; -struct tmq_topic_vgroup_t { - SMqOffset offset; -}; - -struct tmq_topic_vgroup_list_t { - SArray container; // SArray -}; - struct tmq_conf_t { char clientId[256]; char groupId[TSDB_CGROUP_LEN]; @@ -161,9 +153,9 @@ typedef struct { } SMqPollRspWrapper; typedef struct { - tmq_t* tmq; - tsem_t rspSem; - tmq_resp_err_t rspErr; + tmq_t* tmq; + tsem_t rspSem; + int32_t rspErr; } SMqSubscribeCbParam; typedef struct { @@ -189,7 +181,7 @@ typedef struct { int8_t freeOffsets; tmq_commit_cb* userCb; tsem_t rspSem; - tmq_resp_err_t rspErr; + int32_t rspErr; SArray* offsets; void* userParam; } SMqCommitCbParam; @@ -201,12 +193,12 @@ typedef struct { int8_t freeOffsets; int32_t waitingRspNum; int32_t totalRspNum; - tmq_resp_err_t rspErr; + int32_t rspErr; tmq_commit_cb* userCb; - SArray* successfulOffsets; - SArray* failedOffsets; - void* userParam; - tsem_t rspSem; + /*SArray* successfulOffsets;*/ + /*SArray* failedOffsets;*/ + void* userParam; + tsem_t rspSem; } SMqCommitCbParamSet; typedef struct { @@ -347,10 +339,9 @@ int32_t tmqCommitCb(void* param, const SDataBuf* pMsg, int32_t code) { pParam->rspErr = code; if (pParam->async) { if (pParam->automatic && pParam->tmq->commitCb) { - pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets, - pParam->tmq->commitCbUserParam); + pParam->tmq->commitCb(pParam->tmq, pParam->rspErr, pParam->tmq->commitCbUserParam); } else if (!pParam->automatic && pParam->userCb) { - pParam->userCb(pParam->tmq, pParam->rspErr, (tmq_topic_vgroup_list_t*)pParam->offsets, pParam->userParam); + pParam->userCb(pParam->tmq, pParam->rspErr, pParam->userParam); } if (pParam->freeOffsets) { @@ -368,11 +359,16 @@ int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) { SMqCommitCbParam2* pParam = (SMqCommitCbParam2*)param; SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params; // push into array +#if 0 if (code == 0) { taosArrayPush(pParamSet->failedOffsets, &pParam->pOffset); } else { taosArrayPush(pParamSet->successfulOffsets, &pParam->pOffset); } +#endif + + /*tscDebug("receive offset commit cb of %s on vg %d, offset is %ld", pParam->pOffset->subKey, pParam->->vgId, + * pOffset->version);*/ // count down waiting rsp int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1); @@ -383,23 +379,149 @@ int32_t tmqCommitCb2(void* param, const SDataBuf* pBuf, int32_t code) { if (pParamSet->async) { // call async cb func if (pParamSet->automatic && pParamSet->tmq->commitCb) { - pParamSet->tmq->commitCb(pParamSet->tmq, pParamSet->rspErr, NULL, pParamSet->tmq->commitCbUserParam); + pParamSet->tmq->commitCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->tmq->commitCbUserParam); } else if (!pParamSet->automatic && pParamSet->userCb) { // sem post - pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, NULL, pParamSet->userParam); + pParamSet->userCb(pParamSet->tmq, pParamSet->rspErr, pParamSet->userParam); } + } else { + tsem_post(&pParamSet->rspSem); } +#if 0 taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree); taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree); +#endif } return 0; } -int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_t automatic, int8_t async, - tmq_commit_cb* userCb, void* userParam) { +int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb, + void* userParam) { int32_t code = -1; + if (msg != NULL) { + SMqRspObj* pRspObj = (SMqRspObj*)msg; + if (!TD_RES_TMQ(pRspObj)) { + return TSDB_CODE_TMQ_INVALID_MSG; + } + + SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); + if (pParamSet == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pParamSet->tmq = tmq; + pParamSet->automatic = automatic; + pParamSet->async = async; + pParamSet->freeOffsets = 1; + pParamSet->userCb = userCb; + pParamSet->userParam = userParam; + tsem_init(&pParamSet->rspSem, 0, 0); + + for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { + SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + if (strcmp(pTopic->topicName, pRspObj->topic) == 0) { + for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + if (pVg->vgId == pRspObj->vgId) { + if (pVg->currentOffset < 0 || pVg->committedOffset == pVg->currentOffset) { + tscDebug("consumer %ld skip commit for topic %s vg %d, current offset is %ld, committed offset is %ld", + tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset, pVg->committedOffset); + + return 0; + } + + STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset)); + if (pOffset == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pOffset->type = TMQ_OFFSET__LOG; + pOffset->version = pVg->currentOffset; + + int32_t groupLen = strlen(tmq->groupId); + memcpy(pOffset->subKey, tmq->groupId, groupLen); + pOffset->subKey[groupLen] = TMQ_SEPARATOR; + strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName); + + int32_t len; + int32_t code; + tEncodeSize(tEncodeSTqOffset, pOffset, len, code); + if (code < 0) { + ASSERT(0); + } + void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len); + ((SMsgHead*)buf)->vgId = htonl(pVg->vgId); + + void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); + + SEncoder encoder; + tEncoderInit(&encoder, abuf, len); + tEncodeSTqOffset(&encoder, pOffset); + + // build param + SMqCommitCbParam2* pParam = taosMemoryCalloc(1, sizeof(SMqCommitCbParam2)); + pParam->params = pParamSet; + pParam->pOffset = pOffset; + + // build send info + SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + if (pMsgSendInfo == NULL) { + // TODO + continue; + } + pMsgSendInfo->msgInfo = (SDataBuf){ + .pData = buf, + .len = sizeof(SMsgHead) + len, + .handle = NULL, + }; + + tscDebug("consumer %ld commit offset of %s on vg %d, offset is %ld", tmq->consumerId, pOffset->subKey, + pVg->vgId, pOffset->version); + + // TODO: put into cb + pVg->committedOffset = pVg->currentOffset; + + pMsgSendInfo->requestId = generateRequestId(); + pMsgSendInfo->requestObjRefId = 0; + pMsgSendInfo->param = pParam; + pMsgSendInfo->fp = tmqCommitCb2; + pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET; + // send msg + + int64_t transporterId = 0; + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo); + pParamSet->waitingRspNum++; + pParamSet->totalRspNum++; + } + } + } + } + if (pParamSet->totalRspNum == 0) { + tsem_destroy(&pParamSet->rspSem); + taosMemoryFree(pParamSet); + return 0; + } + + if (!async) { + tsem_wait(&pParamSet->rspSem); + code = pParamSet->rspErr; + tsem_destroy(&pParamSet->rspSem); + } else { + code = 0; + } + + if (code != 0 && async) { + if (automatic) { + tmq->commitCb(tmq, code, tmq->commitCbUserParam); + } else { + userCb(tmq, code, userParam); + } + } + return 0; + } + SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); if (pParamSet == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -415,17 +537,35 @@ int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8 for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); + + tscDebug("consumer %ld begin commit for topic %s, vgNum %d", tmq->consumerId, pTopic->topicName, + (int32_t)taosArrayGetSize(pTopic->vgs)); + for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) { - SMqClientVg* pVg = taosArrayGet(pTopic->vgs, i); - STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset)); + SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); + + tscDebug("consumer %ld begin commit for topic %s, vgId %d", tmq->consumerId, pTopic->topicName, pVg->vgId); + + /*if (pVg->currentOffset < 0) {*/ + if (pVg->currentOffset < 0 || pVg->committedOffset == pVg->currentOffset) { + tscDebug("consumer %ld skip commit for topic %s vg %d, current offset is %ld, committed offset is %ld", + tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset, pVg->committedOffset); + + continue; + } + STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset)); if (pOffset == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - int32_t tlen = strlen(tmq->groupId); - memcpy(pOffset->subKey, tmq->groupId, tlen); - pOffset->subKey[tlen] = TMQ_SEPARATOR; - strcpy(pOffset->subKey + tlen + 1, pTopic->topicName); + pOffset->type = TMQ_OFFSET__LOG; + pOffset->version = pVg->currentOffset; + + int32_t groupLen = strlen(tmq->groupId); + memcpy(pOffset->subKey, tmq->groupId, groupLen); + pOffset->subKey[groupLen] = TMQ_SEPARATOR; + strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName); + int32_t len; int32_t code; tEncodeSize(tEncodeSTqOffset, pOffset, len, code); @@ -454,25 +594,36 @@ int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8 } pMsgSendInfo->msgInfo = (SDataBuf){ .pData = buf, - .len = len, + .len = sizeof(SMsgHead) + len, .handle = NULL, }; + tscDebug("consumer %ld commit offset of %s on vg %d, offset is %ld", tmq->consumerId, pOffset->subKey, pVg->vgId, + pOffset->version); + + // TODO: put into cb + pVg->committedOffset = pVg->currentOffset; + pMsgSendInfo->requestId = generateRequestId(); pMsgSendInfo->requestObjRefId = 0; pMsgSendInfo->param = pParam; pMsgSendInfo->fp = tmqCommitCb2; - pMsgSendInfo->msgType = TDMT_MND_MQ_COMMIT_OFFSET; + pMsgSendInfo->msgType = TDMT_VND_MQ_COMMIT_OFFSET; // send msg - SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, pMsgSendInfo); + asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo); pParamSet->waitingRspNum++; pParamSet->totalRspNum++; } } + if (pParamSet->totalRspNum == 0) { + tsem_destroy(&pParamSet->rspSem); + taosMemoryFree(pParamSet); + return 0; + } + if (!async) { tsem_wait(&pParamSet->rspSem); code = pParamSet->rspErr; @@ -483,21 +634,24 @@ int32_t tmqCommitInner2(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8 if (code != 0 && async) { if (automatic) { - tmq->commitCb(tmq, code, NULL, tmq->commitCbUserParam); + tmq->commitCb(tmq, code, tmq->commitCbUserParam); } else { - userCb(tmq, code, NULL, userParam); + userCb(tmq, code, userParam); } } +#if 0 if (!async) { taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree); taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree); } +#endif return 0; } -int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_t automatic, int8_t async, +#if 0 +int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb, void* userParam) { SMqCMCommitOffsetReq req; SArray* pOffsets = NULL; @@ -507,7 +661,7 @@ int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_ int8_t freeOffsets; int32_t code = -1; - if (offsets == NULL) { + if (msg == NULL) { freeOffsets = 1; pOffsets = taosArrayInit(0, sizeof(SMqOffset)); for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { @@ -524,7 +678,7 @@ int32_t tmqCommitInner(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int8_ } } else { freeOffsets = 0; - pOffsets = (SArray*)&offsets->container; + pOffsets = (SArray*)&msg->container; } req.num = (int32_t)pOffsets->size; @@ -611,6 +765,7 @@ END: } return code; } +#endif void tmqAssignDelayedHbTask(void* param, void* tmrId) { tmq_t* tmq = (tmq_t*)param; @@ -648,7 +803,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) { tmqAskEp(tmq, true); taosTmrReset(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer, &tmq->hbTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { - tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam); + tmqCommitInner2(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam); taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) { } else { @@ -689,7 +844,7 @@ int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { return 0; } -tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { +int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { if (*topics == NULL) { *topics = tmq_list_new(); } @@ -697,12 +852,12 @@ tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i); tmq_list_append(*topics, strchr(topic->topicName, '.') + 1); } - return TMQ_RESP_ERR__SUCCESS; + return 0; } -tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq) { - tmq_list_t* lst = tmq_list_new(); - tmq_resp_err_t rsp = tmq_subscribe(tmq, lst); +int32_t tmq_unsubscribe(tmq_t* tmq) { + tmq_list_t* lst = tmq_list_new(); + int32_t rsp = tmq_subscribe(tmq, lst); tmq_list_destroy(lst); return rsp; } @@ -818,11 +973,13 @@ FAIL: return NULL; } -tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) { - return tmqCommitInner(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam); +#if 0 +int32_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) { + return tmqCommitInner2(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam); } +#endif -tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { +int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { const SArray* container = &topic_list->container; int32_t sz = taosArrayGetSize(container); void* buf = NULL; @@ -862,7 +1019,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { if (sendInfo == NULL) goto FAIL; SMqSubscribeCbParam param = { - .rspErr = TMQ_RESP_ERR__SUCCESS, + .rspErr = 0, .tmq = tmq, }; @@ -943,6 +1100,19 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { if (code != 0) { tscWarn("msg discard from vg %d, epoch %d, code:%x", vgId, epoch, code); if (pMsg->pData) taosMemoryFree(pMsg->pData); + if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { + SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); + if (pRspWrapper == NULL) { + taosMemoryFree(pMsg->pData); + tscWarn("msg discard from vg %d, epoch %d since out of memory", vgId, epoch); + goto CREATE_MSG_FAIL; + } + pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP; + /*pRspWrapper->vgHandle = pVg;*/ + /*pRspWrapper->topicHandle = pTopic;*/ + taosWriteQitem(tmq->mqueue, pRspWrapper); + tsem_post(&tmq->rspSem); + } goto CREATE_MSG_FAIL; } @@ -1017,14 +1187,13 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { if (pTopicCur->vgs) { int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs); tscDebug("consumer %ld new vg num: %d", tmq->consumerId, vgNumCur); - if (vgNumCur == 0) break; for (int32_t j = 0; j < vgNumCur; j++) { SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId); - tscDebug("consumer %ld epoch %d vg %d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey); + tscDebug("consumer %ld epoch %d vg %d vgKey is %s, offset is %ld", tmq->consumerId, epoch, pVgCur->vgId, vgKey, + pVgCur->currentOffset); taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t)); } - break; } } @@ -1032,7 +1201,6 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { SMqClientTopic topic = {0}; SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i); topic.schema = pTopicEp->schema; - taosHashClear(pHash); topic.topicName = strdup(pTopicEp->topic); tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN); @@ -1049,7 +1217,8 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { offset = *pOffset; } - tscDebug("consumer %ld(epoch %d) offset of vg %d updated to %ld", tmq->consumerId, epoch, pVgEp->vgId, offset); + tscDebug("consumer %ld(epoch %d) offset of vg %d updated to %ld, vgKey is %s", tmq->consumerId, epoch, + pVgEp->vgId, offset, vgKey); SMqClientVg clientVg = { .pollCnt = 0, .currentOffset = offset, @@ -1076,6 +1245,7 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { return set; } +#if 1 bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { /*printf("call update ep %d\n", epoch);*/ bool set = false; @@ -1160,6 +1330,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { atomic_store_32(&tmq->epoch, epoch); return set; } +#endif int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; @@ -1186,7 +1357,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/ /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/ - tmqUpdateEp(tmq, head->epoch, &rsp); + tmqUpdateEp2(tmq, head->epoch, &rsp); tDeleteSMqAskEpRsp(&rsp); } else { SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM); @@ -1283,7 +1454,8 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { return code; } -tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { +#if 0 +int32_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { const SMqOffset* pOffset = &offset->offset; if (strcmp(pOffset->cgroup, tmq->groupId) != 0) { return TMQ_RESP_ERR__FAIL; @@ -1305,16 +1477,17 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { } return TMQ_RESP_ERR__FAIL; } +#endif SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) { int64_t reqOffset; if (pVg->currentOffset >= 0) { reqOffset = pVg->currentOffset; } else { - if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) { - tscError("unable to poll since no committed offset but reset offset is set to none"); - return NULL; - } + /*if (tmq->resetOffsetCfg == TMQ_CONF__RESET_OFFSET__NONE) {*/ + /*tscError("unable to poll since no committed offset but reset offset is set to none");*/ + /*return NULL;*/ + /*}*/ reqOffset = tmq->resetOffsetCfg; } @@ -1326,10 +1499,10 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* /*strcpy(pReq->topic, pTopic->topicName);*/ /*strcpy(pReq->cgroup, tmq->groupId);*/ - int32_t tlen = strlen(tmq->groupId); - memcpy(pReq->subKey, tmq->groupId, tlen); - pReq->subKey[tlen] = TMQ_SEPARATOR; - strcpy(pReq->subKey + tlen + 1, pTopic->topicName); + int32_t groupLen = strlen(tmq->groupId); + memcpy(pReq->subKey, tmq->groupId, groupLen); + pReq->subKey[groupLen] = TMQ_SEPARATOR; + strcpy(pReq->subKey + groupLen + 1, pTopic->topicName); pReq->withTbName = tmq->withTbName; pReq->timeout = timeout; @@ -1440,7 +1613,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) { SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper; SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg; - tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg); + tmqUpdateEp2(tmq, rspWrapper->epoch, rspMsg); /*tmqClearUnhandleMsg(tmq);*/ *pReset = true; } else { @@ -1462,7 +1635,11 @@ SMqRspObj* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { if (rspWrapper == NULL) return NULL; } - if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { + if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { + taosFreeQitem(rspWrapper); + terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; + return NULL; + } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)rspWrapper; /*atomic_sub_fetch_32(&tmq->readyRequest, 1);*/ int32_t consumerEpoch = atomic_load_32(&tmq->epoch); @@ -1523,6 +1700,8 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { rspObj = tmqHandleAllRsp(tmq, timeout, false); if (rspObj) { return (TAOS_RES*)rspObj; + } else if (terrno == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { + return NULL; } if (timeout != -1) { int64_t endTime = taosGetTimestampMs(); @@ -1539,10 +1718,10 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { } } -tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { +int32_t tmq_consumer_close(tmq_t* tmq) { if (tmq->status == TMQ_CONSUMER_STATUS__READY) { - tmq_resp_err_t rsp = tmq_commit_sync(tmq, NULL); - if (rsp != TMQ_RESP_ERR__SUCCESS) { + int32_t rsp = tmq_commit_sync(tmq, NULL); + if (rsp != 0) { return rsp; } @@ -1550,18 +1729,18 @@ tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { rsp = tmq_subscribe(tmq, lst); tmq_list_destroy(lst); - if (rsp != TMQ_RESP_ERR__SUCCESS) { + if (rsp != 0) { return rsp; } } // TODO: free resources - return TMQ_RESP_ERR__SUCCESS; + return 0; } -const char* tmq_err2str(tmq_resp_err_t err) { - if (err == TMQ_RESP_ERR__SUCCESS) { +const char* tmq_err2str(int32_t err) { + if (err == 0) { return "success"; - } else if (err == TMQ_RESP_ERR__FAIL) { + } else if (err == -1) { return "fail"; } else { return tstrerror(err); @@ -1607,10 +1786,8 @@ const char* tmq_get_table_name(TAOS_RES* res) { return NULL; } -void tmq_commit_async(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, tmq_commit_cb* cb, void* param) { - tmqCommitInner(tmq, offsets, 0, 1, cb, param); +void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) { + tmqCommitInner2(tmq, msg, 0, 1, cb, param); } -tmq_resp_err_t tmq_commit_sync(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets) { - return tmqCommitInner(tmq, offsets, 0, 0, NULL, NULL); -} +int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) { return tmqCommitInner2(tmq, msg, 0, 0, NULL, NULL); } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index d1bed90175..58c3570c36 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -761,11 +761,12 @@ static int32_t mndRetrieveDnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, (const char *)&pDnode->createdTime, false); - char b[tListLen(offlineReason) + VARSTR_HEADER_SIZE] = {0}; + char *b = taosMemoryCalloc(VARSTR_HEADER_SIZE + strlen(offlineReason[pDnode->offlineReason]) + 1, 1); STR_TO_VARSTR(b, online ? "" : offlineReason[pDnode->offlineReason]); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); colDataAppend(pColInfo, numOfRows, b, false); + taosMemoryFreeClear(b); numOfRows++; sdbRelease(pSdb, pDnode); diff --git a/source/dnode/mnode/impl/src/mndMain.c b/source/dnode/mnode/impl/src/mndMain.c index 7b5025a986..bc418e0f53 100644 --- a/source/dnode/mnode/impl/src/mndMain.c +++ b/source/dnode/mnode/impl/src/mndMain.c @@ -510,8 +510,7 @@ static int32_t mndCheckMnodeState(SRpcMsg *pMsg) { if (IsReq(pMsg) && pMsg->msgType != TDMT_MND_MQ_TIMER && pMsg->msgType != TDMT_MND_TELEM_TIMER && pMsg->msgType != TDMT_MND_TRANS_TIMER) { - mError("msg:%p, failed to check mnode state since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, - TMSG_INFO(pMsg->msgType)); + mError("msg:%p, failed to check mnode state since %s, type:%s", pMsg, terrstr(), TMSG_INFO(pMsg->msgType)); SEpSet epSet = {0}; mndGetMnodeEpSet(pMsg->info.node, &epSet); @@ -534,7 +533,8 @@ static int32_t mndCheckMsgContent(SRpcMsg *pMsg) { if (!IsReq(pMsg)) return 0; if (pMsg->contLen != 0 && pMsg->pCont != NULL) return 0; - mError("msg:%p, failed to check msg content, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); + mError("msg:%p, failed to check msg, cont:%p contLen:%d, app:%p type:%s", pMsg, pMsg->pCont, pMsg->contLen, + pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); terrno = TSDB_CODE_INVALID_MSG_LEN; return -1; } @@ -558,7 +558,7 @@ int32_t mndProcessRpcMsg(SRpcMsg *pMsg) { if (code == TSDB_CODE_ACTION_IN_PROGRESS) { mTrace("msg:%p, won't response immediately since in progress", pMsg); } else if (code == 0) { - mTrace("msg:%p, successfully processed and response", pMsg); + mTrace("msg:%p, successfully processed", pMsg); } else { mError("msg:%p, failed to process since %s, app:%p type:%s", pMsg, terrstr(), pMsg->info.ahandle, TMSG_INFO(pMsg->msgType)); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 858c6f956f..11e07cb9fa 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -661,7 +661,7 @@ static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) { } sdbSetRawStatus(pRaw, SDB_STATUS_READY); - mDebug("trans:%d, sync to other mnodes", pTrans->id); + mDebug("trans:%d, sync to other mnodes, stage:%s", pTrans->id, mndTransStr(pTrans->stage)); int32_t code = mndSyncPropose(pMnode, pRaw, pTrans->id); if (code != 0) { mError("trans:%d, failed to sync since %s", pTrans->id, terrstr()); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index cec83d1af5..94ddbcd409 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1058,7 +1058,7 @@ int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t del static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t newDnodeId) { - mDebug("vgId:%d, will add 1 vnode, replica:%d, dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId); + mDebug("vgId:%d, will add 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, newDnodeId); SVnodeGid *pGid = &pVgroup->vnodeGid[pVgroup->replica]; pVgroup->replica++; @@ -1074,7 +1074,7 @@ static int32_t mndAddIncVgroupReplicaToTrans(SMnode *pMnode, STrans *pTrans, SDb static int32_t mndAddDecVgroupReplicaFromTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t delDnodeId) { - mDebug("vgId:%d, will remove 1 vnode, replica:%d, dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId); + mDebug("vgId:%d, will remove 1 vnode, replica:%d dnode:%d", pVgroup->vgId, pVgroup->replica, delDnodeId); SVnodeGid *pGid = NULL; SVnodeGid delGid = {0}; @@ -1116,7 +1116,8 @@ static int32_t mndRedistributeVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, memcpy(&newVg, pVgroup, sizeof(SVgObj)); mInfo("vgId:%d, vgroup info before redistribute, replica:%d", newVg.vgId, newVg.replica); for (int32_t i = 0; i < newVg.replica; ++i) { - mInfo("vgId:%d, vnode:%d dnode:%d", newVg.vgId, i, newVg.vnodeGid[i].dnodeId); + mInfo("vgId:%d, vnode:%d dnode:%d role:%s", newVg.vgId, i, newVg.vnodeGid[i].dnodeId, + syncStr(newVg.vnodeGid[i].role)); } if (pNew1 != NULL && pOld1 != NULL) { @@ -1198,7 +1199,7 @@ static int32_t mndProcessRedistributeVgroupMsg(SRpcMsg *pReq) { goto _OVER; } - mInfo("vgId:%d, start to redistribute to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3); + mInfo("vgId:%d, start to redistribute vgroup to dnode %d:%d:%d", req.vgId, req.dnodeId1, req.dnodeId2, req.dnodeId3); if (mndCheckOperAuth(pMnode, pReq->info.conn.user, MND_OPER_REDISTRIBUTE_VGROUP) != 0) goto _OVER; diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index a6339125c4..8cfebe1b98 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -628,7 +628,7 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) { void * tagData = NULL; if (param->val == NULL) { - metaError("vgId:%d failed to filter NULL data", TD_VID(pMeta->pVnode)); + metaError("vgId:%d, failed to filter NULL data", TD_VID(pMeta->pVnode)); return -1; } else { if (IS_VAR_DATA_TYPE(param->type)) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c9d3267adb..70b09ec701 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -124,17 +124,20 @@ int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) { tDecoderClear(&decoder); if (offset.type == TMQ_OFFSET__SNAPSHOT) { - tqDebug("receive offset commit msg to %s, offset(type:snapshot) uid: %ld, ts: %ld", offset.subKey, offset.uid, - offset.ts); + tqDebug("receive offset commit msg to %s on vg %d, offset(type:snapshot) uid: %ld, ts: %ld", offset.subKey, + pTq->pVnode->config.vgId, offset.uid, offset.ts); } else if (offset.type == TMQ_OFFSET__LOG) { - tqDebug("receive offset commit msg to %s, offset(type:log) version: %ld", offset.subKey, offset.version); + tqDebug("receive offset commit msg to %s on vg %d, offset(type:log) version: %ld", offset.subKey, + pTq->pVnode->config.vgId, offset.version); } else { ASSERT(0); } - - if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) { - ASSERT(0); - return -1; + STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, offset.subKey); + if (pOffset == NULL || pOffset->version < offset.version) { + if (tqOffsetWrite(pTq->pOffsetStore, &offset) < 0) { + ASSERT(0); + return -1; + } } return 0; @@ -149,16 +152,33 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { int32_t code = 0; // get offset to fetch message - if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { - fetchOffset = walGetFirstVer(pTq->pWal); - } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) { - fetchOffset = walGetCommittedVer(pTq->pWal); - } else { + if (pReq->currentOffset >= 0) { fetchOffset = pReq->currentOffset + 1; + } else { + STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, pReq->subKey); + if (pOffset != NULL) { + ASSERT(pOffset->type == TMQ_OFFSET__LOG); + tqDebug("consumer %ld, restore offset of %s on vg %d, offset(type:log) version: %ld", consumerId, pReq->subKey, + pTq->pVnode->config.vgId, pOffset->version); + fetchOffset = pOffset->version + 1; + } else { + if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { + fetchOffset = walGetFirstVer(pTq->pWal); + } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) { + fetchOffset = walGetCommittedVer(pTq->pWal); + } else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__NONE) { + tqError("tmq poll: no offset committed for consumer %ld in vg %d, subkey %s", consumerId, + pTq->pVnode->config.vgId, pReq->subKey); + terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; + return -1; + } + tqDebug("consumer %ld, restore offset of %s on vg %d failed, config is %ld, set to %ld", consumerId, pReq->subKey, + pTq->pVnode->config.vgId, pReq->currentOffset, fetchOffset); + } } - tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, - TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); + tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %ld fetch offset %ld", consumerId, + pReq->epoch, TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); STqHandle* pHandle = taosHashGet(pTq->handles, pReq->subKey, strlen(pReq->subKey)); /*ASSERT(pHandle);*/ diff --git a/source/dnode/vnode/src/tq/tqCommit.c b/source/dnode/vnode/src/tq/tqCommit.c index 7b116bff2e..639da22b1c 100644 --- a/source/dnode/vnode/src/tq/tqCommit.c +++ b/source/dnode/vnode/src/tq/tqCommit.c @@ -15,7 +15,4 @@ #include "tq.h" -int tqCommit(STQ* pTq) { - // do nothing - return 0; -} +int tqCommit(STQ* pTq) { return tqOffsetSnapshot(pTq->pOffsetStore); } diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 8d6cb28065..e5475d7c30 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -92,6 +92,8 @@ STqOffset* tqOffsetRead(STqOffsetStore* pStore, const char* subscribeKey) { } int32_t tqOffsetWrite(STqOffsetStore* pStore, const STqOffset* pOffset) { + ASSERT(pOffset->type == TMQ_OFFSET__LOG); + ASSERT(pOffset->version >= 0); return taosHashPut(pStore->pHash, pOffset->subKey, strlen(pOffset->subKey), pOffset, sizeof(STqOffset)); } @@ -129,7 +131,7 @@ int32_t tqOffsetSnapshot(STqOffsetStore* pStore) { tEncodeSTqOffset(&encoder, pOffset); // write file int64_t writeLen; - if ((writeLen = taosWriteFile(pFile, buf, totLen)) != bodyLen) { + if ((writeLen = taosWriteFile(pFile, buf, totLen)) != totLen) { ASSERT(0); tqError("write offset incomplete, len %d, write len %ld", bodyLen, writeLen); taosHashCancelIterate(pStore->pHash, pIter); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index f46114746e..fded723f1a 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -61,6 +61,7 @@ static int32_t vnodeSetStandBy(SVnode *pVnode) { return -1; } + vInfo("vgId:%d, start to transfer leader", TD_VID(pVnode)); if (syncLeaderTransfer(pVnode->sync) != 0) { vError("vgId:%d, failed to transfer leader since:%s", TD_VID(pVnode), terrstr()); return -1; @@ -297,7 +298,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { ret = -1; } - if (ret != 0) { + if (ret != 0 && terrno == 0) { terrno = TSDB_CODE_SYN_INTERNAL_ERROR; } return ret; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e2f8f12788..ab218e724e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -845,8 +845,10 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SNode* pOnCondition, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, + SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, + SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index c3cd75ebae..dbbc411f91 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4728,18 +4728,13 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) { - SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; - - STimeWindowAggSupp as = {.waterMark = pSessionNode->window.watermark, - .calTrigger = pSessionNode->window.triggerType}; - - SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &num); - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; - - pOptr = createStreamSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, - pTaskInfo); - + pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) { + int32_t children = 0; + pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); + } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION == type) { + int32_t children = 1; + pOptr = createStreamFinalSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo, children); } else if (QUERY_NODE_PHYSICAL_PLAN_PARTITION == type) { pOptr = createPartitionOperatorInfo(ops[0], (SPartitionPhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE == type) { diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 81899b68cd..77eeb24210 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -392,10 +392,8 @@ SOperatorInfo* createMultiwaySortMergeOperatorInfo(SOperatorInfo** downStreams, pInfo->bufPageSize = getProperSortPageSize(rowSize); - uint32_t numOfSources = taosArrayGetSize(pSortInfo); - numOfSources = TMAX(4, numOfSources); - - pInfo->sortBufSize = numOfSources * pInfo->bufPageSize; + // one additional is reserved for merged result. + pInfo->sortBufSize = pInfo->bufPageSize * (numStreams + 1); pOperator->fpSet = createOperatorFpSet(doOpenMultiwaySortMergeOperator, doMultiwaySortMerge, NULL, NULL, diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index d7ae823522..d827067f1c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2425,12 +2425,15 @@ int32_t initSessionAggSupporter(SStreamAggSupporter* pSup, const char* pKey, Sql return initStreamAggSupporter(pSup, pKey, pCtx, numOfOutput, sizeof(SResultWindowInfo)); } -SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResBlock, int64_t gap, int32_t tsSlotId, - STimeWindowAggSupp* pTwAggSupp, SExecTaskInfo* pTaskInfo) { - int32_t code = TSDB_CODE_OUT_OF_MEMORY; +SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { + SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; + int32_t numOfCols = 0; + SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); + SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); + int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; + int32_t code = TSDB_CODE_OUT_OF_MEMORY; SStreamSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { goto _error; } @@ -2453,12 +2456,14 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SEx } initDummyFunction(pInfo->pDummyCtx, pInfo->binfo.pCtx, numOfCols); - pInfo->twAggSup = *pTwAggSupp; + pInfo->twAggSup = (STimeWindowAggSupp) {.waterMark = pSessionNode->window.watermark, + .calTrigger = pSessionNode->window.triggerType, + .maxTs = INT64_MIN}; initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window); pInfo->primaryTsIndex = tsSlotId; - pInfo->gap = gap; + pInfo->gap = pSessionNode->gap; pInfo->binfo.pRes = pResBlock; pInfo->order = TSDB_ORDER_ASC; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); @@ -2960,25 +2965,20 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; } -SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, - int32_t numOfCols, SSDataBlock* pResBlock, int64_t gap, - int32_t tsSlotId, STimeWindowAggSupp* pTwAggSupp, - SExecTaskInfo* pTaskInfo) { - int32_t code = TSDB_CODE_OUT_OF_MEMORY; - SStreamSessionAggOperatorInfo* pInfo = NULL; - SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pExprInfo, numOfCols, pResBlock, gap, - tsSlotId, pTwAggSupp, pTaskInfo); +SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, + SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild) { + int32_t code = TSDB_CODE_OUT_OF_MEMORY; + SOperatorInfo* pOperator = createStreamSessionAggOperatorInfo(downstream, pPhyNode, pTaskInfo); if (pOperator == NULL) { goto _error; } pOperator->name = "StreamFinalSessionWindowAggOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION; - int32_t numOfChild = 1; // Todo(liuyao) get it from phy plan - pInfo = pOperator->info; + SStreamSessionAggOperatorInfo* pInfo = pOperator->info; pInfo->pChildren = taosArrayInit(8, sizeof(void*)); for (int32_t i = 0; i < numOfChild; i++) { SOperatorInfo* pChild = - createStreamSessionAggOperatorInfo(NULL, pExprInfo, numOfCols, NULL, gap, tsSlotId, pTwAggSupp, pTaskInfo); + createStreamSessionAggOperatorInfo(NULL, pPhyNode, pTaskInfo); if (pChild == NULL) { goto _error; } @@ -2988,7 +2988,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream _error: if (pInfo != NULL) { - destroyStreamSessionAggOperatorInfo(pInfo, numOfCols); + destroyStreamSessionAggOperatorInfo(pInfo, pOperator->numOfExprs); } taosMemoryFreeClear(pInfo); diff --git a/source/libs/executor/src/tsort.c b/source/libs/executor/src/tsort.c index 1502387360..00e07c7199 100644 --- a/source/libs/executor/src/tsort.c +++ b/source/libs/executor/src/tsort.c @@ -195,6 +195,11 @@ static int32_t doAddToBuf(SSDataBlock* pDataBlock, SSortHandle* pHandle) { return doAddNewExternalMemSource(pHandle->pBuf, pHandle->pOrderedSource, pBlock, &pHandle->sourceId); } +static void setCurrentSourceIsDone(SSortSource* pSource, SSortHandle* pHandle) { + pSource->src.rowIndex = -1; + ++pHandle->numOfCompletedSources; +} + static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int32_t startIndex, int32_t endIndex, SSortHandle* pHandle) { cmpParam->pSources = taosArrayGet(pSources, startIndex); cmpParam->numOfSources = (endIndex - startIndex + 1); @@ -205,8 +210,10 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int for (int32_t i = 0; i < cmpParam->numOfSources; ++i) { SSortSource* pSource = cmpParam->pSources[i]; + // set current source is done if (taosArrayGetSize(pSource->pageIdList) == 0) { - return TSDB_CODE_SUCCESS; + setCurrentSourceIsDone(pSource, pHandle); + continue; } SPageInfo* pPgInfo = *(SPageInfo**)taosArrayGet(pSource->pageIdList, pSource->pageIndex); @@ -233,10 +240,9 @@ static int32_t sortComparInit(SMsortComparParam* cmpParam, SArray* pSources, int SSortSource* pSource = cmpParam->pSources[i]; pSource->src.pBlock = pHandle->fetchfp(pSource->param); - // set current source id done + // set current source is done if (pSource->src.pBlock == NULL) { - pSource->src.rowIndex = -1; - ++pHandle->numOfCompletedSources; + setCurrentSourceIsDone(pSource, pHandle); } } } diff --git a/source/libs/nodes/src/nodesCloneFuncs.c b/source/libs/nodes/src/nodesCloneFuncs.c index 2e30b01357..d0eb1548c1 100644 --- a/source/libs/nodes/src/nodesCloneFuncs.c +++ b/source/libs/nodes/src/nodesCloneFuncs.c @@ -427,7 +427,7 @@ static SNode* logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* pD COPY_SCALAR_FIELD(triggerType); COPY_SCALAR_FIELD(watermark); COPY_SCALAR_FIELD(filesFactor); - COPY_SCALAR_FIELD(intervalAlgo); + COPY_SCALAR_FIELD(windowAlgo); return (SNode*)pDst; } @@ -538,6 +538,12 @@ static SNode* physiIntervalCopy(const SIntervalPhysiNode* pSrc, SIntervalPhysiNo return (SNode*)pDst; } +static SNode* physiSessionCopy(const SSessionWinodwPhysiNode* pSrc, SSessionWinodwPhysiNode* pDst) { + COPY_BASE_OBJECT_FIELD(window, physiWindowCopy); + COPY_SCALAR_FIELD(gap); + return (SNode*)pDst; +} + static SNode* dataBlockDescCopy(const SDataBlockDescNode* pSrc, SDataBlockDescNode* pDst) { COPY_SCALAR_FIELD(dataBlockId); CLONE_NODE_LIST_FIELD(pSlots); @@ -678,6 +684,9 @@ SNode* nodesCloneNode(const SNode* pNode) { case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL: return physiIntervalCopy((const SIntervalPhysiNode*)pNode, (SIntervalPhysiNode*)pDst); + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: + return physiSessionCopy((const SSessionWinodwPhysiNode*)pNode, (SSessionWinodwPhysiNode*)pDst); default: break; } diff --git a/source/libs/nodes/src/nodesCodeFuncs.c b/source/libs/nodes/src/nodesCodeFuncs.c index fb6a428d3c..ff433660be 100644 --- a/source/libs/nodes/src/nodesCodeFuncs.c +++ b/source/libs/nodes/src/nodesCodeFuncs.c @@ -246,6 +246,10 @@ const char* nodesNodeName(ENodeType type) { return "PhysiSessionWindow"; case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: return "PhysiStreamSessionWindow"; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION: + return "PhysiStreamSemiSessionWindow"; + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: + return "PhysiStreamFinalSessionWindow"; case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: return "PhysiStateWindow"; case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: @@ -3998,6 +4002,8 @@ static int32_t specificNodeToJson(const void* pObj, SJson* pJson) { return physiFillNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: return physiSessionWindowNodeToJson(pObj, pJson); case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: @@ -4131,6 +4137,8 @@ static int32_t jsonToSpecificNode(const SJson* pJson, void* pObj) { return jsonToPhysiFillNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: return jsonToPhysiSessionWindowNode(pJson, pObj); case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: diff --git a/source/libs/nodes/src/nodesUtilFuncs.c b/source/libs/nodes/src/nodesUtilFuncs.c index 6d04e7a959..18a5b8664f 100644 --- a/source/libs/nodes/src/nodesUtilFuncs.c +++ b/source/libs/nodes/src/nodesUtilFuncs.c @@ -289,6 +289,10 @@ SNode* nodesMakeNode(ENodeType type) { return makeNode(type, sizeof(SSessionWinodwPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: return makeNode(type, sizeof(SStreamSessionWinodwPhysiNode)); + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION: + return makeNode(type, sizeof(SStreamSemiSessionWinodwPhysiNode)); + case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: + return makeNode(type, sizeof(SStreamFinalSessionWinodwPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE: return makeNode(type, sizeof(SStateWinodwPhysiNode)); case QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE: @@ -806,6 +810,7 @@ void nodesDestroyNode(SNode* pNode) { } case QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION: case QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION: + case QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION: case QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION: destroyWinodwPhysiNode((SWinodwPhysiNode*)pNode); break; diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp index 68f9e9d36d..9758da7899 100644 --- a/source/libs/parser/test/mockCatalogService.cpp +++ b/source/libs/parser/test/mockCatalogService.cpp @@ -285,7 +285,7 @@ class MockCatalogServiceImpl { } void createSmaIndex(const SMCreateSmaReq* pReq) { - STableIndexInfo info; + STableIndexInfo info = {0}; info.intervalUnit = pReq->intervalUnit; info.slidingUnit = pReq->slidingUnit; info.interval = pReq->interval; diff --git a/source/libs/planner/src/planLogicCreater.c b/source/libs/planner/src/planLogicCreater.c index cbc74a2711..3cd5eeb655 100644 --- a/source/libs/planner/src/planLogicCreater.c +++ b/source/libs/planner/src/planLogicCreater.c @@ -548,6 +548,7 @@ static int32_t createWindowLogicNodeBySession(SLogicPlanContext* pCxt, SSessionW pWindow->winType = WINDOW_TYPE_SESSION; pWindow->sessionGap = ((SValueNode*)pSession->pGap)->datum.i; + pWindow->windowAlgo = pCxt->pPlanCxt->streamQuery ? SESSION_ALGO_STREAM_SINGLE : SESSION_ALGO_MERGE; pWindow->pTspk = nodesCloneNode((SNode*)pSession->pCol); if (NULL == pWindow->pTspk) { @@ -572,7 +573,7 @@ static int32_t createWindowLogicNodeByInterval(SLogicPlanContext* pCxt, SInterva pWindow->sliding = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->datum.i : pWindow->interval); pWindow->slidingUnit = (NULL != pInterval->pSliding ? ((SValueNode*)pInterval->pSliding)->unit : pWindow->intervalUnit); - pWindow->intervalAlgo = pCxt->pPlanCxt->streamQuery ? INTERVAL_ALGO_STREAM_SINGLE : INTERVAL_ALGO_HASH; + pWindow->windowAlgo = pCxt->pPlanCxt->streamQuery ? INTERVAL_ALGO_STREAM_SINGLE : INTERVAL_ALGO_HASH; pWindow->pTspk = nodesCloneNode(pInterval->pCol); if (NULL == pWindow->pTspk) { diff --git a/source/libs/planner/src/planPhysiCreater.c b/source/libs/planner/src/planPhysiCreater.c index 2974b3ef8c..99476c4cbd 100644 --- a/source/libs/planner/src/planPhysiCreater.c +++ b/source/libs/planner/src/planPhysiCreater.c @@ -977,8 +977,8 @@ static int32_t createWindowPhysiNodeFinalize(SPhysiPlanContext* pCxt, SNodeList* return code; } -static ENodeType getIntervalOperatorType(EIntervalAlgorithm intervalAlgo) { - switch (intervalAlgo) { +static ENodeType getIntervalOperatorType(EWindowAlgorithm windowAlgo) { + switch (windowAlgo) { case INTERVAL_ALGO_HASH: return QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL; case INTERVAL_ALGO_MERGE: @@ -989,6 +989,14 @@ static ENodeType getIntervalOperatorType(EIntervalAlgorithm intervalAlgo) { return QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL; case INTERVAL_ALGO_STREAM_SINGLE: return QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL; + case SESSION_ALGO_STREAM_FINAL: + return QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION; + case SESSION_ALGO_STREAM_SEMI: + return QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION; + case SESSION_ALGO_STREAM_SINGLE: + return QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; + case SESSION_ALGO_MERGE: + return QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION; default: break; } @@ -998,7 +1006,7 @@ static ENodeType getIntervalOperatorType(EIntervalAlgorithm intervalAlgo) { static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) { SIntervalPhysiNode* pInterval = (SIntervalPhysiNode*)makePhysiNode( - pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->intervalAlgo)); + pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->windowAlgo)); if (NULL == pInterval) { return TSDB_CODE_OUT_OF_MEMORY; } @@ -1015,8 +1023,7 @@ static int32_t createIntervalPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChil static int32_t createSessionWindowPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SWindowLogicNode* pWindowLogicNode, SPhysiNode** pPhyNode) { SSessionWinodwPhysiNode* pSession = (SSessionWinodwPhysiNode*)makePhysiNode( - pCxt, (SLogicNode*)pWindowLogicNode, - (pCxt->pPlanCxt->streamQuery ? QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION : QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION)); + pCxt, (SLogicNode*)pWindowLogicNode, getIntervalOperatorType(pWindowLogicNode->windowAlgo)); if (NULL == pSession) { return TSDB_CODE_OUT_OF_MEMORY; } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 3a9e1d7405..fe02dd6425 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -376,8 +376,8 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo SLogicNode* pPartWindow = NULL; int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow); if (TSDB_CODE_SUCCESS == code) { - ((SWindowLogicNode*)pPartWindow)->intervalAlgo = INTERVAL_ALGO_HASH; - ((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_MERGE; + ((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_HASH; + ((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_MERGE; SNodeList* pMergeKeys = NULL; code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys); if (TSDB_CODE_SUCCESS == code) { @@ -400,8 +400,8 @@ static int32_t stbSplSplitIntervalForStream(SSplitContext* pCxt, SStableSplitInf SLogicNode* pPartWindow = NULL; int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow); if (TSDB_CODE_SUCCESS == code) { - ((SWindowLogicNode*)pPartWindow)->intervalAlgo = INTERVAL_ALGO_STREAM_SEMI; - ((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_STREAM_FINAL; + ((SWindowLogicNode*)pPartWindow)->windowAlgo = INTERVAL_ALGO_STREAM_SEMI; + ((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = INTERVAL_ALGO_STREAM_FINAL; code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow); } if (TSDB_CODE_SUCCESS == code) { @@ -421,8 +421,29 @@ static int32_t stbSplSplitInterval(SSplitContext* pCxt, SStableSplitInfo* pInfo) } } +static int32_t stbSplSplitSessionForStream(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + SLogicNode* pPartWindow = NULL; + int32_t code = stbSplCreatePartWindowNode((SWindowLogicNode*)pInfo->pSplitNode, &pPartWindow); + if (TSDB_CODE_SUCCESS == code) { + ((SWindowLogicNode*)pPartWindow)->windowAlgo = SESSION_ALGO_STREAM_SEMI; + ((SWindowLogicNode*)pInfo->pSplitNode)->windowAlgo = SESSION_ALGO_STREAM_FINAL; + code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartWindow); + } + if (TSDB_CODE_SUCCESS == code) { + code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, + (SNode*)splCreateScanSubplan(pCxt, pPartWindow, SPLIT_FLAG_STABLE_SPLIT)); + } + pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE; + ++(pCxt->groupId); + return code; +} + static int32_t stbSplSplitSession(SSplitContext* pCxt, SStableSplitInfo* pInfo) { - return TSDB_CODE_PLAN_INTERNAL_ERROR; + if (pCxt->pPlanCxt->streamQuery) { + return stbSplSplitSessionForStream(pCxt, pInfo); + } else { + return TSDB_CODE_PLAN_INTERNAL_ERROR; + } } static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) { @@ -537,10 +558,12 @@ static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets, SNode* pNode = NULL; FOREACH(pNode, pSortKeys) { SOrderByExprNode* pSortKey = (SOrderByExprNode*)pNode; + SExprNode* pSortExpr = (SExprNode*)pSortKey->pExpr; SNode* pTarget = NULL; bool found = false; FOREACH(pTarget, pTargets) { - if (0 == strcmp(((SExprNode*)pSortKey->pExpr)->aliasName, ((SColumnNode*)pTarget)->colName)) { + if ((QUERY_NODE_COLUMN == nodeType(pSortExpr) && nodesEqualNode((SNode*)pSortExpr, pTarget)) || + (0 == strcmp(pSortExpr->aliasName, ((SColumnNode*)pTarget)->colName))) { code = nodesListMakeStrictAppend(&pMergeKeys, stbSplCreateOrderByExpr(pSortKey, pTarget)); if (TSDB_CODE_SUCCESS != code) { break; @@ -549,7 +572,7 @@ static int32_t stbSplCreateMergeKeys(SNodeList* pSortKeys, SNodeList* pTargets, } } if (TSDB_CODE_SUCCESS == code && !found) { - SNode* pCol = stbSplCreateColumnNode((SExprNode*)pSortKey->pExpr); + SNode* pCol = stbSplCreateColumnNode(pSortExpr); code = nodesListMakeStrictAppend(&pMergeKeys, stbSplCreateOrderByExpr(pSortKey, pCol)); if (TSDB_CODE_SUCCESS == code) { code = nodesListStrictAppend(pTargets, pCol); diff --git a/source/libs/planner/test/planOrderByTest.cpp b/source/libs/planner/test/planOrderByTest.cpp index b065ad0a59..2ca662cf86 100644 --- a/source/libs/planner/test/planOrderByTest.cpp +++ b/source/libs/planner/test/planOrderByTest.cpp @@ -28,6 +28,8 @@ TEST_F(PlanOrderByTest, basic) { // ORDER BY key is not in the projection list run("SELECT c1 FROM t1 ORDER BY c2"); + run("SELECT c1 AS a FROM t1 ORDER BY a"); + run("SELECT c1 + 10 AS a FROM t1 ORDER BY a"); } @@ -59,4 +61,6 @@ TEST_F(PlanOrderByTest, stable) { run("SELECT c2 FROM st1 ORDER BY c1"); run("SELECT c2 FROM st1 PARTITION BY c2 ORDER BY c1"); + + run("SELECT c1 AS a FROM st1 ORDER BY a"); } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 7b2f79e24c..a8cb3d0e69 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -713,7 +713,7 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) { // delete confict entries code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin); ASSERT(code == 0); - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu log truncate, from %ld to %ld", ths->vgId, + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu log truncate, from %ld to %ld", ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, delBegin, delEnd); logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore); @@ -1062,7 +1062,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ths->commitIndex = snapshot.lastApplyIndex; sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld", + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld", ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, commitBegin, commitEnd); } diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index f934f9a268..eabd14bf28 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -190,7 +190,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries if (gRaftDetailLog) { char* s = snapshotSender2Str(pSender); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, " "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu " @@ -201,7 +201,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries taosMemoryFree(s); } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d start sender first time, " "lastApplyIndex:%ld " "lastApplyTerm:%lu lastConfigIndex:%ld privateTerm:%lu", ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, host, port, diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index d010728c78..d563e5aacc 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -56,7 +56,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SyncIndex commitEnd = snapshot.lastApplyIndex; pSyncNode->commitIndex = snapshot.lastApplyIndex; - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld", + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu commit by snapshot from index:%ld to index:%ld", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, pSyncNode->commitIndex, snapshot.lastApplyIndex); } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 3e5dcfea42..2b06673a62 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -414,7 +414,7 @@ int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) { assert(rid == pSyncNode->rid); sMeta->lastConfigIndex = pSyncNode->pRaftCfg->lastConfigIndex; - sTrace("sync get snapshot meta: lastConfigIndex:%ld", pSyncNode->pRaftCfg->lastConfigIndex); + sTrace("vgId:%d, get snapshot meta, lastConfigIndex:%" PRId64, pSyncNode->vgId, pSyncNode->pRaftCfg->lastConfigIndex); taosReleaseRef(tsNodeRefId, pSyncNode->rid); return 0; @@ -437,7 +437,8 @@ int32_t syncGetSnapshotMetaByIndex(int64_t rid, SyncIndex snapshotIndex, struct } } sMeta->lastConfigIndex = lastIndex; - sTrace("sync get snapshot meta by index:%ld lastConfigIndex:%ld", snapshotIndex, sMeta->lastConfigIndex); + sTrace("vgId:%d, get snapshot meta by index:%" PRId64 " lastConfigIndex:%" PRId64, pSyncNode->vgId, snapshotIndex, + sMeta->lastConfigIndex); taosReleaseRef(tsNodeRefId, pSyncNode->rid); return 0; @@ -598,7 +599,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { return -1; } assert(rid == pSyncNode->rid); - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), pMsg->msgType); ret = syncNodePropose(pSyncNode, pMsg, isWeak); @@ -609,7 +610,7 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) { int32_t ret = 0; - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu propose msgType:%s,%d", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pMsg->msgType), pMsg->msgType); @@ -857,7 +858,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { // snapshot meta // pSyncNode->sMeta.lastConfigIndex = -1; - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync open", pSyncNode->vgId, + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu sync open", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm); return pSyncNode; @@ -905,7 +906,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) { } void syncNodeClose(SSyncNode* pSyncNode) { - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu sync close", pSyncNode->vgId, + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu sync close", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm); int32_t ret; @@ -1294,7 +1295,7 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) { int len = 256; char* s = (char*)taosMemoryMalloc(len); snprintf(s, len, - "syncNode: vgId:%d currentTerm:%lu, commitIndex:%ld, state:%d %s, isStandBy:%d, " + "syncNode: vgId:%d, currentTerm:%lu, commitIndex:%ld, state:%d %s, isStandBy:%d, " "electTimerLogicClock:%lu, " "electTimerLogicClockUser:%lu, " "electTimerMS:%d, replicaNum:%d", @@ -1345,7 +1346,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA]; for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { oldSenders[i] = (pSyncNode->senders)[i]; - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu save senders %d, %p, privateTerm:%lu", + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu save senders %d, %p, privateTerm:%lu", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, i, oldSenders[i], oldSenders[i]->privateTerm); if (gRaftDetailLog) { @@ -1400,7 +1401,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex uint16_t port; syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, " "privateTerm:%lu", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, (pSyncNode->replicasId)[i].addr, i, host, port, oldSenders[j], @@ -1413,7 +1414,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex int32_t oldreplicaIndex = (pSyncNode->senders)[i]->replicaIndex; (pSyncNode->senders)[i]->replicaIndex = i; sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, " "reset:%d", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, oldreplicaIndex, i, host, port, (pSyncNode->senders)[i], reset); @@ -1426,7 +1427,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex if ((pSyncNode->senders)[i] == NULL) { (pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu", + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, (pSyncNode->senders)[i], i, (pSyncNode->senders)[i]->privateTerm); } @@ -1436,7 +1437,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* pNewConfig, SyncIndex for (int i = 0; i < TSDB_MAX_REPLICA; ++i) { if (oldSenders[i] != NULL) { snapshotSenderDestroy(oldSenders[i]); - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu delete old sender %p replicaIndex:%d", + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu delete old sender %p replicaIndex:%d", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, oldSenders[i], i); oldSenders[i] = NULL; @@ -1509,7 +1510,7 @@ void syncNodeUpdateTerm(SSyncNode* pSyncNode, SyncTerm term) { void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, " "restoreFinish:%d, %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, @@ -1551,7 +1552,7 @@ void syncNodeBecomeLeader(SSyncNode* pSyncNode, const char* debugStr) { pSyncNode->restoreFinish = false; sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, " "restoreFinish:%d, %s", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, pSyncNode->pRaftCfg->isStandBy, pSyncNode->replicaNum, @@ -1857,7 +1858,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { if (pSyncNode->FpEqMsg != NULL) { int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); if (code != 0) { - sError("vgId:%d sync enqueue ping msg error, code:%d", pSyncNode->vgId, code); + sError("vgId:%d, sync enqueue ping msg error, code:%d", pSyncNode->vgId, code); rpcFreeCont(rpcMsg.pCont); syncTimeoutDestroy(pSyncMsg); return; @@ -1891,7 +1892,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { if (pSyncNode->FpEqMsg != NULL) { int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); if (code != 0) { - sError("vgId:%d sync enqueue elect msg error, code:%d", pSyncNode->vgId, code); + sError("vgId:%d, sync enqueue elect msg error, code:%d", pSyncNode->vgId, code); rpcFreeCont(rpcMsg.pCont); syncTimeoutDestroy(pSyncMsg); return; @@ -1929,7 +1930,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { if (pSyncNode->FpEqMsg != NULL) { int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg); if (code != 0) { - sError("vgId:%d sync enqueue timer msg error, code:%d", pSyncNode->vgId, code); + sError("vgId:%d, sync enqueue timer msg error, code:%d", pSyncNode->vgId, code); rpcFreeCont(rpcMsg.pCont); syncTimeoutDestroy(pSyncMsg); return; @@ -2129,12 +2130,12 @@ const char* syncStr(ESyncState state) { static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* pEntry) { SyncLeaderTransfer* pSyncLeaderTransfer = syncLeaderTransferFromRpcMsg2(pRpcMsg); - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu begin leader transfer", ths->vgId, + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu begin leader transfer", ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm); if (strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 && pSyncLeaderTransfer->newNodeInfo.nodePort == ths->myNodeInfo.nodePort) { - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId, + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu maybe leader transfer to %s:%d %lu", ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, pSyncLeaderTransfer->newNodeInfo.nodeFqdn, pSyncLeaderTransfer->newNodeInfo.nodePort, pSyncLeaderTransfer->newLeaderId.addr); @@ -2275,7 +2276,7 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { int32_t code = 0; ESyncState state = flag; - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64 + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu commit by wal from index:%" PRId64 " to index:%" PRId64 ", %s", ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, beginIndex, endIndex, syncUtilState2String(state)); @@ -2327,7 +2328,7 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, ths->pFsm->FpRestoreFinishCb(ths->pFsm); } ths->restoreFinish = true; - sDebug("vgId:%d sync event %s commitIndex:%ld currentTerm:%lu restore finish, %s, index:%ld", ths->vgId, + sDebug("vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu restore finish, %s, index:%ld", ths->vgId, syncUtilState2String(ths->state), ths->commitIndex, ths->pRaftStore->currentTerm, syncUtilState2String(ths->state), pEntry->index); } diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 87dfef5fdd..8fff7ae6de 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -164,7 +164,7 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr walFsync(pWal, true); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu write index:%ld, isStandBy:%d, msgType:%s,%d, " "originalRpcType:%s,%d", pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy, @@ -325,7 +325,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { walFsync(pWal, true); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu old write index:%ld, isStandBy:%d, msgType:%s,%d, " "originalRpcType:%s,%d", pData->pSyncNode->vgId, syncUtilState2String(pData->pSyncNode->state), pData->pSyncNode->commitIndex, pData->pSyncNode->pRaftStore->currentTerm, pEntry->index, pData->pSyncNode->pRaftCfg->isStandBy, diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index 551c2611b0..831e286d76 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -47,7 +47,7 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) { SSyncNode *pSyncNode = pObj->data; sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, keyCode, pStub->rpcMsg.info.handle, pStub->rpcMsg.info.ahandle); @@ -74,7 +74,7 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) { SSyncNode *pSyncNode = pObj->data; sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p " "ahandle:%p", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, @@ -96,7 +96,7 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu SSyncNode *pSyncNode = pObj->data; sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p " "ahandle:%p", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, pSyncNode->pRaftStore->currentTerm, TMSG_INFO(pStub->rpcMsg.msgType), pStub->rpcMsg.msgType, index, diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 4973d36295..7c8abfe494 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -141,7 +141,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d " "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu send " @@ -153,7 +153,7 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) { taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d " "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu", @@ -287,7 +287,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d " "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu send " @@ -299,7 +299,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d " "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu", @@ -310,7 +310,7 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) { } } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d " "lastApplyIndex:%ld " "lastApplyTerm:%lu " "lastConfigIndex:%ld privateTerm:%lu", @@ -349,7 +349,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d " "privateTerm:%lu send " "msg:%s", pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, @@ -358,7 +358,7 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d " "privateTerm:%lu", pSender->pSyncNode->vgId, syncUtilState2String(pSender->pSyncNode->state), pSender->pSyncNode->commitIndex, pSender->pSyncNode->pRaftStore->currentTerm, host, port, pSender->seq, pSender->ack, pSender->privateTerm); @@ -594,7 +594,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, " "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", @@ -604,7 +604,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d begin ack:%d, " "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld privateTerm:%lu", @@ -648,7 +648,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { bool isDrop; if (IamInNew) { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu update config by snapshot, lastIndex:%ld, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu update config by snapshot, lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld ", pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, @@ -656,7 +656,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { syncNodeUpdateConfig(pSyncNode, &newSyncCfg, pMsg->lastConfigIndex, &isDrop); } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu do not update config by snapshot, I am not in " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu do not update config by snapshot, I am not in " "newCfg, " "lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld ", @@ -694,7 +694,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin " "index:%ld, " "snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s", @@ -704,7 +704,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { taosMemoryFree(logSimpleStr); } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d finish, update log begin " "index:%ld, " "snapshot.lastApplyIndex:%ld, " "snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu", @@ -721,7 +721,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, " "lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, @@ -730,7 +730,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d end ack:%d, " "lastIndex:%ld, lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu", pReceiver->pSyncNode->vgId, syncUtilState2String(pSyncNode->state), pSyncNode->commitIndex, @@ -750,7 +750,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, " "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv " @@ -761,7 +761,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d force close ack:%d, " "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu", @@ -787,7 +787,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { if (gRaftDetailLog) { char *msgStr = syncSnapshotSend2Str(pMsg); sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, " "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s", @@ -797,7 +797,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) { taosMemoryFree(msgStr); } else { sDebug( - "vgId:%d sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, " + "vgId:%d, sync event %s commitIndex:%ld currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, " "lastIndex:%ld, " "lastTerm:%lu, " "lastConfigIndex:%ld, privateTerm:%lu", diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index 4a0d9e2866..4d61e7036d 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -946,8 +946,7 @@ int32_t taosGetFqdn(char *fqdn) { #endif // __APPLE__ int32_t ret = getaddrinfo(hostname, NULL, &hints, &result); if (!result) { - // printf("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret)); - assert(0); + fprintf(stderr,"failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret)); return -1; } diff --git a/source/util/src/tenv.c b/source/util/src/tenv.c index e717e82c5b..4fc0542816 100644 --- a/source/util/src/tenv.c +++ b/source/util/src/tenv.c @@ -72,6 +72,9 @@ int32_t taosEnvToCfg(const char *envStr, char *cfgStr) { if (cfgNameLen > 0) { memcpy(cfgStr, buf, cfgNameLen); memset(&cfgStr[cfgNameLen], ' ', p - cfgStr - cfgNameLen + 1); + } else { + *cfgStr = '\0'; + return -1; } } return strlen(cfgStr); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index e41b704156..b70f5e5223 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -578,9 +578,14 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSMA_NO_INDEX_IN_CACHE, "No tsma index in ca TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_ENV, "Invalid rsma env") TAOS_DEFINE_ERROR(TSDB_CODE_RSMA_INVALID_STAT, "Invalid rsma state") +//tq +TAOS_DEFINE_ERROR(TSDB_CODE_TQ_NO_COMMITTED_OFFSET, "No committed offset") + TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Index is rebuilding") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") + #ifdef TAOS_ERROR_C }; #endif diff --git a/tests/script/sh/copy_udf.bat b/tests/script/sh/copy_udf.bat new file mode 100644 index 0000000000..5144cf8d25 --- /dev/null +++ b/tests/script/sh/copy_udf.bat @@ -0,0 +1,23 @@ +@echo off + +echo Executing copy_udf.bat +set SCRIPT_DIR=%cd% +echo SCRIPT_DIR: %SCRIPT_DIR% + +cd ..\..\.. +set TAOS_DIR=%cd% +echo find udf library in %TAOS_DIR% +set UDF1_DIR=%TAOS_DIR%\debug\build\lib\udf1.dll +set UDF2_DIR=%TAOS_DIR%\debug\build\lib\udf2.dll + +echo %UDF1_DIR% +echo %UDF2_DIR% + +set UDF_TMP=C:\Windows\Temp\udf +rm -rf %UDF_TMP% +mkdir %UDF_TMP% + +echo Copy udf shared library files to %UDF_TMP% + +cp %UDF1_DIR% %UDF_TMP% +cp %UDF2_DIR% %UDF_TMP% diff --git a/tests/script/tsim/query/udf.sim b/tests/script/tsim/query/udf.sim index 93cae4e391..d9821a0495 100644 --- a/tests/script/tsim/query/udf.sim +++ b/tests/script/tsim/query/udf.sim @@ -19,8 +19,14 @@ sql show databases; sql create table t (ts timestamp, f int); sql insert into t values(now, 1)(now+1s, 2); -sql create function udf1 as '/tmp/udf/libudf1.so' outputtype int bufSize 8; -sql create aggregate function udf2 as '/tmp/udf/libudf2.so' outputtype double bufSize 8; +system_content printf %OS% +if $system_content == Windows_NT then + sql create function udf1 as 'C:\\Windows\\Temp\\udf1.dll' outputtype int bufSize 8; + sql create aggregate function udf2 as 'C:\\Windows\\Temp\\udf2.dll' outputtype double bufSize 8; +else + sql create function udf1 as '/tmp/udf/libudf1.so' outputtype int bufSize 8; + sql create aggregate function udf2 as '/tmp/udf/libudf2.so' outputtype double bufSize 8; +endi sql show functions; if $rows != 2 then return -1 diff --git a/tests/system-test/7-tmq/subscribeDb1.py b/tests/system-test/7-tmq/subscribeDb1.py index ed92a429ae..c08c7d3dae 100644 --- a/tests/system-test/7-tmq/subscribeDb1.py +++ b/tests/system-test/7-tmq/subscribeDb1.py @@ -442,7 +442,7 @@ class TDTestCase: showRow = 1 self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) - time.sleep(3) + time.sleep(6) tdLog.info("pkill consume processor") if (platform.system().lower() == 'windows'): os.system("TASKKILL /F /IM tmq_sim.exe") diff --git a/tests/test/c/tmqDemo.c b/tests/test/c/tmqDemo.c index 5a972e071a..02fd3c1396 100644 --- a/tests/test/c/tmqDemo.c +++ b/tests/test/c/tmqDemo.c @@ -353,8 +353,8 @@ tmq_list_t* build_topic_list() { void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { static const int MIN_COMMIT_COUNT = 1000; - int msg_count = 0; - tmq_resp_err_t err; + int msg_count = 0; + int32_t err; if ((err = tmq_subscribe(tmq, topics))) { fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); @@ -379,7 +379,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { } void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLogSize) { - tmq_resp_err_t err; + int32_t err; if ((err = tmq_subscribe(tmq, topics))) { fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 8455bd9890..0f78a003d6 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -62,7 +62,7 @@ typedef struct { tmq_t* tmq; tmq_list_t* topicList; - + int32_t numOfVgroups; int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2]; // [i][0]: vgroup id, [i][1]: rows of consume int64_t ts; @@ -74,7 +74,7 @@ typedef struct { char cdbName[32]; char dbName[32]; int32_t showMsgFlag; - int32_t showRowFlag; + int32_t showRowFlag; int32_t saveRowFlag; int32_t consumeDelay; // unit s int32_t numOfThread; @@ -108,26 +108,20 @@ static void printHelp() { } char* getCurrentTimeString(char* timeString) { - time_t tTime = taosGetTimestampSec(); + time_t tTime = taosGetTimestampSec(); struct tm tm = *taosLocalTime(&tTime, NULL); - sprintf(timeString, "%d-%02d-%02d %02d:%02d:%02d", - tm.tm_year + 1900, - tm.tm_mon + 1, - tm.tm_mday, - tm.tm_hour, - tm.tm_min, - tm.tm_sec); + sprintf(timeString, "%d-%02d-%02d %02d:%02d:%02d", tm.tm_year + 1900, tm.tm_mon + 1, tm.tm_mday, tm.tm_hour, + tm.tm_min, tm.tm_sec); return timeString; } - void initLogFile() { char filename[256]; - char tmpString[128]; + char tmpString[128]; - sprintf(filename,"%s/../log/tmqlog_%s.txt", configDir, getCurrentTimeString(tmpString)); - //sprintf(filename, "%s/../log/tmqlog.txt", configDir); + sprintf(filename, "%s/../log/tmqlog_%s.txt", configDir, getCurrentTimeString(tmpString)); + // sprintf(filename, "%s/../log/tmqlog.txt", configDir); #ifdef WINDOWS for (int i = 2; i < sizeof(filename); i++) { if (filename[i] == ':') filename[i] = '-'; @@ -249,17 +243,18 @@ void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) { for (i = 0; i < pInfo->numOfVgroups; i++) { if (vgroupId == pInfo->rowsOfPerVgroups[i][0]) { pInfo->rowsOfPerVgroups[i][1] += rows; - return; - } + return; + } } pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][0] = vgroupId; pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][1] += rows; pInfo->numOfVgroups++; - + taosFprintfFile(g_fp, "consume id %d, add one new vogroup id: %d\n", pInfo->consumerId, vgroupId); if (pInfo->numOfVgroups > MAX_VGROUP_CNT) { - taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than 32. new vgroupId: %d\n", pInfo->consumerId, pInfo->numOfVgroups, vgroupId); + taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than 32. new vgroupId: %d\n", pInfo->consumerId, + pInfo->numOfVgroups, vgroupId); taosCloseFile(&g_fp); exit(-1); } @@ -277,7 +272,8 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) { TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); assert(pConn != NULL); - sprintf(sqlStr, "insert into %s.content_%d values (%"PRId64", \'%s\')", g_stConfInfo.cdbName, pInfo->consumerId, pInfo->ts++, buf); + sprintf(sqlStr, "insert into %s.content_%d values (%" PRId64 ", \'%s\')", g_stConfInfo.cdbName, pInfo->consumerId, + pInfo->ts++, buf); TAOS_RES* pRes = taos_query(pConn, sqlStr); if (taos_errno(pRes) != 0) { pError("error in insert consume result, reason:%s\n", taos_errstr(pRes)); @@ -295,12 +291,13 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) { static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) { char buf[1024]; int32_t totalRows = 0; - + // printf("topic: %s\n", tmq_get_topic_name(msg)); int32_t vgroupId = tmq_get_vgroup_id(msg); - + taosFprintfFile(g_fp, "msg index:%" PRId64 ", consumerId: %d\n", msgIndex, pInfo->consumerId); - //taosFprintfFile(g_fp, "topic: %s, vgroupId: %d, tableName: %s\n", tmq_get_topic_name(msg), vgroupId, tmq_get_table_name(msg)); + // taosFprintfFile(g_fp, "topic: %s, vgroupId: %d, tableName: %s\n", tmq_get_topic_name(msg), vgroupId, + // tmq_get_table_name(msg)); taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), vgroupId); while (1) { @@ -316,9 +313,9 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) const char* tbName = tmq_get_table_name(msg); if (0 != g_stConfInfo.showRowFlag) { - taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName:"null table"), totalRows, buf); - if (0 != g_stConfInfo.saveRowFlag) { - saveConsumeContentToTbl(pInfo, buf); + taosFprintfFile(g_fp, "tbname:%s, rows[%d]: %s\n", (tbName != NULL ? tbName : "null table"), totalRows, buf); + if (0 != g_stConfInfo.saveRowFlag) { + saveConsumeContentToTbl(pInfo, buf); } } @@ -326,7 +323,7 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) } addRowsToVgroupId(pInfo, vgroupId, totalRows); - + return totalRows; } @@ -342,8 +339,8 @@ int queryDB(TAOS* taos, char* command) { return 0; } -static void tmq_commit_cb_print(tmq_t* tmq, tmq_resp_err_t resp, tmq_topic_vgroup_list_t* offsets, void* param) { - pError("tmq_commit_cb_print() commit %d\n", resp); +static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { + pError("tmq_commit_cb_print() commit %d\n", code); } void build_consumer(SThreadInfo* pInfo) { @@ -401,16 +398,11 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) { int64_t now = taosGetTimestampMs(); // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int - sprintf(sqlStr, "insert into %s.consumeresult values (%"PRId64", %d, %" PRId64 ", %" PRId64 ", %d)", - g_stConfInfo.cdbName, - now, - pInfo->consumerId, - pInfo->consumeMsgCnt, - pInfo->consumeRowCnt, - pInfo->checkresult); + sprintf(sqlStr, "insert into %s.consumeresult values (%" PRId64 ", %d, %" PRId64 ", %" PRId64 ", %d)", + g_stConfInfo.cdbName, now, pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult); char tmpString[128]; - taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId ,sqlStr); + taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId, sqlStr); TAOS_RES* pRes = taos_query(pConn, sqlStr); if (taos_errno(pRes) != 0) { @@ -421,7 +413,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) { taos_free_result(pRes); - #if 0 +#if 0 // vgroups for (i = 0; i < pInfo->numOfVgroups; i++) { // schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int @@ -445,19 +437,20 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) { taos_free_result(pRes); } - #endif +#endif return 0; } void loop_consume(SThreadInfo* pInfo) { - tmq_resp_err_t err; + int32_t code; int64_t totalMsgs = 0; int64_t totalRows = 0; char tmpString[128]; - taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString), pInfo->consumerId); + taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString), + pInfo->consumerId); pInfo->ts = taosGetTimestampMs(); @@ -473,7 +466,7 @@ void loop_consume(SThreadInfo* pInfo) { totalMsgs++; if (totalRows >= pInfo->expectMsgCnt) { - char tmpString[128]; + char tmpString[128]; taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString)); break; } @@ -503,8 +496,8 @@ void* consumeThreadFunc(void* param) { return NULL; } - tmq_resp_err_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList); - if (err) { + int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList); + if (err != 0) { pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); exit(-1); } @@ -519,17 +512,19 @@ void* consumeThreadFunc(void* param) { pPrint("tmq_commit() manual commit when consume end.\n"); /*tmq_commit(pInfo->tmq, NULL, 0);*/ tmq_commit_sync(pInfo->tmq, NULL); + taosFprintfFile(g_fp, "tmq_commit() manual commit over.\n"); + pPrint("tmq_commit() manual commit over.\n"); } err = tmq_unsubscribe(pInfo->tmq); - if (err) { + if (err != 0) { pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); /*pInfo->consumeMsgCnt = -1;*/ /*return NULL;*/ } err = tmq_consumer_close(pInfo->tmq); - if (err) { + if (err != 0) { pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); /*exit(-1);*/ } diff --git a/tests/tsim/src/simExe.c b/tests/tsim/src/simExe.c index dbc3c2c460..f97a13d2c5 100644 --- a/tests/tsim/src/simExe.c +++ b/tests/tsim/src/simExe.c @@ -458,11 +458,17 @@ bool simExecuteSystemContentCmd(SScript *script, char *option) { char buf[4096] = {0}; char buf1[4096 + 512] = {0}; char filename[400] = {0}; - sprintf(filename, "%s/%s.tmp", simScriptDir, script->fileName); + sprintf(filename, "%s" TD_DIRSEP "%s.tmp", simScriptDir, script->fileName); +#ifdef WINDOWS + sprintf(buf, "cd %s && ", simScriptDir); + simVisuallizeOption(script, option, buf + strlen(buf)); + sprintf(buf1, "%s > %s 2>nul", buf, filename); +#else sprintf(buf, "cd %s; ", simScriptDir); simVisuallizeOption(script, option, buf + strlen(buf)); sprintf(buf1, "%s > %s 2>/dev/null", buf, filename); +#endif sprintf(script->system_exit_code, "%d", system(buf1)); simStoreSystemContentResult(script, filename); diff --git a/tests/tsim/src/simParse.c b/tests/tsim/src/simParse.c index 638c4a1ccb..5b6dda4dae 100644 --- a/tests/tsim/src/simParse.c +++ b/tests/tsim/src/simParse.c @@ -206,7 +206,7 @@ SScript *simParseScript(char *fileName) { for (int32_t i = 0; i < cmdlen; ++i) { if (buffer[i] == '\r' || buffer[i] == '\n') { - buffer[i] = ' '; + buffer[i] = '\0'; } }