Merge branch '3.0' into test3.0/lihui

This commit is contained in:
plum-lihui 2022-06-30 21:25:45 +08:00
commit 4b6ab7fe26
10 changed files with 192 additions and 138 deletions

View File

@ -1547,7 +1547,7 @@ SMqRspObj* tmqBuildRspFromWrapper(SMqPollRspWrapper* pWrapper) {
}
int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
/*printf("call poll\n");*/
/*tscDebug("call poll");*/
for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
for (int j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
@ -1708,6 +1708,7 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
}
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
/*tscDebug("call poll1");*/
void* rspObj;
int64_t startTime = taosGetTimestampMs();

View File

@ -5402,9 +5402,9 @@ int32_t tFormatOffset(char *buf, int32_t maxLen, const STqOffsetVal *pVal) {
} else if (pVal->type == TMQ_OFFSET__LOG) {
snprintf(buf, maxLen, "offset(log) ver:%ld", pVal->version);
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_DATA) {
snprintf(buf, maxLen, "offset(snapshot data) uid:%ld, ts:%ld", pVal->uid, pVal->ts);
snprintf(buf, maxLen, "offset(ss data) uid:%ld, ts:%ld", pVal->uid, pVal->ts);
} else if (pVal->type == TMQ_OFFSET__SNAPSHOT_META) {
snprintf(buf, maxLen, "offset(snapshot meta) uid:%ld, ts:%ld", pVal->uid, pVal->ts);
snprintf(buf, maxLen, "offset(ss meta) uid:%ld, ts:%ld", pVal->uid, pVal->ts);
} else {
ASSERT(0);
}

View File

