This commit is contained in:
chenhaoran 2023-12-29 11:19:41 +08:00
commit f90694e726
34 changed files with 676 additions and 2226 deletions

View File

@ -355,6 +355,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL TAOS_DEF_ERROR_CODE(0, 0x03D5) #define TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL TAOS_DEF_ERROR_CODE(0, 0x03D5)
#define TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED TAOS_DEF_ERROR_CODE(0, 0x03D6) //internal #define TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED TAOS_DEF_ERROR_CODE(0, 0x03D6) //internal
#define TSDB_CODE_MND_TRANS_SYNC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x03D7) #define TSDB_CODE_MND_TRANS_SYNC_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x03D7)
#define TSDB_CODE_MND_TRANS_CTX_SWITCH TAOS_DEF_ERROR_CODE(0, 0x03D8)
#define TSDB_CODE_MND_TRANS_UNKNOW_ERROR TAOS_DEF_ERROR_CODE(0, 0x03DF) #define TSDB_CODE_MND_TRANS_UNKNOW_ERROR TAOS_DEF_ERROR_CODE(0, 0x03DF)
// mnode-mq // mnode-mq

View File

@ -108,11 +108,11 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_SNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CONFIG_DNODE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_MNODE_TYPE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE_RSP, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_SYNC_CONFIG_CHANGE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CONNECT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_ACCT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -97,7 +97,7 @@ SSdbRaw *mndTransEncode(STrans *pTrans);
SSdbRow *mndTransDecode(SSdbRaw *pRaw); SSdbRow *mndTransDecode(SSdbRaw *pRaw);
void mndTransDropData(STrans *pTrans); void mndTransDropData(STrans *pTrans);
bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans); bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -290,11 +290,11 @@ int32_t mndAddCompactDetailToTran(SMnode *pMnode, STrans *pTrans, SCompactObj* p
SSdbRaw *pVgRaw = mndCompactDetailActionEncode(&compactDetail); SSdbRaw *pVgRaw = mndCompactDetailActionEncode(&compactDetail);
if (pVgRaw == NULL) return -1; if (pVgRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pVgRaw) != 0) { if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) {
sdbFreeRaw(pVgRaw); sdbFreeRaw(pVgRaw);
return -1; return -1;
} }
(void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY); (void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
return 0; return 0;
} }

View File

