Merge pull request #24833 from taosdata/opti/TD-28118

fix:change datablock to old version for compatibility
This commit is contained in:
dapan1121 2024-02-23 09:02:11 +08:00 committed by GitHub
commit a7b1ee9d59
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 62 additions and 36 deletions

View File

@ -32,6 +32,9 @@ typedef struct SBlockOrderInfo {
SColumnInfoData* pColData;
} SBlockOrderInfo;
#define BLOCK_VERSION_1 1
#define BLOCK_VERSION_2 2
#define NBIT (3u)
#define BitPos(_n) ((_n) & ((1 << NBIT) - 1))
#define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT])

View File

@ -1850,7 +1850,7 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i
char* pStart = p + len;
for (int32_t i = 0; i < numOfCols; ++i) {
int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i];
int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
int32_t* offset = (int32_t*)pStart;
@ -1949,8 +1949,8 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
char* pStart = p;
char* pStart1 = p1;
for (int32_t i = 0; i < numOfCols; ++i) {
int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i];
int32_t colLen1 = (blockVersion == 1) ? htonl(colLength1[i]) : colLength1[i];
int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i];
if (ASSERT(colLen < dataLen)) {
tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
return TSDB_CODE_TSC_INTERNAL_ERROR;
@ -2009,7 +2009,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
}
colLen1 = len;
totalLen += colLen1;
colLength1[i] = (blockVersion == 1) ? htonl(len) : len;
colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
} else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
len = numOfRows * sizeof(int32_t);
memcpy(pStart1, pStart, len);
@ -2098,7 +2098,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
char* pStart = p;
for (int32_t i = 0; i < numOfCols; ++i) {
if(blockVersion == 1){
if(blockVersion == BLOCK_VERSION_1){
colLength[i] = htonl(colLength[i]);
}
if (colLength[i] >= dataLen) {

View File

@ -1611,17 +1611,39 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
return pRspObj;
}
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
pRspObj->resType = RES_TYPE__TMQ;
void changeByteEndian(char* pData){
char* p = pData;
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
// version:
int32_t blockVersion = *(int32_t*)p;
ASSERT(blockVersion == BLOCK_VERSION_1);
*(int32_t*)p = BLOCK_VERSION_2;
p += sizeof(int32_t);
p += sizeof(int32_t);
p += sizeof(int32_t);
int32_t cols = *(int32_t*)p;
p += sizeof(int32_t);
p += sizeof(int32_t);
p += sizeof(uint64_t);
// check fields
p += cols * (sizeof(int8_t) + sizeof(int32_t));
int32_t* colLength = (int32_t*)p;
for (int32_t i = 0; i < cols; ++i) {
colLength[i] = htonl(colLength[i]);
}
}
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObj* pRspObj) {
(*numOfRows) = 0;
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
pRspObj->vgId = pWrapper->vgHandle->vgId;
pRspObj->resIter = -1;
memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
pRspObj->resInfo.totalRows = 0;
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
@ -1633,41 +1655,44 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg,
}
// extract the rows in this data packet
for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i);
int64_t rows = htobe64(pRetrieve->numOfRows);
void* pRetrieve = taosArrayGetP(pRspObj->rsp.blockData, i);
void* rawData = NULL;
int64_t rows = 0;
// deal with compatibility
if(*(int64_t*)pRetrieve == 0){
rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
}else if(*(int64_t*)pRetrieve == 1){
rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
}
pVg->numOfRows += rows;
(*numOfRows) += rows;
if (needTransformSchema) { // withSchema is false if subscribe subquery, true if subscribe db or stable
SSchemaWrapper* schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema);
if (schema) {
changeByteEndian(rawData);
if (needTransformSchema) { //withSchema is false if subscribe subquery, true if subscribe db or stable
SSchemaWrapper *schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema);
if(schema){
taosArrayPush(pRspObj->rsp.blockSchema, &schema);
}
}
}
}
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
pRspObj->resType = RES_TYPE__TMQ;
memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, pRspObj);
return pRspObj;
}
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
pRspObj->resType = RES_TYPE__TMQ_METADATA;
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
pRspObj->vgId = pWrapper->vgHandle->vgId;
pRspObj->resIter = -1;
memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));
pRspObj->resInfo.totalRows = 0;
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
// extract the rows in this data packet
for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i);
int64_t rows = htobe64(pRetrieve->numOfRows);
pVg->numOfRows += rows;
(*numOfRows) += rows;
}
tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, (SMqRspObj*)pRspObj);
return pRspObj;
}

View File

@ -2196,7 +2196,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
// todo extract method
int32_t* version = (int32_t*)data;
*version = 2;
*version = BLOCK_VERSION_1;
data += sizeof(int32_t);
int32_t* actualLen = (int32_t*)data;
@ -2277,7 +2277,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
data += colSizes[col];
}
// colSizes[col] = htonl(colSizes[col]);
colSizes[col] = htonl(colSizes[col]);
// uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type,
// htonl(colSizes[col]), colSizes[col]);
}
@ -2342,9 +2342,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
pStart += sizeof(int32_t) * numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) {
if(version == 1){
colLen[i] = htonl(colLen[i]);
}
colLen[i] = htonl(colLen[i]);
ASSERT(colLen[i] >= 0);
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);

View File

@ -721,7 +721,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
goto end;
}
fields += sizeof(int8_t) + sizeof(int32_t);
if (needChangeLength && version == 1) {
if (needChangeLength && version == BLOCK_VERSION_1) {
pStart += htonl(colLength[j]);
} else {
pStart += colLength[j];
@ -752,7 +752,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
goto end;
}
fields += sizeof(int8_t) + sizeof(int32_t);
if (needChangeLength && version == 1) {
if (needChangeLength && version == BLOCK_VERSION_1) {
pStart += htonl(colLength[i]);
} else {
pStart += colLength[i];