refactor(stream)

This commit is contained in:
Liu Jicong 2022-07-10 14:48:16 +08:00
parent bafa54778a
commit 5640036f66
5 changed files with 232 additions and 213 deletions

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include "cJSON.h"
#include "clientInt.h" #include "clientInt.h"
#include "clientLog.h" #include "clientLog.h"
#include "parser.h" #include "parser.h"
@ -23,7 +24,6 @@
#include "tqueue.h" #include "tqueue.h"
#include "tref.h" #include "tref.h"
#include "ttimer.h" #include "ttimer.h"
#include "cJSON.h"
int32_t tmqAskEp(tmq_t* tmq, bool async); int32_t tmqAskEp(tmq_t* tmq, bool async);
@ -953,6 +953,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
goto FAIL; goto FAIL;
} }
tscInfo("consumer %ld is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);
return pTmq; return pTmq;
FAIL: FAIL:
@ -1194,10 +1196,10 @@ bool tmqUpdateEp2(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) {
for (int32_t j = 0; j < vgNumCur; j++) { for (int32_t j = 0; j < vgNumCur; j++) {
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId); sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId);
char buf[50]; char buf[80];
tFormatOffset(buf, 50, &pVgCur->currentOffsetNew); tFormatOffset(buf, 80, &pVgCur->currentOffsetNew);
tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch, pVgCur->vgId, tscDebug("consumer:%" PRId64 ", epoch %d vgId:%d vgKey is %s, offset is %s", tmq->consumerId, epoch,
vgKey, buf); pVgCur->vgId, vgKey, buf);
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffsetNew, sizeof(STqOffsetVal)); taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffsetNew, sizeof(STqOffsetVal));
} }
} }
@ -1564,7 +1566,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
if (vgStatus != TMQ_VG_STATUS__IDLE) { if (vgStatus != TMQ_VG_STATUS__IDLE) {
int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1);
tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt); tscTrace("consumer:%" PRId64 ", epoch %d skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId,
vgSkipCnt);
continue; continue;
/*if (vgSkipCnt < 10000) continue;*/ /*if (vgSkipCnt < 10000) continue;*/
#if 0 #if 0
@ -1620,8 +1623,8 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
char offsetFormatBuf[80]; char offsetFormatBuf[80];
tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffsetNew); tFormatOffset(offsetFormatBuf, 80, &pVg->currentOffsetNew);
tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64, tmq->consumerId, tscDebug("consumer:%" PRId64 ", send poll to %s vgId:%d, epoch %d, req offset:%s, reqId:%" PRIu64,
pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId); tmq->consumerId, pTopic->topicName, pVg->vgId, tmq->epoch, offsetFormatBuf, pReq->reqId);
/*printf("send vgId:%d %" PRId64 "\n", pVg->vgId, pVg->currentOffset);*/ /*printf("send vgId:%d %" PRId64 "\n", pVg->vgId, pVg->currentOffset);*/
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
pVg->pollCnt++; pVg->pollCnt++;
@ -1669,7 +1672,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
int32_t consumerEpoch = atomic_load_32(&tmq->epoch); int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) { if (pollRspWrapper->dataRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle; SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vgId:%d offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ /*printf("vgId:%d offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
* rspMsg->msg.rspOffset);*/
pVg->currentOffsetNew = pollRspWrapper->dataRsp.rspOffset; pVg->currentOffsetNew = pollRspWrapper->dataRsp.rspOffset;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
if (pollRspWrapper->dataRsp.blockNum == 0) { if (pollRspWrapper->dataRsp.blockNum == 0) {
@ -1691,7 +1695,8 @@ void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
int32_t consumerEpoch = atomic_load_32(&tmq->epoch); int32_t consumerEpoch = atomic_load_32(&tmq->epoch);
if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) {
SMqClientVg* pVg = pollRspWrapper->vgHandle; SMqClientVg* pVg = pollRspWrapper->vgHandle;
/*printf("vgId:%d offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset, rspMsg->msg.rspOffset);*/ /*printf("vgId:%d offset %" PRId64 " up to %" PRId64 "\n", pVg->vgId, pVg->currentOffset,
* rspMsg->msg.rspOffset);*/
pVg->currentOffsetNew.version = pollRspWrapper->metaRsp.rspOffset; pVg->currentOffsetNew.version = pollRspWrapper->metaRsp.rspOffset;
pVg->currentOffsetNew.type = TMQ_OFFSET__LOG; pVg->currentOffsetNew.type = TMQ_OFFSET__LOG;
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
@ -1860,7 +1865,8 @@ tmq_raw_data *tmq_get_raw_meta(TAOS_RES* res) {
return NULL; return NULL;
} }
static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t){ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
int8_t t) {
char* string = NULL; char* string = NULL;
cJSON* json = cJSON_CreateObject(); cJSON* json = cJSON_CreateObject();
if (json == NULL) { if (json == NULL) {
@ -2049,9 +2055,11 @@ static char *processCreateTable(SMqMetaRsp *metaRsp){
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq; pCreateReq = req.pReqs + iReq;
if (pCreateReq->type == TSDB_CHILD_TABLE) { if (pCreateReq->type == TSDB_CHILD_TABLE) {
string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.suid, pCreateReq->name, pCreateReq->uid); string =
buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.suid, pCreateReq->name, pCreateReq->uid);
} else if (pCreateReq->type == TSDB_NORMAL_TABLE) { } else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE); string =
buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
} }
} }

