From b08c3120bfbfab9d5a3867d046030d072dddafb3 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Mon, 21 Mar 2022 23:36:21 +0800 Subject: [PATCH] fix memory error --- include/common/tcommon.h | 24 +- include/common/tmsg.h | 9 +- source/client/src/tmq.c | 2 +- source/common/src/tdatablock.c | 373 +++++++++++---------- source/dnode/mnode/impl/src/mndSubscribe.c | 10 +- source/dnode/mnode/impl/src/mndTopic.c | 8 +- source/dnode/vnode/src/tq/tq.c | 2 +- tests/test/c/tmqDemo.c | 2 +- 8 files changed, 232 insertions(+), 198 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 9c36856f1d..5308be72bb 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -101,13 +101,27 @@ void* blockDataDestroy(SSDataBlock* pBlock); int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock); void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock); -static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { - if (pBlock == NULL) { - return; +static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) { + // WARNING: do not use info.numOfCols, + // sometimes info.numOfCols != array size + int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock); + for (int32_t i = 0; i < numOfOutput; ++i) { + SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i); + if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { + tfree(pColInfoData->varmeta.offset); + } else { + tfree(pColInfoData->nullbitmap); + } + + tfree(pColInfoData->pData); } - blockDataDestroy(pBlock); + + taosArrayDestroy(pBlock->pDataBlock); + tfree(pBlock->pBlockAgg); } +static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { blockDestroyInner(pBlock); } + static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) { int32_t tlen = 0; int32_t sz = 0; @@ -157,7 +171,7 @@ static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) { } free(pRsp->schema); } - taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))tDeleteSSDataBlock); + taosArrayDestroyEx(pRsp->pBlockData, (void (*)(void*))blockDestroyInner); pRsp->pBlockData = NULL; } diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 58e991c294..c7e37b1613 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2145,11 +2145,12 @@ typedef struct { typedef struct { SMqRspHead head; + int64_t reqOffset; + int64_t rspOffset; + int32_t skipLogNum; + // TODO: replace with topic name + int32_t numOfTopics; // TODO: remove from msg - int64_t reqOffset; - int64_t rspOffset; - int32_t skipLogNum; - int32_t numOfTopics; SSchemaWrapper* schema; SArray* pBlockData; // SArray } SMqPollRsp; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 4cb39ef4f8..7c35b7c149 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -735,7 +735,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { SMqClientVg* pVg = pParam->pVg; tmq_t* tmq = pParam->tmq; if (code != 0) { - printf("msg discard\n"); + printf("msg discard %x\n", code); goto WRITE_QUEUE_FAIL; } diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 31328cc4b2..0d48d7cc14 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -18,14 +18,14 @@ #include "tcompare.h" #include "tglobal.h" -int32_t taosGetFqdnPortFromEp(const char *ep, SEp* pEp) { +int32_t taosGetFqdnPortFromEp(const char* ep, SEp* pEp) { pEp->port = 0; strcpy(pEp->fqdn, ep); - char *temp = strchr(pEp->fqdn, ':'); + char* temp = strchr(pEp->fqdn, ':'); if (temp) { *temp = 0; - pEp->port = atoi(temp+1); + pEp->port = atoi(temp + 1); } if (pEp->port == 0) { @@ -36,7 +36,7 @@ int32_t taosGetFqdnPortFromEp(const char *ep, SEp* pEp) { return 0; } -void addEpIntoEpSet(SEpSet *pEpSet, const char* fqdn, uint16_t port) { +void addEpIntoEpSet(SEpSet* pEpSet, const char* fqdn, uint16_t port) { if (pEpSet == NULL || fqdn == NULL || strlen(fqdn) == 0) { return; } @@ -47,26 +47,25 @@ void addEpIntoEpSet(SEpSet *pEpSet, const char* fqdn, uint16_t port) { pEpSet->numOfEps += 1; } -bool isEpsetEqual(const SEpSet *s1, const SEpSet *s2) { +bool isEpsetEqual(const SEpSet* s1, const SEpSet* s2) { if (s1->numOfEps != s2->numOfEps || s1->inUse != s2->inUse) { return false; } for (int32_t i = 0; i < s1->numOfEps; i++) { - if (s1->eps[i].port != s2->eps[i].port - || strncmp(s1->eps[i].fqdn, s2->eps[i].fqdn, TSDB_FQDN_LEN) != 0) + if (s1->eps[i].port != s2->eps[i].port || strncmp(s1->eps[i].fqdn, s2->eps[i].fqdn, TSDB_FQDN_LEN) != 0) return false; } return true; } -void updateEpSet_s(SCorEpSet *pEpSet, SEpSet *pNewEpSet) { +void updateEpSet_s(SCorEpSet* pEpSet, SEpSet* pNewEpSet) { taosCorBeginWrite(&pEpSet->version); pEpSet->epSet = *pNewEpSet; taosCorEndWrite(&pEpSet->version); } -SEpSet getEpSet_s(SCorEpSet *pEpSet) { +SEpSet getEpSet_s(SCorEpSet* pEpSet) { SEpSet ep = {0}; taosCorBeginRead(&pEpSet->version); ep = pEpSet->epSet; @@ -75,7 +74,6 @@ SEpSet getEpSet_s(SCorEpSet *pEpSet) { return ep; } - int32_t colDataGetLength(const SColumnInfoData* pColumnInfoData, int32_t numOfRows) { ASSERT(pColumnInfoData != NULL); if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { @@ -95,7 +93,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con if (isNull) { // There is a placehold for each NULL value of binary or nchar type. if (IS_VAR_DATA_TYPE(pColumnInfoData->info.type)) { - pColumnInfoData->varmeta.offset[currentRow] = -1; // it is a null value of VAR type. + pColumnInfoData->varmeta.offset[currentRow] = -1; // it is a null value of VAR type. } else { colDataSetNull_f(pColumnInfoData->nullbitmap, currentRow); } @@ -113,7 +111,7 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con newSize = 8; } - while(newSize < pAttr->length + varDataTLen(pData)) { + while (newSize < pAttr->length + varDataTLen(pData)) { newSize = newSize * 1.5; } @@ -133,19 +131,40 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con pColumnInfoData->varmeta.length += varDataTLen(pData); } else { char* p = pColumnInfoData->pData + pColumnInfoData->info.bytes * currentRow; - switch(type) { - case TSDB_DATA_TYPE_BOOL: {*(bool*) p = *(bool*) pData;break;} + switch (type) { + case TSDB_DATA_TYPE_BOOL: { + *(bool*)p = *(bool*)pData; + break; + } case TSDB_DATA_TYPE_TINYINT: - case TSDB_DATA_TYPE_UTINYINT: {*(int8_t*) p = *(int8_t*) pData;break;} + case TSDB_DATA_TYPE_UTINYINT: { + *(int8_t*)p = *(int8_t*)pData; + break; + } case TSDB_DATA_TYPE_SMALLINT: - case TSDB_DATA_TYPE_USMALLINT: {*(int16_t*) p = *(int16_t*) pData;break;} + case TSDB_DATA_TYPE_USMALLINT: { + *(int16_t*)p = *(int16_t*)pData; + break; + } case TSDB_DATA_TYPE_INT: - case TSDB_DATA_TYPE_UINT: {*(int32_t*) p = *(int32_t*) pData;break;} + case TSDB_DATA_TYPE_UINT: { + *(int32_t*)p = *(int32_t*)pData; + break; + } case TSDB_DATA_TYPE_TIMESTAMP: case TSDB_DATA_TYPE_BIGINT: - case TSDB_DATA_TYPE_UBIGINT: {*(int64_t*) p = *(int64_t*) pData;break;} - case TSDB_DATA_TYPE_FLOAT: {*(float*) p = *(float*) pData;break;} - case TSDB_DATA_TYPE_DOUBLE: {*(double*) p = *(double*) pData;break;} + case TSDB_DATA_TYPE_UBIGINT: { + *(int64_t*)p = *(int64_t*)pData; + break; + } + case TSDB_DATA_TYPE_FLOAT: { + *(float*)p = *(float*)pData; + break; + } + case TSDB_DATA_TYPE_DOUBLE: { + *(double*)p = *(double*)pData; + break; + } default: assert(0); } @@ -154,7 +173,8 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con return 0; } -static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource, int32_t numOfRow2) { +static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, const SColumnInfoData* pSource, + int32_t numOfRow2) { uint32_t total = numOfRow1 + numOfRow2; if (BitmapLen(numOfRow1) < BitmapLen(total)) { @@ -188,7 +208,8 @@ static void doBitmapMerge(SColumnInfoData* pColumnInfoData, int32_t numOfRow1, c } } -int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, uint32_t numOfRow2) { +int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, const SColumnInfoData* pSource, + uint32_t numOfRow2) { ASSERT(pColumnInfoData != NULL && pSource != NULL && pColumnInfoData->info.type == pSource->info.type); if (numOfRow2 == 0) { @@ -202,8 +223,8 @@ int32_t colDataMergeCol(SColumnInfoData* pColumnInfoData, uint32_t numOfRow1, co // TODO } - pColumnInfoData->varmeta.offset = (int32_t*) p; - for(int32_t i = 0; i < numOfRow2; ++i) { + pColumnInfoData->varmeta.offset = (int32_t*)p; + for (int32_t i = 0; i < numOfRow2; ++i) { pColumnInfoData->varmeta.offset[i + numOfRow1] = pSource->varmeta.offset[i] + pColumnInfoData->varmeta.length; } @@ -244,9 +265,7 @@ size_t blockDataGetNumOfCols(const SSDataBlock* pBlock) { return pBlock->info.numOfCols; } -size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { - return pBlock->info.rows; -} +size_t blockDataGetNumOfRows(const SSDataBlock* pBlock) { return pBlock->info.rows; } int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) { if (pDataBlock == NULL || pDataBlock->info.rows <= 0) { @@ -263,8 +282,8 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock) { } ASSERT(pColInfoData->nullbitmap == NULL); - pDataBlock->info.window.skey = *(TSKEY*) colDataGetData(pColInfoData, 0); - pDataBlock->info.window.ekey = *(TSKEY*) colDataGetData(pColInfoData, (pDataBlock->info.rows - 1)); + pDataBlock->info.window.skey = *(TSKEY*)colDataGetData(pColInfoData, 0); + pDataBlock->info.window.ekey = *(TSKEY*)colDataGetData(pColInfoData, (pDataBlock->info.rows - 1)); return 0; } @@ -272,7 +291,7 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) { assert(pSrc != NULL && pDest != NULL && pDest->info.numOfCols == pSrc->info.numOfCols); int32_t numOfCols = pSrc->info.numOfCols; - for(int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol2 = taosArrayGet(pDest->pDataBlock, i); SColumnInfoData* pCol1 = taosArrayGet(pSrc->pDataBlock, i); @@ -280,7 +299,7 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) { uint32_t newLen = colDataGetLength(pCol1, pSrc->info.rows); int32_t newSize = oldLen + newLen; - char* tmp = realloc(pCol2->pData, newSize); + char* tmp = realloc(pCol2->pData, newSize); if (tmp != NULL) { pCol2->pData = tmp; colDataMergeCol(pCol2, pDest->info.rows, pCol1, pSrc->info.rows); @@ -296,10 +315,10 @@ int32_t blockDataMerge(SSDataBlock* pDest, const SSDataBlock* pSrc) { size_t blockDataGetSize(const SSDataBlock* pBlock) { assert(pBlock != NULL); - size_t total = 0; + size_t total = 0; int32_t numOfCols = pBlock->info.numOfCols; - for(int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); total += colDataGetLength(pColInfoData, pBlock->info.rows); @@ -315,7 +334,8 @@ size_t blockDataGetSize(const SSDataBlock* pBlock) { // the number of tuples can be fit in one page. // Actual data rows pluses the corresponding meta data must fit in one memory buffer of the given page size. -int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, int32_t pageSize) { +int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startIndex, int32_t* stopIndex, + int32_t pageSize) { ASSERT(pBlock != NULL && stopIndex != NULL); int32_t numOfCols = pBlock->info.numOfCols; @@ -323,13 +343,13 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd int32_t bitmapChar = 1; - size_t headerSize = sizeof(int32_t); + size_t headerSize = sizeof(int32_t); size_t colHeaderSize = sizeof(int32_t) * numOfCols; - size_t payloadSize = pageSize - (headerSize + colHeaderSize); + size_t payloadSize = pageSize - (headerSize + colHeaderSize); // TODO speedup by checking if the whole page can fit in firstly. if (!hasVarCol) { - size_t rowSize = blockDataGetRowSize(pBlock); + size_t rowSize = blockDataGetRowSize(pBlock); int32_t capacity = (payloadSize / (rowSize * 8 + bitmapChar * numOfCols)) * 8; *stopIndex = startIndex + capacity; @@ -342,7 +362,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd // iterate the rows that can be fit in this buffer page int32_t size = (headerSize + colHeaderSize); - for(int32_t j = startIndex; j < numOfRows; ++j) { + for (int32_t j = startIndex; j < numOfRows; ++j) { for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = TARRAY_GET_ELEM(pBlock->pDataBlock, i); if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { @@ -359,7 +379,7 @@ int32_t blockDataSplitRows(SSDataBlock* pBlock, bool hasVarCol, int32_t startInd size += pColInfoData->info.bytes; if (((j - startIndex) & 0x07) == 0) { - size += 1; // the space for null bitmap + size += 1; // the space for null bitmap } } } @@ -393,8 +413,8 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 pDst->info.rows = 0; pDst->pDataBlock = taosArrayInit(pBlock->info.numOfCols, sizeof(SColumnInfoData)); - for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { - SColumnInfoData colInfo = {0}; + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + SColumnInfoData colInfo = {0}; SColumnInfoData* pSrcCol = taosArrayGet(pBlock->pDataBlock, i); colInfo.info = pSrcCol->info; @@ -414,7 +434,7 @@ SSDataBlock* blockDataExtractBlock(SSDataBlock* pBlock, int32_t startIndex, int3 SColumnInfoData* pDstCol = taosArrayGet(pDst->pDataBlock, i); for (int32_t j = startIndex; j < (startIndex + rowCount); ++j) { - bool isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg); + bool isNull = colDataIsNull(pColData, pBlock->info.rows, j, pBlock->pBlockAgg); char* p = colDataGetData(pColData, j); colDataAppend(pDstCol, j - startIndex, p, isNull); @@ -440,14 +460,14 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { ASSERT(pBlock != NULL); // write the number of rows - *(uint32_t*) buf = pBlock->info.rows; + *(uint32_t*)buf = pBlock->info.rows; int32_t numOfCols = pBlock->info.numOfCols; int32_t numOfRows = pBlock->info.rows; char* pStart = buf + sizeof(uint32_t); - for(int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); if (IS_VAR_DATA_TYPE(pCol->info.type)) { memcpy(pStart, pCol->varmeta.offset, numOfRows * sizeof(int32_t)); @@ -459,7 +479,7 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { uint32_t dataSize = colDataGetLength(pCol, numOfRows); - *(int32_t*) pStart = dataSize; + *(int32_t*)pStart = dataSize; pStart += sizeof(int32_t); memcpy(pStart, pCol->pData, dataSize); @@ -470,12 +490,12 @@ int32_t blockDataToBuf(char* buf, const SSDataBlock* pBlock) { } int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { - pBlock->info.rows = *(int32_t*) buf; + pBlock->info.rows = *(int32_t*)buf; - int32_t numOfCols = pBlock->info.numOfCols; + int32_t numOfCols = pBlock->info.numOfCols; const char* pStart = buf + sizeof(uint32_t); - for(int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, i); size_t metaSize = pBlock->info.rows * sizeof(int32_t); @@ -487,7 +507,7 @@ int32_t blockDataFromBuf(SSDataBlock* pBlock, const char* buf) { pStart += BitmapLen(pBlock->info.rows); } - int32_t colLength = *(int32_t*) pStart; + int32_t colLength = *(int32_t*)pStart; pStart += sizeof(int32_t); if (IS_VAR_DATA_TYPE(pCol->info.type)) { @@ -517,7 +537,7 @@ size_t blockDataGetRowSize(const SSDataBlock* pBlock) { size_t rowSize = 0; size_t numOfCols = pBlock->info.numOfCols; - for(int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); rowSize += pColInfo->info.bytes; } @@ -537,10 +557,10 @@ size_t blockDataGetSerialMetaSize(const SSDataBlock* pBlock) { SSchema* blockDataExtractSchema(const SSDataBlock* pBlock, int32_t* numOfCols) { SSchema* pSchema = calloc(pBlock->info.numOfCols, sizeof(SSchema)); - for(int32_t i = 0; i < pBlock->info.numOfCols; ++i) { + for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); pSchema[i].bytes = pColInfoData->info.bytes; - pSchema[i].type = pColInfoData->info.type; + pSchema[i].type = pColInfoData->info.type; pSchema[i].colId = pColInfoData->info.colId; } @@ -556,14 +576,14 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { double rowSize = 0; size_t numOfCols = pBlock->info.numOfCols; - for(int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); rowSize += pColInfo->info.bytes; if (IS_VAR_DATA_TYPE(pColInfo->info.type)) { rowSize += sizeof(int32_t); } else { - rowSize += 1/8.0; + rowSize += 1 / 8.0; } } @@ -571,56 +591,56 @@ double blockDataGetSerialRowSize(const SSDataBlock* pBlock) { } typedef struct SSDataBlockSortHelper { - SArray *orderInfo; // SArray - SSDataBlock *pDataBlock; + SArray* orderInfo; // SArray + SSDataBlock* pDataBlock; bool nullFirst; } SSDataBlockSortHelper; int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { - const SSDataBlockSortHelper* pHelper = (const SSDataBlockSortHelper*) param; + const SSDataBlockSortHelper* pHelper = (const SSDataBlockSortHelper*)param; SSDataBlock* pDataBlock = pHelper->pDataBlock; - int32_t left = *(int32_t*) p1; - int32_t right = *(int32_t*) p2; + int32_t left = *(int32_t*)p1; + int32_t right = *(int32_t*)p2; SArray* pInfo = pHelper->orderInfo; - for(int32_t i = 0; i < pInfo->size; ++i) { + for (int32_t i = 0; i < pInfo->size; ++i) { SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, i); - SColumnInfoData* pColInfoData = pOrder->pColData;//TARRAY_GET_ELEM(pDataBlock->pDataBlock, pOrder->colIndex); + SColumnInfoData* pColInfoData = pOrder->pColData; // TARRAY_GET_ELEM(pDataBlock->pDataBlock, pOrder->colIndex); if (pColInfoData->hasNull) { - bool leftNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, left, pDataBlock->pBlockAgg); + bool leftNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, left, pDataBlock->pBlockAgg); bool rightNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, right, pDataBlock->pBlockAgg); if (leftNull && rightNull) { - continue; // continue to next slot + continue; // continue to next slot } if (rightNull) { - return pHelper->nullFirst? 1:-1; + return pHelper->nullFirst ? 1 : -1; } if (leftNull) { - return pHelper->nullFirst? -1:1; + return pHelper->nullFirst ? -1 : 1; } } - void* left1 = colDataGetData(pColInfoData, left); + void* left1 = colDataGetData(pColInfoData, left); void* right1 = colDataGetData(pColInfoData, right); - switch(pColInfoData->info.type) { + switch (pColInfoData->info.type) { case TSDB_DATA_TYPE_INT: { - int32_t leftx = *(int32_t*) left1; - int32_t rightx = *(int32_t*) right1; + int32_t leftx = *(int32_t*)left1; + int32_t rightx = *(int32_t*)right1; if (leftx == rightx) { break; } else { if (pOrder->order == TSDB_ORDER_ASC) { - return (leftx < rightx)? -1:1; + return (leftx < rightx) ? -1 : 1; } else { - return (leftx < rightx)? 1:-1; + return (leftx < rightx) ? 1 : -1; } } } @@ -632,7 +652,8 @@ int32_t dataBlockCompar(const void* p1, const void* p2, const void* param) { return 0; } -static int32_t doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, const SSDataBlock* pSrcBlock, int32_t tupleIndex) { +static int32_t doAssignOneTuple(SColumnInfoData* pDstCols, int32_t numOfRows, const SSDataBlock* pSrcBlock, + int32_t tupleIndex) { int32_t code = 0; int32_t numOfCols = pSrcBlock->info.numOfCols; @@ -666,17 +687,17 @@ static int32_t blockDataAssign(SColumnInfoData* pCols, const SSDataBlock* pDataB } } #else - for(int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { + for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData* pDst = &pCols[i]; SColumnInfoData* pSrc = taosArrayGet(pDataBlock->pDataBlock, i); if (IS_VAR_DATA_TYPE(pSrc->info.type)) { - memcpy(pDst->pData, pSrc->pData, pSrc->varmeta.length); - pDst->varmeta.length = pSrc->varmeta.length; + memcpy(pDst->pData, pSrc->pData, pSrc->varmeta.length); + pDst->varmeta.length = pSrc->varmeta.length; - for(int32_t j = 0; j < pDataBlock->info.rows; ++j) { - pDst->varmeta.offset[j] = pSrc->varmeta.offset[index[j]]; - } + for (int32_t j = 0; j < pDataBlock->info.rows; ++j) { + pDst->varmeta.offset[j] = pSrc->varmeta.offset[index[j]]; + } } else { switch (pSrc->info.type) { case TSDB_DATA_TYPE_UINT: @@ -749,7 +770,7 @@ static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) { return NULL; } - for(int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i); pCols[i].info = pColInfoData->info; @@ -771,7 +792,7 @@ static SColumnInfoData* createHelpColInfoData(const SSDataBlock* pDataBlock) { static void copyBackToBlock(SSDataBlock* pDataBlock, SColumnInfoData* pCols) { int32_t numOfCols = pDataBlock->info.numOfCols; - for(int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, i); pColInfoData->info = pCols[i].info; @@ -796,31 +817,39 @@ static int32_t* createTupleIndex(size_t rows) { return NULL; } - for(int32_t i = 0; i < rows; ++i) { + for (int32_t i = 0; i < rows; ++i) { index[i] = i; } return index; } -static void destroyTupleIndex(int32_t* index) { - tfree(index); -} +static void destroyTupleIndex(int32_t* index) { tfree(index); } static __compar_fn_t getComparFn(int32_t type, int32_t order) { - switch(type) { - case TSDB_DATA_TYPE_TINYINT: return order == TSDB_ORDER_ASC? compareInt8Val:compareInt8ValDesc; - case TSDB_DATA_TYPE_SMALLINT: return order == TSDB_ORDER_ASC? compareInt16Val:compareInt16ValDesc; - case TSDB_DATA_TYPE_INT: return order == TSDB_ORDER_ASC? compareInt32Val:compareInt32ValDesc; - case TSDB_DATA_TYPE_BIGINT: return order == TSDB_ORDER_ASC? compareInt64Val:compareInt64ValDesc; - case TSDB_DATA_TYPE_FLOAT: return order == TSDB_ORDER_ASC? compareFloatVal:compareFloatValDesc; - case TSDB_DATA_TYPE_DOUBLE: return order == TSDB_ORDER_ASC? compareDoubleVal:compareDoubleValDesc; - case TSDB_DATA_TYPE_UTINYINT: return order == TSDB_ORDER_ASC? compareUint8Val:compareUint8ValDesc; - case TSDB_DATA_TYPE_USMALLINT:return order == TSDB_ORDER_ASC? compareUint16Val:compareUint16ValDesc; - case TSDB_DATA_TYPE_UINT: return order == TSDB_ORDER_ASC? compareUint32Val:compareUint32ValDesc; - case TSDB_DATA_TYPE_UBIGINT: return order == TSDB_ORDER_ASC? compareUint64Val:compareUint64ValDesc; + switch (type) { + case TSDB_DATA_TYPE_TINYINT: + return order == TSDB_ORDER_ASC ? compareInt8Val : compareInt8ValDesc; + case TSDB_DATA_TYPE_SMALLINT: + return order == TSDB_ORDER_ASC ? compareInt16Val : compareInt16ValDesc; + case TSDB_DATA_TYPE_INT: + return order == TSDB_ORDER_ASC ? compareInt32Val : compareInt32ValDesc; + case TSDB_DATA_TYPE_BIGINT: + return order == TSDB_ORDER_ASC ? compareInt64Val : compareInt64ValDesc; + case TSDB_DATA_TYPE_FLOAT: + return order == TSDB_ORDER_ASC ? compareFloatVal : compareFloatValDesc; + case TSDB_DATA_TYPE_DOUBLE: + return order == TSDB_ORDER_ASC ? compareDoubleVal : compareDoubleValDesc; + case TSDB_DATA_TYPE_UTINYINT: + return order == TSDB_ORDER_ASC ? compareUint8Val : compareUint8ValDesc; + case TSDB_DATA_TYPE_USMALLINT: + return order == TSDB_ORDER_ASC ? compareUint16Val : compareUint16ValDesc; + case TSDB_DATA_TYPE_UINT: + return order == TSDB_ORDER_ASC ? compareUint32Val : compareUint32ValDesc; + case TSDB_DATA_TYPE_UBIGINT: + return order == TSDB_ORDER_ASC ? compareUint64Val : compareUint64ValDesc; default: - return order == TSDB_ORDER_ASC? compareInt32Val:compareInt32ValDesc; + return order == TSDB_ORDER_ASC ? compareInt32Val : compareInt32ValDesc; } } @@ -865,10 +894,8 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs return TSDB_CODE_SUCCESS; } else { // var data type - } } else if (pDataBlock->info.numOfCols == 2) { - } } @@ -881,7 +908,7 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs int64_t p0 = taosGetTimestampUs(); SSDataBlockSortHelper helper = {.nullFirst = nullFirst, .pDataBlock = pDataBlock, .orderInfo = pOrderInfo}; - for(int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(helper.orderInfo); ++i) { struct SBlockOrderInfo* pInfo = taosArrayGet(helper.orderInfo, i); pInfo->pColData = taosArrayGet(pDataBlock->pDataBlock, pInfo->colIndex); } @@ -909,7 +936,8 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs copyBackToBlock(pDataBlock, pCols); int64_t p4 = taosGetTimestampUs(); - printf("sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1-p0, p2 - p1, p3 - p2, p4-p3, rows); + printf("sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1 - p0, p2 - p1, + p3 - p2, p4 - p3, rows); destroyTupleIndex(index); return TSDB_CODE_SUCCESS; @@ -917,14 +945,18 @@ int32_t blockDataSort(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirs typedef struct SHelper { int32_t index; - union {char *pData; int64_t i64; double d64;}; + union { + char* pData; + int64_t i64; + double d64; + }; } SHelper; SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock* pBlock) { int32_t sortValLengthPerRow = 0; int32_t numOfCols = taosArrayGetSize(pOrderInfo); - for(int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i); SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, pInfo->colIndex); pInfo->pColData = pColInfo; @@ -933,19 +965,20 @@ SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock* size_t len = sortValLengthPerRow * pBlock->info.rows; - char* buf = calloc(1, len); + char* buf = calloc(1, len); SHelper* phelper = calloc(numOfRows, sizeof(SHelper)); - for(int32_t i = 0; i < numOfRows; ++i) { + for (int32_t i = 0; i < numOfRows; ++i) { phelper[i].index = i; phelper[i].pData = buf + sortValLengthPerRow * i; } int32_t offset = 0; - for(int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { SBlockOrderInfo* pInfo = taosArrayGet(pOrderInfo, i); - for(int32_t j = 0; j < numOfRows; ++j) { - phelper[j].i64 = *(int32_t*) pInfo->pColData->pData + pInfo->pColData->info.bytes * j; -// memcpy(phelper[j].pData + offset, pInfo->pColData->pData + pInfo->pColData->info.bytes * j, pInfo->pColData->info.bytes); + for (int32_t j = 0; j < numOfRows; ++j) { + phelper[j].i64 = *(int32_t*)pInfo->pColData->pData + pInfo->pColData->info.bytes * j; + // memcpy(phelper[j].pData + offset, pInfo->pColData->pData + pInfo->pColData->info.bytes * j, + // pInfo->pColData->info.bytes); } offset += pInfo->pColData->info.bytes; @@ -955,70 +988,68 @@ SHelper* createTupleIndex_rv(int32_t numOfRows, SArray* pOrderInfo, SSDataBlock* } int32_t dataBlockCompar_rv(const void* p1, const void* p2, const void* param) { - const SSDataBlockSortHelper* pHelper = (const SSDataBlockSortHelper*) param; + const SSDataBlockSortHelper* pHelper = (const SSDataBlockSortHelper*)param; -// SSDataBlock* pDataBlock = pHelper->pDataBlock; + // SSDataBlock* pDataBlock = pHelper->pDataBlock; - SHelper* left = (SHelper*) p1; - SHelper* right = (SHelper*) p2; + SHelper* left = (SHelper*)p1; + SHelper* right = (SHelper*)p2; SArray* pInfo = pHelper->orderInfo; int32_t offset = 0; -// for(int32_t i = 0; i < pInfo->size; ++i) { -// SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, 0); -// SColumnInfoData* pColInfoData = pOrder->pColData;//TARRAY_GET_ELEM(pDataBlock->pDataBlock, pOrder->colIndex); + // for(int32_t i = 0; i < pInfo->size; ++i) { + // SBlockOrderInfo* pOrder = TARRAY_GET_ELEM(pInfo, 0); + // SColumnInfoData* pColInfoData = pOrder->pColData;//TARRAY_GET_ELEM(pDataBlock->pDataBlock, pOrder->colIndex); -// if (pColInfoData->hasNull) { -// bool leftNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, left, pDataBlock->pBlockAgg); -// bool rightNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, right, pDataBlock->pBlockAgg); -// if (leftNull && rightNull) { -// continue; // continue to next slot -// } -// -// if (rightNull) { -// return pHelper->nullFirst? 1:-1; -// } -// -// if (leftNull) { -// return pHelper->nullFirst? -1:1; -// } -// } + // if (pColInfoData->hasNull) { + // bool leftNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, left, pDataBlock->pBlockAgg); + // bool rightNull = colDataIsNull(pColInfoData, pDataBlock->info.rows, right, pDataBlock->pBlockAgg); + // if (leftNull && rightNull) { + // continue; // continue to next slot + // } + // + // if (rightNull) { + // return pHelper->nullFirst? 1:-1; + // } + // + // if (leftNull) { + // return pHelper->nullFirst? -1:1; + // } + // } -// void* left1 = colDataGetData(pColInfoData, left); -// void* right1 = colDataGetData(pColInfoData, right); + // void* left1 = colDataGetData(pColInfoData, left); + // void* right1 = colDataGetData(pColInfoData, right); -// switch(pColInfoData->info.type) { -// case TSDB_DATA_TYPE_INT: { - int32_t leftx = *(int32_t*)left->pData;//*(int32_t*)(left->pData + offset); - int32_t rightx = *(int32_t*)right->pData;//*(int32_t*)(right->pData + offset); + // switch(pColInfoData->info.type) { + // case TSDB_DATA_TYPE_INT: { + int32_t leftx = *(int32_t*)left->pData; //*(int32_t*)(left->pData + offset); + int32_t rightx = *(int32_t*)right->pData; //*(int32_t*)(right->pData + offset); -// offset += pColInfoData->info.bytes; - if (leftx == rightx) { -// break; - return 0; - } else { -// if (pOrder->order == TSDB_ORDER_ASC) { - return (leftx < rightx)? -1:1; -// } else { -// return (leftx < rightx)? 1:-1; -// } - } -// } -// default: -// assert(0); -// } -// } + // offset += pColInfoData->info.bytes; + if (leftx == rightx) { + // break; + return 0; + } else { + // if (pOrder->order == TSDB_ORDER_ASC) { + return (leftx < rightx) ? -1 : 1; + // } else { + // return (leftx < rightx)? 1:-1; + // } + } + // } + // default: + // assert(0); + // } + // } return 0; } -int32_t varColSort(SColumnInfoData* pColumnInfoData, SBlockOrderInfo* pOrder) { - return 0; -} +int32_t varColSort(SColumnInfoData* pColumnInfoData, SBlockOrderInfo* pOrder) { return 0; } int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullFirst) { -// Allocate the additional buffer. + // Allocate the additional buffer. int64_t p0 = taosGetTimestampUs(); SSDataBlockSortHelper helper = {.nullFirst = nullFirst, .pDataBlock = pDataBlock, .orderInfo = pOrderInfo}; @@ -1032,7 +1063,7 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF taosqsort(index, rows, sizeof(SHelper), &helper, dataBlockCompar_rv); - int64_t p1 = taosGetTimestampUs(); + int64_t p1 = taosGetTimestampUs(); SColumnInfoData* pCols = createHelpColInfoData(pDataBlock); if (pCols == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -1052,7 +1083,8 @@ int32_t blockDataSort_rv(SSDataBlock* pDataBlock, SArray* pOrderInfo, bool nullF copyBackToBlock(pDataBlock, pCols); int64_t p4 = taosGetTimestampUs(); - printf("sort:%" PRId64 ", create:%" PRId64", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1 - p0, p2 - p1, p3 - p2, p4 - p3, rows); + printf("sort:%" PRId64 ", create:%" PRId64 ", assign:%" PRId64 ", copyback:%" PRId64 ", rows:%d\n", p1 - p0, p2 - p1, + p3 - p2, p4 - p3, rows); // destroyTupleIndex(index); return 0; } @@ -1110,8 +1142,8 @@ int32_t blockDataEnsureColumnCapacity(SColumnInfoData* pColumn, uint32_t numOfRo int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) { int32_t code = 0; - - for(int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { + + for (int32_t i = 0; i < pDataBlock->info.numOfCols; ++i) { SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); code = blockDataEnsureColumnCapacity(p, numOfRows); if (code) { @@ -1127,21 +1159,8 @@ void* blockDataDestroy(SSDataBlock* pBlock) { return NULL; } - int32_t numOfOutput = pBlock->info.numOfCols; - for(int32_t i = 0; i < numOfOutput; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) { - tfree(pColInfoData->varmeta.offset); - } else { - tfree(pColInfoData->nullbitmap); - } - - tfree(pColInfoData->pData); - } - - taosArrayDestroy(pBlock->pDataBlock); - tfree(pBlock->pBlockAgg); - // tfree(pBlock); + blockDestroyInner(pBlock); + tfree(pBlock); return NULL; } @@ -1154,7 +1173,7 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock) { pBlock->info.numOfCols = numOfCols; pBlock->info.hasVarCol = pDataBlock->info.hasVarCol; - for(int32_t i = 0; i < numOfCols; ++i) { + for (int32_t i = 0; i < numOfCols; ++i) { SColumnInfoData colInfo = {0}; SColumnInfoData* p = taosArrayGet(pDataBlock->pDataBlock, i); colInfo.info = p->info; @@ -1230,4 +1249,4 @@ void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) { taosArrayPush(pBlock->pDataBlock, &data); } return (void*)buf; -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index ead8d6805b..8b52f30474 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -337,9 +337,9 @@ static int32_t mndSplitSubscribeKey(const char *key, char *topic, char *cgroup) while (key[i] != TMQ_SEPARATOR) { i++; } - memcpy(topic, key, i - 1); - topic[i] = 0; - strcpy(cgroup, &key[i + 1]); + memcpy(cgroup, key, i); + cgroup[i] = 0; + strcpy(topic, &key[i + 1]); return 0; } @@ -539,8 +539,8 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) { mndSplitSubscribeKey(pSub->key, topic, cgroup); SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); - mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 "", pConsumerEp->vgId, topic, - pConsumerEp->consumerId); + mInfo("mq set conn: assign vgroup %d of topic %s to consumer %" PRId64 " cgroup: %s", pConsumerEp->vgId, + topic, pConsumerEp->consumerId, cgroup); mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp); mndReleaseTopic(pMnode, pTopic); diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index e94ae4f8ec..de3a686d15 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -241,12 +241,12 @@ static int32_t mndGetPlanString(SCMCreateTopicReq *pCreate, char **pStr) { return TSDB_CODE_SUCCESS; } - SNode* pAst = NULL; + SNode *pAst = NULL; int32_t code = nodesStringToNode(pCreate->ast, &pAst); - SQueryPlan* pPlan = NULL; + SQueryPlan *pPlan = NULL; if (TSDB_CODE_SUCCESS == code) { - SPlanContext cxt = { .pAstRoot = pAst, .topicQuery = true }; + SPlanContext cxt = {.pAstRoot = pAst, .topicQuery = true}; code = qCreateQueryPlan(&cxt, &pPlan, NULL); } @@ -274,7 +274,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq topicObj.logicalPlan = ""; topicObj.sqlLen = strlen(pCreate->sql); - char* pPlanStr = NULL; + char *pPlanStr = NULL; if (TSDB_CODE_SUCCESS != mndGetPlanString(pCreate, &pPlanStr)) { mError("topic:%s, failed to get plan since %s", pCreate->name, terrstr()); return -1; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index d9337d8edc..83b3649aa6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -316,7 +316,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); tEncodeSMqPollRsp(&abuf, &rsp); - taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock); + /*taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock);*/ pMsg->pCont = buf; pMsg->contLen = tlen; pMsg->code = 0; diff --git a/tests/test/c/tmqDemo.c b/tests/test/c/tmqDemo.c index 08e49a7efe..cfbbf52057 100644 --- a/tests/test/c/tmqDemo.c +++ b/tests/test/c/tmqDemo.c @@ -306,7 +306,7 @@ int32_t init_env() { } //const char* sql = "select * from tu1"; - sprintf(sqlStr, "create topic test_stb_topic_1 as select * from %s%d", g_stConfInfo.stbName, 0); + sprintf(sqlStr, "create topic test_stb_topic_1 as select * from %s", g_stConfInfo.stbName); /*pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr));*/ pRes = taos_query(pConn, sqlStr); if (taos_errno(pRes) != 0) {