other: merge 3.0

This commit is contained in:
Haojun Liao 2024-07-01 13:32:30 +08:00
commit f09be802ea
14 changed files with 241 additions and 57 deletions

View File

@ -101,6 +101,7 @@ typedef struct SParseContext {
int32_t qParseSql(SParseContext* pCxt, SQuery** pQuery);
bool qIsInsertValuesSql(const char* pStr, size_t length);
bool qParseDbName(const char* pStr, size_t length, char** pDbName);
// for async mode
int32_t qParseSqlSyntax(SParseContext* pCxt, SQuery** pQuery, struct SCatalogReq* pCatalogReq);

View File

@ -219,6 +219,7 @@ const char *stmtErrstr(TAOS_STMT *stmt);
int stmtAffectedRows(TAOS_STMT *stmt);
int stmtAffectedRowsOnce(TAOS_STMT *stmt);
int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
int stmtSetDbName(TAOS_STMT* stmt, const char* dbName);
int stmtSetTbName(TAOS_STMT *stmt, const char *tbName);
int stmtSetTbTags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags);
int stmtGetTagFields(TAOS_STMT *stmt, int *nums, TAOS_FIELD_E **fields);

View File

@ -896,6 +896,12 @@ int stmtPrepare(TAOS_STMT* stmt, const char* sql, unsigned long length) {
pStmt->sql.sqlLen = length;
pStmt->sql.stbInterlaceMode = pStmt->stbInterlaceMode;
char* dbName = NULL;
if (qParseDbName(sql, length, &dbName)) {
stmtSetDbName(stmt, dbName);
taosMemoryFreeClear(dbName);
}
return TSDB_CODE_SUCCESS;
}
@ -924,6 +930,22 @@ int32_t stmtInitStbInterlaceTableInfo(STscStmt* pStmt) {
return TSDB_CODE_SUCCESS;
}
int stmtSetDbName(TAOS_STMT* stmt, const char* dbName) {
STscStmt *pStmt = (STscStmt *) stmt;
STMT_DLOG("start to set dbName: %s", dbName);
STMT_ERR_RET(stmtCreateRequest(pStmt));
// The SQL statement specifies a database name, overriding the previously specified database
taosMemoryFreeClear(pStmt->exec.pRequest->pDb);
pStmt->exec.pRequest->pDb = taosStrdup(dbName);
if (pStmt->exec.pRequest->pDb == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
return TSDB_CODE_SUCCESS;
}
int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
STscStmt* pStmt = (STscStmt*)stmt;

View File

@ -964,7 +964,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
}
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SCheckpointTriggerRsp* pRsp = pMsg->pCont;
SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->taskId);
if (pTask == NULL) {

View File

@ -2265,13 +2265,16 @@ static void processPrimaryKey(SSDataBlock* pBlock, bool hasPrimaryKey, STqOffset
doBlockDataPrimaryKeyFilter(pBlock, offset);
SColumnInfoData* pColPk = taosArrayGet(pBlock->pDataBlock, 1);
if (pBlock->info.rows < 1) {
return ;
}
void* tmp = colDataGetData(pColPk, pBlock->info.rows - 1);
val.type = pColPk->info.type;
if(IS_VAR_DATA_TYPE(pColPk->info.type)) {
if (IS_VAR_DATA_TYPE(pColPk->info.type)) {
val.pData = taosMemoryMalloc(varDataLen(tmp));
val.nData = varDataLen(tmp);
memcpy(val.pData, varDataVal(tmp), varDataLen(tmp));
}else{
} else {
memcpy(&val.val, tmp, pColPk->info.bytes);
}
}
@ -2292,14 +2295,20 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
}
if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) {
while (1) {
SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp);
if (pResult && pResult->info.rows > 0) {
bool hasPrimaryKey = pAPI->tqReaderFn.tqGetTablePrimaryKey(pInfo->tqReader);
processPrimaryKey(pResult, hasPrimaryKey, &pTaskInfo->streamInfo.currentOffset);
qDebug("tmqsnap doQueueScan get data uid:%" PRId64 "", pResult->info.id.uid);
if (pResult->info.rows > 0) {
return pResult;
}
} else {
break;
}
}
STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
pAPI->tsdReader.tsdReaderClose(pTSInfo->base.dataReader);

