diff --git a/examples/c/tmq.c b/examples/c/tmq.c index 697a53e570..378f9ffb24 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -137,8 +137,8 @@ int32_t create_topic() { } taos_free_result(pRes); - /*pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");*/ - pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); + pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1"); + /*pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");*/ if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; @@ -199,7 +199,7 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set(conf, "enable.auto.commit", "true"); - tmq_conf_set(conf, "experimental.snapshot.enable", "true"); + /*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/ tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index dedc06a2b9..7e6231d7dd 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -2826,8 +2826,8 @@ typedef struct { static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp) { int32_t tlen = 0; - tlen += taosEncodeFixedI64(buf, pRsp->reqOffset); - tlen += taosEncodeFixedI64(buf, pRsp->rspOffset); + // tlen += taosEncodeFixedI64(buf, pRsp->reqOffset); + // tlen += taosEncodeFixedI64(buf, pRsp->rspOffset); tlen += taosEncodeFixedI16(buf, pRsp->resMsgType); tlen += taosEncodeFixedI32(buf, pRsp->metaRspLen); tlen += taosEncodeBinary(buf, pRsp->metaRsp, pRsp->metaRspLen); @@ -2835,8 +2835,8 @@ static FORCE_INLINE int32_t tEncodeSMqMetaRsp(void** buf, const SMqMetaRsp* pRsp } static FORCE_INLINE void* tDecodeSMqMetaRsp(const void* buf, SMqMetaRsp* pRsp) { - buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); - buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); + // buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); + // buf = taosDecodeFixedI64(buf, &pRsp->rspOffset); buf = taosDecodeFixedI16(buf, &pRsp->resMsgType); buf = taosDecodeFixedI32(buf, &pRsp->metaRspLen); buf = taosDecodeBinary(buf, &pRsp->metaRsp, pRsp->metaRspLen); diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 45fa94b3bf..7b4565a99f 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -30,13 +30,15 @@ struct SRpcMsg; struct SSubplan; typedef struct SReadHandle { - void* reader; + void* streamReader; void* meta; void* config; void* vnode; void* mnd; SMsgCb* pMsgCb; - bool tqReader; + bool initMetaReader; + bool initTableReader; + bool initStreamReader; } SReadHandle; typedef enum { diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 67074c789e..52f671e176 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -223,7 +223,7 @@ typedef struct { SEpSet epSet; } SStreamChildEpInfo; -struct SStreamTask { +typedef struct SStreamTask { int64_t streamId; int32_t taskId; int8_t isDataScan; @@ -235,6 +235,11 @@ struct SStreamTask { int8_t taskStatus; int8_t execStatus; + // exec info + int64_t enqueueVer; + int64_t processedVer; + int64_t checkpointVer; + // node info int32_t selfChildId; int32_t nodeId; @@ -277,7 +282,7 @@ struct SStreamTask { // msg handle SMsgCb* pMsgCb; -}; +} SStreamTask; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo); @@ -288,6 +293,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask); void tFreeSStreamTask(SStreamTask* pTask); static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) { +#if 0 while (1) { int8_t inputStatus = atomic_val_compare_exchange_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL, TASK_INPUT_STATUS__PROCESSING); @@ -296,6 +302,7 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem } ASSERT(0); } +#endif if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem); @@ -316,8 +323,10 @@ static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__IN_ACTIVE, TASK_TRIGGER_STATUS__ACTIVE); } +#if 0 // TODO: back pressure atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL); +#endif return 0; } diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index c11651970c..e32a8d1055 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -178,7 +178,6 @@ void walFsync(SWal *, bool force); // apis for lifecycle management int32_t walCommit(SWal *, int64_t ver); -// truncate after int32_t walRollback(SWal *, int64_t ver); // notify that previous logs can be pruned safely int32_t walBeginSnapshot(SWal *, int64_t ver); @@ -207,10 +206,11 @@ void walCloseRef(SWalRef *); int32_t walRefVer(SWalRef *, int64_t ver); int32_t walUnrefVer(SWal *); +// help function for raft bool walLogExist(SWal *, int64_t ver); +bool walIsEmpty(SWal *); // lifecycle check -bool walIsEmpty(SWal *); int64_t walGetFirstVer(SWal *); int64_t walGetSnapshotVer(SWal *); int64_t walGetLastVer(SWal *); diff --git a/include/util/tutil.h b/include/util/tutil.h index 6a1a40f14c..2e96c5b88e 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -45,6 +45,7 @@ void taosIp2String(uint32_t ip, char *str); void taosIpPort2String(uint32_t ip, uint16_t port, char *str); void *tmemmem(const char *haystack, int hlen, const char *needle, int nlen); +char *strDupUnquo(const char *src); static FORCE_INLINE void taosEncryptPass(uint8_t *inBuf, size_t inLen, char *target) { T_MD5_CTX context; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 667f5b1dbc..331f149e13 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -49,19 +49,18 @@ struct tmq_list_t { }; struct tmq_conf_t { - char clientId[256]; - char groupId[TSDB_CGROUP_LEN]; - int8_t autoCommit; - int8_t resetOffset; - int8_t withTbName; - int8_t spEnable; - int32_t spBatchSize; - uint16_t port; - int32_t autoCommitInterval; - char* ip; - char* user; - char* pass; - /*char* db;*/ + char clientId[256]; + char groupId[TSDB_CGROUP_LEN]; + int8_t autoCommit; + int8_t resetOffset; + int8_t withTbName; + int8_t spEnable; + int32_t spBatchSize; + uint16_t port; + int32_t autoCommitInterval; + char* ip; + char* user; + char* pass; tmq_commit_cb* commitCb; void* commitCbUserParam; }; @@ -337,7 +336,7 @@ tmq_list_t* tmq_list_new() { int32_t tmq_list_append(tmq_list_t* list, const char* src) { SArray* container = &list->container; - char* topic = strdup(src); + char* topic = strDupUnquo(src); if (taosArrayPush(container, &topic) == NULL) return -1; return 0; } diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 4e1b2db44a..7a147f5ffa 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -332,7 +332,7 @@ int32_t tdProcessRSmaCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, con } SReadHandle handle = { - .reader = pReadHandle, + .streamReader = pReadHandle, .meta = pMeta, .pMsgCb = pMsgCb, .vnode = pVnode, diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5ce3cfab45..95ec99c9e5 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -28,8 +28,12 @@ int32_t tqInit() { atomic_store_8(&tqMgmt.inited, 0); return -1; } + if (streamInit() < 0) { + return -1; + } atomic_store_8(&tqMgmt.inited, 1); } + return 0; } @@ -361,8 +365,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { ASSERT(IS_META_MSG(pHead->msgType)); tqInfo("fetch meta msg, ver: %ld, type: %d", pHead->version, pHead->msgType); SMqMetaRsp metaRsp = {0}; - metaRsp.reqOffset = pReq->reqOffset.version; - metaRsp.rspOffset = fetchVer; + /*metaRsp.reqOffset = pReq->reqOffset.version;*/ + /*metaRsp.rspOffset = fetchVer;*/ + /*metaRsp.rspOffsetNew.version = fetchVer;*/ + tqOffsetResetToLog(&metaRsp.reqOffsetNew, pReq->reqOffset.version); + tqOffsetResetToLog(&metaRsp.rspOffsetNew, fetchVer); metaRsp.resMsgType = pHead->msgType; metaRsp.metaRspLen = pHead->bodyLen; metaRsp.metaRsp = pHead->body; @@ -448,10 +455,10 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { req.qmsg = NULL; for (int32_t i = 0; i < 5; i++) { SReadHandle handle = { - .reader = pHandle->execHandle.pExecReader[i], + .streamReader = pHandle->execHandle.pExecReader[i], .meta = pTq->pVnode->pMeta, .vnode = pTq->pVnode, - .tqReader = true, + .initTableReader = true, }; pHandle->execHandle.execCol.task[i] = qCreateStreamExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle); ASSERT(pHandle->execHandle.execCol.task[i]); @@ -522,11 +529,11 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { if (pTask->execType != TASK_EXEC__NONE) { // expand runners if (pTask->isDataScan) { - SStreamReader* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta); - SReadHandle handle = { - .reader = pStreamReader, - .meta = pTq->pVnode->pMeta, - .vnode = pTq->pVnode, + /*SStreamReader* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);*/ + SReadHandle handle = { + .meta = pTq->pVnode->pMeta, + .vnode = pTq->pVnode, + .initStreamReader = 1, }; /*pTask->exec.inputHandle = pStreamReader;*/ pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle); diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index 398a09ecbc..17f4dc5426 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -84,7 +84,7 @@ int32_t tqMetaOpen(STQ* pTq) { if (handle.execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { for (int32_t i = 0; i < 5; i++) { SReadHandle reader = { - .reader = handle.execHandle.pExecReader[i], + .streamReader = handle.execHandle.pExecReader[i], .meta = pTq->pVnode->pMeta, .pMsgCb = &pTq->pVnode->msgCb, }; diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 00f2e09e0c..9c7dc234ec 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -338,7 +338,7 @@ typedef struct SessionWindowSupporter { uint8_t parentType; } SessionWindowSupporter; -typedef struct SStreamBlockScanInfo { +typedef struct SStreamScanInfo { uint64_t tableUid; // queried super table uid SExprInfo* pPseudoExpr; int32_t numOfPseudoExpr; @@ -355,7 +355,7 @@ typedef struct SStreamBlockScanInfo { int32_t blockType; // current block type int32_t validBlockIndex; // Is current data has returned? uint64_t numOfExec; // execution times - void* streamBlockReader;// stream block reader handle + void* streamReader;// stream block reader handle int32_t tsArrayIndex; SArray* tsArray; @@ -364,7 +364,7 @@ typedef struct SStreamBlockScanInfo { EStreamScanMode scanMode; SOperatorInfo* pStreamScanOp; - SOperatorInfo* pSnapshotReadOp; + SOperatorInfo* pTableScanOp; SArray* childIds; SessionWindowSupporter sessionSup; bool assignBlockUid; // assign block uid to groupId, temporarily used for generating rollup SMA. @@ -373,7 +373,7 @@ typedef struct SStreamBlockScanInfo { SSDataBlock* pPullDataRes; // pull data SSDataBlock SSDataBlock* pDeleteDataRes; // delete data SSDataBlock int32_t deleteDataIndex; -} SStreamBlockScanInfo; +} SStreamScanInfo; typedef struct SSysTableScanInfo { SRetrieveMetaTableRsp* pRsp; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 0e76607c8f..3591aaf975 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -37,7 +37,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu } else { pOperator->status = OP_NOT_OPENED; - SStreamBlockScanInfo* pInfo = pOperator->info; + SStreamScanInfo* pInfo = pOperator->info; pInfo->assignBlockUid = assignUid; // TODO: if a block was set but not consumed, @@ -45,7 +45,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu pInfo->blockType = type; if (type == STREAM_INPUT__DATA_SUBMIT) { - if (tqReadHandleSetMsg(pInfo->streamBlockReader, input, 0) < 0) { + if (tqReadHandleSetMsg(pInfo->streamReader, input, 0) < 0) { qError("submit msg messed up when initing stream block, %s" PRIx64, id); return TSDB_CODE_QRY_APP_ERROR; } @@ -130,7 +130,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { return pTaskInfo; } -static SArray* filterQualifiedChildTables(const SStreamBlockScanInfo* pScanInfo, const SArray* tableIdList) { +static SArray* filterQualifiedChildTables(const SStreamScanInfo* pScanInfo, const SArray* tableIdList) { SArray* qa = taosArrayInit(4, sizeof(tb_uid_t)); // let's discard the tables those are not created according to the queried super table. @@ -168,17 +168,17 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bo pInfo = pInfo->pDownstream[0]; } - int32_t code = 0; - SStreamBlockScanInfo* pScanInfo = pInfo->info; + int32_t code = 0; + SStreamScanInfo* pScanInfo = pInfo->info; if (isAdd) { // add new table id SArray* qa = filterQualifiedChildTables(pScanInfo, tableIdList); qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa)); - code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, qa); + code = tqReadHandleAddTbUidList(pScanInfo->streamReader, qa); taosArrayDestroy(qa); } else { // remove the table id in current list qDebug(" %d remove child tables from the stream scanner", (int32_t)taosArrayGetSize(tableIdList)); - code = tqReadHandleRemoveTbUidList(pScanInfo->streamBlockReader, tableIdList); + code = tqReadHandleRemoveTbUidList(pScanInfo->streamReader, tableIdList); } return code; diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 25b61e15c3..c92fc1b1fb 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -2847,12 +2847,12 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) { pOperator->status = OP_OPENED; if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - SStreamBlockScanInfo* pScanInfo = pOperator->info; + SStreamScanInfo* pScanInfo = pOperator->info; pScanInfo->blockType = STREAM_INPUT__DATA_SCAN; - pScanInfo->pSnapshotReadOp->status = OP_OPENED; + pScanInfo->pTableScanOp->status = OP_OPENED; - STableScanInfo* pInfo = pScanInfo->pSnapshotReadOp->info; + STableScanInfo* pInfo = pScanInfo->pTableScanOp->info; ASSERT(pInfo->scanMode == TABLE_SCAN__TABLE_ORDER); if (uid == 0) { @@ -2912,8 +2912,8 @@ int32_t doPrepareScan(SOperatorInfo* pOperator, uint64_t uid, int64_t ts) { int32_t doGetScanStatus(SOperatorInfo* pOperator, uint64_t* uid, int64_t* ts) { int32_t type = pOperator->operatorType; if (type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - SStreamBlockScanInfo* pScanInfo = pOperator->info; - STableScanInfo* pSnapShotScanInfo = pScanInfo->pSnapshotReadOp->info; + SStreamScanInfo* pScanInfo = pOperator->info; + STableScanInfo* pSnapShotScanInfo = pScanInfo->pTableScanOp->info; *uid = pSnapShotScanInfo->lastStatus.uid; *ts = pSnapShotScanInfo->lastStatus.ts; } else { @@ -4537,9 +4537,9 @@ static int32_t extractTbscanInStreamOpTree(SOperatorInfo* pOperator, STableScanI } return extractTbscanInStreamOpTree(pOperator->pDownstream[0], ppInfo); } else { - SStreamBlockScanInfo* pInfo = pOperator->info; - ASSERT(pInfo->pSnapshotReadOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN); - *ppInfo = pInfo->pSnapshotReadOp->info; + SStreamScanInfo* pInfo = pOperator->info; + ASSERT(pInfo->pTableScanOp->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN); + *ppInfo = pInfo->pTableScanOp->info; return 0; } } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 515efb86f3..2ef0b7470e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -788,7 +788,7 @@ _error: return NULL; } -static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) { +static void doClearBufferedBlocks(SStreamScanInfo* pInfo) { size_t total = taosArrayGetSize(pInfo->pBlockLists); pInfo->validBlockIndex = 0; @@ -799,11 +799,11 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) { taosArrayClear(pInfo->pBlockLists); } -static bool isSessionWindow(SStreamBlockScanInfo* pInfo) { +static bool isSessionWindow(SStreamScanInfo* pInfo) { return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION; } -static bool isStateWindow(SStreamBlockScanInfo* pInfo) { +static bool isStateWindow(SStreamScanInfo* pInfo) { return pInfo->sessionSup.parentType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE; } @@ -828,23 +828,21 @@ static uint64_t getGroupId(SOperatorInfo* pOperator, uint64_t uid) { */ } -static void setGroupId(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) { +static void setGroupId(SStreamScanInfo* pInfo, SSDataBlock* pBlock, int32_t groupColIndex, int32_t rowIndex) { ASSERT(rowIndex < pBlock->info.rows); - switch (pBlock->info.type) - { - case STREAM_DELETE_DATA: - case STREAM_RETRIEVE: { - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex); - uint64_t* groupCol = (uint64_t*)pColInfo->pData; - pInfo->groupId = groupCol[rowIndex]; - } - break; - default: - break; + switch (pBlock->info.type) { + case STREAM_DELETE_DATA: + case STREAM_RETRIEVE: { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, groupColIndex); + uint64_t* groupCol = (uint64_t*)pColInfo->pData; + pInfo->groupId = groupCol[rowIndex]; + } break; + default: + break; } } -static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { +static bool prepareDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { STimeWindow win = { .skey = INT64_MIN, .ekey = INT64_MAX, @@ -864,11 +862,10 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3 win = pCurWin->win; (*pRowIndex) += updateSessionWindowInfo(pCurWin, tsCols, NULL, pSDB->info.rows, *pRowIndex, gap, NULL); } else { - win = - getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, NULL); + win = getActiveTimeWindow(NULL, &dumyInfo, tsCols[*pRowIndex], &pInfo->interval, pInfo->interval.precision, NULL); setGroupId(pInfo, pSDB, GROUPID_COLUMN_INDEX, *pRowIndex); - (*pRowIndex) += getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, - TSDB_ORDER_ASC); + (*pRowIndex) += + getNumOfRowsInTimeWindow(&pSDB->info, tsCols, *pRowIndex, win.ekey, binarySearchForKey, NULL, TSDB_ORDER_ASC); } needRead = true; } else if (isStateWindow(pInfo)) { @@ -886,7 +883,7 @@ static bool prepareDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int3 if (!needRead) { return false; } - STableScanInfo* pTableScanInfo = pInfo->pSnapshotReadOp->info; + STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info; pTableScanInfo->cond.twindows[0] = win; pTableScanInfo->curTWinIdx = 0; // tsdbResetReadHandle(pTableScanInfo->dataReader, &pTableScanInfo->cond, 0); @@ -911,14 +908,14 @@ static void copyOneRow(SSDataBlock* dest, SSDataBlock* source, int32_t sourceRow dest->info.rows++; } -static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { +static SSDataBlock* doDataScan(SStreamScanInfo* pInfo, SSDataBlock* pSDB, int32_t tsColIndex, int32_t* pRowIndex) { while (1) { SSDataBlock* pResult = NULL; - pResult = doTableScan(pInfo->pSnapshotReadOp); + pResult = doTableScan(pInfo->pTableScanOp); if (pResult == NULL) { if (prepareDataScan(pInfo, pSDB, tsColIndex, pRowIndex)) { // scan next window data - pResult = doTableScan(pInfo->pSnapshotReadOp); + pResult = doTableScan(pInfo->pTableScanOp); } } if (!pResult) { @@ -943,7 +940,8 @@ static SSDataBlock* doDataScan(SStreamBlockScanInfo* pInfo, SSDataBlock* pSDB, i */ } -static void copyDeleteDataBlock(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator, SSDataBlock* pUpdateRes) { +static void copyDeleteDataBlock(SStreamScanInfo* pInfo, SSDataBlock* pDelBlock, SOperatorInfo* pOperator, + SSDataBlock* pUpdateRes) { if (pDelBlock->info.rows == 0) { return; } @@ -951,18 +949,20 @@ static void copyDeleteDataBlock(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBl blockDataEnsureCapacity(pUpdateRes, 64); ASSERT(taosArrayGetSize(pDelBlock->pDataBlock) >= 3); SColumnInfoData* pStartTsCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX); - TSKEY* startData = (TSKEY*)pStartTsCol->pData; + TSKEY* startData = (TSKEY*)pStartTsCol->pData; SColumnInfoData* pEndTsCol = taosArrayGet(pDelBlock->pDataBlock, END_TS_COLUMN_INDEX); - TSKEY* endData = (TSKEY*)pEndTsCol->pData; + TSKEY* endData = (TSKEY*)pEndTsCol->pData; SColumnInfoData* pGpCol = taosArrayGet(pDelBlock->pDataBlock, UID_COLUMN_INDEX); - uint64_t* uidCol = (uint64_t*)pGpCol->pData; + uint64_t* uidCol = (uint64_t*)pGpCol->pData; SColumnInfoData* pDestTsCol = taosArrayGet(pUpdateRes->pDataBlock, START_TS_COLUMN_INDEX); SColumnInfoData* pDestGpCol = taosArrayGet(pUpdateRes->pDataBlock, DELETE_GROUPID_COLUMN_INDEX); - for (int32_t i = pInfo->deleteDataIndex ; i < pDelBlock->info.rows && - i < pDelBlock->info.capacity - (endData[i] - startData[i])/pInfo->interval.interval - 1; i++) { + for (int32_t i = pInfo->deleteDataIndex; + i < pDelBlock->info.rows && + i < pDelBlock->info.capacity - (endData[i] - startData[i]) / pInfo->interval.interval - 1; + i++) { uint64_t groupId = getGroupId(pOperator, uidCol[i]); - for (TSKEY startTs = startData[i]; startTs <= endData[i]; ) { + for (TSKEY startTs = startData[i]; startTs <= endData[i];) { colDataAppend(pDestTsCol, pUpdateRes->info.rows, (const char*)&startTs, false); colDataAppend(pDestGpCol, pUpdateRes->info.rows, (const char*)&groupId, false); pUpdateRes->info.rows++; @@ -977,7 +977,7 @@ static void copyDeleteDataBlock(SStreamBlockScanInfo* pInfo, SSDataBlock* pDelBl } } -static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) { +static void setUpdateData(SStreamScanInfo* pInfo, SSDataBlock* pBlock, SSDataBlock* pUpdateBlock) { blockDataCleanup(pUpdateBlock); int32_t size = taosArrayGetSize(pInfo->tsArray); if (pInfo->tsArrayIndex < size) { @@ -986,11 +986,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa blockDataEnsureCapacity(pUpdateBlock, size); int32_t rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, pInfo->tsArrayIndex); - pInfo->groupId = getGroupId(pInfo->pSnapshotReadOp, pBlock->info.uid); + pInfo->groupId = getGroupId(pInfo->pTableScanOp, pBlock->info.uid); int32_t i = 0; for (; i < size; i++) { rowId = *(int32_t*)taosArrayGet(pInfo->tsArray, i + pInfo->tsArrayIndex); - uint64_t id = getGroupId(pInfo->pSnapshotReadOp, pBlock->info.uid); + uint64_t id = getGroupId(pInfo->pTableScanOp, pBlock->info.uid); if (pInfo->groupId != id) { break; } @@ -1009,12 +1009,11 @@ static void setUpdateData(SStreamBlockScanInfo* pInfo, SSDataBlock* pBlock, SSDa } if (size == 0) { - copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pUpdateBlock); + copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pUpdateBlock); } } -static void checkUpdateData(SStreamBlockScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, - bool out) { +static void checkUpdateData(SStreamScanInfo* pInfo, bool invertible, SSDataBlock* pBlock, bool out) { SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsIndex); ASSERT(pColDataInfo->info.type == TSDB_DATA_TYPE_TIMESTAMP); TSKEY* ts = (TSKEY*)pColDataInfo->pData; @@ -1030,15 +1029,15 @@ static void setBlockGroupId(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32 SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, uidColIndex); uint64_t* uidCol = (uint64_t*)pColDataInfo->pData; ASSERT(pBlock->info.rows > 0); - for (int32_t i = 0 ; i < pBlock->info.rows; i++) { + for (int32_t i = 0; i < pBlock->info.rows; i++) { uidCol[i] = getGroupId(pOperator, uidCol[i]); } } -static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { +static SSDataBlock* doStreamScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStreamBlockScanInfo* pInfo = pOperator->info; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStreamScanInfo* pInfo = pOperator->info; pTaskInfo->code = pOperator->fpSet._openFn(pOperator); if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) { @@ -1056,30 +1055,29 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { int32_t current = pInfo->validBlockIndex++; SSDataBlock* pBlock = taosArrayGetP(pInfo->pBlockLists, current); + // TODO move into scan blockDataUpdateTsWindow(pBlock, 0); switch (pBlock->info.type) { - case STREAM_RETRIEVE:{ - pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE; - copyDataBlock(pInfo->pPullDataRes, pBlock); - pInfo->pullDataResIndex = 0; - prepareDataScan(pInfo, pInfo->pPullDataRes, START_TS_COLUMN_INDEX, &pInfo->pullDataResIndex); - updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo); - } - break; - case STREAM_DELETE_DATA: { - pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; - pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; - copyDataBlock(pInfo->pDeleteDataRes, pBlock); - copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pSnapshotReadOp, pInfo->pUpdateRes); - pInfo->updateResIndex = 0; - prepareDataScan(pInfo, pInfo->pUpdateRes, START_TS_COLUMN_INDEX, &pInfo->updateResIndex); - pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA; - return pInfo->pUpdateRes; - } - break; - default: - break; + case STREAM_RETRIEVE: { + pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER_RETRIEVE; + copyDataBlock(pInfo->pPullDataRes, pBlock); + pInfo->pullDataResIndex = 0; + prepareDataScan(pInfo, pInfo->pPullDataRes, START_TS_COLUMN_INDEX, &pInfo->pullDataResIndex); + updateInfoAddCloseWindowSBF(pInfo->pUpdateInfo); + } break; + case STREAM_DELETE_DATA: { + pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; + pInfo->scanMode = STREAM_SCAN_FROM_DATAREADER; + copyDataBlock(pInfo->pDeleteDataRes, pBlock); + copyDeleteDataBlock(pInfo, pInfo->pDeleteDataRes, pInfo->pTableScanOp, pInfo->pUpdateRes); + pInfo->updateResIndex = 0; + prepareDataScan(pInfo, pInfo->pUpdateRes, START_TS_COLUMN_INDEX, &pInfo->updateResIndex); + pInfo->pUpdateRes->info.type = STREAM_DELETE_DATA; + return pInfo->pUpdateRes; + } break; + default: + break; } return pBlock; } else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) { @@ -1128,11 +1126,11 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; blockDataCleanup(pInfo->pRes); - while (tqNextDataBlock(pInfo->streamBlockReader)) { + while (tqNextDataBlock(pInfo->streamReader)) { SSDataBlock block = {0}; // todo refactor - int32_t code = tqRetrieveDataBlock(&block, pInfo->streamBlockReader); + int32_t code = tqRetrieveDataBlock(&block, pInfo->streamReader); uint64_t groupId = block.info.groupId; uint64_t uid = block.info.uid; @@ -1228,11 +1226,13 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { } } } + return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes; + } else if (pInfo->blockType == STREAM_INPUT__DATA_SCAN) { // check reader last status // if not match, reset status - SSDataBlock* pResult = doTableScan(pInfo->pSnapshotReadOp); + SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); return pResult && pResult->info.rows > 0 ? pResult : NULL; } else { @@ -1256,8 +1256,8 @@ static SArray* extractTableIdList(const STableListInfo* pTableGroupInfo) { SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SExecTaskInfo* pTaskInfo, STimeWindowAggSupp* pTwSup, uint64_t queryId, uint64_t taskId) { - SStreamBlockScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamBlockScanInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + SStreamScanInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamScanInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; @@ -1295,32 +1295,40 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys } if (pHandle) { - SOperatorInfo* pTableScanDummy = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo, queryId, taskId); - STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanDummy->info; + SOperatorInfo* pTableScanOp = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo, queryId, taskId); + STableScanInfo* pSTInfo = (STableScanInfo*)pTableScanOp->info; SArray* tableList = taosArrayGetP(pTaskInfo->tableqinfoList.pGroupList, 0); - if (pHandle->tqReader) { + if (pHandle->initTableReader) { pSTInfo->scanMode = TABLE_SCAN__TABLE_ORDER; pSTInfo->dataReader = tsdbReaderOpen(pHandle->vnode, &pSTInfo->cond, tableList, 0, 0); } + if (pHandle->initStreamReader) { + ASSERT(pHandle->streamReader == NULL); + pInfo->streamReader = tqInitSubmitMsgScanner(pHandle->meta); + ASSERT(pInfo->streamReader); + } else { + ASSERT(pHandle->streamReader); + pInfo->streamReader = pHandle->streamReader; + } + if (pSTInfo->interval.interval > 0) { pInfo->pUpdateInfo = updateInfoInitP(&pSTInfo->interval, pTwSup->waterMark); } else { pInfo->pUpdateInfo = NULL; } - pInfo->pSnapshotReadOp = pTableScanDummy; + + pInfo->pTableScanOp = pTableScanOp; pInfo->interval = pSTInfo->interval; pInfo->readHandle = *pHandle; - ASSERT(pHandle->reader); - pInfo->streamBlockReader = pHandle->reader; pInfo->tableUid = pScanPhyNode->uid; // set the extract column id to streamHandle - tqReadHandleSetColIdList((SStreamReader*)pHandle->reader, pColIds); + tqReadHandleSetColIdList((SStreamReader*)pHandle->streamReader, pColIds); SArray* tableIdList = extractTableIdList(&pTaskInfo->tableqinfoList); - int32_t code = tqReadHandleSetTbUidList(pHandle->reader, tableIdList); + int32_t code = tqReadHandleSetTbUidList(pHandle->streamReader, tableIdList); if (code != 0) { taosArrayDestroy(tableIdList); goto _error; @@ -1344,7 +1352,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pInfo->deleteDataIndex = 0; pInfo->pDeleteDataRes = createPullDataBlock(); - pOperator->name = "StreamBlockScanOperator"; + pOperator->name = "StreamScanOperator"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN; pOperator->blocking = false; pOperator->status = OP_NOT_OPENED; @@ -1353,7 +1361,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys pOperator->pTaskInfo = pTaskInfo; pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doStreamBlockScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL); + createOperatorFpSet(operatorDummyOpenFn, doStreamScan, NULL, NULL, operatorDummyCloseFn, NULL, NULL, NULL); return pOperator; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index fdef432d95..24749fde2c 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2980,7 +2980,7 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, int64_t gap, int64_t waterMark, uint8_t type) { ASSERT(downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN); - SStreamBlockScanInfo* pScanInfo = downstream->info; + SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->sessionSup = (SessionWindowSupporter){.pStreamAggSup = pAggSup, .gap = gap, .parentType = type}; pScanInfo->pUpdateInfo = updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, waterMark); } @@ -3464,8 +3464,8 @@ typedef SResultWindowInfo* (*__get_win_info_)(void*); SResultWindowInfo* getResWinForSession(void* pData) { return (SResultWindowInfo*)pData; } SResultWindowInfo* getResWinForState(void* pData) { return &((SStateWindowInfo*)pData)->winInfo; } -int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, - SArray* pClosed, __get_win_info_ fn, bool delete) { +int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArray* pClosed, __get_win_info_ fn, + bool delete) { // Todo(liuyao) save window to tdb void** pIte = NULL; size_t keyLen = 0; @@ -3604,8 +3604,8 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { // restore the value pOperator->status = OP_RES_TO_RETURN; - closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, - getResWinForSession, pInfo->ignoreExpiredData); + closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForSession, + pInfo->ignoreExpiredData); closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData); copyUpdateResult(pStUpdated, pUpdated); taosHashCleanup(pStUpdated); @@ -4097,8 +4097,8 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { // restore the value pOperator->status = OP_RES_TO_RETURN; - closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, - getResWinForState, pInfo->ignoreExpiredData); + closeSessionWindow(pInfo->streamAggSup.pResultRows, &pInfo->twAggSup, pUpdated, getResWinForState, + pInfo->ignoreExpiredData); closeChildSessionWindow(pInfo->pChildren, pInfo->twAggSup.maxTs, pInfo->ignoreExpiredData); copyUpdateResult(pSeUpdated, pUpdated); taosHashCleanup(pSeUpdated); diff --git a/source/libs/stream/inc/streamInc.h b/source/libs/stream/inc/streamInc.h index 2f41c08354..1629c863d5 100644 --- a/source/libs/stream/inc/streamInc.h +++ b/source/libs/stream/inc/streamInc.h @@ -17,6 +17,7 @@ #define _STREAM_INC_H_ #include "executor.h" +#include "tref.h" #include "tstream.h" #ifdef __cplusplus @@ -24,8 +25,9 @@ extern "C" { #endif typedef struct { - int8_t inited; - void* timer; + int8_t inited; + int32_t refPool; + void* timer; } SStreamGlobalEnv; static SStreamGlobalEnv streamEnv; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 56d063ae51..8b8badd67a 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -76,9 +76,6 @@ void streamTriggerByTimer(void* param, void* tmrId) { int32_t streamSetupTrigger(SStreamTask* pTask) { if (pTask->triggerParam != 0) { - if (streamInit() < 0) { - return -1; - } pTask->timer = taosTmrStart(streamTriggerByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer); pTask->triggerStatus = TASK_TRIGGER_STATUS__IN_ACTIVE; } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 27f12259bc..d7fa6695d0 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -79,13 +79,13 @@ int32_t walCommit(SWal *pWal, int64_t ver) { } int32_t walRollback(SWal *pWal, int64_t ver) { + taosThreadMutexLock(&pWal->mutex); int64_t code; char fnameStr[WAL_FILE_LEN]; if (ver > pWal->vers.lastVer || ver < pWal->vers.commitVer) { terrno = TSDB_CODE_WAL_INVALID_VER; return -1; } - taosThreadMutexLock(&pWal->mutex); // find correct file if (ver < walGetLastFileFirstVer(pWal)) { diff --git a/source/util/src/tutil.c b/source/util/src/tutil.c index addb9f55ba..7f3728e2ad 100644 --- a/source/util/src/tutil.c +++ b/source/util/src/tutil.c @@ -64,6 +64,20 @@ int32_t strdequote(char *z) { return j + 1; // only one quote, do nothing } +char *strDupUnquo(const char *src) { + if (src == NULL) return NULL; + if (src[0] != '`') return strdup(src); + int32_t len = (int32_t)strlen(src); + if (src[len - 1] != '`') return NULL; + char *ret = taosMemoryMalloc(len); + if (ret == NULL) return NULL; + for (int32_t i = 0; i < len - 1; i++) { + ret[i] = src[i + 1]; + } + ret[len - 1] = 0; + return ret; +} + size_t strtrim(char *z) { int32_t i = 0; int32_t j = 0; diff --git a/tests/system-test/7-tmq/stbTagFilter.py b/tests/system-test/7-tmq/stbTagFilter.py index 2a2cb40c09..65609629bc 100644 --- a/tests/system-test/7-tmq/stbTagFilter.py +++ b/tests/system-test/7-tmq/stbTagFilter.py @@ -25,7 +25,7 @@ class TDTestCase: paraDict = {'dbName': 'db2', 'dropFlag': 1, 'event': '', - 'vgroups': 4, + 'vgroups': 1, 'stbName': 'stb', 'colPrefix': 'c', 'tagPrefix': 't', @@ -44,7 +44,7 @@ class TDTestCase: topicNameList = ['topic1'] expectRowsList = [] tmqCom.initConsumerTable() - tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1) + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=1,replica=1) tdLog.info("create stb") tdCom.create_stable(tdSql, dbname=paraDict["dbName"],stbname=paraDict["stbName"], column_elm_list=paraDict['colSchema'], tag_elm_list=paraDict['tagSchema']) tdLog.info("create ctb")