Merge pull request #24052 from taosdata/fix/3_liaohj
fix(tsdb): fix error in tsdb read.
This commit is contained in:
commit
26af397431
|
@ -704,10 +704,10 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask
|
||||||
pReq->streamId = pTask->id.streamId;
|
pReq->streamId = pTask->id.streamId;
|
||||||
|
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
SEpSet epset = {0};
|
SEpSet epset = {0};
|
||||||
if(pTask->info.nodeId == SNODE_HANDLE){
|
if (pTask->info.nodeId == SNODE_HANDLE) {
|
||||||
SSnodeObj *pObj = NULL;
|
SSnodeObj *pObj = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
|
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
|
||||||
if (pIter == NULL) {
|
if (pIter == NULL) {
|
||||||
|
@ -717,10 +717,16 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask
|
||||||
addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port);
|
addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port);
|
||||||
sdbRelease(pMnode->pSdb, pObj);
|
sdbRelease(pMnode->pSdb, pObj);
|
||||||
}
|
}
|
||||||
}else{
|
} else {
|
||||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
|
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
|
||||||
epset = mndGetVgroupEpset(pMnode, pVgObj);
|
if (pVgObj != NULL) {
|
||||||
mndReleaseVgroup(pMnode, pVgObj);
|
epset = mndGetVgroupEpset(pMnode, pVgObj);
|
||||||
|
mndReleaseVgroup(pMnode, pVgObj);
|
||||||
|
} else {
|
||||||
|
mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", pTask->id.taskId, pTask->info.nodeId);
|
||||||
|
taosMemoryFree(pReq);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
|
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
|
||||||
|
@ -1657,6 +1663,7 @@ static void setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDat
|
||||||
|
|
||||||
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
|
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
|
||||||
if (pe == NULL) {
|
if (pe == NULL) {
|
||||||
|
mError("task:0x%" PRIx64 " not exists in vnode, no valid status/stage info", id.taskId);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -48,8 +48,7 @@ int32_t clearFinishedTrans(SMnode* pMnode) {
|
||||||
void* pKey = taosHashGetKey(pEntry, &keyLen);
|
void* pKey = taosHashGetKey(pEntry, &keyLen);
|
||||||
// key is the name of src/dst db name
|
// key is the name of src/dst db name
|
||||||
SKeyInfo info = {.pKey = pKey, .keyLen = keyLen};
|
SKeyInfo info = {.pKey = pKey, .keyLen = keyLen};
|
||||||
|
mDebug("transId:%d %s startTs:%" PRId64 " cleared since finished", pEntry->transId, pEntry->name,
|
||||||
mDebug("transId:%d %s startTs:%" PRId64 "cleared due to finished", pEntry->transId, pEntry->name,
|
|
||||||
pEntry->startTime);
|
pEntry->startTime);
|
||||||
taosArrayPush(pList, &info);
|
taosArrayPush(pList, &info);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -67,7 +67,7 @@ static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond
|
||||||
static int32_t doBuildDataBlock(STsdbReader* pReader);
|
static int32_t doBuildDataBlock(STsdbReader* pReader);
|
||||||
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
|
static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader);
|
||||||
static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
|
static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
|
||||||
static bool hasDataInSttBlock(SSttBlockReader* pSttBlockReader);
|
static bool hasDataInSttBlock(STableBlockScanInfo *pInfo);
|
||||||
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
|
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
|
||||||
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
|
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
|
||||||
static void resetTableListIndex(SReaderStatus* pStatus);
|
static void resetTableListIndex(SReaderStatus* pStatus);
|
||||||
|
@ -1466,7 +1466,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo;
|
||||||
|
|
||||||
int64_t tsLast = INT64_MIN;
|
int64_t tsLast = INT64_MIN;
|
||||||
if (hasDataInSttBlock(pSttBlockReader)) {
|
if (hasDataInSttBlock(pBlockScanInfo)) {
|
||||||
tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
|
tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1485,7 +1485,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
int64_t minKey = 0;
|
int64_t minKey = 0;
|
||||||
if (pReader->info.order == TSDB_ORDER_ASC) {
|
if (pReader->info.order == TSDB_ORDER_ASC) {
|
||||||
minKey = INT64_MAX; // chosen the minimum value
|
minKey = INT64_MAX; // chosen the minimum value
|
||||||
if (minKey > tsLast && hasDataInSttBlock(pSttBlockReader)) {
|
if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo)) {
|
||||||
minKey = tsLast;
|
minKey = tsLast;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1498,7 +1498,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
minKey = INT64_MIN;
|
minKey = INT64_MIN;
|
||||||
if (minKey < tsLast && hasDataInSttBlock(pSttBlockReader)) {
|
if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo)) {
|
||||||
minKey = tsLast;
|
minKey = tsLast;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1705,7 +1705,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader*
|
||||||
}
|
}
|
||||||
|
|
||||||
bool dataInDataFile = hasDataInFileBlock(pBlockData, pDumpInfo);
|
bool dataInDataFile = hasDataInFileBlock(pBlockData, pDumpInfo);
|
||||||
bool dataInSttFile = hasDataInSttBlock(pSttBlockReader);
|
bool dataInSttFile = hasDataInSttBlock(pBlockScanInfo);
|
||||||
if (dataInDataFile && (!dataInSttFile)) {
|
if (dataInDataFile && (!dataInSttFile)) {
|
||||||
// no stt file block available, only data block exists
|
// no stt file block available, only data block exists
|
||||||
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader);
|
||||||
|
@ -1791,7 +1791,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
|
TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader);
|
||||||
|
|
||||||
int64_t tsLast = INT64_MIN;
|
int64_t tsLast = INT64_MIN;
|
||||||
if (hasDataInSttBlock(pSttBlockReader)) {
|
if (hasDataInSttBlock(pBlockScanInfo)) {
|
||||||
tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
|
tsLast = getCurrentKeyInSttBlock(pSttBlockReader);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1840,7 +1840,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
minKey = key;
|
minKey = key;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey > tsLast && hasDataInSttBlock(pSttBlockReader)) {
|
if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo)) {
|
||||||
minKey = tsLast;
|
minKey = tsLast;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -1857,7 +1857,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo*
|
||||||
minKey = key;
|
minKey = key;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minKey < tsLast && hasDataInSttBlock(pSttBlockReader)) {
|
if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo)) {
|
||||||
minKey = tsLast;
|
minKey = tsLast;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2065,7 +2065,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
||||||
|
|
||||||
// the stt block reader has been initialized for this table.
|
// the stt block reader has been initialized for this table.
|
||||||
if (pSttBlockReader->uid == pScanInfo->uid) {
|
if (pSttBlockReader->uid == pScanInfo->uid) {
|
||||||
return hasDataInSttBlock(pSttBlockReader);
|
return hasDataInSttBlock(pScanInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSttBlockReader->uid != 0) {
|
if (pSttBlockReader->uid != 0) {
|
||||||
|
@ -2158,7 +2158,9 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan
|
||||||
return hasData;
|
return hasData;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool hasDataInSttBlock(SSttBlockReader* pSttBlockReader) { return pSttBlockReader->mergeTree.pIter != NULL; }
|
static bool hasDataInSttBlock(STableBlockScanInfo *pInfo) {
|
||||||
|
return pInfo->sttKeyInfo.status == STT_FILE_HAS_DATA;
|
||||||
|
}
|
||||||
|
|
||||||
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
|
bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) {
|
||||||
if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
|
if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) {
|
||||||
|
@ -2733,7 +2735,7 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) {
|
||||||
int64_t st = taosGetTimestampUs();
|
int64_t st = taosGetTimestampUs();
|
||||||
while (1) {
|
while (1) {
|
||||||
// no data in stt block and block, no need to proceed.
|
// no data in stt block and block, no need to proceed.
|
||||||
if (!hasDataInSttBlock(pSttBlockReader)) {
|
if (!hasDataInSttBlock(pScanInfo)) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2850,7 +2852,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) {
|
||||||
initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
initSttBlockReader(pSttBlockReader, pScanInfo, pReader);
|
||||||
|
|
||||||
// no data in stt block, no need to proceed.
|
// no data in stt block, no need to proceed.
|
||||||
while (hasDataInSttBlock(pSttBlockReader)) {
|
while (hasDataInSttBlock(pScanInfo)) {
|
||||||
ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
|
ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA);
|
||||||
|
|
||||||
code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pSttBlockReader);
|
code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pSttBlockReader);
|
||||||
|
|
|
@ -252,6 +252,7 @@ static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) {
|
||||||
taosArrayClear(pScanInfo->pFileDelData); // del data from each file set
|
taosArrayClear(pScanInfo->pFileDelData); // del data from each file set
|
||||||
pScanInfo->cleanSttBlocks = false;
|
pScanInfo->cleanSttBlocks = false;
|
||||||
pScanInfo->numOfRowsInStt = 0;
|
pScanInfo->numOfRowsInStt = 0;
|
||||||
|
pScanInfo->sttBlockReturned = false;
|
||||||
INIT_TIMEWINDOW(&pScanInfo->sttWindow);
|
INIT_TIMEWINDOW(&pScanInfo->sttWindow);
|
||||||
INIT_TIMEWINDOW(&pScanInfo->filesetWindow);
|
INIT_TIMEWINDOW(&pScanInfo->filesetWindow);
|
||||||
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
|
pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
|
||||||
|
|
Loading…
Reference in New Issue