diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index ec92bd56dd..fcb2e4d405 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -35,6 +35,7 @@ extern "C" { #define CACHESCAN_RETRIEVE_TYPE_SINGLE 0x2 #define CACHESCAN_RETRIEVE_LAST_ROW 0x4 #define CACHESCAN_RETRIEVE_LAST 0x8 +#define CACHESCAN_RETRIEVE_PK 0x10 #define META_READER_LOCK 0x0 #define META_READER_NOLOCK 0x1 diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index db86561e9b..8eabbb8098 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -660,13 +660,13 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us taosRLockLatch(&tmq->lock); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); - tscInfo("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics); + tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics); for (int32_t i = 0; i < numOfTopics; i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); - tscInfo("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, + tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgroups); for (int32_t j = 0; j < numOfVgroups; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); @@ -688,19 +688,19 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us continue; } - tscInfo("consumer:0x%" PRIx64 + tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s, ordinal:%d/%d", tmq->consumerId, pTopic->topicName, pVg->vgId, offsetBuf, commitBuf, j + 1, numOfVgroups); tOffsetCopy(&pVg->offsetInfo.committedOffset, &pVg->offsetInfo.endOffset); } else { - tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d", + tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, current:%" PRId64 ", ordinal:%d/%d", tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups); } } } taosRUnLockLatch(&tmq->lock); - tscInfo("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1, + tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1, numOfTopics); // request is sent @@ -815,7 +815,7 @@ void tmqSendHbReq(void* param, void* tmrId) { offRows->ever = pVg->offsetInfo.walVerEnd; char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset); - tscInfo("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64, + tscDebug("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64, tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows); } } @@ -1058,6 +1058,7 @@ static void tmqMgmtInit(void) { #define SET_ERROR_MSG_TMQ(MSG) \ if (errstr != NULL) snprintf(errstr, errstrLen, MSG); + tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { if (conf == NULL) { SET_ERROR_MSG_TMQ("configure is null") @@ -1504,7 +1505,7 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) int32_t topicNumGet = taosArrayGetSize(pRsp->topics); if (epoch < tmq->epoch || (epoch == tmq->epoch && topicNumGet == 0)) { - tscInfo("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId, + tscDebug("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId, tmq->epoch, epoch, topicNumGet); if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) { atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); @@ -1800,14 +1801,14 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { for (int j = 0; j < numOfVg; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); if (taosGetTimestampMs() - pVg->emptyBlockReceiveTs < EMPTY_BLOCK_POLL_IDLE_DURATION) { // less than 10ms - tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, + tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tmq->epoch, pVg->vgId); continue; } if (tmq->replayEnable && taosGetTimestampMs() - pVg->blockReceiveTs < pVg->blockSleepForReplay) { // less than 10ms - tscTrace("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay", + tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay", tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay); continue; } @@ -1815,7 +1816,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); if (vgStatus == TMQ_VG_STATUS__WAIT) { int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); - tscTrace("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, + tscDebug("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt); continue; } @@ -1875,7 +1876,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER); - tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, set status to be RECOVER", tmq->consumerId); + tscDebug("consumer:0x%" PRIx64 " wait for the rebalance, set status to be RECOVER", tmq->consumerId); } else if (pRspWrapper->code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { terrno = pRspWrapper->code; tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, @@ -2476,7 +2477,7 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { SMqRspHead* head = pMsg->pData; int32_t epoch = atomic_load_32(&tmq->epoch); - tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch); + tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch); if (pParam->sync) { SMqAskEpRsp rsp = {0}; tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); @@ -2581,7 +2582,7 @@ void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { sendInfo->msgType = TDMT_MND_TMQ_ASK_EP; SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp); - tscInfo("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId); + tscDebug("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId); int64_t transporterId = 0; code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 340463e48c..6c2358a1d7 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -9624,6 +9624,8 @@ void tDestroySubmitTbData(SSubmitTbData *pTbData, int32_t flag) { taosArrayDestroy(pTbData->aRowP); } } + + pTbData->aRowP = NULL; } void tDestroySubmitReq(SSubmitReq2 *pReq, int32_t flag) { diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index acd0a2009c..68c55e235f 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -166,7 +166,7 @@ typedef struct { int32_t failedTimes; void* rpcRsp; int32_t rpcRspLen; - int32_t redoActionPos; + int32_t actionPos; SArray* prepareActions; SArray* redoActions; SArray* undoActions; diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 16b735fbc4..74ad09c752 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -98,7 +98,8 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamId, const char* p mDebug("not conflict with checkpoint trans, name:%s, continue create trans", pTransName); } } else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) || (strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0) || - (strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0)) { + (strcmp(tInfo.name, MND_STREAM_TASK_RESET_NAME) == 0) || + strcmp(tInfo.name, MND_STREAM_TASK_UPDATE_NAME) == 0) { mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamId, tInfo.name); terrno = TSDB_CODE_MND_TRANS_CONFLICT; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index c875deb972..8bbaadd203 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -438,10 +438,10 @@ static void processSubOffsetRows(SMnode *pMnode, const SMqRebInputObj *pInput, S } static void printRebalanceLog(SMqRebOutputObj *pOutput){ - mInfo("sub:%s mq re-balance calculation completed, re-balanced vg", pOutput->pSub->key); + mInfo("sub:%s mq rebalance calculation completed, re-balanced vg", pOutput->pSub->key); for (int32_t i = 0; i < taosArrayGetSize(pOutput->rebVgs); i++) { SMqRebOutputVg *pOutputRebVg = taosArrayGet(pOutput->rebVgs, i); - mInfo("sub:%s mq re-balance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pOutput->pSub->key, + mInfo("sub:%s mq rebalance vgId:%d, moved from consumer:0x%" PRIx64 ", to consumer:0x%" PRIx64, pOutput->pSub->key, pOutputRebVg->pVgEp->vgId, pOutputRebVg->oldConsumerId, pOutputRebVg->newConsumerId); } @@ -451,10 +451,10 @@ static void printRebalanceLog(SMqRebOutputObj *pOutput){ if (pIter == NULL) break; SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter; int32_t sz = taosArrayGetSize(pConsumerEp->vgs); - mInfo("sub:%s mq re-balance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key, pConsumerEp->consumerId, sz); + mInfo("sub:%s mq rebalance final cfg: consumer:0x%" PRIx64 " has %d vg", pOutput->pSub->key, pConsumerEp->consumerId, sz); for (int32_t i = 0; i < sz; i++) { SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i); - mInfo("sub:%s mq re-balance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId, + mInfo("sub:%s mq rebalance final cfg: vg %d to consumer:0x%" PRIx64, pOutput->pSub->key, pVgEp->vgId, pConsumerEp->consumerId); } } @@ -762,18 +762,18 @@ static void mndCheckConsumer(SRpcMsg *pMsg, SHashObj* rebSubHash) { bool mndRebTryStart() { int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1); - mInfo("rebalance counter old val:%d", old); + if (old > 0) mInfo("[rebalance] counter old val:%d", old) return old == 0; } void mndRebCntInc() { int32_t val = atomic_add_fetch_32(&mqRebInExecCnt, 1); - mInfo("rebalance cnt inc, value:%d", val); + if (val > 0) mInfo("[rebalance] cnt inc, value:%d", val) } void mndRebCntDec() { int32_t val = atomic_sub_fetch_32(&mqRebInExecCnt, 1); - mInfo("rebalance cnt sub, value:%d", val); + if (val > 0) mInfo("[rebalance] cnt sub, value:%d", val) } static void clearRebOutput(SMqRebOutputObj *rebOutput){ @@ -848,10 +848,10 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { int code = 0; void *pIter = NULL; SMnode *pMnode = pMsg->info.node; - mInfo("[rebalance] start to process mq timer"); + mDebug("[rebalance] start to process mq timer") if (!mndRebTryStart()) { - mInfo("[rebalance] mq rebalance already in progress, do nothing"); + mInfo("[rebalance] mq rebalance already in progress, do nothing") return code; } @@ -863,7 +863,9 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { taosHashSetFreeFp(rebSubHash, freeRebalanceItem); mndCheckConsumer(pMsg, rebSubHash); - mInfo("[rebalance] mq re-balance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash)); + if (taosHashGetSize(rebSubHash) > 0) { + mInfo("[rebalance] mq rebalance start, total required re-balanced trans:%d", taosHashGetSize(rebSubHash)) + } while (1) { pIter = taosHashIterate(rebSubHash, pIter); @@ -887,13 +889,15 @@ static int32_t mndProcessRebalanceReq(SRpcMsg *pMsg) { mndDoRebalance(pMnode, &rebInput, &rebOutput); if (mndPersistRebResult(pMnode, pMsg, &rebOutput) != 0) { - mError("mq re-balance persist output error, possibly vnode splitted or dropped,msg:%s", terrstr()); + mError("mq rebalance persist output error, possibly vnode splitted or dropped,msg:%s", terrstr()) } clearRebOutput(&rebOutput); } - mInfo("[rebalance] mq re-balance completed successfully, wait trans finish"); + if (taosHashGetSize(rebSubHash) > 0) { + mInfo("[rebalance] mq rebalance completed successfully, wait trans finish") + } END: taosHashCancelIterate(rebSubHash, pIter); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 7b6563f4b4..84940e01d4 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -169,7 +169,7 @@ SSdbRaw *mndTransEncode(STrans *pTrans) { SDB_SET_INT64(pRaw, dataPos, pTrans->createdTime, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER) SDB_SET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER) - SDB_SET_INT32(pRaw, dataPos, pTrans->redoActionPos, _OVER) + SDB_SET_INT32(pRaw, dataPos, pTrans->actionPos, _OVER) int32_t prepareActionNum = taosArrayGetSize(pTrans->prepareActions); int32_t redoActionNum = taosArrayGetSize(pTrans->redoActions); @@ -317,7 +317,7 @@ SSdbRow *mndTransDecode(SSdbRaw *pRaw) { SDB_GET_INT64(pRaw, dataPos, &pTrans->createdTime, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->dbname, TSDB_TABLE_FNAME_LEN, _OVER) SDB_GET_BINARY(pRaw, dataPos, pTrans->stbname, TSDB_TABLE_FNAME_LEN, _OVER) - SDB_GET_INT32(pRaw, dataPos, &pTrans->redoActionPos, _OVER) + SDB_GET_INT32(pRaw, dataPos, &pTrans->actionPos, _OVER) if (sver > TRANS_VER1_NUMBER) { SDB_GET_INT32(pRaw, dataPos, &prepareActionNum, _OVER) @@ -525,7 +525,7 @@ static int32_t mndTransActionUpdate(SSdb *pSdb, STrans *pOld, STrans *pNew) { mndTransUpdateActions(pOld->undoActions, pNew->undoActions); mndTransUpdateActions(pOld->commitActions, pNew->commitActions); pOld->stage = pNew->stage; - pOld->redoActionPos = pNew->redoActionPos; + pOld->actionPos = pNew->actionPos; if (pOld->stage == TRN_STAGE_COMMIT) { pOld->stage = TRN_STAGE_COMMIT_ACTION; @@ -1360,22 +1360,19 @@ static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool return code; } -static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) { +static int32_t mndTransExecuteActionsSerial(SMnode *pMnode, STrans *pTrans, SArray *pActions, bool topHalf) { int32_t code = 0; - int32_t numOfActions = taosArrayGetSize(pTrans->redoActions); + int32_t numOfActions = taosArrayGetSize(pActions); if (numOfActions == 0) return code; - taosThreadMutexLock(&pTrans->mutex); - - if (pTrans->redoActionPos >= numOfActions) { - taosThreadMutexUnlock(&pTrans->mutex); + if (pTrans->actionPos >= numOfActions) { return code; } - mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->redoActionPos); + mInfo("trans:%d, execute %d actions serial, current redoAction:%d", pTrans->id, numOfActions, pTrans->actionPos); - for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) { - STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos); + for (int32_t action = pTrans->actionPos; action < numOfActions; ++action) { + STransAction *pAction = taosArrayGet(pActions, pTrans->actionPos); code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf); if (code == 0) { @@ -1409,14 +1406,14 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, if (code == 0) { pTrans->code = 0; - pTrans->redoActionPos++; + pTrans->actionPos++; mInfo("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage), pAction->id); taosThreadMutexUnlock(&pTrans->mutex); code = mndTransSync(pMnode, pTrans); taosThreadMutexLock(&pTrans->mutex); if (code != 0) { - pTrans->redoActionPos--; + pTrans->actionPos--; pTrans->code = terrno; mError("trans:%d, %s:%d is executed and failed to sync to other mnodes since %s", pTrans->id, mndTransStr(pAction->stage), pAction->id, terrstr()); @@ -1442,8 +1439,26 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, } } - taosThreadMutexUnlock(&pTrans->mutex); + return code; +} +static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) { + int32_t code = TSDB_CODE_ACTION_IN_PROGRESS; + taosThreadMutexLock(&pTrans->mutex); + if (pTrans->stage == TRN_STAGE_REDO_ACTION) { + code = mndTransExecuteActionsSerial(pMnode, pTrans, pTrans->redoActions, topHalf); + } + taosThreadMutexUnlock(&pTrans->mutex); + return code; +} + +static int32_t mndTransExecuteUndoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) { + int32_t code = TSDB_CODE_ACTION_IN_PROGRESS; + taosThreadMutexLock(&pTrans->mutex); + if (pTrans->stage == TRN_STAGE_UNDO_ACTION) { + code = mndTransExecuteActionsSerial(pMnode, pTrans, pTrans->undoActions, topHalf); + } + taosThreadMutexUnlock(&pTrans->mutex); return code; } @@ -1563,13 +1578,22 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans, boo static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) { bool continueExec = true; - int32_t code = mndTransExecuteUndoActions(pMnode, pTrans, topHalf); + int32_t code = 0; + + if (pTrans->exec == TRN_EXEC_SERIAL) { + code = mndTransExecuteUndoActionsSerial(pMnode, pTrans, topHalf); + } else { + code = mndTransExecuteUndoActions(pMnode, pTrans, topHalf); + } + + if (mndCannotExecuteTransAction(pMnode, topHalf)) return false; + terrno = code; if (code == 0) { pTrans->stage = TRN_STAGE_PRE_FINISH; mInfo("trans:%d, stage from undoAction to pre-finish", pTrans->id); continueExec = true; - } else if (code == TSDB_CODE_ACTION_IN_PROGRESS) { + } else if (code == TSDB_CODE_ACTION_IN_PROGRESS || code == TSDB_CODE_MND_TRANS_CTX_SWITCH) { mInfo("trans:%d, stage keep on undoAction since %s", pTrans->id, tstrerror(code)); continueExec = false; } else { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 567d61e27a..3d5f5bd64c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -367,7 +367,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { } while (0); } - // 2. check re-balance status + // 2. check rebalance status if (pHandle->consumerId != consumerId) { tqError("ERROR tmq poll: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, @@ -485,7 +485,7 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { return -1; } - // 2. check re-balance status + // 2. check rebalance status if (pHandle->consumerId != consumerId) { tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, consumerId, vgId, req.subKey, pHandle->consumerId); @@ -666,7 +666,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg req.vgId, req.subKey, req.newConsumerId, req.oldConsumerId); } if (req.newConsumerId == -1) { - tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId); + tqError("vgId:%d, tq invalid rebalance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId); goto end; } STqHandle handle = {0}; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index 1da096224c..f690b9b277 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -583,7 +583,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat if (IS_SET_NULL(pCol)) { if (pCol->flags & COL_IS_KEY) { - qError("ts:%" PRId64 " Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, ts, + qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, ts, pCol->colId, pCol->type); break; } @@ -593,7 +593,7 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, dataIndex); if (colDataIsNull_s(pColData, j)) { if (pCol->flags & COL_IS_KEY) { - qError("ts:%" PRId64 "Primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, + qError("ts:%" PRId64 " primary key column should not be null, colId:%" PRIi16 ", colType:%" PRIi8, ts, pCol->colId, pCol->type); break; } @@ -624,8 +624,8 @@ int32_t doConvertRows(SSubmitTbData* pTableData, const STSchema* pTSchema, SSDat code = tRowBuild(pVals, (STSchema*)pTSchema, &pRow); if (code != TSDB_CODE_SUCCESS) { tDestroySubmitTbData(pTableData, TSDB_MSG_FLG_ENCODE); - pTableData->aRowP = taosArrayDestroy(pTableData->aRowP); taosArrayDestroy(pVals); + tqError("s-task:%s build rows for submit failed, ts:%"PRId64, id, ts); return code; } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 2352d3a555..8b2e9693eb 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -195,12 +195,15 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM const char* idstr = pTask->id.idStr; if (pMeta->updateInfo.transId != req.transId) { - pMeta->updateInfo.transId = req.transId; - tqInfo("s-task:%s receive new trans to update nodeEp msg from mnode, transId:%d", idstr, req.transId); + ASSERT(req.transId > pMeta->updateInfo.transId); + tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr, + vgId, req.transId, pMeta->updateInfo.transId); + // info needs to be kept till the new trans to update the nodeEp arrived. taosHashClear(pMeta->updateInfo.pTasks); + pMeta->updateInfo.transId = req.transId; } else { - tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", idstr, req.transId); + tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d", idstr, vgId, req.transId); } // duplicate update epset msg received, discard this redundant message diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 76f98b33c1..fd8b73b1f0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -130,7 +130,7 @@ static void tsdbClosePgCache(STsdb *pTsdb) { enum { LFLAG_LAST_ROW = 0, LFLAG_LAST = 1, - LFLAG_PRIMARY_KEY = (1 << 4), + LFLAG_PRIMARY_KEY = CACHESCAN_RETRIEVE_PK, }; typedef struct { diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 1d1009c15f..42b8365130 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -386,7 +386,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 goto _end; } - int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3; + int8_t ltype = (pr->type & CACHESCAN_RETRIEVE_LAST) >> 3; + if (pr->rowKey.numOfPKs > 0) { + ltype |= CACHESCAN_RETRIEVE_PK; + } + STableKeyInfo* pTableList = pr->pTableList; // retrieve the only one last row of all tables in the uid list. diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index b7159225e1..985cdb9433 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -160,6 +160,9 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe // partition by tbname if (oneTableForEachGroup(pTableListInfo) || (totalTables == 1)) { pInfo->retrieveType = CACHESCAN_RETRIEVE_TYPE_ALL | SCAN_ROW_TYPE(pScanNode->ignoreNull); + if (pInfo->numOfPks > 0) { + pInfo->retrieveType |= CACHESCAN_RETRIEVE_PK; + } STableKeyInfo* pList = tableListGetInfo(pTableListInfo, 0); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 67b68f73ad..36886329ac 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -190,6 +190,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code)); + streamFreeQitem((SStreamQueueItem*)pBlock); return code; } } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 93ede2707b..891e0aa142 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -21,6 +21,7 @@ #define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data #define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms #define MIN_INVOKE_INTERVAL 50 // 50ms +#define FILL_HISTORY_TASK_EXEC_INTERVAL 5000 // 5 sec static int32_t streamTransferStateDoPrepare(SStreamTask* pTask); @@ -46,6 +47,7 @@ static int32_t doOutputResultBlockImpl(SStreamTask* pTask, SStreamDataBlock* pBl ASSERT(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH); code = streamTaskPutDataIntoOutputQ(pTask, pBlock); if (code != TSDB_CODE_SUCCESS) { + destroyStreamDataBlock(pBlock); return code; } @@ -76,7 +78,6 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks); if (code != TSDB_CODE_SUCCESS) { // back pressure and record position - destroyStreamDataBlock(pStreamBlocks); return code; } @@ -244,6 +245,10 @@ static void streamScanHistoryDataImpl(SStreamTask* pTask, SArray* pRes, int32_t* } } +static SScanhistoryDataInfo buildScanhistoryExecRet(EScanHistoryCode code, int32_t idleTime) { + return (SScanhistoryDataInfo){code, idleTime}; +} + SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); @@ -260,7 +265,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { if (streamTaskShouldPause(pTask)) { stDebug("s-task:%s paused from the scan-history task", id); // quit from step1, not continue to handle the step2 - return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; + return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0); } SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); @@ -275,7 +280,7 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { if(streamTaskShouldStop(pTask)) { taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes); - return (SScanhistoryDataInfo){TASK_SCANHISTORY_QUIT, 0}; + return buildScanhistoryExecRet(TASK_SCANHISTORY_QUIT, 0); } // dispatch the generated results @@ -285,38 +290,21 @@ SScanhistoryDataInfo streamScanHistoryData(SStreamTask* pTask, int64_t st) { // downstream task input queue is full, try in 5sec if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED && (pTask->info.fillHistory == 1)) { - return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 5000}; + return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, FILL_HISTORY_TASK_EXEC_INTERVAL); } if (finished) { - return (SScanhistoryDataInfo){TASK_SCANHISTORY_CONT, 0}; + return buildScanhistoryExecRet(TASK_SCANHISTORY_CONT, 0); } if (el >= STREAM_SCAN_HISTORY_TIMESLICE && (pTask->info.fillHistory == 1)) { stDebug("s-task:%s fill-history:%d time slice exhausted, elapsed time:%.2fs, retry in 100ms", id, pTask->info.fillHistory, el / 1000.0); - return (SScanhistoryDataInfo){TASK_SCANHISTORY_REXEC, 100}; + return buildScanhistoryExecRet(TASK_SCANHISTORY_REXEC, 100); } } } -// wait for the stream task to be idle -static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) { - const char* id = pTask->id.idStr; - - int64_t st = taosGetTimestampMs(); - while (!streamTaskIsIdle(pStreamTask)) { - stDebug("s-task:%s level:%d wait for stream task:%s to be idle, check again in 100ms", id, pTask->info.taskLevel, - pStreamTask->id.idStr); - taosMsleep(100); - } - - double el = (taosGetTimestampMs() - st) / 1000.0; - if (el > 0) { - stDebug("s-task:%s wait for stream task:%s for %.2fs to be idle", id, pStreamTask->id.idStr, el); - } -} - int32_t streamTransferStateDoPrepare(SStreamTask* pTask) { SStreamMeta* pMeta = pTask->pMeta; const char* id = pTask->id.idStr;