refactor(stream): state/session map id to tbname
This commit is contained in:
parent
fbfae317b3
commit
26fae13c61
|
@ -117,9 +117,13 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
|
||||||
SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
|
SArray *pArray = taosArrayInit(innerSz, sizeof(void *));
|
||||||
for (int32_t j = 0; j < innerSz; j++) {
|
for (int32_t j = 0; j < innerSz; j++) {
|
||||||
SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
SStreamTask *pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
|
||||||
if (pTask == NULL) return -1;
|
if (pTask == NULL) {
|
||||||
|
taosArrayDestroy(pArray);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
if (tDecodeSStreamTask(pDecoder, pTask) < 0) {
|
if (tDecodeSStreamTask(pDecoder, pTask) < 0) {
|
||||||
taosMemoryFree(pTask);
|
taosMemoryFree(pTask);
|
||||||
|
taosArrayDestroy(pArray);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taosArrayPush(pArray, &pTask);
|
taosArrayPush(pArray, &pTask);
|
||||||
|
|
|
@ -180,6 +180,7 @@ int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
tdbFree(pKey);
|
||||||
tdbTbcClose(pCur);
|
tdbTbcClose(pCur);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -54,7 +54,7 @@ int32_t tqOffsetSnapRead(STqOffsetReader* pReader, uint8_t** ppData) {
|
||||||
|
|
||||||
char* fname = tqOffsetBuildFName(pReader->pTq->path, 0);
|
char* fname = tqOffsetBuildFName(pReader->pTq->path, 0);
|
||||||
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
|
TdFilePtr pFile = taosOpenFile(fname, TD_FILE_READ);
|
||||||
if (pFile != NULL) {
|
if (pFile == NULL) {
|
||||||
taosMemoryFree(fname);
|
taosMemoryFree(fname);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -767,6 +767,7 @@ typedef struct SStreamSessionAggOperatorInfo {
|
||||||
SPhysiNode* pPhyNode; // create new child
|
SPhysiNode* pPhyNode; // create new child
|
||||||
bool isFinal;
|
bool isFinal;
|
||||||
bool ignoreExpiredData;
|
bool ignoreExpiredData;
|
||||||
|
SHashObj* pGroupIdTbNameMap;
|
||||||
} SStreamSessionAggOperatorInfo;
|
} SStreamSessionAggOperatorInfo;
|
||||||
|
|
||||||
typedef struct SStreamPartitionOperatorInfo {
|
typedef struct SStreamPartitionOperatorInfo {
|
||||||
|
@ -844,6 +845,7 @@ typedef struct SStreamStateAggOperatorInfo {
|
||||||
void* pDelIterator;
|
void* pDelIterator;
|
||||||
SArray* pChildren; // cache for children's result;
|
SArray* pChildren; // cache for children's result;
|
||||||
bool ignoreExpiredData;
|
bool ignoreExpiredData;
|
||||||
|
SHashObj* pGroupIdTbNameMap;
|
||||||
} SStreamStateAggOperatorInfo;
|
} SStreamStateAggOperatorInfo;
|
||||||
|
|
||||||
typedef struct SSortOperatorInfo {
|
typedef struct SSortOperatorInfo {
|
||||||
|
@ -898,8 +900,12 @@ void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs);
|
||||||
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
|
int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize,
|
||||||
const char* pkey);
|
const char* pkey);
|
||||||
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
|
void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows);
|
||||||
|
|
||||||
|
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
||||||
|
SDiskbasedBuf* pBuf);
|
||||||
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
||||||
SDiskbasedBuf* pBuf);
|
SDiskbasedBuf* pBuf);
|
||||||
|
|
||||||
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf);
|
int32_t handleLimitOffset(SOperatorInfo* pOperator, SLimitInfo* pLimitInfo, SSDataBlock* pBlock, bool holdDataInBuf);
|
||||||
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
|
bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo);
|
||||||
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
|
void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo);
|
||||||
|
|
|
@ -1389,6 +1389,46 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
||||||
|
SDiskbasedBuf* pBuf) {
|
||||||
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
SSDataBlock* pBlock = pbInfo->pRes;
|
||||||
|
|
||||||
|
// set output datablock version
|
||||||
|
pBlock->info.version = pTaskInfo->version;
|
||||||
|
|
||||||
|
blockDataCleanup(pBlock);
|
||||||
|
if (!hasRemainResults(pGroupResInfo)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// clear the existed group id
|
||||||
|
pBlock->info.groupId = 0;
|
||||||
|
ASSERT(!pbInfo->mergeResultBlock);
|
||||||
|
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo);
|
||||||
|
if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) {
|
||||||
|
SStreamStateAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
|
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
|
||||||
|
if (tbname != NULL) {
|
||||||
|
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||||
|
} else {
|
||||||
|
pBlock->info.parTbName[0] = 0;
|
||||||
|
}
|
||||||
|
} else if (pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION ||
|
||||||
|
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION ||
|
||||||
|
pOperator->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) {
|
||||||
|
SStreamSessionAggOperatorInfo* pInfo = pOperator->info;
|
||||||
|
|
||||||
|
char* tbname = taosHashGet(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t));
|
||||||
|
if (tbname != NULL) {
|
||||||
|
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
|
||||||
|
} else {
|
||||||
|
pBlock->info.parTbName[0] = 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo,
|
||||||
SDiskbasedBuf* pBuf) {
|
SDiskbasedBuf* pBuf) {
|
||||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||||
|
|
|
@ -3712,6 +3712,9 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh
|
||||||
pInfo->pPhyNode = pPhyNode;
|
pInfo->pPhyNode = pPhyNode;
|
||||||
pInfo->ignoreExpiredData = pSessionNode->window.igExpired;
|
pInfo->ignoreExpiredData = pSessionNode->window.igExpired;
|
||||||
|
|
||||||
|
pInfo->pGroupIdTbNameMap =
|
||||||
|
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
|
||||||
|
|
||||||
pOperator->name = "StreamSessionWindowAggOperator";
|
pOperator->name = "StreamSessionWindowAggOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION;
|
||||||
pOperator->blocking = true;
|
pOperator->blocking = true;
|
||||||
|
@ -4335,7 +4338,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
|
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
doBuildStreamResBlock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
@ -4354,6 +4357,12 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session recv" : "single session recv");
|
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "final session recv" : "single session recv");
|
||||||
|
|
||||||
|
if (pBlock->info.parTbName[0]) {
|
||||||
|
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
|
||||||
|
TSDB_TABLE_NAME_LEN);
|
||||||
|
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
|
||||||
|
}
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_CLEAR) {
|
if (pBlock->info.type == STREAM_CLEAR) {
|
||||||
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
|
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||||
doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, START_TS_COLUMN_INDEX,
|
doClearSessionWindows(&pInfo->streamAggSup, &pOperator->exprSupp, pBlock, START_TS_COLUMN_INDEX,
|
||||||
|
@ -4435,7 +4444,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
|
||||||
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
|
printDataBlock(pInfo->pDelRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
doBuildStreamResBlock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
||||||
printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
|
printDataBlock(pBInfo->pRes, IS_FINAL_OP(pInfo) ? "final session" : "single session");
|
||||||
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
|
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
|
||||||
}
|
}
|
||||||
|
@ -4466,7 +4475,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
doBuildStreamResBlock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
||||||
if (pBInfo->pRes->info.rows > 0) {
|
if (pBInfo->pRes->info.rows > 0) {
|
||||||
printDataBlock(pBInfo->pRes, "semi session");
|
printDataBlock(pBInfo->pRes, "semi session");
|
||||||
return pBInfo->pRes;
|
return pBInfo->pRes;
|
||||||
|
@ -4507,6 +4516,12 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
printDataBlock(pBlock, "semi session recv");
|
printDataBlock(pBlock, "semi session recv");
|
||||||
|
|
||||||
|
if (pBlock->info.parTbName[0]) {
|
||||||
|
taosHashPut(pInfo->pGroupIdTbNameMap, &pBlock->info.groupId, sizeof(int64_t), &pBlock->info.parTbName,
|
||||||
|
TSDB_TABLE_NAME_LEN);
|
||||||
|
/*printf("\n\n put tbname %s\n\n", pBlock->info.parTbName);*/
|
||||||
|
}
|
||||||
|
|
||||||
if (pBlock->info.type == STREAM_CLEAR) {
|
if (pBlock->info.type == STREAM_CLEAR) {
|
||||||
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
|
SArray* pWins = taosArrayInit(16, sizeof(SResultWindowInfo));
|
||||||
doClearSessionWindows(&pInfo->streamAggSup, pSup, pBlock, START_TS_COLUMN_INDEX, pSup->numOfExprs, 0, pWins);
|
doClearSessionWindows(&pInfo->streamAggSup, pSup, pBlock, START_TS_COLUMN_INDEX, pSup->numOfExprs, 0, pWins);
|
||||||
|
@ -4550,7 +4565,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) {
|
||||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
|
||||||
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
blockDataEnsureCapacity(pBInfo->pRes, pOperator->resultInfo.capacity);
|
||||||
|
|
||||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
doBuildStreamResBlock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
||||||
if (pBInfo->pRes->info.rows > 0) {
|
if (pBInfo->pRes->info.rows > 0) {
|
||||||
printDataBlock(pBInfo->pRes, "semi session");
|
printDataBlock(pBInfo->pRes, "semi session");
|
||||||
return pBInfo->pRes;
|
return pBInfo->pRes;
|
||||||
|
@ -4594,6 +4609,10 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, NULL,
|
createOperatorFpSet(operatorDummyOpenFn, doStreamSessionSemiAgg, NULL, NULL,
|
||||||
destroyStreamSessionAggOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
destroyStreamSessionAggOperatorInfo, aggEncodeResultRow, aggDecodeResultRow, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pInfo->pGroupIdTbNameMap =
|
||||||
|
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
|
||||||
|
|
||||||
pOperator->operatorType = pPhyNode->type;
|
pOperator->operatorType = pPhyNode->type;
|
||||||
if (numOfChild > 0) {
|
if (numOfChild > 0) {
|
||||||
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
|
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
|
||||||
|
@ -4894,7 +4913,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||||
printDataBlock(pInfo->pDelRes, "single state");
|
printDataBlock(pInfo->pDelRes, "single state");
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
doBuildResultDatablock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
doBuildStreamResBlock(pOperator, pBInfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
||||||
if (pBInfo->pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) {
|
if (pBInfo->pRes->info.rows == 0 || !hasRemainResults(&pInfo->groupResInfo)) {
|
||||||
doSetOperatorCompleted(pOperator);
|
doSetOperatorCompleted(pOperator);
|
||||||
}
|
}
|
||||||
|
@ -4956,7 +4975,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
|
||||||
printDataBlock(pInfo->pDelRes, "single state");
|
printDataBlock(pInfo->pDelRes, "single state");
|
||||||
return pInfo->pDelRes;
|
return pInfo->pDelRes;
|
||||||
}
|
}
|
||||||
doBuildResultDatablock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
doBuildStreamResBlock(pOperator, &pInfo->binfo, &pInfo->groupResInfo, pInfo->streamAggSup.pResultBuf);
|
||||||
printDataBlock(pBInfo->pRes, "single state");
|
printDataBlock(pBInfo->pRes, "single state");
|
||||||
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
|
return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes;
|
||||||
}
|
}
|
||||||
|
@ -5029,6 +5048,9 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys
|
||||||
pInfo->pChildren = NULL;
|
pInfo->pChildren = NULL;
|
||||||
pInfo->ignoreExpiredData = pStateNode->window.igExpired;
|
pInfo->ignoreExpiredData = pStateNode->window.igExpired;
|
||||||
|
|
||||||
|
pInfo->pGroupIdTbNameMap =
|
||||||
|
taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_NO_LOCK);
|
||||||
|
|
||||||
pOperator->name = "StreamStateAggOperator";
|
pOperator->name = "StreamStateAggOperator";
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE;
|
||||||
pOperator->blocking = true;
|
pOperator->blocking = true;
|
||||||
|
|
|
@ -150,7 +150,6 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
||||||
const char* idxPattern = "^[0-9]+.idx$";
|
const char* idxPattern = "^[0-9]+.idx$";
|
||||||
regex_t logRegPattern;
|
regex_t logRegPattern;
|
||||||
regex_t idxRegPattern;
|
regex_t idxRegPattern;
|
||||||
SArray* actualLog = taosArrayInit(8, sizeof(SWalFileInfo));
|
|
||||||
|
|
||||||
regcomp(&logRegPattern, logPattern, REG_EXTENDED);
|
regcomp(&logRegPattern, logPattern, REG_EXTENDED);
|
||||||
regcomp(&idxRegPattern, idxPattern, REG_EXTENDED);
|
regcomp(&idxRegPattern, idxPattern, REG_EXTENDED);
|
||||||
|
@ -163,6 +162,8 @@ int walCheckAndRepairMeta(SWal* pWal) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SArray* actualLog = taosArrayInit(8, sizeof(SWalFileInfo));
|
||||||
|
|
||||||
// scan log files and build new meta
|
// scan log files and build new meta
|
||||||
TdDirEntryPtr pDirEntry;
|
TdDirEntryPtr pDirEntry;
|
||||||
while ((pDirEntry = taosReadDir(pDir)) != NULL) {
|
while ((pDirEntry = taosReadDir(pDir)) != NULL) {
|
||||||
|
|
Loading…
Reference in New Issue