Merge pull request #25380 from taosdata/fix/3_liaohj

fix(stream): add return.
This commit is contained in:
Haojun Liao 2024-04-17 15:04:00 +08:00 committed by GitHub
commit 9fdf13dccc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 5 additions and 12 deletions

View File

@ -1524,8 +1524,6 @@ void* blockDataDestroy(SSDataBlock* pBlock) {
}
if (IS_VAR_DATA_TYPE(pBlock->info.pks[0].type)) {
uInfo("1====free pk:%p, %p pBlock", pBlock->info.pks[0].pData, pBlock);
uInfo("2====free pk:%p, %p pBlock", pBlock->info.pks[1].pData, pBlock);
taosMemoryFreeClear(pBlock->info.pks[0].pData);
taosMemoryFreeClear(pBlock->info.pks[1].pData);
}
@ -1705,10 +1703,6 @@ SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData) {
p->pData = taosMemoryCalloc(1, pDataBlock->info.pks[1].nData);
p->nData = pDataBlock->info.pks[1].nData;
memcpy(p->pData, pDataBlock->info.pks[1].pData, p->nData);
uInfo("===========clone block, with varchar, %p, 0---addr:%p, src:%p, %p", pBlock, pBlock->info.pks[0].pData, pDataBlock, pDataBlock->info.pks[0].pData);
uInfo("===========clone block, with varchar, %p, 1---addr:%p, src:%p, %p", pBlock, pBlock->info.pks[1].pData, pDataBlock, pDataBlock->info.pks[1].pData);
} else {
uInfo("===========clone block without varchar pk, %p, src:%p", pBlock, pDataBlock);
}
if (copyData) {

View File

@ -45,7 +45,7 @@ int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) {
// 1. the vnode has already been restored.
// 2. the vnode should be the leader.
// 3. the stream is not suspended yet.
if ((!tsDisableStream) && (numOfTasks > 0) /* && (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE)*/) {
if ((!tsDisableStream) && (numOfTasks > 0)) {
tqScanWalAsync(pTq, true);
}

View File

@ -121,11 +121,7 @@ int32_t pkCompEx(SRowKey* p1, SRowKey* p2) {
return ret > 0 ? 1 : -1;
}
} else {
if (p1->pks[0].val == p2->pks[0].val) {
return 0;
} else {
return tValueCompare(&p1->pks[0], &p2->pks[0]);
}
return tValueCompare(&p1->pks[0], &p2->pks[0]);
}
}
}

View File

@ -591,6 +591,7 @@ int32_t doStreamExecTask(SStreamTask* pTask) {
if (ret == EXEC_AFTER_IDLE) {
ASSERT(pInput == NULL && numOfBlocks == 0);
setTaskSchedInfo(pTask, MIN_INVOKE_INTERVAL);
return 0;
} else {
if (pInput == NULL) {
ASSERT(numOfBlocks == 0);

View File

@ -130,6 +130,8 @@ class TDTestCase:
sql = "select count(*) from sta"
# loop wait max 60s to check count is ok
tdLog.info("loop wait result ...")
time.sleep(5)
tdSql.checkDataLoop(0, 0, 100000, sql, loopCount=120, waitTime=0.5)
time.sleep(5)