This commit is contained in:
54liuyao 2024-04-24 18:57:22 +08:00
commit 2a72ceadd1
15 changed files with 110 additions and 78 deletions

View File

@ -35,6 +35,7 @@ extern "C" {
#define CACHESCAN_RETRIEVE_TYPE_SINGLE 0x2 #define CACHESCAN_RETRIEVE_TYPE_SINGLE 0x2
#define CACHESCAN_RETRIEVE_LAST_ROW 0x4 #define CACHESCAN_RETRIEVE_LAST_ROW 0x4
#define CACHESCAN_RETRIEVE_LAST 0x8 #define CACHESCAN_RETRIEVE_LAST 0x8
#define CACHESCAN_RETRIEVE_PK 0x10
#define META_READER_LOCK 0x0 #define META_READER_LOCK 0x0
#define META_READER_NOLOCK 0x1 #define META_READER_NOLOCK 0x1

View File

@ -660,13 +660,13 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
taosRLockLatch(&tmq->lock); taosRLockLatch(&tmq->lock);
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tscInfo("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics); tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);
for (int32_t i = 0; i < numOfTopics; i++) { for (int32_t i = 0; i < numOfTopics; i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
tscInfo("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName,
numOfVgroups); numOfVgroups);
for (int32_t j = 0; j < numOfVgroups; j++) { for (int32_t j = 0; j < numOfVgroups; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
@ -688,19 +688,19 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
continue; continue;
} }
tscInfo("consumer:0x%" PRIx64 tscDebug("consumer:0x%" PRIx64
" topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s, ordinal:%d/%d", " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s, ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, j + 1, numOfVgroups); tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, j + 1, numOfVgroups);
tOffsetCopy(&pVg->offsetInfo.committedOffset, &pVg->offsetInfo.endOffset); tOffsetCopy(&pVg->offsetInfo.committedOffset, &pVg->offsetInfo.endOffset);
} else { } else {
tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d", tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d",
tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups); tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups);
} }
} }
} }
taosRUnLockLatch(&tmq->lock); taosRUnLockLatch(&tmq->lock);
tscInfo("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1, tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1,
numOfTopics); numOfTopics);
// request is sent // request is sent
@ -815,7 +815,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
offRows->ever = pVg->offsetInfo.walVerEnd; offRows->ever = pVg->offsetInfo.walVerEnd;
char buf[TSDB_OFFSET_LEN] = {0}; char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset); tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset);
tscInfo("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64, tscDebug("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64,
tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows); tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows);
} }
} }
@ -1058,6 +1058,7 @@ static void tmqMgmtInit(void) {
#define SET_ERROR_MSG_TMQ(MSG) \ #define SET_ERROR_MSG_TMQ(MSG) \
if (errstr != NULL) snprintf(errstr, errstrLen, MSG); if (errstr != NULL) snprintf(errstr, errstrLen, MSG);
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if (conf == NULL) { if (conf == NULL) {
SET_ERROR_MSG_TMQ("configure is null") SET_ERROR_MSG_TMQ("configure is null")
@ -1504,7 +1505,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
int32_t topicNumGet = taosArrayGetSize(pRsp->topics); int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
if (epoch < tmq->epoch || (epoch == tmq->epoch && topicNumGet == 0)) { if (epoch < tmq->epoch || (epoch == tmq->epoch && topicNumGet == 0)) {
tscInfo("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId, tscDebug("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId,
tmq->epoch, epoch, topicNumGet); tmq->epoch, epoch, topicNumGet);
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) { if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);
@ -1800,14 +1801,14 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
for (int j = 0; j < numOfVg; j++) { for (int j = 0; j < numOfVg; j++) {
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 10ms if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 10ms
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId,
tmq->epoch, pVg->vgId); tmq->epoch, pVg->vgId);
continue; continue;
} }
if (tmq->replayEnable && if (tmq->replayEnable &&
taosGetTimestampMs() - pVg->blockReceiveTs < pVg->blockSleepForReplay) { // less than 10ms taosGetTimestampMs() - pVg->blockReceiveTs < pVg->blockSleepForReplay) { // less than 10ms
tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay", tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay",
tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay); tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay);
continue; continue;
} }
@ -1815,7 +1816,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus == TMQ_VG_STATUS__WAIT) { if (vgStatus == TMQ_VG_STATUS__WAIT) {
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, tscDebug("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch,
pVg->vgId, vgSkipCnt); pVg->vgId, vgSkipCnt);
continue; continue;
} }
@ -1875,7 +1876,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper;
if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER); atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, set status to be RECOVER", tmq->consumerId); tscDebug("consumer:0x%" PRIx64 " wait for the rebalance, set status to be RECOVER", tmq->consumerId);
} else if (pRspWrapper->code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { } else if (pRspWrapper->code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
terrno = pRspWrapper->code; terrno = pRspWrapper->code;
tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId,
@ -2476,7 +2477,7 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqRspHead* head = pMsg->pData; SMqRspHead* head = pMsg->pData;
int32_t epoch = atomic_load_32(&tmq->epoch); int32_t epoch = atomic_load_32(&tmq->epoch);
tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch); tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
if (pParam->sync) { if (pParam->sync) {
SMqAskEpRsp rsp = {0}; SMqAskEpRsp rsp = {0};
tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp);
@ -2581,7 +2582,7 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP; sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
tscInfo("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId); tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
int64_t transporterId = 0; int64_t transporterId = 0;
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);

View File

@ -9624,6 +9624,8 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) {
taosArrayDestroy(pTbData->aRowP); taosArrayDestroy(pTbData->aRowP);
} }
} }
pTbData->aRowP = NULL;
} }
void tDestroySubmitReq(SSubmitReq2 *pReq, int32_t flag) { void tDestroySubmitReq(SSubmitReq2 *pReq, int32_t flag) {

View File

@ -166,7 +166,7 @@ typedef struct {
int32_t failedTimes; int32_t failedTimes;
void* rpcRsp; void* rpcRsp;
int32_t rpcRspLen; int32_t rpcRspLen;
int32_t redoActionPos; int32_t actionPos;
SArray* prepareActions; SArray* prepareActions;
SArray* redoActions; SArray* redoActions;
SArray* undoActions; SArray* undoActions;

View File

@ -98,7 +98,8 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* p
mDebug("not conflict with checkpoint trans, name:%s, continue create trans", pTransName); mDebug("not conflict with checkpoint trans, name:%s, continue create trans", pTransName);
} }
} else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) || } else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) ||
(strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0)) { (strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0) ||
strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) {
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId, mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId,
tInfo.name); tInfo.name);
terrno = TSDB_CODE_MND_TRANS_CONFLICT; terrno = TSDB_CODE_MND_TRANS_CONFLICT;

