Merge pull request #20722 from taosdata/feature/3_liaohj

fix(tmq): build sync api on top of async APIs.
This commit is contained in:
Haojun Liao 2023-04-01 01:02:10 +08:00 committed by GitHub
commit 9d165874eb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 278 additions and 268 deletions

View File

@ -27,6 +27,8 @@
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000
typedef void (*__tmq_askep_fn_t)(tmq_t* pTmq, int32_t code, SDataBuf* pBuf, void* pParam);
struct SMqMgmt {
int8_t inited;
tmr_h timer;
@ -109,6 +111,11 @@ struct tmq_t {
tsem_t rspSem;
};
typedef struct SAskEpInfo {
int32_t code;
tsem_t sem;
} SAskEpInfo;
enum {
TMQ_VG_STATUS__IDLE = 0,
TMQ_VG_STATUS__WAIT,
@ -169,11 +176,10 @@ typedef struct {
} SMqSubscribeCbParam;
typedef struct {
int64_t refId;
int32_t epoch;
int32_t code;
int32_t async;
tsem_t rspSem;
int64_t refId;
int32_t epoch;
void* pParam;
__tmq_askep_fn_t pUserFn;
} SMqAskEpCbParam;
typedef struct {
@ -189,16 +195,13 @@ typedef struct {
typedef struct {
int64_t refId;
int32_t epoch;
int8_t automatic;
int8_t async;
int32_t waitingRspNum;
int32_t totalRspNum;
int32_t rspErr;
tmq_commit_cb* userCb;
int32_t code;
tmq_commit_cb* callbackFn;
/*SArray* successfulOffsets;*/
/*SArray* failedOffsets;*/
void* userParam;
tsem_t rspSem;
void* userParam;
} SMqCommitCbParamSet;
typedef struct {
@ -209,12 +212,14 @@ typedef struct {
tmq_t* pTmq;
} SMqCommitCbParam;
static int32_t tmqAskEp(tmq_t* tmq, bool async);
static int32_t doAskEp(tmq_t* tmq);
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
int32_t index, int32_t totalVgroups);
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
static void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
static void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param);
static void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param);
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
@ -444,7 +449,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
// taosMemoryFree(pBuf->pData);
// taosMemoryFree(pBuf->pEpSet);
//
// tmqCommitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
// commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
// return 0;
// }
//
@ -454,7 +459,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
taosMemoryFree(pBuf->pData);
taosMemoryFree(pBuf->pEpSet);
tmqCommitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
return 0;
}
@ -462,8 +467,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
int32_t index, int32_t totalVgroups) {
STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
if (pOffset == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
return TSDB_CODE_OUT_OF_MEMORY;
}
pOffset->val = pVg->currentOffset;
@ -477,13 +481,13 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
int32_t code = 0;
tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
if (code < 0) {
return -1;
return TSDB_CODE_INVALID_PARA;
}
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
if (buf == NULL) {
taosMemoryFree(pOffset);
return -1;
return TSDB_CODE_OUT_OF_MEMORY;
}
((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
@ -500,7 +504,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
if (pParam == NULL) {
taosMemoryFree(pOffset);
taosMemoryFree(buf);
return -1;
return TSDB_CODE_OUT_OF_MEMORY;
}
pParam->params = pParamSet;
@ -516,7 +520,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
taosMemoryFree(pOffset);
taosMemoryFree(buf);
taosMemoryFree(pParam);
return -1;
return TSDB_CODE_OUT_OF_MEMORY;
}
pMsgSendInfo->msgInfo = (SDataBuf){
@ -547,129 +551,117 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
return 0;
return TSDB_CODE_SUCCESS;
}
static int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
char* topic;
int32_t vgId;
if (TD_RES_TMQ(msg)) {
SMqRspObj* pRspObj = (SMqRspObj*)msg;
topic = pRspObj->topic;
static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam) {
char* pTopicName = NULL;
int32_t vgId = 0;
int32_t code = 0;
if (pRes == NULL || tmq == NULL) {
pCommitFp(tmq, TSDB_CODE_INVALID_PARA, userParam);
return;
}
if (TD_RES_TMQ(pRes)) {
SMqRspObj* pRspObj = (SMqRspObj*)pRes;
pTopicName = pRspObj->topic;
vgId = pRspObj->vgId;
} else if (TD_RES_TMQ_META(msg)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
topic = pMetaRspObj->topic;
} else if (TD_RES_TMQ_META(pRes)) {
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)pRes;
pTopicName = pMetaRspObj->topic;
vgId = pMetaRspObj->vgId;
} else if (TD_RES_TMQ_METADATA(msg)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg;
topic = pRspObj->topic;
} else if (TD_RES_TMQ_METADATA(pRes)) {
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)pRes;
pTopicName = pRspObj->topic;
vgId = pRspObj->vgId;
} else {
return TSDB_CODE_TMQ_INVALID_MSG;
pCommitFp(tmq, TSDB_CODE_TMQ_INVALID_MSG, userParam);
return;
}
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
if (pParamSet == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
return;
}
pParamSet->refId = tmq->refId;
pParamSet->epoch = tmq->epoch;
pParamSet->automatic = 0;
pParamSet->async = async;
pParamSet->userCb = userCb;
pParamSet->callbackFn = pCommitFp;
pParamSet->userParam = userParam;
tsem_init(&pParamSet->rspSem, 0, 0);
int32_t code = -1;
taosThreadMutexLock(&tmq->lock);
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " user invoked commit offset for %d", tmq->consumerId, numOfTopics);
for (int32_t i = 0; i < numOfTopics; i++) {
tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
int32_t i = 0;
for (; i < numOfTopics; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
if (strcmp(pTopic->topicName, topic) != 0) {
continue;
}
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
for (int32_t j = 0; j < numOfVgroups; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->vgId != vgId) {
continue;
}
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
if (doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups) < 0) {
tsem_destroy(&pParamSet->rspSem);
taosMemoryFree(pParamSet);
goto FAIL;
}
goto HANDLE_RSP;
}
if (strcmp(pTopic->topicName, pTopicName) == 0) {
break;
}
}
HANDLE_RSP:
if (pParamSet->totalRspNum == 0) {
tsem_destroy(&pParamSet->rspSem);
if (i == numOfTopics) {
tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s, total topics:%d", tmq->consumerId, pTopicName,
numOfTopics);
taosMemoryFree(pParamSet);
taosThreadMutexUnlock(&tmq->lock);
return 0;
pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
return;
}
if (!async) {
taosThreadMutexUnlock(&tmq->lock);
tsem_wait(&pParamSet->rspSem);
code = pParamSet->rspErr;
tsem_destroy(&pParamSet->rspSem);
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
int32_t j = 0;
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
for (j = 0; j < numOfVgroups; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->vgId == vgId) {
break;
}
}
if (j == numOfVgroups) {
tscWarn("consumer:0x%" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", tmq->consumerId, vgId,
numOfVgroups, pTopicName);
taosMemoryFree(pParamSet);
return code;
} else {
code = 0;
pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
return;
}
FAIL:
taosThreadMutexUnlock(&tmq->lock);
if (code != 0 && async) {
userCb(tmq, code, userParam);
}
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups);
return 0;
// failed to commit, callback user function directly.
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pParamSet);
pCommitFp(tmq, code, userParam);
}
} else { // do not perform commit, callback user function directly.
taosMemoryFree(pParamSet);
pCommitFp(tmq, code, userParam);
}
}
static int32_t doAutoCommit(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb, void* userParam) {
int32_t code = -1;
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
if (pParamSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
if (async) {
if (automatic) {
tmq->commitCb(tmq, code, tmq->commitCbUserParam);
} else {
userCb(tmq, code, userParam);
}
}
return -1;
pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
return;
}
pParamSet->refId = tmq->refId;
pParamSet->epoch = tmq->epoch;
pParamSet->automatic = automatic;
pParamSet->async = async;
pParamSet->userCb = userCb;
pParamSet->callbackFn = pCommitFp;
pParamSet->userParam = userParam;
tsem_init(&pParamSet->rspSem, 0, 0);
// init as 1 to prevent concurrency issue
pParamSet->waitingRspNum = 1;
taosThreadMutexLock(&tmq->lock);
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);
@ -683,7 +675,7 @@ static int32_t doAutoCommit(tmq_t* tmq, int8_t automatic, int8_t async, tmq_comm
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups);
int32_t code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups);
if (code != TSDB_CODE_SUCCESS) {
tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64 " failed, code:%s ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->committedOffset.version, tstrerror(terrno),
@ -694,7 +686,7 @@ static int32_t doAutoCommit(tmq_t* tmq, int8_t automatic, int8_t async, tmq_comm
// update the offset value.
pVg->committedOffset = pVg->currentOffset;
} else {
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d",
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, not commit, current:%" PRId64 ", ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset.version, j + 1, numOfVgroups);
}
}
@ -702,39 +694,16 @@ static int32_t doAutoCommit(tmq_t* tmq, int8_t automatic, int8_t async, tmq_comm
tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1,
numOfTopics);
taosThreadMutexUnlock(&tmq->lock);
// no request is sent
if (pParamSet->totalRspNum == 0) {
tsem_destroy(&pParamSet->rspSem);
taosMemoryFree(pParamSet);
return 0;
pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
return;
}
// count down since waiting rsp num init as 1
tmqCommitRspCountDown(pParamSet, tmq->consumerId, "", 0);
if (!async) {
tsem_wait(&pParamSet->rspSem);
code = pParamSet->rspErr;
tsem_destroy(&pParamSet->rspSem);
taosMemoryFree(pParamSet);
#if 0
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
#endif
}
return code;
}
static int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
void* userParam) {
if (msg) { // user invoked commit
return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
} else { // this for auto commit
return doAutoCommit(tmq, automatic, async, userCb, userParam);
}
commitRspCountDown(pParamSet, tmq->consumerId, "", 0);
}
static void generateTimedTask(int64_t refId, int32_t type) {
@ -841,6 +810,12 @@ OVER:
taosReleaseRef(tmqMgmt.rsetId, refId);
}
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
if (code != 0) {
tscDebug("consumer:0x%"PRIx64", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
}
}
int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
STaosQall* qall = taosAllocateQall();
taosReadAllQitems(pTmq->delayedTask, qall);
@ -856,7 +831,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
while (pTaskType != NULL) {
if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
tmqAskEp(pTmq, true);
asyncAskEp(pTmq, addToQueueCallbackFn, NULL);
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId;
@ -864,12 +839,13 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam);
tmq_commit_cb* pCallbackFn = pTmq->commitCb? pTmq->commitCb:defaultCommitCbFn;
asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
*pRefId = pTmq->refId;
tscDebug("consumer:0x%" PRIx64 " commit to vnode(s) in %.2fs", pTmq->consumerId,
tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
pTmq->autoCommitInterval / 1000.0);
taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
@ -1061,8 +1037,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->status = TMQ_CONSUMER_STATUS__INIT;
pTmq->pollCnt = 0;
pTmq->epoch = 0;
/*pTmq->epStatus = 0;*/
/*pTmq->epSkipCnt = 0;*/
// set conf
strcpy(pTmq->clientId, conf->clientId);
@ -1214,7 +1188,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
}
int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
if (retryCnt++ > MAX_RETRY_COUNT) {
goto FAIL;
}
@ -1348,7 +1322,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
char buf[80];
tFormatOffset(buf, 80, &pRspWrapper->dataRsp.rspOffset);
tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64,
tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64,
tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, buf, rspType, requestId);
} else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
SDecoder decoder;
@ -1514,28 +1488,30 @@ static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
return set;
}
static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
int8_t async = pParam->async;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
if (tmq == NULL) {
if (!async) {
tsem_destroy(&pParam->rspSem);
} else {
taosMemoryFree(pParam);
}
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
pParam->pUserFn(tmq, terrno, NULL, pParam->pParam);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
return -1;
taosMemoryFree(pParam);
return terrno;
}
pParam->code = code;
if (code != TSDB_CODE_SUCCESS) {
tscError("consumer:0x%" PRIx64 ", get topic endpoint error, async:%d, code:%s", tmq->consumerId, pParam->async,
tstrerror(code));
goto END;
tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
pParam->pUserFn(tmq, code, NULL, pParam->pParam);
taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pParam);
return code;
}
// tmq's epoch is monotonically increase,
@ -1546,6 +1522,7 @@ static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
if (head->epoch <= epoch) {
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
tmq->consumerId, head->epoch, epoch);
if (tmq->status == TMQ_CONSUMER_STATUS__RECOVER) {
SMqAskEpRsp rsp;
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
@ -1554,45 +1531,17 @@ static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
tDeleteSMqAskEpRsp(&rsp);
}
goto END;
}
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
head->epoch, epoch);
if (!async) {
SMqAskEpRsp rsp;
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
tmqUpdateEp(tmq, head->epoch, &rsp);
tDeleteSMqAskEpRsp(&rsp);
} else {
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
if (pWrapper == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
code = -1;
goto END;
}
pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
pWrapper->epoch = head->epoch;
memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
taosWriteQitem(tmq->mqueue, pWrapper);
tsem_post(&tmq->rspSem);
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
head->epoch, epoch);
pParam->pUserFn(tmq, code, pMsg, pParam->pParam);
}
END:
taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
if (!async) {
tsem_post(&pParam->rspSem);
} else {
taosMemoryFree(pParam);
}
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pParam);
return code;
}
@ -1981,7 +1930,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
if (retryCnt++ > 40) {
return NULL;
}
@ -2163,96 +2112,162 @@ const char* tmq_get_table_name(TAOS_RES* res) {
return NULL;
}
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
tmqCommitInner(tmq, msg, 0, 1, cb, param);
}
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
return tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
}
int32_t tmqAskEp(tmq_t* tmq, bool async) {
int32_t code = TSDB_CODE_SUCCESS;
#if 0
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
if (epStatus == 1) {
int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
tscTrace("consumer:0x%" PRIx64 ", skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
if (epSkipCnt < 5000) return 0;
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
if (pRes == NULL) { // here needs to commit all offsets.
asyncCommitAllOffsets(tmq, cb, param);
} else { // only commit one offset
asyncCommitOffset(tmq, pRes, cb, param);
}
atomic_store_32(&tmq->epSkipCnt, 0);
#endif
}
typedef struct SSyncCommitInfo {
tsem_t sem;
int32_t code;
} SSyncCommitInfo;
static void commitCallBackFn(tmq_t *pTmq, int32_t code, void* param) {
SSyncCommitInfo* pInfo = (SSyncCommitInfo*) param;
pInfo->code = code;
tsem_post(&pInfo->sem);
}
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
int32_t code = 0;
SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
tsem_init(&pInfo->sem, 0, 0);
pInfo->code = 0;
if (pRes == NULL) {
asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo);
} else {
asyncCommitOffset(tmq, pRes, commitCallBackFn, pInfo);
}
tsem_wait(&pInfo->sem);
code = pInfo->code;
tsem_destroy(&pInfo->sem);
taosMemoryFree(pInfo);
tscDebug("consumer:0x%"PRIx64" sync commit done, code:%s", tmq->consumerId, tstrerror(code));
return code;
}
void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
SAskEpInfo* pInfo = param;
pInfo->code = code;
if (code == TSDB_CODE_SUCCESS) {
SMqRspHead* head = pDataBuf->pData;
SMqAskEpRsp rsp;
tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp);
tmqUpdateEp(pTmq, head->epoch, &rsp);
tDeleteSMqAskEpRsp(&rsp);
}
tsem_post(&pInfo->sem);
}
void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return;
}
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
if (pWrapper == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return;
}
SMqRspHead* head = pDataBuf->pData;
pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
pWrapper->epoch = head->epoch;
memcpy(&pWrapper->msg, pDataBuf->pData, sizeof(SMqRspHead));
tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &pWrapper->msg);
taosWriteQitem(pTmq->mqueue, pWrapper);
}
int32_t doAskEp(tmq_t* pTmq) {
SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
tsem_init(&pInfo->sem, 0, 0);
asyncAskEp(pTmq, updateEpCallbackFn, pInfo);
tsem_wait(&pInfo->sem);
int32_t code = pInfo->code;
tsem_destroy(&pInfo->sem);
taosMemoryFree(pInfo);
return code;
}
void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
SMqAskEpReq req = {0};
req.consumerId = tmq->consumerId;
req.epoch = tmq->epoch;
strcpy(req.cgroup, tmq->groupId);
req.consumerId = pTmq->consumerId;
req.epoch = pTmq->epoch;
strcpy(req.cgroup, pTmq->groupId);
int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
if (tlen < 0) {
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", tmq->consumerId);
return -1;
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId);
askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
return;
}
void* pReq = taosMemoryCalloc(1, tlen);
if (pReq == NULL) {
tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", tmq->consumerId, tlen);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen);
askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
return;
}
if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", tmq->consumerId, tlen);
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen);
taosMemoryFree(pReq);
return -1;
askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
return;
}
SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
if (pParam == NULL) {
tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", tmq->consumerId);
tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
taosMemoryFree(pReq);
return -1;
askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
return;
}
pParam->refId = tmq->refId;
pParam->epoch = tmq->epoch;
pParam->async = async;
tsem_init(&pParam->rspSem, 0, 0);
pParam->refId = pTmq->refId;
pParam->epoch = pTmq->epoch;
pParam->pUserFn = askEpFn;
pParam->pParam = param;
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
tsem_destroy(&pParam->rspSem);
taosMemoryFree(pParam);
taosMemoryFree(pReq);
return -1;
askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
return;
}
sendInfo->msgInfo = (SDataBuf){
.pData = pReq,
.len = tlen,
.handle = NULL,
};
sendInfo->msgInfo = (SDataBuf){ .pData = pReq, .len = tlen, .handle = NULL };
sendInfo->requestId = generateRequestId();
sendInfo->requestObjRefId = 0;
sendInfo->param = pParam;
sendInfo->fp = tmqAskEpCb;
sendInfo->fp = askEpCallbackFn;
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d, reqId:0x%" PRIx64, tmq->consumerId, async,
sendInfo->requestId);
SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if (!async) {
tsem_wait(&pParam->rspSem);
code = pParam->code;
taosMemoryFree(pParam);
}
return code;
asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
}
int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {
@ -2264,38 +2279,20 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) {
if (!pParamSet->async) {
tsem_destroy(&pParamSet->rspSem);
}
taosMemoryFree(pParamSet);
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
return -1;
}
// if no more waiting rsp
if (pParamSet->async) {
// call async cb func
if (pParamSet->automatic && tmq->commitCb) {
tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam);
} else if (!pParamSet->automatic && pParamSet->userCb) { // sem post
pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam);
}
taosMemoryFree(pParamSet);
} else {
tsem_post(&pParamSet->rspSem);
}
#if 0
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
#endif
pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam);
taosMemoryFree(pParamSet);
taosReleaseRef(tmqMgmt.rsetId, refId);
return 0;
}
void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
if (waitingRspNum == 0) {
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,

View File

@ -1091,10 +1091,13 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
pAction->msgReceived = 1;
pAction->errCode = pRsp->code;
pTrans->lastErrorNo = pRsp->code;
mInfo("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x", transId,
mndTransStr(pAction->stage), action, pRsp->code, pAction->acceptableCode, pAction->retryCode);
} else {
mInfo("trans:%d, invalid action, index:%d, code:0x%x", transId, action, pRsp->code);
}
mInfo("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x", transId, mndTransStr(pAction->stage),
action, pRsp->code, pAction->acceptableCode, pAction->retryCode);
mndTransExecute(pMnode, pTrans, true);
_OVER:

View File

@ -755,8 +755,8 @@ int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
setTaskKilled(pTaskInfo, rspCode);
qDebug("%s sync killed execTask", GET_TASKID(pTaskInfo));
setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
while(qTaskIsExecuting(pTaskInfo)) {
taosMsleep(10);

View File

@ -771,24 +771,32 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
// scan table one by one sequentially
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
int32_t numOfTables = 0;//tableListGetSize(pTaskInfo->pTableInfoList);
STableKeyInfo tInfo = {0};
while (1) {
SSDataBlock* result = doGroupedTableScan(pOperator);
if (result || (pOperator->status == OP_EXEC_DONE)) {
if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) {
return result;
}
// if no data, switch to next table and continue scan
pInfo->currentTable++;
taosRLockLatch(&pTaskInfo->lock);
numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
if (pInfo->currentTable >= numOfTables) {
qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
taosRUnLockLatch(&pTaskInfo->lock);
return NULL;
}
STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable);
tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1);
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", pTableInfo->uid, numOfTables,
tInfo = *(STableKeyInfo*) tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable);
taosRUnLockLatch(&pTaskInfo->lock);
tsdbSetTableList(pInfo->base.dataReader, &tInfo, 1);
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables,
pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo));
tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
@ -1672,6 +1680,8 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
if (pTaskInfo->streamInfo.prepareStatus.type == TMQ_OFFSET__LOG) {
while (1) {
SFetchRet ret = {0};
terrno = 0;
if (tqNextBlock(pInfo->tqReader, &ret) < 0) {
// if the end is reached, terrno is 0
if (terrno != 0) {