Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/ly_query

This commit is contained in:
54liuyao 2024-04-07 09:22:02 +08:00
commit 6ebe8da634
10 changed files with 81 additions and 45 deletions

View File

@ -355,6 +355,8 @@ typedef struct SMetaHbInfo SMetaHbInfo;
typedef struct SDispatchMsgInfo {
SStreamDispatchReq* pData; // current dispatch data
int8_t dispatchMsgType;
int64_t checkpointId;// checkpoint id msg
int32_t transId; // transId for current checkpoint
int16_t msgType; // dispatch msg type
int32_t retryCount; // retry send data count
int64_t startTs; // dispatch start time, record total elapsed time for dispatch

View File

@ -1626,6 +1626,22 @@ void changeByteEndian(char* pData){
}
}
static void tmqGetRawDataRowsPrecisionFromRes(void *pRetrieve, void** rawData, int64_t *rows, int32_t *precision){
if(*(int64_t*)pRetrieve == 0){
*rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
*rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
if(precision != NULL){
*precision = ((SRetrieveTableRsp*)pRetrieve)->precision;
}
}else if(*(int64_t*)pRetrieve == 1){
*rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
*rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
if(precision != NULL){
*precision = ((SRetrieveTableRspForTmq*)pRetrieve)->precision;
}
}
}
static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg* pVg, int64_t* numOfRows, SMqRspObj* pRspObj) {
(*numOfRows) = 0;
tstrncpy(pRspObj->topic, pWrapper->topicHandle->topicName, TSDB_TOPIC_FNAME_LEN);
@ -1648,13 +1664,7 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg
void* rawData = NULL;
int64_t rows = 0;
// deal with compatibility
if(*(int64_t*)pRetrieve == 0){
rawData = ((SRetrieveTableRsp*)pRetrieve)->data;
rows = htobe64(((SRetrieveTableRsp*)pRetrieve)->numOfRows);
}else if(*(int64_t*)pRetrieve == 1){
rawData = ((SRetrieveTableRspForTmq*)pRetrieve)->data;
rows = htobe64(((SRetrieveTableRspForTmq*)pRetrieve)->numOfRows);
}
tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, NULL);
pVg->numOfRows += rows;
(*numOfRows) += rows;
@ -2625,18 +2635,22 @@ SReqResultInfo* tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4) {
pRspObj->resIter++;
if (pRspObj->resIter < pRspObj->rsp.blockNum) {
SRetrieveTableRspForTmq* pRetrieveTmq =
(SRetrieveTableRspForTmq*)taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter);
if (pRspObj->rsp.withSchema) {
doFreeReqResultInfo(&pRspObj->resInfo);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(pRspObj->rsp.blockSchema, pRspObj->resIter);
setResSchemaInfo(&pRspObj->resInfo, pSW->pSchema, pSW->nCols);
}
pRspObj->resInfo.pData = (void*)pRetrieveTmq->data;
pRspObj->resInfo.numOfRows = htobe64(pRetrieveTmq->numOfRows);
void* pRetrieve = taosArrayGetP(pRspObj->rsp.blockData, pRspObj->resIter);
void* rawData = NULL;
int64_t rows = 0;
int32_t precision = 0;
tmqGetRawDataRowsPrecisionFromRes(pRetrieve, &rawData, &rows, &precision);
pRspObj->resInfo.pData = rawData;
pRspObj->resInfo.numOfRows = rows;
pRspObj->resInfo.current = 0;
pRspObj->resInfo.precision = pRetrieveTmq->precision;
pRspObj->resInfo.precision = precision;
// TODO handle the compressed case
pRspObj->resInfo.totalRows += pRspObj->resInfo.numOfRows;

View File

@ -160,6 +160,7 @@ typedef struct {
ETrnConflct conflict;
ETrnExec exec;
EOperType oper;
bool changeless;
int32_t code;
int32_t failedTimes;
void* rpcRsp;

View File

@ -81,6 +81,7 @@ void mndTransSetDbName(STrans *pTrans, const char *dbname, const char *stbnam
void mndTransSetArbGroupId(STrans *pTrans, int32_t groupId);
void mndTransSetSerial(STrans *pTrans);
void mndTransSetParallel(STrans *pTrans);
void mndTransSetChangeless(STrans *pTrans);
void mndTransSetOper(STrans *pTrans, EOperType oper);
int32_t mndTransCheckConflict(SMnode *pMnode, STrans *pTrans);
#ifndef BUILD_NO_CALL

View File

@ -739,6 +739,8 @@ void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; }
void mndTransSetParallel(STrans *pTrans) { pTrans->exec = TRN_EXEC_PARALLEL; }
void mndTransSetChangeless(STrans *pTrans) { pTrans->changeless = true; }
void mndTransSetOper(STrans *pTrans, EOperType oper) { pTrans->oper = oper; }
static int32_t mndTransSync(SMnode *pMnode, STrans *pTrans) {
@ -862,7 +864,7 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
return -1;
}
if (taosArrayGetSize(pTrans->commitActions) <= 0) {
if (!pTrans->changeless && taosArrayGetSize(pTrans->commitActions) <= 0) {
terrno = TSDB_CODE_MND_TRANS_CLOG_IS_NULL;
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
return -1;

View File

@ -2342,24 +2342,7 @@ int32_t mndAddVgroupBalanceToTrans(SMnode *pMnode, SVgObj *pVgroup, STrans *pTra
return -1;
}
if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, pVgroup) != 0) {
mError("trans:%d, vgid:%d failed to be balanced to dnode:%d", pTrans->id, vgid, dnodeId);
return -1;
}
mndReleaseDb(pMnode, pDb);
SSdbRaw *pRaw = mndVgroupActionEncode(pVgroup);
if (pRaw == NULL) {
mError("trans:%d, vgid:%d failed to encode action to dnode:%d", pTrans->id, vgid, dnodeId);
return -1;
}
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
sdbFreeRaw(pRaw);
mError("trans:%d, vgid:%d failed to append commit log dnode:%d", pTrans->id, vgid, dnodeId);
return -1;
}
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
} else {
mInfo("trans:%d, vgid:%d cant be balanced to dnode:%d, exist:%d, online:%d", pTrans->id, vgid, dnodeId, exist,
online);

View File

@ -850,12 +850,18 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);
taosThreadMutexLock(&pTask->lock);
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) {
tqDebug("s-task:%s reset task status from checkpoint, current checkpointingId:%" PRId64 ", transId:%d",
pTask->id.idStr, pTask->chkInfo.checkpointingId, pTask->chkInfo.transId);
streamTaskClearCheckInfo(pTask, true);
streamTaskSetStatusReady(pTask);
}
taosThreadMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}

