diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h
index e7c6491b9d..138fad0ddb 100644
--- a/include/libs/stream/tstream.h
+++ b/include/libs/stream/tstream.h
@@ -355,6 +355,8 @@ typedef struct SMetaHbInfo SMetaHbInfo;
 typedef struct SDispatchMsgInfo {
   SStreamDispatchReq* pData;  // current dispatch data
   int8_t              dispatchMsgType;
+  int64_t             checkpointId;// checkpoint id msg
+  int32_t             transId;     // transId for current checkpoint
   int16_t             msgType;     // dispatch msg type
   int32_t             retryCount;  // retry send data count
   int64_t             startTs;     // dispatch start time, record total elapsed time for dispatch
diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c
index c8fc556150..ce149921e3 100644
--- a/source/client/src/clientTmq.c
+++ b/source/client/src/clientTmq.c
@@ -1626,6 +1626,22 @@ void changeByteEndian(char* pData){
   }
 }
 
+static void tmqGetRawDataRowsPrecisionFromRes(void *pRetrieve, void** rawData, int64_t *rows, int32_t *precision){
+  if(*(int64_t*)pRetrieve == 0){
+    *rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
+    *rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
+    if(precision != NULL){
+      *precision = ((SRetrieveTableRsp*)pRetrieve)->precision;
+    }
+  }else if(*(int64_t*)pRetrieve == 1){
+    *rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
+    *rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
+    if(precision != NULL){
+      *precision = ((SRetrieveTableRspForTmq*)pRetrieve)->precision;
+    }
+  }
+}
+
 static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows,
                                         SMqRspObj* pRspObj) {
   (*numOfRows) = 0;
   tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
@@ -1648,13 +1664,7 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg
     void*   rawData = NULL;
     int64_t rows = 0;
     // deal with compatibility
-    if(*(int64_t*)pRetrieve == 0){
-      rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
-      rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
-    }else if(*(int64_t*)pRetrieve == 1){
-      rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
-      rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
-    }
+    tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, NULL);
     pVg->numOfRows += rows;
     (*numOfRows) += rows;
@@ -2625,18 +2635,22 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
   pRspObj->resIter++;
   if (pRspObj->resIter < pRspObj->rsp.blockNum) {
-    SRetrieveTableRspForTmq* pRetrieveTmq =
-        (SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter);
     if (pRspObj->rsp.withSchema) {
       doFreeReqResultInfo(&pRspObj->resInfo);
       SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter);
       setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols);
     }
-    pRspObj->resInfo.pData = (void*)pRetrieveTmq->data;
-    pRspObj->resInfo.numOfRows = htobe64(pRetrieveTmq->numOfRows);
+    void*   pRetrieve = taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter);
+    void*   rawData = NULL;
+    int64_t rows = 0;
+    int32_t precision = 0;
+    tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, &precision);
+
+    pRspObj->resInfo.pData = rawData;
+    pRspObj->resInfo.numOfRows = rows;
     pRspObj->resInfo.current = 0;
-    pRspObj->resInfo.precision = pRetrieveTmq->precision;
+    pRspObj->resInfo.precision = precision;  // TODO handle the compressed case
 
     pRspObj->resInfo.totalRows += pRspObj->resInfo.numOfRows;
diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h
index a63157445b..924af2b0a7 100644
--- a/source/dnode/mnode/impl/inc/mndDef.h
+++ b/source/dnode/mnode/impl/inc/mndDef.h
@@ -160,6 +160,7 @@ typedef struct {
   ETrnConflct conflict;
   ETrnExec    exec;
   EOperType   oper;
+  bool        changeless;
   int32_t     code;
   int32_t     failedTimes;
   void*       rpcRsp;
diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h
index 8689df98af..8c9ca87fb1 100644
--- a/source/dnode/mnode/impl/inc/mndTrans.h
+++ b/source/dnode/mnode/impl/inc/mndTrans.h
@@ -81,6 +81,7 @@ void    mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbnam
 void    mndTransSetArbGroupId(STrans *pTrans, int32_t groupId);
 void    mndTransSetSerial(STrans *pTrans);
 void    mndTransSetParallel(STrans *pTrans);
+void    mndTransSetChangeless(STrans *pTrans);
 void    mndTransSetOper(STrans *pTrans, EOperType oper);
 int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans);
 #ifndef BUILD_NO_CALL
diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c
index 3af372a432..0e4f4210fb 100644
--- a/source/dnode/mnode/impl/src/mndTrans.c
+++ b/source/dnode/mnode/impl/src/mndTrans.c
@@ -739,6 +739,8 @@ void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; }
 
 void mndTransSetParallel(STrans *pTrans) { pTrans->exec = TRN_EXEC_PARALLEL; }
 
+void mndTransSetChangeless(STrans *pTrans) { pTrans->changeless = true; }
+
 void mndTransSetOper(STrans *pTrans, EOperType oper) { pTrans->oper = oper; }
 
 static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
@@ -862,7 +864,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
     return -1;
   }
 
-  if (taosArrayGetSize(pTrans->commitActions) <= 0) {
+  if (!pTrans->changeless && taosArrayGetSize(pTrans->commitActions) <= 0) {
     terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL;
     mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
     return -1;
diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c
index 571f17fab6..2ee9af0486 100644
--- a/source/dnode/mnode/impl/src/mndVgroup.c
+++ b/source/dnode/mnode/impl/src/mndVgroup.c
@@ -2342,24 +2342,7 @@ int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTra
       return -1;
     }
 
-    if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) {
-      mError("trans:%d, vgid:%d failed to be balanced to dnode:%d", pTrans->id, vgid, dnodeId);
-      return -1;
-    }
-    mndReleaseDb(pMnode, pDb);
-
-    SSdbRaw *pRaw = mndVgroupActionEncode(pVgroup);
-    if (pRaw == NULL) {
-      mError("trans:%d, vgid:%d failed to encode action to dnode:%d", pTrans->id, vgid, dnodeId);
-      return -1;
-    }
-    if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
-      sdbFreeRaw(pRaw);
-      mError("trans:%d, vgid:%d failed to append commit log dnode:%d", pTrans->id, vgid, dnodeId);
-      return -1;
-    }
-    (void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
   } else {
     mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", pTrans->id, vgid, dnodeId, exist,
           online);
diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c
index 1b67dce9b0..0f7f74f78b 100644
--- a/source/dnode/vnode/src/tqCommon/tqCommon.c
+++ b/source/dnode/vnode/src/tqCommon/tqCommon.c
@@ -850,12 +850,18 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
   tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);
 
+  taosThreadMutexLock(&pTask->lock);
+
   // clear flag set during do checkpoint, and open inputQ for all upstream tasks
   if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) {
+    tqDebug("s-task:%s reset task status from checkpoint, current checkpointingId:%" PRId64 ", transId:%d",
+            pTask->id.idStr, pTask->chkInfo.checkpointingId, pTask->chkInfo.transId);
     streamTaskClearCheckInfo(pTask, true);
     streamTaskSetStatusReady(pTask);
   }
 
+  taosThreadMutexUnlock(&pTask->lock);
+
   streamMetaReleaseTask(pMeta, pTask);
   return TSDB_CODE_SUCCESS;
 }
diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c
index 9542009d72..baf5ebf8cb 100644
--- a/source/libs/stream/src/streamDispatch.c
+++ b/source/libs/stream/src/streamDispatch.c
@@ -321,6 +321,8 @@ void clearBufferedDispatchMsg(SStreamTask* pTask) {
     destroyDispatchMsg(pMsgInfo->pData, getNumOfDispatchBranch(pTask));
   }
 
+  pMsgInfo->checkpointId = -1;
+  pMsgInfo->transId = -1;
   pMsgInfo->pData = NULL;
   pMsgInfo->dispatchMsgType = 0;
 }
@@ -332,6 +334,12 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
   pTask->msgInfo.dispatchMsgType = pData->type;
 
+  if (pData->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
+    SSDataBlock* p = taosArrayGet(pData->blocks, 0);
+    pTask->msgInfo.checkpointId = p->info.version;
+    pTask->msgInfo.transId = p->info.window.ekey;
+  }
+
   if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
     SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq));
@@ -950,9 +958,21 @@ void streamClearChkptReadyMsg(SStreamTask* pTask) {
 // this message has been sent successfully, let's try next one.
 static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
   stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData);
