fix coverity scan problem
This commit is contained in:
parent
55ee9d6c25
commit
e16a3935a5
|
@ -1143,7 +1143,8 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < vlen; ++i) {
|
for (int32_t i = 0; i < vlen; ++i) {
|
||||||
SVnodeLoad vload = {0};
|
SVnodeLoad vload = {0};
|
||||||
int64_t reserved = 0;
|
int64_t reserved64 = 0;
|
||||||
|
int32_t reserved32 = 0;
|
||||||
if (tDecodeI32(&decoder, &vload.vgId) < 0) return -1;
|
if (tDecodeI32(&decoder, &vload.vgId) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &vload.syncState) < 0) return -1;
|
if (tDecodeI8(&decoder, &vload.syncState) < 0) return -1;
|
||||||
if (tDecodeI8(&decoder, &vload.syncRestore) < 0) return -1;
|
if (tDecodeI8(&decoder, &vload.syncRestore) < 0) return -1;
|
||||||
|
@ -1155,9 +1156,9 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
|
||||||
if (tDecodeI64(&decoder, &vload.compStorage) < 0) return -1;
|
if (tDecodeI64(&decoder, &vload.compStorage) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1;
|
if (tDecodeI64(&decoder, &vload.pointsWritten) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &vload.numOfCachedTables) < 0) return -1;
|
if (tDecodeI32(&decoder, &vload.numOfCachedTables) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, (int32_t *)&reserved) < 0) return -1;
|
if (tDecodeI32(&decoder, (int32_t *)&reserved32) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
|
if (tDecodeI64(&decoder, &reserved64) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &reserved) < 0) return -1;
|
if (tDecodeI64(&decoder, &reserved64) < 0) return -1;
|
||||||
if (taosArrayPush(pReq->pVloads, &vload) == NULL) {
|
if (taosArrayPush(pReq->pVloads, &vload) == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -1545,6 +1546,7 @@ int32_t tSerializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp *pR
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRsp) {
|
int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRsp) {
|
||||||
|
char *key = NULL, *value = NULL;
|
||||||
pRsp->createdDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
pRsp->createdDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
pRsp->readDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
pRsp->readDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
pRsp->writeDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
pRsp->writeDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
|
@ -1553,40 +1555,40 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs
|
||||||
pRsp->useDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
pRsp->useDbs = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
if (pRsp->createdDbs == NULL || pRsp->readDbs == NULL || pRsp->writeDbs == NULL || pRsp->readTbs == NULL ||
|
if (pRsp->createdDbs == NULL || pRsp->readDbs == NULL || pRsp->writeDbs == NULL || pRsp->readTbs == NULL ||
|
||||||
pRsp->writeTbs == NULL || pRsp->useDbs == NULL) {
|
pRsp->writeTbs == NULL || pRsp->useDbs == NULL) {
|
||||||
return -1;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tDecodeCStrTo(pDecoder, pRsp->user) < 0) return -1;
|
if (tDecodeCStrTo(pDecoder, pRsp->user) < 0) goto _err;
|
||||||
if (tDecodeI8(pDecoder, &pRsp->superAuth) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pRsp->superAuth) < 0) goto _err;
|
||||||
if (tDecodeI8(pDecoder, &pRsp->sysInfo) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pRsp->sysInfo) < 0) goto _err;
|
||||||
if (tDecodeI8(pDecoder, &pRsp->enable) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pRsp->enable) < 0) goto _err;
|
||||||
if (tDecodeI8(pDecoder, &pRsp->reserve) < 0) return -1;
|
if (tDecodeI8(pDecoder, &pRsp->reserve) < 0) goto _err;
|
||||||
if (tDecodeI32(pDecoder, &pRsp->version) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pRsp->version) < 0) goto _err;
|
||||||
|
|
||||||
int32_t numOfCreatedDbs = 0;
|
int32_t numOfCreatedDbs = 0;
|
||||||
int32_t numOfReadDbs = 0;
|
int32_t numOfReadDbs = 0;
|
||||||
int32_t numOfWriteDbs = 0;
|
int32_t numOfWriteDbs = 0;
|
||||||
if (tDecodeI32(pDecoder, &numOfCreatedDbs) < 0) return -1;
|
if (tDecodeI32(pDecoder, &numOfCreatedDbs) < 0) goto _err;
|
||||||
if (tDecodeI32(pDecoder, &numOfReadDbs) < 0) return -1;
|
if (tDecodeI32(pDecoder, &numOfReadDbs) < 0) goto _err;
|
||||||
if (tDecodeI32(pDecoder, &numOfWriteDbs) < 0) return -1;
|
if (tDecodeI32(pDecoder, &numOfWriteDbs) < 0) goto _err;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfCreatedDbs; ++i) {
|
for (int32_t i = 0; i < numOfCreatedDbs; ++i) {
|
||||||
char db[TSDB_DB_FNAME_LEN] = {0};
|
char db[TSDB_DB_FNAME_LEN] = {0};
|
||||||
if (tDecodeCStrTo(pDecoder, db) < 0) return -1;
|
if (tDecodeCStrTo(pDecoder, db) < 0) goto _err;
|
||||||
int32_t len = strlen(db);
|
int32_t len = strlen(db);
|
||||||
taosHashPut(pRsp->createdDbs, db, len, db, len + 1);
|
taosHashPut(pRsp->createdDbs, db, len, db, len + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfReadDbs; ++i) {
|
for (int32_t i = 0; i < numOfReadDbs; ++i) {
|
||||||
char db[TSDB_DB_FNAME_LEN] = {0};
|
char db[TSDB_DB_FNAME_LEN] = {0};
|
||||||
if (tDecodeCStrTo(pDecoder, db) < 0) return -1;
|
if (tDecodeCStrTo(pDecoder, db) < 0) goto _err;
|
||||||
int32_t len = strlen(db);
|
int32_t len = strlen(db);
|
||||||
taosHashPut(pRsp->readDbs, db, len, db, len + 1);
|
taosHashPut(pRsp->readDbs, db, len, db, len + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfWriteDbs; ++i) {
|
for (int32_t i = 0; i < numOfWriteDbs; ++i) {
|
||||||
char db[TSDB_DB_FNAME_LEN] = {0};
|
char db[TSDB_DB_FNAME_LEN] = {0};
|
||||||
if (tDecodeCStrTo(pDecoder, db) < 0) return -1;
|
if (tDecodeCStrTo(pDecoder, db) < 0) goto _err;
|
||||||
int32_t len = strlen(db);
|
int32_t len = strlen(db);
|
||||||
taosHashPut(pRsp->writeDbs, db, len, db, len + 1);
|
taosHashPut(pRsp->writeDbs, db, len, db, len + 1);
|
||||||
}
|
}
|
||||||
|
@ -1595,67 +1597,80 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs
|
||||||
int32_t numOfReadTbs = 0;
|
int32_t numOfReadTbs = 0;
|
||||||
int32_t numOfWriteTbs = 0;
|
int32_t numOfWriteTbs = 0;
|
||||||
int32_t numOfUseDbs = 0;
|
int32_t numOfUseDbs = 0;
|
||||||
if (tDecodeI32(pDecoder, &numOfReadTbs) < 0) return -1;
|
if (tDecodeI32(pDecoder, &numOfReadTbs) < 0) goto _err;
|
||||||
if (tDecodeI32(pDecoder, &numOfWriteTbs) < 0) return -1;
|
if (tDecodeI32(pDecoder, &numOfWriteTbs) < 0) goto _err;
|
||||||
if (tDecodeI32(pDecoder, &numOfUseDbs) < 0) return -1;
|
if (tDecodeI32(pDecoder, &numOfUseDbs) < 0) goto _err;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfReadTbs; ++i) {
|
for (int32_t i = 0; i < numOfReadTbs; ++i) {
|
||||||
int32_t keyLen = 0;
|
int32_t keyLen = 0;
|
||||||
if (tDecodeI32(pDecoder, &keyLen) < 0) return -1;
|
if (tDecodeI32(pDecoder, &keyLen) < 0) goto _err;
|
||||||
|
|
||||||
char *key = taosMemoryCalloc(keyLen + 1, sizeof(char));
|
key = taosMemoryCalloc(keyLen + 1, sizeof(char));
|
||||||
if (tDecodeCStrTo(pDecoder, key) < 0) return -1;
|
if (tDecodeCStrTo(pDecoder, key) < 0) goto _err;
|
||||||
|
|
||||||
int32_t valuelen = 0;
|
int32_t valuelen = 0;
|
||||||
if (tDecodeI32(pDecoder, &valuelen) < 0) return -1;
|
if (tDecodeI32(pDecoder, &valuelen) < 0) goto _err;
|
||||||
char *value = taosMemoryCalloc(valuelen + 1, sizeof(char));
|
|
||||||
if (tDecodeCStrTo(pDecoder, value) < 0) return -1;
|
value = taosMemoryCalloc(valuelen + 1, sizeof(char));
|
||||||
|
if (tDecodeCStrTo(pDecoder, value) < 0) goto _err;
|
||||||
|
|
||||||
taosHashPut(pRsp->readTbs, key, strlen(key), value, valuelen + 1);
|
taosHashPut(pRsp->readTbs, key, strlen(key), value, valuelen + 1);
|
||||||
|
|
||||||
taosMemoryFree(key);
|
taosMemoryFreeClear(key);
|
||||||
taosMemoryFree(value);
|
taosMemoryFreeClear(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfWriteTbs; ++i) {
|
for (int32_t i = 0; i < numOfWriteTbs; ++i) {
|
||||||
int32_t keyLen = 0;
|
int32_t keyLen = 0;
|
||||||
if (tDecodeI32(pDecoder, &keyLen) < 0) return -1;
|
if (tDecodeI32(pDecoder, &keyLen) < 0) goto _err;
|
||||||
|
|
||||||
char *key = taosMemoryCalloc(keyLen + 1, sizeof(char));
|
key = taosMemoryCalloc(keyLen + 1, sizeof(char));
|
||||||
if (tDecodeCStrTo(pDecoder, key) < 0) return -1;
|
if (tDecodeCStrTo(pDecoder, key) < 0) goto _err;
|
||||||
|
|
||||||
int32_t valuelen = 0;
|
int32_t valuelen = 0;
|
||||||
if (tDecodeI32(pDecoder, &valuelen) < 0) return -1;
|
if (tDecodeI32(pDecoder, &valuelen) < 0) goto _err;
|
||||||
char *value = taosMemoryCalloc(valuelen + 1, sizeof(char));
|
|
||||||
if (tDecodeCStrTo(pDecoder, value) < 0) return -1;
|
value = taosMemoryCalloc(valuelen + 1, sizeof(char));
|
||||||
|
if (tDecodeCStrTo(pDecoder, value) < 0) goto _err;
|
||||||
|
|
||||||
taosHashPut(pRsp->writeTbs, key, strlen(key), value, valuelen + 1);
|
taosHashPut(pRsp->writeTbs, key, strlen(key), value, valuelen + 1);
|
||||||
|
|
||||||
taosMemoryFree(key);
|
taosMemoryFreeClear(key);
|
||||||
taosMemoryFree(value);
|
taosMemoryFreeClear(value);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfUseDbs; ++i) {
|
for (int32_t i = 0; i < numOfUseDbs; ++i) {
|
||||||
int32_t keyLen = 0;
|
int32_t keyLen = 0;
|
||||||
if (tDecodeI32(pDecoder, &keyLen) < 0) return -1;
|
if (tDecodeI32(pDecoder, &keyLen) < 0) goto _err;
|
||||||
|
|
||||||
char *key = taosMemoryCalloc(keyLen + 1, sizeof(char));
|
key = taosMemoryCalloc(keyLen + 1, sizeof(char));
|
||||||
if (tDecodeCStrTo(pDecoder, key) < 0) return -1;
|
if (tDecodeCStrTo(pDecoder, key) < 0) goto _err;
|
||||||
|
|
||||||
int32_t ref = 0;
|
int32_t ref = 0;
|
||||||
if (tDecodeI32(pDecoder, &ref) < 0) return -1;
|
if (tDecodeI32(pDecoder, &ref) < 0) goto _err;
|
||||||
|
|
||||||
taosHashPut(pRsp->useDbs, key, strlen(key), &ref, sizeof(ref));
|
taosHashPut(pRsp->useDbs, key, strlen(key), &ref, sizeof(ref));
|
||||||
taosMemoryFree(key);
|
taosMemoryFreeClear(key);
|
||||||
}
|
}
|
||||||
// since 3.0.7.0
|
// since 3.0.7.0
|
||||||
if (!tDecodeIsEnd(pDecoder)) {
|
if (!tDecodeIsEnd(pDecoder)) {
|
||||||
if (tDecodeI32(pDecoder, &pRsp->passVer) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pRsp->passVer) < 0) goto _err;
|
||||||
} else {
|
} else {
|
||||||
pRsp->passVer = 0;
|
pRsp->passVer = 0;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
_err:
|
||||||
|
taosHashCleanup(pRsp->createdDbs);
|
||||||
|
taosHashCleanup(pRsp->readDbs);
|
||||||
|
taosHashCleanup(pRsp->writeDbs);
|
||||||
|
taosHashCleanup(pRsp->writeTbs);
|
||||||
|
taosHashCleanup(pRsp->readTbs);
|
||||||
|
taosHashCleanup(pRsp->useDbs);
|
||||||
|
|
||||||
|
taosMemoryFreeClear(key);
|
||||||
|
taosMemoryFreeClear(value);
|
||||||
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDeserializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp *pRsp) {
|
int32_t tDeserializeSGetUserAuthRsp(void *buf, int32_t bufLen, SGetUserAuthRsp *pRsp) {
|
||||||
|
@ -2844,7 +2859,6 @@ int32_t tSerializeSDbHbRspImp(SEncoder *pEncoder, const SDbHbRsp *pRsp) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t tSerializeSDbHbBatchRsp(void *buf, int32_t bufLen, SDbHbBatchRsp *pRsp) {
|
int32_t tSerializeSDbHbBatchRsp(void *buf, int32_t bufLen, SDbHbBatchRsp *pRsp) {
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, buf, bufLen);
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
@ -2908,7 +2922,7 @@ int32_t tDeserializeSUseDbRsp(void *buf, int32_t bufLen, SUseDbRsp *pRsp) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDeserializeSDbHbRspImp(SDecoder* decoder, SDbHbRsp* pRsp) {
|
int32_t tDeserializeSDbHbRspImp(SDecoder *decoder, SDbHbRsp *pRsp) {
|
||||||
int8_t flag = 0;
|
int8_t flag = 0;
|
||||||
if (tDecodeI8(decoder, &flag) < 0) return -1;
|
if (tDecodeI8(decoder, &flag) < 0) return -1;
|
||||||
if (flag) {
|
if (flag) {
|
||||||
|
@ -3196,7 +3210,7 @@ int32_t tSerializeSDbCfgRsp(void *buf, int32_t bufLen, const SDbCfgRsp *pRsp) {
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDeserializeSDbCfgRspImpl(SDecoder* decoder, SDbCfgRsp *pRsp) {
|
int32_t tDeserializeSDbCfgRspImpl(SDecoder *decoder, SDbCfgRsp *pRsp) {
|
||||||
if (tDecodeCStrTo(decoder, pRsp->db) < 0) return -1;
|
if (tDecodeCStrTo(decoder, pRsp->db) < 0) return -1;
|
||||||
if (tDecodeI64(decoder, &pRsp->dbId) < 0) return -1;
|
if (tDecodeI64(decoder, &pRsp->dbId) < 0) return -1;
|
||||||
if (tDecodeI32(decoder, &pRsp->cfgVersion) < 0) return -1;
|
if (tDecodeI32(decoder, &pRsp->cfgVersion) < 0) return -1;
|
||||||
|
@ -5306,10 +5320,10 @@ int32_t tDeserializeSMqAskEpReq(void *buf, int32_t bufLen, SMqAskEpReq *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDeatroySMqHbReq(SMqHbReq* pReq){
|
int32_t tDeatroySMqHbReq(SMqHbReq *pReq) {
|
||||||
for(int i = 0; i < taosArrayGetSize(pReq->topics); i++){
|
for (int i = 0; i < taosArrayGetSize(pReq->topics); i++) {
|
||||||
TopicOffsetRows* vgs = taosArrayGet(pReq->topics, i);
|
TopicOffsetRows *vgs = taosArrayGet(pReq->topics, i);
|
||||||
if(vgs) taosArrayDestroy(vgs->offsetRows);
|
if (vgs) taosArrayDestroy(vgs->offsetRows);
|
||||||
}
|
}
|
||||||
taosArrayDestroy(pReq->topics);
|
taosArrayDestroy(pReq->topics);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -5326,7 +5340,7 @@ int32_t tSerializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
|
||||||
int32_t sz = taosArrayGetSize(pReq->topics);
|
int32_t sz = taosArrayGetSize(pReq->topics);
|
||||||
if (tEncodeI32(&encoder, sz) < 0) return -1;
|
if (tEncodeI32(&encoder, sz) < 0) return -1;
|
||||||
for (int32_t i = 0; i < sz; ++i) {
|
for (int32_t i = 0; i < sz; ++i) {
|
||||||
TopicOffsetRows* vgs = (TopicOffsetRows*)taosArrayGet(pReq->topics, i);
|
TopicOffsetRows *vgs = (TopicOffsetRows *)taosArrayGet(pReq->topics, i);
|
||||||
if (tEncodeCStr(&encoder, vgs->topicName) < 0) return -1;
|
if (tEncodeCStr(&encoder, vgs->topicName) < 0) return -1;
|
||||||
int32_t szVgs = taosArrayGetSize(vgs->offsetRows);
|
int32_t szVgs = taosArrayGetSize(vgs->offsetRows);
|
||||||
if (tEncodeI32(&encoder, szVgs) < 0) return -1;
|
if (tEncodeI32(&encoder, szVgs) < 0) return -1;
|
||||||
|
@ -5356,19 +5370,19 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
|
||||||
if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->epoch) < 0) return -1;
|
||||||
int32_t sz = 0;
|
int32_t sz = 0;
|
||||||
if (tDecodeI32(&decoder, &sz) < 0) return -1;
|
if (tDecodeI32(&decoder, &sz) < 0) return -1;
|
||||||
if(sz > 0){
|
if (sz > 0) {
|
||||||
pReq->topics = taosArrayInit(sz, sizeof(TopicOffsetRows));
|
pReq->topics = taosArrayInit(sz, sizeof(TopicOffsetRows));
|
||||||
if (NULL == pReq->topics) return -1;
|
if (NULL == pReq->topics) return -1;
|
||||||
for (int32_t i = 0; i < sz; ++i) {
|
for (int32_t i = 0; i < sz; ++i) {
|
||||||
TopicOffsetRows* data = taosArrayReserve(pReq->topics, 1);
|
TopicOffsetRows *data = taosArrayReserve(pReq->topics, 1);
|
||||||
tDecodeCStrTo(&decoder, data->topicName);
|
tDecodeCStrTo(&decoder, data->topicName);
|
||||||
int32_t szVgs = 0;
|
int32_t szVgs = 0;
|
||||||
if (tDecodeI32(&decoder, &szVgs) < 0) return -1;
|
if (tDecodeI32(&decoder, &szVgs) < 0) return -1;
|
||||||
if(szVgs > 0){
|
if (szVgs > 0) {
|
||||||
data->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
|
data->offsetRows = taosArrayInit(szVgs, sizeof(OffsetRows));
|
||||||
if (NULL == data->offsetRows) return -1;
|
if (NULL == data->offsetRows) return -1;
|
||||||
for (int32_t j= 0; j < szVgs; ++j) {
|
for (int32_t j = 0; j < szVgs; ++j) {
|
||||||
OffsetRows* offRows = taosArrayReserve(data->offsetRows, 1);
|
OffsetRows *offRows = taosArrayReserve(data->offsetRows, 1);
|
||||||
if (tDecodeI32(&decoder, &offRows->vgId) < 0) return -1;
|
if (tDecodeI32(&decoder, &offRows->vgId) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &offRows->rows) < 0) return -1;
|
if (tDecodeI64(&decoder, &offRows->rows) < 0) return -1;
|
||||||
if (tDecodeSTqOffsetVal(&decoder, &offRows->offset) < 0) return -1;
|
if (tDecodeSTqOffsetVal(&decoder, &offRows->offset) < 0) return -1;
|
||||||
|
@ -5382,7 +5396,6 @@ int32_t tDeserializeSMqHbReq(void *buf, int32_t bufLen, SMqHbReq *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t tSerializeSMqSeekReq(void *buf, int32_t bufLen, SMqSeekReq *pReq) {
|
int32_t tSerializeSMqSeekReq(void *buf, int32_t bufLen, SMqSeekReq *pReq) {
|
||||||
int32_t headLen = sizeof(SMsgHead);
|
int32_t headLen = sizeof(SMsgHead);
|
||||||
if (buf != NULL) {
|
if (buf != NULL) {
|
||||||
|
@ -5610,9 +5623,9 @@ int32_t tSerializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
|
||||||
int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
|
int32_t tDeserializeSMqPollReq(void *buf, int32_t bufLen, SMqPollReq *pReq) {
|
||||||
int32_t headLen = sizeof(SMsgHead);
|
int32_t headLen = sizeof(SMsgHead);
|
||||||
|
|
||||||
// SMsgHead *pHead = buf;
|
// SMsgHead *pHead = buf;
|
||||||
// pHead->vgId = pReq->head.vgId;
|
// pHead->vgId = pReq->head.vgId;
|
||||||
// pHead->contLen = pReq->head.contLen;
|
// pHead->contLen = pReq->head.contLen;
|
||||||
|
|
||||||
SDecoder decoder = {0};
|
SDecoder decoder = {0};
|
||||||
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
|
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
|
||||||
|
@ -6983,7 +6996,7 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeSVAlterTbReqSetCtime(SDecoder* pDecoder, SVAlterTbReq* pReq, int64_t ctimeMs) {
|
int32_t tDecodeSVAlterTbReqSetCtime(SDecoder *pDecoder, SVAlterTbReq *pReq, int64_t ctimeMs) {
|
||||||
if (tStartDecode(pDecoder) < 0) return -1;
|
if (tStartDecode(pDecoder) < 0) return -1;
|
||||||
if (tDecodeSVAlterTbReqCommon(pDecoder, pReq) < 0) return -1;
|
if (tDecodeSVAlterTbReqCommon(pDecoder, pReq) < 0) return -1;
|
||||||
|
|
||||||
|
@ -7216,13 +7229,13 @@ int32_t tDecodeSTqOffset(SDecoder *pDecoder, STqOffset *pOffset) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tEncodeMqVgOffset(SEncoder* pEncoder, const SMqVgOffset* pOffset) {
|
int32_t tEncodeMqVgOffset(SEncoder *pEncoder, const SMqVgOffset *pOffset) {
|
||||||
if (tEncodeSTqOffset(pEncoder, &pOffset->offset) < 0) return -1;
|
if (tEncodeSTqOffset(pEncoder, &pOffset->offset) < 0) return -1;
|
||||||
if (tEncodeI64(pEncoder, pOffset->consumerId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pOffset->consumerId) < 0) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeMqVgOffset(SDecoder* pDecoder, SMqVgOffset* pOffset) {
|
int32_t tDecodeMqVgOffset(SDecoder *pDecoder, SMqVgOffset *pOffset) {
|
||||||
if (tDecodeSTqOffset(pDecoder, &pOffset->offset) < 0) return -1;
|
if (tDecodeSTqOffset(pDecoder, &pOffset->offset) < 0) return -1;
|
||||||
if (tDecodeI64(pDecoder, &pOffset->consumerId) < 0) return -1;
|
if (tDecodeI64(pDecoder, &pOffset->consumerId) < 0) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -7407,7 +7420,7 @@ int32_t tEncodeSTaosxRsp(SEncoder *pEncoder, const STaosxRsp *pRsp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) {
|
int32_t tDecodeSTaosxRsp(SDecoder *pDecoder, STaosxRsp *pRsp) {
|
||||||
if (tDecodeMqDataRsp(pDecoder, (SMqDataRsp*)pRsp) < 0) return -1;
|
if (tDecodeMqDataRsp(pDecoder, (SMqDataRsp *)pRsp) < 0) return -1;
|
||||||
|
|
||||||
if (tDecodeI32(pDecoder, &pRsp->createTableNum) < 0) return -1;
|
if (tDecodeI32(pDecoder, &pRsp->createTableNum) < 0) return -1;
|
||||||
if (pRsp->createTableNum) {
|
if (pRsp->createTableNum) {
|
||||||
|
|
|
@ -455,7 +455,7 @@ int metaAddIndexToSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (diffIdx == -1 && diffIdx == 0) {
|
if (diffIdx == -1 || diffIdx == 0) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1654,10 +1654,11 @@ static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTb
|
||||||
if (ret < 0) {
|
if (ret < 0) {
|
||||||
terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
terrno = TSDB_CODE_TDB_TABLE_NOT_EXIST;
|
||||||
return -1;
|
return -1;
|
||||||
|
} else {
|
||||||
|
uid = *(tb_uid_t *)pVal;
|
||||||
|
tdbFree(pVal);
|
||||||
|
pVal = NULL;
|
||||||
}
|
}
|
||||||
uid = *(tb_uid_t *)pVal;
|
|
||||||
tdbFree(pVal);
|
|
||||||
pVal = NULL;
|
|
||||||
|
|
||||||
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(tb_uid_t), &pVal, &nVal) == -1) {
|
if (tdbTbGet(pMeta->pUidIdx, &uid, sizeof(tb_uid_t), &pVal, &nVal) == -1) {
|
||||||
ret = -1;
|
ret = -1;
|
||||||
|
@ -1736,12 +1737,16 @@ static int metaAddTagIndex(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTb
|
||||||
nTagData = tDataTypes[pCol->type].bytes;
|
nTagData = tDataTypes[pCol->type].bytes;
|
||||||
}
|
}
|
||||||
if (metaCreateTagIdxKey(suid, pCol->colId, pTagData, nTagData, pCol->type, uid, &pTagIdxKey, &nTagIdxKey) < 0) {
|
if (metaCreateTagIdxKey(suid, pCol->colId, pTagData, nTagData, pCol->type, uid, &pTagIdxKey, &nTagIdxKey) < 0) {
|
||||||
|
tdbFree(pKey);
|
||||||
|
tdbFree(pVal);
|
||||||
metaDestroyTagIdxKey(pTagIdxKey);
|
metaDestroyTagIdxKey(pTagIdxKey);
|
||||||
|
tdbTbcClose(pCtbIdxc);
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn);
|
tdbTbUpsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, pMeta->txn);
|
||||||
metaDestroyTagIdxKey(pTagIdxKey);
|
metaDestroyTagIdxKey(pTagIdxKey);
|
||||||
}
|
}
|
||||||
|
tdbTbcClose(pCtbIdxc);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
_err:
|
_err:
|
||||||
|
|
|
@ -189,9 +189,9 @@ void streamBackendCleanup(void* arg) {
|
||||||
taosThreadMutexDestroy(&pHandle->mutex);
|
taosThreadMutexDestroy(&pHandle->mutex);
|
||||||
|
|
||||||
taosThreadMutexDestroy(&pHandle->cfMutex);
|
taosThreadMutexDestroy(&pHandle->cfMutex);
|
||||||
|
qDebug("destroy stream backend backend:%p", pHandle);
|
||||||
|
|
||||||
taosMemoryFree(pHandle);
|
taosMemoryFree(pHandle);
|
||||||
qDebug("destroy stream backend backend:%p", pHandle);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
SListNode* streamBackendAddCompare(void* backend, void* arg) {
|
SListNode* streamBackendAddCompare(void* backend, void* arg) {
|
||||||
|
@ -704,7 +704,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
|
||||||
char suffix[64] = {0};
|
char suffix[64] = {0};
|
||||||
|
|
||||||
rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
|
rocksdb_options_t** cfOpts = taosMemoryCalloc(nCf, sizeof(rocksdb_options_t*));
|
||||||
RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam*));
|
RocksdbCfParam* params = taosMemoryCalloc(nCf, sizeof(RocksdbCfParam));
|
||||||
rocksdb_comparator_t** pCompare = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t**));
|
rocksdb_comparator_t** pCompare = taosMemoryCalloc(nCf, sizeof(rocksdb_comparator_t**));
|
||||||
rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
|
rocksdb_column_family_handle_t** cfHandle = taosMemoryCalloc(nCf, sizeof(rocksdb_column_family_handle_t*));
|
||||||
|
|
||||||
|
@ -861,7 +861,7 @@ int streamStateOpenBackend(void* backend, SStreamState* pState) {
|
||||||
param[i].tableOpt = tableOpt;
|
param[i].tableOpt = tableOpt;
|
||||||
};
|
};
|
||||||
|
|
||||||
rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t**));
|
rocksdb_comparator_t** pCompare = taosMemoryCalloc(cfLen, sizeof(rocksdb_comparator_t*));
|
||||||
for (int i = 0; i < cfLen; i++) {
|
for (int i = 0; i < cfLen; i++) {
|
||||||
SCfInit* cf = &ginitDict[i];
|
SCfInit* cf = &ginitDict[i];
|
||||||
|
|
||||||
|
@ -1012,16 +1012,15 @@ bool streamStateIterSeekAndValid(rocksdb_iterator_t* iter, char* buf, size_t len
|
||||||
}
|
}
|
||||||
rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, rocksdb_snapshot_t** snapshot,
|
rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfName, rocksdb_snapshot_t** snapshot,
|
||||||
rocksdb_readoptions_t** readOpt) {
|
rocksdb_readoptions_t** readOpt) {
|
||||||
int idx = streamStateGetCfIdx(pState, cfName);
|
int idx = streamStateGetCfIdx(pState, cfName);
|
||||||
|
|
||||||
if (snapshot != NULL) {
|
|
||||||
*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb);
|
|
||||||
}
|
|
||||||
rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create();
|
rocksdb_readoptions_t* rOpt = rocksdb_readoptions_create();
|
||||||
*readOpt = rOpt;
|
*readOpt = rOpt;
|
||||||
|
|
||||||
rocksdb_readoptions_set_snapshot(rOpt, *snapshot);
|
if (snapshot != NULL) {
|
||||||
rocksdb_readoptions_set_fill_cache(rOpt, 0);
|
*snapshot = (rocksdb_snapshot_t*)rocksdb_create_snapshot(pState->pTdbState->rocksdb);
|
||||||
|
rocksdb_readoptions_set_snapshot(rOpt, *snapshot);
|
||||||
|
rocksdb_readoptions_set_fill_cache(rOpt, 0);
|
||||||
|
}
|
||||||
|
|
||||||
return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt,
|
return rocksdb_create_iterator_cf(pState->pTdbState->rocksdb, rOpt,
|
||||||
((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[idx]);
|
((rocksdb_column_family_handle_t**)pState->pTdbState->pHandle)[idx]);
|
||||||
|
@ -1049,8 +1048,8 @@ rocksdb_iterator_t* streamStateIterCreate(SStreamState* pState, const char* cfNa
|
||||||
int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
|
int32_t ttlVLen = ginitDict[i].enValueFunc((char*)value, vLen, 0, &ttlV); \
|
||||||
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
|
rocksdb_put_cf(db, opts, pHandle, (const char*)buf, klen, (const char*)ttlV, (size_t)ttlVLen, &err); \
|
||||||
if (err != NULL) { \
|
if (err != NULL) { \
|
||||||
taosMemoryFree(err); \
|
|
||||||
qError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
|
qError("streamState str: %s failed to write to %s, err: %s", toString, funcname, err); \
|
||||||
|
taosMemoryFree(err); \
|
||||||
code = -1; \
|
code = -1; \
|
||||||
} else { \
|
} else { \
|
||||||
qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, ttlVLen); \
|
qTrace("streamState str:%s succ to write to %s, rowValLen:%d, ttlValLen:%d", toString, funcname, vLen, ttlVLen); \
|
||||||
|
@ -1389,8 +1388,6 @@ int32_t streamStateSessionPut_rocksdb(SStreamState* pState, const SSessionKey* k
|
||||||
int code = 0;
|
int code = 0;
|
||||||
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
SStateSessionKey sKey = {.key = *key, .opNum = pState->number};
|
||||||
STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen);
|
STREAM_STATE_PUT_ROCKSDB(pState, "sess", &sKey, value, vLen);
|
||||||
if (code == -1) {
|
|
||||||
}
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
|
int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, void** pVal, int32_t* pVLen) {
|
||||||
|
@ -1408,8 +1405,10 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo
|
||||||
code = -1;
|
code = -1;
|
||||||
} else {
|
} else {
|
||||||
*key = resKey;
|
*key = resKey;
|
||||||
*pVal = taosMemoryCalloc(1, *pVLen);
|
if (pVal != NULL && pVLen != NULL) {
|
||||||
memcpy(*pVal, tmp, *pVLen);
|
*pVal = taosMemoryCalloc(1, *pVLen);
|
||||||
|
memcpy(*pVal, tmp, *pVLen);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosMemoryFree(tmp);
|
taosMemoryFree(tmp);
|
||||||
|
|
|
@ -391,7 +391,13 @@ static void httpHandleReq(SHttpMsg* msg) {
|
||||||
|
|
||||||
// set up timeout to avoid stuck;
|
// set up timeout to avoid stuck;
|
||||||
int32_t fd = taosCreateSocketWithTimeout(5);
|
int32_t fd = taosCreateSocketWithTimeout(5);
|
||||||
int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
|
if (fd < 0) {
|
||||||
|
tError("http-report failed to open socket, dst:%s:%d", cli->addr, cli->port);
|
||||||
|
taosReleaseRef(httpRefMgt, httpRef);
|
||||||
|
destroyHttpClient(cli);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
int ret = uv_tcp_open((uv_tcp_t*)&cli->tcp, fd);
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
tError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
|
tError("http-report failed to open socket, reason:%s, dst:%s:%d", uv_strerror(ret), cli->addr, cli->port);
|
||||||
taosReleaseRef(httpRefMgt, httpRef);
|
taosReleaseRef(httpRefMgt, httpRef);
|
||||||
|
|
Loading…
Reference in New Issue