Merge remote-tracking branch 'origin/fix/main_bugfix_wxy' into fix/liaohj

This commit is contained in:
Xiaoyu Wang 2023-01-28 10:36:23 +08:00
commit 6d1621c221
15 changed files with 250 additions and 159 deletions

View File

@ -2,7 +2,7 @@
# taos-tools
ExternalProject_Add(taos-tools
GIT_REPOSITORY https://github.com/taosdata/taos-tools.git
GIT_TAG 5c53cc8
GIT_TAG 7d24ed5
SOURCE_DIR "${TD_SOURCE_DIR}/tools/taos-tools"
BINARY_DIR ""
#BUILD_IN_SOURCE TRUE

View File

@ -108,7 +108,8 @@ int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char*
void* smlInitHandle(SQuery* pQuery);
void smlDestroyHandle(void* pHandle);
int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols, bool format, STableMeta* pTableMeta,
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf, int16_t msgBufLen);
char* tableName, const char* sTableName, int32_t sTableNameLen, int32_t ttl, char* msgBuf,
int16_t msgBufLen);
int32_t smlBuildOutput(void* handle, SHashObj* pVgHash);
int32_t rewriteToVnodeModifyOpStmt(SQuery* pQuery, SArray* pBufArray);

View File

@ -2,7 +2,7 @@
#
# Generate the deb package for ubuntu, or rpm package for centos, or tar.gz package for other linux os
# set -e
set -e
# set -x
# release.sh -v [cluster | edge]

View File

