diff --git a/include/client/taos.h b/include/client/taos.h index 6afbcee6f1..72cb7bfa96 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -93,12 +93,12 @@ typedef struct taosField { typedef void (*__taos_async_fn_t)(void *param, TAOS_RES *, int code); typedef struct TAOS_BIND_v2 { - int buffer_type; - void *buffer; - int32_t buffer_length; - int32_t *length; - char *is_null; - int num; + int buffer_type; + void *buffer; + int32_t buffer_length; + int32_t *length; + char *is_null; + int num; } TAOS_BIND_v2; typedef enum { @@ -128,35 +128,35 @@ DLL_EXPORT void taos_close(TAOS *taos); const char *taos_data_type(int type); -DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); -DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); -DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND_v2 *tags); -DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name); -DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name); +DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos); +DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length); +DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_BIND_v2 *tags); +DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name); +DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name); -DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert); -DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums); -DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes); -DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND_v2 *bind); -DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind); -DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind, int colIdx); -DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt); -DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt); -DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt); -DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert); +DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums); +DLL_EXPORT int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes); +DLL_EXPORT int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_BIND_v2 *bind); +DLL_EXPORT int taos_stmt_bind_param_batch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind); +DLL_EXPORT int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_BIND_v2 *bind, int colIdx); +DLL_EXPORT int taos_stmt_add_batch(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_execute(TAOS_STMT *stmt); +DLL_EXPORT TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_close(TAOS_STMT *stmt); +DLL_EXPORT char *taos_stmt_errstr(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_affected_rows(TAOS_STMT *stmt); +DLL_EXPORT int taos_stmt_affected_rows_once(TAOS_STMT *stmt); -DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); -DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); +DLL_EXPORT TAOS_RES *taos_query(TAOS *taos, const char *sql); +DLL_EXPORT TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen); -DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); -DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result -DLL_EXPORT void taos_free_result(TAOS_RES *res); -DLL_EXPORT int taos_field_count(TAOS_RES *res); -DLL_EXPORT int taos_num_fields(TAOS_RES *res); -DLL_EXPORT int taos_affected_rows(TAOS_RES *res); +DLL_EXPORT TAOS_ROW taos_fetch_row(TAOS_RES *res); +DLL_EXPORT int taos_result_precision(TAOS_RES *res); // get the time precision of result +DLL_EXPORT void taos_free_result(TAOS_RES *res); +DLL_EXPORT int taos_field_count(TAOS_RES *res); +DLL_EXPORT int taos_num_fields(TAOS_RES *res); +DLL_EXPORT int taos_affected_rows(TAOS_RES *res); DLL_EXPORT TAOS_FIELD *taos_fetch_fields(TAOS_RES *res); DLL_EXPORT int taos_select_db(TAOS *taos, const char *db); @@ -234,7 +234,7 @@ DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errst DLL_EXPORT const char *tmq_err2str(tmq_resp_err_t); /* ------------------------TMQ CONSUMER INTERFACE------------------------ */ -DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, tmq_list_t *topic_list); +DLL_EXPORT tmq_resp_err_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list); DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t *tmq); DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t *tmq, tmq_list_t **topics); DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t blocking_time); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 25d9fcdf85..3bde0b1f03 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1333,7 +1333,7 @@ typedef struct { typedef struct { int64_t consumerId; char cgroup[TSDB_CGROUP_LEN]; - SArray* topicNames; // SArray + SArray* topicNames; // SArray } SCMSubscribeReq; static FORCE_INLINE int32_t tSerializeSCMSubscribeReq(void** buf, const SCMSubscribeReq* pReq) { diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 691d491eb0..97ee66a2da 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -145,7 +145,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) - TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp) + TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-mq-ask-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp) TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-mq-tmr", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_MQ_CONSUMER_LOST, "mnode-mq-consumer-lost", SMTimerReq, SMTimerReq) TD_DEF_MSG_TYPE(TDMT_MND_MQ_DO_REBALANCE, "mnode-mq-do-rebalance", SMqDoRebalanceMsg, SMqDoRebalanceMsg) diff --git a/include/util/ttimer.h b/include/util/ttimer.h index f2ee825c4e..1022259631 100644 --- a/include/util/ttimer.h +++ b/include/util/ttimer.h @@ -41,10 +41,6 @@ bool taosTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void *param, void *han void taosTmrCleanUp(void *handle); -int32_t taosInitTimer(void (*callback)(int32_t), int32_t ms); - -void taosUninitTimer(); - #ifdef __cplusplus } #endif diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 988406c84d..f9540bc8fc 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -23,6 +23,9 @@ #include "tmsgtype.h" #include "tqueue.h" #include "tref.h" +#include "ttimer.h" + +int32_t tmqAskEp(tmq_t* tmq, bool sync); typedef struct { int8_t tmqRspType; @@ -61,29 +64,40 @@ struct tmq_conf_t { tmq_commit_cb* commit_cb; }; +typedef struct { + int8_t inited; + tmr_h timer; +} SMqMgmt; + +static SMqMgmt tmqMgmt = {0}; + struct tmq_t { // conf - char groupId[TSDB_CGROUP_LEN]; - char clientId[256]; - int8_t autoCommit; - /*int8_t inWaiting;*/ + char groupId[TSDB_CGROUP_LEN]; + char clientId[256]; + int8_t autoCommit; int64_t consumerId; - int32_t epoch; int32_t resetOffsetCfg; - int64_t status; - STscObj* pTscObj; tmq_commit_cb* commit_cb; - /*int32_t nextTopicIdx;*/ + + // status + int8_t status; int8_t epStatus; + int32_t epoch; int32_t epSkipCnt; - /*int32_t waitingRequest;*/ - /*int32_t readyRequest;*/ - SArray* clientTopics; // SArray - STaosQueue* mqueue; // queue of tmq_message_t - STaosQall* qall; - tsem_t rspSem; - // stat int64_t pollCnt; + + // connection + STscObj* pTscObj; + + // container + SArray* clientTopics; // SArray + STaosQueue* mqueue; // queue of rsp + STaosQall* qall; + STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit + + // ctl + tsem_t rspSem; }; enum { @@ -93,6 +107,7 @@ enum { enum { TMQ_CONSUMER_STATUS__INIT = 0, + TMQ_CONSUMER_STATUS__SUBSCRIBED, TMQ_CONSUMER_STATUS__READY, }; @@ -110,13 +125,11 @@ typedef struct { typedef struct { // subscribe info - int32_t sqlLen; - char* sql; - char* topicName; - int64_t topicId; - SArray* vgs; // SArray + char* topicName; + + SArray* vgs; // SArray + int8_t isSchemaAdaptive; - int32_t numOfFields; SSchemaWrapper schema; } SMqClientTopic; @@ -156,7 +169,6 @@ typedef struct { int32_t async; tsem_t rspSem; tmq_resp_err_t rspErr; - /*SMqClientVg* pVg;*/ } SMqCommitCbParam; tmq_conf_t* tmq_conf_new() { @@ -251,13 +263,7 @@ int32_t tmq_list_append(tmq_list_t* list, const char* src) { void tmq_list_destroy(tmq_list_t* list) { SArray* container = &list->container; - /*taosArrayDestroy(container);*/ - int32_t sz = taosArrayGetSize(container); - for (int32_t i = 0; i < sz; i++) { - char* str = taosArrayGetP(container, i); - taosMemoryFree(str); - } - taosArrayDestroy(container); + taosArrayDestroyP(container, taosMemoryFree); } int32_t tmq_list_get_size(const tmq_list_t* list) { @@ -298,6 +304,8 @@ void tmqClearUnhandleMsg(tmq_t* tmq) { int32_t tmqSubscribeCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqSubscribeCbParam* pParam = (SMqSubscribeCbParam*)param; pParam->rspErr = code; + tmq_t* tmq = pParam->tmq; + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__SUBSCRIBED); tsem_post(&pParam->rspSem); return 0; } @@ -335,12 +343,9 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs return NULL; } pTmq->pTscObj = (STscObj*)conn; - /*pTmq->inWaiting = 0;*/ pTmq->status = 0; pTmq->pollCnt = 0; pTmq->epoch = 0; - /*pTmq->waitingRequest = 0;*/ - /*pTmq->readyRequest = 0;*/ pTmq->epStatus = 0; pTmq->epSkipCnt = 0; // set conf @@ -367,26 +372,45 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs #endif tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { + // init timer + int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1); + if (inited == 0) { + tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ"); + if (tmqMgmt.timer == NULL) { + atomic_store_8(&tmqMgmt.inited, 0); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + } + tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t)); if (pTmq == NULL) { return NULL; } + const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user; const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass; ASSERT(user); ASSERT(pass); - /*ASSERT(conf->db);*/ ASSERT(conf->groupId[0]); - pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, conf->db, conf->port, CONN_TYPE__TMQ); - if (pTmq->pTscObj == NULL) return NULL; + pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); + pTmq->mqueue = taosOpenQueue(); + pTmq->qall = taosAllocateQall(); + pTmq->delayedTask = taosOpenQueue(); - pTmq->status = 0; + if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) { + goto FAIL; + } + + // init status + pTmq->status = TMQ_CONSUMER_STATUS__INIT; pTmq->pollCnt = 0; pTmq->epoch = 0; pTmq->epStatus = 0; pTmq->epSkipCnt = 0; + // set conf strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->groupId, conf->groupId); @@ -394,19 +418,30 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->commit_cb = conf->commit_cb; pTmq->resetOffsetCfg = conf->resetOffset; + // assign consumerId pTmq->consumerId = tGenIdPI64(); - pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); - if (pTmq->clientTopics == NULL) { - taosMemoryFree(pTmq); - return NULL; + + // init semaphore + if (tsem_init(&pTmq->rspSem, 0, 0) != 0) { + goto FAIL; } - pTmq->mqueue = taosOpenQueue(); - pTmq->qall = taosAllocateQall(); - - tsem_init(&pTmq->rspSem, 0, 0); + // init connection + pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ); + if (pTmq->pTscObj == NULL) { + tsem_destroy(&pTmq->rspSem); + goto FAIL; + } return pTmq; + +FAIL: + if (pTmq->clientTopics) taosArrayDestroy(pTmq->clientTopics); + if (pTmq->mqueue) taosCloseQueue(pTmq->mqueue); + if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask); + if (pTmq->qall) taosFreeQall(pTmq->qall); + taosMemoryFree(pTmq); + return NULL; } tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) { @@ -497,86 +532,64 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in return resp; } -tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { - SRequestObj* pRequest = NULL; - SArray* container = &topic_list->container; - int32_t sz = taosArrayGetSize(container); - // destroy ex - taosArrayDestroy(tmq->clientTopics); - tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic)); +tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { + const SArray* container = &topic_list->container; + int32_t sz = taosArrayGetSize(container); + void* buf = NULL; + SCMSubscribeReq req = {0}; + int32_t code = -1; - SCMSubscribeReq req; req.consumerId = tmq->consumerId; - strcpy(req.cgroup, tmq->groupId); + tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN); req.topicNames = taosArrayInit(sz, sizeof(void*)); + if (req.topicNames == NULL) goto FAIL; - for (int i = 0; i < sz; i++) { - /*char* topicName = topic_list->elems[i];*/ - char* topicName = taosArrayGetP(container, i); + for (int32_t i = 0; i < sz; i++) { + char* topic = taosArrayGetP(container, i); SName name = {0}; -#if 0 - char* dbName = getDbOfConnection(tmq->pTscObj); - if (dbName == NULL) { - return TMQ_RESP_ERR__FAIL; - } -#endif - tNameSetDbName(&name, tmq->pTscObj->acctId, topicName, strlen(topicName)); -#if 0 - tNameFromString(&name, topicName, T_NAME_TABLE); -#endif + tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic)); - char* topicFname = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); - if (topicFname == NULL) { - goto _return; + char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); + if (topicFName == NULL) { + goto FAIL; } - tNameExtractFullName(&name, topicFname); - tscDebug("subscribe topic: %s", topicFname); - SMqClientTopic topic = { - .sql = NULL, - .sqlLen = 0, - .topicId = 0, - .topicName = topicFname, - .vgs = NULL, - }; - topic.vgs = taosArrayInit(0, sizeof(SMqClientVg)); - taosArrayPush(tmq->clientTopics, &topic); - taosArrayPush(req.topicNames, &topicFname); -#if 0 - taosMemoryFree(dbName); -#endif + tNameExtractFullName(&name, topicFName); + + tscDebug("subscribe topic: %s", topicFName); + + taosArrayPush(req.topicNames, &topicFName); } - int tlen = tSerializeSCMSubscribeReq(NULL, &req); - void* buf = taosMemoryMalloc(tlen); - if (buf == NULL) { - goto _return; - } + int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req); + buf = taosMemoryMalloc(tlen); + if (buf == NULL) goto FAIL; void* abuf = buf; tSerializeSCMSubscribeReq(&abuf, &req); - /*printf("formatted: %s\n", dagStr);*/ - pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_SUBSCRIBE); - if (pRequest == NULL) { - tscError("failed to malloc request"); - } + SMsgSendInfo* sendInfo = taosMemoryMalloc(sizeof(SMsgSendInfo)); + if (sendInfo == NULL) goto FAIL; SMqSubscribeCbParam param = { .rspErr = TMQ_RESP_ERR__SUCCESS, .tmq = tmq, }; - tsem_init(¶m.rspSem, 0, 0); - pRequest->body.requestMsg = (SDataBuf){ + if (tsem_init(¶m.rspSem, 0, 0) != 0) goto FAIL; + + sendInfo->msgInfo = (SDataBuf){ .pData = buf, .len = tlen, .handle = NULL, }; - SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); + sendInfo->requestId = generateRequestId(); + sendInfo->requestObjRefId = 0; sendInfo->param = ¶m; sendInfo->fp = tmqSubscribeCb; + sendInfo->msgType = TDMT_MND_SUBSCRIBE; + SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; @@ -585,15 +598,28 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { tsem_wait(¶m.rspSem); tsem_destroy(¶m.rspSem); -_return: - /*if (sendInfo != NULL) {*/ - /*destroySendMsgInfo(sendInfo);*/ - /*}*/ + code = param.rspErr; + if (code != 0) goto FAIL; - return param.rspErr; + // TODO: add max retry cnt + while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, true)) { + tscDebug("not ready, retry\n"); + taosMsleep(500); + } + + code = 0; +FAIL: + if (req.topicNames != NULL) taosArrayDestroyP(req.topicNames, taosMemoryFree); + if (code != 0) { + taosMemoryFree(buf); + } + return code; } -void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { conf->commit_cb = cb; } +void tmq_conf_set_offset_commit_cb(tmq_conf_t* conf, tmq_commit_cb* cb) { + // + conf->commit_cb = cb; +} TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbName, const char* sql) { STscObj* pTscObj = (STscObj*)taos; @@ -627,9 +653,6 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa int32_t code = 0; CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); CHECK_CODE_GOTO(parseSql(pRequest, false, &pQueryNode, NULL), _return); - - // todo check for invalid sql statement and return with error code - CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return); /*printf("%s\n", pStr);*/ @@ -653,7 +676,6 @@ TAOS_RES* tmq_create_stream(TAOS* taos, const char* streamName, const char* tbNa } tSerializeSCMCreateStreamReq(buf, tlen, &req); - /*printf("formatted: %s\n", dagStr);*/ pRequest->body.requestMsg = (SDataBuf){ .pData = buf, @@ -684,94 +706,6 @@ _return: return pRequest; } -#if 0 -TAOS_RES* tmq_create_topic(TAOS* taos, const char* topicName, const char* sql, int sqlLen) { - STscObj* pTscObj = (STscObj*)taos; - SRequestObj* pRequest = NULL; - SQuery* pQueryNode = NULL; - char* astStr = NULL; - - terrno = TSDB_CODE_SUCCESS; - if (taos == NULL || topicName == NULL || sql == NULL) { - tscError("invalid parameters for creating topic, connObj:%p, topic name:%s, sql:%s", taos, topicName, sql); - terrno = TSDB_CODE_TSC_INVALID_INPUT; - goto _return; - } - - if (strlen(topicName) >= TSDB_TOPIC_NAME_LEN) { - tscError("topic name too long, max length:%d", TSDB_TOPIC_NAME_LEN - 1); - terrno = TSDB_CODE_TSC_INVALID_INPUT; - goto _return; - } - - if (sqlLen > TSDB_MAX_ALLOWED_SQL_LEN) { - tscError("sql string exceeds max length:%d", TSDB_MAX_ALLOWED_SQL_LEN); - terrno = TSDB_CODE_TSC_EXCEED_SQL_LIMIT; - goto _return; - } - - tscDebug("start to create topic: %s", topicName); - - int32_t code = TSDB_CODE_SUCCESS; - CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); - CHECK_CODE_GOTO(parseSql(pRequest, true, &pQueryNode), _return); - - // todo check for invalid sql statement and return with error code - - CHECK_CODE_GOTO(nodesNodeToString(pQueryNode->pRoot, false, &astStr, NULL), _return); - - /*printf("%s\n", pStr);*/ - - SName name = {.acctId = pTscObj->acctId, .type = TSDB_TABLE_NAME_T}; - strcpy(name.dbname, pRequest->pDb); - strcpy(name.tname, topicName); - - SCMCreateTopicReq req = { - .igExists = 1, - .ast = astStr, - .sql = (char*)sql, - }; - tNameExtractFullName(&name, req.name); - - int tlen = tSerializeSCMCreateTopicReq(NULL, 0, &req); - void* buf = taosMemoryMalloc(tlen); - if (buf == NULL) { - goto _return; - } - - tSerializeSCMCreateTopicReq(buf, tlen, &req); - /*printf("formatted: %s\n", dagStr);*/ - - pRequest->body.requestMsg = (SDataBuf){ - .pData = buf, - .len = tlen, - .handle = NULL, - }; - pRequest->type = TDMT_MND_CREATE_TOPIC; - - SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); - SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); - - int64_t transporterId = 0; - asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); - - tsem_wait(&pRequest->body.rspSem); - -_return: - taosMemoryFreeClear(astStr); - qDestroyQuery(pQueryNode); - /*if (sendInfo != NULL) {*/ - /*destroySendMsgInfo(sendInfo);*/ - /*}*/ - - if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) { - pRequest->code = terrno; - } - - return pRequest; -} -#endif - #if 0 int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) { if (tmq_message == NULL) return 0; @@ -954,7 +888,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) { /*printf("rsp epoch %ld sz %ld\n", rsp.epoch, rsp.topics->size);*/ /*printf("tmq epoch %ld sz %ld\n", tmq->epoch, tmq->clientTopics->size);*/ if (tmqUpdateEp(tmq, head->epoch, &rsp)) { - atomic_store_64(&tmq->status, TMQ_CONSUMER_STATUS__READY); + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); } tDeleteSMqCMGetSubEpRsp(&rsp); } else { @@ -1189,7 +1123,6 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { int64_t transporterId = 0; /*printf("send poll\n");*/ - /*atomic_add_fetch_32(&tmq->waitingRequest, 1);*/ tscDebug("consumer %ld send poll to %s : vg %d, epoch %d, req offset %ld, reqId %lu", tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, pVg->currentOffset, pReq->reqId); /*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/ @@ -1268,12 +1201,14 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { SMqRspObj* rspObj; int64_t startTime = taosGetTimestampMs(); - // TODO: put into another thread or delayed queue - int64_t status = atomic_load_64(&tmq->status); - while (0 != tmqAskEp(tmq, status == TMQ_CONSUMER_STATUS__INIT)) { + // TODO: put into delayed queue +#if 0 + int8_t status = atomic_load_8(&tmq->status); + while (0 != tmqAskEp(tmq, status != TMQ_CONSUMER_STATUS__READY)) { tscDebug("not ready, retry\n"); taosSsleep(1); } +#endif rspObj = tmqHandleAllRsp(tmq, blocking_time, false); if (rspObj) { @@ -1281,7 +1216,6 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) { } while (1) { - /*printf("cycle\n");*/ tmqAskEp(tmq, false); tmqPollImpl(tmq, blocking_time); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 4cc1b8527c..7fc263c93c 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -202,6 +202,17 @@ int32_t vmProcessCreateVnodeReq(SVnodesMgmt *pMgmt, SNodeMsg *pMsg) { return code; } + code = vnodeStart(pImpl); + if (code != 0) { + tFreeSCreateVnodeReq(&createReq); + dError("vgId:%d, failed to start sync since %s", createReq.vgId, terrstr()); + vnodeClose(pImpl); + vnodeDestroy(path, pMgmt->pTfs); + terrno = code; + return code; + } + + code = vmWriteVnodesToFile(pMgmt); if (code != 0) { tFreeSCreateVnodeReq(&createReq); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 687a799c57..3088c5dea4 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -74,12 +74,6 @@ int32_t vmOpenVnode(SVnodesMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { return -1; } - // sync integration - vnodeSyncSetQ(pImpl, NULL); - vnodeSyncSetRpc(pImpl, NULL); - int32_t ret = vnodeSyncStart(pImpl); - assert(ret == 0); - taosWLockLatch(&pMgmt->latch); int32_t code = taosHashPut(pMgmt->hash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); taosWUnLockLatch(&pMgmt->latch); @@ -153,6 +147,7 @@ static void *vmOpenVnodeFunc(void *param) { pThread->failed++; } else { vmOpenVnode(pMgmt, pCfg, pImpl); + //vnodeStart(pImpl); dDebug("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->opened++; } @@ -364,10 +359,52 @@ static int32_t vmRequire(SMgmtWrapper *pWrapper, bool *required) { return 0; } +static int32_t vmStart(SMgmtWrapper *pWrapper) { + dDebug("vnode-mgmt start to run"); + SVnodesMgmt *pMgmt = pWrapper->pMgmt; + + taosRLockLatch(&pMgmt->latch); + + void *pIter = taosHashIterate(pMgmt->hash, NULL); + while (pIter) { + SVnodeObj **ppVnode = pIter; + if (ppVnode == NULL || *ppVnode == NULL) continue; + + SVnodeObj *pVnode = *ppVnode; + vnodeStart(pVnode->pImpl); + pIter = taosHashIterate(pMgmt->hash, pIter); + } + + taosRUnLockLatch(&pMgmt->latch); + return 0; +} + +static void vmStop(SMgmtWrapper *pWrapper) { +#if 0 + dDebug("vnode-mgmt start to stop"); + SVnodesMgmt *pMgmt = pWrapper->pMgmt; + taosRLockLatch(&pMgmt->latch); + + void *pIter = taosHashIterate(pMgmt->hash, NULL); + while (pIter) { + SVnodeObj **ppVnode = pIter; + if (ppVnode == NULL || *ppVnode == NULL) continue; + + SVnodeObj *pVnode = *ppVnode; + vnodeStop(pVnode->pImpl); + pIter = taosHashIterate(pMgmt->hash, pIter); + } + + taosRUnLockLatch(&pMgmt->latch); +#endif +} + void vmSetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; mgmtFp.openFp = vmInit; mgmtFp.closeFp = vmCleanup; + mgmtFp.startFp = vmStart; + mgmtFp.stopFp = vmStop; mgmtFp.requiredFp = vmRequire; vmInitMsgHandle(pWrapper); @@ -396,4 +433,4 @@ void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo) { } taosRUnLockLatch(&pMgmt->latch); -} \ No newline at end of file +} diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index f4975c183e..9eab4d3376 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -61,6 +61,9 @@ int32_t vnodeSync(SVnode *pVnode); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName); +int32_t vnodeStart(SVnode *pVnode); +void vnodeStop(SVnode *pVnode); + int64_t vnodeGetSyncHandle(SVnode *pVnode); void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot); @@ -171,11 +174,6 @@ typedef struct { uint64_t uid; } STableKeyInfo; -// sync integration -void vnodeSyncSetQ(SVnode *pVnode, void *qHandle); -void vnodeSyncSetRpc(SVnode *pVnode, void *rpcHandle); -int32_t vnodeSyncStart(SVnode *pVnode); - #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 2b10afa5ed..11136bc7ba 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -14,6 +14,7 @@ */ #include "vnodeInt.h" +#include "vnodeSync.h" int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { SVnodeInfo info = {0}; @@ -171,6 +172,16 @@ void vnodeClose(SVnode *pVnode) { } } +// start the sync timer after the queue is ready +int32_t vnodeStart(SVnode *pVnode) { + vnodeSyncSetQ(pVnode, NULL); + vnodeSyncSetRpc(pVnode, NULL); + vnodeSyncStart(pVnode); + return 0; +} + +void vnodeStop(SVnode *pVnode) {} + int64_t vnodeGetSyncHandle(SVnode *pVnode) { return pVnode->sync; } void vnodeGetSnapshot(SVnode *pVnode, SSnapshot *pSnapshot) { pSnapshot->lastApplyIndex = pVnode->state.committed; } \ No newline at end of file diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index a3068d69a9..262c9d5530 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -457,6 +457,20 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { void syncNodeStart(SSyncNode* pSyncNode) { // start raft + if (pSyncNode->replicaNum == 1) { + syncNodeBecomeLeader(pSyncNode); + + syncNodeLog2("==state change become leader immediately==", pSyncNode); + + // Raft 3.6.2 Committing entries from previous terms + + // use this now + syncNodeAppendNoop(pSyncNode); + syncMaybeAdvanceCommitIndex(pSyncNode); // maybe only one replica + return; + } + + syncNodeBecomeFollower(pSyncNode); // for test diff --git a/source/libs/sync/test/test.sh b/source/libs/sync/test/test.sh new file mode 100644 index 0000000000..7a693aac0b --- /dev/null +++ b/source/libs/sync/test/test.sh @@ -0,0 +1,3 @@ +#!/bin/bash + + diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 50c924bbb9..399a2255ac 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -403,6 +403,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CHECKSUM, "Invalid msg checksum" TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, "Invalid msg length") TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync not leader") +TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error") + // wal TAOS_DEFINE_ERROR(TSDB_CODE_WAL_APP_ERROR, "Unexpected generic error in wal") TAOS_DEFINE_ERROR(TSDB_CODE_WAL_FILE_CORRUPTED, "WAL file is corrupted") diff --git a/tools/shell/src/shellCommand.c b/tools/shell/src/shellCommand.c index c34ee7a22c..fcdbf2d020 100644 --- a/tools/shell/src/shellCommand.c +++ b/tools/shell/src/shellCommand.c @@ -395,7 +395,7 @@ void shellClearScreen(int32_t ecmd_pos, int32_t cursor_pos) { void shellShowOnScreen(SShellCmd *cmd) { struct winsize w; if (ioctl(0, TIOCGWINSZ, &w) < 0 || w.ws_col == 0 || w.ws_row == 0) { - fprintf(stderr, "No stream device\n"); + // fprintf(stderr, "No stream device\n"); w.ws_col = 120; w.ws_row = 30; } diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index ac2be3c3cf..1f036ab25b 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -750,7 +750,7 @@ void shellReadHistory() { void shellWriteHistory() { SShellHistory *pHistory = &shell.history; - TdFilePtr pFile = taosOpenFile(pHistory->file, TD_FILE_WRITE | TD_FILE_STREAM); + TdFilePtr pFile = taosOpenFile(pHistory->file, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_STREAM | TD_FILE_APPEND); if (pFile == NULL) return; for (int32_t i = pHistory->hstart; i != pHistory->hend;) {