fix:change datablock to old version for compatibility
This commit is contained in:
parent
4037a7a1a5
commit
604ae220f3
|
@ -256,7 +256,7 @@ SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId
|
||||||
SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index);
|
SColumnInfoData* bdGetColumnInfoData(const SSDataBlock* pBlock, int32_t index);
|
||||||
|
|
||||||
int32_t blockGetEncodeSize(const SSDataBlock* pBlock);
|
int32_t blockGetEncodeSize(const SSDataBlock* pBlock);
|
||||||
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols, int32_t bVersion);
|
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols);
|
||||||
const char* blockDecode(SSDataBlock* pBlock, const char* pData);
|
const char* blockDecode(SSDataBlock* pBlock, const char* pData);
|
||||||
|
|
||||||
// for debug
|
// for debug
|
||||||
|
|
|
@ -499,7 +499,7 @@ static int32_t buildShowVariablesRsp(SArray* pVars, SRetrieveTableRsp** pRsp) {
|
||||||
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
|
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
|
||||||
(*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
|
(*pRsp)->numOfCols = htonl(SHOW_VARIABLES_RESULT_COLS);
|
||||||
|
|
||||||
int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS, BLOCK_VERSION_1);
|
int32_t len = blockEncode(pBlock, (*pRsp)->data, SHOW_VARIABLES_RESULT_COLS);
|
||||||
blockDataDestroy(pBlock);
|
blockDataDestroy(pBlock);
|
||||||
|
|
||||||
if (len != rspSize - sizeof(SRetrieveTableRsp)) {
|
if (len != rspSize - sizeof(SRetrieveTableRsp)) {
|
||||||
|
@ -611,7 +611,7 @@ static int32_t buildRetriveTableRspForCompactDb(SCompactDbRsp* pCompactDb, SRetr
|
||||||
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
|
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
|
||||||
(*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS);
|
(*pRsp)->numOfCols = htonl(COMPACT_DB_RESULT_COLS);
|
||||||
|
|
||||||
int32_t len = blockEncode(pBlock, (*pRsp)->data, COMPACT_DB_RESULT_COLS, BLOCK_VERSION_1);
|
int32_t len = blockEncode(pBlock, (*pRsp)->data, COMPACT_DB_RESULT_COLS);
|
||||||
blockDataDestroy(pBlock);
|
blockDataDestroy(pBlock);
|
||||||
|
|
||||||
if (len != rspSize - sizeof(SRetrieveTableRsp)) {
|
if (len != rspSize - sizeof(SRetrieveTableRsp)) {
|
||||||
|
|
|
@ -1562,17 +1562,39 @@ SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) {
|
||||||
return pRspObj;
|
return pRspObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
|
|
||||||
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
|
|
||||||
pRspObj->resType = RES_TYPE__TMQ;
|
|
||||||
|
|
||||||
|
void changeByteEndian(char* pData){
|
||||||
|
char* p = pData;
|
||||||
|
|
||||||
|
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column length |
|
||||||
|
// version:
|
||||||
|
int32_t blockVersion = *(int32_t*)p;
|
||||||
|
ASSERT(blockVersion == BLOCK_VERSION_1);
|
||||||
|
*(int32_t*)p = BLOCK_VERSION_2;
|
||||||
|
|
||||||
|
p += sizeof(int32_t);
|
||||||
|
p += sizeof(int32_t);
|
||||||
|
p += sizeof(int32_t);
|
||||||
|
int32_t cols = *(int32_t*)p;
|
||||||
|
p += sizeof(int32_t);
|
||||||
|
p += sizeof(int32_t);
|
||||||
|
p += sizeof(uint64_t);
|
||||||
|
// check fields
|
||||||
|
p += cols * (sizeof(int8_t) + sizeof(int32_t));
|
||||||
|
|
||||||
|
int32_t* colLength = (int32_t*)p;
|
||||||
|
for (int32_t i = 0; i < cols; ++i) {
|
||||||
|
colLength[i] = htonl(colLength[i]);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObj* pRspObj) {
|
||||||
(*numOfRows) = 0;
|
(*numOfRows) = 0;
|
||||||
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
||||||
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
|
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
|
||||||
|
|
||||||
pRspObj->vgId = pWrapper->vgHandle->vgId;
|
pRspObj->vgId = pWrapper->vgHandle->vgId;
|
||||||
pRspObj->resIter = -1;
|
pRspObj->resIter = -1;
|
||||||
memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
|
|
||||||
|
|
||||||
pRspObj->resInfo.totalRows = 0;
|
pRspObj->resInfo.totalRows = 0;
|
||||||
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
|
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
|
||||||
|
@ -1584,11 +1606,21 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg,
|
||||||
}
|
}
|
||||||
// extract the rows in this data packet
|
// extract the rows in this data packet
|
||||||
for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
|
for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
|
||||||
SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i);
|
void* pRetrieve = taosArrayGetP(pRspObj->rsp.blockData, i);
|
||||||
int64_t rows = htobe64(pRetrieve->numOfRows);
|
void* rawData = NULL;
|
||||||
|
int64_t rows = 0;
|
||||||
|
// deal with compatibility
|
||||||
|
if(*(int64_t*)pRetrieve == 0){
|
||||||
|
rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
|
||||||
|
rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
|
||||||
|
}else if(*(int64_t*)pRetrieve == 1){
|
||||||
|
rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
|
||||||
|
rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
|
||||||
|
}
|
||||||
|
|
||||||
pVg->numOfRows += rows;
|
pVg->numOfRows += rows;
|
||||||
(*numOfRows) += rows;
|
(*numOfRows) += rows;
|
||||||
|
changeByteEndian(rawData);
|
||||||
if (needTransformSchema) { //withSchema is false if subscribe subquery, true if subscribe db or stable
|
if (needTransformSchema) { //withSchema is false if subscribe subquery, true if subscribe db or stable
|
||||||
SSchemaWrapper *schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema);
|
SSchemaWrapper *schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema);
|
||||||
if(schema){
|
if(schema){
|
||||||
|
@ -1596,29 +1628,22 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
|
||||||
|
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
|
||||||
|
pRspObj->resType = RES_TYPE__TMQ;
|
||||||
|
memcpy(&pRspObj->rsp, &pWrapper->dataRsp, sizeof(SMqDataRsp));
|
||||||
|
tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, pRspObj);
|
||||||
return pRspObj;
|
return pRspObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
|
SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows) {
|
||||||
SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
|
SMqTaosxRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqTaosxRspObj));
|
||||||
pRspObj->resType = RES_TYPE__TMQ_METADATA;
|
pRspObj->resType = RES_TYPE__TMQ_METADATA;
|
||||||
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
|
|
||||||
tstrncpy(pRspObj->db, pWrapper->topicHandle->db, TSDB_DB_FNAME_LEN);
|
|
||||||
pRspObj->vgId = pWrapper->vgHandle->vgId;
|
|
||||||
pRspObj->resIter = -1;
|
|
||||||
memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));
|
memcpy(&pRspObj->rsp, &pWrapper->taosxRsp, sizeof(STaosxRsp));
|
||||||
|
|
||||||
pRspObj->resInfo.totalRows = 0;
|
tmqBuildRspFromWrapperInner(pWrapper, pVg, numOfRows, (SMqRspObj*)pRspObj);
|
||||||
pRspObj->resInfo.precision = TSDB_TIME_PRECISION_MILLI;
|
|
||||||
|
|
||||||
// extract the rows in this data packet
|
|
||||||
for (int32_t i = 0; i < pRspObj->rsp.blockNum; ++i) {
|
|
||||||
SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, i);
|
|
||||||
int64_t rows = htobe64(pRetrieve->numOfRows);
|
|
||||||
pVg->numOfRows += rows;
|
|
||||||
(*numOfRows) += rows;
|
|
||||||
}
|
|
||||||
return pRspObj;
|
return pRspObj;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2191,12 +2191,12 @@ int32_t buildCtbNameByGroupIdImpl(const char* stbFullName, uint64_t groupId, cha
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols, int32_t bVersion) {
|
int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols) {
|
||||||
int32_t dataLen = 0;
|
int32_t dataLen = 0;
|
||||||
|
|
||||||
// todo extract method
|
// todo extract method
|
||||||
int32_t* version = (int32_t*)data;
|
int32_t* version = (int32_t*)data;
|
||||||
*version = bVersion;
|
*version = BLOCK_VERSION_1;
|
||||||
data += sizeof(int32_t);
|
data += sizeof(int32_t);
|
||||||
|
|
||||||
int32_t* actualLen = (int32_t*)data;
|
int32_t* actualLen = (int32_t*)data;
|
||||||
|
@ -2277,9 +2277,7 @@ int32_t blockEncode(const SSDataBlock* pBlock, char* data, int32_t numOfCols, in
|
||||||
data += colSizes[col];
|
data += colSizes[col];
|
||||||
}
|
}
|
||||||
|
|
||||||
if(bVersion == BLOCK_VERSION_1){
|
colSizes[col] = htonl(colSizes[col]);
|
||||||
colSizes[col] = htonl(colSizes[col]);
|
|
||||||
}
|
|
||||||
// uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type,
|
// uError("blockEncode col bytes:%d, type:%d, size:%d, htonl size:%d", pColRes->info.bytes, pColRes->info.type,
|
||||||
// htonl(colSizes[col]), colSizes[col]);
|
// htonl(colSizes[col]), colSizes[col]);
|
||||||
}
|
}
|
||||||
|
@ -2340,9 +2338,7 @@ const char* blockDecode(SSDataBlock* pBlock, const char* pData) {
|
||||||
pStart += sizeof(int32_t) * numOfCols;
|
pStart += sizeof(int32_t) * numOfCols;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||||
if(version == BLOCK_VERSION_1){
|
colLen[i] = htonl(colLen[i]);
|
||||||
colLen[i] = htonl(colLen[i]);
|
|
||||||
}
|
|
||||||
ASSERT(colLen[i] >= 0);
|
ASSERT(colLen[i] >= 0);
|
||||||
|
|
||||||
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
|
|
@ -401,7 +401,7 @@ int32_t dmProcessRetrieve(SDnodeMgmt *pMgmt, SRpcMsg *pMsg) {
|
||||||
pStart += sizeof(SSysTableSchema);
|
pStart += sizeof(SSysTableSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t len = blockEncode(pBlock, pStart, numOfCols, BLOCK_VERSION_1);
|
int32_t len = blockEncode(pBlock, pStart, numOfCols);
|
||||||
|
|
||||||
pRsp->numOfRows = htonl(pBlock->info.rows);
|
pRsp->numOfRows = htonl(pBlock->info.rows);
|
||||||
pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision
|
pRsp->precision = TSDB_TIME_PRECISION_MILLI; // millisecond time precision
|
||||||
|
|
|
@ -328,7 +328,7 @@ static int32_t mndProcessRetrieveSysTableReq(SRpcMsg *pReq) {
|
||||||
pStart += sizeof(SSysTableSchema);
|
pStart += sizeof(SSysTableSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns, BLOCK_VERSION_1);
|
int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns);
|
||||||
}
|
}
|
||||||
|
|
||||||
pRsp->numOfRows = htonl(rowsRead);
|
pRsp->numOfRows = htonl(rowsRead);
|
||||||
|
|
|
@ -3150,7 +3150,7 @@ void mndExtractTbNameFromStbFullName(const char *stbFullName, char *dst, int32_t
|
||||||
// pStart += sizeof(SSysTableSchema);
|
// pStart += sizeof(SSysTableSchema);
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns, BLOCK_VERSION_1);
|
// int32_t len = blockEncode(pBlock, pStart, pShow->pMeta->numOfColumns);
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
// pRsp->numOfRows = htonl(rowsRead);
|
// pRsp->numOfRows = htonl(rowsRead);
|
||||||
|
|
|
@ -28,7 +28,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
|
||||||
pRetrieve->compressed = 0;
|
pRetrieve->compressed = 0;
|
||||||
pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
|
pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
|
||||||
|
|
||||||
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols, BLOCK_VERSION_2);
|
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
|
||||||
actualLen += sizeof(SRetrieveTableRspForTmq);
|
actualLen += sizeof(SRetrieveTableRspForTmq);
|
||||||
taosArrayPush(pRsp->blockDataLen, &actualLen);
|
taosArrayPush(pRsp->blockDataLen, &actualLen);
|
||||||
taosArrayPush(pRsp->blockData, &buf);
|
taosArrayPush(pRsp->blockData, &buf);
|
||||||
|
|
|
@ -40,7 +40,7 @@ static int32_t buildRetrieveTableRsp(SSDataBlock* pBlock, int32_t numOfCols, SRe
|
||||||
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
|
(*pRsp)->numOfRows = htobe64((int64_t)pBlock->info.rows);
|
||||||
(*pRsp)->numOfCols = htonl(numOfCols);
|
(*pRsp)->numOfCols = htonl(numOfCols);
|
||||||
|
|
||||||
int32_t len = blockEncode(pBlock, (*pRsp)->data, numOfCols, BLOCK_VERSION_1);
|
int32_t len = blockEncode(pBlock, (*pRsp)->data, numOfCols);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1817,7 +1817,7 @@ int32_t qExplainGetRspFromCtx(void *ctx, SRetrieveTableRsp **pRsp) {
|
||||||
rsp->completed = 1;
|
rsp->completed = 1;
|
||||||
rsp->numOfRows = htobe64((int64_t)rowNum);
|
rsp->numOfRows = htobe64((int64_t)rowNum);
|
||||||
|
|
||||||
int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock), BLOCK_VERSION_1);
|
int32_t len = blockEncode(pBlock, rsp->data, taosArrayGetSize(pBlock->pDataBlock));
|
||||||
|
|
||||||
rsp->compLen = htonl(len);
|
rsp->compLen = htonl(len);
|
||||||
|
|
||||||
|
|
|
@ -76,7 +76,7 @@ static void toDataCacheEntry(SDataDispatchHandle* pHandle, const SInputData* pIn
|
||||||
pEntry->dataLen = 0;
|
pEntry->dataLen = 0;
|
||||||
|
|
||||||
pBuf->useSize = sizeof(SDataCacheEntry);
|
pBuf->useSize = sizeof(SDataCacheEntry);
|
||||||
pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols, BLOCK_VERSION_1);
|
pEntry->dataLen = blockEncode(pInput->pData, pEntry->data, numOfCols);
|
||||||
// ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
|
// ASSERT(pEntry->numOfRows == *(int32_t*)(pEntry->data + 8));
|
||||||
// ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
|
// ASSERT(pEntry->numOfCols == *(int32_t*)(pEntry->data + 8 + 4));
|
||||||
|
|
||||||
|
|
|
@ -188,7 +188,7 @@ int32_t streamBroadcastToChildren(SStreamTask* pTask, const SSDataBlock* pBlock)
|
||||||
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
|
pRetrieve->ekey = htobe64(pBlock->info.window.ekey);
|
||||||
pRetrieve->version = htobe64(pBlock->info.version);
|
pRetrieve->version = htobe64(pBlock->info.version);
|
||||||
|
|
||||||
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols, BLOCK_VERSION_1);
|
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
|
||||||
|
|
||||||
SStreamRetrieveReq req = {
|
SStreamRetrieveReq req = {
|
||||||
.streamId = pTask->id.streamId,
|
.streamId = pTask->id.streamId,
|
||||||
|
@ -772,7 +772,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
|
||||||
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
|
int32_t numOfCols = (int32_t)taosArrayGetSize(pBlock->pDataBlock);
|
||||||
pRetrieve->numOfCols = htonl(numOfCols);
|
pRetrieve->numOfCols = htonl(numOfCols);
|
||||||
|
|
||||||
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols, BLOCK_VERSION_1);
|
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, numOfCols);
|
||||||
actualLen += sizeof(SRetrieveTableRsp);
|
actualLen += sizeof(SRetrieveTableRsp);
|
||||||
ASSERT(actualLen <= dataStrLen);
|
ASSERT(actualLen <= dataStrLen);
|
||||||
taosArrayPush(pReq->dataLen, &actualLen);
|
taosArrayPush(pReq->dataLen, &actualLen);
|
||||||
|
|
Loading…
Reference in New Issue