fix:change datablock to old version for compatibility

This commit is contained in:
wangmm0220 2024-02-22 15:44:50 +08:00
parent 0e1ef540c4
commit 4037a7a1a5
13 changed files with 28 additions and 23 deletions

View File

@ -32,6 +32,9 @@ typedef struct SBlockOrderInfo {
SColumnInfoData* pColData; SColumnInfoData* pColData;
} SBlockOrderInfo; } SBlockOrderInfo;
#define BLOCK_VERSION_1 1
#define BLOCK_VERSION_2 2
#define NBIT (3u) #define NBIT (3u)
#define BitPos(_n) ((_n) & ((1 << NBIT) - 1)) #define BitPos(_n) ((_n) & ((1 << NBIT) - 1))
#define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT]) #define BMCharPos(bm_, r_) ((bm_)[(r_) >> NBIT])
@ -253,7 +256,7 @@ SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId
SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index); SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index);
int32_t blockGetEncodeSize(const SSDataBlock* pBlock); int32_t blockGetEncodeSize(const SSDataBlock* pBlock);
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols); int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols, int32_t bVersion);
const char* blockDecode(SSDataBlock* pBlock, const char* pData); const char* blockDecode(SSDataBlock* pBlock, const char* pData);
// for debug // for debug

View File

@ -1811,7 +1811,7 @@ static int32_t estimateJsonLen(SReqResultInfo* pResultInfo, int32_t numOfCols, i
char* pStart = p + len; char* pStart = p + len;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i]; int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) { if (pResultInfo->fields[i].type == TSDB_DATA_TYPE_JSON) {
int32_t* offset = (int32_t*)pStart; int32_t* offset = (int32_t*)pStart;
@ -1910,8 +1910,8 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
char* pStart = p; char* pStart = p;
char* pStart1 = p1; char* pStart1 = p1;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
int32_t colLen = (blockVersion == 1) ? htonl(colLength[i]) : colLength[i]; int32_t colLen = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength[i]) : colLength[i];
int32_t colLen1 = (blockVersion == 1) ? htonl(colLength1[i]) : colLength1[i]; int32_t colLen1 = (blockVersion == BLOCK_VERSION_1) ? htonl(colLength1[i]) : colLength1[i];
if (ASSERT(colLen < dataLen)) { if (ASSERT(colLen < dataLen)) {
tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen); tscError("doConvertJson error: colLen:%d >= dataLen:%d", colLen, dataLen);
return TSDB_CODE_TSC_INTERNAL_ERROR; return TSDB_CODE_TSC_INTERNAL_ERROR;
@ -1970,7 +1970,7 @@ static int32_t doConvertJson(SReqResultInfo* pResultInfo, int32_t numOfCols, int
} }
colLen1 = len; colLen1 = len;
totalLen += colLen1; totalLen += colLen1;
colLength1[i] = (blockVersion == 1) ? htonl(len) : len; colLength1[i] = (blockVersion == BLOCK_VERSION_1) ? htonl(len) : len;
} else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) { } else if (IS_VAR_DATA_TYPE(pResultInfo->fields[i].type)) {
len = numOfRows * sizeof(int32_t); len = numOfRows * sizeof(int32_t);
memcpy(pStart1, pStart, len); memcpy(pStart1, pStart, len);
@ -2059,7 +2059,7 @@ int32_t setResultDataPtr(SReqResultInfo* pResultInfo, TAOS_FIELD* pFields, int32
char* pStart = p; char* pStart = p;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
if(blockVersion == 1){ if(blockVersion == BLOCK_VERSION_1){
colLength[i] = htonl(colLength[i]); colLength[i] = htonl(colLength[i]);
} }
if (colLength[i] >= dataLen) { if (colLength[i] >= dataLen) {

View File

@ -499,7 +499,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS); (*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS); int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS, BLOCK_VERSION_1);
blockDataDestroy(pBlock); blockDataDestroy(pBlock);
if (len != rspSize - sizeof(SRetrieveTableRsp)) { if (len != rspSize - sizeof(SRetrieveTableRsp)) {
@ -611,7 +611,7 @@ static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetr
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS); (*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS);
int32_t len = blockEncode(pBlock, (*pRsp)->data, COMPACT_DB_RESULT_COLS); int32_t len = blockEncode(pBlock, (*pRsp)->data, COMPACT_DB_RESULT_COLS, BLOCK_VERSION_1);
blockDataDestroy(pBlock); blockDataDestroy(pBlock);
if (len != rspSize - sizeof(SRetrieveTableRsp)) { if (len != rspSize - sizeof(SRetrieveTableRsp)) {

View File

@ -2191,12 +2191,12 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) { int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols, int32_t bVersion) {
int32_t dataLen = 0; int32_t dataLen = 0;
// todo extract method // todo extract method
int32_t* version = (int32_t*)data; int32_t* version = (int32_t*)data;
*version = 2; *version = bVersion;
data += sizeof(int32_t); data += sizeof(int32_t);
int32_t* actualLen = (int32_t*)data; int32_t* actualLen = (int32_t*)data;
@ -2277,7 +2277,9 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
data += colSizes[col]; data += colSizes[col];
} }
// colSizes[col] = htonl(colSizes[col]); if(bVersion == BLOCK_VERSION_1){
colSizes[col] = htonl(colSizes[col]);
}
// uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type, // uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type,
// htonl(colSizes[col]), colSizes[col]); // htonl(colSizes[col]), colSizes[col]);
} }
@ -2338,7 +2340,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
pStart += sizeof(int32_t) * numOfCols; pStart += sizeof(int32_t) * numOfCols;
for (int32_t i = 0; i < numOfCols; ++i) { for (int32_t i = 0; i < numOfCols; ++i) {
if(version == 1){ if(version == BLOCK_VERSION_1){
colLen[i] = htonl(colLen[i]); colLen[i] = htonl(colLen[i]);
} }
ASSERT(colLen[i] >= 0); ASSERT(colLen[i] >= 0);

View File

@ -401,7 +401,7 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
pStart += sizeof(SSysTableSchema); pStart += sizeof(SSysTableSchema);
} }
int32_t len = blockEncode(pBlock, pStart, numOfCols); int32_t len = blockEncode(pBlock, pStart, numOfCols, BLOCK_VERSION_1);
pRsp->numOfRows = htonl(pBlock->info.rows); pRsp->numOfRows = htonl(pBlock->info.rows);
pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision

View File

@ -328,7 +328,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
pStart += sizeof(SSysTableSchema); pStart += sizeof(SSysTableSchema);
} }
int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns); int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns, BLOCK_VERSION_1);
} }
pRsp->numOfRows = htonl(rowsRead); pRsp->numOfRows = htonl(rowsRead);

View File

@ -3150,7 +3150,7 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t
// pStart += sizeof(SSysTableSchema); // pStart += sizeof(SSysTableSchema);
// } // }
// //
// int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns); // int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns, BLOCK_VERSION_1);
// } // }
// //
// pRsp->numOfRows = htonl(rowsRead); // pRsp->numOfRows = htonl(rowsRead);

View File

@ -28,7 +28,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
pRetrieve->compressed = 0; pRetrieve->compressed = 0;
pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows); pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols, BLOCK_VERSION_2);
actualLen += sizeof(SRetrieveTableRspForTmq); actualLen += sizeof(SRetrieveTableRspForTmq);
taosArrayPush(pRsp->blockDataLen, &actualLen); taosArrayPush(pRsp->blockDataLen, &actualLen);
taosArrayPush(pRsp->blockData, &buf); taosArrayPush(pRsp->blockData, &buf);

View File

@ -40,7 +40,7 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows); (*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
(*pRsp)->numOfCols = htonl(numOfCols); (*pRsp)->numOfCols = htonl(numOfCols);
int32_t len = blockEncode(pBlock, (*pRsp)->data, numOfCols); int32_t len = blockEncode(pBlock, (*pRsp)->data, numOfCols, BLOCK_VERSION_1);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -1817,7 +1817,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
rsp->completed = 1; rsp->completed = 1;
rsp->numOfRows = htobe64((int64_t)rowNum); rsp->numOfRows = htobe64((int64_t)rowNum);
int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock)); int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock), BLOCK_VERSION_1);
rsp->compLen = htonl(len); rsp->compLen = htonl(len);

View File

@ -76,7 +76,7 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
pEntry->dataLen = 0; pEntry->dataLen = 0;
pBuf->useSize = sizeof(SDataCacheEntry); pBuf->useSize = sizeof(SDataCacheEntry);
pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols); pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols, BLOCK_VERSION_1);
// ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8)); // ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
// ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4)); // ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));

View File

@ -718,7 +718,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
goto end; goto end;
} }
fields += sizeof(int8_t) + sizeof(int32_t); fields += sizeof(int8_t) + sizeof(int32_t);
if (needChangeLength && version == 1) { if (needChangeLength && version == BLOCK_VERSION_1) {
pStart += htonl(colLength[j]); pStart += htonl(colLength[j]);
} else { } else {
pStart += colLength[j]; pStart += colLength[j];
@ -749,7 +749,7 @@ int rawBlockBindData(SQuery* query, STableMeta* pTableMeta, void* data, SVCreate
goto end; goto end;
} }
fields += sizeof(int8_t) + sizeof(int32_t); fields += sizeof(int8_t) + sizeof(int32_t);
if (needChangeLength && version == 1) { if (needChangeLength && version == BLOCK_VERSION_1) {
pStart += htonl(colLength[i]); pStart += htonl(colLength[i]);
} else { } else {
pStart += colLength[i]; pStart += colLength[i];

View File

@ -188,7 +188,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
pRetrieve->ekey = htobe64(pBlock->info.window.ekey); pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
pRetrieve->version = htobe64(pBlock->info.version); pRetrieve->version = htobe64(pBlock->info.version);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols, BLOCK_VERSION_1);
SStreamRetrieveReq req = { SStreamRetrieveReq req = {
.streamId = pTask->id.streamId, .streamId = pTask->id.streamId,
@ -772,7 +772,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock); int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
pRetrieve->numOfCols = htonl(numOfCols); pRetrieve->numOfCols = htonl(numOfCols);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols); int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols, BLOCK_VERSION_1);
actualLen += sizeof(SRetrieveTableRsp); actualLen += sizeof(SRetrieveTableRsp);
ASSERT(actualLen <= dataStrLen); ASSERT(actualLen <= dataStrLen);
taosArrayPush(pReq->dataLen, &actualLen); taosArrayPush(pReq->dataLen, &actualLen);