Merge pull request #26728 from taosdata/enh/TD-30987-11

enh
This commit is contained in:
Hongze Cheng 2024-07-27 17:04:52 +08:00 committed by GitHub
commit 61f074bae0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 270 additions and 226 deletions

View File

@ -683,36 +683,55 @@ typedef struct {
} SColCmprWrapper;
static FORCE_INLINE SColCmprWrapper* tCloneSColCmprWrapper(const SColCmprWrapper* pSrcWrapper) {
if (pSrcWrapper->pColCmpr == NULL || pSrcWrapper->nCols == 0) return NULL;
if (pSrcWrapper->pColCmpr == NULL || pSrcWrapper->nCols == 0) {
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
SColCmprWrapper* pDstWrapper = (SColCmprWrapper*)taosMemoryMalloc(sizeof(SColCmprWrapper));
if (pDstWrapper == NULL) {
return NULL;
}
pDstWrapper->nCols = pSrcWrapper->nCols;
pDstWrapper->version = pSrcWrapper->version;
int32_t size = sizeof(SColCmpr) * pDstWrapper->nCols;
pDstWrapper->pColCmpr = (SColCmpr*)taosMemoryCalloc(1, size);
if (pDstWrapper->pColCmpr == NULL) {
taosMemoryFree(pDstWrapper);
return NULL;
}
(void)memcpy(pDstWrapper->pColCmpr, pSrcWrapper->pColCmpr, size);
return pDstWrapper;
}
static FORCE_INLINE void tInitDefaultSColCmprWrapperByCols(SColCmprWrapper* pCmpr, int32_t nCols) {
static FORCE_INLINE int32_t tInitDefaultSColCmprWrapperByCols(SColCmprWrapper* pCmpr, int32_t nCols) {
assert(!pCmpr->pColCmpr);
pCmpr->pColCmpr = (SColCmpr*)taosMemoryCalloc(nCols, sizeof(SColCmpr));
if (pCmpr->pColCmpr == NULL) {
return terrno;
}
pCmpr->nCols = nCols;
return 0;
}
static FORCE_INLINE void tInitDefaultSColCmprWrapper(SColCmprWrapper* pCmpr, SSchemaWrapper* pSchema) {
static FORCE_INLINE int32_t tInitDefaultSColCmprWrapper(SColCmprWrapper* pCmpr, SSchemaWrapper* pSchema) {
pCmpr->nCols = pSchema->nCols;
assert(!pCmpr->pColCmpr);
pCmpr->pColCmpr = (SColCmpr*)taosMemoryCalloc(pCmpr->nCols, sizeof(SColCmpr));
if (pCmpr->pColCmpr == NULL) {
return terrno;
}
for (int32_t i = 0; i < pCmpr->nCols; i++) {
SColCmpr* pColCmpr = &pCmpr->pColCmpr[i];
SSchema* pColSchema = &pSchema->pSchema[i];
pColCmpr->id = pColSchema->colId;
pColCmpr->alg = 0;
}
return 0;
}
static FORCE_INLINE void tDeleteSColCmprWrapper(SColCmprWrapper* pWrapper) {
if (pWrapper == NULL) return;
@ -723,7 +742,9 @@ static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* p
if (pSchemaWrapper->pSchema == NULL) return NULL;
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
if (pSW == NULL) return pSW;
if (pSW == NULL) {
return NULL;
}
pSW->nCols = pSchemaWrapper->nCols;
pSW->version = pSchemaWrapper->version;
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
@ -770,32 +791,32 @@ static FORCE_INLINE void* taosDecodeSSchema(const void* buf, SSchema* pSchema) {
}
static FORCE_INLINE int32_t tEncodeSSchema(SEncoder* pEncoder, const SSchema* pSchema) {
if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1;
if (tEncodeI8(pEncoder, pSchema->flags) < 0) return -1;
if (tEncodeI32v(pEncoder, pSchema->bytes) < 0) return -1;
if (tEncodeI16v(pEncoder, pSchema->colId) < 0) return -1;
if (tEncodeCStr(pEncoder, pSchema->name) < 0) return -1;
TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pSchema->type));
TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pSchema->flags));
TAOS_CHECK_RETURN(tEncodeI32v(pEncoder, pSchema->bytes));
TAOS_CHECK_RETURN(tEncodeI16v(pEncoder, pSchema->colId));
TAOS_CHECK_RETURN(tEncodeCStr(pEncoder, pSchema->name));
return 0;
}
static FORCE_INLINE int32_t tDecodeSSchema(SDecoder* pDecoder, SSchema* pSchema) {
if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1;
if (tDecodeI8(pDecoder, &pSchema->flags) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSchema->bytes) < 0) return -1;
if (tDecodeI16v(pDecoder, &pSchema->colId) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pSchema->name) < 0) return -1;
TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pSchema->type));
TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pSchema->flags));
TAOS_CHECK_RETURN(tDecodeI32v(pDecoder, &pSchema->bytes));
TAOS_CHECK_RETURN(tDecodeI16v(pDecoder, &pSchema->colId));
TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, pSchema->name));
return 0;
}
static FORCE_INLINE int32_t tEncodeSSchemaExt(SEncoder* pEncoder, const SSchemaExt* pSchemaExt) {
if (tEncodeI16v(pEncoder, pSchemaExt->colId) < 0) return -1;
if (tEncodeU32(pEncoder, pSchemaExt->compress) < 0) return -1;
TAOS_CHECK_RETURN(tEncodeI16v(pEncoder, pSchemaExt->colId));
TAOS_CHECK_RETURN(tEncodeU32(pEncoder, pSchemaExt->compress));
return 0;
}
static FORCE_INLINE int32_t tDecodeSSchemaExt(SDecoder* pDecoder, SSchemaExt* pSchemaExt) {
if (tDecodeI16v(pDecoder, &pSchemaExt->colId) < 0) return -1;
if (tDecodeU32(pDecoder, &pSchemaExt->compress) < 0) return -1;
TAOS_CHECK_RETURN(tDecodeI16v(pDecoder, &pSchemaExt->colId));
TAOS_CHECK_RETURN(tDecodeU32(pDecoder, &pSchemaExt->compress));
return 0;
}
@ -828,36 +849,39 @@ static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapp
}
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) {
if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1;
if (tEncodeI32v(pEncoder, pSW->version) < 0) return -1;
TAOS_CHECK_RETURN(tEncodeI32v(pEncoder, pSW->nCols));
TAOS_CHECK_RETURN(tEncodeI32v(pEncoder, pSW->version));
for (int32_t i = 0; i < pSW->nCols; i++) {
if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1;
TAOS_CHECK_RETURN(tEncodeSSchema(pEncoder, &pSW->pSchema[i]));
}
return 0;
}
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) {
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->version) < 0) return -1;
TAOS_CHECK_RETURN(tDecodeI32v(pDecoder, &pSW->nCols));
TAOS_CHECK_RETURN(tDecodeI32v(pDecoder, &pSW->version));
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
if (pSW->pSchema == NULL) return -1;
if (pSW->pSchema == NULL) {
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
for (int32_t i = 0; i < pSW->nCols; i++) {
if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1;
TAOS_CHECK_RETURN(tDecodeSSchema(pDecoder, &pSW->pSchema[i]));
}
return 0;
}
static FORCE_INLINE int32_t tDecodeSSchemaWrapperEx(SDecoder* pDecoder, SSchemaWrapper* pSW) {
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
if (tDecodeI32v(pDecoder, &pSW->version) < 0) return -1;
TAOS_CHECK_RETURN(tDecodeI32v(pDecoder, &pSW->nCols));
TAOS_CHECK_RETURN(tDecodeI32v(pDecoder, &pSW->version));
pSW->pSchema = (SSchema*)tDecoderMalloc(pDecoder, pSW->nCols * sizeof(SSchema));
if (pSW->pSchema == NULL) return -1;
if (pSW->pSchema == NULL) {
TAOS_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
for (int32_t i = 0; i < pSW->nCols; i++) {
if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1;
TAOS_CHECK_RETURN(tDecodeSSchema(pDecoder, &pSW->pSchema[i]));
}
return 0;
@ -2838,13 +2862,13 @@ static FORCE_INLINE int32_t tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeR
pReq->topicNames = taosArrayInit(topicNum, sizeof(void*));
if (pReq->topicNames == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
for (int32_t i = 0; i < topicNum; i++) {
char* name = NULL;
buf = taosDecodeString(buf, &name);
if (taosArrayPush(pReq->topicNames, &name) == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
}
@ -2875,6 +2899,7 @@ static FORCE_INLINE SMqRebInfo* tNewSMqRebSubscribe(const char* key) {
}
pRebInfo->newConsumers = taosArrayInit(0, sizeof(int64_t));
if (pRebInfo->newConsumers == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
return pRebInfo;
@ -3358,30 +3383,32 @@ int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchR
void tFreeSClientHbBatchRsp(SClientHbBatchRsp* pBatchRsp);
static FORCE_INLINE int32_t tEncodeSKv(SEncoder* pEncoder, const SKv* pKv) {
if (tEncodeI32(pEncoder, pKv->key) < 0) return -1;
if (tEncodeI32(pEncoder, pKv->valueLen) < 0) return -1;
if (tEncodeBinary(pEncoder, (uint8_t*)pKv->value, pKv->valueLen) < 0) return -1;
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pKv->key));
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pKv->valueLen));
TAOS_CHECK_RETURN(tEncodeBinary(pEncoder, (uint8_t*)pKv->value, pKv->valueLen));
return 0;
}
static FORCE_INLINE int32_t tDecodeSKv(SDecoder* pDecoder, SKv* pKv) {
if (tDecodeI32(pDecoder, &pKv->key) < 0) return -1;
if (tDecodeI32(pDecoder, &pKv->valueLen) < 0) return -1;
TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pKv->key));
TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pKv->valueLen));
pKv->value = taosMemoryMalloc(pKv->valueLen + 1);
if (pKv->value == NULL) return -1;
if (tDecodeCStrTo(pDecoder, (char*)pKv->value) < 0) return -1;
if (pKv->value == NULL) {
TAOS_CHECK_RETURN(TSDB_CODE_OUT_OF_MEMORY);
}
TAOS_CHECK_RETURN(tDecodeCStrTo(pDecoder, (char*)pKv->value));
return 0;
}
static FORCE_INLINE int32_t tEncodeSClientHbKey(SEncoder* pEncoder, const SClientHbKey* pKey) {
if (tEncodeI64(pEncoder, pKey->tscRid) < 0) return -1;
if (tEncodeI8(pEncoder, pKey->connType) < 0) return -1;
TAOS_CHECK_RETURN(tEncodeI64(pEncoder, pKey->tscRid));
TAOS_CHECK_RETURN(tEncodeI8(pEncoder, pKey->connType));
return 0;
}
static FORCE_INLINE int32_t tDecodeSClientHbKey(SDecoder* pDecoder, SClientHbKey* pKey) {
if (tDecodeI64(pDecoder, &pKey->tscRid) < 0) return -1;
if (tDecodeI8(pDecoder, &pKey->connType) < 0) return -1;
TAOS_CHECK_RETURN(tDecodeI64(pDecoder, &pKey->tscRid));
TAOS_CHECK_RETURN(tDecodeI8(pDecoder, &pKey->connType));
return 0;
}
@ -3432,7 +3459,9 @@ static FORCE_INLINE void* taosDecodeSMqTopicInfoMsg(void* buf, SMqTopicInfo* pTo
for (int32_t i = 0; i < sz; i++) {
SMqReportVgInfo vgInfo;
buf = taosDecodeSMqVgInfo(buf, &vgInfo);
taosArrayPush(pTopicInfo->pVgInfo, &vgInfo);
if (taosArrayPush(pTopicInfo->pVgInfo, &vgInfo) == NULL) {
return NULL;
}
}
return buf;
}
@ -3468,7 +3497,9 @@ static FORCE_INLINE void* taosDecodeSMqReportMsg(void* buf, SMqReportReq* pMsg)
for (int32_t i = 0; i < sz; i++) {
SMqTopicInfo topicInfo;
buf = taosDecodeSMqTopicInfoMsg(buf, &topicInfo);
taosArrayPush(pMsg->pTopics, &topicInfo);
if (taosArrayPush(pMsg->pTopics, &topicInfo) == NULL) {
return NULL;
}
}
return buf;
}
@ -3605,9 +3636,9 @@ static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ve
int32_t tEncodeSTqOffsetVal(SEncoder* pEncoder, const STqOffsetVal* pOffsetVal);
int32_t tDecodeSTqOffsetVal(SDecoder* pDecoder, STqOffsetVal* pOffsetVal);
void tFormatOffset(char* buf, int32_t maxLen, const STqOffsetVal* pVal);
int32_t tFormatOffset(char* buf, int32_t maxLen, const STqOffsetVal* pVal);
bool tOffsetEqual(const STqOffsetVal* pLeft, const STqOffsetVal* pRight);
void tOffsetCopy(STqOffsetVal* pLeft, const STqOffsetVal* pOffsetVal);
int32_t tOffsetCopy(STqOffsetVal* pLeft, const STqOffsetVal* pRight);
void tOffsetDestroy(void* pVal);
typedef struct {
@ -3808,17 +3839,17 @@ int32_t tEncodeTSma(SEncoder* pCoder, const STSma* pSma);
int32_t tDecodeTSma(SDecoder* pCoder, STSma* pSma, bool deepCopy);
static int32_t tEncodeTSmaWrapper(SEncoder* pEncoder, const STSmaWrapper* pReq) {
if (tEncodeI32(pEncoder, pReq->number) < 0) return -1;
TAOS_CHECK_RETURN(tEncodeI32(pEncoder, pReq->number));
for (int32_t i = 0; i < pReq->number; ++i) {
tEncodeTSma(pEncoder, pReq->tSma + i);
TAOS_CHECK_RETURN(tEncodeTSma(pEncoder, pReq->tSma + i));
}
return 0;
}
static int32_t tDecodeTSmaWrapper(SDecoder* pDecoder, STSmaWrapper* pReq, bool deepCopy) {
if (tDecodeI32(pDecoder, &pReq->number) < 0) return -1;
TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pReq->number));
for (int32_t i = 0; i < pReq->number; ++i) {
tDecodeTSma(pDecoder, pReq->tSma + i, deepCopy);
TAOS_CHECK_RETURN(tDecodeTSma(pDecoder, pReq->tSma + i, deepCopy));
}
return 0;
}
@ -4041,7 +4072,10 @@ static FORCE_INLINE void* tDecodeSMqAskEpRsp(void* buf, SMqAskEpRsp* pRsp) {
for (int32_t i = 0; i < sz; i++) {
SMqSubTopicEp topicEp;
buf = tDecodeMqSubTopicEp(buf, &topicEp);
if (taosArrayPush(pRsp->topics, &topicEp) == NULL) {
if (buf == NULL) {
return NULL;
}
if ((taosArrayPush(pRsp->topics, &topicEp) == NULL)) {
return NULL;
}
}

View File

@ -384,7 +384,8 @@ static int32_t tRowBuildKVRow(SArray *aColVal, const SRowBuildScanInfo *sinfo, c
payloadSize += tPutI16v(payload + payloadSize, colValArray[colValIndex].cid);
payloadSize += tPutU32v(payload + payloadSize, colValArray[colValIndex].value.nData);
if (colValArray[colValIndex].value.nData > 0) {
(void)memcpy(payload + payloadSize, colValArray[colValIndex].value.pData, colValArray[colValIndex].value.nData);
(void)memcpy(payload + payloadSize, colValArray[colValIndex].value.pData,
colValArray[colValIndex].value.nData);
}
payloadSize += colValArray[colValIndex].value.nData;
} else {
@ -480,7 +481,10 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted,
}
colVal = COL_VAL_VALUE(infos[iInfo].columnId, value);
}
taosArrayPush(colValArray, &colVal);
if (taosArrayPush(colValArray, &colVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
}
SRow *row;
@ -689,7 +693,12 @@ static int32_t tRowMergeImpl(SArray *aRowP, STSchema *pTSchema, int32_t iStart,
}
}
if (pColVal) taosArrayPush(aColVal, pColVal);
if (pColVal) {
if (taosArrayPush(aColVal, pColVal) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
}
}
// build
@ -1750,7 +1759,10 @@ int32_t tTagToValArray(const STag *pTag, SArray **ppArray) {
offset = pTag->idx[iTag];
}
tGetTagVal(p + offset, &tv, pTag->flags & TD_TAG_JSON);
taosArrayPush(*ppArray, &tv);
if (taosArrayPush(*ppArray, &tv) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
return code;
@ -1783,6 +1795,7 @@ void tTagSetCid(const STag *pTag, int16_t iTag, int16_t cid) {
STSchema *tBuildTSchema(SSchema *aSchema, int32_t numOfCols, int32_t version) {
STSchema *pTSchema = taosMemoryCalloc(1, sizeof(STSchema) + sizeof(STColumn) * numOfCols);
if (pTSchema == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
@ -2592,7 +2605,8 @@ static FORCE_INLINE void tColDataGetValue4(SColData *pColData, int32_t iVal, SCo
}
value.pData = pColData->pData + pColData->aOffset[iVal];
} else {
(void)memcpy(&value.val, pColData->pData + tDataTypes[pColData->type].bytes * iVal, tDataTypes[pColData->type].bytes);
(void)memcpy(&value.val, pColData->pData + tDataTypes[pColData->type].bytes * iVal,
tDataTypes[pColData->type].bytes);
}
*pColVal = COL_VAL_VALUE(pColData->cid, value);
}
@ -3209,12 +3223,12 @@ static int32_t tColDataMergeSortMerge(SColData *aColData, int32_t start, int32_t
if (end > start) {
aDstColData = taosMemoryCalloc(1, sizeof(SColData) * nColData);
for (int c = 0; c < nColData; ++c) {
tColDataInit(&aDstColData[c], aColData[c].cid, aColData[c].type, aColData[c].cflag);
}
if (aDstColData == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
for (int c = 0; c < nColData; ++c) {
tColDataInit(&aDstColData[c], aColData[c].cid, aColData[c].type, aColData[c].cflag);
}
}
tColDataArrGetRowKey(aColData, nColData, i, &keyi);
@ -4476,6 +4490,9 @@ int32_t tDecompressData(void *input, // input
buffer = &local;
}
code = tBufferEnsureCapacity(buffer, info->originalSize + COMP_OVERFLOW_BYTES);
if (code) {
return code;
}
int32_t decompressedSize = tDataCompress[info->dataType].decompFunc(
input, // input

File diff suppressed because it is too large Load Diff

View File

@ -181,7 +181,7 @@ int32_t tNameSetDbName(SName* dst, int32_t acct, const char* dbName, size_t name
int32_t tNameAddTbName(SName* dst, const char* tbName, size_t nameLen) {
// too long account id or too long db name
if (nameLen >= tListLen(dst->tname) || nameLen <= 0) {
return -1;
return TSDB_CODE_INVALID_PARA;
}
dst->type = TSDB_TABLE_NAME_T;
@ -225,14 +225,14 @@ bool tNameTbNameEqual(SName* left, SName* right) {
int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
if (strlen(str) == 0) {
return -1;
return TSDB_CODE_INVALID_PARA;
}
char* p = NULL;
if ((type & T_NAME_ACCT) == T_NAME_ACCT) {
p = strstr(str, TS_PATH_DELIMITER);
if (p == NULL) {
return -1;
return TSDB_CODE_INVALID_PARA;
}
dst->acctId = taosStr2Int32(str, NULL, 10);
@ -252,7 +252,7 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
// too long account id or too long db name
if ((len >= tListLen(dst->dbname)) || (len <= 0)) {
return -1;
return TSDB_CODE_INVALID_PARA;
}
memcpy(dst->dbname, start, len);
@ -266,7 +266,7 @@ int32_t tNameFromString(SName* dst, const char* str, uint32_t type) {
// too long account id or too long db name
int32_t len = (int32_t)strlen(start);
if ((len >= tListLen(dst->tname)) || (len <= 0)) {
return -1;
return TSDB_CODE_INVALID_PARA;
}
memcpy(dst->tname, start, len);