Merge pull request #18266 from taosdata/feature/stream
enh(stream): put user defined tb name into stream state
This commit is contained in:
commit
a12cf7f91d
|
@ -35,6 +35,7 @@ typedef struct {
|
|||
TTB* pFuncStateDb;
|
||||
TTB* pFillStateDb; // todo refactor
|
||||
TTB* pSessionStateDb;
|
||||
TTB* pParNameDb;
|
||||
TXN txn;
|
||||
int32_t number;
|
||||
} SStreamState;
|
||||
|
@ -99,6 +100,9 @@ int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur);
|
|||
int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur);
|
||||
int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur);
|
||||
|
||||
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char* tbname);
|
||||
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal);
|
||||
|
||||
#if 0
|
||||
char* streamStateSessionDump(SStreamState* pState);
|
||||
#endif
|
||||
|
|
|
@ -398,6 +398,35 @@ bool tqNextDataBlock(STqReader* pReader) {
|
|||
return false;
|
||||
}
|
||||
|
||||
int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask) {
|
||||
int32_t code;
|
||||
|
||||
int32_t cnt = 0;
|
||||
for (int32_t i = 0; i < pSrc->nCols; i++) {
|
||||
cnt += mask[i];
|
||||
}
|
||||
|
||||
pDst->nCols = cnt;
|
||||
pDst->pSchema = taosMemoryCalloc(cnt, sizeof(SSchema));
|
||||
if (pDst->pSchema == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t j = 0;
|
||||
for (int32_t i = 0; i < pSrc->nCols; i++) {
|
||||
if (mask[i]) {
|
||||
pDst->pSchema[j++] = pSrc->pSchema[i];
|
||||
SColumnInfoData colInfo =
|
||||
createColumnInfoData(pSrc->pSchema[i].type, pSrc->pSchema[i].bytes, pSrc->pSchema[i].colId);
|
||||
code = blockDataAppendColInfo(pBlock, &colInfo);
|
||||
if (code != 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool tqNextDataBlockFilterOut(STqReader* pHandle, SHashObj* filterOutUids) {
|
||||
while (1) {
|
||||
if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) {
|
||||
|
@ -527,6 +556,119 @@ FAIL:
|
|||
return -1;
|
||||
}
|
||||
|
||||
int32_t tqSplitRetrieveDataBlock(STqReader* pReader, SArray* blocks, SArray* schemas) {
|
||||
int32_t sversion = htonl(pReader->pBlock->sversion);
|
||||
|
||||
if (pReader->cachedSchemaSuid == 0 || pReader->cachedSchemaVer != sversion ||
|
||||
pReader->cachedSchemaSuid != pReader->msgIter.suid) {
|
||||
if (pReader->pSchema) taosMemoryFree(pReader->pSchema);
|
||||
pReader->pSchema = metaGetTbTSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1);
|
||||
if (pReader->pSchema == NULL) {
|
||||
tqWarn("cannot found tsschema for table: uid:%" PRId64 " (suid:%" PRId64 "), version %d, possibly dropped table",
|
||||
pReader->msgIter.uid, pReader->msgIter.suid, pReader->cachedSchemaVer);
|
||||
/*ASSERT(0);*/
|
||||
pReader->cachedSchemaSuid = 0;
|
||||
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pReader->pSchemaWrapper) tDeleteSSchemaWrapper(pReader->pSchemaWrapper);
|
||||
pReader->pSchemaWrapper = metaGetTableSchema(pReader->pVnodeMeta, pReader->msgIter.uid, sversion, 1);
|
||||
if (pReader->pSchemaWrapper == NULL) {
|
||||
tqWarn("cannot found schema wrapper for table: suid:%" PRId64 ", version %d, possibly dropped table",
|
||||
pReader->msgIter.uid, pReader->cachedSchemaVer);
|
||||
/*ASSERT(0);*/
|
||||
pReader->cachedSchemaSuid = 0;
|
||||
terrno = TSDB_CODE_TQ_TABLE_SCHEMA_NOT_FOUND;
|
||||
return -1;
|
||||
}
|
||||
pReader->cachedSchemaVer = sversion;
|
||||
pReader->cachedSchemaSuid = pReader->msgIter.suid;
|
||||
}
|
||||
|
||||
STSchema* pTschema = pReader->pSchema;
|
||||
SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper;
|
||||
|
||||
int32_t colAtMost = pSchemaWrapper->nCols;
|
||||
|
||||
int32_t curRow = 0;
|
||||
|
||||
char* assigned = taosMemoryCalloc(1, pSchemaWrapper->nCols);
|
||||
if (assigned) return -1;
|
||||
|
||||
tInitSubmitBlkIter(&pReader->msgIter, pReader->pBlock, &pReader->blkIter);
|
||||
STSRowIter iter = {0};
|
||||
tdSTSRowIterInit(&iter, pTschema);
|
||||
STSRow* row;
|
||||
|
||||
while ((row = tGetSubmitBlkNext(&pReader->blkIter)) != NULL) {
|
||||
bool buildNew = false;
|
||||
tdSTSRowIterReset(&iter, row);
|
||||
|
||||
for (int32_t i = 0; i < colAtMost; i++) {
|
||||
SCellVal sVal = {0};
|
||||
if (!tdSTSRowIterFetch(&iter, pSchemaWrapper->pSchema[i].colId, pSchemaWrapper->pSchema[i].type, &sVal)) {
|
||||
break;
|
||||
}
|
||||
if (curRow == 0) {
|
||||
assigned[i] = sVal.valType != TD_VTYPE_NONE;
|
||||
buildNew = true;
|
||||
} else {
|
||||
bool currentRowAssigned = sVal.valType != TD_VTYPE_NONE;
|
||||
if (currentRowAssigned != assigned[i]) {
|
||||
assigned[i] = currentRowAssigned;
|
||||
buildNew = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (buildNew) {
|
||||
SSDataBlock block;
|
||||
SSchemaWrapper sw;
|
||||
if (tqMaskBlock(&sw, &block, pSchemaWrapper, assigned) < 0) {
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
taosArrayPush(blocks, &block);
|
||||
taosArrayPush(schemas, &sw);
|
||||
}
|
||||
|
||||
SSDataBlock* pBlock = taosArrayGetLast(blocks);
|
||||
pBlock->info.uid = pReader->msgIter.uid;
|
||||
pBlock->info.rows = pReader->msgIter.numOfRows;
|
||||
pBlock->info.version = pReader->pMsg->version;
|
||||
|
||||
if (blockDataEnsureCapacity(pBlock, pReader->msgIter.numOfRows - curRow) < 0) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto FAIL;
|
||||
}
|
||||
|
||||
tdSTSRowIterInit(&iter, pTschema);
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pBlock->pDataBlock); i++) {
|
||||
SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
|
||||
SCellVal sVal = {0};
|
||||
|
||||
if (!tdSTSRowIterFetch(&iter, pColData->info.colId, pColData->info.type, &sVal)) {
|
||||
break;
|
||||
}
|
||||
|
||||
ASSERT(sVal.valType != TD_VTYPE_NONE);
|
||||
|
||||
if (colDataAppend(pColData, curRow, sVal.val, sVal.valType != TD_VTYPE_NORM) < 0) {
|
||||
goto FAIL;
|
||||
}
|
||||
}
|
||||
curRow++;
|
||||
}
|
||||
|
||||
taosMemoryFree(assigned);
|
||||
return 0;
|
||||
|
||||
FAIL:
|
||||
taosMemoryFree(assigned);
|
||||
return -1;
|
||||
}
|
||||
|
||||
void tqReaderSetColIdList(STqReader* pReader, SArray* pColIdList) { pReader->pColIdList = pColIdList; }
|
||||
|
||||
int tqReaderSetTbUidList(STqReader* pReader, const SArray* tbUidList) {
|
||||
|
|
|
@ -504,7 +504,6 @@ typedef struct SStreamScanInfo {
|
|||
STimeWindow updateWin;
|
||||
STimeWindowAggSupp twAggSup;
|
||||
SSDataBlock* pUpdateDataRes;
|
||||
SHashObj* pGroupIdTbNameMap;
|
||||
// status for tmq
|
||||
SNodeList* pGroupTags;
|
||||
SNode* pTagCond;
|
||||
|
@ -612,7 +611,6 @@ typedef struct SStreamIntervalOperatorInfo {
|
|||
SArray* pChildren;
|
||||
SStreamState* pState;
|
||||
SWinKey delKey;
|
||||
SHashObj* pGroupIdTbNameMap; // uint64_t -> char[TSDB_TABLE_NAME_LEN]
|
||||
} SStreamIntervalOperatorInfo;
|
||||
|
||||
typedef struct SAggOperatorInfo {
|
||||
|
@ -745,7 +743,6 @@ typedef struct SStreamSessionAggOperatorInfo {
|
|||
SPhysiNode* pPhyNode; // create new child
|
||||
bool isFinal;
|
||||
bool ignoreExpiredData;
|
||||
SHashObj* pGroupIdTbNameMap;
|
||||
} SStreamSessionAggOperatorInfo;
|
||||
|
||||
typedef struct SStreamStateAggOperatorInfo {
|
||||
|
@ -761,7 +758,6 @@ typedef struct SStreamStateAggOperatorInfo {
|
|||
void* pDelIterator;
|
||||
SArray* pChildren; // cache for children's result;
|
||||
bool ignoreExpiredData;
|
||||
SHashObj* pGroupIdTbNameMap;
|
||||
} SStreamStateAggOperatorInfo;
|
||||
|
||||
typedef struct SStreamPartitionOperatorInfo {
|
||||
|
|
|
@ -1336,27 +1336,14 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr
|
|||
pBlock->info.groupId = 0;
|
||||
ASSERT(!pbInfo->mergeResultBlock);
|
||||
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
|
||||
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
|
||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
|
||||
if (tbname != NULL) {
|
||||
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||
} else {
|
||||
void* tbname = NULL;
|
||||
if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, &tbname) < 0) {
|
||||
pBlock->info.parTbName[0] = 0;
|
||||
}
|
||||
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
|
||||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
|
||||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
|
||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
|
||||
if (tbname != NULL) {
|
||||
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||
} else {
|
||||
pBlock->info.parTbName[0] = 0;
|
||||
}
|
||||
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||
}
|
||||
tdbFree(tbname);
|
||||
}
|
||||
|
||||
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
||||
|
@ -2372,9 +2359,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
|
|||
pInfo->binfo.mergeResultBlock = pAggNode->mergeDataBlock;
|
||||
pInfo->groupId = UINT64_MAX;
|
||||
|
||||
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||
pOperator->fpSet =
|
||||
createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, NULL);
|
||||
setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo,
|
||||
pTaskInfo);
|
||||
pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, NULL);
|
||||
|
||||
if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
|
||||
STableScanInfo* pTableScanInfo = downstream->info;
|
||||
|
@ -3351,13 +3338,13 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pStat
|
|||
|
||||
if (pBlock->info.groupId == 0) {
|
||||
pBlock->info.groupId = pPos->groupId;
|
||||
SStreamIntervalOperatorInfo* pInfo = pOperator->info;
|
||||
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
|
||||
if (tbname != NULL) {
|
||||
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||
} else {
|
||||
void* tbname = NULL;
|
||||
if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, &tbname) < 0) {
|
||||
pBlock->info.parTbName[0] = 0;
|
||||
} else {
|
||||
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||
}
|
||||
tdbFree(tbname);
|
||||
} else {
|
||||
// current value belongs to different group, it can't be packed into one datablock
|
||||
if (pBlock->info.groupId != pPos->groupId) {
|
||||
|
@ -3443,30 +3430,13 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta
|
|||
if (pBlock->info.groupId == 0) {
|
||||
pBlock->info.groupId = pKey->groupId;
|
||||
|
||||
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
|
||||
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
|
||||
if (tbname != NULL) {
|
||||
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||
} else {
|
||||
void* tbname = NULL;
|
||||
if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, &tbname) < 0) {
|
||||
pBlock->info.parTbName[0] = 0;
|
||||
}
|
||||
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
|
||||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
|
||||
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
|
||||
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||
|
||||
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
|
||||
if (tbname != NULL) {
|
||||
} else {
|
||||
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||
} else {
|
||||
pBlock->info.parTbName[0] = 0;
|
||||
}
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
tdbFree(tbname);
|
||||
} else {
|
||||
// current value belongs to different group, it can't be packed into one datablock
|
||||
if (pBlock->info.groupId != pKey->groupId) {
|
||||
|
|
|
@ -560,7 +560,8 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
|||
code = metaGetTableEntryByUid(&mr, pBlock->info.uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||
qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr);
|
||||
qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid,
|
||||
tstrerror(terrno), idStr);
|
||||
} else {
|
||||
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr);
|
||||
}
|
||||
|
@ -583,9 +584,11 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
|||
code = metaGetTableEntryByUid(&mr, pBlock->info.uid);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
if (terrno == TSDB_CODE_PAR_TABLE_NOT_EXIST) {
|
||||
qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr);
|
||||
qWarn("failed to get table meta, table may have been dropped, uid:0x%" PRIx64 ", code:%s, %s",
|
||||
pBlock->info.uid, tstrerror(terrno), idStr);
|
||||
} else {
|
||||
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno), idStr);
|
||||
qError("failed to get table meta, uid:0x%" PRIx64 ", code:%s, %s", pBlock->info.uid, tstrerror(terrno),
|
||||
idStr);
|
||||
}
|
||||
metaReaderClear(&mr);
|
||||
return terrno;
|
||||
|
@ -1539,7 +1542,9 @@ static int32_t generateDeleteResultBlock(SStreamScanInfo* pInfo, SSDataBlock* pS
|
|||
groupId = getGroupIdByData(pInfo, srcUid, srcStartTsCol[i], version);
|
||||
}
|
||||
if (pInfo->tbnameCalSup.pExprInfo) {
|
||||
char* parTbname = taosHashGet(pInfo->pGroupIdTbNameMap, &groupId, sizeof(int64_t));
|
||||
void* parTbname = NULL;
|
||||
streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, groupId, &parTbname);
|
||||
|
||||
memcpy(varDataVal(tbname), parTbname, TSDB_TABLE_NAME_LEN);
|
||||
varDataSetLen(tbname, strlen(varDataVal(tbname)));
|
||||
}
|
||||
|
@ -1590,14 +1595,17 @@ static void calBlockTag(SExprSupp* pTagCalSup, SSDataBlock* pBlock, SSDataBlock*
|
|||
|
||||
void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
|
||||
SExprSupp* pTbNameCalSup = &pInfo->tbnameCalSup;
|
||||
SStreamState* pState = pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState;
|
||||
if (pTbNameCalSup == NULL || pTbNameCalSup->numOfExprs == 0) return;
|
||||
if (pBlock == NULL || pBlock->info.rows == 0) return;
|
||||
if (pBlock->info.groupId) {
|
||||
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
|
||||
if (tbname != NULL) {
|
||||
|
||||
void* tbname = NULL;
|
||||
if (streamStateGetParName(pInfo->pStreamScanOp->pTaskInfo->streamInfo.pState, pBlock->info.groupId, &tbname) < 0) {
|
||||
pBlock->info.parTbName[0] = 0;
|
||||
} else {
|
||||
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||
}
|
||||
}
|
||||
tdbFree(tbname);
|
||||
|
||||
SSDataBlock* pSrcBlock = blockCopyOneRow(pBlock, 0);
|
||||
ASSERT(pSrcBlock->info.rows == 1);
|
||||
|
@ -1625,9 +1633,8 @@ void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock) {
|
|||
pBlock->info.parTbName[0] = 0;
|
||||
}
|
||||
|
||||
if (pBlock->info.groupId) {
|
||||
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), pBlock->info.parTbName,
|
||||
TSDB_TABLE_NAME_LEN);
|
||||
if (pBlock->info.groupId && pBlock->info.parTbName[0]) {
|
||||
streamStatePutParName(pState, pBlock->info.groupId, pBlock->info.parTbName);
|
||||
}
|
||||
|
||||
blockDataDestroy(pSrcBlock);
|
||||
|
@ -2030,6 +2037,9 @@ FETCH_NEXT_BLOCK:
|
|||
|
||||
int32_t current = pInfo->validBlockIndex++;
|
||||
SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current);
|
||||
if (pBlock->info.groupId && pBlock->info.parTbName[0]) {
|
||||
streamStatePutParName(pTaskInfo->streamInfo.pState, pBlock->info.groupId, pBlock->info.parTbName);
|
||||
}
|
||||
// TODO move into scan
|
||||
pBlock->info.calWin.skey = INT64_MIN;
|
||||
pBlock->info.calWin.ekey = INT64_MAX;
|
||||
|
@ -2428,7 +2438,6 @@ static void destroyStreamScanOperatorInfo(void* param) {
|
|||
}
|
||||
|
||||
cleanupExprSupp(&pStreamScan->tbnameCalSup);
|
||||
taosHashCleanup(pStreamScan->pGroupIdTbNameMap);
|
||||
|
||||
updateInfoDestroy(pStreamScan->pUpdateInfo);
|
||||
blockDataDestroy(pStreamScan->pRes);
|
||||
|
@ -2486,8 +2495,6 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys
|
|||
if (initExprSupp(&pInfo->tbnameCalSup, pSubTableExpr, 1) != 0) {
|
||||
goto _error;
|
||||
}
|
||||
pInfo->pGroupIdTbNameMap =
|
||||
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
|
||||
}
|
||||
|
||||
if (pTableScanNode->pTags != NULL) {
|
||||
|
|
|
@ -1450,7 +1450,6 @@ static int32_t closeStreamIntervalWindow(SSHashObj* pHashMap, STimeWindowAggSupp
|
|||
}
|
||||
}
|
||||
tSimpleHashIterateRemove(pHashMap, pWinKey, sizeof(SWinKey), &pIte, &iter);
|
||||
/*taosHashRemove(pInfo->pGroupIdTbNameMap, &pWinKey->groupId, sizeof(int64_t));*/
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1547,7 +1546,8 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin
|
|||
uint64_t uid = 0;
|
||||
for (int32_t i = *index; i < size; i++) {
|
||||
SWinKey* pWin = taosArrayGet(pWins, i);
|
||||
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pWin->groupId, sizeof(int64_t));
|
||||
void* tbname = NULL;
|
||||
streamStateGetParName(pInfo->pState, pWin->groupId, &tbname);
|
||||
if (tbname == NULL) {
|
||||
appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, NULL);
|
||||
} else {
|
||||
|
@ -1555,6 +1555,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin
|
|||
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
|
||||
appendOneRowToStreamSpecialBlock(pBlock, &pWin->ts, &pWin->ts, &uid, &pWin->groupId, parTbName);
|
||||
}
|
||||
tdbFree(tbname);
|
||||
(*index)++;
|
||||
}
|
||||
}
|
||||
|
@ -1617,7 +1618,6 @@ void destroyStreamFinalIntervalOperatorInfo(void* param) {
|
|||
nodesDestroyNode((SNode*)pInfo->pPhyNode);
|
||||
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||
cleanupGroupResInfo(&pInfo->groupResInfo);
|
||||
taosHashCleanup(pInfo->pGroupIdTbNameMap);
|
||||
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
@ -3153,11 +3153,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv");
|
||||
|
||||
if (pBlock->info.parTbName[0]) {
|
||||
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
|
||||
TSDB_TABLE_NAME_LEN);
|
||||
}
|
||||
|
||||
ASSERT(pBlock->info.type != STREAM_INVERT);
|
||||
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
|
||||
pInfo->binfo.pRes->info.type = pBlock->info.type;
|
||||
|
@ -3372,9 +3367,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
pInfo->delKey.ts = INT64_MAX;
|
||||
pInfo->delKey.groupId = 0;
|
||||
|
||||
pInfo->pGroupIdTbNameMap =
|
||||
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
|
||||
|
||||
pOperator->operatorType = pPhyNode->type;
|
||||
pOperator->blocking = true;
|
||||
pOperator->status = OP_NOT_OPENED;
|
||||
|
@ -3425,7 +3417,6 @@ void destroyStreamSessionAggOperatorInfo(void* param) {
|
|||
blockDataDestroy(pInfo->pWinBlock);
|
||||
blockDataDestroy(pInfo->pUpdateRes);
|
||||
tSimpleHashCleanup(pInfo->pStDeleted);
|
||||
taosHashCleanup(pInfo->pGroupIdTbNameMap);
|
||||
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
@ -3857,30 +3848,18 @@ void doBuildDeleteDataBlock(SOperatorInfo* pOp, SSHashObj* pStDeleted, SSDataBlo
|
|||
SColumnInfoData* pCalEdCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
|
||||
colDataAppendNULL(pCalEdCol, pBlock->info.rows);
|
||||
|
||||
SHashObj* pGroupIdTbNameMap = NULL;
|
||||
if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
|
||||
pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
|
||||
pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
|
||||
SStreamSessionAggOperatorInfo* pInfo = pOp->info;
|
||||
pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap;
|
||||
} else if (pOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
|
||||
SStreamStateAggOperatorInfo* pInfo = pOp->info;
|
||||
pGroupIdTbNameMap = pInfo->pGroupIdTbNameMap;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
char* tbname = taosHashGet(pGroupIdTbNameMap, &res->groupId, sizeof(int64_t));
|
||||
SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
|
||||
|
||||
void* tbname = NULL;
|
||||
streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, res->groupId, &tbname);
|
||||
if (tbname == NULL) {
|
||||
/*printf("\n\n no tbname for group id %" PRId64 "%p %p\n\n", res->groupId, pOp->info, pGroupIdTbNameMap);*/
|
||||
colDataAppendNULL(pTableCol, pBlock->info.rows);
|
||||
} else {
|
||||
char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
|
||||
STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
|
||||
colDataAppend(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
|
||||
/*printf("\n\n get tbname %s group id %" PRId64 "\n\n", tbname, res->groupId);*/
|
||||
}
|
||||
tdbFree(tbname);
|
||||
pBlock->info.rows += 1;
|
||||
}
|
||||
if ((*Ite) == NULL) {
|
||||
|
@ -4053,19 +4032,6 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session recv" : "single session recv");
|
||||
|
||||
if (pBlock->info.parTbName[0]) {
|
||||
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
|
||||
TSDB_TABLE_NAME_LEN);
|
||||
/*printf("\n\n put tbname %s group id %" PRId64 "\n\n into %p %p", pBlock->info.parTbName, pBlock->info.groupId,*/
|
||||
/*pInfo, pInfo->pGroupIdTbNameMap);*/
|
||||
}
|
||||
|
||||
if (pBlock->info.parTbName[0]) {
|
||||
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
|
||||
TSDB_TABLE_NAME_LEN);
|
||||
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
|
||||
}
|
||||
|
||||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||
pBlock->info.type == STREAM_CLEAR) {
|
||||
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
|
||||
|
@ -4209,8 +4175,6 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
|||
pInfo->isFinal = false;
|
||||
pInfo->pPhyNode = pPhyNode;
|
||||
pInfo->ignoreExpiredData = pSessionNode->window.igExpired;
|
||||
pInfo->pGroupIdTbNameMap =
|
||||
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
|
||||
|
||||
setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true,
|
||||
OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||
|
@ -4285,12 +4249,6 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
printDataBlock(pBlock, "semi session recv");
|
||||
|
||||
if (pBlock->info.parTbName[0]) {
|
||||
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
|
||||
TSDB_TABLE_NAME_LEN);
|
||||
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
|
||||
}
|
||||
|
||||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||
pBlock->info.type == STREAM_CLEAR) {
|
||||
// gap must be 0
|
||||
|
@ -4418,7 +4376,6 @@ void destroyStreamStateOperatorInfo(void* param) {
|
|||
colDataDestroy(&pInfo->twAggSup.timeWindowData);
|
||||
blockDataDestroy(pInfo->pDelRes);
|
||||
tSimpleHashCleanup(pInfo->pSeDeleted);
|
||||
taosHashCleanup(pInfo->pGroupIdTbNameMap);
|
||||
taosMemoryFreeClear(param);
|
||||
}
|
||||
|
||||
|
@ -4612,12 +4569,6 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
printDataBlock(pBlock, "single state recv");
|
||||
|
||||
if (pBlock->info.parTbName[0]) {
|
||||
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
|
||||
TSDB_TABLE_NAME_LEN);
|
||||
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
|
||||
}
|
||||
|
||||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||
pBlock->info.type == STREAM_CLEAR) {
|
||||
SArray* pWins = taosArrayInit(16, sizeof(SSessionKey));
|
||||
|
@ -4731,9 +4682,6 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pInfo->pChildren = NULL;
|
||||
pInfo->ignoreExpiredData = pStateNode->window.igExpired;
|
||||
|
||||
pInfo->pGroupIdTbNameMap =
|
||||
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
|
||||
|
||||
setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED,
|
||||
pInfo, pTaskInfo);
|
||||
pOperator->fpSet =
|
||||
|
@ -5384,12 +5332,6 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
printDataBlock(pBlock, "single interval recv");
|
||||
|
||||
if (pBlock->info.parTbName[0]) {
|
||||
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
|
||||
TSDB_TABLE_NAME_LEN);
|
||||
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
|
||||
}
|
||||
|
||||
if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||
pBlock->info.type == STREAM_CLEAR) {
|
||||
doDeleteWindows(pOperator, &pInfo->interval, pBlock, pInfo->pDelWins, pUpdatedMap);
|
||||
|
@ -5547,9 +5489,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
pInfo->delKey.ts = INT64_MAX;
|
||||
pInfo->delKey.groupId = 0;
|
||||
|
||||
pInfo->pGroupIdTbNameMap =
|
||||
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
|
||||
|
||||
setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED,
|
||||
pInfo, pTaskInfo);
|
||||
pOperator->fpSet =
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#include "executor.h"
|
||||
#include "streamInc.h"
|
||||
#include "tcommon.h"
|
||||
#include "tcompare.h"
|
||||
#include "ttimer.h"
|
||||
|
||||
// todo refactor
|
||||
|
@ -144,6 +145,11 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int
|
|||
goto _err;
|
||||
}
|
||||
|
||||
if (tdbTbOpen("parname.state.db", sizeof(int64_t), TSDB_TABLE_NAME_LEN, NULL, pState->db, &pState->pParNameDb, 0) <
|
||||
0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (streamStateBegin(pState) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
@ -157,6 +163,7 @@ _err:
|
|||
tdbTbClose(pState->pFuncStateDb);
|
||||
tdbTbClose(pState->pFillStateDb);
|
||||
tdbTbClose(pState->pSessionStateDb);
|
||||
tdbTbClose(pState->pParNameDb);
|
||||
tdbClose(pState->db);
|
||||
taosMemoryFree(pState);
|
||||
return NULL;
|
||||
|
@ -169,6 +176,7 @@ void streamStateClose(SStreamState* pState) {
|
|||
tdbTbClose(pState->pFuncStateDb);
|
||||
tdbTbClose(pState->pFillStateDb);
|
||||
tdbTbClose(pState->pSessionStateDb);
|
||||
tdbTbClose(pState->pParNameDb);
|
||||
tdbClose(pState->db);
|
||||
|
||||
taosMemoryFree(pState);
|
||||
|
@ -812,6 +820,16 @@ _end:
|
|||
return res;
|
||||
}
|
||||
|
||||
int32_t streamStatePutParName(SStreamState* pState, int64_t groupId, const char tbname[TSDB_TABLE_NAME_LEN]) {
|
||||
tdbTbUpsert(pState->pParNameDb, &groupId, sizeof(int64_t), tbname, TSDB_TABLE_NAME_LEN, &pState->txn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t streamStateGetParName(SStreamState* pState, int64_t groupId, void** pVal) {
|
||||
int32_t len;
|
||||
return tdbTbGet(pState->pParNameDb, &groupId, sizeof(int64_t), pVal, &len);
|
||||
}
|
||||
|
||||
#if 0
|
||||
char* streamStateSessionDump(SStreamState* pState) {
|
||||
SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur));
|
||||
|
|
Loading…
Reference in New Issue