Merge pull request #14893 from taosdata/feature/3_liaohj
fix(query): set correct fill output column index and fix some memory leaks
This commit is contained in:
commit
b23e08db85
|
@ -161,7 +161,6 @@ typedef struct SQueryTableDataCond {
|
||||||
int64_t endVersion;
|
int64_t endVersion;
|
||||||
} SQueryTableDataCond;
|
} SQueryTableDataCond;
|
||||||
|
|
||||||
void* blockDataDestroy(SSDataBlock* pBlock);
|
|
||||||
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
|
int32_t tEncodeDataBlock(void** buf, const SSDataBlock* pBlock);
|
||||||
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock);
|
void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock);
|
||||||
|
|
||||||
|
@ -169,19 +168,6 @@ int32_t tEncodeDataBlocks(void** buf, const SArray* blocks);
|
||||||
void* tDecodeDataBlocks(const void* buf, SArray** blocks);
|
void* tDecodeDataBlocks(const void* buf, SArray** blocks);
|
||||||
void colDataDestroy(SColumnInfoData* pColData);
|
void colDataDestroy(SColumnInfoData* pColData);
|
||||||
|
|
||||||
static FORCE_INLINE void blockDestroyInner(SSDataBlock* pBlock) {
|
|
||||||
int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
|
|
||||||
for (int32_t i = 0; i < numOfOutput; ++i) {
|
|
||||||
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
|
|
||||||
colDataDestroy(pColInfoData);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayDestroy(pBlock->pDataBlock);
|
|
||||||
taosMemoryFreeClear(pBlock->pBlockAgg);
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { blockDestroyInner(pBlock); }
|
|
||||||
|
|
||||||
//======================================================================================================================
|
//======================================================================================================================
|
||||||
// the following structure shared by parser and executor
|
// the following structure shared by parser and executor
|
||||||
typedef struct SColumn {
|
typedef struct SColumn {
|
||||||
|
|
|
@ -226,8 +226,12 @@ int32_t blockDataKeepFirstNRows(SSDataBlock* pBlock, size_t n);
|
||||||
|
|
||||||
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src);
|
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src);
|
||||||
int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src);
|
int32_t copyDataBlock(SSDataBlock* dst, const SSDataBlock* src);
|
||||||
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
|
|
||||||
SSDataBlock* createDataBlock();
|
SSDataBlock* createDataBlock();
|
||||||
|
void* blockDataDestroy(SSDataBlock* pBlock);
|
||||||
|
void blockDataFreeRes(SSDataBlock* pBlock);
|
||||||
|
SSDataBlock* createOneDataBlock(const SSDataBlock* pDataBlock, bool copyData);
|
||||||
|
|
||||||
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData);
|
int32_t blockDataAppendColInfo(SSDataBlock* pBlock, SColumnInfoData* pColInfoData);
|
||||||
|
|
||||||
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId);
|
SColumnInfoData createColumnInfoData(int16_t type, int32_t bytes, int16_t colId);
|
||||||
|
|
|
@ -340,12 +340,12 @@ static FORCE_INLINE int32_t streamTaskOutput(SStreamTask* pTask, SStreamDataBloc
|
||||||
if (pTask->sinkType == TASK_SINK__TABLE) {
|
if (pTask->sinkType == TASK_SINK__TABLE) {
|
||||||
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
||||||
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
|
pTask->tbSink.tbSinkFunc(pTask, pTask->tbSink.vnode, 0, pBlock->blocks);
|
||||||
taosArrayDestroyEx(pBlock->blocks, (FDelete)tDeleteSSDataBlock);
|
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
|
||||||
taosFreeQitem(pBlock);
|
taosFreeQitem(pBlock);
|
||||||
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
} else if (pTask->sinkType == TASK_SINK__SMA) {
|
||||||
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
ASSERT(pTask->dispatchType == TASK_DISPATCH__NONE);
|
||||||
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
|
pTask->smaSink.smaSink(pTask->smaSink.vnode, pTask->smaSink.smaId, pBlock->blocks);
|
||||||
taosArrayDestroyEx(pBlock->blocks, (FDelete)tDeleteSSDataBlock);
|
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
|
||||||
taosFreeQitem(pBlock);
|
taosFreeQitem(pBlock);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
|
ASSERT(pTask->dispatchType != TASK_DISPATCH__NONE);
|
||||||
|
|
|
@ -1197,15 +1197,28 @@ int32_t blockDataEnsureCapacity(SSDataBlock* pDataBlock, uint32_t numOfRows) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void blockDataFreeRes(SSDataBlock* pBlock) {
|
||||||
|
int32_t numOfOutput = taosArrayGetSize(pBlock->pDataBlock);
|
||||||
|
for (int32_t i = 0; i < numOfOutput; ++i) {
|
||||||
|
SColumnInfoData* pColInfoData = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, i);
|
||||||
|
colDataDestroy(pColInfoData);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayDestroy(pBlock->pDataBlock);
|
||||||
|
taosMemoryFreeClear(pBlock->pBlockAgg);
|
||||||
|
memset(&pBlock->info, 0, sizeof(SDataBlockInfo));
|
||||||
|
}
|
||||||
|
|
||||||
void* blockDataDestroy(SSDataBlock* pBlock) {
|
void* blockDataDestroy(SSDataBlock* pBlock) {
|
||||||
if (pBlock == NULL) {
|
if (pBlock == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
blockDestroyInner(pBlock);
|
blockDataFreeRes(pBlock);
|
||||||
taosMemoryFreeClear(pBlock);
|
taosMemoryFreeClear(pBlock);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
|
int32_t assignOneDataBlock(SSDataBlock* dst, const SSDataBlock* src) {
|
||||||
ASSERT(src != NULL);
|
ASSERT(src != NULL);
|
||||||
|
|
||||||
|
|
|
@ -319,7 +319,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader) {
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
FAIL:
|
FAIL:
|
||||||
tDeleteSSDataBlock(pBlock);
|
blockDataFreeRes(pBlock);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -209,10 +209,10 @@ static void resetDataBlockScanInfo(SHashObj* pTableMap) {
|
||||||
p->iterInit = false;
|
p->iterInit = false;
|
||||||
p->iiter.hasVal = false;
|
p->iiter.hasVal = false;
|
||||||
if (p->iter.iter != NULL) {
|
if (p->iter.iter != NULL) {
|
||||||
tsdbTbDataIterDestroy(p->iter.iter);
|
p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(p->delSkyline);
|
p->delSkyline = taosArrayDestroy(p->delSkyline);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -224,18 +224,15 @@ static void destroyBlockScanInfo(SHashObj* pTableMap) {
|
||||||
p->iiter.hasVal = false;
|
p->iiter.hasVal = false;
|
||||||
|
|
||||||
if (p->iter.iter != NULL) {
|
if (p->iter.iter != NULL) {
|
||||||
tsdbTbDataIterDestroy(p->iter.iter);
|
p->iter.iter = tsdbTbDataIterDestroy(p->iter.iter);
|
||||||
p->iter.iter = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (p->iiter.iter != NULL) {
|
if (p->iiter.iter != NULL) {
|
||||||
tsdbTbDataIterDestroy(p->iiter.iter);
|
p->iiter.iter = tsdbTbDataIterDestroy(p->iiter.iter);
|
||||||
p->iiter.iter = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(p->delSkyline);
|
p->delSkyline = taosArrayDestroy(p->delSkyline);
|
||||||
taosArrayDestroy(p->pBlockList);
|
p->pBlockList = taosArrayDestroy(p->pBlockList);
|
||||||
p->delSkyline = NULL;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
taosHashCleanup(pTableMap);
|
taosHashCleanup(pTableMap);
|
||||||
|
|
|
@ -1193,8 +1193,6 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(pBlock->pDataBlock);
|
|
||||||
|
|
||||||
ASSERT(pInfo->pRes->pDataBlock != NULL);
|
ASSERT(pInfo->pRes->pDataBlock != NULL);
|
||||||
|
|
||||||
// currently only the tbname pseudo column
|
// currently only the tbname pseudo column
|
||||||
|
@ -1202,12 +1200,14 @@ static int32_t setBlockIntoRes(SStreamScanInfo* pInfo, const SSDataBlock* pBlock
|
||||||
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
|
int32_t code = addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes,
|
||||||
GET_TASKID(pTaskInfo));
|
GET_TASKID(pTaskInfo));
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
blockDataFreeRes((SSDataBlock*) pBlock);
|
||||||
longjmp(pTaskInfo->env, code);
|
longjmp(pTaskInfo->env, code);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
doFilter(pInfo->pCondition, pInfo->pRes);
|
doFilter(pInfo->pCondition, pInfo->pRes);
|
||||||
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
||||||
|
blockDataFreeRes((SSDataBlock*) pBlock);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -71,13 +71,8 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
|
||||||
SPoint point1, point2, point;
|
SPoint point1, point2, point;
|
||||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
|
||||||
|
|
||||||
// set the primary timestamp column value
|
// set the primary timestamp column value
|
||||||
int32_t index = pFillInfo->numOfCurrent;
|
int32_t index = pFillInfo->numOfCurrent;
|
||||||
SColumnInfoData* pCol0 = taosArrayGet(pBlock->pDataBlock, pFillInfo->tsSlotId);
|
|
||||||
char* val = colDataGetData(pCol0, index);
|
|
||||||
|
|
||||||
// set the primary timestamp value
|
|
||||||
*(TSKEY*)val = pFillInfo->currentKey;
|
|
||||||
|
|
||||||
// set the other values
|
// set the other values
|
||||||
if (pFillInfo->type == TSDB_FILL_PREV) {
|
if (pFillInfo->type == TSDB_FILL_PREV) {
|
||||||
|
@ -92,7 +87,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
|
||||||
SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
|
SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
|
||||||
|
|
||||||
if (pDstColInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
if (pDstColInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
colDataAppend(pDstColInfoData, index, (const char*)&ts, false);
|
colDataAppend(pDstColInfoData, index, (const char*)&pFillInfo->currentKey, false);
|
||||||
} else {
|
} else {
|
||||||
SGroupKeys* pKey = taosArrayGet(p, i);
|
SGroupKeys* pKey = taosArrayGet(p, i);
|
||||||
doSetVal(pDstColInfoData, index, pKey);
|
doSetVal(pDstColInfoData, index, pKey);
|
||||||
|
@ -101,41 +96,51 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
|
||||||
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
|
} else if (pFillInfo->type == TSDB_FILL_NEXT) {
|
||||||
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next : pFillInfo->prev;
|
SArray* p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next : pFillInfo->prev;
|
||||||
// todo refactor: start from 0 not 1
|
// todo refactor: start from 0 not 1
|
||||||
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
|
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||||
if (TSDB_COL_IS_TAG(pCol->flag)) {
|
if (TSDB_COL_IS_TAG(pCol->flag)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
SGroupKeys* pKey = taosArrayGet(p, i);
|
|
||||||
SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
|
SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
|
||||||
doSetVal(pDstColInfoData, index, pKey);
|
|
||||||
|
if (pDstColInfoData->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
|
colDataAppend(pDstColInfoData, index, (const char*)&pFillInfo->currentKey, false);
|
||||||
|
} else {
|
||||||
|
SGroupKeys* pKey = taosArrayGet(p, i);
|
||||||
|
doSetVal(pDstColInfoData, index, pKey);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
|
} else if (pFillInfo->type == TSDB_FILL_LINEAR) {
|
||||||
// TODO : linear interpolation supports NULL value
|
// TODO : linear interpolation supports NULL value
|
||||||
if (outOfBound) {
|
if (outOfBound) {
|
||||||
setNullRow(pBlock, pFillInfo->currentKey, index);
|
setNullRow(pBlock, pFillInfo->currentKey, index);
|
||||||
} else {
|
} else {
|
||||||
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
|
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||||
if (TSDB_COL_IS_TAG(pCol->flag)) {
|
if (TSDB_COL_IS_TAG(pCol->flag)) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t srcSlotId = GET_SRC_SLOT_ID(pCol);
|
|
||||||
|
|
||||||
int32_t dstSlotId = GET_DEST_SLOT_ID(pCol);
|
int32_t dstSlotId = GET_DEST_SLOT_ID(pCol);
|
||||||
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
|
||||||
|
|
||||||
int16_t type = pCol->pExpr->base.resSchema.type;
|
int16_t type = pDstCol->info.type;
|
||||||
|
if (type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
|
colDataAppend(pDstCol, index, (const char*)&pFillInfo->currentKey, false);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i);
|
SGroupKeys* pKey = taosArrayGet(pFillInfo->prev, i);
|
||||||
if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pKey->isNull) {
|
if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pKey->isNull) {
|
||||||
colDataAppendNULL(pDstCol, index);
|
colDataAppendNULL(pDstCol, index);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
SGroupKeys* pKey1 = taosArrayGet(pFillInfo->prev, 0);
|
SGroupKeys* pKey1 = taosArrayGet(pFillInfo->prev, pFillInfo->tsSlotId);
|
||||||
int64_t prevTs = *(int64_t*)pKey1->pData;
|
|
||||||
|
int64_t prevTs = *(int64_t*)pKey1->pData;
|
||||||
|
int32_t srcSlotId = GET_SRC_SLOT_ID(pCol);
|
||||||
|
|
||||||
SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
|
SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
|
||||||
char* data = colDataGetData(pSrcCol, pFillInfo->index);
|
char* data = colDataGetData(pSrcCol, pFillInfo->index);
|
||||||
|
@ -153,7 +158,7 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
|
||||||
} else if (pFillInfo->type == TSDB_FILL_NULL) { // fill with NULL
|
} else if (pFillInfo->type == TSDB_FILL_NULL) { // fill with NULL
|
||||||
setNullRow(pBlock, pFillInfo->currentKey, index);
|
setNullRow(pBlock, pFillInfo->currentKey, index);
|
||||||
} else { // fill with user specified value for each column
|
} else { // fill with user specified value for each column
|
||||||
for (int32_t i = 1; i < pFillInfo->numOfCols; ++i) {
|
for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
|
||||||
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
SFillColInfo* pCol = &pFillInfo->pFillCol[i];
|
||||||
if (TSDB_COL_IS_TAG(pCol->flag) /* || IS_VAR_DATA_TYPE(pCol->schema.type)*/) {
|
if (TSDB_COL_IS_TAG(pCol->flag) /* || IS_VAR_DATA_TYPE(pCol->schema.type)*/) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -176,6 +181,8 @@ static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock*
|
||||||
colDataAppend(pDst, index, (char*)&v, false);
|
colDataAppend(pDst, index, (char*)&v, false);
|
||||||
} else if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
} else if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
|
||||||
colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false);
|
colDataAppend(pDst, index, (const char*)&pFillInfo->currentKey, false);
|
||||||
|
} else { // varchar/nchar data
|
||||||
|
colDataAppendNULL(pDst, index);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -234,8 +241,7 @@ static void copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SArray
|
||||||
static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t outputRows) {
|
static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t outputRows) {
|
||||||
pFillInfo->numOfCurrent = 0;
|
pFillInfo->numOfCurrent = 0;
|
||||||
|
|
||||||
// todo make sure the first column is always the primary timestamp column?
|
SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->tsSlotId);
|
||||||
SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, 0);
|
|
||||||
|
|
||||||
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
|
int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
|
||||||
bool ascFill = FILL_IS_ASC_FILL(pFillInfo);
|
bool ascFill = FILL_IS_ASC_FILL(pFillInfo);
|
||||||
|
|
|
@ -296,8 +296,8 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||||
|
|
||||||
switch (call->callType) {
|
switch (call->callType) {
|
||||||
case TSDB_UDF_CALL_SCALA_PROC: {
|
case TSDB_UDF_CALL_SCALA_PROC: {
|
||||||
tDeleteSSDataBlock(&call->block);
|
blockDataFreeRes(&call->block);
|
||||||
tDeleteSSDataBlock(&subRsp->resultData);
|
blockDataFreeRes(&subRsp->resultData);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_UDF_CALL_AGG_INIT: {
|
case TSDB_UDF_CALL_AGG_INIT: {
|
||||||
|
@ -305,7 +305,7 @@ void udfdProcessCallRequest(SUvUdfWork *uvUdf, SUdfRequest *request) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_UDF_CALL_AGG_PROC: {
|
case TSDB_UDF_CALL_AGG_PROC: {
|
||||||
tDeleteSSDataBlock(&call->block);
|
blockDataFreeRes(&call->block);
|
||||||
freeUdfInterBuf(&subRsp->resultBuf);
|
freeUdfInterBuf(&subRsp->resultBuf);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -116,7 +116,7 @@ void streamFreeQitem(SStreamQueueItem* data) {
|
||||||
blockDataDestroy(((SStreamTrigger*)data)->pBlock);
|
blockDataDestroy(((SStreamTrigger*)data)->pBlock);
|
||||||
taosFreeQitem(data);
|
taosFreeQitem(data);
|
||||||
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) {
|
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) {
|
||||||
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)tDeleteSSDataBlock);
|
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
|
||||||
taosFreeQitem(data);
|
taosFreeQitem(data);
|
||||||
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
|
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
streamDataSubmitRefDec((SStreamDataSubmit*)data);
|
streamDataSubmitRefDec((SStreamDataSubmit*)data);
|
||||||
|
|
|
@ -313,7 +313,7 @@ int32_t streamDispatch(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
|
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
taosArrayDestroyEx(pBlock->blocks, (FDelete)tDeleteSSDataBlock);
|
taosArrayDestroyEx(pBlock->blocks, (FDelete)blockDataFreeRes);
|
||||||
taosFreeQitem(pBlock);
|
taosFreeQitem(pBlock);
|
||||||
|
|
||||||
tmsgSendReq(pEpSet, &dispatchMsg);
|
tmsgSendReq(pEpSet, &dispatchMsg);
|
||||||
|
|
|
@ -106,7 +106,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
|
||||||
qDebug("stream task %d exec end", pTask->taskId);
|
qDebug("stream task %d exec end", pTask->taskId);
|
||||||
|
|
||||||
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
|
if (pTask->taskStatus == TASK_STATUS__DROPPING) {
|
||||||
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -121,7 +121,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
|
||||||
qRes->blocks = pRes;
|
qRes->blocks = pRes;
|
||||||
if (streamTaskOutput(pTask, qRes) < 0) {
|
if (streamTaskOutput(pTask, qRes) < 0) {
|
||||||
/*streamQueueProcessFail(pTask->inputQueue);*/
|
/*streamQueueProcessFail(pTask->inputQueue);*/
|
||||||
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
taosFreeQitem(qRes);
|
taosFreeQitem(qRes);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -155,7 +155,7 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
pRes = streamExecForQall(pTask, pRes);
|
pRes = streamExecForQall(pTask, pRes);
|
||||||
if (pRes == NULL) goto FAIL;
|
if (pRes == NULL) goto FAIL;
|
||||||
|
|
||||||
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
|
atomic_store_8(&pTask->execStatus, TASK_EXEC_STATUS__IDLE);
|
||||||
qDebug("stream exec, return result");
|
qDebug("stream exec, return result");
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -163,7 +163,7 @@ int32_t streamExec(SStreamTask* pTask, SMsgCb* pMsgCb) {
|
||||||
continue;
|
continue;
|
||||||
} else if (execStatus == TASK_EXEC_STATUS__EXECUTING) {
|
} else if (execStatus == TASK_EXEC_STATUS__EXECUTING) {
|
||||||
ASSERT(taosArrayGetSize(pRes) == 0);
|
ASSERT(taosArrayGetSize(pRes) == 0);
|
||||||
taosArrayDestroyEx(pRes, (FDelete)tDeleteSSDataBlock);
|
taosArrayDestroyEx(pRes, (FDelete)blockDataFreeRes);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
|
Loading…
Reference in New Issue