fix memory error

This commit is contained in:
Liu Jicong 2022-03-21 23:36:21 +08:00
parent 8a20b2fe50
commit b08c3120bf
8 changed files with 232 additions and 198 deletions

View File

@ -101,13 +101,27 @@ void* blockDataDestroy(SSDataBlock* pBlock);
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock);
static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) {
if (pBlock == NULL) {
return;
static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) {
// WARNING: do not use info.numOfCols,
// sometimes info.numOfCols != array size
int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
for (int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
tfree(pColInfoData->varmeta.offset);
} else {
tfree(pColInfoData->nullbitmap);
}
blockDataDestroy(pBlock);
tfree(pColInfoData->pData);
}
taosArrayDestroy(pBlock->pDataBlock);
tfree(pBlock->pBlockAgg);
}
static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { blockDestroyInner(pBlock); }
static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) {
int32_t tlen = 0;
int32_t sz = 0;
@ -157,7 +171,7 @@ static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) {
}
free(pRsp->schema);
}
taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))tDeleteSSDataBlock);
taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))blockDestroyInner);
pRsp->pBlockData = NULL;
}

View File

@ -2145,11 +2145,12 @@ typedef struct {
typedef struct {
SMqRspHead head;
// TODO: remove from msg
int64_t reqOffset;
int64_t rspOffset;
int32_t skipLogNum;
// TODO: replace with topic name
int32_t numOfTopics;
// TODO: remove from msg
SSchemaWrapper* schema;
SArray* pBlockData; // SArray<SSDataBlock>
} SMqPollRsp;

View File

@ -735,7 +735,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
SMqClientVg* pVg = pParam->pVg;
tmq_t* tmq = pParam->tmq;
if (code != 0) {
printf("msg discard\n");
printf("msg discard %x\n", code);
goto WRITE_QUEUE_FAIL;
}

View File

@ -53,8 +53,7 @@ bool isEpsetEqual(const SEpSet *s1, const SEpSet *s2) {
}
for (int32_t i = 0; i < s1->numOfEps; i++) {
if (s1->eps[i].port != s2->eps[i].port
|| strncmp(s1->eps[i].fqdn, s2->eps[i].fqdn, TSDB_FQDN_LEN) != 0)
if (s1->eps[i].port != s2->eps[i].port || strncmp(s1->eps[i].fqdn, s2->eps[i].fqdn, TSDB_FQDN_LEN) != 0)
return false;
}
return true;
@ -75,7 +74,6 @@ SEpSet getEpSet_s(SCorEpSet *pEpSet) {
return ep;
}
int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) {
ASSERT(pColumnInfoData != NULL);
if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) {
@ -134,18 +132,39 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
} else {
char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow;
switch (type) {
case TSDB_DATA_TYPE_BOOL: {*(bool*) p = *(bool*) pData;break;}
case TSDB_DATA_TYPE_BOOL: {
*(bool*)p = *(bool*)pData;
break;
}
case TSDB_DATA_TYPE_TINYINT:
case TSDB_DATA_TYPE_UTINYINT: {*(int8_t*) p = *(int8_t*) pData;break;}
case TSDB_DATA_TYPE_UTINYINT: {
*(int8_t*)p = *(int8_t*)pData;
break;
}
case TSDB_DATA_TYPE_SMALLINT:
case TSDB_DATA_TYPE_USMALLINT: {*(int16_t*) p = *(int16_t*) pData;break;}
case TSDB_DATA_TYPE_USMALLINT: {
*(int16_t*)p = *(int16_t*)pData;
break;
}
case TSDB_DATA_TYPE_INT:
case TSDB_DATA_TYPE_UINT: {*(int32_t*) p = *(int32_t*) pData;break;}
case TSDB_DATA_TYPE_UINT: {
*(int32_t*)p = *(int32_t*)pData;
break;
}
case TSDB_DATA_TYPE_TIMESTAMP:
case TSDB_DATA_TYPE_BIGINT:
case TSDB_DATA_TYPE_UBIGINT: {*(int64_t*) p = *(int64_t*) pData;break;}
case TSDB_DATA_TYPE_FLOAT: {*(float*) p = *(float*) pData;break;}
case TSDB_DATA_TYPE_DOUBLE: {*(double*) p = *(double*) pData;break;}
case TSDB_DATA_TYPE_UBIGINT: {
*(int64_t*)p = *(int64_t*)pData;
break;
}
case TSDB_DATA_TYPE_FLOAT: {
*(float*)p = *(float*)pData;
break;
}
case TSDB_DATA_TYPE_DOUBLE: {
*(double*)p = *(double*)pData;
break;
}
default:
assert(0);
}
@ -154,7 +173,8 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
return 0;
}
static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource, int32_t numOfRow2) {
static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource,
int32_t numOfRow2) {
uint32_t total = numOfRow1 + numOfRow2;
if (BitmapLen(numOfRow1) < BitmapLen(total)) {
@ -188,7 +208,8 @@ static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, c
}
}
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2) {
int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource,
uint32_t numOfRow2) {
ASSERT(pColumnInfoData != NULL && pSource != NULL && pColumnInfoData->info.type == pSource->info.type);
if (numOfRow2 == 0) {
@ -244,9 +265,7 @@ size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) {
return pBlock->info.numOfCols;
}
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) {
return pBlock->info.rows;
}
size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; }
int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) {
if (pDataBlock == NULL || pDataBlock->info.rows <= 0) {
@ -315,7 +334,8 @@ size_t blockDataGetSize(const SSDataBlock* pBlock) {
// the number of tuples can be fit in one page.
// Actual data rows pluses the corresponding meta data must fit in one memory buffer of the given page size.
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t pageSize) {
int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex,
int32_t pageSize) {
ASSERT(pBlock != NULL && stopIndex != NULL);
int32_t numOfCols = pBlock->info.numOfCols;
@ -632,7 +652,8 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) {
return 0;
}
static int32_t doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, const SSDataBlock* pSrcBlock, int32_t tupleIndex) {
static int32_t doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, const SSDataBlock* pSrcBlock,
int32_t tupleIndex) {
int32_t code = 0;
int32_t numOfCols = pSrcBlock->info.numOfCols;
@ -803,22 +824,30 @@ static int32_t* createTupleIndex(size_t rows) {
return index;
}
static void destroyTupleIndex(int32_t* index) {
tfree(index);
}
static void destroyTupleIndex(int32_t* index) { tfree(index); }
static __compar_fn_t getComparFn(int32_t type, int32_t order) {
switch (type) {
case TSDB_DATA_TYPE_TINYINT: return order == TSDB_ORDER_ASC? compareInt8Val:compareInt8ValDesc;
case TSDB_DATA_TYPE_SMALLINT: return order == TSDB_ORDER_ASC? compareInt16Val:compareInt16ValDesc;
case TSDB_DATA_TYPE_INT: return order == TSDB_ORDER_ASC? compareInt32Val:compareInt32ValDesc;
case TSDB_DATA_TYPE_BIGINT: return order == TSDB_ORDER_ASC? compareInt64Val:compareInt64ValDesc;
case TSDB_DATA_TYPE_FLOAT: return order == TSDB_ORDER_ASC? compareFloatVal:compareFloatValDesc;
case TSDB_DATA_TYPE_DOUBLE: return order == TSDB_ORDER_ASC? compareDoubleVal:compareDoubleValDesc;
case TSDB_DATA_TYPE_UTINYINT: return order == TSDB_ORDER_ASC? compareUint8Val:compareUint8ValDesc;
case TSDB_DATA_TYPE_USMALLINT:return order == TSDB_ORDER_ASC? compareUint16Val:compareUint16ValDesc;
case TSDB_DATA_TYPE_UINT: return order == TSDB_ORDER_ASC? compareUint32Val:compareUint32ValDesc;
case TSDB_DATA_TYPE_UBIGINT: return order == TSDB_ORDER_ASC? compareUint64Val:compareUint64ValDesc;
case TSDB_DATA_TYPE_TINYINT:
return order == TSDB_ORDER_ASC ? compareInt8Val : compareInt8ValDesc;
case TSDB_DATA_TYPE_SMALLINT:
return order == TSDB_ORDER_ASC ? compareInt16Val : compareInt16ValDesc;
case TSDB_DATA_TYPE_INT:
return order == TSDB_ORDER_ASC ? compareInt32Val : compareInt32ValDesc;
case TSDB_DATA_TYPE_BIGINT:
return order == TSDB_ORDER_ASC ? compareInt64Val : compareInt64ValDesc;
case TSDB_DATA_TYPE_FLOAT:
return order == TSDB_ORDER_ASC ? compareFloatVal : compareFloatValDesc;
case TSDB_DATA_TYPE_DOUBLE:
return order == TSDB_ORDER_ASC ? compareDoubleVal : compareDoubleValDesc;
case TSDB_DATA_TYPE_UTINYINT:
return order == TSDB_ORDER_ASC ? compareUint8Val : compareUint8ValDesc;
case TSDB_DATA_TYPE_USMALLINT:
return order == TSDB_ORDER_ASC ? compareUint16Val : compareUint16ValDesc;
case TSDB_DATA_TYPE_UINT:
return order == TSDB_ORDER_ASC ? compareUint32Val : compareUint32ValDesc;
case TSDB_DATA_TYPE_UBIGINT:
return order == TSDB_ORDER_ASC ? compareUint64Val : compareUint64ValDesc;
default:
return order == TSDB_ORDER_ASC ? compareInt32Val : compareInt32ValDesc;
}
@ -865,10 +894,8 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs
return TSDB_CODE_SUCCESS;
} else { // var data type
}
} else if (pDataBlock->info.numOfCols == 2) {
}
}
@ -909,7 +936,8 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs
copyBackToBlock(pDataBlock, pCols);
int64_t p4 = taosGetTimestampUs();
printf("sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1-p0, p2 - p1, p3 - p2, p4-p3, rows);
printf("sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1 - p0, p2 - p1,
p3 - p2, p4 - p3, rows);
destroyTupleIndex(index);
return TSDB_CODE_SUCCESS;
@ -917,7 +945,11 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs
typedef struct SHelper {
int32_t index;
union {char *pData; int64_t i64; double d64;};
union {
char* pData;
int64_t i64;
double d64;
};
} SHelper;
SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock* pBlock) {
@ -945,7 +977,8 @@ SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock*
SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i);
for (int32_t j = 0; j < numOfRows; ++j) {
phelper[j].i64 = *(int32_t*)pInfo->pColData->pData + pInfo->pColData->info.bytes * j;
// memcpy(phelper[j].pData + offset, pInfo->pColData->pData + pInfo->pColData->info.bytes * j, pInfo->pColData->info.bytes);
// memcpy(phelper[j].pData + offset, pInfo->pColData->pData + pInfo->pColData->info.bytes * j,
// pInfo->pColData->info.bytes);
}
offset += pInfo->pColData->info.bytes;
@ -1013,9 +1046,7 @@ int32_t dataBlockCompar_rv(const void* p1, const void* p2, const void* param) {
return 0;
}
int32_t varColSort(SColumnInfoData* pColumnInfoData, SBlockOrderInfo* pOrder) {
return 0;
}
int32_t varColSort(SColumnInfoData* pColumnInfoData, SBlockOrderInfo* pOrder) { return 0; }
int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst) {
// Allocate the additional buffer.
@ -1052,7 +1083,8 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF
copyBackToBlock(pDataBlock, pCols);
int64_t p4 = taosGetTimestampUs();
printf("sort:%" PRId64 ", create:%" PRId64", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows);
printf("sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1 - p0, p2 - p1,
p3 - p2, p4 - p3, rows);
// destroyTupleIndex(index);
return 0;
}
@ -1127,21 +1159,8 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
return NULL;
}
int32_t numOfOutput = pBlock->info.numOfCols;
for(int32_t i = 0; i < numOfOutput; ++i) {
SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
tfree(pColInfoData->varmeta.offset);
} else {
tfree(pColInfoData->nullbitmap);
}
tfree(pColInfoData->pData);
}
taosArrayDestroy(pBlock->pDataBlock);
tfree(pBlock->pBlockAgg);
// tfree(pBlock);
blockDestroyInner(pBlock);
tfree(pBlock);
return NULL;
}

View File

@ -337,9 +337,9 @@ static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup)
while (key[i] != TMQ_SEPARATOR) {
i++;
}
memcpy(topic, key, i - 1);
topic[i] = 0;
strcpy(cgroup, &key[i + 1]);
memcpy(cgroup, key, i);
cgroup[i] = 0;
strcpy(topic, &key[i + 1]);
return 0;
}
@ -539,8 +539,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
mndSplitSubscribeKey(pSub->key, topic, cgroup);
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 "", pConsumerEp->vgId, topic,
pConsumerEp->consumerId);
mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 " cgroup: %s", pConsumerEp->vgId,
topic, pConsumerEp->consumerId, cgroup);
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
mndReleaseTopic(pMnode, pTopic);

View File

@ -316,7 +316,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead));
tEncodeSMqPollRsp(&abuf, &rsp);
taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);
/*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/
pMsg->pCont = buf;
pMsg->contLen = tlen;
pMsg->code = 0;

View File

@ -306,7 +306,7 @@ int32_t init_env() {
}
//const char* sql = "select * from tu1";
sprintf(sqlStr, "create topic test_stb_topic_1 as select * from %s%d", g_stConfInfo.stbName, 0);
sprintf(sqlStr, "create topic test_stb_topic_1 as select * from %s", g_stConfInfo.stbName);
/*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr));*/
pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) {