+
   bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
   if (delayDispatch) {
-    pTask->chkInfo.dispatchCheckpointTrigger = true;
+    taosThreadMutexLock(&pTask->lock);
+    // we only set the dispatch msg info for current checkpoint trans
+    if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK && pTask->chkInfo.checkpointingId == pTask->msgInfo.checkpointId) {
+      ASSERT(pTask->chkInfo.transId == pTask->msgInfo.transId);
+      pTask->chkInfo.dispatchCheckpointTrigger = true;
+      stDebug("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 " transId:%d confirmed",
+              pTask->id.idStr, pTask->msgInfo.checkpointId, pTask->msgInfo.transId);
+    } else {
+      stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 " transId:%d discard, since expired",
+             pTask->id.idStr, pTask->msgInfo.checkpointId, pTask->msgInfo.transId);
+    }
+    taosThreadMutexUnlock(&pTask->lock);
   }
 
   clearBufferedDispatchMsg(pTask);
diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c
index 6aa215586a..cfa94209f6 100644
--- a/source/libs/stream/src/streamTaskSm.c
+++ b/source/libs/stream/src/streamTaskSm.c
@@ -543,8 +543,6 @@ void streamTaskSetStatusReady(SStreamTask* pTask) {
     return;
   }
 
-  taosThreadMutexLock(&pTask->lock);
-
   pSM->prev.state = pSM->current;
   pSM->prev.evt = 0;
 
@@ -552,8 +550,6 @@ void streamTaskSetStatusReady(SStreamTask* pTask) {
   pSM->startTs = taosGetTimestampMs();
   pSM->pActiveTrans = NULL;
   taosArrayClear(pSM->pWaitingEventList);
-
-  taosThreadMutexUnlock(&pTask->lock);
 }
 
 STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn,
diff --git a/tests/system-test/7-tmq/tmq_ts4563.py b/tests/system-test/7-tmq/tmq_ts4563.py
index fc1cc259ce..13f510ffe6 100644
--- a/tests/system-test/7-tmq/tmq_ts4563.py
+++ b/tests/system-test/7-tmq/tmq_ts4563.py
@@ -29,9 +29,11 @@ class TDTestCase:
         tdSql.execute(f'use db_stmt')
 
         tdSql.query("select ts,k from st")
-        tdSql.checkRows(2)
+        tdSql.checkRows(self.expected_affected_rows)
 
         tdSql.execute(f'create topic t_unorder_data as select ts,k from st')
+        tdSql.execute(f'create topic t_unorder_data_none as select i,k from st')
+
         consumer_dict = {
             "group.id": "g1",
             "td.connect.user": "root",
@@ -41,7 +43,7 @@ class TDTestCase:
         consumer = Consumer(consumer_dict)
 
         try:
-            consumer.subscribe(["t_unorder_data"])
+            consumer.subscribe(["t_unorder_data", "t_unorder_data_none"])
         except TmqError:
             tdLog.exit(f"subscribe error")
 
@@ -51,18 +53,15 @@ class TDTestCase:
                 res = consumer.poll(1)
                 print(res)
                 if not res:
-                    if cnt == 0:
+                    if cnt == 0 or cnt != 2*self.expected_affected_rows:
                        tdLog.exit("consume error")
                     break
                 val = res.value()
                 if val is None:
                     continue
                 for block in val:
+                    print(block.fetchall(),len(block.fetchall()))
                     cnt += len(block.fetchall())
-
-            if cnt != 2:
-                tdLog.exit("consume error")
-
         finally:
             consumer.close()
@@ -110,20 +109,32 @@ class TDTestCase:
         params = new_multi_binds(2)
         params[0].timestamp((1626861392589, 1626861392590))
         params[1].int([3, None])
-
+        # print(type(stmt))
         tdLog.debug("bind_param_batch start")
         stmt.bind_param_batch(params)
+        tdLog.debug("bind_param_batch end")
         stmt.execute()
         tdLog.debug("execute end")
+        conn.execute("flush database %s" % dbname)
+
+        params1 = new_multi_binds(2)
+        params1[0].timestamp((1626861392587,1626861392586))
+        params1[1].int([None,3])
+        stmt.bind_param_batch(params1)
+        stmt.execute()
+
         end = datetime.now()
         print("elapsed time: ", end - start)
-        assert stmt.affected_rows == 2
+        print(stmt.affected_rows)
+        self.expected_affected_rows = 4
+        if stmt.affected_rows != self.expected_affected_rows :
+            tdLog.exit("affected_rows error")
 
         tdLog.debug("close start")
         stmt.close()
-
+        # conn.execute("drop database if exists %s" % dbname)
         conn.close()