Merge pull request #24095 from taosdata/fix/3_liaohj

fix(tsdb): add desc check for clean file block.
This commit is contained in:
Haojun Liao 2023-12-16 18:10:15 +08:00 committed by GitHub
commit 0526e619ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 94 additions and 95 deletions

View File

@ -24,8 +24,9 @@ extern "C" {
typedef struct SStreamTransInfo {
int64_t startTime;
int32_t transId;
int64_t streamUid;
const char *name;
int32_t transId;
} SStreamTransInfo;
// time to generate the checkpoint; if now() - checkpointTs >= tsCheckpointInterval, this checkpoint will be discarded
@ -52,6 +53,14 @@ typedef struct SStreamExecInfo {
TdThreadMutex lock;
} SStreamExecInfo;
#define MND_STREAM_CREATE_NAME "stream-create"
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
#define MND_STREAM_PAUSE_NAME "stream-pause"
#define MND_STREAM_RESUME_NAME "stream-resume"
#define MND_STREAM_DROP_NAME "stream-drop"
#define MND_STREAM_TASK_RESET_NAME "stream-task-reset"
#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update"
extern SStreamExecInfo execInfo;
int32_t mndInitStream(SMnode *pMnode);
@ -61,9 +70,9 @@ void mndReleaseStream(SMnode *pMnode, SStreamObj *pStream);
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamRegisterTrans(STrans *pTrans, const char *pName, const char *pSrcDb, const char *pDstDb);
int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamUid);
int32_t mndAddtoCheckpointWaitingList(SStreamObj *pStream, int64_t checkpointId);
bool streamTransConflictOtherTrans(SMnode *pMnode, const char *pSrcDb, const char *pDstDb, bool lock);
bool streamTransConflictOtherTrans(SMnode *pMnode, int64_t streamUid, const char *pTransName, bool lock);
// for sma
// TODO refactor

View File