@ -1215,7 +1215,7 @@ int32_t mndDropSubByTopic(SMnode *pMnode, STrans *pTrans, const char *topicName)
return -1; return -1;
} }
if (mndSetDropSubRedoLogs(pMnode, pTrans, pSub) < 0) { if (mndSetDropSubCommitLogs(pMnode, pTrans, pSub) < 0) {
sdbRelease(pSdb, pSub); sdbRelease(pSdb, pSub);
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
return -1; return -1;

View File

@ -180,7 +180,7 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
goto _OUT; goto _OUT;
} }
mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64 mInfo("trans:%d, process sync proposal, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64
" role:%s raw:%p sec:%d seq:%" PRId64, " role:%s raw:%p sec:%d seq:%" PRId64,
transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state), transId, pMgmt->transId, pMeta->code, pMeta->index, pMeta->term, pMeta->lastConfigIndex, syncStr(pMeta->state),
pRaw, pMgmt->transSec, pMgmt->transSeq); pRaw, pMgmt->transSec, pMgmt->transSeq);
@ -208,15 +208,11 @@ int32_t mndProcessWriteMsg(SMnode *pMnode, SRpcMsg *pMsg, SFsmCbMeta *pMeta) {
} }
if (pTrans->stage == TRN_STAGE_PREPARE) { if (pTrans->stage == TRN_STAGE_PREPARE) {
bool continueExec = mndTransPerformPrepareStage(pMnode, pTrans); bool continueExec = mndTransPerformPrepareStage(pMnode, pTrans, false);
if (!continueExec) goto _OUT; if (!continueExec) goto _OUT;
} }
if (pTrans->id != pMgmt->transId) { mndTransRefresh(pMnode, pTrans);
mInfo("trans:%d, execute in mnode which not leader or sync timeout, createTime:%" PRId64 " saved trans:%d",
pTrans->id, pTrans->createdTime, pMgmt->transId);
mndTransRefresh(pMnode, pTrans);
}
sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex); sdbSetApplyInfo(pMnode->pSdb, pMeta->index, pMeta->term, pMeta->lastConfigIndex);
sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta); sdbWriteFile(pMnode->pSdb, tsMndSdbWriteDelta);
@ -234,6 +230,7 @@ static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
goto _OUT; goto _OUT;
} }
int32_t transId = pMgmt->transId;
pMgmt->transId = 0; pMgmt->transId = 0;
pMgmt->transSec = 0; pMgmt->transSec = 0;
pMgmt->transSeq = 0; pMgmt->transSeq = 0;
@ -241,9 +238,9 @@ static int32_t mndPostMgmtCode(SMnode *pMnode, int32_t code) {
tsem_post(&pMgmt->syncSem); tsem_post(&pMgmt->syncSem);
if (pMgmt->errCode != 0) { if (pMgmt->errCode != 0) {
mError("trans:%d, failed to propose since %s, post sem", pMgmt->transId, tstrerror(pMgmt->errCode)); mError("trans:%d, failed to propose since %s, post sem", transId, tstrerror(pMgmt->errCode));
} else { } else {
mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, pMgmt->transId, pMgmt->transSeq); mInfo("trans:%d, is proposed and post sem, seq:%" PRId64, transId, pMgmt->transSeq);
} }
_OUT: _OUT:
@ -542,7 +539,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
taosThreadMutexLock(&pMgmt->lock); taosThreadMutexLock(&pMgmt->lock);
pMgmt->errCode = 0; pMgmt->errCode = 0;
if (pMgmt->transId != 0 /* && pMgmt->transId != transId*/) { if (pMgmt->transId != 0) {
mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId); mError("trans:%d, can't be proposed since trans:%d already waiting for confirm", transId, pMgmt->transId);
taosThreadMutexUnlock(&pMgmt->lock); taosThreadMutexUnlock(&pMgmt->lock);
rpcFreeCont(req.pCont); rpcFreeCont(req.pCont);

View File

@ -36,21 +36,25 @@ static int32_t mndTransAppendLog(SArray *pArray, SSdbRaw *pRaw);
static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction); static int32_t mndTransAppendAction(SArray *pArray, STransAction *pAction);
static void mndTransDropLogs(SArray *pArray); static void mndTransDropLogs(SArray *pArray);
static void mndTransDropActions(SArray *pArray); static void mndTransDropActions(SArray *pArray);
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray);
static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray, bool topHalf);
static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteRedoLogs(SMnode *pMnode, STrans *pTrans, bool topHalf);
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoLogs(SMnode *pMnode, STrans *pTrans, bool topHalf);
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool topHalf);
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans); static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformRedoLogStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformUndoLogStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans); static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndCannotExecuteTransAction(SMnode *pMnode) { return !pMnode->deploy && !mndIsLeader(pMnode); } static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf);
static bool mndCannotExecuteTransAction(SMnode *pMnode, bool topHalf) {
return (!pMnode->deploy && !mndIsLeader(pMnode)) || !topHalf;
}
static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans); static void mndTransSendRpcRsp(SMnode *pMnode, STrans *pTrans);
static int32_t mndProcessTransTimer(SRpcMsg *pReq); static int32_t mndProcessTransTimer(SRpcMsg *pReq);
@ -1090,8 +1094,9 @@ static void mndTransResetActions(SMnode *pMnode, STrans *pTrans, SArray *pArray)
} }
} }
static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
if (pAction->rawWritten) return 0; if (pAction->rawWritten) return 0;
if (topHalf) return TSDB_CODE_MND_TRANS_CTX_SWITCH;
int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw); int32_t code = sdbWriteWithoutFree(pMnode->pSdb, pAction->pRaw);
if (code == 0 || terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { if (code == 0 || terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
@ -1112,9 +1117,9 @@ static int32_t mndTransWriteSingleLog(SMnode *pMnode, STrans *pTrans, STransActi
return code; return code;
} }
static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
if (pAction->msgSent) return 0; if (pAction->msgSent) return 0;
if (mndCannotExecuteTransAction(pMnode)) return -1; if (mndCannotExecuteTransAction(pMnode, topHalf)) return TSDB_CODE_MND_TRANS_CTX_SWITCH;
int64_t signature = pTrans->id; int64_t signature = pTrans->id;
signature = (signature << 32); signature = (signature << 32);
@ -1159,7 +1164,8 @@ static int32_t mndTransSendSingleMsg(SMnode *pMnode, STrans *pTrans, STransActio
return code; return code;
} }
static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
if (!topHalf) return TSDB_CODE_MND_TRANS_CTX_SWITCH;
pAction->rawWritten = 0; pAction->rawWritten = 0;
pAction->errCode = 0; pAction->errCode = 0;
mInfo("trans:%d, %s:%d confirm action executed", pTrans->id, mndTransStr(pAction->stage), pAction->id); mInfo("trans:%d, %s:%d confirm action executed", pTrans->id, mndTransStr(pAction->stage), pAction->id);
@ -1168,34 +1174,39 @@ static int32_t mndTransExecNullMsg(SMnode *pMnode, STrans *pTrans, STransAction
return 0; return 0;
} }
static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { static int32_t mndTransExecSingleAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction, bool topHalf) {
if (pAction->actionType == TRANS_ACTION_RAW) { if (pAction->actionType == TRANS_ACTION_RAW) {
return mndTransWriteSingleLog(pMnode, pTrans, pAction); return mndTransWriteSingleLog(pMnode, pTrans, pAction, topHalf);
} else if (pAction->actionType == TRANS_ACTION_MSG) { } else if (pAction->actionType == TRANS_ACTION_MSG) {
return mndTransSendSingleMsg(pMnode, pTrans, pAction); return mndTransSendSingleMsg(pMnode, pTrans, pAction, topHalf);
} else { } else {
return mndTransExecNullMsg(pMnode, pTrans, pAction); return mndTransExecNullMsg(pMnode, pTrans, pAction, topHalf);
} }
} }
static int32_t mndTransExecSingleActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { static int32_t mndTransExecSingleActions(SMnode *pMnode, STrans *pTrans, SArray *pArray, bool topHalf) {
int32_t numOfActions = taosArrayGetSize(pArray); int32_t numOfActions = taosArrayGetSize(pArray);
int32_t code = 0; int32_t code = 0;
for (int32_t action = 0; action < numOfActions; ++action) { for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pArray, action); STransAction *pAction = taosArrayGet(pArray, action);
code = mndTransExecSingleAction(pMnode, pTrans, pAction); code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf);
if (code != 0) break; if (code != 0) {
mInfo("trans:%d, action:%d not executed since %s. numOfActions:%d", pTrans->id, action, tstrerror(code),
numOfActions);
break;
}
} }
return code; return code;
} }
static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray) { static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pArray, bool topHalf) {
int32_t numOfActions = taosArrayGetSize(pArray); int32_t numOfActions = taosArrayGetSize(pArray);
int32_t code = 0;
if (numOfActions == 0) return 0; if (numOfActions == 0) return 0;
if (mndTransExecSingleActions(pMnode, pTrans, pArray) != 0) { if ((code = mndTransExecSingleActions(pMnode, pTrans, pArray, topHalf)) != 0) {
return -1; return -1;
} }
@ -1248,31 +1259,31 @@ static int32_t mndTransExecuteActions(SMnode *pMnode, STrans *pTrans, SArray *pA
} }
} }
static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteRedoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions); int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->redoActions, topHalf);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to execute redoActions since:%s, code:0x%x", terrstr(), terrno); mError("failed to execute redoActions since:%s, code:0x%x", terrstr(), terrno);
} }
return code; return code;
} }
static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteUndoActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions); int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->undoActions, topHalf);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to execute undoActions since %s", terrstr()); mError("failed to execute undoActions since %s", terrstr());
} }
return code; return code;
} }
static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteCommitActions(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions); int32_t code = mndTransExecuteActions(pMnode, pTrans, pTrans->commitActions, topHalf);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
mError("failed to execute commitActions since %s", terrstr()); mError("failed to execute commitActions since %s", terrstr());
} }
return code; return code;
} }
static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans) { static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans, bool topHalf) {
int32_t code = 0; int32_t code = 0;
int32_t numOfActions = taosArrayGetSize(pTrans->redoActions); int32_t numOfActions = taosArrayGetSize(pTrans->redoActions);
if (numOfActions == 0) return code; if (numOfActions == 0) return code;
@ -1289,7 +1300,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) { for (int32_t action = pTrans->redoActionPos; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos); STransAction *pAction = taosArrayGet(pTrans->redoActions, pTrans->redoActionPos);
code = mndTransExecSingleAction(pMnode, pTrans, pAction); code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf);
if (code == 0) { if (code == 0) {
if (pAction->msgSent) { if (pAction->msgSent) {
if (pAction->msgReceived) { if (pAction->msgReceived) {
@ -1317,14 +1328,16 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
} }
mndSetTransLastAction(pTrans, pAction); mndSetTransLastAction(pTrans, pAction);
if (mndCannotExecuteTransAction(pMnode)) break; if (mndCannotExecuteTransAction(pMnode, topHalf)) break;
if (code == 0) { if (code == 0) {
pTrans->code = 0; pTrans->code = 0;
pTrans->redoActionPos++; pTrans->redoActionPos++;
mInfo("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage), mInfo("trans:%d, %s:%d is executed and need sync to other mnodes", pTrans->id, mndTransStr(pAction->stage),
pAction->id); pAction->id);
taosThreadMutexUnlock(&pTrans->mutex);
code = mndTransSync(pMnode, pTrans); code = mndTransSync(pMnode, pTrans);
taosThreadMutexLock(&pTrans->mutex);
if (code != 0) { if (code != 0) {
pTrans->redoActionPos--; pTrans->redoActionPos--;
pTrans->code = terrno; pTrans->code = terrno;
@ -1357,7 +1370,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
return code; return code;
} }
bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) { bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
bool continueExec = true; bool continueExec = true;
int32_t code = 0; int32_t code = 0;
@ -1368,7 +1381,7 @@ bool mndTransPerformPrepareStage(SMnode *pMnode, STrans *pTrans) {
for (int32_t action = 0; action < numOfActions; ++action) { for (int32_t action = 0; action < numOfActions; ++action) {
STransAction *pAction = taosArrayGet(pTrans->prepareActions, action); STransAction *pAction = taosArrayGet(pTrans->prepareActions, action);
code = mndTransExecSingleAction(pMnode, pTrans, pAction); code = mndTransExecSingleAction(pMnode, pTrans, pAction, topHalf);
if (code != 0) { if (code != 0) {
mError("trans:%d, failed to execute prepare action:%d, numOfActions:%d", pTrans->id, action, numOfActions); mError("trans:%d, failed to execute prepare action:%d, numOfActions:%d", pTrans->id, action, numOfActions);
return false; return false;
@ -1381,17 +1394,17 @@ _OVER:
return continueExec; return continueExec;
} }
static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
bool continueExec = true; bool continueExec = true;
int32_t code = 0; int32_t code = 0;
if (pTrans->exec == TRN_EXEC_SERIAL) { if (pTrans->exec == TRN_EXEC_SERIAL) {
code = mndTransExecuteRedoActionsSerial(pMnode, pTrans); code = mndTransExecuteRedoActionsSerial(pMnode, pTrans, topHalf);
} else { } else {
code = mndTransExecuteRedoActions(pMnode, pTrans); code = mndTransExecuteRedoActions(pMnode, pTrans, topHalf);
} }
if (mndCannotExecuteTransAction(pMnode)) return false; if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
terrno = code; terrno = code;
if (code == 0) { if (code == 0) {
@ -1431,8 +1444,8 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
return continueExec; return continueExec;
} }
static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
if (mndCannotExecuteTransAction(pMnode)) return false; if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
bool continueExec = true; bool continueExec = true;
int32_t code = mndTransCommit(pMnode, pTrans); int32_t code = mndTransCommit(pMnode, pTrans);
@ -1452,9 +1465,9 @@ static bool mndTransPerformCommitStage(SMnode *pMnode, STrans *pTrans) {
return continueExec; return continueExec;
} }
static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
bool continueExec = true; bool continueExec = true;
int32_t code = mndTransExecuteCommitActions(pMnode, pTrans); int32_t code = mndTransExecuteCommitActions(pMnode, pTrans, topHalf);
if (code == 0) { if (code == 0) {
pTrans->code = 0; pTrans->code = 0;
@ -1471,9 +1484,9 @@ static bool mndTransPerformCommitActionStage(SMnode *pMnode, STrans *pTrans) {
return continueExec; return continueExec;
} }
static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
bool continueExec = true; bool continueExec = true;
int32_t code = mndTransExecuteUndoActions(pMnode, pTrans); int32_t code = mndTransExecuteUndoActions(pMnode, pTrans, topHalf);
if (code == 0) { if (code == 0) {
pTrans->stage = TRN_STAGE_PRE_FINISH; pTrans->stage = TRN_STAGE_PRE_FINISH;
@ -1491,8 +1504,8 @@ static bool mndTransPerformUndoActionStage(SMnode *pMnode, STrans *pTrans) {
return continueExec; return continueExec;
} }
static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
if (mndCannotExecuteTransAction(pMnode)) return false; if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
bool continueExec = true; bool continueExec = true;
int32_t code = mndTransRollback(pMnode, pTrans); int32_t code = mndTransRollback(pMnode, pTrans);
@ -1510,8 +1523,8 @@ static bool mndTransPerformRollbackStage(SMnode *pMnode, STrans *pTrans) {
return continueExec; return continueExec;
} }
static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
if (mndCannotExecuteTransAction(pMnode)) return false; if (mndCannotExecuteTransAction(pMnode, topHalf)) return false;
bool continueExec = true; bool continueExec = true;
int32_t code = mndTransPreFinish(pMnode, pTrans); int32_t code = mndTransPreFinish(pMnode, pTrans);
@ -1529,8 +1542,9 @@ static bool mndTransPerformPreFinishStage(SMnode *pMnode, STrans *pTrans) {
return continueExec; return continueExec;
} }
static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans) { static bool mndTransPerformFinishStage(SMnode *pMnode, STrans *pTrans, bool topHalf) {
bool continueExec = false; bool continueExec = false;
if (topHalf) return continueExec;
SSdbRaw *pRaw = mndTransEncode(pTrans); SSdbRaw *pRaw = mndTransEncode(pTrans);
if (pRaw == NULL) { if (pRaw == NULL) {
@ -1558,43 +1572,28 @@ void mndTransExecuteImp(SMnode *pMnode, STrans *pTrans, bool topHalf) {
pTrans->lastExecTime = taosGetTimestampMs(); pTrans->lastExecTime = taosGetTimestampMs();
switch (pTrans->stage) { switch (pTrans->stage) {
case TRN_STAGE_PREPARE: case TRN_STAGE_PREPARE:
continueExec = mndTransPerformPrepareStage(pMnode, pTrans); continueExec = mndTransPerformPrepareStage(pMnode, pTrans, topHalf);
break; break;
case TRN_STAGE_REDO_ACTION: case TRN_STAGE_REDO_ACTION:
continueExec = mndTransPerformRedoActionStage(pMnode, pTrans); continueExec = mndTransPerformRedoActionStage(pMnode, pTrans, topHalf);
break; break;
case TRN_STAGE_COMMIT: case TRN_STAGE_COMMIT:
if (topHalf) { continueExec = mndTransPerformCommitStage(pMnode, pTrans, topHalf);
continueExec = mndTransPerformCommitStage(pMnode, pTrans);
} else {
mInfo("trans:%d, can not commit since not leader", pTrans->id);
continueExec = false;
}
break; break;
case TRN_STAGE_COMMIT_ACTION: case TRN_STAGE_COMMIT_ACTION:
continueExec = mndTransPerformCommitActionStage(pMnode, pTrans); continueExec = mndTransPerformCommitActionStage(pMnode, pTrans, topHalf);
break; break;
case TRN_STAGE_ROLLBACK: case TRN_STAGE_ROLLBACK:
if (topHalf) { continueExec = mndTransPerformRollbackStage(pMnode, pTrans, topHalf);
continueExec = mndTransPerformRollbackStage(pMnode, pTrans);
} else {
mInfo("trans:%d, can not rollback since not leader", pTrans->id);
continueExec = false;
}
break; break;
case TRN_STAGE_UNDO_ACTION: case TRN_STAGE_UNDO_ACTION:
continueExec = mndTransPerformUndoActionStage(pMnode, pTrans); continueExec = mndTransPerformUndoActionStage(pMnode, pTrans, topHalf);
break; break;
case TRN_STAGE_PRE_FINISH: case TRN_STAGE_PRE_FINISH:
if (topHalf) { continueExec = mndTransPerformPreFinishStage(pMnode, pTrans, topHalf);
continueExec = mndTransPerformPreFinishStage(pMnode, pTrans);
} else {
mInfo("trans:%d, can not pre-finish since not leader", pTrans->id);
continueExec = false;
}
break; break;
case TRN_STAGE_FINISH: case TRN_STAGE_FINISH:
continueExec = mndTransPerformFinishStage(pMnode, pTrans); continueExec = mndTransPerformFinishStage(pMnode, pTrans, topHalf);
break; break;
default: default:
continueExec = false; continueExec = false;

View File

@ -256,18 +256,7 @@ void tsdbDelFileName(STsdb *pTsdb, SDelFile *pFile, char fname[]);
// tsdbFS.c ============================================================================================== // tsdbFS.c ==============================================================================================
int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback); int32_t tsdbFSOpen(STsdb *pTsdb, int8_t rollback);
int32_t tsdbFSClose(STsdb *pTsdb); int32_t tsdbFSClose(STsdb *pTsdb);
int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS);
void tsdbFSDestroy(STsdbFS *pFS);
int32_t tDFileSetCmprFn(const void *p1, const void *p2);
int32_t tsdbFSCommit(STsdb *pTsdb);
int32_t tsdbFSRollback(STsdb *pTsdb);
int32_t tsdbFSPrepareCommit(STsdb *pTsdb, STsdbFS *pFS);
int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS);
void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS);
void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t); void tsdbGetCurrentFName(STsdb *pTsdb, char *current, char *current_t);
int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet);
int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile);
// tsdbReaderWriter.c ============================================================================================== // tsdbReaderWriter.c ==============================================================================================
// SDataFWriter // SDataFWriter
int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet); int32_t tsdbDataFWriterOpen(SDataFWriter **ppWriter, STsdb *pTsdb, SDFileSet *pSet);
@ -737,7 +726,6 @@ struct STsdbReadSnap {
SMemTable *pIMem; SMemTable *pIMem;
SQueryNode *pINode; SQueryNode *pINode;
TFileSetArray *pfSetArray; TFileSetArray *pfSetArray;
STsdbFS fs;
}; };
struct SDataFWriter { struct SDataFWriter {
@ -796,16 +784,16 @@ typedef struct {
} SSttTableRowsInfo; } SSttTableRowsInfo;
typedef struct SSttBlockLoadInfo { typedef struct SSttBlockLoadInfo {
SBlockDataInfo blockData[2]; // buffered block data SBlockDataInfo blockData[2]; // buffered block data
SArray *aSttBlk; SArray *aSttBlk;
int32_t currentLoadBlockIndex; int32_t currentLoadBlockIndex;
STSchema *pSchema; STSchema *pSchema;
int16_t *colIds; int16_t *colIds;
int32_t numOfCols; int32_t numOfCols;
bool checkRemainingRow; // todo: no assign value? bool checkRemainingRow; // todo: no assign value?
bool isLast; bool isLast;
bool sttBlockLoaded; bool sttBlockLoaded;
SSttTableRowsInfo info; SSttTableRowsInfo info;
SSttBlockLoadCostInfo cost; SSttBlockLoadCostInfo cost;
} SSttBlockLoadInfo; } SSttBlockLoadInfo;
@ -894,15 +882,15 @@ typedef struct {
_load_tomb_fn loadTombFn; _load_tomb_fn loadTombFn;
void *pReader; void *pReader;
void *idstr; void *idstr;
bool rspRows; // response the rows in stt-file, if possible bool rspRows; // response the rows in stt-file, if possible
} SMergeTreeConf; } SMergeTreeConf;
typedef struct SSttDataInfoForTable { typedef struct SSttDataInfoForTable {
SArray* pTimeWindowList; SArray *pTimeWindowList;
int64_t numOfRows; int64_t numOfRows;
} SSttDataInfoForTable; } SSttDataInfoForTable;
int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable* pTableInfo); int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoForTable *pTableInfo);
void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter); void tMergeTreeAddIter(SMergeTree *pMTree, SLDataIter *pIter);
bool tMergeTreeNext(SMergeTree *pMTree); bool tMergeTreeNext(SMergeTree *pMTree);
void tMergeTreePinSttBlock(SMergeTree *pMTree); void tMergeTreePinSttBlock(SMergeTree *pMTree);

View File

@ -95,7 +95,7 @@ typedef struct SQueryNode SQueryNode;
#define VNODE_RSMA2_DIR "rsma2" #define VNODE_RSMA2_DIR "rsma2"
#define VNODE_TQ_STREAM "stream" #define VNODE_TQ_STREAM "stream"
#if SUSPEND_RESUME_TEST // only for test purpose #if SUSPEND_RESUME_TEST // only for test purpose
#define VNODE_BUFPOOL_SEGMENTS 1 #define VNODE_BUFPOOL_SEGMENTS 1
#else #else
#define VNODE_BUFPOOL_SEGMENTS 3 #define VNODE_BUFPOOL_SEGMENTS 3
@ -216,8 +216,6 @@ int32_t tsdbBegin(STsdb* pTsdb);
int32_t tsdbCacheCommit(STsdb* pTsdb); int32_t tsdbCacheCommit(STsdb* pTsdb);
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo); int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync); int32_t tsdbRetention(STsdb* tsdb, int64_t now, int32_t sync);
// int32_t tsdbFinishCommit(STsdb* pTsdb);
// int32_t tsdbRollbackCommit(STsdb* pTsdb);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp); int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq2* pMsg, SSubmitRsp2* pRsp);
int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows); int32_t tsdbInsertTableData(STsdb* pTsdb, int64_t version, SSubmitTbData* pSubmitTbData, int32_t* affectedRows);