View File

@ -321,6 +321,8 @@ void clearBufferedDispatchMsg(SStreamTask* pTask) {
destroyDispatchMsg(pMsgInfo->pData, getNumOfDispatchBranch(pTask));
}
pMsgInfo->checkpointId = -1;
pMsgInfo->transId = -1;
pMsgInfo->pData = NULL;
pMsgInfo->dispatchMsgType = 0;
}
@ -332,6 +334,12 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
pTask->msgInfo.dispatchMsgType = pData->type;
if (pData->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
SSDataBlock* p = taosArrayGet(pData->blocks, 0);
pTask->msgInfo.checkpointId = p->info.version;
pTask->msgInfo.transId = p->info.window.ekey;
}
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq));
@ -950,9 +958,21 @@ void streamClearChkptReadyMsg(SStreamTask* pTask) {
// this message has been sent successfully, let's try next one.
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId) {
stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData);
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
if (delayDispatch) {
pTask->chkInfo.dispatchCheckpointTrigger = true;
taosThreadMutexLock(&pTask->lock);
// we only set the dispatch msg info for current checkpoint trans
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK && pTask->chkInfo.checkpointingId == pTask->msgInfo.checkpointId) {
ASSERT(pTask->chkInfo.transId == pTask->msgInfo.transId);
pTask->chkInfo.dispatchCheckpointTrigger = true;
stDebug("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 " transId:%d confirmed",
pTask->id.idStr, pTask->msgInfo.checkpointId, pTask->msgInfo.transId);
} else {
stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64 " transId:%d discard, since expired",
pTask->id.idStr, pTask->msgInfo.checkpointId, pTask->msgInfo.transId);
}
taosThreadMutexUnlock(&pTask->lock);
}
clearBufferedDispatchMsg(pTask);

View File

@ -543,8 +543,6 @@ void streamTaskSetStatusReady(SStreamTask* pTask) {
return;
}
taosThreadMutexLock(&pTask->lock);
pSM->prev.state = pSM->current;
pSM->prev.evt = 0;
@ -552,8 +550,6 @@ void streamTaskSetStatusReady(SStreamTask* pTask) {
pSM->startTs = taosGetTimestampMs();
pSM->pActiveTrans = NULL;
taosArrayClear(pSM->pWaitingEventList);
taosThreadMutexUnlock(&pTask->lock);
}
STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event, __state_trans_fn fn,

View File

@ -29,9 +29,11 @@ class TDTestCase:
tdSql.execute(f'use db_stmt')
tdSql.query("select ts,k from st")
tdSql.checkRows(2)
tdSql.checkRows(self.expected_affected_rows)
tdSql.execute(f'create topic t_unorder_data as select ts,k from st')
tdSql.execute(f'create topic t_unorder_data_none as select i,k from st')
consumer_dict = {
"group.id": "g1",
"td.connect.user": "root",
@ -41,7 +43,7 @@ class TDTestCase:
consumer = Consumer(consumer_dict)
try:
consumer.subscribe(["t_unorder_data"])
consumer.subscribe(["t_unorder_data", "t_unorder_data_none"])
except TmqError:
tdLog.exit(f"subscribe error")
@ -51,18 +53,15 @@ class TDTestCase:
res = consumer.poll(1)
print(res)
if not res:
if cnt == 0:
if cnt == 0 or cnt != 2*self.expected_affected_rows:
tdLog.exit("consume error")
break
val = res.value()
if val is None:
continue
for block in val:
print(block.fetchall(),len(block.fetchall()))
cnt += len(block.fetchall())
if cnt != 2:
tdLog.exit("consume error")
finally:
consumer.close()
@ -110,20 +109,32 @@ class TDTestCase:
params = new_multi_binds(2)
params[0].timestamp((1626861392589, 1626861392590))
params[1].int([3, None])
# print(type(stmt))
tdLog.debug("bind_param_batch start")
stmt.bind_param_batch(params)
tdLog.debug("bind_param_batch end")
stmt.execute()
tdLog.debug("execute end")
conn.execute("flush database %s" % dbname)
params1 = new_multi_binds(2)
params1[0].timestamp((1626861392587,1626861392586))
params1[1].int([None,3])
stmt.bind_param_batch(params1)
stmt.execute()
end = datetime.now()
print("elapsed time: ", end - start)
assert stmt.affected_rows == 2
print(stmt.affected_rows)
self.expected_affected_rows = 4
if stmt.affected_rows != self.expected_affected_rows :
tdLog.exit("affected_rows error")
tdLog.debug("close start")
stmt.close()
# conn.execute("drop database if exists %s" % dbname)
conn.close()