@ -16,12 +16,9 @@
#include "mndStream.h"
#include "audit.h"
#include "mndDb.h"
#include "mndDnode.h"
#include "mndMnode.h"
#include "mndPrivilege.h"
#include "mndScheduler.h"
#include "mndShow.h"
#include "mndSnode.h"
#include "mndStb.h"
#include "mndTrans.h"
#include "mndVgroup.h"
@ -34,13 +31,6 @@
#define MND_STREAM_RESERVE_SIZE 64
#define MND_STREAM_MAX_NUM 60
#define MND_STREAM_CHECKPOINT_NAME "stream-checkpoint"
#define MND_STREAM_PAUSE_NAME "stream-pause"
#define MND_STREAM_RESUME_NAME "stream-resume"
#define MND_STREAM_DROP_NAME "stream-drop"
#define MND_STREAM_TASK_RESET_NAME "stream-task-reset"
#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update"
typedef struct SNodeEntry {
int32_t nodeId;
bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot.
@ -839,7 +829,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream");
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_CREATE_NAME);
if (pTrans == NULL) {
mError("stream:%s, failed to create since %s", createStreamReq.name, terrstr());
goto _OVER;
@ -1030,7 +1020,7 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
return -1;
}
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true);
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, true);
if (conflict) {
mndAddtoCheckpointWaitingList(pStream, checkpointId);
mWarn("checkpoint conflict with other trans in %s, ignore the checkpoint for stream:%s %" PRIx64, pStream->sourceDb,
@ -1038,14 +1028,14 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, NULL, MND_STREAM_CHECKPOINT_NAME);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL, MND_STREAM_CHECKPOINT_NAME);
if (pTrans == NULL) {
return -1;
}
mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->sourceDb, pStream->targetDb);
mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pStream->uid);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName);
if (mndTrancCheckConflict(pMnode, pTrans) != 0) {
mError("failed to checkpoint of stream name%s, checkpointId: %" PRId64 ", reason:%s", pStream->name, checkpointId,
tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
@ -1382,14 +1372,14 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
}
// check whether it conflicts with other trans in both sourceDb and targetDb.
// bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true);
// if (conflict) {
// sdbRelease(pMnode->pSdb, pStream);
// tFreeMDropStreamReq(&dropReq);
// return -1;
// }
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_DROP_NAME, true);
if (conflict) {
sdbRelease(pMnode->pSdb, pStream);
tFreeMDropStreamReq(&dropReq);
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, MND_STREAM_DROP_NAME);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_DROP_NAME);
if (pTrans == NULL) {
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
@ -1399,7 +1389,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
mInfo("trans:%d used to drop stream:%s", pTrans->id, dropReq.name);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
@ -1407,7 +1397,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return -1;
}
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->sourceDb, pStream->targetDb);
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_DROP_NAME, pStream->uid);
// drop all tasks
if (mndDropStreamTasks(pMnode, pTrans, pStream) < 0) {
@ -1866,7 +1856,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
}
// check whether it conflicts with other trans in both sourceDb and targetDb.
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true);
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_PAUSE_NAME, true);
if (conflict) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
@ -1878,7 +1868,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, "pause-stream");
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_PAUSE_NAME);
if (pTrans == NULL) {
mError("stream:%s failed to pause stream since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
@ -1887,14 +1877,14 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
mInfo("trans:%d, used to pause stream:%s", pTrans->id, pauseReq.name);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->sourceDb, pStream->targetDb);
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->uid);
// if nodeUpdate happened, do not send the pause trans
if (mndPauseAllStreamTasks(pMnode, pTrans, pStream) < 0) {
@ -2001,13 +1991,13 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
}
// check whether it conflicts with other trans in both sourceDb and targetDb.
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true);
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_RESUME_NAME, true);
if (conflict) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, MND_STREAM_RESUME_NAME);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, MND_STREAM_RESUME_NAME);
if (pTrans == NULL) {
mError("stream:%s, failed to resume stream since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
@ -2016,14 +2006,14 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
mInfo("trans:%d used to resume stream:%s", pTrans->id, pauseReq.name);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
return -1;
}
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->sourceDb, pStream->targetDb);
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_RESUME_NAME, pStream->uid);
// resume all tasks
if (mndResumeAllStreamTasks(pTrans, pMnode, pStream, pauseReq.igUntreated) < 0) {
@ -2660,7 +2650,7 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
}
STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg) {
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB_INSIDE, pReq, name);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pReq, name);
if (pTrans == NULL) {
mError("failed to build trans:%s, reason: %s", name, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -2669,7 +2659,7 @@ STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const
mDebug("s-task:0x%" PRIx64 " start to build trans %s", pStream->uid, pMsg);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetDb);
mndTransSetDbName(pTrans, pStream->sourceDb, pStream->targetSTbName);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
terrno = TSDB_CODE_MND_TRANS_CONFLICT;
mError("failed to build trans:%s for stream:0x%" PRIx64 " code:%s", name, pStream->uid, tstrerror(terrno));
@ -2788,7 +2778,7 @@ static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) {
break;
}
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, false);
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->uid, MND_STREAM_TASK_RESET_NAME, false);
if (conflict) {
mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans", pStream->name,
pStream->sourceDb, pStream->targetDb);

View File