@ -152,9 +152,10 @@ int32_t stmtRestoreQueryFields(STscStmt* pStmt) {
return TSDB_CODE_SUCCESS;
}
int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, SName* tbName, const char* sTableName, bool autoCreateTbl) {
int32_t stmtUpdateBindInfo(TAOS_STMT* stmt, STableMeta* pTableMeta, void* tags, SName* tbName, const char* sTableName,
bool autoCreateTbl) {
STscStmt* pStmt = (STscStmt*)stmt;
char tbFName[TSDB_TABLE_FNAME_LEN];
char tbFName[TSDB_TABLE_FNAME_LEN];
tNameExtractFullName(tbName, tbFName);
memcpy(&pStmt->bInfo.sname, tbName, sizeof(*tbName));
@ -772,9 +773,9 @@ int stmtAddBatch(TAOS_STMT* stmt) {
int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
tscDebug("stmt start to update tbUid, blockNum: %d", pRsp->nBlocks);
int32_t code = 0;
int32_t finalCode = 0;
size_t keyLen = 0;
int32_t code = 0;
int32_t finalCode = 0;
size_t keyLen = 0;
STableDataBlocks** pIter = taosHashIterate(pStmt->exec.pBlockHash, NULL);
while (pIter) {
STableDataBlocks* pBlock = *pIter;
@ -844,7 +845,7 @@ int stmtUpdateTableUid(STscStmt* pStmt, SSubmitRsp* pRsp) {
pMeta->uid = pTableMeta->uid;
pStmt->bInfo.tbUid = pTableMeta->uid;
taosMemoryFree(pTableMeta);
taosMemoryFree(pTableMeta);
}
pIter = taosHashIterate(pStmt->exec.pBlockHash, pIter);

View File

@ -1758,11 +1758,14 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo*
}
if (minKey == k.ts) {
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
if (pSchema == NULL) {
return terrno;
}
if (init) {
tRowMerge(&merge, pRow);
tRowMergerAdd(&merge, pRow, pSchema);
} else {
init = true;
STSchema* pSchema = doGetSchemaForTSRow(TSDBROW_SVERSION(pRow), pReader, pBlockScanInfo->uid);
int32_t code = tRowMergerInit(&merge, pRow, pSchema);
if (code != TSDB_CODE_SUCCESS) {
return code;

View File

@ -731,6 +731,7 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
tsdbRowGetColVal(pRow, pTSchema, jCol++, pColVal);
if (key.version > pMerger->version) {
#if 0
if (!COL_VAL_IS_NONE(pColVal)) {
if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->type)) {
SColVal *tColVal = taosArrayGet(pMerger->pArray, iCol);
@ -746,6 +747,28 @@ int32_t tRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
taosArraySet(pMerger->pArray, iCol, pColVal);
}
}
#endif
if (!COL_VAL_IS_NONE(pColVal)) {
if (IS_VAR_DATA_TYPE(pColVal->type)) {
SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol);
if (!COL_VAL_IS_NULL(pColVal)) {
code = tRealloc(&pTColVal->value.pData, pColVal->value.nData);
if (code) return code;
pTColVal->value.nData = pColVal->value.nData;
if (pTColVal->value.nData) {
memcpy(pTColVal->value.pData, pColVal->value.pData, pTColVal->value.nData);
}
pTColVal->flag = 0;
} else {
tFree(pTColVal->value.pData);
pTColVal->value.pData = NULL;
taosArraySet(pMerger->pArray, iCol, pColVal);
}
} else {
taosArraySet(pMerger->pArray, iCol, pColVal);
}
}
} else if (key.version < pMerger->version) {
SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol);
if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) {

View File

@ -593,8 +593,11 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
int32_t pageId = 0;
pPage = getNewBufPage(pInfo->pBuf, &pageId);
taosArrayPush(p->pPageList, &pageId);
if (pPage == NULL) {
return pPage;
}
taosArrayPush(p->pPageList, &pageId);
*(int32_t*)pPage = 0;
} else {
int32_t* curId = taosArrayGetLast(p->pPageList);
@ -612,6 +615,11 @@ void* getCurrentDataGroupInfo(const SPartitionOperatorInfo* pInfo, SDataGroupInf
// add a new page for current group
int32_t pageId = 0;
pPage = getNewBufPage(pInfo->pBuf, &pageId);
if (pPage == NULL) {
qError("failed to get new buffer, code:%s", tstrerror(terrno));
return NULL;
}
taosArrayPush(p->pPageList, &pageId);
memset(pPage, 0, getBufPageSize(pInfo->pBuf));
}

View File

@ -3061,14 +3061,12 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf,
if (pHandle->currentPage == -1) {
pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage);
if (pPage == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
pPage->num = sizeof(SFilePage);
} else {
pPage = getBufPage(pHandle->pBuf, pHandle->currentPage);
if (pPage == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
if (pPage->num + length > getBufPageSize(pHandle->pBuf)) {
@ -3076,7 +3074,6 @@ static int32_t doSaveTupleData(SSerializeDataHandle* pHandle, const void* pBuf,
releaseBufPage(pHandle->pBuf, pPage);
pPage = getNewBufPage(pHandle->pBuf, &pHandle->currentPage);
if (pPage == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
pPage->num = sizeof(SFilePage);
@ -3123,7 +3120,6 @@ static int32_t doUpdateTupleData(SSerializeDataHandle* pHandle, const void* pBuf
if (pHandle->pBuf != NULL) {
SFilePage* pPage = getBufPage(pHandle->pBuf, pPos->pageId);
if (pPage == NULL) {
terrno = TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
memcpy(pPage->data + pPos->offset, pBuf, length);

View File

@ -43,8 +43,8 @@ static SFilePage *loadDataFromFilePage(tMemBucket *pMemBucket, int32_t slotIdx)
if (pg == NULL) {
return NULL;
}
memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes));
memcpy(buffer->data + offset, pg->data, (size_t)(pg->num * pMemBucket->bytes));
offset += (int32_t)(pg->num * pMemBucket->bytes);
}
@ -109,7 +109,7 @@ int32_t findOnlyResult(tMemBucket *pMemBucket, double *result) {
int32_t *pageId = taosArrayGet(list, 0);
SFilePage *pPage = getBufPage(pMemBucket->pBuffer, *pageId);
if (pPage == NULL) {
return TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
ASSERT(pPage->num == 1);
@ -276,7 +276,7 @@ tMemBucket *tMemBucketCreate(int16_t nElemSize, int16_t dataType, double minval,
return NULL;
}
int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 512, "1", tsTempDir);
int32_t ret = createDiskbasedBuf(&pBucket->pBuffer, pBucket->bufPageSize, pBucket->bufPageSize * 1024, "1", tsTempDir);
if (ret != 0) {
tMemBucketDestroy(pBucket);
return NULL;
@ -388,7 +388,7 @@ int32_t tMemBucketPut(tMemBucket *pBucket, const void *data, size_t size) {
pSlot->info.data = getNewBufPage(pBucket->pBuffer, &pageId);
if (pSlot->info.data == NULL) {
return TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
pSlot->info.pageId = pageId;
taosArrayPush(pPageIdList, &pageId);
@ -482,8 +482,9 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction
// data in buffer and file are merged together to be processed.
SFilePage *buffer = loadDataFromFilePage(pMemBucket, i);
if (buffer == NULL) {
return TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
int32_t currentIdx = count - num;
char *thisVal = buffer->data + pMemBucket->bytes * currentIdx;
@ -520,7 +521,7 @@ int32_t getPercentileImpl(tMemBucket *pMemBucket, int32_t count, double fraction
int32_t *pageId = taosArrayGet(list, f);
SFilePage *pg = getBufPage(pMemBucket->pBuffer, *pageId);
if (pg == NULL) {
return TSDB_CODE_NO_AVAIL_DISK;
return terrno;
}
int32_t code = tMemBucketPut(pMemBucket, pg->data, (int32_t)pg->num);

View File

@ -47,7 +47,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen) {
STableDataBlocks* pDataBlock = (STableDataBlocks*)pBlock;
SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen};
int32_t code = TSDB_CODE_SUCCESS;
int32_t code = TSDB_CODE_SUCCESS;
SParsedDataColInfo* tags = (SParsedDataColInfo*)boundTags;
if (NULL == tags) {
return TSDB_CODE_APP_ERROR;
@ -137,7 +137,8 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch
}
SVCreateTbReq tbReq = {0};
insBuildCreateTbReq(&tbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pTableMeta->tableInfo.numOfTags, TSDB_DEFAULT_TABLE_TTL);
insBuildCreateTbReq(&tbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pTableMeta->tableInfo.numOfTags,
TSDB_DEFAULT_TABLE_TTL);
code = insBuildCreateTbMsg(pDataBlock, &tbReq);
tdDestroySVCreateTbReq(&tbReq);

View File

@ -1550,11 +1550,14 @@ static int32_t translateRepeatScanFunc(STranslateContext* pCxt, SFunctionNode* p
// select percentile() without from clause is also valid
if ((NULL != pTable && (QUERY_NODE_REAL_TABLE != nodeType(pTable) ||
(TSDB_CHILD_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType &&
TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType))) ||
NULL != pSelect->pPartitionByList) {
TSDB_NORMAL_TABLE != ((SRealTableNode*)pTable)->pMeta->tableType)))) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_ONLY_SUPPORT_SINGLE_TABLE,
"%s is only supported in single table query", pFunc->functionName);
}
if (NULL != pSelect->pPartitionByList) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_NOT_ALLOWED_FUNC,
"%s function is not supported in partition query", pFunc->functionName);
}
return TSDB_CODE_SUCCESS;
}

View File

@ -348,7 +348,8 @@ static bool stbSplIsPartTableAgg(SAggLogicNode* pAgg) {
return false;
}
if (NULL != pAgg->pGroupKeys) {
return stbSplHasPartTbname(pAgg->pGroupKeys) && stbSplNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0));
return stbSplHasPartTbname(pAgg->pGroupKeys) &&
stbSplNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0));
}
return stbSplHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)));
}
@ -1025,21 +1026,29 @@ static int32_t stbSplSplitSortNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
return code;
}
static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pSplitNode = pInfo->pSplitNode;
static int32_t stbSplGetSplitNodeForScan(SStableSplitInfo* pInfo, SLogicNode** pSplitNode) {
*pSplitNode = pInfo->pSplitNode;
if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pInfo->pSplitNode->pParent) &&
NULL == pInfo->pSplitNode->pParent->pLimit && NULL == pInfo->pSplitNode->pParent->pSlimit) {
pSplitNode = pInfo->pSplitNode->pParent;
*pSplitNode = pInfo->pSplitNode->pParent;
if (NULL != pInfo->pSplitNode->pLimit) {
pSplitNode->pLimit = nodesCloneNode(pInfo->pSplitNode->pLimit);
if (NULL == pSplitNode->pLimit) {
(*pSplitNode)->pLimit = nodesCloneNode(pInfo->pSplitNode->pLimit);
if (NULL == (*pSplitNode)->pLimit) {
return TSDB_CODE_OUT_OF_MEMORY;
}
((SLimitNode*)pInfo->pSplitNode->pLimit)->limit += ((SLimitNode*)pInfo->pSplitNode->pLimit)->offset;
((SLimitNode*)pInfo->pSplitNode->pLimit)->offset = 0;
}
}
int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pSplitNode, SUBPLAN_TYPE_MERGE);
return TSDB_CODE_SUCCESS;
}
static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pSplitNode = NULL;
int32_t code = stbSplGetSplitNodeForScan(pInfo, &pSplitNode);
if (TSDB_CODE_SUCCESS == code) {
code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pSplitNode, SUBPLAN_TYPE_MERGE);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pSplitNode, SPLIT_FLAG_STABLE_SPLIT));
@ -1049,12 +1058,11 @@ static int32_t stbSplSplitScanNodeWithoutPartTags(SSplitContext* pCxt, SStableSp
}
static int32_t stbSplSplitScanNodeWithPartTags(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pSplitNode = pInfo->pSplitNode;
if (NULL != pInfo->pSplitNode->pParent && QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pInfo->pSplitNode->pParent) &&
NULL == pInfo->pSplitNode->pParent->pLimit && NULL == pInfo->pSplitNode->pParent->pSlimit) {
pSplitNode = pInfo->pSplitNode->pParent;
SLogicNode* pSplitNode = NULL;
int32_t code = stbSplGetSplitNodeForScan(pInfo, &pSplitNode);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, true);
}
int32_t code = stbSplCreateMergeNode(pCxt, pInfo->pSubplan, pSplitNode, NULL, pSplitNode, true);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, pSplitNode, SPLIT_FLAG_STABLE_SPLIT));

