fix(tsdb): fix bugs in suspend/resume. adjust code to conduct the test more easily. td-27579

This commit is contained in:
Haojun Liao 2023-11-29 18:07:15 +08:00
parent a5c8eaacaa
commit 66f25712fd
4 changed files with 127 additions and 144 deletions

View File

@ -126,9 +126,9 @@ void queryCallback(void* param, void* res, int32_t code) {
taos_fetch_raw_block_a(res, fetchCallback, param); taos_fetch_raw_block_a(res, fetchCallback, param);
} }
void createNewTable(TAOS* pConn, int32_t index) { void createNewTable(TAOS* pConn, int32_t index, int32_t numOfRows, int64_t startTs, const char* pVarchar) {
char str[1024] = {0}; char str[1024] = {0};
sprintf(str, "create table tu%d using st2 tags(%d)", index, index); sprintf(str, "create table if not exists tu%d using st2 tags(%d)", index, index);
TAOS_RES* pRes = taos_query(pConn, str); TAOS_RES* pRes = taos_query(pConn, str);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
@ -136,22 +136,43 @@ void createNewTable(TAOS* pConn, int32_t index) {
} }
taos_free_result(pRes); taos_free_result(pRes);
for (int32_t i = 0; i < 10000; i += 20) { if (startTs == 0) {
char sql[1024] = {0}; for (int32_t i = 0; i < numOfRows; i += 20) {
sprintf(sql, char sql[1024] = {0};
"insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" sprintf(sql,
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" "insert into tu%d values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)" "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)", "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
index, i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)",
i + 7, i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, index, i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7,
i + 14, i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19); i + 7, i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14,
TAOS_RES* p = taos_query(pConn, sql); i + 14, i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19);
if (taos_errno(p) != 0) { TAOS_RES* p = taos_query(pConn, sql);
printf("failed to insert data, reason:%s\n", taos_errstr(p)); if (taos_errno(p) != 0) {
} printf("failed to insert data, reason:%s\n", taos_errstr(p));
}
taos_free_result(p); taos_free_result(p);
}
} else {
for (int32_t i = 0; i < numOfRows; i += 20) {
char sql[1024*50] = {0};
sprintf(sql,
"insert into tu%d values(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, "
"%d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, "
"'%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')(%ld, %d, '%s')",
index, startTs, i, pVarchar, startTs + 1, i + 1, pVarchar, startTs + 2, i + 2, pVarchar, startTs + 3, i + 3, pVarchar, startTs + 4, i + 4,
pVarchar, startTs + 5, i + 5, pVarchar, startTs + 6, i + 6, pVarchar, startTs + 7, i + 7, pVarchar, startTs + 8, i + 8, pVarchar, startTs + 9, i + 9,
pVarchar, startTs + 10, i + 10, pVarchar, startTs + 11, i + 11, pVarchar, startTs + 12, i + 12, pVarchar, startTs + 13, i + 13, pVarchar, startTs + 14,
i + 14, pVarchar, startTs + 15, i + 15, pVarchar, startTs + 16, i + 16, pVarchar, startTs + 17, i + 17, pVarchar, startTs + 18, i + 18,
pVarchar, startTs + 19, i + 19, pVarchar);
TAOS_RES* p = taos_query(pConn, sql);
if (taos_errno(p) != 0) {
printf("failed to insert data, reason:%s\n", taos_errstr(p));
}
taos_free_result(p);
}
} }
} }
@ -808,14 +829,7 @@ TEST(clientCase, projection_query_tables) {
TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)"); pRes = taos_query(pConn, "create stable st2 (ts timestamp, k int, f varchar(4096)) tags(a int)");
if (taos_errno(pRes) != 0) {
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
}
taos_free_result(pRes);
pRes = taos_query(pConn, "create stable st2 (ts timestamp, k int) tags(a int)");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create table tu, reason:%s\n", taos_errstr(pRes)); printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
} }
@ -828,28 +842,32 @@ TEST(clientCase, projection_query_tables) {
taos_free_result(pRes); taos_free_result(pRes);
int64_t start = 1685959190000; int64_t start = 1685959190000;
const char* pstr =
"abcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefgh"
"ijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnop"
"qrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwx"
"yzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdef"
"ghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyzabcdefghijklmnopqrstuvwxyz!@#$%^&&*&^^%$#@!qQWERTYUIOPASDFGHJKL:"
"QWERTYUIOP{}";
int32_t code = -1; for(int32_t i = 0; i < 10000; ++i) {
for(int32_t i = 0; i < 1000000; ++i) { char str[1024] = {0};
char t[512] = {0}; sprintf(str, "create table if not exists tu%d using st2 tags(%d)", i, i);
sprintf(t, "insert into t1 values(now, %d)", i); TAOS_RES* px = taos_query(pConn, str);
while(1) { if (taos_errno(px) != 0) {
void* p = taos_query(pConn, t); printf("failed to create table tu, reason:%s\n", taos_errstr(pRes));
code = taos_errno(p); }
taos_free_result(p); taos_free_result(px);
if (code != 0) { }
printf("insert data error, retry\n");
} else { for(int32_t j = 0; j < 5000; ++j) {
break; start += 20;
} for (int32_t i = 0; i < 10000; ++i) {
createNewTable(pConn, i, 20, start, pstr);
} }
} }
for (int32_t i = 0; i < 1; ++i) {
printf("create table :%d\n", i);
createNewTable(pConn, i);
}
// //
// pRes = taos_query(pConn, "select * from tu"); // pRes = taos_query(pConn, "select * from tu");
// if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {

View File

@ -93,7 +93,11 @@ typedef struct SQueryNode SQueryNode;
#define VNODE_RSMA2_DIR "rsma2" #define VNODE_RSMA2_DIR "rsma2"
#define VNODE_TQ_STREAM "stream" #define VNODE_TQ_STREAM "stream"
#if SUSPEND_RESUME_TEST // only for test purpose
#define VNODE_BUFPOOL_SEGMENTS 1
#else
#define VNODE_BUFPOOL_SEGMENTS 3 #define VNODE_BUFPOOL_SEGMENTS 3
#endif
#define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME "vnode.json"
#define VND_INFO_FNAME_TMP "vnode_tmp.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json"

View File

@ -48,7 +48,7 @@ static int32_t doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScan
static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key, static int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBlockScanInfo, int64_t key,
STsdbReader* pReader); STsdbReader* pReader);
static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost); static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost);
static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr, static STsdb* getTsdbByRetentions(SVnode* pVnode, SQueryTableDataCond* pCond, SRetention* retentions, const char* idstr,
int8_t* pLevel); int8_t* pLevel);
static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level); static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level);
@ -58,6 +58,7 @@ static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbRea
static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo); static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo);
static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter); static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter);
static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order); static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order);
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo);
static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); } static bool outOfTimeWindow(int64_t ts, STimeWindow* pWindow) { return (ts > pWindow->ekey) || (ts < pWindow->skey); }
@ -168,7 +169,7 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SCostSummary* pCost = &pReader->cost; SReadCostSummary* pCost = &pReader->cost;
pIter->pLastBlockReader->uid = 0; pIter->pLastBlockReader->uid = 0;
tMergeTreeClose(&pIter->pLastBlockReader->mergeTree); tMergeTreeClose(&pIter->pLastBlockReader->mergeTree);
@ -291,11 +292,7 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity)
} }
static int32_t tsdbInitReaderLock(STsdbReader* pReader) { static int32_t tsdbInitReaderLock(STsdbReader* pReader) {
int32_t code = -1; int32_t code = taosThreadMutexInit(&pReader->readerMutex, NULL);
qTrace("tsdb/read: %p, pre-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
code = taosThreadMutexInit(&pReader->readerMutex, NULL);
qTrace("tsdb/read: %p, post-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); qTrace("tsdb/read: %p, post-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
return code; return code;
@ -324,22 +321,14 @@ static int32_t tsdbAcquireReader(STsdbReader* pReader) {
} }
static int32_t tsdbTryAcquireReader(STsdbReader* pReader) { static int32_t tsdbTryAcquireReader(STsdbReader* pReader) {
int32_t code = -1; int32_t code = taosThreadMutexTryLock(&pReader->readerMutex);
qTrace("tsdb/read: %p, pre-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
code = taosThreadMutexTryLock(&pReader->readerMutex);
qTrace("tsdb/read: %p, post-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); qTrace("tsdb/read: %p, post-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
return code; return code;
} }
static int32_t tsdbReleaseReader(STsdbReader* pReader) { static int32_t tsdbReleaseReader(STsdbReader* pReader) {
int32_t code = -1; int32_t code = taosThreadMutexUnlock(&pReader->readerMutex);
qTrace("tsdb/read: %p, pre-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
code = taosThreadMutexUnlock(&pReader->readerMutex);
qTrace("tsdb/read: %p, post-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); qTrace("tsdb/read: %p, post-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code);
return code; return code;
@ -432,6 +421,7 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void
} }
tsdbInitReaderLock(pReader); tsdbInitReaderLock(pReader);
tsem_init(&pReader->resumeAfterSuspend, 0, 0);
*ppReader = pReader; *ppReader = pReader;
return code; return code;
@ -1015,8 +1005,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
// check if current block are all handled // check if current block are all handled
if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pRecord->numRow) { if (pDumpInfo->rowIndex >= 0 && pDumpInfo->rowIndex < pRecord->numRow) {
int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex]; int64_t ts = pBlockData->aTSKEY[pDumpInfo->rowIndex];
if (outOfTimeWindow(ts, if (outOfTimeWindow(ts, &pReader->info.window)) {
&pReader->info.window)) { // the remain data has out of query time window, ignore current block // the remain data has out of query time window, ignore current block
setBlockAllDumped(pDumpInfo, ts, pReader->info.order); setBlockAllDumped(pDumpInfo, ts, pReader->info.order);
} }
} else { } else {
@ -1123,16 +1113,12 @@ static bool getNeighborBlockOfSameTable(SDataBlockIter* pBlockIter, SFileDataBlo
} }
int32_t step = asc ? 1 : -1; int32_t step = asc ? 1 : -1;
// *nextIndex = pBlockInfo->tbBlockIdx + step;
// *pBlockIndex = *(SBlockIndex*)taosArrayGet(pTableBlockScanInfo->pBlockList, *nextIndex);
STableDataBlockIdx* pTableDataBlockIdx = STableDataBlockIdx* pTableDataBlockIdx =
taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step); taosArrayGet(pTableBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx + step);
SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex); SFileDataBlockInfo* p = taosArrayGet(pBlockIter->blockList, pTableDataBlockIdx->globalIndex);
memcpy(pRecord, &p->record, sizeof(SBrinRecord)); memcpy(pRecord, &p->record, sizeof(SBrinRecord));
*nextIndex = pBlockInfo->tbBlockIdx + step; *nextIndex = pBlockInfo->tbBlockIdx + step;
// tMapDataGetItemByIdx(&pTableBlockScanInfo->mapData, pIndex->ordinalIndex, pBlock, tGetDataBlk);
return true; return true;
} }
@ -1376,23 +1362,19 @@ static int32_t buildDataBlockFromBuf(STsdbReader* pReader, STableBlockScanInfo*
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
SSDataBlock* pBlock = pReader->resBlockInfo.pResBlock;
int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader); int32_t code = buildDataBlockFromBufImpl(pBlockScanInfo, endKey, pReader->resBlockInfo.capacity, pReader);
blockDataUpdateTsWindow(pBlock, pReader->suppInfo.slotId[0]); double el = (taosGetTimestampUs() - st) / 1000.0;
pBlock->info.id.uid = pBlockScanInfo->uid; updateComposedBlockInfo(pReader, el, pBlockScanInfo);
setComposedBlockFlag(pReader, true);
double elapsedTime = (taosGetTimestampUs() - st) / 1000.0;
tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%" PRId64 ", brange:%" PRId64 tsdbDebug("%p build data block from cache completed, elapsed time:%.2f ms, numOfRows:%" PRId64 ", brange:%" PRId64
" - %" PRId64 ", uid:%" PRIu64 ", %s", " - %" PRId64 ", uid:%" PRIu64 ", %s",
pReader, elapsedTime, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey, pReader, el, pBlock->info.rows, pBlock->info.window.skey, pBlock->info.window.ekey,
pBlockScanInfo->uid, pReader->idStr); pBlockScanInfo->uid, pReader->idStr);
pReader->cost.buildmemBlock += elapsedTime; pReader->cost.buildmemBlock += el;
return code; return code;
} }
@ -2293,13 +2275,12 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock
return code; return code;
} }
static void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) { void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInfo* pBlockScanInfo) {
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0; pResBlock->info.id.uid = (pBlockScanInfo != NULL) ? pBlockScanInfo->uid : 0;
pResBlock->info.dataLoad = 1; pResBlock->info.dataLoad = 1;
blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]); blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]);
setComposedBlockFlag(pReader, true); setComposedBlockFlag(pReader, true);
pReader->cost.composedBlocks += 1; pReader->cost.composedBlocks += 1;
@ -2356,7 +2337,6 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) {
pBlockScanInfo = *pReader->status.pTableIter; pBlockScanInfo = *pReader->status.pTableIter;
if (pReader->pIgnoreTables && if (pReader->pIgnoreTables &&
taosHashGet(*pReader->pIgnoreTables, &pBlockScanInfo->uid, sizeof(pBlockScanInfo->uid))) { taosHashGet(*pReader->pIgnoreTables, &pBlockScanInfo->uid, sizeof(pBlockScanInfo->uid))) {
// setBlockAllDumped(pDumpInfo, pBlock->maxKey.ts, pReader->info.order);
return code; return code;
} }
} }
@ -2436,7 +2416,7 @@ int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order) {
return ASCENDING_TRAVERSE(order) ? 0 : taosArrayGetSize(pDelSkyline) - 1; return ASCENDING_TRAVERSE(order) ? 0 : taosArrayGetSize(pDelSkyline) - 1;
} }
int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SCostSummary* pCost) { int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t order, SReadCostSummary* pCost) {
int32_t code = 0; int32_t code = 0;
int32_t newDelDataInFile = taosArrayGetSize(pBlockScanInfo->pFileDelData); int32_t newDelDataInFile = taosArrayGetSize(pBlockScanInfo->pFileDelData);
if (newDelDataInFile == 0 && if (newDelDataInFile == 0 &&
@ -2935,6 +2915,8 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
STableUidList* pUidList = &pStatus->uidList; STableUidList* pUidList = &pStatus->uidList;
tsdbDebug("seq load data blocks from cache, %s", pReader->idStr);
while (1) { while (1) {
if (pReader->code != TSDB_CODE_SUCCESS) { if (pReader->code != TSDB_CODE_SUCCESS) {
tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr); tsdbWarn("tsdb reader is stopped ASAP, code:%s, %s", strerror(pReader->code), pReader->idStr);
@ -3043,6 +3025,8 @@ static ERetrieveType doReadDataFromLastFiles(STsdbReader* pReader) {
SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock;
SDataBlockIter* pBlockIter = &pReader->status.blockIter; SDataBlockIter* pBlockIter = &pReader->status.blockIter;
tsdbDebug("seq load data blocks from stt files %s", pReader->idStr);
while (1) { while (1) {
terrno = 0; terrno = 0;
@ -3774,7 +3758,6 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
do { do {
// SRow* pTSRow = NULL;
TSDBROW row = {.type = -1}; TSDBROW row = {.type = -1};
bool freeTSRow = false; bool freeTSRow = false;
tsdbGetNextRowInMem(pBlockScanInfo, pReader, &row, endKey, &freeTSRow); tsdbGetNextRowInMem(pBlockScanInfo, pReader, &row, endKey, &freeTSRow);
@ -3792,13 +3775,17 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e
if (code) { if (code) {
return code; return code;
} }
pBlockScanInfo->lastProcKey = row.pTSRow->ts;
} else { } else {
code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow); code = doAppendRowFromFileBlock(pBlock, pReader, row.pBlockData, row.iRow);
if (code) { if (code) {
break; break;
} }
pBlockScanInfo->lastProcKey = row.pBlockData->aTSKEY[row.iRow];
} }
// no data in buffer, return immediately // no data in buffer, return immediately
if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) { if (!(pBlockScanInfo->iter.hasVal || pBlockScanInfo->iiter.hasVal)) {
break; break;
@ -4107,7 +4094,7 @@ void tsdbReaderClose2(STsdbReader* pReader) {
tsdbDataFileReaderClose(&pReader->pFileReader); tsdbDataFileReaderClose(&pReader->pFileReader);
} }
SCostSummary* pCost = &pReader->cost; SReadCostSummary* pCost = &pReader->cost;
SFilesetIter* pFilesetIter = &pReader->status.fileIter; SFilesetIter* pFilesetIter = &pReader->status.fileIter;
if (pFilesetIter->pLastBlockReader != NULL) { if (pFilesetIter->pLastBlockReader != NULL) {
SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader; SLastBlockReader* pLReader = pFilesetIter->pLastBlockReader;
@ -4122,6 +4109,7 @@ void tsdbReaderClose2(STsdbReader* pReader) {
tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, true); tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, true);
pReader->pReadSnap = NULL; pReader->pReadSnap = NULL;
tsem_destroy(&pReader->resumeAfterSuspend);
tsdbReleaseReader(pReader); tsdbReleaseReader(pReader);
tsdbUninitReaderLock(pReader); tsdbUninitReaderLock(pReader);
@ -4148,26 +4136,14 @@ void tsdbReaderClose2(STsdbReader* pReader) {
taosMemoryFreeClear(pReader); taosMemoryFreeClear(pReader);
} }
static void clearMemIterInfo(STableBlockScanInfo* pInfo) {
pInfo->iterInit = false;
pInfo->iter.hasVal = false;
pInfo->iiter.hasVal = false;
if (pInfo->iter.iter != NULL) {
pInfo->iter.iter = tsdbTbDataIterDestroy(pInfo->iter.iter);
}
if (pInfo->iiter.iter != NULL) {
pInfo->iiter.iter = tsdbTbDataIterDestroy(pInfo->iiter.iter);
}
}
int32_t tsdbReaderSuspend2(STsdbReader* pReader) { int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
// save reader's base state & reset top state to be reconstructed from base state // save reader's base state & reset top state to be reconstructed from base state
int32_t code = 0; int32_t code = 0;
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
STableBlockScanInfo* pBlockScanInfo = NULL; STableBlockScanInfo* pBlockScanInfo = NULL;
pReader->status.suspendInvoked = true; // record the suspend status
if (pStatus->loadFromFile) { if (pStatus->loadFromFile) {
SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter);
if (pBlockInfo != NULL) { if (pBlockInfo != NULL) {
@ -4181,55 +4157,34 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) {
tsdbDataFileReaderClose(&pReader->pFileReader); tsdbDataFileReaderClose(&pReader->pFileReader);
SCostSummary* pCost = &pReader->cost; SReadCostSummary* pCost = &pReader->cost;
pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost); pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost);
pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES); pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES);
// resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL;
int32_t iter = 0;
while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
clearMemIterInfo(pInfo);
pInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
pInfo->pFileDelData = taosArrayDestroy(pInfo->pFileDelData);
}
} else {
// resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL;
int32_t iter = 0;
while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
clearMemIterInfo(pInfo);
pInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
pInfo->delSkyline = taosArrayDestroy(pInfo->delSkyline);
}
pBlockScanInfo = (pStatus->pTableIter == NULL) ? NULL : *pStatus->pTableIter;
if (pBlockScanInfo) {
// save lastKey to restore memory iterator
STimeWindow w = pReader->resBlockInfo.pResBlock->info.window;
pBlockScanInfo->lastProcKey = ASCENDING_TRAVERSE(pReader->info.order) ? w.ekey : w.skey;
clearMemIterInfo(pBlockScanInfo);
pBlockScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT;
pBlockScanInfo->delSkyline = taosArrayDestroy(pBlockScanInfo->delSkyline);
pBlockScanInfo->pBlockList = taosArrayDestroy(pBlockScanInfo->pBlockList);
pBlockScanInfo->pBlockIdxList = taosArrayDestroy(pBlockScanInfo->pBlockIdxList);
// TODO: keep skyline for reuse
}
} }
// resetDataBlockScanInfo excluding lastKey
STableBlockScanInfo** p = NULL;
int32_t step = ASCENDING_TRAVERSE(pReader->info.order)? 1:-1;
int32_t iter = 0;
while ((p = tSimpleHashIterate(pStatus->pTableMap, p, &iter)) != NULL) {
STableBlockScanInfo* pInfo = *(STableBlockScanInfo**)p;
clearBlockScanInfo(pInfo);
pInfo->sttKeyInfo.nextProcKey = pInfo->lastProcKey + step;
}
pStatus->uidList.currentIndex = 0;
initReaderStatus(pStatus);
tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, false); tsdbUntakeReadSnap2(pReader, pReader->pReadSnap, false);
pReader->pReadSnap = NULL; pReader->pReadSnap = NULL;
pReader->flag = READER_STATUS_SUSPEND; pReader->flag = READER_STATUS_SUSPEND;
#if SUSPEND_RESUME_TEST
tsem_post(&pReader->resumeAfterSuspend);
#endif
tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0, tsdbDebug("reader: %p suspended uid %" PRIu64 " in this query %s", pReader, pBlockScanInfo ? pBlockScanInfo->uid : 0,
pReader->idStr); pReader->idStr);
return code; return code;
@ -4384,6 +4339,16 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) {
SReaderStatus* pStatus = &pReader->status; SReaderStatus* pStatus = &pReader->status;
// NOTE: the following codes is used to perform test for suspend/resume for tsdbReader when it blocks the commit
// the data should be ingested in round-robin and all the child tables should be createted before ingesting data
// the version range of query will be used to identify the correctness of suspend/resume functions.
// this function will blocked before loading the SECOND block from vnode-buffer, and restart itself from sst-files
#if SUSPEND_RESUME_TEST
if (!pReader->status.suspendInvoked && !pReader->status.loadFromFile) {
tsem_wait(&pReader->resumeAfterSuspend);
}
#endif
code = tsdbAcquireReader(pReader); code = tsdbAcquireReader(pReader);
qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code); qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code);

