fix(query): do some internal refactor.
This commit is contained in:
parent
640338893d
commit
116d1451e4
|
@ -29,12 +29,14 @@ typedef struct SCacheRowsReader {
|
||||||
SArray* pTableList; // table id list
|
SArray* pTableList; // table id list
|
||||||
} SCacheRowsReader;
|
} SCacheRowsReader;
|
||||||
|
|
||||||
|
#define HASTYPE(_type, _t) (((_type) & (_t)) == (_t))
|
||||||
|
|
||||||
static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
|
static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds,
|
||||||
void** pRes) {
|
void** pRes) {
|
||||||
ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock));
|
ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock));
|
||||||
int32_t numOfRows = pBlock->info.rows;
|
int32_t numOfRows = pBlock->info.rows;
|
||||||
|
|
||||||
if ((pReader->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) {
|
if (HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||||
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
|
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[i]);
|
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[i]);
|
||||||
|
@ -54,7 +56,7 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
|
||||||
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
|
if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) {
|
||||||
varDataSetLen(p->buf, pColVal->colVal.value.nData);
|
varDataSetLen(p->buf, pColVal->colVal.value.nData);
|
||||||
memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData);
|
memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData);
|
||||||
p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE;
|
p->bytes = pColVal->colVal.value.nData + VARSTR_HEADER_SIZE; // binary needs to plus the header size
|
||||||
} else {
|
} else {
|
||||||
memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes);
|
memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes);
|
||||||
p->bytes = pReader->pSchema->columns[slotId].bytes;
|
p->bytes = pReader->pSchema->columns[slotId].bytes;
|
||||||
|
@ -62,12 +64,13 @@ static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pRea
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// pColInfoData->info.bytes includes the VARSTR_HEADER_SIZE, need to substruct it
|
||||||
p->hasResult = true;
|
p->hasResult = true;
|
||||||
varDataSetLen(pRes[i], pColInfoData->info.bytes);
|
varDataSetLen(pRes[i], pColInfoData->info.bytes - VARSTR_HEADER_SIZE);
|
||||||
colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false);
|
colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
ASSERT((pReader->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW);
|
ASSERT(HASTYPE(pReader->type, CACHESCAN_RETRIEVE_LAST_ROW));
|
||||||
|
|
||||||
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
|
for (int32_t i = 0; i < pReader->numOfCols; ++i) {
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
@ -177,6 +180,13 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void freeItem(void* pItem) {
|
||||||
|
SLastCol* pCol = (SLastCol*) pItem;
|
||||||
|
if (IS_VAR_DATA_TYPE(pCol->colVal.type)) {
|
||||||
|
taosMemoryFree(pCol->colVal.value.pData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
|
int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32_t* slotIds, SArray* pTableUidList) {
|
||||||
if (pReader == NULL || pResBlock == NULL) {
|
if (pReader == NULL || pResBlock == NULL) {
|
||||||
return TSDB_CODE_INVALID_PARA;
|
return TSDB_CODE_INVALID_PARA;
|
||||||
|
@ -217,7 +227,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
if (IS_VAR_DATA_TYPE(pCol->type)) {
|
||||||
p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
|
p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char));
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayPush(pLastCols, &p);
|
taosArrayPush(pLastCols, &p);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -237,19 +246,25 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
|
|
||||||
{
|
{
|
||||||
for (int32_t k = 0; k < pr->numOfCols; ++k) {
|
for (int32_t k = 0; k < pr->numOfCols; ++k) {
|
||||||
SLastCol* p = taosArrayGet(pLastCols, k);
|
int32_t slotId = slotIds[k];
|
||||||
if (slotIds[k] == -1) { // the primary timestamp
|
|
||||||
SLastCol* pCol = (SLastCol*)taosArrayGet(pRow, k);
|
if (slotId == -1) { // the primary timestamp
|
||||||
|
SLastCol* p = taosArrayGet(pLastCols, 0);
|
||||||
|
SLastCol* pCol = (SLastCol*)taosArrayGet(pRow, 0);
|
||||||
if (pCol->ts > p->ts) {
|
if (pCol->ts > p->ts) {
|
||||||
hasRes = true;
|
hasRes = true;
|
||||||
p->ts = pCol->ts;
|
p->ts = pCol->ts;
|
||||||
p->colVal = pCol->colVal;
|
p->colVal = pCol->colVal;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
int32_t slotId = slotIds[k];
|
SLastCol* p = taosArrayGet(pLastCols, slotId);
|
||||||
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
|
SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId);
|
||||||
|
|
||||||
if (pColVal->ts > p->ts) {
|
if (pColVal->ts > p->ts) {
|
||||||
|
if (!COL_VAL_IS_VALUE(&pColVal->colVal) && HASTYPE(pr->type, CACHESCAN_RETRIEVE_LAST)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
hasRes = true;
|
hasRes = true;
|
||||||
p->ts = pColVal->ts;
|
p->ts = pColVal->ts;
|
||||||
|
|
||||||
|
@ -308,6 +323,6 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(pRes);
|
taosMemoryFree(pRes);
|
||||||
taosArrayDestroy(pLastCols);
|
taosArrayDestroyEx(pLastCols, freeItem);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -39,6 +39,7 @@ if $data02 != 5.000000000 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data03 != 3 then
|
if $data03 != 3 then
|
||||||
|
print expect 3, actual: $data03
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data04 != @70-01-01 07:59:57.000@ then
|
if $data04 != @70-01-01 07:59:57.000@ then
|
||||||
|
@ -210,7 +211,7 @@ if $data01 != 6 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data02 != 37.000000000 then
|
if $data02 != 37.000000000 then
|
||||||
print $data02
|
print expect 37.000000000 actual: $data02
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data03 != 27 then
|
if $data03 != 27 then
|
||||||
|
@ -233,7 +234,7 @@ if $data01 != 6 then
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data02 != 37.000000000 then
|
if $data02 != 37.000000000 then
|
||||||
print $data02
|
print expect 37.000000000, acutal: $data02
|
||||||
return -1
|
return -1
|
||||||
endi
|
endi
|
||||||
if $data03 != 27 then
|
if $data03 != 27 then
|
||||||
|
|
Loading…
Reference in New Issue