commit
eabc8e6337
|
@ -911,7 +911,7 @@ CREATE_MSG_FAIL:
|
||||||
|
|
||||||
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
|
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
|
||||||
/*printf("call update ep %d\n", epoch);*/
|
/*printf("call update ep %d\n", epoch);*/
|
||||||
/*printf("tmq update ep epoch %d to epoch %d\n", tmq->epoch, epoch);*/
|
tscDebug("tmq update ep epoch %d to epoch %d", tmq->epoch, epoch);
|
||||||
bool set = false;
|
bool set = false;
|
||||||
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
|
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
|
||||||
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
|
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
|
||||||
|
@ -984,6 +984,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
|
||||||
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
|
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
|
||||||
tmq_t* tmq = pParam->tmq;
|
tmq_t* tmq = pParam->tmq;
|
||||||
|
tscDebug("consumer %ld recv ep", tmq->consumerId);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tscError("get topic endpoint error, not ready, wait:%d\n", pParam->sync);
|
tscError("get topic endpoint error, not ready, wait:%d\n", pParam->sync);
|
||||||
goto END;
|
goto END;
|
||||||
|
@ -1032,12 +1033,14 @@ END:
|
||||||
int32_t tmqAskEp(tmq_t* tmq, bool sync) {
|
int32_t tmqAskEp(tmq_t* tmq, bool sync) {
|
||||||
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
|
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
|
||||||
if (epStatus == 1) {
|
if (epStatus == 1) {
|
||||||
|
tscDebug("consumer %ld skip ask ep", tmq->consumerId);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int32_t tlen = sizeof(SMqCMGetSubEpReq);
|
int32_t tlen = sizeof(SMqCMGetSubEpReq);
|
||||||
SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen);
|
SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen);
|
||||||
if (req == NULL) {
|
if (req == NULL) {
|
||||||
tscError("failed to malloc get subscribe ep buf");
|
tscError("failed to malloc get subscribe ep buf");
|
||||||
|
atomic_store_8(&tmq->epStatus, 0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
req->consumerId = htobe64(tmq->consumerId);
|
req->consumerId = htobe64(tmq->consumerId);
|
||||||
|
@ -1048,6 +1051,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
|
||||||
if (pParam == NULL) {
|
if (pParam == NULL) {
|
||||||
tscError("failed to malloc subscribe param");
|
tscError("failed to malloc subscribe param");
|
||||||
taosMemoryFree(req);
|
taosMemoryFree(req);
|
||||||
|
atomic_store_8(&tmq->epStatus, 0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pParam->tmq = tmq;
|
pParam->tmq = tmq;
|
||||||
|
@ -1059,6 +1063,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
|
||||||
tsem_destroy(&pParam->rspSem);
|
tsem_destroy(&pParam->rspSem);
|
||||||
taosMemoryFree(pParam);
|
taosMemoryFree(pParam);
|
||||||
taosMemoryFree(req);
|
taosMemoryFree(req);
|
||||||
|
atomic_store_8(&tmq->epStatus, 0);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1076,6 +1081,8 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
|
||||||
|
|
||||||
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
||||||
|
|
||||||
|
tscDebug("consumer %ld ask ep", tmq->consumerId);
|
||||||
|
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||||
|
|
||||||
|
@ -1214,7 +1221,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
|
||||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||||
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
|
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
|
||||||
if (vgStatus != TMQ_VG_STATUS__IDLE) {
|
if (vgStatus != TMQ_VG_STATUS__IDLE) {
|
||||||
tscDebug("skip vg %d", pVg->vgId);
|
tscDebug("consumer %ld skip vg %d", tmq->consumerId, pVg->vgId);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
|
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
|
||||||
|
@ -1258,7 +1265,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
|
||||||
int64_t transporterId = 0;
|
int64_t transporterId = 0;
|
||||||
/*printf("send poll\n");*/
|
/*printf("send poll\n");*/
|
||||||
atomic_add_fetch_32(&tmq->waitingRequest, 1);
|
atomic_add_fetch_32(&tmq->waitingRequest, 1);
|
||||||
tscDebug("tmq send poll: vg %d, req offset %ld", pVg->vgId, pVg->currentOffset);
|
tscDebug("consumer %ld send poll: vg %d, req offset %ld", tmq->consumerId, pVg->vgId, pVg->currentOffset);
|
||||||
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
|
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
|
||||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
|
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
|
||||||
pVg->pollCnt++;
|
pVg->pollCnt++;
|
||||||
|
@ -1322,7 +1329,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
|
||||||
tmqHandleNoPollRsp(tmq, rspHead, &reset);
|
tmqHandleNoPollRsp(tmq, rspHead, &reset);
|
||||||
taosFreeQitem(rspHead);
|
taosFreeQitem(rspHead);
|
||||||
if (pollIfReset && reset) {
|
if (pollIfReset && reset) {
|
||||||
tscDebug("reset and repoll\n");
|
tscDebug("consumer %ld reset and repoll", tmq->consumerId);
|
||||||
tmqPollImpl(tmq, blockingTime);
|
tmqPollImpl(tmq, blockingTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1561,24 +1568,3 @@ TAOS_ROW tmq_get_row(tmq_message_t* message) {
|
||||||
}
|
}
|
||||||
|
|
||||||
char* tmq_get_topic_name(tmq_message_t* message) { return "not implemented yet"; }
|
char* tmq_get_topic_name(tmq_message_t* message) { return "not implemented yet"; }
|
||||||
|
|
||||||
#if 0
|
|
||||||
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
|
|
||||||
tmq_t* pTmq = taosMemoryMalloc(sizeof(tmq_t));
|
|
||||||
if (pTmq == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
strcpy(pTmq->groupId, conf->groupId);
|
|
||||||
strcpy(pTmq->clientId, conf->clientId);
|
|
||||||
pTmq->pTscObj = (STscObj*)conn;
|
|
||||||
pTmq->pTscObj->connType = HEARTBEAT_TYPE_MQ;
|
|
||||||
return pTmq;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
|
|
||||||
assert(pMsgBody != NULL);
|
|
||||||
taosMemoryFreeClear(pMsgBody->msgInfo.pData);
|
|
||||||
taosMemoryFreeClear(pMsgBody);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
|
@ -507,7 +507,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
|
||||||
|
|
||||||
// TODO: log rebalance statistics
|
// TODO: log rebalance statistics
|
||||||
SSdbRaw *pSubRaw = mndSubActionEncode(pSub);
|
SSdbRaw *pSubRaw = mndSubActionEncode(pSub);
|
||||||
sdbSetRawStatus(pSubRaw, SDB_STATUS_UPDATING);
|
sdbSetRawStatus(pSubRaw, SDB_STATUS_READY);
|
||||||
mndTransAppendRedolog(pTrans, pSubRaw);
|
mndTransAppendRedolog(pTrans, pSubRaw);
|
||||||
}
|
}
|
||||||
mndReleaseSubscribe(pMnode, pSub);
|
mndReleaseSubscribe(pMnode, pSub);
|
||||||
|
|
|
@ -40,6 +40,10 @@ const char *sdbTableName(ESdbType type) {
|
||||||
return "auth";
|
return "auth";
|
||||||
case SDB_ACCT:
|
case SDB_ACCT:
|
||||||
return "acct";
|
return "acct";
|
||||||
|
case SDB_STREAM:
|
||||||
|
return "stream";
|
||||||
|
case SDB_OFFSET:
|
||||||
|
return "offset";
|
||||||
case SDB_SUBSCRIBE:
|
case SDB_SUBSCRIBE:
|
||||||
return "subscribe";
|
return "subscribe";
|
||||||
case SDB_CONSUMER:
|
case SDB_CONSUMER:
|
||||||
|
|
Loading…
Reference in New Issue