View File

@ -36,7 +36,6 @@ static int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME);
static void metaGetEntryInfo(const SMetaEntry *pEntry, SMetaInfo *pInfo) { static void metaGetEntryInfo(const SMetaEntry *pEntry, SMetaInfo *pInfo) {
pInfo->uid = pEntry->uid; pInfo->uid = pEntry->uid;
pInfo->version = pEntry->version; pInfo->version = pEntry->version;
@ -562,6 +561,7 @@ int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn); tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn);
metaULock(pMeta); metaULock(pMeta);
metaDestroyTagIdxKey(pTagIdxKey); metaDestroyTagIdxKey(pTagIdxKey);
pTagIdxKey = NULL;
} }
nStbEntry.version = version; nStbEntry.version = version;
@ -692,6 +692,7 @@ int metaDropIndexFromSTable(SMeta *pMeta, int64_t version, SDropIndexReq *pReq)
tdbTbDelete(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, pMeta->txn); tdbTbDelete(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, pMeta->txn);
metaULock(pMeta); metaULock(pMeta);
metaDestroyTagIdxKey(pTagIdxKey); metaDestroyTagIdxKey(pTagIdxKey);
pTagIdxKey = NULL;
} }
// clear idx flag // clear idx flag
@ -1076,7 +1077,7 @@ static int metaDeleteTtl(SMeta *pMeta, const SMetaEntry *pME) {
return ttlMgrDeleteTtl(pMeta->pTtlMgr, &ctx); return ttlMgrDeleteTtl(pMeta->pTtlMgr, &ctx);
} }
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *pSuid, int8_t* pSysTbl) { static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *pSuid, int8_t *pSysTbl) {
void *pData = NULL; void *pData = NULL;
int nData = 0; int nData = 0;
int rc = 0; int rc = 0;
@ -1146,6 +1147,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *p
tdbTbDelete(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, pMeta->txn); tdbTbDelete(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, pMeta->txn);
} }
metaDestroyTagIdxKey(pTagIdxKey); metaDestroyTagIdxKey(pTagIdxKey);
pTagIdxKey = NULL;
} }
} }
tDecoderClear(&tdc); tDecoderClear(&tdc);
@ -1865,6 +1867,7 @@ static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTb
} }
tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn); tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn);
metaDestroyTagIdxKey(pTagIdxKey); metaDestroyTagIdxKey(pTagIdxKey);
pTagIdxKey = NULL;
} }
tdbTbcClose(pCtbIdxc); tdbTbcClose(pCtbIdxc);
return 0; return 0;
@ -2122,7 +2125,7 @@ int metaUpdateChangeTimeWithLock(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeM
if (!tsTtlChangeOnWrite) return 0; if (!tsTtlChangeOnWrite) return 0;
metaWLock(pMeta); metaWLock(pMeta);
int ret = metaUpdateChangeTime(pMeta, uid, changeTimeMs); int ret = metaUpdateChangeTime(pMeta, uid, changeTimeMs);
metaULock(pMeta); metaULock(pMeta);
return ret; return ret;
} }
@ -2228,15 +2231,14 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
nTagData = tDataTypes[pTagColumn->type].bytes; nTagData = tDataTypes[pTagColumn->type].bytes;
} }
if (pTagData != NULL) { if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type,
if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type, pCtbEntry->uid, &pTagIdxKey, &nTagIdxKey) < 0) {
pCtbEntry->uid, &pTagIdxKey, &nTagIdxKey) < 0) { ret = -1;
ret = -1; goto end;
goto end;
}
tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn);
} }
tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn);
metaDestroyTagIdxKey(pTagIdxKey); metaDestroyTagIdxKey(pTagIdxKey);
pTagIdxKey = NULL;
} }
} }
end: end:

View File

@ -1292,10 +1292,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
// check if the checkpoint msg already sent or not. // check if the checkpoint msg already sent or not.
if (status == TASK_STATUS__CK) { if (status == TASK_STATUS__CK) {
ASSERT(pTask->chkInfo.checkpointingId == req.checkpointId);
tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64 tqWarn("s-task:%s recv checkpoint-source msg again checkpointId:%" PRId64
" already received, ignore this msg and continue process checkpoint", " transId:%d already received, ignore this msg and continue process checkpoint",
pTask->id.idStr, pTask->chkInfo.checkpointingId); pTask->id.idStr, pTask->chkInfo.checkpointingId, req.transId);
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);

View File

@ -1183,8 +1183,10 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen); LRUHandle *h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[i], klen);
if (h) { if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h); SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
if (pLastCol->dirty && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { if (pLastCol->dirty) {
pLastCol->dirty = 0; pLastCol->dirty = 0;
}
if (pLastCol->ts <= eKey && pLastCol->ts >= sKey) {
erase = true; erase = true;
} }
taosLRUCacheRelease(pTsdb->lruCache, h, erase); taosLRUCacheRelease(pTsdb->lruCache, h, erase);
@ -1197,8 +1199,10 @@ int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKE
h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[num_keys + i], klen); h = taosLRUCacheLookup(pTsdb->lruCache, keys_list[num_keys + i], klen);
if (h) { if (h) {
SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h); SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pTsdb->lruCache, h);
if (pLastCol->dirty && (pLastCol->ts <= eKey && pLastCol->ts >= sKey)) { if (pLastCol->dirty) {
pLastCol->dirty = 0; pLastCol->dirty = 0;
}
if (pLastCol->ts <= eKey && pLastCol->ts >= sKey) {
erase = true; erase = true;
} }
taosLRUCacheRelease(pTsdb->lruCache, h, erase); taosLRUCacheRelease(pTsdb->lruCache, h, erase);

File diff suppressed because it is too large Load Diff

View File

@ -702,7 +702,7 @@ _exit:
} }
// EXPOSED APIS ==================================================================================== // EXPOSED APIS ====================================================================================
int32_t tsdbFSCommit(STsdb *pTsdb) { static int32_t tsdbFSCommit(STsdb *pTsdb) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
STsdbFS fs = {0}; STsdbFS fs = {0};
@ -738,7 +738,7 @@ _exit:
return code; return code;
} }
int32_t tsdbFSRollback(STsdb *pTsdb) { static int32_t tsdbFSRollback(STsdb *pTsdb) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
@ -833,312 +833,3 @@ int32_t tsdbFSClose(STsdb *pTsdb) {
return code; return code;
} }
int32_t tsdbFSCopy(STsdb *pTsdb, STsdbFS *pFS) {
int32_t code = 0;
int32_t lino = 0;
pFS->pDelFile = NULL;
if (pFS->aDFileSet) {
taosArrayClear(pFS->aDFileSet);
} else {
pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet));
if (pFS->aDFileSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
if (pTsdb->fs.pDelFile) {
pFS->pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
if (pFS->pDelFile == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
*pFS->pDelFile = *pTsdb->fs.pDelFile;
}
for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid};
// head
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
if (fSet.pHeadF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
*fSet.pHeadF = *pSet->pHeadF;
// data
fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
if (fSet.pDataF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
*fSet.pDataF = *pSet->pDataF;
// sma
fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
if (fSet.pSmaF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
*fSet.pSmaF = *pSet->pSmaF;
// stt
for (fSet.nSttF = 0; fSet.nSttF < pSet->nSttF; fSet.nSttF++) {
fSet.aSttF[fSet.nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
if (fSet.aSttF[fSet.nSttF] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
*fSet.aSttF[fSet.nSttF] = *pSet->aSttF[fSet.nSttF];
}
if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
TSDB_CHECK_CODE(code, lino, _exit);
}
}
_exit:
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tsdbFSUpsertDelFile(STsdbFS *pFS, SDelFile *pDelFile) {
int32_t code = 0;
if (pDelFile) {
if (pFS->pDelFile == NULL) {
pFS->pDelFile = (SDelFile *)taosMemoryMalloc(sizeof(SDelFile));
if (pFS->pDelFile == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
}
*pFS->pDelFile = *pDelFile;
} else {
if (pFS->pDelFile) {
taosMemoryFree(pFS->pDelFile);
pFS->pDelFile = NULL;
}
}
_exit:
return code;
}
int32_t tsdbFSUpsertFSet(STsdbFS *pFS, SDFileSet *pSet) {
int32_t code = 0;
int32_t idx = taosArraySearchIdx(pFS->aDFileSet, pSet, tDFileSetCmprFn, TD_GE);
if (idx < 0) {
idx = taosArrayGetSize(pFS->aDFileSet);
} else {
SDFileSet *pDFileSet = (SDFileSet *)taosArrayGet(pFS->aDFileSet, idx);
int32_t c = tDFileSetCmprFn(pSet, pDFileSet);
if (c == 0) {
*pDFileSet->pHeadF = *pSet->pHeadF;
*pDFileSet->pDataF = *pSet->pDataF;
*pDFileSet->pSmaF = *pSet->pSmaF;
// stt
if (pSet->nSttF > pDFileSet->nSttF) {
ASSERT(pSet->nSttF == pDFileSet->nSttF + 1);
pDFileSet->aSttF[pDFileSet->nSttF] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
if (pDFileSet->aSttF[pDFileSet->nSttF] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
*pDFileSet->aSttF[pDFileSet->nSttF] = *pSet->aSttF[pSet->nSttF - 1];
pDFileSet->nSttF++;
} else if (pSet->nSttF < pDFileSet->nSttF) {
ASSERT(pSet->nSttF == 1);
for (int32_t iStt = 1; iStt < pDFileSet->nSttF; iStt++) {
taosMemoryFree(pDFileSet->aSttF[iStt]);
}
*pDFileSet->aSttF[0] = *pSet->aSttF[0];
pDFileSet->nSttF = 1;
} else {
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
*pDFileSet->aSttF[iStt] = *pSet->aSttF[iStt];
}
}
pDFileSet->diskId = pSet->diskId;
goto _exit;
}
}
ASSERT(pSet->nSttF == 1);
SDFileSet fSet = {.diskId = pSet->diskId, .fid = pSet->fid, .nSttF = 1};
// head
fSet.pHeadF = (SHeadFile *)taosMemoryMalloc(sizeof(SHeadFile));
if (fSet.pHeadF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
*fSet.pHeadF = *pSet->pHeadF;
// data
fSet.pDataF = (SDataFile *)taosMemoryMalloc(sizeof(SDataFile));
if (fSet.pDataF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
*fSet.pDataF = *pSet->pDataF;
// sma
fSet.pSmaF = (SSmaFile *)taosMemoryMalloc(sizeof(SSmaFile));
if (fSet.pSmaF == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
*fSet.pSmaF = *pSet->pSmaF;
// stt
fSet.aSttF[0] = (SSttFile *)taosMemoryMalloc(sizeof(SSttFile));
if (fSet.aSttF[0] == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
*fSet.aSttF[0] = *pSet->aSttF[0];
if (taosArrayInsert(pFS->aDFileSet, idx, &fSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
_exit:
return code;
}
int32_t tsdbFSPrepareCommit(STsdb *pTsdb, STsdbFS *pFSNew) {
int32_t code = 0;
int32_t lino = 0;
char tfname[TSDB_FILENAME_LEN];
tsdbGetCurrentFName(pTsdb, NULL, tfname);
// gnrt CURRENT.t
code = tsdbSaveFSToFile(pFSNew, tfname);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d, %s failed at line %d since %s", TD_VID(pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tsdbFSRef(STsdb *pTsdb, STsdbFS *pFS) {
int32_t code = 0;
int32_t nRef;
pFS->aDFileSet = taosArrayInit(taosArrayGetSize(pTsdb->fs.aDFileSet), sizeof(SDFileSet));
if (pFS->aDFileSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
pFS->pDelFile = pTsdb->fs.pDelFile;
if (pFS->pDelFile) {
nRef = atomic_fetch_add_32(&pFS->pDelFile->nRef, 1);
ASSERT(nRef > 0);
}
SDFileSet fSet;
for (int32_t iSet = 0; iSet < taosArrayGetSize(pTsdb->fs.aDFileSet); iSet++) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pTsdb->fs.aDFileSet, iSet);
fSet = *pSet;
nRef = atomic_fetch_add_32(&pSet->pHeadF->nRef, 1);
ASSERT(nRef > 0);
nRef = atomic_fetch_add_32(&pSet->pDataF->nRef, 1);
ASSERT(nRef > 0);
nRef = atomic_fetch_add_32(&pSet->pSmaF->nRef, 1);
ASSERT(nRef > 0);
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
nRef = atomic_fetch_add_32(&pSet->aSttF[iStt]->nRef, 1);
ASSERT(nRef > 0);
}
if (taosArrayPush(pFS->aDFileSet, &fSet) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
}
_exit:
return code;
}
void tsdbFSUnref(STsdb *pTsdb, STsdbFS *pFS) {
int32_t nRef;
char fname[TSDB_FILENAME_LEN];
if (pFS->pDelFile) {
nRef = atomic_sub_fetch_32(&pFS->pDelFile->nRef, 1);
ASSERT(nRef >= 0);
if (nRef == 0) {
tsdbDelFileName(pTsdb, pFS->pDelFile, fname);
(void)taosRemoveFile(fname);
taosMemoryFree(pFS->pDelFile);
}
}
for (int32_t iSet = 0; iSet < taosArrayGetSize(pFS->aDFileSet); iSet++) {
SDFileSet *pSet = (SDFileSet *)taosArrayGet(pFS->aDFileSet, iSet);
// head
nRef = atomic_sub_fetch_32(&pSet->pHeadF->nRef, 1);
ASSERT(nRef >= 0);
if (nRef == 0) {
tsdbHeadFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pHeadF, fname);
(void)taosRemoveFile(fname);
taosMemoryFree(pSet->pHeadF);
}
// data
nRef = atomic_sub_fetch_32(&pSet->pDataF->nRef, 1);
ASSERT(nRef >= 0);
if (nRef == 0) {
tsdbDataFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pDataF, fname);
(void)taosRemoveFile(fname);
taosMemoryFree(pSet->pDataF);
}
// sma
nRef = atomic_sub_fetch_32(&pSet->pSmaF->nRef, 1);
ASSERT(nRef >= 0);
if (nRef == 0) {
tsdbSmaFileName(pTsdb, pSet->diskId, pSet->fid, pSet->pSmaF, fname);
(void)taosRemoveFile(fname);
taosMemoryFree(pSet->pSmaF);
}
// stt
for (int32_t iStt = 0; iStt < pSet->nSttF; iStt++) {
nRef = atomic_sub_fetch_32(&pSet->aSttF[iStt]->nRef, 1);
ASSERT(nRef >= 0);
if (nRef == 0) {
tsdbSttFileName(pTsdb, pSet->diskId, pSet->fid, pSet->aSttF[iStt], fname);
(void)taosRemoveFile(fname);
taosMemoryFree(pSet->aSttF[iStt]);
/* code */
}
}
}
taosArrayDestroy(pFS->aDFileSet);
}

View File

@ -22,8 +22,8 @@
#include "tsdbUtil2.h" #include "tsdbUtil2.h"
#include "tsimplehash.h" #include "tsimplehash.h"
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
#define getCurrentKeyInSttBlock(_r) ((_r)->currentKey) #define getCurrentKeyInSttBlock(_r) ((_r)->currentKey)
typedef struct { typedef struct {
bool overlapWithNeighborBlock; bool overlapWithNeighborBlock;
@ -41,7 +41,7 @@ static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, i
static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader);
static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts, static int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, int64_t ts,
SRowMerger* pMerger, SVersionRange* pVerRange, const char* id); SRowMerger* pMerger, SVersionRange* pVerRange, const char* id);
static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, STsdbReader* pReader); static int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, int64_t ts, SArray* pDelList, STsdbReader* pReader);
static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow, static int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pTSRow,
STableBlockScanInfo* pScanInfo); STableBlockScanInfo* pScanInfo);
@ -67,7 +67,7 @@ static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond
static int32_t doBuildDataBlock(STsdbReader* pReader); static int32_t doBuildDataBlock(STsdbReader* pReader);
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo); static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
static bool hasDataInSttBlock(STableBlockScanInfo *pInfo); static bool hasDataInSttBlock(STableBlockScanInfo* pInfo);
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter); static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order); static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
static void resetTableListIndex(SReaderStatus* pStatus); static void resetTableListIndex(SReaderStatus* pStatus);
@ -1138,7 +1138,7 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo
return false; return false;
} }
int32_t step = asc ? 1 : -1; int32_t step = asc ? 1 : -1;
STableDataBlockIdx* pTableDataBlockIdx = STableDataBlockIdx* pTableDataBlockIdx =
taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step); taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step);
SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex); SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex);
@ -1316,17 +1316,17 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader); int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader);
double el = (taosGetTimestampUs() - st) / 1000.0; double el = (taosGetTimestampUs() - st) / 1000.0;
updateComposedBlockInfo(pReader, el, pBlockScanInfo); updateComposedBlockInfo(pReader, el, pBlockScanInfo);
tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%" PRId64 ", brange:%" PRId64 tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%" PRId64 ", brange:%" PRId64
" - %" PRId64 ", uid:%" PRIu64 ", %s", " - %" PRId64 ", uid:%" PRIu64 ", %s",
pReader, el, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, pReader, el, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, pBlockScanInfo->uid,
pBlockScanInfo->uid, pReader->idStr); pReader->idStr);
pReader->cost.buildmemBlock += el; pReader->cost.buildmemBlock += el;
return code; return code;
@ -1390,13 +1390,9 @@ static bool nextRowFromSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockSc
} }
} }
static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { static void doPinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreePinSttBlock(&pSttBlockReader->mergeTree); }
tMergeTreePinSttBlock(&pSttBlockReader->mergeTree);
}
static void doUnpinSttBlock(SSttBlockReader* pSttBlockReader) { static void doUnpinSttBlock(SSttBlockReader* pSttBlockReader) { tMergeTreeUnpinSttBlock(&pSttBlockReader->mergeTree); }
tMergeTreeUnpinSttBlock(&pSttBlockReader->mergeTree);
}
static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttBlockReader, static bool tryCopyDistinctRowFromSttBlock(TSDBROW* fRow, SSttBlockReader* pSttBlockReader,
STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader, STableBlockScanInfo* pScanInfo, int64_t ts, STsdbReader* pReader,
@ -1535,8 +1531,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
pReader->idStr);
} }
if (minKey == k.ts) { if (minKey == k.ts) {
@ -1585,8 +1580,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return code; return code;
} }
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
pReader->idStr);
} }
if (minKey == key) { if (minKey == key) {
@ -1648,7 +1642,7 @@ static int32_t doMergeFileBlockAndLastBlock(SSttBlockReader* pSttBlockReader, ST
TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
tsdbRowMergerAdd(pMerger, pRow1, NULL); tsdbRowMergerAdd(pMerger, pRow1, NULL);
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange,
pReader->idStr); pReader->idStr);
code = tsdbRowMergerGetRow(pMerger, &pTSRow); code = tsdbRowMergerGetRow(pMerger, &pTSRow);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -1671,7 +1665,7 @@ static int32_t doMergeFileBlockAndLastBlock(SSttBlockReader* pSttBlockReader, ST
} }
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange, doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLastBlock, pMerger, &pReader->info.verRange,
pReader->idStr); pReader->idStr);
// merge with block data if ts == key // merge with block data if ts == key
if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) { if (tsLastBlock == pBlockData->aTSKEY[pDumpInfo->rowIndex]) {
@ -1740,7 +1734,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
// the following for key == tsLast // the following for key == tsLast
// ASC: file block ------> stt block // ASC: file block ------> stt block
// DESC: stt block ------> file block // DESC: stt block ------> file block
SRow* pTSRow = NULL; SRow* pTSRow = NULL;
if (ASCENDING_TRAVERSE(pReader->info.order)) { if (ASCENDING_TRAVERSE(pReader->info.order)) {
code = tsdbRowMergerAdd(pMerger, &fRow, NULL); code = tsdbRowMergerAdd(pMerger, &fRow, NULL);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
@ -1889,8 +1883,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
pReader->idStr);
} }
if (minKey == ik.ts) { if (minKey == ik.ts) {
@ -1948,8 +1941,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
return code; return code;
} }
doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, tsLast, pMerger, &pReader->info.verRange, pReader->idStr);
pReader->idStr);
} }
if (minKey == key) { if (minKey == key) {
@ -2120,7 +2112,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
}; };
SSttDataInfoForTable info = {.pTimeWindowList = taosArrayInit(4, sizeof(STimeWindow))}; SSttDataInfoForTable info = {.pTimeWindowList = taosArrayInit(4, sizeof(STimeWindow))};
int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info); int32_t code = tMergeTreeOpen2(&pSttBlockReader->mergeTree, &conf, &info);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
return false; return false;
} }
@ -2138,7 +2130,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
pScanInfo->sttWindow.ekey = INT64_MIN; pScanInfo->sttWindow.ekey = INT64_MIN;
// calculate the time window for data in stt files // calculate the time window for data in stt files
for(int32_t i = 0; i < taosArrayGetSize(info.pTimeWindowList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(info.pTimeWindowList); ++i) {
STimeWindow* pWindow = taosArrayGet(info.pTimeWindowList, i); STimeWindow* pWindow = taosArrayGet(info.pTimeWindowList, i);
if (pScanInfo->sttWindow.skey > pWindow->skey) { if (pScanInfo->sttWindow.skey > pWindow->skey) {
pScanInfo->sttWindow.skey = pWindow->skey; pScanInfo->sttWindow.skey = pWindow->skey;
@ -2149,8 +2141,9 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
} }
} }
pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList)? STT_FILE_HAS_DATA:STT_FILE_NO_DATA; pScanInfo->sttKeyInfo.status = taosArrayGetSize(info.pTimeWindowList) ? STT_FILE_HAS_DATA : STT_FILE_NO_DATA;
pScanInfo->sttKeyInfo.nextProcKey = ASCENDING_TRAVERSE(pReader->info.order)? pScanInfo->sttWindow.skey:pScanInfo->sttWindow.ekey; pScanInfo->sttKeyInfo.nextProcKey =
ASCENDING_TRAVERSE(pReader->info.order) ? pScanInfo->sttWindow.skey : pScanInfo->sttWindow.ekey;
hasData = true; hasData = true;
} else { } else {
hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange); hasData = nextRowFromSttBlocks(pSttBlockReader, pScanInfo, &pReader->info.verRange);
@ -2168,9 +2161,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
return hasData; return hasData;
} }
static bool hasDataInSttBlock(STableBlockScanInfo *pInfo) { static bool hasDataInSttBlock(STableBlockScanInfo* pInfo) { return pInfo->sttKeyInfo.status == STT_FILE_HAS_DATA; }
return pInfo->sttKeyInfo.status == STT_FILE_HAS_DATA;
}
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) { bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) { if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
@ -2225,10 +2216,11 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc
} }
} }
int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pBlockScanInfo,
bool copied = false; STsdbReader* pReader) {
SRow* pTSRow = NULL; bool copied = false;
int64_t tsLastBlock = getCurrentKeyInSttBlock(pSttBlockReader); SRow* pTSRow = NULL;
int64_t tsLastBlock = getCurrentKeyInSttBlock(pSttBlockReader);
SRowMerger* pMerger = &pReader->status.merger; SRowMerger* pMerger = &pReader->status.merger;
TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree); TSDBROW* pRow = tMergeTreeGetRow(&pSttBlockReader->mergeTree);
@ -2559,16 +2551,16 @@ static void prepareDurationForNextFileSet(STsdbReader* pReader) {
pReader->status.bProcMemFirstFileset = false; pReader->status.bProcMemFirstFileset = false;
} }
int32_t fid = pReader->status.pCurrentFileset->fid; int32_t fid = pReader->status.pCurrentFileset->fid;
STimeWindow winFid = {0}; STimeWindow winFid = {0};
tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &winFid.skey, &winFid.ekey); tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &winFid.skey, &winFid.ekey);
if (ASCENDING_TRAVERSE(pReader->info.order)) { if (ASCENDING_TRAVERSE(pReader->info.order)) {
pReader->status.bProcMemPreFileset = !(pReader->status.memTableMaxKey < pReader->status.prevFilesetStartKey || pReader->status.bProcMemPreFileset = !(pReader->status.memTableMaxKey < pReader->status.prevFilesetStartKey ||
(winFid.skey-1) < pReader->status.memTableMinKey); (winFid.skey - 1) < pReader->status.memTableMinKey);
} else { } else {
pReader->status.bProcMemPreFileset = !( pReader->status.memTableMaxKey < (winFid.ekey+1) || pReader->status.bProcMemPreFileset = !(pReader->status.memTableMaxKey < (winFid.ekey + 1) ||
pReader->status.prevFilesetEndKey < pReader->status.memTableMinKey); pReader->status.prevFilesetEndKey < pReader->status.memTableMinKey);
} }
if (pReader->status.bProcMemPreFileset) { if (pReader->status.bProcMemPreFileset) {
@ -2802,7 +2794,7 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, bool asc) { static bool notOverlapWithSttFiles(SFileDataBlockInfo* pBlockInfo, STableBlockScanInfo* pScanInfo, bool asc) {
ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT); ASSERT(pScanInfo->sttKeyInfo.status != STT_FILE_READER_UNINIT);
if(pScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA) { if (pScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA) {
return true; return true;
} else { } else {
int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey; int64_t keyInStt = pScanInfo->sttKeyInfo.nextProcKey;
@ -2923,7 +2915,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
static int32_t buildBlockFromBufferSeqForPreFileset(STsdbReader* pReader, int64_t endKey) { static int32_t buildBlockFromBufferSeqForPreFileset(STsdbReader* pReader, int64_t endKey) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
tsdbDebug("seq load data blocks from cache that preceeds fileset %d, %s", pReader->status.pCurrentFileset->fid, pReader->idStr); tsdbDebug("seq load data blocks from cache that preceeds fileset %d, %s", pReader->status.pCurrentFileset->fid,
pReader->idStr);
while (1) { while (1) {
if (pReader->code != TSDB_CODE_SUCCESS) { if (pReader->code != TSDB_CODE_SUCCESS) {
@ -3847,7 +3840,6 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
pBlockScanInfo->lastProcKey = row.pBlockData->aTSKEY[row.iRow]; pBlockScanInfo->lastProcKey = row.pBlockData->aTSKEY[row.iRow];
} }
// no data in buffer, return immediately // no data in buffer, return immediately
if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) { if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
break; break;
@ -3945,7 +3937,7 @@ static int32_t doOpenReaderImpl(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if (pStatus->fileIter.numOfFiles == 0) { if (pStatus->fileIter.numOfFiles == 0) {
pStatus->loadFromFile = false; pStatus->loadFromFile = false;
// } else if (READER_EXEC_DATA == pReader->info.readMode) { // } else if (READER_EXEC_DATA == pReader->info.readMode) {
// DO NOTHING // DO NOTHING
} else { } else {
code = initForFirstBlockInFile(pReader, pBlockIter); code = initForFirstBlockInFile(pReader, pBlockIter);
@ -4092,7 +4084,7 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
} }
pReader->flag = READER_STATUS_SUSPEND; pReader->flag = READER_STATUS_SUSPEND;
pReader->info.execMode = pCond->notLoadData? READER_EXEC_ROWS : READER_EXEC_DATA; pReader->info.execMode = pCond->notLoadData ? READER_EXEC_ROWS : READER_EXEC_DATA;
pReader->pIgnoreTables = pIgnoreTables; pReader->pIgnoreTables = pIgnoreTables;
tsdbDebug("%p total numOfTable:%d, window:%" PRId64 " - %" PRId64 ", verRange:%" PRId64 " - %" PRId64 tsdbDebug("%p total numOfTable:%d, window:%" PRId64 " - %" PRId64 ", verRange:%" PRId64 " - %" PRId64
@ -4157,7 +4149,7 @@ void tsdbReaderClose2(STsdbReader* pReader) {
} }
SReadCostSummary* pCost = &pReader->cost; SReadCostSummary* pCost = &pReader->cost;
SFilesetIter* pFilesetIter = &pReader->status.fileIter; SFilesetIter* pFilesetIter = &pReader->status.fileIter;
if (pFilesetIter->pSttBlockReader != NULL) { if (pFilesetIter->pSttBlockReader != NULL) {
SSttBlockReader* pLReader = pFilesetIter->pSttBlockReader; SSttBlockReader* pLReader = pFilesetIter->pSttBlockReader;
tMergeTreeClose(&pLReader->mergeTree); tMergeTreeClose(&pLReader->mergeTree);
@ -4204,7 +4196,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
STableBlockScanInfo* pBlockScanInfo = NULL; STableBlockScanInfo* pBlockScanInfo = NULL;
pReader->status.suspendInvoked = true; // record the suspend status pReader->status.suspendInvoked = true; // record the suspend status
if (pStatus->loadFromFile) { if (pStatus->loadFromFile) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
@ -4227,7 +4219,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
// resetDataBlockScanInfo excluding lastKey // resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL; STableBlockScanInfo** p = NULL;
int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1:-1; int32_t step = ASCENDING_TRAVERSE(pReader->info.order) ? 1 : -1;
int32_t iter = 0; int32_t iter = 0;
while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) { while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) {
@ -4248,7 +4240,7 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
pReader->flag = READER_STATUS_SUSPEND; pReader->flag = READER_STATUS_SUSPEND;
#if SUSPEND_RESUME_TEST #if SUSPEND_RESUME_TEST
tsem_post(&pReader->resumeAfterSuspend); tsem_post(&pReader->resumeAfterSuspend);
#endif #endif
tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0, tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0,
@ -4331,13 +4323,13 @@ _err:
} }
static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader) { static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
int32_t fid = pReader->status.pCurrentFileset->fid; int32_t fid = pReader->status.pCurrentFileset->fid;
STimeWindow win = {0}; STimeWindow win = {0};
tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey); tsdbFidKeyRange(fid, pReader->pTsdb->keepCfg.days, pReader->pTsdb->keepCfg.precision, &win.skey, &win.ekey);
int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? win.skey : win.ekey; int64_t endKey = (ASCENDING_TRAVERSE(pReader->info.order)) ? win.skey : win.ekey;
@ -4359,8 +4351,8 @@ static int32_t buildFromPreFilesetBuffer(STsdbReader* pReader) {
static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) { static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
if (pStatus->loadFromFile) { if (pStatus->loadFromFile) {
if (pStatus->bProcMemPreFileset) { if (pStatus->bProcMemPreFileset) {
@ -4375,12 +4367,12 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
return code; return code;
} }
tsdbTrace("block from file rows: %"PRId64", will process pre-file set buffer: %d. %s", tsdbTrace("block from file rows: %" PRId64 ", will process pre-file set buffer: %d. %s", pBlock->info.rows,
pBlock->info.rows, pStatus->bProcMemFirstFileset, pReader->idStr); pStatus->bProcMemFirstFileset, pReader->idStr);
if (pStatus->bProcMemPreFileset) { if (pStatus->bProcMemPreFileset) {
if (pBlock->info.rows > 0) { if (pBlock->info.rows > 0) {
if (pReader->notifyFn) { if (pReader->notifyFn) {
int32_t fid = pReader->status.pCurrentFileset->fid; int32_t fid = pReader->status.pCurrentFileset->fid;
STsdReaderNotifyInfo info = {0}; STsdReaderNotifyInfo info = {0};
info.duration.filesetId = fid; info.duration.filesetId = fid;
pReader->notifyFn(TSD_READER_NOTIFY_NEXT_DURATION_BLOCK, &info, pReader->notifyParam); pReader->notifyFn(TSD_READER_NOTIFY_NEXT_DURATION_BLOCK, &info, pReader->notifyParam);
@ -4404,8 +4396,8 @@ static int32_t doTsdbNextDataBlockFilesetDelimited(STsdbReader* pReader) {
static int32_t doTsdbNextDataBlockFilesFirst(STsdbReader* pReader) { static int32_t doTsdbNextDataBlockFilesFirst(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
if (pStatus->loadFromFile) { if (pStatus->loadFromFile) {
code = buildBlockFromFiles(pReader); code = buildBlockFromFiles(pReader);
@ -4940,7 +4932,7 @@ static void getMemTableTimeRange(STsdbReader* pReader, int64_t* pMaxKey, int64_t
int64_t minKey = INT64_MAX; int64_t minKey = INT64_MAX;
void* pHashIter = tSimpleHashIterate(pStatus->pTableMap, NULL, &iter); void* pHashIter = tSimpleHashIterate(pStatus->pTableMap, NULL, &iter);
while (pHashIter!= NULL) { while (pHashIter != NULL) {
STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pHashIter; STableBlockScanInfo* pBlockScanInfo = *(STableBlockScanInfo**)pHashIter;
STbData* d = NULL; STbData* d = NULL;
@ -5144,7 +5136,6 @@ void tsdbUntakeReadSnap2(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proact
tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, proactive); tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, proactive);
} }
tsdbFSUnref(pTsdb, &pSnap->fs);
if (pSnap->pNode) taosMemoryFree(pSnap->pNode); if (pSnap->pNode) taosMemoryFree(pSnap->pNode);
if (pSnap->pINode) taosMemoryFree(pSnap->pINode); if (pSnap->pINode) taosMemoryFree(pSnap->pINode);
@ -5165,9 +5156,7 @@ void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) {
void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/ void tsdbReaderSetCloseFlag(STsdbReader* pReader) { /*pReader->code = TSDB_CODE_TSC_QUERY_CANCELLED;*/
} }
void tsdbSetFilesetDelimited(STsdbReader* pReader) { void tsdbSetFilesetDelimited(STsdbReader* pReader) { pReader->bFilesetDelimited = true; }
pReader->bFilesetDelimited = true;
}
void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param) { void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn notifyFn, void* param) {
pReader->notifyFn = notifyFn; pReader->notifyFn = notifyFn;

View File

@ -845,6 +845,7 @@ void resetWinRange(STimeWindow* winRange);
bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts); bool checkExpiredData(SStateStore* pAPI, SUpdateInfo* pUpdateInfo, STimeWindowAggSupp* pTwSup, uint64_t tableId, TSKEY ts);
int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval); int64_t getDeleteMark(SWindowPhysiNode* pWinPhyNode, int64_t interval);
void resetUnCloseSessionWinInfo(SSHashObj* winMap); void resetUnCloseSessionWinInfo(SSHashObj* winMap);
void setStreamOperatorCompleted(struct SOperatorInfo* pOperator);
int32_t encodeSSessionKey(void** buf, SSessionKey* key); int32_t encodeSSessionKey(void** buf, SSessionKey* key);
void* decodeSSessionKey(void* buf, SSessionKey* key); void* decodeSSessionKey(void* buf, SSessionKey* key);

View File

@ -2244,8 +2244,11 @@ void printDataBlock(SSDataBlock* pBlock, const char* flag, const char* taskIdStr
} }
void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) { void printSpecDataBlock(SSDataBlock* pBlock, const char* flag, const char* opStr, const char* taskIdStr) {
if (!pBlock || pBlock->info.rows == 0) { if (!pBlock) {
qDebug("%s===stream===%s: Block is Null or Empty", taskIdStr, flag); qDebug("%s===stream===%s: Block is Null", taskIdStr, flag);
return;
} else if (pBlock->info.rows == 0) {
qDebug("%s===stream===%s: Block is Empty. type:%d", taskIdStr, flag, pBlock->info.type);
return; return;
} }
if (qDebugFlag & DEBUG_DEBUG) { if (qDebugFlag & DEBUG_DEBUG) {

View File

@ -1997,7 +1997,6 @@ void streamScanOperatorSaveCheckpoint(SStreamScanInfo* pInfo) {
int32_t len = streamScanOperatorEncode(pInfo, &pBuf); int32_t len = streamScanOperatorEncode(pInfo, &pBuf);
pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len); pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len);
taosMemoryFree(pBuf); taosMemoryFree(pBuf);
pInfo->stateStore.streamStateCommit(pInfo->pState);
} }
// other properties are recovered from the execution plan // other properties are recovered from the execution plan
@ -2316,6 +2315,7 @@ FETCH_NEXT_BLOCK:
doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes); doCheckUpdate(pInfo, pBlockInfo->window.ekey, pInfo->pRes);
doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL); doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, NULL);
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows; int64_t numOfUpdateRes = pInfo->pUpdateDataRes->info.rows;
qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes); qDebug("%s %" PRId64 " rows in datablock, update res:%" PRId64, id, pBlockInfo->rows, numOfUpdateRes);

View File

@ -480,18 +480,18 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
return resBlock; return resBlock;
} }
if (pInfo->recvGetAll) {
pInfo->recvGetAll = false;
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
}
if (pInfo->reCkBlock) { if (pInfo->reCkBlock) {
pInfo->reCkBlock = false; pInfo->reCkBlock = false;
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pCheckpointRes; return pInfo->pCheckpointRes;
} }
if (pInfo->recvGetAll) { setStreamOperatorCompleted(pOperator);
pInfo->recvGetAll = false;
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
}
setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
@ -561,7 +561,7 @@ static SSDataBlock* doStreamEventAgg(SOperatorInfo* pOperator) {
if (resBlock != NULL) { if (resBlock != NULL) {
return resBlock; return resBlock;
} }
setOperatorCompleted(pOperator); setStreamOperatorCompleted(pOperator);
return NULL; return NULL;
} }

View File

@ -801,10 +801,24 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat
SRowBuffPos* pResPos = NULL; SRowBuffPos* pResPos = NULL;
SResultRow* pResult = NULL; SResultRow* pResult = NULL;
int32_t forwardRows = 0; int32_t forwardRows = 0;
int32_t endRowId = pSDataBlock->info.rows - 1;
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
tsCols = (int64_t*)pColDataInfo->pData; tsCols = (int64_t*)pColDataInfo->pData;
if (pSDataBlock->info.window.skey != tsCols[0] || pSDataBlock->info.window.ekey != tsCols[endRowId]) {
qError("table uid %" PRIu64 " data block timestamp range may not be calculated! minKey %" PRId64
",maxKey %" PRId64,
pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey);
blockDataUpdateTsWindow(pSDataBlock, pInfo->primaryTsIndex);
// timestamp of the data is incorrect
if (pSDataBlock->info.window.skey <= 0 || pSDataBlock->info.window.ekey <= 0) {
qError("table uid %" PRIu64 " data block timestamp is out of range! minKey %" PRId64 ",maxKey %" PRId64,
pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey);
}
}
int32_t startPos = 0; int32_t startPos = 0;
TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols); TSKEY ts = getStartTsKey(&pSDataBlock->info.window, tsCols);
STimeWindow nextWin = {0}; STimeWindow nextWin = {0};
@ -902,19 +916,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat
pInfo->delKey = key; pInfo->delKey = key;
} }
int32_t prevEndPos = (forwardRows - 1) * step + startPos; int32_t prevEndPos = (forwardRows - 1) * step + startPos;
if (pSDataBlock->info.window.skey <= 0 || pSDataBlock->info.window.ekey <= 0) {
qError("table uid %" PRIu64 " data block timestamp range may not be calculated! minKey %" PRId64
",maxKey %" PRId64,
pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey);
blockDataUpdateTsWindow(pSDataBlock, 0);
// timestamp of the data is incorrect
if (pSDataBlock->info.window.skey <= 0 || pSDataBlock->info.window.ekey <= 0) {
qError("table uid %" PRIu64 " data block timestamp is out of range! minKey %" PRId64 ",maxKey %" PRId64,
pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey);
}
}
if (IS_FINAL_INTERVAL_OP(pOperator)) { if (IS_FINAL_INTERVAL_OP(pOperator)) {
startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos); startPos = getNextQualifiedFinalWindow(&pInfo->interval, &nextWin, &pSDataBlock->info, tsCols, prevEndPos);
} else { } else {
@ -1223,7 +1224,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
return pInfo->pCheckpointRes; return pInfo->pCheckpointRes;
} }
setOperatorCompleted(pOperator); setStreamOperatorCompleted(pOperator);
if (!IS_FINAL_INTERVAL_OP(pOperator)) { if (!IS_FINAL_INTERVAL_OP(pOperator)) {
clearFunctionContext(&pOperator->exprSupp); clearFunctionContext(&pOperator->exprSupp);
// semi interval operator clear disk buffer // semi interval operator clear disk buffer
@ -2343,10 +2344,6 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa
} }
int32_t code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow); int32_t code = pAPI->stateStore.streamStateGetByPos(pState, pPos, (void**)&pRow);
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
ASSERT(pBlock->info.rows > 0);
break;
}
if (code == -1) { if (code == -1) {
// for history // for history
@ -2363,6 +2360,11 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa
continue; continue;
} }
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
ASSERT(pBlock->info.rows > 0);
break;
}
pGroupResInfo->index += 1; pGroupResInfo->index += 1;
for (int32_t j = 0; j < numOfExprs; ++j) { for (int32_t j = 0; j < numOfExprs; ++j) {
@ -2609,18 +2611,18 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
return opRes; return opRes;
} }
if (pInfo->recvGetAll) {
pInfo->recvGetAll = false;
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
}
if (pInfo->reCkBlock) { if (pInfo->reCkBlock) {
pInfo->reCkBlock = false; pInfo->reCkBlock = false;
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pCheckpointRes; return pInfo->pCheckpointRes;
} }
if (pInfo->recvGetAll) { setStreamOperatorCompleted(pOperator);
pInfo->recvGetAll = false;
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
}
setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
@ -2718,7 +2720,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
return opRes; return opRes;
} }
setOperatorCompleted(pOperator); setStreamOperatorCompleted(pOperator);
return NULL; return NULL;
} }
@ -3001,7 +3003,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
clearFunctionContext(&pOperator->exprSupp); clearFunctionContext(&pOperator->exprSupp);
// semi session operator clear disk buffer // semi session operator clear disk buffer
clearStreamSessionOperator(pInfo); clearStreamSessionOperator(pInfo);
setOperatorCompleted(pOperator); setStreamOperatorCompleted(pOperator);
pInfo->clearState = false; pInfo->clearState = false;
return NULL; return NULL;
} }
@ -3074,7 +3076,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
clearFunctionContext(&pOperator->exprSupp); clearFunctionContext(&pOperator->exprSupp);
// semi session operator clear disk buffer // semi session operator clear disk buffer
clearStreamSessionOperator(pInfo); clearStreamSessionOperator(pInfo);
setOperatorCompleted(pOperator); setStreamOperatorCompleted(pOperator);
return NULL; return NULL;
} }
@ -3551,18 +3553,18 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
return resBlock; return resBlock;
} }
if (pInfo->recvGetAll) {
pInfo->recvGetAll = false;
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
}
if (pInfo->reCkBlock) { if (pInfo->reCkBlock) {
pInfo->reCkBlock = false; pInfo->reCkBlock = false;
printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo)); printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pCheckpointRes; return pInfo->pCheckpointRes;
} }
if (pInfo->recvGetAll) { setStreamOperatorCompleted(pOperator);
pInfo->recvGetAll = false;
resetUnCloseSessionWinInfo(pInfo->streamAggSup.pResultRows);
}
setOperatorCompleted(pOperator);
return NULL; return NULL;
} }
@ -3628,7 +3630,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
if (resBlock != NULL) { if (resBlock != NULL) {
return resBlock; return resBlock;
} }
setOperatorCompleted(pOperator); setStreamOperatorCompleted(pOperator);
return NULL; return NULL;
} }
@ -3864,11 +3866,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
if (pInfo->reCkBlock) { if (pInfo->reCkBlock) {
pInfo->reCkBlock = false; pInfo->reCkBlock = false;
// printDataBlock(pInfo->pCheckpointRes, "single interval ck"); printDataBlock(pInfo->pCheckpointRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
return pInfo->pCheckpointRes; return pInfo->pCheckpointRes;
} }
setOperatorCompleted(pOperator); setStreamOperatorCompleted(pOperator);
return NULL; return NULL;
} }
@ -3900,7 +3902,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap); doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pInfo->pUpdatedMap);
continue; continue;
} else if (pBlock->info.type == STREAM_GET_ALL) { } else if (pBlock->info.type == STREAM_GET_ALL) {
qDebug("===stream===%s recv|block type STREAM_GET_ALL", getStreamOpName(pOperator->operatorType));
pInfo->recvGetAll = true; pInfo->recvGetAll = true;
getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap);
continue; continue;
@ -4083,3 +4084,8 @@ _error:
pTaskInfo->code = code; pTaskInfo->code = code;
return NULL; return NULL;
} }
void setStreamOperatorCompleted(SOperatorInfo* pOperator) {
setOperatorCompleted(pOperator);
qDebug("stask:%s %s status: %d. set completed", GET_TASKID(pOperator->pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status);
}

View File

@ -8941,7 +8941,7 @@ static int32_t extractShowVariablesResultSchema(int32_t* numOfCols, SSchema** pS
(*pSchema)[0].type = TSDB_DATA_TYPE_BINARY; (*pSchema)[0].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[0].bytes = TSDB_CONFIG_OPTION_LEN; (*pSchema)[0].bytes = TSDB_CONFIG_OPTION_LEN;
strcpy((*pSchema)[0].name, "result"); strcpy((*pSchema)[0].name, "name");
(*pSchema)[1].type = TSDB_DATA_TYPE_BINARY; (*pSchema)[1].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[1].bytes = TSDB_CONFIG_VALUE_LEN; (*pSchema)[1].bytes = TSDB_CONFIG_VALUE_LEN;
@ -8963,7 +8963,7 @@ static int32_t extractCompactDbResultSchema(int32_t* numOfCols, SSchema** pSchem
(*pSchema)[0].type = TSDB_DATA_TYPE_BINARY; (*pSchema)[0].type = TSDB_DATA_TYPE_BINARY;
(*pSchema)[0].bytes = COMPACT_DB_RESULT_FIELD1_LEN; (*pSchema)[0].bytes = COMPACT_DB_RESULT_FIELD1_LEN;
strcpy((*pSchema)[0].name, "name"); strcpy((*pSchema)[0].name, "result");
(*pSchema)[1].type = TSDB_DATA_TYPE_INT; (*pSchema)[1].type = TSDB_DATA_TYPE_INT;
(*pSchema)[1].bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes; (*pSchema)[1].bytes = tDataTypes[TSDB_DATA_TYPE_INT].bytes;

View File

@ -746,7 +746,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
taosArrayClear(pTask->pReadyMsgList); taosArrayClear(pTask->pReadyMsgList);
stDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel); stDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel);
} else { } else {
stDebug("s-task:%s level:%d already send rsp to mnode", pTask->id.idStr, pTask->info.taskLevel); stDebug("s-task:%s level:%d already send rsp checkpoint success to mnode", pTask->id.idStr, pTask->info.taskLevel);
} }
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);

