diff --git a/cmake/taostools_CMakeLists.txt.in b/cmake/taostools_CMakeLists.txt.in index 1e2247eedc..a69e1578c0 100644 --- a/cmake/taostools_CMakeLists.txt.in +++ b/cmake/taostools_CMakeLists.txt.in @@ -2,7 +2,7 @@ # taos-tools ExternalProject_Add(taos-tools GIT_REPOSITORY https://github.com/taosdata/taos-tools.git - GIT_TAG 5c53cc8 + GIT_TAG 7d24ed5 SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools" BINARY_DIR "" #BUILD_IN_SOURCE TRUE diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 9be79a539f..52edd9708c 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -108,7 +108,8 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* void* smlInitHandle(SQuery* pQuery); void smlDestroyHandle(void* pHandle); int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta, - char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen); + char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, + int16_t msgBufLen); int32_t smlBuildOutput(void* handle, SHashObj* pVgHash); int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray); diff --git a/packaging/release.sh b/packaging/release.sh index e442e07c6a..1dfbf2b112 100755 --- a/packaging/release.sh +++ b/packaging/release.sh @@ -2,7 +2,7 @@ # # Generate the deb package for ubuntu, or rpm package for centos, or tar.gz package for other linux os -# set -e +set -e # set -x # release.sh -v [cluster | edge] diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index 1ec6450228..5646f58ee1 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -152,9 +152,10 @@ int32_t stmtRestoreQueryFields(STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } -int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, SName* tbName, const char* sTableName, bool autoCreateTbl) { +int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, SName* tbName, const char* sTableName, + bool autoCreateTbl) { STscStmt* pStmt = (STscStmt*)stmt; - char tbFName[TSDB_TABLE_FNAME_LEN]; + char tbFName[TSDB_TABLE_FNAME_LEN]; tNameExtractFullName(tbName, tbFName); memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName)); @@ -772,9 +773,9 @@ int stmtAddBatch(TAOS_STMT* stmt) { int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) { tscDebug("stmt start to update tbUid, blockNum: %d", pRsp->nBlocks); - int32_t code = 0; - int32_t finalCode = 0; - size_t keyLen = 0; + int32_t code = 0; + int32_t finalCode = 0; + size_t keyLen = 0; STableDataBlocks** pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL); while (pIter) { STableDataBlocks* pBlock = *pIter; @@ -844,7 +845,7 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) { pMeta->uid = pTableMeta->uid; pStmt->bInfo.tbUid = pTableMeta->uid; - taosMemoryFree(pTableMeta); + taosMemoryFree(pTableMeta); } pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ee93756f0e..85282a2340 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -1758,11 +1758,14 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } if (minKey == k.ts) { + STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); + if (pSchema == NULL) { + return terrno; + } if (init) { - tRowMerge(&merge, pRow); + tRowMergerAdd(&merge, pRow, pSchema); } else { init = true; - STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid); int32_t code = tRowMergerInit(&merge, pRow, pSchema); if (code != TSDB_CODE_SUCCESS) { return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 86adc1dc80..ab10960970 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -731,6 +731,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal); if (key.version > pMerger->version) { +#if 0 if (!COL_VAL_IS_NONE(pColVal)) { if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) { SColVal *tColVal = taosArrayGet(pMerger->pArray, iCol); @@ -746,6 +747,28 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { taosArraySet(pMerger->pArray, iCol, pColVal); } } +#endif + if (!COL_VAL_IS_NONE(pColVal)) { + if (IS_VAR_DATA_TYPE(pColVal->type)) { + SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol); + if (!COL_VAL_IS_NULL(pColVal)) { + code = tRealloc(&pTColVal->value.pData, pColVal->value.nData); + if (code) return code; + + pTColVal->value.nData = pColVal->value.nData; + if (pTColVal->value.nData) { + memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData); + } + pTColVal->flag = 0; + } else { + tFree(pTColVal->value.pData); + pTColVal->value.pData = NULL; + taosArraySet(pMerger->pArray, iCol, pColVal); + } + } else { + taosArraySet(pMerger->pArray, iCol, pColVal); + } + } } else if (key.version < pMerger->version) { SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol); if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) { diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 5676e19cdf..bf4b9a2599 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -593,8 +593,11 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf int32_t pageId = 0; pPage = getNewBufPage(pInfo->pBuf, &pageId); - taosArrayPush(p->pPageList, &pageId); + if (pPage == NULL) { + return pPage; + } + taosArrayPush(p->pPageList, &pageId); *(int32_t*)pPage = 0; } else { int32_t* curId = taosArrayGetLast(p->pPageList); @@ -612,6 +615,11 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf // add a new page for current group int32_t pageId = 0; pPage = getNewBufPage(pInfo->pBuf, &pageId); + if (pPage == NULL) { + qError("failed to get new buffer, code:%s", tstrerror(terrno)); + return NULL; + } + taosArrayPush(p->pPageList, &pageId); memset(pPage, 0, getBufPageSize(pInfo->pBuf)); } diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 324011f238..e4081ddf0d 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -3061,14 +3061,12 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, if (pHandle->currentPage == -1) { pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage); if (pPage == NULL) { - terrno = TSDB_CODE_NO_AVAIL_DISK; return terrno; } pPage->num = sizeof(SFilePage); } else { pPage = getBufPage(pHandle->pBuf, pHandle->currentPage); if (pPage == NULL) { - terrno = TSDB_CODE_NO_AVAIL_DISK; return terrno; } if (pPage->num + length > getBufPageSize(pHandle->pBuf)) { @@ -3076,7 +3074,6 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf, releaseBufPage(pHandle->pBuf, pPage); pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage); if (pPage == NULL) { - terrno = TSDB_CODE_NO_AVAIL_DISK; return terrno; } pPage->num = sizeof(SFilePage); @@ -3123,7 +3120,6 @@ static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf if (pHandle->pBuf != NULL) { SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId); if (pPage == NULL) { - terrno = TSDB_CODE_NO_AVAIL_DISK; return terrno; } memcpy(pPage->data + pPos->offset, pBuf, length); diff --git a/source/libs/function/src/tpercentile.c b/source/libs/function/src/tpercentile.c index 871b668cfd..6b85155486 100644 --- a/source/libs/function/src/tpercentile.c +++ b/source/libs/function/src/tpercentile.c @@ -43,8 +43,8 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx) if (pg == NULL) { return NULL; } - memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes)); + memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes)); offset += (int32_t)(pg->num * pMemBucket->bytes); } @@ -109,7 +109,7 @@ int32_t findOnlyResult(tMemBucket *pMemBucket, double *result) { int32_t *pageId = taosArrayGet(list, 0); SFilePage *pPage = getBufPage(pMemBucket->pBuffer, *pageId); if (pPage == NULL) { - return TSDB_CODE_NO_AVAIL_DISK; + return terrno; } ASSERT(pPage->num == 1); @@ -276,7 +276,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval, return NULL; } - int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, "1", tsTempDir); + int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 1024, "1", tsTempDir); if (ret != 0) { tMemBucketDestroy(pBucket); return NULL; @@ -388,7 +388,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) { pSlot->info.data = getNewBufPage(pBucket->pBuffer, &pageId); if (pSlot->info.data == NULL) { - return TSDB_CODE_NO_AVAIL_DISK; + return terrno; } pSlot->info.pageId = pageId; taosArrayPush(pPageIdList, &pageId); @@ -482,8 +482,9 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction // data in buffer and file are merged together to be processed. SFilePage *buffer = loadDataFromFilePage(pMemBucket, i); if (buffer == NULL) { - return TSDB_CODE_NO_AVAIL_DISK; + return terrno; } + int32_t currentIdx = count - num; char *thisVal = buffer->data + pMemBucket->bytes * currentIdx; @@ -520,7 +521,7 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction int32_t *pageId = taosArrayGet(list, f); SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId); if (pg == NULL) { - return TSDB_CODE_NO_AVAIL_DISK; + return terrno; } int32_t code = tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num); diff --git a/source/libs/parser/src/parInsertStmt.c b/source/libs/parser/src/parInsertStmt.c index 1f437e8a8c..cfdbbbbc36 100644 --- a/source/libs/parser/src/parInsertStmt.c +++ b/source/libs/parser/src/parInsertStmt.c @@ -47,7 +47,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) { STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock; SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; - int32_t code = TSDB_CODE_SUCCESS; + int32_t code = TSDB_CODE_SUCCESS; SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags; if (NULL == tags) { return TSDB_CODE_APP_ERROR; @@ -137,7 +137,8 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch } SVCreateTbReq tbReq = {0}; - insBuildCreateTbReq(&tbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pTableMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL); + insBuildCreateTbReq(&tbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pTableMeta->tableInfo.numOfTags, + TSDB_DEFAULT_TABLE_TTL); code = insBuildCreateTbMsg(pDataBlock, &tbReq); tdDestroySVCreateTbReq(&tbReq); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 05d49bb027..9991c2c6ae 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -1550,11 +1550,14 @@ static int32_t translateRepeatScanFunc(STranslateContext* pCxt, SFunctionNode* p // select percentile() without from clause is also valid if ((NULL != pTable && (QUERY_NODE_REAL_TABLE != nodeType(pTable) || (TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType && - TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType))) || - NULL != pSelect->pPartitionByList) { + TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType)))) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE, "%s is only supported in single table query", pFunc->functionName); } + if (NULL != pSelect->pPartitionByList) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC, + "%s function is not supported in partition query", pFunc->functionName); + } return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index a7eac2c853..d85e4ca10d 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -348,7 +348,8 @@ static bool stbSplIsPartTableAgg(SAggLogicNode* pAgg) { return false; } if (NULL != pAgg->pGroupKeys) { - return stbSplHasPartTbname(pAgg->pGroupKeys) && stbSplNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)); + return stbSplHasPartTbname(pAgg->pGroupKeys) && + stbSplNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)); } return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0))); } @@ -1025,21 +1026,29 @@ static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) return code; } -static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) { - SLogicNode* pSplitNode = pInfo->pSplitNode; +static int32_t stbSplGetSplitNodeForScan(SStableSplitInfo* pInfo, SLogicNode** pSplitNode) { + *pSplitNode = pInfo->pSplitNode; if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pInfo->pSplitNode->pParent) && NULL == pInfo->pSplitNode->pParent->pLimit && NULL == pInfo->pSplitNode->pParent->pSlimit) { - pSplitNode = pInfo->pSplitNode->pParent; + *pSplitNode = pInfo->pSplitNode->pParent; if (NULL != pInfo->pSplitNode->pLimit) { - pSplitNode->pLimit = nodesCloneNode(pInfo->pSplitNode->pLimit); - if (NULL == pSplitNode->pLimit) { + (*pSplitNode)->pLimit = nodesCloneNode(pInfo->pSplitNode->pLimit); + if (NULL == (*pSplitNode)->pLimit) { return TSDB_CODE_OUT_OF_MEMORY; } ((SLimitNode*)pInfo->pSplitNode->pLimit)->limit += ((SLimitNode*)pInfo->pSplitNode->pLimit)->offset; ((SLimitNode*)pInfo->pSplitNode->pLimit)->offset = 0; } } - int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pSplitNode, SUBPLAN_TYPE_MERGE); + return TSDB_CODE_SUCCESS; +} + +static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) { + SLogicNode* pSplitNode = NULL; + int32_t code = stbSplGetSplitNodeForScan(pInfo, &pSplitNode); + if (TSDB_CODE_SUCCESS == code) { + code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pSplitNode, SUBPLAN_TYPE_MERGE); + } if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, pSplitNode, SPLIT_FLAG_STABLE_SPLIT)); @@ -1049,12 +1058,11 @@ static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSp } static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) { - SLogicNode* pSplitNode = pInfo->pSplitNode; - if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pInfo->pSplitNode->pParent) && - NULL == pInfo->pSplitNode->pParent->pLimit && NULL == pInfo->pSplitNode->pParent->pSlimit) { - pSplitNode = pInfo->pSplitNode->pParent; + SLogicNode* pSplitNode = NULL; + int32_t code = stbSplGetSplitNodeForScan(pInfo, &pSplitNode); + if (TSDB_CODE_SUCCESS == code) { + code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, true); } - int32_t code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, true); if (TSDB_CODE_SUCCESS == code) { code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, pSplitNode, SPLIT_FLAG_STABLE_SPLIT)); diff --git a/source/util/src/tarray.c b/source/util/src/tarray.c index 4bd8294423..237edfdeb2 100644 --- a/source/util/src/tarray.c +++ b/source/util/src/tarray.c @@ -20,7 +20,10 @@ // todo refactor API SArray* taosArrayInit(size_t size, size_t elemSize) { - assert(elemSize > 0); + if (elemSize == 0) { + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } if (size < TARRAY_MIN_SIZE) { size = TARRAY_MIN_SIZE; @@ -96,8 +99,6 @@ void* taosArrayAddBatch(SArray* pArray, const void* pData, int32_t nEles) { } void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)) { - assert(pArray); - size_t size = pArray->size; if (size <= 1) { return; @@ -136,8 +137,6 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp) } void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)) { - assert(pArray); - size_t size = pArray->size; if (size <= 1) { return; @@ -195,11 +194,10 @@ void* taosArrayReserve(SArray* pArray, int32_t num) { } void* taosArrayPop(SArray* pArray) { - assert(pArray != NULL); - if (pArray->size == 0) { return NULL; } + pArray->size -= 1; return TARRAY_GET_ELEM(pArray, pArray->size); } @@ -208,16 +206,21 @@ void* taosArrayGet(const SArray* pArray, size_t index) { if (NULL == pArray) { return NULL; } - assert(index < pArray->size); + + if (index >= pArray->size) { + uError("index is out of range, current:%"PRIzu" max:%d", index, pArray->capacity); + return NULL; + } + return TARRAY_GET_ELEM(pArray, index); } void* taosArrayGetP(const SArray* pArray, size_t index) { - assert(index < pArray->size); - - void* d = TARRAY_GET_ELEM(pArray, index); - - return *(void**)d; + void** p = taosArrayGet(pArray, index); + if (p == NULL) { + return NULL; + } + return *p; } void* taosArrayGetLast(const SArray* pArray) { return TARRAY_GET_ELEM(pArray, pArray->size - 1); } @@ -296,9 +299,12 @@ void taosArrayRemove(SArray* pArray, size_t index) { } SArray* taosArrayFromList(const void* src, size_t size, size_t elemSize) { - assert(src != NULL && elemSize > 0); - SArray* pDst = taosArrayInit(size, elemSize); + if (elemSize <= 0) { + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } + SArray* pDst = taosArrayInit(size, elemSize); memcpy(pDst->pData, src, elemSize * size); pDst->size = size; @@ -306,8 +312,6 @@ SArray* taosArrayFromList(const void* src, size_t size, size_t elemSize) { } SArray* taosArrayDup(const SArray* pSrc, __array_item_dup_fn_t fn) { - assert(pSrc != NULL); - if (pSrc->size == 0) { // empty array list return taosArrayInit(8, pSrc->elemSize); } @@ -399,14 +403,10 @@ void taosArrayDestroyEx(SArray* pArray, FDelete fp) { } void taosArraySort(SArray* pArray, __compar_fn_t compar) { - ASSERT(pArray != NULL && compar != NULL); taosSort(pArray->pData, pArray->size, pArray->elemSize, compar); } void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t comparFn, int32_t flags) { - assert(pArray != NULL && comparFn != NULL); - assert(key != NULL); - return taosbsearch(key, pArray->pData, pArray->size, pArray->elemSize, comparFn, flags); } diff --git a/source/util/src/tcache.c b/source/util/src/tcache.c index 7d1686ef80..761da6986b 100644 --- a/source/util/src/tcache.c +++ b/source/util/src/tcache.c @@ -921,7 +921,7 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1) void taosStopCacheRefreshWorker(void) { stopRefreshWorker = true; TdThreadOnce tmp = PTHREAD_ONCE_INIT; - if (memcmp(&cacheRefreshWorker, &tmp, sizeof(TdThreadOnce)) != 0) taosThreadJoin(cacheRefreshWorker, NULL); + if (memcmp(&cacheThreadInit, &tmp, sizeof(TdThreadOnce)) != 0) taosThreadJoin(cacheRefreshWorker, NULL); taosArrayDestroy(pCacheArrayList); } diff --git a/source/util/src/tpagedbuf.c b/source/util/src/tpagedbuf.c index 87b44b2d13..7c60862c56 100644 --- a/source/util/src/tpagedbuf.c +++ b/source/util/src/tpagedbuf.c @@ -5,7 +5,10 @@ #include "thash.h" #include "tlog.h" -#define GET_DATA_PAYLOAD(_p) ((char*)(_p)->pData + POINTER_BYTES) +#define GET_PAYLOAD_DATA(_p) ((char*)(_p)->pData + POINTER_BYTES) +#define BUF_PAGE_IN_MEM(_p) ((_p)->pData != NULL) +#define CLEAR_BUF_PAGE_IN_MEM_FLAG(_p) ((_p)->pData = NULL) +#define HAS_DATA_IN_DISK(_p) ((_p)->offset >= 0) #define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages) typedef struct SPageDiskInfo { @@ -14,7 +17,7 @@ typedef struct SPageDiskInfo { } SPageDiskInfo, SFreeListItem; struct SPageInfo { - SListNode* pn; // point to list node struct + SListNode* pn; // point to list node struct. it is NULL when the page is evicted from the in-memory buffer void* pData; int64_t offset; int32_t pageId; @@ -89,7 +92,7 @@ static char* doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDiskba return data; } -static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) { +static uint64_t allocateNewPositionInFile(SDiskbasedBuf* pBuf, size_t size) { if (pBuf->pFree == NULL) { return pBuf->nextPos; } else { @@ -112,10 +115,6 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) { } } -static void setPageNotInBuf(SPageInfo* pPageInfo) { pPageInfo->pData = NULL; } - -static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); } - /** * +--------------------------+-------------------+--------------+ * | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes| @@ -124,23 +123,31 @@ static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize * @param pg * @return */ -static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { - ASSERT(!pg->used && pg->pData != NULL); + +static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); } + +static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) { + if (pg->pData == NULL || pg->used) { + uError("invalid params in paged buffer process when flushing buf to disk, %s", pBuf->id); + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } int32_t size = pBuf->pageSize; char* t = NULL; - if (pg->offset == -1 || pg->dirty) { - void* payload = GET_DATA_PAYLOAD(pg); + if ((!HAS_DATA_IN_DISK(pg)) || pg->dirty) { + void* payload = GET_PAYLOAD_DATA(pg); t = doCompressData(payload, pBuf->pageSize, &size, pBuf); - ASSERTS(size >= 0, "size is negative"); + if (size < 0) { + uError("failed to compress data when flushing data to disk, %s", pBuf->id); + return NULL; + } } // this page is flushed to disk for the first time if (pg->dirty) { - if (pg->offset == -1) { - ASSERTS(pg->dirty == true, "pg->dirty is false"); - - pg->offset = allocatePositionInFile(pBuf, size); + if (!HAS_DATA_IN_DISK(pg)) { + pg->offset = allocateNewPositionInFile(pBuf, size); pBuf->nextPos += size; int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET); @@ -155,6 +162,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { return NULL; } + // extend the file size if (pBuf->fileSize < pg->offset + size) { pBuf->fileSize = pg->offset + size; } @@ -169,7 +177,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { taosArrayPush(pBuf->pFree, &dinfo); // 2. allocate new position, and update the info - pg->offset = allocatePositionInFile(pBuf, size); + pg->offset = allocateNewPositionInFile(pBuf, size); pBuf->nextPos += size; } @@ -197,20 +205,19 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { size = pg->length; } - ASSERT(size > 0 || (pg->offset == -1 && pg->length == -1)); - char* pDataBuf = pg->pData; memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize)); + #ifdef BUF_PAGE_DEBUG uDebug("page_flush %p, pageId:%d, offset:%d", pDataBuf, pg->pageId, pg->offset); #endif + pg->length = size; // on disk size return pDataBuf; } -static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { +static char* flushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) { int32_t ret = TSDB_CODE_SUCCESS; - ASSERT(((int64_t)pBuf->numOfPages * pBuf->pageSize) == pBuf->totalBufSize && pBuf->numOfPages >= pBuf->inMemPages); if (pBuf->pFile == NULL) { if ((ret = createDiskFile(pBuf)) != TSDB_CODE_SUCCESS) { @@ -219,22 +226,27 @@ static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { } } - char* p = doFlushPageToDisk(pBuf, pg); - setPageNotInBuf(pg); - pg->dirty = false; + char* p = doFlushBufPage(pBuf, pg); + CLEAR_BUF_PAGE_IN_MEM_FLAG(pg); + pg->dirty = false; return p; } // load file block data in disk static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { + if (pg->offset < 0 || pg->length <= 0) { + uError("failed to load buf page from disk, offset:%"PRId64", length:%d, %s", pg->offset, pg->length, pBuf->id); + return TSDB_CODE_INVALID_PARA; + } + int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET); if (ret == -1) { ret = TAOS_SYSTEM_ERROR(errno); return ret; } - void* pPage = (void*)GET_DATA_PAYLOAD(pg); + void* pPage = (void*)GET_PAYLOAD_DATA(pg); ret = (int32_t)taosReadFile(pBuf->pFile, pPage, pg->length); if (ret != pg->length) { ret = TAOS_SYSTEM_ERROR(errno); @@ -249,10 +261,14 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) { return 0; } -static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t pageId) { +static SPageInfo* registerNewPageInfo(SDiskbasedBuf* pBuf, int32_t pageId) { pBuf->numOfPages += 1; SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo)); + if (ppi == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } ppi->pageId = pageId; ppi->pData = NULL; @@ -272,46 +288,33 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) { SListNode* pn = NULL; while ((pn = tdListNext(&iter)) != NULL) { SPageInfo* pageInfo = *(SPageInfo**)pn->data; - ASSERT(pageInfo->pageId >= 0 && pageInfo->pn == pn); + + SPageInfo* p = *(SPageInfo**)(pageInfo->pData); + ASSERT(pageInfo->pageId >= 0 && pageInfo->pn == pn && p == pageInfo); if (!pageInfo->used) { - // printf("%d is chosen\n", pageInfo->pageId); break; - } else { - // printf("page %d is used, dirty:%d\n", pageInfo->pageId, pageInfo->dirty); } } return pn; } -static char* evacOneDataPage(SDiskbasedBuf* pBuf) { - char* bufPage = NULL; +static char* evictBufPage(SDiskbasedBuf* pBuf) { SListNode* pn = getEldestUnrefedPage(pBuf); - terrno = 0; - - // all pages are referenced by user, try to allocate new space - if (pn == NULL) { - int32_t prev = pBuf->inMemPages; - - // increase by 50% of previous mem pages - pBuf->inMemPages = (int32_t)(pBuf->inMemPages * 1.5f); - - // qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pBuf, prev, - // pBuf->inMemPages, pBuf->pageSize); - } else { - tdListPopNode(pBuf->lruList, pn); - - SPageInfo* d = *(SPageInfo**)pn->data; - ASSERTS(d->pn == pn, "d->pn not equal pn"); - - d->pn = NULL; - taosMemoryFreeClear(pn); - - bufPage = flushPageToDisk(pBuf, d); + if (pn == NULL) { // no available buffer pages now, return. + return NULL; } - return bufPage; + terrno = 0; + tdListPopNode(pBuf->lruList, pn); + + SPageInfo* d = *(SPageInfo**)pn->data; + + d->pn = NULL; + taosMemoryFreeClear(pn); + + return flushBufPage(pBuf, d); } static void lruListPushFront(SList* pList, SPageInfo* pi) { @@ -338,13 +341,12 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem SDiskbasedBuf* pPBuf = *pBuf; if (pPBuf == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + goto _error; } pPBuf->pageSize = pagesize; pPBuf->numOfPages = 0; // all pages are in buffer in the first place pPBuf->totalBufSize = 0; - pPBuf->inMemPages = inMemBufSize / pagesize; // maximum allowed pages, it is a soft limit. pPBuf->allocateId = -1; pPBuf->pFile = NULL; pPBuf->id = strdup(id); @@ -353,33 +355,69 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem pPBuf->freePgList = tdListNew(POINTER_BYTES); // at least more than 2 pages must be in memory - ASSERT(inMemBufSize >= pagesize * 2); + if (inMemBufSize < pagesize * 2) { + inMemBufSize = pagesize * 2; + } + pPBuf->inMemPages = inMemBufSize / pagesize; // maximum allowed pages, it is a soft limit. pPBuf->lruList = tdListNew(POINTER_BYTES); + if (pPBuf->lruList == NULL) { + goto _error; + } // init id hash table _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT); pPBuf->pIdList = taosArrayInit(4, POINTER_BYTES); + if (pPBuf->pIdList == NULL) { + goto _error; + } pPBuf->assistBuf = taosMemoryMalloc(pPBuf->pageSize + 2); // EXTRA BYTES - pPBuf->all = taosHashInit(10, fn, true, false); - pPBuf->prefix = (char*) dir; + if (pPBuf->assistBuf == NULL) { + goto _error; + } + pPBuf->all = taosHashInit(10, fn, true, false); + if (pPBuf->all == NULL) { + goto _error; + } + + pPBuf->prefix = (char*) dir; pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t)); // qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId, - // pPBuf->pageSize, - // pPBuf->inMemPages, pPBuf->path); + // pPBuf->pageSize, pPBuf->inMemPages, pPBuf->path); return TSDB_CODE_SUCCESS; + _error: + destroyDiskbasedBuf(pPBuf); + return TSDB_CODE_OUT_OF_MEMORY; +} + +static char* doExtractPage(SDiskbasedBuf* pBuf) { + char* availablePage = NULL; + if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) { + availablePage = evictBufPage(pBuf); + if (availablePage == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + uWarn("no available buf pages, current:%d, max:%d", listNEles(pBuf->lruList), pBuf->inMemPages) + } + } else { + availablePage = taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize)); // add extract bytes in case of zipped buffer increased. + if (availablePage == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + } + } + + return availablePage; } void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { pBuf->statis.getPages += 1; - char* availablePage = NULL; - if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) { - availablePage = evacOneDataPage(pBuf); + char* availablePage = doExtractPage(pBuf); + if (availablePage == NULL) { + return NULL; } SPageInfo* pi = NULL; @@ -394,7 +432,10 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { *pageId = (++pBuf->allocateId); // register page id info - pi = registerPage(pBuf, *pageId); + pi = registerNewPageInfo(pBuf, *pageId); + if (pi == NULL) { + return NULL; + } // add to hash map taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES); @@ -402,63 +443,62 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) { } // add to LRU list - ASSERT(listNEles(pBuf->lruList) < pBuf->inMemPages && pBuf->inMemPages > 0); lruListPushFront(pBuf->lruList, pi); - - // allocate buf - if (availablePage == NULL) { - pi->pData = - taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize)); // add extract bytes in case of zipped buffer increased. - } else { - pi->pData = availablePage; - } + pi->pData = availablePage; ((void**)pi->pData)[0] = pi; #ifdef BUF_PAGE_DEBUG uDebug("page_getNewBufPage , pi->pData:%p, pageId:%d, offset:%" PRId64, pi->pData, pi->pageId, pi->offset); #endif - return (void*)(GET_DATA_PAYLOAD(pi)); + + return (void*)(GET_PAYLOAD_DATA(pi)); } void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { - ASSERT(pBuf != NULL && id >= 0); + if (id < 0) { + terrno = TSDB_CODE_INVALID_PARA; + uError("invalid page id:%d, %s", id, pBuf->id); + return NULL; + } + pBuf->statis.getPages += 1; SPageInfo** pi = taosHashGet(pBuf->all, &id, sizeof(int32_t)); - ASSERT(pi != NULL && *pi != NULL); + if (pi == NULL || *pi == NULL) { + uError("failed to locate the buffer page:%d, %s", id, pBuf->id); + terrno = TSDB_CODE_INVALID_PARA; + return NULL; + } - if ((*pi)->pData != NULL) { // it is in memory + if (BUF_PAGE_IN_MEM(*pi)) { // it is in memory // no need to update the LRU list if only one page exists if (pBuf->numOfPages == 1) { (*pi)->used = true; - return (void*)(GET_DATA_PAYLOAD(*pi)); + return (void*)(GET_PAYLOAD_DATA(*pi)); } SPageInfo** pInfo = (SPageInfo**)((*pi)->pn->data); - ASSERT(*pInfo == *pi); + if (*pInfo != *pi) { + uError("inconsistently data in paged buffer, pInfo:%p, pi:%p, %s", *pInfo, *pi, pBuf->id); + return NULL; + } lruListMoveToFront(pBuf->lruList, (*pi)); (*pi)->used = true; + #ifdef BUF_PAGE_DEBUG uDebug("page_getBufPage1 pageId:%d, offset:%" PRId64, (*pi)->pageId, (*pi)->offset); #endif - return (void*)(GET_DATA_PAYLOAD(*pi)); + return (void*)(GET_PAYLOAD_DATA(*pi)); } else { // not in memory - ASSERT((*pi)->pData == NULL && (*pi)->pn == NULL && + ASSERT((!BUF_PAGE_IN_MEM(*pi)) && (*pi)->pn == NULL && (((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1))); - char* availablePage = NULL; - if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) { - availablePage = evacOneDataPage(pBuf); - if (availablePage == NULL) { - return NULL; - } - } + (*pi)->pData = doExtractPage(pBuf); - if (availablePage == NULL) { - (*pi)->pData = taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize)); - } else { - (*pi)->pData = availablePage; + // failed to evict buffer page, return with error code. + if ((*pi)->pData == NULL) { + return NULL; } // set the ptr to the new SPageInfo @@ -468,23 +508,25 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) { (*pi)->used = true; // some data has been flushed to disk, and needs to be loaded into buffer again. - if ((*pi)->length > 0 && (*pi)->offset >= 0) { + if (HAS_DATA_IN_DISK(*pi)) { int32_t code = loadPageFromDisk(pBuf, *pi); if (code != 0) { + terrno = code; return NULL; } } #ifdef BUF_PAGE_DEBUG uDebug("page_getBufPage2 pageId:%d, offset:%" PRId64, (*pi)->pageId, (*pi)->offset); #endif - return (void*)(GET_DATA_PAYLOAD(*pi)); + return (void*)(GET_PAYLOAD_DATA(*pi)); } } void releaseBufPage(SDiskbasedBuf* pBuf, void* page) { - if (ASSERTS(pBuf != NULL && page != NULL, "pBuf or page is NULL")) { + if (page == NULL) { return; } + SPageInfo* ppi = getPageInfoFromPayload(page); releaseBufPageInfo(pBuf, ppi); } @@ -493,7 +535,13 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) { #ifdef BUF_PAGE_DEBUG uDebug("page_releaseBufPageInfo pageId:%d, used:%d, offset:%" PRId64, pi->pageId, pi->used, pi->offset); #endif - if (ASSERTS(pi->pData != NULL, "pi->pData is NULL")) { + + if (pi == NULL) { + return; + } + + if (pi->pData == NULL) { + uError("pi->pData (page data) is null"); return; } @@ -504,7 +552,6 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) { size_t getTotalBufSize(const SDiskbasedBuf* pBuf) { return (size_t)pBuf->totalBufSize; } SArray* getDataBufPagesIdList(SDiskbasedBuf* pBuf) { - ASSERT(pBuf != NULL); return pBuf->pIdList; } @@ -582,7 +629,6 @@ SPageInfo* getLastPageInfo(SArray* pList) { } int32_t getPageId(const SPageInfo* pPgInfo) { - ASSERT(pPgInfo != NULL); return pPgInfo->pageId; }