View File

@ -96,7 +96,7 @@ typedef struct SResultBlockInfo {
int64_t capacity; int64_t capacity;
} SResultBlockInfo; } SResultBlockInfo;
typedef struct SCostSummary { typedef struct SReadCostSummary {
int64_t numOfBlocks; int64_t numOfBlocks;
double blockLoadTime; double blockLoadTime;
double buildmemBlock; double buildmemBlock;
@ -110,7 +110,7 @@ typedef struct SCostSummary {
double createScanInfoList; double createScanInfoList;
double createSkylineIterTime; double createSkylineIterTime;
double initLastBlockReader; double initLastBlockReader;
} SCostSummary; } SReadCostSummary;
typedef struct STableUidList { typedef struct STableUidList {
uint64_t* tableUidList; // access table uid list in uid ascending order list uint64_t* tableUidList; // access table uid list in uid ascending order list
@ -122,12 +122,6 @@ typedef struct {
int32_t numOfSttFiles; int32_t numOfSttFiles;
} SBlockNumber; } SBlockNumber;
typedef struct SBlockIndex {
int32_t ordinalIndex;
int64_t inFileOffset;
STimeWindow window; // todo replace it with overlap flag.
} SBlockIndex;
typedef struct SBlockOrderWrapper { typedef struct SBlockOrderWrapper {
int64_t uid; int64_t uid;
int64_t offset; int64_t offset;
@ -192,6 +186,7 @@ typedef struct SFileBlockDumpInfo {
} SFileBlockDumpInfo; } SFileBlockDumpInfo;
typedef struct SReaderStatus { typedef struct SReaderStatus {
bool suspendInvoked;
bool loadFromFile; // check file stage bool loadFromFile; // check file stage
bool composedDataBlock; // the returned data block is a composed block or not bool composedDataBlock; // the returned data block is a composed block or not
SSHashObj* pTableMap; // SHash<STableBlockScanInfo> SSHashObj* pTableMap; // SHash<STableBlockScanInfo>
@ -220,7 +215,8 @@ struct STsdbReader {
int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows int32_t type; // query type: 1. retrieve all data blocks, 2. retrieve direct prev|next rows
SBlockLoadSuppInfo suppInfo; SBlockLoadSuppInfo suppInfo;
STsdbReadSnap* pReadSnap; STsdbReadSnap* pReadSnap;
SCostSummary cost; tsem_t resumeAfterSuspend;
SReadCostSummary cost;
SHashObj** pIgnoreTables; SHashObj** pIgnoreTables;
SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema SSHashObj* pSchemaMap; // keep the retrieved schema info, to avoid the overhead by repeatly load schema
SDataFileReader* pFileReader; // the file reader SDataFileReader* pFileReader; // the file reader