diff --git a/examples/c/tmq.c b/examples/c/tmq.c index 1cdd4c02da..fc34915fe7 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -21,17 +21,17 @@ #include "taos.h" static int running = 1; -static char dbName[64] = "tmqdb"; -static char stbName[64] = "stb"; +static char dbName[64] = "tmqdb"; +static char stbName[64] = "stb"; static char topicName[64] = "topicname"; static int32_t msg_process(TAOS_RES* msg) { - char buf[1024]; + char buf[1024]; int32_t rows = 0; const char* topicName = tmq_get_topic_name(msg); - const char* dbName = tmq_get_db_name(msg); - int32_t vgroupId = tmq_get_vgroup_id(msg); + const char* dbName = tmq_get_db_name(msg); + int32_t vgroupId = tmq_get_vgroup_id(msg); printf("topic: %s\n", topicName); printf("db: %s\n", dbName); @@ -41,14 +41,14 @@ static int32_t msg_process(TAOS_RES* msg) { TAOS_ROW row = taos_fetch_row(msg); if (row == NULL) break; - TAOS_FIELD* fields = taos_fetch_fields(msg); + TAOS_FIELD* fields = taos_fetch_fields(msg); int32_t numOfFields = taos_field_count(msg); - int32_t* length = taos_fetch_lengths(msg); - int32_t precision = taos_result_precision(msg); - const char* tbName = tmq_get_table_name(msg); - rows++; + int32_t* length = taos_fetch_lengths(msg); + int32_t precision = taos_result_precision(msg); + const char* tbName = tmq_get_table_name(msg); + rows++; taos_print_row(buf, row, fields, numOfFields); - printf("row content from %s: %s\n", (tbName != NULL ? tbName : "null table"), buf); + printf("row content from %s: %s\n", (tbName != NULL ? tbName : "table null"), buf); } return rows; @@ -80,7 +80,8 @@ static int32_t init_env() { // create super table printf("create super table\n"); - pRes = taos_query(pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))"); + pRes = taos_query( + pConn, "create table tmqdb.stb (ts timestamp, c1 int, c2 float, c3 varchar(16)) tags(t1 int, t3 varchar(16))"); if (taos_errno(pRes) != 0) { printf("failed to create super table stb, reason:%s\n", taos_errstr(pRes)); return -1; @@ -166,7 +167,6 @@ int32_t create_topic() { } taos_free_result(pRes); - // pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1"); pRes = taos_query(pConn, "create topic topicname as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topicname, reason:%s\n", taos_errstr(pRes)); @@ -184,26 +184,28 @@ void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { tmq_t* build_consumer() { tmq_conf_res_t code; - tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_t* conf = tmq_conf_new(); code = tmq_conf_set(conf, "enable.auto.commit", "true"); if (TMQ_CONF_OK != code) return NULL; code = tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); if (TMQ_CONF_OK != code) return NULL; code = tmq_conf_set(conf, "group.id", "cgrpName"); if (TMQ_CONF_OK != code) return NULL; + code = tmq_conf_set(conf, "client.id", "user defined name"); + if (TMQ_CONF_OK != code) return NULL; code = tmq_conf_set(conf, "td.connect.user", "root"); if (TMQ_CONF_OK != code) return NULL; code = tmq_conf_set(conf, "td.connect.pass", "taosdata"); if (TMQ_CONF_OK != code) return NULL; - code = tmq_conf_set(conf, "auto.offset.reset", "earliest"); + code = tmq_conf_set(conf, "auto.offset.reset", "earliest"); if (TMQ_CONF_OK != code) return NULL; code = tmq_conf_set(conf, "experimental.snapshot.enable", "true"); if (TMQ_CONF_OK != code) return NULL; code = tmq_conf_set(conf, "msg.with.table.name", "true"); if (TMQ_CONF_OK != code) return NULL; - - tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); - + + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); tmq_conf_destroy(conf); return tmq; @@ -211,7 +213,7 @@ tmq_t* build_consumer() { tmq_list_t* build_topic_list() { tmq_list_t* topicList = tmq_list_new(); - int32_t code = tmq_list_append(topicList, "topicname"); + int32_t code = tmq_list_append(topicList, "topicname"); if (code) { return NULL; } @@ -228,18 +230,18 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topicList) { int32_t totalRows = 0; int32_t msgCnt = 0; - int32_t consumeDelay = 5000; + int32_t timeout = 5000; while (running) { - TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, consumeDelay); + TAOS_RES* tmqmsg = tmq_consumer_poll(tmq, timeout); if (tmqmsg) { msgCnt++; totalRows += msg_process(tmqmsg); taos_free_result(tmqmsg); - } else { - break; - } + /*} else {*/ + /*break;*/ + } } - + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); } @@ -256,32 +258,30 @@ int main(int argc, char* argv[]) { tmq_t* tmq = build_consumer(); if (NULL == tmq) { - fprintf(stderr, "%% build_consumer() fail!\n"); + fprintf(stderr, "%% build_consumer() fail!\n"); return -1; } tmq_list_t* topic_list = build_topic_list(); if (NULL == topic_list) { return -1; - } - + } + basic_consume_loop(tmq, topic_list); code = tmq_unsubscribe(tmq); if (code) { fprintf(stderr, "%% Failed to unsubscribe: %s\n", tmq_err2str(code)); - } - else { + } else { fprintf(stderr, "%% unsubscribe\n"); } code = tmq_consumer_close(tmq); if (code) { fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); - } - else { + } else { fprintf(stderr, "%% Consumer closed\n"); } - + return 0; } diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 9355d76dcb..32dc9e1866 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -118,7 +118,7 @@ typedef struct { int64_t sourceVer; int64_t reqId; - SArray* blocks; // SArray + SArray* blocks; // SArray } SStreamDataBlock; typedef struct { diff --git a/include/util/taoserror.h b/include/util/taoserror.h index eab6e5561f..ce73dceec2 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -619,6 +619,7 @@ int32_t* taosGetErrno(); //tmq #define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000) +#define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001) #ifdef __cplusplus } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 24997a735c..112fb3e855 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -122,6 +122,7 @@ enum { TMQ_CONSUMER_STATUS__INIT = 0, TMQ_CONSUMER_STATUS__READY, TMQ_CONSUMER_STATUS__NO_TOPIC, + TMQ_CONSUMER_STATUS__RECOVER, }; enum { @@ -134,10 +135,8 @@ typedef struct { // statistics int64_t pollCnt; // offset - /*int64_t committedOffset;*/ - /*int64_t currentOffset;*/ - STqOffsetVal committedOffsetNew; - STqOffsetVal currentOffsetNew; + STqOffsetVal committedOffset; + STqOffsetVal currentOffset; // connection info int32_t vgId; int32_t vgStatus; @@ -152,7 +151,6 @@ typedef struct { SArray* vgs; // SArray - int8_t isSchemaAdaptive; SSchemaWrapper schema; } SMqClientTopic; @@ -190,10 +188,9 @@ typedef struct { } SMqPollCbParam; typedef struct { - tmq_t* tmq; - int8_t automatic; - int8_t async; - /*int8_t freeOffsets;*/ + tmq_t* tmq; + int8_t automatic; + int8_t async; int32_t waitingRspNum; int32_t totalRspNum; int32_t rspErr; @@ -418,7 +415,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - pOffset->val = pVg->currentOffsetNew; + pOffset->val = pVg->currentOffset; int32_t groupLen = strlen(tmq->groupId); memcpy(pOffset->subKey, tmq->groupId, groupLen); @@ -462,7 +459,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT pVg->vgId, pOffset->val.version); // TODO: put into cb - pVg->committedOffsetNew = pVg->currentOffsetNew; + pVg->committedOffset = pVg->currentOffset; pMsgSendInfo->requestId = generateRequestId(); pMsgSendInfo->requestObjRefId = 0; @@ -504,7 +501,6 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm pParamSet->tmq = tmq; pParamSet->automatic = 0; pParamSet->async = async; - /*pParamSet->freeOffsets = 1;*/ pParamSet->userCb = userCb; pParamSet->userParam = userParam; tsem_init(&pParamSet->rspSem, 0, 0); @@ -518,7 +514,7 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); if (pVg->vgId != vgId) continue; - if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) { + if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) { if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) { goto FAIL; } @@ -550,8 +546,8 @@ FAIL: return 0; } -int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb, - void* userParam) { +int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb, + void* userParam) { int32_t code = -1; if (msg != NULL) { @@ -566,7 +562,6 @@ int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_ pParamSet->tmq = tmq; pParamSet->automatic = automatic; pParamSet->async = async; - /*pParamSet->freeOffsets = 1;*/ pParamSet->userCb = userCb; pParamSet->userParam = userParam; tsem_init(&pParamSet->rspSem, 0, 0); @@ -583,7 +578,9 @@ int32_t tmqCommitInner2(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_ tscDebug("consumer:%" PRId64 ", begin commit for topic %s, vgId:%d", tmq->consumerId, pTopic->topicName, pVg->vgId); - if (pVg->currentOffsetNew.type > 0 && !tOffsetEqual(&pVg->currentOffsetNew, &pVg->committedOffsetNew)) { + if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) { + tscDebug("consumer: %ld, vg:%d, current %ld, committed %ld", tmq->consumerId, pVg->vgId, + pVg->currentOffset.version, pVg->committedOffset.version); if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) { continue; } @@ -699,7 +696,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* tmq) { tmqAskEp(tmq, true); taosTmrReset(tmqAssignAskEpTask, 1000, tmq, tmqMgmt.timer, &tmq->epTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { - tmqCommitInner2(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam); + tmqCommitInner(tmq, NULL, 1, 1, tmq->commitCb, tmq->commitCbUserParam); taosTmrReset(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer, &tmq->commitTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) { } else { @@ -888,12 +885,6 @@ FAIL: return NULL; } -#if 0 -int32_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, int32_t async) { - return tmqCommitInner2(tmq, offsets, 0, async, tmq->commitCb, tmq->commitCbUserParam); -} -#endif - int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { const SArray* container = &topic_list->container; int32_t sz = taosArrayGetSize(container); @@ -967,7 +958,11 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { code = param.rspErr; if (code != 0) goto FAIL; + int32_t retryCnt = 0; while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) { + if (retryCnt++ > 10) { + goto FAIL; + } tscDebug("consumer not ready, retry"); taosMsleep(500); } @@ -1006,8 +1001,12 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { int32_t epoch = pParam->epoch; taosMemoryFree(pParam); if (code != 0) { - tscWarn("msg discard from vgId:%d, epoch %d, code:%x", vgId, epoch, code); - if (pMsg->pData) taosMemoryFreeClear(pMsg->pData); + tscWarn("msg discard from vgId:%d, epoch %d, since %s", vgId, epoch, terrstr()); + if (pMsg->pData) taosMemoryFree(pMsg->pData); + if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER); + goto CREATE_MSG_FAIL; + } if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); if (pRspWrapper == NULL) { @@ -1083,7 +1082,7 @@ CREATE_MSG_FAIL: return -1; } -bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { +bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { bool set = false; int32_t topicNumGet = taosArrayGetSize(pRsp->topics); @@ -1112,10 +1111,10 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId); char buf[80]; - tFormatOffset(buf, 80, &pVgCur->currentOffsetNew); + tFormatOffset(buf, 80, &pVgCur->currentOffset); tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, buf); - taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffsetNew, sizeof(STqOffsetVal)); + taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(STqOffsetVal)); } } } @@ -1142,7 +1141,7 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { SMqClientVg clientVg = { .pollCnt = 0, - .currentOffsetNew = offsetNew, + .currentOffset = offsetNew, .vgId = pVgEp->vgId, .epSet = pVgEp->epSet, .vgStatus = TMQ_VG_STATUS__IDLE, @@ -1166,93 +1165,6 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { return set; } -#if 0 -bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { - /*printf("call update ep %d\n", epoch);*/ - bool set = false; - int32_t topicNumGet = taosArrayGetSize(pRsp->topics); - char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; - tscDebug("consumer:%" PRId64 ", update ep epoch %d to epoch %d, topic num: %d", tmq->consumerId, tmq->epoch, epoch, - topicNumGet); - SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic)); - if (newTopics == NULL) { - return false; - } - SHashObj* pHash = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); - if (pHash == NULL) { - taosArrayDestroy(newTopics); - return false; - } - - // find topic, build hash - for (int32_t i = 0; i < topicNumGet; i++) { - SMqClientTopic topic = {0}; - SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i); - topic.schema = pTopicEp->schema; - taosHashClear(pHash); - topic.topicName = strdup(pTopicEp->topic); - tstrncpy(topic.db, pTopicEp->db, TSDB_DB_FNAME_LEN); - - tscDebug("consumer:%" PRId64 ", update topic: %s", tmq->consumerId, topic.topicName); - int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); - for (int32_t j = 0; j < topicNumCur; j++) { - // find old topic - SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j); - if (pTopicCur->vgs && strcmp(pTopicCur->topicName, pTopicEp->topic) == 0) { - int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs); - tscDebug("consumer:%" PRId64 ", new vg num: %d", tmq->consumerId, vgNumCur); - if (vgNumCur == 0) break; - for (int32_t k = 0; k < vgNumCur; k++) { - SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k); - sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId); - tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d build %s", tmq->consumerId, epoch, pVgCur->vgId, vgKey); - taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t)); - } - break; - } - } - - int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs); - topic.vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg)); - for (int32_t j = 0; j < vgNumGet; j++) { - SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); - sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId); - int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey)); - int64_t offset = pVgEp->offset; - tscDebug("consumer:%" PRId64 ", (epoch %d) original offset of vgId:%d is %" PRId64, tmq->consumerId, epoch, pVgEp->vgId, offset); - if (pOffset != NULL) { - offset = *pOffset; - tscDebug("consumer:%" PRId64 ", (epoch %d) receive offset of vgId:%d, full key is %s", tmq->consumerId, epoch, pVgEp->vgId, - vgKey); - } - tscDebug("consumer:%" PRId64 ", (epoch %d) offset of vgId:%d updated to %" PRId64, tmq->consumerId, epoch, pVgEp->vgId, offset); - SMqClientVg clientVg = { - .pollCnt = 0, - .currentOffset = offset, - .vgId = pVgEp->vgId, - .epSet = pVgEp->epSet, - .vgStatus = TMQ_VG_STATUS__IDLE, - .vgSkipCnt = 0, - }; - taosArrayPush(topic.vgs, &clientVg); - set = true; - } - taosArrayPush(newTopics, &topic); - } - if (tmq->clientTopics) taosArrayDestroy(tmq->clientTopics); - taosHashCleanup(pHash); - tmq->clientTopics = newTopics; - - if (taosArrayGetSize(tmq->clientTopics) == 0) - atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC); - else - atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); - - atomic_store_32(&tmq->epoch, epoch); - return set; -} -#endif - int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; tmq_t* tmq = pParam->tmq; @@ -1278,7 +1190,7 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); /*printf("rsp epoch %" PRId64 " sz %" PRId64 "\n", rsp.epoch, rsp.topics->size);*/ /*printf("tmq epoch %" PRId64 " sz %" PRId64 "\n", tmq->epoch, tmq->clientTopics->size);*/ - tmqUpdateEp2(tmq, head->epoch, &rsp); + tmqUpdateEp(tmq, head->epoch, &rsp); tDeleteSMqAskEpRsp(&rsp); } else { SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM); @@ -1430,7 +1342,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t timeout, SMqClientTopic* pReq->consumerId = tmq->consumerId; pReq->epoch = tmq->epoch; /*pReq->currentOffset = reqOffset;*/ - pReq->reqOffset = pVg->currentOffsetNew; + pReq->reqOffset = pVg->currentOffset; pReq->reqId = generateRequestId(); pReq->useSnapshot = tmq->useSnapshot; @@ -1534,7 +1446,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { /*printf("send poll\n");*/ char offsetFormatBuf[80]; - tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffsetNew); + tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffset); tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64, tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId); /*printf("send vgId:%d %" PRId64 "\n", pVg->vgId, pVg->currentOffset);*/ @@ -1552,7 +1464,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) { SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper; SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg; - tmqUpdateEp2(tmq, rspWrapper->epoch, rspMsg); + tmqUpdateEp(tmq, rspWrapper->epoch, rspMsg); /*tmqClearUnhandleMsg(tmq);*/ *pReset = true; } else { @@ -1586,7 +1498,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { SMqClientVg* pVg = pollRspWrapper->vgHandle; /*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset, * rspMsg->msg.rspOffset);*/ - pVg->currentOffsetNew = pollRspWrapper->dataRsp.rspOffset; + pVg->currentOffset = pollRspWrapper->dataRsp.rspOffset; atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); if (pollRspWrapper->dataRsp.blockNum == 0) { taosFreeQitem(pollRspWrapper); @@ -1609,8 +1521,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { SMqClientVg* pVg = pollRspWrapper->vgHandle; /*printf("vgId:%d, offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset, * rspMsg->msg.rspOffset);*/ - pVg->currentOffsetNew.version = pollRspWrapper->metaRsp.rspOffset; - pVg->currentOffsetNew.type = TMQ_OFFSET__LOG; + pVg->currentOffset.version = pollRspWrapper->metaRsp.rspOffset; + pVg->currentOffset.type = TMQ_OFFSET__LOG; atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); // build rsp SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); @@ -1653,6 +1565,17 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { return NULL; } + if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) { + int32_t retryCnt = 0; + while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) { + if (retryCnt++ > 10) { + return NULL; + } + tscDebug("consumer not ready, retry"); + taosMsleep(500); + } + } + while (1) { tmqHandleAllDelayedTask(tmq); if (tmqPollImpl(tmq, timeout) < 0) return NULL; @@ -3384,10 +3307,10 @@ int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) { void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) { // - tmqCommitInner2(tmq, msg, 0, 1, cb, param); + tmqCommitInner(tmq, msg, 0, 1, cb, param); } int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) { // - return tmqCommitInner2(tmq, msg, 0, 0, NULL, NULL); + return tmqCommitInner(tmq, msg, 0, 0, NULL, NULL); } diff --git a/source/common/src/systable.c b/source/common/src/systable.c index ce80942356..be76a1b453 100644 --- a/source/common/src/systable.c +++ b/source/common/src/systable.c @@ -284,8 +284,7 @@ static const SSysDbTableSchema consumerSchema[] = { {.name = "client_id", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_BINARY}, {.name = "status", .bytes = 20 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, {.name = "topics", .bytes = TSDB_TOPIC_FNAME_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, - {.name = "pid", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, - {.name = "end_point", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_BINARY}, + /*{.name = "end_point", .bytes = TSDB_IPv4ADDR_LEN + 6 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},*/ {.name = "up_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "subscribe_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "rebalance_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 39b57f2966..614348c209 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -131,8 +131,9 @@ static int32_t mndProcessConsumerRecoverMsg(SRpcMsg *pMsg) { mInfo("receive consumer recover msg, consumer id %" PRId64 ", status %s", pRecoverMsg->consumerId, mndConsumerStatusName(pConsumer->status)); - if (pConsumer->status != MQ_CONSUMER_STATUS__READY) { + if (pConsumer->status != MQ_CONSUMER_STATUS__LOST_REBD) { mndReleaseConsumer(pMnode, pConsumer); + terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; return -1; } @@ -275,6 +276,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { int32_t status = atomic_load_32(&pConsumer->status); if (status == MQ_CONSUMER_STATUS__LOST_REBD) { + mInfo("try to recover consumer %ld", consumerId); SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); pRecoverMsg->consumerId = consumerId; @@ -305,15 +307,14 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0); -#if 1 atomic_store_32(&pConsumer->hbStatus, 0); -#endif // 1. check consumer status int32_t status = atomic_load_32(&pConsumer->status); #if 1 if (status == MQ_CONSUMER_STATUS__LOST_REBD) { + mInfo("try to recover consumer %ld", consumerId); SMqConsumerRecoverMsg *pRecoverMsg = rpcMallocCont(sizeof(SMqConsumerRecoverMsg)); pRecoverMsg->consumerId = consumerId; @@ -326,6 +327,7 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { #endif if (status != MQ_CONSUMER_STATUS__READY) { + mInfo("consumer %ld not ready, status: %s", consumerId, mndConsumerStatusName(status)); terrno = TSDB_CODE_MND_CONSUMER_NOT_READY; return -1; } @@ -939,13 +941,9 @@ static int32_t mndRetrieveConsumer(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock * colDataAppend(pColInfo, numOfRows, NULL, true); } - // pid - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->pid, true); - // end point - pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); - colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->ep, true); + /*pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);*/ + /*colDataAppend(pColInfo, numOfRows, (const char *)&pConsumer->ep, true);*/ // up time pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 2b154fce04..7b22e061f0 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -157,7 +157,7 @@ int32_t tqCheckColModifiable(STQ* pTq, int64_t tbUid, int32_t colId); int32_t tqProcessCheckAlterInfoReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen); +int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver); int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 02ca248054..112543e340 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -183,7 +183,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con return 0; } -int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen) { +int32_t tqProcessOffsetCommitReq(STQ* pTq, char* msg, int32_t msgLen, int64_t ver) { STqOffset offset = {0}; SDecoder decoder; tDecoderInit(&decoder, msg, msgLen); @@ -302,6 +302,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { tqError("tmq poll: consumer handle mismatch for consumer:%" PRId64 ", in vgId:%d, subkey %s, handle consumer id %" PRId64, consumerId, TD_VID(pTq->pVnode), pReq->subKey, pHandle->consumerId); + terrno = TSDB_CODE_TMQ_CONSUMER_MISMATCH; return -1; } diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index b8e94de115..e4e1e608e5 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -204,7 +204,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp break; case TDMT_VND_MQ_COMMIT_OFFSET: if (tqProcessOffsetCommitReq(pVnode->pTq, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)), - pMsg->contLen - sizeof(SMsgHead)) < 0) { + pMsg->contLen - sizeof(SMsgHead), version) < 0) { goto _err; } break; diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 7dc8d822e9..3776cb261f 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -44,7 +44,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock) int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* pReq); -SStreamQueueItem* streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem); +SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem); void streamFreeQitem(SStreamQueueItem* data); #ifdef __cplusplus diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 629a6e8b06..cd5f499c34 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -124,7 +124,7 @@ void streamDataSubmitRefDec(SStreamDataSubmit* pDataSubmit) { } } -SStreamQueueItem* streamAppendQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) { +SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* elem) { ASSERT(elem); if (dst->type == STREAM_INPUT__DATA_BLOCK && elem->type == STREAM_INPUT__DATA_BLOCK) { SStreamDataBlock* pBlock = (SStreamDataBlock*)dst; @@ -171,8 +171,8 @@ void streamFreeQitem(SStreamQueueItem* data) { int32_t ref = atomic_sub_fetch_32(pRef, 1); ASSERT(ref >= 0); if (ref == 0) { - void* data = taosArrayGetP(pMerge->reqs, i); - taosMemoryFree(data); + void* dataStr = taosArrayGetP(pMerge->reqs, i); + taosMemoryFree(dataStr); taosMemoryFree(pRef); } } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 9d98afa65a..c78ff0756f 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -370,80 +370,6 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat return 0; } -int32_t streamBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* data, SRpcMsg* pMsg, SEpSet** ppEpSet) { - void* buf = NULL; - int32_t code = -1; - int32_t blockNum = taosArrayGetSize(data->blocks); - ASSERT(blockNum != 0); - - SStreamDispatchReq req = { - .streamId = pTask->streamId, - .dataSrcVgId = data->srcVgId, - .upstreamTaskId = pTask->taskId, - .upstreamChildId = pTask->selfChildId, - .upstreamNodeId = pTask->nodeId, - .blockNum = blockNum, - }; - - req.data = taosArrayInit(blockNum, sizeof(void*)); - req.dataLen = taosArrayInit(blockNum, sizeof(int32_t)); - if (req.data == NULL || req.dataLen == NULL) { - goto FAIL; - } - for (int32_t i = 0; i < blockNum; i++) { - SSDataBlock* pDataBlock = taosArrayGet(data->blocks, i); - if (streamAddBlockToDispatchMsg(pDataBlock, &req) < 0) { - goto FAIL; - } - } - int32_t vgId = 0; - int32_t downstreamTaskId = 0; - // find ep - if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) { - vgId = pTask->fixedEpDispatcher.nodeId; - *ppEpSet = &pTask->fixedEpDispatcher.epSet; - downstreamTaskId = pTask->fixedEpDispatcher.taskId; - } else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { - // TODO get ctbName for each block - SSDataBlock* pBlock = taosArrayGet(data->blocks, 0); - char* ctbName = buildCtbNameByGroupId(pTask->shuffleDispatcher.stbFullName, pBlock->info.groupId); - // TODO: get hash function by hashMethod - - // get groupId, compute hash value - uint32_t hashValue = MurmurHash3_32(ctbName, strlen(ctbName)); - - // get node - // TODO: optimize search process - SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos; - int32_t sz = taosArrayGetSize(vgInfo); - for (int32_t i = 0; i < sz; i++) { - SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i); - ASSERT(pVgInfo->vgId > 0); - if (hashValue >= pVgInfo->hashBegin && hashValue <= pVgInfo->hashEnd) { - vgId = pVgInfo->vgId; - downstreamTaskId = pVgInfo->taskId; - *ppEpSet = &pVgInfo->epSet; - break; - } - } - } - - ASSERT(vgId > 0 || vgId == SNODE_HANDLE); - req.taskId = downstreamTaskId; - - qDebug("dispatch from task %d (child id %d) to down stream task %d in vnode %d", pTask->taskId, pTask->selfChildId, - downstreamTaskId, vgId); - - streamDispatchOneReq(pTask, &req, vgId, *ppEpSet); - - code = 0; -FAIL: - if (code < 0 && buf) rpcFreeCont(buf); - if (req.data) taosArrayDestroyP(req.data, (FDelete)taosMemoryFree); - if (req.dataLen) taosArrayDestroy(req.dataLen); - return code; -} - int32_t streamDispatch(SStreamTask* pTask) { ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH); @@ -461,7 +387,7 @@ int32_t streamDispatch(SStreamTask* pTask) { } ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); - qDebug("stream continue dispatching: task %d", pTask->taskId); + qDebug("stream dispatching: task %d", pTask->taskId); int32_t code = 0; if (streamDispatchAllBlocks(pTask, pBlock) < 0) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 72249c5181..06ca26f029 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -82,17 +82,6 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, const void* data, SArray* return 0; } -#if 0 -static FORCE_INLINE int32_t streamUpdateVer(SStreamTask* pTask, SStreamDataBlock* pBlock) { - ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK); - int32_t childId = pBlock->childId; - int64_t ver = pBlock->sourceVer; - SStreamChildEpInfo* pChildInfo = taosArrayGetP(pTask->childEpInfo, childId); - /*pChildInfo-> = ver;*/ - return 0; -} -#endif - int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) { ASSERT(pTask->taskLevel != TASK_LEVEL__SINK); @@ -150,10 +139,11 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) return 0; } + // TODO: handle version int32_t streamExecForAll(SStreamTask* pTask) { while (1) { - int32_t cnt = 1; + int32_t batchCnt = 1; void* data = NULL; while (1) { SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); @@ -169,13 +159,12 @@ int32_t streamExecForAll(SStreamTask* pTask) { } } else { void* newRet; - if ((newRet = streamAppendQueueItem(data, qItem)) == NULL) { + if ((newRet = streamMergeQueueItem(data, qItem)) == NULL) { streamQueueProcessFail(pTask->inputQueue); break; } else { - cnt++; + batchCnt++; data = newRet; - /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ streamQueueProcessSuccess(pTask->inputQueue); } } @@ -198,16 +187,14 @@ int32_t streamExecForAll(SStreamTask* pTask) { SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); - qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, cnt); + qDebug("stream task %d exec begin, msg batch: %d", pTask->taskId, batchCnt); streamTaskExecImpl(pTask, data, pRes); qDebug("stream task %d exec end", pTask->taskId); if (taosArrayGetSize(pRes) != 0) { SStreamDataBlock* qRes = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM); if (qRes == NULL) { - // TODO log failed ver - streamQueueProcessFail(pTask->inputQueue); - taosArrayDestroy(pRes); + taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); streamFreeQitem(data); return -1; } @@ -218,17 +205,18 @@ int32_t streamExecForAll(SStreamTask* pTask) { SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)data; qRes->childId = pTask->selfChildId; qRes->sourceVer = pSubmit->ver; + } else if (((SStreamQueueItem*)data)->type == STREAM_INPUT__MERGED_SUBMIT) { + SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data; + qRes->childId = pTask->selfChildId; + qRes->sourceVer = pMerged->ver; } if (streamTaskOutput(pTask, qRes) < 0) { - // TODO save failed ver - /*streamQueueProcessFail(pTask->inputQueue);*/ taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - taosFreeQitem(qRes); streamFreeQitem(data); + taosFreeQitem(qRes); return -1; } - /*streamQueueProcessSuccess(pTask->inputQueue);*/ } else { taosArrayDestroy(pRes); } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index ad571a9e82..fad3977a21 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -421,6 +421,8 @@ static FORCE_INLINE int32_t walWriteImpl(SWal *pWal, int64_t index, tmsg_t msgTy pWal->writeHead.cksumHead = walCalcHeadCksum(&pWal->writeHead); pWal->writeHead.cksumBody = walCalcBodyCksum(body, bodyLen); + wDebug("vgId:%d, wal write log %ld, msgType: %s", pWal->cfg.vgId, index, TMSG_INFO(msgType)); + if (taosWriteFile(pWal->pLogFile, &pWal->writeHead, sizeof(SWalCkHead)) != sizeof(SWalCkHead)) { // TODO ftruncate terrno = TAOS_SYSTEM_ERROR(errno); diff --git a/source/os/src/osSemaphore.c b/source/os/src/osSemaphore.c index c57ea33028..a7d2ba8531 100644 --- a/source/os/src/osSemaphore.c +++ b/source/os/src/osSemaphore.c @@ -16,8 +16,8 @@ #define ALLOW_FORBID_FUNC #define _DEFAULT_SOURCE #include "os.h" -#include "tdef.h" #include "pthread.h" +#include "tdef.h" #ifdef WINDOWS @@ -77,8 +77,8 @@ int32_t tsem_wait(tsem_t* sem) { int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { struct timespec ts, rel; - FILETIME ft_before, ft_after; - int rc; + FILETIME ft_before, ft_after; + int rc; rel.tv_sec = 0; rel.tv_nsec = nanosecs; @@ -218,7 +218,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { // int e = errno; // if (e == EEXIST) continue; // if (e == EINTR) continue; -// fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem, +// fprintf(stderr, "==%s[%d]%s():[%p]==not created[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, +// sem, // e, strerror(e)); // abort(); // } while (p->sem == SEM_FAILED); @@ -232,7 +233,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { // } // kern_return_t ret = semaphore_create(sem_port, &p->sem, SYNC_POLICY_FIFO, value); // if (ret != KERN_SUCCESS) { -// fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, +// fprintf(stderr, "==%s[%d]%s():[%p]==semophore_create failed\n", taosDirEntryBaseName(__FILE__), __LINE__, +// __func__, // sem); // // we fail-fast here, because we have less-doc about semaphore_create for the moment // abort(); @@ -259,8 +261,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { // } // struct tsem_s *p = *sem; // if (!p->valid) { -// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); -// abort(); +// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, +// sem); abort(); // } // #ifdef SEM_USE_PTHREAD // if (taosThreadMutexLock(&p->lock)) { @@ -271,7 +273,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { // p->val -= 1; // if (p->val < 0) { // if (taosThreadCondWait(&p->cond, &p->lock)) { -// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, +// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, +// __func__, // sem); // abort(); // } @@ -298,8 +301,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { // } // struct tsem_s *p = *sem; // if (!p->valid) { -// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); -// abort(); +// fprintf(stderr, "==%s[%d]%s():[%p]==already destroyed\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, +// sem); abort(); // } // #ifdef SEM_USE_PTHREAD // if (taosThreadMutexLock(&p->lock)) { @@ -310,7 +313,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { // p->val += 1; // if (p->val <= 0) { // if (taosThreadCondSignal(&p->cond)) { -// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, +// fprintf(stderr, "==%s[%d]%s():[%p]==internal logic error\n", taosDirEntryBaseName(__FILE__), __LINE__, +// __func__, // sem); // abort(); // } @@ -333,7 +337,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { // int tsem_destroy(tsem_t *sem) { // // fprintf(stderr, "==%s[%d]%s():[%p]==destroying\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); // if (!*sem) { -// // fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem); +// // fprintf(stderr, "==%s[%d]%s():[%p]==not initialized\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, +// sem); // // abort(); // return 0; // } @@ -371,7 +376,8 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { // int r = sem_unlink(name); // if (r) { // int e = errno; -// fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, sem, +// fprintf(stderr, "==%s[%d]%s():[%p]==unlink failed[%d]%s\n", taosDirEntryBaseName(__FILE__), __LINE__, __func__, +// sem, // e, strerror(e)); // abort(); // } @@ -386,225 +392,189 @@ int32_t tsem_timewait(tsem_t* sem, int64_t nanosecs) { // *sem = NULL; // return 0; // } -typedef struct -{ - pthread_mutex_t count_lock; - pthread_cond_t count_bump; - unsigned int count; -}bosal_sem_t; +typedef struct { + pthread_mutex_t count_lock; + pthread_cond_t count_bump; + unsigned int count; +} bosal_sem_t; -int tsem_init(tsem_t *psem, int flags, unsigned int count) -{ - bosal_sem_t *pnewsem; - int result; +int tsem_init(tsem_t *psem, int flags, unsigned int count) { + bosal_sem_t *pnewsem; + int result; - pnewsem = (bosal_sem_t *)malloc(sizeof(bosal_sem_t)); - if (! pnewsem) - { - return -1; - } - result = pthread_mutex_init(&pnewsem->count_lock, NULL); - if (result) - { - free(pnewsem); - return result; - } - result = pthread_cond_init(&pnewsem->count_bump, NULL); - if (result) - { - pthread_mutex_destroy(&pnewsem->count_lock); - free(pnewsem); - return result; - } - pnewsem->count = count; - *psem = (tsem_t)pnewsem; - return 0; + pnewsem = (bosal_sem_t *)malloc(sizeof(bosal_sem_t)); + if (!pnewsem) { + return -1; + } + result = pthread_mutex_init(&pnewsem->count_lock, NULL); + if (result) { + free(pnewsem); + return result; + } + result = pthread_cond_init(&pnewsem->count_bump, NULL); + if (result) { + pthread_mutex_destroy(&pnewsem->count_lock); + free(pnewsem); + return result; + } + pnewsem->count = count; + *psem = (tsem_t)pnewsem; + return 0; } -int tsem_destroy(tsem_t *psem) -{ - bosal_sem_t *poldsem; +int tsem_destroy(tsem_t *psem) { + bosal_sem_t *poldsem; - if (! psem) - { - return EINVAL; - } - poldsem = (bosal_sem_t *)*psem; + if (!psem) { + return EINVAL; + } + poldsem = (bosal_sem_t *)*psem; - pthread_mutex_destroy(&poldsem->count_lock); - pthread_cond_destroy(&poldsem->count_bump); - free(poldsem); - return 0; + pthread_mutex_destroy(&poldsem->count_lock); + pthread_cond_destroy(&poldsem->count_bump); + free(poldsem); + return 0; } -int tsem_post(tsem_t *psem) -{ - bosal_sem_t *pxsem; - int result, xresult; +int tsem_post(tsem_t *psem) { + bosal_sem_t *pxsem; + int result, xresult; - if (! psem) - { - return EINVAL; - } - pxsem = (bosal_sem_t *)*psem; + if (!psem) { + return EINVAL; + } + pxsem = (bosal_sem_t *)*psem; - result = pthread_mutex_lock(&pxsem->count_lock); - if (result) - { - return result; - } - pxsem->count = pxsem->count + 1; + result = pthread_mutex_lock(&pxsem->count_lock); + if (result) { + return result; + } + pxsem->count = pxsem->count + 1; - xresult = pthread_cond_signal(&pxsem->count_bump); + xresult = pthread_cond_signal(&pxsem->count_bump); - result = pthread_mutex_unlock(&pxsem->count_lock); - if (result) - { - return result; - } - if (xresult) - { - errno = xresult; - return -1; - } - return 0; + result = pthread_mutex_unlock(&pxsem->count_lock); + if (result) { + return result; + } + if (xresult) { + errno = xresult; + return -1; + } + return 0; } -int tsem_trywait(tsem_t *psem) -{ - bosal_sem_t *pxsem; - int result, xresult; +int tsem_trywait(tsem_t *psem) { + bosal_sem_t *pxsem; + int result, xresult; - if (! psem) - { - return EINVAL; - } - pxsem = (bosal_sem_t *)*psem; + if (!psem) { + return EINVAL; + } + pxsem = (bosal_sem_t *)*psem; - result = pthread_mutex_lock(&pxsem->count_lock); - if (result) - { - return result; - } - xresult = 0; + result = pthread_mutex_lock(&pxsem->count_lock); + if (result) { + return result; + } + xresult = 0; - if (pxsem->count > 0) - { - pxsem->count--; - } - else - { - xresult = EAGAIN; - } - result = pthread_mutex_unlock(&pxsem->count_lock); - if (result) - { - return result; - } - if (xresult) - { - errno = xresult; - return -1; - } - return 0; + if (pxsem->count > 0) { + pxsem->count--; + } else { + xresult = EAGAIN; + } + result = pthread_mutex_unlock(&pxsem->count_lock); + if (result) { + return result; + } + if (xresult) { + errno = xresult; + return -1; + } + return 0; } -int tsem_wait(tsem_t *psem) -{ - bosal_sem_t *pxsem; - int result, xresult; +int tsem_wait(tsem_t *psem) { + bosal_sem_t *pxsem; + int result, xresult; - if (! psem) - { - return EINVAL; - } - pxsem = (bosal_sem_t *)*psem; + if (!psem) { + return EINVAL; + } + pxsem = (bosal_sem_t *)*psem; - result = pthread_mutex_lock(&pxsem->count_lock); - if (result) - { - return result; - } - xresult = 0; + result = pthread_mutex_lock(&pxsem->count_lock); + if (result) { + return result; + } + xresult = 0; - if (pxsem->count == 0) - { - xresult = pthread_cond_wait(&pxsem->count_bump, &pxsem->count_lock); + if (pxsem->count == 0) { + xresult = pthread_cond_wait(&pxsem->count_bump, &pxsem->count_lock); + } + if (!xresult) { + if (pxsem->count > 0) { + pxsem->count--; } - if (! xresult) - { - if (pxsem->count > 0) - { - pxsem->count--; - } - } - result = pthread_mutex_unlock(&pxsem->count_lock); - if (result) - { - return result; - } - if (xresult) - { - errno = xresult; - return -1; - } - return 0; + } + result = pthread_mutex_unlock(&pxsem->count_lock); + if (result) { + return result; + } + if (xresult) { + errno = xresult; + return -1; + } + return 0; } -int tsem_timewait(tsem_t *psem, int64_t nanosecs) -{ +int tsem_timewait(tsem_t *psem, int64_t nanosecs) { struct timespec abstim = { .tv_sec = 0, .tv_nsec = nanosecs, }; - bosal_sem_t *pxsem; - int result, xresult; + bosal_sem_t *pxsem; + int result, xresult; - if (! psem) - { - return EINVAL; - } - pxsem = (bosal_sem_t *)*psem; + if (!psem) { + return EINVAL; + } + pxsem = (bosal_sem_t *)*psem; - result = pthread_mutex_lock(&pxsem->count_lock); - if (result) - { - return result; - } - xresult = 0; + result = pthread_mutex_lock(&pxsem->count_lock); + if (result) { + return result; + } + xresult = 0; - if (pxsem->count == 0) - { - xresult = pthread_cond_timedwait(&pxsem->count_bump, &pxsem->count_lock, &abstim); + if (pxsem->count == 0) { + xresult = pthread_cond_timedwait(&pxsem->count_bump, &pxsem->count_lock, &abstim); + } + if (!xresult) { + if (pxsem->count > 0) { + pxsem->count--; } - if (! xresult) - { - if (pxsem->count > 0) - { - pxsem->count--; - } - } - result = pthread_mutex_unlock(&pxsem->count_lock); - if (result) - { - return result; - } - if (xresult) - { - errno = xresult; - return -1; - } - return 0; + } + result = pthread_mutex_unlock(&pxsem->count_lock); + if (result) { + return result; + } + if (xresult) { + errno = xresult; + return -1; + } + return 0; } -bool taosCheckPthreadValid(TdThread thread) { +bool taosCheckPthreadValid(TdThread thread) { int32_t ret = taosThreadKill(thread, 0); if (ret == ESRCH) return false; if (ret == EINVAL) return false; // alive return true; - } +} int64_t taosGetSelfPthreadId() { TdThread thread = taosThreadSelf(); @@ -651,7 +621,13 @@ int64_t taosGetSelfPthreadId() { int64_t taosGetPthreadId(TdThread thread) { return (int64_t)thread; } void taosResetPthread(TdThread* thread) { *thread = 0; } bool taosComparePthread(TdThread first, TdThread second) { return first == second; } -int32_t taosGetPId() { return getpid(); } + +int32_t taosGetPId() { + static int32_t pid; + if (pid != 0) return pid; + pid = getpid(); + return pid; +} int32_t taosGetAppName(char* name, int32_t* len) { const char* self = "/proc/self/exe"; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 2be1c9f744..f0a58c9bdc 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -623,6 +623,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Invalid index file" //tmq TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_MSG, "Invalid message") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_MISMATCH, "Consumer mismatch") #ifdef TAOS_ERROR_C };