View File

@ -20,7 +20,10 @@
// todo refactor API
SArray* taosArrayInit(size_t size, size_t elemSize) {
assert(elemSize > 0);
if (elemSize == 0) {
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
if (size < TARRAY_MIN_SIZE) {
size = TARRAY_MIN_SIZE;
@ -96,8 +99,6 @@ void* taosArrayAddBatch(SArray* pArray, const void* pData, int32_t nEles) {
}
void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)) {
assert(pArray);
size_t size = pArray->size;
if (size <= 1) {
return;
@ -136,8 +137,6 @@ void taosArrayRemoveDuplicate(SArray* pArray, __compar_fn_t comparFn, void (*fp)
}
void taosArrayRemoveDuplicateP(SArray* pArray, __compar_fn_t comparFn, void (*fp)(void*)) {
assert(pArray);
size_t size = pArray->size;
if (size <= 1) {
return;
@ -195,11 +194,10 @@ void* taosArrayReserve(SArray* pArray, int32_t num) {
}
void* taosArrayPop(SArray* pArray) {
assert(pArray != NULL);
if (pArray->size == 0) {
return NULL;
}
pArray->size -= 1;
return TARRAY_GET_ELEM(pArray, pArray->size);
}
@ -208,16 +206,21 @@ void* taosArrayGet(const SArray* pArray, size_t index) {
if (NULL == pArray) {
return NULL;
}
assert(index < pArray->size);
if (index >= pArray->size) {
uError("index is out of range, current:%"PRIzu" max:%d", index, pArray->capacity);
return NULL;
}
return TARRAY_GET_ELEM(pArray, index);
}
void* taosArrayGetP(const SArray* pArray, size_t index) {
assert(index < pArray->size);
void* d = TARRAY_GET_ELEM(pArray, index);
return *(void**)d;
void** p = taosArrayGet(pArray, index);
if (p == NULL) {
return NULL;
}
return *p;
}
void* taosArrayGetLast(const SArray* pArray) { return TARRAY_GET_ELEM(pArray, pArray->size - 1); }
@ -296,9 +299,12 @@ void taosArrayRemove(SArray* pArray, size_t index) {
}
SArray* taosArrayFromList(const void* src, size_t size, size_t elemSize) {
assert(src != NULL && elemSize > 0);
SArray* pDst = taosArrayInit(size, elemSize);
if (elemSize <= 0) {
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
SArray* pDst = taosArrayInit(size, elemSize);
memcpy(pDst->pData, src, elemSize * size);
pDst->size = size;
@ -306,8 +312,6 @@ SArray* taosArrayFromList(const void* src, size_t size, size_t elemSize) {
}
SArray* taosArrayDup(const SArray* pSrc, __array_item_dup_fn_t fn) {
assert(pSrc != NULL);
if (pSrc->size == 0) { // empty array list
return taosArrayInit(8, pSrc->elemSize);
}
@ -399,14 +403,10 @@ void taosArrayDestroyEx(SArray* pArray, FDelete fp) {
}
void taosArraySort(SArray* pArray, __compar_fn_t compar) {
ASSERT(pArray != NULL && compar != NULL);
taosSort(pArray->pData, pArray->size, pArray->elemSize, compar);
}
void* taosArraySearch(const SArray* pArray, const void* key, __compar_fn_t comparFn, int32_t flags) {
assert(pArray != NULL && comparFn != NULL);
assert(key != NULL);
return taosbsearch(key, pArray->pData, pArray->size, pArray->elemSize, comparFn, flags);
}

View File

@ -921,7 +921,7 @@ void taosCacheRefresh(SCacheObj *pCacheObj, __cache_trav_fn_t fp, void *param1)
void taosStopCacheRefreshWorker(void) {
stopRefreshWorker = true;
TdThreadOnce tmp = PTHREAD_ONCE_INIT;
if (memcmp(&cacheRefreshWorker, &tmp, sizeof(TdThreadOnce)) != 0) taosThreadJoin(cacheRefreshWorker, NULL);
if (memcmp(&cacheThreadInit, &tmp, sizeof(TdThreadOnce)) != 0) taosThreadJoin(cacheRefreshWorker, NULL);
taosArrayDestroy(pCacheArrayList);
}

View File

@ -5,7 +5,10 @@
#include "thash.h"
#include "tlog.h"
#define GET_DATA_PAYLOAD(_p) ((char*)(_p)->pData + POINTER_BYTES)
#define GET_PAYLOAD_DATA(_p) ((char*)(_p)->pData + POINTER_BYTES)
#define BUF_PAGE_IN_MEM(_p) ((_p)->pData != NULL)
#define CLEAR_BUF_PAGE_IN_MEM_FLAG(_p) ((_p)->pData = NULL)
#define HAS_DATA_IN_DISK(_p) ((_p)->offset >= 0)
#define NO_IN_MEM_AVAILABLE_PAGES(_b) (listNEles((_b)->lruList) >= (_b)->inMemPages)
typedef struct SPageDiskInfo {
@ -14,7 +17,7 @@ typedef struct SPageDiskInfo {
} SPageDiskInfo, SFreeListItem;
struct SPageInfo {
SListNode* pn; // point to list node struct
SListNode* pn; // point to list node struct. it is NULL when the page is evicted from the in-memory buffer
void* pData;
int64_t offset;
int32_t pageId;
@ -89,7 +92,7 @@ static char* doDecompressData(void* data, int32_t srcSize, int32_t* dst, SDiskba
return data;
}
static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) {
static uint64_t allocateNewPositionInFile(SDiskbasedBuf* pBuf, size_t size) {
if (pBuf->pFree == NULL) {
return pBuf->nextPos;
} else {
@ -112,10 +115,6 @@ static uint64_t allocatePositionInFile(SDiskbasedBuf* pBuf, size_t size) {
}
}
static void setPageNotInBuf(SPageInfo* pPageInfo) { pPageInfo->pData = NULL; }
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); }
/**
* +--------------------------+-------------------+--------------+
* | PTR to SPageInfo (8bytes)| Payload (PageSize)| 2 Extra Bytes|
@ -124,23 +123,31 @@ static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize
* @param pg
* @return
*/
static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
ASSERT(!pg->used && pg->pData != NULL);
static FORCE_INLINE size_t getAllocPageSize(int32_t pageSize) { return pageSize + POINTER_BYTES + sizeof(SFilePage); }
static char* doFlushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
if (pg->pData == NULL || pg->used) {
uError("invalid params in paged buffer process when flushing buf to disk, %s", pBuf->id);
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
int32_t size = pBuf->pageSize;
char* t = NULL;
if (pg->offset == -1 || pg->dirty) {
void* payload = GET_DATA_PAYLOAD(pg);
if ((!HAS_DATA_IN_DISK(pg)) || pg->dirty) {
void* payload = GET_PAYLOAD_DATA(pg);
t = doCompressData(payload, pBuf->pageSize, &size, pBuf);
ASSERTS(size >= 0, "size is negative");
if (size < 0) {
uError("failed to compress data when flushing data to disk, %s", pBuf->id);
return NULL;
}
}
// this page is flushed to disk for the first time
if (pg->dirty) {
if (pg->offset == -1) {
ASSERTS(pg->dirty == true, "pg->dirty is false");
pg->offset = allocatePositionInFile(pBuf, size);
if (!HAS_DATA_IN_DISK(pg)) {
pg->offset = allocateNewPositionInFile(pBuf, size);
pBuf->nextPos += size;
int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
@ -155,6 +162,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
return NULL;
}
// extend the file size
if (pBuf->fileSize < pg->offset + size) {
pBuf->fileSize = pg->offset + size;
}
@ -169,7 +177,7 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
taosArrayPush(pBuf->pFree, &dinfo);
// 2. allocate new position, and update the info
pg->offset = allocatePositionInFile(pBuf, size);
pg->offset = allocateNewPositionInFile(pBuf, size);
pBuf->nextPos += size;
}
@ -197,20 +205,19 @@ static char* doFlushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
size = pg->length;
}
ASSERT(size > 0 || (pg->offset == -1 && pg->length == -1));
char* pDataBuf = pg->pData;
memset(pDataBuf, 0, getAllocPageSize(pBuf->pageSize));
#ifdef BUF_PAGE_DEBUG
uDebug("page_flush %p, pageId:%d, offset:%d", pDataBuf, pg->pageId, pg->offset);
#endif
pg->length = size; // on disk size
return pDataBuf;
}
static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
static char* flushBufPage(SDiskbasedBuf* pBuf, SPageInfo* pg) {
int32_t ret = TSDB_CODE_SUCCESS;
ASSERT(((int64_t)pBuf->numOfPages * pBuf->pageSize) == pBuf->totalBufSize && pBuf->numOfPages >= pBuf->inMemPages);
if (pBuf->pFile == NULL) {
if ((ret = createDiskFile(pBuf)) != TSDB_CODE_SUCCESS) {
@ -219,22 +226,27 @@ static char* flushPageToDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
}
}
char* p = doFlushPageToDisk(pBuf, pg);
setPageNotInBuf(pg);
pg->dirty = false;
char* p = doFlushBufPage(pBuf, pg);
CLEAR_BUF_PAGE_IN_MEM_FLAG(pg);
pg->dirty = false;
return p;
}
// load file block data in disk
static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
if (pg->offset < 0 || pg->length <= 0) {
uError("failed to load buf page from disk, offset:%"PRId64", length:%d, %s", pg->offset, pg->length, pBuf->id);
return TSDB_CODE_INVALID_PARA;
}
int32_t ret = taosLSeekFile(pBuf->pFile, pg->offset, SEEK_SET);
if (ret == -1) {
ret = TAOS_SYSTEM_ERROR(errno);
return ret;
}
void* pPage = (void*)GET_DATA_PAYLOAD(pg);
void* pPage = (void*)GET_PAYLOAD_DATA(pg);
ret = (int32_t)taosReadFile(pBuf->pFile, pPage, pg->length);
if (ret != pg->length) {
ret = TAOS_SYSTEM_ERROR(errno);
@ -249,10 +261,14 @@ static int32_t loadPageFromDisk(SDiskbasedBuf* pBuf, SPageInfo* pg) {
return 0;
}
static SPageInfo* registerPage(SDiskbasedBuf* pBuf, int32_t pageId) {
static SPageInfo* registerNewPageInfo(SDiskbasedBuf* pBuf, int32_t pageId) {
pBuf->numOfPages += 1;
SPageInfo* ppi = taosMemoryMalloc(sizeof(SPageInfo));
if (ppi == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
ppi->pageId = pageId;
ppi->pData = NULL;
@ -272,46 +288,33 @@ static SListNode* getEldestUnrefedPage(SDiskbasedBuf* pBuf) {
SListNode* pn = NULL;
while ((pn = tdListNext(&iter)) != NULL) {
SPageInfo* pageInfo = *(SPageInfo**)pn->data;
ASSERT(pageInfo->pageId >= 0 && pageInfo->pn == pn);
SPageInfo* p = *(SPageInfo**)(pageInfo->pData);
ASSERT(pageInfo->pageId >= 0 && pageInfo->pn == pn && p == pageInfo);
if (!pageInfo->used) {
// printf("%d is chosen\n", pageInfo->pageId);
break;
} else {
// printf("page %d is used, dirty:%d\n", pageInfo->pageId, pageInfo->dirty);
}
}
return pn;
}
static char* evacOneDataPage(SDiskbasedBuf* pBuf) {
char* bufPage = NULL;
static char* evictBufPage(SDiskbasedBuf* pBuf) {
SListNode* pn = getEldestUnrefedPage(pBuf);
terrno = 0;
// all pages are referenced by user, try to allocate new space
if (pn == NULL) {
int32_t prev = pBuf->inMemPages;
// increase by 50% of previous mem pages
pBuf->inMemPages = (int32_t)(pBuf->inMemPages * 1.5f);
// qWarn("%p in memory buf page not sufficient, expand from %d to %d, page size:%d", pBuf, prev,
// pBuf->inMemPages, pBuf->pageSize);
} else {
tdListPopNode(pBuf->lruList, pn);
SPageInfo* d = *(SPageInfo**)pn->data;
ASSERTS(d->pn == pn, "d->pn not equal pn");
d->pn = NULL;
taosMemoryFreeClear(pn);
bufPage = flushPageToDisk(pBuf, d);
if (pn == NULL) { // no available buffer pages now, return.
return NULL;
}
return bufPage;
terrno = 0;
tdListPopNode(pBuf->lruList, pn);
SPageInfo* d = *(SPageInfo**)pn->data;
d->pn = NULL;
taosMemoryFreeClear(pn);
return flushBufPage(pBuf, d);
}
static void lruListPushFront(SList* pList, SPageInfo* pi) {
@ -338,13 +341,12 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
SDiskbasedBuf* pPBuf = *pBuf;
if (pPBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
goto _error;
}
pPBuf->pageSize = pagesize;
pPBuf->numOfPages = 0; // all pages are in buffer in the first place
pPBuf->totalBufSize = 0;
pPBuf->inMemPages = inMemBufSize / pagesize; // maximum allowed pages, it is a soft limit.
pPBuf->allocateId = -1;
pPBuf->pFile = NULL;
pPBuf->id = strdup(id);
@ -353,33 +355,69 @@ int32_t createDiskbasedBuf(SDiskbasedBuf** pBuf, int32_t pagesize, int32_t inMem
pPBuf->freePgList = tdListNew(POINTER_BYTES);
// at least more than 2 pages must be in memory
ASSERT(inMemBufSize >= pagesize * 2);
if (inMemBufSize < pagesize * 2) {
inMemBufSize = pagesize * 2;
}
pPBuf->inMemPages = inMemBufSize / pagesize; // maximum allowed pages, it is a soft limit.
pPBuf->lruList = tdListNew(POINTER_BYTES);
if (pPBuf->lruList == NULL) {
goto _error;
}
// init id hash table
_hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT);
pPBuf->pIdList = taosArrayInit(4, POINTER_BYTES);
if (pPBuf->pIdList == NULL) {
goto _error;
}
pPBuf->assistBuf = taosMemoryMalloc(pPBuf->pageSize + 2); // EXTRA BYTES
pPBuf->all = taosHashInit(10, fn, true, false);
pPBuf->prefix = (char*) dir;
if (pPBuf->assistBuf == NULL) {
goto _error;
}
pPBuf->all = taosHashInit(10, fn, true, false);
if (pPBuf->all == NULL) {
goto _error;
}
pPBuf->prefix = (char*) dir;
pPBuf->emptyDummyIdList = taosArrayInit(1, sizeof(int32_t));
// qDebug("QInfo:0x%"PRIx64" create resBuf for output, page size:%d, inmem buf pages:%d, file:%s", qId,
// pPBuf->pageSize,
// pPBuf->inMemPages, pPBuf->path);
// pPBuf->pageSize, pPBuf->inMemPages, pPBuf->path);
return TSDB_CODE_SUCCESS;
_error:
destroyDiskbasedBuf(pPBuf);
return TSDB_CODE_OUT_OF_MEMORY;
}
static char* doExtractPage(SDiskbasedBuf* pBuf) {
char* availablePage = NULL;
if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
availablePage = evictBufPage(pBuf);
if (availablePage == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
uWarn("no available buf pages, current:%d, max:%d", listNEles(pBuf->lruList), pBuf->inMemPages)
}
} else {
availablePage = taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize)); // add extract bytes in case of zipped buffer increased.
if (availablePage == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
}
}
return availablePage;
}
void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
pBuf->statis.getPages += 1;
char* availablePage = NULL;
if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
availablePage = evacOneDataPage(pBuf);
char* availablePage = doExtractPage(pBuf);
if (availablePage == NULL) {
return NULL;
}
SPageInfo* pi = NULL;
@ -394,7 +432,10 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
*pageId = (++pBuf->allocateId);
// register page id info
pi = registerPage(pBuf, *pageId);
pi = registerNewPageInfo(pBuf, *pageId);
if (pi == NULL) {
return NULL;
}
// add to hash map
taosHashPut(pBuf->all, pageId, sizeof(int32_t), &pi, POINTER_BYTES);
@ -402,63 +443,62 @@ void* getNewBufPage(SDiskbasedBuf* pBuf, int32_t* pageId) {
}
// add to LRU list
ASSERT(listNEles(pBuf->lruList) < pBuf->inMemPages && pBuf->inMemPages > 0);
lruListPushFront(pBuf->lruList, pi);
// allocate buf
if (availablePage == NULL) {
pi->pData =
taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize)); // add extract bytes in case of zipped buffer increased.
} else {
pi->pData = availablePage;
}
pi->pData = availablePage;
((void**)pi->pData)[0] = pi;
#ifdef BUF_PAGE_DEBUG
uDebug("page_getNewBufPage , pi->pData:%p, pageId:%d, offset:%" PRId64, pi->pData, pi->pageId, pi->offset);
#endif
return (void*)(GET_DATA_PAYLOAD(pi));
return (void*)(GET_PAYLOAD_DATA(pi));
}
void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
ASSERT(pBuf != NULL && id >= 0);
if (id < 0) {
terrno = TSDB_CODE_INVALID_PARA;
uError("invalid page id:%d, %s", id, pBuf->id);
return NULL;
}
pBuf->statis.getPages += 1;
SPageInfo** pi = taosHashGet(pBuf->all, &id, sizeof(int32_t));
ASSERT(pi != NULL && *pi != NULL);
if (pi == NULL || *pi == NULL) {
uError("failed to locate the buffer page:%d, %s", id, pBuf->id);
terrno = TSDB_CODE_INVALID_PARA;
return NULL;
}
if ((*pi)->pData != NULL) { // it is in memory
if (BUF_PAGE_IN_MEM(*pi)) { // it is in memory
// no need to update the LRU list if only one page exists
if (pBuf->numOfPages == 1) {
(*pi)->used = true;
return (void*)(GET_DATA_PAYLOAD(*pi));
return (void*)(GET_PAYLOAD_DATA(*pi));
}
SPageInfo** pInfo = (SPageInfo**)((*pi)->pn->data);
ASSERT(*pInfo == *pi);
if (*pInfo != *pi) {
uError("inconsistently data in paged buffer, pInfo:%p, pi:%p, %s", *pInfo, *pi, pBuf->id);
return NULL;
}
lruListMoveToFront(pBuf->lruList, (*pi));
(*pi)->used = true;
#ifdef BUF_PAGE_DEBUG
uDebug("page_getBufPage1 pageId:%d, offset:%" PRId64, (*pi)->pageId, (*pi)->offset);
#endif
return (void*)(GET_DATA_PAYLOAD(*pi));
return (void*)(GET_PAYLOAD_DATA(*pi));
} else { // not in memory
ASSERT((*pi)->pData == NULL && (*pi)->pn == NULL &&
ASSERT((!BUF_PAGE_IN_MEM(*pi)) && (*pi)->pn == NULL &&
(((*pi)->length >= 0 && (*pi)->offset >= 0) || ((*pi)->length == -1 && (*pi)->offset == -1)));
char* availablePage = NULL;
if (NO_IN_MEM_AVAILABLE_PAGES(pBuf)) {
availablePage = evacOneDataPage(pBuf);
if (availablePage == NULL) {
return NULL;
}
}
(*pi)->pData = doExtractPage(pBuf);
if (availablePage == NULL) {
(*pi)->pData = taosMemoryCalloc(1, getAllocPageSize(pBuf->pageSize));
} else {
(*pi)->pData = availablePage;
// failed to evict buffer page, return with error code.
if ((*pi)->pData == NULL) {
return NULL;
}
// set the ptr to the new SPageInfo
@ -468,23 +508,25 @@ void* getBufPage(SDiskbasedBuf* pBuf, int32_t id) {
(*pi)->used = true;
// some data has been flushed to disk, and needs to be loaded into buffer again.
if ((*pi)->length > 0 && (*pi)->offset >= 0) {
if (HAS_DATA_IN_DISK(*pi)) {
int32_t code = loadPageFromDisk(pBuf, *pi);
if (code != 0) {
terrno = code;
return NULL;
}
}
#ifdef BUF_PAGE_DEBUG
uDebug("page_getBufPage2 pageId:%d, offset:%" PRId64, (*pi)->pageId, (*pi)->offset);
#endif
return (void*)(GET_DATA_PAYLOAD(*pi));
return (void*)(GET_PAYLOAD_DATA(*pi));
}
}
void releaseBufPage(SDiskbasedBuf* pBuf, void* page) {
if (ASSERTS(pBuf != NULL && page != NULL, "pBuf or page is NULL")) {
if (page == NULL) {
return;
}
SPageInfo* ppi = getPageInfoFromPayload(page);
releaseBufPageInfo(pBuf, ppi);
}
@ -493,7 +535,13 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
#ifdef BUF_PAGE_DEBUG
uDebug("page_releaseBufPageInfo pageId:%d, used:%d, offset:%" PRId64, pi->pageId, pi->used, pi->offset);
#endif
if (ASSERTS(pi->pData != NULL, "pi->pData is NULL")) {
if (pi == NULL) {
return;
}
if (pi->pData == NULL) {
uError("pi->pData (page data) is null");
return;
}
@ -504,7 +552,6 @@ void releaseBufPageInfo(SDiskbasedBuf* pBuf, SPageInfo* pi) {
size_t getTotalBufSize(const SDiskbasedBuf* pBuf) { return (size_t)pBuf->totalBufSize; }
SArray* getDataBufPagesIdList(SDiskbasedBuf* pBuf) {
ASSERT(pBuf != NULL);
return pBuf->pIdList;
}
@ -582,7 +629,6 @@ SPageInfo* getLastPageInfo(SArray* pList) {
}
int32_t getPageId(const SPageInfo* pPgInfo) {
ASSERT(pPgInfo != NULL);
return pPgInfo->pageId;
}