View File

@ -2428,6 +2428,20 @@ static int32_t checkTableClauseFirstToken(SInsertParseContext* pCxt, SVnodeModif
return buildSyntaxErrMsg(&pCxt->msg, "table_name is expected", pTbName->z);
}
// db.? situationensure that the only thing following the '.' mark is '?'
char *tbNameAfterDbName = strchr(pTbName->z, '.');
if ((tbNameAfterDbName != NULL) && (tbNameAfterDbName + 1 - pTbName->z == pTbName->n - 1) &&
(*(tbNameAfterDbName + 1) == '?')) {
char *tbName = NULL;
int32_t code = (*pCxt->pComCxt->pStmtCb->getTbNameFn)(pCxt->pComCxt->pStmtCb->pStmt, &tbName);
if (TSDB_CODE_SUCCESS == code) {
pTbName->z = tbName;
pTbName->n = strlen(tbName);
} else {
return code;
}
}
*pHasData = true;
return TSDB_CODE_SUCCESS;
}

View File

@ -781,8 +781,9 @@ SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr, bool* pIgnoreC
if ('.' == str[*i + t0.n]) {
len = tGetToken(&str[*i + t0.n + 1], &type);
// only id and string are valid
if (((TK_NK_STRING != t0.type) && (TK_NK_ID != t0.type)) || ((TK_NK_STRING != type) && (TK_NK_ID != type))) {
// only id、string and ? are valid
if (((TK_NK_STRING != t0.type) && (TK_NK_ID != t0.type)) ||
((TK_NK_STRING != type) && (TK_NK_ID != type) && (TK_NK_QUESTION != type))) {
t0.type = TK_NK_ILLEGAL;
t0.n = 0;

View File

@ -12807,8 +12807,7 @@ static int32_t parseOneStbRow(SMsgBuf* pMsgBuf, SParseFileContext* pParFileCtx)
if (TSDB_CODE_SUCCESS == code) {
SArray* aTagNames = pParFileCtx->tagNameFilled ? NULL : pParFileCtx->aTagNames;
code = parseTagValue(pMsgBuf, &pParFileCtx->pSql, precision, (SSchema*)pTagSchema, &token,
pParFileCtx->aTagNames, pParFileCtx->aTagVals, &pParFileCtx->pTag);
pParFileCtx->tagNameFilled = true;
aTagNames, pParFileCtx->aTagVals, &pParFileCtx->pTag);
}
} else {
// parse tbname
@ -12826,7 +12825,8 @@ static int32_t parseOneStbRow(SMsgBuf* pMsgBuf, SParseFileContext* pParFileCtx)
if (TSDB_CODE_SUCCESS != code) break;
}
if (TSDB_CODE_SUCCESS == code) { // may fail to handle json
if (TSDB_CODE_SUCCESS == code) {
pParFileCtx->tagNameFilled = true;
code = tTagNew(pParFileCtx->aTagVals, 1, false, &pParFileCtx->pTag);
}

View File

@ -48,6 +48,47 @@ bool qIsInsertValuesSql(const char* pStr, size_t length) {
return false;
}
bool qParseDbName(const char* pStr, size_t length, char** pDbName) {
(void) length;
int32_t index = 0;
SToken t;
if (NULL == pStr) {
*pDbName = NULL;
return false;
}
t = tStrGetToken((char *) pStr, &index, false, NULL);
if (TK_INSERT != t.type && TK_IMPORT != t.type) {
*pDbName = NULL;
return false;
}
t = tStrGetToken((char *) pStr, &index, false, NULL);
if (TK_INTO != t.type) {
*pDbName = NULL;
return false;
}
t = tStrGetToken((char *) pStr, &index, false, NULL);
if (t.n == 0 || t.z == NULL) {
*pDbName = NULL;
return false;
}
char *dotPos = strnchr(t.z, '.', t.n, true);
if (dotPos != NULL) {
int dbNameLen = dotPos - t.z;
*pDbName = taosMemoryMalloc(dbNameLen + 1);
if (*pDbName == NULL) {
return false;
}
strncpy(*pDbName, t.z, dbNameLen);
(*pDbName)[dbNameLen] = '\0';
return true;
}
return false;
}
static int32_t analyseSemantic(SParseContext* pCxt, SQuery* pQuery, SParseMetaCache* pMetaCache) {
int32_t code = authenticate(pCxt, pQuery, pMetaCache);

View File

@ -25,15 +25,12 @@ static int32_t deleteCheckpoint(const char* id);
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask);
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
int32_t transId);
int32_t transId, int32_t srcTaskId);
static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList);
static void checkpointTriggerMonitorFn(void* param, void* tmrId);
static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
int32_t transId);
SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
int32_t transId) {
int32_t transId, int32_t srcTaskId) {
SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pChkpoint == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -41,6 +38,10 @@ SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpoint
}
pChkpoint->type = checkpointType;
if (checkpointType == STREAM_INPUT__CHECKPOINT_TRIGGER && (pTask->info.taskLevel != TASK_LEVEL__SOURCE)) {
pChkpoint->srcTaskId = srcTaskId;
ASSERT(srcTaskId != 0);
}
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pBlock == NULL) {
@ -64,8 +65,9 @@ SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpoint
return pChkpoint;
}
int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId) {
SStreamDataBlock* pCheckpoint = createChkptTriggerBlock(pTask, checkpointType, checkpointId, transId);
int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId,
int32_t srcTaskId) {
SStreamDataBlock* pCheckpoint = createChkptTriggerBlock(pTask, checkpointType, checkpointId, transId, srcTaskId);
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pCheckpoint) < 0) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -90,7 +92,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
// 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
// and this is the last item in the inputQ.
return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->checkpointId, pReq->transId);
return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->checkpointId, pReq->transId, -1);
}
int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) {
@ -102,15 +104,16 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri
return TSDB_CODE_SUCCESS;
}
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId);
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId,
pRsp->upstreamTaskId);
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
SRpcHandleInfo* pRpcInfo, int32_t code) {
int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp);
void* pBuf = rpcMallocCont(size);
SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId);
@ -118,6 +121,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId
pRsp->streamId = pTask->id.streamId;
pRsp->upstreamTaskId = pTask->id.taskId;
pRsp->taskId = dstTaskId;
pRsp->rspCode = code;
if (code == TSDB_CODE_SUCCESS) {
pRsp->checkpointId = pTask->chkInfo.pActiveInfo->activeId;
@ -127,9 +131,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId
pRsp->transId = -1;
}
pRsp->rspCode = code;
SRpcMsg rspMsg = {.code = 0, .pCont = pRsp, .contLen = size, .info = *pRpcInfo};
SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = size, .info = *pRpcInfo};
tmsgSendRsp(&rspMsg);
return 0;
}
@ -267,7 +269,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
continueDispatchCheckpointTriggerBlock(pBlock, pTask);
} else { // only one task exists, no need to dispatch downstream info
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId);
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId, -1);
streamFreeQitem((SStreamQueueItem*)pBlock);
}
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
@ -364,9 +366,8 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
taosThreadMutexUnlock(&pInfo->lock);
if (notReady == 0) {
stDebug("s-task:%s all downstream task(s) have completed build checkpoint, start to do checkpoint for current task",
id);
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId);
stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id);
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1);
}
return 0;
@ -706,11 +707,10 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
}
} else { // clear the checkpoint info if failed
taosThreadMutexLock(&pTask->lock);
streamTaskClearCheckInfo(pTask, false);
streamTaskSetFailedCheckpointId(pTask); // set failed checkpoint id before clear the checkpoint info
taosThreadMutexUnlock(&pTask->lock);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
streamTaskSetFailedCheckpointId(pTask);
stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
}

