fix(query): fix bug in cached last query.
This commit is contained in:
parent
faa5ace174
commit
60e7f2800b
|
@ -30,36 +30,69 @@ typedef struct SCacheRowsReader {
|
|||
} SCacheRowsReader;
|
||||
|
||||
static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
|
||||
SFirstLastRes** pRes) {
|
||||
void** pRes) {
|
||||
ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock));
|
||||
int32_t numOfRows = pBlock->info.rows;
|
||||
|
||||
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
if ((pReader->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) {
|
||||
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[i]);
|
||||
|
||||
if (slotIds[i] == -1) { // the primary timestamp
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
|
||||
pRes[i]->ts = pColVal->ts;
|
||||
memcpy(pRes[i]->buf, &pColVal->ts, TSDB_KEYSIZE);
|
||||
} else {
|
||||
int32_t slotId = slotIds[i];
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
|
||||
if (slotIds[i] == -1) { // the primary timestamp
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
|
||||
p->ts = pColVal->ts;
|
||||
p->bytes = TSDB_KEYSIZE;
|
||||
*(int64_t*)p->buf = pColVal->ts;
|
||||
} else {
|
||||
int32_t slotId = slotIds[i];
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
|
||||
|
||||
pRes[i]->ts = pColVal->ts;
|
||||
pRes[i]->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
|
||||
p->ts = pColVal->ts;
|
||||
p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
|
||||
if (!p->isNull) {
|
||||
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
|
||||
varDataSetLen(p->buf, pColVal->colVal.value.nData);
|
||||
memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData);
|
||||
p->bytes = pColVal->colVal.value.nData;
|
||||
} else {
|
||||
memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes);
|
||||
p->bytes = pReader->pSchema->columns[slotId].bytes;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!pRes[i]->isNull) {
|
||||
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
|
||||
varDataSetLen(pRes[i]->buf, pColVal->colVal.value.nData);
|
||||
memcpy(varDataVal(pRes[i]->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData);
|
||||
p->hasResult = true;
|
||||
varDataSetLen(pRes[i], pColInfoData->info.bytes);
|
||||
colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false);
|
||||
}
|
||||
} else {
|
||||
ASSERT((pReader->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW);
|
||||
|
||||
SColVal colVal = {0};
|
||||
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
|
||||
if (slotIds[i] == -1) {
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0);
|
||||
colDataAppend(pColInfoData, numOfRows, (const char*)&pColVal->ts, false);
|
||||
} else {
|
||||
int32_t slotId = slotIds[i];
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
|
||||
|
||||
if (IS_VAR_DATA_TYPE(colVal.type)) {
|
||||
if (!COL_VAL_IS_VALUE(&pColVal->colVal)) {
|
||||
colDataAppendNULL(pColInfoData, numOfRows);
|
||||
} else {
|
||||
varDataSetLen(pReader->transferBuf[slotId], colVal.value.nData);
|
||||
memcpy(varDataVal(pReader->transferBuf[slotId]), colVal.value.pData, colVal.value.nData);
|
||||
colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[slotId], false);
|
||||
}
|
||||
} else {
|
||||
memcpy(pRes[i]->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes);
|
||||
colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, !COL_VAL_IS_VALUE(&pColVal->colVal));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pRes[i]->hasResult = true;
|
||||
colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false);
|
||||
}
|
||||
|
||||
pBlock->info.rows += 1;
|
||||
|
@ -126,24 +159,17 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if ((pr->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW) {
|
||||
code = tsdbCacheGetLastrowH(lruCache, uid, pr->pVnode->pTsdb, h);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
// no data in the table of Uid
|
||||
if (*h != NULL) {
|
||||
*pRow = (SArray*)taosLRUCacheValue(lruCache, *h);
|
||||
}
|
||||
} else {
|
||||
code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
// no data in the table of Uid
|
||||
if (*h != NULL) {
|
||||
*pRow = (SArray*)taosLRUCacheValue(lruCache, *h);
|
||||
}
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
// no data in the table of Uid
|
||||
if (*h != NULL) {
|
||||
*pRow = (SArray*)taosLRUCacheValue(lruCache, *h);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -163,10 +189,26 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
size_t numOfTables = taosArrayGetSize(pr->pTableList);
|
||||
bool hasRes = false;
|
||||
|
||||
SFirstLastRes** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
|
||||
void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES);
|
||||
for (int32_t j = 0; j < pr->numOfCols; ++j) {
|
||||
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes);
|
||||
pRes[j]->ts = INT64_MIN;
|
||||
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes + VARSTR_HEADER_SIZE);
|
||||
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);
|
||||
p->ts = INT64_MIN;
|
||||
}
|
||||
|
||||
SArray* pLastCols = taosArrayInit(pr->pSchema->numOfCols, sizeof(SLastCol));
|
||||
for (int32_t i = 0; i < pr->numOfCols; ++i) {
|
||||
SLastCol p = {0};
|
||||
p.ts = INT64_MIN;
|
||||
|
||||
struct STColumn* pCol = &pr->pSchema->columns[i];
|
||||
p.colVal.type = pCol->type;
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||
p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
|
||||
}
|
||||
|
||||
taosArrayPush(pLastCols, &p);
|
||||
}
|
||||
|
||||
// retrieve the only one last row of all tables in the uid list.
|
||||
|
@ -185,38 +227,32 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
|
||||
{
|
||||
for (int32_t k = 0; k < pr->numOfCols; ++k) {
|
||||
SColumnInfoData* pColInfoData = taosArrayGet(pResBlock->pDataBlock, k);
|
||||
|
||||
SLastCol* p = taosArrayGet(pLastCols, k);
|
||||
if (slotIds[k] == -1) { // the primary timestamp
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, k);
|
||||
if (pColVal->ts > pRes[k]->ts || !pRes[k]->hasResult) {
|
||||
SLastCol* pCol = (SLastCol*)taosArrayGet(pRow, k);
|
||||
if (pCol->ts > p->ts) {
|
||||
hasRes = true;
|
||||
pRes[k]->hasResult = true;
|
||||
pRes[k]->ts = pColVal->ts;
|
||||
memcpy(pRes[k]->buf, &pColVal->ts, TSDB_KEYSIZE);
|
||||
|
||||
colDataAppend(pColInfoData, 1, (const char*)pRes[k], false);
|
||||
p->ts = pCol->ts;
|
||||
p->colVal = pCol->colVal;
|
||||
}
|
||||
} else {
|
||||
int32_t slotId = slotIds[k];
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
|
||||
|
||||
if (pColVal->ts > pRes[k]->ts || !pRes[k]->hasResult) {
|
||||
if (pColVal->ts > p->ts) {
|
||||
hasRes = true;
|
||||
pRes[k]->hasResult = true;
|
||||
pRes[k]->ts = pColVal->ts;
|
||||
p->ts = pColVal->ts;
|
||||
|
||||
pRes[k]->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal);
|
||||
if (!pRes[k]->isNull) {
|
||||
if (!COL_VAL_IS_VALUE(&pColVal->colVal)) {
|
||||
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
|
||||
varDataSetLen(pRes[k]->buf, pColVal->colVal.value.nData);
|
||||
memcpy(varDataVal(pRes[k]->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData);
|
||||
uint8_t* px = p->colVal.value.pData;
|
||||
p->colVal = pColVal->colVal;
|
||||
p->colVal.value.pData = px;
|
||||
memcpy(px, pColVal->colVal.value.pData, pColVal->colVal.value.nData);
|
||||
} else {
|
||||
memcpy(pRes[k]->buf, &pColVal->colVal.value, pr->pSchema->columns[slotId].bytes);
|
||||
p->colVal = pColVal->colVal;
|
||||
}
|
||||
}
|
||||
|
||||
colDataAppend(pColInfoData, 1, (const char*)pRes[k], false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -227,6 +263,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
|
||||
if (hasRes) {
|
||||
pResBlock->info.rows = 1;
|
||||
saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes);
|
||||
}
|
||||
|
||||
} else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) {
|
||||
|
|
|
@ -2232,6 +2232,7 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic
|
|||
cxt.doAgg = false;
|
||||
nodesWalkExprs(pScan->pScanCols, lastRowScanOptSetColDataType, &cxt);
|
||||
nodesWalkExprs(pScan->pScanPseudoCols, lastRowScanOptSetColDataType, &cxt);
|
||||
nodesWalkExprs(pScan->node.pTargets, lastRowScanOptSetColDataType, &cxt);
|
||||
nodesClearList(cxt.pLastCols);
|
||||
}
|
||||
pAgg->hasLastRow = false;
|
||||
|
|
Loading…
Reference in New Issue