enh(stream): add tbname map cache
This commit is contained in:
parent
015c00aaca
commit
ab57d069d4
|
@ -504,6 +504,7 @@ typedef struct SStreamScanInfo {
|
|||
STimeWindow updateWin;
|
||||
STimeWindowAggSupp twAggSup;
|
||||
SSDataBlock* pUpdateDataRes;
|
||||
SHashObj* pGroupIdTbNameMap;
|
||||
// status for tmq
|
||||
SNodeList* pGroupTags;
|
||||
SNode* pTagCond;
|
||||
|
@ -893,11 +894,11 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul
|
|||
void doDestroyExchangeOperatorInfo(void* param);
|
||||
|
||||
void setOperatorCompleted(SOperatorInfo* pOperator);
|
||||
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status, void* pInfo,
|
||||
SExecTaskInfo* pTaskInfo);
|
||||
void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status,
|
||||
void* pInfo, SExecTaskInfo* pTaskInfo);
|
||||
void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo);
|
||||
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr,
|
||||
SSDataBlock* pBlock, int32_t rows, const char* idStr, STableMetaCacheInfo * pCache);
|
||||
int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock,
|
||||
int32_t rows, const char* idStr, STableMetaCacheInfo* pCache);
|
||||
|
||||
void cleanupAggSup(SAggSupporter* pAggSup);
|
||||
void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle);
|
||||
|
@ -1038,6 +1039,7 @@ void appendOneRowToStreamSpecialBlock(SSDataBlock* pBlock, TSKEY* pStartTs, TSKE
|
|||
uint64_t* pGp, void* pTbName);
|
||||
void printDataBlock(SSDataBlock* pBlock, const char* flag);
|
||||
uint64_t calGroupIdByData(SPartitionBySupporter* pParSup, SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t rowId);
|
||||
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock);
|
||||
|
||||
int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup,
|
||||
SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
|
||||
|
@ -1061,7 +1063,7 @@ int32_t setOutputBuf(SStreamState* pState, STimeWindow* win, SResultRow** pResul
|
|||
int32_t releaseOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult);
|
||||
int32_t saveOutputBuf(SStreamState* pState, SWinKey* pKey, SResultRow* pResult, int32_t resSize);
|
||||
void getNextIntervalWindow(SInterval* pInterval, STimeWindow* tw, int32_t order);
|
||||
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo *pInfo);
|
||||
int32_t qAppendTaskStopInfo(SExecTaskInfo* pTaskInfo, SExchangeOpStopInfo* pInfo);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -1515,10 +1515,17 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|||
for (int32_t i = 0; i < pSrcBlock->info.rows; i++) {
|
||||
uint64_t srcUid = srcUidData[i];
|
||||
uint64_t groupId = srcGp[i];
|
||||
char* tbname[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN] = {0};
|
||||
if (groupId == 0) {
|
||||
groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
|
||||
}
|
||||
appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId, NULL);
|
||||
if (pInfo->tbnameCalSup.pExprInfo) {
|
||||
char* parTbname = taosHashGet(pInfo->pGroupIdTbNameMap, &groupId, sizeof(int64_t));
|
||||
memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
|
||||
varDataSetLen(tbname, strlen(varDataVal(tbname)));
|
||||
}
|
||||
appendOneRowToStreamSpecialBlock(pDestBlock, srcStartTsCol + i, srcEndTsCol + i, srcUidData + i, &groupId,
|
||||
tbname[0] == 0 ? NULL : tbname);
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -1562,9 +1569,16 @@ static void calBlockTag(SExprSupp* pTagCalSup, SSDataBlock* pBlock, SSDataBlock*
|
|||
blockDataDestroy(pSrcBlock);
|
||||
}
|
||||
|
||||
static void calBlockTbName(SExprSupp* pTbNameCalSup, SSDataBlock* pBlock) {
|
||||
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
|
||||
SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup;
|
||||
if (pTbNameCalSup == NULL || pTbNameCalSup->numOfExprs == 0) return;
|
||||
if (pBlock == NULL || pBlock->info.rows == 0) return;
|
||||
if (pBlock->info.groupId) {
|
||||
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
|
||||
if (tbname != NULL) {
|
||||
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||
}
|
||||
}
|
||||
|
||||
SSDataBlock* pSrcBlock = blockCopyOneRow(pBlock, 0);
|
||||
ASSERT(pSrcBlock->info.rows == 1);
|
||||
|
@ -1592,6 +1606,11 @@ static void calBlockTbName(SExprSupp* pTbNameCalSup, SSDataBlock* pBlock) {
|
|||
pBlock->info.parTbName[0] = 0;
|
||||
}
|
||||
|
||||
if (pBlock->info.groupId) {
|
||||
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), pBlock->info.parTbName,
|
||||
TSDB_TABLE_NAME_LEN);
|
||||
}
|
||||
|
||||
blockDataDestroy(pSrcBlock);
|
||||
blockDataDestroy(pResBlock);
|
||||
}
|
||||
|
@ -1713,7 +1732,7 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
|||
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
||||
blockDataFreeRes((SSDataBlock*)pBlock);
|
||||
|
||||
calBlockTbName(&pInfo->tbnameCalSup, pInfo->pRes);
|
||||
calBlockTbName(pInfo, pInfo->pRes);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -1960,7 +1979,7 @@ static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) {
|
|||
if (pTaskInfo->streamInfo.recoverStep == STREAM_RECOVER_STEP__SCAN) {
|
||||
SSDataBlock* pBlock = doTableScan(pInfo->pTableScanOp);
|
||||
if (pBlock != NULL) {
|
||||
calBlockTbName(&pInfo->tbnameCalSup, pBlock);
|
||||
calBlockTbName(pInfo, pBlock);
|
||||
updateInfoFillBlockData(pInfo->pUpdateInfo, pBlock, pInfo->primaryTsIndex);
|
||||
qDebug("stream recover scan get block, rows %d", pBlock->info.rows);
|
||||
return pBlock;
|
||||
|
@ -2081,7 +2100,7 @@ FETCH_NEXT_BLOCK:
|
|||
pSDB->info.type = pInfo->scanMode == STREAM_SCAN_FROM_DATAREADER_RANGE ? STREAM_NORMAL : STREAM_PULL_DATA;
|
||||
checkUpdateData(pInfo, true, pSDB, false);
|
||||
// printDataBlock(pSDB, "stream scan update");
|
||||
calBlockTbName(&pInfo->tbnameCalSup, pSDB);
|
||||
calBlockTbName(pInfo, pSDB);
|
||||
return pSDB;
|
||||
}
|
||||
blockDataCleanup(pInfo->pUpdateDataRes);
|
||||
|
@ -2386,6 +2405,7 @@ static void destroyStreamScanOperatorInfo(void* param) {
|
|||
}
|
||||
|
||||
cleanupExprSupp(&pStreamScan->tbnameCalSup);
|
||||
taosHashCleanup(pStreamScan->pGroupIdTbNameMap);
|
||||
|
||||
updateInfoDestroy(pStreamScan->pUpdateInfo);
|
||||
blockDataDestroy(pStreamScan->pRes);
|
||||
|
@ -2443,6 +2463,8 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) {
|
||||
goto _error;
|
||||
}
|
||||
pInfo->pGroupIdTbNameMap =
|
||||
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
|
||||
}
|
||||
|
||||
if (pTableScanNode->pTags != NULL) {
|
||||
|
|
|
@ -328,7 +328,7 @@ int32_t walEndSnapshot(SWal *pWal) {
|
|||
pInfo++;
|
||||
}
|
||||
if (POINTER_DISTANCE(pInfo, pWal->fileInfoSet->pData) > 0) {
|
||||
wDebug("vgId:%d, wal end remove from %" PRId64, pWal->cfg.vgId, pInfo->firstVer);
|
||||
wDebug("vgId:%d, wal end remove for %" PRId64, pWal->cfg.vgId, pInfo->firstVer);
|
||||
} else {
|
||||
wDebug("vgId:%d, wal no remove", pWal->cfg.vgId);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue