fix(stream):check release function of stream

This commit is contained in:
54liuyao 2024-10-10 17:17:21 +08:00
parent 9f72bd7e70
commit 1ce919b5b7
2 changed files with 7 additions and 2 deletions

View File

@ -2497,6 +2497,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
// for debug // for debug
int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, const char* taskIdStr) { int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf, const char* taskIdStr) {
int32_t lino = 0;
int32_t size = 2048 * 1024; int32_t size = 2048 * 1024;
int32_t code = 0; int32_t code = 0;
char* dumpBuf = NULL; char* dumpBuf = NULL;
@ -2530,6 +2531,7 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf
SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k); SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, k);
if (pColInfoData == NULL) { if (pColInfoData == NULL) {
code = terrno; code = terrno;
lino = __LINE__;
goto _exit; goto _exit;
} }
@ -2609,6 +2611,7 @@ int32_t dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf
code = taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf); code = taosUcs4ToMbs((TdUcs4*)varDataVal(pData), dataSize, pBuf);
if (code < 0) { if (code < 0) {
uError("func %s failed to convert to ucs charset since %s", __func__, tstrerror(code)); uError("func %s failed to convert to ucs charset since %s", __func__, tstrerror(code));
lino = __LINE__;
goto _exit; goto _exit;
} }
len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf); len += snprintf(dumpBuf + len, size - len, " %15s |", pBuf);
@ -2626,7 +2629,7 @@ _exit:
*pDataBuf = dumpBuf; *pDataBuf = dumpBuf;
dumpBuf = NULL; dumpBuf = NULL;
} else { } else {
uError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); uError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
if (dumpBuf) { if (dumpBuf) {
taosMemoryFree(dumpBuf); taosMemoryFree(dumpBuf);
} }

View File

@ -1635,7 +1635,9 @@ int32_t getTableListInfo(const SExecTaskInfo* pTaskInfo, SArray** pList) {
int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo) { int32_t qStreamOperatorReleaseState(qTaskInfo_t tInfo) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tInfo; SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tInfo;
if (pTaskInfo->pRoot->fpSet.releaseStreamStateFn != NULL) {
pTaskInfo->pRoot->fpSet.releaseStreamStateFn(pTaskInfo->pRoot); pTaskInfo->pRoot->fpSet.releaseStreamStateFn(pTaskInfo->pRoot);
}
return 0; return 0;
} }