View File

@ -625,10 +625,29 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
// todo other thread may change the status // todo other thread may change the status
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
if (type == STREAM_INPUT__CHECKPOINT) { if (type == STREAM_INPUT__CHECKPOINT) {
// todo add lock
char* p = NULL; char* p = NULL;
streamTaskGetStatus(pTask, &p); ETaskStatus s = streamTaskGetStatus(pTask, &p);
stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, p); if (s == TASK_STATUS__CK) {
streamTaskBuildCheckpoint(pTask); stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, p);
streamTaskBuildCheckpoint(pTask);
} else {
// todo refactor
int32_t code = 0;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
code = streamTaskSendCheckpointSourceRsp(pTask);
} else {
code = streamTaskSendCheckpointReadyMsg(pTask);
}
if (code != TSDB_CODE_SUCCESS) {
// todo: let's retry send rsp to upstream/mnode
stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", pTask->id.idStr,
0, tstrerror(code));
}
}
return 0; return 0;
} }
} }

View File

@ -558,22 +558,31 @@ int32_t syncFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, SyncTe
pEntry->term); pEntry->term);
} }
SRpcMsg rpcMsg = {.code = applyCode}; int32_t code = 0;
syncEntry2OriginalRpc(pEntry, &rpcMsg); bool retry = false;
do {
SRpcMsg rpcMsg = {.code = applyCode};
syncEntry2OriginalRpc(pEntry, &rpcMsg);
SFsmCbMeta cbMeta = {0}; SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index; cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index); cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(pNode, pEntry->index);
cbMeta.isWeak = pEntry->isWeak; cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = applyCode; cbMeta.code = applyCode;
cbMeta.state = role; cbMeta.state = role;
cbMeta.seqNum = pEntry->seqNum; cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term; cbMeta.term = pEntry->term;
cbMeta.currentTerm = term; cbMeta.currentTerm = term;
cbMeta.flag = -1; cbMeta.flag = -1;
(void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info); (void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info);
int32_t code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta); code = pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta);
retry = (code != 0) && (terrno == TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE);
if (retry) {
taosMsleep(10);
sError("vgId:%d, retry on fsm commit since %s. index:%" PRId64, pNode->vgId, terrstr(), pEntry->index);
}
} while (retry);
return code; return code;
} }

View File

@ -806,6 +806,13 @@ static int32_t syncNodeOnSnapshotBegin(SSyncNode *pSyncNode, SyncSnapshotSend *p
goto _SEND_REPLY; goto _SEND_REPLY;
} }
SyncIndex beginIndex = syncNodeGetSnapBeginIndex(pSyncNode);
if (pReceiver->snapshotParam.start != beginIndex) {
sRError(pReceiver, "snapshot begin index is changed unexpectedly. sver:%" PRId64 ", beginIndex:%" PRId64,
pReceiver->snapshotParam.start, beginIndex);
goto _SEND_REPLY;
}
code = 0; code = 0;
_SEND_REPLY: _SEND_REPLY:
if (code != 0 && terrno != 0) { if (code != 0 && terrno != 0) {

View File

@ -1070,6 +1070,11 @@ int32_t patternMatch(const char *pattern, size_t psize, const char *str, size_t
return TSDB_PATTERN_MATCH; /* "*" at the end of the pattern matches */ return TSDB_PATTERN_MATCH; /* "*" at the end of the pattern matches */
} }
if (c == '\\' && (pattern[i] == '_' || pattern[i] == '%')) {
c = pattern[i];
i++;
}
char rejectList[2] = {toupper(c), tolower(c)}; char rejectList[2] = {toupper(c), tolower(c)};
str += nMatchChar; str += nMatchChar;
@ -1139,6 +1144,11 @@ int32_t wcsPatternMatch(const TdUcs4 *pattern, size_t psize, const TdUcs4 *str,
return TSDB_PATTERN_MATCH; return TSDB_PATTERN_MATCH;
} }
if (c == '\\' && (pattern[i] == '_' || pattern[i] == '%')) {
c = pattern[i];
i++;
}
TdUcs4 rejectList[2] = {towupper(c), towlower(c)}; TdUcs4 rejectList[2] = {towupper(c), towlower(c)};
str += nMatchChar; str += nMatchChar;
@ -1166,7 +1176,8 @@ int32_t wcsPatternMatch(const TdUcs4 *pattern, size_t psize, const TdUcs4 *str,
c1 = str[j++]; c1 = str[j++];
nMatchChar++; nMatchChar++;
if (c == L'\\' && pattern[i] == L'_' && c1 == L'_') { if (c == '\\' && pattern[i] == c1 &&
(c1 == '_' || c1 == '%')) {
i++; i++;
continue; continue;
} }

View File

@ -286,6 +286,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CLOG_IS_NULL, "Transaction commitlog
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL, "Unable to establish connection While execute transaction and will continue in the background") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_NETWORK_UNAVAILL, "Unable to establish connection While execute transaction and will continue in the background")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED, "Last Transaction not finished") TAOS_DEFINE_ERROR(TSDB_CODE_MND_LAST_TRANS_NOT_FINISHED, "Last Transaction not finished")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_SYNC_TIMEOUT, "Sync timeout While execute transaction and will continue in the background") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_SYNC_TIMEOUT, "Sync timeout While execute transaction and will continue in the background")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_CTX_SWITCH, "Transaction context switch")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_UNKNOW_ERROR, "Unknown transaction error") TAOS_DEFINE_ERROR(TSDB_CODE_MND_TRANS_UNKNOW_ERROR, "Unknown transaction error")
// mnode-mq // mnode-mq

