Merge branch 'feat/TS-4243-3.0' of https://github.com/taosdata/TDengine into feat/ly-TS-4243-3.0

This commit is contained in:
54liuyao 2024-03-27 09:14:50 +08:00
commit 715489e22d
8 changed files with 48 additions and 16 deletions

View File

@ -191,7 +191,7 @@ typedef struct TsdReader {
typedef struct SStoreCacheReader {
int32_t (*openReader)(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr,
SArray *pFuncTypeList);
SArray *pFuncTypeList, SColumnInfo* pPkCol, int32_t numOfPks);
void *(*closeReader)(void *pReader);
int32_t (*retrieveRows)(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
SArray *pTableUidList);

View File

@ -181,7 +181,7 @@ void tsdbReaderSetNotifyCb(STsdbReader* pReader, TsdReaderNotifyCbFn not
int32_t tsdbReuseCacherowsReader(void *pReader, void *pTableIdList, int32_t numOfTables);
int32_t tsdbCacherowsReaderOpen(void *pVnode, int32_t type, void *pTableIdList, int32_t numOfTables, int32_t numOfCols,
SArray *pCidList, int32_t *pSlotIds, uint64_t suid, void **pReader, const char *idstr,
SArray* pFuncTypeList);
SArray* pFuncTypeList, SColumnInfo* pkCol, int32_t numOfPks);
int32_t tsdbRetrieveCacheRows(void *pReader, SSDataBlock *pResBlock, const int32_t *slotIds, const int32_t *dstSlotIds,
SArray *pTableUids);
void *tsdbCacherowsReaderClose(void *pReader);

View File

@ -2288,6 +2288,8 @@ static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb
.loadTombFn = loadSttTomb,
.pReader = pr,
.idstr = pr->idstr,
.comparFn = pr->pkComparFn,
.pCurRowKey = &pr->rowKey,
};
code = tMergeTreeOpen2(&iter->mergeTree, &conf, NULL);

View File

@ -209,7 +209,7 @@ int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOf
int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList, int32_t numOfTables, int32_t numOfCols,
SArray* pCidList, int32_t* pSlotIds, uint64_t suid, void** pReader, const char* idstr,
SArray* pFuncTypeList) {
SArray* pFuncTypeList, SColumnInfo* pPkCol, int32_t numOfPks) {
*pReader = NULL;
SCacheRowsReader* p = taosMemoryCalloc(1, sizeof(SCacheRowsReader));
if (p == NULL) {
@ -226,6 +226,15 @@ int32_t tsdbCacherowsReaderOpen(void* pVnode, int32_t type, void* pTableIdList,
p->pSlotIds = pSlotIds;
p->pFuncTypeList = pFuncTypeList;
p->rowKey.numOfPKs = numOfPks;
if (numOfPks > 0) {
p->pkComparFn = getComparFunc(pPkCol->type, 0);
p->rowKey.pks[0].type = pPkCol->type;
if (IS_VAR_DATA_TYPE(pPkCol->type)) {
p->rowKey.pks[0].pData = taosMemoryCalloc(1, pPkCol->bytes);
}
}
if (numOfTables == 0) {
*pReader = p;
return TSDB_CODE_SUCCESS;
@ -359,10 +368,11 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32
for (int32_t j = 0; j < pr->numOfCols; ++j) {
int32_t bytes;
if (slotIds[j] == -1)
if (slotIds[j] == -1) {
bytes = 1;
else
} else {
bytes = pr->pSchema->columns[slotIds[j]].bytes;
}
pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + bytes + VARSTR_HEADER_SIZE);
SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]);

View File

@ -369,6 +369,8 @@ typedef struct SCacheRowsReader {
char* idstr;
int64_t lastTs;
SArray* pFuncTypeList;
__compar_fn_t pkComparFn;
SRowKey rowKey;
} SCacheRowsReader;
int32_t tsdbCacheGetBatch(STsdb* pTsdb, tb_uid_t uid, SArray* pLastArray, SCacheRowsReader* pr, int8_t ltype);

View File

@ -46,6 +46,8 @@ typedef struct SCacheRowsScanInfo {
int32_t indexOfBufferedRes;
STableListInfo* pTableList;
SArray* pFuncTypeList;
int32_t numOfPks;
SColumnInfo pkCol;
} SCacheRowsScanInfo;
static SSDataBlock* doScanCache(SOperatorInfo* pOperator);
@ -106,6 +108,16 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
goto _error;
}
for(int32_t i = 0; i < taosArrayGetSize(pInfo->matchInfo.pList); ++i) {
SColMatchItem* pItem = taosArrayGet(pInfo->matchInfo.pList, i);
if (pItem->isPk) {
pInfo->numOfPks += 1;
pInfo->pkCol.type = pItem->dataType.type; // only record one primary key
pInfo->pkCol.bytes = pItem->dataType.bytes; // only record one primary key
pInfo->pkCol.pk = 1;
}
}
SArray* pCidList = taosArrayInit(numOfCols, sizeof(int16_t));
pInfo->pFuncTypeList = taosArrayInit(taosArrayGetSize(pScanNode->pFuncTypes), sizeof(int32_t));
taosArrayAddAll(pInfo->pFuncTypeList, pScanNode->pFuncTypes);
@ -140,7 +152,8 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe
uint64_t suid = tableListGetSuid(pTableListInfo);
code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, totalTables,
taosArrayGetSize(pInfo->matchInfo.pList), pCidList, pInfo->pSlotIds,
suid, &pInfo->pLastrowReader, pTaskInfo->id.str, pScanNode->pFuncTypes);
suid, &pInfo->pLastrowReader, pTaskInfo->id.str, pScanNode->pFuncTypes,
&pInfo->pkCol, pInfo->numOfPks);
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
@ -282,7 +295,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
if (NULL == pInfo->pLastrowReader) {
code = pInfo->readHandle.api.cacheFn.openReader(pInfo->readHandle.vnode, pInfo->retrieveType, pList, num,
taosArrayGetSize(pInfo->matchInfo.pList), pInfo->pCidList, pInfo->pSlotIds, suid, &pInfo->pLastrowReader,
pTaskInfo->id.str, pInfo->pFuncTypeList);
pTaskInfo->id.str, pInfo->pFuncTypeList, &pInfo->pkCol, pInfo->numOfPks);
if (code != TSDB_CODE_SUCCESS) {
pInfo->currentGroupIndex += 1;
taosArrayClear(pInfo->pUidList);

View File

@ -1632,7 +1632,7 @@ static int32_t translateIrate(SFunctionNode* pFunc, char* pErrBuf, int32_t len)
static int32_t translateIrateImpl(SFunctionNode* pFunc, char* pErrBuf, int32_t len, bool isPartial) {
uint8_t colType = getSDataTypeFromNode(nodesListGetNode(pFunc->pParameterList, 0))->type;
if (isPartial) {
if (3 != LIST_LENGTH(pFunc->pParameterList)) {
if (3 != LIST_LENGTH(pFunc->pParameterList) && 4 != LIST_LENGTH(pFunc->pParameterList)) {
return invaildFuncParaNumErrMsg(pErrBuf, len, pFunc->functionName);
}
if (!IS_NUMERIC_TYPE(colType)) {

View File

@ -6223,12 +6223,17 @@ static void doSaveRateInfo(SRateInfo* pRateInfo, bool isFirst, int64_t ts, char*
}
}
static void initializeRateInfo(SqlFunctionCtx* pCtx, SRateInfo* pRateInfo) {
static void initializeRateInfo(SqlFunctionCtx* pCtx, SRateInfo* pRateInfo, bool isMerge) {
if (pCtx->hasPrimaryKey) {
pRateInfo->pkType = pCtx->input.pPrimaryKey->info.type;
pRateInfo->pkBytes = pCtx->input.pPrimaryKey->info.bytes;
pRateInfo->firstPk = pRateInfo->pkData;
pRateInfo->lastPk = pRateInfo->pkData + pRateInfo->pkBytes;
if (!isMerge) {
pRateInfo->pkType = pCtx->input.pPrimaryKey->info.type;
pRateInfo->pkBytes = pCtx->input.pPrimaryKey->info.bytes;
pRateInfo->firstPk = pRateInfo->pkData;
pRateInfo->lastPk = pRateInfo->pkData + pRateInfo->pkBytes;
} else {
pRateInfo->firstPk = pRateInfo->pkData;
pRateInfo->lastPk = pRateInfo->pkData + pRateInfo->pkBytes;
}
} else {
pRateInfo->firstPk = NULL;
pRateInfo->lastPk = NULL;
@ -6246,7 +6251,7 @@ int32_t irateFunction(SqlFunctionCtx* pCtx) {
funcInputUpdate(pCtx);
initializeRateInfo(pCtx, pRateInfo);
initializeRateInfo(pCtx, pRateInfo, false);
int32_t numOfElems = 0;
int32_t type = pInputCol->info.type;
@ -6368,13 +6373,13 @@ int32_t irateFunctionMerge(SqlFunctionCtx* pCtx) {
}
SRateInfo* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx));
initializeRateInfo(pCtx, pInfo);
initializeRateInfo(pCtx, pInfo, true);
int32_t start = pInput->startRowIndex;
for (int32_t i = start; i < start + pInput->numOfRows; ++i) {
char* data = colDataGetData(pCol, i);
SRateInfo* pInputInfo = (SRateInfo*)varDataVal(data);
initializeRateInfo(pCtx, pInfo);
initializeRateInfo(pCtx, pInfo, true);
if (pInputInfo->hasResult) {
int32_t code = irateTransferInfo(pInputInfo, pInfo);
if (code != TSDB_CODE_SUCCESS) {