@ -154,10 +154,10 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
};
tmsgSendRsp(&rsp);
char buf1[50];
char buf2[50];
tFormatOffset(buf1, 50, &pRsp->reqOffset);
tFormatOffset(buf2, 50, &pRsp->rspOffset);
char buf1[80];
char buf2[80];
tFormatOffset(buf1, 80, &pRsp->reqOffset);
tFormatOffset(buf2, 80, &pRsp->rspOffset);
tqDebug("vg %d from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %s, rspOffset: %s",
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
@ -238,8 +238,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
STqOffsetVal fetchOffsetNew;
// 1.find handle
char buf[50];
tFormatOffset(buf, 50, &reqOffset);
char buf[80];
tFormatOffset(buf, 80, &reqOffset);
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req offset %s", consumerId, pReq->epoch,
TD_VID(pTq->pVnode), buf);
@ -360,7 +360,6 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType);
SMqMetaRsp metaRsp = {0};
metaRsp.reqOffset = pReq->reqOffset.version;
/*tqOffsetResetToLog(&metaR)*/
metaRsp.rspOffset = fetchVer;
metaRsp.resMsgType = pHead->msgType;
metaRsp.metaRspLen = pHead->bodyLen;
@ -380,18 +379,14 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// 2. get data (rebuild reader if needed)
// 3. get new uid and ts
char formatBuf[50];
tFormatOffset(formatBuf, 50, &dataRsp.reqOffset);
tqInfo("retrieve using snapshot req offset %s", formatBuf);
tqInfo("retrieve using snapshot req offset: uid %ld ts %ld", dataRsp.reqOffset.uid, dataRsp.reqOffset.ts);
if (tqScanSnapshot(pTq, &pHandle->execHandle, &dataRsp, fetchOffsetNew, workerId) < 0) {
ASSERT(0);
}
// 4. send rsp
if (dataRsp.blockNum != 0) {
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
if (tqSendDataRsp(pTq, pMsg, pReq, &dataRsp) < 0) {
code = -1;
}
} else if (fetchOffsetNew.type == TMQ_OFFSET__SNAPSHOT_META) {
ASSERT(0);

View File

@ -2837,12 +2837,10 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
if (pInfo->lastStatus.uid != uid || pInfo->lastStatus.ts != ts) {
tsdbSetTableId(pInfo->dataReader, uid);
SQueryTableDataCond tmpCond = pInfo->cond;
tmpCond.twindows[0] = (STimeWindow){
.skey = ts,
.ekey = INT64_MAX,
};
tsdbResetReadHandle(pInfo->dataReader, &tmpCond, 0);
int64_t oldSkey = pInfo->cond.twindows[0].skey;
pInfo->cond.twindows[0].skey = ts;
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
pInfo->cond.twindows[0].skey = oldSkey;
pInfo->scanTimes = 0;
pInfo->curTWinIdx = 0;
}

View File

@ -518,7 +518,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
// if scan table by table
if (pInfo->scanMode == TABLE_SCAN__TABLE_ORDER) {
// check status
while (1) {
SSDataBlock* result = doTableScanGroup(pOperator);
if (result) {
@ -530,7 +529,6 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator) {
return NULL;
}
STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, pInfo->currentTable);
/*pTableInfo->uid */
tsdbSetTableId(pInfo->dataReader, pTableInfo->uid);
tsdbResetReadHandle(pInfo->dataReader, &pInfo->cond, 0);
pInfo->scanTimes = 0;

View File

@ -1246,6 +1246,9 @@ void doClearWindowImpl(SResultRowPosition* p1, SDiskbasedBuf* pResultBuf, SExprS
pCtx[i].fpSet.init(&pCtx[i], pResInfo);
}
}
SFilePage* bufPage = getBufPage(pResultBuf, p1->pageId);
setBufPageDirty(bufPage, true);
releaseBufPage(pResultBuf, bufPage);
}
bool doClearWindow(SAggSupporter* pAggSup, SExprSupp* pSup, char* pData, int16_t bytes, uint64_t groupId,
@ -3171,6 +3174,10 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SExprSupp* pSup,
getSessionTimeWindow(pAggSup, tsCols[i], INT64_MIN, pBlock->info.groupId, gap, &winIndex);
step = updateSessionWindowInfo(pCurWin, tsCols, NULL, pBlock->info.rows, i, gap, NULL);
ASSERT(isInWindow(pCurWin, tsCols[i], gap));
if (pCurWin->pos.pageId == -1) {
// window has been closed.
continue;
}
doClearWindowImpl(&pCurWin->pos, pAggSup->pResultBuf, pSup, numOfOutput);
if (result) {
taosArrayPush(result, pCurWin);
@ -3246,12 +3253,12 @@ static void rebuildTimeWindow(SStreamSessionAggOperatorInfo* pInfo, SArray* pWin
setWindowOutputBuf(pChWin, &pChResult, pChild->exprSupp.pCtx, groupId, numOfOutput,
pChild->exprSupp.rowEntryInfoOffset, &pChInfo->streamAggSup, pTaskInfo);
compactFunctions(pSup->pCtx, pChild->exprSupp.pCtx, numOfOutput, pTaskInfo);
SFilePage* bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pChWin->pos.pageId);
setBufPageDirty(bufPage, true);
releaseBufPage(pInfo->streamAggSup.pResultBuf, bufPage);
SFilePage* bufPage = getBufPage(pChInfo->streamAggSup.pResultBuf, pChWin->pos.pageId);
releaseBufPage(pChInfo->streamAggSup.pResultBuf, bufPage);
continue;
} else if (!pChWin->isClosed) {
break;
}
break;
}
}
SFilePage* bufPage = getBufPage(pInfo->streamAggSup.pResultBuf, pParentWin->pos.pageId);
@ -3265,7 +3272,8 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*);
SResultWindowInfo* getResWinForSession(void* pData) { return (SResultWindowInfo*)pData; }
SResultWindowInfo* getResWinForState(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; }
int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArray* pClosed, __get_win_info_ fn) {
int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup,
SArray* pClosed, __get_win_info_ fn, bool delete) {
// Todo(liuyao) save window to tdb
void** pIte = NULL;
size_t keyLen = 0;
@ -3279,10 +3287,18 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra
if (isCloseWindow(&pSeWin->win, pTwSup)) {
if (!pSeWin->isClosed) {
pSeWin->isClosed = true;
if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
if (pTwSup->calTrigger == STREAM_TRIGGER_WINDOW_CLOSE && pClosed) {
int32_t code = saveResult(pSeWin->win.skey, pSeWin->pos.pageId, pSeWin->pos.offset, *pGroupId, pClosed);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pSeWin->isOutput = true;
}
if (delete) {
taosArrayRemove(pWins, i);
i--;
size = taosArrayGetSize(pWins);
}
}
continue;
}
@ -3292,6 +3308,16 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra
return TSDB_CODE_SUCCESS;
}
static void closeChildSessionWindow(SArray* pChildren, TSKEY maxTs, bool delete) {
int32_t size = taosArrayGetSize(pChildren);
for (int32_t i = 0; i < size; i++) {
SOperatorInfo* pChildOp = taosArrayGetP(pChildren, i);
SStreamSessionAggOperatorInfo* pChInfo = pChildOp->info;
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
closeSessionWindow(pChInfo->streamAggSup.pResultRows, &pChInfo->twAggSup, NULL, getResWinForSession, delete);
}
}
int32_t getAllSessionWindow(SHashObj* pHashMap, SArray* pClosed, __get_win_info_ fn) {
void** pIte = NULL;
while ((pIte = taosHashIterate(pHashMap, pIte)) != NULL) {
@ -3339,6 +3365,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
if (pBlock == NULL) {
break;
}
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "Final Session Recv" : "Single Session Recv");
if (pBlock->info.type == STREAM_CLEAR) {
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
@ -3385,7 +3412,9 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
// restore the value
pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForSession);
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
getResWinForSession, pInfo->ignoreCloseWindow);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreCloseWindow);
copyUpdateResult(pStUpdated, pUpdated);
taosHashCleanup(pStUpdated);
@ -3437,10 +3466,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
} else if (pOperator->status == OP_RES_TO_RETURN) {
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "Semi Session");
return pInfo->pDelRes;
}
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows == 0) {
if (pBInfo->pRes->info.rows == 0) {
pOperator->status = OP_EXEC_DONE;
if (pInfo->pUpdateRes->info.rows == 0) {
// semi interval operator clear disk buffer
@ -3449,9 +3479,11 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
}
// process the rest of the data
pOperator->status = OP_OPENED;
printDataBlock(pInfo->pUpdateRes, "Semi Session");
return pInfo->pUpdateRes;
}
return pInfo->binfo.pRes;
printDataBlock(pBInfo->pRes, "Semi Session");
return pBInfo->pRes;
}
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
@ -3495,21 +3527,24 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
finalizeUpdatedResult(pOperator->exprSupp.numOfExprs, pInfo->streamAggSup.pResultBuf, pUpdated,
pSup->rowEntryInfoOffset);
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
doBuildDeleteDataBlock(pInfo->pStDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "Semi Session");
return pInfo->pDelRes;
}
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pInfo->binfo.pRes->info.rows == 0) {
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
if (pBInfo->pRes->info.rows == 0) {
pOperator->status = OP_EXEC_DONE;
if (pInfo->pUpdateRes->info.rows == 0) {
return NULL;
}
// process the rest of the data
pOperator->status = OP_OPENED;
printDataBlock(pInfo->pUpdateRes, "Semi Session");
return pInfo->pUpdateRes;
}
printDataBlock(pBInfo->pRes, "Semi Session");
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
}
@ -3867,7 +3902,9 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
// restore the value
pOperator->status = OP_RES_TO_RETURN;
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForState);
closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated,
getResWinForState, pInfo->ignoreCloseWindow);
closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreCloseWindow);
copyUpdateResult(pSeUpdated, pUpdated);
taosHashCleanup(pSeUpdated);

