[td-5250]<enhance>: compress the tableMeta info msg.
This commit is contained in:
parent
b68bb2a84a
commit
84da4d656f
|
@ -13,7 +13,10 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <tscompression.h>
|
||||
#include "os.h"
|
||||
#include "qPlan.h"
|
||||
#include "qTableMeta.h"
|
||||
#include "tcmdtype.h"
|
||||
#include "tlockfree.h"
|
||||
#include "trpc.h"
|
||||
|
@ -21,10 +24,8 @@
|
|||
#include "tscLog.h"
|
||||
#include "tscProfile.h"
|
||||
#include "tscUtil.h"
|
||||
#include "qTableMeta.h"
|
||||
#include "tsclient.h"
|
||||
#include "ttimer.h"
|
||||
#include "qPlan.h"
|
||||
|
||||
int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo) = {0};
|
||||
|
||||
|
@ -2048,16 +2049,27 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
|
|||
}
|
||||
|
||||
SSqlCmd *pParentCmd = &pParentSql->cmd;
|
||||
|
||||
SHashObj *pSet = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
|
||||
char* pMsg = pMultiMeta->meta;
|
||||
char* buf = NULL;
|
||||
if (pMultiMeta->compressed) {
|
||||
buf = malloc(pMultiMeta->rawLen - sizeof(SMultiTableMeta));
|
||||
int32_t len = tsDecompressString(pMultiMeta->meta, pMultiMeta->contLen - sizeof(SMultiTableMeta), 1,
|
||||
buf, pMultiMeta->rawLen - sizeof(SMultiTableMeta), ONE_STAGE_COMP, NULL, 0);
|
||||
assert(len == pMultiMeta->rawLen - sizeof(SMultiTableMeta));
|
||||
|
||||
pMsg = buf;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pMultiMeta->numOfTables; i++) {
|
||||
STableMetaMsg *pMetaMsg = (STableMetaMsg *)pMsg;
|
||||
int32_t code = tableMetaMsgConvert(pMetaMsg);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosHashCleanup(pSet);
|
||||
taosReleaseRef(tscObjRef, pParentSql->self);
|
||||
|
||||
tfree(buf);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -2066,6 +2078,8 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
|
|||
tscError("0x%"PRIx64" invalid table meta from mnode, name:%s", pSql->self, pMetaMsg->tableFname);
|
||||
taosHashCleanup(pSet);
|
||||
taosReleaseRef(tscObjRef, pParentSql->self);
|
||||
|
||||
tfree(buf);
|
||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||
}
|
||||
|
||||
|
@ -2115,6 +2129,8 @@ int tscProcessMultiTableMetaRsp(SSqlObj *pSql) {
|
|||
|
||||
taosHashCleanup(pSet);
|
||||
taosReleaseRef(tscObjRef, pParentSql->self);
|
||||
|
||||
tfree(buf);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -760,10 +760,12 @@ typedef struct STableMetaMsg {
|
|||
} STableMetaMsg;
|
||||
|
||||
typedef struct SMultiTableMeta {
|
||||
int32_t numOfTables;
|
||||
int32_t numOfVgroup;
|
||||
int32_t contLen;
|
||||
char meta[];
|
||||
int32_t numOfTables;
|
||||
int32_t numOfVgroup;
|
||||
uint32_t contLen:31;
|
||||
uint8_t compressed:1; // denote if compressed or not
|
||||
uint32_t rawLen; // size before compress
|
||||
char meta[];
|
||||
} SMultiTableMeta;
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -2892,7 +2892,7 @@ static SMultiTableMeta* ensureMsgBufferSpace(SMultiTableMeta *pMultiMeta, SArray
|
|||
(*totalMallocLen) *= 2;
|
||||
}
|
||||
|
||||
pMultiMeta = rpcReallocCont(pMultiMeta, *totalMallocLen);
|
||||
pMultiMeta = realloc(pMultiMeta, *totalMallocLen);
|
||||
if (pMultiMeta == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
@ -2923,8 +2923,8 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
// first malloc 80KB, subsequent reallocation will expand the size as twice of the original size
|
||||
int32_t totalMallocLen = sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16);
|
||||
pMultiMeta = rpcMallocCont(totalMallocLen);
|
||||
int32_t totalMallocLen = sizeof(SMultiTableMeta) + sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16);
|
||||
pMultiMeta = calloc(1, totalMallocLen);
|
||||
if (pMultiMeta == NULL) {
|
||||
code = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
goto _end;
|
||||
|
@ -2957,7 +2957,7 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
|||
int remain = totalMallocLen - pMultiMeta->contLen;
|
||||
if (remain <= sizeof(STableMetaMsg) + sizeof(SSchema) * (TSDB_MAX_TAGS + TSDB_MAX_COLUMNS + 16)) {
|
||||
totalMallocLen *= 2;
|
||||
pMultiMeta = rpcReallocCont(pMultiMeta, totalMallocLen);
|
||||
pMultiMeta = realloc(pMultiMeta, totalMallocLen);
|
||||
if (pMultiMeta == NULL) {
|
||||
mnodeDecTableRef(pMsg->pTable);
|
||||
code = TSDB_CODE_MND_OUT_OF_MEMORY;
|
||||
|
@ -3027,16 +3027,36 @@ static int32_t mnodeProcessMultiTableMetaMsg(SMnodeMsg *pMsg) {
|
|||
pMsg->rpcRsp.len = pMultiMeta->contLen;
|
||||
code = TSDB_CODE_SUCCESS;
|
||||
|
||||
char* tmp = rpcMallocCont(pMultiMeta->contLen + 2);
|
||||
int32_t len = tsCompressString(pMultiMeta->meta, (int32_t)pMultiMeta->contLen - sizeof(SMultiTableMeta), 1,
|
||||
tmp + sizeof(SMultiTableMeta), (int32_t)pMultiMeta->contLen - sizeof(SMultiTableMeta) + 2, ONE_STAGE_COMP, NULL, 0);
|
||||
|
||||
pMultiMeta->rawLen = pMultiMeta->contLen;
|
||||
if (len == -1 || len + sizeof(SMultiTableMeta) >= pMultiMeta->contLen + 2) { // compress failed, do not compress this binary data
|
||||
pMultiMeta->compressed = 0;
|
||||
memcpy(tmp, pMultiMeta, sizeof(SMultiTableMeta) + pMultiMeta->contLen);
|
||||
} else {
|
||||
pMultiMeta->compressed = 1;
|
||||
pMultiMeta->contLen = sizeof(SMultiTableMeta) + len;
|
||||
|
||||
// copy the header and the compressed payload
|
||||
memcpy(tmp, pMultiMeta, sizeof(SMultiTableMeta));
|
||||
}
|
||||
|
||||
pMsg->rpcRsp.rsp = tmp;
|
||||
pMsg->rpcRsp.len = pMultiMeta->contLen;
|
||||
|
||||
SMultiTableMeta* p = (SMultiTableMeta*) tmp;
|
||||
|
||||
mDebug("multiTable info build completed, original:%d, compressed:%d, comp:%d", p->rawLen, p->contLen, p->compressed);
|
||||
|
||||
_end:
|
||||
tfree(str);
|
||||
tfree(nameList);
|
||||
taosArrayDestroy(pList);
|
||||
pMsg->pTable = NULL;
|
||||
pMsg->pVgroup = NULL;
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
rpcFreeCont(pMultiMeta);
|
||||
}
|
||||
tfree(pMultiMeta);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue