adj par name catch

This commit is contained in:
54liuyao 2024-06-20 09:28:04 +08:00
parent da63a40796
commit ecfb5e5f4c
7 changed files with 15 additions and 12 deletions

View File

@ -329,7 +329,7 @@ typedef struct {
typedef struct SStateStore {
int32_t (*streamStatePutParName)(SStreamState* pState, int64_t groupId, const char* tbname);
int32_t (*streamStateGetParName)(SStreamState* pState, int64_t groupId, void** pVal);
int32_t (*streamStateGetParName)(SStreamState* pState, int64_t groupId, void** pVal, bool onlyCache);
int32_t (*streamStateAddIfNotExist)(SStreamState* pState, const SWinKey* key, void** pVal, int32_t* pVLen);
int32_t (*streamStateReleaseBuf)(SStreamState* pState, void* pVal, bool used);

View File

@ -101,7 +101,7 @@ int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* tbname);
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal);
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal, bool onlyCache);
void streamStateReloadInfo(SStreamState* pState, TSKEY ts);

View File

@ -1157,7 +1157,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
pDest->info.parTbName[0] = 0;
if (pInfo->tbnameCalSup.numOfExprs > 0) {
void* tbname = NULL;
if (pAPI->stateStore.streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, &tbname) == 0) {
if (pAPI->stateStore.streamStateGetParName(pOperator->pTaskInfo->streamInfo.pState, pParInfo->groupId, &tbname, false) == 0) {
memcpy(pDest->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
pAPI->stateStore.streamStateFreeVal(tbname);
}
@ -1178,7 +1178,7 @@ static SSDataBlock* buildStreamPartitionResult(SOperatorInfo* pOperator) {
void appendCreateTableRow(void* pState, SExprSupp* pTableSup, SExprSupp* pTagSup, uint64_t groupId,
SSDataBlock* pSrcBlock, int32_t rowId, SSDataBlock* pDestBlock, SStateStore* pAPI) {
void* pValue = NULL;
if (pAPI->streamStateGetParName(pState, groupId, &pValue) != 0) {
if (pAPI->streamStateGetParName(pState, groupId, &pValue, true) != 0) {
SSDataBlock* pTmpBlock = blockCopyOneRow(pSrcBlock, rowId);
memset(pTmpBlock->info.parTbName, 0, TSDB_TABLE_NAME_LEN);
pTmpBlock->info.id.groupId = groupId;

View File

@ -1892,7 +1892,7 @@ static int32_t generatePartitionDelResBlock(SStreamScanInfo* pInfo, SSDataBlock*
groupId = calGroupIdByData(&pInfo->partitionSup, pInfo->pPartScalarSup, pPreRes, preJ);
if (pInfo->pPartTbnameSup) {
void* parTbname = NULL;
int32_t code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);
int32_t code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname, false);
if (code != TSDB_CODE_SUCCESS) {
calBlockTbName(pInfo, pPreRes, preJ);
memcpy(varDataVal(tbname), pPreRes->info.parTbName, strlen(pPreRes->info.parTbName));
@ -1938,7 +1938,7 @@ static int32_t generateDeleteResultBlockImpl(SStreamScanInfo* pInfo, SSDataBlock
}
if (pInfo->tbnameCalSup.pExprInfo) {
void* parTbname = NULL;
int32_t code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);
int32_t code = pInfo->stateStore.streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname, false);
if (code != TSDB_CODE_SUCCESS) {
SSDataBlock* pPreRes = readPreVersionData(pInfo->pTableScanOp, srcUid, srcStartTsCol[i], srcStartTsCol[i], ver);
printDataBlock(pPreRes, "pre res", GET_TASKID(pInfo->pStreamScanOp->pTaskInfo));

View File

@ -707,7 +707,7 @@ static void buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint64_
SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
void* tbname = NULL;
pAPI->stateStore.streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, groupId, &tbname);
pAPI->stateStore.streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, groupId, &tbname, false);
if (tbname == NULL) {
colDataSetNULL(pTableCol, pBlock->info.rows);
} else {

View File

@ -376,7 +376,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin
for (int32_t i = *index; i < size; i++) {
SWinKey* pWin = taosArrayGet(pWins, i);
void* tbname = NULL;
pInfo->stateStore.streamStateGetParName(pInfo->pState, pWin->groupId, &tbname);
pInfo->stateStore.streamStateGetParName(pInfo->pState, pWin->groupId, &tbname, false);
if (tbname == NULL) {
appendDataToSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, NULL);
} else {
@ -750,7 +750,7 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, void* pState, SSDat
if (pBlock->info.id.groupId == 0) {
pBlock->info.id.groupId = groupId;
void* tbname = NULL;
if (pAPI->stateStore.streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
if (pAPI->stateStore.streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname, false) < 0) {
pBlock->info.parTbName[0] = 0;
} else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
@ -2276,7 +2276,7 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo
SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
void* tbname = NULL;
pAPI->stateStore.streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, res->groupId, &tbname);
pAPI->stateStore.streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, res->groupId, &tbname, false);
if (tbname == NULL) {
colDataSetNULL(pTableCol, pBlock->info.rows);
} else {
@ -2446,7 +2446,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa
void* tbname = NULL;
if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId,
&tbname) < 0) {
&tbname, false) < 0) {
pBlock->info.parTbName[0] = 0;
} else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);

View File

@ -1106,10 +1106,13 @@ int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char
#endif
}
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal, bool onlyCache) {
#ifdef USE_ROCKSDB
void* pStr = tSimpleHashGet(pState->parNameMap, &groupId, sizeof(int64_t));
if (!pStr) {
if (onlyCache) {
return TSDB_CODE_FAILED;
}
int32_t code = streamStateGetParName_rocksdb(pState, groupId, pVal);
if (code == TSDB_CODE_SUCCESS) {
tSimpleHashPut(pState->parNameMap, &groupId, sizeof(int64_t), *pVal, TSDB_TABLE_NAME_LEN);