View File

@ -76,6 +76,16 @@ TEST(utilTest, wchar_pattern_match_test) {
const TdWchar* str12 = L""; const TdWchar* str12 = L"";
ret = wcsPatternMatch(reinterpret_cast<const TdUcs4*>(pattern12), 4, reinterpret_cast<const TdUcs4*>(str12), 0, &pInfo); ret = wcsPatternMatch(reinterpret_cast<const TdUcs4*>(pattern12), 4, reinterpret_cast<const TdUcs4*>(str12), 0, &pInfo);
ASSERT_EQ(ret, TSDB_PATTERN_NOMATCH); ASSERT_EQ(ret, TSDB_PATTERN_NOMATCH);
const TdWchar* pattern13 = L"%\\_6 ";
const TdWchar* str13 = L"6a6 ";
ret = wcsPatternMatch(reinterpret_cast<const TdUcs4*>(pattern13), 6, reinterpret_cast<const TdUcs4*>(str13), 4, &pInfo);
ASSERT_EQ(ret, TSDB_PATTERN_NOWILDCARDMATCH);
const TdWchar* pattern14 = L"%\\%6 ";
const TdWchar* str14 = L"6a6 ";
ret = wcsPatternMatch(reinterpret_cast<const TdUcs4*>(pattern14), 6, reinterpret_cast<const TdUcs4*>(str14), 4, &pInfo);
ASSERT_EQ(ret, TSDB_PATTERN_NOWILDCARDMATCH);
} }
TEST(utilTest, wchar_pattern_match_no_terminated) { TEST(utilTest, wchar_pattern_match_no_terminated) {
@ -126,14 +136,24 @@ TEST(utilTest, wchar_pattern_match_no_terminated) {
ret = wcsPatternMatch(reinterpret_cast<const TdUcs4*>(pattern8), 8, reinterpret_cast<const TdUcs4*>(str8), 6, &pInfo); ret = wcsPatternMatch(reinterpret_cast<const TdUcs4*>(pattern8), 8, reinterpret_cast<const TdUcs4*>(str8), 6, &pInfo);
ASSERT_EQ(ret, TSDB_PATTERN_NOWILDCARDMATCH); ASSERT_EQ(ret, TSDB_PATTERN_NOWILDCARDMATCH);
const TdWchar* pattern9 = L"6\\_6 "; const TdWchar* pattern9 = L"6\\_6 ";
const TdWchar* str9 = L"6_6 "; const TdWchar* str9 = L"6_6 ";
ret = wcsPatternMatch(reinterpret_cast<const TdUcs4*>(pattern9), 4, reinterpret_cast<const TdUcs4*>(str9), 3, &pInfo); ret = wcsPatternMatch(reinterpret_cast<const TdUcs4*>(pattern9), 6, reinterpret_cast<const TdUcs4*>(str9), 4, &pInfo);
ASSERT_EQ(ret, TSDB_PATTERN_MATCH); ASSERT_EQ(ret, TSDB_PATTERN_MATCH);
const TdWchar* pattern10 = L"% "; const TdWchar* pattern10 = L"% ";
const TdWchar* str10 = L"6_6 "; const TdWchar* str10 = L"6_6 ";
ret = wcsPatternMatch(reinterpret_cast<const TdUcs4*>(pattern10), 1, reinterpret_cast<const TdUcs4*>(str10), 3, &pInfo); ret = wcsPatternMatch(reinterpret_cast<const TdUcs4*>(pattern10), 2, reinterpret_cast<const TdUcs4*>(str10), 4, &pInfo);
ASSERT_EQ(ret, TSDB_PATTERN_MATCH);
const TdWchar* pattern11 = L"%\\_6 ";
const TdWchar* str11 = L"6_6 ";
ret = wcsPatternMatch(reinterpret_cast<const TdUcs4*>(pattern11), 6, reinterpret_cast<const TdUcs4*>(str11), 4, &pInfo);
ASSERT_EQ(ret, TSDB_PATTERN_MATCH);
const TdWchar* pattern12 = L"%\\%6 ";
const TdWchar* str12 = L"6%6 ";
ret = wcsPatternMatch(reinterpret_cast<const TdUcs4*>(pattern12), 6, reinterpret_cast<const TdUcs4*>(str12), 4, &pInfo);
ASSERT_EQ(ret, TSDB_PATTERN_MATCH); ASSERT_EQ(ret, TSDB_PATTERN_MATCH);
} }
@ -209,6 +229,36 @@ TEST(utilTest, char_pattern_match_test) {
const char* str13 = "a%c"; const char* str13 = "a%c";
ret = patternMatch(pattern13, 5, str13, strlen(str13), &pInfo); ret = patternMatch(pattern13, 5, str13, strlen(str13), &pInfo);
ASSERT_EQ(ret, TSDB_PATTERN_MATCH); ASSERT_EQ(ret, TSDB_PATTERN_MATCH);
const char* pattern14 = "%a\\%c";
const char* str14 = "a%c";
ret = patternMatch(pattern14, strlen(pattern14), str14, strlen(str14), &pInfo);
ASSERT_EQ(ret, TSDB_PATTERN_MATCH);
const char* pattern15 = "_a\\%c";
const char* str15 = "ba%c";
ret = patternMatch(pattern15, strlen(pattern15), str15, strlen(str15), &pInfo);
ASSERT_EQ(ret, TSDB_PATTERN_MATCH);
const char* pattern16 = "_\\%c";
const char* str16 = "a%c";
ret = patternMatch(pattern16, strlen(pattern16), str16, strlen(str16), &pInfo);
ASSERT_EQ(ret, TSDB_PATTERN_MATCH);
const char* pattern17 = "_\\%c";
const char* str17 = "ba%c";
ret = patternMatch(pattern17, strlen(pattern17), str17, strlen(str17), &pInfo);
ASSERT_EQ(ret, TSDB_PATTERN_NOMATCH);
const char* pattern18 = "%\\%c";
const char* str18 = "abc";
ret = patternMatch(pattern18, strlen(pattern18), str18, strlen(str18), &pInfo);
ASSERT_EQ(ret, TSDB_PATTERN_NOWILDCARDMATCH);
const char* pattern19 = "%\\_c";
const char* str19 = "abc";
ret = patternMatch(pattern19, strlen(pattern19), str19, strlen(str19), &pInfo);
ASSERT_EQ(ret, TSDB_PATTERN_NOWILDCARDMATCH);
} }
TEST(utilTest, char_pattern_match_no_terminated) { TEST(utilTest, char_pattern_match_no_terminated) {

View File

@ -29,6 +29,7 @@
,,n,system-test,python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4 ,,n,system-test,python3 ./test.py -f 8-stream/snode_restart_with_checkpoint.py -N 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/tbname_vgroup.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/compact-col.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stbJoin.py -Q 3
@ -131,6 +132,10 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ts-4233.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ts-4233.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ts-4233.py -Q 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ts-4233.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ts-4233.py -Q 4 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/ts-4233.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py -Q 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/like.py -Q 4
,,y,system-test,./pytest.sh python3 ./test.py -f 3-enterprise/restore/restoreDnode.py -N 5 -M 3 -i False ,,y,system-test,./pytest.sh python3 ./test.py -f 3-enterprise/restore/restoreDnode.py -N 5 -M 3 -i False
,,y,system-test,./pytest.sh python3 ./test.py -f 3-enterprise/restore/restoreVnode.py -N 5 -M 3 -i False ,,y,system-test,./pytest.sh python3 ./test.py -f 3-enterprise/restore/restoreVnode.py -N 5 -M 3 -i False
@ -514,6 +519,7 @@ e
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/json_tag.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/json_tag.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQueryInterval.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/nestedQueryInterval.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/systable_func.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/systable_func.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/test_ts4382.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity_1.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/stablity_1.py

View File

@ -159,20 +159,21 @@ class TDTestCase:
] ]
} }
def get_param_value_with_gdb(self, config_name, process_name): def get_param_value(self, config_name):
res = subprocess.Popen("gdb -q -nx -p `pidof {}` --batch -ex 'set height 0' -ex 'p {}'".format(process_name, config_name), shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE) tdSql.query("show local variables;")
r_lines = res.stdout.read() for row in tdSql.queryResult:
if r_lines: if config_name == row[0]:
for line in r_lines.decode().split("\n"): tdLog.debug("Found variable '{}'".format(row[0]))
if "$1 = " in line: return row[1]
tdLog.debug("gdb result: {}".format(line))
return line.split(" = ")[1]
def cli_check(self, name, values, except_values=False): def cli_check(self, name, values, except_values=False):
if not except_values: if not except_values:
for v in values: for v in values:
tdLog.debug("Set {} to {}".format(name, v)) tdLog.debug("Set {} to {}".format(name, v))
tdSql.execute(f'alter local "{name} {v}";') tdSql.execute(f'alter local "{name} {v}";')
value = self.get_param_value(name)
tdLog.debug("Get {} value: {}".format(name, value))
assert(v == int(value))
else: else:
for v in values: for v in values:
tdLog.debug("Set {} to {}".format(name, v)) tdLog.debug("Set {} to {}".format(name, v))
@ -190,9 +191,7 @@ class TDTestCase:
for v in values: for v in values:
dnode = random.choice(p_list) dnode = random.choice(p_list)
tdSql.execute(f'alter {dnode} "{name} {v}";') tdSql.execute(f'alter {dnode} "{name} {v}";')
if platform.system() == "Linux" and platform.machine() == "aarch64": value = self.get_param_value(alias)
continue
value = self.get_param_value_with_gdb(alias, "taosd")
if value: if value:
tdLog.debug(f"value: {value}") tdLog.debug(f"value: {value}")
assert(value == str(bool(v)).lower() if is_bool else str(v)) assert(value == str(bool(v)).lower() if is_bool else str(v))