View File

@ -40,6 +40,20 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
pMsg->contLen = contLen;
}
static void initDispatchInfo(SDispatchMsgInfo* pInfo, int32_t msgId) {
pInfo->startTs = taosGetTimestampMs();
pInfo->rspTs = -1;
pInfo->msgId = msgId;
}
static void clearDispatchInfo(SDispatchMsgInfo* pInfo) {
pInfo->startTs = -1;
pInfo->msgId = -1;
pInfo->rspTs = -1;
}
static void updateDispatchInfo(SDispatchMsgInfo* pInfo, int64_t recvTs) { pInfo->rspTs = recvTs; }
static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId,
int32_t numOfBlocks, int64_t dstTaskId, int32_t type) {
pReq->streamId = pTask->id.streamId;
@ -225,12 +239,15 @@ void clearBufferedDispatchMsg(SStreamTask* pTask) {
destroyDispatchMsg(pMsgInfo->pData, streamTaskGetNumOfDownstream(pTask));
}
taosThreadMutexLock(&pMsgInfo->lock);
pMsgInfo->checkpointId = -1;
pMsgInfo->transId = -1;
pMsgInfo->pData = NULL;
pMsgInfo->dispatchMsgType = 0;
taosThreadMutexLock(&pMsgInfo->lock);
clearDispatchInfo(pMsgInfo);
taosArrayClear(pTask->msgInfo.pSendInfo);
taosThreadMutexUnlock(&pMsgInfo->lock);
}
@ -643,20 +660,6 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
return 0;
}
static void initDispatchInfo(SDispatchMsgInfo* pInfo, int32_t msgId) {
pInfo->startTs = taosGetTimestampMs();
pInfo->rspTs = -1;
pInfo->msgId = msgId;
}
static void clearDispatchInfo(SDispatchMsgInfo* pInfo) {
pInfo->startTs = -1;
pInfo->msgId = -1;
pInfo->rspTs = -1;
}
static void updateDispatchInfo(SDispatchMsgInfo* pInfo, int64_t recvTs) { pInfo->rspTs = recvTs; }
int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH ||
pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH));
@ -699,7 +702,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
type == STREAM_INPUT__TRANS_STATE);
pTask->execInfo.dispatch += 1;
taosThreadMutexLock(&pTask->msgInfo.lock);
initDispatchInfo(&pTask->msgInfo, pTask->execInfo.dispatch);
taosThreadMutexUnlock(&pTask->msgInfo.lock);
int32_t code = doBuildDispatchMsg(pTask, pBlock);
if (code == 0) {
@ -1222,10 +1228,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
int32_t msgId = pMsgInfo->msgId;
int64_t now = taosGetTimestampMs();
int32_t totalRsp = 0;
taosThreadMutexLock(&pMsgInfo->lock);
int32_t msgId = pMsgInfo->msgId;
taosThreadMutexUnlock(&pMsgInfo->lock);
// follower not handle the dispatch rsp
if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id,

View File

@ -175,15 +175,19 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
}
if ((*pTask)->chkInfo.pActiveInfo->activeId != 0) {
entry.checkpointInfo.failed =
((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0;
entry.checkpointInfo.activeId = (*pTask)->chkInfo.pActiveInfo->activeId;
entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.pActiveInfo->transId;
SActiveCheckpointInfo* p = (*pTask)->chkInfo.pActiveInfo;
if (p->activeId != 0) {
entry.checkpointInfo.failed = (p->failedId >= p->activeId) ? 1 : 0;
entry.checkpointInfo.activeId = p->activeId;
entry.checkpointInfo.activeTransId = p->transId;
if (entry.checkpointInfo.failed) {
stInfo("s-task:%s set kill checkpoint trans in hbMsg, transId:%d", (*pTask)->id.idStr,
(*pTask)->chkInfo.pActiveInfo->transId);
stInfo("s-task:%s set kill checkpoint trans in hbMsg, transId:%d, clear the active checkpointInfo",
(*pTask)->id.idStr, p->transId);
taosThreadMutexLock(&(*pTask)->lock);
streamTaskClearCheckInfo((*pTask), true);
taosThreadMutexUnlock(&(*pTask)->lock);
}
}

View File

@ -1067,10 +1067,10 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
pInfo->activeId = 0; // clear the checkpoint id
pInfo->failedId = 0;
pInfo->transId = 0;
pInfo->allUpstreamTriggerRecv = 0;
pInfo->dispatchTrigger = false;
pInfo->failedId = 0;
taosArrayClear(pInfo->pDispatchTriggerList);
taosArrayClear(pInfo->pCheckpointReadyRecvList);

View File

@ -12,6 +12,7 @@ from util.dnodes import *
from util.common import *
from taos.tmq import *
from util.dnodes import *
from util.cluster import *
import datetime
sys.path.append("./7-tmq")
@ -137,6 +138,7 @@ class TDTestCase:
print("index:" + str(index))
finally:
consumer.close()
tdSql.execute(f'drop topic topic_pk_query;')
def primaryKeyTestIntStable(self):
print("==============Case 2: primary key test int for stable")
@ -247,6 +249,7 @@ class TDTestCase:
print("index:" + str(index))
finally:
consumer.close()
tdSql.execute(f'drop topic topic_pk_stable;')
def primaryKeyTestInt(self):
print("==============Case 3: primary key test int for db")
@ -356,6 +359,7 @@ class TDTestCase:
print("index:" + str(index))
finally:
consumer.close()
tdSql.execute(f'drop topic topic_in;')
def primaryKeyTestString(self):
print("==============Case 4: primary key test string for db")
@ -468,12 +472,90 @@ class TDTestCase:
print("index:" + str(index))
finally:
consumer.close()
tdSql.execute(f'drop topic topic_pk_string;')
def primaryKeyTestTD_30755(self):
print("==============Case 5: primary key test td-30755 for query")
tdSql.execute(f'create database if not exists db_pk_query_30755 vgroups 1 wal_retention_period 3600;')
tdSql.execute(f'use db_pk_query_30755;')
tdSql.execute(f'create table if not exists pk (ts timestamp, c1 int primary key, c2 int);')
for i in range(0, 100000):
tdSql.execute(f'insert into pk values(1669092069068, {i}, 1);')
tdSql.execute(f'flush database db_pk_query_30755')
tdSql.execute(f'create topic topic_pk_query_30755 as select * from pk')
consumer_dict = {
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
"enable.auto.commit": "false",
"experimental.snapshot.enable": "true",
}
consumer = Consumer(consumer_dict)
try:
consumer.subscribe(["topic_pk_query_30755"])
except TmqError:
tdLog.exit(f"subscribe error")
firstConsume = 0
try:
while firstConsume < 50000:
res = consumer.poll(1)
if not res:
continue
val = res.value()
if val is None:
continue
for block in val:
data = block.fetchall()
firstConsume += len(data)
consumer.commit(res)
finally:
consumer.close()
tdSql.query(f'show subscriptions;')
sub = tdSql.getData(0, 4);
print(sub)
if not sub.startswith("tsdb"):
tdLog.exit(f"show subscriptions error")
tdDnodes.stop(1)
time.sleep(2)
tdDnodes.start(1)
consumer = Consumer(consumer_dict)
tdSql.execute(f'use db_pk_query_30755;')
try:
consumer.subscribe(["topic_pk_query_30755"])
except TmqError:
tdLog.exit(f"subscribe error")
secondConsume = 0
try:
while firstConsume + secondConsume < 100000:
res = consumer.poll(1)
if not res:
continue
val = res.value()
if val is None:
continue
for block in val:
data = block.fetchall()
secondConsume += len(data)
consumer.commit(res)
finally:
consumer.close()
tdSql.execute(f'drop topic topic_pk_query_30755;')
def run(self):
self.primaryKeyTestIntQuery()
self.primaryKeyTestIntStable()
self.primaryKeyTestInt()
self.primaryKeyTestString()
self.primaryKeyTestTD_30755()
def stop(self):
tdSql.close()