View File

@ -438,10 +438,10 @@ static void processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, S
} }
static void printRebalanceLog(SMqRebOutputObj *pOutput){ static void printRebalanceLog(SMqRebOutputObj *pOutput){
mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", pOutput->pSub->key); mInfo("sub:%s mq rebalance calculation completed, re-balanced vg", pOutput->pSub->key);
for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) { for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) {
SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i); SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i);
mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pOutput->pSub->key, mInfo("sub:%s mq rebalance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pOutput->pSub->key,
pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId); pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId);
} }
@ -451,10 +451,10 @@ static void printRebalanceLog(SMqRebOutputObj *pOutput){
if (pIter == NULL) break; if (pIter == NULL) break;
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
int32_t sz = taosArrayGetSize(pConsumerEp->vgs); int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key, pConsumerEp->consumerId, sz); mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key, pConsumerEp->consumerId, sz);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i); SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId, mInfo("sub:%s mq rebalance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId,
pConsumerEp->consumerId); pConsumerEp->consumerId);
} }
} }
@ -762,18 +762,18 @@ static void mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) {
bool mndRebTryStart() { bool mndRebTryStart() {
int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1); int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
mInfo("rebalance counter old val:%d", old); if (old > 0) mInfo("[rebalance] counter old val:%d", old)
return old == 0; return old == 0;
} }
void mndRebCntInc() { void mndRebCntInc() {
int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1); int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1);
mInfo("rebalance cnt inc, value:%d", val); if (val > 0) mInfo("[rebalance] cnt inc, value:%d", val)
} }
void mndRebCntDec() { void mndRebCntDec() {
int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1); int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1);
mInfo("rebalance cnt sub, value:%d", val); if (val > 0) mInfo("[rebalance] cnt sub, value:%d", val)
} }
static void clearRebOutput(SMqRebOutputObj *rebOutput){ static void clearRebOutput(SMqRebOutputObj *rebOutput){
@ -848,10 +848,10 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
int code = 0; int code = 0;
void *pIter = NULL; void *pIter = NULL;
SMnode *pMnode = pMsg->info.node; SMnode *pMnode = pMsg->info.node;
mInfo("[rebalance] start to process mq timer"); mDebug("[rebalance] start to process mq timer")
if (!mndRebTryStart()) { if (!mndRebTryStart()) {
mInfo("[rebalance] mq rebalance already in progress, do nothing"); mInfo("[rebalance] mq rebalance already in progress, do nothing")
return code; return code;
} }
@ -863,7 +863,9 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
taosHashSetFreeFp(rebSubHash, freeRebalanceItem); taosHashSetFreeFp(rebSubHash, freeRebalanceItem);
mndCheckConsumer(pMsg, rebSubHash); mndCheckConsumer(pMsg, rebSubHash);
mInfo("[rebalance] mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash)); if (taosHashGetSize(rebSubHash) > 0) {
mInfo("[rebalance] mq rebalance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash))
}
while (1) { while (1) {
pIter = taosHashIterate(rebSubHash, pIter); pIter = taosHashIterate(rebSubHash, pIter);
@ -887,13 +889,15 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) {
mndDoRebalance(pMnode, &rebInput, &rebOutput); mndDoRebalance(pMnode, &rebInput, &rebOutput);
if (mndPersistRebResult(pMnode, pMsg, &rebOutput) != 0) { if (mndPersistRebResult(pMnode, pMsg, &rebOutput) != 0) {
mError("mq re-balance persist output error, possibly vnode splitted or dropped,msg:%s", terrstr()); mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", terrstr())
} }
clearRebOutput(&rebOutput); clearRebOutput(&rebOutput);
} }
mInfo("[rebalance] mq re-balance completed successfully, wait trans finish"); if (taosHashGetSize(rebSubHash) > 0) {
mInfo("[rebalance] mq rebalance completed successfully, wait trans finish")
}
END: END:
taosHashCancelIterate(rebSubHash, pIter); taosHashCancelIterate(rebSubHash, pIter);

View File

@ -169,7 +169,7 @@ SSdbRaw *mndTransEncode(STrans *pTrans) {
SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER) SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_SET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER) SDB_SET_INT32(pRaw, dataPos, pTrans->actionPos, _OVER)
int32_t prepareActionNum = taosArrayGetSize(pTrans->prepareActions); int32_t prepareActionNum = taosArrayGetSize(pTrans->prepareActions);
int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions); int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions);
@ -317,7 +317,7 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) {
SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER) SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_GET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER)
SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER) SDB_GET_INT32(pRaw, dataPos, &pTrans->actionPos, _OVER)
if (sver > TRANS_VER1_NUMBER) { if (sver > TRANS_VER1_NUMBER) {
SDB_GET_INT32(pRaw, dataPos, &prepareActionNum, _OVER) SDB_GET_INT32(pRaw, dataPos, &prepareActionNum, _OVER)
@ -525,7 +525,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) {
mndTransUpdateActions(pOld->undoActions, pNew->undoActions); mndTransUpdateActions(pOld->undoActions, pNew->undoActions);
mndTransUpdateActions(pOld->commitActions, pNew->commitActions); mndTransUpdateActions(pOld->commitActions, pNew->commitActions);
pOld->stage = pNew->stage; pOld->stage = pNew->stage;
pOld->redoActionPos = pNew->redoActionPos; pOld->actionPos = pNew->actionPos;
if (pOld->stage == TRN_STAGE_COMMIT) { if (pOld->stage == TRN_STAGE_COMMIT) {
pOld->stage = TRN_STAGE_COMMIT_ACTION; pOld->stage = TRN_STAGE_COMMIT_ACTION;
@ -1360,22 +1360,19 @@ static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool
return code; return code;
} }
static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) { static int32_t mndTransExecuteActionsSerial(SMnode *pMnode, STrans *pTrans, SArray *pActions, bool topHalf) {
int32_t code = 0; int32_t code = 0;
int32_t numOfActions = taosArrayGetSize(pTrans->redoActions); int32_t numOfActions = taosArrayGetSize(pActions);
if (numOfActions == 0) return code; if (numOfActions == 0) return code;
taosThreadMutexLock(&pTrans->mutex); if (pTrans->actionPos >= numOfActions) {
if (pTrans->redoActionPos >= numOfActions) {
taosThreadMutexUnlock(&pTrans->mutex);
return code; return code;
} }
mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->redoActionPos); mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->actionPos);
for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) { for (int32_t action = pTrans->actionPos; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos); STransAction *pAction = taosArrayGet(pActions, pTrans->actionPos);
code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf); code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf);
if (code == 0) { if (code == 0) {
@ -1409,14 +1406,14 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans,
if (code == 0) { if (code == 0) {
pTrans->code = 0; pTrans->code = 0;
pTrans->redoActionPos++; pTrans->actionPos++;
mInfo("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage), mInfo("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage),
pAction->id); pAction->id);
taosThreadMutexUnlock(&pTrans->mutex); taosThreadMutexUnlock(&pTrans->mutex);
code = mndTransSync(pMnode, pTrans); code = mndTransSync(pMnode, pTrans);
taosThreadMutexLock(&pTrans->mutex); taosThreadMutexLock(&pTrans->mutex);
if (code != 0) { if (code != 0) {
pTrans->redoActionPos--; pTrans->actionPos--;
pTrans->code = terrno; pTrans->code = terrno;
mError("trans:%d, %s:%d is executed and failed to sync to other mnodes since %s", pTrans->id, mError("trans:%d, %s:%d is executed and failed to sync to other mnodes since %s", pTrans->id,
mndTransStr(pAction->stage), pAction->id, terrstr()); mndTransStr(pAction->stage), pAction->id, terrstr());
@ -1442,8 +1439,26 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans,
} }
} }
taosThreadMutexUnlock(&pTrans->mutex); return code;
}
static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = TSDB_CODE_ACTION_IN_PROGRESS;
taosThreadMutexLock(&pTrans->mutex);
if (pTrans->stage == TRN_STAGE_REDO_ACTION) {
code = mndTransExecuteActionsSerial(pMnode, pTrans, pTrans->redoActions, topHalf);
}
taosThreadMutexUnlock(&pTrans->mutex);
return code;
}
static int32_t mndTransExecuteUndoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = TSDB_CODE_ACTION_IN_PROGRESS;
taosThreadMutexLock(&pTrans->mutex);
if (pTrans->stage == TRN_STAGE_UNDO_ACTION) {
code = mndTransExecuteActionsSerial(pMnode, pTrans, pTrans->undoActions, topHalf);
}
taosThreadMutexUnlock(&pTrans->mutex);
return code; return code;
} }
@ -1563,13 +1578,22 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans, boo
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
bool continueExec = true; bool continueExec = true;
int32_t code = mndTransExecuteUndoActions(pMnode, pTrans, topHalf); int32_t code = 0;
if (pTrans->exec == TRN_EXEC_SERIAL) {
code = mndTransExecuteUndoActionsSerial(pMnode, pTrans, topHalf);
} else {
code = mndTransExecuteUndoActions(pMnode, pTrans, topHalf);
}
if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
terrno = code;
if (code == 0) { if (code == 0) {
pTrans->stage = TRN_STAGE_PRE_FINISH; pTrans->stage = TRN_STAGE_PRE_FINISH;
mInfo("trans:%d, stage from undoAction to pre-finish", pTrans->id); mInfo("trans:%d, stage from undoAction to pre-finish", pTrans->id);
continueExec = true; continueExec = true;
} else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { } else if (code == TSDB_CODE_ACTION_IN_PROGRESS || code == TSDB_CODE_MND_TRANS_CTX_SWITCH) {
mInfo("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code)); mInfo("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code));
continueExec = false; continueExec = false;
} else { } else {

View File

@ -367,7 +367,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
} while (0); } while (0);
} }
// 2. check re-balance status // 2. check rebalance status
if (pHandle->consumerId != consumerId) { if (pHandle->consumerId != consumerId) {
tqError("ERROR tmq poll: consumer:0x%" PRIx64 tqError("ERROR tmq poll: consumer:0x%" PRIx64
" vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
@ -485,7 +485,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
return -1; return -1;
} }
// 2. check re-balance status // 2. check rebalance status
if (pHandle->consumerId != consumerId) { if (pHandle->consumerId != consumerId) {
tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
consumerId, vgId, req.subKey, pHandle->consumerId); consumerId, vgId, req.subKey, pHandle->consumerId);
@ -666,7 +666,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId); req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId);
} }
if (req.newConsumerId == -1) { if (req.newConsumerId == -1) {
tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId); tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId);
goto end; goto end;
} }
STqHandle handle = {0}; STqHandle handle = {0};