View File

@ -0,0 +1,71 @@
import sys
from util.log import *
from util.cases import *
from util.sql import *
from util.dnodes import tdDnodes
from math import inf
class TDTestCase:
def caseDescription(self):
'''
case1<shenglian zhou>: [TD-]
'''
return
def init(self, conn, logSql, replicaVer=1):
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor(), True)
self.conn = conn
def restartTaosd(self, index=1, dbname="db"):
tdDnodes.stop(index)
tdDnodes.startWithoutSleep(index)
tdSql.execute(f"use tbname_vgroup")
def run(self):
print("running {}".format(__file__))
tdSql.execute("drop database if exists tbname_vgroup")
tdSql.execute("create database if not exists tbname_vgroup")
tdSql.execute('use tbname_vgroup')
tdSql.execute('drop database if exists dbvg')
tdSql.execute('create database dbvg vgroups 8;')
tdSql.execute('use dbvg;')
tdSql.execute('create table st(ts timestamp, f int) tags (t int);')
tdSql.execute("insert into ct1 using st tags(1) values('2021-04-19 00:00:01', 1)")
tdSql.execute("insert into ct2 using st tags(2) values('2021-04-19 00:00:02', 2)")
tdSql.execute("insert into ct3 using st tags(3) values('2021-04-19 00:00:03', 3)")
tdSql.execute("insert into ct4 using st tags(4) values('2021-04-19 00:00:04', 4)")
tdSql.execute('create table st2(ts timestamp, f int) tags (t int);')
tdSql.execute("insert into ct21 using st2 tags(1) values('2021-04-19 00:00:01', 1)")
tdSql.execute("insert into ct22 using st2 tags(2) values('2021-04-19 00:00:02', 2)")
tdSql.execute("insert into ct23 using st2 tags(3) values('2021-04-19 00:00:03', 3)")
tdSql.execute("insert into ct24 using st2 tags(4) values('2021-04-19 00:00:04', 4)")
col_names = tdSql.getColNameList("compact database tbname_vgroup")
if col_names[0] != "result":
raise Exception("first column name of compact result shall be result")
col_names = tdSql.getColNameList("show variables")
if col_names[0] != "name":
raise Exception("first column name of compact result shall be name")
tdSql.execute('drop database tbname_vgroup')
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -0,0 +1,148 @@
import taos
import sys
import datetime
import inspect
from util.log import *
from util.sql import *
from util.cases import *
from util.common import tdCom
class TDTestCase:
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def initDB(self):
tdSql.execute("drop database if exists db")
tdSql.execute("create database if not exists db")
def stopTest(self):
tdSql.execute("drop database if exists db")
def like_wildcard_test(self):
tdSql.execute("create table db.t1x (ts timestamp, c1 varchar(100))")
tdSql.execute("create table db.t_1x (ts timestamp, c1 varchar(100))")
tdSql.query("select * from information_schema.ins_columns where table_name like '%1x'")
tdSql.checkRows(4)
tdSql.query("select * from information_schema.ins_columns where table_name like '%\_1x'")
tdSql.checkRows(2)
tdSql.query("insert into db.t1x values(now, 'abc'), (now+1s, 'a%c'),(now+2s, 'a_c'),(now+3s, '_c'),(now+4s, '%c')")
tdSql.query("select * from db.t1x")
tdSql.checkRows(5)
tdSql.query("select * from db.t1x where c1 like '%_c'")
tdSql.checkRows(5)
tdSql.query("select * from db.t1x where c1 like '%__c'")
tdSql.checkRows(3)
tdSql.query("select * from db.t1x where c1 like '%\_c'")
tdSql.checkRows(2)
tdSql.query("select * from db.t1x where c1 like '%\%c'")
tdSql.checkRows(2)
tdSql.query("select * from db.t1x where c1 like '_\%c'")
tdSql.checkRows(1)
tdSql.checkData(0, 1, "a%c")
tdSql.query("select * from db.t1x where c1 like '_\_c'")
tdSql.checkRows(1)
tdSql.checkData(0, 1, "a_c")
tdSql.query("select * from db.t1x where c1 like '%%_c'")
tdSql.checkRows(5)
tdSql.query("select * from db.t1x where c1 like '%_%c'")
tdSql.checkRows(5)
tdSql.query("select * from db.t1x where c1 like '__%c'")
tdSql.checkRows(3)
tdSql.query("select * from db.t1x where c1 not like '__%c'")
tdSql.checkRows(2)
def like_cnc_wildcard_test(self):
tdSql.execute("create table db.t3x (ts timestamp, c1 varchar(100))")
tdSql.execute("insert into db.t3x values(now, '我是中文'), (now+1s, '我是_中文'), (now+2s, '我是%中文'), (now+3s, '%中文'),(now+4s, '_中文')")
tdSql.query("select * from db.t3x")
tdSql.checkRows(5)
tdSql.query("select * from db.t3x where c1 like '%中文'")
tdSql.checkRows(5)
tdSql.query("select * from db.t3x where c1 like '%中_文'")
tdSql.checkRows(0)
tdSql.query("select * from db.t3x where c1 like '%\%中文'")
tdSql.checkRows(2)
tdSql.query("select * from db.t3x where c1 like '%\_中文'")
tdSql.checkRows(2)
tdSql.query("select * from db.t3x where c1 like '_中文'")
tdSql.checkRows(2)
tdSql.query("select * from db.t3x where c1 like '\_中文'")
tdSql.checkRows(1)
def like_multi_wildcard_test(self):
tdSql.execute("create table db.t4x (ts timestamp, c1 varchar(100))")
# 插入测试数据
tdSql.execute("insert into db.t4x values(now, 'abc'), (now+1s, 'a%c'),(now+2s, 'a_c'),(now+3s, '_c'),(now+4s, '%c')")
tdSql.execute("insert into db.t4x values(now+5s, '%%%c'),(now+6s, '___c'),(now+7s, '%_%c'),(now+8s, '%\\c')")
tdSql.query("select * from db.t4x where c1 like '%%%_'")
tdSql.checkRows(9)
tdSql.query("select * from db.t4x where c1 like '\%\%\%_'")
tdSql.checkRows(1)
tdSql.query("select * from db.t4x where c1 like '%\_%%'")
tdSql.checkRows(4)
tdSql.query("select * from db.t4x where c1 like '_\%\%'")
tdSql.checkRows(0)
tdSql.query("select * from db.t4x where c1 like '%abc%'")
tdSql.checkRows(1)
tdSql.query("select * from db.t4x where c1 like '_%abc%'")
tdSql.checkRows(0)
tdSql.query("select * from db.t4x where c1 like '\%%\%%'")
tdSql.checkRows(2)
tdSql.query("select * from db.t4x where c1 like '\%\_%\%%'")
tdSql.checkRows(1)
def run(self):
tdLog.printNoPrefix("==========start like_wildcard_test run ...............")
tdSql.prepare(replica = self.replicaVar)
self.initDB()
self.like_wildcard_test()
self.like_cnc_wildcard_test()
self.like_multi_wildcard_test()
tdLog.printNoPrefix("==========end like_wildcard_test run ...............")
self.stopTest()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,75 @@
import random
import string
from util.log import *
from util.cases import *
from util.sql import *
from util.sqlset import *
from util import constant
from util.common import *
class TDTestCase:
"""Verify the jira TS-4382
"""
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug("start to execute %s" % __file__)
tdSql.init(conn.cursor())
self.dbname = 'db'
self.stbname = 'st'
self.ctbname_list = ["ct1", "ct2"]
self.tag_value_list = ['{"instance":"100"}', '{"instance":"200"}']
def prepareData(self):
# db
tdSql.execute("create database {};".format(self.dbname))
tdSql.execute("use {};".format(self.dbname))
tdLog.debug("Create database %s" % self.dbname)
# super table
tdSql.execute("create table {} (ts timestamp, col1 int) tags (t1 json);".format(self.stbname))
tdLog.debug("Create super table %s" % self.stbname)
# child table
for i in range(len(self.ctbname_list)):
tdSql.execute("create table {} using {} tags('{}');".format(self.ctbname_list[i], self.stbname, self.tag_value_list[i]))
tdLog.debug("Create child table %s" % self.ctbname_list)
# insert data
tdSql.execute("insert into {} values(now, 1)(now+1s, 2)".format(self.ctbname_list[0]))
tdSql.execute("insert into {} values(now, null)(now+1s, null)".format(self.ctbname_list[1]))
def run(self):
self.prepareData()
sql_list = [
# super table query with correct tag name of json type
{
"sql": "select to_char(ts, 'yyyy-mm-dd hh24:mi:ss') as time, irate(col1) from st group by to_char(ts, 'yyyy-mm-dd hh24:mi:ss'), t1->'instance' order by time;",
"result_check": "0.0"
},
# child table query with incorrect tag name of json type
{
"sql": "select to_char(ts, 'yyyy-mm-dd hh24:mi:ss') as time, irate(col1) from ct1 group by to_char(ts, 'yyyy-mm-dd hh24:mi:ss'), t1->'name' order by time;",
"result_check": "None"
},
# child table query with null value
{
"sql": "select ts, avg(col1) from ct2 group by ts, t1->'name' order by ts;",
"result_check": "None"
}
]
for sql_dic in sql_list:
tdSql.query(sql_dic["sql"])
tdLog.debug("execute sql: %s" % sql_dic["sql"])
for item in [row[1] for row in tdSql.queryResult]:
if sql_dic["result_check"] in str(item):
tdLog.debug("Check query result of '{}' successfully".format(sql_dic["sql"]))
break
def stop(self):
tdSql.close()
tdLog.success("%s successfully executed" % __file__)
tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

View File

@ -99,6 +99,10 @@ python3 ./test.py -f 2-query/ts-4233.py
python3 ./test.py -f 2-query/ts-4233.py -Q 2 python3 ./test.py -f 2-query/ts-4233.py -Q 2
python3 ./test.py -f 2-query/ts-4233.py -Q 3 python3 ./test.py -f 2-query/ts-4233.py -Q 3
python3 ./test.py -f 2-query/ts-4233.py -Q 4 python3 ./test.py -f 2-query/ts-4233.py -Q 4
python3 ./test.py -f 2-query/like.py
python3 ./test.py -f 2-query/like.py -Q 2
python3 ./test.py -f 2-query/like.py -Q 3
python3 ./test.py -f 2-query/like.py -Q 4
python3 ./test.py -f 3-enterprise/restore/restoreDnode.py -N 5 -M 3 -i False python3 ./test.py -f 3-enterprise/restore/restoreDnode.py -N 5 -M 3 -i False
python3 ./test.py -f 3-enterprise/restore/restoreVnode.py -N 5 -M 3 -i False python3 ./test.py -f 3-enterprise/restore/restoreVnode.py -N 5 -M 3 -i False
python3 ./test.py -f 3-enterprise/restore/restoreMnode.py -N 5 -M 3 -i False python3 ./test.py -f 3-enterprise/restore/restoreMnode.py -N 5 -M 3 -i False