fix:conflict from main
This commit is contained in:
commit
446f31925e
|
@ -33,6 +33,7 @@ typedef enum {
|
|||
JOB_TASK_STATUS_INIT,
|
||||
JOB_TASK_STATUS_EXEC,
|
||||
JOB_TASK_STATUS_PART_SUCC,
|
||||
JOB_TASK_STATUS_FETCH,
|
||||
JOB_TASK_STATUS_SUCC,
|
||||
JOB_TASK_STATUS_FAIL,
|
||||
JOB_TASK_STATUS_DROP,
|
||||
|
|
|
@ -27,6 +27,8 @@
|
|||
#define EMPTY_BLOCK_POLL_IDLE_DURATION 10
|
||||
#define DEFAULT_AUTO_COMMIT_INTERVAL 5000
|
||||
|
||||
typedef void (*__tmq_askep_fn_t)(tmq_t* pTmq, int32_t code, SDataBuf* pBuf, void* pParam);
|
||||
|
||||
struct SMqMgmt {
|
||||
int8_t inited;
|
||||
tmr_h timer;
|
||||
|
@ -109,6 +111,11 @@ struct tmq_t {
|
|||
tsem_t rspSem;
|
||||
};
|
||||
|
||||
typedef struct SAskEpInfo {
|
||||
int32_t code;
|
||||
tsem_t sem;
|
||||
} SAskEpInfo;
|
||||
|
||||
enum {
|
||||
TMQ_VG_STATUS__IDLE = 0,
|
||||
TMQ_VG_STATUS__WAIT,
|
||||
|
@ -169,11 +176,10 @@ typedef struct {
|
|||
} SMqSubscribeCbParam;
|
||||
|
||||
typedef struct {
|
||||
int64_t refId;
|
||||
int32_t epoch;
|
||||
int32_t code;
|
||||
int32_t async;
|
||||
tsem_t rspSem;
|
||||
int64_t refId;
|
||||
int32_t epoch;
|
||||
void* pParam;
|
||||
__tmq_askep_fn_t pUserFn;
|
||||
} SMqAskEpCbParam;
|
||||
|
||||
typedef struct {
|
||||
|
@ -189,16 +195,13 @@ typedef struct {
|
|||
typedef struct {
|
||||
int64_t refId;
|
||||
int32_t epoch;
|
||||
int8_t automatic;
|
||||
int8_t async;
|
||||
int32_t waitingRspNum;
|
||||
int32_t totalRspNum;
|
||||
int32_t rspErr;
|
||||
tmq_commit_cb* userCb;
|
||||
int32_t code;
|
||||
tmq_commit_cb* callbackFn;
|
||||
/*SArray* successfulOffsets;*/
|
||||
/*SArray* failedOffsets;*/
|
||||
void* userParam;
|
||||
tsem_t rspSem;
|
||||
void* userParam;
|
||||
} SMqCommitCbParamSet;
|
||||
|
||||
typedef struct {
|
||||
|
@ -209,12 +212,14 @@ typedef struct {
|
|||
tmq_t* pTmq;
|
||||
} SMqCommitCbParam;
|
||||
|
||||
static int32_t tmqAskEp(tmq_t* tmq, bool async);
|
||||
static int32_t doAskEp(tmq_t* tmq);
|
||||
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg);
|
||||
static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet);
|
||||
static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet,
|
||||
int32_t index, int32_t totalVgroups);
|
||||
static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
|
||||
static void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId);
|
||||
static void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param);
|
||||
static void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param);
|
||||
|
||||
tmq_conf_t* tmq_conf_new() {
|
||||
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
|
||||
|
@ -444,7 +449,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
|
|||
// taosMemoryFree(pBuf->pData);
|
||||
// taosMemoryFree(pBuf->pEpSet);
|
||||
//
|
||||
// tmqCommitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
|
||||
// commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
|
||||
// return 0;
|
||||
// }
|
||||
//
|
||||
|
@ -454,7 +459,7 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
|
|||
taosMemoryFree(pBuf->pData);
|
||||
taosMemoryFree(pBuf->pEpSet);
|
||||
|
||||
tmqCommitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
|
||||
commitRspCountDown(pParamSet, pParam->pTmq->consumerId, pParam->topicName, pParam->vgId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -462,8 +467,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
|
|||
int32_t index, int32_t totalVgroups) {
|
||||
STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
|
||||
if (pOffset == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pOffset->val = pVg->currentOffset;
|
||||
|
@ -477,13 +481,13 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
|
|||
int32_t code = 0;
|
||||
tEncodeSize(tEncodeSTqOffset, pOffset, len, code);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
void* buf = taosMemoryCalloc(1, sizeof(SMsgHead) + len);
|
||||
if (buf == NULL) {
|
||||
taosMemoryFree(pOffset);
|
||||
return -1;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
((SMsgHead*)buf)->vgId = htonl(pVg->vgId);
|
||||
|
@ -500,7 +504,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
|
|||
if (pParam == NULL) {
|
||||
taosMemoryFree(pOffset);
|
||||
taosMemoryFree(buf);
|
||||
return -1;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pParam->params = pParamSet;
|
||||
|
@ -516,7 +520,7 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
|
|||
taosMemoryFree(pOffset);
|
||||
taosMemoryFree(buf);
|
||||
taosMemoryFree(pParam);
|
||||
return -1;
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pMsgSendInfo->msgInfo = (SDataBuf){
|
||||
|
@ -547,129 +551,117 @@ static int32_t doSendCommitMsg(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicN
|
|||
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, pMsgSendInfo);
|
||||
return 0;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
|
||||
char* topic;
|
||||
int32_t vgId;
|
||||
if (TD_RES_TMQ(msg)) {
|
||||
SMqRspObj* pRspObj = (SMqRspObj*)msg;
|
||||
topic = pRspObj->topic;
|
||||
static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* pCommitFp, void* userParam) {
|
||||
char* pTopicName = NULL;
|
||||
int32_t vgId = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
if (pRes == NULL || tmq == NULL) {
|
||||
pCommitFp(tmq, TSDB_CODE_INVALID_PARA, userParam);
|
||||
return;
|
||||
}
|
||||
|
||||
if (TD_RES_TMQ(pRes)) {
|
||||
SMqRspObj* pRspObj = (SMqRspObj*)pRes;
|
||||
pTopicName = pRspObj->topic;
|
||||
vgId = pRspObj->vgId;
|
||||
} else if (TD_RES_TMQ_META(msg)) {
|
||||
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)msg;
|
||||
topic = pMetaRspObj->topic;
|
||||
} else if (TD_RES_TMQ_META(pRes)) {
|
||||
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)pRes;
|
||||
pTopicName = pMetaRspObj->topic;
|
||||
vgId = pMetaRspObj->vgId;
|
||||
} else if (TD_RES_TMQ_METADATA(msg)) {
|
||||
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)msg;
|
||||
topic = pRspObj->topic;
|
||||
} else if (TD_RES_TMQ_METADATA(pRes)) {
|
||||
SMqTaosxRspObj* pRspObj = (SMqTaosxRspObj*)pRes;
|
||||
pTopicName = pRspObj->topic;
|
||||
vgId = pRspObj->vgId;
|
||||
} else {
|
||||
return TSDB_CODE_TMQ_INVALID_MSG;
|
||||
pCommitFp(tmq, TSDB_CODE_TMQ_INVALID_MSG, userParam);
|
||||
return;
|
||||
}
|
||||
|
||||
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
|
||||
if (pParamSet == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
|
||||
return;
|
||||
}
|
||||
|
||||
pParamSet->refId = tmq->refId;
|
||||
pParamSet->epoch = tmq->epoch;
|
||||
pParamSet->automatic = 0;
|
||||
pParamSet->async = async;
|
||||
pParamSet->userCb = userCb;
|
||||
pParamSet->callbackFn = pCommitFp;
|
||||
pParamSet->userParam = userParam;
|
||||
tsem_init(&pParamSet->rspSem, 0, 0);
|
||||
|
||||
int32_t code = -1;
|
||||
|
||||
taosThreadMutexLock(&tmq->lock);
|
||||
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 " user invoked commit offset for %d", tmq->consumerId, numOfTopics);
|
||||
for (int32_t i = 0; i < numOfTopics; i++) {
|
||||
tscDebug("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
|
||||
|
||||
int32_t i = 0;
|
||||
for (; i < numOfTopics; i++) {
|
||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||
if (strcmp(pTopic->topicName, topic) != 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
|
||||
for (int32_t j = 0; j < numOfVgroups; j++) {
|
||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||
if (pVg->vgId != vgId) {
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
|
||||
if (doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups) < 0) {
|
||||
tsem_destroy(&pParamSet->rspSem);
|
||||
taosMemoryFree(pParamSet);
|
||||
goto FAIL;
|
||||
}
|
||||
goto HANDLE_RSP;
|
||||
}
|
||||
if (strcmp(pTopic->topicName, pTopicName) == 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
HANDLE_RSP:
|
||||
if (pParamSet->totalRspNum == 0) {
|
||||
tsem_destroy(&pParamSet->rspSem);
|
||||
if (i == numOfTopics) {
|
||||
tscWarn("consumer:0x%" PRIx64 " failed to find the specified topic:%s, total topics:%d", tmq->consumerId, pTopicName,
|
||||
numOfTopics);
|
||||
taosMemoryFree(pParamSet);
|
||||
taosThreadMutexUnlock(&tmq->lock);
|
||||
return 0;
|
||||
pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
|
||||
return;
|
||||
}
|
||||
|
||||
if (!async) {
|
||||
taosThreadMutexUnlock(&tmq->lock);
|
||||
tsem_wait(&pParamSet->rspSem);
|
||||
code = pParamSet->rspErr;
|
||||
tsem_destroy(&pParamSet->rspSem);
|
||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
|
||||
|
||||
int32_t j = 0;
|
||||
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
|
||||
for (j = 0; j < numOfVgroups; j++) {
|
||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||
if (pVg->vgId == vgId) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (j == numOfVgroups) {
|
||||
tscWarn("consumer:0x%" PRIx64 " failed to find the specified vgId:%d, total Vgs:%d, topic:%s", tmq->consumerId, vgId,
|
||||
numOfVgroups, pTopicName);
|
||||
taosMemoryFree(pParamSet);
|
||||
return code;
|
||||
} else {
|
||||
code = 0;
|
||||
pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
|
||||
return;
|
||||
}
|
||||
|
||||
FAIL:
|
||||
taosThreadMutexUnlock(&tmq->lock);
|
||||
if (code != 0 && async) {
|
||||
userCb(tmq, code, userParam);
|
||||
}
|
||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
|
||||
code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups);
|
||||
|
||||
return 0;
|
||||
// failed to commit, callback user function directly.
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFree(pParamSet);
|
||||
pCommitFp(tmq, code, userParam);
|
||||
}
|
||||
} else { // do not perform commit, callback user function directly.
|
||||
taosMemoryFree(pParamSet);
|
||||
pCommitFp(tmq, code, userParam);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t doAutoCommit(tmq_t* tmq, int8_t automatic, int8_t async, tmq_commit_cb* userCb, void* userParam) {
|
||||
int32_t code = -1;
|
||||
|
||||
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
|
||||
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
|
||||
if (pParamSet == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
if (async) {
|
||||
if (automatic) {
|
||||
tmq->commitCb(tmq, code, tmq->commitCbUserParam);
|
||||
} else {
|
||||
userCb(tmq, code, userParam);
|
||||
}
|
||||
}
|
||||
return -1;
|
||||
pCommitFp(tmq, TSDB_CODE_OUT_OF_MEMORY, userParam);
|
||||
return;
|
||||
}
|
||||
|
||||
pParamSet->refId = tmq->refId;
|
||||
pParamSet->epoch = tmq->epoch;
|
||||
|
||||
pParamSet->automatic = automatic;
|
||||
pParamSet->async = async;
|
||||
pParamSet->userCb = userCb;
|
||||
pParamSet->callbackFn = pCommitFp;
|
||||
pParamSet->userParam = userParam;
|
||||
tsem_init(&pParamSet->rspSem, 0, 0);
|
||||
|
||||
// init as 1 to prevent concurrency issue
|
||||
pParamSet->waitingRspNum = 1;
|
||||
|
||||
taosThreadMutexLock(&tmq->lock);
|
||||
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
|
||||
tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);
|
||||
|
||||
|
@ -683,7 +675,7 @@ static int32_t doAutoCommit(tmq_t* tmq, int8_t automatic, int8_t async, tmq_comm
|
|||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||
|
||||
if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
|
||||
code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups);
|
||||
int32_t code = doSendCommitMsg(tmq, pVg, pTopic->topicName, pParamSet, j, numOfVgroups);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscError("consumer:0x%" PRIx64 " topic:%s vgId:%d offset:%" PRId64 " failed, code:%s ordinal:%d/%d",
|
||||
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->committedOffset.version, tstrerror(terrno),
|
||||
|
@ -694,7 +686,7 @@ static int32_t doAutoCommit(tmq_t* tmq, int8_t automatic, int8_t async, tmq_comm
|
|||
// update the offset value.
|
||||
pVg->committedOffset = pVg->currentOffset;
|
||||
} else {
|
||||
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d",
|
||||
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, not commit, current:%" PRId64 ", ordinal:%d/%d",
|
||||
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset.version, j + 1, numOfVgroups);
|
||||
}
|
||||
}
|
||||
|
@ -702,39 +694,16 @@ static int32_t doAutoCommit(tmq_t* tmq, int8_t automatic, int8_t async, tmq_comm
|
|||
|
||||
tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1,
|
||||
numOfTopics);
|
||||
taosThreadMutexUnlock(&tmq->lock);
|
||||
|
||||
// no request is sent
|
||||
if (pParamSet->totalRspNum == 0) {
|
||||
tsem_destroy(&pParamSet->rspSem);
|
||||
taosMemoryFree(pParamSet);
|
||||
return 0;
|
||||
pCommitFp(tmq, TSDB_CODE_SUCCESS, userParam);
|
||||
return;
|
||||
}
|
||||
|
||||
// count down since waiting rsp num init as 1
|
||||
tmqCommitRspCountDown(pParamSet, tmq->consumerId, "", 0);
|
||||
|
||||
if (!async) {
|
||||
tsem_wait(&pParamSet->rspSem);
|
||||
code = pParamSet->rspErr;
|
||||
tsem_destroy(&pParamSet->rspSem);
|
||||
taosMemoryFree(pParamSet);
|
||||
#if 0
|
||||
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
|
||||
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
|
||||
#endif
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tmqCommitInner(tmq_t* tmq, const TAOS_RES* msg, int8_t automatic, int8_t async, tmq_commit_cb* userCb,
|
||||
void* userParam) {
|
||||
if (msg) { // user invoked commit
|
||||
return tmqCommitMsgImpl(tmq, msg, async, userCb, userParam);
|
||||
} else { // this for auto commit
|
||||
return doAutoCommit(tmq, automatic, async, userCb, userParam);
|
||||
}
|
||||
commitRspCountDown(pParamSet, tmq->consumerId, "", 0);
|
||||
}
|
||||
|
||||
static void generateTimedTask(int64_t refId, int32_t type) {
|
||||
|
@ -841,6 +810,12 @@ OVER:
|
|||
taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||
}
|
||||
|
||||
static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) {
|
||||
if (code != 0) {
|
||||
tscDebug("consumer:0x%"PRIx64", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code));
|
||||
}
|
||||
}
|
||||
|
||||
int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
|
||||
STaosQall* qall = taosAllocateQall();
|
||||
taosReadAllQitems(pTmq->delayedTask, qall);
|
||||
|
@ -856,7 +831,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
|
|||
|
||||
while (pTaskType != NULL) {
|
||||
if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
|
||||
tmqAskEp(pTmq, true);
|
||||
asyncAskEp(pTmq, addToQueueCallbackFn, NULL);
|
||||
|
||||
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
|
||||
*pRefId = pTmq->refId;
|
||||
|
@ -864,12 +839,13 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
|
|||
tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
|
||||
taosTmrReset(tmqAssignAskEpTask, 1000, pRefId, tmqMgmt.timer, &pTmq->epTimer);
|
||||
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
|
||||
tmqCommitInner(pTmq, NULL, 1, 1, pTmq->commitCb, pTmq->commitCbUserParam);
|
||||
tmq_commit_cb* pCallbackFn = pTmq->commitCb? pTmq->commitCb:defaultCommitCbFn;
|
||||
|
||||
asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);
|
||||
int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t));
|
||||
*pRefId = pTmq->refId;
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 " commit to vnode(s) in %.2fs", pTmq->consumerId,
|
||||
tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId,
|
||||
pTmq->autoCommitInterval / 1000.0);
|
||||
taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer);
|
||||
} else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) {
|
||||
|
@ -1061,8 +1037,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
|||
pTmq->status = TMQ_CONSUMER_STATUS__INIT;
|
||||
pTmq->pollCnt = 0;
|
||||
pTmq->epoch = 0;
|
||||
/*pTmq->epStatus = 0;*/
|
||||
/*pTmq->epSkipCnt = 0;*/
|
||||
|
||||
// set conf
|
||||
strcpy(pTmq->clientId, conf->clientId);
|
||||
|
@ -1214,7 +1188,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
|
|||
}
|
||||
|
||||
int32_t retryCnt = 0;
|
||||
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
|
||||
while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
|
||||
if (retryCnt++ > MAX_RETRY_COUNT) {
|
||||
goto FAIL;
|
||||
}
|
||||
|
@ -1348,7 +1322,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
|
||||
char buf[80];
|
||||
tFormatOffset(buf, 80, &pRspWrapper->dataRsp.rspOffset);
|
||||
tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64,
|
||||
tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d, reqId:0x%" PRIx64,
|
||||
tmq->consumerId, vgId, pRspWrapper->dataRsp.reqOffset.version, buf, rspType, requestId);
|
||||
} else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
|
||||
SDecoder decoder;
|
||||
|
@ -1514,28 +1488,30 @@ static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
|
|||
return set;
|
||||
}
|
||||
|
||||
static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
|
||||
int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
|
||||
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
|
||||
int8_t async = pParam->async;
|
||||
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
|
||||
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
|
||||
|
||||
if (tmq == NULL) {
|
||||
if (!async) {
|
||||
tsem_destroy(&pParam->rspSem);
|
||||
} else {
|
||||
taosMemoryFree(pParam);
|
||||
}
|
||||
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
|
||||
pParam->pUserFn(tmq, terrno, NULL, pParam->pParam);
|
||||
|
||||
taosMemoryFree(pMsg->pData);
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
|
||||
return -1;
|
||||
taosMemoryFree(pParam);
|
||||
return terrno;
|
||||
}
|
||||
|
||||
pParam->code = code;
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscError("consumer:0x%" PRIx64 ", get topic endpoint error, async:%d, code:%s", tmq->consumerId, pParam->async,
|
||||
tstrerror(code));
|
||||
goto END;
|
||||
tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
|
||||
pParam->pUserFn(tmq, code, NULL, pParam->pParam);
|
||||
|
||||
taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
|
||||
|
||||
taosMemoryFree(pMsg->pData);
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
taosMemoryFree(pParam);
|
||||
return code;
|
||||
}
|
||||
|
||||
// tmq's epoch is monotonically increase,
|
||||
|
@ -1546,6 +1522,7 @@ static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
if (head->epoch <= epoch) {
|
||||
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep",
|
||||
tmq->consumerId, head->epoch, epoch);
|
||||
|
||||
if (tmq->status == TMQ_CONSUMER_STATUS__RECOVER) {
|
||||
SMqAskEpRsp rsp;
|
||||
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
|
||||
|
@ -1554,45 +1531,17 @@ static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
|
|||
tDeleteSMqAskEpRsp(&rsp);
|
||||
}
|
||||
|
||||
goto END;
|
||||
}
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
|
||||
head->epoch, epoch);
|
||||
|
||||
if (!async) {
|
||||
SMqAskEpRsp rsp;
|
||||
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
|
||||
tmqUpdateEp(tmq, head->epoch, &rsp);
|
||||
tDeleteSMqAskEpRsp(&rsp);
|
||||
} else {
|
||||
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
|
||||
if (pWrapper == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
code = -1;
|
||||
goto END;
|
||||
}
|
||||
|
||||
pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
|
||||
pWrapper->epoch = head->epoch;
|
||||
memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead));
|
||||
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg);
|
||||
|
||||
taosWriteQitem(tmq->mqueue, pWrapper);
|
||||
tsem_post(&tmq->rspSem);
|
||||
tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
|
||||
head->epoch, epoch);
|
||||
pParam->pUserFn(tmq, code, pMsg, pParam->pParam);
|
||||
}
|
||||
|
||||
END:
|
||||
taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
|
||||
|
||||
if (!async) {
|
||||
tsem_post(&pParam->rspSem);
|
||||
} else {
|
||||
taosMemoryFree(pParam);
|
||||
}
|
||||
|
||||
taosMemoryFree(pMsg->pEpSet);
|
||||
taosMemoryFree(pMsg->pData);
|
||||
taosMemoryFree(pParam);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -1981,7 +1930,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
|||
|
||||
while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
|
||||
int32_t retryCnt = 0;
|
||||
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
|
||||
while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
|
||||
if (retryCnt++ > 40) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -2163,96 +2112,162 @@ const char* tmq_get_table_name(TAOS_RES* res) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
|
||||
tmqCommitInner(tmq, msg, 0, 1, cb, param);
|
||||
}
|
||||
|
||||
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
|
||||
return tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
|
||||
}
|
||||
|
||||
int32_t tmqAskEp(tmq_t* tmq, bool async) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
#if 0
|
||||
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
|
||||
if (epStatus == 1) {
|
||||
int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
|
||||
tscTrace("consumer:0x%" PRIx64 ", skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
|
||||
if (epSkipCnt < 5000) return 0;
|
||||
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) {
|
||||
if (pRes == NULL) { // here needs to commit all offsets.
|
||||
asyncCommitAllOffsets(tmq, cb, param);
|
||||
} else { // only commit one offset
|
||||
asyncCommitOffset(tmq, pRes, cb, param);
|
||||
}
|
||||
atomic_store_32(&tmq->epSkipCnt, 0);
|
||||
#endif
|
||||
}
|
||||
|
||||
typedef struct SSyncCommitInfo {
|
||||
tsem_t sem;
|
||||
int32_t code;
|
||||
} SSyncCommitInfo;
|
||||
|
||||
static void commitCallBackFn(tmq_t *pTmq, int32_t code, void* param) {
|
||||
SSyncCommitInfo* pInfo = (SSyncCommitInfo*) param;
|
||||
pInfo->code = code;
|
||||
tsem_post(&pInfo->sem);
|
||||
}
|
||||
|
||||
int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
|
||||
int32_t code = 0;
|
||||
|
||||
SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo));
|
||||
tsem_init(&pInfo->sem, 0, 0);
|
||||
pInfo->code = 0;
|
||||
|
||||
if (pRes == NULL) {
|
||||
asyncCommitAllOffsets(tmq, commitCallBackFn, pInfo);
|
||||
} else {
|
||||
asyncCommitOffset(tmq, pRes, commitCallBackFn, pInfo);
|
||||
}
|
||||
|
||||
tsem_wait(&pInfo->sem);
|
||||
code = pInfo->code;
|
||||
|
||||
tsem_destroy(&pInfo->sem);
|
||||
taosMemoryFree(pInfo);
|
||||
|
||||
tscDebug("consumer:0x%"PRIx64" sync commit done, code:%s", tmq->consumerId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
|
||||
SAskEpInfo* pInfo = param;
|
||||
pInfo->code = code;
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
SMqRspHead* head = pDataBuf->pData;
|
||||
|
||||
SMqAskEpRsp rsp;
|
||||
tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp);
|
||||
tmqUpdateEp(pTmq, head->epoch, &rsp);
|
||||
tDeleteSMqAskEpRsp(&rsp);
|
||||
}
|
||||
|
||||
tsem_post(&pInfo->sem);
|
||||
}
|
||||
|
||||
void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
terrno = code;
|
||||
return;
|
||||
}
|
||||
|
||||
SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0);
|
||||
if (pWrapper == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return;
|
||||
}
|
||||
|
||||
SMqRspHead* head = pDataBuf->pData;
|
||||
|
||||
pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP;
|
||||
pWrapper->epoch = head->epoch;
|
||||
memcpy(&pWrapper->msg, pDataBuf->pData, sizeof(SMqRspHead));
|
||||
tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &pWrapper->msg);
|
||||
|
||||
taosWriteQitem(pTmq->mqueue, pWrapper);
|
||||
}
|
||||
|
||||
int32_t doAskEp(tmq_t* pTmq) {
|
||||
SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
|
||||
tsem_init(&pInfo->sem, 0, 0);
|
||||
|
||||
asyncAskEp(pTmq, updateEpCallbackFn, pInfo);
|
||||
tsem_wait(&pInfo->sem);
|
||||
|
||||
int32_t code = pInfo->code;
|
||||
tsem_destroy(&pInfo->sem);
|
||||
taosMemoryFree(pInfo);
|
||||
return code;
|
||||
}
|
||||
|
||||
void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
|
||||
SMqAskEpReq req = {0};
|
||||
req.consumerId = tmq->consumerId;
|
||||
req.epoch = tmq->epoch;
|
||||
strcpy(req.cgroup, tmq->groupId);
|
||||
req.consumerId = pTmq->consumerId;
|
||||
req.epoch = pTmq->epoch;
|
||||
strcpy(req.cgroup, pTmq->groupId);
|
||||
|
||||
int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
|
||||
if (tlen < 0) {
|
||||
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", tmq->consumerId);
|
||||
return -1;
|
||||
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId);
|
||||
askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
|
||||
return;
|
||||
}
|
||||
|
||||
void* pReq = taosMemoryCalloc(1, tlen);
|
||||
if (pReq == NULL) {
|
||||
tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", tmq->consumerId, tlen);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen);
|
||||
askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
|
||||
return;
|
||||
}
|
||||
|
||||
if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
|
||||
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", tmq->consumerId, tlen);
|
||||
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen);
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
|
||||
askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
|
||||
return;
|
||||
}
|
||||
|
||||
SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
|
||||
if (pParam == NULL) {
|
||||
tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", tmq->consumerId);
|
||||
tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
|
||||
askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
|
||||
return;
|
||||
}
|
||||
|
||||
pParam->refId = tmq->refId;
|
||||
pParam->epoch = tmq->epoch;
|
||||
pParam->async = async;
|
||||
tsem_init(&pParam->rspSem, 0, 0);
|
||||
pParam->refId = pTmq->refId;
|
||||
pParam->epoch = pTmq->epoch;
|
||||
pParam->pUserFn = askEpFn;
|
||||
pParam->pParam = param;
|
||||
|
||||
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (sendInfo == NULL) {
|
||||
tsem_destroy(&pParam->rspSem);
|
||||
taosMemoryFree(pParam);
|
||||
taosMemoryFree(pReq);
|
||||
return -1;
|
||||
askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
|
||||
return;
|
||||
}
|
||||
|
||||
sendInfo->msgInfo = (SDataBuf){
|
||||
.pData = pReq,
|
||||
.len = tlen,
|
||||
.handle = NULL,
|
||||
};
|
||||
sendInfo->msgInfo = (SDataBuf){ .pData = pReq, .len = tlen, .handle = NULL };
|
||||
|
||||
sendInfo->requestId = generateRequestId();
|
||||
sendInfo->requestObjRefId = 0;
|
||||
sendInfo->param = pParam;
|
||||
sendInfo->fp = tmqAskEpCb;
|
||||
sendInfo->fp = askEpCallbackFn;
|
||||
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
|
||||
|
||||
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
||||
tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, async:%d, reqId:0x%" PRIx64, tmq->consumerId, async,
|
||||
sendInfo->requestId);
|
||||
SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
|
||||
tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
|
||||
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||
|
||||
if (!async) {
|
||||
tsem_wait(&pParam->rspSem);
|
||||
code = pParam->code;
|
||||
taosMemoryFree(pParam);
|
||||
}
|
||||
|
||||
return code;
|
||||
asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||
}
|
||||
|
||||
int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {
|
||||
|
@ -2264,38 +2279,20 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
|
|||
|
||||
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
|
||||
if (tmq == NULL) {
|
||||
if (!pParamSet->async) {
|
||||
tsem_destroy(&pParamSet->rspSem);
|
||||
}
|
||||
taosMemoryFree(pParamSet);
|
||||
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
|
||||
return -1;
|
||||
}
|
||||
|
||||
// if no more waiting rsp
|
||||
if (pParamSet->async) {
|
||||
// call async cb func
|
||||
if (pParamSet->automatic && tmq->commitCb) {
|
||||
tmq->commitCb(tmq, pParamSet->rspErr, tmq->commitCbUserParam);
|
||||
} else if (!pParamSet->automatic && pParamSet->userCb) { // sem post
|
||||
pParamSet->userCb(tmq, pParamSet->rspErr, pParamSet->userParam);
|
||||
}
|
||||
|
||||
taosMemoryFree(pParamSet);
|
||||
} else {
|
||||
tsem_post(&pParamSet->rspSem);
|
||||
}
|
||||
|
||||
#if 0
|
||||
taosArrayDestroyP(pParamSet->successfulOffsets, taosMemoryFree);
|
||||
taosArrayDestroyP(pParamSet->failedOffsets, taosMemoryFree);
|
||||
#endif
|
||||
pParamSet->callbackFn(tmq, pParamSet->code, pParamSet->userParam);
|
||||
taosMemoryFree(pParamSet);
|
||||
|
||||
taosReleaseRef(tmqMgmt.rsetId, refId);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
|
||||
void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) {
|
||||
int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
|
||||
if (waitingRspNum == 0) {
|
||||
tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic,
|
||||
|
|
|
@ -1091,10 +1091,13 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
|
|||
pAction->msgReceived = 1;
|
||||
pAction->errCode = pRsp->code;
|
||||
pTrans->lastErrorNo = pRsp->code;
|
||||
|
||||
mInfo("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x", transId,
|
||||
mndTransStr(pAction->stage), action, pRsp->code, pAction->acceptableCode, pAction->retryCode);
|
||||
} else {
|
||||
mInfo("trans:%d, invalid action, index:%d, code:0x%x", transId, action, pRsp->code);
|
||||
}
|
||||
|
||||
mInfo("trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x", transId, mndTransStr(pAction->stage),
|
||||
action, pRsp->code, pAction->acceptableCode, pAction->retryCode);
|
||||
mndTransExecute(pMnode, pTrans, true);
|
||||
|
||||
_OVER:
|
||||
|
|
|
@ -755,8 +755,8 @@ int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) {
|
|||
return TSDB_CODE_QRY_INVALID_QHANDLE;
|
||||
}
|
||||
|
||||
qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
|
||||
setTaskKilled(pTaskInfo, rspCode);
|
||||
qDebug("%s sync killed execTask", GET_TASKID(pTaskInfo));
|
||||
setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED);
|
||||
|
||||
while(qTaskIsExecuting(pTaskInfo)) {
|
||||
taosMsleep(10);
|
||||
|
|
|
@ -771,24 +771,32 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
|
|||
|
||||
// scan table one by one sequentially
|
||||
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
|
||||
int32_t numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
|
||||
int32_t numOfTables = 0;//tableListGetSize(pTaskInfo->pTableInfoList);
|
||||
STableKeyInfo tInfo = {0};
|
||||
|
||||
while (1) {
|
||||
SSDataBlock* result = doGroupedTableScan(pOperator);
|
||||
if (result || (pOperator->status == OP_EXEC_DONE)) {
|
||||
if (result || (pOperator->status == OP_EXEC_DONE) || isTaskKilled(pTaskInfo)) {
|
||||
return result;
|
||||
}
|
||||
|
||||
// if no data, switch to next table and continue scan
|
||||
pInfo->currentTable++;
|
||||
|
||||
taosRLockLatch(&pTaskInfo->lock);
|
||||
numOfTables = tableListGetSize(pTaskInfo->pTableInfoList);
|
||||
|
||||
if (pInfo->currentTable >= numOfTables) {
|
||||
qDebug("all table checked in table list, total:%d, return NULL, %s", numOfTables, GET_TASKID(pTaskInfo));
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
STableKeyInfo* pTableInfo = tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable);
|
||||
tsdbSetTableList(pInfo->base.dataReader, pTableInfo, 1);
|
||||
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", pTableInfo->uid, numOfTables,
|
||||
tInfo = *(STableKeyInfo*) tableListGetInfo(pTaskInfo->pTableInfoList, pInfo->currentTable);
|
||||
taosRUnLockLatch(&pTaskInfo->lock);
|
||||
|
||||
tsdbSetTableList(pInfo->base.dataReader, &tInfo, 1);
|
||||
qDebug("set uid:%" PRIu64 " into scanner, total tables:%d, index:%d/%d %s", tInfo.uid, numOfTables,
|
||||
pInfo->currentTable, numOfTables, GET_TASKID(pTaskInfo));
|
||||
|
||||
tsdbReaderReset(pInfo->base.dataReader, &pInfo->base.cond);
|
||||
|
|
|
@ -337,8 +337,14 @@ static SNodeList* getChildProjection(SNode* pStmt) {
|
|||
static void eraseSetOpChildProjection(SSetOperator* pSetOp, int32_t index) {
|
||||
SNodeList* pLeftProjs = getChildProjection(pSetOp->pLeft);
|
||||
nodesListErase(pLeftProjs, nodesListGetCell(pLeftProjs, index));
|
||||
if (QUERY_NODE_SET_OPERATOR == nodeType(pSetOp->pLeft)) {
|
||||
eraseSetOpChildProjection((SSetOperator*)pSetOp->pLeft, index);
|
||||
}
|
||||
SNodeList* pRightProjs = getChildProjection(pSetOp->pRight);
|
||||
nodesListErase(pRightProjs, nodesListGetCell(pRightProjs, index));
|
||||
if (QUERY_NODE_SET_OPERATOR == nodeType(pSetOp->pRight)) {
|
||||
eraseSetOpChildProjection((SSetOperator*)pSetOp->pRight, index);
|
||||
}
|
||||
}
|
||||
|
||||
typedef struct SNotRefByOrderByCxt {
|
||||
|
|
|
@ -25,6 +25,8 @@ static void clearColValArray(SArray* pCols) {
|
|||
if (TSDB_DATA_TYPE_NCHAR == pCol->type) {
|
||||
taosMemoryFreeClear(pCol->value.pData);
|
||||
}
|
||||
pCol->flag = CV_FLAG_NONE;
|
||||
pCol->value.val = 0;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -194,6 +194,8 @@ char* jobTaskStatusStr(int32_t status) {
|
|||
return "EXECUTING";
|
||||
case JOB_TASK_STATUS_PART_SUCC:
|
||||
return "PARTIAL_SUCCEED";
|
||||
case JOB_TASK_STATUS_FETCH:
|
||||
return "FETCHING";
|
||||
case JOB_TASK_STATUS_SUCC:
|
||||
return "SUCCEED";
|
||||
case JOB_TASK_STATUS_FAIL:
|
||||
|
|
|
@ -259,15 +259,26 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped) {
|
|||
static int32_t ignoreTime = 0;
|
||||
|
||||
if (++ignoreTime > 10 && 0 == taosRand() % 9) {
|
||||
if (ctx->msgType == TDMT_SCH_FETCH) {
|
||||
qwBuildAndSendErrorRsp(TDMT_SCH_LINK_BROKEN, &ctx->ctrlConnInfo, TSDB_CODE_RPC_BROKEN_LINK);
|
||||
qwBuildAndSendErrorRsp(ctx->msgType + 1, &ctx->dataConnInfo, TSDB_CODE_QRY_TASK_CTX_NOT_EXIST);
|
||||
*rsped = true;
|
||||
|
||||
taosSsleep(3);
|
||||
return;
|
||||
}
|
||||
|
||||
#if 0
|
||||
SRpcHandleInfo *pConn =
|
||||
((ctx->msgType == TDMT_SCH_FETCH || ctx->msgType == TDMT_SCH_MERGE_FETCH) ? &ctx->dataConnInfo
|
||||
: &ctx->ctrlConnInfo);
|
||||
: &ctx->ctrlConnInfo);
|
||||
qwBuildAndSendErrorRsp(ctx->msgType + 1, pConn, TSDB_CODE_RPC_BROKEN_LINK);
|
||||
|
||||
|
||||
qwBuildAndSendDropMsg(QW_FPARAMS(), pConn);
|
||||
*rsped = true;
|
||||
|
||||
|
||||
return;
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -193,7 +193,7 @@ typedef struct SSchLevel {
|
|||
int32_t taskSucceed;
|
||||
int32_t taskNum;
|
||||
int32_t taskLaunchedNum;
|
||||
int32_t taskDoneNum;
|
||||
int32_t taskExecDoneNum;
|
||||
SArray *subTasks; // Element is SSchTask
|
||||
} SSchLevel;
|
||||
|
||||
|
@ -299,6 +299,7 @@ typedef struct SSchJob {
|
|||
SExecResult execRes;
|
||||
void *fetchRes; // TODO free it or not
|
||||
bool fetched;
|
||||
bool noMoreRetry;
|
||||
int64_t resNumOfRows; // from int32_t to int64_t
|
||||
SSchResInfo userRes;
|
||||
char *sql;
|
||||
|
@ -333,13 +334,16 @@ extern SSchedulerMgmt schMgmt;
|
|||
((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && \
|
||||
(!SCH_IS_DATA_BIND_QRY_TASK(_task)))
|
||||
|
||||
#define SCH_UPDATE_REDICT_CODE(job, _code) atomic_val_compare_exchange_32(&((job)->redirectCode), 0, _code)
|
||||
#define SCH_GET_REDICT_CODE(job, _code) (((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode)
|
||||
#define SCH_UPDATE_REDIRECT_CODE(job, _code) atomic_val_compare_exchange_32(&((job)->redirectCode), 0, _code)
|
||||
#define SCH_GET_REDIRECT_CODE(job, _code) (((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode)
|
||||
|
||||
#define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st)
|
||||
#define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status)
|
||||
#define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task))
|
||||
|
||||
#define SCH_TASK_ALREADY_LAUNCHED(task) (SCH_GET_TASK_STATUS(task) >= JOB_TASK_STATUS_EXEC)
|
||||
#define SCH_TASK_EXEC_DONE(task) (SCH_GET_TASK_STATUS(task) >= JOB_TASK_STATUS_PART_SUCC)
|
||||
|
||||
#define SCH_GET_TASK_HANDLE(_task) ((_task) ? (_task)->handle : NULL)
|
||||
#define SCH_SET_TASK_HANDLE(_task, _handle) ((_task)->handle = (_handle))
|
||||
|
||||
|
@ -361,6 +365,7 @@ extern SSchedulerMgmt schMgmt;
|
|||
(SCH_IS_DATA_BIND_QRY_TASK(_task) && SCH_JOB_NEED_FLOW_CTRL(_job) && SCH_IS_LEVEL_UNFINISHED((_task)->level))
|
||||
#define SCH_FETCH_TYPE(_pSrcTask) (SCH_IS_DATA_BIND_QRY_TASK(_pSrcTask) ? TDMT_SCH_FETCH : TDMT_SCH_MERGE_FETCH)
|
||||
#define SCH_TASK_NEED_FETCH(_task) ((_task)->plan->subplanType != SUBPLAN_TYPE_MODIFY)
|
||||
#define SCH_MULTI_LEVEL_LAUNCHED(_job) ((_job)->levelIdx != ((_job)->levelNum - 1))
|
||||
|
||||
#define SCH_SET_JOB_TYPE(_job, type) \
|
||||
do { \
|
||||
|
@ -377,16 +382,24 @@ extern SSchedulerMgmt schMgmt;
|
|||
#define SCH_JOB_NEED_DROP(_job) (SCH_IS_QUERY_JOB(_job))
|
||||
#define SCH_IS_EXPLAIN_JOB(_job) (EXPLAIN_MODE_ANALYZE == (_job)->attr.explainMode)
|
||||
#define SCH_NETWORK_ERR(_code) ((_code) == TSDB_CODE_RPC_BROKEN_LINK || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || (_code) == TSDB_CODE_RPC_SOMENODE_NOT_CONNECTED)
|
||||
#define SCH_MERGE_TASK_NETWORK_ERR(_task, _code, _len) \
|
||||
(SCH_NETWORK_ERR(_code) && (((_len) > 0) || (!SCH_IS_DATA_BIND_TASK(_task)) || (_task)->redirectCtx.inRedirect))
|
||||
#define SCH_REDIRECT_MSGTYPE(_msgType) \
|
||||
((_msgType) == TDMT_SCH_LINK_BROKEN || (_msgType) == TDMT_SCH_QUERY || (_msgType) == TDMT_SCH_MERGE_QUERY || \
|
||||
(_msgType) == TDMT_SCH_FETCH || (_msgType) == TDMT_SCH_MERGE_FETCH)
|
||||
#define SCH_TASK_NEED_REDIRECT(_task, _msgType, _code, _rspLen) \
|
||||
(SCH_REDIRECT_MSGTYPE(_msgType) && \
|
||||
(NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_MERGE_TASK_NETWORK_ERR((_task), (_code), (_rspLen))))
|
||||
#define SCH_NEED_RETRY(_msgType, _code) \
|
||||
((SCH_NETWORK_ERR(_code) && SCH_REDIRECT_MSGTYPE(_msgType)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
|
||||
#define SCH_LOW_LEVEL_NETWORK_ERR(_job, _task, _code) \
|
||||
(SCH_NETWORK_ERR(_code) && ((_task)->level->level == (_job)->levelIdx))
|
||||
#define SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code) \
|
||||
(SCH_NETWORK_ERR(_code) && ((_task)->level->level > (_job)->levelIdx))
|
||||
#define SCH_TASK_RETRY_NETWORK_ERR(_task, _code) \
|
||||
(SCH_NETWORK_ERR(_code) && (_task)->redirectCtx.inRedirect)
|
||||
|
||||
#define SCH_JOB_NEED_RETRY(_job, _task, _msgType, _code) \
|
||||
(SCH_REDIRECT_MSGTYPE(_msgType) && SCH_TOP_LEVEL_NETWORK_ERR(_job, _task, _code))
|
||||
#define SCH_TASKSET_NEED_RETRY(_job, _task, _msgType, _code) \
|
||||
(SCH_REDIRECT_MSGTYPE(_msgType) && \
|
||||
(NEED_SCHEDULER_REDIRECT_ERROR(_code) || SCH_LOW_LEVEL_NETWORK_ERR((_job), (_task), (_code)) || SCH_TASK_RETRY_NETWORK_ERR((_task), (_code))))
|
||||
#define SCH_TASK_NEED_RETRY(_msgType, _code) \
|
||||
((SCH_REDIRECT_MSGTYPE(_msgType) && SCH_NETWORK_ERR(_code)) || (_code) == TSDB_CODE_SCH_TIMEOUT_ERROR)
|
||||
|
||||
|
||||
#define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum)
|
||||
#define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse])
|
||||
|
@ -510,6 +523,11 @@ extern SSchedulerMgmt schMgmt;
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
#define SCH_RESET_JOB_LEVEL_IDX(_job) do { \
|
||||
(_job)->levelIdx = (_job)->levelNum - 1; \
|
||||
SCH_JOB_DLOG("set job levelIdx to %d", (_job)->levelIdx); \
|
||||
} while (0)
|
||||
|
||||
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
|
||||
void schCleanClusterHb(void *pTrans);
|
||||
int32_t schLaunchTask(SSchJob *job, SSchTask *task);
|
||||
|
@ -562,7 +580,7 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq);
|
|||
int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq);
|
||||
int32_t schDumpJobExecRes(SSchJob *pJob, SExecResult *pRes);
|
||||
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet);
|
||||
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode);
|
||||
int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode);
|
||||
void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq, int32_t errCode);
|
||||
int32_t schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq);
|
||||
void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode);
|
||||
|
@ -591,6 +609,10 @@ int32_t schHandleJobDrop(SSchJob *pJob, int32_t errCode);
|
|||
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync);
|
||||
int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rspCode);
|
||||
int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp);
|
||||
int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode);
|
||||
int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode);
|
||||
void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask);
|
||||
int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet, int32_t rspCode);
|
||||
|
||||
extern SSchDebug gSCHDebug;
|
||||
|
||||
|
|
|
@ -282,7 +282,6 @@ int32_t schLaunchTasksInFlowCtrlList(SSchJob *pJob, SSchTask *pTask) {
|
|||
}
|
||||
|
||||
int32_t code = schLaunchTasksInFlowCtrlListImpl(pJob, ctrl);
|
||||
;
|
||||
SCH_ERR_RET(code);
|
||||
|
||||
return code; // to avoid compiler error
|
||||
|
|
|
@ -83,6 +83,10 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
|
|||
oriStatus = SCH_GET_JOB_STATUS(pJob);
|
||||
|
||||
if (oriStatus == newStatus) {
|
||||
if (JOB_TASK_STATUS_FETCH == newStatus) {
|
||||
return code;
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
|
||||
}
|
||||
|
||||
|
@ -108,10 +112,19 @@ int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) {
|
|||
break;
|
||||
case JOB_TASK_STATUS_PART_SUCC:
|
||||
if (newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_SUCC &&
|
||||
newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC) {
|
||||
newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC &&
|
||||
newStatus != JOB_TASK_STATUS_FETCH) {
|
||||
SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
|
||||
}
|
||||
|
||||
break;
|
||||
case JOB_TASK_STATUS_FETCH:
|
||||
if (newStatus != JOB_TASK_STATUS_FAIL && newStatus != JOB_TASK_STATUS_SUCC &&
|
||||
newStatus != JOB_TASK_STATUS_DROP && newStatus != JOB_TASK_STATUS_EXEC &&
|
||||
newStatus != JOB_TASK_STATUS_FETCH) {
|
||||
SCH_ERR_JRET(TSDB_CODE_APP_ERROR);
|
||||
}
|
||||
|
||||
break;
|
||||
case JOB_TASK_STATUS_SUCC:
|
||||
case JOB_TASK_STATUS_FAIL:
|
||||
|
@ -288,7 +301,7 @@ int32_t schValidateAndBuildJob(SQueryPlan *pDag, SSchJob *pJob) {
|
|||
}
|
||||
|
||||
pJob->levelNum = levelNum;
|
||||
pJob->levelIdx = levelNum - 1;
|
||||
SCH_RESET_JOB_LEVEL_IDX(pJob);
|
||||
|
||||
SSchLevel level = {0};
|
||||
SNodeListNode *plans = NULL;
|
||||
|
@ -550,9 +563,9 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
|
|||
}
|
||||
|
||||
SSchLevel *pLevel = pTask->level;
|
||||
int32_t doneNum = atomic_add_fetch_32(&pLevel->taskDoneNum, 1);
|
||||
int32_t doneNum = atomic_add_fetch_32(&pLevel->taskExecDoneNum, 1);
|
||||
if (doneNum == pLevel->taskNum) {
|
||||
pJob->levelIdx--;
|
||||
atomic_sub_fetch_32(&pJob->levelIdx, 1);
|
||||
|
||||
pLevel = taosArrayGet(pJob->levels, pJob->levelIdx);
|
||||
for (int32_t i = 0; i < pLevel->taskNum; ++i) {
|
||||
|
@ -562,6 +575,10 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if (SCH_TASK_ALREADY_LAUNCHED(pTask)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
SCH_ERR_RET(schLaunchTask(pJob, pTask));
|
||||
}
|
||||
}
|
||||
|
@ -811,6 +828,75 @@ void schDirectPostJobRes(SSchedulerReq *pReq, int32_t errCode) {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) {
|
||||
if (pJob->status >= JOB_TASK_STATUS_PART_SUCC) {
|
||||
SCH_LOCK(SCH_WRITE, &pJob->resLock);
|
||||
if (pJob->fetched) {
|
||||
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
|
||||
pJob->noMoreRetry = true;
|
||||
SCH_JOB_ELOG("already fetched while got error %s", tstrerror(rspCode));
|
||||
SCH_ERR_RET(rspCode);
|
||||
}
|
||||
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
|
||||
|
||||
schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode) {
|
||||
SCH_ERR_RET(schChkResetJobRetry(pJob, rspCode));
|
||||
|
||||
int32_t numOfLevels = taosArrayGetSize(pJob->levels);
|
||||
for (int32_t i = 0; i < numOfLevels; ++i) {
|
||||
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
|
||||
|
||||
pLevel->taskExecDoneNum = 0;
|
||||
pLevel->taskLaunchedNum = 0;
|
||||
|
||||
int32_t numOfTasks = taosArrayGetSize(pLevel->subTasks);
|
||||
for (int32_t j = 0; j < numOfTasks; ++j) {
|
||||
SSchTask *pTask = taosArrayGet(pLevel->subTasks, j);
|
||||
SCH_LOCK_TASK(pTask);
|
||||
SCH_ERR_RET(schChkUpdateRedirectCtx(pJob, pTask, NULL, rspCode));
|
||||
qClearSubplanExecutionNode(pTask->plan);
|
||||
schResetTaskForRetry(pJob, pTask);
|
||||
SCH_UNLOCK_TASK(pTask);
|
||||
}
|
||||
}
|
||||
|
||||
SCH_RESET_JOB_LEVEL_IDX(pJob);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
|
||||
taosMemoryFreeClear(pMsg->pData);
|
||||
taosMemoryFreeClear(pMsg->pEpSet);
|
||||
|
||||
SCH_UNLOCK_TASK(pTask);
|
||||
|
||||
SCH_TASK_DLOG("start to redirect all job tasks cause of error: %s", tstrerror(rspCode));
|
||||
|
||||
SCH_ERR_JRET(schResetJobForRetry(pJob, rspCode));
|
||||
|
||||
SCH_ERR_JRET(schLaunchJob(pJob));
|
||||
|
||||
SCH_LOCK_TASK(pTask);
|
||||
|
||||
SCH_RET(code);
|
||||
|
||||
_return:
|
||||
|
||||
SCH_LOCK_TASK(pTask);
|
||||
|
||||
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
|
||||
}
|
||||
|
||||
bool schChkCurrentOp(SSchJob *pJob, int32_t op, int8_t sync) {
|
||||
bool r = false;
|
||||
SCH_LOCK(SCH_READ, &pJob->opStatus.lock);
|
||||
|
@ -907,7 +993,7 @@ int32_t schProcessOnOpBegin(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq *pReq
|
|||
SCH_ERR_RET(TSDB_CODE_APP_ERROR);
|
||||
}
|
||||
|
||||
if (status != JOB_TASK_STATUS_PART_SUCC) {
|
||||
if (status != JOB_TASK_STATUS_PART_SUCC && status != JOB_TASK_STATUS_FETCH) {
|
||||
SCH_JOB_ELOG("job status error for fetch, status:%s", jobTaskStatusStr(status));
|
||||
SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR);
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ int32_t schValidateRspMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) {
|
|||
TMSG_INFO(msgType));
|
||||
SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
|
||||
}
|
||||
if (taskStatus != JOB_TASK_STATUS_PART_SUCC) {
|
||||
if (taskStatus != JOB_TASK_STATUS_FETCH) {
|
||||
SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus),
|
||||
TMSG_INFO(msgType));
|
||||
SCH_ERR_RET(TSDB_CODE_QW_MSG_ERROR);
|
||||
|
@ -137,25 +137,12 @@ int32_t schProcessExplainRsp(SSchJob *pJob, SSchTask *pTask, SExplainRsp *rsp) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// Note: no more task error processing, handled in function internal
|
||||
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) {
|
||||
int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
char *msg = pMsg->pData;
|
||||
int32_t msgSize = pMsg->len;
|
||||
int32_t msgType = pMsg->msgType;
|
||||
|
||||
bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode));
|
||||
if (SCH_IS_QUERY_JOB(pJob)) {
|
||||
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, execId));
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(schValidateRspMsgType(pJob, pTask, msgType));
|
||||
|
||||
int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1);
|
||||
if (SCH_TASK_NEED_REDIRECT(pTask, reqType, rspCode, pMsg->len)) {
|
||||
SCH_RET(schHandleRedirect(pJob, pTask, (SDataBuf *)pMsg, rspCode));
|
||||
}
|
||||
|
||||
pTask->redirectCtx.inRedirect = false;
|
||||
|
||||
switch (msgType) {
|
||||
|
@ -423,6 +410,38 @@ _return:
|
|||
|
||||
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
|
||||
}
|
||||
|
||||
|
||||
// Note: no more task error processing, handled in function internal
|
||||
int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
int32_t msgType = pMsg->msgType;
|
||||
char *msg = pMsg->pData;
|
||||
|
||||
bool dropExecNode = (msgType == TDMT_SCH_LINK_BROKEN || SCH_NETWORK_ERR(rspCode));
|
||||
if (SCH_IS_QUERY_JOB(pJob)) {
|
||||
SCH_ERR_JRET(schUpdateTaskHandle(pJob, pTask, dropExecNode, pMsg->handle, execId));
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(schValidateRspMsgType(pJob, pTask, msgType));
|
||||
|
||||
int32_t reqType = IsReq(pMsg) ? pMsg->msgType : (pMsg->msgType - 1);
|
||||
if (SCH_JOB_NEED_RETRY(pJob, pTask, reqType, rspCode)) {
|
||||
SCH_RET(schHandleJobRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode));
|
||||
} else if (SCH_TASKSET_NEED_RETRY(pJob, pTask, reqType, rspCode)) {
|
||||
SCH_RET(schHandleTaskSetRetry(pJob, pTask, (SDataBuf *)pMsg, rspCode));
|
||||
}
|
||||
|
||||
pTask->redirectCtx.inRedirect = false;
|
||||
|
||||
SCH_RET(schProcessResponseMsg(pJob, pTask, execId, pMsg, rspCode));
|
||||
|
||||
_return:
|
||||
|
||||
taosMemoryFreeClear(msg);
|
||||
|
||||
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
|
||||
}
|
||||
int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
SSchTaskCallbackParam *pParam = (SSchTaskCallbackParam *)param;
|
||||
|
|
|
@ -34,6 +34,9 @@ int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) {
|
|||
case JOB_TASK_STATUS_PART_SUCC:
|
||||
SCH_ERR_JRET(schProcessOnJobPartialSuccess(pJob));
|
||||
break;
|
||||
case JOB_TASK_STATUS_FETCH:
|
||||
SCH_ERR_JRET(schJobFetchRows(pJob));
|
||||
break;
|
||||
case JOB_TASK_STATUS_SUCC:
|
||||
break;
|
||||
case JOB_TASK_STATUS_FAIL:
|
||||
|
|
|
@ -378,7 +378,8 @@ int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet,
|
|||
if (lastTime > tsMaxRetryWaitTime) {
|
||||
SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d",
|
||||
nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes);
|
||||
SCH_ERR_RET(SCH_GET_REDICT_CODE(pJob, rspCode));
|
||||
pJob->noMoreRetry = true;
|
||||
SCH_ERR_RET(SCH_GET_REDIRECT_CODE(pJob, rspCode));
|
||||
}
|
||||
|
||||
pCtx->periodMs *= tsRedirectFactor;
|
||||
|
@ -404,32 +405,35 @@ _return:
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
|
||||
pTask->waitRetry = true;
|
||||
|
||||
schDropTaskOnExecNode(pJob, pTask);
|
||||
if (pTask->delayTimer) {
|
||||
taosTmrStopA(&pTask->delayTimer);
|
||||
}
|
||||
taosHashClear(pTask->execNodes);
|
||||
schRemoveTaskFromExecList(pJob, pTask);
|
||||
schDeregisterTaskHb(pJob, pTask);
|
||||
taosMemoryFreeClear(pTask->msg);
|
||||
pTask->msgLen = 0;
|
||||
pTask->lastMsgType = 0;
|
||||
pTask->childReady = 0;
|
||||
memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
|
||||
}
|
||||
|
||||
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
|
||||
SCH_TASK_DLOG("task will be redirected now, status:%s, code:%s", SCH_GET_TASK_STATUS_STR(pTask), tstrerror(rspCode));
|
||||
|
||||
if (NULL == pData) {
|
||||
pTask->retryTimes = 0;
|
||||
}
|
||||
|
||||
if (!NO_RET_REDIRECT_ERROR(rspCode)) {
|
||||
SCH_UPDATE_REDICT_CODE(pJob, rspCode);
|
||||
SCH_UPDATE_REDIRECT_CODE(pJob, rspCode);
|
||||
}
|
||||
|
||||
SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL, rspCode));
|
||||
|
||||
pTask->waitRetry = true;
|
||||
|
||||
schDropTaskOnExecNode(pJob, pTask);
|
||||
taosHashClear(pTask->execNodes);
|
||||
schRemoveTaskFromExecList(pJob, pTask);
|
||||
schDeregisterTaskHb(pJob, pTask);
|
||||
atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
|
||||
taosMemoryFreeClear(pTask->msg);
|
||||
pTask->msgLen = 0;
|
||||
pTask->lastMsgType = 0;
|
||||
memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
|
||||
schResetTaskForRetry(pJob, pTask);
|
||||
|
||||
if (SCH_IS_DATA_BIND_TASK(pTask)) {
|
||||
if (pData && pData->pEpSet) {
|
||||
|
@ -445,12 +449,6 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
|
|||
SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
|
||||
}
|
||||
|
||||
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
|
||||
if (JOB_TASK_STATUS_EXEC == SCH_GET_TASK_STATUS(pTask)) {
|
||||
SCH_ERR_JRET(schLaunchTasksInFlowCtrlList(pJob, pTask));
|
||||
}
|
||||
}
|
||||
|
||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
|
||||
|
||||
SCH_ERR_JRET(schDelayLaunchTask(pJob, pTask));
|
||||
|
@ -486,20 +484,10 @@ _return:
|
|||
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
|
||||
}
|
||||
|
||||
int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
|
||||
int32_t schHandleTaskSetRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (JOB_TASK_STATUS_PART_SUCC == pJob->status) {
|
||||
SCH_LOCK(SCH_WRITE, &pJob->resLock);
|
||||
if (pJob->fetched) {
|
||||
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
|
||||
SCH_TASK_ELOG("already fetched while got error %s", tstrerror(rspCode));
|
||||
SCH_ERR_JRET(rspCode);
|
||||
}
|
||||
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
|
||||
|
||||
schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC);
|
||||
}
|
||||
SCH_ERR_JRET(schChkResetJobRetry(pJob, rspCode));
|
||||
|
||||
if (SYNC_OTHER_LEADER_REDIRECT_ERROR(rspCode)) {
|
||||
if (NULL == pData->pEpSet) {
|
||||
|
@ -509,7 +497,19 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
|
|||
}
|
||||
}
|
||||
|
||||
SCH_TASK_DLOG("start to redirect current task set cause of error: %s", tstrerror(rspCode));
|
||||
|
||||
for (int32_t i = 0; i < pJob->levelNum; ++i) {
|
||||
SSchLevel *pLevel = taosArrayGet(pJob->levels, i);
|
||||
|
||||
pLevel->taskExecDoneNum = 0;
|
||||
pLevel->taskLaunchedNum = 0;
|
||||
}
|
||||
|
||||
SCH_RESET_JOB_LEVEL_IDX(pJob);
|
||||
|
||||
code = schDoTaskRedirect(pJob, pTask, pData, rspCode);
|
||||
|
||||
taosMemoryFreeClear(pData->pData);
|
||||
taosMemoryFreeClear(pData->pEpSet);
|
||||
|
||||
|
@ -621,6 +621,13 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) {
|
|||
*/
|
||||
|
||||
int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) {
|
||||
if (pJob->noMoreRetry) {
|
||||
*needRetry = false;
|
||||
SCH_TASK_DLOG("task no more retry since job no more retry, retryTimes:%d/%d", pTask->retryTimes,
|
||||
pTask->maxRetryTimes);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SCH_TIMEOUT_ERROR == errCode) {
|
||||
pTask->maxExecTimes++;
|
||||
pTask->maxRetryTimes++;
|
||||
|
@ -645,7 +652,7 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (!SCH_NEED_RETRY(pTask->lastMsgType, errCode)) {
|
||||
if (!SCH_TASK_NEED_RETRY(pTask->lastMsgType, errCode)) {
|
||||
*needRetry = false;
|
||||
SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1067,7 +1074,6 @@ int32_t schLaunchTaskImpl(void *param) {
|
|||
SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
|
||||
}
|
||||
|
||||
// NOTE: race condition: the task should be put into the hash table before send msg to server
|
||||
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
|
||||
SCH_ERR_JRET(schPushTaskToExecList(pJob, pTask));
|
||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
|
||||
|
@ -1272,6 +1278,8 @@ int32_t schLaunchFetchTask(SSchJob *pJob) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SCH_SET_TASK_STATUS(pJob->fetchTask, JOB_TASK_STATUS_FETCH);
|
||||
|
||||
if (SCH_IS_LOCAL_EXEC_TASK(pJob, pJob->fetchTask)) {
|
||||
SCH_ERR_JRET(schExecLocalFetch(pJob, pJob->fetchTask));
|
||||
} else {
|
||||
|
|
|
@ -91,7 +91,7 @@ int32_t schedulerFetchRows(int64_t jobId, SSchedulerReq *pReq) {
|
|||
|
||||
SCH_ERR_JRET(schHandleOpBeginEvent(jobId, &pJob, SCH_OP_FETCH, pReq));
|
||||
|
||||
SCH_ERR_JRET(schJobFetchRows(pJob));
|
||||
SCH_ERR_JRET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_FETCH, pReq));
|
||||
|
||||
_return:
|
||||
|
||||
|
|
|
@ -939,6 +939,9 @@ int sml_ts2164_Test() {
|
|||
// "meters,location=la,groupid=ca current=11.8,voltage=221,phase=0.27",
|
||||
"meters,location=la,groupid=ca current=11.8,voltage=221",
|
||||
"meters,location=la,groupid=ca current=11.8,voltage=221,phase=0.27",
|
||||
"ts3038,location=l2a,groupid=ca current=L\"11.8\"",
|
||||
"ts3038,location=l2a,groupid=ca voltage=L\"221\"",
|
||||
"ts3038,location=l2a,groupid=ca phase=L\"221\"",
|
||||
// "meters,location=la,groupid=cb current=11.8,voltage=221,phase=0.27",
|
||||
};
|
||||
|
||||
|
|
Loading…
Reference in New Issue