View File

@ -583,7 +583,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
if (IS_SET_NULL(pCol)) { if (IS_SET_NULL(pCol)) {
if (pCol->flags & COL_IS_KEY) { if (pCol->flags & COL_IS_KEY) {
qError("ts:%" PRId64 " Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, ts, qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, ts,
pCol->colId, pCol->type); pCol->colId, pCol->type);
break; break;
} }
@ -593,7 +593,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex);
if (colDataIsNull_s(pColData, j)) { if (colDataIsNull_s(pColData, j)) {
if (pCol->flags & COL_IS_KEY) { if (pCol->flags & COL_IS_KEY) {
qError("ts:%" PRId64 "Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8,
ts, pCol->colId, pCol->type); ts, pCol->colId, pCol->type);
break; break;
} }
@ -624,8 +624,8 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat
code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow); code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE); tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE);
pTableData->aRowP = taosArrayDestroy(pTableData->aRowP);
taosArrayDestroy(pVals); taosArrayDestroy(pVals);
tqError("s-task:%s build rows for submit failed, ts:%"PRId64, id, ts);
return code; return code;
} }

View File

@ -195,12 +195,15 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
const char* idstr = pTask->id.idStr; const char* idstr = pTask->id.idStr;
if (pMeta->updateInfo.transId != req.transId) { if (pMeta->updateInfo.transId != req.transId) {
pMeta->updateInfo.transId = req.transId; ASSERT(req.transId > pMeta->updateInfo.transId);
tqInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", idstr, req.transId); tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr,
vgId, req.transId, pMeta->updateInfo.transId);
// info needs to be kept till the new trans to update the nodeEp arrived. // info needs to be kept till the new trans to update the nodeEp arrived.
taosHashClear(pMeta->updateInfo.pTasks); taosHashClear(pMeta->updateInfo.pTasks);
pMeta->updateInfo.transId = req.transId;
} else { } else {
tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", idstr, req.transId); tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d", idstr, vgId, req.transId);
} }
// duplicate update epset msg received, discard this redundant message // duplicate update epset msg received, discard this redundant message

