Merge pull request #26726 from taosdata/fix/TD-31068-3.0
fix: (last cache) compare pk when merging ctb data
This commit is contained in:
commit
4139dc6663
|
@ -294,6 +294,14 @@ void* tsdbCacherowsReaderClose(void* pReader) {
|
|||
|
||||
taosMemoryFree(p->pCurrSchema);
|
||||
|
||||
if (p->rowKey.numOfPKs > 0) {
|
||||
for (int32_t i = 0; i < p->rowKey.numOfPKs; i++) {
|
||||
if (IS_VAR_DATA_TYPE(p->rowKey.pks[i].type)) {
|
||||
taosMemoryFree(p->rowKey.pks[i].pData);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (p->pLDataIterArray) {
|
||||
destroySttBlockReader(p->pLDataIterArray, NULL);
|
||||
}
|
||||
|
@ -325,13 +333,26 @@ void* tsdbCacherowsReaderClose(void* pReader) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
static void freeItem(void* pItem) {
|
||||
static void freeItemOfRow(void* pItem) {
|
||||
SLastCol* pCol = (SLastCol*)pItem;
|
||||
if (IS_VAR_DATA_TYPE(pCol->colVal.value.type) && pCol->colVal.value.pData) {
|
||||
taosMemoryFree(pCol->colVal.value.pData);
|
||||
}
|
||||
}
|
||||
|
||||
static void freeItemWithPk(void* pItem) {
|
||||
SLastCol* pCol = (SLastCol*)pItem;
|
||||
for (int i = 0; i < pCol->rowKey.numOfPKs; i++) {
|
||||
if (IS_VAR_DATA_TYPE(pCol->rowKey.pks[i].type)) {
|
||||
taosMemoryFree(pCol->rowKey.pks[i].pData);
|
||||
}
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pCol->colVal.value.type) && pCol->colVal.value.pData) {
|
||||
taosMemoryFree(pCol->colVal.value.pData);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t tsdbCacheQueryReseek(void* pQHandle) {
|
||||
int32_t code = 0;
|
||||
SCacheRowsReader* pReader = pQHandle;
|
||||
|
@ -407,6 +428,16 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
struct STColumn* pCol = &pr->pSchema->columns[slotId];
|
||||
SLastCol p = {.rowKey.ts = INT64_MIN, .colVal.value.type = pCol->type, .colVal.flag = CV_FLAG_NULL};
|
||||
|
||||
if (pr->rowKey.numOfPKs > 0) {
|
||||
p.rowKey.numOfPKs = pr->rowKey.numOfPKs;
|
||||
for (int32_t j = 0; j < pr->rowKey.numOfPKs; j++) {
|
||||
p.rowKey.pks[j].type = pr->pkColumn.type;
|
||||
if (IS_VAR_DATA_TYPE(pr->pkColumn.type)) {
|
||||
p.rowKey.pks[j].pData = taosMemoryCalloc(1, pr->pkColumn.bytes);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||
p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
|
||||
}
|
||||
|
@ -420,7 +451,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
|
||||
tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype);
|
||||
if (TARRAY_SIZE(pRow) <= 0 || COL_VAL_IS_NONE(&((SLastCol*)TARRAY_DATA(pRow))[0].colVal)) {
|
||||
taosArrayClearEx(pRow, freeItem);
|
||||
taosArrayClearEx(pRow, freeItemOfRow);
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -432,7 +463,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
SLastCol* p = taosArrayGet(pLastCols, k);
|
||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, k);
|
||||
|
||||
if (pColVal->rowKey.ts > p->rowKey.ts) {
|
||||
if (tRowKeyCompare(&pColVal->rowKey, &p->rowKey) > 0) {
|
||||
if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||
if (!COL_VAL_IS_VALUE(&p->colVal)) {
|
||||
hasNotNullRow = false;
|
||||
|
@ -445,6 +476,15 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
|
||||
hasRes = true;
|
||||
p->rowKey.ts = pColVal->rowKey.ts;
|
||||
for (int32_t j = 0; j < p->rowKey.numOfPKs; j++) {
|
||||
if (IS_VAR_DATA_TYPE(p->rowKey.pks[j].type)) {
|
||||
memcpy(p->rowKey.pks[j].pData, pColVal->rowKey.pks[j].pData, pColVal->rowKey.pks[j].nData);
|
||||
p->rowKey.pks[j].nData = pColVal->rowKey.pks[j].nData;
|
||||
} else {
|
||||
p->rowKey.pks[j].val = pColVal->rowKey.pks[j].val;
|
||||
}
|
||||
}
|
||||
|
||||
if (k == 0) {
|
||||
if (TARRAY_SIZE(pTableUidList) == 0) {
|
||||
taosArrayPush(pTableUidList, &uid);
|
||||
|
@ -483,26 +523,26 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
|||
}
|
||||
}
|
||||
|
||||
taosArrayClearEx(pRow, freeItem);
|
||||
taosArrayClearEx(pRow, freeItemOfRow);
|
||||
}
|
||||
|
||||
if (hasRes) {
|
||||
saveOneRow(pLastCols, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
|
||||
}
|
||||
|
||||
taosArrayDestroyEx(pLastCols, freeItem);
|
||||
taosArrayDestroyEx(pLastCols, freeItemWithPk);
|
||||
} else if (HASTYPE(pr->type, CACHESCAN_RETRIEVE_TYPE_ALL)) {
|
||||
for (int32_t i = pr->tableIndex; i < pr->numOfTables; ++i) {
|
||||
tb_uid_t uid = pTableList[i].uid;
|
||||
|
||||
tsdbCacheGetBatch(pr->pTsdb, uid, pRow, pr, ltype);
|
||||
if (TARRAY_SIZE(pRow) <= 0 || COL_VAL_IS_NONE(&((SLastCol*)TARRAY_DATA(pRow))[0].colVal)) {
|
||||
taosArrayClearEx(pRow, freeItem);
|
||||
taosArrayClearEx(pRow, freeItemOfRow);
|
||||
continue;
|
||||
}
|
||||
|
||||
saveOneRow(pRow, pResBlock, pr, slotIds, dstSlotIds, pRes, pr->idstr);
|
||||
taosArrayClearEx(pRow, freeItem);
|
||||
taosArrayClearEx(pRow, freeItemOfRow);
|
||||
|
||||
taosArrayPush(pTableUidList, &uid);
|
||||
|
||||
|
|
Loading…
Reference in New Issue