View File

@ -783,6 +783,13 @@ static EDealRes translateValueImpl(STranslateContext* pCxt, SValueNode* pVal, SD
if (pVal->placeholderNo > 0 || pVal->isNull) {
return DEAL_RES_CONTINUE;
}
if (TSDB_DATA_TYPE_NULL == pVal->node.resType.type) {
// TODO
//pVal->node.resType = targetDt;
pVal->translate = true;
pVal->isNull = true;
return DEAL_RES_CONTINUE;
}
if (pVal->isDuration) {
if (parseNatualDuration(pVal->literal, strlen(pVal->literal), &pVal->datum.i, &pVal->unit, precision) !=
TSDB_CODE_SUCCESS) {
@ -5355,7 +5362,7 @@ static int32_t buildKVRowForAllTags(STranslateContext* pCxt, SCreateSubTableClau
if (code != TSDB_CODE_SUCCESS) {
goto end;
}
} else if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL) {
} else if (pVal->node.resType.type != TSDB_DATA_TYPE_NULL && !pVal->isNull) {
char* tmpVal = nodesGetValueFromNode(pVal);
STagVal val = {.cid = pTagSchema->colId, .type = pTagSchema->type};
if (IS_VAR_DATA_TYPE(pTagSchema->type)) {
@ -5622,8 +5629,8 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
return TSDB_CODE_OUT_OF_MEMORY;
}
if (DEAL_RES_ERROR ==
translateValueImpl(pCxt, pStmt->pVal, schemaToDataType(pTableMeta->tableInfo.precision, pSchema))) {
SDataType targetDt = schemaToDataType(pTableMeta->tableInfo.precision, pSchema);
if (DEAL_RES_ERROR == translateValueImpl(pCxt, pStmt->pVal, targetDt)) {
return pCxt->errCode;
}
@ -5632,7 +5639,8 @@ static int32_t buildUpdateTagValReq(STranslateContext* pCxt, SAlterTableStmt* pS
}
pReq->isNull = (TSDB_DATA_TYPE_NULL == pStmt->pVal->node.resType.type);
if (pStmt->pVal->node.resType.type == TSDB_DATA_TYPE_JSON) {
if (targetDt.type == TSDB_DATA_TYPE_JSON) {
pReq->isNull = 0;
if (pStmt->pVal->literal &&
strlen(pStmt->pVal->literal) > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
return buildSyntaxErrMsg(&pCxt->msgBuf, "json string too long than 4095", pStmt->pVal->literal);

View File

@ -16,24 +16,24 @@
#define _DEFAULT_SOURCE
#include "tlrucache.h"
#include "os.h"
#include "tdef.h"
#include "taoserror.h"
#include "tlog.h"
#include "tarray.h"
#include "tdef.h"
#include "tlog.h"
typedef struct SLRUEntry SLRUEntry;
typedef struct SLRUEntry SLRUEntry;
typedef struct SLRUEntryTable SLRUEntryTable;
typedef struct SLRUCacheShard SLRUCacheShard;
typedef struct SShardedCache SShardedCache;
typedef struct SShardedCache SShardedCache;
enum {
TAOS_LRU_IN_CACHE = (1 << 0), // Whether this entry is referenced by the hash table.
TAOS_LRU_IN_CACHE = (1 << 0), // Whether this entry is referenced by the hash table.
TAOS_LRU_IS_HIGH_PRI = (1 << 1), // Whether this entry is high priority entry.
TAOS_LRU_IS_HIGH_PRI = (1 << 1), // Whether this entry is high priority entry.
TAOS_LRU_IN_HIGH_PRI_POOL = (1 << 2), // Whether this entry is in high-pri pool.
TAOS_LRU_IN_HIGH_PRI_POOL = (1 << 2), // Whether this entry is in high-pri pool.
TAOS_LRU_HAS_HIT = (1 << 3), // Whether this entry has had any lookups (hits).
TAOS_LRU_HAS_HIT = (1 << 3), // Whether this entry has had any lookups (hits).
};
struct SLRUEntry {
@ -50,18 +50,39 @@ struct SLRUEntry {
char keyData[1];
};
#define TAOS_LRU_ENTRY_IN_CACHE(h) ((h)->flags & TAOS_LRU_IN_CACHE)
#define TAOS_LRU_ENTRY_IN_CACHE(h) ((h)->flags & TAOS_LRU_IN_CACHE)
#define TAOS_LRU_ENTRY_IN_HIGH_POOL(h) ((h)->flags & TAOS_LRU_IN_HIGH_PRI_POOL)
#define TAOS_LRU_ENTRY_IS_HIGH_PRI(h) ((h)->flags & TAOS_LRU_IS_HIGH_PRI)
#define TAOS_LRU_ENTRY_HAS_HIT(h) ((h)->flags & TAOS_LRU_HAS_HIT)
#define TAOS_LRU_ENTRY_IS_HIGH_PRI(h) ((h)->flags & TAOS_LRU_IS_HIGH_PRI)
#define TAOS_LRU_ENTRY_HAS_HIT(h) ((h)->flags & TAOS_LRU_HAS_HIT)
#define TAOS_LRU_ENTRY_SET_IN_CACHE(h, inCache) do { if(inCache) {(h)->flags |= TAOS_LRU_IN_CACHE;} else {(h)->flags &= ~TAOS_LRU_IN_CACHE;} } while(0)
#define TAOS_LRU_ENTRY_SET_IN_HIGH_POOL(h, inHigh) do { if(inHigh) {(h)->flags |= TAOS_LRU_IN_HIGH_PRI_POOL;} else {(h)->flags &= ~TAOS_LRU_IN_HIGH_PRI_POOL;} } while(0)
#define TAOS_LRU_ENTRY_SET_PRIORITY(h, priority) do { if(priority == TAOS_LRU_PRIORITY_HIGH) {(h)->flags |= TAOS_LRU_IS_HIGH_PRI;} else {(h)->flags &= ~TAOS_LRU_IS_HIGH_PRI;} } while(0)
#define TAOS_LRU_ENTRY_SET_IN_CACHE(h, inCache) \
do { \
if (inCache) { \
(h)->flags |= TAOS_LRU_IN_CACHE; \
} else { \
(h)->flags &= ~TAOS_LRU_IN_CACHE; \
} \
} while (0)
#define TAOS_LRU_ENTRY_SET_IN_HIGH_POOL(h, inHigh) \
do { \
if (inHigh) { \
(h)->flags |= TAOS_LRU_IN_HIGH_PRI_POOL; \
} else { \
(h)->flags &= ~TAOS_LRU_IN_HIGH_PRI_POOL; \
} \
} while (0)
#define TAOS_LRU_ENTRY_SET_PRIORITY(h, priority) \
do { \
if (priority == TAOS_LRU_PRIORITY_HIGH) { \
(h)->flags |= TAOS_LRU_IS_HIGH_PRI; \
} else { \
(h)->flags &= ~TAOS_LRU_IS_HIGH_PRI; \
} \
} while (0)
#define TAOS_LRU_ENTRY_SET_HIT(h) ((h)->flags |= TAOS_LRU_HAS_HIT)
#define TAOS_LRU_ENTRY_HAS_REFS(h) ((h)->refs > 0)
#define TAOS_LRU_ENTRY_REF(h) (++(h)->refs)
#define TAOS_LRU_ENTRY_REF(h) (++(h)->refs)
static bool taosLRUEntryUnref(SLRUEntry *entry) {
assert(entry->refs > 0);
@ -90,7 +111,7 @@ struct SLRUEntryTable {
static int taosLRUEntryTableInit(SLRUEntryTable *table, int maxUpperHashBits) {
table->lengthBits = 4;
table->list = taosMemoryCalloc(1 << table->lengthBits, sizeof(SLRUEntry*));
table->list = taosMemoryCalloc(1 << table->lengthBits, sizeof(SLRUEntry *));
if (!table->list) {
return -1;
}
@ -125,7 +146,7 @@ static void taosLRUEntryTableCleanup(SLRUEntryTable *table) {
taosMemoryFree(table->list);
}
static SLRUEntry **taosLRUEntryTableFindPtr(SLRUEntryTable * table, const void *key, size_t keyLen, uint32_t hash) {
static SLRUEntry **taosLRUEntryTableFindPtr(SLRUEntryTable *table, const void *key, size_t keyLen, uint32_t hash) {
SLRUEntry **entry = &table->list[hash >> (32 - table->lengthBits)];
while (*entry && ((*entry)->hash != hash || memcmp(key, (*entry)->keyData, keyLen) != 0)) {
entry = &(*entry)->nextHash;
@ -134,7 +155,7 @@ static SLRUEntry **taosLRUEntryTableFindPtr(SLRUEntryTable * table, const void *
return entry;
}
static void taosLRUEntryTableResize(SLRUEntryTable * table) {
static void taosLRUEntryTableResize(SLRUEntryTable *table) {
int lengthBits = table->lengthBits;
if (lengthBits >= table->maxLengthBits) {
return;
@ -144,9 +165,9 @@ static void taosLRUEntryTableResize(SLRUEntryTable * table) {
return;
}
uint32_t oldLength = 1 << lengthBits;
int newLengthBits = lengthBits + 1;
SLRUEntry **newList = taosMemoryCalloc(1 << newLengthBits, sizeof(SLRUEntry*));
uint32_t oldLength = 1 << lengthBits;
int newLengthBits = lengthBits + 1;
SLRUEntry **newList = taosMemoryCalloc(1 << newLengthBits, sizeof(SLRUEntry *));
if (!newList) {
return;
}
@ -154,8 +175,8 @@ static void taosLRUEntryTableResize(SLRUEntryTable * table) {
for (uint32_t i = 0; i < oldLength; ++i) {
SLRUEntry *entry = table->list[i];
while (entry) {
SLRUEntry *next = entry->nextHash;
uint32_t hash = entry->hash;
SLRUEntry *next = entry->nextHash;
uint32_t hash = entry->hash;
SLRUEntry **ptr = &newList[hash >> (32 - newLengthBits)];
entry->nextHash = *ptr;
*ptr = entry;
@ -170,13 +191,13 @@ static void taosLRUEntryTableResize(SLRUEntryTable * table) {
table->lengthBits = newLengthBits;
}
static SLRUEntry *taosLRUEntryTableLookup(SLRUEntryTable * table, const void *key, size_t keyLen, uint32_t hash) {
static SLRUEntry *taosLRUEntryTableLookup(SLRUEntryTable *table, const void *key, size_t keyLen, uint32_t hash) {
return *taosLRUEntryTableFindPtr(table, key, keyLen, hash);
}
static SLRUEntry *taosLRUEntryTableInsert(SLRUEntryTable * table, SLRUEntry *entry) {
static SLRUEntry *taosLRUEntryTableInsert(SLRUEntryTable *table, SLRUEntry *entry) {
SLRUEntry **ptr = taosLRUEntryTableFindPtr(table, entry->keyData, entry->keyLength, entry->hash);
SLRUEntry *old = *ptr;
SLRUEntry *old = *ptr;
entry->nextHash = (old == NULL) ? NULL : old->nextHash;
*ptr = entry;
if (old == NULL) {
@ -189,9 +210,9 @@ static SLRUEntry *taosLRUEntryTableInsert(SLRUEntryTable * table, SLRUEntry *ent
return old;
}
static SLRUEntry *taosLRUEntryTableRemove(SLRUEntryTable * table, const void *key, size_t keyLen, uint32_t hash) {
static SLRUEntry *taosLRUEntryTableRemove(SLRUEntryTable *table, const void *key, size_t keyLen, uint32_t hash) {
SLRUEntry **entry = taosLRUEntryTableFindPtr(table, key, keyLen, hash);
SLRUEntry *result = *entry;
SLRUEntry *result = *entry;
if (result) {
*entry = result->nextHash;
--table->elems;
@ -201,17 +222,17 @@ static SLRUEntry *taosLRUEntryTableRemove(SLRUEntryTable * table, const void *ke
}
struct SLRUCacheShard {
size_t capacity;
size_t highPriPoolUsage;
bool strictCapacity;
double highPriPoolRatio;
double highPriPoolCapacity;
SLRUEntry lru;
SLRUEntry *lruLowPri;
SLRUEntryTable table;
size_t usage; // Memory size for entries residing in the cache.
size_t lruUsage; // Memory size for entries residing only in the LRU list.
TdThreadMutex mutex;
size_t capacity;
size_t highPriPoolUsage;
bool strictCapacity;
double highPriPoolRatio;
double highPriPoolCapacity;
SLRUEntry lru;
SLRUEntry *lruLowPri;
SLRUEntryTable table;
size_t usage; // Memory size for entries residing in the cache.
size_t lruUsage; // Memory size for entries residing only in the LRU list.
TdThreadMutex mutex;
};
#define TAOS_LRU_CACHE_SHARD_HASH32(key, len) (MurmurHash3_32((key), (len)))
@ -231,8 +252,7 @@ static void taosLRUCacheShardLRUInsert(SLRUCacheShard *shard, SLRUEntry *e) {
assert(e->next == NULL);
assert(e->prev == NULL);
if (shard->highPriPoolRatio > 0
&& (TAOS_LRU_ENTRY_IS_HIGH_PRI(e) || TAOS_LRU_ENTRY_HAS_HIT(e))) {
if (shard->highPriPoolRatio > 0 && (TAOS_LRU_ENTRY_IS_HIGH_PRI(e) || TAOS_LRU_ENTRY_HAS_HIT(e))) {
e->next = &shard->lru;
e->prev = shard->lru.prev;
@ -248,7 +268,7 @@ static void taosLRUCacheShardLRUInsert(SLRUCacheShard *shard, SLRUEntry *e) {
e->prev->next = e;
e->next->prev = e;
TAOS_LRU_ENTRY_SET_IN_HIGH_POOL(e, false);
shard->lruLowPri = e;
}
@ -304,13 +324,13 @@ static void taosLRUCacheShardSetCapacity(SLRUCacheShard *shard, size_t capacity)
for (int i = 0; i < taosArrayGetSize(lastReferenceList); ++i) {
SLRUEntry *entry = taosArrayGetP(lastReferenceList, i);
taosLRUEntryFree(entry);
taosLRUEntryFree(entry);
}
taosArrayDestroy(lastReferenceList);
}
static int taosLRUCacheShardInit(SLRUCacheShard *shard, size_t capacity, bool strict,
double highPriPoolRatio, int maxUpperHashBits) {
static int taosLRUCacheShardInit(SLRUCacheShard *shard, size_t capacity, bool strict, double highPriPoolRatio,
int maxUpperHashBits) {
if (taosLRUEntryTableInit(&shard->table, maxUpperHashBits) < 0) {
return -1;
}
@ -341,23 +361,24 @@ static void taosLRUCacheShardCleanup(SLRUCacheShard *shard) {
taosLRUEntryTableCleanup(&shard->table);
}
static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *e, LRUHandle **handle, bool freeOnFail) {
static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *e, LRUHandle **handle,
bool freeOnFail) {
LRUStatus status = TAOS_LRU_STATUS_OK;
SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES);
SArray *lastReferenceList = taosArrayInit(16, POINTER_BYTES);
taosThreadMutexLock(&shard->mutex);
taosLRUCacheShardEvictLRU(shard, e->totalCharge, lastReferenceList);
if (shard->usage + e->totalCharge > shard->capacity && (shard->strictCapacity || handle == NULL)) {
TAOS_LRU_ENTRY_SET_IN_CACHE(e, false);
if (handle == NULL) {
taosArrayPush(lastReferenceList, &e);
} else {
if (freeOnFail) {
taosMemoryFree(e);
taosMemoryFree(e);
*handle = NULL;
*handle = NULL;
}
status = TAOS_LRU_STATUS_INCOMPLETE;
@ -371,21 +392,21 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *
assert(TAOS_LRU_ENTRY_IN_CACHE(old));
TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
if (!TAOS_LRU_ENTRY_HAS_REFS(e)) {
taosLRUCacheShardLRURemove(shard, old);
assert(shard->usage >= old->totalCharge);
shard->usage -= old->totalCharge;
taosLRUCacheShardLRURemove(shard, old);
assert(shard->usage >= old->totalCharge);
shard->usage -= old->totalCharge;
taosArrayPush(lastReferenceList, &old);
taosArrayPush(lastReferenceList, &old);
}
}
if (handle == NULL) {
taosLRUCacheShardLRUInsert(shard, e);
} else {
if (!TAOS_LRU_ENTRY_HAS_REFS(e)) {
TAOS_LRU_ENTRY_REF(e);
TAOS_LRU_ENTRY_REF(e);
}
*handle = (LRUHandle*) e;
*handle = (LRUHandle *)e;
}
}
@ -394,7 +415,7 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *
for (int i = 0; i < taosArrayGetSize(lastReferenceList); ++i) {
SLRUEntry *entry = taosArrayGetP(lastReferenceList, i);
taosLRUEntryFree(entry);
taosLRUEntryFree(entry);
}
taosArrayDestroy(lastReferenceList);
@ -402,8 +423,8 @@ static LRUStatus taosLRUCacheShardInsertEntry(SLRUCacheShard *shard, SLRUEntry *
}
static LRUStatus taosLRUCacheShardInsert(SLRUCacheShard *shard, const void *key, size_t keyLen, uint32_t hash,
void *value, size_t charge, _taos_lru_deleter_t deleter,
LRUHandle **handle, LRUPriority priority) {
void *value, size_t charge, _taos_lru_deleter_t deleter, LRUHandle **handle,
LRUPriority priority) {
SLRUEntry *e = taosMemoryCalloc(1, sizeof(SLRUEntry) - 1 + keyLen);
if (!e) {
return TAOS_LRU_STATUS_FAIL;
@ -442,7 +463,7 @@ static LRUHandle *taosLRUCacheShardLookup(SLRUCacheShard *shard, const void *key
taosThreadMutexUnlock(&shard->mutex);
return (LRUHandle *) e;
return (LRUHandle *)e;
}
static void taosLRUCacheShardErase(SLRUCacheShard *shard, const void *key, size_t keyLen, uint32_t hash) {
@ -482,7 +503,7 @@ static void taosLRUCacheShardEraseUnrefEntries(SLRUCacheShard *shard) {
TAOS_LRU_ENTRY_SET_IN_CACHE(old, false);
assert(shard->usage >= old->totalCharge);
shard->usage -= old->totalCharge;
taosArrayPush(lastReferenceList, &old);
}
@ -491,14 +512,14 @@ static void taosLRUCacheShardEraseUnrefEntries(SLRUCacheShard *shard) {
for (int i = 0; i < taosArrayGetSize(lastReferenceList); ++i) {
SLRUEntry *entry = taosArrayGetP(lastReferenceList, i);
taosLRUEntryFree(entry);
taosLRUEntryFree(entry);
}
taosArrayDestroy(lastReferenceList);
}
static bool taosLRUCacheShardRef(SLRUCacheShard *shard, LRUHandle *handle) {
SLRUEntry *e = (SLRUEntry *) handle;
SLRUEntry *e = (SLRUEntry *)handle;
taosThreadMutexLock(&shard->mutex);
assert(TAOS_LRU_ENTRY_HAS_REFS(e));
@ -514,8 +535,8 @@ static bool taosLRUCacheShardRelease(SLRUCacheShard *shard, LRUHandle *handle, b
return false;
}
SLRUEntry *e = (SLRUEntry *) handle;
bool lastReference = false;
SLRUEntry *e = (SLRUEntry *)handle;
bool lastReference = false;
taosThreadMutexLock(&shard->mutex);
@ -537,7 +558,7 @@ static bool taosLRUCacheShardRelease(SLRUCacheShard *shard, LRUHandle *handle, b
assert(shard->usage >= e->totalCharge);
shard->usage -= e->totalCharge;
}
taosThreadMutexUnlock(&shard->mutex);
if (lastReference) {
@ -549,7 +570,7 @@ static bool taosLRUCacheShardRelease(SLRUCacheShard *shard, LRUHandle *handle, b
static size_t taosLRUCacheShardGetUsage(SLRUCacheShard *shard) {
size_t usage = 0;
taosThreadMutexLock(&shard->mutex);
usage = shard->usage;
taosThreadMutexUnlock(&shard->mutex);
@ -559,7 +580,7 @@ static size_t taosLRUCacheShardGetUsage(SLRUCacheShard *shard) {
static size_t taosLRUCacheShardGetPinnedUsage(SLRUCacheShard *shard) {
size_t usage = 0;
taosThreadMutexLock(&shard->mutex);
assert(shard->usage >= shard->lruUsage);
@ -579,11 +600,11 @@ static void taosLRUCacheShardSetStrictCapacity(SLRUCacheShard *shard, bool stric
}
struct SShardedCache {
uint32_t shardMask;
TdThreadMutex capacityMutex;
size_t capacity;
bool strictCapacity;
uint64_t lastId; // atomic var for last id
uint32_t shardMask;
TdThreadMutex capacityMutex;
size_t capacity;
bool strictCapacity;
uint64_t lastId; // atomic var for last id
};
struct SLRUCache {
@ -593,7 +614,7 @@ struct SLRUCache {
};
static int getDefaultCacheShardBits(size_t capacity) {
int numShardBits = 0;
int numShardBits = 0;
size_t minShardSize = 512 * 1024;
size_t numShards = capacity / minShardSize;
while (numShards >>= 1) {
@ -621,7 +642,7 @@ SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoo
numShardBits = getDefaultCacheShardBits(capacity);
}
int numShards = 1 << numShardBits;
int numShards = 1 << numShardBits;
cache->shards = taosMemoryCalloc(numShards, sizeof(SLRUCacheShard));
if (!cache->shards) {
taosMemoryFree(cache);
@ -629,7 +650,7 @@ SLRUCache *taosLRUCacheInit(size_t capacity, int numShardBits, double highPriPoo
return NULL;
}
bool strictCapacity = 1;
bool strictCapacity = 1;
size_t perShard = (capacity + (numShards - 1)) / numShards;
for (int i = 0; i < numShards; ++i) {
taosLRUCacheShardInit(&cache->shards[i], perShard, strictCapacity, highPriPoolRatio, 32 - numShardBits);
@ -653,7 +674,7 @@ void taosLRUCacheCleanup(SLRUCache *cache) {
int numShards = cache->numShards;
assert(numShards > 0);
for (int i = 0; i < numShards; ++i) {
taosLRUCacheShardCleanup(&cache->shards[i]);
taosLRUCacheShardCleanup(&cache->shards[i]);
}
taosMemoryFree(cache->shards);
cache->shards = 0;
@ -666,11 +687,12 @@ void taosLRUCacheCleanup(SLRUCache *cache) {
}
LRUStatus taosLRUCacheInsert(SLRUCache *cache, const void *key, size_t keyLen, void *value, size_t charge,
_taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority) {
_taos_lru_deleter_t deleter, LRUHandle **handle, LRUPriority priority) {
uint32_t hash = TAOS_LRU_CACHE_SHARD_HASH32(key, keyLen);
uint32_t shardIndex = hash & cache->shardedCache.shardMask;
return taosLRUCacheShardInsert(&cache->shards[shardIndex], key, keyLen, hash, value, charge, deleter, handle, priority);
return taosLRUCacheShardInsert(&cache->shards[shardIndex], key, keyLen, hash, value, charge, deleter, handle,
priority);
}
LRUHandle *taosLRUCacheLookup(SLRUCache *cache, const void *key, size_t keyLen) {
@ -699,7 +721,7 @@ bool taosLRUCacheRef(SLRUCache *cache, LRUHandle *handle) {
return false;
}
uint32_t hash = ((SLRUEntry *) handle)->hash;
uint32_t hash = ((SLRUEntry *)handle)->hash;
uint32_t shardIndex = hash & cache->shardedCache.shardMask;
return taosLRUCacheShardRef(&cache->shards[shardIndex], handle);
@ -710,15 +732,13 @@ bool taosLRUCacheRelease(SLRUCache *cache, LRUHandle *handle, bool eraseIfLastRe
return false;
}
uint32_t hash = ((SLRUEntry *) handle)->hash;
uint32_t hash = ((SLRUEntry *)handle)->hash;
uint32_t shardIndex = hash & cache->shardedCache.shardMask;
return taosLRUCacheShardRelease(&cache->shards[shardIndex], handle, eraseIfLastRef);
}
void* taosLRUCacheValue(SLRUCache *cache, LRUHandle *handle) {
return ((SLRUEntry*) handle)->value;
}
void *taosLRUCacheValue(SLRUCache *cache, LRUHandle *handle) { return ((SLRUEntry *)handle)->value; }
size_t taosLRUCacheGetUsage(SLRUCache *cache) {
size_t usage = 0;
@ -742,7 +762,7 @@ size_t taosLRUCacheGetPinnedUsage(SLRUCache *cache) {
void taosLRUCacheSetCapacity(SLRUCache *cache, size_t capacity) {
uint32_t numShards = cache->numShards;
size_t perShard = (capacity + (numShards = 1)) / numShards;
size_t perShard = (capacity + (numShards - 1)) / numShards;
taosThreadMutexLock(&cache->shardedCache.capacityMutex);
@ -751,7 +771,7 @@ void taosLRUCacheSetCapacity(SLRUCache *cache, size_t capacity) {
}
cache->shardedCache.capacity = capacity;
taosThreadMutexUnlock(&cache->shardedCache.capacityMutex);
}
@ -777,7 +797,7 @@ void taosLRUCacheSetStrictCapacity(SLRUCache *cache, bool strict) {
}
cache->shardedCache.strictCapacity = strict;
taosThreadMutexUnlock(&cache->shardedCache.capacityMutex);
}

View File

@ -111,7 +111,7 @@ endi
$consumerId = 0
$totalMsgOfStb = $ctbNum * $rowsPerCtb
$expectmsgcnt = 1
$expectmsgcnt = 1000000
$expectrowcnt = 100
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
@ -131,9 +131,6 @@ endi
if $data[0][1] != $consumerId then
return -1
endi
if $data[0][2] != $expectmsgcnt then
return -1
endi
if $data[0][3] != $expectrowcnt then
return -1
endi
@ -183,7 +180,7 @@ endi
$consumerId = 0
$totalMsgOfCtb = $rowsPerCtb
$expectmsgcnt = 1
$expectmsgcnt = 1000000
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ctb
@ -254,7 +251,7 @@ endi
$consumerId = 0
$totalMsgOfNtb = $rowsPerCtb
$expectmsgcnt = $totalMsgOfNtb
$expectmsgcnt = 1000000
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
print == start consumer to pull msgs from ntb

View File

@ -80,7 +80,7 @@ $topicList = $topicList . '
$consumerId = 0
$totalMsgOfOneTopic = $ctbNum * $rowsPerCtb
$totalMsgOfStb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfStb
$expectmsgcnt = 1000000
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
@ -168,7 +168,7 @@ $consumerId = 0
$totalMsgOfOneTopic = $rowsPerCtb
$totalMsgOfCtb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfCtb
$expectmsgcnt = 1000000
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )
$topicList = ' . topic_ctb_function
@ -245,7 +245,7 @@ $topicList = $topicList . '
$consumerId = 0
$totalMsgOfOneTopic = $rowsPerCtb
$totalMsgOfNtb = $totalMsgOfOneTopic * $topicNum
$expectmsgcnt = $totalMsgOfNtb
$expectmsgcnt = 1000000
sql insert into consumeinfo values (now , $consumerId , $topicList , $keyList , $expectmsgcnt , $ifcheckdata , $ifmanualcommit )