View File

@ -130,7 +130,7 @@ static void tsdbClosePgCache(STsdb *pTsdb) {
enum { enum {
LFLAG_LAST_ROW = 0, LFLAG_LAST_ROW = 0,
LFLAG_LAST = 1, LFLAG_LAST = 1,
LFLAG_PRIMARY_KEY = (1 << 4), LFLAG_PRIMARY_KEY = CACHESCAN_RETRIEVE_PK,
}; };
typedef struct { typedef struct {

View File

@ -386,7 +386,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
goto _end; goto _end;
} }
int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3; int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3;
if (pr->rowKey.numOfPKs > 0) {
ltype |= CACHESCAN_RETRIEVE_PK;
}
STableKeyInfo* pTableList = pr->pTableList; STableKeyInfo* pTableList = pr->pTableList;
// retrieve the only one last row of all tables in the uid list. // retrieve the only one last row of all tables in the uid list.

View File

@ -160,6 +160,9 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
// partition by tbname // partition by tbname
if (oneTableForEachGroup(pTableListInfo) || (totalTables == 1)) { if (oneTableForEachGroup(pTableListInfo) || (totalTables == 1)) {
pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull); pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull);
if (pInfo->numOfPks > 0) {
pInfo->retrieveType |= CACHESCAN_RETRIEVE_PK;
}
STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0); STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0);

View File

@ -190,6 +190,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code)); stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code));
streamFreeQitem((SStreamQueueItem*)pBlock);
return code; return code;
} }
} }

View File

@ -21,6 +21,7 @@
#define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data #define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data
#define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms #define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms
#define MIN_INVOKE_INTERVAL 50 // 50ms #define MIN_INVOKE_INTERVAL 50 // 50ms
#define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec
static int32_t streamTransferStateDoPrepare(SStreamTask* pTask); static int32_t streamTransferStateDoPrepare(SStreamTask* pTask);
@ -46,6 +47,7 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl
ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH); ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH);
code = streamTaskPutDataIntoOutputQ(pTask, pBlock); code = streamTaskPutDataIntoOutputQ(pTask, pBlock);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
destroyStreamDataBlock(pBlock);
return code; return code;
} }
@ -76,7 +78,6 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks); int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) { // back pressure and record position if (code != TSDB_CODE_SUCCESS) { // back pressure and record position
destroyStreamDataBlock(pStreamBlocks);
return code; return code;
} }
@ -244,6 +245,10 @@ static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t*
} }
} }
static SScanhistoryDataInfo buildScanhistoryExecRet(EScanHistoryCode code, int32_t idleTime) {
return (SScanhistoryDataInfo){code, idleTime};
}
SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
@ -260,7 +265,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
if (streamTaskShouldPause(pTask)) { if (streamTaskShouldPause(pTask)) {
stDebug("s-task:%s paused from the scan-history task", id); stDebug("s-task:%s paused from the scan-history task", id);
// quit from step1, not continue to handle the step2 // quit from step1, not continue to handle the step2
return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
} }
SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock));
@ -275,7 +280,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
if(streamTaskShouldStop(pTask)) { if(streamTaskShouldStop(pTask)) {
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0);
} }
// dispatch the generated results // dispatch the generated results
@ -285,38 +290,21 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) {
// downstream task input queue is full, try in 5sec // downstream task input queue is full, try in 5sec
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED && (pTask->info.fillHistory == 1)) { if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED && (pTask->info.fillHistory == 1)) {
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000}; return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL);
} }
if (finished) { if (finished) {
return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0}; return buildScanhistoryExecRet(TASK_SCANHISTORY_CONT, 0);
} }
if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) { if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) {
stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id, stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id,
pTask->info.fillHistory, el / 1000.0); pTask->info.fillHistory, el / 1000.0);
return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100}; return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, 100);
} }
} }
} }
// wait for the stream task to be idle
static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
const char* id = pTask->id.idStr;
int64_t st = taosGetTimestampMs();
while (!streamTaskIsIdle(pStreamTask)) {
stDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", id, pTask->info.taskLevel,
pStreamTask->id.idStr);
taosMsleep(100);
}
double el = (taosGetTimestampMs() - st) / 1000.0;
if (el > 0) {
stDebug("s-task:%s wait for stream task:%s for %.2fs to be idle", id, pStreamTask->id.idStr, el);
}
}
int32_t streamTransferStateDoPrepare(SStreamTask* pTask) { int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;