fix(tsdb): fix errors identified by CI.
This commit is contained in:
parent
f2a9fef804
commit
750ea5789f
|
@ -141,7 +141,7 @@ static int32_t tGetPrimaryKeyIndex(uint8_t *p, SPrimaryKeyIndex *index) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) {
|
static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) {
|
||||||
pKey->ts = pKey->ts;
|
pKey->ts = pRow->ts;
|
||||||
pKey->numOfPKs = pRow->numOfPKs;
|
pKey->numOfPKs = pRow->numOfPKs;
|
||||||
if (pKey->numOfPKs == 0) {
|
if (pKey->numOfPKs == 0) {
|
||||||
return;
|
return;
|
||||||
|
@ -164,7 +164,7 @@ static void tRowGetKeyDeepCopy(SRow* pRow, SRowKey* pKey) {
|
||||||
pKey->pks[i].pData = memcpy(pKey->pks[i].pData, data + indices[i].offset, pKey->pks[i].nData);
|
pKey->pks[i].pData = memcpy(pKey->pks[i].pData, data + indices[i].offset, pKey->pks[i].nData);
|
||||||
pKey->pks[i].pData += pKey->pks[i].nData;
|
pKey->pks[i].pData += pKey->pks[i].nData;
|
||||||
} else {
|
} else {
|
||||||
pKey->pks[i].val = *(int64_t*) data + indices[i].offset;
|
pKey->pks[i].val = *(int64_t*) (data + indices[i].offset);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -694,7 +694,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
|
||||||
w.ekey = pScanInfo->lastProcKey.ts;
|
w.ekey = pScanInfo->lastProcKey.ts;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (/*isEmptyQueryTimeWindow(&w)*/ w.ekey - w.skey < 2) { // NOTE: specialized for open interval
|
if (/*isEmptyQueryTimeWindow(&w)*/ w.ekey - w.skey < 1) { // NOTE: specialized for open interval
|
||||||
k += 1;
|
k += 1;
|
||||||
|
|
||||||
if (k >= numOfTables) {
|
if (k >= numOfTables) {
|
||||||
|
@ -2012,41 +2012,58 @@ int32_t doInitMemDataIter(STsdbReader* pReader, STbData** pData, STableBlockScan
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void doForwardDataIter(SRowKey* pKey, SIterInfo* pIter, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
|
||||||
|
SRowKey rowKey;
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
TSDBROW* pRow = getValidMemRow(pIter, pBlockScanInfo->delSkyline, pReader);
|
||||||
|
if (!pIter->hasVal) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
tRowGetKeyEx(pRow, &rowKey);
|
||||||
|
int32_t ret = pkCompEx(pReader->pkComparFn, pKey, &rowKey);
|
||||||
|
if (ret == 0) {
|
||||||
|
pIter->hasVal = tsdbTbDataIterNext(pIter->iter);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handle the open interval issue. Find the first row key that is greater than the given one.
|
||||||
|
static int32_t forwardDataIter(SRowKey* pKey, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
|
||||||
|
doForwardDataIter(pKey, &pBlockScanInfo->iter, pBlockScanInfo, pReader);
|
||||||
|
doForwardDataIter(pKey, &pBlockScanInfo->iiter, pBlockScanInfo, pReader);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
|
static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) {
|
||||||
|
STbData* d = NULL;
|
||||||
|
STbData* di = NULL;
|
||||||
|
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
|
||||||
|
STsdbReadSnap* pSnap = pReader->pReadSnap;
|
||||||
|
|
||||||
if (pBlockScanInfo->iterInit) {
|
if (pBlockScanInfo->iterInit) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
STbData* d = NULL;
|
|
||||||
STsdbRowKey startKey = {0};
|
STsdbRowKey startKey = {0};
|
||||||
if (ASCENDING_TRAVERSE(pReader->info.order)) {
|
tRowKeyAssign(&startKey.key, &pBlockScanInfo->lastProcKey);
|
||||||
startKey = (STsdbRowKey){.version = pReader->info.verRange.minVer,
|
startKey.version = asc ? pReader->info.verRange.minVer : pReader->info.verRange.maxVer;
|
||||||
.key = {
|
|
||||||
.ts = pBlockScanInfo->lastProcKey.ts + 1,
|
|
||||||
.numOfPKs = pReader->suppInfo.numOfPks,
|
|
||||||
}};
|
|
||||||
} else {
|
|
||||||
startKey = (STsdbRowKey){.version = pReader->info.verRange.maxVer,
|
|
||||||
.key = {
|
|
||||||
.ts = pBlockScanInfo->lastProcKey.ts - 1,
|
|
||||||
.numOfPKs = pReader->suppInfo.numOfPks,
|
|
||||||
}};
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t code =
|
int32_t code = doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pSnap->pMem, &pBlockScanInfo->iter, "mem");
|
||||||
doInitMemDataIter(pReader, &d, pBlockScanInfo, &startKey, pReader->pReadSnap->pMem, &pBlockScanInfo->iter, "mem");
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
STbData* di = NULL;
|
code = doInitMemDataIter(pReader, &di, pBlockScanInfo, &startKey, pSnap->pIMem, &pBlockScanInfo->iiter, "imem");
|
||||||
code = doInitMemDataIter(pReader, &di, pBlockScanInfo, &startKey, pReader->pReadSnap->pIMem, &pBlockScanInfo->iiter,
|
|
||||||
"imem");
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
loadMemTombData(&pBlockScanInfo->pMemDelData, d, di, pReader->info.verRange.maxVer);
|
loadMemTombData(&pBlockScanInfo->pMemDelData, d, di, pReader->info.verRange.maxVer);
|
||||||
|
forwardDataIter(&startKey.key, pBlockScanInfo, pReader);
|
||||||
|
|
||||||
pBlockScanInfo->iterInit = true;
|
pBlockScanInfo->iterInit = true;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -4128,6 +4145,9 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi
|
||||||
blockDataEnsureCapacity(pResBlock, capacity);
|
blockDataEnsureCapacity(pResBlock, capacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// for debug purpose
|
||||||
|
// capacity = 7;
|
||||||
|
|
||||||
int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr);
|
int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
goto _err;
|
goto _err;
|
||||||
|
@ -4894,6 +4914,10 @@ SSDataBlock* tsdbRetrieveDataBlock2(STsdbReader* pReader, SArray* pIdList) {
|
||||||
|
|
||||||
SReaderStatus* pStatus = &pTReader->status;
|
SReaderStatus* pStatus = &pTReader->status;
|
||||||
if (pStatus->composedDataBlock || pReader->info.execMode == READER_EXEC_ROWS) {
|
if (pStatus->composedDataBlock || pReader->info.execMode == READER_EXEC_ROWS) {
|
||||||
|
|
||||||
|
// tsdbReaderSuspend2(pReader);
|
||||||
|
// tsdbReaderResume2(pReader);
|
||||||
|
|
||||||
return pTReader->resBlockInfo.pResBlock;
|
return pTReader->resBlockInfo.pResBlock;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4904,6 +4928,7 @@ SSDataBlock* tsdbRetrieveDataBlock2(STsdbReader* pReader, SArray* pIdList) {
|
||||||
|
|
||||||
// tsdbReaderSuspend2(pReader);
|
// tsdbReaderSuspend2(pReader);
|
||||||
// tsdbReaderResume2(pReader);
|
// tsdbReaderResume2(pReader);
|
||||||
|
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -633,7 +633,7 @@ int32_t tRowKeyAssign(SRowKey *pDst, SRowKey* pSrc) {
|
||||||
pDst->numOfPKs = pSrc->numOfPKs;
|
pDst->numOfPKs = pSrc->numOfPKs;
|
||||||
|
|
||||||
if (pSrc->numOfPKs > 0) {
|
if (pSrc->numOfPKs > 0) {
|
||||||
for (int32_t i = 0; i < pDst->numOfPKs; ++i) {
|
for (int32_t i = 0; i < pSrc->numOfPKs; ++i) {
|
||||||
SValue *pVal = &pDst->pks[i];
|
SValue *pVal = &pDst->pks[i];
|
||||||
pVal->type = pSrc->pks[i].type;
|
pVal->type = pSrc->pks[i].type;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue