[td-11818] add table id for streamscan operator.

This commit is contained in:
Haojun Liao 2022-02-17 18:37:43 +08:00
parent d6fa853b88
commit c15c88dff7
4 changed files with 47 additions and 15 deletions

View File

@ -28,7 +28,7 @@ int32_t init_env() {
return -1; return -1;
} }
TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 1"); TAOS_RES* pRes = taos_query(pConn, "create database if not exists abc1 vgroups 2");
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in create db, reason:%s\n", taos_errstr(pRes)); printf("error in create db, reason:%s\n", taos_errstr(pRes));
return -1; return -1;
@ -56,14 +56,14 @@ int32_t init_env() {
} }
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(pConn, "create table if not exists tu2 using st1 tags(2)"); // pRes = taos_query(pConn, "create table if not exists tu6 using st1 tags(2)");
if (taos_errno(pRes) != 0) { // if (taos_errno(pRes) != 0) {
printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes)); // printf("failed to create child table tu2, reason:%s\n", taos_errstr(pRes));
return -1; // return -1;
} // }
taos_free_result(pRes); // taos_free_result(pRes);
const char* sql = "select * from tu1"; const char* sql = "select * from st1";
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql)); pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql));
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes)); printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));
@ -197,7 +197,7 @@ int main() {
code = init_env(); code = init_env();
tmq_t* tmq = build_consumer(); tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list(); tmq_list_t* topic_list = build_topic_list();
perf_loop(tmq, topic_list); // perf_loop(tmq, topic_list);
/*basic_consume_loop(tmq, topic_list);*/ basic_consume_loop(tmq, topic_list);
/*sync_consume_loop(tmq, topic_list);*/ /*sync_consume_loop(tmq, topic_list);*/
} }

View File

@ -31,6 +31,7 @@ typedef struct SReadHandle {
void* reader; void* reader;
void* meta; void* meta;
} SReadHandle; } SReadHandle;
/** /**
* Create the exec task for streaming mode * Create the exec task for streaming mode
* @param pMsg * @param pMsg
@ -40,13 +41,23 @@ typedef struct SReadHandle {
qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle); qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle);
/** /**
* * Set the input data block for the stream scan.
* @param tinfo * @param tinfo
* @param input * @param input
* @return * @return
*/ */
int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input); int32_t qSetStreamInput(qTaskInfo_t tinfo, const void* input);
/**
* Update the table id list, add or remove.
*
* @param tinfo
* @param id
* @param isAdd
* @return
*/
int32_t qUpdateTQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd);
/** /**
* Create the exec task object according to task json * Create the exec task object according to task json
* @param readHandle * @param readHandle

View File

@ -90,3 +90,25 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
return pTaskInfo; return pTaskInfo;
} }
int32_t qUpdateTQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo* )tinfo;
// traverse to the streamscan node to add this table id
SOperatorInfo* pInfo = pTaskInfo->pRoot;
while(pInfo->operatorType != OP_StreamScan) {
pInfo = pInfo->pDownstream[0];
}
SStreamBlockScanInfo* pScanInfo = pInfo->info;
if (isAdd) {
int32_t code = tqReadHandleSetTbUidList(pScanInfo->readerHandle, tableIdList);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
} else {
assert(0);
}
return TSDB_CODE_SUCCESS;
}

View File

@ -5441,7 +5441,6 @@ SOperatorInfo* createStreamScanOperatorInfo(void *streamReadHandle, SArray* pExp
SArray* pColList = taosArrayInit(numOfOutput, sizeof(int32_t)); SArray* pColList = taosArrayInit(numOfOutput, sizeof(int32_t));
for(int32_t i = 0; i < numOfOutput; ++i) { for(int32_t i = 0; i < numOfOutput; ++i) {
SExprInfo* pExpr = taosArrayGetP(pExprInfo, i); SExprInfo* pExpr = taosArrayGetP(pExprInfo, i);
taosArrayPush(pColList, &pExpr->pExpr->pSchema[0].colId); taosArrayPush(pColList, &pExpr->pExpr->pSchema[0].colId);
} }
@ -5624,7 +5623,7 @@ static SExprInfo* exprArrayDup(SArray* pExprInfo) {
return p; return p;
} }
static SSDataBlock* doSortMerge(void* param, bool* newgroup) { static SSDataBlock* doSortedMerge(void* param, bool* newgroup) {
SOperatorInfo* pOperator = (SOperatorInfo*) param; SOperatorInfo* pOperator = (SOperatorInfo*) param;
if (pOperator->status == OP_EXEC_DONE) { if (pOperator->status == OP_EXEC_DONE) {
return NULL; return NULL;
@ -5635,7 +5634,7 @@ static SSDataBlock* doSortMerge(void* param, bool* newgroup) {
for(int32_t i = 0; i < pInfo->numOfSources; ++i) { for(int32_t i = 0; i < pInfo->numOfSources; ++i) {
SSDataBlock* pBlock = pOperator->pDownstream[i]->exec(pOperator->pDownstream[i], newgroup); SSDataBlock* pBlock = pOperator->pDownstream[i]->exec(pOperator->pDownstream[i], newgroup);
// TODO set the order input sources.
} }
return NULL; return NULL;
@ -5679,7 +5678,7 @@ SOperatorInfo* createSortMergeOperatorInfo(SOperatorInfo* downstream, SArray* pE
pOperator->numOfOutput = numOfOutput; pOperator->numOfOutput = numOfOutput;
pOperator->pTaskInfo = pTaskInfo; pOperator->pTaskInfo = pTaskInfo;
pOperator->exec = doSortMerge; pOperator->exec = doSortedMerge;
pOperator->cleanupFn = destroyGlobalAggOperatorInfo; pOperator->cleanupFn = destroyGlobalAggOperatorInfo;
appendDownstream(pOperator, downstream); appendDownstream(pOperator, downstream);