View File

@ -89,8 +89,10 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset
} }
} }
if (pOffset->type == TMQ_OFFSET__LOG) { if (pOffset->type == TMQ_OFFSET__LOG) {
continue;
} else {
rowCnt += pDataBlock->info.rows; rowCnt += pDataBlock->info.rows;
if (rowCnt >= 4096) break; if (rowCnt <= 4096) continue;
} }
continue; continue;
} }
@ -116,6 +118,7 @@ int64_t tqScan(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffset
return 0; return 0;
} }
#if 0
int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId) { int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, STqOffsetVal offset, int32_t workerId) {
ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN); ASSERT(pExec->subType == TOPIC_SUB_TYPE__COLUMN);
qTaskInfo_t task = pExec->execCol.task[workerId]; qTaskInfo_t task = pExec->execCol.task[workerId];
@ -163,6 +166,7 @@ int32_t tqScanSnapshot(STQ* pTq, const STqExecHandle* pExec, SMqDataRsp* pRsp, S
return 0; return 0;
} }
#endif
int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId) { int32_t tqLogScanExec(STQ* pTq, STqExecHandle* pExec, SSubmitReq* pReq, SMqDataRsp* pRsp, int32_t workerId) {
if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) { if (pExec->subType == TOPIC_SUB_TYPE__COLUMN) {

View File

@ -299,10 +299,12 @@ typedef struct STableScanInfo {
uint64_t queryId; // todo remove it uint64_t queryId; // todo remove it
uint64_t taskId; // todo remove it uint64_t taskId; // todo remove it
#if 0
struct { struct {
uint64_t uid; uint64_t uid;
int64_t ts; int64_t ts;
} lastStatus; } lastStatus;
#endif
int8_t scanMode; int8_t scanMode;
int8_t noTable; int8_t noTable;

View File

@ -351,6 +351,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
return 0; return 0;
} }
#if 0
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) { int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
@ -370,3 +371,4 @@ int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) {
return doGetScanStatus(pTaskInfo->pRoot, uid, ts); return doGetScanStatus(pTaskInfo->pRoot, uid, ts);
} }
#endif

View File

@ -2844,7 +2844,7 @@ int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scan
} }
} }
} }
#if 0
int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) { int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) {
uint8_t type = pOperator->operatorType; uint8_t type = pOperator->operatorType;
@ -2930,6 +2930,7 @@ int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
#endif
// this is a blocking operator // this is a blocking operator
static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
@ -3341,8 +3342,8 @@ static void doHandleRemainBlockForNewGroupImpl(SFillOperatorInfo* pInfo, SResult
SExecTaskInfo* pTaskInfo) { SExecTaskInfo* pTaskInfo) {
pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows; pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
int64_t ekey = Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED) ? pInfo->win.ekey int64_t ekey =
: pInfo->existNewGroupBlock->info.window.ekey; Q_STATUS_EQUAL(pTaskInfo->status, TASK_COMPLETED) ? pInfo->win.ekey : pInfo->existNewGroupBlock->info.window.ekey;
taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo)); taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey); taosFillSetStartInfo(pInfo->pFillInfo, pInfo->existNewGroupBlock->info.rows, ekey);
@ -4465,7 +4466,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
.precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision}; .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision};
int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId; int32_t tsSlotId = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId, pPhyNode->pConditions, pTaskInfo); pOptr = createMergeAlignedIntervalOperatorInfo(ops[0], pExprInfo, num, pResBlock, &interval, tsSlotId,
pPhyNode->pConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL == type) {
SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode; SMergeIntervalPhysiNode* pIntervalPhyNode = (SMergeIntervalPhysiNode*)pPhyNode;
@ -4504,8 +4506,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc);
int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId; int32_t tsSlotId = ((SColumnNode*)pSessionNode->window.pTspk)->slotId;
pOptr = pOptr = createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as,
createSessionAggOperatorInfo(ops[0], pExprInfo, num, pResBlock, pSessionNode->gap, tsSlotId, &as, pPhyNode->pConditions, pTaskInfo); pPhyNode->pConditions, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION == type) {
pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); pOptr = createStreamSessionAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_SESSION == type) {
@ -4527,7 +4529,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr; SColumnNode* pColNode = (SColumnNode*)((STargetNode*)pStateNode->pStateKey)->pExpr;
SColumn col = extractColumnFromColumnNode(pColNode); SColumn col = extractColumnFromColumnNode(pColNode);
pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pPhyNode->pConditions, pTaskInfo); pOptr = createStatewindowOperatorInfo(ops[0], pExprInfo, num, pResBlock, &as, tsSlotId, &col, pPhyNode->pConditions,
pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE == type) {
pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo); pOptr = createStreamStateAggOperatorInfo(ops[0], pPhyNode, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) { } else if (QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN == type) {