diff --git a/include/libs/stream/streamState.h b/include/libs/stream/streamState.h index 9443df5e14..7234d306a4 100644 --- a/include/libs/stream/streamState.h +++ b/include/libs/stream/streamState.h @@ -35,6 +35,7 @@ typedef struct { TTB* pFuncStateDb; TTB* pFillStateDb; // todo refactor TTB* pSessionStateDb; + TTB* pParNameDb; TXN txn; int32_t number; } SStreamState; @@ -99,6 +100,9 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur); int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur); +int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* tbname); +int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal); + #if 0 char* streamStateSessionDump(SStreamState* pState); #endif diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 45873f2744..5c5ba1205e 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -398,6 +398,35 @@ bool tqNextDataBlock(STqReader* pReader) { return false; } +int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask) { + int32_t code; + + int32_t cnt = 0; + for (int32_t i = 0; i < pSrc->nCols; i++) { + cnt += mask[i]; + } + + pDst->nCols = cnt; + pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema)); + if (pDst->pSchema == NULL) { + return -1; + } + + int32_t j = 0; + for (int32_t i = 0; i < pSrc->nCols; i++) { + if (mask[i]) { + pDst->pSchema[j++] = pSrc->pSchema[i]; + SColumnInfoData colInfo = + createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId); + code = blockDataAppendColInfo(pBlock, &colInfo); + if (code != 0) { + return -1; + } + } + } + return 0; +} + bool tqNextDataBlockFilterOut(STqReader* pHandle, SHashObj* filterOutUids) { while (1) { if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { @@ -527,6 +556,119 @@ FAIL: return -1; } +int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* schemas) { + int32_t sversion = htonl(pReader->pBlock->sversion); + + if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion || + pReader->cachedSchemaSuid != pReader->msgIter.suid) { + if (pReader->pSchema) taosMemoryFree(pReader->pSchema); + pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1); + if (pReader->pSchema == NULL) { + tqWarn("cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 "), version %d, possibly dropped table", + pReader->msgIter.uid, pReader->msgIter.suid, pReader->cachedSchemaVer); + /*ASSERT(0);*/ + pReader->cachedSchemaSuid = 0; + terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; + return -1; + } + + if (pReader->pSchemaWrapper) tDeleteSSchemaWrapper(pReader->pSchemaWrapper); + pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1); + if (pReader->pSchemaWrapper == NULL) { + tqWarn("cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table", + pReader->msgIter.uid, pReader->cachedSchemaVer); + /*ASSERT(0);*/ + pReader->cachedSchemaSuid = 0; + terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND; + return -1; + } + pReader->cachedSchemaVer = sversion; + pReader->cachedSchemaSuid = pReader->msgIter.suid; + } + + STSchema* pTschema = pReader->pSchema; + SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper; + + int32_t colAtMost = pSchemaWrapper->nCols; + + int32_t curRow = 0; + + char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols); + if (assigned) return -1; + + tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter); + STSRowIter iter = {0}; + tdSTSRowIterInit(&iter, pTschema); + STSRow* row; + + while ((row = tGetSubmitBlkNext(&pReader->blkIter)) != NULL) { + bool buildNew = false; + tdSTSRowIterReset(&iter, row); + + for (int32_t i = 0; i < colAtMost; i++) { + SCellVal sVal = {0}; + if (!tdSTSRowIterFetch(&iter, pSchemaWrapper->pSchema[i].colId, pSchemaWrapper->pSchema[i].type, &sVal)) { + break; + } + if (curRow == 0) { + assigned[i] = sVal.valType != TD_VTYPE_NONE; + buildNew = true; + } else { + bool currentRowAssigned = sVal.valType != TD_VTYPE_NONE; + if (currentRowAssigned != assigned[i]) { + assigned[i] = currentRowAssigned; + buildNew = true; + } + } + } + + if (buildNew) { + SSDataBlock block; + SSchemaWrapper sw; + if (tqMaskBlock(&sw, &block, pSchemaWrapper, assigned) < 0) { + goto FAIL; + } + + taosArrayPush(blocks, &block); + taosArrayPush(schemas, &sw); + } + + SSDataBlock* pBlock = taosArrayGetLast(blocks); + pBlock->info.uid = pReader->msgIter.uid; + pBlock->info.rows = pReader->msgIter.numOfRows; + pBlock->info.version = pReader->pMsg->version; + + if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows - curRow) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; + } + + tdSTSRowIterInit(&iter, pTschema); + for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) { + SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i); + SCellVal sVal = {0}; + + if (!tdSTSRowIterFetch(&iter, pColData->info.colId, pColData->info.type, &sVal)) { + break; + } + + ASSERT(sVal.valType != TD_VTYPE_NONE); + + if (colDataAppend(pColData, curRow, sVal.val, sVal.valType != TD_VTYPE_NORM) < 0) { + goto FAIL; + } + } + curRow++; + } + + taosMemoryFree(assigned); + return 0; + +FAIL: + taosMemoryFree(assigned); + return -1; +} + void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; } int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 0e2635459d..d083cd4d21 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -504,7 +504,6 @@ typedef struct SStreamScanInfo { STimeWindow updateWin; STimeWindowAggSupp twAggSup; SSDataBlock* pUpdateDataRes; - SHashObj* pGroupIdTbNameMap; // status for tmq SNodeList* pGroupTags; SNode* pTagCond; @@ -612,7 +611,6 @@ typedef struct SStreamIntervalOperatorInfo { SArray* pChildren; SStreamState* pState; SWinKey delKey; - SHashObj* pGroupIdTbNameMap; // uint64_t -> char[TSDB_TABLE_NAME_LEN] } SStreamIntervalOperatorInfo; typedef struct SAggOperatorInfo { @@ -745,7 +743,6 @@ typedef struct SStreamSessionAggOperatorInfo { SPhysiNode* pPhyNode; // create new child bool isFinal; bool ignoreExpiredData; - SHashObj* pGroupIdTbNameMap; } SStreamSessionAggOperatorInfo; typedef struct SStreamStateAggOperatorInfo { @@ -761,7 +758,6 @@ typedef struct SStreamStateAggOperatorInfo { void* pDelIterator; SArray* pChildren; // cache for children's result; bool ignoreExpiredData; - SHashObj* pGroupIdTbNameMap; } SStreamStateAggOperatorInfo; typedef struct SStreamPartitionOperatorInfo { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index cf890ac3d7..6ee0f11e48 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1054,7 +1054,7 @@ void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pCol } SFilterColumnParam param1 = {.numOfCols = taosArrayGetSize(pBlock->pDataBlock), .pDataBlock = pBlock->pDataBlock}; - int32_t code = filterSetDataFromSlotId(pFilterInfo, ¶m1); + int32_t code = filterSetDataFromSlotId(pFilterInfo, ¶m1); SColumnInfoData* p = NULL; int32_t status = 0; @@ -1064,7 +1064,7 @@ void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pCol extractQualifiedTupleByFilterResult(pBlock, p, keep, status); if (pColMatchInfo != NULL) { - size_t size = taosArrayGetSize(pColMatchInfo->pList); + size_t size = taosArrayGetSize(pColMatchInfo->pList); for (int32_t i = 0; i < size; ++i) { SColMatchItem* pInfo = taosArrayGet(pColMatchInfo->pList, i); if (pInfo->colId == PRIMARYKEY_TIMESTAMP_COL_ID) { @@ -1336,27 +1336,14 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr pBlock->info.groupId = 0; ASSERT(!pbInfo->mergeResultBlock); doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); - if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { - SStreamStateAggOperatorInfo* pInfo = pOperator->info; - char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t)); - if (tbname != NULL) { - memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); - } else { - pBlock->info.parTbName[0] = 0; - } - } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || - pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || - pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { - SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - - char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t)); - if (tbname != NULL) { - memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); - } else { - pBlock->info.parTbName[0] = 0; - } + void* tbname = NULL; + if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, &tbname) < 0) { + pBlock->info.parTbName[0] = 0; + } else { + memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); } + tdbFree(tbname); } void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, @@ -2145,7 +2132,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator) { break; } - doFilter(fillResult, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo ); + doFilter(fillResult, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo); if (fillResult->info.rows > 0) { break; } @@ -2372,9 +2359,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock; pInfo->groupId = UINT64_MAX; - setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = - createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, NULL); + setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo, + pTaskInfo); + pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, NULL); if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = downstream->info; @@ -3351,13 +3338,13 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat if (pBlock->info.groupId == 0) { pBlock->info.groupId = pPos->groupId; - SStreamIntervalOperatorInfo* pInfo = pOperator->info; - char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t)); - if (tbname != NULL) { - memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); - } else { + void* tbname = NULL; + if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, &tbname) < 0) { pBlock->info.parTbName[0] = 0; + } else { + memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); } + tdbFree(tbname); } else { // current value belongs to different group, it can't be packed into one datablock if (pBlock->info.groupId != pPos->groupId) { @@ -3443,30 +3430,13 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta if (pBlock->info.groupId == 0) { pBlock->info.groupId = pKey->groupId; - if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { - SStreamStateAggOperatorInfo* pInfo = pOperator->info; - - char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t)); - if (tbname != NULL) { - memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); - } else { - pBlock->info.parTbName[0] = 0; - } - } else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || - pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || - pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { - SStreamSessionAggOperatorInfo* pInfo = pOperator->info; - - char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t)); - if (tbname != NULL) { - memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); - } else { - pBlock->info.parTbName[0] = 0; - } + void* tbname = NULL; + if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, &tbname) < 0) { + pBlock->info.parTbName[0] = 0; } else { - ASSERT(0); + memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); } - + tdbFree(tbname); } else { // current value belongs to different group, it can't be packed into one datablock if (pBlock->info.groupId != pKey->groupId) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index d9b985516e..37d5e877ec 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -524,7 +524,7 @@ static STableCachedVal* createTableCacheVal(const SMetaReader* pMetaReader) { // only child table has tag value if (pMetaReader->me.type == TSDB_CHILD_TABLE) { - STag* pTag = (STag*) pMetaReader->me.ctbEntry.pTags; + STag* pTag = (STag*)pMetaReader->me.ctbEntry.pTags; pVal->pTags = taosMemoryMalloc(pTag->len); memcpy(pVal->pTags, pTag, pTag->len); } @@ -560,7 +560,8 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int code = metaGetTableEntryByUid(&mr, pBlock->info.uid); if (code != TSDB_CODE_SUCCESS) { if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) { - qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr); + qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, + tstrerror(terrno), idStr); } else { qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr); } @@ -583,9 +584,11 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int code = metaGetTableEntryByUid(&mr, pBlock->info.uid); if (code != TSDB_CODE_SUCCESS) { if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) { - qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr); + qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s", + pBlock->info.uid, tstrerror(terrno), idStr); } else { - qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr); + qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), + idStr); } metaReaderClear(&mr); return terrno; @@ -1539,7 +1542,9 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version); } if (pInfo->tbnameCalSup.pExprInfo) { - char* parTbname = taosHashGet(pInfo->pGroupIdTbNameMap, &groupId, sizeof(int64_t)); + void* parTbname = NULL; + streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname); + memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN); varDataSetLen(tbname, strlen(varDataVal(tbname))); } @@ -1589,15 +1594,18 @@ static void calBlockTag(SExprSupp* pTagCalSup, SSDataBlock* pBlock, SSDataBlock* } void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { - SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup; + SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup; + SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState; if (pTbNameCalSup == NULL || pTbNameCalSup->numOfExprs == 0) return; if (pBlock == NULL || pBlock->info.rows == 0) return; - if (pBlock->info.groupId) { - char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t)); - if (tbname != NULL) { - memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); - } + + void* tbname = NULL; + if (streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, pBlock->info.groupId, &tbname) < 0) { + pBlock->info.parTbName[0] = 0; + } else { + memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); } + tdbFree(tbname); SSDataBlock* pSrcBlock = blockCopyOneRow(pBlock, 0); ASSERT(pSrcBlock->info.rows == 1); @@ -1625,9 +1633,8 @@ void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) { pBlock->info.parTbName[0] = 0; } - if (pBlock->info.groupId) { - taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), pBlock->info.parTbName, - TSDB_TABLE_NAME_LEN); + if (pBlock->info.groupId && pBlock->info.parTbName[0]) { + streamStatePutParName(pState, pBlock->info.groupId, pBlock->info.parTbName); } blockDataDestroy(pSrcBlock); @@ -2030,6 +2037,9 @@ FETCH_NEXT_BLOCK: int32_t current = pInfo->validBlockIndex++; SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); + if (pBlock->info.groupId && pBlock->info.parTbName[0]) { + streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, pBlock->info.parTbName); + } // TODO move into scan pBlock->info.calWin.skey = INT64_MIN; pBlock->info.calWin.ekey = INT64_MAX; @@ -2428,7 +2438,6 @@ static void destroyStreamScanOperatorInfo(void* param) { } cleanupExprSupp(&pStreamScan->tbnameCalSup); - taosHashCleanup(pStreamScan->pGroupIdTbNameMap); updateInfoDestroy(pStreamScan->pUpdateInfo); blockDataDestroy(pStreamScan->pRes); @@ -2486,8 +2495,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) { goto _error; } - pInfo->pGroupIdTbNameMap = - taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); } if (pTableScanNode->pTags != NULL) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 582181b891..a5eab8735e 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1450,7 +1450,6 @@ static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp } } tSimpleHashIterateRemove(pHashMap, pWinKey, sizeof(SWinKey), &pIte, &iter); - /*taosHashRemove(pInfo->pGroupIdTbNameMap, &pWinKey->groupId, sizeof(int64_t));*/ } } return TSDB_CODE_SUCCESS; @@ -1547,7 +1546,8 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin uint64_t uid = 0; for (int32_t i = *index; i < size; i++) { SWinKey* pWin = taosArrayGet(pWins, i); - char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pWin->groupId, sizeof(int64_t)); + void* tbname = NULL; + streamStateGetParName(pInfo->pState, pWin->groupId, &tbname); if (tbname == NULL) { appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, NULL); } else { @@ -1555,6 +1555,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName); } + tdbFree(tbname); (*index)++; } } @@ -1617,7 +1618,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) { nodesDestroyNode((SNode*)pInfo->pPhyNode); colDataDestroy(&pInfo->twAggSup.timeWindowData); cleanupGroupResInfo(&pInfo->groupResInfo); - taosHashCleanup(pInfo->pGroupIdTbNameMap); taosMemoryFreeClear(param); } @@ -3153,11 +3153,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { } printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv"); - if (pBlock->info.parTbName[0]) { - taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName, - TSDB_TABLE_NAME_LEN); - } - ASSERT(pBlock->info.type != STREAM_INVERT); if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { pInfo->binfo.pRes->info.type = pBlock->info.type; @@ -3372,9 +3367,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pInfo->delKey.ts = INT64_MAX; pInfo->delKey.groupId = 0; - pInfo->pGroupIdTbNameMap = - taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); - pOperator->operatorType = pPhyNode->type; pOperator->blocking = true; pOperator->status = OP_NOT_OPENED; @@ -3425,7 +3417,6 @@ void destroyStreamSessionAggOperatorInfo(void* param) { blockDataDestroy(pInfo->pWinBlock); blockDataDestroy(pInfo->pUpdateRes); tSimpleHashCleanup(pInfo->pStDeleted); - taosHashCleanup(pInfo->pGroupIdTbNameMap); taosMemoryFreeClear(param); } @@ -3857,30 +3848,18 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo SColumnInfoData* pCalEdCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); colDataAppendNULL(pCalEdCol, pBlock->info.rows); - SHashObj* pGroupIdTbNameMap = NULL; - if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || - pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION || - pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) { - SStreamSessionAggOperatorInfo* pInfo = pOp->info; - pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap; - } else if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) { - SStreamStateAggOperatorInfo* pInfo = pOp->info; - pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap; - } else { - ASSERT(0); - } - - char* tbname = taosHashGet(pGroupIdTbNameMap, &res->groupId, sizeof(int64_t)); SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX); + + void* tbname = NULL; + streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, res->groupId, &tbname); if (tbname == NULL) { - /*printf("\n\n no tbname for group id %" PRId64 "%p %p\n\n", res->groupId, pOp->info, pGroupIdTbNameMap);*/ colDataAppendNULL(pTableCol, pBlock->info.rows); } else { char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN]; STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName)); colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false); - /*printf("\n\n get tbname %s group id %" PRId64 "\n\n", tbname, res->groupId);*/ } + tdbFree(tbname); pBlock->info.rows += 1; } if ((*Ite) == NULL) { @@ -4053,19 +4032,6 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { } printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session recv" : "single session recv"); - if (pBlock->info.parTbName[0]) { - taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName, - TSDB_TABLE_NAME_LEN); - /*printf("\n\n put tbname %s group id %" PRId64 "\n\n into %p %p", pBlock->info.parTbName, pBlock->info.groupId,*/ - /*pInfo, pInfo->pGroupIdTbNameMap);*/ - } - - if (pBlock->info.parTbName[0]) { - taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName, - TSDB_TABLE_NAME_LEN); - /*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/ - } - if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SSessionKey)); @@ -4209,8 +4175,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh pInfo->isFinal = false; pInfo->pPhyNode = pPhyNode; pInfo->ignoreExpiredData = pSessionNode->window.igExpired; - pInfo->pGroupIdTbNameMap = - taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -4285,12 +4249,6 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { } printDataBlock(pBlock, "semi session recv"); - if (pBlock->info.parTbName[0]) { - taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName, - TSDB_TABLE_NAME_LEN); - /*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/ - } - if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { // gap must be 0 @@ -4418,7 +4376,6 @@ void destroyStreamStateOperatorInfo(void* param) { colDataDestroy(&pInfo->twAggSup.timeWindowData); blockDataDestroy(pInfo->pDelRes); tSimpleHashCleanup(pInfo->pSeDeleted); - taosHashCleanup(pInfo->pGroupIdTbNameMap); taosMemoryFreeClear(param); } @@ -4612,12 +4569,6 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { } printDataBlock(pBlock, "single state recv"); - if (pBlock->info.parTbName[0]) { - taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName, - TSDB_TABLE_NAME_LEN); - /*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/ - } - if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { SArray* pWins = taosArrayInit(16, sizeof(SSessionKey)); @@ -4731,9 +4682,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->pChildren = NULL; pInfo->ignoreExpiredData = pStateNode->window.igExpired; - pInfo->pGroupIdTbNameMap = - taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); - setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = @@ -5384,12 +5332,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } printDataBlock(pBlock, "single interval recv"); - if (pBlock->info.parTbName[0]) { - taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName, - TSDB_TABLE_NAME_LEN); - /*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/ - } - if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pUpdatedMap); @@ -5547,9 +5489,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->delKey.ts = INT64_MAX; pInfo->delKey.groupId = 0; - pInfo->pGroupIdTbNameMap = - taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK); - setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index aefe30116b..63ec0caa95 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -16,6 +16,7 @@ #include "executor.h" #include "streamInc.h" #include "tcommon.h" +#include "tcompare.h" #include "ttimer.h" // todo refactor @@ -144,6 +145,11 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int goto _err; } + if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->db, &pState->pParNameDb, 0) < + 0) { + goto _err; + } + if (streamStateBegin(pState) < 0) { goto _err; } @@ -157,6 +163,7 @@ _err: tdbTbClose(pState->pFuncStateDb); tdbTbClose(pState->pFillStateDb); tdbTbClose(pState->pSessionStateDb); + tdbTbClose(pState->pParNameDb); tdbClose(pState->db); taosMemoryFree(pState); return NULL; @@ -169,6 +176,7 @@ void streamStateClose(SStreamState* pState) { tdbTbClose(pState->pFuncStateDb); tdbTbClose(pState->pFillStateDb); tdbTbClose(pState->pSessionStateDb); + tdbTbClose(pState->pParNameDb); tdbClose(pState->db); taosMemoryFree(pState); @@ -812,6 +820,16 @@ _end: return res; } +int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) { + tdbTbUpsert(pState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN, &pState->txn); + return 0; +} + +int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) { + int32_t len; + return tdbTbGet(pState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len); +} + #if 0 char* streamStateSessionDump(SStreamState* pState) { SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));