@ -23,21 +23,17 @@ typedef struct SKeyInfo {
static int32_t clearFinishedTrans(SMnode* pMnode);
int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pSrcDb, const char* pDstDb) {
SStreamTransInfo info = {.transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pName};
taosHashPut(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb), &info, sizeof(SStreamTransInfo));
if (strcmp(pSrcDb, pDstDb) != 0) {
taosHashPut(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb), &info, sizeof(SStreamTransInfo));
}
int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pTransName, int64_t streamUid) {
SStreamTransInfo info = {
.transId = pTrans->id, .startTime = taosGetTimestampMs(), .name = pTransName, .streamUid = streamUid};
taosHashPut(execInfo.transMgmt.pDBTrans, &streamUid, sizeof(streamUid), &info, sizeof(SStreamTransInfo));
return 0;
}
int32_t clearFinishedTrans(SMnode* pMnode) {
size_t keyLen = 0;
SArray* pList = taosArrayInit(4, sizeof(SKeyInfo));
void* pIter = NULL;
SArray* pList = taosArrayInit(4, sizeof(SKeyInfo));
while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
SStreamTransInfo* pEntry = (SStreamTransInfo*)pIter;
@ -69,7 +65,7 @@ int32_t clearFinishedTrans(SMnode* pMnode) {
return 0;
}
bool streamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const char* pDstDb, bool lock) {
bool streamTransConflictOtherTrans(SMnode* pMnode, int64_t streamUid, const char* pTransName, bool lock) {
if (lock) {
taosThreadMutexLock(&execInfo.lock);
}
@ -84,23 +80,27 @@ bool streamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const cha
clearFinishedTrans(pMnode);
SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb));
SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamUid, sizeof(streamUid));
if (pEntry != NULL) {
SStreamTransInfo tInfo = *pEntry;
if (lock) {
taosThreadMutexUnlock(&execInfo.lock);
}
mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name);
return true;
}
pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb));
if (pEntry != NULL) {
if (lock) {
taosThreadMutexUnlock(&execInfo.lock);
}
mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name);
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
if (strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) {
mWarn("conflict with other transId:%d streamUid:%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid,
tInfo.name);
return true;
}
} else if ((strcmp(tInfo.name, MND_STREAM_CREATE_NAME) == 0) ||
(strcmp(tInfo.name, MND_STREAM_DROP_NAME) == 0)) {
mWarn("conflict with other transId:%d streamUid:%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid,
tInfo.name);
return true;
}
}
if (lock) {
taosThreadMutexUnlock(&execInfo.lock);

View File

@ -622,6 +622,9 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
streamMetaReleaseTask(pMeta, pTask);
}
// TODO: send the checkpoint complete msg if it is in checkpoint procedure.
// drop the stream task now
streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
@ -633,8 +636,8 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
streamMetaWUnLock(pMeta);
streamMetaWUnLock(pMeta);
return 0;
}

View File

@ -2354,15 +2354,19 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
int64_t st = taosGetTimestampUs();
int32_t step = asc ? 1 : -1;
double el = 0;
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader;
SBrinRecord* pRecord = &pBlockInfo->record;
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
SBrinRecord* pRecord = NULL;
STableBlockScanInfo* pBlockScanInfo = NULL;
if (pBlockInfo != NULL) {
if (pBlockInfo == NULL) {
return 0;
}
pRecord = &pBlockInfo->record;
if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) {
setBlockAllDumped(pDumpInfo, pRecord->lastKey, pReader->info.order);
return code;
@ -2379,7 +2383,8 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
// it is a clean block, load it directly
int64_t cap = pReader->resBlockInfo.capacity;
if (isCleanFileDataBlock(pReader, pBlockInfo, pBlockScanInfo, keyInBuf) && (pRecord->numRow <= cap)) {
if (asc || (pBlockScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA)) {
if (((asc && (pRecord->firstKey < keyInBuf.ts)) || (!asc && (pRecord->lastKey > keyInBuf.ts))) &&
(pBlockScanInfo->sttKeyInfo.status == STT_FILE_NO_DATA)) {
code = copyBlockDataToSDataBlock(pReader);
if (code) {
goto _end;
@ -2390,14 +2395,6 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
goto _end;
}
}
} else { // file blocks not exist
ASSERT(0);
pBlockScanInfo = *pReader->status.pTableIter;
if (pReader->pIgnoreTables &&
taosHashGet(*pReader->pIgnoreTables, &pBlockScanInfo->uid, sizeof(pBlockScanInfo->uid))) {
return code;
}
}
SBlockData* pBlockData = &pReader->status.fileBlockData;
initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader);