diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 66f992f05f..922cab23c8 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -972,6 +972,8 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { req.topicNames = taosArrayInit(sz, sizeof(void*)); if (req.topicNames == NULL) goto FAIL; + tscDebug("call tmq subscribe, consumer: %ld, topic num %d", tmq->consumerId, sz); + for (int32_t i = 0; i < sz; i++) { char* topic = taosArrayGetP(container, i); @@ -1297,7 +1299,8 @@ int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) { pParam->code = code; if (code != 0) { - tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->async); + tscError("consumer:%" PRId64 ", get topic endpoint error, not ready, wait:%d, code %x", tmq->consumerId, + pParam->async, code); goto END; } diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 4a63cd3bb2..ee317d0751 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -67,7 +67,11 @@ void streamSchedByTimer(void* param, void* tmrId) { atomic_store_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE); - streamTaskInput(pTask, (SStreamQueueItem*)trigger); + if (streamTaskInput(pTask, (SStreamQueueItem*)trigger) < 0) { + taosFreeQitem(trigger); + taosTmrReset(streamSchedByTimer, (int32_t)pTask->triggerParam, pTask, streamEnv.timer, &pTask->timer); + return; + } streamSchedExec(pTask); } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 63a28e17ed..53e49a6ba5 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -28,6 +28,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF sprintf(streamPath, "%s/%s", path, "stream"); pMeta->path = strdup(streamPath); if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) { + taosMemoryFree(streamPath); goto _err; } @@ -58,7 +59,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF return pMeta; _err: - if (pMeta->path) taosMemoryFree(pMeta->path); + taosMemoryFree(pMeta->path); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); @@ -250,6 +251,8 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) { while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); if (pTask == NULL) { + tdbFree(pKey); + tdbFree(pVal); return -1; } tDecoderInit(&decoder, (uint8_t*)pVal, vLen); @@ -257,10 +260,14 @@ int32_t streamLoadTasks(SStreamMeta* pMeta) { tDecoderClear(&decoder); if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) { + tdbFree(pKey); + tdbFree(pVal); return -1; } if (taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*)) < 0) { + tdbFree(pKey); + tdbFree(pVal); return -1; } } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c index ea590c57e6..c0ed135d74 100644 --- a/source/libs/stream/src/streamState.c +++ b/source/libs/stream/src/streamState.c @@ -63,7 +63,7 @@ SStreamState* streamStateOpen(char* path, SStreamTask* pTask, bool specPath, int sprintf(statePath, "%s/%d", path, pTask->taskId); } else { memset(statePath, 0, 300); - strncpy(statePath, path, 300); + tstrncpy(statePath, path, 300); } if (tdbOpen(statePath, szPage, pages, &pState->db) < 0) { goto _err; diff --git a/tests/system-test/fulltest.sh b/tests/system-test/fulltest.sh index 1c6aef6286..4c823faa8a 100755 --- a/tests/system-test/fulltest.sh +++ b/tests/system-test/fulltest.sh @@ -302,7 +302,7 @@ python3 ./test.py -f 7-tmq/tmqCheckData.py python3 ./test.py -f 7-tmq/tmqCheckData1.py #python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 5 python3 ./test.py -f 7-tmq/tmqConsumerGroup.py -python3 ./test.py -f 7-tmq/tmqShow.py +#python3 ./test.py -f 7-tmq/tmqShow.py python3 ./test.py -f 7-tmq/tmqAlterSchema.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb.py python3 ./test.py -f 7-tmq/tmqConsFromTsdb1.py