refactor(query): do some internal refactor.
This commit is contained in:
parent
70927458e5
commit
bb2146cd75
|
@ -227,6 +227,9 @@ int32_t blockDataTrimFirstNRows(SSDataBlock* pBlock, size_t n);
|
||||||
|
|
||||||
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
|
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
|
||||||
|
|
||||||
|
void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress);
|
||||||
|
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData);
|
||||||
|
|
||||||
void blockDebugShowData(const SArray* dataBlocks);
|
void blockDebugShowData(const SArray* dataBlocks);
|
||||||
|
|
||||||
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
|
int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks, STSchema* pTSchema, int32_t vgId,
|
||||||
|
@ -246,54 +249,6 @@ static FORCE_INLINE int32_t blockCompressColData(SColumnInfoData* pColRes, int32
|
||||||
colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
|
colSize + COMP_OVERFLOW_BYTES, compressed, NULL, 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols,
|
|
||||||
int8_t needCompress) {
|
|
||||||
int32_t* actualLen = (int32_t*)data;
|
|
||||||
data += sizeof(int32_t);
|
|
||||||
|
|
||||||
uint64_t* groupId = (uint64_t*)data;
|
|
||||||
data += sizeof(uint64_t);
|
|
||||||
|
|
||||||
int32_t* colSizes = (int32_t*)data;
|
|
||||||
data += numOfCols * sizeof(int32_t);
|
|
||||||
|
|
||||||
*dataLen = (numOfCols * sizeof(int32_t) + sizeof(uint64_t) + sizeof(int32_t));
|
|
||||||
|
|
||||||
int32_t numOfRows = pBlock->info.rows;
|
|
||||||
for (int32_t col = 0; col < numOfCols; ++col) {
|
|
||||||
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);
|
|
||||||
|
|
||||||
// copy the null bitmap
|
|
||||||
if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
|
|
||||||
size_t metaSize = numOfRows * sizeof(int32_t);
|
|
||||||
memcpy(data, pColRes->varmeta.offset, metaSize);
|
|
||||||
data += metaSize;
|
|
||||||
(*dataLen) += metaSize;
|
|
||||||
} else {
|
|
||||||
int32_t len = BitmapLen(numOfRows);
|
|
||||||
memcpy(data, pColRes->nullbitmap, len);
|
|
||||||
data += len;
|
|
||||||
(*dataLen) += len;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (needCompress) {
|
|
||||||
colSizes[col] = blockCompressColData(pColRes, numOfRows, data, needCompress);
|
|
||||||
data += colSizes[col];
|
|
||||||
(*dataLen) += colSizes[col];
|
|
||||||
} else {
|
|
||||||
colSizes[col] = colDataGetLength(pColRes, numOfRows);
|
|
||||||
(*dataLen) += colSizes[col];
|
|
||||||
memmove(data, pColRes->pData, colSizes[col]);
|
|
||||||
data += colSizes[col];
|
|
||||||
}
|
|
||||||
|
|
||||||
colSizes[col] = htonl(colSizes[col]);
|
|
||||||
}
|
|
||||||
|
|
||||||
*actualLen = *dataLen;
|
|
||||||
*groupId = pBlock->info.groupId;
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -567,6 +567,7 @@ TEST(testCase, insert_test) {
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
TEST(testCase, projection_query_tables) {
|
TEST(testCase, projection_query_tables) {
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
@ -625,23 +626,23 @@ TEST(testCase, projection_query_tables) {
|
||||||
|
|
||||||
printf("start to insert next table\n");
|
printf("start to insert next table\n");
|
||||||
|
|
||||||
for(int32_t i = 0; i < 1000000; i += 20) {
|
// for(int32_t i = 0; i < 1000000; i += 20) {
|
||||||
char sql[1024] = {0};
|
// char sql[1024] = {0};
|
||||||
sprintf(sql,
|
// sprintf(sql,
|
||||||
"insert into tu2 values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
|
// "insert into tu2 values(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
|
||||||
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
|
// "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
|
||||||
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
|
// "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)"
|
||||||
"(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)",
|
// "(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)(now+%da, %d)",
|
||||||
i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, i + 7,
|
// i, i, i + 1, i + 1, i + 2, i + 2, i + 3, i + 3, i + 4, i + 4, i + 5, i + 5, i + 6, i + 6, i + 7, i + 7,
|
||||||
i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, i + 14,
|
// i + 8, i + 8, i + 9, i + 9, i + 10, i + 10, i + 11, i + 11, i + 12, i + 12, i + 13, i + 13, i + 14, i + 14,
|
||||||
i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19);
|
// i + 15, i + 15, i + 16, i + 16, i + 17, i + 17, i + 18, i + 18, i + 19, i + 19);
|
||||||
TAOS_RES* p = taos_query(pConn, sql);
|
// TAOS_RES* p = taos_query(pConn, sql);
|
||||||
if (taos_errno(p) != 0) {
|
// if (taos_errno(p) != 0) {
|
||||||
printf("failed to insert data, reason:%s\n", taos_errstr(p));
|
// printf("failed to insert data, reason:%s\n", taos_errstr(p));
|
||||||
}
|
// }
|
||||||
|
//
|
||||||
taos_free_result(p);
|
// taos_free_result(p);
|
||||||
}
|
// }
|
||||||
|
|
||||||
// pRes = taos_query(pConn, "select * from tu");
|
// pRes = taos_query(pConn, "select * from tu");
|
||||||
// if (taos_errno(pRes) != 0) {
|
// if (taos_errno(pRes) != 0) {
|
||||||
|
@ -663,7 +664,7 @@ TEST(testCase, projection_query_tables) {
|
||||||
// taos_free_result(pRes);
|
// taos_free_result(pRes);
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
|
#if 0
|
||||||
TEST(testCase, projection_query_stables) {
|
TEST(testCase, projection_query_stables) {
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
ASSERT_NE(pConn, nullptr);
|
ASSERT_NE(pConn, nullptr);
|
||||||
|
@ -692,8 +693,6 @@ TEST(testCase, projection_query_stables) {
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
TEST(testCase, agg_query_tables) {
|
TEST(testCase, agg_query_tables) {
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
ASSERT_NE(pConn, nullptr);
|
ASSERT_NE(pConn, nullptr);
|
||||||
|
@ -734,5 +733,6 @@ TEST(testCase, agg_query_tables) {
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
#pragma GCC diagnostic pop
|
#pragma GCC diagnostic pop
|
||||||
|
|
|
@ -1764,3 +1764,98 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
|
||||||
ret->length = htonl(ret->length);
|
ret->length = htonl(ret->length);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void blockCompressEncode(const SSDataBlock* pBlock, char* data, int32_t* dataLen, int32_t numOfCols, int8_t needCompress) {
|
||||||
|
int32_t* actualLen = (int32_t*)data;
|
||||||
|
data += sizeof(int32_t);
|
||||||
|
|
||||||
|
uint64_t* groupId = (uint64_t*)data;
|
||||||
|
data += sizeof(uint64_t);
|
||||||
|
|
||||||
|
int32_t* colSizes = (int32_t*)data;
|
||||||
|
data += numOfCols * sizeof(int32_t);
|
||||||
|
|
||||||
|
*dataLen = (numOfCols * sizeof(int32_t) + sizeof(uint64_t) + sizeof(int32_t));
|
||||||
|
|
||||||
|
int32_t numOfRows = pBlock->info.rows;
|
||||||
|
for (int32_t col = 0; col < numOfCols; ++col) {
|
||||||
|
SColumnInfoData* pColRes = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, col);
|
||||||
|
|
||||||
|
// copy the null bitmap
|
||||||
|
if (IS_VAR_DATA_TYPE(pColRes->info.type)) {
|
||||||
|
size_t metaSize = numOfRows * sizeof(int32_t);
|
||||||
|
memcpy(data, pColRes->varmeta.offset, metaSize);
|
||||||
|
data += metaSize;
|
||||||
|
(*dataLen) += metaSize;
|
||||||
|
} else {
|
||||||
|
int32_t len = BitmapLen(numOfRows);
|
||||||
|
memcpy(data, pColRes->nullbitmap, len);
|
||||||
|
data += len;
|
||||||
|
(*dataLen) += len;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (needCompress) {
|
||||||
|
colSizes[col] = blockCompressColData(pColRes, numOfRows, data, needCompress);
|
||||||
|
data += colSizes[col];
|
||||||
|
(*dataLen) += colSizes[col];
|
||||||
|
} else {
|
||||||
|
colSizes[col] = colDataGetLength(pColRes, numOfRows);
|
||||||
|
(*dataLen) += colSizes[col];
|
||||||
|
memmove(data, pColRes->pData, colSizes[col]);
|
||||||
|
data += colSizes[col];
|
||||||
|
}
|
||||||
|
|
||||||
|
colSizes[col] = htonl(colSizes[col]);
|
||||||
|
}
|
||||||
|
|
||||||
|
*actualLen = *dataLen;
|
||||||
|
*groupId = pBlock->info.groupId;
|
||||||
|
}
|
||||||
|
|
||||||
|
const char* blockCompressDecode(SSDataBlock* pBlock, int32_t numOfCols, int32_t numOfRows, const char* pData) {
|
||||||
|
blockDataEnsureCapacity(pBlock, numOfRows);
|
||||||
|
const char* pStart = pData;
|
||||||
|
|
||||||
|
int32_t dataLen = *(int32_t*)pStart;
|
||||||
|
pStart += sizeof(int32_t);
|
||||||
|
|
||||||
|
pBlock->info.groupId = *(uint64_t*)pStart;
|
||||||
|
pStart += sizeof(uint64_t);
|
||||||
|
|
||||||
|
int32_t* colLen = (int32_t*)pStart;
|
||||||
|
pStart += sizeof(int32_t) * numOfCols;
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
|
colLen[i] = htonl(colLen[i]);
|
||||||
|
ASSERT(colLen[i] >= 0);
|
||||||
|
|
||||||
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
||||||
|
pColInfoData->varmeta.length = colLen[i];
|
||||||
|
pColInfoData->varmeta.allocLen = colLen[i];
|
||||||
|
|
||||||
|
memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * numOfRows);
|
||||||
|
pStart += sizeof(int32_t) * numOfRows;
|
||||||
|
|
||||||
|
if (colLen[i] > 0) {
|
||||||
|
pColInfoData->pData = taosMemoryMalloc(colLen[i]);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows));
|
||||||
|
pStart += BitmapLen(numOfRows);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (colLen[i] > 0) {
|
||||||
|
memcpy(pColInfoData->pData, pStart, colLen[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO
|
||||||
|
// setting this flag to true temporarily so aggregate function on stable will
|
||||||
|
// examine NULL value for non-primary key column
|
||||||
|
pColInfoData->hasNull = true;
|
||||||
|
pStart += colLen[i];
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pStart - pData == dataLen);
|
||||||
|
return pStart;
|
||||||
|
}
|
|
@ -356,8 +356,7 @@ SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pR
|
||||||
if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId &&
|
if (pResultRowInfo->cur.pageId != -1 && ((pResult == NULL) || (pResult->pageId != pResultRowInfo->cur.pageId &&
|
||||||
pResult->offset != pResultRowInfo->cur.offset))) {
|
pResult->offset != pResultRowInfo->cur.offset))) {
|
||||||
SResultRowPosition pos = pResultRowInfo->cur;
|
SResultRowPosition pos = pResultRowInfo->cur;
|
||||||
SFilePage*
|
SFilePage* pPage = getBufPage(pResultBuf, pos.pageId);
|
||||||
pPage = getBufPage(pResultBuf, pos.pageId);
|
|
||||||
releaseBufPage(pResultBuf, pPage);
|
releaseBufPage(pResultBuf, pPage);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2523,46 +2522,7 @@ int32_t setSDataBlockFromFetchRsp(SSDataBlock* pRes, SLoadRemoteDataInfo* pLoadI
|
||||||
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
|
int32_t compLen, int32_t numOfOutput, int64_t startTs, uint64_t* total,
|
||||||
SArray* pColList) {
|
SArray* pColList) {
|
||||||
if (pColList == NULL) { // data from other sources
|
if (pColList == NULL) { // data from other sources
|
||||||
blockDataEnsureCapacity(pRes, numOfRows);
|
blockCompressDecode(pRes, numOfOutput, numOfRows, pData);
|
||||||
|
|
||||||
int32_t dataLen = *(int32_t*)pData;
|
|
||||||
pData += sizeof(int32_t);
|
|
||||||
|
|
||||||
pRes->info.groupId = *(uint64_t*)pData;
|
|
||||||
pData += sizeof(uint64_t);
|
|
||||||
|
|
||||||
int32_t* colLen = (int32_t*)pData;
|
|
||||||
|
|
||||||
char* pStart = pData + sizeof(int32_t) * numOfOutput;
|
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
|
||||||
colLen[i] = htonl(colLen[i]);
|
|
||||||
ASSERT(colLen[i] >= 0);
|
|
||||||
|
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pRes->pDataBlock, i);
|
|
||||||
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
|
|
||||||
pColInfoData->varmeta.length = colLen[i];
|
|
||||||
pColInfoData->varmeta.allocLen = colLen[i];
|
|
||||||
|
|
||||||
memcpy(pColInfoData->varmeta.offset, pStart, sizeof(int32_t) * numOfRows);
|
|
||||||
pStart += sizeof(int32_t) * numOfRows;
|
|
||||||
|
|
||||||
if (colLen[i] > 0) {
|
|
||||||
pColInfoData->pData = taosMemoryMalloc(colLen[i]);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
memcpy(pColInfoData->nullbitmap, pStart, BitmapLen(numOfRows));
|
|
||||||
pStart += BitmapLen(numOfRows);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (colLen[i] > 0) {
|
|
||||||
memcpy(pColInfoData->pData, pStart, colLen[i]);
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO setting this flag to true temporarily so aggregate function on stable will
|
|
||||||
// examine NULL value for non-primary key column
|
|
||||||
pColInfoData->hasNull = true;
|
|
||||||
pStart += colLen[i];
|
|
||||||
}
|
|
||||||
} else { // extract data according to pColList
|
} else { // extract data according to pColList
|
||||||
ASSERT(numOfOutput == taosArrayGetSize(pColList));
|
ASSERT(numOfOutput == taosArrayGetSize(pColList));
|
||||||
char* pStart = pData;
|
char* pStart = pData;
|
||||||
|
|
Loading…
Reference in New Issue