diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 4c4505544b..ad41b9a542 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -101,6 +101,7 @@ typedef struct SParseContext { int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery); bool qIsInsertValuesSql(const char* pStr, size_t length); +bool qParseDbName(const char* pStr, size_t length, char** pDbName); // for async mode int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq); diff --git a/source/client/inc/clientStmt.h b/source/client/inc/clientStmt.h index 64b2acf732..68765ee47a 100644 --- a/source/client/inc/clientStmt.h +++ b/source/client/inc/clientStmt.h @@ -219,6 +219,7 @@ const char *stmtErrstr(TAOS_STMT *stmt); int stmtAffectedRows(TAOS_STMT *stmt); int stmtAffectedRowsOnce(TAOS_STMT *stmt); int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length); +int stmtSetDbName(TAOS_STMT* stmt, const char* dbName); int stmtSetTbName(TAOS_STMT *stmt, const char *tbName); int stmtSetTbTags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags); int stmtGetTagFields(TAOS_STMT *stmt, int *nums, TAOS_FIELD_E **fields); diff --git a/source/client/src/clientStmt.c b/source/client/src/clientStmt.c index c8307e4a13..e8b76d34c2 100644 --- a/source/client/src/clientStmt.c +++ b/source/client/src/clientStmt.c @@ -896,6 +896,12 @@ int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) { pStmt->sql.sqlLen = length; pStmt->sql.stbInterlaceMode = pStmt->stbInterlaceMode; + char* dbName = NULL; + if (qParseDbName(sql, length, &dbName)) { + stmtSetDbName(stmt, dbName); + taosMemoryFreeClear(dbName); + } + return TSDB_CODE_SUCCESS; } @@ -924,6 +930,22 @@ int32_t stmtInitStbInterlaceTableInfo(STscStmt* pStmt) { return TSDB_CODE_SUCCESS; } +int stmtSetDbName(TAOS_STMT* stmt, const char* dbName) { + STscStmt *pStmt = (STscStmt *) stmt; + + STMT_DLOG("start to set dbName: %s", dbName); + + STMT_ERR_RET(stmtCreateRequest(pStmt)); + + // The SQL statement specifies a database name, overriding the previously specified database + taosMemoryFreeClear(pStmt->exec.pRequest->pDb); + pStmt->exec.pRequest->pDb = taosStrdup(dbName); + if (pStmt->exec.pRequest->pDb == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + return TSDB_CODE_SUCCESS; +} + int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) { STscStmt* pStmt = (STscStmt*)stmt; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index c4524b4078..dabd4ff455 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -964,7 +964,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) } int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { - SCheckpointTriggerRsp* pRsp = pMsg->pCont; + SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->taskId); if (pTask == NULL) { diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f7c51c43b5..6c512d4859 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2265,13 +2265,16 @@ static void processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffset doBlockDataPrimaryKeyFilter(pBlock, offset); SColumnInfoData* pColPk = taosArrayGet(pBlock->pDataBlock, 1); + if (pBlock->info.rows < 1) { + return ; + } void* tmp = colDataGetData(pColPk, pBlock->info.rows - 1); val.type = pColPk->info.type; - if(IS_VAR_DATA_TYPE(pColPk->info.type)) { + if (IS_VAR_DATA_TYPE(pColPk->info.type)) { val.pData = taosMemoryMalloc(varDataLen(tmp)); val.nData = varDataLen(tmp); memcpy(val.pData, varDataVal(tmp), varDataLen(tmp)); - }else{ + } else { memcpy(&val.val, tmp, pColPk->info.bytes); } } @@ -2292,13 +2295,19 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { } if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { - SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); + while (1) { + SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); - if (pResult && pResult->info.rows > 0) { - bool hasPrimaryKey = pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader); - processPrimaryKey(pResult, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset); - qDebug("tmqsnap doQueueScan get data uid:%" PRId64 "", pResult->info.id.uid); - return pResult; + if (pResult && pResult->info.rows > 0) { + bool hasPrimaryKey = pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader); + processPrimaryKey(pResult, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset); + qDebug("tmqsnap doQueueScan get data uid:%" PRId64 "", pResult->info.id.uid); + if (pResult->info.rows > 0) { + return pResult; + } + } else { + break; + } } STableScanInfo* pTSInfo = pInfo->pTableScanOp->info; diff --git a/source/libs/parser/src/parInsertSql.c b/source/libs/parser/src/parInsertSql.c index 6b3a6cbf6b..9393a62e26 100644 --- a/source/libs/parser/src/parInsertSql.c +++ b/source/libs/parser/src/parInsertSql.c @@ -2428,6 +2428,20 @@ static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModif return buildSyntaxErrMsg(&pCxt->msg, "table_name is expected", pTbName->z); } + // db.? situation,ensure that the only thing following the '.' mark is '?' + char *tbNameAfterDbName = strchr(pTbName->z, '.'); + if ((tbNameAfterDbName != NULL) && (tbNameAfterDbName + 1 - pTbName->z == pTbName->n - 1) && + (*(tbNameAfterDbName + 1) == '?')) { + char *tbName = NULL; + int32_t code = (*pCxt->pComCxt->pStmtCb->getTbNameFn)(pCxt->pComCxt->pStmtCb->pStmt, &tbName); + if (TSDB_CODE_SUCCESS == code) { + pTbName->z = tbName; + pTbName->n = strlen(tbName); + } else { + return code; + } + } + *pHasData = true; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/parser/src/parTokenizer.c b/source/libs/parser/src/parTokenizer.c index ff48f81fc1..53ea19f416 100644 --- a/source/libs/parser/src/parTokenizer.c +++ b/source/libs/parser/src/parTokenizer.c @@ -781,8 +781,9 @@ SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr, bool* pIgnoreC if ('.' == str[*i + t0.n]) { len = tGetToken(&str[*i + t0.n + 1], &type); - // only id and string are valid - if (((TK_NK_STRING != t0.type) && (TK_NK_ID != t0.type)) || ((TK_NK_STRING != type) && (TK_NK_ID != type))) { + // only id、string and ? are valid + if (((TK_NK_STRING != t0.type) && (TK_NK_ID != t0.type)) || + ((TK_NK_STRING != type) && (TK_NK_ID != type) && (TK_NK_QUESTION != type))) { t0.type = TK_NK_ILLEGAL; t0.n = 0; diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index ce57d6f435..b01377010a 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -12807,8 +12807,7 @@ static int32_t parseOneStbRow(SMsgBuf* pMsgBuf, SParseFileContext* pParFileCtx) if (TSDB_CODE_SUCCESS == code) { SArray* aTagNames = pParFileCtx->tagNameFilled ? NULL : pParFileCtx->aTagNames; code = parseTagValue(pMsgBuf, &pParFileCtx->pSql, precision, (SSchema*)pTagSchema, &token, - pParFileCtx->aTagNames, pParFileCtx->aTagVals, &pParFileCtx->pTag); - pParFileCtx->tagNameFilled = true; + aTagNames, pParFileCtx->aTagVals, &pParFileCtx->pTag); } } else { // parse tbname @@ -12826,7 +12825,8 @@ static int32_t parseOneStbRow(SMsgBuf* pMsgBuf, SParseFileContext* pParFileCtx) if (TSDB_CODE_SUCCESS != code) break; } - if (TSDB_CODE_SUCCESS == code) { // may fail to handle json + if (TSDB_CODE_SUCCESS == code) { + pParFileCtx->tagNameFilled = true; code = tTagNew(pParFileCtx->aTagVals, 1, false, &pParFileCtx->pTag); } diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index bdeccafa36..a5cbe90598 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -48,6 +48,47 @@ bool qIsInsertValuesSql(const char* pStr, size_t length) { return false; } +bool qParseDbName(const char* pStr, size_t length, char** pDbName) { + (void) length; + int32_t index = 0; + SToken t; + + if (NULL == pStr) { + *pDbName = NULL; + return false; + } + + t = tStrGetToken((char *) pStr, &index, false, NULL); + if (TK_INSERT != t.type && TK_IMPORT != t.type) { + *pDbName = NULL; + return false; + } + + t = tStrGetToken((char *) pStr, &index, false, NULL); + if (TK_INTO != t.type) { + *pDbName = NULL; + return false; + } + + t = tStrGetToken((char *) pStr, &index, false, NULL); + if (t.n == 0 || t.z == NULL) { + *pDbName = NULL; + return false; + } + char *dotPos = strnchr(t.z, '.', t.n, true); + if (dotPos != NULL) { + int dbNameLen = dotPos - t.z; + *pDbName = taosMemoryMalloc(dbNameLen + 1); + if (*pDbName == NULL) { + return false; + } + strncpy(*pDbName, t.z, dbNameLen); + (*pDbName)[dbNameLen] = '\0'; + return true; + } + return false; +} + static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) { int32_t code = authenticate(pCxt, pQuery, pMetaCache); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index a82628ea52..2ee2abe65e 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -25,15 +25,12 @@ static int32_t deleteCheckpoint(const char* id); static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask); static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, - int32_t transId); + int32_t transId, int32_t srcTaskId); static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList); static void checkpointTriggerMonitorFn(void* param, void* tmrId); -static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, - int32_t transId); - SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, - int32_t transId) { + int32_t transId, int32_t srcTaskId) { SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); if (pChkpoint == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -41,6 +38,10 @@ SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpoint } pChkpoint->type = checkpointType; + if (checkpointType == STREAM_INPUT__CHECKPOINT_TRIGGER && (pTask->info.taskLevel != TASK_LEVEL__SOURCE)) { + pChkpoint->srcTaskId = srcTaskId; + ASSERT(srcTaskId != 0); + } SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); if (pBlock == NULL) { @@ -64,8 +65,9 @@ SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpoint return pChkpoint; } -int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId) { - SStreamDataBlock* pCheckpoint = createChkptTriggerBlock(pTask, checkpointType, checkpointId, transId); +int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId, + int32_t srcTaskId) { + SStreamDataBlock* pCheckpoint = createChkptTriggerBlock(pTask, checkpointType, checkpointId, transId, srcTaskId); if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pCheckpoint) < 0) { return TSDB_CODE_OUT_OF_MEMORY; @@ -90,7 +92,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo // 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task // and this is the last item in the inputQ. - return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->checkpointId, pReq->transId); + return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->checkpointId, pReq->transId, -1); } int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) { @@ -102,15 +104,16 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri return TSDB_CODE_SUCCESS; } - appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId); + appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId, + pRsp->upstreamTaskId); return TSDB_CODE_SUCCESS; } int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId, SRpcHandleInfo* pRpcInfo, int32_t code) { int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp); + void* pBuf = rpcMallocCont(size); - void* pBuf = rpcMallocCont(size); SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); ((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId); @@ -118,6 +121,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId pRsp->streamId = pTask->id.streamId; pRsp->upstreamTaskId = pTask->id.taskId; pRsp->taskId = dstTaskId; + pRsp->rspCode = code; if (code == TSDB_CODE_SUCCESS) { pRsp->checkpointId = pTask->chkInfo.pActiveInfo->activeId; @@ -127,9 +131,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId pRsp->transId = -1; } - pRsp->rspCode = code; - - SRpcMsg rspMsg = {.code = 0, .pCont = pRsp, .contLen = size, .info = *pRpcInfo}; + SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = size, .info = *pRpcInfo}; tmsgSendRsp(&rspMsg); return 0; } @@ -267,7 +269,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); continueDispatchCheckpointTriggerBlock(pBlock, pTask); } else { // only one task exists, no need to dispatch downstream info - appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId); + appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId, -1); streamFreeQitem((SStreamQueueItem*)pBlock); } } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { @@ -364,9 +366,8 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId taosThreadMutexUnlock(&pInfo->lock); if (notReady == 0) { - stDebug("s-task:%s all downstream task(s) have completed build checkpoint, start to do checkpoint for current task", - id); - appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId); + stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id); + appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1); } return 0; @@ -706,11 +707,10 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) { } } else { // clear the checkpoint info if failed taosThreadMutexLock(&pTask->lock); - streamTaskClearCheckInfo(pTask, false); + streamTaskSetFailedCheckpointId(pTask); // set failed checkpoint id before clear the checkpoint info taosThreadMutexUnlock(&pTask->lock); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); - streamTaskSetFailedCheckpointId(pTask); stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId); } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 789cb5cbcf..83e73e8c88 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -40,6 +40,20 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) { pMsg->contLen = contLen; } +static void initDispatchInfo(SDispatchMsgInfo* pInfo, int32_t msgId) { + pInfo->startTs = taosGetTimestampMs(); + pInfo->rspTs = -1; + pInfo->msgId = msgId; +} + +static void clearDispatchInfo(SDispatchMsgInfo* pInfo) { + pInfo->startTs = -1; + pInfo->msgId = -1; + pInfo->rspTs = -1; +} + +static void updateDispatchInfo(SDispatchMsgInfo* pInfo, int64_t recvTs) { pInfo->rspTs = recvTs; } + static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks, int64_t dstTaskId, int32_t type) { pReq->streamId = pTask->id.streamId; @@ -225,12 +239,15 @@ void clearBufferedDispatchMsg(SStreamTask* pTask) { destroyDispatchMsg(pMsgInfo->pData, streamTaskGetNumOfDownstream(pTask)); } + taosThreadMutexLock(&pMsgInfo->lock); + pMsgInfo->checkpointId = -1; pMsgInfo->transId = -1; pMsgInfo->pData = NULL; pMsgInfo->dispatchMsgType = 0; - taosThreadMutexLock(&pMsgInfo->lock); + clearDispatchInfo(pMsgInfo); + taosArrayClear(pTask->msgInfo.pSendInfo); taosThreadMutexUnlock(&pMsgInfo->lock); } @@ -643,20 +660,6 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S return 0; } -static void initDispatchInfo(SDispatchMsgInfo* pInfo, int32_t msgId) { - pInfo->startTs = taosGetTimestampMs(); - pInfo->rspTs = -1; - pInfo->msgId = msgId; -} - -static void clearDispatchInfo(SDispatchMsgInfo* pInfo) { - pInfo->startTs = -1; - pInfo->msgId = -1; - pInfo->rspTs = -1; -} - -static void updateDispatchInfo(SDispatchMsgInfo* pInfo, int64_t recvTs) { pInfo->rspTs = recvTs; } - int32_t streamDispatchStreamBlock(SStreamTask* pTask) { ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH)); @@ -699,7 +702,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { type == STREAM_INPUT__TRANS_STATE); pTask->execInfo.dispatch += 1; + + taosThreadMutexLock(&pTask->msgInfo.lock); initDispatchInfo(&pTask->msgInfo, pTask->execInfo.dispatch); + taosThreadMutexUnlock(&pTask->msgInfo.lock); int32_t code = doBuildDispatchMsg(pTask, pBlock); if (code == 0) { @@ -1222,10 +1228,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i const char* id = pTask->id.idStr; int32_t vgId = pTask->pMeta->vgId; SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; - int32_t msgId = pMsgInfo->msgId; int64_t now = taosGetTimestampMs(); int32_t totalRsp = 0; + taosThreadMutexLock(&pMsgInfo->lock); + int32_t msgId = pMsgInfo->msgId; + taosThreadMutexUnlock(&pMsgInfo->lock); + // follower not handle the dispatch rsp if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) { stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, diff --git a/source/libs/stream/src/streamHb.c b/source/libs/stream/src/streamHb.c index a4bd43e6e6..7bee454bd4 100644 --- a/source/libs/stream/src/streamHb.c +++ b/source/libs/stream/src/streamHb.c @@ -175,15 +175,19 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) { entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); } - if ((*pTask)->chkInfo.pActiveInfo->activeId != 0) { - entry.checkpointInfo.failed = - ((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0; - entry.checkpointInfo.activeId = (*pTask)->chkInfo.pActiveInfo->activeId; - entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.pActiveInfo->transId; + SActiveCheckpointInfo* p = (*pTask)->chkInfo.pActiveInfo; + if (p->activeId != 0) { + entry.checkpointInfo.failed = (p->failedId >= p->activeId) ? 1 : 0; + entry.checkpointInfo.activeId = p->activeId; + entry.checkpointInfo.activeTransId = p->transId; if (entry.checkpointInfo.failed) { - stInfo("s-task:%s set kill checkpoint trans in hbMsg, transId:%d", (*pTask)->id.idStr, - (*pTask)->chkInfo.pActiveInfo->transId); + stInfo("s-task:%s set kill checkpoint trans in hbMsg, transId:%d, clear the active checkpointInfo", + (*pTask)->id.idStr, p->transId); + + taosThreadMutexLock(&(*pTask)->lock); + streamTaskClearCheckInfo((*pTask), true); + taosThreadMutexUnlock(&(*pTask)->lock); } } diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 4052f6dc49..e6bb6bf90c 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1067,10 +1067,10 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) { void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) { pInfo->activeId = 0; // clear the checkpoint id - pInfo->failedId = 0; pInfo->transId = 0; pInfo->allUpstreamTriggerRecv = 0; pInfo->dispatchTrigger = false; + pInfo->failedId = 0; taosArrayClear(pInfo->pDispatchTriggerList); taosArrayClear(pInfo->pCheckpointReadyRecvList); diff --git a/tests/system-test/7-tmq/tmq_primary_key.py b/tests/system-test/7-tmq/tmq_primary_key.py index 7d0d3da4fc..80888ddbe6 100644 --- a/tests/system-test/7-tmq/tmq_primary_key.py +++ b/tests/system-test/7-tmq/tmq_primary_key.py @@ -12,6 +12,7 @@ from util.dnodes import * from util.common import * from taos.tmq import * from util.dnodes import * +from util.cluster import * import datetime sys.path.append("./7-tmq") @@ -137,6 +138,7 @@ class TDTestCase: print("index:" + str(index)) finally: consumer.close() + tdSql.execute(f'drop topic topic_pk_query;') def primaryKeyTestIntStable(self): print("==============Case 2: primary key test int for stable") @@ -247,6 +249,7 @@ class TDTestCase: print("index:" + str(index)) finally: consumer.close() + tdSql.execute(f'drop topic topic_pk_stable;') def primaryKeyTestInt(self): print("==============Case 3: primary key test int for db") @@ -356,6 +359,7 @@ class TDTestCase: print("index:" + str(index)) finally: consumer.close() + tdSql.execute(f'drop topic topic_in;') def primaryKeyTestString(self): print("==============Case 4: primary key test string for db") @@ -468,12 +472,90 @@ class TDTestCase: print("index:" + str(index)) finally: consumer.close() + tdSql.execute(f'drop topic topic_pk_string;') + def primaryKeyTestTD_30755(self): + print("==============Case 5: primary key test td-30755 for query") + tdSql.execute(f'create database if not exists db_pk_query_30755 vgroups 1 wal_retention_period 3600;') + tdSql.execute(f'use db_pk_query_30755;') + tdSql.execute(f'create table if not exists pk (ts timestamp, c1 int primary key, c2 int);') + for i in range(0, 100000): + tdSql.execute(f'insert into pk values(1669092069068, {i}, 1);') + tdSql.execute(f'flush database db_pk_query_30755') + + tdSql.execute(f'create topic topic_pk_query_30755 as select * from pk') + + consumer_dict = { + "group.id": "g1", + "td.connect.user": "root", + "td.connect.pass": "taosdata", + "auto.offset.reset": "earliest", + "enable.auto.commit": "false", + "experimental.snapshot.enable": "true", + } + consumer = Consumer(consumer_dict) + + try: + consumer.subscribe(["topic_pk_query_30755"]) + except TmqError: + tdLog.exit(f"subscribe error") + + firstConsume = 0 + try: + while firstConsume < 50000: + res = consumer.poll(1) + if not res: + continue + val = res.value() + if val is None: + continue + for block in val: + data = block.fetchall() + firstConsume += len(data) + consumer.commit(res) + finally: + consumer.close() + + tdSql.query(f'show subscriptions;') + sub = tdSql.getData(0, 4); + print(sub) + if not sub.startswith("tsdb"): + tdLog.exit(f"show subscriptions error") + + tdDnodes.stop(1) + time.sleep(2) + tdDnodes.start(1) + + consumer = Consumer(consumer_dict) + + tdSql.execute(f'use db_pk_query_30755;') + try: + consumer.subscribe(["topic_pk_query_30755"]) + except TmqError: + tdLog.exit(f"subscribe error") + + secondConsume = 0 + try: + while firstConsume + secondConsume < 100000: + res = consumer.poll(1) + if not res: + continue + val = res.value() + if val is None: + continue + for block in val: + data = block.fetchall() + secondConsume += len(data) + consumer.commit(res) + finally: + consumer.close() + tdSql.execute(f'drop topic topic_pk_query_30755;') def run(self): self.primaryKeyTestIntQuery() self.primaryKeyTestIntStable() self.primaryKeyTestInt() self.primaryKeyTestString() + self.primaryKeyTestTD_30755() def stop(self): tdSql.close()